hyperf/docs/zh-hk/dag.md

11 KiB
Raw Blame History

DAG

hyperf/dag 是一個輕量級有向無環圖 (Directed Acyclic Graph) 任務編排庫。

場景

假設我們有一系列任務需要執行。

  • 如果他們之間存在依賴關係,則可以將他們順序執行。
  • 如果他們並不相互依賴,那麼我們可以選擇併發執行,以加快執行速度。
  • 兩者間還存在中間狀態:一部分任務存在依賴關係,而另一些任務又可以併發執行。

我們可以將第三種複雜的場景抽象成 DAG 來解決。

示例

假設我們有一系列任務,拓撲結構如上圖所示,頂點代表任務,邊緣代表依賴關係。(A 完成後才能完成 B、C、DB 完成後才能完成 H、E、F...)

通過 hyperf/dag 可以使用如下方式構建 DAG 並執行。

<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";});
$b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";});
$c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";});
$d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";});
$e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";});
$f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";});
$g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";});
$h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";});
$i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";});
$dag->addVertex($a)
    ->addVertex($b)
    ->addVertex($c)
    ->addVertex($d)
    ->addVertex($e)
    ->addVertex($f)
    ->addVertex($g)
    ->addVertex($h)
    ->addVertex($i)
    ->addEdge($a, $b)
    ->addEdge($a, $c)
    ->addEdge($a, $d)
    ->addEdge($b, $h)
    ->addEdge($b, $e)
    ->addEdge($b, $f)
    ->addEdge($c, $f)
    ->addEdge($c, $g)
    ->addEdge($d, $g)
    ->addEdge($h, $i)
    ->addEdge($e, $i)
    ->addEdge($f, $i)
    ->addEdge($g, $i);
    
// 需要在協程環境下執行
$dag->run(); 

輸出:

// 1s 後
A
// 2s 後
D
C
B
// 3s 後
G
F
E
H
// 4s 後
I

DAG 會按照儘可能早的原則調度任務。嘗試將 B 點的耗時調整為 2 秒,會發現 B 和 G 一起完成。

訪問前步結果

每一個任務可以接收一個數組參數,數組中包含所有前置依賴的結果。DAG 執行完畢後,也會返回一個同樣結構的數組,包含每一步的執行結果。

<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {return 1;});
$b = \Hyperf\Dag\Vertex::make(function($results) use ($a) {
    return $results[$a->key] + 1;
});
$results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run();
assert($results[$a->key] === 1);
assert($results[$b->key] === 2);

定義一個任務

在上述文檔中,我們使用了閉包來定義一個任務。格式如下。

// Vertex::make 的第二個參數為可選參數,作為 vertex 的 key也就是結果數組的鍵值。
\Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");

除了使用閉包函數定義任務外,還可以使用實現了 \Hyperf\Dag\Runner 接口的類來定義,並通過 Vertex::of 將其轉化為一個頂點。

class MyJob implements \Hyperf\Dag\Runner {
    public function run($results = []) {
        return 'hello';
    }
}

\Hyperf\Dag\Vertex::of(new MyJob(), "greeting");

\Hyperf\Dag\Dag 本身也實現了 \Hyperf\Dag\Runner 接口,所以可以嵌套使用。

<?php
// 命名空間已省略
$a = Vertex::make(function () { return 1;});
$b = Vertex::make(function () { return 2;});
$c = Vertex::make(function () { return 3;});

$nestedDag = new Dag();
$nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b);
$d = Vertex::of($nestedDag);

$superDag = new Dag();
$superDag->addVertex($c)->addVertex($d)->addEdge($c, $d);
$superDag->run();

控制併發數

\Hyperf\Dag\Dag 類提供了 setConcurrency(int n) 方法控制最大併發數。默認為 10。