Split hyperf/coroutine from hyperf/utils (#5619)

Co-authored-by: 李铭昕 <715557344@qq.com>
This commit is contained in:
Deeka Wong 2023-04-11 09:38:53 +08:00 committed by GitHub
parent 8f212533db
commit 399c85fcce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1671 additions and 0 deletions

View File

@ -5,6 +5,7 @@
- [#5616](https://github.com/hyperf/hyperf/pull/5616) Split `ApplicationContext` from `hyperf/utils` to `hyperf/context`.
- [#5617](https://github.com/hyperf/hyperf/pull/5617) Removed the requirement `hyperf/guzzle` from `hyperf/consul`.
- [#5618](https://github.com/hyperf/hyperf/pull/5618) Support to set the default router for swagger.
- [#5619](https://github.com/hyperf/hyperf/pull/5619) Split `hyperf/coroutine` from `hyperf/utils`.
# v3.0.15 - 2023-04-07

View File

@ -111,6 +111,7 @@
"hyperf/context": "*",
"hyperf/contract": "*",
"hyperf/coordinator": "*",
"hyperf/coroutine": "*",
"hyperf/crontab": "*",
"hyperf/dag": "*",
"hyperf/database": "*",
@ -195,6 +196,7 @@
"files": [
"src/collection/src/Functions.php",
"src/config/src/Functions.php",
"src/coroutine/src/Functions.php",
"src/nats/src/Functions.php",
"src/tappable/src/Functions.php",
"src/translation/src/Functions.php",
@ -222,6 +224,7 @@
"Hyperf\\Context\\": "src/context/src/",
"Hyperf\\Contract\\": "src/contract/src/",
"Hyperf\\Coordinator\\": "src/coordinator/src/",
"Hyperf\\Coroutine\\": "src/coroutine/src/",
"Hyperf\\Crontab\\": "src/crontab/src/",
"Hyperf\\DB\\": "src/db/src/",
"Hyperf\\Dag\\": "src/dag/src/",
@ -322,6 +325,7 @@
"HyperfTest\\Consul\\": "src/consul/tests/",
"HyperfTest\\Context\\": "src/context/tests/",
"HyperfTest\\Coordinator\\": "src/coordinator/tests/",
"HyperfTest\\Coroutine\\": "src/coroutine/tests/",
"HyperfTest\\Crontab\\": "src/crontab/tests/",
"HyperfTest\\DB\\": "src/db/tests/",
"HyperfTest\\Dag\\": "src/dag/tests/",

2
src/coroutine/.gitattributes vendored Normal file
View File

@ -0,0 +1,2 @@
/tests export-ignore
/.github export-ignore

View File

@ -0,0 +1,13 @@
name: Close Pull Request
on:
pull_request_target:
types: [opened]
jobs:
run:
runs-on: ubuntu-latest
steps:
- uses: superbrothers/close-pull-request@v3
with:
comment: "Hi, this is a READ-ONLY repository, please submit your PR on the https://github.com/hyperf/hyperf repository.<br><br> This Pull Request will close automatically.<br><br> Thanks! "

View File

@ -0,0 +1,25 @@
on:
push:
# Sequence of patterns matched against refs/tags
tags:
- 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10
name: Release
jobs:
release:
name: Release
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Create Release
id: create_release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
tag_name: ${{ github.ref }}
release_name: Release ${{ github.ref }}
draft: false
prerelease: false

21
src/coroutine/LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) Hyperf
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,45 @@
{
"name": "hyperf/coroutine",
"description": "Hyperf Coroutine",
"license": "MIT",
"keywords": [
"php",
"swoole",
"hyperf",
"coroutine"
],
"homepage": "https://hyperf.io",
"support": {
"docs": "https://hyperf.wiki",
"issues": "https://github.com/hyperf/hyperf/issues",
"pull-request": "https://github.com/hyperf/hyperf/pulls",
"source": "https://github.com/hyperf/hyperf"
},
"require": {
"php": ">=8.0",
"hyperf/context": "~3.0.0",
"hyperf/contract": "~3.0.0",
"hyperf/engine": "^1.2|^2.0"
},
"autoload": {
"psr-4": {
"Hyperf\\Coroutine\\": "src/"
},
"files": [
"src/Functions.php"
]
},
"autoload-dev": {
"psr-4": {
"HyperfTest\\Coroutine\\": "tests/"
}
},
"config": {
"sort-packages": true
},
"extra": {
"branch-alias": {
"dev-master": "3.0-dev"
}
}
}

View File

@ -0,0 +1,89 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine;
use Hyperf\Context\ApplicationContext;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Coroutine\Exception\InvalidArgumentException;
use Hyperf\Engine\Channel;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Throwable;
/**
* @method bool isFull()
* @method bool isEmpty()
*/
class Concurrent
{
protected Channel $channel;
public function __construct(protected int $limit)
{
$this->channel = new Channel($limit);
}
public function __call($name, $arguments)
{
if (in_array($name, ['isFull', 'isEmpty'])) {
return $this->channel->{$name}(...$arguments);
}
throw new InvalidArgumentException(sprintf('The method %s is not supported.', $name));
}
public function getLimit(): int
{
return $this->limit;
}
public function length(): int
{
return $this->channel->getLength();
}
public function getLength(): int
{
return $this->channel->getLength();
}
public function getRunningCoroutineCount(): int
{
return $this->getLength();
}
public function getChannel(): Channel
{
return $this->channel;
}
public function create(callable $callable): void
{
$this->channel->push(true);
Coroutine::create(function () use ($callable) {
try {
$callable();
} catch (Throwable $exception) {
if (ApplicationContext::hasContainer()) {
$container = ApplicationContext::getContainer();
if ($container->has(StdoutLoggerInterface::class) && $container->has(FormatterInterface::class)) {
$logger = $container->get(StdoutLoggerInterface::class);
$formatter = $container->get(FormatterInterface::class);
$logger->error($formatter->format($exception));
}
}
} finally {
$this->channel->pop();
}
});
}
}

View File

@ -0,0 +1,111 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine;
use Hyperf\Context\ApplicationContext;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Engine\Coroutine as Co;
use Hyperf\Engine\Exception\CoroutineDestroyedException;
use Hyperf\Engine\Exception\RunningInNonCoroutineException;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Throwable;
class Coroutine
{
/**
* Returns the current coroutine ID.
* Returns -1 when running in non-coroutine context.
*/
public static function id(): int
{
return Co::id();
}
public static function defer(callable $callable): void
{
Co::defer(static function () use ($callable) {
try {
$callable();
} catch (Throwable $throwable) {
static::printLog($throwable);
}
});
}
public static function sleep(float $seconds): void
{
usleep(intval($seconds * 1000 * 1000));
}
/**
* Returns the parent coroutine ID.
* Returns 0 when running in the top level coroutine.
* @throws RunningInNonCoroutineException when running in non-coroutine context
* @throws CoroutineDestroyedException when the coroutine has been destroyed
*/
public static function parentId(?int $coroutineId = null): int
{
return Co::pid($coroutineId);
}
/**
* @return int Returns the coroutine ID of the coroutine just created.
* Returns -1 when coroutine create failed.
*/
public static function create(callable $callable): int
{
$coroutine = Co::create(static function () use ($callable) {
try {
$callable();
} catch (Throwable $throwable) {
static::printLog($throwable);
}
});
try {
return $coroutine->getId();
} catch (\Throwable) {
return -1;
}
}
public static function inCoroutine(): bool
{
return Co::id() > 0;
}
public static function stats(): array
{
return Co::stats();
}
public static function exists(int $id): bool
{
return Co::exists($id);
}
private static function printLog(Throwable $throwable): void
{
if (ApplicationContext::hasContainer()) {
$container = ApplicationContext::getContainer();
if ($container->has(StdoutLoggerInterface::class)) {
$logger = $container->get(StdoutLoggerInterface::class);
if ($container->has(FormatterInterface::class)) {
$formatter = $container->get(FormatterInterface::class);
$logger->warning($formatter->format($throwable));
} else {
$logger->warning((string) $throwable);
}
}
}
}
}

View File

@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine\Exception;
use Throwable;
final class ExceptionThrower
{
public function __construct(private Throwable $throwable)
{
}
public function getThrowable(): Throwable
{
return $this->throwable;
}
}

View File

@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine\Exception;
class InvalidArgumentException extends \InvalidArgumentException
{
}

View File

@ -0,0 +1,41 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine\Exception;
use RuntimeException;
class ParallelExecutionException extends RuntimeException
{
private array $results = [];
private array $throwables = [];
public function getResults(): array
{
return $this->results;
}
public function setResults(array $results)
{
$this->results = $results;
}
public function getThrowables(): array
{
return $this->throwables;
}
public function setThrowables(array $throwables)
{
return $this->throwables = $throwables;
}
}

View File

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine\Exception;
use RuntimeException;
class TimeoutException extends RuntimeException
{
}

View File

@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine\Exception;
class WaitTimeoutException extends TimeoutException
{
}

View File

@ -0,0 +1,75 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine;
use Closure;
use Hyperf\Context\ApplicationContext;
use RuntimeException;
/**
* @param callable[] $callables
* @param int $concurrent if $concurrent is equal to 0, that means unlimited
*/
function parallel(array $callables, int $concurrent = 0): array
{
$parallel = new Parallel($concurrent);
foreach ($callables as $key => $callable) {
$parallel->add($callable, $key);
}
return $parallel->wait();
}
function wait(Closure $closure, ?float $timeout = null)
{
if (ApplicationContext::hasContainer()) {
$waiter = ApplicationContext::getContainer()->get(Waiter::class);
return $waiter->wait($closure, $timeout);
}
return (new Waiter())->wait($closure, $timeout);
}
function co(callable $callable): bool|int
{
$id = Coroutine::create($callable);
return $id > 0 ? $id : false;
}
function defer(callable $callable): void
{
Coroutine::defer($callable);
}
function go(callable $callable): bool|int
{
$id = Coroutine::create($callable);
return $id > 0 ? $id : false;
}
/**
* Run callable in non-coroutine environment, all hook functions by Swoole only available in the callable.
*
* @param array|callable $callbacks
*/
function run($callbacks, int $flags = SWOOLE_HOOK_ALL): bool
{
if (Coroutine::inCoroutine()) {
throw new RuntimeException('Function \'run\' only execute in non-coroutine environment.');
}
\Swoole\Runtime::enableCoroutine($flags);
/* @phpstan-ignore-next-line */
$result = \Swoole\Coroutine\run(...(array) $callbacks);
\Swoole\Runtime::enableCoroutine(false);
return $result;
}

View File

@ -0,0 +1,66 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine;
use Hyperf\Coroutine\Traits\Container;
use Hyperf\Engine\Constant;
use Hyperf\Engine\Coroutine as Co;
use Swoole\Coroutine as SwooleCoroutine;
class Locker
{
use Container;
public static function add(string $key, int $id): void
{
self::$container[$key][] = $id;
}
public static function clear(string $key): void
{
unset(self::$container[$key]);
}
public static function lock(string $key): bool
{
if (! self::has($key)) {
self::add($key, 0);
return true;
}
self::add($key, Coroutine::id());
// TODO: When the verion of `hyperf/engine` >= 2.0, use `Co::yield()` instead.
match (Constant::ENGINE) {
'Swoole' => SwooleCoroutine::yield(),
/* @phpstan-ignore-next-line */
default => Co::yield(),
};
return false;
}
public static function unlock(string $key): void
{
if (self::has($key)) {
$ids = self::get($key);
foreach ($ids as $id) {
if ($id > 0) {
// TODO: When the verion of `hyperf/engine` >= 2.0, use `Co::resumeById()` instead.
match (Constant::ENGINE) {
'Swoole' => SwooleCoroutine::resume($id),
/* @phpstan-ignore-next-line */
default => Co::resumeById($id),
};
}
}
self::clear($key);
}
}
}

View File

@ -0,0 +1,111 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine;
use Hyperf\Coroutine\Exception\ParallelExecutionException;
use Hyperf\Engine\Channel;
use Throwable;
use function sprintf;
class Parallel
{
/**
* @var callable[]
*/
protected array $callbacks = [];
protected ?Channel $concurrentChannel = null;
protected array $results = [];
/**
* @var Throwable[]
*/
protected array $throwables = [];
/**
* @param int $concurrent if $concurrent is equal to 0, that means unlimit
*/
public function __construct(int $concurrent = 0)
{
if ($concurrent > 0) {
$this->concurrentChannel = new Channel($concurrent);
}
}
public function add(callable $callable, $key = null)
{
if (is_null($key)) {
$this->callbacks[] = $callable;
} else {
$this->callbacks[$key] = $callable;
}
}
public function wait(bool $throw = true): array
{
$wg = new WaitGroup();
$wg->add(count($this->callbacks));
foreach ($this->callbacks as $key => $callback) {
$this->concurrentChannel && $this->concurrentChannel->push(true);
$this->results[$key] = null;
Coroutine::create(function () use ($callback, $key, $wg) {
try {
$this->results[$key] = $callback();
} catch (Throwable $throwable) {
$this->throwables[$key] = $throwable;
unset($this->results[$key]);
} finally {
$this->concurrentChannel && $this->concurrentChannel->pop();
$wg->done();
}
});
}
$wg->wait();
if ($throw && ($throwableCount = count($this->throwables)) > 0) {
$message = 'Detecting ' . $throwableCount . ' throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowables($this->throwables);
$executionException = new ParallelExecutionException($message);
$executionException->setResults($this->results);
$executionException->setThrowables($this->throwables);
unset($this->results, $this->throwables);
throw $executionException;
}
return $this->results;
}
public function count(): int
{
return count($this->callbacks);
}
public function clear(): void
{
$this->callbacks = [];
$this->results = [];
$this->throwables = [];
}
/**
* Format throwables into a nice list.
*
* @param Throwable[] $throwables
*/
private function formatThrowables(array $throwables): string
{
$output = '';
foreach ($throwables as $key => $value) {
$output .= sprintf('(%s) %s: %s' . PHP_EOL . '%s' . PHP_EOL, $key, get_class($value), $value->getMessage(), $value->getTraceAsString());
}
return $output;
}
}

View File

@ -0,0 +1,61 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine\Traits;
trait Container
{
protected static array $container = [];
/**
* Add a value to container by identifier.
* @param mixed $value
*/
public static function set(string $id, $value)
{
static::$container[$id] = $value;
}
/**
* Finds an entry of the container by its identifier and returns it,
* Returns $default when does not exist in the container.
* @param null|mixed $default
*/
public static function get(string $id, $default = null)
{
return static::$container[$id] ?? $default;
}
/**
* Returns true if the container can return an entry for the given identifier.
* Returns false otherwise.
*/
public static function has(string $id): bool
{
return isset(static::$container[$id]);
}
/**
* Returns the container.
*/
public static function list(): array
{
return static::$container;
}
/**
* Clear the container.
*/
public static function clear(): void
{
static::$container = [];
}
}

View File

@ -0,0 +1,79 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine;
use BadMethodCallException;
use Hyperf\Engine\Channel;
use InvalidArgumentException;
/**
* This file mostly code come from `swoole/library`.
*/
class WaitGroup
{
protected Channel $chan;
protected int $count = 0;
protected bool $waiting = false;
public function __construct(int $delta = 0)
{
$this->chan = new Channel(1);
if ($delta > 0) {
$this->add($delta);
}
}
public function add(int $delta = 1): void
{
if ($this->waiting) {
throw new BadMethodCallException('WaitGroup misuse: add called concurrently with wait');
}
$count = $this->count + $delta;
if ($count < 0) {
throw new InvalidArgumentException('WaitGroup misuse: negative counter');
}
$this->count = $count;
}
public function done(): void
{
$count = $this->count - 1;
if ($count < 0) {
throw new BadMethodCallException('WaitGroup misuse: negative counter');
}
$this->count = $count;
if ($count === 0 && $this->waiting) {
$this->chan->push(true);
}
}
public function wait(float $timeout = -1): bool
{
if ($this->waiting) {
throw new BadMethodCallException('WaitGroup misuse: reused before previous wait has returned');
}
if ($this->count > 0) {
$this->waiting = true;
$done = $this->chan->pop($timeout);
$this->waiting = false;
return $done;
}
return true;
}
public function count(): int
{
return $this->count;
}
}

View File

@ -0,0 +1,61 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Coroutine;
use Closure;
use Hyperf\Coroutine\Exception\ExceptionThrower;
use Hyperf\Coroutine\Exception\WaitTimeoutException;
use Hyperf\Engine\Channel;
use Throwable;
class Waiter
{
protected float $pushTimeout = 10.0;
protected float $popTimeout = 10.0;
public function __construct(float $timeout = 10.0)
{
$this->popTimeout = $timeout;
}
/**
* @param null|float $timeout seconds
*/
public function wait(Closure $closure, ?float $timeout = null)
{
if ($timeout === null) {
$timeout = $this->popTimeout;
}
$channel = new Channel(1);
Coroutine::create(function () use ($channel, $closure) {
try {
$result = $closure();
} catch (Throwable $exception) {
$result = new ExceptionThrower($exception);
} finally {
$channel->push($result ?? null, $this->pushTimeout);
}
});
$result = $channel->pop($timeout);
if ($result === false && $channel->isTimeout()) {
throw new WaitTimeoutException(sprintf('Channel wait failed, reason: Timed out for %s s', $timeout));
}
if ($result instanceof ExceptionThrower) {
throw $result->getThrowable();
}
return $result;
}
}

View File

@ -0,0 +1,90 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Coroutine;
use Exception;
use Hyperf\Context\ApplicationContext;
use Hyperf\Coroutine\Concurrent;
use Mockery;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine;
/**
* @internal
* @coversNothing
*/
class ConcurrentTest extends TestCase
{
protected function setUp(): void
{
$this->getContainer();
}
public function testConcurrent()
{
$concurrent = new Concurrent($limit = 10, 1);
$this->assertSame($limit, $concurrent->getLimit());
$this->assertTrue($concurrent->isEmpty());
$this->assertFalse($concurrent->isFull());
$count = 0;
for ($i = 0; $i < 15; ++$i) {
$concurrent->create(function () use (&$count) {
Coroutine::sleep(0.1);
++$count;
});
}
$this->assertTrue($concurrent->isFull());
$this->assertSame(5, $count);
$this->assertSame($limit, $concurrent->getRunningCoroutineCount());
$this->assertSame($limit, $concurrent->getLength());
$this->assertSame($limit, $concurrent->length());
while (! $concurrent->isEmpty()) {
Coroutine::sleep(0.1);
}
$this->assertSame(15, $count);
}
public function testException()
{
$con = new Concurrent(10, 1);
$count = 0;
for ($i = 0; $i < 15; ++$i) {
$con->create(function () use (&$count) {
Coroutine::sleep(0.1);
++$count;
throw new Exception('ddd');
});
}
$this->assertSame(5, $count);
$this->assertSame(10, $con->getRunningCoroutineCount());
while (! $con->isEmpty()) {
Coroutine::sleep(0.1);
}
$this->assertSame(15, $count);
}
protected function getContainer()
{
$container = Mockery::mock(ContainerInterface::class);
$container->shouldReceive('has')->andReturn(false);
ApplicationContext::setContainer($container);
}
}

View File

@ -0,0 +1,103 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Coroutine;
use Exception;
use Hyperf\Context\ApplicationContext;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Engine\Channel;
use Hyperf\Engine\Exception\CoroutineDestroyedException;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Mockery;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Throwable;
/**
* @internal
* @coversNothing
*/
class CoroutineTest extends TestCase
{
protected function tearDown(): void
{
Mockery::close();
}
public function testCoroutineParentId()
{
$pid = Coroutine::id();
Coroutine::create(function () use ($pid) {
$this->assertSame($pid, Coroutine::parentId());
$pid = Coroutine::id();
$id = Coroutine::create(function () use ($pid) {
$this->assertSame($pid, Coroutine::parentId(Coroutine::id()));
usleep(1000);
});
Coroutine::create(function () use ($pid) {
$this->assertSame($pid, Coroutine::parentId());
});
$this->assertSame($pid, Coroutine::parentId($id));
});
}
public function testCoroutineParentIdHasBeenDestroyed()
{
$id = Coroutine::create(function () {
});
try {
Coroutine::parentId($id);
$this->assertTrue(false);
} catch (Throwable $exception) {
$this->assertInstanceOf(CoroutineDestroyedException::class, $exception);
}
}
/**
* @group NonCoroutine
*/
public function testCoroutineInTopCoroutine()
{
run(function () {
$this->assertSame(0, Coroutine::parentId());
});
}
public function testCoroutineAndDeferWithException()
{
$container = Mockery::mock(ContainerInterface::class);
ApplicationContext::setContainer($container);
$container->shouldReceive('has')->withAnyArgs()->andReturnTrue();
$container->shouldReceive('get')->with(StdoutLoggerInterface::class)->andReturn($logger = Mockery::mock(StdoutLoggerInterface::class));
$logger->shouldReceive('warning')->with('unit')->twice()->andReturnNull();
$container->shouldReceive('get')->with(FormatterInterface::class)->andReturn($formatter = Mockery::mock(FormatterInterface::class));
$formatter->shouldReceive('format')->with($exception = new Exception())->twice()->andReturn('unit');
$chan = new Channel(1);
go(static function () use ($chan, $exception) {
defer(static function () use ($chan, $exception) {
try {
throw $exception;
} finally {
$chan->push(1);
}
});
throw $exception;
});
$this->assertTrue(true);
}
}

View File

@ -0,0 +1,89 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Coroutine;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Engine\Channel;
use PHPUnit\Framework\TestCase;
use Swoole\Runtime;
use function Hyperf\Coroutine\defer;
use function Hyperf\Coroutine\go;
use function Hyperf\Coroutine\parallel;
use function Hyperf\Coroutine\run;
/**
* @internal
* @coversNothing
*/
class FunctionTest extends TestCase
{
public function testReturnOfGo()
{
$uniqid = uniqid();
$id = go(function () use (&$uniqid) {
$uniqid = 'Hyperf';
});
$this->assertTrue(is_int($id));
$this->assertSame('Hyperf', $uniqid);
}
/**
* @group NonCoroutine
*/
public function testRun()
{
$asserts = [
SWOOLE_HOOK_ALL,
SWOOLE_HOOK_SLEEP,
SWOOLE_HOOK_CURL,
];
foreach ($asserts as $flags) {
run(function () use ($flags) {
$this->assertTrue(Coroutine::inCoroutine());
$this->assertSame($flags, Runtime::getHookFlags());
}, $flags);
}
}
public function testDefer()
{
$channel = new Channel(10);
parallel([function () use ($channel) {
defer(function () use ($channel) {
$channel->push(0);
});
defer(function () use ($channel) {
$channel->push(1);
defer(function () use ($channel) {
$channel->push(2);
});
defer(function () use ($channel) {
$channel->push(3);
});
});
defer(function () use ($channel) {
$channel->push(4);
});
$channel->push(5);
}]);
$this->assertSame(5, $channel->pop(0.001));
$this->assertSame(4, $channel->pop(0.001));
$this->assertSame(1, $channel->pop(0.001));
$this->assertSame(3, $channel->pop(0.001));
$this->assertSame(2, $channel->pop(0.001));
$this->assertSame(0, $channel->pop(0.001));
}
}

View File

@ -0,0 +1,55 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Coroutine;
use Hyperf\Coroutine\Locker;
use Hyperf\Engine\Channel;
use PHPUnit\Framework\TestCase;
/**
* @internal
* @coversNothing
*/
class LockerTest extends TestCase
{
public function testLockAndUnlock()
{
$chan = new Channel(10);
go(function () use ($chan) {
Locker::lock('foo');
$chan->push(1);
usleep(10000);
$chan->push(2);
Locker::unlock('foo');
});
go(function () use ($chan) {
Locker::lock('foo');
$chan->push(3);
usleep(10000);
$chan->push(4);
});
go(function () use ($chan) {
Locker::lock('foo');
$chan->push(5);
$chan->push(6);
});
$ret = [];
while ($res = $chan->pop(1)) {
$ret[] = $res;
}
$this->assertSame([1, 2, 3, 5, 6, 4], $ret);
}
}

View File

@ -0,0 +1,283 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Coroutine;
use Exception;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Coroutine\Exception\ParallelExecutionException;
use Hyperf\Coroutine\Parallel;
use PHPUnit\Framework\TestCase;
use RuntimeException;
use Throwable;
/**
* @internal
* @covers \Hyperf\Coroutine\Parallel
*/
class ParallelTest extends TestCase
{
public function testParallel()
{
// Closure
$parallel = new Parallel();
for ($i = 0; $i < 3; ++$i) {
$parallel->add(function () {
return Coroutine::id();
});
}
$result = $parallel->wait();
$id = $result[0];
$this->assertSame([$id, $id + 1, $id + 2], $result);
// Array
$parallel = new Parallel();
for ($i = 0; $i < 3; ++$i) {
$parallel->add([$this, 'returnCoId']);
}
$result = $parallel->wait();
$id = $result[0];
$this->assertSame([$id, $id + 1, $id + 2], $result);
}
public function testParallelConcurrent()
{
$parallel = new Parallel();
$num = 0;
$callback = function () use (&$num) {
++$num;
Coroutine::sleep(0.01);
return $num;
};
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback);
}
$res = $parallel->wait();
$this->assertSame([4, 4, 4, 4], array_values($res));
$parallel = new Parallel(2);
$num = 0;
$callback = function () use (&$num) {
++$num;
Coroutine::sleep(0.01);
return $num;
};
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback);
}
$res = $parallel->wait();
sort($res);
$this->assertSame([2, 3, 4, 4], array_values($res));
$num = 10;
$callbacks = [];
for ($i = 0; $i < 4; ++$i) {
$callbacks[] = function () use (&$num) {
++$num;
Coroutine::sleep(0.01);
return $num;
};
}
$res = parallel($callbacks, 2);
sort($res);
$this->assertSame([12, 13, 14, 14], array_values($res));
}
public function testParallelCallbackCount()
{
$parallel = new Parallel();
$callback = function () {
return 1;
};
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback);
}
$res = $parallel->wait();
$this->assertEquals(count($res), 4);
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback);
}
$res = $parallel->wait();
$this->assertEquals(count($res), 8);
}
public function testParallelClear()
{
$parallel = new Parallel();
$callback = function () {
return 1;
};
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback);
}
$res = $parallel->wait();
$parallel->clear();
$this->assertEquals(count($res), 4);
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback);
}
$res = $parallel->wait();
$parallel->clear();
$this->assertEquals(count($res), 4);
}
public function testParallelKeys()
{
$parallel = new Parallel();
$callback = function () {
return 1;
};
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback);
}
$res = $parallel->wait();
$parallel->clear();
$this->assertSame([1, 1, 1, 1], $res);
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback, 'id_' . $i);
}
$res = $parallel->wait();
$parallel->clear();
$this->assertSame(['id_0' => 1, 'id_1' => 1, 'id_2' => 1, 'id_3' => 1], $res);
for ($i = 0; $i < 4; ++$i) {
$parallel->add($callback, $i - 1);
}
$res = $parallel->wait();
$parallel->clear();
$this->assertSame([-1 => 1, 0 => 1, 1 => 1, 2 => 1], $res);
$parallel->add($callback, 1.0);
$res = $parallel->wait();
$parallel->clear();
$this->assertSame([1.0 => 1], $res);
}
public function testParallelThrows()
{
$parallel = new Parallel();
$err = function () {
Coroutine::sleep(0.001);
throw new RuntimeException('something bad happened');
};
$ok = function () {
Coroutine::sleep(0.001);
return 1;
};
$parallel->add($err);
for ($i = 0; $i < 4; ++$i) {
$parallel->add($ok);
}
$this->expectException(ParallelExecutionException::class);
$res = $parallel->wait();
}
public function testParallelResultsAndThrows()
{
$parallel = new Parallel();
$err = function () {
Coroutine::sleep(0.001);
throw new RuntimeException('something bad happened');
};
$parallel->add($err);
$ids = [1 => uniqid(), 2 => uniqid(), 3 => uniqid(), 4 => uniqid()];
foreach ($ids as $id) {
$parallel->add(function () use ($id) {
Coroutine::sleep(0.001);
return $id;
});
}
try {
$parallel->wait();
throw new RuntimeException();
} catch (ParallelExecutionException $exception) {
foreach (['Detecting', 'RuntimeException', '#0'] as $keyword) {
$this->assertTrue(str_contains($exception->getMessage(), $keyword));
}
$result = $exception->getResults();
$this->assertEquals($ids, $result);
$throwables = $exception->getThrowables();
$this->assertTrue(count($throwables) === 1);
$this->assertSame('something bad happened', $throwables[0]->getMessage());
}
}
public function testParallelCount()
{
$parallel = new Parallel();
$id = 0;
$parallel->add(static function () use (&$id) {
++$id;
});
$parallel->add(static function () use (&$id) {
++$id;
});
$this->assertSame(2, $parallel->count());
$parallel->wait();
$this->assertSame(2, $parallel->count());
$this->assertSame(2, $id);
$parallel->wait();
$this->assertSame(2, $parallel->count());
$this->assertSame(4, $id);
}
public function testTheResultSort()
{
$res = parallel(['a' => function () {
usleep(1000);
return 1;
}, 'b' => function () {
return 2;
}]);
$this->assertSame(['a' => 1, 'b' => 2], $res);
$res = parallel(['a' => function () {
usleep(1000);
return 1;
}, 'b' => function () {
}]);
$this->assertSame(['a' => 1, 'b' => null], $res);
}
public function testThrowExceptionInParallel()
{
try {
parallel([
static function () {
throw new Exception();
},
]);
} catch (ParallelExecutionException $exception) {
/** @var Throwable $exception */
$exception = $exception->getThrowables()[0];
$traces = $exception->getTrace();
ob_start();
var_dump($traces);
$content = ob_get_clean();
$this->assertStringNotContainsString('*RECURSION*', $content);
}
}
public function returnCoId()
{
return Coroutine::id();
}
}

View File

@ -0,0 +1,54 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Coroutine;
use Hyperf\Coroutine\WaitGroup;
use PHPUnit\Framework\TestCase;
use Swoole\Coroutine;
/**
* @internal
* @coversNothing
*/
class WaitGroupTest extends TestCase
{
public function testWaitAgain()
{
$wg = new WaitGroup();
$wg->add(2);
$result = [];
$i = 2;
while ($i--) {
Coroutine::create(function () use ($wg, &$result) {
Coroutine::sleep(0.001);
$result[] = true;
$wg->done();
});
}
$wg->wait(1);
$this->assertTrue(count($result) === 2);
$wg->add();
$wg->add();
$result = [];
$i = 2;
while ($i--) {
Coroutine::create(function () use ($wg, &$result) {
Coroutine::sleep(0.001);
$result[] = true;
$wg->done();
});
}
$wg->wait(1);
$this->assertTrue(count($result) === 2);
}
}

View File

@ -0,0 +1,116 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Coroutine;
use Hyperf\Context\ApplicationContext;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Coroutine\Exception\WaitTimeoutException;
use Hyperf\Engine\Channel;
use Mockery;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use RuntimeException;
/**
* @internal
* @coversNothing
*/
class WaiterTest extends TestCase
{
protected function setUp(): void
{
$container = Mockery::mock(ContainerInterface::class);
ApplicationContext::setContainer($container);
$container->shouldReceive('get')->with(\Hyperf\Utils\Waiter::class)->andReturn(new \Hyperf\Utils\Waiter());
}
protected function tearDown(): void
{
Mockery::close();
}
public function testWait()
{
$id = uniqid();
$result = wait(function () use ($id) {
return $id;
});
$this->assertSame($id, $result);
$id = rand(0, 9999);
$result = wait(function () use ($id) {
return $id + 1;
});
$this->assertSame($id + 1, $result);
}
public function testWaitNone()
{
$callback = function () {
};
$result = wait($callback);
$this->assertSame($result, $callback());
$this->assertSame(null, $result);
$callback = function () {
return null;
};
$result = wait($callback);
$this->assertSame($result, $callback());
$this->assertSame(null, $result);
}
public function testWaitException()
{
$message = uniqid();
$callback = function () use ($message) {
throw new RuntimeException($message);
};
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage($message);
wait($callback);
}
public function testWaitReturnException()
{
$message = uniqid();
$callback = function () use ($message) {
return new RuntimeException($message);
};
$result = wait($callback);
$this->assertInstanceOf(RuntimeException::class, $result);
$this->assertSame($message, $result->getMessage());
}
public function testPushTimeout()
{
$channel = new Channel(1);
$this->assertSame(true, $channel->push(1, 1));
$this->assertSame(false, $channel->push(1, 1));
}
public function testTimeout()
{
$callback = function () {
Coroutine::sleep(0.5);
return true;
};
$this->expectException(WaitTimeoutException::class);
$this->expectExceptionMessage('Channel wait failed, reason: Timed out for 0.001 s');
wait($callback, 0.001);
}
}