Merge branch 'master' into pr/325

This commit is contained in:
李铭昕 2019-08-03 13:19:16 +08:00
commit a004e8c48f
11 changed files with 288 additions and 64 deletions

View File

@ -1,4 +1,6 @@
# v1.0.9 - TBD
# v1.0.10 - TBD
# v1.0.9 - 2019-08-03
## Added
@ -10,6 +12,7 @@
- [#300](https://github.com/hyperf-cloud/hyperf/pull/300) Let message queues run in sub-coroutines. Fixed async queue attempts twice to handle message, but only once actually.
- [#305](https://github.com/hyperf-cloud/hyperf/pull/305) Fixed `$key` of method `Arr::set` not support `int` and `null`.
- [#312](https://github.com/hyperf-cloud/hyperf/pull/312) Fixed amqp process collect listener will be handled later than the process boot listener.
- [#315](https://github.com/hyperf-cloud/hyperf/pull/315) Fixed config etcd center not work after worker restart or in user process.
- [#318](https://github.com/hyperf-cloud/hyperf/pull/318) Fixed service will register to service center ceaselessly.
## Changed

View File

@ -59,9 +59,9 @@ As we said that coroutine is a lightweight thread. Coroutines and threads are su
Blocking code in the coroutine will cause the coroutine scheduler cannot switch to another coroutine to continue executing code, so we must prevent the blocking code exist in the coroutine. Assuming we have started `4 Worker` to handle `HTTP` Request (usually the number of `Worker` started is the same as the number of `CPU` cores or `2` times of `CPU` cores). If there is blocking code in the coroutine, theoretically, if each request will block `1` seconds, then the application `QPS ` will also degenerate to `4/s`, which is undoubtedly degenerate into a similar situation to `PHP-FPM`, so we must not allows blocking code exist in the coroutine.
So which ones are blocking code? We can simply think that most of the asynchronous functions provided by non-Swoole are `MySQL`, `Redis`, `Memcache`, `MongoDB`, `HTTP`, `Socket`, file operations. , `sleep/usleep`, etc. are blocking code, which covers almost all daily operations, so how to solve it? `Swoole` provides the MySQL client, `PostgreSQL`, `Redis`, `HTTP`, `Socket` for the coroutine clientin addition to, after `Swoole 4.1`, Swoole provide `\Swoole\Coroutine::enableCoroutine()` function to make most of blocking code to coroutinedjust execute `\Swoole\Coroutine::enableCoroutine()` before create coroutine`Swoole` will turn all sockets that use php_stream for coroutine scheduling, which can be understood as the most common operations become coroutined, except `curl`. More detailed information can be found in this section of the [Swoole Documentation](https://wiki.swoole.com/wiki/page/965.html).
So which ones are blocking code? We can simply think that most of the asynchronous functions provided by non-Swoole are `MySQL`, `Redis`, `Memcache`, `MongoDB`, `HTTP`, `Socket`, file operations. , `sleep/usleep`, etc. are blocking code, which covers almost all daily operations, so how to solve it? `Swoole` provides the MySQL client, `PostgreSQL`, `Redis`, `HTTP`, `Socket` for the coroutine clientin addition to, after `Swoole 4.1`, Swoole provide `\Swoole\Runtime::enableCoroutine()` function to make most of blocking code to coroutinedjust execute `\Swoole\Runtime::enableCoroutine()` before create coroutine`Swoole` will turn all sockets that use php_stream for coroutine scheduling, which can be understood as the most common operations become coroutined, except `curl`. More detailed information can be found in this section of the [Swoole Documentation](https://wiki.swoole.com/wiki/page/965.html).
In `Hyperf`, we have handled this for you, you only need to pay attention to the blocking code that `\Swoole\Coroutine::enableCoroutine()` still cannot be coroutined automatically.
In `Hyperf`, we have handled this for you, you only need to pay attention to the blocking code that `\Swoole\Runtime::enableCoroutine()` still cannot be coroutined automatically.
### Cannot store status via global variables
@ -190,4 +190,4 @@ $result = parallel([
### Coroutine Context
Since the coroutines in the same process are shared by memory, the execution/switching of the coroutines is non-sequential, which means that it is difficult to control which one of the current coroutines is * (in fact, it could, but no one would like to do like this) *, so we need to be able to switch the corresponding context at the same time when a coroutine switch occurs.
Implementing context management for coroutines in Hyperf is very simple, based on `set(string $id, $value)`, `get(string $id, $default = null)`, `has(string $id)` static methods of the `Hyperf\Utils\Context` can complete the management of context data. The values set and obtained by these methods are limited to the current coroutine. At the end of the coroutine, the corresponding context will be automatically released. No need to manage manually, no need to worry about the risk of memory leaks.
Implementing context management for coroutines in Hyperf is very simple, based on `set(string $id, $value)`, `get(string $id, $default = null)`, `has(string $id)` static methods of the `Hyperf\Utils\Context` can complete the management of context data. The values set and obtained by these methods are limited to the current coroutine. At the end of the coroutine, the corresponding context will be automatically released. No need to manage manually, no need to worry about the risk of memory leaks.

View File

@ -59,9 +59,9 @@ Swoole 协程也是对异步回调的一种解决方案,在 `PHP` 语言下,
协程内代码的阻塞会导致协程调度器无法切换到另一个协程继续执行代码,所以我们绝不能在协程内存在阻塞代码,假设我们启动了 `4``Worker` 来处理 `HTTP` 请求(通常启动的 `Worker` 数量与 `CPU` 核心数一致或 `2` 倍),如果代码中存在阻塞,暂且理论的认为每个请求都会阻塞`1` 秒,那么系统的 `QPS` 也将退化为 `4/s`,这无疑就是退化成了与 `PHP-FPM` 类似的情况,所以我们绝对不能在协程中存在阻塞代码。
那么到底哪些是阻塞代码呢?我们可以简单的认为绝大多数你所熟知的非 `Swoole` 提供的异步函数的 `MySQL`、`Redis`、`Memcache`、`MongoDB`、`HTTP`、`Socket`等客户端,文件操作、`sleep/usleep` 等均为阻塞函数,这几乎涵盖了所有日常操作,那么要如何解决呢?`Swoole` 提供了 `MySQL`、`PostgreSQL`、`Redis`、`HTTP`、`Socket` 的协程客户端可以使用,同时 `Swoole 4.1` 之后提供了一键协程化的方法 `\Swoole\Coroutine::enableCoroutine()`,只需在使用协程前运行这一行代码,`Swoole` 会将 所有使用 `php_stream` 进行 `socket` 操作均变成协程调度的异步 `I/O`,可以理解为除了 `curl` 绝大部分原生的操作都可以适用,关于此部分可查阅 [Swoole 文档](https://wiki.swoole.com/wiki/page/965.html) 获得更具体的信息。
那么到底哪些是阻塞代码呢?我们可以简单的认为绝大多数你所熟知的非 `Swoole` 提供的异步函数的 `MySQL`、`Redis`、`Memcache`、`MongoDB`、`HTTP`、`Socket`等客户端,文件操作、`sleep/usleep` 等均为阻塞函数,这几乎涵盖了所有日常操作,那么要如何解决呢?`Swoole` 提供了 `MySQL`、`PostgreSQL`、`Redis`、`HTTP`、`Socket` 的协程客户端可以使用,同时 `Swoole 4.1` 之后提供了一键协程化的方法 `\Swoole\Runtime::enableCoroutine()`,只需在使用协程前运行这一行代码,`Swoole` 会将 所有使用 `php_stream` 进行 `socket` 操作均变成协程调度的异步 `I/O`,可以理解为除了 `curl` 绝大部分原生的操作都可以适用,关于此部分可查阅 [Swoole 文档](https://wiki.swoole.com/wiki/page/965.html) 获得更具体的信息。
`Hyperf` 中我们已经为您处理好了这一切,您只需关注 `\Swoole\Coroutine::enableCoroutine()` 仍无法协程化的阻塞代码即可。
`Hyperf` 中我们已经为您处理好了这一切,您只需关注 `\Swoole\Runtime::enableCoroutine()` 仍无法协程化的阻塞代码即可。
### 不能通过全局变量储存状态
@ -190,4 +190,4 @@ $result = parallel([
### 协程上下文
由于同一个进程内协程间是内存共享的,但协程的执行/切换是非顺序的,也就意味着我们很难掌控当前的协程是哪一个*(事实上可以,但通常没人这么干)*,所以我们需要在发生协程切换时能够同时切换对应的上下文。
在 Hyperf 里实现协程的上下文管理将非常简单,基于 `Hyperf\Utils\Context` 类的 `set(string $id, $value)`、`get(string $id, $default = null)`、`has(string $id)` 静态方法即可完成上下文数据的管理,通过这些方法设置和获取的值,都仅限于当前的协程,在协程结束时,对应的上下文也会自动跟随释放掉,无需手动管理,无需担忧内存泄漏的风险。
在 Hyperf 里实现协程的上下文管理将非常简单,基于 `Hyperf\Utils\Context` 类的 `set(string $id, $value)`、`get(string $id, $default = null)`、`has(string $id)` 静态方法即可完成上下文数据的管理,通过这些方法设置和获取的值,都仅限于当前的协程,在协程结束时,对应的上下文也会自动跟随释放掉,无需手动管理,无需担忧内存泄漏的风险。

View File

@ -62,54 +62,6 @@ return [
```
## 使用
以下以 `BladeEngine` 为例,首先在对应的目录里创建视图文件 `index.blade.php`
```blade
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Hyperf</title>
</head>
<body>
Hello, {{ $name }}. You are using blade template now.
</body>
</html>
```
控制器中获取 `Hyperf\View\Render` 示例,然后调用 `render` 方法并传递视图文件地址 `index``渲染数据` 即可,文件地址忽略视图文件的后缀名。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\View\RenderInterface;
/**
* @AutoController
*/
class ViewController
{
public function index(RenderInterface $render)
{
return $render->render('index', ['name' => 'Hyperf']);
}
}
```
访问对应的 URL即可获得如下所示的视图页面
```
Hello, Hyperf. You are using blade template now.
```
## 视图渲染引擎
官方目前支持 `Blade``Smarty` 两种模板,默认安装 [hyperf/view](https://github.com/hyperf-cloud/view) 时不会自动安装任何模板引擎,需要您根据自身需求,自行安装对应的模板引擎,使用前必须安装任一模板引擎。
@ -167,3 +119,52 @@ return [
],
];
```
## 使用
以下以 `BladeEngine` 为例,首先在对应的目录里创建视图文件 `index.blade.php`
```blade
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Hyperf</title>
</head>
<body>
Hello, {{ $name }}. You are using blade template now.
</body>
</html>
```
控制器中获取 `Hyperf\View\Render` 示例,然后调用 `render` 方法并传递视图文件地址 `index``渲染数据` 即可,文件地址忽略视图文件的后缀名。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\View\RenderInterface;
/**
* @AutoController
*/
class ViewController
{
public function index(RenderInterface $render)
{
return $render->render('index', ['name' => 'Hyperf']);
}
}
```
访问对应的 URL即可获得如下所示的视图页面
```
Hello, Hyperf. You are using blade template now.
```

View File

@ -0,0 +1,101 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\ConfigEtcd\Listener;
use Hyperf\ConfigEtcd\ClientInterface;
use Hyperf\ConfigEtcd\KV;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\PackerInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Utils\Packer\JsonPacker;
use Psr\Container\ContainerInterface;
/**
* @Listener
*/
class BootProcessListener implements ListenerInterface
{
/**
* @var ConfigInterface
*/
private $config;
/**
* @var StdoutLoggerInterface
*/
private $logger;
/**
* @var array
*/
private $mapping;
/**
* @var ClientInterface
*/
private $client;
/**
* @var PackerInterface
*/
private $packer;
public function __construct(ContainerInterface $container)
{
$this->config = $container->get(ConfigInterface::class);
$this->logger = $container->get(StdoutLoggerInterface::class);
$this->client = $container->get(ClientInterface::class);
$this->mapping = $this->config->get('config_etcd.mapping', []);
$this->packer = $container->get($this->config->get('config_etcd.packer', JsonPacker::class));
}
public function listen(): array
{
return [
BeforeWorkerStart::class,
BeforeProcessHandle::class,
];
}
public function process(object $event)
{
if ($config = $this->client->pull()) {
$configurations = $this->format($config);
foreach ($configurations as $kv) {
$key = $this->mapping[$kv->key] ?? null;
if (is_string($key)) {
$this->config->set($key, $this->packer->unpack($kv->value));
$this->logger->debug(sprintf('Config [%s] is updated', $key));
}
}
}
}
/**
* Format kv configurations.
*/
protected function format(array $config): array
{
$result = [];
foreach ($config as $value) {
$result[] = new KV($value);
}
return $result;
}
}

View File

@ -12,7 +12,6 @@ declare(strict_types=1);
namespace Hyperf\ConfigEtcd\Listener;
use Hyperf\ConfigEtcd\ClientInterface;
use Hyperf\ConfigEtcd\KV;
use Hyperf\ConfigEtcd\PipeMessage;
use Hyperf\Contract\ConfigInterface;
@ -21,6 +20,7 @@ use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\OnPipeMessage;
use Hyperf\Process\Event\PipeMessage as UserProcessPipMessage;
use Hyperf\Utils\Packer\JsonPacker;
use Psr\Container\ContainerInterface;
@ -39,11 +39,6 @@ class OnPipeMessageListener implements ListenerInterface
*/
private $logger;
/**
* @var ClientInterface
*/
private $client;
/**
* @var array
*/
@ -58,7 +53,6 @@ class OnPipeMessageListener implements ListenerInterface
{
$this->config = $container->get(ConfigInterface::class);
$this->logger = $container->get(StdoutLoggerInterface::class);
$this->client = $container->get(ClientInterface::class);
$this->mapping = $this->config->get('config_etcd.mapping', []);
$this->packer = $container->get($this->config->get('config_etcd.packer', JsonPacker::class));
@ -71,6 +65,7 @@ class OnPipeMessageListener implements ListenerInterface
{
return [
OnPipeMessage::class,
UserProcessPipMessage::class,
];
}
@ -80,7 +75,7 @@ class OnPipeMessageListener implements ListenerInterface
*/
public function process(object $event)
{
if ($event instanceof OnPipeMessage && $event->data instanceof PipeMessage) {
if (property_exists($event, 'data') && $event->data instanceof PipeMessage) {
/** @var PipeMessage $data */
$data = $event->data;

View File

@ -18,6 +18,7 @@ use Hyperf\ConfigEtcd\PipeMessage;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\Annotation\Process;
use Hyperf\Process\ProcessCollector;
use Psr\Container\ContainerInterface;
use Swoole\Server;
@ -71,8 +72,17 @@ class ConfigFetcherProcess extends AbstractProcess
if ($config !== $this->cacheConfig) {
$this->cacheConfig = $config;
$workerCount = $this->server->setting['worker_num'] + $this->server->setting['task_worker_num'] - 1;
$pipeMessage = new PipeMessage($this->format($config));
for ($workerId = 0; $workerId <= $workerCount; ++$workerId) {
$this->server->sendMessage(new PipeMessage($this->format($config)), $workerId);
$this->server->sendMessage($pipeMessage, $workerId);
}
$string = serialize($pipeMessage);
$processes = ProcessCollector::all();
/** @var \Swoole\Process $process */
foreach ($processes as $process) {
$process->exportSocket()->send($string);
}
}

View File

@ -50,6 +50,7 @@ class WorkerStartCallback
} else {
$this->eventDispatcher->dispatch(new OtherWorkerStart($server, $workerId));
}
if ($server->taskworker) {
$this->logger->info("TaskWorker#{$workerId} started.");
} else {

View File

@ -13,10 +13,14 @@ declare(strict_types=1);
namespace Hyperf\Process;
use Hyperf\Contract\ProcessInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use Hyperf\Process\Event\AfterProcessHandle;
use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Process\Event\PipeMessage;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Swoole\Event;
use Swoole\Process as SwooleProcess;
use Swoole\Server;
@ -57,6 +61,11 @@ abstract class AbstractProcess implements ProcessInterface
*/
protected $event;
/**
* @var SwooleProcess
*/
protected $process;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
@ -76,10 +85,43 @@ abstract class AbstractProcess implements ProcessInterface
for ($i = 0; $i < $num; ++$i) {
$process = new SwooleProcess(function (SwooleProcess $process) use ($i) {
$this->event && $this->event->dispatch(new BeforeProcessHandle($this, $i));
$this->process = $process;
$this->listen();
$this->handle();
$this->event && $this->event->dispatch(new AfterProcessHandle($this, $i));
}, $this->redirectStdinStdout, $this->pipeType, $this->enableCoroutine);
$server->addProcess($process);
if ($this->enableCoroutine) {
ProcessCollector::add($this->name, $process);
}
}
}
/**
* Added event for listening data from worker/task.
*/
protected function listen()
{
go(function () {
while (true) {
try {
/** @var \Swoole\Coroutine\Socket $sock */
$sock = $this->process->exportSocket();
$recv = $sock->recv();
if ($this->event && $data = unserialize($recv)) {
$this->event->dispatch(new PipeMessage($data));
}
} catch (\Throwable $exception) {
if ($this->container->has(StdoutLoggerInterface::class) && $this->container->has(FormatterInterface::class)) {
$logger = $this->container->get(StdoutLoggerInterface::class);
$formatter = $this->container->get(FormatterInterface::class);
$logger->error($formatter->format($exception));
}
}
}
});
}
}

View File

@ -0,0 +1,29 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Process\Event;
class PipeMessage
{
/**
* @var mixed
*/
public $data;
/**
* @param mixed $data
*/
public function __construct($data)
{
$this->data = $data;
}
}

View File

@ -0,0 +1,42 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Process;
use Swoole\Process;
/**
* Only collect coroutine process.
*/
class ProcessCollector
{
protected static $processes = [];
public static function add($name, Process $process)
{
static::$processes[$name][] = $process;
}
public static function get($name): array
{
return static::$processes[$name] ?? [];
}
public static function all(): array
{
$result = [];
foreach (static::$processes as $name => $processes) {
$result = array_merge($result, $processes);
}
return $result;
}
}