Parallel DAG in Python

10 Mar 2018

Directed acyclic graph (DAG) is commonly used as a dependency graph. It could be used to describe the dependencies of tasks.

The order of task executions must comply with the dependencies, where tasks with direct or indirect path must run in sequence, and tasks without any connection could run in parallel.

paradag is created to simplify the parallel execution of tasks with dependencies.

Installation

$ pip install paradag

Create a DAG

Before running tasks, first create a DAG, with each vertex representing a task. The vertex of DAG instance could be any hashable object, like integer, string, tuple of hashable objects, instance of user-defined class, etc.

from paradag import DAG

class Vtx(object):
    def __init__(self, v):
        self.__value = v

vtx = Vtx(999)

dag = DAG()
dag.add_vertex(123, 'abcde', 'xyz', ('a', 'b', 3), vtx)

dag.add_edge(123, 'abcde')                  # 123 -> 'abcde'
dag.add_edge('abcde', ('a', 'b', 3), vtx)   # 'abcde' -> ('a', 'b', 3), 'abcde' -> vtx

add_edge accepts one starting vertex and one or more ending vertices. Please pay attention not to make a cycle with add_edge, which will raise a DAGCycleError.

The common DAG properties are accessible:

print(dag.vertex_size())
print(dag.edge_size())

print(dag.successors('abcde'))
print(dag.predecessors(vtx))

print(dag.all_starts())
print(dag.all_terminals())

Run tasks in sequence

Write your executor and optionally a selector. The executor handles the real execution for each vertex.

from paradag import dag_run
from paradag import SequentialProcessor

class CustomExecutor:
    def param(self, vertex):
        return vertex

    def execute(self, param):
        print('Executing:', param)

print(dag_run(dag, processor=SequentialProcessor(), executor=CustomExecutor()))

dag_run is the core function for task scheduling.

Run tasks in parallel

Run tasks in parallel is quite similar, while only change the processor to MultiThreadProcessor.

from paradag import MultiThreadProcessor

dag_run(dag, processor=MultiThreadProcessor(), executor=CustomExecutor())

The default selector FullSelector will try to find as many tasks as possible which could run in parallel. This could be adjusted with custom selector. The following selector will only allow at most 4 tasks running in parallel.

class CustomSelector(object):
    def select(self, running, idle):
        task_number = max(0, 4-len(running))
        return list(idle)[:task_number]

dag_run(dag, processor=MultiThreadProcessor(), selector=CustomSelector(), executor=CustomExecutor())

Once you are using MultiThreadProcessor, great attentions must be paid that execute of executor could run in parallel. Try not to modify any variables outside the execute function, and all parameters should be passed by the param argument. Also make sure that the return values generated from param function are independent.

Get task running status

The executor could also implement the optional methods which could get the task running status.

class CustomExecutor:
    def param(self, vertex):
        return vertex

    def execute(self, param):
        print('Executing:', param)

    def report_start(self, vertices):
        print('Start to run:', vertices)

    def report_running(self, vertices):
        print('Current running:', vertices)

    def report_finish(self, vertices_result):
        for vertex, result in vertices_result:
            print('Finished running {0} with result: {1}'.format(vertex, result))

dag_run(dag, processor=MultiThreadProcessor(), executor=CustomExecutor())

Deliver result to descendants

In case the result for one task should be used for its descendants, deliver method could be implemented in executor.

class CustomExecutor:
    def __init__(self):
        self.__level = {}

    def param(self, vertex):
        return self.__level.get(vertex, 0)

    def execute(self, param):
        return param + 1

    def report_finish(self, vertices_result):
        for vertex, result in vertices_result:
            print('Vertex {0} finished, level: {1}'.format(vertex, result))

    def deliver(self, vertex, result):
        self.__level[vertex] = result

The result from parent will be delivered to the vertex before execution.

Topological sorting

Topological sorting could also be done by paradag.dag_run function. The return value of dag_run could be considered as the result of topological sorting.

A simple topological sorting without any execution:

from paradag import SingleSelector, RandomSelector, ShuffleSelector

dag = DAG()
dag.add_vertex(1, 2, 3, 4, 5)
dag.add_edge(1, 4)
dag.add_edge(4, 2, 5)

print(dag_run(dag))
print(dag_run(dag, selector=SingleSelector()))
print(dag_run(dag, selector=RandomSelector()))
print(dag_run(dag, selector=ShuffleSelector()))

The solution for topological sorting is not necessarily unique, and the final orders may vary with different selectors.