hyperf/docs/zh-tw/amqp.md
李铭昕 989c07ab55 Merge branch 'master' into 2.2-merge
# Conflicts:
#	bin/release.sh
#	bin/split.sh
#	src/validation/src/Concerns/FormatsMessages.php
2021-06-28 09:40:51 +08:00

8.9 KiB
Raw Blame History

AMQP 元件

hyperf/amqp 是實現 AMQP 標準的元件,主要適用於對 RabbitMQ 的使用。

安裝

composer require hyperf/amqp

預設配置

配置 型別 預設值 備註
host string localhost Host
port int 5672 埠號
user string guest 使用者名稱
password string guest 密碼
vhost string / vhost
concurrent.limit int 0 同時消費的數量
pool.connections int 1 程序內保持的連線數
pool object 連線池配置
params object 基本配置
<?php

return [
    'default' => [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'concurrent' => [
            'limit' => 1,
        ],
        'pool' => [
            'connections' => 1,
        ],
        'params' => [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
            'close_on_destruct' => false,
        ],
    ],
    'pool2' => [
        ...
    ]
];

可在 producer 或者 consumer__construct 函式中,設定不同 pool,例如上述的 defaultpool2

投遞訊息

使用 gen:producer 命令建立一個 producer

php bin/hyperf.php gen:amqp-producer DemoProducer

在 DemoProducer 檔案中,我們可以修改 @Producer 註解對應的欄位來替換對應的 exchangeroutingKey。 其中 payload 就是最終投遞到訊息佇列中的資料,所以我們可以隨意改寫 __construct 方法,只要最後賦值 payload 即可。 示例如下。

使用 @Producer 註解時需 use Hyperf\Amqp\Annotation\Producer; 名稱空間;

<?php

declare(strict_types=1);

namespace App\Amqp\Producers;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
use App\Models\User;

/**
 * @Producer(exchange="hyperf", routingKey="hyperf")
 */
class DemoProducer extends ProducerMessage
{
    public function __construct($id)
    {
        // 設定不同 pool
        $this->poolName = 'pool2';

        $user = User::where('id', $id)->first();
        $this->payload = [
            'id' => $id,
            'data' => $user->toArray()
        ];
    }
}

通過 DI Container 獲取 Hyperf\Amqp\Producer 例項,即可投遞訊息。以下例項直接使用 ApplicationContext 獲取 Hyperf\Amqp\Producer 其實並不合理DI Container 具體使用請到 依賴注入 章節中檢視。

<?php
use Hyperf\Amqp\Producer;
use App\Amqp\Producers\DemoProducer;
use Hyperf\Utils\ApplicationContext;

$message = new DemoProducer(1);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$result = $producer->produce($message);

消費訊息

使用 gen:amqp-consumer 命令建立一個 consumer

php bin/hyperf.php gen:amqp-consumer DemoConsumer

在 DemoConsumer 檔案中,我們可以修改 @Consumer 註解對應的欄位來替換對應的 exchangeroutingKeyqueue。 其中 $data 就是解析後的訊息資料。 示例如下。

使用 @Consumer 註解時需 use Hyperf\Amqp\Annotation\Consumer; 名稱空間;

<?php

declare(strict_types=1);

namespace App\Amqp\Consumers;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    public function consumeMessage($data, AMQPMessage $message): string
    {
        print_r($data);
        return Result::ACK;
    }
}

禁止消費程序自啟

預設情況下,使用了 @Consumer 註解後,框架會自動建立子程序啟動消費者,並且會在子程序異常退出後,重新拉起。 如果出於開發階段,進行消費者除錯時,可能會因為消費其他訊息而導致除錯不便。

這種情況,只需要在 @Consumer 註解中配置 enable=false (預設為 true 跟隨服務啟動)或者在對應的消費者中重寫類方法 isEnable() 返回 false 即可

<?php

declare(strict_types=1);

namespace App\Amqp\Consumers;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1, enable=false)
 */
class DemoConsumer extends ConsumerMessage
{
    public function consumeMessage($data, AMQPMessage $message): string
    {
        print_r($data);
        return Result::ACK;
    }

    public function isEnable(): bool
    {
        return parent::isEnable();
    }
}

設定最大消費數

可以修改 @Consumer 註解中的 maxConsumption 屬性,設定此消費者最大處理的訊息數,達到指定消費數後,消費者程序會重啟。

消費結果

框架會根據 Consumer 內的 consume 方法所返回的結果來決定該訊息的響應行為,共有 4 中響應結果,分別為 \Hyperf\Amqp\Result::ACK\Hyperf\Amqp\Result::NACK\Hyperf\Amqp\Result::REQUEUE\Hyperf\Amqp\Result::DROP,每個返回值分別代表如下行為:

返回值 行為
\Hyperf\Amqp\Result::ACK 確認訊息正確被消費掉了
\Hyperf\Amqp\Result::NACK 訊息沒有被正確消費掉,以 basic_nack 方法來響應
\Hyperf\Amqp\Result::REQUEUE 訊息沒有被正確消費掉,以 basic_reject 方法來響應,並使訊息重新入列
\Hyperf\Amqp\Result::DROP 訊息沒有被正確消費掉,以 basic_reject 方法來響應

RPC 遠端過程呼叫

除了典型的訊息佇列場景,我們還可以通過 AMQP 來實現 RPC 遠端過程呼叫,本元件也為這個實現提供了對應的支援。

建立消費者

RPC 使用的消費者,與典型訊息佇列場景的消費者實現基本無差,唯一的區別是需要通過呼叫 reply 方法返回資料給生產者。

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="rpc.reply", name="ReplyConsumer", nums=1, enable=true)
 */
class ReplyConsumer extends ConsumerMessage
{
    public function consumeMessage($data, AMQPMessage $message): string
    {
        $data['message'] .= 'Reply:' . $data['message'];

        $this->reply($data, $message);

        return Result::ACK;
    }
}

發起 RPC 呼叫

作為生成者發起一次 RPC 遠端過程呼叫也非常的簡單,只需通過依賴注入容器獲得 Hyperf\Amqp\RpcClient 物件並呼叫其中的 call 方法即可,返回的結果是消費者 reply 的資料,如下所示:

<?php
use Hyperf\Amqp\Message\DynamicRpcMessage;
use Hyperf\Amqp\RpcClient;
use Hyperf\Utils\ApplicationContext;

$rpcClient = ApplicationContext::getContainer()->get(RpcClient::class);
// 在 DynamicRpcMessage 上設定與 Consumer 一致的 Exchange 和 RoutingKey
$result = $rpcClient->call(new DynamicRpcMessage('hyperf', 'hyperf', ['message' => 'Hello Hyperf'])); 

// $result:
// array(1) {
//     ["message"]=>
//     string(18) "Reply:Hello Hyperf"
// }

抽象 RpcMessage

上面的 RPC 呼叫過程是直接通過 Hyperf\Amqp\Message\DynamicRpcMessage 類來完成 Exchange 和 RoutingKey 的定義,並傳遞訊息資料,在生產專案的設計上,我們可以對 RpcMessage 進行一層抽象,以統一 Exchange 和 RoutingKey 的定義。

我們可以建立對應的 RpcMessage 類如 App\Amqp\FooRpcMessage 如下:

<?php
use Hyperf\Amqp\Message\RpcMessage;

class FooRpcMessage extends RpcMessage
{

    protected $exchange = 'hyperf';

    protected $routingKey = 'hyperf';
    
    public function __construct($data)
    {
        // 要傳遞資料
        $this->payload = $data;
    }

}

這樣我們進行 RPC 呼叫時,只需直接傳遞 FooRpcMessage 例項到 call 方法即可,無需每次呼叫時都去定義 Exchange 和 RoutingKey。