From 52134277a363f89cd49399d80e908172e47427c0 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 3 Jan 2023 09:52:03 +0800 Subject: [PATCH] Fix task group cannot release when kill task (#13314) --- .../runner/WorkflowExecuteRunnable.java | 45 ++++++++++++------- .../runner/task/CommonTaskProcessor.java | 10 +++-- .../service/process/ProcessServiceImpl.java | 8 +++- 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 5fd97b8e07..ec76121279 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -376,23 +377,33 @@ public class WorkflowExecuteRunnable implements Callable { public boolean checkForceStartAndWakeUp(StateEvent stateEvent) { TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { + logger.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId()); TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); + logger.info("Success force start taskGroupQueue: {}", taskGroupQueue.getId()); return true; } if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { + logger.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId()); boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); if (acquireTaskGroup) { TaskInstance taskInstance = this.taskInstanceDao.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); + logger.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId()); return true; } + logger.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId()); + return false; + } else { + logger.info( + "Failed to wake up the taskGroupQueue: {}, since the taskGroupQueue is not in queue, will no need to wake up.", + taskGroupQueue); + return true; } - return false; } public void processTimeout() { @@ -464,7 +475,6 @@ public class WorkflowExecuteRunnable implements Callable { * */ public void releaseTaskGroup(TaskInstance taskInstance) { - logger.info("Release task group"); if (taskInstance.getTaskGroupId() > 0) { TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); if (nextTaskInstance != null) { @@ -1816,19 +1826,24 @@ public class WorkflowExecuteRunnable implements Callable { if (taskInstanceId == null || taskInstanceId.equals(0)) { continue; } - TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); - if (taskInstance == null || taskInstance.getState().isFinished()) { - continue; - } - taskProcessor.action(TaskAction.STOP); - if (taskProcessor.taskInstance().getState().isFinished()) { - TaskStateEvent taskStateEvent = TaskStateEvent.builder() - .processInstanceId(processInstance.getId()) - .taskInstanceId(taskInstance.getId()) - .status(taskProcessor.taskInstance().getState()) - .type(StateEventType.TASK_STATE_CHANGE) - .build(); - this.addStateEvent(taskStateEvent); + LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId); + try { + TaskInstance taskInstance = taskInstanceDao.findTaskInstanceById(taskInstanceId); + if (taskInstance == null || taskInstance.getState().isFinished()) { + continue; + } + taskProcessor.action(TaskAction.STOP); + if (taskProcessor.taskInstance().getState().isFinished()) { + TaskStateEvent taskStateEvent = TaskStateEvent.builder() + .processInstanceId(processInstance.getId()) + .taskInstanceId(taskInstance.getId()) + .status(taskProcessor.taskInstance().getState()) + .type(StateEventType.TASK_STATE_CHANGE) + .build(); + this.addStateEvent(taskStateEvent); + } + } finally { + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index cddc86ddaf..09eaa2ed17 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -146,11 +146,13 @@ public class CommonTaskProcessor extends BaseTaskProcessor { public boolean killTask() { try { - taskInstance = taskInstanceDao.findTaskInstanceById(taskInstance.getId()); + logger.info("Begin to kill task: {}", taskInstance.getName()); if (taskInstance == null) { + logger.warn("Kill task failed, the task instance is not exist"); return true; } if (taskInstance.getState().isFinished()) { + logger.warn("Kill task failed, the task instance is already finished"); return true; } // we don't wait the kill response @@ -161,12 +163,12 @@ public class CommonTaskProcessor extends BaseTaskProcessor { killRemoteTask(); } } catch (Exception e) { - logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e); + logger.error("Master kill task: {} error, taskInstance id: {}", taskInstance.getName(), + taskInstance.getId(), e); return false; } - logger.info("master success kill taskInstance name: {} taskInstance id: {}", - taskInstance.getName(), taskInstance.getId()); + logger.info("Master success kill task: {}, taskInstanceId: {}", taskInstance.getName(), taskInstance.getId()); return true; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index cefa059f83..0ff34d7484 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -2537,6 +2537,10 @@ public class ProcessServiceImpl implements ProcessService { logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId()); return null; } + if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE) { + logger.info("The taskGroupQueue's status is in waiting, will not need to release task group"); + break; + } } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize(), @@ -2563,7 +2567,8 @@ public class ProcessServiceImpl implements ProcessService { } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1); - logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId()); + logger.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}", + taskInstance.getTaskGroupId(), taskGroupQueue.getId()); return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); } @@ -2577,6 +2582,7 @@ public class ProcessServiceImpl implements ProcessService { @Override public void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status) { TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskId); + taskGroupQueue.setInQueue(Flag.NO.getCode()); taskGroupQueue.setStatus(status); taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis())); taskGroupQueueMapper.updateById(taskGroupQueue);