Merge branch 'master' into 1.1-superglobals

This commit is contained in:
李铭昕 2020-01-07 16:23:30 +08:00
commit 5c8e9035e5
59 changed files with 1422 additions and 151 deletions

View File

@ -1,14 +1,47 @@
# v1.1.13 - TBD
# v1.1.14 - TBD
## Added
- [#1166](https://github.com/hyperf/hyperf/pull/1166) Added KeepaliveIO for amqp.
- [#1219](https://github.com/hyperf/hyperf/pull/1219) Added property `enable` for amqp consumer, which controls whether consumers should start along with the service.
## Fixed
- [#1223](https://github.com/hyperf/hyperf/pull/1223) Fixed the scanner will missing the packages at require-dev of composer.json
## Optimized
- [#1174](https://github.com/hyperf/hyperf/pull/1174) Adjusted the format of exception printer of `Hyperf\Utils\Parallel`.
## Changed
- [#1227](https://github.com/hyperf/hyperf/pull/1227) Upgraded jcchavezs/zipkin-php-opentracing to 0.1.4.
# v1.1.13 - 2020-01-03
## Added
- [#1137](https://github.com/hyperf/hyperf/pull/1137) Added translator for constants.
- [#1165](https://github.com/hyperf/hyperf/pull/1165) Added a method `route` for `Hyperf\HttpServer\Contract\RequestInterface`.
- [#1195](https://github.com/hyperf/hyperf/pull/1195) Added max offset for `Cacheable` and `CachePut`.
- [#1204](https://github.com/hyperf/hyperf/pull/1204) Added `insertOrIgnore` for database.
- [#1216](https://github.com/hyperf/hyperf/pull/1216) Added default value for `$data` of `RenderInterface::render()`.
- [#1221](https://github.com/hyperf/hyperf/pull/1221) Added `traceId` and `spanId` of the `swoole-tracker` component.
## Fixed
- [#1175](https://github.com/hyperf/hyperf/pull/1175) Fixed `Hyperf\Utils\Collection::random` does not works when the number is null.
- [#1199](https://github.com/hyperf/hyperf/pull/1199) Fixed variadic arguments do not work in task annotation.
- [#1200](https://github.com/hyperf/hyperf/pull/1200) Request path shouldn't include query parameters in hyperf/metric middleware.
- [#1210](https://github.com/hyperf/hyperf/pull/1210) Fixed validation `size` does not works without `numeric` or `integer` rules when the type of value is numeric.
## Optimized
- [#1211](https://github.com/hyperf/hyperf/pull/1211) Convert app name to valid prometheus namespace.
## Changed
- [#1217](https://github.com/hyperf/hyperf/pull/1217) Replaced `zendframework/zend-mime` into `laminas/laminas-mine`.
# v1.1.12 - 2019-12-26

View File

@ -24,8 +24,9 @@
"grpc/grpc": "^1.15",
"guzzlehttp/guzzle": "^6.3",
"ircmaxell/random-lib": "^1.2",
"jcchavezs/zipkin-opentracing": "^0.1.2",
"jcchavezs/zipkin-opentracing": "^0.1.4",
"jean85/pretty-package-versions": "^1.2",
"laminas/laminas-mime": "^2.7",
"monolog/monolog": "^1.24",
"nesbot/carbon": "^2.0",
"nikic/fast-route": "^1.3",

View File

@ -159,7 +159,41 @@ class DemoConsumer extends ConsumerMessage
}
```
框架会根据 `@Consumer` 注解自动创建 `Process 进程`,进程意外退出后会被重新拉起。
### 禁止消费进程自启
默认情况下,使用了 `@Consumer` 注解后,框架会自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。
如果出于开发阶段,进行消费者调试时,可能会因为消费其他消息而导致调试不便。
这种情况,只需要在 `@Consumer` 注解中配置 `enable=false` (默认为 `true` 跟随服务启动)或者在对应的消费者中重写类方法 `isEnable()` 返回 `false` 即可
```php
<?php
declare(strict_types=1);
namespace App\Amqp\Consumers;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
/**
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1, enable=false)
*/
class DemoConsumer extends ConsumerMessage
{
public function consume($data): string
{
print_r($data);
return Result::ACK;
}
public function isEnable(): bool
{
return parent::isEnable();
}
}
```
### 消费结果

View File

@ -115,3 +115,65 @@ class IndexController extends Controller
```
### 可变参数
在使用 `ErrorCode::getMessage(ErrorCode::SERVER_ERROR)` 来获取对应错误信息时,我们也可以传入可变参数,进行错误信息组合。比如以下情况
```php
<?php
use Hyperf\Constants\AbstractConstants;
use Hyperf\Constants\Annotation\Constants;
/**
* @Constants
*/
class ErrorCode extends AbstractConstants
{
/**
* @Message("Params %s is invalid.")
*/
const PARAMS_INVALID = 1000;
}
$message = ErrorCode::getMessage(ErrorCode::PARAMS_INVALID, ['user_id']);
// 1.2 版本以下 可以使用以下方式,但会在 1.2 版本移除
$message = ErrorCode::getMessage(ErrorCode::PARAMS_INVALID, 'user_id');
```
### 国际化
```
composer require hyperf/translation
```
相关配置详见 [国际化](./translation.md)
```php
<?php
// 国际化配置
return [
'params.invalid' => 'Params :param is invalid.',
];
use Hyperf\Constants\AbstractConstants;
use Hyperf\Constants\Annotation\Constants;
/**
* @Constants
*/
class ErrorCode extends AbstractConstants
{
/**
* @Message("params.invalid")
*/
const PARAMS_INVALID = 1000;
}
$message = ErrorCode::getMessage(ErrorCode::SERVER_ERROR, ['param' => 'user_id']);
```

View File

@ -30,14 +30,14 @@ class UserController
public function index(RequestInterface $request)
{
$currentPage = $request->input('page', 1);
$perPage = 2;
$perPage = $request->input('per_page', 2);
$users = [
['id' => 1, 'name' => 'Tom'],
['id' => 2, 'name' => 'Sam'],
['id' => 3, 'name' => 'Tim'],
['id' => 4, 'name' => 'Joe'],
];
return new Paginator($users, $perPage, $currentPage);
return new Paginator($users, (int) $perPage, (int) $currentPage);
}
}
```

View File

@ -76,6 +76,32 @@ class IndexController
}
```
除了可以通过依赖注入获取路由参数,还可以通过`route`方法获取,如下所示:
```php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\HttpServer\Contract\RequestInterface;
use Hyperf\HttpServer\Annotation\AutoController;
/**
* @AutoController()
*/
class IndexController
{
public function info(RequestInterface $request)
{
// 存在则返回,不存在则返回默认值 null
$id = $request->route('id');
// 存在则返回,不存在则返回默认值 0
$id = $request->route('id', 0);
// ...
}
}
```
### 请求路径 & 方法
`Hyperf\HttpServer\Contract\RequestInterface` 除了使用 [PSR-7](https://www.php-fig.org/psr/psr-7/) 标准定义的 `APIs` 之外,还提供了多种方法来检查请求,下面我们提供一些方法的示例:

View File

@ -180,6 +180,18 @@ public function info(int $id)
}
```
通过`route`方法获取
```php
public function index(RequestInterface $request)
{
// 存在则返回,不存在则返回默认值 null
$id = $request->route('id');
// 存在则返回,不存在则返回默认值 0
$id = $request->route('id', 0);
}
```
#### 必填参数
我们可以对 `$uri` 进行一些参数定义,通过 `{}` 来声明参数,如 `/user/{id}` 则声明了 `id` 值为一个必填参数。

View File

@ -44,4 +44,9 @@ class Consumer extends AbstractAnnotation
* @var int
*/
public $nums = 1;
/**
* @var null|bool
*/
public $enable;
}

View File

@ -13,6 +13,7 @@ declare(strict_types=1);
namespace Hyperf\Amqp;
use Hyperf\Amqp\Connection\AMQPSwooleConnection;
use Hyperf\Amqp\Connection\KeepaliveIO;
use Hyperf\Amqp\Pool\AmqpConnectionPool;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\Pool\Connection as BaseConnection;
@ -113,6 +114,10 @@ class Connection extends BaseConnection implements ConnectionInterface
public function reconnect(): bool
{
if ($this->connection && $this->connection->getIO() instanceof KeepaliveIO) {
$this->connection->getIO()->close();
}
$this->connection = $this->initConnection();
$this->channel = null;
$this->confirmChannel = null;
@ -127,6 +132,10 @@ class Connection extends BaseConnection implements ConnectionInterface
public function close(): bool
{
$this->connection->close();
if ($this->connection->getIO() instanceof KeepaliveIO) {
$this->connection->getIO()->close();
}
$this->channel = null;
$this->confirmChannel = null;
return true;

View File

@ -32,7 +32,11 @@ class AMQPSwooleConnection extends AbstractConnection
bool $keepalive = false,
int $heartbeat = 0
) {
if ($keepalive) {
$io = new KeepaliveIO($host, $port, $connectionTimeout, $readWriteTimeout, $context, $keepalive, $heartbeat);
} else {
$io = new SwooleIO($host, $port, $connectionTimeout, $readWriteTimeout, $context, $keepalive, $heartbeat);
}
parent::__construct(
$user,
@ -47,4 +51,9 @@ class AMQPSwooleConnection extends AbstractConnection
$connectionTimeout
);
}
public function getIO()
{
return $this->io;
}
}

View File

@ -0,0 +1,228 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Connection;
use InvalidArgumentException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\IO\AbstractIO;
use Swoole\Coroutine\Client;
class KeepaliveIO extends AbstractIO
{
/**
* @var string
*/
protected $host;
/**
* @var int
*/
protected $port;
/**
* @var float
*/
protected $connectionTimeout;
/**
* @var float
*/
protected $readWriteTimeout;
/**
* @var resource
*/
protected $context;
/**
* @var bool
*/
protected $keepalive;
/**
* @var int
*/
protected $heartbeat;
/**
* @var int
*/
private $initialHeartbeat;
/**
* @var Socket
*/
private $sock;
/**
* @var string
*/
private $buffer = '';
/**
* SwooleIO constructor.
*
* @param null|mixed $context
* @throws \InvalidArgumentException when readWriteTimeout argument does not 2x the heartbeat
*/
public function __construct(
string $host,
int $port,
float $connectionTimeout,
float $readWriteTimeout,
$context = null,
bool $keepalive = false,
int $heartbeat = 0
) {
if ($heartbeat !== 0 && ($readWriteTimeout < ($heartbeat * 2))) {
throw new InvalidArgumentException('Argument readWriteTimeout must be at least 2x the heartbeat.');
}
$this->host = $host;
$this->port = $port;
$this->connectionTimeout = $connectionTimeout;
$this->readWriteTimeout = $readWriteTimeout;
$this->context = $context;
$this->keepalive = $keepalive;
$this->heartbeat = $heartbeat;
$this->initialHeartbeat = $heartbeat;
}
/**
* Sets up the stream connection.
*/
public function connect()
{
$this->sock = make(Socket::class, [$this->host, $this->port, $this->connectionTimeout, $this->heartbeat]);
}
/**
* Reconnects the socket.
*/
public function reconnect()
{
$this->close();
$this->connect();
}
/**
* @param int $len
* @throws AMQPRuntimeException
* @return string
*/
public function read($len)
{
return $this->sock->call(function (Client $client) use ($len) {
do {
if ($len <= strlen($this->buffer)) {
$data = substr($this->buffer, 0, $len);
$this->buffer = substr($this->buffer, $len);
return $data;
}
if (! $client->connected) {
throw new AMQPRuntimeException('Broken pipe or closed connection');
}
$read_buffer = $client->recv($this->readWriteTimeout ? $this->readWriteTimeout : -1);
if ($read_buffer === false) {
throw new AMQPRuntimeException('Error receiving data, errno=' . $client->errCode);
}
if ($read_buffer === '') {
throw new AMQPRuntimeException('Connection is closed.');
}
$this->buffer .= $read_buffer;
} while (true);
});
}
/**
* @param string $data
* @throws AMQPRuntimeException
*/
public function write($data)
{
$this->sock->call(function ($client) use ($data) {
$buffer = $client->send($data);
if ($buffer === false) {
throw new AMQPRuntimeException('Error sending data');
}
});
}
/**
* No effect in KeeyaliveIO.
*/
public function check_heartbeat()
{
return true;
}
public function close()
{
if (isset($this->sock) && $this->sock instanceof Socket) {
$this->sock->close();
}
}
public function getSocket()
{
throw new AMQPRuntimeException('Socket of KeepaliveIO is forbidden to be used by others.');
}
/**
* @param int $sec
* @param int $usec
* @return int
*/
public function select($sec, $usec)
{
return 1;
}
/**
* @return $this
*/
public function disableHeartbeat()
{
$this->heartbeat = 0;
return $this;
}
/**
* @return $this
*/
public function reenableHeartbeat()
{
$this->heartbeat = $this->initialHeartbeat;
return $this;
}
/**
* Sends a heartbeat message.
*/
protected function write_heartbeat()
{
$this->sock->heartbeat();
}
protected function do_select($sec, $usec)
{
return 1;
}
}

View File

@ -0,0 +1,160 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Connection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\AMQPWriter;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Client;
use Swoole\Timer;
class Socket
{
/**
* @var Channel
*/
protected $channel;
/**
* @var null|int
*/
protected $timerId;
/**
* @var bool
*/
protected $connected = false;
/**
* @var string
*/
protected $host;
/**
* @var int
*/
protected $port;
/**
* @var float
*/
protected $timeout;
/**
* @var int
*/
protected $heartbeat;
/**
* @var float
*/
protected $waitTimeout = 10.0;
public function __construct(string $host, int $port, float $timeout, int $heartbeat)
{
$this->host = $host;
$this->port = $port;
$this->timeout = $timeout;
$this->heartbeat = $heartbeat;
$this->connect();
}
public function __destruct()
{
$this->clear();
}
public function call(\Closure $closure)
{
if (! $this->isConnected()) {
$this->connect();
}
$client = $this->channel->pop($this->waitTimeout);
$result = $closure($client);
$this->channel->push($client);
return $result;
}
public function connect()
{
$sock = new Client(SWOOLE_SOCK_TCP);
if (! $sock->connect($this->host, $this->port, $this->timeout)) {
throw new AMQPRuntimeException(
sprintf(
'Error Connecting to server(%s): %s ',
$sock->errCode,
swoole_strerror($sock->errCode)
),
$sock->errCode
);
}
$this->channel = new Channel(1);
$this->channel->push($sock);
$this->connected = true;
$this->addHeartbeat();
}
public function isConnected(): bool
{
return $this->connected;
}
public function close()
{
$this->connected = false;
$this->clear();
}
/**
* Sends a heartbeat message.
*/
public function heartbeat()
{
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$data = $pkt->getvalue();
$this->call(function ($client) use ($data) {
$buffer = $client->send($data);
if ($buffer === false) {
throw new AMQPRuntimeException('Error sending data');
}
});
}
protected function addHeartbeat()
{
$this->clear();
$this->timerId = Timer::tick($this->heartbeat * 1000, function () {
$this->heartbeat();
});
}
protected function clear()
{
if ($this->timerId) {
Timer::clear($this->timerId);
$this->timerId = null;
}
}
}

View File

@ -43,9 +43,11 @@ class ConsumerManager
if (! $instance instanceof ConsumerMessageInterface) {
continue;
}
$annotation->exchange && $instance->setExchange($annotation->exchange);
$annotation->routingKey && $instance->setRoutingKey($annotation->routingKey);
$annotation->queue && $instance->setQueue($annotation->queue);
! is_null($annotation->enable) && $instance->setEnable($annotation->enable);
property_exists($instance, 'container') && $instance->container = $this->container;
$nums = $annotation->nums;
$process = $this->createProcess($instance);
@ -79,6 +81,16 @@ class ConsumerManager
{
$this->consumer->consume($this->consumerMessage);
}
public function getConsumerMessage(): ConsumerMessageInterface
{
return $this->consumerMessage;
}
public function isEnable(): bool
{
return $this->consumerMessage->isEnable();
}
};
}
}

View File

@ -44,6 +44,11 @@ abstract class ConsumerMessage extends Message implements ConsumerMessageInterfa
*/
protected $qos;
/**
* @var bool
*/
protected $enable = true;
public function setQueue(string $queue): self
{
$this->queue = $queue;
@ -82,4 +87,15 @@ abstract class ConsumerMessage extends Message implements ConsumerMessageInterfa
{
return implode(',', (array) $this->getRoutingKey());
}
public function isEnable(): bool
{
return $this->enable;
}
public function setEnable(bool $enable): self
{
$this->enable = $enable;
return $this;
}
}

View File

@ -29,4 +29,8 @@ interface ConsumerMessageInterface extends MessageInterface
public function getQueueBuilder(): QueueBuilder;
public function getConsumerTag(): string;
public function isEnable(): bool;
public function setEnable(bool $enable);
}

View File

@ -38,6 +38,8 @@ class AmqpConnectionPool extends Pool
$this->config = $config->get($key);
$options = Arr::get($this->config, 'pool', []);
$this->frequency = make(Frequency::class);
parent::__construct($container, $options);
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Pool;
use Hyperf\Pool\Frequency as DefaultFrequency;
class Frequency extends DefaultFrequency
{
}

View File

@ -0,0 +1,93 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Amqp;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\ConsumerManager;
use Hyperf\Amqp\Message\ConsumerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Process\ProcessManager;
use HyperfTest\Amqp\Stub\ContainerStub;
use HyperfTest\Amqp\Stub\DemoConsumer;
use PHPUnit\Framework\TestCase;
/**
* @internal
* @coversNothing
*/
class ConsumerManagerTest extends TestCase
{
protected function tearDown()
{
ProcessManager::clear();
}
public function testConsumerAnnotation()
{
$container = ContainerStub::getContainer();
AnnotationCollector::collectClass(DemoConsumer::class, Consumer::class, new Consumer([
'exchange' => $exchange = uniqid(),
'routingKey' => $routingKey = uniqid(),
'queue' => $queue = uniqid(),
'nums' => $nums = rand(1, 10),
]));
$manager = new ConsumerManager($container);
$manager->run();
$hasRegisted = false;
foreach (ProcessManager::all() as $item) {
if (method_exists($item, 'getConsumerMessage')) {
$hasRegisted = true;
/** @var ConsumerMessageInterface $message */
$message = $item->getConsumerMessage();
$this->assertTrue($item->isEnable());
$this->assertSame($exchange, $message->getExchange());
$this->assertSame($routingKey, $message->getRoutingKey());
$this->assertSame($queue, $message->getQueue());
$this->assertSame($nums, $item->nums);
break;
}
}
$this->assertTrue($hasRegisted);
}
public function testConsumerAnnotationNotEnable()
{
$container = ContainerStub::getContainer();
AnnotationCollector::collectClass(DemoConsumer::class, Consumer::class, new Consumer([
'exchange' => $exchange = uniqid(),
'routingKey' => $routingKey = uniqid(),
'queue' => $queue = uniqid(),
'nums' => $nums = rand(1, 10),
'enable' => false,
]));
$manager = new ConsumerManager($container);
$manager->run();
$hasRegisted = false;
foreach (ProcessManager::all() as $item) {
if (method_exists($item, 'getConsumerMessage')) {
$hasRegisted = true;
$this->assertFalse($item->isEnable());
break;
}
}
$this->assertTrue($hasRegisted);
}
}

View File

@ -0,0 +1,101 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Amqp;
use Hyperf\Amqp\Connection\KeepaliveIO;
use Hyperf\Amqp\Connection\Socket;
use Hyperf\Utils\Context;
use HyperfTest\Amqp\Stub\ContainerStub;
use HyperfTest\Amqp\Stub\SocketStub;
use Mockery;
use PhpAmqpLib\Wire\AMQPWriter;
use PHPUnit\Framework\TestCase;
use Swoole\Timer;
/**
* @internal
* @coversNothing
*/
class KeepaliveIOTest extends TestCase
{
protected function tearDown()
{
Mockery::close();
Timer::clearAll();
Context::set('test.amqp.send.data', null);
}
public function testKeepaliveConnect()
{
$host = '127.0.0.1';
$port = 5672;
$timeout = 5;
$heartbeat = 3;
$container = ContainerStub::getHyperfContainer();
$container->shouldReceive('make')->with(Socket::class, Mockery::any())->andReturnUsing(function ($_, $args) use ($host, $port, $timeout, $heartbeat) {
$this->assertEquals([$host, $port, $timeout, $heartbeat], $args);
return Mockery::mock(Socket::class);
});
$io = new KeepaliveIO($host, $port, $timeout, 6, null, true, $heartbeat);
$io->connect();
}
public function testKeepaliveRead()
{
$host = '127.0.0.1';
$port = 5672;
$timeout = 5;
$heartbeat = 3;
$container = ContainerStub::getHyperfContainer();
$container->shouldReceive('make')->with(Socket::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new SocketStub(...$args);
});
$io = new KeepaliveIO($host, $port, $timeout, 6, null, true, $heartbeat);
$io->connect();
$res = $io->read(10);
$this->assertTrue(strlen($res) === 10);
}
public function testKeepaliveHeartbeatTimer()
{
$host = '127.0.0.1';
$port = 5672;
$timeout = 5;
$heartbeat = 1;
$container = ContainerStub::getHyperfContainer();
$container->shouldReceive('make')->with(Socket::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new SocketStub(...$args);
});
$io = new KeepaliveIO($host, $port, $timeout, 6, null, true, $heartbeat);
$this->assertTrue(count(Timer::list()) === 0);
$io->connect();
$this->assertTrue(count(Timer::list()) === 1);
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$data = $pkt->getvalue();
$this->assertSame($data, Context::get('test.amqp.send.data'));
}
}

View File

@ -12,8 +12,10 @@ declare(strict_types=1);
namespace HyperfTest\Amqp\Stub;
use Hyperf\Amqp\Consumer;
use Hyperf\Amqp\Pool\PoolFactory;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Di\Container;
use Hyperf\Utils\ApplicationContext;
use Mockery;
use Psr\Container\ContainerInterface;
@ -41,7 +43,23 @@ class ContainerStub
$container->shouldReceive('has')->andReturnUsing(function ($class) {
return true;
});
$container->shouldReceive('get')->with(Consumer::class)->andReturnUsing(function () use ($container) {
return new Consumer($container, $container->get(PoolFactory::class), $container->get(StdoutLoggerInterface::class));
});
return $container;
}
public static function getHyperfContainer()
{
$container = Mockery::mock(Container::class);
ApplicationContext::setContainer($container);
$container->shouldReceive('get')->with(PoolFactory::class)->andReturn(new PoolFactory($container));
$container->shouldReceive('get')->with(EventDispatcherInterface::class)->andReturn(
Mockery::mock(EventDispatcherInterface::class)
);
$container->shouldReceive('has')->andReturn(true);
return $container;
}
}

View File

@ -0,0 +1,43 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Amqp\Stub;
use Hyperf\Amqp\Connection\Socket;
use Hyperf\Utils\Context;
use Hyperf\Utils\Str;
use Mockery;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\Client;
class SocketStub extends Socket
{
public function connect()
{
$sock = Mockery::mock(Client::class);
$sock->connected = true;
$sock->shouldReceive('send')->andReturnUsing(function ($data) {
Context::set('test.amqp.send.data', $data);
});
$sock->shouldReceive('recv')->andReturnUsing(function ($timeout) {
return Str::random(1);
});
$this->channel = new Channel(1);
$this->channel->push($sock);
$this->connected = true;
$this->addHeartbeat();
$this->heartbeat();
}
}

View File

@ -21,6 +21,7 @@
"friendsofphp/php-cs-fixer": "^2.9"
},
"suggest": {
"hyperf/translation": "Required to use translation."
},
"autoload": {
"psr-4": {

View File

@ -13,6 +13,8 @@ declare(strict_types=1);
namespace Hyperf\Constants;
use Hyperf\Constants\Exception\ConstantsException;
use Hyperf\Contract\TranslatorInterface;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Str;
abstract class AbstractConstants
@ -35,10 +37,36 @@ abstract class AbstractConstants
array_shift($arguments);
if (count($arguments) > 0) {
if ($result = self::translate($message, $arguments)) {
return $result;
}
$count = count($arguments);
if ($count > 0) {
if ($count === 1 && is_array($arguments[0])) {
return sprintf($message, ...$arguments[0]);
}
// TODO: Removed in v1.2
return sprintf($message, ...$arguments);
}
return $message;
}
protected static function translate($key, $arguments): ?string
{
if (! ApplicationContext::hasContainer() || ! ApplicationContext::getContainer()->has(TranslatorInterface::class)) {
return null;
}
$replace = $arguments[0] ?? [];
if (! is_array($replace)) {
return null;
}
$translator = ApplicationContext::getContainer()->get(TranslatorInterface::class);
return $translator->trans($key, $replace);
}
}

View File

@ -14,7 +14,14 @@ namespace HyperfTest\Constants;
use Hyperf\Constants\AnnotationReader;
use Hyperf\Constants\ConstantsCollector;
use Hyperf\Contract\ContainerInterface;
use Hyperf\Contract\TranslatorInterface;
use Hyperf\Translation\ArrayLoader;
use Hyperf\Translation\Translator;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Context;
use HyperfTest\Constants\Stub\ErrorCodeStub;
use Mockery;
use PHPUnit\Framework\TestCase;
/**
@ -32,10 +39,14 @@ class AnnotationReaderTest extends TestCase
$data = $reader->getAnnotations($classConstants);
ConstantsCollector::set(ErrorCodeStub::class, $data);
Context::set(sprintf('%s::%s', TranslatorInterface::class, 'locale'), null);
}
public function testGetAnnotations()
{
$this->getContainer();
$data = ConstantsCollector::get(ErrorCodeStub::class);
$this->assertSame('Server Error!', $data[ErrorCodeStub::SERVER_ERROR]['message']);
@ -47,6 +58,8 @@ class AnnotationReaderTest extends TestCase
public function testGetMessageWithArguments()
{
$this->getContainer();
$res = ErrorCodeStub::getMessage(ErrorCodeStub::PARAMS_INVALID);
$this->assertSame('Params[%s] is invalid.', $res);
@ -54,5 +67,47 @@ class AnnotationReaderTest extends TestCase
$res = ErrorCodeStub::getMessage(ErrorCodeStub::PARAMS_INVALID, 'user_id');
$this->assertSame('Params[user_id] is invalid.', $res);
$res = ErrorCodeStub::getMessage(ErrorCodeStub::PARAMS_INVALID, ['order_id']);
$this->assertSame('Params[order_id] is invalid.', $res);
}
public function testGetMessageUsingTranslator()
{
$container = $this->getContainer(true);
$res = ErrorCodeStub::getMessage(ErrorCodeStub::SERVER_ERROR);
$this->assertSame('Server Error!', $res);
$res = ErrorCodeStub::getMessage(ErrorCodeStub::TRANSLATOR_ERROR_MESSAGE);
$this->assertSame('Error Message', $res);
$res = ErrorCodeStub::getMessage(ErrorCodeStub::TRANSLATOR_NOT_EXIST, ['name' => 'Hyperf']);
$this->assertSame('Hyperf is not exist.', $res);
Context::set(sprintf('%s::%s', TranslatorInterface::class, 'locale'), 'zh_CN');
$res = ErrorCodeStub::getMessage(ErrorCodeStub::TRANSLATOR_ERROR_MESSAGE);
$this->assertSame('错误信息', $res);
}
protected function getContainer($has = false)
{
$container = Mockery::mock(ContainerInterface::class);
ApplicationContext::setContainer($container);
$container->shouldReceive('get')->with(TranslatorInterface::class)->andReturnUsing(function () {
$loader = new ArrayLoader();
$loader->addMessages('en', 'error', [
'message' => 'Error Message',
'not_exist' => ':name is not exist.',
]);
$loader->addMessages('zh_CN', 'error', ['message' => '错误信息']);
return new Translator($loader, 'en');
});
$container->shouldReceive('has')->andReturn($has);
return $container;
}
}

View File

@ -33,4 +33,14 @@ class ErrorCodeStub extends AbstractConstants
* @Message("Params[%s] is invalid.")
*/
const PARAMS_INVALID = 503;
/**
* @Message("error.message")
*/
const TRANSLATOR_ERROR_MESSAGE = 504;
/**
* @Message("error.not_exist")
*/
const TRANSLATOR_NOT_EXIST = 505;
}

View File

@ -2188,6 +2188,28 @@ class Builder
return $this->connection->insert($this->grammar->compileInsertUsing($this, $columns, $sql), $this->cleanBindings($bindings));
}
/**
* Insert ignore a new record into the database.
*/
public function insertOrIgnore(array $values): int
{
if (empty($values)) {
return 0;
}
if (! is_array(reset($values))) {
$values = [$values];
} else {
foreach ($values as $key => $value) {
ksort($value);
$values[$key] = $value;
}
}
return $this->connection->affectingStatement(
$this->grammar->compileInsertOrIgnore($this, $values),
$this->cleanBindings(Arr::flatten($values, 1))
);
}
/**
* Update a record in the database.
*

View File

@ -104,6 +104,14 @@ class Grammar extends BaseGrammar
return 'RANDOM()';
}
/**
* Compile an insert ignore statement into SQL.
*/
public function compileInsertOrIgnore(Builder $query, array $values)
{
throw new RuntimeException('This database engine does not support insert or ignore.');
}
/**
* Compile an exists statement into SQL.
*

View File

@ -64,6 +64,14 @@ class MySqlGrammar extends Grammar
return $sql;
}
/**
* Compile an insert ignore statement into SQL.
*/
public function compileInsertOrIgnore(Builder $query, array $values): string
{
return substr_replace($this->compileInsert($query, $values), ' ignore', 6, 0);
}
/**
* Compile the random statement into SQL.
*

View File

@ -28,6 +28,7 @@ use Hyperf\Utils\Context;
use InvalidArgumentException;
use Mockery;
use PHPUnit\Framework\TestCase;
use RuntimeException;
/**
* @internal
@ -518,6 +519,22 @@ class QueryBuilderTest extends TestCase
$this->assertEquals([0 => 1], $builder->getBindings());
}
public function testInsertOrIgnoreMethod()
{
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('This database engine does not support insert or ignore.');
$builder = $this->getBuilder();
$builder->from('users')->insertOrIgnore(['email' => 'foo']);
}
public function testMySqlInsertOrIgnoreMethod()
{
$builder = $this->getMySqlBuilder();
$builder->getConnection()->shouldReceive('affectingStatement')->once()->with('insert ignore into `users` (`email`) values (?)', ['foo'])->andReturn(1);
$result = $builder->from('users')->insertOrIgnore(['email' => 'foo']);
$this->assertEquals(1, $result);
}
public function testEmptyWhereIns()
{
$builder = $this->getBuilder();

View File

@ -15,6 +15,7 @@ namespace Hyperf\Di\Aop;
use Closure;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Di\Exception\Exception;
use Hyperf\Di\ReflectionManager;
class ProceedingJoinPoint
{
@ -100,4 +101,12 @@ class ProceedingJoinPoint
return $result;
});
}
public function getReflectMethod(): \ReflectionMethod
{
return ReflectionManager::reflectMethod(
$this->className,
$this->methodName
);
}
}

View File

@ -11,7 +11,7 @@
],
"require": {
"psr/http-message": "^1.0",
"zendframework/zend-mime": "^2.7"
"laminas/laminas-mime": "^2.7"
},
"require-dev": {
"swoft/swoole-ide-helper": "dev-master",

View File

@ -14,7 +14,7 @@ namespace Hyperf\HttpMessage\Base;
use Hyperf\HttpMessage\Stream\SwooleStream;
use Psr\Http\Message\StreamInterface;
use Zend\Mime\Decode;
use Laminas\Mime\Decode;
/**
* Trait implementing functionality common to requests and responses.

View File

@ -65,6 +65,12 @@ interface RequestInterface extends ServerRequestInterface
*/
public function header(string $key, $default = null);
/**
* Retrieve the data from route parameters.
* @param mixed $default
*/
public function route(string $key, $default = null);
/**
* Returns the path being requested relative to the executed script.
* The path info always starts with a /.

View File

@ -14,6 +14,7 @@ namespace Hyperf\HttpServer;
use Hyperf\HttpMessage\Upload\UploadedFile;
use Hyperf\HttpServer\Contract\RequestInterface;
use Hyperf\HttpServer\Router\Dispatched;
use Hyperf\Utils\Arr;
use Hyperf\Utils\Context;
use Hyperf\Utils\Str;
@ -59,6 +60,20 @@ class Request implements RequestInterface
return data_get($this->getQueryParams(), $key, $default);
}
/**
* Retrieve the data from route parameters.
*
* @param mixed $default
*/
public function route(string $key, $default = null)
{
$route = $this->getAttribute(Dispatched::class);
if (is_null($route)) {
return $default;
}
return array_key_exists($key, $route->params) ? $route->params[$key] : $default;
}
/**
* Retrieve the data from parsed body, if $key is null, will return all parsed body.
*

View File

@ -21,6 +21,7 @@ use Hyperf\Metric\Contract\CounterInterface;
use Hyperf\Metric\Contract\GaugeInterface;
use Hyperf\Metric\Contract\HistogramInterface;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Hyperf\Utils\Str;
use InfluxDB\Client;
use InfluxDB\Database;
use InfluxDB\Database\RetentionPolicy;
@ -140,6 +141,7 @@ class MetricFactory implements MetricFactoryInterface
private function getNamespace(): string
{
return $this->config->get("metric.metric.{$this->name}.namespace");
$name = $this->config->get("metric.metric.{$this->name}.namespace");
return preg_replace('#[^a-zA-Z0-9:_]#', '_', Str::snake($name));
}
}

View File

@ -20,6 +20,7 @@ use Hyperf\Metric\Contract\HistogramInterface;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Hyperf\Metric\Exception\InvalidArgumentException;
use Hyperf\Metric\Exception\RuntimeException;
use Hyperf\Utils\Str;
use Prometheus\CollectorRegistry;
use Prometheus\RenderTextFormat;
use Swoole\Coroutine;
@ -139,7 +140,8 @@ class MetricFactory implements MetricFactoryInterface
private function getNamespace(): string
{
return $this->config->get("metric.metric.{$this->name}.namespace");
$name = $this->config->get("metric.metric.{$this->name}.namespace");
return preg_replace('#[^a-zA-Z0-9:_]#', '_', Str::snake($name));
}
private function guardConfig()

View File

@ -64,7 +64,6 @@ class Redis implements Adapter
/**
* Redis constructor.
* @param array $options
*/
public function __construct(array $options = [])
{
@ -73,7 +72,7 @@ class Redis implements Adapter
}
/**
* Create an instance from an established redis connection
* Create an instance from an established redis connection.
* @param \Hyperf\Redis\Redis|\Redis $redis
* @return Redis
*/
@ -88,17 +87,11 @@ class Redis implements Adapter
return $self;
}
/**
* @param array $options
*/
public static function setDefaultOptions(array $options): void
{
self::$defaultOptions = array_merge(self::$defaultOptions, $options);
}
/**
* @param string $prefix
*/
public static function setPrefix(string $prefix): void
{
self::$prefix = $prefix;
@ -132,7 +125,6 @@ class Redis implements Adapter
}
/**
* @param array $data
* @throws StorageException
*/
public function updateHistogram(array $data): void
@ -171,7 +163,6 @@ LUA
}
/**
* @param array $data
* @throws StorageException
*/
public function updateGauge(array $data): void
@ -209,7 +200,6 @@ LUA
}
/**
* @param array $data
* @throws StorageException
*/
public function updateCounter(array $data): void
@ -261,9 +251,6 @@ LUA
$this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->options['read_timeout']);
}
/**
* @return bool
*/
private function connectToServer(): bool
{
try {
@ -280,9 +267,6 @@ LUA
}
}
/**
* @return array
*/
private function collectHistograms(): array
{
$keys = $this->redis->sMembers(self::$prefix . Histogram::TYPE . self::PROMETHEUS_METRIC_KEYS_SUFFIX);
@ -351,9 +335,6 @@ LUA
return $histograms;
}
/**
* @return array
*/
private function collectGauges(): array
{
$keys = $this->redis->sMembers(self::$prefix . Gauge::TYPE . self::PROMETHEUS_METRIC_KEYS_SUFFIX);
@ -380,9 +361,6 @@ LUA
return $gauges;
}
/**
* @return array
*/
private function collectCounters(): array
{
$keys = $this->redis->sMembers(self::$prefix . Counter::TYPE . self::PROMETHEUS_METRIC_KEYS_SUFFIX);
@ -409,10 +387,6 @@ LUA
return $counters;
}
/**
* @param int $cmd
* @return string
*/
private function getRedisCommand(int $cmd): string
{
switch ($cmd) {
@ -427,10 +401,6 @@ LUA
}
}
/**
* @param array $data
* @return string
*/
private function toMetricKey(array $data): string
{
return implode(':', [self::$prefix, $data['type'], $data['name']]);

View File

@ -1,5 +1,15 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Metric\Adapter\Prometheus;
use Hyperf\Contract\ConfigInterface;
@ -7,7 +17,8 @@ use Psr\Container\ContainerInterface;
class RedisStorageFactory
{
public function __invoke(ContainerInterface $container){
public function __invoke(ContainerInterface $container)
{
$redis = $container->get(\Redis::class);
$appName = $container->get(ConfigInterface::class)->get('app_name', 'skeleton');
Redis::setPrefix($appName);

View File

@ -18,7 +18,6 @@ interface MetricFactoryInterface
* Create a Counter.
* @param string $name name of the metric
* @param string[] $labelNames key of your label kvs
* @return CounterInterface
*/
public function makeCounter(string $name, ?array $labelNames = []): CounterInterface;
@ -26,7 +25,6 @@ interface MetricFactoryInterface
* Create a Gauge.
* @param string $name name of the metric
* @param string[] $labelNames key of your label kvs
* @return GaugeInterface
*/
public function makeGauge(string $name, ?array $labelNames = []): GaugeInterface;
@ -34,7 +32,6 @@ interface MetricFactoryInterface
* Create a HistogramInterface.
* @param string $name name of the metric
* @param string[] $labelNames key of your label kvs
* @return HistogramInterface
*/
public function makeHistogram(string $name, ?array $labelNames = []): HistogramInterface;

View File

@ -15,58 +15,30 @@ namespace Hyperf\Metric\Listener;
use Hyperf\Contract\ConfigInterface;
use Hyperf\DbConnection\Pool\PoolFactory;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Psr\Container\ContainerInterface;
use Swoole\Timer;
/**
* A simple mysql connection watcher served as an example.
* This listener is not auto enabled.Tweak it to fit your
* own need.
*/
class DBPoolWatcher implements ListenerInterface
class DBPoolWatcher extends PoolWatcher implements ListenerInterface
{
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
BeforeWorkerStart::class,
];
}
public $prefix = 'mysql';
/**
* Periodically scan metrics.
*/
public function process(object $event)
{
$config = $this->container->get(ConfigInterface::class);
$poolNames = array_keys($config->get('databases', ['default' => []]));
foreach ($poolNames as $poolName) {
$workerId = $event->workerId;
$pool = $this
->container
->get(PoolFactory::class)
->getPool('default');
$gauge = $this
->container
->get(MetricFactoryInterface::class)
->makeGauge('mysql_connections_in_use', ['pool', 'worker'])
->with('default', (string) $workerId);
$config = $this->container->get(ConfigInterface::class);
$timerInterval = $config->get('metric.default_metric_interval', 5);
Timer::tick($timerInterval * 1000, function () use ($gauge, $pool) {
$gauge->set((float) $pool->getCurrentConnections());
});
->getPool($poolName);
$this->watch($pool, $poolName, $workerId);
}
}
}

View File

@ -0,0 +1,85 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Metric\Listener;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Hyperf\Pool\Pool;
use Psr\Container\ContainerInterface;
use Swoole\Timer;
abstract class PoolWatcher
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var string
*/
private $prefix = '';
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
BeforeWorkerStart::class,
];
}
/**
* Periodically scan metrics.
*/
abstract public function process(object $event);
public function watch(Pool $pool, string $poolName, int $workerId)
{
$connectionsInUseGauge = $this
->container
->get(MetricFactoryInterface::class)
->makeGauge($this->prefix . '_connections_in_use', ['pool', 'worker'])
->with($poolName, (string) $workerId);
$connectionsInWaitingGauge = $this
->container
->get(MetricFactoryInterface::class)
->makeGauge($this->prefix . '_connections_in_waiting', ['pool', 'worker'])
->with($poolName, (string) $workerId);
$maxConnectionsGauge = $this
->container
->get(MetricFactoryInterface::class)
->makeGauge($this->prefix . '_max_connections', ['pool', 'worker'])
->with($poolName, (string) $workerId);
$config = $this->container->get(ConfigInterface::class);
$timerInterval = $config->get('metric.default_metric_interval', 5);
Timer::tick($timerInterval * 1000, function () use (
$connectionsInUseGauge,
$connectionsInWaitingGauge,
$maxConnectionsGauge,
$pool
) {
$maxConnectionsGauge->set((float) $pool->getOption()->getMaxConnections());
$connectionsInWaitingGauge->set((float) $pool->getConnectionsInChannel());
$connectionsInUseGauge->set((float) $pool->getCurrentConnections());
});
}
}

View File

@ -14,59 +14,31 @@ namespace Hyperf\Metric\Listener;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Hyperf\Redis\Pool\PoolFactory;
use Psr\Container\ContainerInterface;
use Swoole\Timer;
/**
* A simple mysql connection watcher served as an example.
* This listener is not auto enabled.Tweak it to fit your
* own need.
*/
class RedisPoolWatcher implements ListenerInterface
class RedisPoolWatcher extends PoolWatcher implements ListenerInterface
{
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
BeforeWorkerStart::class,
];
}
public $prefix = 'redis';
/**
* Periodically scan metrics.
*/
public function process(object $event)
{
$config = $this->container->get(ConfigInterface::class);
$poolNames = array_keys($config->get('redis', ['default' => []]));
foreach ($poolNames as $poolName) {
$workerId = $event->workerId;
$pool = $this
->container
->get(PoolFactory::class)
->getPool('default');
$gauge = $this
->container
->get(MetricFactoryInterface::class)
->makeGauge('redis_connections_in_use', ['pool', 'worker'])
->with('default', (string) $workerId);
$config = $this->container->get(ConfigInterface::class);
$timerInterval = $config->get('metric.default_metric_interval', 5);
Timer::tick($timerInterval * 1000, function () use ($gauge, $pool) {
$gauge->set((float) $pool->getCurrentConnections());
});
->getPool($poolName);
$this->watch($pool, $poolName, $workerId);
}
}
}

View File

@ -23,9 +23,6 @@ trait MetricSetter
* Try to set every stats available to the gauge.
* Some of the stats might be missing depending
* on the platform.
* @param string $prefix
* @param array $metrics
* @param array $stats
*/
private function trySet(string $prefix, array $metrics, array $stats): void
{

View File

@ -20,6 +20,7 @@ use Hyperf\Metric\Exception\RuntimeException;
use Mockery;
use PHPUnit\Framework\TestCase;
use Prometheus\CollectorRegistry;
use ReflectionMethod;
/**
* @internal
@ -51,4 +52,27 @@ class MetricFactoryTest extends TestCase
$this->expectException(RuntimeException::class);
$p = new PrometheusFactory($config, $r, $c);
}
public function testGetNamespace()
{
$config = new Config([
'metric' => [
'default' => 'prometheus',
'use_standalone_process' => true,
'metric' => [
'prometheus' => [
'driver' => PrometheusFactory::class,
'mode' => Constants::SCRAPE_MODE,
'namespace' => 'Hello-World!',
],
],
],
]);
$r = Mockery::mock(CollectorRegistry::class);
$c = Mockery::mock(ClientFactory::class);
$p = new PrometheusFactory($config, $r, $c);
$method = new ReflectionMethod(PrometheusFactory::class, 'getNamespace');
$method->setAccessible(true);
$this->assertEquals('hello__world_', $method->invoke($p));
}
}

View File

@ -58,6 +58,7 @@ abstract class Pool implements PoolInterface
public function get(): ConnectionInterface
{
$connection = $this->getConnection();
if ($this->frequency instanceof FrequencyInterface) {
$this->frequency->hit();
}
@ -81,9 +82,16 @@ abstract class Pool implements PoolInterface
$num = $this->getConnectionsInChannel();
if ($num > 0) {
/** @var ConnectionInterface $conn */
while ($this->currentConnections > $this->option->getMinConnections() && $conn = $this->channel->pop($this->option->getWaitTimeout())) {
$conn->close();
--$this->currentConnections;
--$num;
if ($num <= 0) {
// Ignore connections queued during flushing.
break;
}
}
}
}

View File

@ -129,17 +129,11 @@ class PoolOption implements PoolOptionInterface
return $this;
}
/**
* @return float
*/
public function getMaxIdleTime(): float
{
return $this->maxIdleTime;
}
/**
* @param float $maxIdleTime
*/
public function setMaxIdleTime(float $maxIdleTime): self
{
$this->maxIdleTime = $maxIdleTime;

View File

@ -36,8 +36,10 @@ class HttpServerMiddleware implements MiddlewareInterface
if (class_exists(Stats::class)) {
$path = $request->getUri()->getPath();
$ip = current(swoole_get_local_ip());
$traceId = $request->getHeaderLine("x-swoole-traceid") ?: "";
$spanId = $request->getHeaderLine("x-swoole-spanid") ?: "";
$tick = Stats::beforeExecRpc($path, $this->name, $ip);
$tick = Stats::beforeExecRpc($path, $this->name, $ip, $traceId, $spanId);
try {
$response = $handler->handle($request);
Stats::afterExecRpc($tick, true, $response->getStatusCode());

View File

@ -48,7 +48,17 @@ class TaskAspect extends AbstractAspect
$class = $proceedingJoinPoint->className;
$method = $proceedingJoinPoint->methodName;
$arguments = $proceedingJoinPoint->getArguments();
$arguments = [];
$parameters = $proceedingJoinPoint->getReflectMethod()->getParameters();
foreach ($parameters as $parameter) {
$arg = $proceedingJoinPoint->arguments['keys'][$parameter->getName()];
if ($parameter->isVariadic()) {
$arguments = array_merge($arguments, $arg);
} else {
$arguments[] = $arg;
}
}
$timeout = 10;
$metadata = $proceedingJoinPoint->getAnnotationMetadata();

View File

@ -10,7 +10,7 @@ declare(strict_types=1);
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Task\Cases;
namespace HyperfTest\Task;
use Hyperf\Task\ChannelFactory;
use PHPUnit\Framework\TestCase;

View File

@ -10,7 +10,7 @@ declare(strict_types=1);
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Task\Cases;
namespace HyperfTest\Task;
use Hyperf\Framework\Event\OnTask;
use Hyperf\Task\ChannelFactory;

View File

@ -23,4 +23,14 @@ class Foo
{
throw new \RuntimeException('Foo::exception failed.');
}
public function getIdAndName($id, $name)
{
return ['id' => $id, 'name' => $name];
}
public function dump($id, ...$arguments)
{
return ['id' => $id, 'arguments' => $arguments];
}
}

View File

@ -0,0 +1,106 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace HyperfTest\Task;
use Hyperf\Contract\ContainerInterface;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Hyperf\Task\Aspect\TaskAspect;
use Hyperf\Task\Task;
use Hyperf\Task\TaskExecutor;
use Hyperf\Utils\ApplicationContext;
use HyperfTest\Task\Stub\Foo;
use Mockery;
use PHPUnit\Framework\TestCase;
/**
* @internal
* @coversNothing
*/
class TaskAspectTest extends TestCase
{
protected $isTaskEnvironment = false;
protected function tearDown()
{
Mockery::close();
$this->isTaskEnvironment = false;
}
public function testTaskAspect()
{
$container = $this->getContainer();
$aspect = new TaskAspect($container);
$closure = function ($id, $name) {
return ['id' => $id, 'name' => $name];
};
$point = new ProceedingJoinPoint($closure, Foo::class, 'getIdAndName', [
'keys' => $data = ['id' => uniqid(), 'name' => 'Hyperf'],
'order' => ['id', 'name'],
]);
$point->pipe = function (ProceedingJoinPoint $point) {
return $point->processOriginalMethod();
};
$res = $aspect->process($point);
$this->assertSame($data, $res);
$this->isTaskEnvironment = true;
$res = $aspect->process($point);
$this->assertSame($data, $res);
}
public function testTaskAspectVariadic()
{
$container = $this->getContainer();
$aspect = new TaskAspect($container);
$closure = function ($id, $arguments) {
return ['id' => $id, 'arguments' => $arguments];
};
$point = new ProceedingJoinPoint($closure, Foo::class, 'dump', [
'keys' => $data = ['id' => 1, 'arguments' => [1, 2, 3]],
'order' => ['id', 'arguments'],
]);
$point->pipe = function (ProceedingJoinPoint $point) {
return $point->processOriginalMethod();
};
$res = $aspect->process($point);
$this->assertSame($data, $res);
$this->isTaskEnvironment = true;
$res = $aspect->process($point);
$this->assertSame($data, $res);
}
protected function getContainer()
{
$container = Mockery::mock(ContainerInterface::class);
ApplicationContext::setContainer($container);
$container->shouldReceive('get')->with(Foo::class)->andReturn(new Foo());
$container->shouldReceive('get')->with(TaskExecutor::class)->andReturnUsing(function () use ($container) {
$executor = Mockery::mock(TaskExecutor::class);
$executor->shouldReceive('isTaskEnvironment')->andReturn($this->isTaskEnvironment);
$executor->shouldReceive('execute')->with(Mockery::any(), Mockery::any())->andReturnUsing(function (Task $task, $_) use ($container) {
[$class, $method] = $task->callback;
return $container->get($class)->{$method}(...$task->arguments);
});
return $executor;
});
return $container;
}
}

View File

@ -52,7 +52,9 @@ class Composer
throw new \RuntimeException('composer.lock not found.');
}
self::$content = collect(json_decode(file_get_contents($path), true));
foreach (self::$content->offsetGet('packages') ?? [] as $package) {
$packages = self::$content->offsetGet('packages') ?? [];
$packagesDev = self::$content->offsetGet('packages-dev') ?? [];
foreach (array_merge($packages, $packagesDev) as $package) {
$packageName = '';
foreach ($package ?? [] as $key => $value) {
if ($key === 'name') {

View File

@ -65,8 +65,8 @@ class Parallel
});
}
$wg->wait();
if ($throw && count($throwables) > 0) {
$message = 'At least one throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowables($throwables);
if ($throw && ($throwableCount = count($throwables)) > 0) {
$message = 'Detecting ' . $throwableCount . ' throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowables($throwables);
$executionException = new ParallelExecutionException($message);
$executionException->setResults($result);
$executionException->setThrowables($throwables);
@ -82,12 +82,14 @@ class Parallel
/**
* Format throwables into a nice list.
*
* @param \Throwable[] $throwables
*/
private function formatThrowables(array $exception): string
private function formatThrowables(array $throwables): string
{
$output = '';
foreach ($exception as $key => $value) {
$output .= \sprintf('(%s) %s: %s' . PHP_EOL, $key, get_class($value), $value->getMessage());
foreach ($throwables as $key => $value) {
$output .= \sprintf('(%s) %s: %s' . PHP_EOL . '%s' . PHP_EOL, $key, get_class($value), $value->getMessage(), $value->getTraceAsString());
}
return $output;
}

View File

@ -15,6 +15,7 @@ namespace HyperfTest\Utils;
use Hyperf\Utils\Coroutine;
use Hyperf\Utils\Exception\ParallelExecutionException;
use Hyperf\Utils\Parallel;
use Hyperf\Utils\Str;
use PHPUnit\Framework\TestCase;
/**
@ -119,17 +120,14 @@ class ParallelTest extends TestCase
public function testParallelThrows()
{
$parallel = new Parallel();
$err = function () {
Coroutine::sleep(0.001);
throw new \RuntimeException('something bad happened');
};
$ok = function () {
Coroutine::sleep(0.001);
return 1;
};
$parallel->add($err);
for ($i = 0; $i < 4; ++$i) {
$parallel->add($ok);
@ -138,6 +136,41 @@ class ParallelTest extends TestCase
$res = $parallel->wait();
}
public function testParallelResultsAndThrows()
{
$parallel = new Parallel();
$err = function () {
Coroutine::sleep(0.001);
throw new \RuntimeException('something bad happened');
};
$parallel->add($err);
$ids = [1 => uniqid(), 2 => uniqid(), 3 => uniqid(), 4 => uniqid()];
foreach ($ids as $id) {
$parallel->add(function () use ($id) {
Coroutine::sleep(0.001);
return $id;
});
}
try {
$parallel->wait();
throw new \RuntimeException();
} catch (ParallelExecutionException $exception) {
foreach (['Detecting', 'RuntimeException', '#0'] as $keyword) {
$this->assertTrue(Str::contains($exception->getMessage(), $keyword));
}
$result = $exception->getResults();
$this->assertEquals($ids, $result);
$throwables = $exception->getThrowables();
$this->assertTrue(count($throwables) === 1);
$this->assertSame('something bad happened', $throwables[0]->getMessage());
}
}
public function returnCoId()
{
return Coroutine::id();

View File

@ -1492,7 +1492,7 @@ trait ValidatesAttributes
return $value->getSize() / 1024;
}
return mb_strlen($value);
return mb_strlen((string) $value);
}
/**

View File

@ -1505,6 +1505,12 @@ class ValidationValidatorTest extends TestCase
$v = new Validator($trans, ['foo' => '3'], ['foo' => 'Numeric|Size:3']);
$this->assertTrue($v->passes());
$v = new Validator($trans, ['foo' => 123], ['foo' => 'Size:123']);
$this->assertFalse($v->passes());
$v = new Validator($trans, ['foo' => 3], ['foo' => 'Size:1']);
$this->assertTrue($v->passes());
$v = new Validator($trans, ['foo' => [1, 2, 3]], ['foo' => 'Array|Size:3']);
$this->assertTrue($v->passes());

View File

@ -58,7 +58,7 @@ class Render implements RenderInterface
$this->container = $container;
}
public function render(string $template, array $data)
public function render(string $template, array $data = [])
{
switch ($this->mode) {
case Mode::SYNC:

View File

@ -14,5 +14,5 @@ namespace Hyperf\View;
interface RenderInterface
{
public function render(string $template, array $data);
public function render(string $template, array $data = []);
}