# NSQ [NSQ](https://nsq.io) 是一个由 Go 语言编写的开源、轻量级、高性能的实时分布式消息中间件。 ## 安装 ```bash composer requir hyperf/nsq ``` ## 使用 ### 配置 NSQ 组件的配置文件默认位于 `config/autoload/nsq.php` 内,如该文件不存在,可通过 `php bin/hyperf.php vendor:publish hyperf/nsq` 命令来将发布对应的配置文件。 默认配置文件如下: ```php [ 'host' => '127.0.0.1', 'port' => 4150, 'pool' => [ 'min_connections' => 1, 'max_connections' => 10, 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, 'max_idle_time' => 60.0, ], ], ]; ``` ### 创建消费者 通过 `gen:nsq-consumer` 命令可以快速的生成一个 消费者(Consumer) 对消息进行消费。 ```bash php bin/hyperf.php gen:nsq-consumer DemoConsumer ``` 您也可以通过使用 `Hyperf\Nsq\Annotation\Consumer` 注解来对一个 `Hyperf/Nsq/AbstractConsumer` 抽象类的子类进行声明,来完成一个 消费者(Consumer) 的定义,其中`Hyperf\Nsq\Annotation\Consumer` 注解和抽象类均包含以下属性: | 配置 | 类型 | 注解或抽象类默认值 | 备注 | |:-------:|:------:|:------:|:----------------:| | topic | string | '' | 要监听的 topic | | channel | string | '' | 要监听的 channel | | name | string | NsqConsumer | 消费者的名称 | | nums | int | 1 | 消费者的进程数 | | pool | string | default | 消费者对应的连接,对应配置文件的 key | 这些注解属性是可选的,因为 `Hyperf/Nsq/AbstractConsumer` 抽象类中也分别定义了对应的成员属性以及 getter 和 setter,当不对注解属性进行定义时,会使用抽象类的属性默认值。 ```php getBody()); return Result::ACK; } } ``` ### 禁止消费进程自启 默认情况下,使用了 `@Consumer` 注解定义后,框架会在启动时自动创建子进程来启动消费者,并且会在子进程异常退出后,自动重新拉起。但如果在处于开发阶段进行某些调试工作时,可能会因为消费者的自动消费导致调试的不便。 在这种情况下,您可通过全局关闭和局部关闭两种形式来控制消费进程的自启。 #### 全局关闭 您可以在默认配置文件 `config/autoload/nsq.php` 中,将对应连接的 `enable` 选项设置为 `false`,即代表该连接下的所有消费者进程都关闭自启功能。 #### 局部关闭 当您只需要关闭个别消费进程的自启功能,只需要在对应的消费者中重写父类方法 `isEnable()` 并返回 `false` 即可关闭此消费者的自启功能; ```php getBody(), true); var_dump($body); return Result::ACK; } } ``` ### 投递消息 您可以通过调用 `Hyperf\Nsq\Nsq::publish(string $topic, $message, float $deferTime = 0.0)` 方法来向 NSQ 投递消息, 下面是在 Command 进行消息投递的一个示例: ```php publish($topic, $message); $this->line('success', 'info'); } } ``` ### 一次性投递多条消息 `Hyperf\Nsq\Nsq::publish(string $topic, $message, float $deferTime = 0.0)` 方法的第二个参数除了可以传递一个字符串外,还可以传递一个字符串数组,来实现一次性向一个 Topic 投递多条消息的功能,示例如下: ```php publish($topic, $messages); $this->line('success', 'info'); } } ``` ### 投递延迟消息 当您希望您投递的消息在特定的时间后再去消费,也可通过对 `Hyperf\Nsq\Nsq::publish(string $topic, $message, float $deferTime = 0.0)` 方法的第三个参数传递对应的延迟时长,单位为秒,示例如下: ```php publish($topic, $message, $deferTime); $this->line('success', 'info'); } } ``` ## NSQ 协议 > https://nsq.io/clients/tcp_protocol_spec.html - Socket 基础 ```plantuml @startuml autonumber hide footbox title **Socket 基础** participant "客户端" as client participant "服务器" as server #orange activate client activate server note right of server: 建立连接 client -> server: socket->connect(ip, port) ... note right of server: 多次通信 send/recv client -> server: socket->send() server-> client: socket->recv() ... note right of server: 关闭连接 client->server: socket->close() deactivate client deactivate server @enduml ``` - NSQ 协议流程 ```plantuml @startuml autonumber hide footbox title **NSQ 协议** participant "客户端" as client participant "服务器" as server #orange activate client activate server == connect == note left of client: connect 后都为 socket->send/recv client -> server: socket->connect(ip, host) note left of client: protocol version client->server: magic: V2 == auth == note left of client: client metadata client->server: IDENTIFY note right of server: 如果需要 auth server->client: auth_required=true client->server: AUTH ... == pub == note left of client: 发送一条消息 client -> server: PUB note left of client: 发送多条消息 client -> server: MPUB note left of client: 发送一条延时消息 client -> server: DPUB ... == sub == note left of client: client 使用 channel 订阅 topic note right of server: SUB 成功后, client 出于 RDY 0 阶段 client -> server: SUB note left of client: 使用 RDY 告诉 server 准备好消费 条消息 client -> server: RDY note right of server: server 返回 client 条消息 server -> client: msg note left of client: 标记消息完成消费(消费成功) client -> server: FIN note left of client: 消息重新入队(消费失败, 重新入队) client -> server: REQ note left of client: 重置消息超时时间 client -> server: TOUCH ... == heartbeat == server -> client: _heartbeat_ note right of server: client 2 次没有应答 NOP, server 将断开连接 client -> server: NOP ... == close == note left of client: clean close connection, 表示没有消息了, 关闭连接 client -> server: CLS note right of server: server 端成功应答 server -> client: CLOSE_WAIT deactivate client deactivate server @enduml ```