Fixed bug that defer cannot be triggered in nsq consumer. (#3255)

* Fixed bug that `defer` cannot be triggered in nsq consumer.

* Update CHANGELOG-2.1.md

* Update ContainerStub.php
This commit is contained in:
李铭昕 2021-02-07 15:17:18 +08:00 committed by GitHub
parent 1d0213fbaf
commit d9b73cb71b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 10 deletions

View File

@ -4,6 +4,7 @@
- [#3233](https://github.com/hyperf/hyperf/pull/3233) Fixed connection exhausted, when connect amqp server failed.
- [#3245](https://github.com/hyperf/hyperf/pull/3245) Fixed `autoCommit` does not works when you set `false` for `hyperf/kafka`.
- [#3255](https://github.com/hyperf/hyperf/pull/3255) Fixed bug that `defer` cannot be triggered in nsq consumer.
## Optimized

View File

@ -21,6 +21,7 @@ use Hyperf\Nsq\Event\BeforeSubscribe;
use Hyperf\Nsq\Event\FailToConsume;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\ProcessManager;
use Hyperf\Utils\Waiter;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
@ -84,6 +85,11 @@ class ConsumerManager
*/
private $config;
/**
* @var Waiter
*/
private $waiter;
public function __construct(ContainerInterface $container, AbstractConsumer $consumer)
{
parent::__construct($container);
@ -93,6 +99,7 @@ class ConsumerManager
'container' => $container,
'pool' => $consumer->getPool(),
]);
$this->waiter = make(Waiter::class, [0]);
if ($container->has(EventDispatcherInterface::class)) {
$this->dispatcher = $container->get(EventDispatcherInterface::class);
@ -119,17 +126,19 @@ class ConsumerManager
$this->consumer->getTopic(),
$this->consumer->getChannel(),
function ($data) {
$result = null;
try {
$this->dispatcher && $this->dispatcher->dispatch(new BeforeConsume($this->consumer, $data));
$result = $this->consumer->consume($data);
$this->dispatcher && $this->dispatcher->dispatch(new AfterConsume($this->consumer, $data, $result));
} catch (\Throwable $throwable) {
$result = Result::DROP;
$this->dispatcher && $this->dispatcher->dispatch(new FailToConsume($this->consumer, $data, $throwable));
}
return $this->waiter->wait(function () use ($data) {
$result = null;
try {
$this->dispatcher && $this->dispatcher->dispatch(new BeforeConsume($this->consumer, $data));
$result = $this->consumer->consume($data);
$this->dispatcher && $this->dispatcher->dispatch(new AfterConsume($this->consumer, $data, $result));
} catch (\Throwable $throwable) {
$result = Result::DROP;
$this->dispatcher && $this->dispatcher->dispatch(new FailToConsume($this->consumer, $data, $throwable));
}
return $result;
return $result;
});
}
);

View File

@ -17,6 +17,7 @@ use Hyperf\Nsq\MessageBuilder;
use Hyperf\Nsq\Nsq;
use Hyperf\Nsq\Pool\NsqPoolFactory;
use Hyperf\Utils\ApplicationContext;
use Hyperf\Utils\Waiter;
use Mockery;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
@ -31,6 +32,9 @@ class ContainerStub
$container = Mockery::mock(Container::class);
ApplicationContext::setContainer($container);
$container->shouldReceive('make')->with(Waiter::class, Mockery::any())->andReturnUsing(function ($_, $args) {
return new Waiter(...$args);
});
$container->shouldReceive('get')->with(StdoutLoggerInterface::class)->andReturnUsing(function () {
$logger = Mockery::mock(StdoutLoggerInterface::class);
$logger->shouldReceive('debug')->andReturn(null);