mirror of
https://gitee.com/hyperf/hyperf.git
synced 2024-11-29 18:27:44 +08:00
Added nats component.
This commit is contained in:
parent
b902ee0a80
commit
f94d83fc12
@ -18,6 +18,7 @@
|
||||
"doctrine/instantiator": "^1.0",
|
||||
"egulias/email-validator": "^2.1",
|
||||
"elasticsearch/elasticsearch": "^6.1",
|
||||
"endclothing/prometheus_client_php": "^0.9.1",
|
||||
"fig/http-message-util": "^1.1.2",
|
||||
"google/protobuf": "^3.6.1",
|
||||
"grpc/grpc": "^1.15",
|
||||
@ -36,12 +37,12 @@
|
||||
"psr/http-server-middleware": "^1.0",
|
||||
"psr/log": "^1.0",
|
||||
"psr/simple-cache": "^1.0",
|
||||
"repejota/nats": "^0.8.7",
|
||||
"squizlabs/php_codesniffer": "^3.4",
|
||||
"start-point/etcd-php": "^1.1",
|
||||
"symfony/console": "^4.2",
|
||||
"symfony/finder": "^4.1",
|
||||
"vlucas/phpdotenv": "^3.1",
|
||||
"endclothing/prometheus_client_php": "^0.9.1"
|
||||
"vlucas/phpdotenv": "^3.1"
|
||||
},
|
||||
"require-dev": {
|
||||
"doctrine/common": "@stable",
|
||||
@ -155,6 +156,7 @@
|
||||
"Hyperf\\Metric\\": "src/metric/src/",
|
||||
"Hyperf\\ModelCache\\": "src/model-cache/src/",
|
||||
"Hyperf\\ModelListener\\": "src/model-listener/src/",
|
||||
"Hyperf\\Nats\\": "src/nats/src/",
|
||||
"Hyperf\\Paginator\\": "src/paginator/src/",
|
||||
"Hyperf\\Pool\\": "src/pool/src/",
|
||||
"Hyperf\\Process\\": "src/process/src/",
|
||||
@ -216,6 +218,7 @@
|
||||
"HyperfTest\\Metric\\": "src/metric/tests/",
|
||||
"HyperfTest\\ModelCache\\": "src/model-cache/tests/",
|
||||
"HyperfTest\\ModelListener\\": "src/model-listener/tests/",
|
||||
"HyperfTest\\Nats\\": "src/nats/tests/",
|
||||
"HyperfTest\\Paginator\\": "src/paginator/tests/",
|
||||
"HyperfTest\\Pool\\": "src/pool/tests/",
|
||||
"HyperfTest\\Process\\": "src/process/tests/",
|
||||
@ -271,6 +274,7 @@
|
||||
"Hyperf\\Metric\\ConfigProvider",
|
||||
"Hyperf\\ModelCache\\ConfigProvider",
|
||||
"Hyperf\\ModelListener\\ConfigProvider",
|
||||
"Hyperf\\Nats\\ConfigProvider",
|
||||
"Hyperf\\Paginator\\ConfigProvider",
|
||||
"Hyperf\\Pool\\ConfigProvider",
|
||||
"Hyperf\\Process\\ConfigProvider",
|
||||
|
1
src/nats/.gitattributes
vendored
Normal file
1
src/nats/.gitattributes
vendored
Normal file
@ -0,0 +1 @@
|
||||
/tests export-ignore
|
21
src/nats/LICENSE
Normal file
21
src/nats/LICENSE
Normal file
@ -0,0 +1,21 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) Hyperf
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
56
src/nats/composer.json
Normal file
56
src/nats/composer.json
Normal file
@ -0,0 +1,56 @@
|
||||
{
|
||||
"name": "hyperf/nats",
|
||||
"description": "A nats library for Hyperf.",
|
||||
"license": "MIT",
|
||||
"keywords": [
|
||||
"php",
|
||||
"swoole",
|
||||
"hyperf",
|
||||
"nats"
|
||||
],
|
||||
"support": {
|
||||
},
|
||||
"require": {
|
||||
"php": ">=7.2",
|
||||
"hyperf/contract": "~1.1.0",
|
||||
"hyperf/pool": "~1.1.0",
|
||||
"hyperf/utils": "~1.1.0",
|
||||
"psr/container": "^1.0",
|
||||
"repejota/nats": "^0.8.7"
|
||||
},
|
||||
"require-dev": {
|
||||
"malukenho/docheader": "^0.1.6",
|
||||
"mockery/mockery": "^1.0",
|
||||
"phpunit/phpunit": "^7.0.0",
|
||||
"friendsofphp/php-cs-fixer": "^2.9"
|
||||
},
|
||||
"suggest": {
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {
|
||||
"Hyperf\\Nats\\": "src/"
|
||||
}
|
||||
},
|
||||
"autoload-dev": {
|
||||
"psr-4": {
|
||||
"HyperfTest\\Nats\\": "tests/"
|
||||
}
|
||||
},
|
||||
"config": {
|
||||
"sort-packages": true
|
||||
},
|
||||
"extra": {
|
||||
"branch-alias": {
|
||||
"dev-master": "1.1-dev"
|
||||
},
|
||||
"hyperf": {
|
||||
"config": "Hyperf\\Nats\\ConfigProvider"
|
||||
}
|
||||
},
|
||||
"bin": [
|
||||
],
|
||||
"scripts": {
|
||||
"cs-fix": "php-cs-fixer fix $1",
|
||||
"test": "phpunit --colors=always"
|
||||
}
|
||||
}
|
95
src/nats/src/AbstractConsumer.php
Normal file
95
src/nats/src/AbstractConsumer.php
Normal file
@ -0,0 +1,95 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats;
|
||||
|
||||
use Nats\Message;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
abstract class AbstractConsumer
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
public $pool = 'default';
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $subject = '';
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $name = 'NatsConsumer';
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
protected $nums = 1;
|
||||
|
||||
/**
|
||||
* @var ContainerInterface
|
||||
*/
|
||||
protected $container;
|
||||
|
||||
public function __construct(ContainerInterface $container)
|
||||
{
|
||||
$this->container = $container;
|
||||
}
|
||||
|
||||
abstract public function handle(Message $payload);
|
||||
|
||||
public function getSubject(): string
|
||||
{
|
||||
return $this->subject;
|
||||
}
|
||||
|
||||
public function setSubject(string $subject): self
|
||||
{
|
||||
$this->subject = $subject;
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getName(): string
|
||||
{
|
||||
return $this->name;
|
||||
}
|
||||
|
||||
public function setName(string $name): self
|
||||
{
|
||||
$this->name = $name;
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getNums(): int
|
||||
{
|
||||
return $this->nums;
|
||||
}
|
||||
|
||||
public function setNums(int $nums): self
|
||||
{
|
||||
$this->nums = $nums;
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getPool(): string
|
||||
{
|
||||
return $this->pool;
|
||||
}
|
||||
|
||||
public function setPool(string $pool): self
|
||||
{
|
||||
$this->pool = $pool;
|
||||
return $this;
|
||||
}
|
||||
}
|
42
src/nats/src/Annotation/Consumer.php
Normal file
42
src/nats/src/Annotation/Consumer.php
Normal file
@ -0,0 +1,42 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Annotation;
|
||||
|
||||
use Hyperf\Di\Annotation\AbstractAnnotation;
|
||||
|
||||
/**
|
||||
* @Annotation
|
||||
* @Target({"CLASS"})
|
||||
*/
|
||||
class Consumer extends AbstractAnnotation
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
public $subject = '';
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
public $name = '';
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
public $nums = 1;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
public $pool = '';
|
||||
}
|
31
src/nats/src/ConfigProvider.php
Normal file
31
src/nats/src/ConfigProvider.php
Normal file
@ -0,0 +1,31 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats;
|
||||
|
||||
class ConfigProvider
|
||||
{
|
||||
public function __invoke(): array
|
||||
{
|
||||
return [
|
||||
'dependencies' => [
|
||||
],
|
||||
'annotations' => [
|
||||
'scan' => [
|
||||
'paths' => [
|
||||
__DIR__,
|
||||
],
|
||||
],
|
||||
],
|
||||
];
|
||||
}
|
||||
}
|
90
src/nats/src/ConsumerManager.php
Normal file
90
src/nats/src/ConsumerManager.php
Normal file
@ -0,0 +1,90 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats;
|
||||
|
||||
use Hyperf\Di\Annotation\AnnotationCollector;
|
||||
use Hyperf\Nats\Annotation\Consumer as ConsumerAnnotation;
|
||||
use Hyperf\Nats\Driver\DriverFactory;
|
||||
use Hyperf\Process\AbstractProcess;
|
||||
use Hyperf\Process\ProcessManager;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
class ConsumerManager
|
||||
{
|
||||
/**
|
||||
* @var ContainerInterface
|
||||
*/
|
||||
private $container;
|
||||
|
||||
public function __construct(ContainerInterface $container)
|
||||
{
|
||||
$this->container = $container;
|
||||
}
|
||||
|
||||
public function run()
|
||||
{
|
||||
$classes = AnnotationCollector::getClassByAnnotation(ConsumerAnnotation::class);
|
||||
/**
|
||||
* @var string
|
||||
* @var ConsumerAnnotation $annotation
|
||||
*/
|
||||
foreach ($classes as $class => $annotation) {
|
||||
$instance = make($class);
|
||||
if (! $instance instanceof AbstractConsumer) {
|
||||
continue;
|
||||
}
|
||||
$annotation->subject && $instance->setSubject($annotation->subject);
|
||||
$annotation->name && $instance->setName($annotation->name);
|
||||
$annotation->pool && $instance->setName($annotation->pool);
|
||||
|
||||
$nums = $annotation->nums;
|
||||
$process = $this->createProcess($instance);
|
||||
$process->nums = (int) $nums;
|
||||
$process->name = $instance->getName() . '-' . $instance->getSubject();
|
||||
ProcessManager::register($process);
|
||||
}
|
||||
}
|
||||
|
||||
private function createProcess(AbstractConsumer $consumer): AbstractProcess
|
||||
{
|
||||
return new class($this->container, $consumer) extends AbstractProcess {
|
||||
/**
|
||||
* @var AbstractConsumer
|
||||
*/
|
||||
private $consumer;
|
||||
|
||||
/**
|
||||
* @var Driver\DriverInterface
|
||||
*/
|
||||
private $subscriber;
|
||||
|
||||
public function __construct(ContainerInterface $container, AbstractConsumer $consumer)
|
||||
{
|
||||
parent::__construct($container);
|
||||
$this->consumer = $consumer;
|
||||
|
||||
$pool = $this->consumer->getPool();
|
||||
$this->subscriber = $this->container->get(DriverFactory::class)->get($pool);
|
||||
}
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
$this->subscriber->subscribe($this->consumer->getSubject(), function ($data) {
|
||||
$this->consumer->handle($data);
|
||||
});
|
||||
|
||||
$this->subscriber->wait();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
18
src/nats/src/Contract/PublishInterface.php
Normal file
18
src/nats/src/Contract/PublishInterface.php
Normal file
@ -0,0 +1,18 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats;
|
||||
|
||||
interface PublishInterface
|
||||
{
|
||||
public function publish(string $subject, $payload = null, $inbox = null);
|
||||
}
|
20
src/nats/src/Contract/RequestInterface.php
Normal file
20
src/nats/src/Contract/RequestInterface.php
Normal file
@ -0,0 +1,20 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats;
|
||||
|
||||
use Closure;
|
||||
|
||||
interface RequestInterface
|
||||
{
|
||||
public function request(string $subject, $payload, Closure $callback);
|
||||
}
|
20
src/nats/src/Contract/SubscribeInterface.php
Normal file
20
src/nats/src/Contract/SubscribeInterface.php
Normal file
@ -0,0 +1,20 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats;
|
||||
|
||||
use Closure;
|
||||
|
||||
interface SubscribeInterface
|
||||
{
|
||||
public function subscribe(string $subject, Closure $callback): void;
|
||||
}
|
58
src/nats/src/Driver/AbstractDriver.php
Normal file
58
src/nats/src/Driver/AbstractDriver.php
Normal file
@ -0,0 +1,58 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Driver;
|
||||
|
||||
use Hyperf\Contract\StdoutLoggerInterface;
|
||||
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
abstract class AbstractDriver implements DriverInterface
|
||||
{
|
||||
/**
|
||||
* @var ContainerInterface
|
||||
*/
|
||||
protected $container;
|
||||
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $name;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
protected $config;
|
||||
|
||||
/**
|
||||
* @var StdoutLoggerInterface
|
||||
*/
|
||||
protected $logger;
|
||||
|
||||
public function __construct(ContainerInterface $container, string $name, array $config)
|
||||
{
|
||||
$this->container = $container;
|
||||
$this->name = $name;
|
||||
$this->config = $config;
|
||||
|
||||
$this->logger = $container->get(StdoutLoggerInterface::class);
|
||||
}
|
||||
|
||||
protected function formatThrowable(\Throwable $throwable): string
|
||||
{
|
||||
if ($this->container->has(FormatterInterface::class)) {
|
||||
return $this->container->get(FormatterInterface::class)->format($throwable);
|
||||
}
|
||||
|
||||
return $throwable->getMessage();
|
||||
}
|
||||
}
|
58
src/nats/src/Driver/DriverFactory.php
Normal file
58
src/nats/src/Driver/DriverFactory.php
Normal file
@ -0,0 +1,58 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Driver;
|
||||
|
||||
use Hyperf\Contract\ConfigInterface;
|
||||
use Hyperf\Nats\Exception\ConfigNotFoundException;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
class DriverFactory
|
||||
{
|
||||
/**
|
||||
* @var ContainerInterface
|
||||
*/
|
||||
protected $container;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
protected $config;
|
||||
|
||||
/**
|
||||
* @var DriverInterface[]
|
||||
*/
|
||||
protected $drivers = [];
|
||||
|
||||
public function __construct(ContainerInterface $container)
|
||||
{
|
||||
$this->container = $container;
|
||||
$this->config = $container->get(ConfigInterface::class)->get('squeue', []);
|
||||
}
|
||||
|
||||
public function get($pool = 'default'): DriverInterface
|
||||
{
|
||||
if (isset($this->drivers[$pool]) && $this->drivers[$pool] instanceof DriverInterface) {
|
||||
return $this->drivers[$pool];
|
||||
}
|
||||
|
||||
$config = $this->config[$pool] ?? null;
|
||||
if (empty($config)) {
|
||||
throw new ConfigNotFoundException(sprintf('The config of %s is not found.', $pool));
|
||||
}
|
||||
|
||||
return $this->drivers[$pool] = make($config['driver'], [
|
||||
'name' => $pool,
|
||||
'config' => $config,
|
||||
]);
|
||||
}
|
||||
}
|
21
src/nats/src/Driver/DriverInterface.php
Normal file
21
src/nats/src/Driver/DriverInterface.php
Normal file
@ -0,0 +1,21 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Driver;
|
||||
|
||||
use Hyperf\Nats\PublishInterface;
|
||||
use Hyperf\Nats\RequestInterface;
|
||||
use Hyperf\Nats\SubscribeInterface;
|
||||
|
||||
interface DriverInterface extends PublishInterface, RequestInterface, SubscribeInterface
|
||||
{
|
||||
}
|
85
src/nats/src/Driver/NatsDriver.php
Normal file
85
src/nats/src/Driver/NatsDriver.php
Normal file
@ -0,0 +1,85 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Driver;
|
||||
|
||||
use Closure;
|
||||
use Hyperf\Pool\SimplePool\Connection;
|
||||
use Hyperf\Pool\SimplePool\PoolFactory;
|
||||
use Nats\ConnectionOptions;
|
||||
use Nats\EncodedConnection;
|
||||
use Nats\Encoders\JSONEncoder;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
class NatsDriver extends AbstractDriver
|
||||
{
|
||||
/**
|
||||
* @var \Hyperf\Pool\SimplePool\Pool
|
||||
*/
|
||||
protected $pool;
|
||||
|
||||
public function __construct(ContainerInterface $container, string $name, array $config)
|
||||
{
|
||||
parent::__construct($container, $name, $config);
|
||||
|
||||
$factory = $this->container->get(PoolFactory::class);
|
||||
$poolConfig = $config['pool'] ?? [];
|
||||
|
||||
$this->pool = $factory->get('squeue' . $this->name, function () use ($config) {
|
||||
$option = new ConnectionOptions($config['options'] ?? []);
|
||||
$encoder = make($config['encoder'] ?? JSONEncoder::class);
|
||||
$conn = make(EncodedConnection::class, [$option, $encoder]);
|
||||
$conn->connect();
|
||||
return $conn;
|
||||
}, $poolConfig);
|
||||
}
|
||||
|
||||
public function publish(string $subject, $payload = null, $inbox = null)
|
||||
{
|
||||
try {
|
||||
/** @var Connection $connection */
|
||||
$connection = $this->pool->get();
|
||||
/** @var \Nats\Connection $client */
|
||||
$client = $connection->getConnection();
|
||||
$client->publish($subject, $payload, $inbox);
|
||||
} finally {
|
||||
$connection->release();
|
||||
}
|
||||
}
|
||||
|
||||
public function request(string $subject, $payload, Closure $callback)
|
||||
{
|
||||
try {
|
||||
/** @var Connection $connection */
|
||||
$connection = $this->pool->get();
|
||||
/** @var \Nats\Connection $client */
|
||||
$client = $connection->getConnection();
|
||||
$client->request($subject, $payload, $callback);
|
||||
} finally {
|
||||
$connection->release();
|
||||
}
|
||||
}
|
||||
|
||||
public function subscribe(string $subject, Closure $callback): void
|
||||
{
|
||||
try {
|
||||
/** @var Connection $connection */
|
||||
$connection = $this->pool->get();
|
||||
/** @var \Nats\Connection $client */
|
||||
$client = $connection->getConnection();
|
||||
$client->subscribe($subject, $callback);
|
||||
$client->wait();
|
||||
} finally {
|
||||
$connection->release();
|
||||
}
|
||||
}
|
||||
}
|
19
src/nats/src/Exception/ConfigNotFoundException.php
Normal file
19
src/nats/src/Exception/ConfigNotFoundException.php
Normal file
@ -0,0 +1,19 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Exception;
|
||||
|
||||
use RuntimeException;
|
||||
|
||||
class ConfigNotFoundException extends RuntimeException
|
||||
{
|
||||
}
|
19
src/nats/src/Exception/DriverException.php
Normal file
19
src/nats/src/Exception/DriverException.php
Normal file
@ -0,0 +1,19 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Exception;
|
||||
|
||||
use RuntimeException;
|
||||
|
||||
class DriverException extends RuntimeException
|
||||
{
|
||||
}
|
57
src/nats/src/Listener/BeforeMainServerStartListener.php
Normal file
57
src/nats/src/Listener/BeforeMainServerStartListener.php
Normal file
@ -0,0 +1,57 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Listener;
|
||||
|
||||
use Hyperf\Event\Annotation\Listener;
|
||||
use Hyperf\Event\Contract\ListenerInterface;
|
||||
use Hyperf\Framework\Event\BeforeMainServerStart;
|
||||
use Hyperf\Nats\ConsumerManager;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
/**
|
||||
* Must handle the event before `Hyperf\Process\Listener\BootProcessListener`.
|
||||
* @Listener(priority=99)
|
||||
*/
|
||||
class BeforeMainServerStartListener implements ListenerInterface
|
||||
{
|
||||
/**
|
||||
* @var ContainerInterface
|
||||
*/
|
||||
private $container;
|
||||
|
||||
public function __construct(ContainerInterface $container)
|
||||
{
|
||||
$this->container = $container;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string[] returns the events that you want to listen
|
||||
*/
|
||||
public function listen(): array
|
||||
{
|
||||
return [
|
||||
BeforeMainServerStart::class,
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the Event when the event is triggered, all listeners will
|
||||
* complete before the event is returned to the EventDispatcher.
|
||||
*/
|
||||
public function process(object $event)
|
||||
{
|
||||
// Init the consumer process.
|
||||
$consumerManager = $this->container->get(ConsumerManager::class);
|
||||
$consumerManager->run();
|
||||
}
|
||||
}
|
36
src/nats/src/Process/ConsumerProcess.php
Normal file
36
src/nats/src/Process/ConsumerProcess.php
Normal file
@ -0,0 +1,36 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
/**
|
||||
* This file is part of Hyperf.
|
||||
*
|
||||
* @link https://www.hyperf.io
|
||||
* @document https://doc.hyperf.io
|
||||
* @contact group@hyperf.io
|
||||
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
namespace Hyperf\Nats\Process;
|
||||
|
||||
use Hyperf\Process\AbstractProcess;
|
||||
use Psr\Container\ContainerInterface;
|
||||
|
||||
class ConsumerProcess extends AbstractProcess
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
protected $queue = 'default';
|
||||
|
||||
public function __construct(ContainerInterface $container)
|
||||
{
|
||||
parent::__construct($container);
|
||||
|
||||
$this->name = "squeue.{$this->queue}";
|
||||
$this->nums = $this->config['processes'] ?? 1;
|
||||
}
|
||||
|
||||
public function handle(): void
|
||||
{
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user