Merge pull request #300 from limingxinleo/consumer

Optimized async queue and amqp consumer.
This commit is contained in:
李铭昕 2019-08-01 09:39:48 +08:00 committed by GitHub
commit 22a5db3c70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 42 additions and 28 deletions

View File

@ -16,6 +16,7 @@ use Hyperf\Amqp\Exception\MessageException;
use Hyperf\Amqp\Message\ConsumerMessageInterface;
use Hyperf\Amqp\Message\MessageInterface;
use Hyperf\Amqp\Pool\PoolFactory;
use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Container\ContainerInterface;
@ -64,19 +65,30 @@ class Consumer extends Builder
/** @var AMQPChannel $channel */
$channel = $message->delivery_info['channel'];
$deliveryTag = $message->delivery_info['delivery_tag'];
try {
$result = $consumerMessage->consume($data);
if ($result === Result::ACK) {
$this->logger->debug($deliveryTag . ' acked.');
return $channel->basic_ack($deliveryTag);
[$result] = parallel([function () use ($consumerMessage, $data) {
try {
return $consumerMessage->consume($data);
} catch (Throwable $exception) {
if ($this->container->has(FormatterInterface::class)) {
$formatter = $this->container->get(FormatterInterface::class);
$this->logger->error($formatter->format($exception));
} else {
$this->logger->error($exception->getMessage());
}
return Result::DROP;
}
if ($consumerMessage->isRequeue() && $result === Result::REQUEUE) {
$this->logger->debug($deliveryTag . ' requeued.');
return $channel->basic_reject($deliveryTag, true);
}
} catch (Throwable $exception) {
$this->logger->debug($exception->getMessage());
}]);
if ($result === Result::ACK) {
$this->logger->debug($deliveryTag . ' acked.');
return $channel->basic_ack($deliveryTag);
}
if ($consumerMessage->isRequeue() && $result === Result::REQUEUE) {
$this->logger->debug($deliveryTag . ' requeued.');
return $channel->basic_reject($deliveryTag, true);
}
$this->logger->debug($deliveryTag . ' rejected.');
$channel->basic_reject($deliveryTag, false);
}

View File

@ -60,23 +60,25 @@ abstract class Driver implements DriverInterface
continue;
}
try {
if ($message instanceof MessageInterface) {
$this->event && $this->event->dispatch(new BeforeHandle($message));
$message->job()->handle();
$this->event && $this->event->dispatch(new AfterHandle($message));
}
parallel([function () use ($message, $data) {
try {
if ($message instanceof MessageInterface) {
$this->event && $this->event->dispatch(new BeforeHandle($message));
$message->job()->handle();
$this->event && $this->event->dispatch(new AfterHandle($message));
}
$this->ack($data);
} catch (\Throwable $ex) {
if ($message->attempts() && $this->remove($data)) {
$this->event && $this->event->dispatch(new RetryHandle($message, $ex));
$this->retry($message);
} else {
$this->event && $this->event->dispatch(new FailedHandle($message, $ex));
$this->fail($data);
$this->ack($data);
} catch (\Throwable $ex) {
if ($message->attempts() && $this->remove($data)) {
$this->event && $this->event->dispatch(new RetryHandle($message, $ex));
$this->retry($message);
} else {
$this->event && $this->event->dispatch(new FailedHandle($message, $ex));
$this->fail($data);
}
}
}
}]);
}
}

View File

@ -17,7 +17,7 @@ abstract class Job implements JobInterface
/**
* @var int
*/
protected $maxAttempts = 1;
protected $maxAttempts = 0;
public function getMaxAttempts(): int
{

View File

@ -36,7 +36,7 @@ class Message implements MessageInterface
public function attempts(): bool
{
if ($this->job->getMaxAttempts() > ++$this->attempts) {
if ($this->job->getMaxAttempts() > $this->attempts++) {
return true;
}
return false;