Fixed bug that amqp consumer couldn't restart when rabbitmq server stopped. (#3798)

This commit is contained in:
李铭昕 2021-07-11 09:05:42 +08:00 committed by GitHub
parent 1b2cb6aa18
commit 58bb6392d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 30 additions and 0 deletions

View File

@ -85,3 +85,4 @@
- [#3770](https://github.com/hyperf/hyperf/pull/3770) Fixed type error when using `Str::slug()`.
- [#3788](https://github.com/hyperf/hyperf/pull/3788) Fixed type error when using `BladeCompiler::getRawPlaceholder()`.
- [#3794](https://github.com/hyperf/hyperf/pull/3794) Fixed bug that `retry_interval` does not work for `rpc-multiplex`.
- [#3798](https://github.com/hyperf/hyperf/pull/3798) Fixed bug that amqp consumer couldn't restart when rabbitmq server stopped.

View File

@ -16,6 +16,7 @@ use Hyperf\Utils\Channel\ChannelManager;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Hyperf\Utils\Coroutine;
use Hyperf\Utils\Exception\ChannelClosedException;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
@ -186,6 +187,12 @@ class AMQPConnection extends AbstractConnection
}
}
public function close($reply_code = 0, $reply_text = '', $method_sig = [0, 0])
{
$this->channelManager->flush();
return parent::close($reply_code, $reply_text, $method_sig);
}
protected function makeChannelId(): int
{
for ($i = 0; $i < $this->channel_max; ++$i) {
@ -233,6 +240,9 @@ class AMQPConnection extends AbstractConnection
if ($chan->isTimeout()) {
throw new AMQPTimeoutException('Timeout waiting on channel');
}
if ($chan->isClosing()) {
throw new ChannelClosedException('Wait channel was closed.');
}
}
return $data;

View File

@ -14,6 +14,9 @@ namespace HyperfTest\Amqp;
use Hyperf\Amqp\ConnectionFactory;
use Hyperf\Amqp\Consumer;
use Hyperf\Utils\Coroutine\Concurrent;
use Hyperf\Utils\Exception\ChannelClosedException;
use Hyperf\Utils\Reflection\ClassInvoker;
use HyperfTest\Amqp\Stub\AMQPConnectionStub;
use HyperfTest\Amqp\Stub\ContainerStub;
use Mockery;
use PHPUnit\Framework\TestCase;
@ -40,4 +43,17 @@ class ConsumerTest extends TestCase
$concurrent = $method->invokeArgs($consumer, ['co']);
$this->assertSame(5, $concurrent->getLimit());
}
public function testWaitChannel()
{
$connection = new AMQPConnectionStub();
$invoker = new ClassInvoker($connection);
$chan = $invoker->channelManager->get(1, true);
$chan->push($id = uniqid());
$this->assertSame($id, $invoker->wait_channel(1));
$this->expectException(ChannelClosedException::class);
$chan->close();
$invoker->wait_channel(1);
}
}

View File

@ -12,11 +12,14 @@ declare(strict_types=1);
namespace HyperfTest\Amqp\Stub;
use Hyperf\Amqp\AMQPConnection;
use Hyperf\Utils\Channel\ChannelManager;
class AMQPConnectionStub extends AMQPConnection
{
public function __construct()
{
$this->channelManager = new ChannelManager(16);
$this->channelManager->get(0, true);
}
public function setLastChannelId(int $id)