2019-11-01 13:54:42 +08:00
|
|
|
# NATS
|
|
|
|
|
2019-11-12 19:46:54 +08:00
|
|
|
NATS 是一个开源、轻量级、高性能的分布式消息中间件，实现了高可伸缩性和优雅的 `Publish` / `Subscribe` 模型，使用 `Golang` 语言开发。NATS 的开发哲学认为高质量的 QoS 应该在客户端构建，故只建立了 `Request-Reply`，不提供：1. 持久化；2. 事务处理；3. 增强的交付模式；4. 企业级队列。
|
2019-11-01 13:54:42 +08:00
|
|
|
|
2020-03-19 11:48:15 +08:00
|
|
|
## 安装
|
|
|
|
|
|
|
|
```bash
|
2020-04-08 16:00:59 +08:00
|
|
|
composer require hyperf/nats
|
2020-03-19 11:48:15 +08:00
|
|
|
```
|
|
|
|
|
2019-11-01 13:54:42 +08:00
|
|
|
## 使用
|
|
|
|
|
|
|
|
### 创建消费者
|
|
|
|
|
|
|
|
```bash
|
2020-03-19 11:48:15 +08:00
|
|
|
php bin/hyperf.php gen:nats-consumer DemoConsumer
|
2019-11-01 13:54:42 +08:00
|
|
|
```
|
|
|
|
|
|
|
|
如果设置了 `queue`，则相同的 `subject` 只会被一个 `queue` 消费。若不设置 `queue`，则每个消费者都会收到消息。
|
|
|
|
|
|
|
|
```php
|
|
|
|
<?php
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
namespace App\Nats\Consumer;
|
|
|
|
|
|
|
|
use Hyperf\Nats\AbstractConsumer;
|
|
|
|
use Hyperf\Nats\Annotation\Consumer;
|
|
|
|
use Hyperf\Nats\Message;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @Consumer(subject="hyperf.demo", queue="hyperf.demo", name="DemoConsumer", nums=1)
|
|
|
|
*/
|
|
|
|
class DemoConsumer extends AbstractConsumer
|
|
|
|
{
|
|
|
|
public function consume(Message $payload)
|
|
|
|
{
|
|
|
|
// Do something...
|
|
|
|
}
|
|
|
|
}
|
|
|
|
```
|
|
|
|
|
|
|
|
### 投递消息
|
|
|
|
|
|
|
|
使用 `publish` 投递消息。
|
|
|
|
|
|
|
|
```php
|
|
|
|
<?php
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
namespace App\Controller;
|
|
|
|
|
|
|
|
use Hyperf\Di\Annotation\Inject;
|
|
|
|
use Hyperf\HttpServer\Annotation\AutoController;
|
|
|
|
use Hyperf\Nats\Driver\DriverInterface;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @AutoController(prefix="nats")
|
|
|
|
*/
|
2020-12-03 15:58:26 +08:00
|
|
|
class NatsController extends AbstractController
|
2019-11-01 13:54:42 +08:00
|
|
|
{
|
|
|
|
/**
|
|
|
|
* @Inject
|
|
|
|
* @var DriverInterface
|
|
|
|
*/
|
|
|
|
protected $nats;
|
|
|
|
|
|
|
|
public function publish()
|
|
|
|
{
|
|
|
|
$res = $this->nats->publish('hyperf.demo', [
|
|
|
|
'id' => 'Hyperf',
|
|
|
|
]);
|
|
|
|
|
|
|
|
return $this->response->success($res);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
使用 `request` 投递消息。
|
|
|
|
|
|
|
|
```php
|
|
|
|
<?php
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
namespace App\Controller;
|
|
|
|
|
|
|
|
use Hyperf\Di\Annotation\Inject;
|
|
|
|
use Hyperf\HttpServer\Annotation\AutoController;
|
|
|
|
use Hyperf\Nats\Driver\DriverInterface;
|
|
|
|
use Hyperf\Nats\Message;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @AutoController(prefix="nats")
|
|
|
|
*/
|
2020-12-03 15:58:26 +08:00
|
|
|
class NatsController extends AbstractController
|
2019-11-01 13:54:42 +08:00
|
|
|
{
|
|
|
|
/**
|
|
|
|
* @Inject
|
|
|
|
* @var DriverInterface
|
|
|
|
*/
|
|
|
|
protected $nats;
|
|
|
|
|
|
|
|
public function request()
|
|
|
|
{
|
|
|
|
$res = $this->nats->request('hyperf.reply', [
|
|
|
|
'id' => 'limx',
|
|
|
|
], function (Message $payload) {
|
|
|
|
var_dump($payload->getBody());
|
|
|
|
});
|
|
|
|
|
|
|
|
return $this->response->success($res);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
使用 `requestSync` 投递消息。
|
|
|
|
|
|
|
|
```php
|
|
|
|
<?php
|
|
|
|
|
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
|
|
namespace App\Controller;
|
|
|
|
|
|
|
|
use Hyperf\Di\Annotation\Inject;
|
|
|
|
use Hyperf\HttpServer\Annotation\AutoController;
|
|
|
|
use Hyperf\Nats\Driver\DriverInterface;
|
|
|
|
use Hyperf\Nats\Message;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* @AutoController(prefix="nats")
|
|
|
|
*/
|
2020-12-03 15:58:26 +08:00
|
|
|
class NatsController extends AbstractController
|
2019-11-01 13:54:42 +08:00
|
|
|
{
|
|
|
|
/**
|
|
|
|
* @Inject
|
|
|
|
* @var DriverInterface
|
|
|
|
*/
|
|
|
|
protected $nats;
|
|
|
|
|
|
|
|
public function sync()
|
|
|
|
{
|
|
|
|
/** @var Message $message */
|
|
|
|
$message = $this->nats->requestSync('hyperf.reply', [
|
|
|
|
'id' => 'limx',
|
|
|
|
]);
|
|
|
|
|
|
|
|
return $this->response->success($message->getBody());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|