Added config options for services.consumers.

This commit is contained in:
李铭昕 2019-12-24 11:22:29 +08:00
parent 635150f7ef
commit 39cf8c2187
2 changed files with 35 additions and 18 deletions

View File

@ -81,8 +81,8 @@ class TcpServer extends Server
$protocol = 'jsonrpc-tcp-length-check';
}
$this->protocol = new Protocol($this->container, $this->protocolManager, $protocol);
$this->packer = $this->protocol->getPacker($this->serverConfig['settings'] ?? []);
$this->protocol = new Protocol($this->container, $this->protocolManager, $protocol, $this->serverConfig['settings'] ?? []);
$this->packer = $this->protocol->getPacker();
$this->responseBuilder = make(ResponseBuilder::class, [
'dataFormatter' => $this->protocol->getDataFormatter(),
'packer' => $this->packer,

View File

@ -87,7 +87,7 @@ abstract class AbstractServiceClient
{
$this->container = $container;
$this->loadBalancerManager = $container->get(LoadBalancerManager::class);
$protocol = new Protocol($container, $container->get(ProtocolManager::class), $this->protocol);
$protocol = new Protocol($container, $container->get(ProtocolManager::class), $this->protocol, $this->getOptions());
$loadBalancer = $this->createLoadBalancer(...$this->createNodes());
$transporter = $protocol->getTransporter()->setLoadBalancer($loadBalancer);
$this->client = make(Client::class)
@ -137,6 +137,34 @@ abstract class AbstractServiceClient
return $loadBalancer;
}
protected function getOptions(): array
{
$consumer = $this->getConsumerConfig();
return $consumer['options'] ?? [];
}
protected function getConsumerConfig(): array
{
if (! $this->container->has(ConfigInterface::class)) {
throw new RuntimeException(sprintf('The object implementation of %s missing.', ConfigInterface::class));
}
$config = $this->container->get(ConfigInterface::class);
// According to the registry config of the consumer, retrieve the nodes.
$consumers = $config->get('services.consumers', []);
$config = [];
foreach ($consumers as $consumer) {
if (isset($consumer['name']) && $consumer['name'] === $this->serviceName) {
$config = $consumer;
break;
}
}
return $config;
}
/**
* Create nodes the first time.
*
@ -144,24 +172,11 @@ abstract class AbstractServiceClient
*/
protected function createNodes(): array
{
if (! $this->container->has(ConfigInterface::class)) {
throw new RuntimeException(sprintf('The object implementation of %s missing.', ConfigInterface::class));
}
$refreshCallback = null;
$config = $this->container->get(ConfigInterface::class);
// According to the registry config of the consumer, retrieve the nodes.
$consumers = $config->get('services.consumers', []);
$isMatch = false;
foreach ($consumers as $consumer) {
if (isset($consumer['name']) && $consumer['name'] === $this->serviceName) {
$isMatch = true;
break;
}
}
$consumer = $this->getConsumerConfig();
// Current $consumer is the config of the specified consumer.
if ($isMatch && isset($consumer['registry']['protocol'], $consumer['registry']['address'])) {
if (isset($consumer['registry']['protocol'], $consumer['registry']['address'])) {
// According to the protocol and address of the registry, retrieve the nodes.
switch ($registryProtocol = $consumer['registry']['protocol'] ?? '') {
case 'consul':
@ -177,6 +192,7 @@ abstract class AbstractServiceClient
}
return [$nodes, $refreshCallback];
}
// Not exists the registry config, then looking for the 'nodes' property.
if (isset($consumer['nodes'])) {
$nodes = [];
@ -190,6 +206,7 @@ abstract class AbstractServiceClient
}
return [$nodes, $refreshCallback];
}
throw new InvalidArgumentException('Config of registry or nodes missing.');
}