Merge branch 'master' into pr/932

This commit is contained in:
李铭昕 2019-11-13 10:04:38 +08:00
commit 0224affc4c
10 changed files with 228 additions and 8 deletions

View File

@ -4,7 +4,9 @@
- [#827](https://github.com/hyperf/hyperf/pull/827) Added a simple db component.
- [#905](https://github.com/hyperf/hyperf/pull/905) Added twig template engine for view.
- [#911](https://github.com/hyperf/hyperf/pull/911) Added support for crontab task run on one server.
- [#913](https://github.com/hyperf/hyperf/pull/913) Added `Hyperf\ExceptionHandler\Listener\ErrorExceptionHandler`.
- [#931](https://github.com/hyperf/hyperf/pull/931) Added `strict_mode` for config-apollo.
## Fixed

View File

@ -99,7 +99,11 @@ class FooTask
#### singleton
多实例部署项目时,如果设置为 `true`,则只会触发一次。
解决任务的并发执行问题任务永远只会同时运行1个。但是这个没法保障任务在集群时重复执行的问题。
#### onOneServer
多实例部署项目时,则只有一个实例会被触发。
#### mutexPool

View File

@ -19,4 +19,5 @@ return [
'application',
],
'interval' => 5,
'strict_mode' => false,
];

View File

@ -83,10 +83,39 @@ class OnPipeMessageListener implements ListenerInterface
return;
}
foreach ($data->configurations ?? [] as $key => $value) {
$this->config->set($key, $value);
$this->config->set($key, $this->formatValue($value));
$this->logger->debug(sprintf('Config [%s] is updated', $key));
}
ReleaseKey::set($cacheKey, $data->releaseKey);
}
}
private function formatValue($value)
{
if (! $this->config->get('apollo.strict_mode', false)) {
return $value;
}
switch (strtolower($value)) {
case 'true':
case '(true)':
return true;
case 'false':
case '(false)':
return false;
case 'empty':
case '(empty)':
return '';
case 'null':
case '(null)':
return;
}
if (is_numeric($value)) {
$value = (strpos($value, '.') === false) ? (int) $value : (float) $value;
}
return $value;
}
}

View File

@ -53,6 +53,11 @@ class Crontab extends AbstractAnnotation
*/
public $mutexExpires;
/**
* @var bool
*/
public $onOneServer;
/**
* @var array|string
*/

View File

@ -46,6 +46,11 @@ class Crontab
*/
protected $mutexExpires = 3600;
/**
* @var bool
*/
protected $onOneServer = false;
/**
* @var mixed
*/
@ -116,6 +121,17 @@ class Crontab
return $this;
}
public function isOnOneServer(): bool
{
return $this->onOneServer;
}
public function setOnOneServer(bool $onOneServer): Crontab
{
$this->onOneServer = $onOneServer;
return $this;
}
public function getCallback()
{
return $this->callback;

View File

@ -96,6 +96,7 @@ class CrontabRegisterListener implements ListenerInterface
isset($annotation->singleton) && $crontab->setSingleton($annotation->singleton);
isset($annotation->mutexPool) && $crontab->setMutexPool($annotation->mutexPool);
isset($annotation->mutexExpires) && $crontab->setMutexExpires($annotation->mutexExpires);
isset($annotation->onOneServer) && $crontab->setOnOneServer($annotation->onOneServer);
isset($annotation->callback) && $crontab->setCallback($annotation->callback);
isset($annotation->memo) && $crontab->setMemo($annotation->memo);
return $crontab;

View File

@ -0,0 +1,86 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Crontab\Mutex;
use Hyperf\Crontab\Crontab;
use Hyperf\Redis\RedisFactory;
use Hyperf\Utils\Arr;
class RedisServerMutex implements ServerMutex
{
/**
* @var RedisFactory
*/
private $redisFactory;
/**
* @var string|null
*/
private $macAddress;
public function __construct(RedisFactory $redisFactory)
{
$this->redisFactory = $redisFactory;
$this->macAddress = $this->getMacAddress();
}
/**
* Attempt to obtain a server mutex for the given crontab.
*/
public function attempt(Crontab $crontab): bool
{
if ($this->macAddress === null) {
return false;
}
$redis = $this->redisFactory->get($crontab->getMutexPool());
$mutexName = $this->getMutexName($crontab);
$result = (bool) $redis->set($mutexName, $this->macAddress, ['NX', 'EX' => $crontab->getMutexExpires()]);
if ($result === true) {
return $result;
}
return $redis->get($mutexName) === $this->macAddress;
}
/**
* Get the server mutex for the given crontab.
*/
public function get(Crontab $crontab): string
{
return (string) $this->redisFactory->get($crontab->getMutexPool())->get(
$this->getMutexName($crontab)
);
}
protected function getMutexName(Crontab $crontab)
{
return 'hyperf' . DIRECTORY_SEPARATOR . 'crontab-' . sha1($crontab->getName() . $crontab->getRule()) . '-sv';
}
protected function getMacAddress(): ?string
{
$macAddresses = swoole_get_local_mac();
foreach (Arr::wrap($macAddresses) as $name => $address) {
if ($address && $address !== '00:00:00:00:00:00') {
return $name . ':' . str_replace(':', '', $address);
}
}
return null;
}
}

View File

@ -0,0 +1,28 @@
<?php
declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Crontab\Mutex;
use Hyperf\Crontab\Crontab;
interface ServerMutex
{
/**
* Attempt to obtain a server mutex for the given crontab.
*/
public function attempt(Crontab $crontab): bool;
/**
* Get the server mutex for the given crontab.
*/
public function get(Crontab $crontab): string;
}

View File

@ -17,7 +17,9 @@ use Closure;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Crontab\Crontab;
use Hyperf\Crontab\LoggerInterface;
use Hyperf\Crontab\Mutex\RedisServerMutex;
use Hyperf\Crontab\Mutex\RedisTaskMutex;
use Hyperf\Crontab\Mutex\ServerMutex;
use Hyperf\Crontab\Mutex\TaskMutex;
use Hyperf\Utils\Coroutine;
use Psr\Container\ContainerInterface;
@ -35,6 +37,16 @@ class Executor
*/
protected $logger;
/**
* @var \Hyperf\Crontab\Mutex\TaskMutex
*/
protected $taskMutex;
/**
* @var \Hyperf\Crontab\Mutex\ServerMutex
*/
protected $serverMutex;
public function __construct(ContainerInterface $container)
{
$this->container = $container;
@ -58,7 +70,7 @@ class Executor
$parameters = $crontab->getCallback()[2] ?? null;
if ($class && $method && class_exists($class) && method_exists($class, $method)) {
$callback = function () use ($class, $method, $parameters, $crontab) {
$runable = function () use ($class, $method, $parameters, $crontab) {
$runnable = function () use ($class, $method, $parameters, $crontab) {
try {
$result = true;
$instance = make($class);
@ -81,10 +93,14 @@ class Executor
};
if ($crontab->isSingleton()) {
$runable = $this->runInSingleton($crontab, $runable);
$runnable = $this->runInSingleton($crontab, $runnable);
}
Coroutine::create($runable);
if ($crontab->isOnOneServer()) {
$runnable = $this->runOnOneServer($crontab, $runnable);
}
Coroutine::create($runnable);
};
}
break;
@ -102,9 +118,7 @@ class Executor
protected function runInSingleton(Crontab $crontab, Closure $runnable): Closure
{
return function () use ($crontab, $runnable) {
$taskMutex = $this->container->has(TaskMutex::class)
? $this->container->get(TaskMutex::class)
: $this->container->get(RedisTaskMutex::class);
$taskMutex = $this->getTaskMutex();
if ($taskMutex->exists($crontab) || ! $taskMutex->create($crontab)) {
$this->logger->info(sprintf('Crontab task [%s] skip to execute at %s.', $crontab->getName(), date('Y-m-d H:i:s')));
@ -118,4 +132,38 @@ class Executor
}
};
}
protected function getTaskMutex(): TaskMutex
{
if (! $this->taskMutex) {
$this->taskMutex = $this->container->has(TaskMutex::class)
? $this->container->get(TaskMutex::class)
: $this->container->get(RedisTaskMutex::class);
}
return $this->taskMutex;
}
protected function runOnOneServer(Crontab $crontab, Closure $runnable): Closure
{
return function () use ($crontab, $runnable) {
$taskMutex = $this->getServerMutex();
if (!$taskMutex->attempt($crontab)) {
$this->logger->info(sprintf('Crontab task [%s] skip to execute at %s.', $crontab->getName(), date('Y-m-d H:i:s')));
return;
}
$runnable();
};
}
protected function getServerMutex(): ServerMutex
{
if (! $this->serverMutex) {
$this->serverMutex = $this->container->has(ServerMutex::class)
? $this->container->get(ServerMutex::class)
: $this->container->get(RedisServerMutex::class);
}
return $this->serverMutex;
}
}