mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-12-05 05:07:58 +08:00
141 lines
3.3 KiB
Markdown
141 lines
3.3 KiB
Markdown
# AMQP组件
|
||
|
||
[hyperf/amqp](https://github.com/hyperf-cloud/amqp) 是实现 AMQP 标准的组件,主要适用于对 RabbitMQ 的使用。
|
||
|
||
## 安装
|
||
|
||
```bash
|
||
composer require hyperf/amqp
|
||
```
|
||
|
||
## 默认配置
|
||
```php
|
||
<?php
|
||
|
||
return [
|
||
'default' => [
|
||
'host' => 'localhost',
|
||
'port' => 5672,
|
||
'user' => 'guest',
|
||
'password' => 'guest',
|
||
'vhost' => '/',
|
||
'pool' => [
|
||
'min_connections' => 1,
|
||
'max_connections' => 10,
|
||
'connect_timeout' => 10.0,
|
||
'wait_timeout' => 3.0,
|
||
'heartbeat' => -1,
|
||
],
|
||
'params' => [
|
||
'insist' => false,
|
||
'login_method' => 'AMQPLAIN',
|
||
'login_response' => null,
|
||
'locale' => 'en_US',
|
||
'connection_timeout' => 3.0,
|
||
'read_write_timeout' => 3.0,
|
||
'context' => null,
|
||
'keepalive' => false,
|
||
'heartbeat' => 0,
|
||
],
|
||
],
|
||
'pool2' => [
|
||
...
|
||
]
|
||
];
|
||
```
|
||
|
||
可在 `producer` 或者 `consumer` 的 `__construct` 函数中, 设置不同 `pool`.
|
||
|
||
## 投递消息
|
||
|
||
使用 `generator` 工具新建一个 `producer`
|
||
```
|
||
php bin/hyperf.php gen:amqp-producer DemoProducer
|
||
```
|
||
|
||
在DemoProducer文件中,我们可以修改Producer注解对应的字段来替换对应的 `exchange` 和 `routingKey`。
|
||
其中 `payload` 就是最终投递到消息队列中的数据,所以我们可以随意改写 `__construct` 方法,只要最后赋值 `payload` 即可。
|
||
示例如下。
|
||
|
||
```php
|
||
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace App\Amqp\Producers;
|
||
|
||
use Hyperf\Amqp\Annotation\Producer;
|
||
use Hyperf\Amqp\Message\ProducerMessage;
|
||
use App\Models\User;
|
||
|
||
/**
|
||
* DemoProducer
|
||
* @Producer(exchange="hyperf", routingKey="hyperf")
|
||
*/
|
||
class DemoProducer extends ProducerMessage
|
||
{
|
||
public function __construct($id)
|
||
{
|
||
// 设置不同 pool
|
||
$this->poolName = 'pool2';
|
||
|
||
$user = User::where('id', $id)->first();
|
||
$this->payload = [
|
||
'id' => $id,
|
||
'data' => $user->toArray()
|
||
];
|
||
}
|
||
}
|
||
|
||
```
|
||
|
||
通过container获取Producer实例,即可投递消息。以下实例直接使用ApplicationContext获取Producer其实并不合理,container具体使用请到di模块中查看。
|
||
|
||
```php
|
||
<?php
|
||
use Hyperf\Amqp\Producer;
|
||
use App\Amqp\Producers\DemoProducer;
|
||
use Hyperf\Utils\ApplicationContext;
|
||
|
||
$message = new DemoProducer(1);
|
||
$producer = ApplicationContext::getContainer()->get(Producer::class);
|
||
$result = $producer->produce($message);
|
||
|
||
```
|
||
|
||
## 消费消息
|
||
|
||
使用 `generator` 工具新建一个 `consumer`。
|
||
```
|
||
php bin/hyperf.php gen:amqp-consumer DemoConsumer
|
||
```
|
||
|
||
在DemoConsumer文件中,我们可以修改Consumer注解对应的字段来替换对应的 `exchange`、`routingKey` 和 `queue`。
|
||
其中 `$data` 就是解析后的元数据。
|
||
示例如下。
|
||
|
||
```php
|
||
<?php
|
||
|
||
declare(strict_types=1);
|
||
|
||
namespace App\Amqp\Consumers;
|
||
|
||
use Hyperf\Amqp\Annotation\Consumer;
|
||
use Hyperf\Amqp\Message\ConsumerMessage;
|
||
use Hyperf\Amqp\Result;
|
||
|
||
/**
|
||
* @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
|
||
*/
|
||
class DemoConsumer extends ConsumerMessage
|
||
{
|
||
public function consume($data): string
|
||
{
|
||
print_r($data);
|
||
return Result::ACK;
|
||
}
|
||
}
|
||
```
|
||
|
||
框架会根据Consumer注解自动创建Process进程,进程意外退出后会被重新拉起。 |