diff --git a/CHANGELOG-2.1.md b/CHANGELOG-2.1.md index 372833bd3..9f6bccc45 100644 --- a/CHANGELOG-2.1.md +++ b/CHANGELOG-2.1.md @@ -5,6 +5,10 @@ - [#3233](https://github.com/hyperf/hyperf/pull/3233) Fixed connection exhausted, when connect amqp server failed. - [#3245](https://github.com/hyperf/hyperf/pull/3245) Fixed `autoCommit` does not works when you set `false` for `hyperf/kafka`. +## Optimized + +- [#3249](https://github.com/hyperf/hyperf/pull/3249) Optimized `hyperf/kafka` which won't make a new producer to requeue message. + ## Removed - [#3235](https://github.com/hyperf/hyperf/pull/3235) Removed rebalance check, because `longlang/phpkafka` checked. diff --git a/src/kafka/src/ConsumerManager.php b/src/kafka/src/ConsumerManager.php index 4acca593b..3dea31d32 100644 --- a/src/kafka/src/ConsumerManager.php +++ b/src/kafka/src/ConsumerManager.php @@ -97,12 +97,18 @@ class ConsumerManager */ protected $stdoutLogger; + /** + * @var Producer + */ + protected $producer; + public function __construct(ContainerInterface $container, AbstractConsumer $consumer) { parent::__construct($container); $this->consumer = $consumer; $this->config = $container->get(ConfigInterface::class); $this->stdoutLogger = $container->get(StdoutLoggerInterface::class); + $this->producer = $container->get(Producer::class); if ($container->has(EventDispatcherInterface::class)) { $this->dispatcher = $container->get(EventDispatcherInterface::class); } @@ -129,7 +135,7 @@ class ConsumerManager } if ($result === Result::REQUEUE) { - make(Producer::class)->send($message->getTopic(), $message->getValue(), $message->getKey(), $message->getHeaders()); + $this->producer->send($message->getTopic(), $message->getValue(), $message->getKey(), $message->getHeaders()); } }