fix: make kafka producer usable in cluster setup (#3189)

This commit is contained in:
谷溪 2021-01-24 10:30:19 +08:00 committed by GitHub
parent 0fbda4f915
commit d8a4b1fce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 311 additions and 26 deletions

View File

@ -144,7 +144,7 @@ class KafkaConsumer extends AbstractConsumer
### 投递消息
您可以通过调用 `Hyperf\Kafka\Producer::send(string $topic, ?string $value, ?string $key = null, array $headers = [], int $partitionIndex = 0, ?int $brokerId = null)` 方法来向 `kafka` 投递消息, 下面是在 `Controller` 进行消息投递的一个示例:
您可以通过调用 `Hyperf\Kafka\Producer::send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null)` 方法来向 `kafka` 投递消息, 下面是在 `Controller` 进行消息投递的一个示例:
```php
<?php
@ -161,10 +161,8 @@ use Hyperf\Kafka\Producer;
*/
class IndexController extends AbstractController
{
public function index()
public function index(Producer $producer)
{
$producer = make(Producer::class);
$producer->send('hyperf', 'value', 'key');
}
}
@ -173,7 +171,7 @@ class IndexController extends AbstractController
### 一次性投递多条消息
`Hyperf\Kafka\Producer::sendBatch(array $messages, ?int $brokerId = null)` 方法来向 `kafka` 批量的投递消息, 下面是在 `Controller` 进行消息投递的一个示例:
`Hyperf\Kafka\Producer::sendBatch(array $messages)` 方法来向 `kafka` 批量的投递消息, 下面是在 `Controller` 进行消息投递的一个示例:
```php
@ -192,10 +190,8 @@ use longlang\phpkafka\Producer\ProduceMessage;
*/
class IndexController extends AbstractController
{
public function index()
public function index(Producer $producer)
{
$producer = make(Producer::class);
$producer->sendBatch([
new ProduceMessage('hyperf1', 'hyperf1_value', 'hyperf1_key'),
new ProduceMessage('hyperf2', 'hyperf2_value', 'hyperf2_key'),

View File

@ -21,7 +21,9 @@ return [
'brokers' => [
'127.0.0.1:9092',
],
'bootstrap_server' => '127.0.0.1:9092',
'bootstrap_server' => [
'127.0.0.1:9092',
],
'update_brokers' => true,
'acks' => 0,
'producer_id' => -1,

View File

@ -11,6 +11,7 @@ declare(strict_types=1);
*/
namespace Hyperf\Kafka;
use Hyperf\Kafka\Listener\AfterWorkerExitListener;
use Hyperf\Kafka\Listener\BeforeMainServerStartListener;
class ConfigProvider
@ -20,6 +21,7 @@ class ConfigProvider
return [
'listeners' => [
BeforeMainServerStartListener::class => 99,
AfterWorkerExitListener::class => 1,
],
'dependencies' => [
],

View File

@ -0,0 +1,46 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\OnWorkerExit;
use Hyperf\Kafka\Producer;
use Hyperf\Kafka\ProducerManager;
use Psr\Container\ContainerInterface;
class AfterWorkerExitListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
private $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function listen(): array
{
return [OnWorkerExit::class];
}
public function process(object $event)
{
if ($this->container->has(Producer::class)) {
$this->container->get(Producer::class)->close();
}
if ($this->container->has(ProducerManager::class)) {
$this->container->get(ProducerManager::class)->closeAll();
}
}
}

View File

@ -12,20 +12,31 @@ declare(strict_types=1);
namespace Hyperf\Kafka;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Engine\Channel;
use Hyperf\Kafka\Transport\SwooleSocket;
use longlang\phpkafka\Broker;
use longlang\phpkafka\Producer\ProduceMessage;
use longlang\phpkafka\Producer\Producer as LongLangProducer;
use longlang\phpkafka\Producer\ProducerConfig;
use longlang\phpkafka\Socket\SwooleSocket;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine;
/**
* @method send(string $topic, ?string $value, ?string $key = null, array $headers = [], int $partitionIndex = 0, ?int $brokerId = null)
* @method sendBatch(array $messages, ?int $brokerId = null)
* @method close()
* @method getConfig()
* @method getBroker()
*/
class Producer
{
/**
* @var ConfigInterface
*/
protected $config;
/**
* @var string
*/
protected $name;
/**
* @var ?Channel
*/
protected $chan;
/**
* @var LongLangProducer
*/
@ -34,12 +45,117 @@ class Producer
/**
* @var array
*/
protected $config;
protected $topicsMeta;
public function __construct(ContainerInterface $container, string $name = 'default')
public function __construct(ConfigInterface $config, string $name = 'default')
{
$config = $container->get(ConfigInterface::class)->get('kafka.' . $name);
$this->config = $config;
$this->name = $name;
}
public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null): void
{
$this->loop();
$ack = new Channel();
$this->chan->push(function () use ($topic, $key, $value, $headers, $partitionIndex, $ack) {
try {
if (! isset($this->topicsMeta[$topic])) {
$this->producer->send($topic, $value, $key, $headers);
$ack->close();
return;
}
if (! is_int($partitionIndex)) {
$index = $this->getIndex($key, $value, count($this->topicsMeta[$topic]));
$partitionIndex = array_keys($this->topicsMeta[$topic])[$index];
}
$this->producer->send(
$topic,
$value,
$key,
$headers,
$partitionIndex,
$this->topicsMeta[$topic][$partitionIndex]
);
$ack->close();
} catch (\Throwable $e) {
$ack->push($e);
}
});
if ($e = $ack->pop()) {
throw $e;
}
}
/**
* @param ProduceMessage[] $messages
*/
public function sendBatch(array $messages): void
{
$this->loop();
$ack = new Channel();
$this->chan->push(function () use ($messages, $ack) {
try {
$messagesByBroker = $this->slitByBroker($messages);
foreach ($messagesByBroker as $brokerId => $messages) {
$this->producer->sendBatch($messages, $brokerId);
}
$ack->close();
} catch (\Throwable $e) {
$ack->push($e);
}
});
if ($e = $ack->pop()) {
throw $e;
}
}
public function close(): void
{
if ($this->chan) {
$this->chan->close();
}
}
public function getConfig(): ProducerConfig
{
return $this->producer->getConfig();
}
public function getBroker(): Broker
{
return $this->producer->getBroker();
}
protected function loop()
{
if ($this->chan != null) {
return;
}
$this->chan = new Channel(1);
Coroutine::create(function () {
try {
$this->producer = $this->makeProducer();
$this->topicsMeta = $this->fetchMeta();
while (true) {
$closure = $this->chan->pop();
if (! $closure) {
break;
}
$closure->call($this);
}
} finally {
$this->chan = null;
$this->producer->close();
}
});
}
private function makeProducer(): LongLangProducer
{
$config = $this->config->get('kafka.' . $this->name);
$producerConfig = new ProducerConfig();
$producerConfig->setConnectTimeout($config['connect_timeout']);
$producerConfig->setSendTimeout($config['send_timeout']);
@ -47,20 +163,51 @@ class Producer
$producerConfig->setClientId($config['client_id']);
$producerConfig->setMaxWriteAttempts($config['max_write_attempts']);
$producerConfig->setSocket(SwooleSocket::class);
$producerConfig->setBrokers($config['brokers']);
$producerConfig->setBootstrapServer($config['bootstrap_server']);
$producerConfig->setUpdateBrokers($config['update_brokers']);
$producerConfig->setBrokers($config['brokers']);
$producerConfig->setAcks($config['acks']);
$producerConfig->setProducerId($config['producer_id']);
$producerConfig->setProducerEpoch($config['producer_epoch']);
$producerConfig->setPartitionLeaderEpoch($config['partition_leader_epoch']);
$producerConfig->setAutoCreateTopic($config['auto_create_topic']);
$this->producer = new LongLangProducer($producerConfig);
return new LongLangProducer($producerConfig);
}
public function __call($name, $arguments)
private function getIndex($key, $value, $max)
{
return $this->producer->{$name}(...$arguments);
if ($key === null) {
return crc32($value) % $max;
}
return crc32($key) % $max;
}
/**
* @param ProduceMessage[] $messages
*/
private function slitByBroker(array $messages): array
{
$messageByBroker = [];
foreach ($messages as $message) {
$messageByBroker[$this->getMessageBrokerId($message)][] = $message;
}
return $messageByBroker;
}
private function getMessageBrokerId(ProduceMessage $message): int
{
return $this->topicsMeta[$message->getTopic()][$message->getPartitionIndex()];
}
private function fetchMeta(): array
{
$metaCache = [];
$topicMeta = $this->producer->getBroker()->getTopicsMeta();
foreach ($topicMeta as $meta) {
foreach ($meta->getPartitions() as $partition) {
$metaCache[$meta->getName()][$partition->getPartitionIndex()] = $partition->getLeaderId();
}
}
return $metaCache;
}
}

View File

@ -0,0 +1,48 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka;
use Psr\Container\ContainerInterface;
class ProducerManager
{
/**
* @var array<string, Producer>
*/
private $producers = [];
/**
* @var ContainerInterface
*/
private $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function getProducer(string $name = 'default'): Producer
{
if (isset($this->producers[$name])) {
return $this->producers[$name];
}
$this->producers[$name] = make(Producer::class, ['name' => $name]);
return $this->producers[$name];
}
public function closeAll(): void
{
foreach ($this->producers as $producer) {
$producer->close();
}
}
}

View File

@ -0,0 +1,44 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Kafka\Transport;
class SwooleSocket extends \longlang\phpkafka\Socket\SwooleSocket
{
public function recv(int $length, ?float $timeout = null): string
{
$beginTime = microtime(true);
if ($timeout === null) {
$timeout = $this->config->getRecvTimeout();
}
$leftTime = $timeout;
/* @phpstan-ignore-next-line */
while ($this->socket && ! isset($this->receivedBuffer[$length - 1]) && ($timeout == -1 || $leftTime > 0)) {
$buffer = $this->socket->recv($timeout);
if ($buffer === false) {
return '';
}
$this->receivedBuffer .= $buffer;
if ($timeout > 0) {
$leftTime = $timeout - (microtime(true) - $beginTime);
}
}
if (isset($this->receivedBuffer[$length - 1])) {
$result = substr($this->receivedBuffer, 0, $length);
$this->receivedBuffer = substr($this->receivedBuffer, $length);
return $result;
}
return '';
}
}