mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-12-02 03:37:44 +08:00
Removed rebalance check (#3235)
* Removed rebalance check * Update CHANGELOG-2.1.md Co-authored-by: 李铭昕 <715557344@qq.com>
This commit is contained in:
parent
a3f52abd78
commit
e203ac5156
@ -4,6 +4,10 @@
|
||||
|
||||
- [#3233](https://github.com/hyperf/hyperf/pull/3233) Fixed connection exhausted, when connect amqp server failed.
|
||||
|
||||
## Removed
|
||||
|
||||
- [#3235](https://github.com/hyperf/hyperf/pull/3235) Removed rebalance check, because `longlang/phpkafka` checked.
|
||||
|
||||
# v2.1.5 - 2021-02-01
|
||||
|
||||
## Fixed
|
||||
|
@ -25,8 +25,6 @@ use longlang\phpkafka\Consumer\ConsumeMessage;
|
||||
use longlang\phpkafka\Consumer\Consumer as LongLangConsumer;
|
||||
use longlang\phpkafka\Consumer\ConsumerConfig;
|
||||
use longlang\phpkafka\Exception\KafkaErrorException;
|
||||
use longlang\phpkafka\Protocol\ErrorCode;
|
||||
use longlang\phpkafka\Protocol\JoinGroup\JoinGroupRequest;
|
||||
use longlang\phpkafka\Socket\SwooleSocket;
|
||||
use Psr\Container\ContainerInterface;
|
||||
use Psr\EventDispatcher\EventDispatcherInterface;
|
||||
@ -136,21 +134,11 @@ class ConsumerManager
|
||||
|
||||
retry(
|
||||
3,
|
||||
function () use ($longLangConsumer, $consumerConfig) {
|
||||
function () use ($longLangConsumer) {
|
||||
try {
|
||||
$longLangConsumer->start();
|
||||
} catch (KafkaErrorException $exception) {
|
||||
$this->stdoutLogger->error($exception->getMessage());
|
||||
switch ($exception->getCode()) {
|
||||
case ErrorCode::REBALANCE_IN_PROGRESS:
|
||||
$joinGroupRequest = new JoinGroupRequest();
|
||||
$joinGroupRequest->setGroupInstanceId($consumerConfig->getGroupInstanceId());
|
||||
$joinGroupRequest->setMemberId($consumerConfig->getMemberId());
|
||||
$joinGroupRequest->setGroupId($consumerConfig->getGroupId());
|
||||
$longLangConsumer->getBroker()->getClient()->send($joinGroupRequest);
|
||||
$longLangConsumer->start();
|
||||
break;
|
||||
}
|
||||
|
||||
$this->dispatcher && $this->dispatcher->dispatch(new FailToConsume($this->consumer, [], $exception));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user