Support delayed message exchange for AMQP. (#3987)

Co-authored-by: 李铭昕 <715557344@qq.com>
This commit is contained in:
Hyman 2021-08-28 17:36:30 +08:00 committed by GitHub
parent b996c3c690
commit c27f12655e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 246 additions and 0 deletions

View File

@ -6,6 +6,10 @@
- [#3979](https://github.com/hyperf/hyperf/pull/3979) Fixed bug that timeout property does not work in circuit breaker.
- [#3986](https://github.com/hyperf/hyperf/pull/3986) Fixed OSS hook failed when using `SWOOLE_HOOK_NATIVE_CURL`.
## Added
- [#3987](https://github.com/hyperf/hyperf/pull/3987) Support delayed message exchange for AMQP.
# v2.2.5 - 2021-08-23
## Fixed

View File

@ -157,6 +157,175 @@ class DemoConsumer extends ConsumerMessage
}
```
## 延时队列
AMQP 的延时队列,并不会根据延时时间进行排序,所以,一旦你投递了一个延时 10s 的任务,又往这个队列中投递了一个延时 5s 的任务,那么也一定会在第一个 10s 任务完成后,才会消费第二个 5s 的任务。
所以,需要根据时间设置不同的队列,如果想要更加灵活的延时队列,可以尝试 异步队列(async-queue) 和 AMQP 配合使用。
另外AMQP 需要下载 [延时插件](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases),并激活才能正常使用
```shell
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```
### 生产者
使用 `gen:amqp-producer` 命令创建一个 `producer`。这里举例 `direct` 类型,其他类型如 `fanout`、`topic`,改生产者和消费者中的 `type` 即可。
```bash
php bin/hyperf.php gen:amqp-producer DelayDirectProducer
```
在 DelayDirectProducer 文件中,加入`use ProducerDelayedMessageTrait;`,示例如下:
```php
<?php
namespace App\Amqp\Producer;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerMessage;
use Hyperf\Amqp\Message\Type;
/**
* @Producer()
*/
class DelayDirectProducer extends ProducerMessage
{
use ProducerDelayedMessageTrait;
protected $exchange = 'ext.hyperf.delay';
protected $type = Type::DIRECT;
protected $routingKey = '';
public function __construct($data)
{
$this->payload = $data;
}
}
```
### 消费者
使用 `gen:amqp-consumer` 命令创建一个 `consumer`
```bash
php bin/hyperf.php gen:amqp-consumer DelayDirectConsumer
```
`DelayDirectConsumer` 文件中,增加引入`use ProducerDelayedMessageTrait, ConsumerDelayedMessageTrait;`,示例如下:
```php
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerDelayedMessageTrait;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;
/**
* @Consumer(nums=1)
*/
class DelayDirectConsumer extends ConsumerMessage
{
use ProducerDelayedMessageTrait;
use ConsumerDelayedMessageTrait;
protected $exchange = 'ext.hyperf.delay';
protected $queue = 'queue.hyperf.delay';
protected $type = Type::DIRECT; //Type::FANOUT;
protected $routingKey = '';
public function consumeMessage($data, AMQPMessage $message): string
{
var_dump($data, 'delay+direct consumeTime:' . (microtime(true)));
return Result::ACK;
}
}
```
### 生产延时消息
> 以下是在 Command 中演示如何使用,具体用法请以实际为准
使用 `gen:command DelayCommand` 命令创建一个 `DelayCommand`。如下:
```php
<?php
declare(strict_types=1);
namespace App\Command;
use App\Amqp\Producer\DelayDirectProducer;
//use App\Amqp\Producer\DelayFanoutProducer;
//use App\Amqp\Producer\DelayTopicProducer;
use Hyperf\Amqp\Producer;
use Hyperf\Command\Annotation\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Utils\ApplicationContext;
use Psr\Container\ContainerInterface;
/**
* @Command
*/
class DelayCommand extends HyperfCommand
{
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
parent::__construct('demo:command');
}
public function configure()
{
parent::configure();
$this->setDescription('Hyperf Demo Command');
}
public function handle()
{
//1.delayed + direct
$message = new DelayDirectProducer('delay+direct produceTime:'.(microtime(true)));
//2.delayed + fanout
//$message = new DelayFanoutProducer('delay+fanout produceTime:'.(microtime(true)));
//3.delayed + topic
//$message = new DelayTopicProducer('delay+topic produceTime:' . (microtime(true)));
$message->setDelayMs(5000);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$producer->produce($message);
}
}
```
执行命令行生产消息
```
php bin/hyperf.php demo:command
```
### 禁止消费进程自启
默认情况下,使用了 `@Consumer` 注解后,框架会自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。

View File

@ -0,0 +1,30 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Message;
use Hyperf\Amqp\Builder\QueueBuilder;
use PhpAmqpLib\Wire\AMQPTable;
/**
* @method ConsumerMessage getQueue()
*/
trait ConsumerDelayedMessageTrait
{
/**
* Overwrite.
*/
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())->setQueue((string) $this->getQueue())
->setArguments(new AMQPTable(['x-dead-letter-exchange' => 'delayed']));
}
}

View File

@ -0,0 +1,43 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Message;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use PhpAmqpLib\Wire\AMQPTable;
/**
* @method ProducerMessage getExchange()
* @method ProducerMessage getType()
* @property ProducerMessage $properties
*/
trait ProducerDelayedMessageTrait
{
/**
* Set the delay time.
* @return $this
*/
public function setDelayMs(int $millisecond, string $name = 'x-delay'): self
{
$this->properties['application_headers'] = new AMQPTable([$name => $millisecond]);
return $this;
}
/**
* Overwrite.
*/
public function getExchangeBuilder(): ExchangeBuilder
{
return (new ExchangeBuilder())->setExchange((string) $this->getExchange())
->setType('x-delayed-message')
->setArguments(new AMQPTable(['x-delayed-type' => $this->getType()]));
}
}