Optimized hyperf/kafka which won't make a new producer to requeue message. (#3249)

This commit is contained in:
李铭昕 2021-02-05 11:18:15 +08:00 committed by GitHub
parent 99fe12a9aa
commit bc56f93c5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 1 deletions

View File

@ -5,6 +5,10 @@
- [#3233](https://github.com/hyperf/hyperf/pull/3233) Fixed connection exhausted, when connect amqp server failed.
- [#3245](https://github.com/hyperf/hyperf/pull/3245) Fixed `autoCommit` does not works when you set `false` for `hyperf/kafka`.
## Optimized
- [#3249](https://github.com/hyperf/hyperf/pull/3249) Optimized `hyperf/kafka` which won't make a new producer to requeue message.
## Removed
- [#3235](https://github.com/hyperf/hyperf/pull/3235) Removed rebalance check, because `longlang/phpkafka` checked.

View File

@ -97,12 +97,18 @@ class ConsumerManager
*/
protected $stdoutLogger;
/**
* @var Producer
*/
protected $producer;
public function __construct(ContainerInterface $container, AbstractConsumer $consumer)
{
parent::__construct($container);
$this->consumer = $consumer;
$this->config = $container->get(ConfigInterface::class);
$this->stdoutLogger = $container->get(StdoutLoggerInterface::class);
$this->producer = $container->get(Producer::class);
if ($container->has(EventDispatcherInterface::class)) {
$this->dispatcher = $container->get(EventDispatcherInterface::class);
}
@ -129,7 +135,7 @@ class ConsumerManager
}
if ($result === Result::REQUEUE) {
make(Producer::class)->send($message->getTopic(), $message->getValue(), $message->getKey(), $message->getHeaders());
$this->producer->send($message->getTopic(), $message->getValue(), $message->getKey(), $message->getHeaders());
}
}