2019-12-12 16:24:04 +08:00
|
|
|
|
# AMQP 元件
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
[hyperf/amqp](https://github.com/hyperf/amqp) 是實現 AMQP 標準的元件,主要適用於對 RabbitMQ 的使用。
|
2019-03-30 22:53:32 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
## 安裝
|
2019-03-30 22:53:32 +08:00
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
composer require hyperf/amqp
|
|
|
|
|
```
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
## 預設配置
|
2019-10-14 15:16:53 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
| 配置 | 型別 | 預設值 | 備註 |
|
2019-10-14 15:16:53 +08:00
|
|
|
|
|:----------------:|:------:|:---------:|:--------------:|
|
|
|
|
|
| host | string | localhost | Host |
|
2019-12-12 16:24:04 +08:00
|
|
|
|
| port | int | 5672 | 埠號 |
|
|
|
|
|
| user | string | guest | 使用者名稱 |
|
|
|
|
|
| password | string | guest | 密碼 |
|
2019-10-14 15:16:53 +08:00
|
|
|
|
| vhost | string | / | vhost |
|
2019-12-12 16:24:04 +08:00
|
|
|
|
| concurrent.limit | int | 0 | 同時消費的數量 |
|
|
|
|
|
| pool | object | | 連線池配置 |
|
2019-10-14 15:16:53 +08:00
|
|
|
|
| params | object | | 基本配置 |
|
|
|
|
|
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```php
|
2019-03-08 12:04:46 +08:00
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
'default' => [
|
|
|
|
|
'host' => 'localhost',
|
|
|
|
|
'port' => 5672,
|
|
|
|
|
'user' => 'guest',
|
|
|
|
|
'password' => 'guest',
|
|
|
|
|
'vhost' => '/',
|
2019-10-14 15:16:53 +08:00
|
|
|
|
'concurrent' => [
|
|
|
|
|
'limit' => 1,
|
|
|
|
|
],
|
2019-03-08 12:04:46 +08:00
|
|
|
|
'pool' => [
|
|
|
|
|
'min_connections' => 1,
|
|
|
|
|
'max_connections' => 10,
|
|
|
|
|
'connect_timeout' => 10.0,
|
|
|
|
|
'wait_timeout' => 3.0,
|
|
|
|
|
'heartbeat' => -1,
|
|
|
|
|
],
|
|
|
|
|
'params' => [
|
|
|
|
|
'insist' => false,
|
|
|
|
|
'login_method' => 'AMQPLAIN',
|
|
|
|
|
'login_response' => null,
|
|
|
|
|
'locale' => 'en_US',
|
|
|
|
|
'connection_timeout' => 3.0,
|
2020-04-30 17:09:23 +08:00
|
|
|
|
'read_write_timeout' => 6.0,
|
2019-03-08 12:04:46 +08:00
|
|
|
|
'context' => null,
|
|
|
|
|
'keepalive' => false,
|
2019-12-04 16:54:31 +08:00
|
|
|
|
'heartbeat' => 3,
|
2020-06-16 01:43:41 +08:00
|
|
|
|
'close_on_destruct' => false,
|
2019-03-08 12:04:46 +08:00
|
|
|
|
],
|
|
|
|
|
],
|
2019-07-04 01:49:07 +08:00
|
|
|
|
'pool2' => [
|
|
|
|
|
...
|
|
|
|
|
]
|
2019-03-08 12:04:46 +08:00
|
|
|
|
];
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
可在 `producer` 或者 `consumer` 的 `__construct` 函式中, 設定不同 `pool`.
|
2019-07-04 01:49:07 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
## 投遞訊息
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
使用 `gen:producer` 命令建立一個 `producer`
|
2019-10-07 23:47:41 +08:00
|
|
|
|
|
|
|
|
|
```bash
|
2019-03-08 12:04:46 +08:00
|
|
|
|
php bin/hyperf.php gen:amqp-producer DemoProducer
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
在 DemoProducer 檔案中,我們可以修改 `@Producer` 註解對應的欄位來替換對應的 `exchange` 和 `routingKey`。
|
|
|
|
|
其中 `payload` 就是最終投遞到訊息佇列中的資料,所以我們可以隨意改寫 `__construct` 方法,只要最後賦值 `payload` 即可。
|
2019-03-08 12:04:46 +08:00
|
|
|
|
示例如下。
|
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
> 使用 `@Producer` 註解時需 `use Hyperf\Amqp\Annotation\Producer;` 名稱空間;
|
2019-10-07 23:47:41 +08:00
|
|
|
|
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```php
|
2019-03-08 12:04:46 +08:00
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Amqp\Producers;
|
|
|
|
|
|
|
|
|
|
use Hyperf\Amqp\Annotation\Producer;
|
|
|
|
|
use Hyperf\Amqp\Message\ProducerMessage;
|
|
|
|
|
use App\Models\User;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* DemoProducer
|
|
|
|
|
* @Producer(exchange="hyperf", routingKey="hyperf")
|
|
|
|
|
*/
|
|
|
|
|
class DemoProducer extends ProducerMessage
|
|
|
|
|
{
|
|
|
|
|
public function __construct($id)
|
|
|
|
|
{
|
2019-12-12 16:24:04 +08:00
|
|
|
|
// 設定不同 pool
|
2019-07-04 01:49:07 +08:00
|
|
|
|
$this->poolName = 'pool2';
|
|
|
|
|
|
2019-03-08 12:04:46 +08:00
|
|
|
|
$user = User::where('id', $id)->first();
|
|
|
|
|
$this->payload = [
|
|
|
|
|
'id' => $id,
|
|
|
|
|
'data' => $user->toArray()
|
|
|
|
|
];
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:41:17 +08:00
|
|
|
|
通過 DI Container 獲取 `Hyperf\Amqp\Producer` 例項,即可投遞訊息。以下例項直接使用 `ApplicationContext` 獲取 `Hyperf\Amqp\Producer` 其實並不合理,DI Container 具體使用請到 [依賴注入](zh-tw/di.md) 章節中檢視。
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```php
|
2019-03-08 12:04:46 +08:00
|
|
|
|
<?php
|
|
|
|
|
use Hyperf\Amqp\Producer;
|
|
|
|
|
use App\Amqp\Producers\DemoProducer;
|
|
|
|
|
use Hyperf\Utils\ApplicationContext;
|
|
|
|
|
|
|
|
|
|
$message = new DemoProducer(1);
|
|
|
|
|
$producer = ApplicationContext::getContainer()->get(Producer::class);
|
|
|
|
|
$result = $producer->produce($message);
|
|
|
|
|
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
## 消費訊息
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
使用 `gen:amqp-consumer` 命令建立一個 `consumer`。
|
2019-10-07 23:47:41 +08:00
|
|
|
|
|
|
|
|
|
```bash
|
2019-03-08 12:04:46 +08:00
|
|
|
|
php bin/hyperf.php gen:amqp-consumer DemoConsumer
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
在 DemoConsumer 檔案中,我們可以修改 `@Consumer` 註解對應的欄位來替換對應的 `exchange`、`routingKey` 和 `queue`。
|
|
|
|
|
其中 `$data` 就是解析後的訊息資料。
|
2019-03-08 12:04:46 +08:00
|
|
|
|
示例如下。
|
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
> 使用 `@Consumer` 註解時需 `use Hyperf\Amqp\Annotation\Consumer;` 名稱空間;
|
2019-10-07 23:47:41 +08:00
|
|
|
|
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```php
|
2019-03-08 12:04:46 +08:00
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Amqp\Consumers;
|
|
|
|
|
|
|
|
|
|
use Hyperf\Amqp\Annotation\Consumer;
|
|
|
|
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
|
|
|
|
use Hyperf\Amqp\Result;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
|
|
|
|
|
*/
|
|
|
|
|
class DemoConsumer extends ConsumerMessage
|
|
|
|
|
{
|
|
|
|
|
public function consume($data): string
|
|
|
|
|
{
|
|
|
|
|
print_r($data);
|
|
|
|
|
return Result::ACK;
|
|
|
|
|
}
|
|
|
|
|
}
|
2019-03-19 14:52:21 +08:00
|
|
|
|
```
|
2019-03-08 12:04:46 +08:00
|
|
|
|
|
2020-01-10 16:49:10 +08:00
|
|
|
|
### 禁止消費程序自啟
|
|
|
|
|
|
|
|
|
|
預設情況下,使用了 `@Consumer` 註解後,框架會自動建立子程序啟動消費者,並且會在子程序異常退出後,重新拉起。
|
|
|
|
|
如果出於開發階段,進行消費者除錯時,可能會因為消費其他訊息而導致除錯不便。
|
|
|
|
|
|
|
|
|
|
這種情況,只需要在 `@Consumer` 註解中配置 `enable=false` (預設為 `true` 跟隨服務啟動)或者在對應的消費者中重寫類方法 `isEnable()` 返回 `false` 即可
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Amqp\Consumers;
|
|
|
|
|
|
|
|
|
|
use Hyperf\Amqp\Annotation\Consumer;
|
|
|
|
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
|
|
|
|
use Hyperf\Amqp\Result;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1, enable=false)
|
|
|
|
|
*/
|
|
|
|
|
class DemoConsumer extends ConsumerMessage
|
|
|
|
|
{
|
|
|
|
|
public function consume($data): string
|
|
|
|
|
{
|
|
|
|
|
print_r($data);
|
|
|
|
|
return Result::ACK;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public function isEnable(): bool
|
|
|
|
|
{
|
|
|
|
|
return parent::isEnable();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
2019-10-07 23:47:41 +08:00
|
|
|
|
|
2020-04-30 17:09:23 +08:00
|
|
|
|
### 設定最大消費數
|
|
|
|
|
|
|
|
|
|
可以修改 `@Consumer` 註解中的 `maxConsumption` 屬性,設定此消費者最大處理的訊息數,達到指定消費數後,消費者程序會重啟。
|
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
### 消費結果
|
2019-10-07 23:47:41 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
框架會根據 `Consumer` 內的 `consume` 方法所返回的結果來決定該訊息的響應行為,共有 4 中響應結果,分別為 `\Hyperf\Amqp\Result::ACK`、`\Hyperf\Amqp\Result::NACK`、`\Hyperf\Amqp\Result::REQUEUE`、`\Hyperf\Amqp\Result::DROP`,每個返回值分別代表如下行為:
|
2019-10-07 23:47:41 +08:00
|
|
|
|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
| 返回值 | 行為 |
|
2019-10-14 15:16:53 +08:00
|
|
|
|
|------------------------------|----------------------------------------------------------------------|
|
2019-12-12 16:24:04 +08:00
|
|
|
|
| \Hyperf\Amqp\Result::ACK | 確認訊息正確被消費掉了 |
|
|
|
|
|
| \Hyperf\Amqp\Result::NACK | 訊息沒有被正確消費掉,以 `basic_nack` 方法來響應 |
|
|
|
|
|
| \Hyperf\Amqp\Result::REQUEUE | 訊息沒有被正確消費掉,以 `basic_reject` 方法來響應,並使訊息重新入列 |
|
|
|
|
|
| \Hyperf\Amqp\Result::DROP | 訊息沒有被正確消費掉,以 `basic_reject` 方法來響應 |
|
2020-05-19 11:26:54 +08:00
|
|
|
|
|
|
|
|
|
## RPC 遠端過程呼叫
|
|
|
|
|
|
|
|
|
|
除了典型的訊息佇列場景,我們還可以通過 AMQP 來實現 RPC 遠端過程呼叫,本元件也為這個實現提供了對應的支援。
|
|
|
|
|
|
|
|
|
|
### 建立消費者
|
|
|
|
|
|
|
|
|
|
RPC 使用的消費者,與典型訊息佇列場景的消費者實現基本無差,唯一的區別是需要通過呼叫 `reply` 方法返回資料給生產者。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Amqp\Consumer;
|
|
|
|
|
|
|
|
|
|
use Hyperf\Amqp\Annotation\Consumer;
|
|
|
|
|
use Hyperf\Amqp\Message\ConsumerMessage;
|
|
|
|
|
use Hyperf\Amqp\Result;
|
|
|
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="rpc.reply", name="ReplyConsumer", nums=1, enable=true)
|
|
|
|
|
*/
|
|
|
|
|
class ReplyConsumer extends ConsumerMessage
|
|
|
|
|
{
|
|
|
|
|
public function consumeMessage($data, AMQPMessage $message): string
|
|
|
|
|
{
|
|
|
|
|
$data['message'] .= 'Reply:' . $data['message'];
|
|
|
|
|
|
|
|
|
|
$this->reply($data, $message);
|
|
|
|
|
|
|
|
|
|
return Result::ACK;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
### 發起 RPC 呼叫
|
|
|
|
|
|
|
|
|
|
作為生成者發起一次 RPC 遠端過程呼叫也非常的簡單,只需通過依賴注入容器獲得 `Hyperf\Amqp\RpcClient` 物件並呼叫其中的 `call` 方法即可,返回的結果是消費者 reply 的資料,如下所示:
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
use Hyperf\Amqp\Message\DynamicRpcMessage;
|
|
|
|
|
use Hyperf\Amqp\RpcClient;
|
|
|
|
|
use Hyperf\Utils\ApplicationContext;
|
|
|
|
|
|
|
|
|
|
$rpcClient = ApplicationContext::getContainer()->get(RpcClient::class);
|
|
|
|
|
// 在 DynamicRpcMessage 上設定與 Consumer 一致的 Exchange 和 RoutingKey
|
|
|
|
|
$result = $rpcClient->call(new DynamicRpcMessage('hyperf', 'hyperf', ['message' => 'Hello Hyperf']));
|
|
|
|
|
|
|
|
|
|
// $result:
|
|
|
|
|
// array(1) {
|
|
|
|
|
// ["message"]=>
|
|
|
|
|
// string(18) "Reply:Hello Hyperf"
|
|
|
|
|
// }
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
### 抽象 RpcMessage
|
|
|
|
|
|
|
|
|
|
上面的 RPC 呼叫過程是直接通過 `Hyperf\Amqp\Message\DynamicRpcMessage` 類來完成 Exchange 和 RoutingKey 的定義,並傳遞訊息資料,在生產專案的設計上,我們可以對 RpcMessage 進行一層抽象,以統一 Exchange 和 RoutingKey 的定義。
|
|
|
|
|
|
|
|
|
|
我們可以建立對應的 RpcMessage 類如 `App\Amqp\FooRpcMessage` 如下:
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
use Hyperf\Amqp\Message\RpcMessage;
|
|
|
|
|
|
|
|
|
|
class FooRpcMessage extends RpcMessage
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
protected $exchange = 'hyperf';
|
|
|
|
|
|
|
|
|
|
protected $routingKey = 'hyperf';
|
|
|
|
|
|
|
|
|
|
public function __construct($data)
|
|
|
|
|
{
|
|
|
|
|
// 要傳遞資料
|
|
|
|
|
$this->payload = $data;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
2020-06-16 01:43:41 +08:00
|
|
|
|
這樣我們進行 RPC 呼叫時,只需直接傳遞 `FooRpcMessage` 例項到 `call` 方法即可,無需每次呼叫時都去定義 Exchange 和 RoutingKey。
|