DAG
hyperf/dag is a lightweight directed acyclic graph (DAG) task orchestration library.
Scenarios
Suppose we have a series of tasks to perform.
- If each task depends on the one before it, the tasks must be executed sequentially.
- If the tasks do not depend on each other, we can execute them concurrently to speed things up.
- There is also an intermediate state between the two: some tasks have dependencies, while others can be executed concurrently.
We can handle the third, more complex scenario by modeling it as a DAG.
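To make the third scenario concrete, here is a minimal plain-PHP sketch of how a dependency graph splits into "waves" of tasks that could run concurrently. It uses Kahn's topological sort; `dagLevels` and the task names are hypothetical and not part of hyperf/dag.

```php
<?php
// Hypothetical helper, not part of hyperf/dag: groups tasks into waves.
// $edges maps a task name to the list of tasks that depend on it.
function dagLevels(array $vertices, array $edges): array
{
    // Count how many prerequisites each task has.
    $indegree = array_fill_keys($vertices, 0);
    foreach ($edges as $children) {
        foreach ($children as $child) {
            $indegree[$child]++;
        }
    }

    // Tasks without prerequisites form the first wave.
    $ready = array_keys(array_filter($indegree, fn ($n) => $n === 0));
    $levels = [];
    $done = 0;
    while ($ready !== []) {
        $levels[] = $ready;
        $done += count($ready);
        $next = [];
        foreach ($ready as $task) {
            foreach ($edges[$task] ?? [] as $child) {
                // A task becomes ready once its last prerequisite is done.
                if (--$indegree[$child] === 0) {
                    $next[] = $child;
                }
            }
        }
        $ready = $next;
    }
    if ($done !== count($vertices)) {
        throw new InvalidArgumentException('The graph contains a cycle.');
    }
    return $levels;
}

// Mixed scenario: "fetch" runs first, "parse" and "index" can then run
// together, and "report" needs both of them.
$levels = dagLevels(
    ['fetch', 'parse', 'index', 'report'],
    ['fetch' => ['parse', 'index'], 'parse' => ['report'], 'index' => ['report']]
);
echo json_encode($levels), "\n";
```

Tasks in the same wave have no dependencies on each other, so they are candidates for concurrent execution; the waves themselves must run in order.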
Install
composer require hyperf/dag
Example
Suppose we have a series of tasks with the topology shown above, where vertices represent tasks and edges represent dependencies. (B, C, and D can only run after A has completed; H, E, and F can only run after B has completed; and so on.)
With hyperf/dag, the DAG can be constructed and executed as follows.
<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";});
$b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";});
$c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";});
$d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";});
$e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";});
$f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";});
$g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";});
$h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";});
$i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";});
$dag->addVertex($a)
    ->addVertex($b)
    ->addVertex($c)
    ->addVertex($d)
    ->addVertex($e)
    ->addVertex($f)
    ->addVertex($g)
    ->addVertex($h)
    ->addVertex($i)
    ->addEdge($a, $b)
    ->addEdge($a, $c)
    ->addEdge($a, $d)
    ->addEdge($b, $h)
    ->addEdge($b, $e)
    ->addEdge($b, $f)
    ->addEdge($c, $f)
    ->addEdge($c, $g)
    ->addEdge($d, $g)
    ->addEdge($h, $i)
    ->addEdge($e, $i)
    ->addEdge($f, $i)
    ->addEdge($g, $i);
// Must be executed in a coroutine environment
$dag->run();
Output:
// after 1s
A
// after 2s
D
C
B
// after 3s
G
F
E
H
// after 4s
I
The DAG schedules every task as early as possible. Try changing the sleep in B to 2 seconds: B and G will then finish at the same time, 3 seconds in.
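The timing claim can be checked by hand with a small plain-PHP sketch that computes the earliest finish time of each task. `earliestFinish` is a hypothetical helper, not part of hyperf/dag, and the model assumes that a task starts the moment its last prerequisite completes, with no scheduling overhead and no concurrency cap.

```php
<?php
// Hypothetical helper, not part of hyperf/dag.
// $order must list the tasks in a topological order;
// $parents maps a task to its prerequisites; $duration is in seconds.
function earliestFinish(array $order, array $parents, array $duration): array
{
    $finish = [];
    foreach ($order as $task) {
        // A task can start once its slowest prerequisite has finished.
        $start = 0;
        foreach ($parents[$task] ?? [] as $parent) {
            $start = max($start, $finish[$parent]);
        }
        $finish[$task] = $start + $duration[$task];
    }
    return $finish;
}

// Same edges as the example above, written as task => prerequisites.
$parents = [
    'B' => ['A'], 'C' => ['A'], 'D' => ['A'],
    'H' => ['B'], 'E' => ['B'], 'F' => ['B', 'C'],
    'G' => ['C', 'D'], 'I' => ['H', 'E', 'F', 'G'],
];
$order = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I'];

// Every task sleeps for 1 second, as in the example.
$duration = array_fill_keys($order, 1);
$base = earliestFinish($order, $parents, $duration);

// B now sleeps for 2 seconds.
$duration['B'] = 2;
$slowB = earliestFinish($order, $parents, $duration);

printf("all 1s: G=%d, I=%d\n", $base['G'], $base['I']);
printf("B = 2s: B=%d, G=%d, I=%d\n", $slowB['B'], $slowB['G'], $slowB['I']);
```

In this model G only waits for C and D, so slowing B down does not delay it: both B and G finish at 3 seconds, while everything downstream of B shifts by one second.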
Access the results of previous steps
Each task can receive an array parameter containing the results of the tasks it depends on, indexed by vertex key. After the DAG has been executed, run() also returns an array of the same structure, containing the result of every step.
<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {return 1;});
$b = \Hyperf\Dag\Vertex::make(function($results) use ($a) {
    return $results[$a->key] + 1;
});
$results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run();
assert($results[$a->key] === 1);
assert($results[$b->key] === 2);
Define a task
The examples above define tasks with closures, in the following form.
// The optional second parameter of Vertex::make is the key of the vertex, which is also its key in the results array.
\Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");
Besides closures, you can also define a task as a class that implements the \Hyperf\Dag\Runner interface and convert it to a vertex via Vertex::of.
class MyJob implements \Hyperf\Dag\Runner {
    public function run($results = []) {
        return 'hello';
    }
}
\Hyperf\Dag\Vertex::of(new MyJob(), "greeting");
\Hyperf\Dag\Dag itself also implements the \Hyperf\Dag\Runner interface, so DAGs can be nested.
<?php
// namespace omitted
$a = Vertex::make(function () { return 1;});
$b = Vertex::make(function () { return 2;});
$c = Vertex::make(function () { return 3;});
$nestedDag = new Dag();
$nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b);
$d = Vertex::of($nestedDag);
$superDag = new Dag();
$superDag->addVertex($c)->addVertex($d)->addEdge($c, $d);
$superDag->run();
Control concurrency
The \Hyperf\Dag\Dag class provides the setConcurrency(int n) method to control the maximum number of tasks that run concurrently. The default is 10.
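To get a feel for what a lower limit costs, the following plain-PHP sketch runs a simplified round-based model on the nine-task example graph. It is not the hyperf/dag scheduler: `simulateRounds` is a hypothetical helper that assumes every task takes exactly one round and that ready tasks are served first-in, first-out, so the real execution order may differ.

```php
<?php
// Simplified model, not the hyperf/dag scheduler: every task takes one round,
// at most $limit tasks run per round, and ready tasks are served FIFO.
// Returns the list of tasks executed in each round.
function simulateRounds(array $vertices, array $edges, int $limit): array
{
    if ($limit < 1) {
        throw new InvalidArgumentException('The limit must be at least 1.');
    }
    // Count how many prerequisites each task has.
    $indegree = array_fill_keys($vertices, 0);
    foreach ($edges as $children) {
        foreach ($children as $child) {
            $indegree[$child]++;
        }
    }
    $ready = array_keys(array_filter($indegree, fn ($n) => $n === 0));
    $rounds = [];
    while ($ready !== []) {
        // Take up to $limit tasks from the front of the ready queue.
        $batch = array_splice($ready, 0, $limit);
        $rounds[] = $batch;
        foreach ($batch as $task) {
            foreach ($edges[$task] ?? [] as $child) {
                if (--$indegree[$child] === 0) {
                    $ready[] = $child;
                }
            }
        }
    }
    return $rounds;
}

// The example graph from this page, written as task => dependents.
$vertices = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I'];
$edges = [
    'A' => ['B', 'C', 'D'], 'B' => ['H', 'E', 'F'], 'C' => ['F', 'G'],
    'D' => ['G'], 'H' => ['I'], 'E' => ['I'], 'F' => ['I'], 'G' => ['I'],
];

$wide = simulateRounds($vertices, $edges, 10);
$narrow = simulateRounds($vertices, $edges, 2);
printf("limit 10: %d rounds\n", count($wide));
printf("limit 2: %d rounds\n", count($narrow));
```

With a limit of 10 the model needs four rounds, matching the four seconds in the example output; with a limit of 2 the middle of the graph can no longer run all at once and the model needs six rounds. The limit is therefore a trade-off between total run time and the load placed on whatever resource the tasks share.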