diff --git a/docs/zh-cn/kafka.md b/docs/zh-cn/kafka.md index f3f60f13a..1559e1d7c 100644 --- a/docs/zh-cn/kafka.md +++ b/docs/zh-cn/kafka.md @@ -144,7 +144,7 @@ class KafkaConsumer extends AbstractConsumer ### 投递消息 -您可以通过调用 `Hyperf\Kafka\Producer::send(string $topic, ?string $value, ?string $key = null, array $headers = [], int $partitionIndex = 0, ?int $brokerId = null)` 方法来向 `kafka` 投递消息, 下面是在 `Controller` 进行消息投递的一个示例: +您可以通过调用 `Hyperf\Kafka\Producer::send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null)` 方法来向 `kafka` 投递消息, 下面是在 `Controller` 进行消息投递的一个示例: ```php send('hyperf', 'value', 'key'); } } @@ -173,7 +171,7 @@ class IndexController extends AbstractController ### 一次性投递多条消息 -`Hyperf\Kafka\Producer::sendBatch(array $messages, ?int $brokerId = null)` 方法来向 `kafka` 批量的投递消息, 下面是在 `Controller` 进行消息投递的一个示例: +`Hyperf\Kafka\Producer::sendBatch(array $messages)` 方法来向 `kafka` 批量的投递消息, 下面是在 `Controller` 进行消息投递的一个示例: ```php @@ -192,10 +190,8 @@ use longlang\phpkafka\Producer\ProduceMessage; */ class IndexController extends AbstractController { - public function index() + public function index(Producer $producer) { - $producer = make(Producer::class); - $producer->sendBatch([ new ProduceMessage('hyperf1', 'hyperf1_value', 'hyperf1_key'), new ProduceMessage('hyperf2', 'hyperf2_value', 'hyperf2_key'), diff --git a/src/kafka/publish/kafka.php b/src/kafka/publish/kafka.php index 91e35e82a..5a8473f0f 100644 --- a/src/kafka/publish/kafka.php +++ b/src/kafka/publish/kafka.php @@ -21,7 +21,9 @@ return [ 'brokers' => [ '127.0.0.1:9092', ], - 'bootstrap_server' => '127.0.0.1:9092', + 'bootstrap_server' => [ + '127.0.0.1:9092', + ], 'update_brokers' => true, 'acks' => 0, 'producer_id' => -1, diff --git a/src/kafka/src/ConfigProvider.php b/src/kafka/src/ConfigProvider.php index 3f5e51bb8..24a5291fe 100644 --- a/src/kafka/src/ConfigProvider.php +++ b/src/kafka/src/ConfigProvider.php @@ -11,6 +11,7 @@ declare(strict_types=1); */ namespace Hyperf\Kafka; +use Hyperf\Kafka\Listener\AfterWorkerExitListener; use Hyperf\Kafka\Listener\BeforeMainServerStartListener; class ConfigProvider @@ -20,6 +21,7 @@ class ConfigProvider return [ 'listeners' => [ BeforeMainServerStartListener::class => 99, + AfterWorkerExitListener::class => 1, ], 'dependencies' => [ ], diff --git a/src/kafka/src/Listener/AfterWorkerExitListener.php b/src/kafka/src/Listener/AfterWorkerExitListener.php new file mode 100644 index 000000000..0f2f2ec98 --- /dev/null +++ b/src/kafka/src/Listener/AfterWorkerExitListener.php @@ -0,0 +1,46 @@ +container = $container; + } + + public function listen(): array + { + return [OnWorkerExit::class]; + } + + public function process(object $event) + { + if ($this->container->has(Producer::class)) { + $this->container->get(Producer::class)->close(); + } + if ($this->container->has(ProducerManager::class)) { + $this->container->get(ProducerManager::class)->closeAll(); + } + } +} diff --git a/src/kafka/src/Producer.php b/src/kafka/src/Producer.php index 6ba0adb7a..a67912e73 100644 --- a/src/kafka/src/Producer.php +++ b/src/kafka/src/Producer.php @@ -12,20 +12,31 @@ declare(strict_types=1); namespace Hyperf\Kafka; use Hyperf\Contract\ConfigInterface; +use Hyperf\Engine\Channel; +use Hyperf\Kafka\Transport\SwooleSocket; +use longlang\phpkafka\Broker; +use longlang\phpkafka\Producer\ProduceMessage; use longlang\phpkafka\Producer\Producer as LongLangProducer; use longlang\phpkafka\Producer\ProducerConfig; -use longlang\phpkafka\Socket\SwooleSocket; -use Psr\Container\ContainerInterface; +use Swoole\Coroutine; -/** - * @method send(string $topic, ?string $value, ?string $key = null, array $headers = [], int $partitionIndex = 0, ?int $brokerId = null) - * @method sendBatch(array $messages, ?int $brokerId = null) - * @method close() - * @method getConfig() - * @method getBroker() - */ class Producer { + /** + * @var ConfigInterface + */ + protected $config; + + /** + * @var string + */ + protected $name; + + /** + * @var ?Channel + */ + protected $chan; + /** * @var LongLangProducer */ @@ -34,12 +45,117 @@ class Producer /** * @var array */ - protected $config; + protected $topicsMeta; - public function __construct(ContainerInterface $container, string $name = 'default') + public function __construct(ConfigInterface $config, string $name = 'default') { - $config = $container->get(ConfigInterface::class)->get('kafka.' . $name); $this->config = $config; + $this->name = $name; + } + + public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null): void + { + $this->loop(); + $ack = new Channel(); + $this->chan->push(function () use ($topic, $key, $value, $headers, $partitionIndex, $ack) { + try { + if (! isset($this->topicsMeta[$topic])) { + $this->producer->send($topic, $value, $key, $headers); + $ack->close(); + return; + } + + if (! is_int($partitionIndex)) { + $index = $this->getIndex($key, $value, count($this->topicsMeta[$topic])); + $partitionIndex = array_keys($this->topicsMeta[$topic])[$index]; + } + + $this->producer->send( + $topic, + $value, + $key, + $headers, + $partitionIndex, + $this->topicsMeta[$topic][$partitionIndex] + ); + $ack->close(); + } catch (\Throwable $e) { + $ack->push($e); + } + }); + if ($e = $ack->pop()) { + throw $e; + } + } + + /** + * @param ProduceMessage[] $messages + */ + public function sendBatch(array $messages): void + { + $this->loop(); + $ack = new Channel(); + $this->chan->push(function () use ($messages, $ack) { + try { + $messagesByBroker = $this->slitByBroker($messages); + foreach ($messagesByBroker as $brokerId => $messages) { + $this->producer->sendBatch($messages, $brokerId); + } + $ack->close(); + } catch (\Throwable $e) { + $ack->push($e); + } + }); + if ($e = $ack->pop()) { + throw $e; + } + } + + public function close(): void + { + if ($this->chan) { + $this->chan->close(); + } + } + + public function getConfig(): ProducerConfig + { + return $this->producer->getConfig(); + } + + public function getBroker(): Broker + { + return $this->producer->getBroker(); + } + + protected function loop() + { + if ($this->chan != null) { + return; + } + $this->chan = new Channel(1); + Coroutine::create(function () { + try { + $this->producer = $this->makeProducer(); + $this->topicsMeta = $this->fetchMeta(); + + while (true) { + $closure = $this->chan->pop(); + if (! $closure) { + break; + } + $closure->call($this); + } + } finally { + $this->chan = null; + $this->producer->close(); + } + }); + } + + private function makeProducer(): LongLangProducer + { + $config = $this->config->get('kafka.' . $this->name); $producerConfig = new ProducerConfig(); $producerConfig->setConnectTimeout($config['connect_timeout']); $producerConfig->setSendTimeout($config['send_timeout']); @@ -47,20 +163,51 @@ class Producer $producerConfig->setClientId($config['client_id']); $producerConfig->setMaxWriteAttempts($config['max_write_attempts']); $producerConfig->setSocket(SwooleSocket::class); - $producerConfig->setBrokers($config['brokers']); $producerConfig->setBootstrapServer($config['bootstrap_server']); $producerConfig->setUpdateBrokers($config['update_brokers']); + $producerConfig->setBrokers($config['brokers']); $producerConfig->setAcks($config['acks']); $producerConfig->setProducerId($config['producer_id']); $producerConfig->setProducerEpoch($config['producer_epoch']); $producerConfig->setPartitionLeaderEpoch($config['partition_leader_epoch']); $producerConfig->setAutoCreateTopic($config['auto_create_topic']); - - $this->producer = new LongLangProducer($producerConfig); + return new LongLangProducer($producerConfig); } - public function __call($name, $arguments) + private function getIndex($key, $value, $max) { - return $this->producer->{$name}(...$arguments); + if ($key === null) { + return crc32($value) % $max; + } + return crc32($key) % $max; + } + + /** + * @param ProduceMessage[] $messages + */ + private function slitByBroker(array $messages): array + { + $messageByBroker = []; + foreach ($messages as $message) { + $messageByBroker[$this->getMessageBrokerId($message)][] = $message; + } + return $messageByBroker; + } + + private function getMessageBrokerId(ProduceMessage $message): int + { + return $this->topicsMeta[$message->getTopic()][$message->getPartitionIndex()]; + } + + private function fetchMeta(): array + { + $metaCache = []; + $topicMeta = $this->producer->getBroker()->getTopicsMeta(); + foreach ($topicMeta as $meta) { + foreach ($meta->getPartitions() as $partition) { + $metaCache[$meta->getName()][$partition->getPartitionIndex()] = $partition->getLeaderId(); + } + } + return $metaCache; } } diff --git a/src/kafka/src/ProducerManager.php b/src/kafka/src/ProducerManager.php new file mode 100644 index 000000000..cc62ee964 --- /dev/null +++ b/src/kafka/src/ProducerManager.php @@ -0,0 +1,48 @@ + + */ + private $producers = []; + + /** + * @var ContainerInterface + */ + private $container; + + public function __construct(ContainerInterface $container) + { + $this->container = $container; + } + + public function getProducer(string $name = 'default'): Producer + { + if (isset($this->producers[$name])) { + return $this->producers[$name]; + } + $this->producers[$name] = make(Producer::class, ['name' => $name]); + return $this->producers[$name]; + } + + public function closeAll(): void + { + foreach ($this->producers as $producer) { + $producer->close(); + } + } +} diff --git a/src/kafka/src/Transport/SwooleSocket.php b/src/kafka/src/Transport/SwooleSocket.php new file mode 100644 index 000000000..180f8cf66 --- /dev/null +++ b/src/kafka/src/Transport/SwooleSocket.php @@ -0,0 +1,44 @@ +config->getRecvTimeout(); + } + $leftTime = $timeout; + /* @phpstan-ignore-next-line */ + while ($this->socket && ! isset($this->receivedBuffer[$length - 1]) && ($timeout == -1 || $leftTime > 0)) { + $buffer = $this->socket->recv($timeout); + if ($buffer === false) { + return ''; + } + $this->receivedBuffer .= $buffer; + if ($timeout > 0) { + $leftTime = $timeout - (microtime(true) - $beginTime); + } + } + + if (isset($this->receivedBuffer[$length - 1])) { + $result = substr($this->receivedBuffer, 0, $length); + $this->receivedBuffer = substr($this->receivedBuffer, $length); + + return $result; + } + + return ''; + } +}