mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-12-02 11:48:08 +08:00
commit
3f85fa8bf0
@ -39,7 +39,7 @@ return [
|
||||
|
||||
### 消费消息
|
||||
|
||||
组件已经提供了默认子进程,只需要将子进程配置到 `processes.php` 中即可。
|
||||
组件已经提供了默认子进程,只需要将它配置到 `processes.php` 中即可。
|
||||
|
||||
```php
|
||||
<?php
|
||||
@ -50,6 +50,26 @@ return [
|
||||
|
||||
```
|
||||
|
||||
当然,您也可以将以下 `Process` 添加到自己的项目中。
|
||||
|
||||
```php
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Process;
|
||||
|
||||
use Hyperf\AsyncQueue\Process\ConsumerProcess;
|
||||
use Hyperf\Process\Annotation\Process;
|
||||
|
||||
/**
|
||||
* @Process(name="async-queue")
|
||||
*/
|
||||
class AsyncQueueConsumer extends ConsumerProcess
|
||||
{
|
||||
}
|
||||
```
|
||||
|
||||
### 发布消息
|
||||
|
||||
首先我们定义一个消息,如下
|
||||
@ -59,15 +79,24 @@ return [
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Jobs;
|
||||
namespace App\Job;
|
||||
|
||||
use Hyperf\AsyncQueue\Job;
|
||||
|
||||
class ExampleJob extends Job
|
||||
{
|
||||
public $params;
|
||||
|
||||
public function __construct($params)
|
||||
{
|
||||
// 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
|
||||
$this->params = $params;
|
||||
}
|
||||
|
||||
public function handle()
|
||||
{
|
||||
var_dump('hello world');
|
||||
// 根据参数处理具体逻辑
|
||||
var_dump($this->params);
|
||||
}
|
||||
}
|
||||
|
||||
@ -80,11 +109,13 @@ class ExampleJob extends Job
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
use Psr\Container\ContainerInterface;
|
||||
namespace App\Service;
|
||||
|
||||
use App\Job\ExampleJob;
|
||||
use Hyperf\AsyncQueue\Driver\DriverFactory;
|
||||
use Hyperf\AsyncQueue\Driver\DriverInterface;
|
||||
|
||||
class DemoService
|
||||
class QueueService
|
||||
{
|
||||
/**
|
||||
* @var DriverInterface
|
||||
@ -96,18 +127,56 @@ class DemoService
|
||||
$this->driver = $driverFactory->get('default');
|
||||
}
|
||||
|
||||
public function publish()
|
||||
/**
|
||||
* 投递消息.
|
||||
* @param $params 数据
|
||||
* @param int $delay 延时时间 单位秒
|
||||
* @return bool
|
||||
*/
|
||||
public function push($params, $delay = 0)
|
||||
{
|
||||
// 发布消息
|
||||
// 这里的 ExampleJob 是直接实例化出来的,所以不能在 Job 内使用 @Inject @Value 等注解及注解所对应功能的其它使用方式
|
||||
return $this->driver->push(new ExampleJob());
|
||||
}
|
||||
|
||||
public function delay()
|
||||
{
|
||||
// 发布延迟消息
|
||||
// 第二个参数 $delay 即为延迟的秒数
|
||||
return $this->driver->push(new ExampleJob(), 60);
|
||||
// 这里的 `ExampleJob` 会被序列化存到 Redis 中,所以内部变量最好只传入普通数据
|
||||
// 同理,如果内部使用了注解 @Value 会把对应对象一起序列化,导致消息体变大。
|
||||
// 所以这里也不推荐使用 `make` 方法来创建 `Job` 对象。
|
||||
return $this->driver->push(new ExampleJob($params), $delay);
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
根据实际业务场景,动态投递消息到异步队列执行,我们演示在控制器动态投递消息,如下:
|
||||
|
||||
```php
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Controller;
|
||||
|
||||
use App\Service\QueueService;
|
||||
use Hyperf\Di\Annotation\Inject;
|
||||
use Hyperf\HttpServer\Annotation\AutoController;
|
||||
|
||||
/**
|
||||
* @AutoController
|
||||
*/
|
||||
class QueueController extends Controller
|
||||
{
|
||||
/**
|
||||
* @Inject
|
||||
* @var QueueService
|
||||
*/
|
||||
protected $service;
|
||||
|
||||
public function index()
|
||||
{
|
||||
$this->service->push([
|
||||
'group@hyperf.io',
|
||||
'https://doc.hyperf.io',
|
||||
'https://www.hyperf.io',
|
||||
]);
|
||||
|
||||
return 'success';
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -42,13 +42,3 @@ php bin/hyperf.php di:init-proxy && composer test
|
||||
```
|
||||
php bin/hyperf.php di:init-proxy && php bin/hyperf.php start
|
||||
```
|
||||
|
||||
## Docker打包失败
|
||||
|
||||
显示 `wget: error getting response: Connection reset by peer`
|
||||
|
||||
修改我们默认的 `Dockerfile`,重新安装一下 `wget`,增加以下代码即可。
|
||||
|
||||
```
|
||||
&& apk add wget \
|
||||
```
|
Loading…
Reference in New Issue
Block a user