Added concurrent for async-queue.

This commit is contained in:
李铭昕 2019-09-20 12:20:42 +08:00
parent 7b1049934c
commit 7192ec6760
2 changed files with 22 additions and 2 deletions

View File

@ -18,5 +18,9 @@ return [
'retry_seconds' => 5,
'handle_timeout' => 10,
'processes' => 1,
'concurrent' => [
'limit' => 10,
'timeout' => 10,
],
],
];

View File

@ -20,6 +20,7 @@ use Hyperf\AsyncQueue\Event\RetryHandle;
use Hyperf\AsyncQueue\Exception\InvalidPackerException;
use Hyperf\AsyncQueue\MessageInterface;
use Hyperf\Contract\PackerInterface;
use Hyperf\Utils\Coroutine\Concurrent;
use Hyperf\Utils\Packer\PhpSerializerPacker;
use Psr\Container\ContainerInterface;
use Psr\EventDispatcher\EventDispatcherInterface;
@ -41,6 +42,11 @@ abstract class Driver implements DriverInterface
*/
protected $event;
/**
* @var null|Concurrent
*/
protected $concurrent;
public function __construct(ContainerInterface $container, $config)
{
$this->container = $container;
@ -50,6 +56,10 @@ abstract class Driver implements DriverInterface
if (! $this->packer instanceof PackerInterface) {
throw new InvalidPackerException(sprintf('[Error] %s is not a invalid packer.', $config['packer']));
}
if ($concurrent = $config['concurrent'] ?? null) {
$this->concurrent = new Concurrent($concurrent['limit'] ?? 10, $concurrent['timeout'] ?? 10);
}
}
public function consume(): void
@ -63,7 +73,7 @@ abstract class Driver implements DriverInterface
continue;
}
parallel([function () use ($message, $data) {
$callback = function () use ($message, $data) {
try {
if ($message instanceof MessageInterface) {
$this->event && $this->event->dispatch(new BeforeHandle($message));
@ -81,7 +91,13 @@ abstract class Driver implements DriverInterface
$this->fail($data);
}
}
}]);
};
if ($this->concurrent instanceof Concurrent) {
$this->concurrent->create($callback);
} else {
parallel([$callback]);
}
}
}