Merge pull request #1419 from Reasno/config

Allow config fetcher to start in a coroutine instead of a process
This commit is contained in:
李铭昕 2020-03-19 10:19:25 +08:00 committed by GitHub
commit ba7b751f6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 187 additions and 16 deletions

View File

@ -3,6 +3,7 @@
## Added ## Added
- [#1393](https://github.com/hyperf/hyperf/pull/1393) Implements methods for `Hyperf\HttpMessage\Stream\SwooleStream`. - [#1393](https://github.com/hyperf/hyperf/pull/1393) Implements methods for `Hyperf\HttpMessage\Stream\SwooleStream`.
- [#1419](https://github.com/hyperf/hyperf/pull/1419) Allow config fetcher to start in a coroutine instead of a process.
- [#1424](https://github.com/hyperf/hyperf/pull/1424) Allow user modify the session_name by configuration file. - [#1424](https://github.com/hyperf/hyperf/pull/1424) Allow user modify the session_name by configuration file.
# v1.1.20 - 2020-03-12 # v1.1.20 - 2020-03-12

View File

@ -40,6 +40,8 @@ composer require hyperf/config-aliyun-acm
return [ return [
// 是否开启配置中心的接入流程,为 true 时会自动启动一个 ConfigFetcherProcess 进程用于更新配置 // 是否开启配置中心的接入流程,为 true 时会自动启动一个 ConfigFetcherProcess 进程用于更新配置
'enable' => true, 'enable' => true,
// 是否使用独立进程来拉取config如果否则将在worker内以协程方式拉取
'use_standalone_process' => true,
// Apollo Server // Apollo Server
'server' => 'http://127.0.0.1:8080', 'server' => 'http://127.0.0.1:8080',
// 您的 AppId // 您的 AppId
@ -74,6 +76,8 @@ return [
return [ return [
// 是否开启配置中心的接入流程,为 true 时会自动启动一个 ConfigFetcherProcess 进程用于更新配置 // 是否开启配置中心的接入流程,为 true 时会自动启动一个 ConfigFetcherProcess 进程用于更新配置
'enable' => true, 'enable' => true,
// 是否使用独立进程来拉取config如果否则将在worker内以协程方式拉取
'use_standalone_process' => true,
// 配置更新间隔(秒) // 配置更新间隔(秒)
'interval' => 5, 'interval' => 5,
// 阿里云 ACM 断点地址,取决于您的可用区 // 阿里云 ACM 断点地址,取决于您的可用区
@ -135,6 +139,7 @@ composer require hyperf/config-etcd
<?php <?php
return [ return [
'enable' => true, 'enable' => true,
'use_standalone_process' => true,
'namespaces' => [ 'namespaces' => [
'/test', '/test',
], ],

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
return [ return [
'enable' => false, 'enable' => false,
'use_standalone_process' => true,
'interval' => 5, 'interval' => 5,
'endpoint' => env('ALIYUN_ACM_ENDPOINT', 'acm.aliyun.com'), 'endpoint' => env('ALIYUN_ACM_ENDPOINT', 'acm.aliyun.com'),
'namespace' => env('ALIYUN_ACM_NAMESPACE', ''), 'namespace' => env('ALIYUN_ACM_NAMESPACE', ''),

View File

@ -12,12 +12,14 @@ declare(strict_types=1);
namespace Hyperf\ConfigAliyunAcm\Listener; namespace Hyperf\ConfigAliyunAcm\Listener;
use Hyperf\Command\Event\BeforeHandle;
use Hyperf\ConfigAliyunAcm\ClientInterface; use Hyperf\ConfigAliyunAcm\ClientInterface;
use Hyperf\Contract\ConfigInterface; use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Contract\ListenerInterface; use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart; use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Process\Event\BeforeProcessHandle; use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Utils\Coroutine;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
class BootProcessListener implements ListenerInterface class BootProcessListener implements ListenerInterface
@ -49,6 +51,7 @@ class BootProcessListener implements ListenerInterface
return [ return [
BeforeWorkerStart::class, BeforeWorkerStart::class,
BeforeProcessHandle::class, BeforeProcessHandle::class,
BeforeHandle::class,
]; ];
} }
@ -59,11 +62,33 @@ class BootProcessListener implements ListenerInterface
} }
if ($config = $this->client->pull()) { if ($config = $this->client->pull()) {
foreach ($config as $key => $value) { $this->updateConfig($config);
if (is_string($key)) { }
$this->config->set($key, $value);
$this->logger->debug(sprintf('Config [%s] is updated', $key)); if (! $this->config->get('aliyun_acm.use_standalone_process', true)) {
} Coroutine::create(function () {
$interval = $this->config->get('aliyun_acm.interval', 5);
retry(INF, function () use ($interval) {
$prevConfig = [];
while (true) {
sleep($interval);
$config = $this->client->pull();
if ($config !== $prevConfig) {
$this->updateConfig($config);
}
$prevConfig = $config;
}
}, $interval * 1000);
});
}
}
protected function updateConfig(array $config)
{
foreach ($config as $key => $value) {
if (is_string($key)) {
$this->config->set($key, $value);
$this->logger->debug(sprintf('Config [%s] is updated', $key));
} }
} }
} }

View File

@ -59,7 +59,8 @@ class ConfigFetcherProcess extends AbstractProcess
public function isEnable(): bool public function isEnable(): bool
{ {
return $this->config->get('aliyun_acm.enable', false); return $this->config->get('aliyun_acm.enable', false)
&& $this->config->get('aliyun_acm.use_standalone_process', true);
} }
public function handle(): void public function handle(): void

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
return [ return [
'enable' => false, 'enable' => false,
'use_standalone_process' => true,
'server' => 'http://localhost:8080', 'server' => 'http://localhost:8080',
'appid' => 'test', 'appid' => 'test',
'cluster' => 'default', 'cluster' => 'default',

View File

@ -12,11 +12,13 @@ declare(strict_types=1);
namespace Hyperf\ConfigApollo\Listener; namespace Hyperf\ConfigApollo\Listener;
use Hyperf\Command\Event\BeforeHandle;
use Hyperf\ConfigApollo\Option; use Hyperf\ConfigApollo\Option;
use Hyperf\ConfigApollo\PipeMessage; use Hyperf\ConfigApollo\PipeMessage;
use Hyperf\ConfigApollo\ReleaseKey; use Hyperf\ConfigApollo\ReleaseKey;
use Hyperf\Framework\Event\BeforeWorkerStart; use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Process\Event\BeforeProcessHandle; use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Utils\Coroutine;
class BootProcessListener extends OnPipeMessageListener class BootProcessListener extends OnPipeMessageListener
{ {
@ -25,6 +27,7 @@ class BootProcessListener extends OnPipeMessageListener
return [ return [
BeforeWorkerStart::class, BeforeWorkerStart::class,
BeforeProcessHandle::class, BeforeProcessHandle::class,
BeforeHandle::class,
]; ];
} }
@ -62,5 +65,17 @@ class BootProcessListener extends OnPipeMessageListener
$callbacks[$namespace] = $ipcCallback; $callbacks[$namespace] = $ipcCallback;
} }
$this->client->pull($namespaces, $callbacks); $this->client->pull($namespaces, $callbacks);
if (! $this->config->get('apollo.use_standalone_process', true)) {
Coroutine::create(function () use ($namespaces, $callbacks) {
$interval = $this->config->get('apollo.interval', 5);
retry(INF, function () use ($namespaces, $callbacks, $interval) {
while (true) {
sleep($interval);
$this->client->pull($namespaces, $callbacks);
}
}, $interval * 1000);
});
}
} }
} }

View File

@ -54,7 +54,8 @@ class ConfigFetcherProcess extends AbstractProcess
public function isEnable(): bool public function isEnable(): bool
{ {
return $this->config->get('apollo.enable', false); return $this->config->get('apollo.enable', false)
&& $this->config->get('apollo.use_standalone_process', true);
} }
public function handle(): void public function handle(): void

View File

@ -13,6 +13,7 @@ declare(strict_types=1);
return [ return [
'enable' => false, 'enable' => false,
'packer' => Hyperf\Utils\Packer\JsonPacker::class, 'packer' => Hyperf\Utils\Packer\JsonPacker::class,
'use_standalone_process' => true,
'namespaces' => [ 'namespaces' => [
'application', 'application',
], ],

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
namespace Hyperf\ConfigEtcd\Listener; namespace Hyperf\ConfigEtcd\Listener;
use Hyperf\Command\Event\BeforeHandle;
use Hyperf\ConfigEtcd\ClientInterface; use Hyperf\ConfigEtcd\ClientInterface;
use Hyperf\ConfigEtcd\KV; use Hyperf\ConfigEtcd\KV;
use Hyperf\Contract\ConfigInterface; use Hyperf\Contract\ConfigInterface;
@ -20,6 +21,7 @@ use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Contract\ListenerInterface; use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart; use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Process\Event\BeforeProcessHandle; use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Utils\Coroutine;
use Hyperf\Utils\Packer\JsonPacker; use Hyperf\Utils\Packer\JsonPacker;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
@ -65,6 +67,7 @@ class BootProcessListener implements ListenerInterface
return [ return [
BeforeWorkerStart::class, BeforeWorkerStart::class,
BeforeProcessHandle::class, BeforeProcessHandle::class,
BeforeHandle::class,
]; ];
} }
@ -75,13 +78,35 @@ class BootProcessListener implements ListenerInterface
} }
if ($config = $this->client->pull()) { if ($config = $this->client->pull()) {
$configurations = $this->format($config); $this->updateConfig($config);
foreach ($configurations as $kv) { }
$key = $this->mapping[$kv->key] ?? null;
if (is_string($key)) { if (! $this->config->get('config_etcd.use_standalone_process', true)) {
$this->config->set($key, $this->packer->unpack($kv->value)); Coroutine::create(function () {
$this->logger->debug(sprintf('Config [%s] is updated', $key)); $interval = $this->config->get('config_etcd.interval', 5);
} retry(INF, function () use ($interval) {
$prevConfig = [];
while (true) {
sleep($interval);
$config = $this->client->pull();
if ($config !== $prevConfig) {
$this->updateConfig($config);
}
$prevConfig = $config;
}
}, $interval * 1000);
});
}
}
protected function updateConfig(array $config)
{
$configurations = $this->format($config);
foreach ($configurations as $kv) {
$key = $this->mapping[$kv->key] ?? null;
if (is_string($key)) {
$this->config->set($key, $this->packer->unpack($kv->value));
$this->logger->debug(sprintf('Config [%s] is updated', $key));
} }
} }
} }

View File

@ -60,7 +60,8 @@ class ConfigFetcherProcess extends AbstractProcess
public function isEnable(): bool public function isEnable(): bool
{ {
return $this->config->get('config_etcd.enable', false); return $this->config->get('config_etcd.enable', false)
&& $this->config->get('config_etcd.use_standalone_process', true);
} }
public function handle(): void public function handle(): void

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
return [ return [
'enable' => false, 'enable' => false,
'use_standalone_process' => true,
'interval' => 5, 'interval' => 5,
'server' => env('ZOOKEEPER_SERVER', '127.0.0.1:2181'), 'server' => env('ZOOKEEPER_SERVER', '127.0.0.1:2181'),
'path' => env('ZOOKEEPER_CONFIG_PATH', '/conf'), 'path' => env('ZOOKEEPER_CONFIG_PATH', '/conf'),

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
namespace Hyperf\ConfigZookeeper; namespace Hyperf\ConfigZookeeper;
use Hyperf\ConfigZookeeper\Listener\BootProcessListener;
use Hyperf\ConfigZookeeper\Listener\OnPipeMessageListener; use Hyperf\ConfigZookeeper\Listener\OnPipeMessageListener;
use Hyperf\ConfigZookeeper\Process\ConfigFetcherProcess; use Hyperf\ConfigZookeeper\Process\ConfigFetcherProcess;
@ -28,6 +29,7 @@ class ConfigProvider
], ],
'listeners' => [ 'listeners' => [
OnPipeMessageListener::class, OnPipeMessageListener::class,
BootProcessListener::class,
], ],
'annotations' => [ 'annotations' => [
'scan' => [ 'scan' => [

View File

@ -0,0 +1,90 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\ConfigZookeeper\Listener;
use Hyperf\Command\Event\BeforeHandle;
use Hyperf\ConfigZookeeper\ClientInterface;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Utils\Coroutine;
class BootProcessListener implements ListenerInterface
{
/**
* @var ConfigInterface
*/
private $config;
/**
* @var StdoutLoggerInterface
*/
private $logger;
/**
* @var ClientInterface
*/
private $client;
public function __construct(ConfigInterface $config, StdoutLoggerInterface $logger, ClientInterface $client)
{
$this->config = $config;
$this->logger = $logger;
$this->client = $client;
}
public function listen(): array
{
return [
BeforeWorkerStart::class,
BeforeProcessHandle::class,
BeforeHandle::class,
];
}
public function process(object $event)
{
if (! $this->config->get('zookeeper.enable', false)) {
return;
}
if (! $this->config->get('zookeeper.use_standalone_process', true)) {
Coroutine::create(function () {
$interval = $this->config->get('zookeeper.interval', 5);
retry(INF, function () use ($interval) {
$prevConfig = [];
while (true) {
sleep($interval);
$config = $this->client->pull();
if ($config !== $prevConfig) {
$this->updateConfig($config);
}
$prevConfig = $config;
}
}, $interval * 1000);
});
}
}
protected function updateConfig(array $config)
{
foreach ($config as $key => $value) {
if (is_string($key)) {
$this->config->set($key, $value);
$this->logger->debug(sprintf('Config [%s] is updated', $key));
}
}
}
}

View File

@ -63,7 +63,8 @@ class ConfigFetcherProcess extends AbstractProcess
public function isEnable(): bool public function isEnable(): bool
{ {
return $this->config->get('zookeeper.enable', false); return $this->config->get('zookeeper.enable', false)
&& $this->config->get('zookeeper.use_standalone_process', true);
} }
public function handle(): void public function handle(): void