From 9b833c26daee7da43d2425c5b7623309181e5e5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=93=AD=E6=98=95?= <715557344@qq.com> Date: Fri, 18 Jan 2019 14:47:56 +0800 Subject: [PATCH] Fixed some code not well. --- src/queue/src/Driver/DriverInterface.php | 12 ++++++------ src/queue/src/Driver/RedisDriver.php | 18 +++++++++--------- .../{QueueProcess.php => ConsumerProcess.php} | 4 ++-- 3 files changed, 17 insertions(+), 17 deletions(-) rename src/queue/src/Process/{QueueProcess.php => ConsumerProcess.php} (94%) diff --git a/src/queue/src/Driver/DriverInterface.php b/src/queue/src/Driver/DriverInterface.php index b50102292..039ae1418 100644 --- a/src/queue/src/Driver/DriverInterface.php +++ b/src/queue/src/Driver/DriverInterface.php @@ -19,35 +19,35 @@ interface DriverInterface * Push a job to queue. * @param JobInterface $job */ - public function push(JobInterface $job); + public function push(JobInterface $job): bool; /** * Push a delay job to queue. * @param JobInterface $job * @param int $delay */ - public function delay(JobInterface $job, int $delay = 0); + public function delay(JobInterface $job, int $delay = 0): bool; /** * Pop a job from queue. * @param int $timeout */ - public function pop(int $timeout = 0); + public function pop(int $timeout = 0): array; /** * Ack a job. * @param $data */ - public function ack($data); + public function ack($data): bool; /** * Push a job to failed queue. * @param $data */ - public function fail($data); + public function fail($data): bool; /** * Consume jobs from a queue. */ - public function consume(); + public function consume(): void; } diff --git a/src/queue/src/Driver/RedisDriver.php b/src/queue/src/Driver/RedisDriver.php index aff867ed6..35acffadb 100644 --- a/src/queue/src/Driver/RedisDriver.php +++ b/src/queue/src/Driver/RedisDriver.php @@ -76,14 +76,14 @@ class RedisDriver extends Driver $this->failed = "{$this->channel}:failed"; } - public function push(JobInterface $job) + public function push(JobInterface $job): bool { $message = new Message($job); $data = $this->packer->pack($message); - $this->redis->lPush($this->waiting, $data); + return (bool)$this->redis->lPush($this->waiting, $data); } - public function delay(JobInterface $job, int $delay = 0) + public function delay(JobInterface $job, int $delay = 0): bool { if ($delay === 0) { return $this->push($job); @@ -91,10 +91,10 @@ class RedisDriver extends Driver $message = new Message($job); $data = $this->packer->pack($message); - $this->redis->zAdd($this->delayed, time() + $delay, $data); + return $this->redis->zAdd($this->delayed, time() + $delay, $data) > 0; } - public function pop(int $timeout = 0) + public function pop(int $timeout = 0): array { $this->move($this->delayed); $this->move($this->reserved); @@ -115,15 +115,15 @@ class RedisDriver extends Driver return [$data, $message]; } - public function ack($data) + public function ack($data): bool { - $this->remove($data); + return $this->remove($data); } - public function fail($data) + public function fail($data): bool { if ($this->remove($data)) { - $this->redis->lPush($this->failed, $data); + return (bool)$this->redis->lPush($this->failed, $data); } } diff --git a/src/queue/src/Process/QueueProcess.php b/src/queue/src/Process/ConsumerProcess.php similarity index 94% rename from src/queue/src/Process/QueueProcess.php rename to src/queue/src/Process/ConsumerProcess.php index bed6d46c1..572ece91b 100644 --- a/src/queue/src/Process/QueueProcess.php +++ b/src/queue/src/Process/ConsumerProcess.php @@ -17,7 +17,7 @@ use Hyperf\Queue\Driver\DriverFactory; use Hyperf\Queue\Driver\DriverInterface; use Psr\Container\ContainerInterface; -class QueueProcess extends Process +class ConsumerProcess extends Process { /** * @var string @@ -49,7 +49,7 @@ class QueueProcess extends Process $logger->critical( sprintf( '[CRITICAL] process %s not work expected, please check config [%s]', - QueueProcess::class, + ConsumerProcess::class, 'config/autoload/queue.php' ) );