mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-30 03:08:01 +08:00
cherry-pick Fix task group cannot release when kill task #13314
This commit is contained in:
parent
aa18878122
commit
9564bdeefb
@ -62,6 +62,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
|
||||
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
@ -341,23 +342,33 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
||||
public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
|
||||
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
|
||||
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
|
||||
logger.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId());
|
||||
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
|
||||
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
|
||||
taskProcessor.action(TaskAction.DISPATCH);
|
||||
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
|
||||
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
|
||||
logger.info("Success force start taskGroupQueue: {}", taskGroupQueue.getId());
|
||||
return true;
|
||||
}
|
||||
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
|
||||
logger.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId());
|
||||
boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue);
|
||||
if (acquireTaskGroup) {
|
||||
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
|
||||
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
|
||||
taskProcessor.action(TaskAction.DISPATCH);
|
||||
logger.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
logger.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId());
|
||||
return false;
|
||||
} else {
|
||||
logger.info(
|
||||
"Failed to wake up the taskGroupQueue: {}, since the taskGroupQueue is not in queue, will no need to wake up.",
|
||||
taskGroupQueue);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void processTimeout() {
|
||||
@ -425,7 +436,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
||||
*
|
||||
*/
|
||||
public void releaseTaskGroup(TaskInstance taskInstance) {
|
||||
logger.info("Release task group");
|
||||
if (taskInstance.getTaskGroupId() > 0) {
|
||||
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
|
||||
if (nextTaskInstance != null) {
|
||||
@ -1765,6 +1775,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
||||
if (taskInstanceId == null || taskInstanceId.equals(0)) {
|
||||
continue;
|
||||
}
|
||||
LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId);
|
||||
try {
|
||||
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
|
||||
if (taskInstance == null || taskInstance.getState().isFinished()) {
|
||||
continue;
|
||||
@ -1779,6 +1791,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
|
||||
.build();
|
||||
this.addStateEvent(taskStateEvent);
|
||||
}
|
||||
} finally {
|
||||
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -138,11 +138,13 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
|
||||
public boolean killTask() {
|
||||
|
||||
try {
|
||||
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
|
||||
logger.info("Begin to kill task: {}", taskInstance.getName());
|
||||
if (taskInstance == null) {
|
||||
logger.warn("Kill task failed, the task instance is not exist");
|
||||
return true;
|
||||
}
|
||||
if (taskInstance.getState().isFinished()) {
|
||||
logger.warn("Kill task failed, the task instance is already finished");
|
||||
return true;
|
||||
}
|
||||
// we don't wait the kill response
|
||||
@ -153,12 +155,12 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
|
||||
killRemoteTask();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e);
|
||||
logger.error("Master kill task: {} error, taskInstance id: {}", taskInstance.getName(),
|
||||
taskInstance.getId(), e);
|
||||
return false;
|
||||
}
|
||||
|
||||
logger.info("master success kill taskInstance name: {} taskInstance id: {}",
|
||||
taskInstance.getName(), taskInstance.getId());
|
||||
logger.info("Master success kill task: {}, taskInstanceId: {}", taskInstance.getName(), taskInstance.getId());
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -3040,6 +3040,10 @@ public class ProcessServiceImpl implements ProcessService {
|
||||
logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
|
||||
return null;
|
||||
}
|
||||
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE) {
|
||||
logger.info("The taskGroupQueue's status is in waiting, will not need to release task group");
|
||||
break;
|
||||
}
|
||||
} while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
|
||||
&& taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
|
||||
taskGroup.getUseSize(),
|
||||
@ -3066,7 +3070,8 @@ public class ProcessServiceImpl implements ProcessService {
|
||||
} while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
|
||||
Flag.YES.getCode(),
|
||||
taskGroupQueue.getId()) != 1);
|
||||
logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId());
|
||||
logger.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}",
|
||||
taskInstance.getTaskGroupId(), taskGroupQueue.getId());
|
||||
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
|
||||
}
|
||||
|
||||
@ -3080,6 +3085,7 @@ public class ProcessServiceImpl implements ProcessService {
|
||||
@Override
|
||||
public void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status) {
|
||||
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskId);
|
||||
taskGroupQueue.setInQueue(Flag.NO.getCode());
|
||||
taskGroupQueue.setStatus(status);
|
||||
taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis()));
|
||||
taskGroupQueueMapper.updateById(taskGroupQueue);
|
||||
|
@ -17,10 +17,10 @@
|
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.api.utils;
|
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
@ -37,6 +37,7 @@ import lombok.experimental.UtilityClass;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
@Slf4j
|
||||
@UtilityClass
|
||||
@ -73,4 +74,44 @@ public class LogUtils {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
public static String readWholeFileContent(String filePath) {
|
||||
String line;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) {
|
||||
while ((line = br.readLine()) != null) {
|
||||
sb.append(line + "\r\n");
|
||||
}
|
||||
return sb.toString();
|
||||
} catch (IOException e) {
|
||||
log.error("read file error", e);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) {
|
||||
setWorkflowInstanceIdMDC(workflowInstanceId);
|
||||
setTaskInstanceIdMDC(taskInstanceId);
|
||||
}
|
||||
|
||||
public static void setWorkflowInstanceIdMDC(Integer workflowInstanceId) {
|
||||
MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId));
|
||||
}
|
||||
|
||||
public static void setTaskInstanceIdMDC(Integer taskInstanceId) {
|
||||
MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
|
||||
}
|
||||
|
||||
public static void removeWorkflowAndTaskInstanceIdMDC() {
|
||||
removeWorkflowInstanceIdMDC();
|
||||
removeTaskInstanceIdMDC();
|
||||
}
|
||||
|
||||
public static void removeWorkflowInstanceIdMDC() {
|
||||
MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY);
|
||||
}
|
||||
|
||||
public static void removeTaskInstanceIdMDC() {
|
||||
MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user