李铭昕 4bdf88524f
Release v2.1.7 (#3284)
Co-authored-by: limingxinleo <limingxinleo@users.noreply.github.com>
2021-02-22 08:56:22 +08:00

210 lines
11 KiB
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Kafka
`Kafka` 是由 `Apache 軟件基金會` 開發的一個開源流處理平台,由 `Scala``Java` 編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平台。其持久化層本質上是一個 "按照分佈式事務日誌架構的大規模發佈/訂閲消息隊列"
[longlang/phpkafka](https://github.com/longyan/phpkafka) 組件由 [龍之言](http://longlang.org/) 提供,支持 `PHP-FPM``Swoole`。感謝 `Swoole 團隊``禪道團隊` 對社區做出的貢獻。
> 本組件為 Beta 版本,謹慎使用
## 安裝
composer require hyperf/kafka
## 版本要求
- Kafka >= 1.0.0
## 使用
### 配置
`kafka` 組件的配置文件默認位於 `config/autoload/kafka.php` 內,如該文件不存在,可通過 `php bin/hyperf.php vendor:publish hyperf/kafka` 命令來將發佈對應的配置文件。
| 配置 | 類型 | 默認值 | 備註 |
| :---------------------------: | :--------: | :---------------------------: | :------------------------------------------------------------------------------------------------------------------: |
| connect_timeout | intfloat | -1 | 連接超時時間(單位:秒,支持小數),為 - 1 則不限制 |
| send_timeout | intfloat | -1 | 發送超時時間(單位:秒,支持小數),為 - 1 則不限制 |
| recv_timeout | intfloat | -1 | 接收超時時間(單位:秒,支持小數),為 - 1 則不限制 |
| client_id | stirng | null | Kafka 客户端標識 |
| max_write_attempts | int | 3 | 最大寫入嘗試次數 |
| brokers | array | [] | 手動配置 brokers 列表,如果要使用手動配置,請把 updateBrokers 設為 true |
| bootstrap_server | array | '' | 引導服務器,如果配置了該值,會自動連接該服務器,並自動更新 brokers |
| update_brokers | bool | true | 是否自動更新 brokers |
| acks | int | 0 | 生產者要求領導者在確認請求完成之前已收到的確認數值。允許的值0 表示無確認1 表示僅領導者,- 1 表示完整的 ISR。 |
| producer_id | int | -1 | 生產者 ID |
| producer_epoch | int | -1 | 生產者 Epoch |
| partition_leader_epoch | int | -1 | 分區 Leader Epoch |
| broker | string | ''| broker格式'' |
| interval | intfloat | 0 | 未獲取消息到消息時,延遲多少秒再次嘗試,默認為 0 則不延遲(單位:秒,支持小數) |
| session_timeout | intfloat | 60 | 如果超時後沒有收到心跳信號,則協調器會認為該用户死亡。(單位:秒,支持小數) |
| rebalance_timeout | intfloat | 60 | 重新平衡組時,協調器等待每個成員重新加入的最長時間(單位:秒,支持小數)。 |
| replica_id | int | -1 | 副本 ID |
| rack_id | int | -1 | 機架編號 |
| group_retry | int | 5 | 分組操作,匹配預設的錯誤碼時,自動重試次數 |
| group_retry_sleep | int | 1 | 分組操作重試延遲,單位:秒 |
| group_heartbeat | int | 3 | 分組心跳時間間隔,單位:秒 |
| offset_retry | int | 5 | 偏移量操作,匹配預設的錯誤碼時,自動重試次數 |
| auto_create_topic | bool | true | 是否需要自動創建 topic |
| partition_assignment_strategy | string | KafkaStrategy::RANGE_ASSIGNOR | 消費者分區分配策略, 可選:範圍分配(`KafkaStrategy::RANGE_ASSIGNOR`) 輪詢分配(`KafkaStrategy::ROUND_ROBIN_ASSIGNOR`)) |
| pool | object | | 連接池配置 |
use Hyperf\Kafka\Constants\KafkaStrategy;
return [
'default' => [
'connect_timeout' => -1,
'send_timeout' => -1,
'recv_timeout' => -1,
'client_id' => '',
'max_write_attempts' => 3,
'brokers' => [
'bootstrap_server' => '',
'update_brokers' => true,
'acks' => 0,
'producer_id' => -1,
'producer_epoch' => -1,
'partition_leader_epoch' => -1,
'broker' => '',
'interval' => 0,
'session_timeout' => 60,
'rebalance_timeout' => 60,
'replica_id' => -1,
'rack_id' => '',
'group_retry' => 5,
'group_retry_sleep' => 1,
'group_heartbeat' => 3,
'offset_retry' => 5,
'auto_create_topic' => true,
'partition_assignment_strategy' => KafkaStrategy::RANGE_ASSIGNOR,
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60.0,
### 創建消費者
通過 gen:kafka-consumer 命令可以快速的生成一個 消費者(Consumer) 對消息進行消費。
php bin/hyperf.php gen:kafka-consumer KafkaConsumer
您也可以通過使用 `Hyperf\Kafka\Annotation\Consumer` 註解來對一個 `Hyperf/Kafka/AbstractConsumer` 抽象類的子類進行聲明,來完成一個 `消費者(Consumer)` 的定義,其中 `Hyperf\Kafka\Annotation\Consumer` 註解和抽象類均包含以下屬性:
| 配置 | 類型 | 註解或抽象類默認值 | 備註 |
| :--------: | :----------------: | :----------------: | :----------------------------------: |
| topic | string or string[] | '' | 要監聽的 topic |
| groupId | string | '' | 要監聽的 groupId |
| memberId | string | '' | 要監聽的 memberId |
| autoCommit | string | '' | 是否需要自動提交 |
| name | string | KafkaConsumer | 消費者的名稱 |
| nums | int | 1 | 消費者的進程數 |
| pool | string | default | 消費者對應的連接,對應配置文件的 key |
namespace App\kafka;
use Hyperf\Kafka\AbstractConsumer;
use Hyperf\Kafka\Annotation\Consumer;
use longlang\phpkafka\Consumer\ConsumeMessage;
* @Consumer(topic="hyperf", nums=5, groupId="hyperf", autoCommit=true)
class KafkaConsumer extends AbstractConsumer
public function consume(ConsumeMessage $message): string
var_dump($message->getTopic() . ':' . $message->getKey() . ':' . $message->getValue());
### 投遞消息
您可以通過調用 `Hyperf\Kafka\Producer::send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null)` 方法來向 `kafka` 投遞消息, 下面是在 `Controller` 進行消息投遞的一個示例:
namespace App\Controller;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\Kafka\Producer;
* @AutoController()
class IndexController extends AbstractController
public function index(Producer $producer)
$producer->send('hyperf', 'value', 'key');
### 一次性投遞多條消息
`Hyperf\Kafka\Producer::sendBatch(array $messages)` 方法來向 `kafka` 批量的投遞消息, 下面是在 `Controller` 進行消息投遞的一個示例:
namespace App\Controller;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\Kafka\Producer;
use longlang\phpkafka\Producer\ProduceMessage;
* @AutoController()
class IndexController extends AbstractController
public function index(Producer $producer)
new ProduceMessage('hyperf1', 'hyperf1_value', 'hyperf1_key'),
new ProduceMessage('hyperf2', 'hyperf2_value', 'hyperf2_key'),
new ProduceMessage('hyperf3', 'hyperf3_value', 'hyperf3_key'),