From 241516006c7822b22fa45407ee5a2ebc445b9abe Mon Sep 17 00:00:00 2001 From: daydaygo Date: Fri, 26 Jul 2019 17:09:54 +0800 Subject: [PATCH] amqp consumer queue support multi routing_key --- src/amqp/src/Consumer.php | 15 +++++++++++++-- src/amqp/src/Listener/MainWorkerStartListener.php | 6 +++++- src/amqp/src/Message/Message.php | 4 ++-- src/amqp/src/Message/MessageInterface.php | 4 ++-- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/amqp/src/Consumer.php b/src/amqp/src/Consumer.php index ef4acc13d..75e7ba07b 100644 --- a/src/amqp/src/Consumer.php +++ b/src/amqp/src/Consumer.php @@ -52,9 +52,13 @@ class Consumer extends Builder $this->declare($consumerMessage, $channel); + $routingKey = $consumerMessage->getRoutingKey(); + if (is_array($routingKey)) { + $routingKey = join(',', $routingKey); + } $channel->basic_consume( $consumerMessage->getQueue(), - $consumerMessage->getRoutingKey(), + $routingKey, false, false, false, @@ -108,6 +112,13 @@ class Consumer extends Builder $channel->queue_declare($builder->getQueue(), $builder->isPassive(), $builder->isDurable(), $builder->isExclusive(), $builder->isAutoDelete(), $builder->isNowait(), $builder->getArguments(), $builder->getTicket()); - $channel->queue_bind($message->getQueue(), $message->getExchange(), $message->getRoutingKey()); + $routingKey = $message->getRoutingKey(); + if (is_array($routingKey)) { + foreach ($routingKey as $v) { + $channel->queue_bind($message->getQueue(), $message->getExchange(), $v); + } + } else { + $channel->queue_bind($message->getQueue(), $message->getExchange(), $routingKey); + } } } diff --git a/src/amqp/src/Listener/MainWorkerStartListener.php b/src/amqp/src/Listener/MainWorkerStartListener.php index 870c80958..51eb78a02 100644 --- a/src/amqp/src/Listener/MainWorkerStartListener.php +++ b/src/amqp/src/Listener/MainWorkerStartListener.php @@ -79,7 +79,11 @@ class MainWorkerStartListener implements ListenerInterface $annotation->routingKey && $instance->setRoutingKey($annotation->routingKey); try { $producer->declare($instance); - $this->logger->debug(sprintf('AMQP exchange[%s] and routingKey[%s] were created successfully.', $instance->getExchange(), $instance->getRoutingKey())); + $routingKey = $instance->getRoutingKey(); + if (is_array($routingKey)) { + $routingKey = join(',', $routingKey); + } + $this->logger->debug(sprintf('AMQP exchange[%s] and routingKey[%s] were created successfully.', $instance->getExchange(), $routingKey)); } catch (AMQPProtocolChannelException $e) { $this->logger->debug('AMQPProtocolChannelException: ' . $e->getMessage()); // Do nothing. diff --git a/src/amqp/src/Message/Message.php b/src/amqp/src/Message/Message.php index 89730d0d0..c8f616ecd 100644 --- a/src/amqp/src/Message/Message.php +++ b/src/amqp/src/Message/Message.php @@ -70,13 +70,13 @@ abstract class Message implements MessageInterface return $this->exchange; } - public function setRoutingKey(string $routingKey): self + public function setRoutingKey($routingKey): self { $this->routingKey = $routingKey; return $this; } - public function getRoutingKey(): string + public function getRoutingKey() { return $this->routingKey; } diff --git a/src/amqp/src/Message/MessageInterface.php b/src/amqp/src/Message/MessageInterface.php index 09ec260b5..b6d272c2c 100644 --- a/src/amqp/src/Message/MessageInterface.php +++ b/src/amqp/src/Message/MessageInterface.php @@ -29,9 +29,9 @@ interface MessageInterface public function getExchange(): string; - public function setRoutingKey(string $routingKey); + public function setRoutingKey($routingKey); - public function getRoutingKey(): string; + public function getRoutingKey(); public function getExchangeBuilder(): ExchangeBuilder;