mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-11-30 10:47:44 +08:00
Added kafka into docs. (#3102)
This commit is contained in:
parent
7139be6a43
commit
301f28e237
@ -1,6 +1,6 @@
|
||||
# kafka
|
||||
# Kafka
|
||||
|
||||
[kafka](https://github.com/hyperf/kafka) 是实现 `kafka` 标准的组件,主要适用于对 `kafka` 的使用
|
||||
`Kafka` 是由 `Apache软件基金会` 开发的一个开源流处理平台,由 `Scala` 和 `Java` 编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个 "按照分布式事务日志架构的大规模发布/订阅消息队列"
|
||||
|
||||
> 本组件为 Beta 版本,谨慎使用
|
||||
|
||||
@ -18,35 +18,35 @@ composer require hyperf/kafka
|
||||
|
||||
默认配置文件如下:
|
||||
|
||||
|
||||
| 配置 | 类型 | 默认值 | 备注 |
|
||||
|:---------------------------:|:---------:|:------:|:----------------------------------------------:|
|
||||
| connect_timeout | int|float | -1 | 连接超时时间(单位:秒,支持小数),为 - 1 则不限制 |
|
||||
| send_timeout | int|float | -1 | 发送超时时间(单位:秒,支持小数),为 - 1 则不限制 |
|
||||
| recv_timeout | int|float | -1 | 接收超时时间(单位:秒,支持小数),为 - 1 则不限制 |
|
||||
| client_id | stirng | null | Kafka 客户端标识 |
|
||||
| max_write_attempts | int | 3 | 最大写入尝试次数 |
|
||||
| brokers | array | [] | 手动配置 brokers 列表,如果要使用手动配置,请把 updateBrokers 设为 true|
|
||||
| bootstrap_server | array | '127.0.0.1:9092' | 引导服务器,如果配置了该值,会自动连接该服务器,并自动更新 brokers |
|
||||
| update_brokers | bool | true | 是否自动更新 brokers |
|
||||
| acks | int | 0 | 生产者要求领导者,在确认请求完成之前已收到的确认数值。允许的值:0 表示无确认,1 表示仅领导者,- 1 表示完整的 ISR。 |
|
||||
| producer_id | int | -1 | 生产者 ID |
|
||||
| producer_epoch | int | -1 | 生产者 Epoch |
|
||||
| partition_leader_epoch | int | -1 | 分区 Leader Epoch |
|
||||
| broker | string | ''| broker,格式:'127.0.0.1:9092' |
|
||||
| interval | int|float | 0 | 未获取消息到消息时,延迟多少秒再次尝试,默认为 0 则不延迟(单位:秒,支持小数) |
|
||||
| session_timeout | int|float | 60 | 如果超时后没有收到心跳信号,则协调器会认为该用户死亡。(单位:秒,支持小数) |
|
||||
| rebalance_timeout | int|float | 60 | 重新平衡组时,协调器等待每个成员重新加入的最长时间(单位:秒,支持小数)。 |
|
||||
| partitions | array | [0] | 分区列表 |
|
||||
| replica_id | int | -1 | 副本 ID |
|
||||
| rack_id | int | -1 | 机架编号 |
|
||||
| group_retry | int | 5 | 分组操作,匹配预设的错误码时,自动重试次数 |
|
||||
| group_retry_sleep | int | 1 | 分组操作重试延迟,单位:秒 |
|
||||
| group_heartbeat | int | 3 | 分组心跳时间间隔,单位:秒 |
|
||||
| offset_retry | int | 5 | 偏移量操作,匹配预设的错误码时,自动重试次数 |
|
||||
| auto_create_topic | bool | true | 是否需要自动创建 topic |
|
||||
| partition_assignment_strategy | string | KafkaStrategy::RANGE_ASSIGNOR | 消费者分区分配策略, 可选:范围分配(`KafkaStrategy::RANGE_ASSIGNOR`) 轮询分配(`KafkaStrategy::ROUND_ROBIN_ASSIGNOR`)) |
|
||||
| pool | object | | 连接池配置 |
|
||||
|
||||
| 配置 | 类型 | 默认值 | 备注 |
|
||||
| :---------------------------: | :--------: | :---------------------------: | :------------------------------------------------------------------------------------------------------------------: |
|
||||
| connect_timeout | int|float | -1 | 连接超时时间(单位:秒,支持小数),为 - 1 则不限制 |
|
||||
| send_timeout | int|float | -1 | 发送超时时间(单位:秒,支持小数),为 - 1 则不限制 |
|
||||
| recv_timeout | int|float | -1 | 接收超时时间(单位:秒,支持小数),为 - 1 则不限制 |
|
||||
| client_id | stirng | null | Kafka 客户端标识 |
|
||||
| max_write_attempts | int | 3 | 最大写入尝试次数 |
|
||||
| brokers | array | [] | 手动配置 brokers 列表,如果要使用手动配置,请把 updateBrokers 设为 true |
|
||||
| bootstrap_server | array | '127.0.0.1:9092' | 引导服务器,如果配置了该值,会自动连接该服务器,并自动更新 brokers |
|
||||
| update_brokers | bool | true | 是否自动更新 brokers |
|
||||
| acks | int | 0 | 生产者要求领导者,在确认请求完成之前已收到的确认数值。允许的值:0 表示无确认,1 表示仅领导者,- 1 表示完整的 ISR。 |
|
||||
| producer_id | int | -1 | 生产者 ID |
|
||||
| producer_epoch | int | -1 | 生产者 Epoch |
|
||||
| partition_leader_epoch | int | -1 | 分区 Leader Epoch |
|
||||
| broker | string | '' | broker,格式:'127.0.0.1:9092' |
|
||||
| interval | int|float | 0 | 未获取消息到消息时,延迟多少秒再次尝试,默认为 0 则不延迟(单位:秒,支持小数) |
|
||||
| session_timeout | int|float | 60 | 如果超时后没有收到心跳信号,则协调器会认为该用户死亡。(单位:秒,支持小数) |
|
||||
| rebalance_timeout | int|float | 60 | 重新平衡组时,协调器等待每个成员重新加入的最长时间(单位:秒,支持小数)。 |
|
||||
| partitions | array | [0] | 分区列表 |
|
||||
| replica_id | int | -1 | 副本 ID |
|
||||
| rack_id | int | -1 | 机架编号 |
|
||||
| group_retry | int | 5 | 分组操作,匹配预设的错误码时,自动重试次数 |
|
||||
| group_retry_sleep | int | 1 | 分组操作重试延迟,单位:秒 |
|
||||
| group_heartbeat | int | 3 | 分组心跳时间间隔,单位:秒 |
|
||||
| offset_retry | int | 5 | 偏移量操作,匹配预设的错误码时,自动重试次数 |
|
||||
| auto_create_topic | bool | true | 是否需要自动创建 topic |
|
||||
| partition_assignment_strategy | string | KafkaStrategy::RANGE_ASSIGNOR | 消费者分区分配策略, 可选:范围分配(`KafkaStrategy::RANGE_ASSIGNOR`) 轮询分配(`KafkaStrategy::ROUND_ROBIN_ASSIGNOR`)) |
|
||||
| pool | object | | 连接池配置 |
|
||||
|
||||
|
||||
```php
|
||||
@ -94,44 +94,34 @@ return [
|
||||
],
|
||||
],
|
||||
];
|
||||
|
||||
|
||||
```
|
||||
|
||||
|
||||
### 创建消费者
|
||||
|
||||
通过 gen:kafka-consumer 命令可以快速的生成一个 消费者(Consumer) 对消息进行消费。
|
||||
|
||||
```bash
|
||||
php bin/hyperf.php gen:kafka-consumer KafkaConsumer`
|
||||
php bin/hyperf.php gen:kafka-consumer KafkaConsumer
|
||||
```
|
||||
|
||||
您也可以通过使用 `Hyperf\Kafka\Annotation\Consumer` 注解来对一个 `Hyperf/Kafka/AbstractConsumer` 抽象类的子类进行声明,来完成一个 `消费者(Consumer)` 的定义,其中 `Hyperf\Kafka\Annotation\Consumer` 注解和抽象类均包含以下属性:
|
||||
|
||||
| 配置 | 类型 | 注解或抽象类默认值 | 备注 |
|
||||
|:-------:|:------:|:------:|:----------------:|
|
||||
| topic | string | '' | 要监听的 topic |
|
||||
| groupId | string | '' | 要监听的 groupId |
|
||||
| memberId | string | '' | 要监听的 memberId |
|
||||
| autoCommit | string | '' | 是否需要自动提交 |
|
||||
| name | string | KafkaConsumer | 消费者的名称 |
|
||||
| nums | int | 1 | 消费者的进程数 |
|
||||
| pool | string | default | 消费者对应的连接,对应配置文件的 key |
|
||||
| 配置 | 类型 | 注解或抽象类默认值 | 备注 |
|
||||
| :--------: | :----: | :----------------: | :----------------------------------: |
|
||||
| topic | string | '' | 要监听的 topic |
|
||||
| groupId | string | '' | 要监听的 groupId |
|
||||
| memberId | string | '' | 要监听的 memberId |
|
||||
| autoCommit | string | '' | 是否需要自动提交 |
|
||||
| name | string | KafkaConsumer | 消费者的名称 |
|
||||
| nums | int | 1 | 消费者的进程数 |
|
||||
| pool | string | default | 消费者对应的连接,对应配置文件的 key |
|
||||
|
||||
|
||||
```php
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://hyperf.wiki
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace App\kafka;
|
||||
|
||||
use Hyperf\Kafka\AbstractConsumer;
|
||||
@ -151,7 +141,6 @@ class KafkaConsumer extends AbstractConsumer
|
||||
|
||||
```
|
||||
|
||||
|
||||
### 投递消息
|
||||
|
||||
您可以通过调用 `Hyperf\Kafka\Producer::send(string $topic, ?string $value, ?string $key = null, array $headers = [], int $partitionIndex = 0, ?int $brokerId = null)` 方法来向 `kafka` 投递消息, 下面是在 `Controller` 进行消息投递的一个示例:
|
||||
@ -160,14 +149,6 @@ class KafkaConsumer extends AbstractConsumer
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://hyperf.wiki
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace App\Controller;
|
||||
|
||||
@ -179,7 +160,6 @@ use Hyperf\Kafka\Producer;
|
||||
*/
|
||||
class IndexController extends AbstractController
|
||||
{
|
||||
|
||||
public function index()
|
||||
{
|
||||
$producer = make(Producer::class);
|
||||
@ -199,14 +179,6 @@ class IndexController extends AbstractController
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://hyperf.wiki
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace App\Controller;
|
||||
|
||||
@ -219,7 +191,6 @@ use longlang\phpkafka\Producer\ProduceMessage;
|
||||
*/
|
||||
class IndexController extends AbstractController
|
||||
{
|
||||
|
||||
public function index()
|
||||
{
|
||||
$producer = make(Producer::class);
|
||||
@ -229,7 +200,7 @@ class IndexController extends AbstractController
|
||||
new ProduceMessage('hyperf2', 'hyperf2_value', 'hyperf2_key'),
|
||||
new ProduceMessage('hyperf3', 'hyperf3_value', 'hyperf3_key'),
|
||||
]);
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -91,6 +91,7 @@
|
||||
* [AMQP](zh-cn/amqp.md)
|
||||
* [Nats](zh-cn/nats.md)
|
||||
* [NSQ](zh-cn/nsq.md)
|
||||
* [Kafka](zh-cn/kafka.md)
|
||||
|
||||
* 客户端
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user