Wrap job throwables into a new exception with clearer message

This commit is contained in:
reasno 2019-11-11 00:09:03 +08:00
parent 242e7b81c0
commit 25092d0340
3 changed files with 77 additions and 14 deletions

View File

@ -0,0 +1,46 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Utils\Exception;
class ParallelExecutionException extends \RuntimeException
{
/**
* @var array
*/
private $results;
/**
* @var array
*/
private $throwables;
public function getResults()
{
return $this->results;
}
public function setResults(array $results)
{
$this->results = $results;
}
public function getThrowables()
{
return $this->throwables;
}
public function setThrowables(array $throwables)
{
return $this->throwables = $throwables;
}
}

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
namespace Hyperf\Utils;
use Hyperf\Utils\Exception\ParallelExecutionException;
use Swoole\Coroutine\Channel;
class Parallel
@ -45,29 +46,32 @@ class Parallel
}
}
public function wait(): array
public function wait(bool $throw = true): array
{
$result = [];
$done = new Channel(count($this->callbacks));
$wg = new WaitGroup();
$wg->add(count($this->callbacks));
$throwables = [];
foreach ($this->callbacks as $key => $callback) {
$this->concurrentChannel && $this->concurrentChannel->push(true);
Coroutine::create(function () use ($callback, $key, $done, &$result) {
Coroutine::create(function () use ($callback, $key, $wg, &$result, &$throwables) {
try {
$result[$key] = call($callback);
} catch (\Throwable $t) {
$done->push($t);
return;
} catch (\Throwable $throwable) {
$throwables[$key] = $throwable;
} finally {
$this->concurrentChannel && $this->concurrentChannel->pop();
$wg->done();
}
$done->push(true);
});
}
for ($i = 0; $i < count($this->callbacks); ++$i) {
$ok = $done->pop();
if ($ok !== true) {
throw $ok;
}
$wg->wait();
if ($throw && count($throwables) > 0) {
$msg = 'At least one throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowables($throwables);
$pee = new ParallelExecutionException($msg);
$pee->setResults($result);
$pee->setThrowables($throwables);
throw $pee;
}
return $result;
}
@ -76,4 +80,16 @@ class Parallel
{
$this->callbacks = [];
}
/**
* Format throwables into a nice list.
*/
private function formatThrowables(array $e): string
{
$out = '';
foreach ($e as $key => $value) {
$out .= \sprintf('(%s) %s: %s' . PHP_EOL, $key, get_class($value), $value->getMessage());
}
return $out;
}
}

View File

@ -13,6 +13,7 @@ declare(strict_types=1);
namespace HyperfTest\Utils;
use Hyperf\Utils\Coroutine;
use Hyperf\Utils\Exception\ParallelExecutionException;
use Hyperf\Utils\Parallel;
use PHPUnit\Framework\TestCase;
@ -121,7 +122,7 @@ class ParallelTest extends TestCase
$err = function () {
Coroutine::sleep(0.001);
throw new \RuntimeException();
throw new \RuntimeException('something bad happened');
};
$ok = function () {
@ -133,7 +134,7 @@ class ParallelTest extends TestCase
for ($i = 0; $i < 4; ++$i) {
$parallel->add($ok);
}
$this->expectException(\RuntimeException::class);
$this->expectException(ParallelExecutionException::class);
$res = $parallel->wait();
}