2019-03-30 15:00:10 +08:00
|
|
|
|
# 异步队列
|
|
|
|
|
|
2021-03-14 13:29:35 +08:00
|
|
|
|
异步队列区别于 `RabbitMQ` `Kafka` 等消息队列,它只提供一种 `异步处理` 和 `异步延时处理` 的能力,并 **不能** 严格地保证消息的持久化和 **不支持** 完备的 ACK 应答机制。
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
2019-03-30 22:53:32 +08:00
|
|
|
|
## 安装
|
|
|
|
|
|
|
|
|
|
```bash
|
2019-04-10 14:28:15 +08:00
|
|
|
|
composer require hyperf/async-queue
|
2019-03-30 22:53:32 +08:00
|
|
|
|
```
|
|
|
|
|
|
2019-03-30 15:00:10 +08:00
|
|
|
|
## 配置
|
|
|
|
|
|
2019-07-04 20:10:57 +08:00
|
|
|
|
配置文件位于 `config/autoload/async_queue.php`,如文件不存在可自行创建。
|
|
|
|
|
|
|
|
|
|
> 暂时只支持 `Redis Driver` 驱动。
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
2019-10-08 14:37:27 +08:00
|
|
|
|
| 配置 | 类型 | 默认值 | 备注 |
|
|
|
|
|
|:----------------:|:---------:|:-------------------------------------------:|:---------------------------------------:|
|
|
|
|
|
| driver | string | Hyperf\AsyncQueue\Driver\RedisDriver::class | 无 |
|
|
|
|
|
| channel | string | queue | 队列前缀 |
|
2020-04-27 22:55:38 +08:00
|
|
|
|
| redis.pool | string | default | redis 连接池 |
|
2020-02-03 10:22:52 +08:00
|
|
|
|
| timeout | int | 2 | pop 消息的超时时间 |
|
2019-10-08 14:37:27 +08:00
|
|
|
|
| retry_seconds | int,array | 5 | 失败后重新尝试间隔 |
|
|
|
|
|
| handle_timeout | int | 10 | 消息处理超时时间 |
|
|
|
|
|
| processes | int | 1 | 消费进程数 |
|
|
|
|
|
| concurrent.limit | int | 1 | 同时处理消息数 |
|
|
|
|
|
| max_messages | int | 0 | 进程重启所需最大处理的消息数 默认不重启 |
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
'default' => [
|
2019-04-10 14:28:15 +08:00
|
|
|
|
'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
|
2020-04-16 11:55:03 +08:00
|
|
|
|
'redis' => [
|
|
|
|
|
'pool' => 'default'
|
|
|
|
|
],
|
2019-03-30 15:00:10 +08:00
|
|
|
|
'channel' => 'queue',
|
2019-10-08 14:37:27 +08:00
|
|
|
|
'timeout' => 2,
|
2019-03-30 15:00:10 +08:00
|
|
|
|
'retry_seconds' => 5,
|
2019-10-08 14:37:27 +08:00
|
|
|
|
'handle_timeout' => 10,
|
2019-03-30 15:00:10 +08:00
|
|
|
|
'processes' => 1,
|
2019-10-08 14:37:27 +08:00
|
|
|
|
'concurrent' => [
|
|
|
|
|
'limit' => 5,
|
|
|
|
|
],
|
2019-03-30 15:00:10 +08:00
|
|
|
|
],
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
2019-09-20 17:32:53 +08:00
|
|
|
|
`retry_seconds` 也可以传入数组,根据重试次数相应修改重试时间,例如
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
'default' => [
|
|
|
|
|
'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
|
|
|
|
|
'channel' => 'queue',
|
|
|
|
|
'retry_seconds' => [1, 5, 10, 20],
|
|
|
|
|
'processes' => 1,
|
|
|
|
|
],
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
2020-11-05 16:35:24 +08:00
|
|
|
|
## 工作原理
|
|
|
|
|
|
2023-01-03 07:13:43 +08:00
|
|
|
|
`ConsumerProcess` 是异步消费进程,会根据用户创建的 `Job` 或者使用 `#[AsyncQueueMessage]` 的代码块,执行消费逻辑。
|
|
|
|
|
`Job` 和 `#[AsyncQueueMessage]` 都是需要投递和执行的任务,即数据、消费逻辑都会在任务中定义。
|
2020-11-05 16:35:24 +08:00
|
|
|
|
|
|
|
|
|
- `Job` 类中成员变量即为待消费的数据,`handle()` 方法则为消费逻辑。
|
2023-01-03 07:13:43 +08:00
|
|
|
|
- `#[AsyncQueueMessage]` 注解的方法,构造函数传入的数据即为待消费的数据,方法体则为消费逻辑。
|
2020-11-05 16:35:24 +08:00
|
|
|
|
|
|
|
|
|
```mermaid
|
|
|
|
|
graph LR;
|
|
|
|
|
A[服务启动]-->B[异步消费进程启动]
|
|
|
|
|
B-->C[监听队列]
|
|
|
|
|
D[投递任务]-->C
|
|
|
|
|
C-->F[消费任务]
|
|
|
|
|
```
|
|
|
|
|
|
2019-03-30 15:00:10 +08:00
|
|
|
|
## 使用
|
|
|
|
|
|
2020-11-05 16:35:24 +08:00
|
|
|
|
### 配置异步消费进程
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
2020-11-05 16:35:24 +08:00
|
|
|
|
组件已经提供了默认 `异步消费进程`,只需要将它配置到 `config/autoload/processes.php` 中即可。
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
return [
|
2019-04-10 14:28:15 +08:00
|
|
|
|
Hyperf\AsyncQueue\Process\ConsumerProcess::class,
|
2019-03-30 15:00:10 +08:00
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
当然,您也可以将以下 `Process` 添加到自己的项目中。
|
|
|
|
|
|
2020-11-05 16:35:24 +08:00
|
|
|
|
> 配置方式和注解方式,二选一即可。
|
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Process;
|
|
|
|
|
|
|
|
|
|
use Hyperf\AsyncQueue\Process\ConsumerProcess;
|
|
|
|
|
use Hyperf\Process\Annotation\Process;
|
|
|
|
|
|
2021-11-20 10:58:02 +08:00
|
|
|
|
#[Process(name: "async-queue")]
|
2019-07-25 10:05:33 +08:00
|
|
|
|
class AsyncQueueConsumer extends ConsumerProcess
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
2019-09-24 00:04:15 +08:00
|
|
|
|
### 生产消息
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
2019-10-16 13:56:16 +08:00
|
|
|
|
#### 传统方式
|
|
|
|
|
|
2020-01-12 16:13:40 +08:00
|
|
|
|
这种模式会把对象直接序列化然后存到 `Redis` 等队列中,所以为了保证序列化后的体积,尽量不要将 `Container`,`Config` 等设置为成员变量。
|
|
|
|
|
|
2023-01-03 07:13:43 +08:00
|
|
|
|
比如以下 `Job` 的定义,是 **不可取** 的,同理 `#[Inject]` 也是如此。
|
2020-10-21 13:20:40 +08:00
|
|
|
|
|
|
|
|
|
> 因为 Job 会被序列化,所以成员变量不要包含 匿名函数 等 无法被序列化 的内容,如果不清楚哪些内容无法被序列化,尽量使用注解方式。
|
2020-01-12 16:13:40 +08:00
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Job;
|
|
|
|
|
|
|
|
|
|
use Hyperf\AsyncQueue\Job;
|
|
|
|
|
use Psr\Container\ContainerInterface;
|
|
|
|
|
|
|
|
|
|
class ExampleJob extends Job
|
|
|
|
|
{
|
|
|
|
|
public $container;
|
|
|
|
|
|
|
|
|
|
public $params;
|
|
|
|
|
|
|
|
|
|
public function __construct(ContainerInterface $container, $params)
|
|
|
|
|
{
|
|
|
|
|
$this->container = $container;
|
|
|
|
|
$this->params = $params;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public function handle()
|
|
|
|
|
{
|
|
|
|
|
// 根据参数处理具体逻辑
|
|
|
|
|
var_dump($this->params);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
$job = make(ExampleJob::class);
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
正确的 `Job` 应该是只有需要处理的数据,其他相关数据,可以在 `handle` 方法中重新获取,如下。
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
namespace App\Job;
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
2019-04-10 14:28:15 +08:00
|
|
|
|
use Hyperf\AsyncQueue\Job;
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
|
|
|
|
class ExampleJob extends Job
|
|
|
|
|
{
|
2019-07-25 10:05:33 +08:00
|
|
|
|
public $params;
|
2020-10-29 15:59:31 +08:00
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
|
|
|
|
|
*/
|
2021-11-20 10:58:02 +08:00
|
|
|
|
protected int $maxAttempts = 2;
|
2019-07-25 02:05:31 +08:00
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
public function __construct($params)
|
2019-07-25 02:05:31 +08:00
|
|
|
|
{
|
2019-07-25 10:05:33 +08:00
|
|
|
|
// 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
|
|
|
|
|
$this->params = $params;
|
2019-07-25 02:05:31 +08:00
|
|
|
|
}
|
2019-07-25 10:05:33 +08:00
|
|
|
|
|
2019-03-30 15:00:10 +08:00
|
|
|
|
public function handle()
|
|
|
|
|
{
|
2019-07-25 10:05:33 +08:00
|
|
|
|
// 根据参数处理具体逻辑
|
2020-01-12 16:13:40 +08:00
|
|
|
|
// 通过具体参数获取模型等
|
2020-11-05 16:35:24 +08:00
|
|
|
|
// 这里的逻辑会在 ConsumerProcess 进程中执行
|
2019-07-25 02:05:31 +08:00
|
|
|
|
var_dump($this->params);
|
2019-03-30 15:00:10 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
2020-01-12 16:13:40 +08:00
|
|
|
|
正确定义完 `Job` 后,我们需要写一个专门投递消息的 `Service`,代码如下。
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
namespace App\Service;
|
|
|
|
|
|
|
|
|
|
use App\Job\ExampleJob;
|
2019-04-10 14:28:15 +08:00
|
|
|
|
use Hyperf\AsyncQueue\Driver\DriverFactory;
|
2019-07-04 17:41:44 +08:00
|
|
|
|
use Hyperf\AsyncQueue\Driver\DriverInterface;
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
class QueueService
|
2019-03-30 15:00:10 +08:00
|
|
|
|
{
|
2021-11-20 10:58:02 +08:00
|
|
|
|
protected DriverInterface $driver;
|
2019-03-30 15:00:10 +08:00
|
|
|
|
|
2019-05-13 02:25:03 +08:00
|
|
|
|
public function __construct(DriverFactory $driverFactory)
|
2019-03-30 15:00:10 +08:00
|
|
|
|
{
|
2019-05-13 02:25:03 +08:00
|
|
|
|
$this->driver = $driverFactory->get('default');
|
2019-03-30 15:00:10 +08:00
|
|
|
|
}
|
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
/**
|
2019-09-24 00:04:15 +08:00
|
|
|
|
* 生产消息.
|
2019-07-25 10:05:33 +08:00
|
|
|
|
* @param $params 数据
|
|
|
|
|
* @param int $delay 延时时间 单位秒
|
|
|
|
|
*/
|
2019-07-29 00:19:34 +08:00
|
|
|
|
public function push($params, int $delay = 0): bool
|
2019-07-04 17:41:44 +08:00
|
|
|
|
{
|
2019-07-25 10:05:33 +08:00
|
|
|
|
// 这里的 `ExampleJob` 会被序列化存到 Redis 中,所以内部变量最好只传入普通数据
|
|
|
|
|
// 同理,如果内部使用了注解 @Value 会把对应对象一起序列化,导致消息体变大。
|
2019-07-25 10:25:50 +08:00
|
|
|
|
// 所以这里也不推荐使用 `make` 方法来创建 `Job` 对象。
|
2019-07-25 10:05:33 +08:00
|
|
|
|
return $this->driver->push(new ExampleJob($params), $delay);
|
2019-07-25 02:05:31 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
2020-01-12 16:13:40 +08:00
|
|
|
|
投递消息
|
|
|
|
|
|
|
|
|
|
接下来,调用我们的 `QueueService` 投递消息即可。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Controller;
|
|
|
|
|
|
|
|
|
|
use App\Service\QueueService;
|
|
|
|
|
use Hyperf\Di\Annotation\Inject;
|
|
|
|
|
use Hyperf\HttpServer\Annotation\AutoController;
|
|
|
|
|
|
2021-11-20 10:58:02 +08:00
|
|
|
|
#[AutoController]
|
2020-12-03 15:58:26 +08:00
|
|
|
|
class QueueController extends AbstractController
|
2020-01-12 16:13:40 +08:00
|
|
|
|
{
|
2021-11-20 10:58:02 +08:00
|
|
|
|
#[Inject]
|
|
|
|
|
protected QueueService $service;
|
2020-01-12 16:13:40 +08:00
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 传统模式投递消息
|
|
|
|
|
*/
|
|
|
|
|
public function index()
|
|
|
|
|
{
|
|
|
|
|
$this->service->push([
|
|
|
|
|
'group@hyperf.io',
|
|
|
|
|
'https://doc.hyperf.io',
|
|
|
|
|
'https://www.hyperf.io',
|
|
|
|
|
]);
|
|
|
|
|
|
|
|
|
|
return 'success';
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
2019-10-16 13:56:16 +08:00
|
|
|
|
#### 注解方式
|
|
|
|
|
|
|
|
|
|
框架除了传统方式投递消息,还提供了注解方式。
|
|
|
|
|
|
2020-12-21 13:22:46 +08:00
|
|
|
|
> 注解方式会在非消费环境下自动投递消息到队列,故,如果我们在队列中使用注解方式时,则不会再次投递到队列当中,而是直接在本消费进程中执行。
|
|
|
|
|
> 如果仍然需要在队列中投递消息,则可以在队列中使用传统模式投递。
|
|
|
|
|
|
2020-01-12 16:13:40 +08:00
|
|
|
|
让我们重写上述 `QueueService`,直接将 `ExampleJob` 的逻辑搬到 `example` 方法中,并加上对应注解 `AsyncQueueMessage`,具体代码如下。
|
2019-10-16 13:56:16 +08:00
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Service;
|
|
|
|
|
|
|
|
|
|
use Hyperf\AsyncQueue\Annotation\AsyncQueueMessage;
|
|
|
|
|
|
|
|
|
|
class QueueService
|
|
|
|
|
{
|
2021-11-20 10:58:02 +08:00
|
|
|
|
#[AsyncQueueMessage]
|
2019-10-17 09:59:01 +08:00
|
|
|
|
public function example($params)
|
2019-10-16 13:56:16 +08:00
|
|
|
|
{
|
2019-10-17 09:59:01 +08:00
|
|
|
|
// 需要异步执行的代码逻辑
|
2020-11-05 16:35:24 +08:00
|
|
|
|
// 这里的逻辑会在 ConsumerProcess 进程中执行
|
2019-10-16 13:56:16 +08:00
|
|
|
|
var_dump($params);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
2020-01-12 16:13:40 +08:00
|
|
|
|
投递消息
|
2019-10-16 13:56:16 +08:00
|
|
|
|
|
2020-01-12 16:13:40 +08:00
|
|
|
|
注解模式投递消息就跟平常调用方法一致,代码如下。
|
2019-07-25 02:05:31 +08:00
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
namespace App\Controller;
|
2019-07-25 02:05:31 +08:00
|
|
|
|
|
2019-07-25 10:05:33 +08:00
|
|
|
|
use App\Service\QueueService;
|
|
|
|
|
use Hyperf\Di\Annotation\Inject;
|
|
|
|
|
use Hyperf\HttpServer\Annotation\AutoController;
|
2019-07-25 02:05:31 +08:00
|
|
|
|
|
2021-11-20 10:58:02 +08:00
|
|
|
|
#[AutoController]
|
2020-12-03 15:58:26 +08:00
|
|
|
|
class QueueController extends AbstractController
|
2019-07-25 02:05:31 +08:00
|
|
|
|
{
|
2019-07-25 10:05:33 +08:00
|
|
|
|
/**
|
|
|
|
|
* @var QueueService
|
|
|
|
|
*/
|
2021-11-20 10:58:02 +08:00
|
|
|
|
#[Inject]
|
2019-07-25 10:05:33 +08:00
|
|
|
|
protected $service;
|
2019-07-25 02:05:31 +08:00
|
|
|
|
|
2019-10-29 17:03:10 +08:00
|
|
|
|
/**
|
|
|
|
|
* 注解模式投递消息
|
|
|
|
|
*/
|
|
|
|
|
public function example()
|
|
|
|
|
{
|
|
|
|
|
$this->service->example([
|
|
|
|
|
'group@hyperf.io',
|
|
|
|
|
'https://doc.hyperf.io',
|
|
|
|
|
'https://www.hyperf.io',
|
|
|
|
|
]);
|
|
|
|
|
|
|
|
|
|
return 'success';
|
|
|
|
|
}
|
2019-03-30 15:00:10 +08:00
|
|
|
|
}
|
2019-07-04 20:10:57 +08:00
|
|
|
|
```
|
2020-02-03 10:22:52 +08:00
|
|
|
|
|
|
|
|
|
## 事件
|
|
|
|
|
|
|
|
|
|
| 事件名称 | 触发时机 | 备注 |
|
|
|
|
|
|:------------:|:-----------------------:|:----------------------------------------------------:|
|
|
|
|
|
| BeforeHandle | 处理消息前触发 | |
|
|
|
|
|
| AfterHandle | 处理消息后触发 | |
|
|
|
|
|
| FailedHandle | 处理消息失败后触发 | |
|
|
|
|
|
| RetryHandle | 重试处理消息前触发 | |
|
|
|
|
|
| QueueLength | 每处理 500 个消息后触发 | 用户可以监听此事件,判断失败或超时队列是否有消息积压 |
|
|
|
|
|
|
|
|
|
|
### QueueLengthListener
|
|
|
|
|
|
|
|
|
|
框架自带了一个记录队列长度的监听器,默认不开启,您如果需要,可以自行添加到 `listeners` 配置中。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
Hyperf\AsyncQueue\Listener\QueueLengthListener::class
|
|
|
|
|
];
|
2020-02-27 13:52:09 +08:00
|
|
|
|
```
|
|
|
|
|
|
2020-09-08 11:35:00 +08:00
|
|
|
|
### ReloadChannelListener
|
|
|
|
|
|
2021-03-30 09:29:49 +08:00
|
|
|
|
当消息执行超时,或项目重启导致消息执行被中断,最终都会被移动到 `timeout` 队列中,只要您可以保证消息执行是幂等的(同一个消息执行一次,或执行多次,最终表现一致),
|
2020-09-08 11:35:00 +08:00
|
|
|
|
就可以开启以下监听器,框架会自动将 `timeout` 队列中消息移动到 `waiting` 队列中,等待下次消费。
|
|
|
|
|
|
|
|
|
|
> 监听器监听 `QueueLength` 事件,默认执行 500 次消息后触发一次。
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
Hyperf\AsyncQueue\Listener\ReloadChannelListener::class
|
|
|
|
|
];
|
|
|
|
|
```
|
|
|
|
|
|
2020-02-27 13:52:09 +08:00
|
|
|
|
## 任务执行流转流程
|
|
|
|
|
|
|
|
|
|
任务执行流转流程主要包括以下几个队列:
|
|
|
|
|
|
|
|
|
|
| 队列名 | 备注 |
|
|
|
|
|
|:--------:|:-----------------------------------------:|
|
|
|
|
|
| waiting | 等待消费的队列 |
|
|
|
|
|
| reserved | 正在消费的队列 |
|
|
|
|
|
| delayed | 延迟消费的队列 |
|
|
|
|
|
| failed | 消费失败的队列 |
|
|
|
|
|
| timeout | 消费超时的队列 (虽然超时,但可能执行成功) |
|
|
|
|
|
|
|
|
|
|
队列流转顺序如下:
|
|
|
|
|
|
|
|
|
|
```mermaid
|
|
|
|
|
graph LR;
|
|
|
|
|
A[投递延时消息]-->C[delayed队列];
|
|
|
|
|
B[投递消息]-->D[waiting队列];
|
|
|
|
|
C--到期-->D;
|
|
|
|
|
D--消费-->E[reserved队列];
|
|
|
|
|
E--成功-->F[删除消息];
|
|
|
|
|
E--失败-->G[failed队列];
|
|
|
|
|
E--超时-->H[timeout队列];
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## 配置多个异步队列
|
|
|
|
|
|
|
|
|
|
当您需要使用多个队列来区分消费高频和低频或其他种类的消息时,可以配置多个队列。
|
|
|
|
|
|
|
|
|
|
1. 添加配置
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
2020-02-03 10:22:52 +08:00
|
|
|
|
|
2020-02-27 13:52:09 +08:00
|
|
|
|
return [
|
|
|
|
|
'default' => [
|
|
|
|
|
'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
|
|
|
|
|
'channel' => '{queue}',
|
|
|
|
|
'timeout' => 2,
|
|
|
|
|
'retry_seconds' => 5,
|
|
|
|
|
'handle_timeout' => 10,
|
|
|
|
|
'processes' => 1,
|
|
|
|
|
'concurrent' => [
|
|
|
|
|
'limit' => 2,
|
|
|
|
|
],
|
|
|
|
|
],
|
|
|
|
|
'other' => [
|
|
|
|
|
'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
|
|
|
|
|
'channel' => '{other.queue}',
|
|
|
|
|
'timeout' => 2,
|
|
|
|
|
'retry_seconds' => 5,
|
|
|
|
|
'handle_timeout' => 10,
|
|
|
|
|
'processes' => 1,
|
|
|
|
|
'concurrent' => [
|
|
|
|
|
'limit' => 2,
|
|
|
|
|
],
|
|
|
|
|
],
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
2. 添加消费进程
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
namespace App\Process;
|
|
|
|
|
|
|
|
|
|
use Hyperf\AsyncQueue\Process\ConsumerProcess;
|
|
|
|
|
use Hyperf\Process\Annotation\Process;
|
|
|
|
|
|
2021-11-20 10:58:02 +08:00
|
|
|
|
#[Process]
|
2020-08-13 19:37:41 +08:00
|
|
|
|
class OtherConsumerProcess extends ConsumerProcess
|
2020-02-27 13:52:09 +08:00
|
|
|
|
{
|
2021-11-20 10:58:02 +08:00
|
|
|
|
protected string $queue = 'other';
|
2020-02-27 13:52:09 +08:00
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
3. 调用
|
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
use Hyperf\AsyncQueue\Driver\DriverFactory;
|
|
|
|
|
use Hyperf\Utils\ApplicationContext;
|
|
|
|
|
|
|
|
|
|
$driver = ApplicationContext::getContainer()->get(DriverFactory::class)->get('other');
|
|
|
|
|
return $driver->push(new ExampleJob());
|
|
|
|
|
```
|
2020-06-04 16:07:56 +08:00
|
|
|
|
|
|
|
|
|
## 安全关闭
|
|
|
|
|
|
2022-10-11 09:43:30 +08:00
|
|
|
|
异步队列在终止时,如果正在进行消费逻辑,可能会导致出现错误。框架提供了 `ProcessStopHandler` ,可以让异步队列进程安全关闭。
|
2020-06-04 16:07:56 +08:00
|
|
|
|
|
2020-08-26 14:27:48 +08:00
|
|
|
|
> 当前信号处理器并不适配于 CoroutineServer,如有需要请自行实现
|
|
|
|
|
|
2020-06-04 16:07:56 +08:00
|
|
|
|
安装信号处理器
|
|
|
|
|
|
2022-10-11 09:43:30 +08:00
|
|
|
|
```shell
|
2020-06-04 16:07:56 +08:00
|
|
|
|
composer require hyperf/signal
|
2022-10-11 09:43:30 +08:00
|
|
|
|
composer require hyperf/process
|
2020-06-04 16:07:56 +08:00
|
|
|
|
```
|
|
|
|
|
|
2022-08-09 17:34:58 +08:00
|
|
|
|
添加配置 `autoload/signal.php`
|
2020-06-04 16:07:56 +08:00
|
|
|
|
|
|
|
|
|
```php
|
|
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
'handlers' => [
|
2022-10-11 09:43:30 +08:00
|
|
|
|
Hyperf\Process\Handler\ProcessStopHandler::class,
|
2020-06-04 16:07:56 +08:00
|
|
|
|
],
|
|
|
|
|
'timeout' => 5.0,
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
```
|
2021-03-14 13:29:35 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## 异步驱动之间的区别
|
|
|
|
|
|
|
|
|
|
- Hyperf\AsyncQueue\Driver\RedisDriver::class
|
|
|
|
|
|
|
|
|
|
此异步驱动会将整个 `JOB` 进行序列化,当投递即时队列后,会 `lpush` 到 `list` 结构中,投递延时队列,会 `zadd` 到 `zset` 结构中。
|
|
|
|
|
所以,如果 `Job` 的参数完全一致的情况,在延时队列中就会出现后投递的消息 **覆盖** 前面投递的消息的问题。
|
|
|
|
|
如果不想出现延时消息覆盖的情况,只需要在 `Job` 里增加一个唯一的 `uniqid`,或者在使用 `注解` 的方法上增加一个 `uniqid` 的入参即可。
|