Python 中根据 DAG 进行并行任务处理
2018 年 03 月 10 日有向无环图 (Directed acyclic graph, DAG) 通常被用作依赖关系图,可以用于描述任务运行之间的依赖关系。
任务执行顺序必须遵守 DAG 依赖关系。任务之间如果有直接或者间接的依赖,则必须依次执行,如果任务之间不存在依赖则可以并行执行。
我经常在 Python 下遇到类似的问题,于是写了 paradag 这个包,用于描述 DAG,并且可以大大简化任务之间的并行处理。
安装
$ pip install paradag
创建 DAG
开始运行任务之前,先要创建描述任务依赖关系的 DAG,DAG 中的每个顶点代表一个任务。DAG 顶点的值可以是任意可哈希 (hashable) 对象,比如整数,字符串,包含可哈希对象的元组,用户定义类的实例等等。
from paradag import DAG
class Vtx(object):
def __init__(self, v):
self.__value = v
vtx = Vtx(999)
dag = DAG()
dag.add_vertex(123, 'abcde', 'xyz', ('a', 'b', 3), vtx)
dag.add_edge(123, 'abcde') # 123 -> 'abcde'
dag.add_edge('abcde', ('a', 'b', 3), vtx) # 'abcde' -> ('a', 'b', 3), 'abcde' -> vtx
add_edge
用于创建描述依赖关系的有向边,第一个参数为起点,剩余参数为终点,可以同时创建多个边。创建边时需要注意不要形成环,那样会导致触发
DAGCycleError
异常。
以下是常用的 DAG 属性:
print(dag.vertex_size())
print(dag.edge_size())
print(dag.successors('abcde'))
print(dag.predecessors(vtx))
print(dag.all_starts())
print(dag.all_terminals())
串行执行任务
任务执行需要提供一个 executor,以及可选的 selector。executor 用于处理每个顶点的任务执行。
from paradag import dag_run
from paradag import SequentialProcessor
class CustomExecutor:
def param(self, vertex):
return vertex
def execute(self, param):
print('Executing:', param)
print(dag_run(dag, processor=SequentialProcessor(), executor=CustomExecutor()))
dag_run
是核心的任务调度执行函数。
并行执行任务
并行执行任务和串行基本类似,只需要把 processor 换成
MultiThreadProcessor
就可以了。
from paradag import MultiThreadProcessor
dag_run(dag, processor=MultiThreadProcessor(), executor=CustomExecutor())
默认的 selector 是 FullSelector
,它会尝试把尽可能多可以并行的任务全部交给
processor 执行。如果希望能够控制具体的执行过程,可以自己写一个
selector。下面这个 selector 最多只允许 4 个任务同时并行运行。
class CustomSelector(object):
def select(self, running, idle):
task_number = max(0, 4-len(running))
return list(idle)[:task_number]
dag_run(dag, processor=MultiThreadProcessor(), selector=CustomSelector(), executor=CustomExecutor())
一旦使用 MultiThreadProcessor
,就必须要注意并行任务的安全问题。executor
中只有 execute
函数是会并行运行的,其它函数都在主线程内执行。所以在
execute
函数内尽可能不要去修改函数外的变量,所有的参数都通过
param
传递,而 param
函数所返回的用于 execute
函数的参数也尽可能保证相互独立。
获取任务状态
在 executor 中还可以实现下面这些可选方法 (report_*
) 用于获取任务运行状态。
class CustomExecutor:
def param(self, vertex):
return vertex
def execute(self, param):
print('Executing:', param)
def report_start(self, vertices):
print('Start to run:', vertices)
def report_running(self, vertices):
print('Current running:', vertices)
def report_finish(self, vertices_result):
for vertex, result in vertices_result:
print('Finished running {0} with result: {1}'.format(vertex, result))
dag_run(dag, processor=MultiThreadProcessor(), executor=CustomExecutor())
传递运行结果给后续任务
如果任务需要把运行结果传递给被依赖的后续任务,可以实现 deliver
方法。
class CustomExecutor:
def __init__(self):
self.__level = {}
def param(self, vertex):
return self.__level.get(vertex, 0)
def execute(self, param):
return param + 1
def report_finish(self, vertices_result):
for vertex, result in vertices_result:
print('Vertex {0} finished, level: {1}'.format(vertex, result))
def deliver(self, vertex, result):
self.__level[vertex] = result
上一级的运行结果在当前任务开始前自动通过调用 deliver
函数传递进来。
拓扑排序
拓扑排序也可以通过
paradag.dag_run
函数实现。dag_run
函数的返回值就可以看作是拓扑排序的结果。
仅仅得到拓扑排序的结果而不执行任务:
from paradag import SingleSelector, RandomSelector, ShuffleSelector
dag = DAG()
dag.add_vertex(1, 2, 3, 4, 5)
dag.add_edge(1, 4)
dag.add_edge(4, 2, 5)
print(dag_run(dag))
print(dag_run(dag, selector=SingleSelector()))
print(dag_run(dag, selector=RandomSelector()))
print(dag_run(dag, selector=ShuffleSelector()))
拓扑排序的结果不是唯一的,使用不同 selector 也会得到不同的结果。