Optimized code of async-queue.

This commit is contained in:
李铭昕 2020-05-13 12:18:12 +08:00
parent 6b9ba9a38f
commit 251eda809c
8 changed files with 15 additions and 8 deletions

View File

@ -41,5 +41,8 @@ before_script:
- composer config -g process-timeout 900 && composer update
script:
- composer analyse \
src/amqp \
src/async-queue
- composer test -- --exclude-group NonCoroutine
- vendor/bin/phpunit --group NonCoroutine

View File

@ -6,7 +6,10 @@ parameters:
bootstrap: "bootstrap.php"
inferPrivatePropertyTypeFromConstructor: true
treatPhpDocTypesAsCertain: true
reportUnmatchedIgnoredErrors: false
excludes_analyse:
- %currentWorkingDirectory%/src/*/tests/*
ignoreErrors:
- '#Right side of && is always *.#'
- '#side of && is always#'
- '#method Redis::zRevRangeByScore\(\) expects int, string given#'
- '#Argument of an invalid type Hyperf\\AsyncQueue\\Job supplied for foreach, only iterables are supported#'

View File

@ -20,7 +20,7 @@ class RpcClient extends Builder
{
try {
$pool = $this->poolFactory->getRpcPool($rpcMessage->getPoolName());
/** @var null|RpcConnection $connection */
/** @var RpcConnection $connection */
$connection = $pool->get();
$channel = $connection->initChannel($rpcMessage->getQueueBuilder(), uniqid());

View File

@ -33,14 +33,14 @@ interface DriverInterface
/**
* Ack a job.
*
* @param $data
* @param mixed $data
*/
public function ack($data): bool;
/**
* Push a job to failed queue.
*
* @param $data
* @param mixed $data
*/
public function fail($data): bool;

View File

@ -176,7 +176,7 @@ class RedisDriver extends Driver
/**
* Remove data from reserved queue.
* @param mixed $data
* @param string $data
*/
protected function remove($data): bool
{

View File

@ -27,7 +27,7 @@ abstract class Job implements JobInterface, CompressInterface, UnCompressInterfa
}
/**
* @return JobInterface
* @return static
*/
public function uncompress(): CompressInterface
{
@ -41,7 +41,7 @@ abstract class Job implements JobInterface, CompressInterface, UnCompressInterfa
}
/**
* @return JobInterface
* @return static
*/
public function compress(): UnCompressInterface
{

View File

@ -45,6 +45,7 @@ class QueueLengthListener implements ListenerInterface
*/
public function process(object $event)
{
$value = 0;
foreach ($this->level as $level => $value) {
if ($event->length < $value) {
$message = sprintf('Queue lengh of %s is %d.', $event->key, $event->length);

View File

@ -18,7 +18,7 @@ use Serializable;
class Message implements MessageInterface, Serializable
{
/**
* @var JobInterface
* @var CompressInterface|JobInterface|UnCompressInterface
*/
protected $job;