Optimized code for kafka. (#3730)

- Support `timeout` for `Producer` to avoid requests not responding.
- Removed useless code with pool.
- Throw exceptions when connect kafka failed.
- Removed config `brokers` and `update_brokers` from kafka.
This commit is contained in:
李铭昕 2021-06-23 20:25:22 +08:00 committed by GitHub
parent 830f9f57a7
commit da60424b8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 74 additions and 395 deletions

View File

@ -29,6 +29,7 @@
- [#3715](https://github.com/hyperf/hyperf/pull/3715) Restructure nacos component, be sure to reread the documents.
- [#3722](https://github.com/hyperf/hyperf/pull/3722) Removed config `config_apollo.php`, please use `config_center.php` instead.
- [#3725](https://github.com/hyperf/hyperf/pull/3725) Removed config `config_etcd.php`, please use `config_center.php` instead.
- [#3730](https://github.com/hyperf/hyperf/pull/3730) Removed config `brokers` and `update_brokers` from kafka.
## Deprecated
@ -58,6 +59,10 @@
- [#3670](https://github.com/hyperf/hyperf/pull/3670) Adapt database component to support php8.
- [#3673](https://github.com/hyperf/hyperf/pull/3673) Adapt all components to support php8.
- [#3730](https://github.com/hyperf/hyperf/pull/3730) Optimized code for kafka component.
- Support `timeout` for `Producer` to avoid requests not responding.
- Removed useless code with pool.
- Throw exceptions when connect kafka failed.
## Fixed

View File

@ -19,7 +19,7 @@
"hyperf/pool": "~2.2.0",
"hyperf/process": "~2.2.0",
"hyperf/utils": "~2.2.0",
"longlang/phpkafka": "^1.1",
"longlang/phpkafka": "^1.1.4",
"psr/container": "^1.0|^2.0",
"psr/log": "^1.0"
},

View File

@ -18,14 +18,10 @@ return [
'recv_timeout' => -1,
'client_id' => '',
'max_write_attempts' => 3,
'brokers' => [
'127.0.0.1:9092',
],
'bootstrap_servers' => [
'127.0.0.1:9092',
],
'update_brokers' => true,
'acks' => 0,
'acks' => -1,
'producer_id' => -1,
'producer_epoch' => -1,
'partition_leader_epoch' => -1,

View File

@ -172,7 +172,7 @@ class ConsumerManager
$consumerConfig->setGroupInstanceId(sprintf('%s-%s', $this->consumer->getGroupId(), uniqid()));
$consumerConfig->setMemberId($this->consumer->getMemberId() ?: '');
$consumerConfig->setInterval($config['interval']);
$consumerConfig->setBrokers($config['brokers']);
$consumerConfig->setBootstrapServers($config['bootstrap_servers']);
$consumerConfig->setSocket(SwooleSocket::class);
$consumerConfig->setClient(SwooleClient::class);
$consumerConfig->setMaxWriteAttempts($config['max_write_attempts']);

View File

@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka\Exception;
class ConnectionCLosedException extends KafkaException
{
}

View File

@ -11,8 +11,6 @@ declare(strict_types=1);
*/
namespace Hyperf\Kafka\Exception;
use RuntimeException;
class InvalidConsumeResultException extends RuntimeException
class InvalidConsumeResultException extends KafkaException
{
}

View File

@ -9,10 +9,10 @@ declare(strict_types=1);
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka\Pool;
namespace Hyperf\Kafka\Exception;
use Hyperf\Pool\Frequency as DefaultFrequency;
use RuntimeException;
class Frequency extends DefaultFrequency
class KafkaException extends RuntimeException
{
}

View File

@ -0,0 +1,16 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka\Exception;
class TimeoutException extends KafkaException
{
}

View File

@ -1,122 +0,0 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Kafka\Pool\KafkaPool;
use Hyperf\Pool\Connection as BaseConnection;
use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\Producer as LongLangConsumer;
use longlang\phpkafka\Producer\ProducerConfig;
use longlang\phpkafka\Socket\SwooleSocket;
use Psr\Container\ContainerInterface;
class KafkaConnection extends BaseConnection implements ConnectionInterface
{
/**
* @var KafkaPool
*/
protected $pool;
/**
* @var null|LongLangConsumer
*/
protected $connection;
/**
* @var array
*/
protected $config;
/**
* @var Context|mixed
*/
protected $context;
/**
* @var StdoutLoggerInterface
*/
protected $logger;
public function __construct(ContainerInterface $container, KafkaPool $pool, array $config)
{
parent::__construct($container, $pool);
$this->logger = $container->get(StdoutLoggerInterface::class);
$this->config = $config;
$this->context = $container->get(Context::class);
$this->connection = $this->initConnection();
}
public function __call($name, $arguments)
{
return $this->connection->{$name}(...$arguments);
}
public function getActiveConnection()
{
if ($this->check()) {
return $this->connection;
}
$this->reconnect();
return $this->connection;
}
public function check(): bool
{
return isset($this->connection) && $this->connection instanceof Producer;
}
public function reconnect(): bool
{
if ($this->connection) {
$this->close();
}
$this->connection = $this->initConnection();
return true;
}
public function close(): bool
{
try {
$this->connection->close();
} catch (\Throwable $exception) {
$this->logger->error((string) $exception);
} finally {
$this->connection = null;
}
return true;
}
protected function initConnection()
{
$producerConfig = new ProducerConfig();
$producerConfig->setConnectTimeout($this->config['connect_timeout']);
$producerConfig->setSendTimeout($this->config['send_timeout']);
$producerConfig->setRecvTimeout($this->config['recv_timeout']);
$producerConfig->setClientId($this->config['client_id']);
$producerConfig->setMaxWriteAttempts($this->config['max_write_attempts']);
$producerConfig->setSocket(SwooleSocket::class);
$producerConfig->setBrokers($this->config['brokers']);
$producerConfig->setBootstrapServers($this->config['bootstrap_servers']);
$producerConfig->setUpdateBrokers($this->config['update_brokers']);
$producerConfig->setAcks($this->config['acks']);
$producerConfig->setProducerId($this->config['producer_id']);
$producerConfig->setProducerEpoch($this->config['producer_epoch']);
$producerConfig->setPartitionLeaderEpoch($this->config['partition_leader_epoch']);
return new LongLangConsumer($producerConfig);
}
}

View File

@ -1,90 +0,0 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka\Pool;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\Kafka\KafkaConnection;
use Hyperf\Pool\Pool;
use Hyperf\Utils\Arr;
use Psr\Container\ContainerInterface;
class KafkaPool extends Pool
{
/**
* @var string
*/
protected $name;
/**
* @var array
*/
protected $config;
public function __construct(ContainerInterface $container, string $name)
{
$this->name = $name;
$config = $container->get(ConfigInterface::class);
$key = sprintf('kafka.%s', $this->name);
if (! $config->has($key)) {
throw new \InvalidArgumentException(sprintf('config[%s] is not exist!', $key));
}
$this->config = Arr::merge($this->getDefaultConfig(), $config->get($key, []));
$options = Arr::get($this->config, 'pool', []);
parent::__construct($container, $options);
}
public function getName(): string
{
return $this->name;
}
protected function createConnection(): ConnectionInterface
{
return new KafkaConnection($this->container, $this, $this->config);
}
protected function getDefaultConfig(): array
{
return [
'connect_timeout' => -1,
'send_timeout' => -1,
'recv_timeout' => -1,
'client_id' => '',
'max_write_attempts' => 3,
'brokers' => [
'127.0.0.1:9092',
],
'bootstrap_servers' => '127.0.0.1:9092',
'update_brokers' => true,
'acks' => 0,
'producer_id' => -1,
'producer_epoch' => -1,
'partition_leader_epoch' => -1,
'interval' => 0,
'session_timeout' => 60,
'rebalance_timeout' => 60,
'partitions' => [0],
'replica_id' => -1,
'rack_id' => '',
'is_auto_create_topic' => true,
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60.0,
],
];
}
}

View File

@ -1,53 +0,0 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka\Pool;
use Hyperf\Contract;
use Psr\Container\ContainerInterface;
class PoolFactory
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var KafkaPool[]
*/
protected $pools = [];
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function getPool(string $name): KafkaPool
{
if (isset($this->pools[$name])) {
return $this->pools[$name];
}
return $this->pools[$name] = $this->make($name);
}
protected function make(string $name): KafkaPool
{
if ($this->container instanceof Contract\ContainerInterface) {
$pool = $this->container->make(KafkaPool::class, ['name' => $name]);
} else {
$pool = new KafkaPool($this->container, $name);
}
return $pool;
}
}

View File

@ -13,11 +13,13 @@ namespace Hyperf\Kafka;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Engine\Channel;
use Hyperf\Kafka\Transport\SwooleSocket;
use Hyperf\Kafka\Exception\ConnectionCLosedException;
use Hyperf\Kafka\Exception\TimeoutException;
use longlang\phpkafka\Broker;
use longlang\phpkafka\Producer\ProduceMessage;
use longlang\phpkafka\Producer\Producer as LongLangProducer;
use longlang\phpkafka\Producer\ProducerConfig;
use longlang\phpkafka\Socket\SwooleSocket;
use Swoole\Coroutine;
class Producer
@ -43,50 +45,40 @@ class Producer
protected $producer;
/**
* @var array
* @var int
*/
protected $topicsMeta;
protected $timeout;
public function __construct(ConfigInterface $config, string $name = 'default')
public function __construct(ConfigInterface $config, string $name = 'default', int $timeout = 10)
{
$this->config = $config;
$this->name = $name;
$this->timeout = $timeout;
}
public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null): void
{
$this->loop();
$ack = new Channel();
$this->chan->push(function () use ($topic, $key, $value, $headers, $partitionIndex, $ack) {
$ack = new Channel(1);
$chan = $this->chan;
$chan->push(function () use ($topic, $key, $value, $headers, $partitionIndex, $ack) {
try {
if (! isset($this->topicsMeta[$topic])) {
$this->producer->send($topic, $value, $key, $headers);
$ack->close();
return;
}
if (! is_int($partitionIndex)) {
$index = $this->getIndex($key, $value, count($this->topicsMeta[$topic]));
$partitionIndex = array_keys($this->topicsMeta[$topic])[$index];
}
$this->producer->send(
$topic,
$value,
$key,
$headers,
$partitionIndex,
$this->topicsMeta[$topic][$partitionIndex]
);
$this->producer->send($topic, $value, $key, $headers, $partitionIndex);
$ack->close();
} catch (\Throwable $e) {
$ack->push($e);
throw $e;
}
});
if ($e = $ack->pop()) {
if ($chan->isClosing()) {
throw new ConnectionCLosedException('Connection closed.');
}
if ($e = $ack->pop($this->timeout)) {
throw $e;
}
if ($ack->isTimeout()) {
throw new TimeoutException('Kafka send timeout.');
}
}
/**
@ -95,22 +87,26 @@ class Producer
public function sendBatch(array $messages): void
{
$this->loop();
$ack = new Channel();
$this->chan->push(function () use ($messages, $ack) {
$ack = new Channel(1);
$chan = $this->chan;
$chan->push(function () use ($messages, $ack) {
try {
$messagesByBroker = $this->slitByBroker($messages);
foreach ($messagesByBroker as $brokerId => $messages) {
$this->producer->sendBatch($messages, $brokerId);
}
$this->producer->sendBatch($messages);
$ack->close();
} catch (\Throwable $e) {
$ack->push($e);
throw $e;
}
});
if ($chan->isClosing()) {
throw new ConnectionCLosedException('Connection closed.');
}
if ($e = $ack->pop()) {
throw $e;
}
if ($ack->isTimeout()) {
throw new TimeoutException('Kafka send timeout.');
}
}
public function close(): void
@ -139,7 +135,6 @@ class Producer
Coroutine::create(function () {
while (true) {
$this->producer = $this->makeProducer();
$this->topicsMeta = $this->fetchMeta();
while (true) {
$closure = $this->chan->pop();
if (! $closure) {
@ -154,6 +149,7 @@ class Producer
}
}
/* @phpstan-ignore-next-line */
$this->chan->close();
$this->chan = null;
});
}
@ -169,8 +165,6 @@ class Producer
$producerConfig->setMaxWriteAttempts($config['max_write_attempts']);
$producerConfig->setSocket(SwooleSocket::class);
$producerConfig->setBootstrapServers($config['bootstrap_servers']);
$producerConfig->setUpdateBrokers($config['update_brokers']);
$producerConfig->setBrokers($config['brokers']);
$producerConfig->setAcks($config['acks']);
$producerConfig->setProducerId($config['producer_id']);
$producerConfig->setProducerEpoch($config['producer_epoch']);
@ -178,41 +172,4 @@ class Producer
$producerConfig->setAutoCreateTopic($config['auto_create_topic']);
return new LongLangProducer($producerConfig);
}
private function getIndex($key, $value, $max)
{
if ($key === null) {
return crc32($value) % $max;
}
return crc32($key) % $max;
}
/**
* @param ProduceMessage[] $messages
*/
private function slitByBroker(array $messages): array
{
$messageByBroker = [];
foreach ($messages as $message) {
$messageByBroker[$this->getMessageBrokerId($message)][] = $message;
}
return $messageByBroker;
}
private function getMessageBrokerId(ProduceMessage $message): int
{
return $this->topicsMeta[$message->getTopic()][$message->getPartitionIndex()];
}
private function fetchMeta(): array
{
$metaCache = [];
$topicMeta = $this->producer->getBroker()->getTopicsMeta();
foreach ($topicMeta as $meta) {
foreach ($meta->getPartitions() as $partition) {
$metaCache[$meta->getName()][$partition->getPartitionIndex()] = $partition->getLeaderId();
}
}
return $metaCache;
}
}

View File

@ -1,44 +0,0 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka\Transport;
class SwooleSocket extends \longlang\phpkafka\Socket\SwooleSocket
{
public function recv(int $length, ?float $timeout = null): string
{
$beginTime = microtime(true);
if ($timeout === null) {
$timeout = $this->config->getRecvTimeout();
}
$leftTime = $timeout;
/* @phpstan-ignore-next-line */
while ($this->socket && ! isset($this->receivedBuffer[$length - 1]) && ($timeout == -1 || $leftTime > 0)) {
$buffer = $this->socket->recv($timeout);
if ($buffer === false || $buffer === '') {
return '';
}
$this->receivedBuffer .= $buffer;
if ($timeout > 0) {
$leftTime = $timeout - (microtime(true) - $beginTime);
}
}
if (isset($this->receivedBuffer[$length - 1])) {
$result = substr($this->receivedBuffer, 0, $length);
$this->receivedBuffer = substr($this->receivedBuffer, $length);
return $result;
}
return '';
}
}

View File

@ -74,7 +74,7 @@ class ConsumerManagerTest extends TestCase
$this->assertTrue(strpos($consumer->getGroupInstanceId(), $groupId) !== false);
$this->assertSame('', $consumer->getMemberId());
$this->assertSame((float) $config['interval'], $consumer->getInterval());
$this->assertTrue(in_array($config['bootstrap_servers'], $consumer->getBrokers()));
$this->assertTrue(in_array($config['bootstrap_servers'], $consumer->getBootstrapServers()));
$this->assertSame(SwooleSocket::class, $consumer->getSocket());
$this->assertSame(SwooleClient::class, $consumer->getClient());
$this->assertSame($config['max_write_attempts'], $consumer->getMaxWriteAttempts());