`Kafka` is an open source stream processing platform developed by `Apache Software Foundation`, written by `Scala` and `Java`. The goal of this project is to provide a unified, high-throughput, low-latency platform for processing real-time data. Its persistence layer is essentially a "large-scale publish/subscribe message queue based on the distributed transaction log architecture"
[longlang/phpkafka](https://github.com/longyan/phpkafka) component is provided by [Longzhiyan](http://longlang.org/) and supports `PHP-FPM` and `Swoole`. Thank you `Swoole Team` and `ZenTao Team` for their contributions to the community.
> This component is a Beta version, please use it with caution.
## Installation
```bash
composer require hyperf/kafka
```
## Version requirements
- Kafka >= 1.0.0
## Usage
### Configuration
The configuration file of the `kafka` component is located in `config/autoload/kafka.php` by default. If the file does not exist, you can use the `php bin/hyperf.php vendor:publish hyperf/kafka` command to publish the corresponding configuration file.
| connect_timeout | int|float | -1 | Connection timeout time (unit: second, support decimal), if it is-1, there is no limit |
| send_timeout | int|float | -1 | Send timeout time (unit: second, support decimal), if it is-1, there is no limit |
| recv_timeout | int|float | -1 | Receiving timeout time (unit: second, support decimal), if it is-1, there is no limit |
| client_id | string | null | Kafka Client ID |
| max_write_attempts | int | 3 | Maximum number of write attempts |
| brokers | array | [] | Manually configure the list of brokers, if you want to use manual configuration, please set updateBrokers to true |
| bootstrap_server | array | '127.0.0.1:9092' | Bootstrap server, if this value is configured, it will automatically connect to the server and automatically update brokers |
| acks | int | 0 | The producer asks the leader to confirm the value that has been received before the confirmation request is completed. Allowed values: 0 means no confirmation, 1 means leader only,-1 means complete ISR |
| interval | int|float | 0 | How many seconds to delay trying again when the message is not received, the default is 0, no delay (unit: second, decimal) |
| session_timeout | int|float | 60 | If no heartbeat signal is received after the timeout, the coordinator will consider the user dead. (Unit: seconds, decimals are supported) |
| rebalance_timeout | int|float | 60 | The longest time the coordinator waits for each member to rejoin when rebalancing the group (unit: seconds, decimals are supported). |
| replica_id | int | -1 | Replica ID |
| rack_id | int | -1 | Rack Number |
| group_retry | int | 5 | Grouping operation, the number of automatic retries when matching the preset error code |
| group_retry_sleep | int | 1 | Group operation retry delay, unit: second |
| group_heartbeat | int | 3 | Group heartbeat interval, unit: second |
| offset_retry | int | 5 | Offset operation, the number of automatic retries when matching the preset error code |
You can also use the `Hyperf\Kafka\Annotation\Consumer` annotation to declare a subclass of the `Hyperf/Kafka/AbstractConsumer` abstract class to complete the definition of a `Consumer`, where `Hyperf\ Both Kafka\Annotation\Consumer` annotations and abstract classes contain the following attributes:
You can call `Hyperf\Kafka\Producer::send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null)` to deliver messages, the following is an example of message delivery in `Controller`:
The `Hyperf\Kafka\Producer::sendBatch(array $messages)` method is used to deliver messages in batches to `kafka`, the following is an example of message delivery in `Controller`: