2021-05-15 23:45:30 +08:00
|
|
|
|
# DAG
|
|
|
|
|
|
|
|
|
|
`hyperf/dag` 是一个轻量级有向无环图 (**D**irected **A**cyclic **G**raph) 任务编排库。
|
|
|
|
|
|
|
|
|
|
## 场景
|
|
|
|
|
|
|
|
|
|
假设我们有一系列任务需要执行。
|
|
|
|
|
|
|
|
|
|
- 如果他们之间存在依赖关系,则可以将他们顺序执行。
|
|
|
|
|
- 如果他们并不相互依赖,那么我们可以选择并发执行,以加快执行速度。
|
|
|
|
|
- 两者间还存在中间状态:一部分任务存在依赖关系,而另一些任务又可以并发执行。
|
|
|
|
|
|
|
|
|
|
我们可以将第三种复杂的场景抽象成 `DAG` 来解决。
|
|
|
|
|
|
2021-07-19 02:42:29 +08:00
|
|
|
|
## 安装
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
composer require hyperf/dag
|
|
|
|
|
```
|
|
|
|
|
|
2021-05-15 23:45:30 +08:00
|
|
|
|
## 示例
|
|
|
|
|
|
|
|
|
|
[![](https://mermaid.ink/img/eyJjb2RlIjoic3RhdGVEaWFncmFtLXYyXG4gICAgWypdIC0tPiBBXG4gICAgQSAtLT4gQlxuICAgIEEgLS0-IENcbiAgICBBIC0tPiBEXG4gICAgRCAtLT4gR1xuICAgIEMgLS0-IEdcbiAgICBDIC0tPiBGXG4gICAgQiAtLT4gRlxuICAgIEIgLS0-IEVcbiAgICBCIC0tPiBIXG4gICAgSCAtLT4gSVxuICAgIEUgLS0-IElcbiAgICBGIC0tPiBJXG4gICAgRyAtLT4gSVxuICAgIEkgLS0-IFsqXVxuICAgICAgICAgICAgIiwibWVybWFpZCI6eyJ0aGVtZSI6ImRlZmF1bHQiLCJ0aGVtZVZhcmlhYmxlcyI6eyJiYWNrZ3JvdW5kIjoid2hpdGUiLCJwcmltYXJ5Q29sb3IiOiIjRUNFQ0ZGIiwic2Vjb25kYXJ5Q29sb3IiOiIjZmZmZmRlIiwidGVydGlhcnlDb2xvciI6ImhzbCg4MCwgMTAwJSwgOTYuMjc0NTA5ODAzOSUpIiwicHJpbWFyeUJvcmRlckNvbG9yIjoiaHNsKDI0MCwgNjAlLCA4Ni4yNzQ1MDk4MDM5JSkiLCJzZWNvbmRhcnlCb3JkZXJDb2xvciI6ImhzbCg2MCwgNjAlLCA4My41Mjk0MTE3NjQ3JSkiLCJ0ZXJ0aWFyeUJvcmRlckNvbG9yIjoiaHNsKDgwLCA2MCUsIDg2LjI3NDUwOTgwMzklKSIsInByaW1hcnlUZXh0Q29sb3IiOiIjMTMxMzAwIiwic2Vjb25kYXJ5VGV4dENvbG9yIjoiIzAwMDAyMSIsInRlcnRpYXJ5VGV4dENvbG9yIjoicmdiKDkuNTAwMDAwMDAwMSwgOS41MDAwMDAwMDAxLCA5LjUwMDAwMDAwMDEpIiwibGluZUNvbG9yIjoiIzMzMzMzMyIsInRleHRDb2xvciI6IiMzMzMiLCJtYWluQmtnIjoiI0VDRUNGRiIsInNlY29uZEJrZyI6IiNmZmZmZGUiLCJib3JkZXIxIjoiIzkzNzBEQiIsImJvcmRlcjIiOiIjYWFhYTMzIiwiYXJyb3doZWFkQ29sb3IiOiIjMzMzMzMzIiwiZm9udEZhbWlseSI6IlwidHJlYnVjaGV0IG1zXCIsIHZlcmRhbmEsIGFyaWFsIiwiZm9udFNpemUiOiIxNnB4IiwibGFiZWxCYWNrZ3JvdW5kIjoiI2U4ZThlOCIsIm5vZGVCa2ciOiIjRUNFQ0ZGIiwibm9kZUJvcmRlciI6IiM5MzcwREIiLCJjbHVzdGVyQmtnIjoiI2ZmZmZkZSIsImNsdXN0ZXJCb3JkZXIiOiIjYWFhYTMzIiwiZGVmYXVsdExpbmtDb2xvciI6IiMzMzMzMzMiLCJ0aXRsZUNvbG9yIjoiIzMzMyIsImVkZ2VMYWJlbEJhY2tncm91bmQiOiIjZThlOGU4IiwiYWN0b3JCb3JkZXIiOiJoc2woMjU5LjYyNjE2ODIyNDMsIDU5Ljc3NjUzNjMxMjglLCA4Ny45MDE5NjA3ODQzJSkiLCJhY3RvckJrZyI6IiNFQ0VDRkYiLCJhY3RvclRleHRDb2xvciI6ImJsYWNrIiwiYWN0b3JMaW5lQ29sb3IiOiJncmV5Iiwic2lnbmFsQ29sb3IiOiIjMzMzIiwic2lnbmFsVGV4dENvbG9yIjoiIzMzMyIsImxhYmVsQm94QmtnQ29sb3IiOiIjRUNFQ0ZGIiwibGFiZWxCb3hCb3JkZXJDb2xvciI6ImhzbCgyNTkuNjI2MTY4MjI0MywgNTkuNzc2NTM2MzEyOCUsIDg3LjkwMTk2MDc4NDMlKSIsImxhYmVsVGV4dENvbG9yIjoiYmxhY2siLCJsb29wVGV4dENvbG9yIjoiYmxhY2siLCJub3RlQm9yZGVyQ29sb3IiOiIjYWFhYTMzIiwibm90ZUJrZ0NvbG9yIjoiI2ZmZjVhZCIsIm5vdGVUZXh0Q29sb3IiOiJibGFjayIsImFjdGl2YXRpb25Cb3JkZXJDb2xvciI6IiM2NjYiLCJhY3RpdmF0aW9uQmtnQ29sb3IiOiIjZjRmNGY0Iiwic2VxdWVuY2VOdW1iZXJDb2xvciI6IndoaXRlIiwic2VjdGlvbkJrZ0NvbG9yIjoicmdiYSgxMDIsIDEwMiwgMjU1LCAwLjQ5KSIsImFsdFNlY3Rpb25Ca2dDb2xvciI6IndoaXRlIiwic2VjdGlvbkJrZ0NvbG9yMiI6IiNmZmY0MDAiLCJ0YXNrQm9yZGVyQ29sb3IiOiIjNTM0ZmJjIiwidGFza0JrZ0NvbG9yIjoiIzhhOTBkZCIsInRhc2tUZXh0TGlnaHRDb2xvciI6IndoaXRlIiwidGFza1RleHRDb2xvciI6IndoaXRlIiwidGFza1RleHREYXJrQ29sb3IiOiJibGFjayIsInRhc2tUZXh0T3V0c2lkZUNvbG9yIjoiYmxhY2siLCJ0YXNrVGV4dENsaWNrYWJsZUNvbG9yIjoiIzAwMzE2MyIsImFjdGl2ZVRhc2tCb3JkZXJDb2xvciI6IiM1MzRmYmMiLCJhY3RpdmVUYXNrQmtnQ29sb3IiOiIjYmZjN2ZmIiwiZ3JpZENvbG9yIjoibGlnaHRncmV5IiwiZG9uZVRhc2tCa2dDb2xvciI6ImxpZ2h0Z3JleSIsImRvbmVUYXNrQm9yZGVyQ29sb3IiOiJncmV5IiwiY3JpdEJvcmRlckNvbG9yIjoiI2ZmODg4OCIsImNyaXRCa2dDb2xvciI6InJlZCIsInRvZGF5TGluZUNvbG9yIjoicmVkIiwibGFiZWxDb2xvciI6ImJsYWNrIiwiZXJyb3JCa2dDb2xvciI6IiM1NTIyMjIiLCJlcnJvclRleHRDb2xvciI6IiM1NTIyMjIiLCJjbGFzc1RleHQiOiIjMTMxMzAwIiwiZmlsbFR5cGUwIjoiI0VDRUNGRiIsImZpbGxUeXBlMSI6IiNmZmZmZGUiLCJmaWxsVHlwZTIiOiJoc2woMzA0LCAxMDAlLCA5Ni4yNzQ1MDk4MDM5JSkiLCJmaWxsVHlwZTMiOiJoc2woMTI0LCAxMDAlLCA5My41Mjk0MTE3NjQ3JSkiLCJmaWxsVHlwZTQiOiJoc2woMTc2LCAxMDAlLCA5Ni4yNzQ1MDk4MDM5JSkiLCJmaWxsVHlwZTUiOiJoc2woLTQsIDEwMCUsIDkzLjUyOTQxMTc2NDclKSIsImZpbGxUeXBlNiI6ImhzbCg4LCAxMDAlLCA5Ni4yNzQ1MDk4MDM5JSkiLCJmaWxsVHlwZTciOiJoc2woMTg4LCAxMDAlLCA5My41Mjk0MTE3NjQ3JSkifX0sInVwZGF0ZUVkaXRvciI6ZmFsc2V9)](https://mermaid-js.github.io/mermaid-live-editor/#/edit/eyJjb2RlIjoic3RhdGVEaWFncmFtLXYyXG4gICAgWypdIC0tPiBBXG4gICAgQSAtLT4gQlxuICAgIEEgLS0-IENcbiAgICBBIC0tPiBEXG4gICAgRCAtLT4gR1xuICAgIEMgLS0-IEdcbiAgICBDIC0tPiBGXG4gICAgQiAtLT4gRlxuICAgIEIgLS0-IEVcbiAgICBCIC0tPiBIXG4gICAgSCAtLT4gSVxuICAgIEUgLS0-IElcbiAgICBGIC0tPiBJXG4gICAgRyAtLT4gSVxuICAgIEkgLS0-IFsqXVxuICAgICAgICAgICAgIiwibWVybWFpZCI6eyJ0aGVtZSI6ImRlZmF1bHQiLCJ0aGVtZVZhcmlhYmxlcyI6eyJiYWNrZ3JvdW5kIjoid2hpdGUiLCJwcmltYXJ5Q29sb3IiOiIjRUNFQ0ZGIiwic2Vjb25kYXJ5Q29sb3IiOiIjZmZmZmRlIiwidGVydGlhcnlDb2xvciI6ImhzbCg4MCwgMTAwJSwgOTYuMjc0NTA5ODAzOSUpIiwicHJpbWFyeUJvcmRlckNvbG9yIjoiaHN
|
|
|
|
|
|
|
|
|
|
假设我们有一系列任务,拓扑结构如上图所示,顶点代表任务,边缘代表依赖关系。(A 完成后才能完成 B、C、D,B 完成后才能完成 H、E、F...)
|
|
|
|
|
|
|
|
|
|
通过 `hyperf/dag` 可以使用如下方式构建 `DAG` 并执行。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
$dag = new \Hyperf\Dag\Dag();
|
|
|
|
|
$a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";});
|
|
|
|
|
$b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";});
|
|
|
|
|
$c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";});
|
|
|
|
|
$d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";});
|
|
|
|
|
$e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";});
|
|
|
|
|
$f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";});
|
|
|
|
|
$g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";});
|
|
|
|
|
$h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";});
|
|
|
|
|
$i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";});
|
|
|
|
|
$dag->addVertex($a)
|
|
|
|
|
->addVertex($b)
|
|
|
|
|
->addVertex($c)
|
|
|
|
|
->addVertex($d)
|
|
|
|
|
->addVertex($e)
|
|
|
|
|
->addVertex($f)
|
|
|
|
|
->addVertex($g)
|
|
|
|
|
->addVertex($h)
|
|
|
|
|
->addVertex($i)
|
|
|
|
|
->addEdge($a, $b)
|
|
|
|
|
->addEdge($a, $c)
|
|
|
|
|
->addEdge($a, $d)
|
|
|
|
|
->addEdge($b, $h)
|
|
|
|
|
->addEdge($b, $e)
|
|
|
|
|
->addEdge($b, $f)
|
|
|
|
|
->addEdge($c, $f)
|
|
|
|
|
->addEdge($c, $g)
|
|
|
|
|
->addEdge($d, $g)
|
|
|
|
|
->addEdge($h, $i)
|
|
|
|
|
->addEdge($e, $i)
|
|
|
|
|
->addEdge($f, $i)
|
|
|
|
|
->addEdge($g, $i);
|
|
|
|
|
|
|
|
|
|
// 需要在协程环境下执行
|
|
|
|
|
$dag->run();
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
输出:
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
// 1s 后
|
|
|
|
|
A
|
|
|
|
|
// 2s 后
|
|
|
|
|
D
|
|
|
|
|
C
|
|
|
|
|
B
|
|
|
|
|
// 3s 后
|
|
|
|
|
G
|
|
|
|
|
F
|
|
|
|
|
E
|
|
|
|
|
H
|
|
|
|
|
// 4s 后
|
|
|
|
|
I
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
> DAG 会按照尽可能早的原则调度任务。尝试将 B 点的耗时调整为 2 秒,会发现 B 和 G 一起完成。
|
|
|
|
|
|
|
|
|
|
## 访问前步结果
|
|
|
|
|
|
|
|
|
|
每一个任务可以接收一个数组参数,数组中包含所有前置依赖的结果。`DAG` 执行完毕后,也会返回一个同样结构的数组,包含每一步的执行结果。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
$dag = new \Hyperf\Dag\Dag();
|
|
|
|
|
$a = \Hyperf\Dag\Vertex::make(function() {return 1;});
|
|
|
|
|
$b = \Hyperf\Dag\Vertex::make(function($results) use ($a) {
|
|
|
|
|
return $results[$a->key] + 1;
|
|
|
|
|
});
|
|
|
|
|
$results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run();
|
|
|
|
|
assert($results[$a->key] === 1);
|
|
|
|
|
assert($results[$b->key] === 2);
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## 定义一个任务
|
|
|
|
|
|
|
|
|
|
在上述文档中,我们使用了闭包来定义一个任务。格式如下。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
// Vertex::make 的第二个参数为可选参数,作为 vertex 的 key,也就是结果数组的键值。
|
|
|
|
|
\Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
除了使用闭包函数定义任务外,还可以使用实现了 `\Hyperf\Dag\Runner` 接口的类来定义,并通过 `Vertex::of` 将其转化为一个顶点。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
class MyJob implements \Hyperf\Dag\Runner {
|
|
|
|
|
public function run($results = []) {
|
|
|
|
|
return 'hello';
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
\Hyperf\Dag\Vertex::of(new MyJob(), "greeting");
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
`\Hyperf\Dag\Dag` 本身也实现了 `\Hyperf\Dag\Runner` 接口,所以可以嵌套使用。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
// 命名空间已省略
|
|
|
|
|
$a = Vertex::make(function () { return 1;});
|
|
|
|
|
$b = Vertex::make(function () { return 2;});
|
|
|
|
|
$c = Vertex::make(function () { return 3;});
|
|
|
|
|
|
|
|
|
|
$nestedDag = new Dag();
|
|
|
|
|
$nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b);
|
|
|
|
|
$d = Vertex::of($nestedDag);
|
|
|
|
|
|
|
|
|
|
$superDag = new Dag();
|
|
|
|
|
$superDag->addVertex($c)->addVertex($d)->addEdge($c, $d);
|
|
|
|
|
$superDag->run();
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## 控制并发数
|
|
|
|
|
`\Hyperf\Dag\Dag` 类提供了 `setConcurrency(int n)` 方法控制最大并发数。默认为 10。
|