hyperf/doc/zh-hk/async-queue.md

320 lines
7.2 KiB
Markdown
Raw Normal View History

2019-12-12 16:24:04 +08:00
# 異步隊列
2019-03-30 15:00:10 +08:00
2019-12-12 16:24:04 +08:00
異步隊列區別於 `RabbitMQ` `Kafka` 等消息隊列,它只提供一種 `異步處理``異步延時處理` 的能力,並 **不能** 嚴格地保證消息的持久化和 **不支持** ACK 應答機制。
2019-03-30 15:00:10 +08:00
2019-12-12 16:24:04 +08:00
## 安裝
2019-03-30 22:53:32 +08:00
```bash
2019-04-10 14:28:15 +08:00
composer require hyperf/async-queue
2019-03-30 22:53:32 +08:00
```
2019-03-30 15:00:10 +08:00
## 配置
2019-12-12 16:24:04 +08:00
配置文件位於 `config/autoload/async_queue.php`,如文件不存在可自行創建。
2019-07-04 20:10:57 +08:00
2019-12-12 16:24:04 +08:00
> 暫時只支持 `Redis Driver` 驅動。
2019-03-30 15:00:10 +08:00
2019-12-12 16:24:04 +08:00
| 配置 | 類型 | 默認值 | 備註 |
2019-10-08 14:37:27 +08:00
|:----------------:|:---------:|:-------------------------------------------:|:---------------------------------------:|
2019-12-12 16:24:04 +08:00
| driver | string | Hyperf\AsyncQueue\Driver\RedisDriver::class | 無 |
| channel | string | queue | 隊列前綴 |
| timeout | int | 2 | pop 消息的超時時間 |
| retry_seconds | int,array | 5 | 失敗後重新嘗試間隔 |
| handle_timeout | int | 10 | 消息處理超時時間 |
| processes | int | 1 | 消費進程數 |
| concurrent.limit | int | 1 | 同時處理消息數 |
| max_messages | int | 0 | 進程重啟所需最大處理的消息數 默認不重啟 |
2019-03-30 15:00:10 +08:00
```php
<?php
return [
'default' => [
2019-04-10 14:28:15 +08:00
'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
2019-03-30 15:00:10 +08:00
'channel' => 'queue',
2019-10-08 14:37:27 +08:00
'timeout' => 2,
2019-03-30 15:00:10 +08:00
'retry_seconds' => 5,
2019-10-08 14:37:27 +08:00
'handle_timeout' => 10,
2019-03-30 15:00:10 +08:00
'processes' => 1,
2019-10-08 14:37:27 +08:00
'concurrent' => [
'limit' => 5,
],
2019-03-30 15:00:10 +08:00
],
];
```
2019-12-12 16:24:04 +08:00
`retry_seconds` 也可以傳入數組,根據重試次數相應修改重試時間,例如
2019-09-20 17:32:53 +08:00
```php
<?php
return [
'default' => [
'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
'channel' => 'queue',
'retry_seconds' => [1, 5, 10, 20],
'processes' => 1,
],
];
```
2019-03-30 15:00:10 +08:00
## 使用
2019-12-12 16:24:04 +08:00
### 消費消息
2019-03-30 15:00:10 +08:00
2019-12-12 16:24:04 +08:00
組件已經提供了默認子進程,只需要將它配置到 `config/autoload/processes.php` 中即可。
2019-03-30 15:00:10 +08:00
```php
<?php
return [
2019-04-10 14:28:15 +08:00
Hyperf\AsyncQueue\Process\ConsumerProcess::class,
2019-03-30 15:00:10 +08:00
];
```
2019-12-12 16:24:04 +08:00
當然,您也可以將以下 `Process` 添加到自己的項目中。
2019-07-25 10:05:33 +08:00
```php
<?php
declare(strict_types=1);
namespace App\Process;
use Hyperf\AsyncQueue\Process\ConsumerProcess;
use Hyperf\Process\Annotation\Process;
/**
* @Process(name="async-queue")
*/
class AsyncQueueConsumer extends ConsumerProcess
{
}
```
2019-12-12 16:24:04 +08:00
### 生產消息
2019-03-30 15:00:10 +08:00
2019-12-12 16:24:04 +08:00
#### 傳統方式
2019-10-16 13:56:16 +08:00
2020-01-16 14:33:32 +08:00
這種模式會把對象直接序列化然後存到 `Redis` 等隊列中,所以為了保證序列化後的體積,儘量不要將 `Container``Config` 等設置為成員變量。
比如以下 `Job` 的定義,是 **不可取**
```php
<?php
declare(strict_types=1);
namespace App\Job;
use Hyperf\AsyncQueue\Job;
use Psr\Container\ContainerInterface;
class ExampleJob extends Job
{
public $container;
public $params;
public function __construct(ContainerInterface $container, $params)
{
$this->container = $container;
$this->params = $params;
}
public function handle()
{
// 根據參數處理具體邏輯
var_dump($this->params);
}
}
$job = make(ExampleJob::class);
```
正確的 `Job` 應該是隻有需要處理的數據,其他相關數據,可以在 `handle` 方法中重新獲取,如下。
2019-03-30 15:00:10 +08:00
```php
<?php
declare(strict_types=1);
2019-07-25 10:05:33 +08:00
namespace App\Job;
2019-03-30 15:00:10 +08:00
2019-04-10 14:28:15 +08:00
use Hyperf\AsyncQueue\Job;
2019-03-30 15:00:10 +08:00
class ExampleJob extends Job
{
2019-07-25 10:05:33 +08:00
public $params;
2019-07-25 10:05:33 +08:00
public function __construct($params)
{
2019-12-12 16:24:04 +08:00
// 這裏最好是普通數據,不要使用攜帶 IO 的對象,比如 PDO 對象
2019-07-25 10:05:33 +08:00
$this->params = $params;
}
2019-07-25 10:05:33 +08:00
2019-03-30 15:00:10 +08:00
public function handle()
{
2019-12-12 16:24:04 +08:00
// 根據參數處理具體邏輯
2020-01-16 14:33:32 +08:00
// 通過具體參數獲取模型等
var_dump($this->params);
2019-03-30 15:00:10 +08:00
}
}
```
2020-01-16 14:33:32 +08:00
正確定義完 `Job` 後,我們需要寫一個專門投遞消息的 `Service`,代碼如下。
2019-03-30 15:00:10 +08:00
```php
<?php
declare(strict_types=1);
2019-07-25 10:05:33 +08:00
namespace App\Service;
use App\Job\ExampleJob;
2019-04-10 14:28:15 +08:00
use Hyperf\AsyncQueue\Driver\DriverFactory;
2019-07-04 17:41:44 +08:00
use Hyperf\AsyncQueue\Driver\DriverInterface;
2019-03-30 15:00:10 +08:00
2019-07-25 10:05:33 +08:00
class QueueService
2019-03-30 15:00:10 +08:00
{
2019-07-04 17:41:44 +08:00
/**
* @var DriverInterface
*/
2019-03-30 15:00:10 +08:00
protected $driver;
2019-05-13 02:25:03 +08:00
public function __construct(DriverFactory $driverFactory)
2019-03-30 15:00:10 +08:00
{
2019-05-13 02:25:03 +08:00
$this->driver = $driverFactory->get('default');
2019-03-30 15:00:10 +08:00
}
2019-07-25 10:05:33 +08:00
/**
2019-12-12 16:24:04 +08:00
* 生產消息.
* @param $params 數據
* @param int $delay 延時時間 單位秒
2019-07-25 10:05:33 +08:00
*/
2019-07-29 00:19:34 +08:00
public function push($params, int $delay = 0): bool
2019-07-04 17:41:44 +08:00
{
2019-12-12 16:24:04 +08:00
// 這裏的 `ExampleJob` 會被序列化存到 Redis 中,所以內部變量最好只傳入普通數據
// 同理,如果內部使用了註解 @Value 會把對應對象一起序列化,導致消息體變大。
// 所以這裏也不推薦使用 `make` 方法來創建 `Job` 對象。
2019-07-25 10:05:33 +08:00
return $this->driver->push(new ExampleJob($params), $delay);
}
}
```
2020-01-16 14:33:32 +08:00
投遞消息
接下來,調用我們的 `QueueService` 投遞消息即可。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use App\Service\QueueService;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
/**
* @AutoController
*/
class QueueController extends Controller
{
/**
* @Inject
* @var QueueService
*/
protected $service;
/**
* 傳統模式投遞消息
*/
public function index()
{
$this->service->push([
'group@hyperf.io',
'https://doc.hyperf.io',
'https://www.hyperf.io',
]);
return 'success';
}
}
```
2019-12-12 16:24:04 +08:00
#### 註解方式
2019-10-16 13:56:16 +08:00
2019-12-12 16:24:04 +08:00
框架除了傳統方式投遞消息,還提供了註解方式。
2019-10-16 13:56:16 +08:00
2020-01-16 14:33:32 +08:00
讓我們重寫上述 `QueueService`,直接將 `ExampleJob` 的邏輯搬到 `example` 方法中,並加上對應註解 `AsyncQueueMessage`,具體代碼如下。
2019-10-16 13:56:16 +08:00
```php
<?php
declare(strict_types=1);
namespace App\Service;
use Hyperf\AsyncQueue\Annotation\AsyncQueueMessage;
class QueueService
{
/**
* @AsyncQueueMessage
*/
2019-10-17 09:59:01 +08:00
public function example($params)
2019-10-16 13:56:16 +08:00
{
2019-12-12 16:24:04 +08:00
// 需要異步執行的代碼邏輯
2019-10-16 13:56:16 +08:00
var_dump($params);
}
}
```
2020-01-16 14:33:32 +08:00
投遞消息
2019-10-16 13:56:16 +08:00
2020-01-16 14:33:32 +08:00
註解模式投遞消息就跟平常調用方法一致,代碼如下。
```php
<?php
declare(strict_types=1);
2019-07-25 10:05:33 +08:00
namespace App\Controller;
2019-07-25 10:05:33 +08:00
use App\Service\QueueService;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
2019-07-25 10:05:33 +08:00
/**
* @AutoController
*/
class QueueController extends Controller
{
2019-07-25 10:05:33 +08:00
/**
* @Inject
* @var QueueService
*/
protected $service;
2019-10-29 17:03:10 +08:00
/**
2019-12-12 16:24:04 +08:00
* 註解模式投遞消息
2019-10-29 17:03:10 +08:00
*/
public function example()
{
$this->service->example([
'group@hyperf.io',
'https://doc.hyperf.io',
'https://www.hyperf.io',
]);
return 'success';
}
2019-03-30 15:00:10 +08:00
}
2019-07-04 20:10:57 +08:00
```