Allow config fetcher to start in a coroutine instead of standalone process.

This commit is contained in:
reasno 2020-03-14 17:13:51 +08:00
parent 2e20d8d06c
commit 47bdf2ccb9
9 changed files with 75 additions and 15 deletions

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
return [
'enable' => false,
'use_standalone_process' => true,
'interval' => 5,
'endpoint' => env('ALIYUN_ACM_ENDPOINT', 'acm.aliyun.com'),
'namespace' => env('ALIYUN_ACM_NAMESPACE', ''),

View File

@ -18,6 +18,7 @@ use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Utils\Coroutine;
use Psr\Container\ContainerInterface;
class BootProcessListener implements ListenerInterface
@ -59,11 +60,30 @@ class BootProcessListener implements ListenerInterface
}
if ($config = $this->client->pull()) {
foreach ($config as $key => $value) {
if (is_string($key)) {
$this->config->set($key, $value);
$this->logger->debug(sprintf('Config [%s] is updated', $key));
}
$this->updateConfig($config);
}
if (! $this->config->get('aliyun_acm.use_standalone_process', true)) {
Coroutine::create(function () {
$interval = $this->config->get('aliyun_acm.interval', 5);
retry(INF, function () use ($interval) {
while (true) {
sleep($interval);
$config = $this->client->pull();
if ($config !== $this->config) {
$this->updateConfig($config);
}
}
}, $interval * 1000);
});
}
}
protected function updateConfig(array $config){
foreach ($config as $key => $value) {
if (is_string($key)) {
$this->config->set($key, $value);
$this->logger->debug(sprintf('Config [%s] is updated', $key));
}
}
}

View File

@ -59,7 +59,8 @@ class ConfigFetcherProcess extends AbstractProcess
public function isEnable(): bool
{
return $this->config->get('aliyun_acm.enable', false);
return $this->config->get('aliyun_acm.enable', false)
&& $this->config->get('aliyun_acm.use_standalone_process', true);
}
public function handle(): void

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
return [
'enable' => false,
'use_standalone_process' => true,
'server' => 'http://localhost:8080',
'appid' => 'test',
'cluster' => 'default',

View File

@ -17,6 +17,7 @@ use Hyperf\ConfigApollo\PipeMessage;
use Hyperf\ConfigApollo\ReleaseKey;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Utils\Coroutine;
class BootProcessListener extends OnPipeMessageListener
{
@ -62,5 +63,17 @@ class BootProcessListener extends OnPipeMessageListener
$callbacks[$namespace] = $ipcCallback;
}
$this->client->pull($namespaces, $callbacks);
if (! $this->config->get('apollo.use_standalone_process', true)) {
Coroutine::create(function () use ($namespaces, $callbacks) {
$interval = $this->config->get('apollo.interval', 5);
retry(INF, function () use ($namespaces, $callbacks, $interval) {
while (true) {
sleep($interval);
$this->client->pull($namespaces, $callbacks);
}
}, $interval * 1000);
});
}
}
}

View File

@ -54,7 +54,8 @@ class ConfigFetcherProcess extends AbstractProcess
public function isEnable(): bool
{
return $this->config->get('apollo.enable', false);
return $this->config->get('apollo.enable', false)
&& $this->config->get('apollo.use_standalone_process', true);
}
public function handle(): void

View File

@ -13,6 +13,7 @@ declare(strict_types=1);
return [
'enable' => false,
'packer' => Hyperf\Utils\Packer\JsonPacker::class,
'use_standalone_process' => true,
'namespaces' => [
'application',
],

View File

@ -20,6 +20,7 @@ use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeWorkerStart;
use Hyperf\Process\Event\BeforeProcessHandle;
use Hyperf\Utils\Coroutine;
use Hyperf\Utils\Packer\JsonPacker;
use Psr\Container\ContainerInterface;
@ -75,13 +76,33 @@ class BootProcessListener implements ListenerInterface
}
if ($config = $this->client->pull()) {
$configurations = $this->format($config);
foreach ($configurations as $kv) {
$key = $this->mapping[$kv->key] ?? null;
if (is_string($key)) {
$this->config->set($key, $this->packer->unpack($kv->value));
$this->logger->debug(sprintf('Config [%s] is updated', $key));
}
$this->updateConfig($config);
}
if (! $this->config->get('config_etcd.use_standalone_process', true)) {
Coroutine::create(function () {
$interval = $this->config->get('config_etcd.interval', 5);
retry(INF, function () use ($interval) {
while (true) {
sleep($interval);
$config = $this->client->pull();
if ($config !== $this->config) {
$this->updateConfig($config);
}
}
}, $interval * 1000);
});
}
}
protected function updateConfig(array $config)
{
$configurations = $this->format($config);
foreach ($configurations as $kv) {
$key = $this->mapping[$kv->key] ?? null;
if (is_string($key)) {
$this->config->set($key, $this->packer->unpack($kv->value));
$this->logger->debug(sprintf('Config [%s] is updated', $key));
}
}
}

View File

@ -60,7 +60,8 @@ class ConfigFetcherProcess extends AbstractProcess
public function isEnable(): bool
{
return $this->config->get('config_etcd.enable', false);
return $this->config->get('config_etcd.enable', false)
&& $this->config->get('config_etcd.use_standalone_process', true);
}
public function handle(): void