# Kafka `Kafka` is an open source stream processing platform developed by `Apache Software Foundation`, written by `Scala` and `Java`. The goal of this project is to provide a unified, high-throughput, low-latency platform for processing real-time data. Its persistence layer is essentially a "large-scale publish/subscribe message queue based on the distributed transaction log architecture" [longlang/phpkafka](https://github.com/longyan/phpkafka) component is provided by [Longzhiyan](http://longlang.org/) and supports `PHP-FPM` and `Swoole`. Thank you `Swoole Team` and `ZenTao Team` for their contributions to the community. > This component is a Beta version, please use it with caution. ## Installation ```bash composer require hyperf/kafka ``` ## Version requirements - Kafka >= 1.0.0 ## Usage ### Configuration The configuration file of the `kafka` component is located in `config/autoload/kafka.php` by default. If the file does not exist, you can use the `php bin/hyperf.php vendor:publish hyperf/kafka` command to publish the corresponding configuration file. The default configuration file is as follows: | Configuration | Type | Default | Description | | :---------------------------: | :--------: | :---------------------------: | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: | | connect_timeout | int|float | -1 | Connection timeout time (unit: second, support decimal), if it is-1, there is no limit | | send_timeout | int|float | -1 | Send timeout time (unit: second, support decimal), if it is-1, there is no limit | | recv_timeout | int|float | -1 | Receiving timeout time (unit: second, support decimal), if it is-1, there is no limit | | client_id | string | null | Kafka Client ID | | max_write_attempts | int | 3 | Maximum number of write attempts | | brokers | array | [] | Manually configure the list of brokers, if you want to use manual configuration, please set updateBrokers to true | | bootstrap_server | array | '127.0.0.1:9092' | Bootstrap server, if this value is configured, it will automatically connect to the server and automatically update brokers | | update_brokers | bool | true | Whether to automatically update brokers | | acks | int | 0 | The producer asks the leader to confirm the value that has been received before the confirmation request is completed. Allowed values: 0 means no confirmation, 1 means leader only,-1 means complete ISR | | producer_id | int | -1 | Producer ID | | producer_epoch | int | -1 | Producer Epoch | | partition_leader_epoch | int | -1 | Partition Leader Epoch | | broker | string | '' | broker, format: '127.0.0.1:9092' | | interval | int|float | 0 | How many seconds to delay trying again when the message is not received, the default is 0, no delay (unit: second, decimal) | | session_timeout | int|float | 60 | If no heartbeat signal is received after the timeout, the coordinator will consider the user dead. (Unit: seconds, decimals are supported) | | rebalance_timeout | int|float | 60 | The longest time the coordinator waits for each member to rejoin when rebalancing the group (unit: seconds, decimals are supported). | | replica_id | int | -1 | Replica ID | | rack_id | int | -1 | Rack Number | | group_retry | int | 5 | Grouping operation, the number of automatic retries when matching the preset error code | | group_retry_sleep | int | 1 | Group operation retry delay, unit: second | | group_heartbeat | int | 3 | Group heartbeat interval, unit: second | | offset_retry | int | 5 | Offset operation, the number of automatic retries when matching the preset error code | | auto_create_topic | bool | true | Whether to automatically create topic | | partition_assignment_strategy | string | KafkaStrategy::RANGE_ASSIGNOR | Consumer partition allocation strategy, optional: range allocation (`KafkaStrategy::RANGE_ASSIGNOR`) polling allocation (`KafkaStrategy::ROUND_ROBIN_ASSIGNOR`)) | | pool | object | | Connection pool configuration | ```php [ 'connect_timeout' => -1, 'send_timeout' => -1, 'recv_timeout' => -1, 'client_id' => '', 'max_write_attempts' => 3, 'brokers' => [ '127.0.0.1:9092', ], 'bootstrap_server' => '127.0.0.1:9092', 'update_brokers' => true, 'acks' => 0, 'producer_id' => -1, 'producer_epoch' => -1, 'partition_leader_epoch' => -1, 'broker' => '', 'interval' => 0, 'session_timeout' => 60, 'rebalance_timeout' => 60, 'replica_id' => -1, 'rack_id' => '', 'group_retry' => 5, 'group_retry_sleep' => 1, 'group_heartbeat' => 3, 'offset_retry' => 5, 'auto_create_topic' => true, 'partition_assignment_strategy' => KafkaStrategy::RANGE_ASSIGNOR, 'pool' => [ 'min_connections' => 1, 'max_connections' => 10, 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, 'max_idle_time' => 60.0, ], ], ]; ``` ### Creating a consumer The `gen:kafka-consumer` command can quickly generate a consumer (Consumer) to consume the message. ```bash php bin/hyperf.php gen:kafka-consumer KafkaConsumer ``` You can also use the `Hyperf\Kafka\Annotation\Consumer` annotation to declare a subclass of the `Hyperf/Kafka/AbstractConsumer` abstract class to complete the definition of a `Consumer`, where `Hyperf\ Both Kafka\Annotation\Consumer` annotations and abstract classes contain the following attributes: | Configuration | Type | Default | Description | | :-----------: | :----------------: | :-----------: | :----------------------------------------------------------------------------------------------: | | topic | string or string[] | '' | topic to monitor | | groupId | string | '' | groupId to be monitored | | memberId | string | '' | memberId to be monitored | | autoCommit | string | '' | Whether to commit automatically | | name | string | KafkaConsumer | Consumer's name | | nums | int | 1 | Number of consumer processes | | pool | string | default | The connection corresponding to the consumer, corresponding to the key of the configuration file | ```php getTopic() . ':' . $message->getKey() . ':' . $message->getValue()); } } ``` ### Producing a message You can call `Hyperf\Kafka\Producer::send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null)` to deliver messages, the following is an example of message delivery in `Controller`: ```php send('hyperf', 'value', 'key'); } } ``` ### Send multiple messages at once The `Hyperf\Kafka\Producer::sendBatch(array $messages)` method is used to deliver messages in batches to `kafka`, the following is an example of message delivery in `Controller`: ```php sendBatch([ new ProduceMessage('hyperf1', 'hyperf1_value', 'hyperf1_key'), new ProduceMessage('hyperf2', 'hyperf2_value', 'hyperf2_key'), new ProduceMessage('hyperf3', 'hyperf3_value', 'hyperf3_key'), ]); } } ```