Merge pull request #594 from limingxinleo/1.0-merge

Merge 1.0 into master.
This commit is contained in:
李铭昕 2019-09-20 10:21:35 +08:00 committed by GitHub
commit 7b1049934c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1019 additions and 116 deletions

View File

@ -48,6 +48,23 @@ return ApplicationContext::setContainer($container);
# v1.0.16 - TBD
## Added
- [#565](https://github.com/hyperf-cloud/hyperf/pull/565) Added options config for redis.
- [#580](https://github.com/hyperf-cloud/hyperf/pull/580) Added coroutine concurrency control features.
## Fixed
- [#564](https://github.com/hyperf-cloud/hyperf/pull/564) Fixed typehint error, when `Coroutine\Http2\Client->send` failed.
- [#567](https://github.com/hyperf-cloud/hyperf/pull/567) Fixed rpc-client `getReturnType` failed, when the name is not equal of service.
- [#571](https://github.com/hyperf-cloud/hyperf/pull/571) Fixed the next request will be effected after using stopPropagation.
- [#579](https://github.com/hyperf-cloud/hyperf/pull/579) Dynamic init snowflake meta data, fixed the problem that when using snowflake in command mode (e.g. `di:init-proxy`) will connect to redis server and wait timeout.
# Changed
- [#583](https://github.com/hyperf-cloud/hyperf/pull/583) Throw `GrpcClientException`, when `BaseClient::start` failed.
- [#585](https://github.com/hyperf-cloud/hyperf/pull/585) Throw exception when execute failed in task worker.
# v1.0.15 - 2019-09-11
## Fixed

View File

@ -3,7 +3,7 @@
[![Build Status](https://travis-ci.org/hyperf-cloud/hyperf.svg?branch=master)](https://travis-ci.org/hyperf-cloud/hyperf)
<a href="https://packagist.org/packages/hyperf/hyperf"><img src="https://poser.pugx.org/hyperf/hyperf/v/stable.svg" alt="Latest Stable Version"></a>
[![Php Version](https://img.shields.io/badge/php-%3E=7.2-brightgreen.svg?maxAge=2592000)](https://secure.php.net/)
[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.3.3-brightgreen.svg?maxAge=2592000)](https://github.com/swoole/swoole-src)
[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.4-brightgreen.svg?maxAge=2592000)](https://github.com/swoole/swoole-src)
[![Hyperf License](https://img.shields.io/github/license/hyperf-cloud/hyperf.svg?maxAge=2592000)](https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE.md)
# 介绍
@ -31,7 +31,7 @@ Hyperf 还提供了 `依赖注入`、`注解`、`AOP 面向切面编程`、`中
- Linux, OS X or Cygwin, WSL
- PHP 7.2+
- Swoole 4.3.3+ (4.4+ 更好)
- Swoole 4.4+
# 安全漏洞

View File

@ -2,7 +2,7 @@ English | [中文](./README-CN.md)
[![Build Status](https://travis-ci.org/hyperf-cloud/hyperf.svg?branch=master)](https://travis-ci.org/hyperf-cloud/hyperf)
[![Financial Contributors on Open Collective](https://opencollective.com/hyperf/all/badge.svg?label=financial+contributors)](https://opencollective.com/hyperf) [![Php Version](https://img.shields.io/badge/php-%3E=7.2-brightgreen.svg?maxAge=2592000)](https://secure.php.net/)
[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.3.3-brightgreen.svg?maxAge=2592000)](https://github.com/swoole/swoole-src)
[![Swoole Version](https://img.shields.io/badge/swoole-%3E=4.4-brightgreen.svg?maxAge=2592000)](https://github.com/swoole/swoole-src)
[![Hyperf License](https://img.shields.io/github/license/hyperf-cloud/hyperf.svg?maxAge=2592000)](https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE.md)
# Introduction

View File

@ -191,6 +191,7 @@
"HyperfTest\\Elasticsearch\\": "src/elasticsearch/tests/",
"HyperfTest\\Etcd\\": "src/etcd/tests/",
"HyperfTest\\Event\\": "src/event/tests/",
"HyperfTest\\ExceptionHandler\\": "src/exception-handler/tests/",
"HyperfTest\\GrpcClient\\": "src/grpc-client/tests/",
"HyperfTest\\GrpcServer\\": "src/grpc-server/tests/",
"HyperfTest\\Guzzle\\": "src/guzzle/tests/",

View File

@ -136,7 +136,48 @@ return [
## 定义服务消费者
一个 `服务消费者(ServiceConsumer)` 可以理解为就是一个客户端类,但在 Hyperf 里您无需处理连接和请求相关的事情,只需要定义一个类及相关属性即可。
一个 `服务消费者(ServiceConsumer)` 可以理解为就是一个客户端类,但在 Hyperf 里您无需处理连接和请求相关的事情,只需要进行一些鉴定配置即可。
### 自动创建代理消费者类
您可通过在 `config/autoload/services.php` 配置文件内进行一些简单的配置,即可通过动态代理自动创建消费者类。
```php
<?php
return [
'consumers' => [
[
// name 需与服务提供者的 name 属性相同
'name' => 'CalculatorService',
// 服务接口名,可选,默认值等于 name 配置的值,如果 name 直接定义为接口类则可忽略此行配置,如 name 为字符串则需要配置 service 对应到接口类
'service' => \App\JsonRpc\CalculatorServiceInterface::class,
// 对应容器对象 ID可选默认值等于 service 配置的值,用来定义依赖注入的 key
'id' => \App\JsonRpc\CalculatorServiceInterface::class,
// 服务提供者的服务协议,可选,默认值为 jsonrpc-http
'protocol' => 'jsonrpc-http',
// 负载均衡算法,可选,默认值为 random
'load_balancer' => 'random',
// 这个消费者要从哪个服务中心获取节点信息,如不配置则不会从服务中心获取节点信息
'registry' => [
'protocol' => 'consul',
'address' => 'http://127.0.0.1:8500',
],
// 如果没有指定上面的 registry 配置,即为直接对指定的节点进行消费,通过下面的 nodes 参数来配置服务提供者的节点信息
'nodes' => [
['host' => '127.0.0.1', 'port' => 9504],
],
]
],
];
```
在应用启动时会自动创建客户端类的代理对象,并在容器中使用配置项 `id` 的值(如果未设置,会使用配置项 `service` 值代替)来添加绑定关系,这样就和手工编写的客户端类一样,通过注入 `CalculatorServiceInterface` 接口来直接使用客户端。
> 当服务提供者使用接口类名发布服务名,在服务消费端只需要设置配置项 `name` 值为接口类名,不需要重复设置配置项 `id``service`
### 手动创建消费者类
如您对消费者类有更多的需求,您可通过手动创建一个消费者类来实现,只需要定义一个类及相关属性即可。
```php
<?php
@ -202,32 +243,58 @@ return [
这样便可以通过注入 `CalculatorServiceInterface` 接口来使用客户端了。
### 自动创建代理客户端类
### 配置复用
当服务提供者通过接口类实现服务,在服务消费端可以通过动态代理自动创建客户端类
通常来说,一个服务消费者会同时消费多个服务提供者,当我们通过服务中心来发现服务提供者时, `config/autoload/services.php` 配置文件内就可能会重复配置很多次 `registry` 配置,但通常来说,我们的服务中心可能是统一的,也就意味着多个服务消费者配置都是从同样的服务中心去拉取节点信息,此时我们可以通过 `PHP 变量``循环` 等 PHP 代码来实现配置文件的生成
只需要在配置文件中加入配置项 `service` 即可:
#### 通过 PHP 变量生成配置
```php
<?php
$registry = [
'protocol' => 'consul',
'address' => 'http://127.0.0.1:8500',
];
return [
// 下面的 FooService 和 BarService 仅示例多服务,并不是在文档示例中真实存在的
'consumers' => [
[
// 服务接口名,可选,默认值等于 name
'service' => \App\JsonRpc\CalculatorServiceInterface::class,
// 对应容器对象ID可选默认值等于 service
'id' => \App\JsonRpc\CalculatorServiceInterface::class,
// 服务提供者的服务协议,可选,默认值为 jsonrpc-http
'protocol' => 'jsonrpc-http',
// 负载均衡算法,可选,默认值为 random
'load_balancer' => 'random',
// 此处省略其它配置
'name' => 'FooService',
'registry' => $registry,
],
[
'name' => 'BarService',
'registry' => $registry,
]
],
];
```
在应用启动时会自动创建客户端类的代理对象,并在容器中使用配置项 `id` 的值(如果未设置,会使用配置项 `service` 值代替)来添加绑定关系,这样就和手工编写的客户端类一样,通过注入 `CalculatorServiceInterface` 接口来直接使用客户端。
#### 通过循环生成配置
> 当服务提供者使用接口类名发布服务名,在服务消费端只需要设置配置项 `name` 值为接口类名,不需要重复设置配置项 `id``service`
```php
<?php
return [
'consumers' => value(function () {
$consumers = [];
// 这里示例自动创建代理消费者类的配置形式,顾存在 name 和 service 两个配置项,这里的做法不是唯一的,仅说明可以通过 PHP 代码来生成配置
// 下面的 FooServiceInterface 和 BarServiceInterface 仅示例多服务,并不是在文档示例中真实存在的
$services = [
'FooService' => App\JsonRpc\FooServiceInterface::class,
'BarService' => App\JsonRpc\BarServiceInterface::class,
];
foreach ($services as $name => $interface) {
$consumers[] = [
'name' => $name,
'service' => $interface,
'registry' => [
'protocol' => 'consul',
'address' => 'http://127.0.0.1:8500',
]
];
}
return $consumers;
}),
];
```

View File

@ -22,6 +22,7 @@
<directory suffix="Test.php">./src/dispatcher/tests</directory>
<directory suffix="Test.php">./src/elasticsearch/tests</directory>
<directory suffix="Test.php">./src/event/tests</directory>
<directory suffix="Test.php">./src/exception-handler/tests</directory>
<directory suffix="Test.php">./src/grpc-client/tests</directory>
<directory suffix="Test.php">./src/grpc-server/tests</directory>
<directory suffix="Test.php">./src/guzzle/tests</directory>

1
src/exception-handler/.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
/tests export-ignore

View File

@ -15,6 +15,7 @@
},
"autoload-dev": {
"psr-4": {
"HyperfTest\\ExceptionHandler\\": "tests/"
}
},
"require": {

View File

@ -17,13 +17,6 @@ use Throwable;
abstract class ExceptionHandler
{
/**
* Determine if the exception should propagate to next handler.
*
* @var bool
*/
protected $propagationStopped = false;
/**
* Handle the exception, and return the specified result.
*/
@ -43,8 +36,8 @@ abstract class ExceptionHandler
*/
public function stopPropagation(): bool
{
$this->propagationStopped = true;
return $this->propagationStopped;
Propagation::instance()->setPropagationStopped(true);
return true;
}
/**
@ -54,6 +47,6 @@ abstract class ExceptionHandler
*/
public function isPropagationStopped(): bool
{
return $this->propagationStopped;
return Propagation::instance()->isPropagationStopped();
}
}

View File

@ -0,0 +1,38 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\ExceptionHandler;
use Hyperf\Utils\Traits\StaticInstance;
class Propagation
{
use StaticInstance;
/**
* Determine if the exception should propagate to next handler.
*
* @var bool
*/
protected $propagationStopped = false;
public function isPropagationStopped(): bool
{
return $this->propagationStopped;
}
public function setPropagationStopped(bool $propagationStopped): Propagation
{
$this->propagationStopped = $propagationStopped;
return $this;
}
}

View File

@ -0,0 +1,90 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\ExceptionHandler;
use Hyperf\ExceptionHandler\ExceptionHandlerDispatcher;
use Hyperf\HttpMessage\Base\Response;
use Hyperf\Utils\Context;
use HyperfTest\ExceptionHandler\Stub\BarExceptionHandler;
use HyperfTest\ExceptionHandler\Stub\FooExceptionHandler;
use Mockery;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
/**
* @internal
* @coversNothing
*/
class ExceptionHandlerTest extends TestCase
{
protected function tearDown()
{
Mockery::close();
Context::set('test.exception-handler.latest-handler', null);
}
public function testStopPropagation()
{
$handlers = [
BarExceptionHandler::class,
FooExceptionHandler::class,
];
$container = $this->getContainer();
parallel([function () use ($container, $handlers) {
$exception = new \Exception('xxx', 500);
Context::set(ResponseInterface::class, new Response());
$dispatcher = new ExceptionHandlerDispatcher($container);
$dispatcher->dispatch($exception, $handlers);
$this->assertSame(FooExceptionHandler::class, Context::get('test.exception-handler.latest-handler'));
}]);
parallel([function () use ($container, $handlers) {
$exception = new \Exception('xxx', 0);
Context::set(ResponseInterface::class, new Response());
$dispatcher = new ExceptionHandlerDispatcher($container);
$dispatcher->dispatch($exception, $handlers);
$this->assertSame(BarExceptionHandler::class, Context::get('test.exception-handler.latest-handler'));
}]);
parallel([function () use ($container, $handlers) {
$exception = new \Exception('xxx', 500);
Context::set(ResponseInterface::class, new Response());
$dispatcher = new ExceptionHandlerDispatcher($container);
$dispatcher->dispatch($exception, $handlers);
$this->assertSame(FooExceptionHandler::class, Context::get('test.exception-handler.latest-handler'));
}]);
}
protected function getContainer()
{
$container = Mockery::mock(ContainerInterface::class);
$container->shouldReceive('has')->andReturn(true);
$container->shouldReceive('get')->with(BarExceptionHandler::class)->andReturn(new BarExceptionHandler());
$container->shouldReceive('get')->with(FooExceptionHandler::class)->andReturn(new FooExceptionHandler());
return $container;
}
}

View File

@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\ExceptionHandler\Stub;
use Hyperf\ExceptionHandler\ExceptionHandler;
use Hyperf\Utils\Context;
use Psr\Http\Message\ResponseInterface;
use Throwable;
class BarExceptionHandler extends ExceptionHandler
{
public function handle(Throwable $throwable, ResponseInterface $response)
{
Context::set('test.exception-handler.latest-handler', static::class);
if ($throwable->getCode() === 0) {
$this->stopPropagation();
}
return $response;
}
public function isValid(Throwable $throwable): bool
{
return true;
}
}

View File

@ -0,0 +1,32 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\ExceptionHandler\Stub;
use Hyperf\ExceptionHandler\ExceptionHandler;
use Hyperf\Utils\Context;
use Psr\Http\Message\ResponseInterface;
use Throwable;
class FooExceptionHandler extends ExceptionHandler
{
public function handle(Throwable $throwable, ResponseInterface $response)
{
Context::set('test.exception-handler.latest-handler', static::class);
return $response;
}
public function isValid(Throwable $throwable): bool
{
return true;
}
}

View File

@ -43,7 +43,14 @@ class BaseClient
$this->grpcClient = new GrpcClient(ApplicationContext::getContainer()->get(ChannelPool::class));
$this->grpcClient->set($hostname, $options);
}
$this->start();
if (! $this->start()) {
$message = sprintf(
'Grpc client start failed with error code %d when connect to %s',
$this->getGrpcClient()->getErrCode(),
$hostname
);
throw new GrpcClientException($message, StatusCode::INTERNAL);
}
}
public function __get($name)

View File

@ -14,4 +14,4 @@ namespace Hyperf\GrpcClient\Exception;
class GrpcClientException extends \RuntimeException
{
}
}

View File

@ -13,6 +13,8 @@ declare(strict_types=1);
namespace Hyperf\GrpcClient;
use BadMethodCallException;
use Hyperf\Grpc\StatusCode;
use Hyperf\GrpcClient\Exception\GrpcClientException;
use Hyperf\Utils\ChannelPool;
use Hyperf\Utils\Coroutine;
use InvalidArgumentException;
@ -231,6 +233,9 @@ class GrpcClient
} else {
$streamId = $this->getHttpClient()->send($request);
}
if ($streamId === false) {
throw new GrpcClientException('Failed to send the request to server', StatusCode::INTERNAL);
}
if ($streamId > 0) {
$this->recvChannelMap[$streamId] = $this->channelPool->get();
}
@ -257,6 +262,11 @@ class GrpcClient
return false;
}
public function getErrCode(): int
{
return $this->httpClient ? $this->httpClient->errCode : 0;
}
/**
* @param bool|float $yield
*/

View File

@ -0,0 +1,52 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\GrpcClient;
use Hyperf\Di\Container;
use Hyperf\GrpcClient\BaseClient;
use Hyperf\GrpcClient\Exception\GrpcClientException;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\ChannelPool;
use Mockery;
use PHPUnit\Framework\TestCase;
/**
* @internal
* @coversNothing
*/
class BaseClientTest extends TestCase
{
protected function tearDown()
{
Mockery::close();
}
public function testGrpcClientStartFailed()
{
$this->getContainer();
$this->expectException(GrpcClientException::class);
$client = new BaseClient('127.0.0.1:1111');
}
public function getContainer()
{
$container = \Mockery::mock(Container::class);
$container->shouldReceive('get')->with(ChannelPool::class)->andReturn(new ChannelPool());
ApplicationContext::setContainer($container);
return $container;
}
}

View File

@ -34,6 +34,7 @@ class RedisConnection extends BaseConnection implements ConnectionInterface
'auth' => null,
'db' => 0,
'timeout' => 0.0,
'options' => [],
];
/**
@ -81,6 +82,13 @@ class RedisConnection extends BaseConnection implements ConnectionInterface
throw new ConnectionException('Connection reconnect failed.');
}
$options = $this->config['options'] ?? [];
foreach ($options as $name => $value) {
// The name is int, value is string.
$redis->setOption($name, $value);
}
if (isset($auth) && $auth !== '') {
$redis->auth($auth);
}

View File

@ -47,6 +47,7 @@ class RedisConnectionTest extends TestCase
'auth' => 'redis',
'db' => 0,
'timeout' => 0.0,
'options' => [],
'pool' => [
'min_connections' => 1,
'max_connections' => 30,

View File

@ -0,0 +1,109 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Redis;
use Hyperf\Config\Config;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Di\Container;
use Hyperf\Pool\Channel;
use Hyperf\Pool\LowFrequencyInterface;
use Hyperf\Pool\PoolOption;
use Hyperf\Redis\Frequency;
use Hyperf\Redis\Pool\PoolFactory;
use Hyperf\Redis\Pool\RedisPool;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;
use Mockery;
use PHPUnit\Framework\TestCase;
/**
* @internal
* @coversNothing
*/
class RedisProxyTest extends TestCase
{
protected function tearDown()
{
Mockery::close();
$redis = $this->getRedis();
$redis->del('test');
$redis->del('test:test');
}
public function testRedisOptionPrefix()
{
$redis = $this->getRedis([
\Redis::OPT_PREFIX => 'test:',
]);
$redis->set('test', 'yyy');
$this->assertSame('yyy', $redis->get('test'));
$this->assertSame('yyy', $this->getRedis()->get('test:test'));
}
public function testRedisOptionSerializer()
{
$redis = $this->getRedis([
\Redis::OPT_SERIALIZER => (string) \Redis::SERIALIZER_PHP,
]);
$redis->set('test', 'yyy');
$this->assertSame('yyy', $redis->get('test'));
$this->assertSame('s:3:"yyy";', $this->getRedis()->get('test'));
}
/**
* @param mixed $optinos
* @return \Redis
*/
private function getRedis($optinos = [])
{
$container = Mockery::mock(Container::class);
$container->shouldReceive('get')->once()->with(ConfigInterface::class)->andReturn(new Config([
'redis' => [
'default' => [
'host' => 'localhost',
'auth' => null,
'port' => 6379,
'db' => 0,
'options' => $optinos,
'pool' => [
'min_connections' => 1,
'max_connections' => 30,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60,
],
],
],
]));
$pool = new RedisPool($container, 'default');
$frequency = Mockery::mock(LowFrequencyInterface::class);
$frequency->shouldReceive('isLowFrequency')->andReturn(false);
$container->shouldReceive('make')->with(Frequency::class, Mockery::any())->andReturn($frequency);
$container->shouldReceive('make')->with(RedisPool::class, ['name' => 'default'])->andReturn($pool);
$container->shouldReceive('make')->with(Channel::class, ['size' => 30])->andReturn(new Channel(30));
$container->shouldReceive('make')->with(PoolOption::class, Mockery::any())->andReturnUsing(function ($class, $args) {
return new PoolOption(...array_values($args));
});
ApplicationContext::setContainer($container);
$factory = new PoolFactory($container);
return new Redis($factory);
}
}

View File

@ -17,7 +17,6 @@ use Hyperf\Di\Container;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BootApplication;
use Hyperf\RpcClient\ProxyFactory;
use Hyperf\Utils\Arr;
use Psr\Container\ContainerInterface;
class AddConsumerDefinitionListener implements ListenerInterface
@ -69,7 +68,10 @@ class AddConsumerDefinitionListener implements ListenerInterface
$container,
$consumer['name'],
$consumer['protocol'] ?? 'jsonrpc-http',
Arr::only($consumer, ['load_balancer'])
[
'load_balancer' => $consumer['load_balancer'] ?? 'random',
'service_interface' => $serviceClass,
]
);
}
);

View File

@ -26,6 +26,11 @@ class ServiceClient extends AbstractServiceClient
*/
protected $methodDefinitionCollector;
/**
* @var string
*/
protected $serviceInterface;
/**
* @var NormalizerInterface
*/
@ -52,7 +57,7 @@ class ServiceClient extends AbstractServiceClient
}
if (isset($response['result'])) {
$type = $this->methodDefinitionCollector->getReturnType($this->serviceName, $method);
$type = $this->methodDefinitionCollector->getReturnType($this->serviceInterface, $method);
return $this->normalizer->denormalize($response['result'], $type->getName());
}
@ -81,6 +86,8 @@ class ServiceClient extends AbstractServiceClient
protected function setOptions(array $options): void
{
$this->serviceInterface = $options['service_interface'] ?? $this->serviceName;
if (isset($options['load_balancer'])) {
$this->loadBalancer = $options['load_balancer'];
}

View File

@ -0,0 +1,71 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Snowflake\MetaGenerator;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Redis\RedisProxy;
use Hyperf\Snowflake\ConfigurationInterface;
use Hyperf\Snowflake\MetaGenerator;
abstract class RedisMetaGenerator extends MetaGenerator
{
const DEFAULT_REDIS_KEY = 'hyperf:snowflake:workerId';
/**
* @var ConfigInterface
*/
protected $config;
protected $workerId;
protected $dataCenterId;
public function __construct(ConfigurationInterface $configuration, int $beginTimestamp, ConfigInterface $config)
{
parent::__construct($configuration, $beginTimestamp);
$this->config = $config;
}
public function init()
{
if (is_null($this->workerId) || is_null($this->dataCenterId)) {
$pool = $this->config->get(sprintf('snowflake.%s.pool', static::class), 'default');
/** @var \Redis $redis */
$redis = make(RedisProxy::class, [
'pool' => $pool,
]);
$key = $this->config->get(sprintf('snowflake.%s.key', static::class), static::DEFAULT_REDIS_KEY);
$id = $redis->incr($key);
$this->workerId = $id % $this->configuration->maxWorkerId();
$this->dataCenterId = intval($id / $this->configuration->maxWorkerId()) % $this->configuration->maxDataCenterId();
}
}
public function getDataCenterId(): int
{
$this->init();
return $this->dataCenterId;
}
public function getWorkerId(): int
{
$this->init();
return $this->workerId;
}
}

View File

@ -13,44 +13,13 @@ declare(strict_types=1);
namespace Hyperf\Snowflake\MetaGenerator;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Redis\RedisProxy;
use Hyperf\Snowflake\ConfigurationInterface;
use Hyperf\Snowflake\MetaGenerator;
class RedisMilliSecondMetaGenerator extends MetaGenerator
class RedisMilliSecondMetaGenerator extends RedisMetaGenerator
{
const DEFAULT_REDIS_KEY = 'hyperf:snowflake:workerId';
protected $workerId;
protected $dataCenterId;
public function __construct(ConfigurationInterface $configuration, int $beginTimestamp = self::DEFAULT_BEGIN_SECOND, ConfigInterface $config)
{
parent::__construct($configuration, $beginTimestamp * 1000);
$pool = $config->get('snowflake.' . static::class . '.pool', 'default');
/** @var \Redis $redis */
$redis = make(RedisProxy::class, [
'pool' => $pool,
]);
$key = $config->get(sprintf('snowflake.%s.key', static::class), static::DEFAULT_REDIS_KEY);
$id = $redis->incr($key);
$this->workerId = $id % $configuration->maxWorkerId();
$this->dataCenterId = intval($id / $configuration->maxWorkerId()) % $configuration->maxDataCenterId();
}
public function getDataCenterId(): int
{
return $this->dataCenterId;
}
public function getWorkerId(): int
{
return $this->workerId;
parent::__construct($configuration, $beginTimestamp * 1000, $config);
}
public function getTimestamp(): int

View File

@ -13,44 +13,13 @@ declare(strict_types=1);
namespace Hyperf\Snowflake\MetaGenerator;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Redis\RedisProxy;
use Hyperf\Snowflake\ConfigurationInterface;
use Hyperf\Snowflake\MetaGenerator;
class RedisSecondMetaGenerator extends MetaGenerator
class RedisSecondMetaGenerator extends RedisMetaGenerator
{
const DEFAULT_REDIS_KEY = 'hyperf:snowflake:workerId';
protected $workerId;
protected $dataCenterId;
public function __construct(ConfigurationInterface $configuration, int $beginTimestamp = self::DEFAULT_BEGIN_SECOND, ConfigInterface $config)
{
parent::__construct($configuration, $beginTimestamp);
$pool = $config->get('snowflake.' . static::class . '.pool', 'default');
/** @var \Redis $redis */
$redis = make(RedisProxy::class, [
'pool' => $pool,
]);
$key = $config->get(sprintf('snowflake.%s.key', static::class), static::DEFAULT_REDIS_KEY);
$id = $redis->incr($key);
$this->workerId = $id % $configuration->maxWorkerId();
$this->dataCenterId = intval($id / $configuration->maxWorkerId()) % $configuration->maxDataCenterId();
}
public function getDataCenterId(): int
{
return $this->dataCenterId;
}
public function getWorkerId(): int
{
return $this->workerId;
parent::__construct($configuration, $beginTimestamp, $config);
}
public function getTimestamp(): int

View File

@ -16,8 +16,10 @@
"hyperf/framework": "~1.1.0",
"hyperf/utils": "~1.1.0",
"psr/container": "^1.0",
"psr/event-dispatcher": "^1.0",
"psr/log": "^1.0",
"psr/event-dispatcher": "^1.0"
"symfony/property-access": "^4.3",
"symfony/serializer": "^4.3"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^2.9",

View File

@ -0,0 +1,36 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Task;
use Hyperf\Utils\Serializer\ExceptionNormalizer;
use Psr\Container\ContainerInterface;
use Throwable;
class Exception
{
/**
* @var string
*/
public $class;
/**
* @var array|bool|float|int|string
*/
public $attributes;
public function __construct(ContainerInterface $container, Throwable $throwable)
{
$this->class = get_class($throwable);
$this->attributes = $container->get(ExceptionNormalizer::class)->normalize($throwable);
}
}

View File

@ -14,6 +14,7 @@ namespace Hyperf\Task\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\OnTask;
use Hyperf\Task\Exception;
use Hyperf\Task\Finish;
use Hyperf\Task\Task;
use Hyperf\Task\TaskExecutor;
@ -48,21 +49,28 @@ class OnTaskListener implements ListenerInterface
$executor = $this->container->get(TaskExecutor::class);
$executor->setIsTaskEnvironment(true);
if (is_array($data->callback)) {
[$class, $method] = $data->callback;
if ($this->container->has($class)) {
$obj = $this->container->get($class);
$result = $obj->{$method}(...$data->arguments);
$this->setResult($event, $result);
return;
}
try {
$result = $this->call($data);
$this->setResult($event, $result);
} catch (\Throwable $throwable) {
$this->setResult($event, new Exception($this->container, $throwable));
}
$result = call($data->callback, $data->arguments);
$this->setResult($event, $result);
}
}
protected function call(Task $data)
{
if (is_array($data->callback)) {
[$class, $method] = $data->callback;
if ($this->container->has($class)) {
$obj = $this->container->get($class);
return $obj->{$method}(...$data->arguments);
}
}
return call($data->callback, $data->arguments);
}
protected function setResult(OnTask $event, $result)
{
$event->setResult(new Finish($result));

View File

@ -14,6 +14,7 @@ namespace Hyperf\Task;
use Hyperf\Task\Exception\TaskException;
use Hyperf\Task\Exception\TaskExecuteException;
use Hyperf\Utils\Serializer\ExceptionNormalizer;
use Swoole\Server;
class TaskExecutor
@ -28,14 +29,20 @@ class TaskExecutor
*/
protected $factory;
/**
* @var ExceptionNormalizer
*/
protected $normalizer;
/**
* @var bool
*/
protected $isTaskEnvironment = false;
public function __construct(ChannelFactory $factory)
public function __construct(ChannelFactory $factory, ExceptionNormalizer $normalizer)
{
$this->factory = $factory;
$this->normalizer = $normalizer;
}
public function setServer(Server $server): void
@ -53,7 +60,14 @@ class TaskExecutor
throw new TaskExecuteException('Task execute failed.');
}
return $this->factory->pop($taskId, $timeout);
$result = $this->factory->pop($taskId, $timeout);
if ($result instanceof Exception) {
$exception = $this->normalizer->denormalize($result->attributes, $result->class);
throw $exception;
}
return $result;
}
public function isTaskEnvironment(): bool

View File

@ -0,0 +1,90 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Task\Cases;
use Hyperf\Framework\Event\OnTask;
use Hyperf\Task\ChannelFactory;
use Hyperf\Task\Exception;
use Hyperf\Task\Finish;
use Hyperf\Task\Listener\OnTaskListener;
use Hyperf\Task\Task;
use Hyperf\Task\TaskExecutor;
use Hyperf\Utils\Serializer\ExceptionNormalizer;
use HyperfTest\Task\Stub\Foo;
use Mockery;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Swoole\Server;
/**
* @internal
* @coversNothing
*/
class OnTaskListenerTest extends TestCase
{
protected function tearDown()
{
Mockery::close();
}
public function testProcess()
{
$container = $this->getContainer();
$listener = new OnTaskListener($container);
$id = uniqid();
$event = Mockery::mock(OnTask::class);
$event->task = new Server\Task();
$event->task->data = new Task([Foo::class, 'get'], [$id]);
$event->shouldReceive('setResult')->with(Mockery::any())->andReturnUsing(function ($result) use ($id) {
$this->assertInstanceOf(Finish::class, $result);
$this->assertSame($id, $result->data);
});
$listener->process($event);
}
public function testProcessException()
{
$container = $this->getContainer();
$listener = new OnTaskListener($container);
$id = uniqid();
$event = Mockery::mock(OnTask::class);
$event->task = new Server\Task();
$event->task->data = new Task([Foo::class, 'exception'], [$id]);
$event->shouldReceive('setResult')->with(Mockery::any())->andReturnUsing(function ($result) use ($id) {
$this->assertInstanceOf(Finish::class, $result);
$this->assertInstanceOf(Exception::class, $result->data);
$this->assertSame(\RuntimeException::class, $result->data->class);
$this->assertSame('Foo::exception failed.', $result->data->attributes['message']);
$this->assertSame(0, $result->data->attributes['code']);
});
$listener->process($event);
}
protected function getContainer()
{
$container = Mockery::mock(ContainerInterface::class);
$normalizer = new ExceptionNormalizer();
$container->shouldReceive('get')->with(ExceptionNormalizer::class)->andReturn($normalizer);
$container->shouldReceive('has')->with(Mockery::any())->andReturn(true);
$container->shouldReceive('get')->with(TaskExecutor::class)->andReturn(new TaskExecutor(new ChannelFactory(), $normalizer));
$container->shouldReceive('get')->with(Foo::class)->andReturn(new Foo());
return $container;
}
}

View File

@ -0,0 +1,26 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Task\Stub;
class Foo
{
public function get($id)
{
return $id;
}
public function exception()
{
throw new \RuntimeException('Foo::exception failed.');
}
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Task\Stub;
use Hyperf\Server\Server;
class SwooleServer extends Server
{
}

View File

@ -28,7 +28,8 @@
"suggest": {
"symfony/var-dumper": "Required to use the dd function (^4.1).",
"symfony/serializer": "Required to use SymfonyNormalizer (^4.3)",
"symfony/property-access": "Required to use SymfonyNormalizer (^4.3)"
"symfony/property-access": "Required to use SymfonyNormalizer (^4.3)",
"hyperf/di": "Required to use ExceptionNormalizer"
},
"autoload": {
"files": [

View File

@ -0,0 +1,103 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Utils\Coroutine;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Coroutine;
use Swoole\Coroutine\Channel;
/**
* @method bool isFull()
* @method bool isEmpty()
* @method array stats()
* @method int length()
*/
class Concurrent
{
/**
* @var Channel
*/
protected $channel;
/**
* @var float
*/
protected $timeout;
/**
* @var int
*/
protected $limit;
public function __construct(int $limit, float $timeout = 10.0)
{
$this->limit = $limit;
$this->channel = new Channel($limit);
$this->timeout = $timeout;
}
public function __call($name, $arguments)
{
if (in_array($name, ['isFull', 'isEmpty', 'length', 'stats'])) {
return $this->channel->{$name}(...$arguments);
}
}
public function getLimit(): int
{
return $this->limit;
}
public function getLength(): int
{
return $this->channel->length();
}
public function getRunningCoroutineCount(): int
{
return $this->getLength();
}
public function getTimeout(): float
{
return $this->timeout;
}
public function create(callable $callable): void
{
while (true) {
if ($this->channel->push(true, $this->getTimeout())) {
break;
}
}
Coroutine::create(function () use ($callable) {
try {
$callable();
} catch (\Throwable $exception) {
if (ApplicationContext::hasContainer()) {
$container = ApplicationContext::getContainer();
if ($container->has(StdoutLoggerInterface::class) && $container->has(FormatterInterface::class)) {
$logger = $container->get(StdoutLoggerInterface::class);
$formatter = $container->get(FormatterInterface::class);
$logger->error($formatter->format($exception));
}
}
} finally {
$this->channel->pop($this->getTimeout());
}
});
}
}

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
namespace Hyperf\Utils\Serializer;
use Doctrine\Instantiator\Instantiator;
use Hyperf\Di\ReflectionManager;
use Symfony\Component\Serializer\Normalizer\CacheableSupportsMethodInterface;
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface;
@ -19,6 +20,11 @@ use Symfony\Component\Serializer\Normalizer\NormalizerInterface;
class ExceptionNormalizer implements NormalizerInterface, DenormalizerInterface, CacheableSupportsMethodInterface
{
/**
* @var Instantiator
*/
protected $instantiator;
/**
* {@inheritdoc}
*/
@ -35,8 +41,8 @@ class ExceptionNormalizer implements NormalizerInterface, DenormalizerInterface,
}
if (is_array($data) && isset($data['message'], $data['code'])) {
try {
$exception = new $class($data['message'], $data['code']);
foreach (['file', 'line'] as $attribute) {
$exception = $this->getInstantiator()->instantiate($class);
foreach (['code', 'message', 'file', 'line'] as $attribute) {
if (isset($data[$attribute])) {
$property = ReflectionManager::reflectProperty($class, $attribute);
$property->setAccessible(true);
@ -102,4 +108,13 @@ class ExceptionNormalizer implements NormalizerInterface, DenormalizerInterface,
{
return \get_class($this) === __CLASS__;
}
protected function getInstantiator(): Instantiator
{
if ($this->instantiator instanceof Instantiator) {
return $this->instantiator;
}
return $this->instantiator = new Instantiator();
}
}

View File

@ -0,0 +1,77 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Utils\Coroutine;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Coroutine\Concurrent;
use Mockery;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine;
/**
* @internal
* @coversNothing
*/
class ConcurrentTest extends TestCase
{
protected function setUp()
{
$this->getContainer();
}
public function testConcurrent()
{
$concurrent = new Concurrent($limit = 10, 1);
$this->assertSame($limit, $concurrent->getLimit());
$this->assertTrue($concurrent->isEmpty());
$this->assertFalse($concurrent->isFull());
$count = 0;
for ($i = 0; $i < 15; ++$i) {
$concurrent->create(function () use (&$count) {
Coroutine::sleep(0.1);
++$count;
});
}
$this->assertTrue($concurrent->isFull());
$this->assertSame(5, $count);
$this->assertSame($limit, $concurrent->getRunningCoroutineCount());
$this->assertSame($limit, $concurrent->getLength());
$this->assertSame($limit, $concurrent->length());
}
public function testException()
{
$con = new Concurrent(10, 1);
$count = 0;
for ($i = 0; $i < 15; ++$i) {
$con->create(function () use (&$count) {
Coroutine::sleep(0.1);
++$count;
throw new \Exception('ddd');
});
}
$this->assertSame(5, $count);
$this->assertSame(10, $con->getRunningCoroutineCount());
}
protected function getContainer()
{
$container = Mockery::mock(ContainerInterface::class);
$container->shouldReceive('has')->andReturn(false);
ApplicationContext::setContainer($container);
}
}

View File

@ -13,6 +13,7 @@ declare(strict_types=1);
namespace HyperfTest\Utils\Serializer;
use Hyperf\Utils\Serializer\ExceptionNormalizer;
use HyperfTest\Utils\Stub\FooException;
use HyperfTest\Utils\Stub\SerializableException;
use PHPUnit\Framework\TestCase;
@ -42,5 +43,12 @@ class ExceptionNormalizerTest extends TestCase
$this->assertInstanceOf(SerializableException::class, $ret);
$this->assertEquals($ret->getMessage(), $ex->getMessage());
$this->assertEquals($ret, $ex);
$ex = new FooException(1000, 'invalid param foo');
$result = $normalizer->normalize($ex);
$ret = $normalizer->denormalize($result, FooException::class);
$this->assertInstanceOf(FooException::class, $ret);
$this->assertEquals($ret->getMessage(), $ex->getMessage());
$this->assertEquals($ret, $ex);
}
}

View File

@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Utils\Stub;
class FooException extends \Exception
{
public function __construct($code = 0, $message = '')
{
parent::__construct($message, $code);
}
}