Merge pull request #820 from limingxinleo/1.1-nats

Added nats component.
This commit is contained in:
李铭昕 2019-11-07 09:53:10 +08:00 committed by GitHub
commit 35d5fbc78a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 3014 additions and 11 deletions

View File

@ -5,11 +5,11 @@ sudo: required
matrix:
include:
- php: 7.2
env: SW_VERSION="4.4.8"
env: SW_VERSION="4.4.12"
- php: 7.3
env: SW_VERSION="4.4.8"
env: SW_VERSION="4.4.12"
- php: master
env: SW_VERSION="4.4.8"
env: SW_VERSION="4.4.12"
allow_failures:
- php: master
@ -38,6 +38,6 @@ before_script:
- composer config -g process-timeout 900 && composer update
script:
- composer analyse src/di src/json-rpc src/tracer src/metric
- composer analyse src/di src/json-rpc src/tracer src/metric src/nats
- composer test -- --exclude-group NonCoroutine
- vendor/bin/phpunit --group NonCoroutine

View File

@ -3,6 +3,7 @@
## Added
- [#812](https://github.com/hyperf/hyperf/pull/812) Added singleton crontab task support.
- [#820](https://github.com/hyperf/hyperf/pull/820) Added nats component.
- [#832](https://github.com/hyperf/hyperf/pull/832) Added `Hyperf\Utils\Codec\Json`.
- [#833](https://github.com/hyperf/hyperf/pull/833) Added `Hyperf\Utils\Backoff`.
- [#852](https://github.com/hyperf/hyperf/pull/852) Added a `clear()` method for `Hyperf\Utils\Parallel` to clear adde callbacks.

View File

@ -25,7 +25,7 @@ foreach ($files as $file) {
$composerJson = json_decode(file_get_contents($file), true);
foreach ($composerJson['autoload']['files'] ?? [] as $file) {
$autoloadFiles[] = "src/{$component}/". preg_replace('#^./#', '', $file);
$autoloadFiles[] = "src/{$component}/" . preg_replace('#^./#', '', $file);
}
foreach ($composerJson['autoload']['psr-4'] ?? [] as $ns => $dir) {
$autoload[$ns] = "src/{$component}/" . trim($dir, '/') . '/';
@ -51,5 +51,5 @@ $json->extra->hyperf->config = $configProviders;
file_put_contents(
__DIR__ . '/../composer.json',
json_encode($json, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_PRETTY_PRINT)
json_encode($json, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_PRETTY_PRINT) . PHP_EOL
);

View File

@ -18,10 +18,12 @@
"doctrine/instantiator": "^1.0",
"egulias/email-validator": "^2.1",
"elasticsearch/elasticsearch": "^6.1",
"endclothing/prometheus_client_php": "^0.9.1",
"fig/http-message-util": "^1.1.2",
"google/protobuf": "^3.6.1",
"grpc/grpc": "^1.15",
"guzzlehttp/guzzle": "^6.3",
"ircmaxell/random-lib": "^1.2",
"jcchavezs/zipkin-opentracing": "^0.1.2",
"jean85/pretty-package-versions": "^1.2",
"monolog/monolog": "^1.24",
@ -40,8 +42,7 @@
"start-point/etcd-php": "^1.1",
"symfony/console": "^4.2",
"symfony/finder": "^4.1",
"vlucas/phpdotenv": "^3.1",
"endclothing/prometheus_client_php": "^0.9.1"
"vlucas/phpdotenv": "^3.1"
},
"require-dev": {
"doctrine/common": "@stable",
@ -114,6 +115,7 @@
"files": [
"src/config/src/Functions.php",
"src/di/src/Functions.php",
"src/nats/src/Functions.php",
"src/translation/src/Functions.php",
"src/utils/src/Functions.php"
],
@ -155,6 +157,7 @@
"Hyperf\\Metric\\": "src/metric/src/",
"Hyperf\\ModelCache\\": "src/model-cache/src/",
"Hyperf\\ModelListener\\": "src/model-listener/src/",
"Hyperf\\Nats\\": "src/nats/src/",
"Hyperf\\Paginator\\": "src/paginator/src/",
"Hyperf\\Pool\\": "src/pool/src/",
"Hyperf\\Process\\": "src/process/src/",
@ -216,6 +219,7 @@
"HyperfTest\\Metric\\": "src/metric/tests/",
"HyperfTest\\ModelCache\\": "src/model-cache/tests/",
"HyperfTest\\ModelListener\\": "src/model-listener/tests/",
"HyperfTest\\Nats\\": "src/nats/tests/",
"HyperfTest\\Paginator\\": "src/paginator/tests/",
"HyperfTest\\Pool\\": "src/pool/tests/",
"HyperfTest\\Process\\": "src/process/tests/",
@ -271,6 +275,7 @@
"Hyperf\\Metric\\ConfigProvider",
"Hyperf\\ModelCache\\ConfigProvider",
"Hyperf\\ModelListener\\ConfigProvider",
"Hyperf\\Nats\\ConfigProvider",
"Hyperf\\Paginator\\ConfigProvider",
"Hyperf\\Pool\\ConfigProvider",
"Hyperf\\Process\\ConfigProvider",
@ -308,4 +313,4 @@
},
"minimum-stability": "dev",
"prefer-stable": true
}
}

151
doc/zh/nats.md Normal file
View File

@ -0,0 +1,151 @@
# NATS
NATS是一个开源、轻量级、高性能的分布式消息中间件实现了高可伸缩性和优雅的 `Publish` / `Subscribe` 模型,使用 `Golang` 语言开发。NATS的开发哲学认为高质量的QoS应该在客户端构建故只建立了 `Request-Reply`,不提供 1.持久化 2.事务处理 3.增强的交付模式 4.企业级队列。
## 使用
### 创建消费者
```
$ php bin/hyperf.php gen:nats-consumer DemoConsumer
```
如果设置了 `queue`,则相同的 `subject` 只会被一个 `queue` 消费。若不设置 `queue`,则每个消费者都会受到消息。
```php
<?php
declare(strict_types=1);
namespace App\Nats\Consumer;
use Hyperf\Nats\AbstractConsumer;
use Hyperf\Nats\Annotation\Consumer;
use Hyperf\Nats\Message;
/**
* @Consumer(subject="hyperf.demo", queue="hyperf.demo", name="DemoConsumer", nums=1)
*/
class DemoConsumer extends AbstractConsumer
{
public function consume(Message $payload)
{
// Do something...
}
}
```
### 投递消息
使用 publish 投递消息。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\Nats\Driver\DriverInterface;
/**
* @AutoController(prefix="nats")
*/
class NatsController extends Controller
{
/**
* @Inject
* @var DriverInterface
*/
protected $nats;
public function publish()
{
$res = $this->nats->publish('hyperf.demo', [
'id' => 'Hyperf',
]);
return $this->response->success($res);
}
}
```
使用 request 投递消息。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\Nats\Driver\DriverInterface;
use Hyperf\Nats\Message;
/**
* @AutoController(prefix="nats")
*/
class NatsController extends Controller
{
/**
* @Inject
* @var DriverInterface
*/
protected $nats;
public function request()
{
$res = $this->nats->request('hyperf.reply', [
'id' => 'limx',
], function (Message $payload) {
var_dump($payload->getBody());
});
return $this->response->success($res);
}
}
```
使用 requestSync 投递消息。
```php
<?php
declare(strict_types=1);
namespace App\Controller;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\AutoController;
use Hyperf\Nats\Driver\DriverInterface;
use Hyperf\Nats\Message;
/**
* @AutoController(prefix="nats")
*/
class NatsController extends Controller
{
/**
* @Inject
* @var DriverInterface
*/
protected $nats;
public function sync()
{
/** @var Message $message */
$message = $this->nats->requestSync('hyperf.reply', [
'id' => 'limx',
]);
return $this->response->success($message->getBody());
}
}
```

View File

@ -83,6 +83,7 @@
* [Task 机制](zh/task.md)
* [枚举类](zh/constants.md)
* [Snowflake](zh/snowflake.md)
* [Nats](zh/nats.md)
* 应用部署

View File

@ -32,6 +32,6 @@ class AmqpConsumerCommand extends GeneratorCommand
protected function getDefaultNamespace(): string
{
return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Consumers';
return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Consumer';
}
}

View File

@ -32,6 +32,6 @@ class AmqpProducerCommand extends GeneratorCommand
protected function getDefaultNamespace(): string
{
return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Producers';
return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Producer';
}
}

View File

@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Devtool\Generator;
use Hyperf\Command\Annotation\Command;
/**
* @Command
*/
class NatsConsumerCommand extends GeneratorCommand
{
public function __construct()
{
parent::__construct('gen:nats-consumer');
$this->setDescription('Create a new nats consumer class');
}
protected function getStub(): string
{
return $this->getConfig()['stub'] ?? __DIR__ . '/stubs/nats-consumer.stub';
}
protected function getDefaultNamespace(): string
{
return $this->getConfig()['namespace'] ?? 'App\\Nats\\Consumer';
}
}

View File

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
namespace %NAMESPACE%;
use Hyperf\Nats\AbstractConsumer;
use Hyperf\Nats\Annotation\Consumer;
use Hyperf\Nats\Message;
/**
* @Consumer(subject="hyperf", queue="hyperf", name ="%CLASS%", nums=1)
*/
class %CLASS% extends AbstractConsumer
{
public function consume(Message $payload)
{
var_dump($payload->getBody());
}
}

1
src/nats/.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
/tests export-ignore

21
src/nats/LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) Hyperf
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

59
src/nats/composer.json Normal file
View File

@ -0,0 +1,59 @@
{
"name": "hyperf/nats",
"description": "A nats library for Hyperf.",
"license": "MIT",
"keywords": [
"php",
"swoole",
"hyperf",
"nats"
],
"support": {
},
"require": {
"php": ">=7.2",
"hyperf/contract": "~1.1.0",
"hyperf/pool": "~1.1.0",
"hyperf/utils": "~1.1.0",
"ircmaxell/random-lib": "^1.2",
"psr/container": "^1.0"
},
"require-dev": {
"malukenho/docheader": "^0.1.6",
"mockery/mockery": "^1.0",
"phpunit/phpunit": "^7.0.0",
"friendsofphp/php-cs-fixer": "^2.9"
},
"suggest": {
},
"autoload": {
"psr-4": {
"Hyperf\\Nats\\": "src/"
},
"files": [
"src/Functions.php"
]
},
"autoload-dev": {
"psr-4": {
"HyperfTest\\Nats\\": "tests/"
}
},
"config": {
"sort-packages": true
},
"extra": {
"branch-alias": {
"dev-master": "1.1-dev"
},
"hyperf": {
"config": "Hyperf\\Nats\\ConfigProvider"
}
},
"bin": [
],
"scripts": {
"cs-fix": "php-cs-fixer fix $1",
"test": "phpunit --colors=always"
}
}

34
src/nats/publish/nats.php Normal file
View File

@ -0,0 +1,34 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
return [
'default' => [
'driver' => Hyperf\Nats\Driver\NatsDriver::class,
'encoder' => Hyperf\Nats\Encoders\JSONEncoder::class,
'timeout' => 10.0,
'options' => [
'host' => '127.0.0.1',
'port' => 4222,
'user' => 'nats',
'pass' => 'nats',
'lang' => 'php',
],
'pool' => [
'min_connections' => 1,
'max_connections' => 10,
'connect_timeout' => 10.0,
'wait_timeout' => 3.0,
'heartbeat' => -1,
'max_idle_time' => 60,
],
],
];

View File

@ -0,0 +1,110 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Psr\Container\ContainerInterface;
abstract class AbstractConsumer
{
/**
* @var string
*/
public $pool = 'default';
/**
* @var string
*/
protected $subject = '';
/**
* @var string
*/
protected $queue = '';
/**
* @var string
*/
protected $name = 'NatsConsumer';
/**
* @var int
*/
protected $nums = 1;
/**
* @var ContainerInterface
*/
protected $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
abstract public function consume(Message $payload);
public function getSubject(): string
{
return $this->subject;
}
public function setSubject(string $subject): self
{
$this->subject = $subject;
return $this;
}
public function getQueue(): string
{
return $this->queue;
}
public function setQueue(string $queue): self
{
$this->queue = $queue;
return $this;
}
public function getName(): string
{
return $this->name;
}
public function setName(string $name): self
{
$this->name = $name;
return $this;
}
public function getNums(): int
{
return $this->nums;
}
public function setNums(int $nums): self
{
$this->nums = $nums;
return $this;
}
public function getPool(): string
{
return $this->pool;
}
public function setPool(string $pool): self
{
$this->pool = $pool;
return $this;
}
}

View File

@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
/**
* @Annotation
* @Target({"CLASS"})
*/
class Consumer extends AbstractAnnotation
{
/**
* @var string
*/
public $subject = '';
/**
* @var string
*/
public $queue = '';
/**
* @var string
*/
public $name = '';
/**
* @var int
*/
public $nums = 1;
/**
* @var string
*/
public $pool = '';
}

View File

@ -0,0 +1,47 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Hyperf\Nats\Driver\DriverFactory;
use Hyperf\Nats\Driver\DriverInterface;
use Psr\Container\ContainerInterface;
class ConfigProvider
{
public function __invoke(): array
{
return [
'dependencies' => [
DriverInterface::class => function (ContainerInterface $container) {
$factory = $container->get(DriverFactory::class);
return $factory->get('default');
},
],
'annotations' => [
'scan' => [
'paths' => [
__DIR__,
],
],
],
'publish' => [
[
'id' => 'config',
'description' => 'The config for amqp.',
'source' => __DIR__ . '/../publish/nats.php',
'destination' => BASE_PATH . '/config/autoload/nats.php',
],
],
];
}
}

612
src/nats/src/Connection.php Normal file
View File

@ -0,0 +1,612 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use RandomLib\Factory;
use RandomLib\Generator;
/**
* Connection Class.
*
* Handles the connection to a NATS server or cluster of servers.
*/
class Connection
{
/**
* Show DEBUG info?
*
* @var bool if debug is enabled
*/
private $debug = false;
/**
* Number of PINGs.
*
* @var int number of pings
*/
private $pings = 0;
/**
* Chunk size in bytes to use when reading an stream of data.
*
* @var int size of chunk
*/
private $chunkSize = 1500;
/**
* Number of messages published.
*
* @var int number of messages
*/
private $pubs = 0;
/**
* Number of reconnects to the server.
*
* @var int Number of reconnects
*/
private $reconnects = 0;
/**
* List of available subscriptions.
*
* @var array list of subscriptions
*/
private $subscriptions = [];
/**
* Connection options object.
*
* @var null|ConnectionOptions
*/
private $options;
/**
* Connection timeout.
*
* @var float
*/
private $timeout;
/**
* Stream File Pointer.
*
* @var mixed Socket file pointer
*/
private $streamSocket;
/**
* Generator object.
*
* @var Generator|Php71RandomGenerator
*/
private $randomGenerator;
/**
* Server information.
*
* @var mixed
*/
private $serverInfo;
/**
* Constructor.
*
* @param ConnectionOptions $options connection options object
*/
public function __construct(ConnectionOptions $options = null)
{
$this->pings = 0;
$this->pubs = 0;
$this->subscriptions = [];
$this->options = $options;
if (version_compare(phpversion(), '7.0', '>') === true) {
$this->randomGenerator = new Php71RandomGenerator();
} else {
$randomFactory = new Factory();
$this->randomGenerator = $randomFactory->getLowStrengthGenerator();
}
if ($options === null) {
$this->options = new ConnectionOptions();
}
}
/**
* Enable or disable debug mode.
*
* @param bool $debug if debug is enabled
*/
public function setDebug($debug)
{
$this->debug = $debug;
}
/**
* Return the number of pings.
*
* @return int Number of pings
*/
public function pingsCount()
{
return $this->pings;
}
/**
* Return the number of messages published.
*
* @return int number of messages published
*/
public function pubsCount()
{
return $this->pubs;
}
/**
* Return the number of reconnects to the server.
*
* @return int number of reconnects
*/
public function reconnectsCount()
{
return $this->reconnects;
}
/**
* Return the number of subscriptions available.
*
* @return int number of subscription
*/
public function subscriptionsCount()
{
return count($this->subscriptions);
}
/**
* Return subscriptions list.
*
* @return array list of subscription ids
*/
public function getSubscriptions()
{
return array_keys($this->subscriptions);
}
/**
* Sets the chunck size in bytes to be processed when reading.
*
* @param int $chunkSize set byte chunk len to read when reading from wire
*/
public function setChunkSize($chunkSize)
{
$this->chunkSize = $chunkSize;
}
/**
* Set Stream Timeout.
*
* @param float $seconds before timeout on stream
*
* @return bool
*/
public function setStreamTimeout($seconds)
{
if ($this->isConnected() === true) {
if (is_numeric($seconds) === true) {
try {
$timeout = (float) number_format($seconds, 3);
$seconds = floor($timeout);
$microseconds = (($timeout - $seconds) * 1000);
return stream_set_timeout($this->streamSocket, $seconds, $microseconds);
} catch (\Exception $e) {
return false;
}
}
}
return false;
}
/**
* Returns an stream socket for this connection.
*
* @return resource
*/
public function getStreamSocket()
{
return $this->streamSocket;
}
/**
* Checks if the client is connected to a server.
*
* @return bool
*/
public function isConnected()
{
return isset($this->streamSocket);
}
/**
* Returns current connected server ID.
*
* @return string server ID
*/
public function connectedServerID()
{
return $this->serverInfo->getServerID();
}
/**
* Connect to server.
*
* @param float $timeout number of seconds until the connect() system call should timeout
*
* @throws \Throwable exception raised if connection fails
*/
public function connect($timeout = null)
{
if ($timeout === null) {
$timeout = intval(ini_get('default_socket_timeout'));
}
$this->timeout = $timeout;
$this->streamSocket = $this->getStream($this->options->getAddress(), $timeout);
$this->setStreamTimeout($timeout);
$msg = 'CONNECT ' . $this->options;
$this->send($msg);
$connectResponse = $this->receive();
if ($this->isErrorResponse($connectResponse) === true) {
throw Exception::forFailedConnection($connectResponse);
}
$this->processServerInfo($connectResponse);
$this->ping();
$pingResponse = $this->receive();
if ($this->isErrorResponse($pingResponse) === true) {
throw Exception::forFailedPing($pingResponse);
}
}
/**
* Sends PING message.
*/
public function ping()
{
$msg = 'PING';
$this->send($msg);
++$this->pings;
}
/**
* Request does a request and executes a callback with the response.
*
* @param string $subject message topic
* @param string $payload message data
* @param \Closure $callback closure to be executed as callback
*/
public function request($subject, $payload, \Closure $callback)
{
$inbox = uniqid('_INBOX.');
$sid = $this->subscribe(
$inbox,
$callback
);
$this->unsubscribe($sid, 1);
$this->publish($subject, $payload, $inbox);
$this->wait(1);
}
/**
* Subscribes to an specific event given a subject.
*
* @param string $subject message topic
* @param \Closure $callback closure to be executed as callback
*
* @return string
*/
public function subscribe($subject, \Closure $callback)
{
$sid = $this->randomGenerator->generateString(16);
$msg = 'SUB ' . $subject . ' ' . $sid;
$this->send($msg);
$this->subscriptions[$sid] = $callback;
return $sid;
}
/**
* Subscribes to an specific event given a subject and a queue.
*
* @param string $subject message topic
* @param string $queue queue name
* @param \Closure $callback closure to be executed as callback
*
* @return string
*/
public function queueSubscribe($subject, $queue, \Closure $callback)
{
$sid = $this->randomGenerator->generateString(16);
$msg = 'SUB ' . $subject . ' ' . $queue . ' ' . $sid;
$this->send($msg);
$this->subscriptions[$sid] = $callback;
return $sid;
}
/**
* Unsubscribe from a event given a subject.
*
* @param string $sid subscription ID
* @param int $quantity quantity of messages
*/
public function unsubscribe($sid, $quantity = null)
{
$msg = 'UNSUB ' . $sid;
if ($quantity !== null) {
$msg = $msg . ' ' . $quantity;
}
$this->send($msg);
if ($quantity === null) {
unset($this->subscriptions[$sid]);
}
}
/**
* Publish publishes the data argument to the given subject.
*
* @param string $subject message topic
* @param string $payload message data
* @param string $inbox message inbox
*
* @throws Exception if subscription not found
*/
public function publish($subject, $payload = null, $inbox = null)
{
$msg = 'PUB ' . $subject;
if ($inbox !== null) {
$msg = $msg . ' ' . $inbox;
}
$msg = $msg . ' ' . strlen($payload);
$this->send($msg . "\r\n" . $payload);
++$this->pubs;
}
/**
* Waits for messages.
*
* @param int $quantity number of messages to wait for
*
* @return null|Connection $connection Connection object
*/
public function wait($quantity = 0)
{
$count = 0;
$info = stream_get_meta_data($this->streamSocket);
while (is_resource($this->streamSocket) === true && feof($this->streamSocket) === false && empty($info['timed_out']) === true) {
$line = $this->receive();
if ($line === false) {
return null;
}
if (strpos($line, 'PING') === 0) {
$this->handlePING();
}
if (strpos($line, 'MSG') === 0) {
++$count;
$this->handleMSG($line);
if (($quantity !== 0) && ($count >= $quantity)) {
return $this;
}
}
$info = stream_get_meta_data($this->streamSocket);
}
$this->close();
return $this;
}
/**
* Reconnects to the server.
*/
public function reconnect()
{
++$this->reconnects;
$this->close();
$this->connect($this->timeout);
}
/**
* Close will close the connection to the server.
*/
public function close()
{
if ($this->streamSocket === null) {
return;
}
fclose($this->streamSocket);
$this->streamSocket = null;
}
/**
* Indicates whether $response is an error response.
*
* @param string $response the Nats Server response
*
* @return bool
*/
private function isErrorResponse($response)
{
return substr($response, 0, 4) === '-ERR';
}
/**
* Returns an stream socket to the desired server.
*
* @param string $address server url string
* @param float $timeout number of seconds until the connect() system call should timeout
*
* @throws \Exception exception raised if connection fails
* @return resource
*/
private function getStream($address, $timeout)
{
$errno = null;
$errstr = null;
$fp = stream_socket_client($address, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT);
if ($fp === false) {
throw Exception::forStreamSocketClientError($errstr, $errno);
}
$timeout = (float) number_format($timeout, 3);
$seconds = floor($timeout);
$microseconds = (($timeout - $seconds) * 1000);
stream_set_timeout($fp, $seconds, $microseconds);
return $fp;
}
/**
* Process information returned by the server after connection.
*
* @param string $connectionResponse INFO message
*/
private function processServerInfo($connectionResponse)
{
$this->serverInfo = new ServerInfo($connectionResponse);
}
/**
* Sends data thought the stream.
*
* @param string $payload message data
*
* @throws \Exception raises if fails sending data
*/
private function send($payload)
{
$msg = $payload . "\r\n";
$len = strlen($msg);
while (true) {
$written = @fwrite($this->streamSocket, $msg);
if ($written === false) {
throw new \Exception('Error sending data');
}
if ($written === 0) {
throw new \Exception('Broken pipe or closed connection');
}
$len = ($len - $written);
if ($len > 0) {
$msg = substr($msg, (0 - $len));
} else {
break;
}
}
if ($this->debug === true) {
printf('>>>> %s', $msg);
}
}
/**
* Receives a message thought the stream.
*
* @param int $len number of bytes to receive
*
* @return string
*/
private function receive(int $len = 0)
{
if ($len > 0) {
$chunkSize = $this->chunkSize;
$line = null;
$receivedBytes = 0;
while ($receivedBytes < $len) {
$bytesLeft = ($len - $receivedBytes);
if ($bytesLeft < $this->chunkSize) {
$chunkSize = $bytesLeft;
}
$readChunk = fread($this->streamSocket, $chunkSize);
$receivedBytes += strlen($readChunk);
$line .= $readChunk;
}
} else {
$line = fgets($this->streamSocket);
}
if ($this->debug === true) {
printf('<<<< %s\r\n', $line);
}
return $line;
}
/**
* Handles PING command.
*/
private function handlePING()
{
$this->send('PONG');
}
/**
* Handles MSG command.
*
* @param string $line message command from Nats
*
* @throws Exception if subscription not found
* @codeCoverageIgnore
*/
private function handleMSG($line)
{
$parts = explode(' ', $line);
$subject = null;
$length = trim($parts[3]);
$sid = $parts[2];
if (count($parts) === 5) {
$length = trim($parts[4]);
$subject = $parts[3];
} elseif (count($parts) === 4) {
$length = trim($parts[3]);
$subject = $parts[1];
}
$payload = $this->receive((int) $length);
$msg = new Message($subject, $payload, $sid, $this);
if (isset($this->subscriptions[$sid]) === false) {
throw Exception::forSubscriptionNotFound($sid);
}
$func = $this->subscriptions[$sid];
if (is_callable($func) === true) {
$func($msg);
} else {
throw Exception::forSubscriptionCallbackInvalid($sid);
}
}
}

View File

@ -0,0 +1,448 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Traversable;
/**
* ConnectionOptions Class.
*/
class ConnectionOptions
{
/**
* Hostname or IP to connect.
*
* @var string
*/
private $host = 'localhost';
/**
* Port number to connect.
*
* @var int
*/
private $port = 4222;
/**
* Username to connect.
*
* @var string
*/
private $user;
/**
* Password to connect.
*
* @var string
*/
private $pass;
/**
* Token to connect.
*
* @var string
*/
private $token;
/**
* Language of this client.
*
* @var string
*/
private $lang = 'php';
/**
* Version of this client.
*
* @var string
*/
private $version = '0.8.2';
/**
* If verbose mode is enabled.
*
* @var bool
*/
private $verbose = false;
/**
* If pedantic mode is enabled.
*
* @var bool
*/
private $pedantic = false;
/**
* If reconnect mode is enabled.
*
* @var bool
*/
private $reconnect = true;
/**
* Allows to define parameters which can be set by passing them to the class constructor.
*
* @var array
*/
private $configurable = [
'host',
'port',
'user',
'pass',
'token',
'lang',
'version',
'verbose',
'pedantic',
'reconnect',
];
/**
* ConnectionOptions constructor.
*
* <code>
* use Nats\ConnectionOptions;
*
* $options = new ConnectionOptions([
* 'host' => '127.0.0.1',
* 'port' => 4222,
* 'user' => 'nats',
* 'pass' => 'nats',
* 'lang' => 'php',
* // ...
* ]);
* </code>
*
* @param array|Traversable $options the connection options
*/
public function __construct($options = null)
{
if (empty($options) === false) {
$this->initialize($options);
}
}
/**
* Get the options JSON string.
*
* @return string
*/
public function __toString()
{
$a = [
'lang' => $this->lang,
'version' => $this->version,
'verbose' => $this->verbose,
'pedantic' => $this->pedantic,
];
if (empty($this->user) === false) {
$a['user'] = $this->user;
}
if (empty($this->pass) === false) {
$a['pass'] = $this->pass;
}
if (empty($this->token) === false) {
$a['auth_token'] = $this->token;
}
return json_encode($a);
}
/**
* Get the URI for a server.
*
* @return string
*/
public function getAddress()
{
return 'tcp://' . $this->host . ':' . $this->port;
}
/**
* Get host.
*
* @return string
*/
public function getHost()
{
return $this->host;
}
/**
* Set host.
*
* @param string $host host
*
* @return $this
*/
public function setHost($host)
{
$this->host = $host;
return $this;
}
/**
* Get port.
*
* @return int
*/
public function getPort()
{
return $this->port;
}
/**
* Set port.
*
* @param int $port port
*
* @return $this
*/
public function setPort($port)
{
$this->port = $port;
return $this;
}
/**
* Get user.
*
* @return string
*/
public function getUser()
{
return $this->user;
}
/**
* Set user.
*
* @param string $user user
*
* @return $this
*/
public function setUser($user)
{
$this->user = $user;
return $this;
}
/**
* Get password.
*
* @return string
*/
public function getPass()
{
return $this->pass;
}
/**
* Set password.
*
* @param string $pass password
*
* @return $this
*/
public function setPass($pass)
{
$this->pass = $pass;
return $this;
}
/**
* Get token.
*
* @return string
*/
public function getToken()
{
return $this->token;
}
/**
* Set token.
*
* @param string $token token
*
* @return $this
*/
public function setToken($token)
{
$this->token = $token;
return $this;
}
/**
* Get language.
*
* @return string
*/
public function getLang()
{
return $this->lang;
}
/**
* Set language.
*
* @param string $lang language
*
* @return $this
*/
public function setLang($lang)
{
$this->lang = $lang;
return $this;
}
/**
* Get version.
*
* @return string
*/
public function getVersion()
{
return $this->version;
}
/**
* Set version.
*
* @param string $version version number
*
* @return $this
*/
public function setVersion($version)
{
$this->version = $version;
return $this;
}
/**
* Get verbose.
*
* @return bool
*/
public function isVerbose()
{
return $this->verbose;
}
/**
* Set verbose.
*
* @param bool $verbose verbose flag
*
* @return $this
*/
public function setVerbose($verbose)
{
$this->verbose = $verbose;
return $this;
}
/**
* Get pedantic.
*
* @return bool
*/
public function isPedantic()
{
return $this->pedantic;
}
/**
* Set pedantic.
*
* @param bool $pedantic pedantic flag
*
* @return $this
*/
public function setPedantic($pedantic)
{
$this->pedantic = $pedantic;
return $this;
}
/**
* Get reconnect.
*
* @return bool
*/
public function isReconnect()
{
return $this->reconnect;
}
/**
* Set reconnect.
*
* @param bool $reconnect reconnect flag
*
* @return $this
*/
public function setReconnect($reconnect)
{
$this->reconnect = $reconnect;
return $this;
}
/**
* Set the connection options.
*
* @param array|Traversable $options the connection options
*/
public function setConnectionOptions($options)
{
$this->initialize($options);
}
/**
* Initialize the parameters.
*
* @param array|Traversable $options the connection options
*
* @throws Exception when $options are an invalid type
*/
protected function initialize($options)
{
if (is_array($options) === false && ($options instanceof Traversable) === false) {
throw new Exception('The $options argument must be either an array or Traversable');
}
foreach ($options as $key => $value) {
if (in_array($key, $this->configurable, true) === false) {
continue;
}
$method = 'set' . ucfirst($key);
if (method_exists($this, $method) === true) {
$this->{$method}($value);
}
}
}
}

View File

@ -0,0 +1,93 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Nats\Annotation\Consumer as ConsumerAnnotation;
use Hyperf\Nats\Driver\DriverFactory;
use Hyperf\Process\AbstractProcess;
use Hyperf\Process\ProcessManager;
use Psr\Container\ContainerInterface;
class ConsumerManager
{
/**
* @var ContainerInterface
*/
private $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
public function run()
{
$classes = AnnotationCollector::getClassByAnnotation(ConsumerAnnotation::class);
/**
* @var string
* @var ConsumerAnnotation $annotation
*/
foreach ($classes as $class => $annotation) {
$instance = make($class);
if (! $instance instanceof AbstractConsumer) {
continue;
}
$annotation->subject && $instance->setSubject($annotation->subject);
$annotation->queue && $instance->setQueue($annotation->queue);
$annotation->name && $instance->setName($annotation->name);
$annotation->pool && $instance->setName($annotation->pool);
$nums = $annotation->nums;
$process = $this->createProcess($instance);
$process->nums = (int) $nums;
$process->name = $instance->getName() . '-' . $instance->getSubject();
ProcessManager::register($process);
}
}
private function createProcess(AbstractConsumer $consumer): AbstractProcess
{
return new class($this->container, $consumer) extends AbstractProcess {
/**
* @var AbstractConsumer
*/
private $consumer;
/**
* @var Driver\DriverInterface
*/
private $subscriber;
public function __construct(ContainerInterface $container, AbstractConsumer $consumer)
{
parent::__construct($container);
$this->consumer = $consumer;
$pool = $this->consumer->getPool();
$this->subscriber = $this->container->get(DriverFactory::class)->get($pool);
}
public function handle(): void
{
$this->subscriber->subscribe(
$this->consumer->getSubject(),
$this->consumer->getQueue(),
function ($data) {
$this->consumer->consume($data);
}
);
}
};
}
}

View File

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Contract;
interface PublishInterface
{
public function publish(string $subject, $payload = null, $inbox = null);
}

View File

@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Contract;
use Closure;
use Hyperf\Nats\Message;
interface RequestInterface
{
public function request(string $subject, $payload, Closure $callback);
public function requestSync(string $subject, $payload): Message;
}

View File

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Contract;
use Closure;
interface SubscribeInterface
{
public function subscribe(string $subject, string $queue, Closure $callback): void;
}

View File

@ -0,0 +1,48 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Driver;
use Hyperf\Contract\StdoutLoggerInterface;
use Psr\Container\ContainerInterface;
abstract class AbstractDriver implements DriverInterface
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var string
*/
protected $name;
/**
* @var array
*/
protected $config;
/**
* @var StdoutLoggerInterface
*/
protected $logger;
public function __construct(ContainerInterface $container, string $name, array $config)
{
$this->container = $container;
$this->name = $name;
$this->config = $config;
$this->logger = $container->get(StdoutLoggerInterface::class);
}
}

View File

@ -0,0 +1,58 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Driver;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Nats\Exception\ConfigNotFoundException;
use Psr\Container\ContainerInterface;
class DriverFactory
{
/**
* @var ContainerInterface
*/
protected $container;
/**
* @var array
*/
protected $config;
/**
* @var DriverInterface[]
*/
protected $drivers = [];
public function __construct(ContainerInterface $container)
{
$this->container = $container;
$this->config = $container->get(ConfigInterface::class)->get('nats', []);
}
public function get($pool = 'default'): DriverInterface
{
if (isset($this->drivers[$pool]) && $this->drivers[$pool] instanceof DriverInterface) {
return $this->drivers[$pool];
}
$config = $this->config[$pool] ?? null;
if (empty($config)) {
throw new ConfigNotFoundException(sprintf('The config of %s is not found.', $pool));
}
return $this->drivers[$pool] = make($config['driver'], [
'name' => $pool,
'config' => $config,
]);
}
}

View File

@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Driver;
use Hyperf\Nats\Contract\PublishInterface;
use Hyperf\Nats\Contract\RequestInterface;
use Hyperf\Nats\Contract\SubscribeInterface;
interface DriverInterface extends PublishInterface, RequestInterface, SubscribeInterface
{
}

View File

@ -0,0 +1,117 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Driver;
use Closure;
use Hyperf\Nats\Connection as NatsConnection;
use Hyperf\Nats\ConnectionOptions;
use Hyperf\Nats\EncodedConnection;
use Hyperf\Nats\Encoders\JSONEncoder;
use Hyperf\Nats\Exception\TimeoutException;
use Hyperf\Nats\Message;
use Hyperf\Pool\SimplePool\Connection;
use Hyperf\Pool\SimplePool\PoolFactory;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine\Channel;
class NatsDriver extends AbstractDriver
{
/**
* @var \Hyperf\Pool\SimplePool\Pool
*/
protected $pool;
public function __construct(ContainerInterface $container, string $name, array $config)
{
parent::__construct($container, $name, $config);
$factory = $this->container->get(PoolFactory::class);
$poolConfig = $config['pool'] ?? [];
$this->pool = $factory->get('nats' . $this->name, function () use ($config) {
$option = new ConnectionOptions($config['options'] ?? []);
$encoder = make($config['encoder'] ?? JSONEncoder::class);
$timeout = $config['timeout'] ?? null;
$conn = make(EncodedConnection::class, [$option, $encoder]);
$conn->connect($timeout);
return $conn;
}, $poolConfig);
}
public function publish(string $subject, $payload = null, $inbox = null)
{
try {
/** @var Connection $connection */
$connection = $this->pool->get();
/** @var NatsConnection $client */
$client = $connection->getConnection();
$client->publish($subject, $payload, $inbox);
} finally {
$connection && $connection->release();
}
}
public function request(string $subject, $payload, Closure $callback)
{
try {
/** @var Connection $connection */
$connection = $this->pool->get();
/** @var NatsConnection $client */
$client = $connection->getConnection();
$client->request($subject, $payload, $callback);
} finally {
$connection && $connection->release();
}
}
public function requestSync(string $subject, $payload): Message
{
try {
/** @var Connection $connection */
$connection = $this->pool->get();
/** @var NatsConnection $client */
$client = $connection->getConnection();
$channel = new Channel(1);
$timeout = floatval($this->config['timeout'] ?? 1.0);
$client->request($subject, $payload, function (Message $message) use ($channel) {
$channel->push($message);
});
$message = $channel->pop($timeout);
if (! $message instanceof Message) {
throw new TimeoutException('Request timeout.');
}
return $message;
} finally {
$connection && $connection->release();
}
}
public function subscribe(string $subject, string $queue, Closure $callback): void
{
try {
/** @var Connection $connection */
$connection = $this->pool->get();
/** @var NatsConnection $client */
$client = $connection->getConnection();
if (empty($queue)) {
$client->subscribe($subject, $callback);
} else {
$client->queueSubscribe($subject, $queue, $callback);
}
$client->wait();
} finally {
$connection && $connection->release();
}
}
}

View File

@ -0,0 +1,86 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
use Hyperf\Nats\Encoders\Encoder;
/**
* Class EncodedConnection.
*/
class EncodedConnection extends Connection
{
/**
* Encoder for this connection.
*
* @var null|Encoder
*/
private $encoder;
/**
* EncodedConnection constructor.
*
* @param ConnectionOptions $options connection options object
* @param null|Encoder $encoder encoder to use with the payload
*/
public function __construct(ConnectionOptions $options = null, Encoder $encoder = null)
{
$this->encoder = $encoder;
parent::__construct($options);
}
/**
* Publish publishes the data argument to the given subject.
*
* @param string $subject message topic
* @param string $payload message data
* @param string $inbox message inbox
*/
public function publish($subject, $payload = null, $inbox = null)
{
$payload = $this->encoder->encode($payload);
parent::publish($subject, $payload, $inbox);
}
/**
* Subscribes to an specific event given a subject.
*
* @param string $subject message topic
* @param \Closure $callback closure to be executed as callback
*
* @return string
*/
public function subscribe($subject, \Closure $callback)
{
$c = function ($message) use ($callback) {
$message->setBody($this->encoder->decode($message->getBody()));
$callback($message);
};
return parent::subscribe($subject, $c);
}
/**
* Subscribes to an specific event given a subject and a queue.
*
* @param string $subject message topic
* @param string $queue queue name
* @param \Closure $callback closure to be executed as callback
*/
public function queueSubscribe($subject, $queue, \Closure $callback)
{
$c = function ($message) use ($callback) {
$message->setBody($this->encoder->decode($message->getBody()));
$callback($message);
};
parent::queueSubscribe($subject, $queue, $c);
}
}

View File

@ -0,0 +1,37 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Encoders;
/**
* Interface Encoder.
*/
interface Encoder
{
/**
* Encodes a message.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function encode($payload);
/**
* Decodes a message.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function decode($payload);
}

View File

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Encoders;
/**
* Class JSONEncoder.
*
* Encodes and decodes messages in JSON format.
*/
class JSONEncoder implements Encoder
{
/**
* Encodes a message to JSON.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function encode($payload)
{
return json_encode($payload);
}
/**
* Decodes a message from JSON.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function decode($payload)
{
return json_decode($payload, true);
}
}

View File

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Encoders;
/**
* Class PHPEncoder.
*
* Encodes and decodes messages in PHP format.
*/
class PHPEncoder implements Encoder
{
/**
* Encodes a message to PHP.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function encode($payload)
{
return serialize($payload);
}
/**
* Decodes a message from PHP.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function decode($payload)
{
return unserialize($payload);
}
}

View File

@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Encoders;
/**
* Class YAMLEncoder.
*
* Encodes and decodes messages in YAML format.
*/
class YAMLEncoder implements Encoder
{
/**
* Encodes a message to YAML.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function encode($payload)
{
return yaml_emit($payload);
}
/**
* Decodes a message from YAML.
*
* @param string $payload message to decode
*
* @return mixed
*/
public function decode($payload)
{
return yaml_parse($payload);
}
}

View File

@ -0,0 +1,70 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
/**
* Class Exception.
*/
class Exception extends \Exception
{
/**
* Creates an Exception for a failed connection.
*
* @param string $response the failed error response
*/
public static function forFailedConnection($response)
{
return new static(sprintf('Failed to connect: %s', $response));
}
/**
* Creates an Exception for a failed PING response.
*
* @param string $response the failed PING response
*/
public static function forFailedPing($response)
{
return new static(sprintf('Failed to ping: %s', $response));
}
/**
* Creates an Exception for an invalid Subscription Identifier (sid).
*
* @param string $subscription the Subscription Identifier (sid)
*/
public static function forSubscriptionNotFound($subscription)
{
return new static(sprintf('Subscription not found: %s', $subscription));
}
/**
* Creates an Exception for an invalid Subscription Identifier (sid) callback.
*
* @param string $subscription the Subscription Identifier (sid)
*/
public static function forSubscriptionCallbackInvalid($subscription)
{
return new static(sprintf('Subscription callback is invalid: %s', $subscription));
}
/**
* Creates an Exception for the failed creation of a Stream Socket Client.
*
* @param string $message the system level error message
* @param int $code the system level error code
*/
public static function forStreamSocketClientError($message, $code)
{
return new static(sprintf('A Stream Socket Client could not be created: (%d) %s', $code, $message), $code);
}
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Exception;
use RuntimeException;
class ConfigNotFoundException extends RuntimeException
{
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Exception;
use RuntimeException;
class DriverException extends RuntimeException
{
}

View File

@ -0,0 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Exception;
use RuntimeException;
class TimeoutException extends RuntimeException
{
}

View File

@ -0,0 +1,18 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
function stream_set_timeout($fp, $seconds, $microseconds)
{
return \stream_set_timeout($fp, (int) $seconds, (int) $microseconds);
}

View File

@ -0,0 +1,57 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats\Listener;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Event\BeforeMainServerStart;
use Hyperf\Nats\ConsumerManager;
use Psr\Container\ContainerInterface;
/**
* Must handle the event before `Hyperf\Process\Listener\BootProcessListener`.
* @Listener(priority=99)
*/
class BeforeMainServerStartListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
private $container;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
}
/**
* @return string[] returns the events that you want to listen
*/
public function listen(): array
{
return [
BeforeMainServerStart::class,
];
}
/**
* Handle the Event when the event is triggered, all listeners will
* complete before the event is returned to the EventDispatcher.
*/
public function process(object $event)
{
// Init the consumer process.
$consumerManager = $this->container->get(ConsumerManager::class);
$consumerManager->run();
}
}

179
src/nats/src/Message.php Normal file
View File

@ -0,0 +1,179 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
/**
* Message Class.
*/
class Message
{
/**
* Message Body.
*
* @var string
*/
public $body;
/**
* Message Subject.
*
* @var string
*/
private $subject;
/**
* Message Ssid.
*
* @var string
*/
private $sid;
/**
* Message related connection.
*
* @var Connection
*/
private $conn;
/**
* Message constructor.
*
* @param string $subject message subject
* @param string $body message body
* @param string $sid message Sid
* @param Connection $conn message Connection
*/
public function __construct($subject, $body, $sid, Connection $conn)
{
$this->setSubject($subject);
$this->setBody($body);
$this->setSid($sid);
$this->setConn($conn);
}
/**
* String representation of a message.
*
* @return string
*/
public function __toString()
{
return $this->getBody();
}
/**
* Set subject.
*
* @param string $subject subject
*
* @return $this
*/
public function setSubject($subject)
{
$this->subject = $subject;
return $this;
}
/**
* Get subject.
*
* @return string
*/
public function getSubject()
{
return $this->subject;
}
/**
* Set body.
*
* @param string $body body
*
* @return $this
*/
public function setBody($body)
{
$this->body = $body;
return $this;
}
/**
* Get body.
*
* @return string
*/
public function getBody()
{
return $this->body;
}
/**
* Set Ssid.
*
* @param string $sid ssid
*
* @return $this
*/
public function setSid($sid)
{
$this->sid = $sid;
return $this;
}
/**
* Get Ssid.
*
* @return string
*/
public function getSid()
{
return $this->sid;
}
/**
* Set Conn.
*
* @param Connection $conn connection
*
* @return $this
*/
public function setConn(Connection $conn)
{
$this->conn = $conn;
return $this;
}
/**
* Get Conn.
*
* @return Connection
*/
public function getConn()
{
return $this->conn;
}
/**
* Allows you reply the message with a specific body.
*
* @param string $body body to be set
*/
public function reply($body)
{
$this->conn->publish(
$this->subject,
$body
);
}
}

View File

@ -0,0 +1,31 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
/**
* Class Php71RandomGenerator.
*/
class Php71RandomGenerator
{
/**
* A simple wrapper on random_bytes.
*
* @param int $len length of the string
*
* @return string random string
*/
public function generateString($len)
{
return bin2hex(random_bytes($len));
}
}

341
src/nats/src/ServerInfo.php Normal file
View File

@ -0,0 +1,341 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Nats;
/**
* Class ServerInfo.
*/
class ServerInfo
{
/**
* Server unique ID.
*
* @var string
*/
private $serverID;
/**
* Server hostname.
*
* @var string
*/
private $host;
/**
* Server port.
*
* @var int
*/
private $port;
/**
* Server version number.
*
* @var string
*/
private $version;
/**
* Server Golang version.
*
* @var string
*/
private $goVersion;
/**
* Is authorization required?
*
* @var bool
*/
private $authRequired;
/**
* Is TLS required?
*
* @var bool
*/
private $TLSRequired;
/**
* Should TLS be verified?
*
* @var bool
*/
private $TLSVerify;
/**
* Is SSL required?
*
* @var bool
*/
private $SSLRequired;
/**
* Max payload size.
*
* @var int
*/
private $maxPayload;
/**
* Connection URL list.
*
* @var array
*/
private $connectURLs;
/**
* ServerInfo constructor.
*
* @param string $connectionResponse connection response Message
*/
public function __construct($connectionResponse)
{
$parts = explode(' ', $connectionResponse);
$data = json_decode($parts[1], true);
$this->setServerID($data['server_id']);
$this->setHost($data['host']);
$this->setPort($data['port']);
$this->setVersion($data['version']);
$this->setGoVersion($data['go']);
$this->setAuthRequired($data['auth_required'] ?? false);
$this->setTLSRequired($data['tls_required'] ?? false);
$this->setTLSVerify($data['tls_verify'] ?? false);
$this->setMaxPayload($data['max_payload']);
if (version_compare($data['version'], '1.1.0') === -1) {
$this->setSSLRequired($data['ssl_required']);
}
}
/**
* Get the server ID.
*
* @return string server ID
*/
public function getServerID()
{
return $this->serverID;
}
/**
* Set the server ID.
*
* @param string $serverID server ID
*/
public function setServerID($serverID)
{
$this->serverID = $serverID;
}
/**
* Get the server host name or ip.
*
* @return string server host
*/
public function getHost()
{
return $this->host;
}
/**
* Set the server host name or ip.
*
* @param string $host server host
*/
public function setHost($host)
{
$this->host = $host;
}
/**
* Get server port number.
*
* @return int server port number
*/
public function getPort()
{
return $this->port;
}
/**
* Set server port number.
*
* @param int $port server port number
*/
public function setPort($port)
{
$this->port = $port;
}
/**
* Get server version number.
*
* @return string server version number
*/
public function getVersion()
{
return $this->version;
}
/**
* Set server version number.
*
* @param string $version server version number
*/
public function setVersion($version)
{
$this->version = $version;
}
/**
* Get the golang version number.
*
* @return string go version number
*/
public function getGoVersion()
{
return $this->goVersion;
}
/**
* Set the golang version number.
*
* @param string $goVersion go version number
*/
public function setGoVersion($goVersion)
{
$this->goVersion = $goVersion;
}
/**
* Check if server requires authorization.
*
* @return bool if auth is required
*/
public function isAuthRequired()
{
return $this->authRequired;
}
/**
* Set if the server requires authorization.
*
* @param bool $authRequired if auth is required
*/
public function setAuthRequired($authRequired)
{
$this->authRequired = $authRequired;
}
/**
* Check if server requires TLS.
*
* @return bool if TLS is required
*/
public function isTLSRequired()
{
return $this->TLSRequired;
}
/**
* Set if server requires TLS.
*
* @param bool $TLSRequired if TLS is required
*/
public function setTLSRequired($TLSRequired)
{
$this->TLSRequired = $TLSRequired;
}
/**
* Check if TLS certificate is verified.
*
* @return bool if TLS certificate is verified
*/
public function isTLSVerify()
{
return $this->TLSVerify;
}
/**
* Set if server verifies TLS certificate.
*
* @param bool $TLSVerify if TLS certificate is verified
*/
public function setTLSVerify($TLSVerify)
{
$this->TLSVerify = $TLSVerify;
}
/**
* Check if SSL is required.
*
* @return bool if SSL is required
*/
public function isSSLRequired()
{
return $this->SSLRequired;
}
/**
* Set if SSL is required.
*
* @param bool $SSLRequired if SSL is required
*/
public function setSSLRequired($SSLRequired)
{
$this->SSLRequired = $SSLRequired;
}
/**
* Get the max size of the payload.
*
* @return int size in bytes
*/
public function getMaxPayload()
{
return $this->maxPayload;
}
/**
* Set the max size of the payload.
*
* @param int $maxPayload size in bytes
*/
public function setMaxPayload($maxPayload)
{
$this->maxPayload = $maxPayload;
}
/**
* Get the server connection URLs.
*
* @return array list of server connection urls
*/
public function getConnectURLs()
{
return $this->connectURLs;
}
/**
* Set the server connection URLs.
*
* @param array $connectURLs list of server connection urls
*/
public function setConnectURLs(array $connectURLs)
{
$this->connectURLs = $connectURLs;
}
}