Added fetch process for config etcd.

This commit is contained in:
李铭昕 2019-07-26 18:55:09 +08:00
parent bc371db55a
commit 68cdd1f24c
6 changed files with 74 additions and 21 deletions

View File

@ -12,10 +12,40 @@ declare(strict_types=1);
namespace Hyperf\ConfigEtcd;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Etcd\KVInterface;
class Client implements ClientInterface
{
/**
* @var KVInterface
*/
protected $client;
/**
* @var ConfigInterface
*/
protected $config;
public function __construct(KVInterface $client, ConfigInterface $config)
{
$this->client = $client;
$this->config = $config;
}
public function pull(): array
{
// TODO: Implement pull() method.
$namespaces = $this->config->get('etcd.namespaces');
$kvs = [];
foreach ($namespaces as $namespace) {
$res = $this->client->fetchByPrefix($namespace);
if (isset($res['kvs'])) {
foreach ($res['kvs'] as $kv) {
$kvs[$kv['key']] = $kv;
}
}
}
return $kvs;
}
}

View File

@ -1,17 +0,0 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\ConfigEtcd;
class ClientFactory
{
}

View File

@ -18,7 +18,7 @@ class ConfigProvider
{
return [
'dependencies' => [
ClientInterface::class => ClientFactory::class,
ClientInterface::class => Client::class,
],
'commands' => [
],

View File

@ -13,6 +13,7 @@ declare(strict_types=1);
namespace Hyperf\ConfigEtcd\Process;
use Hyperf\ConfigEtcd\ClientInterface;
use Hyperf\ConfigEtcd\PipeMessage;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\Annotation\Process;
@ -20,7 +21,7 @@ use Psr\Container\ContainerInterface;
use Swoole\Server;
/**
* @Process(name="apollo-config-fetcher")
* @Process(name="etcd-config-fetcher")
*/
class ConfigFetcherProcess extends AbstractProcess
{
@ -39,6 +40,11 @@ class ConfigFetcherProcess extends AbstractProcess
*/
private $config;
/**
* @var array
*/
private $cacheConfig;
public function __construct(ContainerInterface $container)
{
parent::__construct($container);
@ -54,10 +60,27 @@ class ConfigFetcherProcess extends AbstractProcess
public function isEnable(): bool
{
return $this->config->get('apollo.enable', false);
return $this->config->get('etcd.enable', false);
}
public function handle(): void
{
while (true) {
$config = $this->client->pull();
if ($config !== $this->cacheConfig) {
if ($this->cacheConfig !== null) {
$diff = array_diff($this->cacheConfig ?? [], $config);
} else {
$diff = $config;
}
$this->cacheConfig = $config;
$workerCount = $this->server->setting['worker_num'] + $this->server->setting['task_worker_num'] - 1;
for ($workerId = 0; $workerId <= $workerCount; ++$workerId) {
$this->server->sendMessage(new PipeMessage($diff), $workerId);
}
}
sleep($this->config->get('aliyun_acm.interval', 5));
}
}
}

View File

@ -18,5 +18,7 @@ interface KVInterface
public function get($key, array $options = []);
public function fetchByPrefix($prefix);
public function delete($key, array $options = []);
}

View File

@ -28,6 +28,21 @@ class KV extends Client implements KVInterface
return $this->client()->get($key, $options);
}
public function fetchByPrefix($prefix)
{
$prefix = trim($prefix);
if (!$prefix) {
return [];
}
$lastIndex = strlen($prefix) - 1;
$lastChar = $prefix[$lastIndex];
$nextAsciiCode = ord($lastChar) + 1;
$rangeEnd = $prefix;
$rangeEnd[$lastIndex] = chr($nextAsciiCode);
return $this->client()->get($prefix, ['range_end' => $rangeEnd]);
}
public function delete($key, array $options = [])
{
return $this->client()->del($key, $options);