Merge pull request #909 from Reasno/utils

propagate errors to parallel::wait()
This commit is contained in:
谷溪 2019-11-11 12:12:55 +08:00 committed by GitHub
commit 2eced375d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 8 deletions

View File

@ -13,6 +13,7 @@
- [#903](https://github.com/hyperf/hyperf/pull/903) Fixed execute `init-proxy` command can not stop when `hyperf/rpc-client` component exists.
- [#904](https://github.com/hyperf/hyperf/pull/904) Fixed the hooked I/O request does not works in the listener that listening `Hyperf\Framework\Event\BeforeMainServerStart` event.
- [#906](https://github.com/hyperf/hyperf/pull/906) Fixed `port` property of URI of `Hyperf\HttpMessage\Server\Request`.
- [#909](https://github.com/hyperf/hyperf/pull/909) Fixed a issue that causes staled parallel execution.
# v1.1.5 - 2019-11-07

View File

@ -155,6 +155,8 @@ $wg->wait();
```php
<?php
use Hyperf\Utils\Exception\ParallelExecutionException;
$parallel = new \Hyperf\Utils\Parallel();
$parallel->add(function () {
\Hyperf\Utils\Coroutine::sleep(1);
@ -164,8 +166,13 @@ $parallel->add(function () {
\Hyperf\Utils\Coroutine::sleep(1);
return \Hyperf\Utils\Coroutine::id();
});
// $result 结果为 [1, 2]
$result = $parallel->wait();
try{
// $results 结果为 [1, 2]
$results = $parallel->wait();
} catch(ParallelExecutionException $e){
//$e->getResults() 获取协程中的返回值。
//$e->getThrowables() 获取协程中出现的异常。
}
```
通过上面的代码我们可以看到仅花了 1 秒就得到了两个不同的协程的 ID在调用 `add(callable $callable)` 的时候 `Parallel` 类会为之自动创建一个协程,并加入到 `WaitGroup` 的调度去。

View File

@ -0,0 +1,46 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Utils\Exception;
class ParallelExecutionException extends \RuntimeException
{
/**
* @var array
*/
private $results;
/**
* @var array
*/
private $throwables;
public function getResults()
{
return $this->results;
}
public function setResults(array $results)
{
$this->results = $results;
}
public function getThrowables()
{
return $this->throwables;
}
public function setThrowables(array $throwables)
{
return $this->throwables = $throwables;
}
}

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
namespace Hyperf\Utils;
use Hyperf\Utils\Exception\ParallelExecutionException;
use Swoole\Coroutine\Channel;
class Parallel
@ -45,20 +46,32 @@ class Parallel
}
}
public function wait(): array
public function wait(bool $throw = true): array
{
$result = [];
$result = $throwables = [];
$wg = new WaitGroup();
$wg->add(count($this->callbacks));
foreach ($this->callbacks as $key => $callback) {
$this->concurrentChannel && $this->concurrentChannel->push(true);
Coroutine::create(function () use ($callback, $key, $wg, &$result) {
$result[$key] = call($callback);
$this->concurrentChannel && $this->concurrentChannel->pop();
$wg->done();
Coroutine::create(function () use ($callback, $key, $wg, &$result, &$throwables) {
try {
$result[$key] = call($callback);
} catch (\Throwable $throwable) {
$throwables[$key] = $throwable;
} finally {
$this->concurrentChannel && $this->concurrentChannel->pop();
$wg->done();
}
});
}
$wg->wait();
if ($throw && count($throwables) > 0) {
$message = 'At least one throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowables($throwables);
$executionException = new ParallelExecutionException($message);
$executionException->setResults($result);
$executionException->setThrowables($throwables);
throw $executionException;
}
return $result;
}
@ -66,4 +79,16 @@ class Parallel
{
$this->callbacks = [];
}
/**
* Format throwables into a nice list.
*/
private function formatThrowables(array $exception): string
{
$output = '';
foreach ($exception as $key => $value) {
$output .= \sprintf('(%s) %s: %s' . PHP_EOL, $key, get_class($value), $value->getMessage());
}
return $output;
}
}

View File

@ -13,6 +13,7 @@ declare(strict_types=1);
namespace HyperfTest\Utils;
use Hyperf\Utils\Coroutine;
use Hyperf\Utils\Exception\ParallelExecutionException;
use Hyperf\Utils\Parallel;
use PHPUnit\Framework\TestCase;
@ -115,6 +116,28 @@ class ParallelTest extends TestCase
$this->assertEquals(count($res), 4);
}
public function testParallelThrows()
{
$parallel = new Parallel();
$err = function () {
Coroutine::sleep(0.001);
throw new \RuntimeException('something bad happened');
};
$ok = function () {
Coroutine::sleep(0.001);
return 1;
};
$parallel->add($err);
for ($i = 0; $i < 4; ++$i) {
$parallel->add($ok);
}
$this->expectException(ParallelExecutionException::class);
$res = $parallel->wait();
}
public function returnCoId()
{
return Coroutine::id();