mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-29 18:58:05 +08:00
[Improvement][Metrics] Add more worker related metrics and fix some previous ones (#14254)
* Add more worker related metrics and fix some previous ones * update metrics docs
This commit is contained in:
parent
d67fe52c33
commit
e86630bb7b
@ -73,7 +73,6 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua
|
||||
- ds.task.dispatch.failure.count: (counter) the number of tasks failed to dispatch, retry failure included
|
||||
- ds.task.dispatch.error.count: (counter) the number of task dispatch errors
|
||||
- ds.task.execution.count.by.type: (counter) the number of task executions grouped by tag `task_type`
|
||||
- ds.task.running: (gauge) the number of running tasks
|
||||
- ds.task.prepared: (gauge) the number of tasks prepared for task queue
|
||||
- ds.task.execution.count: (counter) the number of executed tasks
|
||||
- ds.task.execution.duration: (histogram) duration of task executions
|
||||
@ -104,6 +103,12 @@ For example, you can get the master metrics by `curl http://localhost:5679/actua
|
||||
### Worker Server Metrics
|
||||
|
||||
- ds.worker.overload.count: (counter) the number of times the worker overloaded
|
||||
- ds.worker.task: (gauge) the number of tasks on the worker, including pending and running ones
|
||||
- ds.worker.execute.queue.size: (gauge) the number of pending tasks on the worker
|
||||
- ds.worker.active.execute.thread: (gauge) the number of running tasks on the worker
|
||||
- ds.worker.memory.available: (gauge) the available physical memory of the worker (GB)
|
||||
- ds.worker.cpu.usage: (gauge) the cpu usage percentage of the worker
|
||||
- ds.worker.memory.usage: (gauge) the memory usage percentage of the worker
|
||||
- ds.worker.full.submit.queue.count: (counter) the number of times the worker's submit queue being full
|
||||
- ds.worker.resource.download.count: (counter) the number of downloaded resource files on workers, sliced by tag `status`
|
||||
- ds.worker.resource.download.duration: (histogram) the time cost of resource download on workers
|
||||
|
@ -74,7 +74,6 @@ metrics exporter端口`server.port`是在application.yaml里定义的: master: `
|
||||
- ds.task.dispatch.failure.count: (counter) 分发失败的任务数量,重试也包含在内
|
||||
- ds.task.dispatch.error.count: (counter) 分发任务的错误数量
|
||||
- ds.task.execution.count.by.type: (counter) 任务执行数量,按标签`task_type`聚类
|
||||
- ds.task.running: (gauge) 正在运行的任务数量
|
||||
- ds.task.prepared: (gauge) 准备好且待提交的任务数量
|
||||
- ds.task.execution.count: (counter) 已执行的任务数量
|
||||
- ds.task.execution.duration: (histogram) 任务执行时长
|
||||
@ -104,6 +103,12 @@ metrics exporter端口`server.port`是在application.yaml里定义的: master: `
|
||||
### Worker Server指标
|
||||
|
||||
- ds.worker.overload.count: (counter) worker过载次数
|
||||
- ds.worker.task: (gauge) worker上任务总数,包含等待提交和正在执行的任务
|
||||
- ds.worker.execute.queue.size: (gauge) worker上等待提交的任务总数
|
||||
- ds.worker.active.execute.thread: (gauge) worker上正在执行的任务总数
|
||||
- ds.worker.memory.available: (gauge) worker机器可用物理内存 (GB)
|
||||
- ds.worker.cpu.usage: (gauge) worker机器cpu使用百分比
|
||||
- ds.worker.memory.usage: (gauge) worker机器内存使用百分比
|
||||
- ds.worker.full.submit.queue.count: (counter) worker提交队列全满次数
|
||||
- ds.worker.resource.download.count: (counter) worker下载资源文件的次数,可由`status`标签切分
|
||||
- ds.worker.resource.download.duration: (histogram) worker下载资源文件时花费的时间分布
|
||||
|
@ -437,11 +437,11 @@
|
||||
"type": "prometheus",
|
||||
"uid": "PBFA97CFB590B2093"
|
||||
},
|
||||
"expr": "ds_task_running{}",
|
||||
"expr": "ds_worker_task{}",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"title": "Worker Running Task",
|
||||
"title": "Worker Tasks Total",
|
||||
"type": "timeseries"
|
||||
},
|
||||
{
|
||||
|
@ -91,9 +91,39 @@ public class WorkerServerMetrics {
|
||||
workerResourceDownloadSizeDistribution.record(size);
|
||||
}
|
||||
|
||||
public void registerWorkerRunningTaskGauge(final Supplier<Number> supplier) {
|
||||
Gauge.builder("ds.task.running", supplier)
|
||||
.description("number of running tasks on workers")
|
||||
public void registerWorkerTaskTotalGauge(final Supplier<Number> supplier) {
|
||||
Gauge.builder("ds.worker.task", supplier)
|
||||
.description("total number of tasks on worker")
|
||||
.register(Metrics.globalRegistry);
|
||||
}
|
||||
|
||||
public void registerWorkerExecuteQueueSizeGauge(Supplier<Number> supplier) {
|
||||
Gauge.builder("ds.worker.execute.queue.size", supplier)
|
||||
.description("worker execute queue size")
|
||||
.register(Metrics.globalRegistry);
|
||||
}
|
||||
|
||||
public void registerWorkerActiveExecuteThreadGauge(Supplier<Number> supplier) {
|
||||
Gauge.builder("ds.worker.active.execute.thread", supplier)
|
||||
.description("number of active task execute threads on worker")
|
||||
.register(Metrics.globalRegistry);
|
||||
}
|
||||
|
||||
public void registerWorkerMemoryAvailableGauge(Supplier<Number> supplier) {
|
||||
Gauge.builder("ds.worker.memory.available", supplier)
|
||||
.description("worker memory available")
|
||||
.register(Metrics.globalRegistry);
|
||||
}
|
||||
|
||||
public void registerWorkerCpuUsageGauge(Supplier<Number> supplier) {
|
||||
Gauge.builder("ds.worker.cpu.usage", supplier)
|
||||
.description("worker cpu usage")
|
||||
.register(Metrics.globalRegistry);
|
||||
}
|
||||
|
||||
public void registerWorkerMemoryUsageGauge(Supplier<Number> supplier) {
|
||||
Gauge.builder("ds.worker.memory.usage", supplier)
|
||||
.description("worker memory usage")
|
||||
.register(Metrics.globalRegistry);
|
||||
}
|
||||
|
||||
|
@ -52,7 +52,7 @@ public class WorkerExecService {
|
||||
this.execService = execService;
|
||||
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
|
||||
this.taskExecuteThreadMap = taskExecuteThreadMap;
|
||||
WorkerServerMetrics.registerWorkerRunningTaskGauge(taskExecuteThreadMap::size);
|
||||
WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size);
|
||||
}
|
||||
|
||||
public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
|
||||
@ -86,6 +86,10 @@ public class WorkerExecService {
|
||||
return ((ThreadPoolExecutor) this.execService).getQueue().size();
|
||||
}
|
||||
|
||||
public int getActiveExecThreadCount() {
|
||||
return ((ThreadPoolExecutor) this.execService).getActiveCount();
|
||||
}
|
||||
|
||||
public Map<Integer, WorkerTaskExecuteRunnable> getTaskExecuteThreadMap() {
|
||||
return taskExecuteThreadMap;
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
|
||||
import org.apache.dolphinscheduler.common.constants.Constants;
|
||||
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
|
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.OSUtils;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
|
||||
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
|
||||
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
|
||||
@ -122,6 +123,12 @@ public class WorkerManagerThread implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage);
|
||||
WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize);
|
||||
WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
|
||||
WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(workerExecService::getThreadPoolQueueSize);
|
||||
WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(workerExecService::getActiveExecThreadCount);
|
||||
|
||||
Thread.currentThread().setName("Worker-Execute-Manager-Thread");
|
||||
while (!ServerLifeCycleManager.isStopped()) {
|
||||
try {
|
||||
|
Loading…
Reference in New Issue
Block a user