Add some warning log in master (#10383)

* Add some warn log in master

* fix may skip sleep
This commit is contained in:
Wenjun Ruan 2022-06-10 19:38:20 +08:00 committed by GitHub
parent e6d39910fb
commit b0d9d3f9ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 84 additions and 41 deletions

View File

@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.master.config; package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
@ -27,15 +29,47 @@ import org.springframework.stereotype.Component;
@EnableConfigurationProperties @EnableConfigurationProperties
@ConfigurationProperties("master") @ConfigurationProperties("master")
public class MasterConfig { public class MasterConfig {
/**
* The master RPC server listen port.
*/
private int listenPort; private int listenPort;
/**
* The max batch size used to fetch command from database.
*/
private int fetchCommandNum; private int fetchCommandNum;
/**
* The thread number used to prepare processInstance. This number shouldn't bigger than fetchCommandNum.
*/
private int preExecThreads; private int preExecThreads;
/**
* todo: We may need to split the process/task into different thread size.
* The thread number used to handle processInstance and task event.
* Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}.
*/
private int execThreads; private int execThreads;
/**
* The task dispatch thread pool size.
*/
private int dispatchTaskNumber; private int dispatchTaskNumber;
/**
* Worker select strategy.
*/
private HostSelector hostSelector; private HostSelector hostSelector;
/**
* Master heart beat task execute interval.
*/
private int heartbeatInterval; private int heartbeatInterval;
/**
* task submit max retry times.
*/
private int taskCommitRetryTimes; private int taskCommitRetryTimes;
/**
* task submit retry interval/ms.
*/
private int taskCommitInterval; private int taskCommitInterval;
/**
* state wheel check interval/ms, if this value is bigger, may increase the delay of task/processInstance.
*/
private int stateWheelInterval; private int stateWheelInterval;
private double maxCpuLoadAvg; private double maxCpuLoadAvg;
private double reservedMemory; private double reservedMemory;

View File

@ -39,6 +39,8 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -54,11 +56,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import org.apache.commons.lang3.time.StopWatch;
/** /**
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
*/ */
@ -124,7 +121,7 @@ public class TaskPriorityQueueConsumer extends Thread {
try { try {
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum); List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
if (!failedDispatchTasks.isEmpty()) { if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size()); TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
for (TaskPriority dispatchFailedTask : failedDispatchTasks) { for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask); taskPriorityQueue.put(dispatchFailedTask);
@ -157,11 +154,15 @@ public class TaskPriorityQueueConsumer extends Thread {
} }
consumerThreadPoolExecutor.submit(() -> { consumerThreadPoolExecutor.submit(() -> {
boolean dispatchResult = this.dispatchTask(taskPriority); try {
if (!dispatchResult) { boolean dispatchResult = this.dispatchTask(taskPriority);
failedDispatchTasks.add(taskPriority); if (!dispatchResult) {
failedDispatchTasks.add(taskPriority);
}
} finally {
// make sure the latch countDown
latch.countDown();
} }
latch.countDown();
}); });
} }
@ -171,10 +172,10 @@ public class TaskPriorityQueueConsumer extends Thread {
} }
/** /**
* dispatch task * Dispatch task to worker.
* *
* @param taskPriority taskPriority * @param taskPriority taskPriority
* @return result * @return dispatch result, return true if dispatch success, return false if dispatch failed.
*/ */
protected boolean dispatchTask(TaskPriority taskPriority) { protected boolean dispatchTask(TaskPriority taskPriority) {
TaskMetrics.incTaskDispatch(); TaskMetrics.incTaskDispatch();

View File

@ -56,7 +56,7 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class); TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
logger.info("taskRecallCommand : {}", recallCommand); logger.info("taskRecallCommand : {}", recallCommand);
TaskEvent taskEvent = TaskEvent.newRecall(recallCommand, channel); TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
taskEventService.addEvent(taskEvent); taskEventService.addEvent(taskEvent);
} }
} }

View File

@ -136,7 +136,7 @@ public class TaskEvent {
return event; return event;
} }
public static TaskEvent newRecall(TaskRecallCommand command, Channel channel) { public static TaskEvent newRecallEvent(TaskRecallCommand command, Channel channel) {
TaskEvent event = new TaskEvent(); TaskEvent event = new TaskEvent();
event.setTaskInstanceId(command.getTaskInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId());
event.setProcessInstanceId(command.getProcessInstanceId()); event.setProcessInstanceId(command.getProcessInstanceId());

View File

@ -49,7 +49,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
* master scheduler thread * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/ */
@Service @Service
public class MasterSchedulerService extends Thread { public class MasterSchedulerService extends Thread {
@ -163,11 +163,8 @@ public class MasterSchedulerService extends Thread {
MasterServerMetrics.incMasterConsumeCommand(commands.size()); MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) { for (ProcessInstance processInstance : processInstances) {
if (processInstance == null) {
continue;
}
WorkflowExecuteRunnable workflowExecuteThread = new WorkflowExecuteRunnable( WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance processInstance
, processService , processService
, nettyExecutorManager , nettyExecutorManager
@ -175,11 +172,11 @@ public class MasterSchedulerService extends Thread {
, masterConfig , masterConfig
, stateWheelExecuteThread); , stateWheelExecuteThread);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
if (processInstance.getTimeout() > 0) { if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
} }
workflowExecuteThreadPool.startWorkflow(workflowExecuteThread); workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
} }
} }
@ -203,7 +200,7 @@ public class MasterSchedulerService extends Thread {
logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId()); logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("handle command error ", e); logger.error("handle command {} error ", command.getId(), e);
processService.moveToErrorCommand(command, e.toString()); processService.moveToErrorCommand(command, e.toString());
} finally { } finally {
latch.countDown(); latch.countDown();

View File

@ -31,8 +31,9 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.hadoop.util.ThreadUtil; import org.apache.commons.lang3.ThreadUtils;
import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@ -84,6 +85,7 @@ public class StateWheelExecuteThread extends Thread {
@Override @Override
public void run() { public void run() {
Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try { try {
checkTask4Timeout(); checkTask4Timeout();
@ -93,7 +95,11 @@ public class StateWheelExecuteThread extends Thread {
} catch (Exception e) { } catch (Exception e) {
logger.error("state wheel thread check error:", e); logger.error("state wheel thread check error:", e);
} }
ThreadUtil.sleepAtLeastIgnoreInterrupts((long) masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); try {
ThreadUtils.sleep(checkInterval);
} catch (InterruptedException e) {
logger.error("state wheel thread sleep error", e);
}
} }
} }

View File

@ -805,11 +805,11 @@ public class WorkflowExecuteRunnable implements Runnable {
*/ */
@Override @Override
public void run() { public void run() {
if (this.taskInstanceMap.size() > 0) { if (this.taskInstanceMap.size() > 0 || isStart) {
logger.warn("The workflow has already been started");
return; return;
} }
try { try {
isStart = false;
buildFlowDag(); buildFlowDag();
initTaskQueue(); initTaskQueue();
submitPostNode(null); submitPostNode(null);

View File

@ -46,6 +46,9 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings; import com.google.common.base.Strings;
/**
* Used to execute {@link WorkflowExecuteRunnable}, when
*/
@Component @Component
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@ -69,7 +72,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/** /**
* multi-thread filter, avoid handling workflow at the same time * multi-thread filter, avoid handling workflow at the same time
*/ */
private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap(); private ConcurrentHashMap<String, WorkflowExecuteRunnable> multiThreadFilterMap = new ConcurrentHashMap<>();
@PostConstruct @PostConstruct
private void init() { private void init() {
@ -92,7 +95,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
} }
/** /**
* start workflow * Start the given workflow.
*/ */
public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) { public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
ProcessInstanceMetrics.incProcessInstanceSubmit(); ProcessInstanceMetrics.incProcessInstanceSubmit();
@ -100,13 +103,14 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
} }
/** /**
* execute workflow * Handle the events belong to the given workflow.
*/ */
public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) { public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) { if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return; return;
} }
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) { if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
logger.warn("The workflow:{} has been executed by another thread", workflowExecuteThread.getKey());
return; return;
} }
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
@ -121,8 +125,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override @Override
public void onSuccess(Object result) { public void onSuccess(Object result) {
// if an exception occurs, first, the error message cannot be printed in the log;
// secondly, the `multiThreadFilterMap` cannot remove the `workflowExecuteThread`, resulting in the state of process instance cannot be changed and memory leak
try { try {
if (workflowExecuteThread.workFlowFinish()) { if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
@ -132,8 +134,10 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("handle events {} success, but notify changed error", processInstanceId, e); logger.error("handle events {} success, but notify changed error", processInstanceId, e);
} finally {
// make sure the process has been removed from multiThreadFilterMap
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
} }
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
} }
}); });
} }

View File

@ -304,6 +304,8 @@ public class NettyRemotingClient {
logger.error(msg, future.cause()); logger.error(msg, future.cause());
throw new RemotingException(msg); throw new RemotingException(msg);
} }
} catch (RemotingException remotingException) {
throw remotingException;
} catch (Exception e) { } catch (Exception e) {
logger.error("Send command {} to address {} encounter error.", command, host.getAddress()); logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e); throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
@ -385,10 +387,10 @@ public class NettyRemotingClient {
if (this.responseFutureExecutor != null) { if (this.responseFutureExecutor != null) {
this.responseFutureExecutor.shutdownNow(); this.responseFutureExecutor.shutdownNow();
} }
logger.info("netty client closed");
} catch (Exception ex) { } catch (Exception ex) {
logger.error("netty client close exception", ex); logger.error("netty client close exception", ex);
} }
logger.info("netty client closed");
} }
} }

View File

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import io.micrometer.core.annotation.Counted;
import static java.util.stream.Collectors.toSet; import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@ -163,6 +162,8 @@ import com.google.common.base.Joiner;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
/** /**
* process relative dao that some mappers in this. * process relative dao that some mappers in this.
*/ */
@ -1266,8 +1267,9 @@ public class ProcessServiceImpl implements ProcessService {
Thread.sleep(commitInterval); Thread.sleep(commitInterval);
} catch (Exception e) { } catch (Exception e) {
logger.error("task commit to db failed", e); logger.error("task commit to db failed", e);
} finally {
retryTimes += 1;
} }
retryTimes += 1;
} }
return task; return task;
} }

View File

@ -29,15 +29,12 @@ import org.springframework.stereotype.Service;
*/ */
@Service @Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> { public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
/**
* queue size
*/
private static final Integer QUEUE_MAX_SIZE = 3000;
/** /**
* queue * Task queue, this queue is unbounded, this means it will cause OutOfMemoryError.
* The master will stop to generate the task if memory is too high.
*/ */
private PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE); private final PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(3000);
/** /**
* put task takePriorityInfo * put task takePriorityInfo