gracefully handle metrics

Signed-off-by: reasno <guxi99@gmail.com>
This commit is contained in:
reasno 2020-04-07 00:05:08 +08:00
parent f9c65e05ed
commit 765774bd8c
5 changed files with 42 additions and 13 deletions

View File

@ -12,6 +12,9 @@ declare(strict_types=1);
namespace Hyperf\LoadBalancer;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Hyperf\Utils\Coroutine;
use Swoole\Timer;
abstract class AbstractLoadBalancer implements LoadBalancerInterface
@ -60,9 +63,13 @@ abstract class AbstractLoadBalancer implements LoadBalancerInterface
public function refresh(callable $callback, int $tickMs = 5000)
{
Timer::tick($tickMs, function () use ($callback) {
$timerId = Timer::tick($tickMs, function () use ($callback) {
$nodes = call($callback);
is_array($nodes) && $this->setNodes($nodes);
});
Coroutine::create(function() use ($timerId){
CoordinatorManager::get(Constants::ON_WORKER_EXIT)->yield();
Timer::clear($timerId);
});
}
}

View File

@ -21,6 +21,8 @@ use Hyperf\Metric\Contract\CounterInterface;
use Hyperf\Metric\Contract\GaugeInterface;
use Hyperf\Metric\Contract\HistogramInterface;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Hyperf\Utils\Str;
use InfluxDB\Client;
use InfluxDB\Database;
@ -116,7 +118,10 @@ class MetricFactory implements MetricFactoryInterface
$database->create(new RetentionPolicy($dbname, '1d', 1, true));
}
while (true) {
Coroutine::sleep($interval);
$workerExited = CoordinatorManager::get(Constants::ON_WORKER_EXIT)->yield($interval);
if ($workerExited){
break;
}
$points = [];
$metrics = $this->registry->getMetricFamilySamples();
foreach ($metrics as $metric) {

View File

@ -20,10 +20,11 @@ use Hyperf\Metric\Contract\HistogramInterface;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Hyperf\Metric\Exception\InvalidArgumentException;
use Hyperf\Metric\Exception\RuntimeException;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Hyperf\Utils\Str;
use Prometheus\CollectorRegistry;
use Prometheus\RenderTextFormat;
use Swoole\Coroutine;
use Hyperf\Utils\Coordinator\Constants as Coord;
use Swoole\Coroutine\Http\Server;
class MetricFactory implements MetricFactoryInterface
@ -129,13 +130,16 @@ class MetricFactory implements MetricFactoryInterface
$host = $this->config->get("metric.metric.{$this->name}.push_host");
$port = $this->config->get("metric.metric.{$this->name}.push_port");
$this->doRequest("{$host}:{$port}", $this->getNamespace(), 'put');
Coroutine::sleep($interval);
$workerExited = CoordinatorManager::get(Coord::ON_WORKER_EXIT)->yield($interval);
if ($workerExited){
break;
}
}
}
protected function customHandle()
{
Coroutine::yield(); // Yield forever
CoordinatorManager::get(Coord::ON_WORKER_EXIT)->yield(); // Yield forever
}
private function getNamespace(): string

View File

@ -19,6 +19,8 @@ use Hyperf\Metric\Contract\CounterInterface;
use Hyperf\Metric\Contract\GaugeInterface;
use Hyperf\Metric\Contract\HistogramInterface;
use Hyperf\Metric\Contract\MetricFactoryInterface;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Swoole\Coroutine;
class MetricFactory implements MetricFactoryInterface
@ -84,15 +86,18 @@ class MetricFactory implements MetricFactoryInterface
$interval = (float) $this->config->get("metric.metric.{$name}.push_interval", 5);
$batchEnabled = $this->config->get("metric.metric.{$name}.enable_batch") == true;
// Block handle from returning.
do {
if ($batchEnabled) {
if ($batchEnabled) {
do {
$this->client->startBatch();
Coroutine::sleep((int) $interval);
$workerExited = CoordinatorManager::get(Constants::ON_WORKER_EXIT)->yield($interval);
$this->client->endBatch();
} else {
Coroutine::sleep(5000);
}
} while (true);
if ($workerExited){
break;
}
} while (true);
} else {
CoordinatorManager::get(Constants::ON_WORKER_EXIT)->yield();
}
}
protected function getConnection(): Connection

View File

@ -15,7 +15,11 @@ namespace Hyperf\Metric\Listener;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Framework\Bootstrap\WorkerExitCallback;
use Hyperf\Metric\Event\MetricFactoryReady;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Hyperf\Utils\Coroutine;
use Psr\Container\ContainerInterface;
use Swoole\Timer;
@ -71,12 +75,16 @@ class QueueWatcher implements ListenerInterface
$config = $this->container->get(ConfigInterface::class);
$timerInterval = $config->get('metric.default_metric_interval', 5);
Timer::tick($timerInterval * 1000, function () use ($waiting, $delayed, $failed, $timeout, $queue) {
$timerId = Timer::tick($timerInterval * 1000, function () use ($waiting, $delayed, $failed, $timeout, $queue) {
$info = $queue->info();
$waiting->set((float) $info['waiting']);
$delayed->set((float) $info['delayed']);
$failed->set((float) $info['failed']);
$timeout->set((float) $info['timeout']);
});
Coroutine::create(function() use ($timerId) {
CoordinatorManager::get(Constants::ON_WORKER_EXIT)->yield();
Timer::clear($timerId);
});
}
}