From 3af4d765c254b1ef72d1d3aeefd9b4a427cb281d Mon Sep 17 00:00:00 2001 From: lhjzmn <153390619@qq.com> Date: Tue, 28 Dec 2021 18:37:59 +0800 Subject: [PATCH] [Fix-7538] [server] Fix when there is a forbidden node in dag, the execution flow is abnormal (#7613) * when there is a forbidden node in dag, the execution flow is abnormal Co-authored-by: hongjie.li --- .../master/runner/WorkflowExecuteThread.java | 191 ++++++++++-------- 1 file changed, 104 insertions(+), 87 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index fbc4fa8223..0d46c3b3da 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -216,19 +216,19 @@ public class WorkflowExecuteThread { /** * constructor of WorkflowExecuteThread * - * @param processInstance processInstance - * @param processService processService - * @param nettyExecutorManager nettyExecutorManager - * @param processAlertManager processAlertManager - * @param masterConfig masterConfig + * @param processInstance processInstance + * @param processService processService + * @param nettyExecutorManager nettyExecutorManager + * @param processAlertManager processAlertManager + * @param masterConfig masterConfig * @param stateWheelExecuteThread stateWheelExecuteThread */ public WorkflowExecuteThread(ProcessInstance processInstance - , ProcessService processService - , NettyExecutorManager nettyExecutorManager - , ProcessAlertManager processAlertManager - , MasterConfig masterConfig - , StateWheelExecuteThread stateWheelExecuteThread) { + , ProcessService processService + , NettyExecutorManager nettyExecutorManager + , ProcessAlertManager processAlertManager + , MasterConfig masterConfig + , StateWheelExecuteThread stateWheelExecuteThread) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = masterConfig; @@ -265,14 +265,14 @@ public class WorkflowExecuteThread { public String getKey() { if (StringUtils.isNotEmpty(key) - || this.processDefinition == null) { + || this.processDefinition == null) { return key; } key = String.format("%d_%d_%d", - this.processDefinition.getCode(), - this.processDefinition.getVersion(), - this.processInstance.getId()); + this.processDefinition.getCode(), + this.processDefinition.getVersion(), + this.processInstance.getId()); return key; } @@ -400,7 +400,7 @@ public class WorkflowExecuteThread { } else { ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), - org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); + org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); } } } @@ -420,19 +420,19 @@ public class WorkflowExecuteThread { private void taskFinished(TaskInstance task) { logger.info("work flow {} task {} state:{} ", - processInstance.getId(), - task.getId(), - task.getState()); + processInstance.getId(), + task.getId(), + task.getState()); if (task.taskCanRetry()) { addTaskToStandByList(task); if (!task.retryTaskIntervalOverTime()) { logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{} / {}, interval:{}", - processInstance.getId(), - task.getId(), - task.getState(), - task.getRetryTimes(), - task.getMaxRetryTimes(), - task.getRetryInterval()); + processInstance.getId(), + task.getId(), + task.getState(), + task.getRetryTimes(), + task.getMaxRetryTimes(), + task.getRetryInterval()); stateWheelExecuteThread.addTask4TimeoutCheck(task); stateWheelExecuteThread.addTask4RetryCheck(task); } else { @@ -454,7 +454,7 @@ public class WorkflowExecuteThread { submitPostNode(Long.toString(task.getTaskCode())); } else if (task.getState().typeIsFailure()) { if (task.isConditionsTask() - || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { + || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { submitPostNode(Long.toString(task.getTaskCode())); } else { errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); @@ -473,7 +473,7 @@ public class WorkflowExecuteThread { logger.info("process instance update: {}", processInstanceId); processInstance = processService.findProcessInstanceById(processInstanceId); processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); } @@ -502,8 +502,8 @@ public class WorkflowExecuteThread { public boolean checkProcessInstance(StateEvent stateEvent) { if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { logger.error("mismatch process instance id: {}, state event:{}", - this.processInstance.getId(), - stateEvent); + this.processInstance.getId(), + stateEvent); return false; } return true; @@ -603,9 +603,9 @@ public class WorkflowExecuteThread { return true; } logger.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", - processInstance.getId(), - processInstance.getScheduleTime(), - complementListDate.toString()); + processInstance.getId(), + processInstance.getScheduleTime(), + complementListDate.toString()); scheduleDate = complementListDate.get(index + 1); //the next process complement processInstance.setId(0); @@ -619,9 +619,9 @@ public class WorkflowExecuteThread { processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); processInstance.setStartTime(new Date()); processInstance.setEndTime(null); processService.saveProcessInstance(processInstance); @@ -632,7 +632,7 @@ public class WorkflowExecuteThread { private boolean needComplementProcess() { if (processInstance.isComplementData() - && Flag.NO == processInstance.getIsSubProcess()) { + && Flag.NO == processInstance.getIsSubProcess()) { return true; } return false; @@ -709,7 +709,7 @@ public class WorkflowExecuteThread { return; } processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); List recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam()); @@ -729,7 +729,7 @@ public class WorkflowExecuteThread { List recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); ProcessDag processDag = generateFlowDag(taskNodeList, - startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType()); + startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType()); if (processDag == null) { logger.error("processDag is null"); return; @@ -776,14 +776,14 @@ public class WorkflowExecuteThread { if (complementListDate.size() == 0 && needComplementProcess()) { complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); logger.info(" process definition code:{} complement data: {}", - processInstance.getProcessDefinitionCode(), complementListDate.toString()); + processInstance.getProcessDefinitionCode(), complementListDate.toString()); if (complementListDate.size() > 0 && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementListDate.get(0)); processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); processService.updateProcessInstance(processInstance); } } @@ -801,7 +801,7 @@ public class WorkflowExecuteThread { try { ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION - && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { + && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } // package task instance before submit @@ -810,8 +810,8 @@ public class WorkflowExecuteThread { boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval(), masterConfig.isTaskLogger()); if (!submit) { logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", - processInstance.getId(), processInstance.getName(), - taskInstance.getId(), taskInstance.getName()); + processInstance.getId(), processInstance.getName(), + taskInstance.getId(), taskInstance.getName()); return null; } validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId()); @@ -857,7 +857,7 @@ public class WorkflowExecuteThread { * find task instance in db. * in case submit more than one same name task in the same time. * - * @param taskCode task code + * @param taskCode task code * @param taskVersion task version * @return TaskInstance */ @@ -875,7 +875,7 @@ public class WorkflowExecuteThread { * encapsulation task * * @param processInstance process instance - * @param taskNode taskNode + * @param taskNode taskNode * @return TaskInstance */ private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { @@ -1083,34 +1083,51 @@ public class WorkflowExecuteThread { return DependResult.SUCCESS; } TaskNode taskNode = dag.getNode(taskCode); - List depCodeList = taskNode.getDepList(); - for (String depsNode : depCodeList) { - if (!dag.containsNode(depsNode) - || forbiddenTaskMap.containsKey(depsNode) - || skipTaskNodeMap.containsKey(depsNode)) { - continue; - } - // dependencies must be fully completed - if (!completeTaskMap.containsKey(depsNode)) { - return DependResult.WAITING; - } - Integer depsTaskId = completeTaskMap.get(depsNode); - ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState(); - if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) { - return DependResult.NON_EXEC; - } - // ignore task state if current task is condition - if (taskNode.isConditionsTask()) { - continue; - } - if (!dependTaskSuccess(depsNode, taskCode)) { - return DependResult.FAILED; + List indirectDepCodeList = new ArrayList<>(); + setIndirectDepList(taskCode, indirectDepCodeList); + for (String depsNode : indirectDepCodeList) { + if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) { + // dependencies must be fully completed + if (!completeTaskMap.containsKey(depsNode)) { + return DependResult.WAITING; + } + Integer depsTaskId = completeTaskMap.get(depsNode); + ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState(); + if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) { + return DependResult.NON_EXEC; + } + // ignore task state if current task is condition + if (taskNode.isConditionsTask()) { + continue; + } + if (!dependTaskSuccess(depsNode, taskCode)) { + return DependResult.FAILED; + } } } logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray())); return DependResult.SUCCESS; } + /** + * This function is specially used to handle the dependency situation where the parent node is a prohibited node. + * When the parent node is a forbidden node, the dependency relationship should continue to be traced + * + * @param taskCode taskCode + * @param indirectDepCodeList All indirectly dependent nodes + */ + private void setIndirectDepList(String taskCode, List indirectDepCodeList) { + TaskNode taskNode = dag.getNode(taskCode); + List depCodeList = taskNode.getDepList(); + for (String depsNode : depCodeList) { + if (forbiddenTaskMap.containsKey(depsNode)) { + setIndirectDepList(depsNode, indirectDepCodeList); + } else { + indirectDepCodeList.add(depsNode); + } + } + } + /** * depend node is completed, but here need check the condition task branch is the next node */ @@ -1156,9 +1173,9 @@ public class WorkflowExecuteThread { */ private ExecutionStatus runningState(ExecutionStatus state) { if (state == ExecutionStatus.READY_STOP - || state == ExecutionStatus.READY_PAUSE - || state == ExecutionStatus.WAITING_THREAD - || state == ExecutionStatus.DELAY_EXECUTION) { + || state == ExecutionStatus.READY_PAUSE + || state == ExecutionStatus.WAITING_THREAD + || state == ExecutionStatus.DELAY_EXECUTION) { // if the running task is not completed, the state remains unchanged return state; } else { @@ -1224,8 +1241,8 @@ public class WorkflowExecuteThread { List pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE); if (CollectionUtils.isNotEmpty(pauseList) - || !isComplementEnd() - || readyToSubmitTaskQueue.size() > 0) { + || !isComplementEnd() + || readyToSubmitTaskQueue.size() > 0) { return ExecutionStatus.PAUSE; } else { return ExecutionStatus.SUCCESS; @@ -1264,8 +1281,8 @@ public class WorkflowExecuteThread { List stopList = getCompleteTaskByState(ExecutionStatus.STOP); List killList = getCompleteTaskByState(ExecutionStatus.KILL); if (CollectionUtils.isNotEmpty(stopList) - || CollectionUtils.isNotEmpty(killList) - || !isComplementEnd()) { + || CollectionUtils.isNotEmpty(killList) + || !isComplementEnd()) { return ExecutionStatus.STOP; } else { return ExecutionStatus.SUCCESS; @@ -1318,10 +1335,10 @@ public class WorkflowExecuteThread { ExecutionStatus state = getProcessInstanceState(processInstance); if (processInstance.getState() != state) { logger.info( - "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", - processInstance.getId(), processInstance.getName(), - processInstance.getState(), state, - processInstance.getCommandType()); + "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", + processInstance.getId(), processInstance.getName(), + processInstance.getState(), state, + processInstance.getCommandType()); processInstance.setState(state); if (state.typeIsFinished()) { @@ -1370,14 +1387,14 @@ public class WorkflowExecuteThread { */ private void removeTaskFromStandbyList(TaskInstance taskInstance) { logger.info("remove task from stand by list, id: {} name:{}", - taskInstance.getId(), - taskInstance.getName()); + taskInstance.getId(), + taskInstance.getName()); try { readyToSubmitTaskQueue.remove(taskInstance); } catch (Exception e) { logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}", - taskInstance.getId(), - taskInstance.getName(), e); + taskInstance.getId(), + taskInstance.getName(), e); } } @@ -1400,7 +1417,7 @@ public class WorkflowExecuteThread { */ private void killAllTasks() { logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(), - activeTaskProcessorMaps.size()); + activeTaskProcessorMaps.size()); for (int taskId : activeTaskProcessorMaps.keySet()) { TaskInstance taskInstance = processService.findTaskInstanceById(taskId); if (taskInstance == null || taskInstance.getState().typeIsFinished()) { @@ -1567,10 +1584,10 @@ public class WorkflowExecuteThread { /** * generate flow dag * - * @param totalTaskNodeList total task node list - * @param startNodeNameList start node name list + * @param totalTaskNodeList total task node list + * @param startNodeNameList start node name list * @param recoveryNodeCodeList recovery node code list - * @param depNodeType depend node type + * @param depNodeType depend node type * @return ProcessDag process dag * @throws Exception exception */