Optimized nats consumer process restart frequently. (#907)

This commit is contained in:
李铭昕 2019-11-13 10:18:35 +08:00 committed by GitHub
parent 28091318ec
commit 5bbbe4a9b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 9 deletions

View File

@ -15,9 +15,14 @@
- [#903](https://github.com/hyperf/hyperf/pull/903) Fixed execute `init-proxy` command can not stop when `hyperf/rpc-client` component exists.
- [#904](https://github.com/hyperf/hyperf/pull/904) Fixed the hooked I/O request does not works in the listener that listening `Hyperf\Framework\Event\BeforeMainServerStart` event.
- [#906](https://github.com/hyperf/hyperf/pull/906) Fixed `port` property of URI of `Hyperf\HttpMessage\Server\Request`.
- [#907](https://github.com/hyperf/hyperf/pull/907) Fixed the expire time is double of the config for `requestSync` in nats.
- [#909](https://github.com/hyperf/hyperf/pull/909) Fixed a issue that causes staled parallel execution.
- [#932](https://github.com/hyperf/hyperf/pull/932) Fixed `Translator::setLocale` does not works in coroutine evnironment.
## Optimized
- [#907](https://github.com/hyperf/hyperf/pull/907) Optimized nats consumer process restart frequently.
# v1.1.5 - 2019-11-07
## Added

View File

@ -12,6 +12,7 @@ declare(strict_types=1);
namespace Hyperf\Nats;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Nats\Annotation\Consumer as ConsumerAnnotation;
use Hyperf\Nats\Driver\DriverFactory;
@ -69,6 +70,11 @@ class ConsumerManager
*/
private $subscriber;
/**
* @var StdoutLoggerInterface
*/
private $logger;
public function __construct(ContainerInterface $container, AbstractConsumer $consumer)
{
parent::__construct($container);
@ -76,17 +82,27 @@ class ConsumerManager
$pool = $this->consumer->getPool();
$this->subscriber = $this->container->get(DriverFactory::class)->get($pool);
$this->logger = $container->get(StdoutLoggerInterface::class);
}
public function handle(): void
{
$this->subscriber->subscribe(
$this->consumer->getSubject(),
$this->consumer->getQueue(),
function ($data) {
$this->consumer->consume($data);
}
);
while (true) {
$this->subscriber->subscribe(
$this->consumer->getSubject(),
$this->consumer->getQueue(),
function ($data) {
$this->consumer->consume($data);
}
);
$this->logger->warning(sprintf(
'NatsConsumer[%s] subscribe timeout. Try again after 1 ms.',
$this->consumer->getName()
));
usleep(1000);
}
}
};
}

View File

@ -82,12 +82,11 @@ class NatsDriver extends AbstractDriver
/** @var NatsConnection $client */
$client = $connection->getConnection();
$channel = new Channel(1);
$timeout = floatval($this->config['timeout'] ?? 1.0);
$client->request($subject, $payload, function (Message $message) use ($channel) {
$channel->push($message);
});
$message = $channel->pop($timeout);
$message = $channel->pop(0.001);
if (! $message instanceof Message) {
throw new TimeoutException('Request timeout.');
}