propagate errors to parallel::wait()

请看我新加的一个测试用例。以前的逻辑下,wait会永久hang住。当前逻辑下,会把Coroutine中没Catch到的错误传播到wait()函数这里抛出。
This commit is contained in:
reasno 2019-11-10 00:23:39 +08:00
parent a8281d9fb0
commit 572ba68f83

View File

@ -48,17 +48,27 @@ class Parallel
public function wait(): array
{
$result = [];
$wg = new WaitGroup();
$wg->add(count($this->callbacks));
$done = new Channel(count($this->callbacks));
foreach ($this->callbacks as $key => $callback) {
$this->concurrentChannel && $this->concurrentChannel->push(true);
Coroutine::create(function () use ($callback, $key, $wg, &$result) {
$result[$key] = call($callback);
$this->concurrentChannel && $this->concurrentChannel->pop();
$wg->done();
Coroutine::create(function () use ($callback, $key, $done, &$result) {
try {
$result[$key] = call($callback);
} catch (\Throwable $t) {
$done->push($t);
return;
} finally {
$this->concurrentChannel && $this->concurrentChannel->pop();
}
$done->push(true);
});
}
$wg->wait();
for ($i = 0; $i < count($this->callbacks); ++$i) {
$ok = $done->pop();
if ($ok !== true) {
throw $ok;
}
}
return $result;
}