[Fix-11413] Cannot set task status to kill if the task is not in running(#11414) (#12007)

(cherry picked from commit 496c2d4bfa)
This commit is contained in:
Wenjun Ruan 2022-09-17 13:55:20 +08:00 committed by GitHub
parent 36135d4151
commit fc72f8bae1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
@ -31,8 +32,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.apache.commons.lang.StringUtils;
import java.util.Date;
import com.google.auto.service.AutoService;
@ -131,29 +130,33 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
if (taskInstance.getState().typeIsFinished()) {
return true;
}
if (StringUtils.isBlank(taskInstance.getHost())) {
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
return true;
// we don't wait the kill response
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
if (StringUtils.isNotEmpty(taskInstance.getHost())) {
killRemoteTask();
}
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext);
} catch (ExecuteException e) {
logger.error("kill task error:", e);
} catch (Exception e) {
logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e);
return false;
}
logger.info("master kill taskInstance name :{} taskInstance id:{}",
logger.info("master success kill taskInstance name: {} taskInstance id: {}",
taskInstance.getName(), taskInstance.getId());
return true;
}
private void killRemoteTask() throws ExecuteException {
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
ExecutionContext executionContext =
new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext);
}
}