mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-12-01 03:08:06 +08:00
11 KiB
11 KiB
DAG
hyperf/dag
是一個輕量級有向無環圖 (Directed Acyclic Graph) 任務編排庫。
場景
假設我們有一系列任務需要執行。
- 如果他們之間存在依賴關係,則可以將他們順序執行。
- 如果他們並不相互依賴,那麼我們可以選擇併發執行,以加快執行速度。
- 兩者間還存在中間狀態:一部分任務存在依賴關係,而另一些任務又可以併發執行。
我們可以將第三種複雜的場景抽象成 DAG
來解決。
安裝
composer require hyperf/dag
示例
假設我們有一系列任務,拓撲結構如上圖所示,頂點代表任務,邊緣代表依賴關係。(A 完成後才能完成 B、C、D,B 完成後才能完成 H、E、F...)
通過 hyperf/dag
可以使用如下方式構建 DAG
並執行。
<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";});
$b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";});
$c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";});
$d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";});
$e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";});
$f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";});
$g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";});
$h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";});
$i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";});
$dag->addVertex($a)
->addVertex($b)
->addVertex($c)
->addVertex($d)
->addVertex($e)
->addVertex($f)
->addVertex($g)
->addVertex($h)
->addVertex($i)
->addEdge($a, $b)
->addEdge($a, $c)
->addEdge($a, $d)
->addEdge($b, $h)
->addEdge($b, $e)
->addEdge($b, $f)
->addEdge($c, $f)
->addEdge($c, $g)
->addEdge($d, $g)
->addEdge($h, $i)
->addEdge($e, $i)
->addEdge($f, $i)
->addEdge($g, $i);
// 需要在協程環境下執行
$dag->run();
輸出:
// 1s 後
A
// 2s 後
D
C
B
// 3s 後
G
F
E
H
// 4s 後
I
DAG 會按照儘可能早的原則調度任務。嘗試將 B 點的耗時調整為 2 秒,會發現 B 和 G 一起完成。
訪問前步結果
每一個任務可以接收一個數組參數,數組中包含所有前置依賴的結果。DAG
執行完畢後,也會返回一個同樣結構的數組,包含每一步的執行結果。
<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {return 1;});
$b = \Hyperf\Dag\Vertex::make(function($results) use ($a) {
return $results[$a->key] + 1;
});
$results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run();
assert($results[$a->key] === 1);
assert($results[$b->key] === 2);
定義一個任務
在上述文檔中,我們使用了閉包來定義一個任務。格式如下。
// Vertex::make 的第二個參數為可選參數,作為 vertex 的 key,也就是結果數組的鍵值。
\Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");
除了使用閉包函數定義任務外,還可以使用實現了 \Hyperf\Dag\Runner
接口的類來定義,並通過 Vertex::of
將其轉化為一個頂點。
class MyJob implements \Hyperf\Dag\Runner {
public function run($results = []) {
return 'hello';
}
}
\Hyperf\Dag\Vertex::of(new MyJob(), "greeting");
\Hyperf\Dag\Dag
本身也實現了 \Hyperf\Dag\Runner
接口,所以可以嵌套使用。
<?php
// 命名空間已省略
$a = Vertex::make(function () { return 1;});
$b = Vertex::make(function () { return 2;});
$c = Vertex::make(function () { return 3;});
$nestedDag = new Dag();
$nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b);
$d = Vertex::of($nestedDag);
$superDag = new Dag();
$superDag->addVertex($c)->addVertex($d)->addEdge($c, $d);
$superDag->run();
控制併發數
\Hyperf\Dag\Dag
類提供了 setConcurrency(int n)
方法控制最大併發數。默認為 10。