diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 35ad1e8c82..9006812431 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -465,12 +465,13 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { * release task group * */ - public void releaseTaskGroup(TaskInstance taskInstance) throws InterruptedException { + public void releaseTaskGroup(TaskInstance taskInstance) { ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); // todo: use Integer if (taskInstance.getTaskGroupId() <= 0) { log.info("The current TaskInstance: {} doesn't use taskGroup, no need to release taskGroup", taskInstance.getName()); + return; } TaskInstance nextTaskInstance = processService.releaseTaskGroup(taskInstance); if (nextTaskInstance == null) { @@ -1347,9 +1348,11 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { TaskExecutionStatus state = existTaskInstance.getState(); if (state == TaskExecutionStatus.RUNNING_EXECUTION || state == TaskExecutionStatus.DISPATCH - || state == TaskExecutionStatus.SUBMITTED_SUCCESS) { + || state == TaskExecutionStatus.SUBMITTED_SUCCESS + || state == TaskExecutionStatus.DELAY_EXECUTION) { // try to take over task instance if (state != TaskExecutionStatus.SUBMITTED_SUCCESS + && state != TaskExecutionStatus.DELAY_EXECUTION && tryToTakeOverTaskInstance(existTaskInstance)) { log.info("Success take over task {}", existTaskInstance.getName()); continue; @@ -1357,6 +1360,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { // set the task instance state to fault tolerance existTaskInstance.setFlag(Flag.NO); existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE); + releaseTaskGroup(existTaskInstance); + validTaskMap.remove(existTaskInstance.getTaskCode()); taskInstanceDao.updateById(existTaskInstance); existTaskInstance = cloneTolerantTaskInstance(existTaskInstance); @@ -1444,12 +1449,12 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { ITaskInstanceOperator iTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), ITaskInstanceOperator.class); - UpdateWorkflowHostResponse updateWorkflowHostResponse = iTaskInstanceOperator.updateWorkflowInstanceHost( + UpdateWorkflowHostResponse response = iTaskInstanceOperator.updateWorkflowInstanceHost( new UpdateWorkflowHostRequest(taskInstance.getId(), masterConfig.getMasterAddress())); - if (!updateWorkflowHostResponse.isSuccess()) { + if (!response.isSuccess()) { log.error( - "Takeover TaskInstance failed, receive a failed response from worker: {}, will try to create a new TaskInstance", - taskInstance.getHost()); + "Takeover TaskInstance failed, receive a failed response: {} from worker: {}, will try to create a new TaskInstance", + response, taskInstance.getHost()); return false; }