diff --git a/.travis.yml b/.travis.yml index 360004230..d750214c2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,11 +5,11 @@ sudo: required matrix: include: - php: 7.2 - env: SW_VERSION="4.4.8" + env: SW_VERSION="4.4.12" - php: 7.3 - env: SW_VERSION="4.4.8" + env: SW_VERSION="4.4.12" - php: master - env: SW_VERSION="4.4.8" + env: SW_VERSION="4.4.12" allow_failures: - php: master @@ -38,6 +38,6 @@ before_script: - composer config -g process-timeout 900 && composer update script: - - composer analyse src/di src/json-rpc src/tracer src/metric + - composer analyse src/di src/json-rpc src/tracer src/metric src/nats - composer test -- --exclude-group NonCoroutine - vendor/bin/phpunit --group NonCoroutine diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c9568241..15012df54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Added - [#812](https://github.com/hyperf/hyperf/pull/812) Added singleton crontab task support. +- [#820](https://github.com/hyperf/hyperf/pull/820) Added nats component. - [#832](https://github.com/hyperf/hyperf/pull/832) Added `Hyperf\Utils\Codec\Json`. - [#833](https://github.com/hyperf/hyperf/pull/833) Added `Hyperf\Utils\Backoff`. - [#852](https://github.com/hyperf/hyperf/pull/852) Added a `clear()` method for `Hyperf\Utils\Parallel` to clear adde callbacks. diff --git a/bin/composer-json-fixer b/bin/composer-json-fixer index 79ef13393..16e96c1d3 100755 --- a/bin/composer-json-fixer +++ b/bin/composer-json-fixer @@ -25,7 +25,7 @@ foreach ($files as $file) { $composerJson = json_decode(file_get_contents($file), true); foreach ($composerJson['autoload']['files'] ?? [] as $file) { - $autoloadFiles[] = "src/{$component}/". preg_replace('#^./#', '', $file); + $autoloadFiles[] = "src/{$component}/" . preg_replace('#^./#', '', $file); } foreach ($composerJson['autoload']['psr-4'] ?? [] as $ns => $dir) { $autoload[$ns] = "src/{$component}/" . trim($dir, '/') . '/'; @@ -51,5 +51,5 @@ $json->extra->hyperf->config = $configProviders; file_put_contents( __DIR__ . '/../composer.json', - json_encode($json, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_PRETTY_PRINT) + json_encode($json, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_PRETTY_PRINT) . PHP_EOL ); diff --git a/composer.json b/composer.json index 015973410..b5f570d86 100644 --- a/composer.json +++ b/composer.json @@ -18,10 +18,12 @@ "doctrine/instantiator": "^1.0", "egulias/email-validator": "^2.1", "elasticsearch/elasticsearch": "^6.1", + "endclothing/prometheus_client_php": "^0.9.1", "fig/http-message-util": "^1.1.2", "google/protobuf": "^3.6.1", "grpc/grpc": "^1.15", "guzzlehttp/guzzle": "^6.3", + "ircmaxell/random-lib": "^1.2", "jcchavezs/zipkin-opentracing": "^0.1.2", "jean85/pretty-package-versions": "^1.2", "monolog/monolog": "^1.24", @@ -40,8 +42,7 @@ "start-point/etcd-php": "^1.1", "symfony/console": "^4.2", "symfony/finder": "^4.1", - "vlucas/phpdotenv": "^3.1", - "endclothing/prometheus_client_php": "^0.9.1" + "vlucas/phpdotenv": "^3.1" }, "require-dev": { "doctrine/common": "@stable", @@ -114,6 +115,7 @@ "files": [ "src/config/src/Functions.php", "src/di/src/Functions.php", + "src/nats/src/Functions.php", "src/translation/src/Functions.php", "src/utils/src/Functions.php" ], @@ -155,6 +157,7 @@ "Hyperf\\Metric\\": "src/metric/src/", "Hyperf\\ModelCache\\": "src/model-cache/src/", "Hyperf\\ModelListener\\": "src/model-listener/src/", + "Hyperf\\Nats\\": "src/nats/src/", "Hyperf\\Paginator\\": "src/paginator/src/", "Hyperf\\Pool\\": "src/pool/src/", "Hyperf\\Process\\": "src/process/src/", @@ -216,6 +219,7 @@ "HyperfTest\\Metric\\": "src/metric/tests/", "HyperfTest\\ModelCache\\": "src/model-cache/tests/", "HyperfTest\\ModelListener\\": "src/model-listener/tests/", + "HyperfTest\\Nats\\": "src/nats/tests/", "HyperfTest\\Paginator\\": "src/paginator/tests/", "HyperfTest\\Pool\\": "src/pool/tests/", "HyperfTest\\Process\\": "src/process/tests/", @@ -271,6 +275,7 @@ "Hyperf\\Metric\\ConfigProvider", "Hyperf\\ModelCache\\ConfigProvider", "Hyperf\\ModelListener\\ConfigProvider", + "Hyperf\\Nats\\ConfigProvider", "Hyperf\\Paginator\\ConfigProvider", "Hyperf\\Pool\\ConfigProvider", "Hyperf\\Process\\ConfigProvider", @@ -308,4 +313,4 @@ }, "minimum-stability": "dev", "prefer-stable": true -} \ No newline at end of file +} diff --git a/doc/zh/nats.md b/doc/zh/nats.md new file mode 100644 index 000000000..2d4ab59ba --- /dev/null +++ b/doc/zh/nats.md @@ -0,0 +1,151 @@ +# NATS + +NATS是一个开源、轻量级、高性能的分布式消息中间件,实现了高可伸缩性和优雅的 `Publish` / `Subscribe` 模型,使用 `Golang` 语言开发。NATS的开发哲学认为高质量的QoS应该在客户端构建,故只建立了 `Request-Reply`,不提供 1.持久化 2.事务处理 3.增强的交付模式 4.企业级队列。 + +## 使用 + +### 创建消费者 + +``` +$ php bin/hyperf.php gen:nats-consumer DemoConsumer +``` + +如果设置了 `queue`,则相同的 `subject` 只会被一个 `queue` 消费。若不设置 `queue`,则每个消费者都会受到消息。 + +```php +nats->publish('hyperf.demo', [ + 'id' => 'Hyperf', + ]); + + return $this->response->success($res); + } +} + +``` + +使用 request 投递消息。 + +```php +nats->request('hyperf.reply', [ + 'id' => 'limx', + ], function (Message $payload) { + var_dump($payload->getBody()); + }); + + return $this->response->success($res); + } +} + +``` + +使用 requestSync 投递消息。 + +```php +nats->requestSync('hyperf.reply', [ + 'id' => 'limx', + ]); + + return $this->response->success($message->getBody()); + } +} + +``` \ No newline at end of file diff --git a/doc/zh/summary.md b/doc/zh/summary.md index a9e631811..90d03b73b 100644 --- a/doc/zh/summary.md +++ b/doc/zh/summary.md @@ -83,6 +83,7 @@ * [Task 机制](zh/task.md) * [枚举类](zh/constants.md) * [Snowflake](zh/snowflake.md) + * [Nats](zh/nats.md) * 应用部署 diff --git a/src/devtool/src/Generator/AmqpConsumerCommand.php b/src/devtool/src/Generator/AmqpConsumerCommand.php index 1586757fb..2c53fdd98 100644 --- a/src/devtool/src/Generator/AmqpConsumerCommand.php +++ b/src/devtool/src/Generator/AmqpConsumerCommand.php @@ -32,6 +32,6 @@ class AmqpConsumerCommand extends GeneratorCommand protected function getDefaultNamespace(): string { - return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Consumers'; + return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Consumer'; } } diff --git a/src/devtool/src/Generator/AmqpProducerCommand.php b/src/devtool/src/Generator/AmqpProducerCommand.php index 00f814ba4..de0608fb9 100644 --- a/src/devtool/src/Generator/AmqpProducerCommand.php +++ b/src/devtool/src/Generator/AmqpProducerCommand.php @@ -32,6 +32,6 @@ class AmqpProducerCommand extends GeneratorCommand protected function getDefaultNamespace(): string { - return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Producers'; + return $this->getConfig()['namespace'] ?? 'App\\Amqp\\Producer'; } } diff --git a/src/devtool/src/Generator/NatsConsumerCommand.php b/src/devtool/src/Generator/NatsConsumerCommand.php new file mode 100644 index 000000000..2e34bbffc --- /dev/null +++ b/src/devtool/src/Generator/NatsConsumerCommand.php @@ -0,0 +1,37 @@ +setDescription('Create a new nats consumer class'); + } + + protected function getStub(): string + { + return $this->getConfig()['stub'] ?? __DIR__ . '/stubs/nats-consumer.stub'; + } + + protected function getDefaultNamespace(): string + { + return $this->getConfig()['namespace'] ?? 'App\\Nats\\Consumer'; + } +} diff --git a/src/devtool/src/Generator/stubs/nats-consumer.stub b/src/devtool/src/Generator/stubs/nats-consumer.stub new file mode 100644 index 000000000..a6a338fa3 --- /dev/null +++ b/src/devtool/src/Generator/stubs/nats-consumer.stub @@ -0,0 +1,20 @@ +getBody()); + } +} diff --git a/src/nats/.gitattributes b/src/nats/.gitattributes new file mode 100644 index 000000000..bdd4ea29c --- /dev/null +++ b/src/nats/.gitattributes @@ -0,0 +1 @@ +/tests export-ignore \ No newline at end of file diff --git a/src/nats/LICENSE b/src/nats/LICENSE new file mode 100644 index 000000000..c35d3f5a8 --- /dev/null +++ b/src/nats/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) Hyperf + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/src/nats/composer.json b/src/nats/composer.json new file mode 100644 index 000000000..0b106a605 --- /dev/null +++ b/src/nats/composer.json @@ -0,0 +1,59 @@ +{ + "name": "hyperf/nats", + "description": "A nats library for Hyperf.", + "license": "MIT", + "keywords": [ + "php", + "swoole", + "hyperf", + "nats" + ], + "support": { + }, + "require": { + "php": ">=7.2", + "hyperf/contract": "~1.1.0", + "hyperf/pool": "~1.1.0", + "hyperf/utils": "~1.1.0", + "ircmaxell/random-lib": "^1.2", + "psr/container": "^1.0" + }, + "require-dev": { + "malukenho/docheader": "^0.1.6", + "mockery/mockery": "^1.0", + "phpunit/phpunit": "^7.0.0", + "friendsofphp/php-cs-fixer": "^2.9" + }, + "suggest": { + }, + "autoload": { + "psr-4": { + "Hyperf\\Nats\\": "src/" + }, + "files": [ + "src/Functions.php" + ] + }, + "autoload-dev": { + "psr-4": { + "HyperfTest\\Nats\\": "tests/" + } + }, + "config": { + "sort-packages": true + }, + "extra": { + "branch-alias": { + "dev-master": "1.1-dev" + }, + "hyperf": { + "config": "Hyperf\\Nats\\ConfigProvider" + } + }, + "bin": [ + ], + "scripts": { + "cs-fix": "php-cs-fixer fix $1", + "test": "phpunit --colors=always" + } +} diff --git a/src/nats/publish/nats.php b/src/nats/publish/nats.php new file mode 100644 index 000000000..52a3311da --- /dev/null +++ b/src/nats/publish/nats.php @@ -0,0 +1,34 @@ + [ + 'driver' => Hyperf\Nats\Driver\NatsDriver::class, + 'encoder' => Hyperf\Nats\Encoders\JSONEncoder::class, + 'timeout' => 10.0, + 'options' => [ + 'host' => '127.0.0.1', + 'port' => 4222, + 'user' => 'nats', + 'pass' => 'nats', + 'lang' => 'php', + ], + 'pool' => [ + 'min_connections' => 1, + 'max_connections' => 10, + 'connect_timeout' => 10.0, + 'wait_timeout' => 3.0, + 'heartbeat' => -1, + 'max_idle_time' => 60, + ], + ], +]; diff --git a/src/nats/src/AbstractConsumer.php b/src/nats/src/AbstractConsumer.php new file mode 100644 index 000000000..480ec864d --- /dev/null +++ b/src/nats/src/AbstractConsumer.php @@ -0,0 +1,110 @@ +container = $container; + } + + abstract public function consume(Message $payload); + + public function getSubject(): string + { + return $this->subject; + } + + public function setSubject(string $subject): self + { + $this->subject = $subject; + return $this; + } + + public function getQueue(): string + { + return $this->queue; + } + + public function setQueue(string $queue): self + { + $this->queue = $queue; + return $this; + } + + public function getName(): string + { + return $this->name; + } + + public function setName(string $name): self + { + $this->name = $name; + return $this; + } + + public function getNums(): int + { + return $this->nums; + } + + public function setNums(int $nums): self + { + $this->nums = $nums; + return $this; + } + + public function getPool(): string + { + return $this->pool; + } + + public function setPool(string $pool): self + { + $this->pool = $pool; + return $this; + } +} diff --git a/src/nats/src/Annotation/Consumer.php b/src/nats/src/Annotation/Consumer.php new file mode 100644 index 000000000..16dc81ab9 --- /dev/null +++ b/src/nats/src/Annotation/Consumer.php @@ -0,0 +1,47 @@ + [ + DriverInterface::class => function (ContainerInterface $container) { + $factory = $container->get(DriverFactory::class); + return $factory->get('default'); + }, + ], + 'annotations' => [ + 'scan' => [ + 'paths' => [ + __DIR__, + ], + ], + ], + 'publish' => [ + [ + 'id' => 'config', + 'description' => 'The config for amqp.', + 'source' => __DIR__ . '/../publish/nats.php', + 'destination' => BASE_PATH . '/config/autoload/nats.php', + ], + ], + ]; + } +} diff --git a/src/nats/src/Connection.php b/src/nats/src/Connection.php new file mode 100644 index 000000000..75fb3d509 --- /dev/null +++ b/src/nats/src/Connection.php @@ -0,0 +1,612 @@ +pings = 0; + $this->pubs = 0; + $this->subscriptions = []; + $this->options = $options; + if (version_compare(phpversion(), '7.0', '>') === true) { + $this->randomGenerator = new Php71RandomGenerator(); + } else { + $randomFactory = new Factory(); + $this->randomGenerator = $randomFactory->getLowStrengthGenerator(); + } + + if ($options === null) { + $this->options = new ConnectionOptions(); + } + } + + /** + * Enable or disable debug mode. + * + * @param bool $debug if debug is enabled + */ + public function setDebug($debug) + { + $this->debug = $debug; + } + + /** + * Return the number of pings. + * + * @return int Number of pings + */ + public function pingsCount() + { + return $this->pings; + } + + /** + * Return the number of messages published. + * + * @return int number of messages published + */ + public function pubsCount() + { + return $this->pubs; + } + + /** + * Return the number of reconnects to the server. + * + * @return int number of reconnects + */ + public function reconnectsCount() + { + return $this->reconnects; + } + + /** + * Return the number of subscriptions available. + * + * @return int number of subscription + */ + public function subscriptionsCount() + { + return count($this->subscriptions); + } + + /** + * Return subscriptions list. + * + * @return array list of subscription ids + */ + public function getSubscriptions() + { + return array_keys($this->subscriptions); + } + + /** + * Sets the chunck size in bytes to be processed when reading. + * + * @param int $chunkSize set byte chunk len to read when reading from wire + */ + public function setChunkSize($chunkSize) + { + $this->chunkSize = $chunkSize; + } + + /** + * Set Stream Timeout. + * + * @param float $seconds before timeout on stream + * + * @return bool + */ + public function setStreamTimeout($seconds) + { + if ($this->isConnected() === true) { + if (is_numeric($seconds) === true) { + try { + $timeout = (float) number_format($seconds, 3); + $seconds = floor($timeout); + $microseconds = (($timeout - $seconds) * 1000); + return stream_set_timeout($this->streamSocket, $seconds, $microseconds); + } catch (\Exception $e) { + return false; + } + } + } + + return false; + } + + /** + * Returns an stream socket for this connection. + * + * @return resource + */ + public function getStreamSocket() + { + return $this->streamSocket; + } + + /** + * Checks if the client is connected to a server. + * + * @return bool + */ + public function isConnected() + { + return isset($this->streamSocket); + } + + /** + * Returns current connected server ID. + * + * @return string server ID + */ + public function connectedServerID() + { + return $this->serverInfo->getServerID(); + } + + /** + * Connect to server. + * + * @param float $timeout number of seconds until the connect() system call should timeout + * + * @throws \Throwable exception raised if connection fails + */ + public function connect($timeout = null) + { + if ($timeout === null) { + $timeout = intval(ini_get('default_socket_timeout')); + } + + $this->timeout = $timeout; + $this->streamSocket = $this->getStream($this->options->getAddress(), $timeout); + $this->setStreamTimeout($timeout); + + $msg = 'CONNECT ' . $this->options; + $this->send($msg); + $connectResponse = $this->receive(); + + if ($this->isErrorResponse($connectResponse) === true) { + throw Exception::forFailedConnection($connectResponse); + } + $this->processServerInfo($connectResponse); + + $this->ping(); + $pingResponse = $this->receive(); + + if ($this->isErrorResponse($pingResponse) === true) { + throw Exception::forFailedPing($pingResponse); + } + } + + /** + * Sends PING message. + */ + public function ping() + { + $msg = 'PING'; + $this->send($msg); + ++$this->pings; + } + + /** + * Request does a request and executes a callback with the response. + * + * @param string $subject message topic + * @param string $payload message data + * @param \Closure $callback closure to be executed as callback + */ + public function request($subject, $payload, \Closure $callback) + { + $inbox = uniqid('_INBOX.'); + $sid = $this->subscribe( + $inbox, + $callback + ); + $this->unsubscribe($sid, 1); + $this->publish($subject, $payload, $inbox); + $this->wait(1); + } + + /** + * Subscribes to an specific event given a subject. + * + * @param string $subject message topic + * @param \Closure $callback closure to be executed as callback + * + * @return string + */ + public function subscribe($subject, \Closure $callback) + { + $sid = $this->randomGenerator->generateString(16); + $msg = 'SUB ' . $subject . ' ' . $sid; + $this->send($msg); + $this->subscriptions[$sid] = $callback; + return $sid; + } + + /** + * Subscribes to an specific event given a subject and a queue. + * + * @param string $subject message topic + * @param string $queue queue name + * @param \Closure $callback closure to be executed as callback + * + * @return string + */ + public function queueSubscribe($subject, $queue, \Closure $callback) + { + $sid = $this->randomGenerator->generateString(16); + $msg = 'SUB ' . $subject . ' ' . $queue . ' ' . $sid; + $this->send($msg); + $this->subscriptions[$sid] = $callback; + return $sid; + } + + /** + * Unsubscribe from a event given a subject. + * + * @param string $sid subscription ID + * @param int $quantity quantity of messages + */ + public function unsubscribe($sid, $quantity = null) + { + $msg = 'UNSUB ' . $sid; + if ($quantity !== null) { + $msg = $msg . ' ' . $quantity; + } + + $this->send($msg); + if ($quantity === null) { + unset($this->subscriptions[$sid]); + } + } + + /** + * Publish publishes the data argument to the given subject. + * + * @param string $subject message topic + * @param string $payload message data + * @param string $inbox message inbox + * + * @throws Exception if subscription not found + */ + public function publish($subject, $payload = null, $inbox = null) + { + $msg = 'PUB ' . $subject; + if ($inbox !== null) { + $msg = $msg . ' ' . $inbox; + } + + $msg = $msg . ' ' . strlen($payload); + $this->send($msg . "\r\n" . $payload); + ++$this->pubs; + } + + /** + * Waits for messages. + * + * @param int $quantity number of messages to wait for + * + * @return null|Connection $connection Connection object + */ + public function wait($quantity = 0) + { + $count = 0; + $info = stream_get_meta_data($this->streamSocket); + while (is_resource($this->streamSocket) === true && feof($this->streamSocket) === false && empty($info['timed_out']) === true) { + $line = $this->receive(); + + if ($line === false) { + return null; + } + + if (strpos($line, 'PING') === 0) { + $this->handlePING(); + } + + if (strpos($line, 'MSG') === 0) { + ++$count; + $this->handleMSG($line); + if (($quantity !== 0) && ($count >= $quantity)) { + return $this; + } + } + + $info = stream_get_meta_data($this->streamSocket); + } + + $this->close(); + + return $this; + } + + /** + * Reconnects to the server. + */ + public function reconnect() + { + ++$this->reconnects; + $this->close(); + $this->connect($this->timeout); + } + + /** + * Close will close the connection to the server. + */ + public function close() + { + if ($this->streamSocket === null) { + return; + } + + fclose($this->streamSocket); + $this->streamSocket = null; + } + + /** + * Indicates whether $response is an error response. + * + * @param string $response the Nats Server response + * + * @return bool + */ + private function isErrorResponse($response) + { + return substr($response, 0, 4) === '-ERR'; + } + + /** + * Returns an stream socket to the desired server. + * + * @param string $address server url string + * @param float $timeout number of seconds until the connect() system call should timeout + * + * @throws \Exception exception raised if connection fails + * @return resource + */ + private function getStream($address, $timeout) + { + $errno = null; + $errstr = null; + + $fp = stream_socket_client($address, $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT); + + if ($fp === false) { + throw Exception::forStreamSocketClientError($errstr, $errno); + } + + $timeout = (float) number_format($timeout, 3); + $seconds = floor($timeout); + $microseconds = (($timeout - $seconds) * 1000); + stream_set_timeout($fp, $seconds, $microseconds); + + return $fp; + } + + /** + * Process information returned by the server after connection. + * + * @param string $connectionResponse INFO message + */ + private function processServerInfo($connectionResponse) + { + $this->serverInfo = new ServerInfo($connectionResponse); + } + + /** + * Sends data thought the stream. + * + * @param string $payload message data + * + * @throws \Exception raises if fails sending data + */ + private function send($payload) + { + $msg = $payload . "\r\n"; + $len = strlen($msg); + while (true) { + $written = @fwrite($this->streamSocket, $msg); + if ($written === false) { + throw new \Exception('Error sending data'); + } + + if ($written === 0) { + throw new \Exception('Broken pipe or closed connection'); + } + + $len = ($len - $written); + if ($len > 0) { + $msg = substr($msg, (0 - $len)); + } else { + break; + } + } + + if ($this->debug === true) { + printf('>>>> %s', $msg); + } + } + + /** + * Receives a message thought the stream. + * + * @param int $len number of bytes to receive + * + * @return string + */ + private function receive(int $len = 0) + { + if ($len > 0) { + $chunkSize = $this->chunkSize; + $line = null; + $receivedBytes = 0; + while ($receivedBytes < $len) { + $bytesLeft = ($len - $receivedBytes); + if ($bytesLeft < $this->chunkSize) { + $chunkSize = $bytesLeft; + } + + $readChunk = fread($this->streamSocket, $chunkSize); + $receivedBytes += strlen($readChunk); + $line .= $readChunk; + } + } else { + $line = fgets($this->streamSocket); + } + + if ($this->debug === true) { + printf('<<<< %s\r\n', $line); + } + + return $line; + } + + /** + * Handles PING command. + */ + private function handlePING() + { + $this->send('PONG'); + } + + /** + * Handles MSG command. + * + * @param string $line message command from Nats + * + * @throws Exception if subscription not found + * @codeCoverageIgnore + */ + private function handleMSG($line) + { + $parts = explode(' ', $line); + $subject = null; + $length = trim($parts[3]); + $sid = $parts[2]; + + if (count($parts) === 5) { + $length = trim($parts[4]); + $subject = $parts[3]; + } elseif (count($parts) === 4) { + $length = trim($parts[3]); + $subject = $parts[1]; + } + + $payload = $this->receive((int) $length); + $msg = new Message($subject, $payload, $sid, $this); + + if (isset($this->subscriptions[$sid]) === false) { + throw Exception::forSubscriptionNotFound($sid); + } + + $func = $this->subscriptions[$sid]; + if (is_callable($func) === true) { + $func($msg); + } else { + throw Exception::forSubscriptionCallbackInvalid($sid); + } + } +} diff --git a/src/nats/src/ConnectionOptions.php b/src/nats/src/ConnectionOptions.php new file mode 100644 index 000000000..7c186161c --- /dev/null +++ b/src/nats/src/ConnectionOptions.php @@ -0,0 +1,448 @@ + + * use Nats\ConnectionOptions; + * + * $options = new ConnectionOptions([ + * 'host' => '127.0.0.1', + * 'port' => 4222, + * 'user' => 'nats', + * 'pass' => 'nats', + * 'lang' => 'php', + * // ... + * ]); + * + * + * @param array|Traversable $options the connection options + */ + public function __construct($options = null) + { + if (empty($options) === false) { + $this->initialize($options); + } + } + + /** + * Get the options JSON string. + * + * @return string + */ + public function __toString() + { + $a = [ + 'lang' => $this->lang, + 'version' => $this->version, + 'verbose' => $this->verbose, + 'pedantic' => $this->pedantic, + ]; + if (empty($this->user) === false) { + $a['user'] = $this->user; + } + + if (empty($this->pass) === false) { + $a['pass'] = $this->pass; + } + + if (empty($this->token) === false) { + $a['auth_token'] = $this->token; + } + + return json_encode($a); + } + + /** + * Get the URI for a server. + * + * @return string + */ + public function getAddress() + { + return 'tcp://' . $this->host . ':' . $this->port; + } + + /** + * Get host. + * + * @return string + */ + public function getHost() + { + return $this->host; + } + + /** + * Set host. + * + * @param string $host host + * + * @return $this + */ + public function setHost($host) + { + $this->host = $host; + + return $this; + } + + /** + * Get port. + * + * @return int + */ + public function getPort() + { + return $this->port; + } + + /** + * Set port. + * + * @param int $port port + * + * @return $this + */ + public function setPort($port) + { + $this->port = $port; + + return $this; + } + + /** + * Get user. + * + * @return string + */ + public function getUser() + { + return $this->user; + } + + /** + * Set user. + * + * @param string $user user + * + * @return $this + */ + public function setUser($user) + { + $this->user = $user; + + return $this; + } + + /** + * Get password. + * + * @return string + */ + public function getPass() + { + return $this->pass; + } + + /** + * Set password. + * + * @param string $pass password + * + * @return $this + */ + public function setPass($pass) + { + $this->pass = $pass; + + return $this; + } + + /** + * Get token. + * + * @return string + */ + public function getToken() + { + return $this->token; + } + + /** + * Set token. + * + * @param string $token token + * + * @return $this + */ + public function setToken($token) + { + $this->token = $token; + + return $this; + } + + /** + * Get language. + * + * @return string + */ + public function getLang() + { + return $this->lang; + } + + /** + * Set language. + * + * @param string $lang language + * + * @return $this + */ + public function setLang($lang) + { + $this->lang = $lang; + + return $this; + } + + /** + * Get version. + * + * @return string + */ + public function getVersion() + { + return $this->version; + } + + /** + * Set version. + * + * @param string $version version number + * + * @return $this + */ + public function setVersion($version) + { + $this->version = $version; + + return $this; + } + + /** + * Get verbose. + * + * @return bool + */ + public function isVerbose() + { + return $this->verbose; + } + + /** + * Set verbose. + * + * @param bool $verbose verbose flag + * + * @return $this + */ + public function setVerbose($verbose) + { + $this->verbose = $verbose; + + return $this; + } + + /** + * Get pedantic. + * + * @return bool + */ + public function isPedantic() + { + return $this->pedantic; + } + + /** + * Set pedantic. + * + * @param bool $pedantic pedantic flag + * + * @return $this + */ + public function setPedantic($pedantic) + { + $this->pedantic = $pedantic; + + return $this; + } + + /** + * Get reconnect. + * + * @return bool + */ + public function isReconnect() + { + return $this->reconnect; + } + + /** + * Set reconnect. + * + * @param bool $reconnect reconnect flag + * + * @return $this + */ + public function setReconnect($reconnect) + { + $this->reconnect = $reconnect; + + return $this; + } + + /** + * Set the connection options. + * + * @param array|Traversable $options the connection options + */ + public function setConnectionOptions($options) + { + $this->initialize($options); + } + + /** + * Initialize the parameters. + * + * @param array|Traversable $options the connection options + * + * @throws Exception when $options are an invalid type + */ + protected function initialize($options) + { + if (is_array($options) === false && ($options instanceof Traversable) === false) { + throw new Exception('The $options argument must be either an array or Traversable'); + } + + foreach ($options as $key => $value) { + if (in_array($key, $this->configurable, true) === false) { + continue; + } + + $method = 'set' . ucfirst($key); + + if (method_exists($this, $method) === true) { + $this->{$method}($value); + } + } + } +} diff --git a/src/nats/src/ConsumerManager.php b/src/nats/src/ConsumerManager.php new file mode 100644 index 000000000..a07356b0c --- /dev/null +++ b/src/nats/src/ConsumerManager.php @@ -0,0 +1,93 @@ +container = $container; + } + + public function run() + { + $classes = AnnotationCollector::getClassByAnnotation(ConsumerAnnotation::class); + /** + * @var string + * @var ConsumerAnnotation $annotation + */ + foreach ($classes as $class => $annotation) { + $instance = make($class); + if (! $instance instanceof AbstractConsumer) { + continue; + } + $annotation->subject && $instance->setSubject($annotation->subject); + $annotation->queue && $instance->setQueue($annotation->queue); + $annotation->name && $instance->setName($annotation->name); + $annotation->pool && $instance->setName($annotation->pool); + + $nums = $annotation->nums; + $process = $this->createProcess($instance); + $process->nums = (int) $nums; + $process->name = $instance->getName() . '-' . $instance->getSubject(); + ProcessManager::register($process); + } + } + + private function createProcess(AbstractConsumer $consumer): AbstractProcess + { + return new class($this->container, $consumer) extends AbstractProcess { + /** + * @var AbstractConsumer + */ + private $consumer; + + /** + * @var Driver\DriverInterface + */ + private $subscriber; + + public function __construct(ContainerInterface $container, AbstractConsumer $consumer) + { + parent::__construct($container); + $this->consumer = $consumer; + + $pool = $this->consumer->getPool(); + $this->subscriber = $this->container->get(DriverFactory::class)->get($pool); + } + + public function handle(): void + { + $this->subscriber->subscribe( + $this->consumer->getSubject(), + $this->consumer->getQueue(), + function ($data) { + $this->consumer->consume($data); + } + ); + } + }; + } +} diff --git a/src/nats/src/Contract/PublishInterface.php b/src/nats/src/Contract/PublishInterface.php new file mode 100644 index 000000000..bd323f9b6 --- /dev/null +++ b/src/nats/src/Contract/PublishInterface.php @@ -0,0 +1,18 @@ +container = $container; + $this->name = $name; + $this->config = $config; + + $this->logger = $container->get(StdoutLoggerInterface::class); + } +} diff --git a/src/nats/src/Driver/DriverFactory.php b/src/nats/src/Driver/DriverFactory.php new file mode 100644 index 000000000..4c8954717 --- /dev/null +++ b/src/nats/src/Driver/DriverFactory.php @@ -0,0 +1,58 @@ +container = $container; + $this->config = $container->get(ConfigInterface::class)->get('nats', []); + } + + public function get($pool = 'default'): DriverInterface + { + if (isset($this->drivers[$pool]) && $this->drivers[$pool] instanceof DriverInterface) { + return $this->drivers[$pool]; + } + + $config = $this->config[$pool] ?? null; + if (empty($config)) { + throw new ConfigNotFoundException(sprintf('The config of %s is not found.', $pool)); + } + + return $this->drivers[$pool] = make($config['driver'], [ + 'name' => $pool, + 'config' => $config, + ]); + } +} diff --git a/src/nats/src/Driver/DriverInterface.php b/src/nats/src/Driver/DriverInterface.php new file mode 100644 index 000000000..eecb96ad5 --- /dev/null +++ b/src/nats/src/Driver/DriverInterface.php @@ -0,0 +1,21 @@ +container->get(PoolFactory::class); + $poolConfig = $config['pool'] ?? []; + + $this->pool = $factory->get('nats' . $this->name, function () use ($config) { + $option = new ConnectionOptions($config['options'] ?? []); + $encoder = make($config['encoder'] ?? JSONEncoder::class); + $timeout = $config['timeout'] ?? null; + $conn = make(EncodedConnection::class, [$option, $encoder]); + $conn->connect($timeout); + return $conn; + }, $poolConfig); + } + + public function publish(string $subject, $payload = null, $inbox = null) + { + try { + /** @var Connection $connection */ + $connection = $this->pool->get(); + /** @var NatsConnection $client */ + $client = $connection->getConnection(); + $client->publish($subject, $payload, $inbox); + } finally { + $connection && $connection->release(); + } + } + + public function request(string $subject, $payload, Closure $callback) + { + try { + /** @var Connection $connection */ + $connection = $this->pool->get(); + /** @var NatsConnection $client */ + $client = $connection->getConnection(); + $client->request($subject, $payload, $callback); + } finally { + $connection && $connection->release(); + } + } + + public function requestSync(string $subject, $payload): Message + { + try { + /** @var Connection $connection */ + $connection = $this->pool->get(); + /** @var NatsConnection $client */ + $client = $connection->getConnection(); + $channel = new Channel(1); + $timeout = floatval($this->config['timeout'] ?? 1.0); + $client->request($subject, $payload, function (Message $message) use ($channel) { + $channel->push($message); + }); + + $message = $channel->pop($timeout); + if (! $message instanceof Message) { + throw new TimeoutException('Request timeout.'); + } + return $message; + } finally { + $connection && $connection->release(); + } + } + + public function subscribe(string $subject, string $queue, Closure $callback): void + { + try { + /** @var Connection $connection */ + $connection = $this->pool->get(); + /** @var NatsConnection $client */ + $client = $connection->getConnection(); + if (empty($queue)) { + $client->subscribe($subject, $callback); + } else { + $client->queueSubscribe($subject, $queue, $callback); + } + $client->wait(); + } finally { + $connection && $connection->release(); + } + } +} diff --git a/src/nats/src/EncodedConnection.php b/src/nats/src/EncodedConnection.php new file mode 100644 index 000000000..4ccbfd356 --- /dev/null +++ b/src/nats/src/EncodedConnection.php @@ -0,0 +1,86 @@ +encoder = $encoder; + parent::__construct($options); + } + + /** + * Publish publishes the data argument to the given subject. + * + * @param string $subject message topic + * @param string $payload message data + * @param string $inbox message inbox + */ + public function publish($subject, $payload = null, $inbox = null) + { + $payload = $this->encoder->encode($payload); + parent::publish($subject, $payload, $inbox); + } + + /** + * Subscribes to an specific event given a subject. + * + * @param string $subject message topic + * @param \Closure $callback closure to be executed as callback + * + * @return string + */ + public function subscribe($subject, \Closure $callback) + { + $c = function ($message) use ($callback) { + $message->setBody($this->encoder->decode($message->getBody())); + $callback($message); + }; + return parent::subscribe($subject, $c); + } + + /** + * Subscribes to an specific event given a subject and a queue. + * + * @param string $subject message topic + * @param string $queue queue name + * @param \Closure $callback closure to be executed as callback + */ + public function queueSubscribe($subject, $queue, \Closure $callback) + { + $c = function ($message) use ($callback) { + $message->setBody($this->encoder->decode($message->getBody())); + $callback($message); + }; + parent::queueSubscribe($subject, $queue, $c); + } +} diff --git a/src/nats/src/Encoders/Encoder.php b/src/nats/src/Encoders/Encoder.php new file mode 100644 index 000000000..8e6e3ccca --- /dev/null +++ b/src/nats/src/Encoders/Encoder.php @@ -0,0 +1,37 @@ +container = $container; + } + + /** + * @return string[] returns the events that you want to listen + */ + public function listen(): array + { + return [ + BeforeMainServerStart::class, + ]; + } + + /** + * Handle the Event when the event is triggered, all listeners will + * complete before the event is returned to the EventDispatcher. + */ + public function process(object $event) + { + // Init the consumer process. + $consumerManager = $this->container->get(ConsumerManager::class); + $consumerManager->run(); + } +} diff --git a/src/nats/src/Message.php b/src/nats/src/Message.php new file mode 100644 index 000000000..ebd983127 --- /dev/null +++ b/src/nats/src/Message.php @@ -0,0 +1,179 @@ +setSubject($subject); + $this->setBody($body); + $this->setSid($sid); + $this->setConn($conn); + } + + /** + * String representation of a message. + * + * @return string + */ + public function __toString() + { + return $this->getBody(); + } + + /** + * Set subject. + * + * @param string $subject subject + * + * @return $this + */ + public function setSubject($subject) + { + $this->subject = $subject; + + return $this; + } + + /** + * Get subject. + * + * @return string + */ + public function getSubject() + { + return $this->subject; + } + + /** + * Set body. + * + * @param string $body body + * + * @return $this + */ + public function setBody($body) + { + $this->body = $body; + return $this; + } + + /** + * Get body. + * + * @return string + */ + public function getBody() + { + return $this->body; + } + + /** + * Set Ssid. + * + * @param string $sid ssid + * + * @return $this + */ + public function setSid($sid) + { + $this->sid = $sid; + return $this; + } + + /** + * Get Ssid. + * + * @return string + */ + public function getSid() + { + return $this->sid; + } + + /** + * Set Conn. + * + * @param Connection $conn connection + * + * @return $this + */ + public function setConn(Connection $conn) + { + $this->conn = $conn; + return $this; + } + + /** + * Get Conn. + * + * @return Connection + */ + public function getConn() + { + return $this->conn; + } + + /** + * Allows you reply the message with a specific body. + * + * @param string $body body to be set + */ + public function reply($body) + { + $this->conn->publish( + $this->subject, + $body + ); + } +} diff --git a/src/nats/src/Php71RandomGenerator.php b/src/nats/src/Php71RandomGenerator.php new file mode 100644 index 000000000..6d8630a1f --- /dev/null +++ b/src/nats/src/Php71RandomGenerator.php @@ -0,0 +1,31 @@ +setServerID($data['server_id']); + $this->setHost($data['host']); + $this->setPort($data['port']); + $this->setVersion($data['version']); + $this->setGoVersion($data['go']); + $this->setAuthRequired($data['auth_required'] ?? false); + $this->setTLSRequired($data['tls_required'] ?? false); + $this->setTLSVerify($data['tls_verify'] ?? false); + $this->setMaxPayload($data['max_payload']); + + if (version_compare($data['version'], '1.1.0') === -1) { + $this->setSSLRequired($data['ssl_required']); + } + } + + /** + * Get the server ID. + * + * @return string server ID + */ + public function getServerID() + { + return $this->serverID; + } + + /** + * Set the server ID. + * + * @param string $serverID server ID + */ + public function setServerID($serverID) + { + $this->serverID = $serverID; + } + + /** + * Get the server host name or ip. + * + * @return string server host + */ + public function getHost() + { + return $this->host; + } + + /** + * Set the server host name or ip. + * + * @param string $host server host + */ + public function setHost($host) + { + $this->host = $host; + } + + /** + * Get server port number. + * + * @return int server port number + */ + public function getPort() + { + return $this->port; + } + + /** + * Set server port number. + * + * @param int $port server port number + */ + public function setPort($port) + { + $this->port = $port; + } + + /** + * Get server version number. + * + * @return string server version number + */ + public function getVersion() + { + return $this->version; + } + + /** + * Set server version number. + * + * @param string $version server version number + */ + public function setVersion($version) + { + $this->version = $version; + } + + /** + * Get the golang version number. + * + * @return string go version number + */ + public function getGoVersion() + { + return $this->goVersion; + } + + /** + * Set the golang version number. + * + * @param string $goVersion go version number + */ + public function setGoVersion($goVersion) + { + $this->goVersion = $goVersion; + } + + /** + * Check if server requires authorization. + * + * @return bool if auth is required + */ + public function isAuthRequired() + { + return $this->authRequired; + } + + /** + * Set if the server requires authorization. + * + * @param bool $authRequired if auth is required + */ + public function setAuthRequired($authRequired) + { + $this->authRequired = $authRequired; + } + + /** + * Check if server requires TLS. + * + * @return bool if TLS is required + */ + public function isTLSRequired() + { + return $this->TLSRequired; + } + + /** + * Set if server requires TLS. + * + * @param bool $TLSRequired if TLS is required + */ + public function setTLSRequired($TLSRequired) + { + $this->TLSRequired = $TLSRequired; + } + + /** + * Check if TLS certificate is verified. + * + * @return bool if TLS certificate is verified + */ + public function isTLSVerify() + { + return $this->TLSVerify; + } + + /** + * Set if server verifies TLS certificate. + * + * @param bool $TLSVerify if TLS certificate is verified + */ + public function setTLSVerify($TLSVerify) + { + $this->TLSVerify = $TLSVerify; + } + + /** + * Check if SSL is required. + * + * @return bool if SSL is required + */ + public function isSSLRequired() + { + return $this->SSLRequired; + } + + /** + * Set if SSL is required. + * + * @param bool $SSLRequired if SSL is required + */ + public function setSSLRequired($SSLRequired) + { + $this->SSLRequired = $SSLRequired; + } + + /** + * Get the max size of the payload. + * + * @return int size in bytes + */ + public function getMaxPayload() + { + return $this->maxPayload; + } + + /** + * Set the max size of the payload. + * + * @param int $maxPayload size in bytes + */ + public function setMaxPayload($maxPayload) + { + $this->maxPayload = $maxPayload; + } + + /** + * Get the server connection URLs. + * + * @return array list of server connection urls + */ + public function getConnectURLs() + { + return $this->connectURLs; + } + + /** + * Set the server connection URLs. + * + * @param array $connectURLs list of server connection urls + */ + public function setConnectURLs(array $connectURLs) + { + $this->connectURLs = $connectURLs; + } +}