Add support for crontab task run on one server.

This commit is contained in:
You Ming 2019-11-10 11:04:23 +08:00
parent 03f5c2c9ca
commit 4ccbe18a12
6 changed files with 189 additions and 3 deletions

View File

@ -53,6 +53,11 @@ class Crontab extends AbstractAnnotation
*/
public $mutexExpires;
/**
* @var bool
*/
public $onOneServer;
/**
* @var array|string
*/

View File

@ -46,6 +46,11 @@ class Crontab
*/
protected $mutexExpires = 3600;
/**
* @var bool
*/
protected $onOneServer = false;
/**
* @var mixed
*/
@ -116,6 +121,17 @@ class Crontab
return $this;
}
public function isOnOneServer(): bool
{
return $this->onOneServer;
}
public function setOnOneServer(bool $onOneServer): Crontab
{
$this->onOneServer = $onOneServer;
return $this;
}
public function getCallback()
{
return $this->callback;

View File

@ -96,6 +96,7 @@ class CrontabRegisterListener implements ListenerInterface
isset($annotation->singleton) && $crontab->setSingleton($annotation->singleton);
isset($annotation->mutexPool) && $crontab->setMutexPool($annotation->mutexPool);
isset($annotation->mutexExpires) && $crontab->setMutexExpires($annotation->mutexExpires);
isset($annotation->onOneServer) && $crontab->setOnOneServer($annotation->onOneServer);
isset($annotation->callback) && $crontab->setCallback($annotation->callback);
isset($annotation->memo) && $crontab->setMemo($annotation->memo);
return $crontab;

View File

@ -0,0 +1,86 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Crontab\Mutex;
use Hyperf\Crontab\Crontab;
use Hyperf\Redis\RedisFactory;
use Hyperf\Utils\Arr;
class RedisServerMutex implements ServerMutex
{
/**
* @var RedisFactory
*/
private $redisFactory;
/**
* @var string|null
*/
private $macAddress;
public function __construct(RedisFactory $redisFactory)
{
$this->redisFactory = $redisFactory;
$this->macAddress = $this->getMacAddress();
}
/**
* Attempt to obtain a server mutex for the given crontab.
*/
public function attempt(Crontab $crontab): bool
{
if ($this->macAddress === null) {
return false;
}
$redis = $this->redisFactory->get($crontab->getMutexPool());
$mutexName = $this->getMutexName($crontab);
$result = (bool) $redis->set($mutexName, $this->macAddress, ['NX', 'EX' => $crontab->getMutexExpires()]);
if ($result === true) {
return $result;
}
return $redis->get($mutexName) === $this->macAddress;
}
/**
* Get the server mutex for the given crontab.
*/
public function get(Crontab $crontab): string
{
return (string) $this->redisFactory->get($crontab->getMutexPool())->get(
$this->getMutexName($crontab)
);
}
protected function getMutexName(Crontab $crontab)
{
return 'framework' . DIRECTORY_SEPARATOR . 'crontab-' . sha1($crontab->getName() . $crontab->getRule()) . '-sv';
}
protected function getMacAddress(): ?string
{
$macAddresses = swoole_get_local_mac();
foreach (Arr::wrap($macAddresses) as $name => $address) {
if ($address && $address !== '00:00:00:00:00:00') {
return $name . ':' . str_replace(':', '', $address);
}
}
return null;
}
}

View File

@ -0,0 +1,28 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Crontab\Mutex;
use Hyperf\Crontab\Crontab;
interface ServerMutex
{
/**
* Attempt to obtain a server mutex for the given crontab.
*/
public function attempt(Crontab $crontab): bool;
/**
* Get the server mutex for the given crontab.
*/
public function get(Crontab $crontab): string;
}

View File

@ -17,7 +17,9 @@ use Closure;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Crontab\Crontab;
use Hyperf\Crontab\LoggerInterface;
use Hyperf\Crontab\Mutex\RedisServerMutex;
use Hyperf\Crontab\Mutex\RedisTaskMutex;
use Hyperf\Crontab\Mutex\ServerMutex;
use Hyperf\Crontab\Mutex\TaskMutex;
use Hyperf\Utils\Coroutine;
use Psr\Container\ContainerInterface;
@ -34,6 +36,16 @@ class Executor
*/
protected $logger;
/**
* @var \Hyperf\Crontab\Mutex\TaskMutex
*/
protected $taskMutex;
/**
* @var \Hyperf\Crontab\Mutex\ServerMutex
*/
protected $serverMutex;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
@ -83,6 +95,10 @@ class Executor
$runable = $this->runInSingleton($crontab, $runable);
}
if ($crontab->isOnOneServer()) {
$runable = $this->runOnOneServer($crontab, $runable);
}
Coroutine::create($runable);
};
}
@ -101,9 +117,7 @@ class Executor
protected function runInSingleton(Crontab $crontab, Closure $runnable): Closure
{
return function () use ($crontab, $runnable) {
$taskMutex = $this->container->has(TaskMutex::class)
? $this->container->get(TaskMutex::class)
: $this->container->get(RedisTaskMutex::class);
$taskMutex = $this->getTaskMutex();
if ($taskMutex->exists($crontab) || ! $taskMutex->create($crontab)) {
$this->logger->info(sprintf('Crontab task [%s] skip to execute at %s.', $crontab->getName(), date('Y-m-d H:i:s')));
@ -117,4 +131,40 @@ class Executor
}
};
}
protected function getTaskMutex(): TaskMutex
{
if ($this->taskMutex) {
return $this->taskMutex;
}
return $this->taskMutex = $this->container->has(TaskMutex::class)
? $this->container->get(TaskMutex::class)
: $this->container->get(RedisTaskMutex::class);
}
protected function runOnOneServer(Crontab $crontab, Closure $runnable): Closure
{
return function () use ($crontab, $runnable) {
$taskMutex = $this->getServerMutex();
if (!$taskMutex->attempt($crontab)) {
$this->logger->info(sprintf('Crontab task [%s] skip to execute at %s.', $crontab->getName(), date('Y-m-d H:i:s')));
return;
}
$runnable();
};
}
protected function getServerMutex(): ServerMutex
{
if ($this->serverMutex) {
return $this->serverMutex;
}
return $this->serverMutex = $this->container->has(ServerMutex::class)
? $this->container->get(ServerMutex::class)
: $this->container->get(RedisServerMutex::class);
}
}