diff --git a/src/amqp/src/Annotation/Consumer.php b/src/amqp/src/Annotation/Consumer.php index 0137ac4a0..94a534606 100644 --- a/src/amqp/src/Annotation/Consumer.php +++ b/src/amqp/src/Annotation/Consumer.php @@ -1,7 +1,16 @@ poolFactory->getConnectionPool($poolName); } - } diff --git a/src/amqp/src/Connection.php b/src/amqp/src/Connection.php index 786bd4bb1..e8801a261 100644 --- a/src/amqp/src/Connection.php +++ b/src/amqp/src/Connection.php @@ -13,18 +13,15 @@ declare(strict_types=1); namespace Hyperf\Amqp; use Hyperf\Amqp\Connection\AMQPSwooleConnection; -use Hyperf\Amqp\Pool\AmqpChannelPool; use Hyperf\Amqp\Pool\AmqpConnectionPool; use Hyperf\Contract\ConnectionInterface; use Hyperf\Pool\Connection as BaseConnection; use Hyperf\Utils\Arr; use Hyperf\Utils\Coroutine; -use PhpAmqpLib\Channel\AbstractChannel; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; use Psr\Container\ContainerInterface; -use Swoole\Coroutine\Channel as SwooleChannel; class Connection extends BaseConnection implements ConnectionInterface { @@ -164,5 +161,4 @@ class Connection extends BaseConnection implements ConnectionInterface return false; } - } diff --git a/src/amqp/src/Consumer.php b/src/amqp/src/Consumer.php index abbb323d9..bff3ce1e8 100644 --- a/src/amqp/src/Consumer.php +++ b/src/amqp/src/Consumer.php @@ -24,7 +24,6 @@ use Throwable; class Consumer extends Builder { - /** * @var bool */ @@ -54,7 +53,6 @@ class Consumer extends Builder $this->logger = $logger; } - public function signalHandler(): void { $this->status = false; @@ -93,7 +91,8 @@ class Consumer extends Builder if ($result === Result::ACK) { $this->logger->debug($deliveryTag . ' acked.'); return $channel->basic_ack($deliveryTag); - } elseif ($consumerMessage->isRequeue() && $result === Result::REQUEUE) { + } + if ($consumerMessage->isRequeue() && $result === Result::REQUEUE) { $this->logger->debug($deliveryTag . ' requeued.'); return $channel->basic_reject($deliveryTag, true); } diff --git a/src/amqp/src/ConsumerFactory.php b/src/amqp/src/ConsumerFactory.php index 8956e3b6b..505495e59 100644 --- a/src/amqp/src/ConsumerFactory.php +++ b/src/amqp/src/ConsumerFactory.php @@ -1,7 +1,16 @@ get(PoolFactory::class), $container->get(StdoutLoggerInterface::class)); } - -} \ No newline at end of file +} diff --git a/src/amqp/src/ConsumerManager.php b/src/amqp/src/ConsumerManager.php index 98f890376..9f90081fa 100644 --- a/src/amqp/src/ConsumerManager.php +++ b/src/amqp/src/ConsumerManager.php @@ -1,20 +1,27 @@ container = $this->container; $nums = $property['nums'] ?? 1; $process = $this->createProcess($instance); - $process->nums = (int)$nums; + $process->nums = (int) $nums; $process->name = 'Consumer-' . $property['queue']; ProcessRegister::register($process); } @@ -48,9 +55,7 @@ class ConsumerManager private function createProcess(ConsumerMessageInterface $consumerMessage): Process { - return new class($this->container, $consumerMessage) extends Process - { - + return new class($this->container, $consumerMessage) extends Process { /** * @var \Hyperf\Amqp\Consumer */ @@ -74,5 +79,4 @@ class ConsumerManager } }; } - -} \ No newline at end of file +} diff --git a/src/amqp/src/Listener/BeforeMainServerStartListener.php b/src/amqp/src/Listener/BeforeMainServerStartListener.php index 8e5141459..1133e4245 100644 --- a/src/amqp/src/Listener/BeforeMainServerStartListener.php +++ b/src/amqp/src/Listener/BeforeMainServerStartListener.php @@ -1,5 +1,15 @@ container->get(ConsumerManager::class); $consumerManager->run(); - } - -} \ No newline at end of file +} diff --git a/src/amqp/src/Listener/MainWorkerStartListener.php b/src/amqp/src/Listener/MainWorkerStartListener.php index 9c06e94c3..2bb96587c 100644 --- a/src/amqp/src/Listener/MainWorkerStartListener.php +++ b/src/amqp/src/Listener/MainWorkerStartListener.php @@ -1,5 +1,15 @@ queue = $queue; diff --git a/src/amqp/src/Message/ConsumerMessageInterface.php b/src/amqp/src/Message/ConsumerMessageInterface.php index 58a1080d6..c34c92b13 100644 --- a/src/amqp/src/Message/ConsumerMessageInterface.php +++ b/src/amqp/src/Message/ConsumerMessageInterface.php @@ -12,12 +12,10 @@ declare(strict_types=1); namespace Hyperf\Amqp\Message; - use Hyperf\Amqp\Builder\QueueBuilder; interface ConsumerMessageInterface extends MessageInterface { - public function consume($data): string; public function setQueue(string $queue); diff --git a/src/amqp/src/Message/Message.php b/src/amqp/src/Message/Message.php index 189ba0ac9..bd0ebcfcc 100644 --- a/src/amqp/src/Message/Message.php +++ b/src/amqp/src/Message/Message.php @@ -17,7 +17,6 @@ use Hyperf\Amqp\Exception\MessageException; abstract class Message implements MessageInterface { - /** * @var string */ diff --git a/src/amqp/src/Message/MessageInterface.php b/src/amqp/src/Message/MessageInterface.php index d22ea6695..8568ab417 100644 --- a/src/amqp/src/Message/MessageInterface.php +++ b/src/amqp/src/Message/MessageInterface.php @@ -12,7 +12,6 @@ declare(strict_types=1); namespace Hyperf\Amqp\Message; - use Hyperf\Amqp\Builder\ExchangeBuilder; interface MessageInterface diff --git a/src/amqp/src/Message/ProducerMessage.php b/src/amqp/src/Message/ProducerMessage.php index a108eb1dc..cefb87c65 100644 --- a/src/amqp/src/Message/ProducerMessage.php +++ b/src/amqp/src/Message/ProducerMessage.php @@ -18,7 +18,6 @@ use Hyperf\Framework\ApplicationContext; abstract class ProducerMessage extends Message implements ProducerMessageInterface { - /** * @var string */ diff --git a/src/amqp/src/Message/ProducerMessageInterface.php b/src/amqp/src/Message/ProducerMessageInterface.php index a35782316..5d3d134b6 100644 --- a/src/amqp/src/Message/ProducerMessageInterface.php +++ b/src/amqp/src/Message/ProducerMessageInterface.php @@ -14,11 +14,9 @@ namespace Hyperf\Amqp\Message; interface ProducerMessageInterface extends MessageInterface { - public function setPayload($data); public function payload(): string; public function getProperties(): array; - } diff --git a/src/amqp/src/Message/Type.php b/src/amqp/src/Message/Type.php index c9836faa5..1ccce75b0 100644 --- a/src/amqp/src/Message/Type.php +++ b/src/amqp/src/Message/Type.php @@ -28,5 +28,4 @@ class Type self::TOPIC, ]; } - } diff --git a/src/amqp/src/Pool/AmqpConnectionPool.php b/src/amqp/src/Pool/AmqpConnectionPool.php index a736e5dbc..0035101d5 100644 --- a/src/amqp/src/Pool/AmqpConnectionPool.php +++ b/src/amqp/src/Pool/AmqpConnectionPool.php @@ -46,6 +46,11 @@ class AmqpConnectionPool extends Pool return $this->name; } + public function release(ConnectionInterface $connection): void + { + parent::release($connection); + } + protected function initOption() { if ($poolOptions = Arr::get($this->config, 'pool')) { @@ -66,10 +71,4 @@ class AmqpConnectionPool extends Pool { return new Connection($this->container, $this, $this->config); } - - public function release(ConnectionInterface $connection): void - { - parent::release($connection); - } - } diff --git a/src/amqp/src/Pool/PoolFactory.php b/src/amqp/src/Pool/PoolFactory.php index 908f14cd9..bf495eccf 100644 --- a/src/amqp/src/Pool/PoolFactory.php +++ b/src/amqp/src/Pool/PoolFactory.php @@ -39,5 +39,4 @@ class PoolFactory return $this->pools[$name] = new AmqpConnectionPool($this->container, $name); } - } diff --git a/src/amqp/src/Producer.php b/src/amqp/src/Producer.php index dbd120d6d..4217b6881 100644 --- a/src/amqp/src/Producer.php +++ b/src/amqp/src/Producer.php @@ -18,13 +18,12 @@ use PhpAmqpLib\Message\AMQPMessage; class Producer extends Builder { - public function produce(ProducerMessageInterface $producerMessage, bool $confirm = false, int $timeout = 5): bool { $result = false; $this->injectMessageProperty($producerMessage); - + $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties()); $pool = $this->getConnectionPool($producerMessage->getPoolName()); /** @var \Hyperf\Amqp\Connection $connection */ @@ -50,9 +49,8 @@ class Producer extends Builder foreach ($item as $key => $value) { $setter = setter($key); if (method_exists($producerMessage, $setter)) { - $producerMessage->$setter($value); + $producerMessage->{$setter}($value); } } } - } diff --git a/src/amqp/src/Result.php b/src/amqp/src/Result.php index 547e48335..9788cfc5f 100644 --- a/src/amqp/src/Result.php +++ b/src/amqp/src/Result.php @@ -1,11 +1,19 @@