[Feature-14545][Master][UI]Cross workflow parameter passing (#14552)

* feat: cross-workflow parameter passing

* refactor: remove useless parameters

* refactor: remove useless code

* refactor: code format

* docs: update docs

* docs: docs format

* more effective information

Co-authored-by: xiangzihao <zihaoxiang@apache.org>

* more effective information

Co-authored-by: xiangzihao <zihaoxiang@apache.org>

* fix: incorrect log statement

* fix: improve error log message

---------

Co-authored-by: xiangzihao <zihaoxiang@apache.org>
Co-authored-by: xiangzihao <460888207@qq.com>
Co-authored-by: Rick Cheng <rickchengx@gmail.com>
Co-authored-by: Jay Chung <zhongjiajie955@gmail.com>
Author: Orange Summer
Date: 2023-07-19 18:00:44 +08:00 (committed by GitHub)
Parent: 93c3871925
Commit: af2986eca1
19 changed files with 164 additions and 19 deletions


@@ -14,8 +14,9 @@ DolphinScheduler allows parameter transfer between tasks. Currently, transfer di
 * [SQL](../task/sql.md)
 * [Procedure](../task/stored-procedure.md)
 * [Python](../task/python.md)
+* [SubProcess](../task/sub-process.md)
-When defining an upstream node, if you need to transmit the result of that node to a dependent downstream node, you need to set an `OUT` direction parameter in [Custom Parameters] of [Current Node Settings]. At present, we mainly focus on the SQL and shell nodes to pass parameters downstream.
+When defining an upstream node, if you need to transmit the result of that node to a dependent downstream node, you need to set an `OUT` direction parameter in [Custom Parameters] of [Current Node Settings]. If it is a SubProcess node, there is no need to set a parameter in [Current Node Settings]; instead, an `OUT` direction parameter needs to be set in the workflow definition of the subprocess.
 The value of the upstream parameter can be updated in the downstream node in the same way as [setting parameters](#create-a-shell-task-and-set-parameters).
@@ -60,7 +61,7 @@ When the SHELL task is completed, we can use the output passed upstream as the q
 Click on the Save workflow icon and set the global parameters output and value.
-![context-parameter03](../../../../img/new_ui/dev/parameter/context_parameter04.png)
+![context-parameter04](../../../../img/new_ui/dev/parameter/context_parameter04.png)
 #### View results
@@ -88,4 +89,30 @@ Use `print('${setValue(key=%s)}' % value)`, DolphinScheduler will capture the `$
 For example
-![img.png](../../../../img/new_ui/dev/parameter/python_context_param.png)
+![python_context_param](../../../../img/new_ui/dev/parameter/python_context_param.png)
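The `${setValue(key=value)}` capture described above can be illustrated with a small sketch. The regex below is an illustrative assumption for demonstration, not DolphinScheduler's actual worker-side implementation:

```python
import re

# Illustrative pattern for spotting setValue markers in task output.
# The real capture logic lives inside DolphinScheduler's worker; this
# regex is an assumption used only to demonstrate the idea.
SET_VALUE_RE = re.compile(r"\$\{setValue\(([^)=]+)=([^)]*)\)\}")

def capture_params(output_lines):
    """Collect key/value pairs emitted via ${setValue(key=value)}."""
    params = {}
    for line in output_lines:
        for key, value in SET_VALUE_RE.findall(line):
            params[key] = value
    return params
```

For example, `capture_params(["${setValue(total=100)}"])` yields a mapping from `total` to `100`, which is then merged into the var pool for downstream tasks.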
+
+#### Pass parameters from a SubProcess task to downstream tasks
+
+In the workflow definition of the subprocess, define `OUT` direction parameters as output parameters; these parameters can be passed to the downstream tasks of the SubProcess node.
+
+Create task A in the workflow definition of the subprocess, add the var1 and var2 parameters to its custom parameters, and write the following script:
+![context-subprocess01](../../../../img/new_ui/dev/parameter/context-subprocess01.png)
+Save the subprocess_example1 workflow and set the global parameter var1.
+![context-subprocess02](../../../../img/new_ui/dev/parameter/context-subprocess02.png)
+Create a sub_process task in a new workflow, using the subprocess_example1 workflow as its sub-node.
+![context-subprocess03](../../../../img/new_ui/dev/parameter/context-subprocess03.png)
+Create a shell task as a downstream task of the sub_process task, and write the following script:
+![context-subprocess04](../../../../img/new_ui/dev/parameter/context-subprocess04.png)
+Save the workflow and run it. The result of the downstream task is as follows:
+![context-subprocess05](../../../../img/new_ui/dev/parameter/context-subprocess05.png)
+Although task A outputs both var1 and var2, only var1 is defined as an `OUT` parameter in the workflow definition, so only var1 is output by the downstream task. This shows that var1 is passed through the workflow with the expected value.
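The scripts in the walkthrough appear only as screenshots. As a minimal sketch, assuming task A is a Python task and using illustrative values (the helper name `emit` and the values `hello`/`world` are hypothetical; only `var1`/`var2` come from the doc), it might emit both variables like this:

```python
def emit(key, value):
    # DolphinScheduler captures ${setValue(key=value)} markers from stdout.
    marker = '${setValue(%s=%s)}' % (key, value)
    print(marker)
    return marker

# Hypothetical task A body: both variables are emitted, but only var1
# is declared OUT in the subprocess workflow definition, so only var1
# reaches tasks downstream of the SubProcess node.
emit('var1', 'hello')
emit('var2', 'world')
```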


@@ -2,7 +2,11 @@
 ## Scope
-Global parameters are parameters that are valid for all task nodes of the entire workflow. They can be configured on the workflow definition page.
+Global parameters can be configured on the workflow definition page.
+
+The `IN` direction parameter is valid for all task nodes of the entire workflow.
+
+The `OUT` direction parameter is the output parameter of the workflow and is passed to the downstream tasks of the corresponding SubProcess task in the parent workflow.
 ## Usage


@@ -14,8 +14,9 @@ DolphinScheduler allows parameter transfer between tasks; currently the transfer direction only supports
 * [SQL](../task/sql.md)
 * [Procedure](../task/stored-procedure.md)
 * [Python](../task/python.md)
+* [SubProcess](../task/sub-process.md)
-When defining an upstream node, if you need to pass the result of that node to a dependent downstream node, set a variable with direction OUT in [Custom Parameters] of [Current Node Settings]. At present, we mainly provide downstream parameter passing for the SQL and SHELL nodes.
+When defining an upstream node, if you need to pass the result of that node to a dependent downstream node, set a variable with direction OUT in [Custom Parameters] of [Current Node Settings]. For a SubProcess node, there is no need to set a variable in [Current Node Settings]; instead, set a variable with direction OUT in the workflow definition of the subprocess.
 The parameters passed from upstream can be updated in the downstream node, in the same way as [setting parameters](#创建-shell-任务并设置参数).
@@ -87,4 +88,30 @@ The result of Node_mysql is as follows:
 Use `print('${setValue(key=%s)}' % value)`; DolphinScheduler will capture `${setValue(key=value)}` in the output to pass the parameter downstream.
-![img.png](../../../../img/new_ui/dev/parameter/python_context_param.png)
+![python_context_param](../../../../img/new_ui/dev/parameter/python_context_param.png)
+
+#### Passing parameters from a SubProcess task
+
+In the workflow definition of the subprocess, define variables with direction OUT as output parameters; these parameters can be passed to the downstream tasks of the SubProcess node.
+
+Create task A in the workflow definition of the subprocess, add the var1 and var2 parameters in its custom parameters, and write the following script:
+![context-subprocess01](../../../../img/new_ui/dev/parameter/context-subprocess01.png)
+Save the subprocess_example1 workflow and set the global parameter var1.
+![context-subprocess02](../../../../img/new_ui/dev/parameter/context-subprocess02.png)
+Create a sub_process task in a new workflow, using the subprocess_example1 workflow as its sub-node.
+![context-subprocess03](../../../../img/new_ui/dev/parameter/context-subprocess03.png)
+Create a shell task as a downstream task of the sub_process task, and write the following script:
+![context-subprocess04](../../../../img/new_ui/dev/parameter/context-subprocess04.png)
+Save the workflow and run it. The result of the downstream task is as follows:
+![context-subprocess05](../../../../img/new_ui/dev/parameter/context-subprocess05.png)
+Although task A outputs both var1 and var2, only var1 is declared as an OUT variable in the workflow definition, so only var1 is output by the downstream task. This shows that var1 is passed through the workflow with the expected value.


@@ -2,7 +2,11 @@
 ## Scope
-Global parameters are parameters that are valid for all task nodes of the **entire workflow**; they are configured on the workflow definition page.
+Global parameters are configured on the workflow definition page.
+
+A variable with direction IN is valid for all task nodes of the **entire workflow**.
+
+A variable with direction OUT serves as an output parameter of the workflow and is passed to the downstream tasks of the corresponding SubProcess task in the parent workflow.
 ## Usage

(7 binary image files changed: 5 added, 2 updated; contents not shown.)


@@ -1307,13 +1307,35 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
         if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
             Set<Property> taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
             String processInstanceVarPool = workflowInstance.getVarPool();
+            List<Property> processGlobalParams =
+                    new ArrayList<>(JSONUtils.toList(workflowInstance.getGlobalParams(), Property.class));
+            Map<String, Direct> oldProcessGlobalParamsMap = processGlobalParams.stream()
+                    .collect(Collectors.toMap(Property::getProp, Property::getDirect));
+            Set<Property> processVarPoolOut = taskProperties.stream()
+                    .filter(property -> property.getDirect().equals(Direct.OUT)
+                            && oldProcessGlobalParamsMap.containsKey(property.getProp())
+                            && oldProcessGlobalParamsMap.get(property.getProp()).equals(Direct.OUT))
+                    .collect(Collectors.toSet());
+            Set<Property> taskVarPoolIn =
+                    taskProperties.stream().filter(property -> property.getDirect().equals(Direct.IN))
+                            .collect(Collectors.toSet());
             if (StringUtils.isNotEmpty(processInstanceVarPool)) {
                 Set<Property> properties =
                         new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
-                properties.addAll(taskProperties);
+                Set<String> newProcessVarPoolKeys =
+                        taskProperties.stream().map(Property::getProp).collect(Collectors.toSet());
+                properties = properties.stream()
+                        .filter(property -> !newProcessVarPoolKeys.contains(property.getProp()))
+                        .collect(Collectors.toSet());
+                properties.addAll(processVarPoolOut);
+                properties.addAll(taskVarPoolIn);
                 workflowInstance.setVarPool(JSONUtils.toJsonString(properties));
             } else {
-                workflowInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
+                Set<Property> varPool = new HashSet<>();
+                varPool.addAll(taskVarPoolIn);
+                varPool.addAll(processVarPoolOut);
+                workflowInstance.setVarPool(JSONUtils.toJsonString(varPool));
             }
         }
     }
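The stream pipeline in this hunk encodes one merge rule. As a sketch of my reading of it (the function name `merge_var_pool` is hypothetical, and plain dicts stand in for the Java `Property` objects): a task's OUT property survives into the workflow var pool only if the workflow's global params also declare that key as OUT, IN properties always merge, and existing pool entries touched by the task are replaced.

```python
def merge_var_pool(process_var_pool, task_props, process_global_params):
    """Python sketch of the var-pool merge rule (illustrative, not the
    actual implementation). Each parameter is a list of dicts with
    'prop', 'direct', and 'value' keys, mirroring the Java Property."""
    # Keys the workflow's global params declare as OUT.
    out_declared = {p["prop"] for p in process_global_params
                    if p["direct"] == "OUT"}
    # Task OUT props pass through only when also declared OUT above.
    out_props = [p for p in task_props
                 if p["direct"] == "OUT" and p["prop"] in out_declared]
    # Task IN props always merge.
    in_props = [p for p in task_props if p["direct"] == "IN"]
    # Drop existing pool entries whose key the task touched, then merge.
    touched = {p["prop"] for p in task_props}
    kept = [p for p in process_var_pool if p["prop"] not in touched]
    return kept + out_props + in_props
```

Under this rule, a task emitting var1 and var2 as OUT passes only the key declared OUT in the workflow definition onward, which matches the documented SubProcess behavior.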


@@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.event.StateEvent;
 import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
+import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
 import org.apache.dolphinscheduler.service.process.ProcessService;

 import java.util.Map;
@@ -167,6 +169,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         for (Map.Entry<ProcessInstance, TaskInstance> entry : fatherMaps.entrySet()) {
             ProcessInstance processInstance = entry.getKey();
             TaskInstance taskInstance = entry.getValue();
+            crossWorkflowParameterPassing(finishProcessInstance, taskInstance);
             String address = NetUtils.getAddr(masterConfig.getListenPort());
             try (
                     final LogUtils.MDCAutoClosableContext mdcAutoClosableContext =
@@ -182,6 +185,20 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
         }
     }

+    private void crossWorkflowParameterPassing(ProcessInstance finishProcessInstance, TaskInstance taskInstance) {
+        try {
+            MasterTaskExecuteRunnable masterTaskExecuteRunnable =
+                    MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstance.getId());
+            masterTaskExecuteRunnable.getILogicTask().getTaskParameters()
+                    .setVarPool(finishProcessInstance.getVarPool());
+            log.info("Cross workflow parameter passing success, finishProcessInstanceId: {}, taskInstanceId: {}",
+                    finishProcessInstance.getId(), taskInstance.getId());
+        } catch (Exception ex) {
+            log.error("Cross workflow parameter passing error, finishProcessInstanceId: {}, taskInstanceId: {}",
+                    finishProcessInstance.getId(), taskInstance.getId(), ex);
+        }
+    }
+
     /**
      * notify myself
      */


@@ -88,6 +88,10 @@ public abstract class MasterTaskExecuteRunnable implements Runnable {
         return taskExecutionContext;
     }

+    public ILogicTask getILogicTask() {
+        return logicTask;
+    }
+
     @Override
     public void run() {
         try {


@@ -33,7 +33,9 @@ import {
   NSwitch,
   NInputNumber,
   NDynamicInput,
-  NCheckbox
+  NCheckbox,
+  NGridItem,
+  NGrid
 } from 'naive-ui'
 import { useRoute } from 'vue-router'
 import { verifyName } from '@/service/modules/process-definition'
@@ -106,7 +108,8 @@ export default defineComponent({
       for (const param of formValue.value.globalParams) {
         const prop = param.value
-        if (!prop) {
+        const direct = param.direct
+        if (direct === 'IN' && !prop) {
           return new Error(t('project.dag.prop_empty'))
         }
@@ -150,7 +153,8 @@ export default defineComponent({
         }
         formValue.value.globalParams = process.globalParamList.map((param) => ({
           key: param.prop,
-          value: param.value
+          value: param.value,
+          direct: param.direct
         }))
       }
     }
@@ -231,11 +235,46 @@ export default defineComponent({
             >
               <NDynamicInput
                 v-model:value={formValue.value.globalParams}
-                preset='pair'
-                key-placeholder={t('project.dag.key')}
-                value-placeholder={t('project.dag.value')}
+                onCreate={() => {
+                  return {
+                    key: '',
+                    direct: 'IN',
+                    value: ''
+                  }
+                }}
                 class='input-global-params'
-              />
+              >
+                {{
+                  default: (param: {
+                    value: { key: string; direct: string; value: string }
+                  }) => (
+                    <NGrid xGap={12} cols={24}>
+                      <NGridItem span={9}>
+                        <NInput
+                          v-model:value={param.value.key}
+                          placeholder={t('project.dag.key')}
+                        />
+                      </NGridItem>
+                      <NGridItem span={6}>
+                        <NSelect
+                          options={[
+                            { value: 'IN', label: 'IN' },
+                            { value: 'OUT', label: 'OUT' }
+                          ]}
+                          v-model:value={param.value.direct}
+                          defaultValue={'IN'}
+                        />
+                      </NGridItem>
+                      <NGridItem span={9}>
+                        <NInput
+                          v-model:value={param.value.value}
+                          placeholder={t('project.dag.value')}
+                        />
+                      </NGridItem>
+                    </NGrid>
+                  )
+                }}
+              </NDynamicInput>
             </NFormItem>
             {props.definition && !props.instance && (
               <NFormItem path='timeoutFlag' showLabel={false}>


@@ -137,6 +137,7 @@ export interface Coordinate {
 export interface GlobalParam {
   key: string
+  direct: string
   value: string
 }


@@ -58,7 +58,7 @@ export default defineComponent({
       return {
         prop: p.key,
         value: p.value,
-        direct: 'IN',
+        direct: p.direct,
         type: 'VARCHAR'
       }
     })


@@ -84,7 +84,7 @@ export default defineComponent({
       return {
         prop: p.key,
         value: p.value,
-        direct: 'IN',
+        direct: p.direct,
         type: 'VARCHAR'
       }
     })


@@ -81,7 +81,7 @@ export default defineComponent({
       return {
         prop: p.key,
         value: p.value,
-        direct: 'IN',
+        direct: p.direct,
         type: 'VARCHAR'
       }
     })