[Bug-7206] [MasterServer] fix process isntance always running when task timeout (#7207)

* fix timeout

* add task timeout map to avoid repeated timeout event

* split task check list for retry and timeout

Co-authored-by: caishunfeng <534328519@qq.com>
This commit is contained in:
wind 2021-12-06 18:20:56 +08:00 committed by GitHub
parent 705ba74f52
commit ff9bc806ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 75 deletions

View File

@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
@ -41,7 +40,6 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@ -117,10 +115,15 @@ public class MasterSchedulerService extends Thread {
ConcurrentHashMap<Integer, ProcessInstance> processTimeoutCheckList = new ConcurrentHashMap<>();
/**
* task time out checkout list
* task time out check list
*/
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
/**
* task retry check list
*/
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
private StateWheelExecuteThread stateWheelExecuteThread;
/**
@ -134,6 +137,7 @@ public class MasterSchedulerService extends Thread {
stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList,
taskTimeoutCheckList,
taskRetryCheckList,
this.processInstanceExecCacheManager,
masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
}
@ -209,6 +213,7 @@ public class MasterSchedulerService extends Thread {
, processAlertManager
, masterConfig
, taskTimeoutCheckList
, taskRetryCheckList
, taskProcessorFactory);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);

View File

@ -43,18 +43,21 @@ public class StateWheelExecuteThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
ConcurrentHashMap<Integer, ProcessInstance> processInstanceCheckList;
ConcurrentHashMap<Integer, TaskInstance> taskInstanceCheckList;
private ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private int stateCheckIntervalSecs;
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstances,
ConcurrentHashMap<Integer, TaskInstance> taskInstances,
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
ProcessInstanceExecCacheManager processInstanceExecCacheManager,
int stateCheckIntervalSecs) {
this.processInstanceCheckList = processInstances;
this.taskInstanceCheckList = taskInstances;
this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList;
this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList;
this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.stateCheckIntervalSecs = stateCheckIntervalSecs;
}
@ -65,8 +68,9 @@ public class StateWheelExecuteThread extends Thread {
logger.info("state wheel thread start");
while (Stopper.isRunning()) {
try {
checkProcess();
checkTask();
checkTask4Timeout();
checkTask4Retry();
checkProcess4Timeout();
} catch (Exception e) {
logger.error("state wheel thread check error:", e);
}
@ -74,53 +78,91 @@ public class StateWheelExecuteThread extends Thread {
}
}
public boolean addProcess(ProcessInstance processInstance) {
this.processInstanceCheckList.put(processInstance.getId(), processInstance);
return true;
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
this.processInstanceTimeoutCheckList.put(processInstance.getId(), processInstance);
}
public boolean addTask(TaskInstance taskInstance) {
this.taskInstanceCheckList.put(taskInstance.getId(), taskInstance);
return true;
public void addTask4TimeoutCheck(TaskInstance taskInstance) {
this.taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance);
}
private void checkTask() {
if (taskInstanceCheckList.isEmpty()) {
public void addTask4RetryCheck(TaskInstance taskInstance) {
this.taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance);
}
public void checkTask4Timeout() {
if (taskInstanceTimeoutCheckList.isEmpty()) {
return;
}
for (TaskInstance taskInstance : this.taskInstanceCheckList.values()) {
for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) {
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 >= timeRemain && processTimeout(taskInstance)) {
taskInstanceCheckList.remove(taskInstance.getId());
if (0 >= timeRemain) {
addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
}
}
if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
processDependCheck(taskInstance);
taskInstanceCheckList.remove(taskInstance.getId());
}
if (taskInstance.isSubProcess() || taskInstance.isDependTask()) {
processDependCheck(taskInstance);
}
}
}
private void checkProcess() {
if (processInstanceCheckList.isEmpty()) {
private void checkTask4Retry() {
if (taskInstanceRetryCheckList.isEmpty()) {
return;
}
for (ProcessInstance processInstance : this.processInstanceCheckList.values()) {
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 <= timeRemain && processTimeout(processInstance)) {
processInstanceCheckList.remove(processInstance.getId());
for (TaskInstance taskInstance : this.taskInstanceRetryCheckList.values()) {
if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
addTaskStateChangeEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstance.getId());
}
if (taskInstance.isSubProcess() || taskInstance.isDependTask()) {
addTaskStateChangeEvent(taskInstance);
}
}
}
private void putEvent(StateEvent stateEvent) {
private void checkProcess4Timeout() {
if (processInstanceTimeoutCheckList.isEmpty()) {
return;
}
for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) {
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 >= timeRemain) {
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
}
}
}
private boolean addTaskStateChangeEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
addEvent(stateEvent);
return true;
}
private boolean addTaskTimeoutEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_TIMEOUT);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
addEvent(stateEvent);
return true;
}
private boolean addProcessTimeoutEvent(ProcessInstance processInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.PROCESS_TIMEOUT);
stateEvent.setProcessInstanceId(processInstance.getId());
addEvent(stateEvent);
return true;
}
private void addEvent(StateEvent stateEvent) {
if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
return;
}
@ -128,31 +170,4 @@ public class StateWheelExecuteThread extends Thread {
workflowExecuteThread.addStateEvent(stateEvent);
}
private boolean processDependCheck(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
putEvent(stateEvent);
return true;
}
private boolean processTimeout(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_TIMEOUT);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
putEvent(stateEvent);
return true;
}
private boolean processTimeout(ProcessInstance processInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.PROCESS_TIMEOUT);
stateEvent.setProcessInstanceId(processInstance.getId());
putEvent(stateEvent);
return true;
}
}

View File

@ -205,6 +205,11 @@ public class WorkflowExecuteThread implements Runnable {
*/
private ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList;
/**
* task retry check list
*/
private ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList;
/**
* state event queue
*/
@ -232,14 +237,15 @@ public class WorkflowExecuteThread implements Runnable {
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
, ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList
, TaskProcessorFactory taskProcessorFactory) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.taskTimeoutCheckList = taskTimeoutCheckList;
this.taskRetryCheckList = taskRetryCheckList;
this.taskProcessorFactory = taskProcessorFactory;
}
@ -378,11 +384,10 @@ public class WorkflowExecuteThread implements Runnable {
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
taskProcessor.action(TaskAction.TIMEOUT);
return false;
} else {
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine());
return true;
}
return true;
}
private boolean processTimeout() {
@ -415,7 +420,7 @@ public class WorkflowExecuteThread implements Runnable {
this.stateEvents.add(nextEvent);
} else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance,nextTaskInstance.getId(),
this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
@ -450,6 +455,7 @@ public class WorkflowExecuteThread implements Runnable {
task.getMaxRetryTimes(),
task.getRetryInterval());
this.addTimeoutCheck(task);
this.addRetryCheck(task);
} else {
submitStandByTask();
}
@ -459,6 +465,7 @@ public class WorkflowExecuteThread implements Runnable {
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
activeTaskProcessorMaps.remove(task.getId());
taskTimeoutCheckList.remove(task.getId());
taskRetryCheckList.remove(task.getId());
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
@ -826,6 +833,7 @@ public class WorkflowExecuteThread implements Runnable {
taskProcessor.run();
addTimeoutCheck(taskInstance);
addRetryCheck(taskInstance);
if (taskProcessor.taskState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
@ -867,13 +875,30 @@ public class WorkflowExecuteThread implements Runnable {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) {
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
} else {
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
}
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
}
}
private void addRetryCheck(TaskInstance taskInstance) {
if (taskRetryCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
if (taskInstance.taskCanRetry()) {
this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
}
}

View File

@ -108,7 +108,8 @@ public class WorkflowExecuteThreadTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskProcessorFactory));
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList, taskProcessorFactory));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);