From 58bb6392d01ea420926b2efaaf57dcc20169c891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=93=AD=E6=98=95?= Date: Sun, 11 Jul 2021 09:05:42 +0800 Subject: [PATCH] Fixed bug that amqp consumer couldn't restart when rabbitmq server stopped. (#3798) --- CHANGELOG-2.2.md | 1 + src/amqp/src/AMQPConnection.php | 10 ++++++++++ src/amqp/tests/ConsumerTest.php | 16 ++++++++++++++++ src/amqp/tests/Stub/AMQPConnectionStub.php | 3 +++ 4 files changed, 30 insertions(+) diff --git a/CHANGELOG-2.2.md b/CHANGELOG-2.2.md index 55d6e6750..46d3f82c7 100644 --- a/CHANGELOG-2.2.md +++ b/CHANGELOG-2.2.md @@ -85,3 +85,4 @@ - [#3770](https://github.com/hyperf/hyperf/pull/3770) Fixed type error when using `Str::slug()`. - [#3788](https://github.com/hyperf/hyperf/pull/3788) Fixed type error when using `BladeCompiler::getRawPlaceholder()`. - [#3794](https://github.com/hyperf/hyperf/pull/3794) Fixed bug that `retry_interval` does not work for `rpc-multiplex`. +- [#3798](https://github.com/hyperf/hyperf/pull/3798) Fixed bug that amqp consumer couldn't restart when rabbitmq server stopped. diff --git a/src/amqp/src/AMQPConnection.php b/src/amqp/src/AMQPConnection.php index b6ddac258..04c0d4447 100644 --- a/src/amqp/src/AMQPConnection.php +++ b/src/amqp/src/AMQPConnection.php @@ -16,6 +16,7 @@ use Hyperf\Utils\Channel\ChannelManager; use Hyperf\Utils\Coordinator\Constants; use Hyperf\Utils\Coordinator\CoordinatorManager; use Hyperf\Utils\Coroutine; +use Hyperf\Utils\Exception\ChannelClosedException; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Exception\AMQPRuntimeException; @@ -186,6 +187,12 @@ class AMQPConnection extends AbstractConnection } } + public function close($reply_code = 0, $reply_text = '', $method_sig = [0, 0]) + { + $this->channelManager->flush(); + return parent::close($reply_code, $reply_text, $method_sig); + } + protected function makeChannelId(): int { for ($i = 0; $i < $this->channel_max; ++$i) { @@ -233,6 +240,9 @@ class AMQPConnection extends AbstractConnection if ($chan->isTimeout()) { throw new AMQPTimeoutException('Timeout waiting on channel'); } + if ($chan->isClosing()) { + throw new ChannelClosedException('Wait channel was closed.'); + } } return $data; diff --git a/src/amqp/tests/ConsumerTest.php b/src/amqp/tests/ConsumerTest.php index 40d8be40b..d6af64a50 100644 --- a/src/amqp/tests/ConsumerTest.php +++ b/src/amqp/tests/ConsumerTest.php @@ -14,6 +14,9 @@ namespace HyperfTest\Amqp; use Hyperf\Amqp\ConnectionFactory; use Hyperf\Amqp\Consumer; use Hyperf\Utils\Coroutine\Concurrent; +use Hyperf\Utils\Exception\ChannelClosedException; +use Hyperf\Utils\Reflection\ClassInvoker; +use HyperfTest\Amqp\Stub\AMQPConnectionStub; use HyperfTest\Amqp\Stub\ContainerStub; use Mockery; use PHPUnit\Framework\TestCase; @@ -40,4 +43,17 @@ class ConsumerTest extends TestCase $concurrent = $method->invokeArgs($consumer, ['co']); $this->assertSame(5, $concurrent->getLimit()); } + + public function testWaitChannel() + { + $connection = new AMQPConnectionStub(); + $invoker = new ClassInvoker($connection); + $chan = $invoker->channelManager->get(1, true); + $chan->push($id = uniqid()); + $this->assertSame($id, $invoker->wait_channel(1)); + + $this->expectException(ChannelClosedException::class); + $chan->close(); + $invoker->wait_channel(1); + } } diff --git a/src/amqp/tests/Stub/AMQPConnectionStub.php b/src/amqp/tests/Stub/AMQPConnectionStub.php index 6a2d6b6b8..3734040f5 100644 --- a/src/amqp/tests/Stub/AMQPConnectionStub.php +++ b/src/amqp/tests/Stub/AMQPConnectionStub.php @@ -12,11 +12,14 @@ declare(strict_types=1); namespace HyperfTest\Amqp\Stub; use Hyperf\Amqp\AMQPConnection; +use Hyperf\Utils\Channel\ChannelManager; class AMQPConnectionStub extends AMQPConnection { public function __construct() { + $this->channelManager = new ChannelManager(16); + $this->channelManager->get(0, true); } public function setLastChannelId(int $id)