Add TaskExecutionRunnable in WorkflowExecutionDAG

This commit is contained in:
Wenjun Ruan 2024-04-01 10:40:38 +08:00
parent aa9a8b9431
commit 8fcd95a08e
86 changed files with 2121 additions and 633 deletions

View File

@ -15,5 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface IChain {
package org.apache.dolphinscheduler.workflow.engine.dag;
public class BaseDAG {
}

View File

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.workflow.engine.dag;
import java.util.List;
import java.util.stream.Collectors;
/**
* The Directed Acyclic Graph class.
@ -27,9 +26,8 @@ import java.util.stream.Collectors;
* The nodes are the tasks, the edges are the dependencies between the tasks.
* The DAG is acyclic, which means there is no cycle in the graph.
* The DAG is a directed graph, which means the edges have direction.
*
*/
public interface DAG {
public interface DAG<Node, NodeIdentify> {
/**
* Get the direct post nodes of the given dagNode; if the dagNode is null, return the nodes which don't have inDegrees.
@ -50,26 +48,7 @@ public interface DAG {
*/
List<Node> getDirectPostNodes(Node node);
/**
* Same with {@link #getDirectPostNodes(Node)}.
* <p>
* If the dagNodeName is null, return the nodes which doesn't have inDegrees. Otherwise, return the post nodes of
* the given dagNodeName. If the dagNodeName is not null and cannot find the node in DAG, throw IllegalArgumentException.
*
* @param dagNodeName task name, can be null.
* @return post task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
List<Node> getDirectPostNodes(String dagNodeName);
/**
* Same with {@link #getDirectPostNodes(String)}. Return the post node names.
*
* @param dagNodeName task name, can be null.
* @return post task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
List<String> getDirectPostNodeNames(String dagNodeName);
List<Node> getDirectPostNodesByIdentify(NodeIdentify nodeIdentify);
/**
* Get the direct pre nodes of the given dagNode; if the dagNode is null, return the nodes which don't have outDegrees.
@ -90,33 +69,14 @@ public interface DAG {
*/
List<Node> getDirectPreNodes(Node node);
/**
* Same with {@link #getDirectPreNodes(Node)}.
* <p>
* If the dagNodeName is null, return the nodes which doesn't have outDegrees. Otherwise, return the pre nodes of
* the given dagNodeName. If the dagNodeName is not null and cannot find the node in DAG, throw IllegalArgumentException.
*
* @param dagNodeName task name, can be null.
* @return pre task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
List<Node> getDirectPreNodes(String dagNodeName);
/**
* Same with {@link #getDirectPreNodes(String)}. Return the pre node names.
*
* @param dagNodeName task name, can be null.
* @return pre task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
List<String> getDirectPreNodeNames(String dagNodeName);
List<Node> getDirectPreNodesByIdentify(NodeIdentify nodeIdentify);
/**
* Get the node of the DAG by the node name.
*
* @param nodeName the node name.
* @param nodeIdentify the node identify.
* @return the node of the DAG, return null if cannot find the node.
*/
Node getDAGNode(String nodeName);
Node getDAGNode(NodeIdentify nodeIdentify);
}
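Note that this commit removes the name-based helpers (getDirectPostNodeNames / getDirectPreNodeNames). For context only (not part of the commit), a minimal sketch of how a caller could recover the same information through the new identify-based contract, assuming the types introduced in this change are on the classpath:

```java
package org.apache.dolphinscheduler.workflow.engine.dag;

import java.util.List;
import java.util.stream.Collectors;

// Sketch only: collect the names of the direct successors of a task.
// Passing a null identify returns the nodes without inDegrees (the start tasks),
// as described in the DAG javadoc above.
public class DagQuerySketch {

    static List<String> directPostNodeNames(DAG<ITask, ITaskIdentify> dag, ITaskIdentify identify) {
        return dag.getDirectPostNodesByIdentify(identify)
                .stream()
                .map(task -> task.getIdentify().getName())
                .collect(Collectors.toList());
    }
}
```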

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
public interface ITask {
ITaskIdentify getIdentify();
ITaskContext getContext();
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
public interface ITaskChain {
ITask getFrom();
ITask getTo();
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
public interface ITaskContext {
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
public interface ITaskIdentify {
Long getId();
String getName();
}

View File

@ -19,11 +19,6 @@ package org.apache.dolphinscheduler.workflow.engine.dag;
import lombok.AllArgsConstructor;
import lombok.Builder;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import lombok.Getter;
import lombok.NoArgsConstructor;

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
public class Task implements ITask {
private final TaskIdentify taskIdentify;
private final TaskContext taskContext;
public Task(TaskIdentify taskIdentify, TaskContext taskContext) {
this.taskIdentify = taskIdentify;
this.taskContext = taskContext;
}
@Override
public TaskIdentify getIdentify() {
return taskIdentify;
}
@Override
public TaskContext getContext() {
return taskContext;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
public class TaskChain implements ITaskChain {
private final Task from;
private final Task to;
public TaskChain(Task from, Task to) {
if (from == null && to == null) {
throw new IllegalArgumentException("from and to can not be null at the same time");
}
this.from = from;
this.to = to;
}
@Override
public Task getFrom() {
return from;
}
@Override
public Task getTo() {
return to;
}
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.dag;
public class TaskContext implements ITaskContext {
}

View File

@ -15,24 +15,29 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
package org.apache.dolphinscheduler.workflow.engine.dag;
import lombok.Getter;
import lombok.EqualsAndHashCode;
@Getter
public class WorkflowInstance implements IWorkflowInstance {
private final int id;
@EqualsAndHashCode
public class TaskIdentify implements ITaskIdentify {
private final Long id;
private final String name;
public WorkflowInstance(int id, String name) {
public TaskIdentify(Long id, String name) {
this.id = id;
this.name = name;
}
public static WorkflowInstance of(int id, String name) {
return new WorkflowInstance(id, name);
@Override
public Long getId() {
return id;
}
@Override
public String getName() {
return name;
}
}

View File

@ -17,105 +17,108 @@
package org.apache.dolphinscheduler.workflow.engine.dag;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.Set;
import java.util.stream.Collectors;
/**
* The IWorkflowDAG represent the DAG of a workflow.
*/
public class WorkflowDAG implements DAG {
public class WorkflowDAG implements DAG<ITask, ITaskIdentify> {
private final Map<NodeIdentify, Node> dagNodeMap;
private final Map<ITaskIdentify, ITask> dagNodeMap;
private final Map<NodeIdentify, List<Node>> outdegreeMap;
private final Map<ITaskIdentify, Set<ITaskIdentify>> outdegreeMap;
private final Map<NodeIdentify, List<Node>> inDegredMap;
private final Map<ITaskIdentify, Set<ITaskIdentify>> inDegredMap;
public WorkflowDAG(List<Node> nodes, List<Edge> edges) {
this.dagNodeMap = nodes.stream().collect(Collectors.toMap(Node::getNodeIdentify, Function.identity()));
public WorkflowDAG(List<ITask> tasks,
List<ITaskChain> taskChains) {
this.dagNodeMap = new HashMap<>();
this.outdegreeMap = new HashMap<>();
this.inDegredMap = new HashMap<>();
// todo:
}
@Override
public List<Node> getDirectPostNodes(Node dagNode) {
NodeIdentify nodeIdentify = dagNode.getNodeIdentify();
if (!dagNodeMap.containsKey(nodeIdentify)) {
return Collections.emptyList();
}
Node node = dagNodeMap.get(nodeIdentify);
List<Node> nodes = new ArrayList<>();
for (DAGEdge edge : node.getOutDegrees()) {
if (dagNodeMap.containsKey(edge.getToNodeName())) {
nodes.add(dagNodeMap.get(edge.getToNodeName()));
for (ITask task : tasks) {
ITaskIdentify identify = task.getIdentify();
if (dagNodeMap.containsKey(identify)) {
throw new IllegalArgumentException("Duplicate task identify: " + identify);
}
dagNodeMap.put(identify, task);
}
for (ITaskChain taskChain : taskChains) {
ITask from = taskChain.getFrom();
ITask to = taskChain.getTo();
if (from == null) {
continue;
}
if (to == null) {
continue;
}
ITaskIdentify fromIdentify = from.getIdentify();
ITaskIdentify toIdentify = to.getIdentify();
Set<ITaskIdentify> outDegrees = outdegreeMap.computeIfAbsent(fromIdentify, k -> new HashSet<>());
if (outDegrees.contains(toIdentify)) {
throw new IllegalArgumentException("Duplicate task chain: " + fromIdentify + " -> " + toIdentify);
}
outDegrees.add(toIdentify);
Set<ITaskIdentify> inDegrees = inDegredMap.computeIfAbsent(toIdentify, k -> new HashSet<>());
if (inDegrees.contains(fromIdentify)) {
throw new IllegalArgumentException("Duplicate task chain: " + fromIdentify + " -> " + toIdentify);
}
inDegrees.add(fromIdentify);
}
return nodes;
}
@Override
public List<Node> getDirectPostNodes(String dagNodeName) {
Node node = getDAGNode(dagNodeName);
if (dagNodeName != null && node == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
public List<ITask> getDirectPostNodes(ITask iTask) {
if (iTask == null) {
return getDirectPostNodesByIdentify(null);
}
return getDirectPostNodes(node);
return getDirectPostNodesByIdentify(iTask.getIdentify());
}
@Override
public List<String> getDirectPostNodeNames(String dagNodeName) {
Node node = getDAGNode(dagNodeName);
if (dagNodeName != null && node == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
public List<ITask> getDirectPostNodesByIdentify(ITaskIdentify taskIdentify) {
if (taskIdentify == null) {
return dagNodeMap.values()
.stream()
.filter(task -> !inDegredMap.containsKey(task.getIdentify()))
.collect(Collectors.toList());
}
return getDirectPostNodes(node).stream()
.map(Node::getNodeName)
return outdegreeMap.getOrDefault(taskIdentify, Collections.emptySet())
.stream()
.map(dagNodeMap::get)
.collect(Collectors.toList());
}
@Override
public List<Node> getDirectPreNodes(Node dagNode) {
final String nodeName = dagNode.getNodeName();
if (!dagNodeMap.containsKey(nodeName)) {
return Collections.emptyList();
public List<ITask> getDirectPreNodes(ITask iTask) {
if (iTask == null) {
return getDirectPreNodesByIdentify(null);
}
Node node = dagNodeMap.get(nodeName);
List<Node> nodes = new ArrayList<>();
for (DAGEdge edge : node.getInDegrees()) {
if (dagNodeMap.containsKey(edge.getFromNodeName())) {
nodes.add(dagNodeMap.get(edge.getFromNodeName()));
}
}
return nodes;
return getDirectPreNodesByIdentify(iTask.getIdentify());
}
@Override
public List<Node> getDirectPreNodes(String dagNodeName) {
Node node = getDAGNode(dagNodeName);
if (dagNodeName != null && node == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
public List<ITask> getDirectPreNodesByIdentify(ITaskIdentify taskIdentify) {
if (taskIdentify == null) {
return dagNodeMap.values()
.stream()
.filter(task -> !outdegreeMap.containsKey(task.getIdentify()))
.collect(Collectors.toList());
}
return getDirectPreNodes(node);
return inDegredMap.getOrDefault(taskIdentify, Collections.emptySet())
.stream()
.map(dagNodeMap::get)
.collect(Collectors.toList());
}
@Override
public List<String> getDirectPreNodeNames(String dagNodeName) {
Node node = getDAGNode(dagNodeName);
if (dagNodeName != null && node == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPreNodes(node).stream().map(Node::getNodeName).collect(Collectors.toList());
public ITask getDAGNode(ITaskIdentify taskIdentify) {
return dagNodeMap.get(taskIdentify);
}
@Override
public Node getDAGNode(String nodeName) {
return dagNodeMap.get(nodeName);
}
}
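A hedged, self-contained sketch (not part of the commit) showing how the new Task, TaskIdentify, TaskChain, and WorkflowDAG classes could be wired together; the ids and names are made-up values:

```java
package org.apache.dolphinscheduler.workflow.engine.dag;

import java.util.Arrays;
import java.util.List;

public class WorkflowDagExample {

    public static void main(String[] args) {
        // Three tasks wired as A -> B and A -> C.
        Task a = new Task(new TaskIdentify(1L, "A"), new TaskContext());
        Task b = new Task(new TaskIdentify(2L, "B"), new TaskContext());
        Task c = new Task(new TaskIdentify(3L, "C"), new TaskContext());

        List<ITask> tasks = Arrays.<ITask>asList(a, b, c);
        List<ITaskChain> chains = Arrays.<ITaskChain>asList(new TaskChain(a, b), new TaskChain(a, c));

        WorkflowDAG dag = new WorkflowDAG(tasks, chains);

        // Passing null returns the tasks without in-degrees, i.e. only "A".
        List<ITask> startTasks = dag.getDirectPostNodesByIdentify(null);

        // The direct successors of "A" are "B" and "C".
        List<ITask> postOfA = dag.getDirectPostNodesByIdentify(a.getIdentify());

        System.out.println(startTasks.size() + " start task(s), " + postOfA.size() + " successor(s) of A");
    }
}
```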

View File

@ -17,155 +17,167 @@
package org.apache.dolphinscheduler.workflow.engine.engine;
import org.apache.dolphinscheduler.workflow.engine.dag.Node;
import org.apache.dolphinscheduler.workflow.engine.dag.ITask;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
import org.apache.dolphinscheduler.workflow.engine.dag.WorkflowDAG;
import org.apache.dolphinscheduler.workflow.engine.event.IEventRepository;
import org.apache.dolphinscheduler.workflow.engine.event.TaskOperationEvent;
import org.apache.dolphinscheduler.workflow.engine.event.TaskOperationType;
import org.apache.dolphinscheduler.workflow.engine.event.WorkflowFinishEvent;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionPlan;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnableFactory;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnableIdentify;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionContext;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionDAG;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
import org.apache.dolphinscheduler.workflow.engine.workflow.WorkflowExecutionDAG;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Deprecated
@Slf4j
public class DAGEngine implements IDAGEngine {
private final IWorkflowExecutionContext workflowExecutionContext;
private final ITaskExecutionRunnableFactory taskExecutionRunnableFactory;
private final IEventRepository eventRepository;
private final WorkflowExecutionDAG workflowExecutionDAG;
private final WorkflowDAG workflowDAG;
public DAGEngine(IWorkflowExecutionContext workflowExecutionContext,
ITaskExecutionRunnableFactory taskExecutionRunnableFactory) {
private final IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify;
public DAGEngine(IWorkflowExecutionContext workflowExecutionContext) {
this.workflowExecutionContext = workflowExecutionContext;
this.taskExecutionRunnableFactory = taskExecutionRunnableFactory;
this.workflowExecutionRunnableIdentify = workflowExecutionContext.getIdentify();
this.eventRepository = workflowExecutionContext.getEventRepository();
this.workflowDAG = workflowExecutionContext.getWorkflowDAG();
this.workflowExecutionDAG = workflowExecutionContext.getWorkflowExecutionDAG();
}
@Override
public void triggerNextTasks(String parentTaskNodeName) {
workflowExecutionContext.getWorkflowExecutionDAG()
.getDirectPostNodeNames(parentTaskNodeName)
.forEach(this::triggerTask);
public void start() {
List<ITaskIdentify> startTaskIdentifies = workflowExecutionContext.getStartTaskIdentifies();
// If the start task is empty, trigger from the beginning
if (CollectionUtils.isEmpty(startTaskIdentifies)) {
startTaskIdentifies = workflowDAG.getDirectPostNodesByIdentify(null)
.stream()
.map(ITask::getIdentify)
.collect(Collectors.toList());
}
if (CollectionUtils.isEmpty(startTaskIdentifies)) {
workflowFinish();
return;
}
startTaskIdentifies.forEach(this::triggerTask);
}
@Override
public void triggerTask(String taskName) {
IWorkflowExecutionDAG workflowExecutionDAG = workflowExecutionContext.getWorkflowExecutionDAG();
Node node = workflowExecutionDAG.getDAGNode(taskName);
if (node == null) {
log.error("Cannot find the DAGNode for task: {}", taskName);
return;
}
// todo: Use condition check?
// How to make sure the
if (!workflowExecutionDAG.isTaskAbleToBeTriggered(taskName)) {
log.info("The task: {} is not able to be triggered", taskName);
return;
}
if (node.isSkip()) {
log.info("The task: {} is skipped", taskName);
triggerNextTasks(taskName);
return;
}
ITaskExecutionRunnable taskExecutionRunnable =
taskExecutionRunnableFactory.createTaskExecutionRunnable(taskName, workflowExecutionContext);
workflowExecutionDAG.storeTaskExecutionRunnable(taskExecutionRunnable);
TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.taskOperationType(TaskOperationType.RUN)
.build();
workflowExecutionContext.getEventRepository().storeEventToTail(taskOperationEvent);
}
@Override
public void failoverTask(Integer taskInstanceId) {
IWorkflowExecutionDAG workflowExecutionDAG = workflowExecutionContext.getWorkflowExecutionDAG();
ITaskExecutionRunnable taskExecutionRunnable =
workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
if (taskExecutionRunnable == null) {
log.error("Cannot find the ITaskExecutionRunnable for taskInstance: {}", taskInstanceId);
return;
}
ITaskExecutionRunnable failoverTaskExecutionRunnable = taskExecutionRunnableFactory
.createFailoverTaskExecutionRunnable(taskExecutionRunnable, workflowExecutionContext);
workflowExecutionDAG.storeTaskExecutionRunnable(failoverTaskExecutionRunnable);
TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.taskOperationType(TaskOperationType.FAILOVER)
.build();
workflowExecutionContext.getEventRepository().storeEventToTail(taskOperationEvent);
}
@Override
public void retryTask(Integer taskInstanceId) {
IWorkflowExecutionDAG workflowExecutionDAG = workflowExecutionContext.getWorkflowExecutionDAG();
ITaskExecutionRunnable taskExecutionRunnable =
workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
if (taskExecutionRunnable == null) {
log.error("Cannot find the ITaskExecutionRunnable for taskInstance: {}", taskInstanceId);
return;
}
ITaskExecutionRunnable retryTaskExecutionRunnable = taskExecutionRunnableFactory
.createRetryTaskExecutionRunnable(taskExecutionRunnable, workflowExecutionContext);
workflowExecutionDAG.storeTaskExecutionRunnable(retryTaskExecutionRunnable);
TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.taskOperationType(TaskOperationType.RETRY)
.build();
workflowExecutionContext.getEventRepository().storeEventToTail(taskOperationEvent);
}
@Override
public void pauseAllTask() {
workflowExecutionContext.getWorkflowExecutionDAG()
.getActiveTaskExecutionRunnable()
public void triggerNextTasks(ITaskIdentify taskIdentify) {
List<ITaskIdentify> directPostNodeIdentifies = workflowDAG.getDirectPostNodesByIdentify(taskIdentify)
.stream()
.map(taskExecutionRunnable -> taskExecutionRunnable.getTaskExecutionContext().getTaskInstance().getId())
.forEach(this::pauseTask);
}
@Override
public void pauseTask(Integer taskInstanceId) {
ITaskExecutionRunnable taskExecutionRunnable =
workflowExecutionContext.getWorkflowExecutionDAG().getTaskExecutionRunnableById(taskInstanceId);
if (taskExecutionRunnable == null) {
log.error("Cannot find the ITaskExecutionRunnable for taskInstance: {}", taskInstanceId);
.map(ITask::getIdentify)
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(directPostNodeIdentifies)) {
directPostNodeIdentifies.forEach(this::triggerTask);
return;
}
TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.taskOperationType(TaskOperationType.PAUSE)
.build();
workflowExecutionContext.getEventRepository().storeEventToTail(taskOperationEvent);
List<ITaskExecutionRunnableIdentify> activeTaskExecutionIdentify = getActiveTaskExecutionIdentify();
if (CollectionUtils.isEmpty(activeTaskExecutionIdentify)) {
workflowFinish();
return;
}
// The task chain is finished, but there are still active tasks, wait for the active tasks to finish
}
@Override
public void killAllTask() {
workflowExecutionContext.getWorkflowExecutionDAG()
.getActiveTaskExecutionRunnable()
public void triggerTask(ITaskIdentify taskIdentify) {
ITaskExecutionPlan taskExecutionPlan = workflowExecutionDAG.getDAGNode(taskIdentify);
if (taskExecutionPlan == null) {
throw new IllegalArgumentException("Cannot find the ITaskExecutionPlan for taskIdentify: " + taskIdentify);
}
eventRepository.storeEventToTail(TaskOperationEvent.startEvent(taskExecutionPlan));
}
@Override
public void failoverTask(ITaskExecutionRunnableIdentify taskExecutionRunnableIdentify) {
ITaskExecutionPlan taskExecutionPlan =
workflowExecutionDAG.getDAGNode(taskExecutionRunnableIdentify.getTaskIdentify());
if (taskExecutionPlan == null) {
throw new IllegalArgumentException("Cannot find the ITaskExecutionPlan for taskIdentify: "
+ taskExecutionRunnableIdentify.getTaskIdentify());
}
eventRepository.storeEventToTail(TaskOperationEvent.failoverEvent(taskExecutionPlan));
}
@Override
public void retryTask(ITaskExecutionRunnableIdentify taskExecutionRunnableIdentify) {
ITaskExecutionPlan taskExecutionPlan =
workflowExecutionDAG.getDAGNode(taskExecutionRunnableIdentify.getTaskIdentify());
if (taskExecutionPlan == null) {
throw new IllegalArgumentException("Cannot find the ITaskExecutionPlan for taskIdentify: "
+ taskExecutionRunnableIdentify.getTaskIdentify());
}
eventRepository.storeEventToTail(TaskOperationEvent.retryEvent(taskExecutionPlan));
}
@Override
public void pause() {
List<ITaskExecutionRunnableIdentify> activeTaskExecutionIdentify = getActiveTaskExecutionIdentify();
if (CollectionUtils.isEmpty(activeTaskExecutionIdentify)) {
workflowFinish();
return;
}
activeTaskExecutionIdentify.forEach(this::pauseTask);
}
@Override
public void pauseTask(ITaskExecutionRunnableIdentify taskExecutionIdentify) {
ITaskExecutionPlan taskExecutionPlan = workflowExecutionDAG.getDAGNode(taskExecutionIdentify.getTaskIdentify());
if (taskExecutionPlan == null) {
throw new IllegalArgumentException(
"Cannot find the ITaskExecutionPlan for taskIdentify: " + taskExecutionIdentify.getTaskIdentify());
}
eventRepository.storeEventToTail(TaskOperationEvent.pauseEvent(taskExecutionPlan));
}
@Override
public void kill() {
List<ITaskExecutionRunnableIdentify> activeTaskExecutionIdentify = getActiveTaskExecutionIdentify();
if (CollectionUtils.isEmpty(activeTaskExecutionIdentify)) {
workflowFinish();
return;
}
activeTaskExecutionIdentify.forEach(this::killTask);
}
@Override
public void killTask(ITaskExecutionRunnableIdentify taskExecutionIdentify) {
ITaskExecutionPlan taskExecutionPlan = workflowExecutionDAG.getDAGNode(taskExecutionIdentify.getTaskIdentify());
if (taskExecutionPlan == null) {
throw new IllegalArgumentException(
"Cannot find the ITaskExecutionPlan for taskIdentify: " + taskExecutionIdentify.getTaskIdentify());
}
eventRepository.storeEventToTail(TaskOperationEvent.killEvent(taskExecutionPlan));
}
private void workflowFinish() {
if (workflowExecutionDAG.isFailed()) {
}
eventRepository.storeEventToTail(WorkflowFinishEvent.of(workflowExecutionRunnableIdentify));
}
private List<ITaskExecutionRunnableIdentify> getActiveTaskExecutionIdentify() {
return workflowExecutionDAG.getActiveTaskExecutionPlan()
.stream()
.map(taskExecutionRunnable -> taskExecutionRunnable.getTaskExecutionContext().getTaskInstance().getId())
.forEach(this::killTask);
}
@Override
public void killTask(Integer taskInstanceId) {
ITaskExecutionRunnable taskExecutionRunnable =
workflowExecutionContext.getWorkflowExecutionDAG().getTaskExecutionRunnableById(taskInstanceId);
if (taskExecutionRunnable == null) {
log.error("Cannot find the ITaskExecutionRunnable for taskInstance: {}", taskInstanceId);
return;
}
TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.taskOperationType(TaskOperationType.KILL)
.build();
workflowExecutionContext.getEventRepository().storeEventToTail(taskOperationEvent);
.map(ITaskExecutionPlan::getActiveTaskExecutionRunnable)
.map(ITaskExecutionRunnable::getIdentify)
.collect(Collectors.toList());
}
}

View File

@ -33,6 +33,6 @@ public class DAGEngineFactory implements IDAGEngineFactory {
@Override
public IDAGEngine createDAGEngine(IWorkflowExecutionContext workflowExecutionContext) {
return new DAGEngine(workflowExecutionContext, taskExecutionRunnableFactory);
return new DAGEngine(workflowExecutionContext);
}
}

View File

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionContext;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableRepository;
import org.apache.commons.collections4.CollectionUtils;
@ -83,24 +84,22 @@ public class EventEngine extends BaseDaemonThread implements IEventEngine {
for (IWorkflowExecutionRunnable workflowExecutionRunnable : workflowExecutionRunnableList) {
IWorkflowExecutionContext workflowExecutionContext =
workflowExecutionRunnable.getWorkflowExecutionContext();
final Integer workflowInstanceId = workflowExecutionContext.getWorkflowInstanceId();
final String workflowInstanceName = workflowExecutionContext.getWorkflowInstanceName();
IWorkflowExecutionRunnableIdentify identify = workflowExecutionContext.getIdentify();
try {
MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId));
if (workflowExecutionRunnable.isEventFiring()) {
log.debug("WorkflowExecutionRunnable: {} is already in firing", workflowInstanceName);
continue;
}
MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(identify.getId()));
// if (workflowExecutionRunnable.isEventFiring()) {
// log.debug("WorkflowExecutionRunnable: {} is already in firing", identify);
// continue;
// }
eventFirer.fireActiveEvents(workflowExecutionRunnable)
.whenComplete((fireCount, ex) -> {
workflowExecutionRunnable.setEventFiring(false);
// workflowExecutionRunnable.setEventFiring(false);
if (ex != null) {
log.error("Fire event for WorkflowExecutionRunnable: {} error", workflowInstanceName,
ex);
log.error("Fire event for WorkflowExecutionRunnable: {} error", identify, ex);
} else {
if (fireCount > 0) {
log.info("Fire {} events for WorkflowExecutionRunnable: {} success", fireCount,
workflowInstanceName);
identify);
}
}
});

View File

@ -17,9 +17,6 @@
package org.apache.dolphinscheduler.workflow.engine.engine;
import org.apache.dolphinscheduler.workflow.engine.event.EventOperatorManager;
import org.apache.dolphinscheduler.workflow.engine.event.IEvent;
import org.apache.dolphinscheduler.workflow.engine.event.IEventOperatorManager;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableRepository;
import org.apache.dolphinscheduler.workflow.engine.workflow.SingletonWorkflowExecutionRunnableRepository;
@ -33,11 +30,6 @@ public class EventEngineFactory implements IEventEngineFactory {
private IWorkflowExecutionRunnableRepository workflowExecuteRunnableRepository;
private static final IEventOperatorManager<IEvent> DEFAULT_EVENT_OPERATOR_MANAGER =
EventOperatorManager.getInstance();
private IEventOperatorManager<IEvent> eventOperatorManager = DEFAULT_EVENT_OPERATOR_MANAGER;
private static final int DEFAULT_EVENT_FIRE_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private int eventFireThreadPoolSize = DEFAULT_EVENT_FIRE_THREAD_POOL_SIZE;
@ -54,11 +46,6 @@ public class EventEngineFactory implements IEventEngineFactory {
return this;
}
public EventEngineFactory withEventOperatorManager(IEventOperatorManager<IEvent> eventOperatorManager) {
this.eventOperatorManager = eventOperatorManager;
return this;
}
public int withEventFireThreadPoolSize(int eventFireThreadPoolSize) {
this.eventFireThreadPoolSize = eventFireThreadPoolSize;
return this.eventFireThreadPoolSize;
@ -66,7 +53,7 @@ public class EventEngineFactory implements IEventEngineFactory {
@Override
public IEventEngine createEventEngine() {
EventFirer eventFirer = new EventFirer(eventOperatorManager, eventFireThreadPoolSize);
EventFirer eventFirer = new EventFirer(eventFireThreadPoolSize);
return new EventEngine(workflowExecuteRunnableRepository, eventFirer);
}
}

View File

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.workflow.engine.engine;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.workflow.engine.event.IAsyncEvent;
import org.apache.dolphinscheduler.workflow.engine.event.IEvent;
import org.apache.dolphinscheduler.workflow.engine.event.IEventOperatorManager;
import org.apache.dolphinscheduler.workflow.engine.event.IEventRepository;
import org.apache.dolphinscheduler.workflow.engine.utils.ExceptionUtils;
import org.apache.dolphinscheduler.workflow.engine.workflow.IEventfulExecutionRunnable;
@ -33,12 +32,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EventFirer implements IEventFirer {
private final IEventOperatorManager<IEvent> eventOperatorManager;
private final ThreadPoolExecutor eventFireThreadPool;
public EventFirer(IEventOperatorManager<IEvent> eventOperatorManager, int eventFireThreadPoolSize) {
this.eventOperatorManager = eventOperatorManager;
public EventFirer(int eventFireThreadPoolSize) {
this.eventFireThreadPool =
ThreadUtils.newDaemonFixedThreadExecutor("EventFireThreadPool", eventFireThreadPoolSize);
}
@ -86,7 +82,7 @@ public class EventFirer implements IEventFirer {
private void fireAsyncEvent(IEvent event) {
CompletableFuture.runAsync(() -> {
log.info("Begin fire IAsyncEvent: {}", event);
eventOperatorManager.getEventOperator(event).handleEvent(event);
event.getEventOperation().operate();
log.info("Success fire IAsyncEvent: {}", event);
}, eventFireThreadPool).exceptionally(ex -> {
log.error("Failed to fire IAsyncEvent: {}", event, ex);
@ -96,7 +92,7 @@ public class EventFirer implements IEventFirer {
private void fireSyncEvent(IEvent event) {
log.info("Begin fire SyncEvent: {}", event);
eventOperatorManager.getEventOperator(event).handleEvent(event);
event.getEventOperation().operate();
log.info("Success fire SyncEvent: {}", event);
}

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.workflow.engine.engine;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnableIdentify;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionDAG;
/**
@ -26,47 +28,51 @@ import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionDA
public interface IDAGEngine {
/**
* Trigger the tasks which are post of the given task.
* <P> If there are no task after the given taskNode, will try to finish the WorkflowExecutionRunnable.
* <p> If the
*
* @param parentTaskNodeName the parent task name
* Start the DAGEngine, will trigger the start tasks.
*/
void triggerNextTasks(String parentTaskNodeName);
void start();
/**
* Trigger the given task
* Trigger the tasks which are post of the given task.
* <p> If there are no tasks after the given taskNode, will try to finish the WorkflowExecutionRunnable (send a task chain end event).
*
* @param taskName task name
* @param taskIdentify the parent task identify
*/
void triggerTask(String taskName);
void triggerNextTasks(ITaskIdentify taskIdentify);
/**
* Trigger the given task.
*
* @param taskIdentify the task identify
*/
void triggerTask(ITaskIdentify taskIdentify);
/**
* Failover the given task.
*
* @param taskInstanceId taskInstanceId
*/
void failoverTask(Integer taskInstanceId);
void failoverTask(ITaskExecutionRunnableIdentify taskInstanceId);
/**
* Retry the given task.
*
* @param taskInstanceId taskInstanceId
*/
void retryTask(Integer taskInstanceId);
void retryTask(ITaskExecutionRunnableIdentify taskInstanceId);
void pauseAllTask();
void pause();
/**
* Pause the given task.
*/
void pauseTask(Integer taskInstanceId);
void pauseTask(ITaskExecutionRunnableIdentify taskExecutionIdentify);
void killAllTask();
void kill();
/**
* Kill the given task.
*/
void killTask(Integer taskId);
void killTask(ITaskExecutionRunnableIdentify taskExecutionIdentify);
}
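To make the intended call order concrete, here is a brief sketch (not part of the commit) of how a caller might drive the new identify-based IDAGEngine; how the engine instance is obtained (e.g. via IDAGEngineFactory) is assumed and not shown:

```java
package org.apache.dolphinscheduler.workflow.engine.engine;

import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;

// Sketch only: illustrates the start / triggerNextTasks lifecycle.
public class DagEngineUsageSketch {

    void runWorkflow(IDAGEngine dagEngine, ITaskIdentify finishedTaskIdentify) {
        // Kick off the DAG: enqueues start events for the tasks without in-degrees,
        // or finishes the workflow immediately if there are none.
        dagEngine.start();

        // When a task finishes, trigger its successors; if none exist and no task
        // is still active, the engine emits a workflow-finish event.
        dagEngine.triggerNextTasks(finishedTaskIdentify);
    }
}
```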

View File

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.workflow.engine.engine;
import org.apache.dolphinscheduler.workflow.engine.exception.WorkflowExecuteRunnableNotFoundException;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
/**
* The WorkflowEngine is responsible for starting, stopping, pausing, and finalizing {@link IWorkflowExecutionRunnable}.
@ -41,25 +42,25 @@ public interface IWorkflowEngine {
/**
* Pause a workflow instance.
*
* @param workflowInstanceId the ID of the workflow to pause
* @param workflowExecutionRunnableIdentify the ID of the workflow to pause
* @throws WorkflowExecuteRunnableNotFoundException if the workflow is not found
*/
void pauseWorkflow(Integer workflowInstanceId);
void pauseWorkflow(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify);
/**
* Kill a workflow instance.
*
* @param workflowInstanceId the ID of the workflow to stop
* @param workflowExecutionRunnableIdentify the ID of the workflow to stop
* @throws WorkflowExecuteRunnableNotFoundException if the workflow is not found
*/
void killWorkflow(Integer workflowInstanceId);
void killWorkflow(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify);
/**
* Finalize a workflow instance. Once a workflow has been finalized, it can no longer receive new operations and will be removed from memory.
*
* @param workflowInstanceId the ID of the workflow to finalize
* @param workflowExecutionRunnableIdentify the ID of the workflow to finalize
*/
void finalizeWorkflow(Integer workflowInstanceId);
void finalizeWorkflow(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify);
/**
* Shut down the workflow engine. The workflow engine cannot be restarted after shutdown. This method will block until the workflow engine has completely shut down.
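A short, hedged sketch (not part of the commit) of how these identify-based operations replace the old Integer workflowInstanceId; the engine and runnable instances are assumed to be supplied by the surrounding master service:

```java
package org.apache.dolphinscheduler.workflow.engine.engine;

import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;

// Sketch only: trigger, pause, and finalize a workflow through its identify.
public class WorkflowEngineUsageSketch {

    void operate(IWorkflowEngine workflowEngine, IWorkflowExecutionRunnable workflowExecutionRunnable) {
        workflowEngine.triggerWorkflow(workflowExecutionRunnable);

        // All subsequent operations are keyed by the new identify object
        // rather than the old Integer workflowInstanceId.
        IWorkflowExecutionRunnableIdentify identify =
                workflowExecutionRunnable.getWorkflowExecutionContext().getIdentify();

        workflowEngine.pauseWorkflow(identify);

        // Finalize once the instance has finished; it is then removed from memory.
        workflowEngine.finalizeWorkflow(identify);
    }
}
```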

View File

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.workflow.engine.event.WorkflowOperationEvent;
import org.apache.dolphinscheduler.workflow.engine.exception.WorkflowExecuteRunnableNotFoundException;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionContext;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableRepository;
import lombok.extern.slf4j.Slf4j;
@ -46,47 +47,49 @@ public class WorkflowEngine implements IWorkflowEngine {
@Override
public void triggerWorkflow(IWorkflowExecutionRunnable workflowExecuteRunnable) {
IWorkflowExecutionContext workflowExecutionContext = workflowExecuteRunnable.getWorkflowExecutionContext();
Integer workflowInstanceId = workflowExecutionContext.getWorkflowInstanceId();
log.info("Triggering WorkflowExecutionRunnable: {}", workflowExecutionContext.getWorkflowInstanceName());
IWorkflowExecutionRunnableIdentify workflowExecutionIdentify = workflowExecutionContext.getIdentify();
log.info("Triggering WorkflowExecutionRunnable: {}", workflowExecutionIdentify);
workflowExecuteRunnableRepository.storeWorkflowExecutionRunnable(workflowExecuteRunnable);
workflowExecuteRunnable.storeEventToTail(WorkflowOperationEvent.triggerEvent(workflowInstanceId));
workflowExecuteRunnable
.storeEventToTail(WorkflowOperationEvent.triggerEvent(workflowExecuteRunnable));
}
@Override
public void pauseWorkflow(Integer workflowInstanceId) {
public void pauseWorkflow(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
IWorkflowExecutionRunnable workflowExecuteRunnable =
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
workflowExecuteRunnableRepository.getWorkflowExecutionRunnable(workflowExecutionRunnableIdentify);
if (workflowExecuteRunnable == null) {
throw new WorkflowExecuteRunnableNotFoundException(workflowInstanceId);
throw new WorkflowExecuteRunnableNotFoundException(workflowExecutionRunnableIdentify);
}
log.info("Pausing WorkflowExecutionRunnable: {}",
workflowExecuteRunnable.getWorkflowExecutionContext().getWorkflowInstanceName());
workflowExecuteRunnable.storeEventToTail(WorkflowOperationEvent.pauseEvent(workflowInstanceId));
workflowExecuteRunnable.getWorkflowExecutionContext().getIdentify());
workflowExecuteRunnable
.storeEventToTail(WorkflowOperationEvent.pauseEvent(workflowExecuteRunnable));
}
@Override
public void killWorkflow(Integer workflowInstanceId) {
public void killWorkflow(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
IWorkflowExecutionRunnable workflowExecuteRunnable =
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
workflowExecuteRunnableRepository.getWorkflowExecutionRunnable(workflowExecutionRunnableIdentify);
if (workflowExecuteRunnable == null) {
throw new WorkflowExecuteRunnableNotFoundException(workflowInstanceId);
throw new WorkflowExecuteRunnableNotFoundException(workflowExecutionRunnableIdentify);
}
log.info("Killing WorkflowExecutionRunnable: {}",
workflowExecuteRunnable.getWorkflowExecutionContext().getWorkflowInstanceName());
workflowExecuteRunnable.storeEventToTail(WorkflowOperationEvent.killEvent(workflowInstanceId));
workflowExecuteRunnable.getWorkflowExecutionContext().getIdentify());
workflowExecuteRunnable
.storeEventToTail(WorkflowOperationEvent.killEvent(workflowExecuteRunnable));
}
@Override
public void finalizeWorkflow(Integer workflowInstanceId) {
public void finalizeWorkflow(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
IWorkflowExecutionRunnable workflowExecutionRunnable =
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(workflowInstanceId);
workflowExecuteRunnableRepository.getWorkflowExecutionRunnable(workflowExecutionRunnableIdentify);
if (workflowExecutionRunnable == null) {
return;
}
// todo: If the workflowExecutionRunnable is not finished, we cannot finalize it.
log.info("Finalizing WorkflowExecutionRunnable: {}",
workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowInstanceName());
workflowExecuteRunnableRepository.removeWorkflowExecutionRunnable(workflowInstanceId);
log.info("Finalizing WorkflowExecutionRunnable: {}", workflowExecutionRunnable.getIdentity());
workflowExecuteRunnableRepository.removeWorkflowExecutionRunnable(workflowExecutionRunnableIdentify);
}
@Override

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.event;
public interface EventAction {
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.event;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableRepository;
public class EventDispatcher implements IEventDispatcher {
private final IWorkflowExecutionRunnableRepository workflowExecutionRunnableRepository;
public EventDispatcher(IWorkflowExecutionRunnableRepository workflowExecutionRunnableRepository) {
this.workflowExecutionRunnableRepository = workflowExecutionRunnableRepository;
}
// todo: Do we need to split the EventRepository from WorkflowExecutionRunnable?
@Override
public void dispatch(IEvent event) {
IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify;
if (event instanceof IWorkflowEvent) {
workflowExecutionRunnableIdentify = ((IWorkflowEvent) event).getWorkflowExecutionRunnableIdentify();
} else {
throw new UnsupportedOperationException("Unsupported event: " + event);
}
// todo:
workflowExecutionRunnableRepository.getWorkflowExecutionRunnable(workflowExecutionRunnableIdentify)
.getEventRepository().storeEventToTail(event);
}
}

View File

@ -28,7 +28,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EventOperatorManager implements IEventOperatorManager<IEvent> {
private static final Map<String, IEventOperator<IEvent>> EVENT_OPERATOR_MAP = new HashMap<>();
private static final Map<IEventType, IEventOperator<IEvent>> EVENT_OPERATOR_MAP = new HashMap<>();
private static final EventOperatorManager INSTANCE = new EventOperatorManager();
@ -39,8 +39,8 @@ public class EventOperatorManager implements IEventOperatorManager<IEvent> {
return INSTANCE;
}
public void registerEventOperator(IEventOperator<IEvent> eventOperator) {
EVENT_OPERATOR_MAP.put(eventOperator.getClass().getSimpleName(), eventOperator);
public void registerEventOperator(IEventType eventType, IEventOperator<IEvent> eventOperator) {
EVENT_OPERATOR_MAP.put(eventType, eventOperator);
}
@Override
@ -48,10 +48,10 @@ public class EventOperatorManager implements IEventOperatorManager<IEvent> {
if (event == null) {
throw new IllegalArgumentException("event cannot be null");
}
if (event.getEventOperatorClass() == null) {
if (event.getEventType() == null) {
throw new IllegalArgumentException("event type cannot be null");
}
return EVENT_OPERATOR_MAP.get(event.getEventOperatorClass().getSimpleName());
return EVENT_OPERATOR_MAP.get(event.getEventType());
}
}

View File

@ -17,8 +17,10 @@
package org.apache.dolphinscheduler.workflow.engine.event;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
public interface IEvent {
Class getEventOperatorClass();
IWorkflowExecutionRunnableIdentify getWorkflowExecutionRunnableIdentify();
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.event;
public interface IEventDispatcher {
void dispatch(IEvent event);
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.event;
public interface IEventOperation<T> {
void operate(T t);
}

View File

@ -22,7 +22,7 @@ package org.apache.dolphinscheduler.workflow.engine.event;
*/
public interface IEventOperatorManager<E> {
void registerEventOperator(IEventOperator<E> eventOperator);
void registerEventOperator(IEventType eventType, IEventOperator<E> eventOperator);
/**
* Get the {@link IEventOperator} for the given event.

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.event;
public interface IEventType {
}

View File

@ -18,4 +18,5 @@
package org.apache.dolphinscheduler.workflow.engine.event;
public interface ISyncEvent {
}

View File

@ -17,10 +17,6 @@
package org.apache.dolphinscheduler.workflow.engine.event;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnable;
public interface ITaskEvent extends IEvent {
ITaskExecutionRunnable getTaskExecutionRunnable();
}

View File

@ -17,13 +17,8 @@
package org.apache.dolphinscheduler.workflow.engine.event;
public interface IWorkflowEvent extends IEvent {
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
/**
* The id of WorkflowInstance which the event is related to
*
* @return workflowInstanceId, shouldn't be null
*/
Integer getWorkflowInstanceId();
public interface IWorkflowEvent extends IEvent<IWorkflowExecutionRunnableIdentify> {
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.event;
public interface IWorkflowExecutionRunnableEventBuilder {
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.event;
public interface IWorkflowExecutionRunnableEventOperation extends IEventOperation {
}

View File

@ -17,25 +17,39 @@
package org.apache.dolphinscheduler.workflow.engine.event;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionPlan;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskOperationEvent implements ITaskEvent, ISyncEvent {
private ITaskExecutionRunnable taskExecutionRunnable;
private IEventOperation eventOperation;
private TaskOperationType taskOperationType;
@Override
public Class getEventOperatorClass() {
return TaskOperationEventOperator.class;
public TaskOperationEvent(IEventOperation eventOperation) {
this.eventOperation = eventOperation;
}
public static TaskOperationEvent startEvent(ITaskExecutionPlan taskExecutionPlan) {
return new TaskOperationEvent(taskExecutionPlan::start);
}
public static TaskOperationEvent failoverEvent(ITaskExecutionPlan taskExecutionPlan) {
return new TaskOperationEvent(taskExecutionPlan::failoverTask);
}
public static TaskOperationEvent retryEvent(ITaskExecutionPlan taskExecutionPlan) {
return new TaskOperationEvent(taskExecutionPlan::retryTask);
}
public static TaskOperationEvent pauseEvent(ITaskExecutionPlan taskExecutionPlan) {
return new TaskOperationEvent(taskExecutionPlan::pauseTask);
}
public static TaskOperationEvent killEvent(ITaskExecutionPlan taskExecutionPlan) {
return new TaskOperationEvent(taskExecutionPlan::killTask);
}
}
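The refactored event now carries its operation as a bound method reference, so the EventFirer (see the change above) simply calls operate() instead of looking up an operator. A simplified, self-contained sketch of that pattern; the stand-in types below are hypothetical and the real IEventOperation / ITaskExecutionPlan signatures in the commit may differ:

```java
// Simplified stand-ins to illustrate the method-reference event pattern.
interface Operation {
    void operate();
}

class PlanStub {
    void start() { System.out.println("dispatch task"); }
    void pauseTask() { System.out.println("pause task"); }
}

class OperationEvent {
    private final Operation operation;

    OperationEvent(Operation operation) { this.operation = operation; }

    Operation getEventOperation() { return operation; }

    // Factory methods bind the target method at creation time, mirroring
    // TaskOperationEvent.startEvent / pauseEvent above.
    static OperationEvent startEvent(PlanStub plan) { return new OperationEvent(plan::start); }
    static OperationEvent pauseEvent(PlanStub plan) { return new OperationEvent(plan::pauseTask); }
}

public class MethodReferenceEventSketch {
    public static void main(String[] args) {
        OperationEvent event = OperationEvent.startEvent(new PlanStub());
        // The firer no longer needs an operator lookup; it just runs the bound operation.
        event.getEventOperation().operate();
    }
}
```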

View File

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.workflow.engine.event;
import org.apache.dolphinscheduler.workflow.engine.workflow.ITaskExecutionRunnable;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ -26,19 +24,6 @@ public class TaskOperationEventOperator implements ITaskEventOperator<TaskOperat
@Override
public void handleEvent(TaskOperationEvent event) {
ITaskExecutionRunnable taskExecutionRunnable = event.getTaskExecutionRunnable();
switch (event.getTaskOperationType()) {
case RUN:
taskExecutionRunnable.dispatch();
break;
case KILL:
taskExecutionRunnable.kill();
break;
case PAUSE:
taskExecutionRunnable.pause();
break;
default:
log.error("Unknown TaskOperationType for event: {}", event);
}
}
}

View File

@ -17,10 +17,10 @@
package org.apache.dolphinscheduler.workflow.engine.event;
public enum TaskOperationType {
public enum TaskOperationEventType implements IEventType {
FAILOVER,
RUN,
START,
RETRY,
KILL,
PAUSE,

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.workflow.engine.event;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -28,12 +30,18 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class WorkflowFailedEvent implements IWorkflowEvent {
private Integer workflowInstanceId;
private String failedReason;
private IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify;
@Override
public Class getEventOperatorClass() {
public IWorkflowExecutionRunnableIdentify getWorkflowExecutionRunnableIdentify() {
return null;
}
@Override
public IEventType getEventType() {
return null;
}
}

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.workflow.engine.event;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -31,7 +33,12 @@ public class WorkflowFinalizeEvent implements IWorkflowEvent, ISyncEvent {
private Integer workflowInstanceId;
@Override
public Class getEventOperatorClass() {
return WorkflowOperationEventOperator.class;
public IEventType getEventType() {
return null;
}
@Override
public IWorkflowExecutionRunnableIdentify getWorkflowExecutionRunnableIdentify() {
return null;
}
}

View File

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.workflow.engine.event;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
import lombok.AllArgsConstructor;
import lombok.Builder;
@ -30,12 +30,17 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class WorkflowFinishEvent implements IWorkflowEvent, ISyncEvent {
private Integer workflowInstanceId;
private IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify;
private WorkflowExecutionStatus workflowExecutionStatus;
public static WorkflowFinishEvent of(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
return WorkflowFinishEvent.builder()
.workflowExecutionRunnableIdentify(workflowExecutionRunnableIdentify)
.build();
}
@Override
public Class getEventOperatorClass() {
public IEventType getEventType() {
return null;
}
}

View File

@ -17,39 +17,30 @@
package org.apache.dolphinscheduler.workflow.engine.event;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
@Data
@Builder
@AllArgsConstructor
public class WorkflowOperationEvent implements IWorkflowEvent, ISyncEvent {
private Integer workflowInstanceId;
private WorkflowOperationType workflowOperationType;
private final IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify;
public static WorkflowOperationEvent of(Integer workflowInstanceId, WorkflowOperationType workflowOperationType) {
return WorkflowOperationEvent.builder()
.workflowInstanceId(workflowInstanceId)
.workflowOperationType(workflowOperationType)
.build();
public WorkflowOperationEvent(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
this.workflowExecutionRunnableIdentify = workflowExecutionRunnableIdentify;
}
public static WorkflowOperationEvent triggerEvent(Integer workflowInstanceId) {
return of(workflowInstanceId, WorkflowOperationType.TRIGGER);
public static WorkflowOperationEvent triggerEvent(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
return new WorkflowOperationEvent(workflowExecutionRunnableIdentify);
}
public static WorkflowOperationEvent pauseEvent(Integer workflowInstanceId) {
return of(workflowInstanceId, WorkflowOperationType.PAUSE);
public static WorkflowOperationEvent pauseEvent(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
return new WorkflowOperationEvent(workflowExecutionRunnableIdentify);
}
public static WorkflowOperationEvent killEvent(Integer workflowInstanceId) {
return of(workflowInstanceId, WorkflowOperationType.KILL);
public static WorkflowOperationEvent killEvent(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
return new WorkflowOperationEvent(workflowExecutionRunnableIdentify);
}
@Override
public Class getEventOperatorClass() {
return WorkflowOperationEventOperator.class;
public IWorkflowExecutionRunnableIdentify getEventIdentify() {
return workflowExecutionRunnableIdentify;
}
}

View File

@ -36,12 +36,14 @@ public class WorkflowOperationEventOperator implements IWorkflowEventOperator<Wo
@Override
public void handleEvent(WorkflowOperationEvent event) {
IWorkflowExecutionRunnable workflowExecutionRunnable =
workflowExecuteRunnableRepository.getWorkflowExecutionRunnableById(event.getWorkflowInstanceId());
workflowExecuteRunnableRepository
.getWorkflowExecutionRunnable(event.getWorkflowExecutionRunnableIdentify());
if (workflowExecutionRunnable == null) {
log.warn("WorkflowExecutionRunnable not found: {}", event);
return;
}
switch (event.getWorkflowOperationType()) {
WorkflowOperationEventType workflowOperationEvent = (WorkflowOperationEventType) event.getEventType();
switch (workflowOperationEvent) {
case TRIGGER:
triggerWorkflow(workflowExecutionRunnable);
break;
@ -65,9 +67,9 @@ public class WorkflowOperationEventOperator implements IWorkflowEventOperator<Wo
}
IWorkflowExecutionContext workflowExecutionContext =
workflowExecutionRunnable.getWorkflowExecutionContext();
log.error("Trigger workflow: {} failed", workflowExecutionContext.getWorkflowInstanceName(), exception);
log.error("Trigger workflow: {} failed", workflowExecutionContext.getIdentify(), exception);
WorkflowFailedEvent workflowExecutionRunnableFailedEvent = WorkflowFailedEvent.builder()
.workflowInstanceId(workflowExecutionContext.getWorkflowInstanceId())
.workflowExecutionRunnableIdentify(workflowExecutionRunnable.getIdentity())
.failedReason(exception.getMessage())
.build();
workflowExecutionRunnable.storeEventToTail(workflowExecutionRunnableFailedEvent);

View File

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.workflow.engine.event;
public enum WorkflowOperationType {
public enum WorkflowOperationEventType implements IEventType {
/**
* Trigger the workflow instance.

View File

@ -17,14 +17,12 @@
package org.apache.dolphinscheduler.workflow.engine.exception;
import org.apache.dolphinscheduler.workflow.engine.workflow.IWorkflowExecutionRunnableIdentify;
public class WorkflowExecuteRunnableNotFoundException extends RuntimeException {
public WorkflowExecuteRunnableNotFoundException(Integer workflowInstanceId) {
super("WorkflowExecuteRunnable not found: [id=" + workflowInstanceId + "]");
}
public WorkflowExecuteRunnableNotFoundException(String workflowInstanceName) {
super("WorkflowExecuteRunnable not found: [name=" + workflowInstanceName + "]");
public WorkflowExecuteRunnableNotFoundException(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
super("WorkflowExecuteRunnable not found: " + workflowExecutionRunnableIdentify);
}
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.utils;
public interface IWorkflowExecutionDAGStatusCheck {
boolean isSuccess();
boolean isFailed();
boolean isKilled();
boolean isPaused();
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.event.IEventRepository;
public abstract class BaseWorkflowExecutionRunnable implements IWorkflowExecutionRunnable {
protected final IWorkflowExecutionContext workflowExecutionContext;
protected WorkflowExecutionRunnableStatus workflowExecutionRunnableStatus;
public BaseWorkflowExecutionRunnable(IWorkflowExecutionContext workflowExecutionContext,
WorkflowExecutionRunnableStatus workflowExecutionRunnableStatus) {
this.workflowExecutionContext = workflowExecutionContext;
this.workflowExecutionRunnableStatus = workflowExecutionRunnableStatus;
}
@Override
public IWorkflowExecutionRunnableIdentify getIdentity() {
return workflowExecutionContext.getIdentify();
}
@Override
public IWorkflowExecutionContext getWorkflowExecutionContext() {
return workflowExecutionContext;
}
@Override
public IEventRepository getEventRepository() {
return workflowExecutionContext.getEventRepository();
}
protected void statusTransform(WorkflowExecutionRunnableStatus targetStatus, Runnable runnable) {
WorkflowExecutionRunnableStatus originStatus = workflowExecutionRunnableStatus;
try {
workflowExecutionRunnableStatus = targetStatus;
runnable.run();
} catch (Throwable throwable) {
workflowExecutionRunnableStatus = originStatus;
throw throwable;
}
}
}
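A hedged sketch of how a concrete subclass might use statusTransform: the target status is applied optimistically and rolled back when the wrapped action throws. The subclass below is an assumption for illustration (kept abstract so the remaining lifecycle methods stay out of scope); only CREATED and RUNNING are statuses visible in this commit.

package org.apache.dolphinscheduler.workflow.engine.workflow;

// Illustrative sketch only, not part of the commit.
public abstract class StatusTransformSketch extends BaseWorkflowExecutionRunnable {

    protected StatusTransformSketch(IWorkflowExecutionContext workflowExecutionContext) {
        super(workflowExecutionContext, WorkflowExecutionRunnableStatus.CREATED);
    }

    @Override
    public void start() {
        // Status becomes RUNNING before the action runs; if dispatchStartTasks() throws,
        // statusTransform restores the previous status and rethrows.
        statusTransform(WorkflowExecutionRunnableStatus.RUNNING, this::dispatchStartTasks);
    }

    // Hypothetical hook standing in for the real dispatch logic.
    protected abstract void dispatchStartTasks();
}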

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public class DefaultTaskExecutionRunnableDelegate implements ITaskExecutionRunnableDelegate {
@Override
public void beforeStart() {
}
@Override
public void afterStart() {
}
@Override
public void beforePause() {
}
@Override
public void afterPause() {
}
@Override
public void beforeKill() {
}
@Override
public void afterKill() {
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
public class DefaultTaskExecutionRunnableDelegateFactory implements ITaskExecutionRunnableDelegateFactory {
private static final DefaultTaskExecutionRunnableDelegateFactory INSTANCE =
new DefaultTaskExecutionRunnableDelegateFactory();
private DefaultTaskExecutionRunnableDelegateFactory() {
}
public static DefaultTaskExecutionRunnableDelegateFactory getInstance() {
return INSTANCE;
}
@Override
public ITaskExecutionRunnableDelegate createTaskExecutionRunnable(ITaskIdentify taskIdentify,
IWorkflowExecutionContext workflowExecutionContext) {
return new DefaultTaskExecutionRunnableDelegate();
}
@Override
public ITaskExecutionRunnableDelegate createFailoverTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable,
IWorkflowExecutionContext workflowExecutionContext) {
return new DefaultTaskExecutionRunnableDelegate();
}
@Override
public ITaskExecutionRunnableDelegate createRetryTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable,
IWorkflowExecutionContext workflowExecutionContext) {
return new DefaultTaskExecutionRunnableDelegate();
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public class DefaultWorkflowExecutionRunnableDelegate implements IWorkflowExecutionRunnableDelegate {
@Override
public void beforeStart() {
}
@Override
public void afterStart() {
}
@Override
public void beforePause() {
}
@Override
public void afterPause() {
}
@Override
public void beforeKill() {
}
@Override
public void afterKill() {
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public class DefaultWorkflowExecutionRunnableDelegateFactory implements IWorkflowExecutionRunnableDelegateFactory {
private static final DefaultWorkflowExecutionRunnableDelegateFactory INSTANCE =
new DefaultWorkflowExecutionRunnableDelegateFactory();
private DefaultWorkflowExecutionRunnableDelegateFactory() {
}
public static DefaultWorkflowExecutionRunnableDelegateFactory getInstance() {
return INSTANCE;
}
@Override
public IWorkflowExecutionRunnableDelegate createWorkflowExecutionRunnableDelegate(IWorkflowExecutionContext workflowExecutionContext) {
return new DefaultWorkflowExecutionRunnableDelegate();
}
}

View File

@ -24,10 +24,6 @@ public interface IEventfulExecutionRunnable {
IEventRepository getEventRepository();
boolean isEventFiring();
void setEventFiring(boolean eventFiring);
default void storeEventToTail(IEvent event) {
getEventRepository().storeEventToTail(event);
}
@ -35,4 +31,8 @@ public interface IEventfulExecutionRunnable {
default void storeEventToHead(IEvent event) {
getEventRepository().storeEventToHead(event);
}
default void onEvent(IEvent event) {
throw new UnsupportedOperationException("onEvent is not implemented");
}
}

View File

@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.event.IEventRepository;
public interface ITaskExecutionContext {
ITaskInstance getTaskInstance();
ITaskExecutionRunnableIdentify getIdentify();
IEventRepository getEventRepository();
}

View File

@ -17,9 +17,11 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
public interface ITaskExecutionContextFactory {
ITaskExecutionContext createTaskExecutionContext(String taskName,
ITaskExecutionContext createTaskExecutionContext(ITaskIdentify taskIdentify,
IWorkflowExecutionContext workflowExecutionContext);
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITask;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
import java.util.List;
/**
* The task execution plan interface, which represents a task together with its execution plan.
* A task execution plan holds the task identify, the task itself and the list of task execution runnables.
*/
public interface ITaskExecutionPlan {
void start();
void failoverTask();
void retryTask();
void pauseTask();
void killTask();
ITaskIdentify getTaskIdentify();
ITaskExecutionRunnable getActiveTaskExecutionRunnable();
ITask getTask();
List<ITaskExecutionRunnable> getTaskExecutionRunnableList();
ITaskExecutionRunnable getTaskExecutionRunnable(ITaskExecutionRunnableIdentify taskExecutionRunnableIdentify);
ITaskExecutionRunnable storeTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface ITaskExecutionPlanChain {
ITaskExecutionPlan getFrom();
ITaskExecutionPlan getTo();
}
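A chain is simply a directed edge between two plans. A minimal immutable implementation might look like the sketch below; the commit itself only defines the interface, so this concrete class is an assumption for illustration.

package org.apache.dolphinscheduler.workflow.engine.workflow;

// Illustrative only: an immutable from -> to edge of the kind WorkflowExecutionDAG
// consumes when it builds its in-degree and out-degree maps.
public class SimpleTaskExecutionPlanChain implements ITaskExecutionPlanChain {

    private final ITaskExecutionPlan from;
    private final ITaskExecutionPlan to;

    public SimpleTaskExecutionPlanChain(ITaskExecutionPlan from, ITaskExecutionPlan to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public ITaskExecutionPlan getFrom() {
        return from;
    }

    @Override
    public ITaskExecutionPlan getTo() {
        return to;
    }
}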

View File

@ -20,17 +20,12 @@ package org.apache.dolphinscheduler.workflow.engine.workflow;
/**
* The TaskExecutionRunnable represents a running task; it is responsible for operating the task instance, e.g. start, kill, pause.
*/
public interface ITaskExecutionRunnable {
public interface ITaskExecutionRunnable extends IEventfulExecutionRunnable {
/**
* Dispatch the task instance.
* Start the task instance.
*/
void dispatch();
/**
* Run the task instance.
*/
void run();
void start();
/**
* Kill the task instance.
@ -42,6 +37,13 @@ public interface ITaskExecutionRunnable {
*/
void pause();
/**
* Get the task execution identify.
*
* @return the task execution identify
*/
ITaskExecutionRunnableIdentify getIdentify();
/**
* Get the task execution context.
*
@ -56,4 +58,5 @@ public interface ITaskExecutionRunnable {
* @return true if the current task can be accessed to the post task.
*/
boolean isReadyToTrigger(String taskNodeName);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface ITaskExecutionRunnableChain {
ITaskExecutionRunnable getFrom();
ITaskExecutionRunnable getTo();
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface ITaskExecutionRunnableDelegate {
void beforeStart();
void afterStart();
void beforePause();
void afterPause();
void beforeKill();
void afterKill();
}
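The delegate only exposes before/after hooks, so cross-cutting behaviour can be layered around a task without touching TaskExecutionRunnable itself. A sketch of a non-default, logging delegate follows; it is illustrative, since only DefaultTaskExecutionRunnableDelegate ships with this commit.

package org.apache.dolphinscheduler.workflow.engine.workflow;

import lombok.extern.slf4j.Slf4j;

// Illustrative delegate, not part of the commit: logs each lifecycle hook so the
// surrounding TaskExecutionRunnable stays unaware of the cross-cutting concern.
@Slf4j
public class LoggingTaskExecutionRunnableDelegate implements ITaskExecutionRunnableDelegate {

    @Override
    public void beforeStart() {
        log.info("Task is about to start");
    }

    @Override
    public void afterStart() {
        log.info("Task start has been requested");
    }

    @Override
    public void beforePause() {
        log.info("Task is about to be paused");
    }

    @Override
    public void afterPause() {
        log.info("Task pause has been requested");
    }

    @Override
    public void beforeKill() {
        log.info("Task is about to be killed");
    }

    @Override
    public void afterKill() {
        log.info("Task kill has been requested");
    }
}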

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
public interface ITaskExecutionRunnableDelegateFactory {
ITaskExecutionRunnableDelegate createTaskExecutionRunnable(ITaskIdentify taskIdentify,
IWorkflowExecutionContext workflowExecutionContext);
ITaskExecutionRunnableDelegate createFailoverTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable,
IWorkflowExecutionContext workflowExecutionContext);
ITaskExecutionRunnableDelegate createRetryTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable,
IWorkflowExecutionContext workflowExecutionContext);
}

View File

@ -17,9 +17,11 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
public interface ITaskExecutionRunnableFactory {
ITaskExecutionRunnable createTaskExecutionRunnable(String taskName,
ITaskExecutionRunnable createTaskExecutionRunnable(ITaskIdentify taskIdentify,
IWorkflowExecutionContext workflowExecutionContext);
ITaskExecutionRunnable createFailoverTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable,

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.TaskIdentify;
public interface ITaskExecutionRunnableIdentify {
Long getId();
String getName();
TaskIdentify getTaskIdentify();
}

View File

@ -17,22 +17,22 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
import org.apache.dolphinscheduler.workflow.engine.dag.WorkflowDAG;
import org.apache.dolphinscheduler.workflow.engine.event.IEventRepository;
import java.util.List;
public interface IWorkflowExecutionContext {
IWorkflowInstance getWorkflowInstance();
IWorkflowExecutionRunnableIdentify getIdentify();
IWorkflowExecutionDAG getWorkflowExecutionDAG();
List<ITaskIdentify> getStartTaskIdentifies();
WorkflowDAG getWorkflowDAG();
WorkflowExecutionDAG getWorkflowExecutionDAG();
IEventRepository getEventRepository();
default int getWorkflowInstanceId() {
return getWorkflowInstance().getId();
}
default String getWorkflowInstanceName() {
return getWorkflowInstance().getName();
}
}

View File

@ -24,25 +24,7 @@ import java.util.List;
/**
* The WorkflowExecutionDAG represents the running workflow DAG.
*/
public interface IWorkflowExecutionDAG {
List<String> getStartNodeNames();
/**
* Get TaskExecutionRunnable by given TaskInstanceId.
*
* @param taskInstanceId taskInstanceId.
* @return TaskExecutionRunnable
*/
ITaskExecutionRunnable getTaskExecutionRunnableById(Integer taskInstanceId);
/**
* Get TaskExecutionRunnable by given taskName.
*
* @param taskName task name.
* @return TaskExecutionRunnable
*/
ITaskExecutionRunnable getTaskExecutionRunnableByName(String taskName);
public interface IWorkflowExecutionDAG extends DAG<ITaskExecutionRunnable, ITaskExecutionRunnableIdentify> {
/**
* Get TaskExecutionRunnable which is not finished.
@ -51,21 +33,4 @@ public interface IWorkflowExecutionDAG {
*/
List<ITaskExecutionRunnable> getActiveTaskExecutionRunnable();
/**
* Get the direct pre TaskExecutionRunnable of the given taskName.
*
* @param taskName task name.
* @return TaskExecutionRunnable
*/
List<ITaskExecutionRunnable> getDirectPreTaskExecutionRunnable(String taskName);
/**
* Check whether the taskNode is ready to run.
*
* @param taskName taskNodeName
* @return true if the taskNode is ready to run.
*/
boolean isTaskAbleToBeTriggered(String taskName);
void storeTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable);
}

View File

@ -17,27 +17,21 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.engine.IDAGEngine;
/**
* The IWorkflowExecutionRunnable represents a running workflow instance; it is responsible for operating the workflow instance, e.g. start, kill, pause.
*/
public interface IWorkflowExecutionRunnable extends IEventfulExecutionRunnable {
public interface IWorkflowExecutionRunnable
extends
IWorkflowOuterAction,
IWorkflowInnerAction,
IEventfulExecutionRunnable {
/**
* Start the workflow instance.
* Get the identity of the workflow execution runnable.
*
* @return the identity of the workflow execution runnable
*/
void start();
/**
* Kill the workflow instance.
*/
void kill();
/**
* Pause the workflow instance.
*/
void pause();
IWorkflowExecutionRunnableIdentify getIdentity();
/**
* Get the workflow execution context.
@ -46,11 +40,4 @@ public interface IWorkflowExecutionRunnable extends IEventfulExecutionRunnable {
*/
IWorkflowExecutionContext getWorkflowExecutionContext();
/**
* Get the {@link IDAGEngine} which used to execute the dag of the workflow instance.
*
* @return dag engine.
*/
IDAGEngine getDagEngine();
}

View File

@ -15,5 +15,14 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;public interface IWorkflowExecutionRunnableDelegate {
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface IWorkflowExecutionRunnableDelegate {
void start();
void pause();
void kill();
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface IWorkflowExecutionRunnableDelegateFactory {
IWorkflowExecutionRunnableDelegate createWorkflowExecutionRunnableDelegate(IWorkflowExecutionContext workflowExecutionContext);
}

View File

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface IWorkflowInstance {
public interface IWorkflowExecutionRunnableIdentify {
int getId();
Long getId();
String getName();

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface IWorkflowExecutionRunnableOperationCheck {
boolean canStart();
boolean canPause();
boolean canKill();
}

View File

@ -23,11 +23,11 @@ public interface IWorkflowExecutionRunnableRepository {
void storeWorkflowExecutionRunnable(IWorkflowExecutionRunnable workflowExecutionRunnable);
IWorkflowExecutionRunnable getWorkflowExecutionRunnableById(Integer workflowInstanceId);
IWorkflowExecutionRunnable getWorkflowExecutionRunnable(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify);
Collection<IWorkflowExecutionRunnable> getActiveWorkflowExecutionRunnable();
void removeWorkflowExecutionRunnable(Integer workflowInstanceId);
void removeWorkflowExecutionRunnable(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify);
void clear();
}
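A small usage sketch against this interface: everything is keyed by IWorkflowExecutionRunnableIdentify rather than the old Integer id, so lookup and removal need an identify equal to the one used at store time. The helper class, its placement in the workflow package, and the assumption that getIdentity() is stable across calls are illustrative, not part of the commit.

package org.apache.dolphinscheduler.workflow.engine.workflow;

// Illustrative sketch only.
public final class WorkflowRunnableRepositorySketch {

    private WorkflowRunnableRepositorySketch() {
    }

    public static IWorkflowExecutionRunnable storeAndFetch(IWorkflowExecutionRunnableRepository repository,
                                                           IWorkflowExecutionRunnable workflowExecutionRunnable) {
        repository.storeWorkflowExecutionRunnable(workflowExecutionRunnable);
        // Lookup and removal both go through the identify, so it must compare equal by value.
        IWorkflowExecutionRunnableIdentify identify = workflowExecutionRunnable.getIdentity();
        IWorkflowExecutionRunnable found = repository.getWorkflowExecutionRunnable(identify);
        repository.removeWorkflowExecutionRunnable(identify);
        return found;
    }
}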

View File

@ -17,10 +17,6 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface ITaskInstance {
int getId();
String getName();
public interface IWorkflowExecutionRunnableStateEventListener {
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
public interface IWorkflowInnerAction {
void triggerNextTasks(ITaskIdentify taskIdentify);
void triggerTask(ITaskIdentify taskIdentify);
void failoverTask(ITaskExecutionRunnableIdentify taskExecutionRunnableIdentify);
void retryTask(ITaskExecutionRunnableIdentify taskExecutionRunnableIdentify);
void pauseTask(ITaskExecutionRunnableIdentify taskExecutionIdentify);
void killTask(ITaskExecutionRunnableIdentify taskExecutionIdentify);
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public interface IWorkflowOuterAction {
/**
* Start the workflow instance; this method triggers the start tasks.
*/
void start();
/**
* Kill the workflow instance.
*/
void kill();
/**
* Pause the workflow instance.
*/
void pause();
}

View File

@ -26,7 +26,7 @@ public class SingletonWorkflowExecutionRunnableRepository implements IWorkflowEx
private static final IWorkflowExecutionRunnableRepository INSTANCE =
new SingletonWorkflowExecutionRunnableRepository();
private final Map<Integer, IWorkflowExecutionRunnable> workflowExecutionRunnableMap;
private final Map<IWorkflowExecutionRunnableIdentify, IWorkflowExecutionRunnable> workflowExecutionRunnableMap;
private SingletonWorkflowExecutionRunnableRepository() {
this.workflowExecutionRunnableMap = new ConcurrentHashMap<>();
@ -36,22 +36,23 @@ public class SingletonWorkflowExecutionRunnableRepository implements IWorkflowEx
return INSTANCE;
}
@Override
public void storeWorkflowExecutionRunnable(IWorkflowExecutionRunnable workflowExecutionRunnable) {
workflowExecutionRunnableMap.put(
workflowExecutionRunnable.getWorkflowExecutionContext().getWorkflowInstanceId(),
workflowExecutionRunnable);
workflowExecutionRunnableMap.put(workflowExecutionRunnable.getIdentity(), workflowExecutionRunnable);
}
public IWorkflowExecutionRunnable getWorkflowExecutionRunnableById(Integer workflowInstanceId) {
return workflowExecutionRunnableMap.get(workflowInstanceId);
@Override
public IWorkflowExecutionRunnable getWorkflowExecutionRunnable(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
return workflowExecutionRunnableMap.get(workflowExecutionRunnableIdentify);
}
public Collection<IWorkflowExecutionRunnable> getActiveWorkflowExecutionRunnable() {
return workflowExecutionRunnableMap.values();
}
public void removeWorkflowExecutionRunnable(Integer workflowInstanceId) {
workflowExecutionRunnableMap.remove(workflowInstanceId);
@Override
public void removeWorkflowExecutionRunnable(IWorkflowExecutionRunnableIdentify workflowExecutionRunnableIdentify) {
workflowExecutionRunnableMap.remove(workflowExecutionRunnableIdentify);
}
@Override

View File

@ -19,14 +19,4 @@ package org.apache.dolphinscheduler.workflow.engine.workflow;
public class TaskExecutionContext implements ITaskExecutionContext {
private final ITaskInstance taskInstance;
public TaskExecutionContext(ITaskInstance taskInstance) {
this.taskInstance = taskInstance;
}
@Override
public ITaskInstance getTaskInstance() {
return taskInstance;
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITask;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
import java.util.List;
import java.util.Map;
public class TaskExecutionPlan implements ITaskExecutionPlan {
private ITaskIdentify taskIdentify;
private ITask task;
private IWorkflowExecutionContext workflowExecutionContext;
private ITaskExecutionRunnableFactory taskExecutionRunnableFactory;
private Map<ITaskExecutionRunnableIdentify, ITaskExecutionRunnable> taskExecutionRunnableMap;
private ITaskExecutionRunnable activeTaskExecutionRunnable;
@Override
public void start() {
ITaskExecutionRunnable taskExecutionRunnable =
taskExecutionRunnableFactory.createTaskExecutionRunnable(taskIdentify, workflowExecutionContext);
taskExecutionRunnableMap.put(taskExecutionRunnable.getIdentify(), taskExecutionRunnable);
activeTaskExecutionRunnable = taskExecutionRunnable;
taskExecutionRunnable.start();
}
@Override
public void failoverTask() {
// todo: check whether the task can be taken over
ITaskExecutionRunnable failoverTaskExecutionRunnable = taskExecutionRunnableFactory
.createFailoverTaskExecutionRunnable(activeTaskExecutionRunnable, workflowExecutionContext);
taskExecutionRunnableMap.put(failoverTaskExecutionRunnable.getIdentify(), failoverTaskExecutionRunnable);
activeTaskExecutionRunnable = failoverTaskExecutionRunnable;
failoverTaskExecutionRunnable.start();
}
@Override
public void retryTask() {
// check the retry times
ITaskExecutionRunnable retryTaskExecutionRunnable = taskExecutionRunnableFactory
.createRetryTaskExecutionRunnable(activeTaskExecutionRunnable, workflowExecutionContext);
taskExecutionRunnableMap.put(retryTaskExecutionRunnable.getIdentify(), retryTaskExecutionRunnable);
activeTaskExecutionRunnable = retryTaskExecutionRunnable;
retryTaskExecutionRunnable.start();
}
@Override
public void pauseTask() {
activeTaskExecutionRunnable.pause();
}
@Override
public void killTask() {
activeTaskExecutionRunnable.kill();
}
@Override
public ITaskIdentify getTaskIdentify() {
return taskIdentify;
}
@Override
public ITaskExecutionRunnable getActiveTaskExecutionRunnable() {
return activeTaskExecutionRunnable;
}
@Override
public ITask getTask() {
return task;
}
@Override
public List<ITaskExecutionRunnable> getTaskExecutionRunnableList() {
return null;
}
@Override
public ITaskExecutionRunnable getTaskExecutionRunnable(ITaskExecutionRunnableIdentify taskExecutionRunnableIdentify) {
return taskExecutionRunnableMap.get(taskExecutionRunnableIdentify);
}
@Override
public ITaskExecutionRunnable storeTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable) {
if (taskExecutionRunnable == null) {
throw new IllegalArgumentException("taskExecutionRunnable cannot be null");
}
return taskExecutionRunnableMap.put(taskExecutionRunnable.getIdentify(), taskExecutionRunnable);
}
}
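To make the attempt bookkeeping concrete, a brief sketch against the ITaskExecutionPlan interface: each start/retry/failover creates a new runnable and promotes it to the active one. How a TaskExecutionPlan instance is actually constructed and wired is outside this excerpt, so the helper below is illustrative only.

package org.apache.dolphinscheduler.workflow.engine.workflow;

// Illustrative sketch only.
public final class TaskExecutionPlanSketch {

    private TaskExecutionPlanSketch() {
    }

    public static ITaskExecutionRunnable startThenRetry(ITaskExecutionPlan taskExecutionPlan) {
        taskExecutionPlan.start();     // the first attempt becomes the active runnable
        taskExecutionPlan.retryTask(); // the retry attempt replaces it as the active runnable
        return taskExecutionPlan.getActiveTaskExecutionRunnable();
    }
}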

View File

@ -17,41 +17,59 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.event.IEventRepository;
public class TaskExecutionRunnable implements ITaskExecutionRunnable {
public final ITaskExecutionContext taskExecutionContext;
private final ITaskExecutionContext taskExecutionContext;
public TaskExecutionRunnable(ITaskExecutionContext taskExecutionContext) {
private final ITaskExecutionRunnableDelegate taskExecutionRunnableDelegate;
public TaskExecutionRunnable(ITaskExecutionContext taskExecutionContext,
ITaskExecutionRunnableDelegate taskExecutionRunnableDelegate) {
this.taskExecutionContext = taskExecutionContext;
this.taskExecutionRunnableDelegate = taskExecutionRunnableDelegate;
}
@Override
public void dispatch() {
public void start() {
taskExecutionRunnableDelegate.beforeStart();
}
@Override
public void run() {
}
@Override
public void kill() {
taskExecutionRunnableDelegate.afterStart();
}
@Override
public void pause() {
taskExecutionRunnableDelegate.beforePause();
taskExecutionRunnableDelegate.afterPause();
}
@Override
public void kill() {
taskExecutionRunnableDelegate.beforeKill();
taskExecutionRunnableDelegate.afterKill();
}
@Override
public boolean isReadyToTrigger(String taskNodeName) {
return false;
}
@Override
public ITaskExecutionRunnableIdentify getIdentify() {
return taskExecutionContext.getIdentify();
}
@Override
public ITaskExecutionContext getTaskExecutionContext() {
return taskExecutionContext;
}
@Override
public boolean isReadyToTrigger(String taskNodeName) {
return false;
public IEventRepository getEventRepository() {
return taskExecutionContext.getEventRepository();
}
}
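How these pieces might be wired by hand, bypassing TaskExecutionRunnableFactory: the sketch below is illustrative, and the taskExecutionContext argument is assumed to come from an ITaskExecutionContextFactory rather than being built here.

package org.apache.dolphinscheduler.workflow.engine.workflow;

// Illustrative sketch only: start() runs the delegate's beforeStart/afterStart hooks.
public final class TaskExecutionRunnableWiringSketch {

    private TaskExecutionRunnableWiringSketch() {
    }

    public static ITaskExecutionRunnable startWithDefaultDelegate(ITaskExecutionContext taskExecutionContext) {
        ITaskExecutionRunnableDelegate delegate = new DefaultTaskExecutionRunnableDelegate();
        ITaskExecutionRunnable taskExecutionRunnable =
                new TaskExecutionRunnable(taskExecutionContext, delegate);
        taskExecutionRunnable.start();
        return taskExecutionRunnable;
    }
}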

View File

@ -17,20 +17,32 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
public class TaskExecutionRunnableFactory implements ITaskExecutionRunnableFactory {
private final ITaskExecutionContextFactory taskExecutionContextFactory;
private ITaskExecutionRunnableDelegateFactory taskExecutionRunnableDelegateFactory =
DefaultTaskExecutionRunnableDelegateFactory.getInstance();
public TaskExecutionRunnableFactory(ITaskExecutionContextFactory taskExecutionContextFactory) {
this.taskExecutionContextFactory = taskExecutionContextFactory;
}
public TaskExecutionRunnableFactory withTaskExecutionRunnableDelegateFactory(ITaskExecutionRunnableDelegateFactory taskExecutionRunnableDelegateFactory) {
this.taskExecutionRunnableDelegateFactory = taskExecutionRunnableDelegateFactory;
return this;
}
@Override
public ITaskExecutionRunnable createTaskExecutionRunnable(String taskName,
public ITaskExecutionRunnable createTaskExecutionRunnable(ITaskIdentify taskIdentify,
IWorkflowExecutionContext workflowExecutionContext) {
ITaskExecutionContext taskExecutionContext =
taskExecutionContextFactory.createTaskExecutionContext(taskName, workflowExecutionContext);
return new TaskExecutionRunnable(taskExecutionContext);
taskExecutionContextFactory.createTaskExecutionContext(taskIdentify, workflowExecutionContext);
ITaskExecutionRunnableDelegate taskExecutionRunnableDelegate = taskExecutionRunnableDelegateFactory
.createTaskExecutionRunnable(taskIdentify, workflowExecutionContext);
return new TaskExecutionRunnable(taskExecutionContext, taskExecutionRunnableDelegate);
}
@Override

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.TaskIdentify;
import lombok.EqualsAndHashCode;
@EqualsAndHashCode
public class TaskExecutionRunnableIdentify implements ITaskExecutionRunnableIdentify {
private final Long id;
private final String name;
private final TaskIdentify taskIdentify;
public TaskExecutionRunnableIdentify(Long id, String name, TaskIdentify taskIdentify) {
this.id = id;
this.name = name;
this.taskIdentify = taskIdentify;
}
@Override
public Long getId() {
return id;
}
@Override
public String getName() {
return name;
}
@Override
public TaskIdentify getTaskIdentify() {
return taskIdentify;
}
}
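Because the class is annotated with @EqualsAndHashCode, two identifies built from the same fields compare equal, which is what the identify-keyed maps in TaskExecutionPlan and the repositories rely on. A small sketch follows; it is illustrative, and the TaskIdentify argument is assumed to be obtained elsewhere.

package org.apache.dolphinscheduler.workflow.engine.workflow;

import org.apache.dolphinscheduler.workflow.engine.dag.TaskIdentify;

// Illustrative sketch only: value equality is what makes identify-keyed lookups work.
public final class TaskExecutionRunnableIdentifySketch {

    private TaskExecutionRunnableIdentifySketch() {
    }

    public static boolean sameKey(TaskIdentify taskIdentify) {
        TaskExecutionRunnableIdentify first = new TaskExecutionRunnableIdentify(1L, "task-A", taskIdentify);
        TaskExecutionRunnableIdentify second = new TaskExecutionRunnableIdentify(1L, "task-A", taskIdentify);
        // true: @EqualsAndHashCode compares the three fields rather than object identity.
        return first.equals(second) && first.hashCode() == second.hashCode();
    }
}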

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.WorkflowDAG;
import org.apache.dolphinscheduler.workflow.engine.event.IEventRepository;
import lombok.AllArgsConstructor;
@ -30,10 +31,23 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class WorkflowExecutionContext implements IWorkflowExecutionContext {
private IWorkflowInstance workflowInstance;
@Override
public IWorkflowExecutionRunnableIdentify getIdentify() {
return null;
}
private IWorkflowExecutionDAG workflowExecutionDAG;
@Override
public WorkflowDAG getWorkflowDAG() {
return null;
}
private IEventRepository eventRepository;
@Override
public WorkflowExecutionDAG getWorkflowExecutionDAG() {
return null;
}
@Override
public IEventRepository getEventRepository() {
return null;
}
}

View File

@ -17,93 +17,134 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.dag.Node;
import org.apache.dolphinscheduler.workflow.engine.dag.WorkflowDAG;
import org.apache.dolphinscheduler.workflow.engine.dag.DAG;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
import org.apache.dolphinscheduler.workflow.engine.utils.IWorkflowExecutionDAGStatusCheck;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
public class WorkflowExecutionDAG implements IWorkflowExecutionDAGStatusCheck, DAG<ITaskExecutionPlan, ITaskIdentify> {
/**
* The WorkflowExecutionDAG represents a running workflow instance DAG.
*/
@Slf4j
public class WorkflowExecutionDAG implements IWorkflowExecutionDAG {
private final Map<ITaskIdentify, ITaskExecutionPlan> taskExecutionPlanMap;
private final ITaskExecutionRunnableRepository taskExecutionRunnableRepository;
private final Map<ITaskIdentify, List<ITaskIdentify>> outdegreeMap;
private final WorkflowDAG workflowDAG;
private final Map<ITaskIdentify, List<ITaskIdentify>> inDegredMap;
@Getter
private final List<String> startNodeNames;
public WorkflowExecutionDAG(List<ITaskExecutionPlan> tasks,
List<ITaskExecutionPlanChain> taskChains) {
this.taskExecutionPlanMap = new HashMap<>();
this.outdegreeMap = new HashMap<>();
this.inDegredMap = new HashMap<>();
public WorkflowExecutionDAG(ITaskExecutionRunnableRepository taskExecutionRunnableRepository,
WorkflowDAG workflowDAG) {
this(taskExecutionRunnableRepository, workflowDAG, Collections.emptyList());
}
public WorkflowExecutionDAG(ITaskExecutionRunnableRepository taskExecutionRunnableRepository,
WorkflowDAG workflowDAG,
List<String> startNodeNames) {
this.taskExecutionRunnableRepository = taskExecutionRunnableRepository;
this.workflowDAG = workflowDAG;
this.startNodeNames = startNodeNames;
for (ITaskExecutionPlan task : tasks) {
ITaskIdentify identify = task.getTaskIdentify();
if (taskExecutionPlanMap.containsKey(identify)) {
throw new IllegalArgumentException("Duplicate task identify: " + identify);
}
taskExecutionPlanMap.put(identify, task);
}
for (ITaskExecutionPlanChain taskChain : taskChains) {
ITaskExecutionPlan from = taskChain.getFrom();
ITaskExecutionPlan to = taskChain.getTo();
if (from == null) {
continue;
}
if (to == null) {
continue;
}
ITaskIdentify fromIdentify = from.getTaskIdentify();
ITaskIdentify toIdentify = to.getTaskIdentify();
List<ITaskIdentify> outDegrees =
outdegreeMap.computeIfAbsent(fromIdentify, k -> new ArrayList<>());
if (outDegrees.contains(toIdentify)) {
throw new IllegalArgumentException("Duplicate task chain: " + fromIdentify + " -> " + toIdentify);
}
outDegrees.add(toIdentify);
List<ITaskIdentify> inDegrees =
inDegredMap.computeIfAbsent(toIdentify, k -> new ArrayList<>());
if (inDegrees.contains(fromIdentify)) {
throw new IllegalArgumentException("Duplicate task chain: " + fromIdentify + " -> " + toIdentify);
}
inDegrees.add(fromIdentify);
}
}
@Override
public ITaskExecutionRunnable getTaskExecutionRunnableById(Integer taskInstanceId) {
return taskExecutionRunnableRepository.getTaskExecutionRunnableById(taskInstanceId);
public List<ITaskExecutionPlan> getDirectPostNodes(ITaskExecutionPlan taskExecutionPlan) {
if (taskExecutionPlan == null) {
return getDirectPostNodesByIdentify(null);
}
return getDirectPostNodesByIdentify(taskExecutionPlan.getTaskIdentify());
}
@Override
public ITaskExecutionRunnable getTaskExecutionRunnableByName(String taskName) {
return taskExecutionRunnableRepository.getTaskExecutionRunnableByName(taskName);
}
@Override
public List<ITaskExecutionRunnable> getActiveTaskExecutionRunnable() {
return new ArrayList<>(taskExecutionRunnableRepository.getActiveTaskExecutionRunnable());
}
@Override
public List<ITaskExecutionRunnable> getDirectPreTaskExecutionRunnable(String taskName) {
return getDirectPreNodeNames(taskName)
public List<ITaskExecutionPlan> getDirectPostNodesByIdentify(ITaskIdentify taskIdentify) {
if (taskIdentify == null) {
return taskExecutionPlanMap.values()
.stream()
.filter(task -> !inDegredMap.containsKey(task.getTaskIdentify()))
.collect(Collectors.toList());
}
return outdegreeMap.getOrDefault(taskIdentify, Collections.emptyList())
.stream()
.map(taskExecutionRunnableRepository::getTaskExecutionRunnableByName)
.map(taskExecutionPlanMap::get)
.collect(Collectors.toList());
}
@Override
public boolean isTaskAbleToBeTriggered(String taskNodeName) {
// todo: Check whether the workflow instance is finished or ready to finish.
List<Node> directPreNodes = getDirectPreNodes(taskNodeName);
if (log.isDebugEnabled()) {
log.debug("Begin to check whether the task {} is able to be triggered.", taskNodeName);
log.debug("Task {} directly dependent on the task: {}.", taskNodeName,
directPreNodes.stream().map(Node::getNodeName).collect(Collectors.toList()));
public List<ITaskExecutionPlan> getDirectPreNodes(ITaskExecutionPlan iTaskExecutionPlan) {
if (iTaskExecutionPlan == null) {
return getDirectPreNodesByIdentify(null);
}
for (Node directPreNode : directPreNodes) {
if (directPreNode.isSkip()) {
log.debug("The task {} is skipped.", directPreNode.getNodeName());
continue;
}
ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnableByName(directPreNode.getNodeName());
if (taskExecutionRunnable == null || taskExecutionRunnable.isReadyToTrigger(taskNodeName)) {
log.debug("The task {} is not finished or not able to access to the task {}.",
directPreNode.getNodeName(), taskNodeName);
}
}
return true;
return getDirectPreNodesByIdentify(iTaskExecutionPlan.getTaskIdentify());
}
@Override
public void storeTaskExecutionRunnable(ITaskExecutionRunnable taskExecutionRunnable) {
taskExecutionRunnableRepository.storeTaskExecutionRunnable(taskExecutionRunnable);
public List<ITaskExecutionPlan> getDirectPreNodesByIdentify(ITaskIdentify taskIdentify) {
if (taskIdentify == null) {
return taskExecutionPlanMap.values()
.stream()
.filter(task -> !outdegreeMap.containsKey(task.getTaskIdentify()))
.collect(Collectors.toList());
}
return inDegredMap.getOrDefault(taskIdentify, Collections.emptyList())
.stream()
.map(taskExecutionPlanMap::get)
.collect(Collectors.toList());
}
public List<ITaskExecutionPlan> getActiveTaskExecutionPlan() {
return null;
}
@Override
public ITaskExecutionPlan getDAGNode(ITaskIdentify taskIdentify) {
return taskExecutionPlanMap.get(taskIdentify);
}
@Override
public boolean isSuccess() {
return false;
}
@Override
public boolean isFailed() {
return false;
}
@Override
public boolean isKilled() {
return false;
}
@Override
public boolean isPaused() {
return false;
}
}
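Putting the constructor and the queries together, a sketch that builds a two-node execution DAG (A -> B) and asks for its start nodes and the direct post nodes of A. It reuses the illustrative SimpleTaskExecutionPlanChain sketched earlier and assumes the two plans expose distinct task identifies; nothing below is part of the commit itself.

package org.apache.dolphinscheduler.workflow.engine.workflow;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative sketch only: builds A -> B and queries the execution DAG.
public final class WorkflowExecutionDAGSketch {

    private WorkflowExecutionDAGSketch() {
    }

    public static void describe(ITaskExecutionPlan planA, ITaskExecutionPlan planB) {
        WorkflowExecutionDAG dag = new WorkflowExecutionDAG(
                Arrays.asList(planA, planB),
                Collections.singletonList(new SimpleTaskExecutionPlanChain(planA, planB)));

        // Passing null returns the nodes without in-degree, i.e. the start nodes.
        List<ITaskExecutionPlan> startNodes = dag.getDirectPostNodes(null);

        // The direct post nodes of planA.
        List<ITaskExecutionPlan> afterA = dag.getDirectPostNodesByIdentify(planA.getTaskIdentify());
    }
}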

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public class WorkflowExecutionDAGFactory {
public WorkflowExecutionDAG createDAG() {
return new WorkflowExecutionDAG(null, null);
}
}

View File

@ -17,60 +17,169 @@
package org.apache.dolphinscheduler.workflow.engine.workflow;
import org.apache.dolphinscheduler.workflow.engine.engine.IDAGEngine;
import org.apache.dolphinscheduler.workflow.engine.event.IEventRepository;
import org.apache.dolphinscheduler.workflow.engine.dag.ITask;
import org.apache.dolphinscheduler.workflow.engine.dag.ITaskIdentify;
import org.apache.dolphinscheduler.workflow.engine.dag.WorkflowDAG;
import org.apache.dolphinscheduler.workflow.engine.event.TaskOperationEvent;
import org.apache.dolphinscheduler.workflow.engine.event.WorkflowFinishEvent;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Getter;
@Getter
public class WorkflowExecutionRunnable implements IWorkflowExecutionRunnable {
public class WorkflowExecutionRunnable extends BaseWorkflowExecutionRunnable {
private final IWorkflowExecutionContext workflowExecutionContext;
private final WorkflowExecutionDAG workflowExecutionDAG;
private final WorkflowDAG workflowDAG;
private final IDAGEngine dagEngine;
private final IWorkflowExecutionRunnableDelegate workflowExecutionRunnableDelegate;
private volatile boolean eventFiring = false;
public WorkflowExecutionRunnable(IWorkflowExecutionContext workflowExecutionContext, IDAGEngine dagEngine) {
this.workflowExecutionContext = workflowExecutionContext;
this.dagEngine = dagEngine;
public WorkflowExecutionRunnable(IWorkflowExecutionContext workflowExecutionContext,
IWorkflowExecutionRunnableDelegate workflowExecutionRunnableDelegate) {
super(workflowExecutionContext, WorkflowExecutionRunnableStatus.CREATED);
this.workflowExecutionDAG = workflowExecutionContext.getWorkflowExecutionDAG();
this.workflowDAG = workflowExecutionContext.getWorkflowDAG();
this.workflowExecutionRunnableDelegate = workflowExecutionRunnableDelegate;
}
@Override
public void start() {
List<String> workflowStartNodeNames = workflowExecutionContext.getWorkflowExecutionDAG().getStartNodeNames();
if (CollectionUtils.isEmpty(workflowStartNodeNames)) {
dagEngine.triggerNextTasks(null);
} else {
workflowStartNodeNames.forEach(dagEngine::triggerTask);
if (!workflowExecutionRunnableStatus.canStart()) {
throw new UnsupportedOperationException(
"The current status: " + workflowExecutionRunnableStatus + " cannot start.");
}
statusTransform(WorkflowExecutionRunnableStatus.RUNNING, () -> {
workflowExecutionRunnableDelegate.start();
List<ITaskIdentify> startTaskIdentifies = workflowExecutionContext.getStartTaskIdentifies();
// If there are no start tasks, finish the workflow immediately
if (CollectionUtils.isEmpty(startTaskIdentifies)) {
workflowFinish();
return;
}
startTaskIdentifies.forEach(this::triggerTask);
});
}
@Override
public void triggerNextTasks(ITaskIdentify taskIdentify) {
List<ITaskIdentify> directPostNodeIdentifies = workflowDAG.getDirectPostNodesByIdentify(taskIdentify)
.stream()
.map(ITask::getIdentify)
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(directPostNodeIdentifies)) {
directPostNodeIdentifies.forEach(this::triggerTask);
return;
}
List<ITaskExecutionRunnableIdentify> activeTaskExecutionIdentify = getActiveTaskExecutionIdentify();
if (CollectionUtils.isEmpty(activeTaskExecutionIdentify)) {
workflowFinish();
return;
}
// The task chain is finished, but there are still active tasks, wait for the active tasks to finish
}
@Override
public void triggerTask(ITaskIdentify taskIdentify) {
ITaskExecutionPlan taskExecutionPlan = workflowExecutionDAG.getDAGNode(taskIdentify);
if (taskExecutionPlan == null) {
throw new IllegalArgumentException("Cannot find the ITaskExecutionPlan for taskIdentify: " + taskIdentify);
}
getEventRepository().storeEventToTail(TaskOperationEvent.startEvent(taskExecutionPlan));
}
@Override
public void failoverTask(ITaskExecutionRunnableIdentify taskExecutionRunnableIdentify) {
ITaskExecutionPlan taskExecutionPlan =
workflowExecutionDAG.getDAGNode(taskExecutionRunnableIdentify.getTaskIdentify());
if (taskExecutionPlan == null) {
throw new IllegalArgumentException("Cannot find the ITaskExecutionPlan for taskIdentify: "
+ taskExecutionRunnableIdentify.getTaskIdentify());
}
getEventRepository().storeEventToTail(TaskOperationEvent.failoverEvent(taskExecutionPlan));
}
@Override
public void retryTask(ITaskExecutionRunnableIdentify taskExecutionRunnableIdentify) {
ITaskExecutionPlan taskExecutionPlan =
workflowExecutionDAG.getDAGNode(taskExecutionRunnableIdentify.getTaskIdentify());
if (taskExecutionPlan == null) {
throw new IllegalArgumentException("Cannot find the ITaskExecutionPlan for taskIdentify: "
+ taskExecutionRunnableIdentify.getTaskIdentify());
}
getEventRepository().storeEventToTail(TaskOperationEvent.retryEvent(taskExecutionPlan));
}
@Override
public void pause() {
dagEngine.pauseAllTask();
if (!workflowExecutionRunnableStatus.canPause()) {
throw new UnsupportedOperationException(
"The current status: " + workflowExecutionRunnableStatus + " cannot pause.");
}
statusTransform(WorkflowExecutionRunnableStatus.PAUSING, () -> {
workflowExecutionRunnableDelegate.pause();
List<ITaskExecutionRunnableIdentify> activeTaskExecutionIdentify = getActiveTaskExecutionIdentify();
if (CollectionUtils.isEmpty(activeTaskExecutionIdentify)) {
workflowFinish();
return;
}
activeTaskExecutionIdentify.forEach(this::pauseTask);
});
}
@Override
public void pauseTask(ITaskExecutionRunnableIdentify taskExecutionIdentify) {
ITaskExecutionPlan taskExecutionPlan = workflowExecutionDAG.getDAGNode(taskExecutionIdentify.getTaskIdentify());
if (taskExecutionPlan == null) {
throw new IllegalArgumentException(
"Cannot find the ITaskExecutionPlan for taskIdentify: " + taskExecutionIdentify.getTaskIdentify());
}
getEventRepository().storeEventToTail(TaskOperationEvent.pauseEvent(taskExecutionPlan));
}
@Override
public void kill() {
dagEngine.killAllTask();
if (!workflowExecutionRunnableStatus.canKill()) {
throw new UnsupportedOperationException(
"The current status: " + workflowExecutionRunnableStatus + " cannot kill.");
}
statusTransform(WorkflowExecutionRunnableStatus.KILLING, () -> {
workflowExecutionRunnableDelegate.kill();
List<ITaskExecutionRunnableIdentify> activeTaskExecutionIdentify = getActiveTaskExecutionIdentify();
if (CollectionUtils.isEmpty(activeTaskExecutionIdentify)) {
workflowFinish();
return;
}
activeTaskExecutionIdentify.forEach(this::killTask);
});
}
@Override
public IEventRepository getEventRepository() {
return workflowExecutionContext.getEventRepository();
public void killTask(ITaskExecutionRunnableIdentify taskExecutionIdentify) {
ITaskExecutionPlan taskExecutionPlan = workflowExecutionDAG.getDAGNode(taskExecutionIdentify.getTaskIdentify());
if (taskExecutionPlan == null) {
throw new IllegalArgumentException(
"Cannot find the ITaskExecutionPlan for taskIdentify: " + taskExecutionIdentify.getTaskIdentify());
}
getEventRepository().storeEventToTail(TaskOperationEvent.killEvent(taskExecutionPlan));
}
@Override
public boolean isEventFiring() {
return eventFiring;
private void workflowFinish() {
if (workflowExecutionDAG.isFailed()) {
// the failed branch is currently empty; only the finish event below is recorded
}
getEventRepository().storeEventToTail(WorkflowFinishEvent.of(workflowExecutionRunnableIdentify));
}
@Override
public void setEventFiring(boolean eventFiring) {
this.eventFiring = eventFiring;
private List<ITaskExecutionRunnableIdentify> getActiveTaskExecutionIdentify() {
return workflowExecutionDAG.getActiveTaskExecutionPlan()
.stream()
.map(ITaskExecutionPlan::getActiveTaskExecutionRunnable)
.map(ITaskExecutionRunnable::getIdentify)
.collect(Collectors.toList());
}
}
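The triggerTask/pauseTask/killTask/retryTask methods above all follow the same lookup-then-enqueue pattern: resolve the ITaskExecutionPlan from the execution DAG by its task identify, then append a TaskOperationEvent to the event repository. A minimal self-contained sketch of that pattern, with illustrative stand-in names rather than this commit's API:

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Illustrative stand-ins only; the real types are ITaskExecutionPlan,
// WorkflowExecutionDAG and IEventRepository.
class PlanLookupSketch {

    static final Map<String, String> planByIdentify = new HashMap<>();
    static final Queue<String> eventTail = new ArrayDeque<>();

    static void triggerTask(String taskIdentify) {
        String plan = planByIdentify.get(taskIdentify);   // analogous to workflowExecutionDAG.getDAGNode(...)
        if (plan == null) {
            throw new IllegalArgumentException("Cannot find the ITaskExecutionPlan for taskIdentify: " + taskIdentify);
        }
        eventTail.add("START:" + plan);                   // analogous to getEventRepository().storeEventToTail(...)
    }

    public static void main(String[] args) {
        planByIdentify.put("task-A", "plan-A");
        triggerTask("task-A");
        System.out.println(eventTail);                    // prints [START:plan-A]
    }
}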

View File

@ -24,13 +24,23 @@ public class WorkflowExecutionRunnableFactory implements IWorkflowExecutionRunna
private final IDAGEngineFactory dagEngineFactory;
private IWorkflowExecutionRunnableDelegateFactory workflowExecutionRunnableDelegateFactory =
DefaultWorkflowExecutionRunnableDelegateFactory.getInstance();
public WorkflowExecutionRunnableFactory(IDAGEngineFactory dagEngineFactory) {
this.dagEngineFactory = dagEngineFactory;
}
public WorkflowExecutionRunnableFactory withWorkflowExecutionRunnableDelegateFactory(IWorkflowExecutionRunnableDelegateFactory workflowExecutionRunnableDelegateFactory) {
this.workflowExecutionRunnableDelegateFactory = workflowExecutionRunnableDelegateFactory;
return this;
}
@Override
public WorkflowExecutionRunnable createWorkflowExecutionRunnable(IWorkflowExecutionContext workflowExecutionContext) {
IDAGEngine dagEngine = dagEngineFactory.createDAGEngine(workflowExecutionContext);
return new WorkflowExecutionRunnable(workflowExecutionContext, dagEngine);
IWorkflowExecutionRunnableDelegate workflowExecutionRunnableDelegate = workflowExecutionRunnableDelegateFactory
.createWorkflowExecutionRunnableDelegate(workflowExecutionContext);
return new WorkflowExecutionRunnable(workflowExecutionContext, workflowExecutionRunnableDelegate);
}
}
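The factory keeps a default delegate factory and exposes a fluent with... hook to swap it in before building. A small standalone sketch of that default-plus-fluent-override idiom, using hypothetical names rather than the project's types:

// Hypothetical example of the "default collaborator, fluent override" pattern.
class SketchFactory {

    private String delegateFactory = "default-delegate-factory";   // plays the role of the default delegate factory

    SketchFactory withDelegateFactory(String delegateFactory) {     // fluent override, returns this for chaining
        this.delegateFactory = delegateFactory;
        return this;
    }

    String create(String context) {
        return "runnable(" + context + ", " + delegateFactory + ")";
    }

    public static void main(String[] args) {
        System.out.println(new SketchFactory().create("ctx"));                               // default delegate
        System.out.println(new SketchFactory().withDelegateFactory("custom").create("ctx")); // overridden delegate
    }
}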

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.workflow.engine.workflow;
public enum WorkflowExecutionRunnableStatus implements IWorkflowExecutionRunnableOperationCheck {
CREATED {
@Override
public boolean canStart() {
return true;
}
@Override
public boolean canPause() {
return true;
}
@Override
public boolean canKill() {
return true;
}
},
RUNNING {
@Override
public boolean canStart() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canPause() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canKill() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
},
PAUSING {
@Override
public boolean canStart() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canPause() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canKill() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
},
PAUSED {
@Override
public boolean canStart() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canPause() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canKill() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
},
KILLING {
@Override
public boolean canStart() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canPause() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canKill() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
},
KILLED {
@Override
public boolean canStart() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canPause() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canKill() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
},
FAILED {
@Override
public boolean canStart() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canPause() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canKill() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
},
SUCCEEDED {
@Override
public boolean canStart() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canPause() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canKill() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
},
;
@Override
public boolean canStart() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canPause() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
@Override
public boolean canKill() {
throw new UnsupportedOperationException("canStart is not supported for " + this);
}
}
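Each enum constant above acts as an operation guard: start()/pause()/kill() consult canStart()/canPause()/canKill() before transitioning. A compact standalone sketch of the same state-guard idiom, with simplified, illustrative names only:

// Simplified illustration of the state-guard idiom; not DolphinScheduler code.
enum SketchStatus {
    CREATED, RUNNING, KILLED;

    boolean canStart() {
        return this == CREATED;
    }
}

class SketchRunnable {

    private SketchStatus status = SketchStatus.CREATED;

    void start(Runnable body) {
        if (!status.canStart()) {
            throw new UnsupportedOperationException("The current status: " + status + " cannot start.");
        }
        status = SketchStatus.RUNNING;   // status transform, then run the guarded body
        body.run();
    }

    public static void main(String[] args) {
        SketchRunnable runnable = new SketchRunnable();
        runnable.start(() -> System.out.println("started"));   // allowed from CREATED
        // runnable.start(() -> {});                            // would throw: RUNNING cannot start again
    }
}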

View File

@ -26,12 +26,12 @@ public class SingletonWorkflowExecuteRunnableRepositoryAssertions {
public static void existWorkflowExecutionRunnable(Integer workflowInstanceId) {
assertNotNull(SingletonWorkflowExecutionRunnableRepository.getInstance()
.getWorkflowExecutionRunnableById(workflowInstanceId));
.getWorkflowExecutionRunnable(workflowInstanceId));
}
public static void notExistWorkflowExecutionRunnable(Integer workflowInstanceId) {
assertNull(SingletonWorkflowExecutionRunnableRepository.getInstance()
.getWorkflowExecutionRunnableById(workflowInstanceId));
.getWorkflowExecutionRunnable(workflowInstanceId));
}
}

View File

@ -70,7 +70,7 @@ class WorkflowEngineTest {
void killWorkflow_WorkflowNotExist() {
WorkflowExecuteRunnableNotFoundException exception =
assertThrows(WorkflowExecuteRunnableNotFoundException.class,
() -> workflowEngine.killWorkflow(1));
() -> workflowEngine.killWorkflow(1L));
assertEquals("WorkflowExecuteRunnable not found: [id=1]", exception.getMessage());
}
@ -78,8 +78,8 @@ class WorkflowEngineTest {
void killWorkflow_WorkflowExist() {
IWorkflowExecutionRunnable mockWorkflowExecuteRunnable =
MockWorkflowExecutionRunnableFactory.createWorkflowExecutionRunnable();
Integer workflowInstanceId =
mockWorkflowExecuteRunnable.getWorkflowExecutionContext().getWorkflowInstanceId();
Long workflowInstanceId =
mockWorkflowExecuteRunnable.getWorkflowExecutionContext().getIdentify().getId();
SingletonWorkflowExecutionRunnableRepository.getInstance()
.storeWorkflowExecutionRunnable(mockWorkflowExecuteRunnable);
@ -90,15 +90,15 @@ class WorkflowEngineTest {
@Test
void finalizeWorkflow_WorkflowNotExist() {
workflowEngine.finalizeWorkflow(-1);
workflowEngine.finalizeWorkflow(-1L);
}
@Test
void finalizeWorkflow_WorkflowExist() {
IWorkflowExecutionRunnable emptyWorkflowExecuteRunnable =
MockWorkflowExecutionRunnableFactory.createWorkflowExecutionRunnable();
Integer workflowInstanceId =
emptyWorkflowExecuteRunnable.getWorkflowExecutionContext().getWorkflowInstanceId();
Long workflowInstanceId =
emptyWorkflowExecuteRunnable.getWorkflowExecutionContext().getIdentify().getId();
SingletonWorkflowExecutionRunnableRepository.getInstance()
.storeWorkflowExecutionRunnable(emptyWorkflowExecuteRunnable);
SingletonWorkflowExecuteRunnableRepositoryAssertions.existWorkflowExecutionRunnable(workflowInstanceId);

View File

@ -10,21 +10,21 @@ class WorkflowOperationEventTest {
void triggerEvent() {
WorkflowOperationEvent workflowOperationEvent = WorkflowOperationEvent.triggerEvent(1);
assertEquals(1, workflowOperationEvent.getWorkflowInstanceId());
assertEquals(WorkflowOperationType.TRIGGER, workflowOperationEvent.getWorkflowOperationType());
assertEquals(WorkflowOperationEventType.TRIGGER, workflowOperationEvent.getWorkflowOperationType());
}
@Test
void pauseEvent() {
WorkflowOperationEvent workflowOperationEvent = WorkflowOperationEvent.pauseEvent(1);
assertEquals(1, workflowOperationEvent.getWorkflowInstanceId());
assertEquals(WorkflowOperationType.PAUSE, workflowOperationEvent.getWorkflowOperationType());
assertEquals(WorkflowOperationEventType.PAUSE, workflowOperationEvent.getWorkflowOperationType());
}
@Test
void killEvent() {
WorkflowOperationEvent workflowOperationEvent = WorkflowOperationEvent.killEvent(1);
assertEquals(1, workflowOperationEvent.getWorkflowInstanceId());
assertEquals(WorkflowOperationType.KILL, workflowOperationEvent.getWorkflowOperationType());
assertEquals(WorkflowOperationEventType.KILL, workflowOperationEvent.getWorkflowOperationType());
}
}