# AMQP 元件 [hyperf/amqp](https://github.com/hyperf/amqp) 是實現 AMQP 標準的元件,主要適用於對 RabbitMQ 的使用。 ## 安裝 ```bash composer require hyperf/amqp ``` ## 預設配置 | 配置 | 型別 | 預設值 | 備註 | |:----------------:|:------:|:---------:|:---------:| | host | string | localhost | Host | | port | int | 5672 | 埠號 | | user | string | guest | 使用者名稱 | | password | string | guest | 密碼 | | vhost | string | / | vhost | | concurrent.limit | int | 0 | 同時消費的最大數量 | | pool | object | | 連線池配置 | | pool.connections | int | 1 | 程序內保持的連線數 | | params | object | | 基本配置 | ```php true, 'default' => [ 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'concurrent' => [ 'limit' => 1, ], 'pool' => [ 'connections' => 1, ], 'params' => [ 'insist' => false, 'login_method' => 'AMQPLAIN', 'login_response' => null, 'locale' => 'en_US', 'connection_timeout' => 3.0, 'read_write_timeout' => 6.0, 'context' => null, 'keepalive' => false, 'heartbeat' => 3, 'close_on_destruct' => false, ], ], 'pool2' => [ ... ] ]; ``` 可在 `producer` 或者 `consumer` 的 `__construct` 函式中,設定不同 `pool`,例如上述的 `default` 和 `pool2`。 ## 投遞訊息 使用 `gen:producer` 命令建立一個 `producer` ```bash php bin/hyperf.php gen:amqp-producer DemoProducer ``` 在 DemoProducer 檔案中,我們可以修改 `#[Producer]` 註解對應的欄位來替換對應的 `exchange` 和 `routingKey`。 其中 `payload` 就是最終投遞到訊息佇列中的資料,所以我們可以隨意改寫 `__construct` 方法,只要最後賦值 `payload` 即可。 示例如下。 > 使用 `#[Producer]` 註解時需 `use Hyperf\Amqp\Annotation\Producer;` 名稱空間; ```php poolName = 'pool2'; $user = User::where('id', $id)->first(); $this->payload = [ 'id' => $id, 'data' => $user->toArray() ]; } } ``` 透過 DI Container 獲取 `Hyperf\Amqp\Producer` 例項,即可投遞訊息。以下例項直接使用 `ApplicationContext` 獲取 `Hyperf\Amqp\Producer` 其實並不合理,DI Container 具體使用請到 [依賴注入](zh-tw/di.md) 章節中檢視。 ```php get(Producer::class); $result = $producer->produce($message); ``` ## 消費訊息 使用 `gen:amqp-consumer` 命令建立一個 `consumer`。 ```bash php bin/hyperf.php gen:amqp-consumer DemoConsumer ``` 在 DemoConsumer 檔案中,我們可以修改 `#[Consumer]` 註解對應的欄位來替換對應的 `exchange`、`routingKey` 和 `queue`。 其中 `$data` 就是解析後的訊息資料。 示例如下。 > 使用 `#[Consumer]` 註解時需 `use Hyperf\Amqp\Annotation\Consumer;` 名稱空間; ```php 0, // 同一個消費者,最高同時可以處理的訊息數。 'prefetch_count' => 30, // 因為 Hyperf 預設一個 Channel 只消費一個 佇列,所以 global 設定為 true/false 效果是一樣的。 'global' => false, ]; public function consumeMessage($data, AMQPMessage $message): Result { print_r($data); return Result::ACK; } } ``` ### 根據環境自定義消費程序數量 在 `#[Consumer]` 註解中,可以透過 `nums` 屬性來設定消費程序數量,如果需要根據不同環境來設定不同的消費程序數量,可以透過重寫 `getNums` 方法來實現,示例如下: ```php #[Consumer( exchange: 'hyperf', routingKey: 'hyperf', queue: 'hyperf', name: 'hyperf', nums: 1 )] final class DemoConsumer extends ConsumerMessage { public function getNums(): int { if (is_debug()) { return 10; } return parent::getNums(); } } ``` ## 延時佇列 AMQP 的延時佇列,並不會根據延時時間進行排序,所以,一旦你投遞了一個延時 10s 的任務,又往這個佇列中投遞了一個延時 5s 的任務,那麼也一定會在第一個 10s 任務完成後,才會消費第二個 5s 的任務。 所以,需要根據時間設定不同的佇列,如果想要更加靈活的延時佇列,可以嘗試 非同步佇列(async-queue) 和 AMQP 配合使用。 另外,AMQP 需要下載 [延時外掛](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases),並激活才能正常使用 ```shell wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/ rabbitmq-plugins enable rabbitmq_delayed_message_exchange ``` ### 生產者 使用 `gen:amqp-producer` 命令建立一個 `producer`。這裡舉例 `direct` 型別,其他型別如 `fanout`、`topic`,改生產者和消費者中的 `type` 即可。 ```bash php bin/hyperf.php gen:amqp-producer DelayDirectProducer ``` 在 DelayDirectProducer 檔案中,加入`use ProducerDelayedMessageTrait;`,示例如下: ```php payload = $data; } } ``` ### 消費者 使用 `gen:amqp-consumer` 命令建立一個 `consumer`。 ```bash php bin/hyperf.php gen:amqp-consumer DelayDirectConsumer ``` 在 `DelayDirectConsumer` 檔案中,增加引入`use ProducerDelayedMessageTrait, ConsumerDelayedMessageTrait;`,示例如下: ```php 以下是在 Command 中演示如何使用,具體用法請以實際為準 使用 `gen:command DelayCommand` 命令建立一個 `DelayCommand`。如下: ```php container = $container; parent::__construct('demo:command'); } public function configure() { parent::configure(); $this->setDescription('Hyperf Demo Command'); } public function handle() { //1.delayed + direct $message = new DelayDirectProducer('delay+direct produceTime:'.(microtime(true))); //2.delayed + fanout //$message = new DelayFanoutProducer('delay+fanout produceTime:'.(microtime(true))); //3.delayed + topic //$message = new DelayTopicProducer('delay+topic produceTime:' . (microtime(true))); $message->setDelayMs(5000); $producer = ApplicationContext::getContainer()->get(Producer::class); $producer->produce($message); } } ``` 執行命令列生產訊息 ``` php bin/hyperf.php demo:command ``` ## RPC 遠端過程呼叫 除了典型的訊息佇列場景,我們還可以透過 AMQP 來實現 RPC 遠端過程呼叫,本元件也為這個實現提供了對應的支援。 ### 建立消費者 RPC 使用的消費者,與典型訊息佇列場景的消費者實現基本無差,唯一的區別是需要透過呼叫 `reply` 方法返回資料給生產者。 ```php reply($data, $message); return Result::ACK; } } ``` ### 發起 RPC 呼叫 作為生成者發起一次 RPC 遠端過程呼叫也非常的簡單,只需透過依賴注入容器獲得 `Hyperf\Amqp\RpcClient` 物件並呼叫其中的 `call` 方法即可,返回的結果是消費者 reply 的資料,如下所示: ```php get(RpcClient::class); // 在 DynamicRpcMessage 上設定與 Consumer 一致的 Exchange 和 RoutingKey $result = $rpcClient->call(new DynamicRpcMessage('hyperf', 'hyperf', ['message' => 'Hello Hyperf'])); // $result: // array(1) { // ["message"]=> // string(18) "Reply:Hello Hyperf" // } ``` ### 抽象 RpcMessage 上面的 RPC 呼叫過程是直接透過 `Hyperf\Amqp\Message\DynamicRpcMessage` 類來完成 Exchange 和 RoutingKey 的定義,並傳遞訊息資料,在生產專案的設計上,我們可以對 RpcMessage 進行一層抽象,以統一 Exchange 和 RoutingKey 的定義。 我們可以建立對應的 RpcMessage 類如 `App\Amqp\FooRpcMessage` 如下: ```php payload = $data; } } ``` 這樣我們進行 RPC 呼叫時,只需直接傳遞 `FooRpcMessage` 例項到 `call` 方法即可,無需每次呼叫時都去定義 Exchange 和 RoutingKey。