Format amqp component

This commit is contained in:
huangzhhui 2019-02-05 21:56:38 +08:00
parent 3f487b3558
commit 3bde09ac81
20 changed files with 97 additions and 70 deletions

View File

@ -1,7 +1,16 @@
<?php
namespace Hyperf\Amqp\Annotation;
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
use Hyperf\Di\Annotation\AnnotationCollector;
@ -12,7 +21,6 @@ use Hyperf\Di\Annotation\AnnotationCollector;
*/
class Consumer extends AbstractAnnotation
{
/**
* @var string
*/
@ -61,5 +69,4 @@ class Consumer extends AbstractAnnotation
]);
}
}
}
}

View File

@ -1,5 +1,15 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Annotation;
use Hyperf\Di\Annotation\AbstractAnnotation;
@ -11,7 +21,6 @@ use Hyperf\Di\Annotation\AnnotationCollector;
*/
class Producer extends AbstractAnnotation
{
/**
* @var string
*/
@ -42,5 +51,4 @@ class Producer extends AbstractAnnotation
]);
}
}
}
}

View File

@ -40,7 +40,7 @@ class Builder
}
/**
* @throws AMQPProtocolChannelException When the channel operation is failed.
* @throws AMQPProtocolChannelException when the channel operation is failed
*/
public function declare(MessageInterface $message, ?AMQPChannel $channel = null): void
{
@ -64,5 +64,4 @@ class Builder
{
return $this->poolFactory->getConnectionPool($poolName);
}
}

View File

@ -13,18 +13,15 @@ declare(strict_types=1);
namespace Hyperf\Amqp;
use Hyperf\Amqp\Connection\AMQPSwooleConnection;
use Hyperf\Amqp\Pool\AmqpChannelPool;
use Hyperf\Amqp\Pool\AmqpConnectionPool;
use Hyperf\Contract\ConnectionInterface;
use Hyperf\Pool\Connection as BaseConnection;
use Hyperf\Utils\Arr;
use Hyperf\Utils\Coroutine;
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine\Channel as SwooleChannel;
class Connection extends BaseConnection implements ConnectionInterface
{
@ -164,5 +161,4 @@ class Connection extends BaseConnection implements ConnectionInterface
return false;
}
}

View File

@ -24,7 +24,6 @@ use Throwable;
class Consumer extends Builder
{
/**
* @var bool
*/
@ -54,7 +53,6 @@ class Consumer extends Builder
$this->logger = $logger;
}
public function signalHandler(): void
{
$this->status = false;
@ -93,7 +91,8 @@ class Consumer extends Builder
if ($result === Result::ACK) {
$this->logger->debug($deliveryTag . ' acked.');
return $channel->basic_ack($deliveryTag);
} elseif ($consumerMessage->isRequeue() && $result === Result::REQUEUE) {
}
if ($consumerMessage->isRequeue() && $result === Result::REQUEUE) {
$this->logger->debug($deliveryTag . ' requeued.');
return $channel->basic_reject($deliveryTag, true);
}

View File

@ -1,7 +1,16 @@
<?php
namespace Hyperf\Amqp;
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp;
use Hyperf\Amqp\Pool\PoolFactory;
use Hyperf\Framework\Contract\StdoutLoggerInterface;
@ -9,10 +18,8 @@ use Psr\Container\ContainerInterface;
class ConsumerFactory
{
public function __invoke(ContainerInterface $container)
{
return new Consumer($container, $container->get(PoolFactory::class), $container->get(StdoutLoggerInterface::class));
}
}
}

View File

@ -1,20 +1,27 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp;
use Doctrine\Instantiator\Instantiator;
use Hyperf\Amqp\Message\ConsumerInterface;
use Hyperf\Amqp\Annotation\Consumer as ConsumerAnnotation;
use Hyperf\Amqp\Message\ConsumerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Amqp\Annotation\Consumer as ConsumerAnnotation;
use Hyperf\Process\ProcessRegister;
use Hyperf\Process\Process;
use Hyperf\Process\ProcessRegister;
use Psr\Container\ContainerInterface;
class ConsumerManager
{
/**
* @var ContainerInterface
*/
@ -40,7 +47,7 @@ class ConsumerManager
property_exists($instance, 'container') && $instance->container = $this->container;
$nums = $property['nums'] ?? 1;
$process = $this->createProcess($instance);
$process->nums = (int)$nums;
$process->nums = (int) $nums;
$process->name = 'Consumer-' . $property['queue'];
ProcessRegister::register($process);
}
@ -48,9 +55,7 @@ class ConsumerManager
private function createProcess(ConsumerMessageInterface $consumerMessage): Process
{
return new class($this->container, $consumerMessage) extends Process
{
return new class($this->container, $consumerMessage) extends Process {
/**
* @var \Hyperf\Amqp\Consumer
*/
@ -74,5 +79,4 @@ class ConsumerManager
}
};
}
}
}

View File

@ -1,5 +1,15 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Listener;
use Hyperf\Amqp\ConsumerManager;
@ -9,11 +19,10 @@ use Hyperf\Framework\Event\BeforeMainServerStart;
use Psr\Container\ContainerInterface;
/**
* @Listener()
* @Listener
*/
class BeforeMainServerStartListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
@ -43,7 +52,5 @@ class BeforeMainServerStartListener implements ListenerInterface
// Init the consumer process.
$consumerManager = $this->container->get(ConsumerManager::class);
$consumerManager->run();
}
}
}

View File

@ -1,5 +1,15 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Listener;
use Doctrine\Instantiator\Instantiator;
@ -8,7 +18,6 @@ use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\ApplicationContext;
use Hyperf\Framework\Contract\StdoutLoggerInterface;
use Hyperf\Framework\Event\MainWorkerStart;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
@ -16,11 +25,10 @@ use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
/**
* @Listener()
* @Listener
*/
class MainWorkerStartListener implements ListenerInterface
{
/**
* @var ContainerInterface
*/
@ -53,7 +61,6 @@ class MainWorkerStartListener implements ListenerInterface
*/
public function process(object $event)
{
// Declare exchange and routingKey
$producerMessages = AnnotationCollector::getClassByAnnotation(Producer::class);
if ($producerMessages) {
@ -76,5 +83,4 @@ class MainWorkerStartListener implements ListenerInterface
}
}
}
}
}

View File

@ -19,6 +19,10 @@ use Psr\Container\ContainerInterface;
abstract class ConsumerMessage extends Message implements ConsumerMessageInterface
{
/**
* @var ContainerInterface
*/
public $container;
/**
* @var string
@ -30,11 +34,6 @@ abstract class ConsumerMessage extends Message implements ConsumerMessageInterfa
*/
protected $requeue = true;
/**
* @var ContainerInterface
*/
public $container;
public function setQueue(string $queue): self
{
$this->queue = $queue;

View File

@ -12,12 +12,10 @@ declare(strict_types=1);
namespace Hyperf\Amqp\Message;
use Hyperf\Amqp\Builder\QueueBuilder;
interface ConsumerMessageInterface extends MessageInterface
{
public function consume($data): string;
public function setQueue(string $queue);

View File

@ -17,7 +17,6 @@ use Hyperf\Amqp\Exception\MessageException;
abstract class Message implements MessageInterface
{
/**
* @var string
*/

View File

@ -12,7 +12,6 @@ declare(strict_types=1);
namespace Hyperf\Amqp\Message;
use Hyperf\Amqp\Builder\ExchangeBuilder;
interface MessageInterface

View File

@ -18,7 +18,6 @@ use Hyperf\Framework\ApplicationContext;
abstract class ProducerMessage extends Message implements ProducerMessageInterface
{
/**
* @var string
*/

View File

@ -14,11 +14,9 @@ namespace Hyperf\Amqp\Message;
interface ProducerMessageInterface extends MessageInterface
{
public function setPayload($data);
public function payload(): string;
public function getProperties(): array;
}

View File

@ -28,5 +28,4 @@ class Type
self::TOPIC,
];
}
}

View File

@ -46,6 +46,11 @@ class AmqpConnectionPool extends Pool
return $this->name;
}
public function release(ConnectionInterface $connection): void
{
parent::release($connection);
}
protected function initOption()
{
if ($poolOptions = Arr::get($this->config, 'pool')) {
@ -66,10 +71,4 @@ class AmqpConnectionPool extends Pool
{
return new Connection($this->container, $this, $this->config);
}
public function release(ConnectionInterface $connection): void
{
parent::release($connection);
}
}

View File

@ -39,5 +39,4 @@ class PoolFactory
return $this->pools[$name] = new AmqpConnectionPool($this->container, $name);
}
}

View File

@ -18,13 +18,12 @@ use PhpAmqpLib\Message\AMQPMessage;
class Producer extends Builder
{
public function produce(ProducerMessageInterface $producerMessage, bool $confirm = false, int $timeout = 5): bool
{
$result = false;
$this->injectMessageProperty($producerMessage);
$message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
$pool = $this->getConnectionPool($producerMessage->getPoolName());
/** @var \Hyperf\Amqp\Connection $connection */
@ -50,9 +49,8 @@ class Producer extends Builder
foreach ($item as $key => $value) {
$setter = setter($key);
if (method_exists($producerMessage, $setter)) {
$producerMessage->$setter($value);
$producerMessage->{$setter}($value);
}
}
}
}

View File

@ -1,11 +1,19 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://hyperf.org
* @document https://wiki.hyperf.org
* @contact group@hyperf.org
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp;
class Result
{
/**
* Acknowledge the message.
*/
@ -20,5 +28,4 @@ class Result
* Reject the message and drop it.
*/
const DROP = 'drop';
}
}