Added consume_timeout for kafka. (#5525)

This commit is contained in:
李铭昕 2023-03-15 09:34:38 +08:00 committed by GitHub
parent 195fb9bb98
commit 1f7ac50650
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 31 deletions

View File

@ -8,7 +8,8 @@
- [#5513](https://github.com/hyperf/hyperf/pull/5513) Use default normalizer for `rpc-multiplex` and use `protocol.normalizer` for `rpc-server`.
- [#5518](https://github.com/hyperf/hyperf/pull/5518) Added `SwooleConnection::getSocket` to get swoole response.
- [#5520](https://github.com/hyperf/hyperf/pull/5520) Added `Coroutine::stats()` and `Coroutine::exists()`.
- [#5525](https://github.com/hyperf/hyperf/pull/5525) Added `kafka.default.consume_timeout` to control the consumer for consuming messages.
-
## Fixed
- [#5519](https://github.com/hyperf/hyperf/pull/5519) Fixed bug that worker cannot exit caused by kafka `producer->loop()`.
@ -17,6 +18,7 @@
## Optimized
- [#5510](https://github.com/hyperf/hyperf/pull/5510) Allow developers to replace the `normalizer` of `RPC Client` themselves.
- [#5525](https://github.com/hyperf/hyperf/pull/5525) Running in an independent coroutine when consume kafka message.
# v3.0.10 - 2023-03-11

View File

@ -36,17 +36,8 @@ return [
'offset_retry' => 5,
'auto_create_topic' => true,
'partition_assignment_strategy' => KafkaStrategy::RANGE_ASSIGNOR,
'sasl' => [
],
'ssl' => [
],
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60.0,
],
'sasl' => [],
'ssl' => [],
'consume_timeout' => 600,
],
];

View File

@ -27,10 +27,10 @@ use longlang\phpkafka\Client\SwooleClient;
use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer as LongLangConsumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
use longlang\phpkafka\Exception\KafkaErrorException;
use longlang\phpkafka\Socket\SwooleSocket;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
use Throwable;
class ConsumerManager
{
@ -99,6 +99,8 @@ class ConsumerManager
$longLangConsumer = new LongLangConsumer(
$consumerConfig,
function (ConsumeMessage $message) use ($consumer, $consumerConfig) {
$config = $this->getConfig();
wait(function () use ($consumer, $consumerConfig, $message) {
$this->dispatcher?->dispatch(new BeforeConsume($consumer, $message));
$result = $consumer->consume($message);
@ -118,6 +120,7 @@ class ConsumerManager
}
$this->dispatcher?->dispatch(new AfterConsume($consumer, $message, $result));
}, $config['consume_timeout'] ?? -1);
}
);
@ -128,7 +131,7 @@ class ConsumerManager
}
$longLangConsumer->start();
} catch (\Throwable $exception) {
} catch (Throwable $exception) {
$this->stdoutLogger->warning((string) $exception);
$this->dispatcher?->dispatch(new FailToConsume($this->consumer, [], $exception));
}
@ -137,9 +140,14 @@ class ConsumerManager
$longLangConsumer->close();
}
public function getConfig(): array
{
return $this->config->get('kafka.' . $this->consumer->getPool());
}
public function getConsumerConfig(): ConsumerConfig
{
$config = $this->config->get('kafka.' . $this->consumer->getPool());
$config = $this->getConfig();
$consumerConfig = new ConsumerConfig();
$consumerConfig->setAutoCommit($this->consumer->isAutoCommit());
$consumerConfig->setRackId($config['rack_id']);