diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 9d6eef3259..a6d7aa134a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -392,13 +392,13 @@ public class WorkflowExecuteRunnable implements Callable { retryTaskInstance(taskInstance); } else if (taskInstance.getState().isFailure()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); // There are child nodes and the failure policy is: CONTINUE if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( Long.toString(taskInstance.getTaskCode()), dag)) { submitPostNode(Long.toString(taskInstance.getTaskCode())); } else { - errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); if (processInstance.getFailureStrategy() == FailureStrategy.END) { killAllTasks(); } @@ -422,7 +422,6 @@ public class WorkflowExecuteRunnable implements Callable { /** * release task group * - * @param taskInstance */ public void releaseTaskGroup(TaskInstance taskInstance) { logger.info("Release task group"); @@ -449,7 +448,6 @@ public class WorkflowExecuteRunnable implements Callable { /** * crate new task instance to retry, different objects from the original * - * @param taskInstance */ private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException { if (!taskInstance.taskCanRetry()) { @@ -662,10 +660,7 @@ public class WorkflowExecuteRunnable implements Callable { } private boolean needComplementProcess() { - if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) { - return true; - } - return false; + return processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess(); } /** @@ -1069,7 +1064,7 @@ public class WorkflowExecuteRunnable implements Callable { /** * clone a new taskInstance for retry and reset some logic fields * - * @return + * @return taskInstance */ public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) { TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); @@ -1091,7 +1086,7 @@ public class WorkflowExecuteRunnable implements Callable { /** * clone a new taskInstance for tolerant and reset some logic fields * - * @return + * @return taskInstance */ public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) { TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); @@ -1111,9 +1106,9 @@ public class WorkflowExecuteRunnable implements Callable { /** * new a taskInstance * - * @param processInstance - * @param taskNode - * @return + * @param processInstance process instance + * @param taskNode task node + * @return task instance */ public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { TaskInstance taskInstance = new TaskInstance(); @@ -1431,9 +1426,7 @@ public class WorkflowExecuteRunnable implements Callable { long taskCode = Long.parseLong(dependNodeName); Integer taskInstanceId = completeTaskMap.get(taskCode); TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState(); - if (depTaskState.isFailure()) { - return false; - } + return !depTaskState.isFailure(); } return true; }