Merge branch 'master' of github.com:hyperf-cloud/hyperf

This commit is contained in:
huangzhhui 2019-01-18 16:16:23 +08:00
commit 2c6d064cf7
4 changed files with 20 additions and 27 deletions

View File

@ -45,7 +45,7 @@ abstract class Driver implements DriverInterface
$this->packer = $container->get($config['packer'] ?? PhpSerializer::class);
$this->event = $container->get(EventDispatcherInterface::class);
if (! $this->packer instanceof PackerInterface) {
if (!$this->packer instanceof PackerInterface) {
throw new InvalidPackerException(sprintf('[Error] %s is not a invalid packer.', $config['packer']));
}
}
@ -84,5 +84,5 @@ abstract class Driver implements DriverInterface
*
* @param MessageInterface $message
*/
abstract protected function retry(MessageInterface $message);
abstract protected function retry(MessageInterface $message): bool;
}

View File

@ -20,7 +20,7 @@ interface DriverInterface
*
* @param JobInterface $job
*/
public function push(JobInterface $job);
public function push(JobInterface $job): bool;
/**
* Push a delay job to queue.
@ -28,28 +28,28 @@ interface DriverInterface
* @param JobInterface $job
* @param int $delay
*/
public function delay(JobInterface $job, int $delay = 0);
public function delay(JobInterface $job, int $delay = 0): bool;
/**
* Pop a job from queue.
*
* @param int $timeout
*/
public function pop(int $timeout = 0);
public function pop(int $timeout = 0): array;
/**
* Ack a job.
*
* @param $data
*/
public function ack($data);
public function ack($data): bool;
/**
* Push a job to failed queue.
*
* @param $data
*/
public function fail($data);
public function fail($data): bool;
/**
* Consume jobs from a queue.

View File

@ -81,14 +81,14 @@ class RedisDriver extends Driver
$this->failed = "{$this->channel}:failed";
}
public function push(JobInterface $job): void
public function push(JobInterface $job): bool
{
$message = new Message($job);
$data = $this->packer->pack($message);
$this->redis->lPush($this->waiting, $data);
return (bool)$this->redis->lPush($this->waiting, $data);
}
public function delay(JobInterface $job, int $delay = 0): void
public function delay(JobInterface $job, int $delay = 0): bool
{
if ($delay === 0) {
return $this->push($job);
@ -96,7 +96,7 @@ class RedisDriver extends Driver
$message = new Message($job);
$data = $this->packer->pack($message);
$this->redis->zAdd($this->delayed, time() + $delay, $data);
return $this->redis->zAdd($this->delayed, time() + $delay, $data) > 0;
}
public function pop(int $timeout = 0): array
@ -120,22 +120,23 @@ class RedisDriver extends Driver
return [$data, $message];
}
public function ack($data): void
public function ack($data): bool
{
$this->remove($data);
return $this->remove($data);
}
public function fail($data): void
public function fail($data): bool
{
if ($this->remove($data)) {
$this->redis->lPush($this->failed, $data);
return (bool)$this->redis->lPush($this->failed, $data);
}
return false;
}
protected function retry(MessageInterface $message): void
protected function retry(MessageInterface $message): bool
{
$data = $this->packer->pack($message);
$this->redis->zAdd($this->delayed, time() + $this->retrySeconds, $data);
return $this->redis->zAdd($this->delayed, time() + $this->retrySeconds, $data) > 0;
}
/**
@ -160,12 +161,4 @@ class RedisDriver extends Driver
}
}
}
/**
* Consume jobs from a queue.
*/
public function consume(): void
{
// @TODO
}
}

View File

@ -17,7 +17,7 @@ use Hyperf\Queue\Driver\DriverFactory;
use Hyperf\Queue\Driver\DriverInterface;
use Psr\Container\ContainerInterface;
class QueueProcess extends Process
class ConsumerProcess extends Process
{
/**
* @var string
@ -49,7 +49,7 @@ class QueueProcess extends Process
$logger->critical(
sprintf(
'[CRITICAL] process %s not work expected, please check config [%s]',
QueueProcess::class,
ConsumerProcess::class,
'config/autoload/queue.php'
)
);