mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-29 18:58:05 +08:00
Remove state check for dependent/subProcess in StateWheelExecuteThread (#14242)
This commit is contained in:
parent
7226edd04e
commit
5e9f1de075
@ -73,11 +73,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
|
||||
*/
|
||||
private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
|
||||
|
||||
/**
|
||||
* task state check list
|
||||
*/
|
||||
private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Autowired
|
||||
private MasterConfig masterConfig;
|
||||
|
||||
@ -104,7 +99,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
|
||||
try {
|
||||
checkTask4Timeout();
|
||||
checkTask4Retry();
|
||||
checkTask4State();
|
||||
checkProcess4Timeout();
|
||||
} catch (Exception e) {
|
||||
log.error("state wheel thread check error:", e);
|
||||
@ -214,30 +208,10 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
|
||||
log.info("remove task instance from retry check list");
|
||||
}
|
||||
|
||||
public void addTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
|
||||
log.info("Adding task instance into state check list");
|
||||
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
|
||||
if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
|
||||
log.warn("Task instance is already in state check list");
|
||||
return;
|
||||
}
|
||||
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
|
||||
taskInstanceStateCheckList.add(taskInstanceKey);
|
||||
log.info("Added task instance into state check list");
|
||||
}
|
||||
}
|
||||
|
||||
public void removeTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
|
||||
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
|
||||
taskInstanceStateCheckList.remove(taskInstanceKey);
|
||||
log.info("Removed task instance from state check list");
|
||||
}
|
||||
|
||||
public void clearAllTasks() {
|
||||
processInstanceTimeoutCheckList.clear();
|
||||
taskInstanceTimeoutCheckList.clear();
|
||||
taskInstanceRetryCheckList.clear();
|
||||
taskInstanceStateCheckList.clear();
|
||||
}
|
||||
|
||||
private void checkTask4Timeout() {
|
||||
@ -352,56 +326,6 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkTask4State() {
|
||||
if (taskInstanceStateCheckList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (TaskInstanceKey taskInstanceKey : taskInstanceStateCheckList) {
|
||||
int processInstanceId = taskInstanceKey.getProcessInstanceId();
|
||||
long taskCode = taskInstanceKey.getTaskCode();
|
||||
|
||||
try {
|
||||
LogUtils.setTaskInstanceIdMDC(processInstanceId);
|
||||
WorkflowExecuteRunnable workflowExecuteThread =
|
||||
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
|
||||
if (workflowExecuteThread == null) {
|
||||
log.warn(
|
||||
"Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task");
|
||||
taskInstanceStateCheckList.remove(taskInstanceKey);
|
||||
continue;
|
||||
}
|
||||
Optional<TaskInstance> taskInstanceOptional =
|
||||
workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
|
||||
if (!taskInstanceOptional.isPresent()) {
|
||||
log.warn(
|
||||
"Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event");
|
||||
taskInstanceStateCheckList.remove(taskInstanceKey);
|
||||
continue;
|
||||
}
|
||||
TaskInstance taskInstance = taskInstanceOptional.get();
|
||||
if (taskInstance.getState().isFinished()) {
|
||||
continue;
|
||||
}
|
||||
addTaskStateChangeEvent(taskInstance);
|
||||
} catch (Exception ex) {
|
||||
log.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex);
|
||||
} finally {
|
||||
LogUtils.removeWorkflowInstanceIdMDC();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addTaskStateChangeEvent(TaskInstance taskInstance) {
|
||||
TaskStateEvent stateEvent = TaskStateEvent.builder()
|
||||
.processInstanceId(taskInstance.getProcessInstanceId())
|
||||
.taskInstanceId(taskInstance.getId())
|
||||
.taskCode(taskInstance.getTaskCode())
|
||||
.type(StateEventType.TASK_STATE_CHANGE)
|
||||
.status(TaskExecutionStatus.RUNNING_EXECUTION)
|
||||
.build();
|
||||
workflowExecuteThreadPool.submitStateEvent(stateEvent);
|
||||
}
|
||||
|
||||
private void addProcessStopEvent(ProcessInstance processInstance) {
|
||||
WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()
|
||||
.processInstanceId(processInstance.getId())
|
||||
|
@ -442,7 +442,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
|
||||
taskExecuteRunnableMap.remove(taskInstance.getTaskCode());
|
||||
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
|
||||
stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
|
||||
stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
|
||||
|
||||
if (taskInstance.getState().isSuccess()) {
|
||||
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
|
||||
@ -1043,7 +1042,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
|
||||
taskExecuteRunnable.dispatch();
|
||||
|
||||
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
|
||||
stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
|
||||
return true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
Loading…
Reference in New Issue
Block a user