[Improvement][K8S] K8S task support passing context parameters to downstream task (#14934)

* [Improvement][K8S] K8S task support passing context parameters to downstream task

Signed-off-by: Gallardot <gallardot@apache.org>

* [Improvement][K8S] K8S task support passing context parameters to downstream task

Signed-off-by: Gallardot <gallardot@apache.org>

* chore: update doc img

Signed-off-by: Gallardot <gallardot@apache.org>

* chore: update doc

Signed-off-by: Gallardot <gallardot@apache.org>

* chore: add UT

Signed-off-by: Gallardot <gallardot@apache.org>

* merge suggestion

Co-authored-by: 旺阳 <wang@lqwang.net>

* merge suggestion

Co-authored-by: 旺阳 <wang@lqwang.net>

* merge suggestion

Co-authored-by: 旺阳 <wang@lqwang.net>

---------

Signed-off-by: Gallardot <gallardot@apache.org>
Co-authored-by: David Zollo <dailidong66@gmail.com>
Co-authored-by: 旺阳 <qingwli@cisco.com>
Co-authored-by: 旺阳 <wang@lqwang.net>
Co-authored-by: xiangzihao <460888207@qq.com>
This commit is contained in:
Gallardot 2023-10-08 17:27:10 +08:00 committed by GitHub
parent 2a65590117
commit 049c1511fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 161 additions and 2 deletions

View File

@ -15,6 +15,7 @@ DolphinScheduler allows parameter transfer between tasks. Currently, transfer di
* [Procedure](../task/stored-procedure.md)
* [Python](../task/python.md)
* [SubProcess](../task/sub-process.md)
* [Kubernetes](../task/kubernetes.md)
When defining an upstream node, if there is a need to transmit the result of that node to a dependency related downstream node. You need to set an `OUT` direction parameter to [Custom Parameters] of the [Current Node Settings]. If it is a SubProcess node, there is no need to set a parameter in [Current Node Settings], but an `OUT` direction parameter needs to be set in the workflow definition of the subprocess.
@ -116,3 +117,13 @@ Save the workflow and run it. The result of the downstream task is as follows:
![context-subprocess05](../../../../img/new_ui/dev/parameter/context-subprocess05.png)
Although the two parameters var1 and var2 are output in the A task, only the `OUT` parameter var1 is defined in the workflow definition, and the downstream task successfully outputs var1. It proves that the var1 parameter is passed in the workflow with reference to the expected value.
#### Pass parameter from Kubernetes task to downstream
Different programming languages may use different logging frameworks in Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler provides a universal logging data format `${(key=value)dsVal}`. Users can output log data in the format `${(key=value)dsVal}` in the terminal logs of their applications, where `key` is the corresponding parameter prop and `value` is the value of that parameter. DolphinScheduler will capture the `${(key=value)dsVal}` in the output logs to capture the parameters and pass them downstream.
For example
![kubernetes_context_param](../../../../img/new_ui/dev/parameter/k8s_context_param.png)
Another special consideration, not always can DolphinScheduler collect pod logs, if the user redirects the log output stream, DolphinScheduler can not collect logs for use and can not use the output parameter, either.

View File

@ -15,6 +15,7 @@ DolphinScheduler 允许在任务间进行参数传递,目前传递方向仅支
* [Procedure](../task/stored-procedure.md)
* [Python](../task/python.md)
* [SubProcess](../task/sub-process.md)
* [Kubernetes](../task/kubernetes.md)
当定义上游节点时,如果有需要将该节点的结果传递给有依赖关系的下游节点,需要在【当前节点设置】的【自定义参数】设置一个方向是 OUT 的变量。如果是 SubProcess 节点无需在【当前节点设置】中设置变量,需要在子流程的工作流定义中设置一个方向是 OUT 的变量。
@ -115,3 +116,13 @@ Node_mysql 运行结果如下:
![context-subprocess05](../../../../img/new_ui/dev/parameter/context-subprocess05.png)
虽然在 A 任务中输出 var1 和 var2 两个参数,但是工作流定义中只定义了 var1 的 OUT 变量,下游任务成功输出 var1证明var1 参数参照预期的值在该工作流中传递。
#### Kubernetes 任务传递参数
在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架DolphinScheduler为了兼容不同的日志框架提供了一种通用的日志数据格式`${(key=value)dsVal}`,用户可以在应用程序的终端日志中输出以格式为 `${(key=value)dsVal}` 结束的日志数据key 为对应参数的 propvalue 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)dsVal}`来进行参数捕捉,从而传递到下游。
如下图所示:
![kubernetes_context_param](../../../../img/new_ui/dev/parameter/k8s_context_param.png)
另外需要特别注意的是并非总是可以收集pod日志如果用户重定向日志输出流我们既不能收集日志使用也不能使用输出参数。

Binary file not shown.

After

Width:  |  Height:  |  Size: 94 KiB

View File

@ -37,6 +37,8 @@ public class TaskConstants {
public static final String SETVALUE_REGEX = "[\\$#]\\{setValue\\((.*?)\\)}";
public static final String DSVALUE_REGEX = "[\\$#]\\{\\((.*?)\\)dsVal}$";
/**
* string false
*/

View File

@ -47,6 +47,7 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask {
TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
setAppIds(response.getAppIds());
dealOutParam(abstractK8sTaskExecutor.getVarPool());
} catch (Exception e) {
log.error("k8s task submit failed with error");
exitStatusCode = -1;
@ -85,4 +86,5 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask {
*/
protected abstract String buildCommand();
protected abstract void dealOutParam(String result);
}

View File

@ -31,12 +31,16 @@ public abstract class AbstractK8sTaskExecutor {
protected TaskExecutionContext taskRequest;
protected K8sUtils k8sUtils;
protected Yaml yaml;
protected StringBuilder varPool;
protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext taskRequest) {
this.log = log;
this.taskRequest = taskRequest;
this.k8sUtils = new K8sUtils();
this.yaml = new Yaml();
this.varPool = new StringBuilder();
}
public String getVarPool() {
return varPool.toString();
}
public abstract TaskResponse run(String k8sParameterStr) throws Exception;

View File

@ -47,6 +47,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.commons.lang3.StringUtils;
@ -262,6 +263,11 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) {
while ((line = reader.readLine()) != null) {
log.info("[K8S-pod-log] {}", line);
if (line.endsWith(VarPoolUtils.VAR_SUFFIX)) {
varPool.append(VarPoolUtils.findVarPool(line));
varPool.append(VarPoolUtils.VAR_DELIMITER);
}
}
}
} catch (Exception e) {

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.experimental.UtilityClass;
@UtilityClass
public class VarPoolUtils {
static final Pattern DSVALUE_REGEX = Pattern.compile(TaskConstants.DSVALUE_REGEX);
public static final String VAR_SUFFIX = ")dsVal}";
public static final String VAR_DELIMITER = "$VarPool$";
/**
* find var pool
*
* @param line
* @return
*/
public static String findVarPool(String line) {
Matcher matcher = DSVALUE_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.api.k8s;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.NAMESPACE_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -107,4 +108,11 @@ public class K8sTaskExecutorTest {
Assertions.assertEquals(e.getMessage(), "K8sTask is timeout");
}
}
@Test
public void testValpool() {
String result = "key=value" + VAR_DELIMITER;
k8sTaskExecutor.varPool.append(result);
Assertions.assertEquals(result, k8sTaskExecutor.getVarPool());
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import java.util.HashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class VarPoolUtilsTest {
@Test
void findVar() {
HashMap<String, String> tcs = new HashMap<>();
tcs.put("${(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01 ${(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01 ${(set_val=123)dsVal}123", null);
tcs.put("${(set_val=123}dsVal", null);
tcs.put("#{(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01 #{(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01 #{(set_val=123)dsVal}123", null);
tcs.put("#{(set_val=123)dsVal}123", null);
tcs.put("#{(set_val=123dsVal}", null);
tcs.put("${(set_val=123)dsVal}${(set_val=456)dsVal}", "set_val=123)dsVal}${(set_val=456");
tcs.put("1970-01-01$#{(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01{(set_val=123)dsVal}123", null);
tcs.put("1970-01-01$#{(${(set_val=123)})dsVal}", "${(set_val=123)}");
tcs.put("1970-01-01$#{(${(set_val=123\\)})dsVal}", "${(set_val=123\\)}");
for (String tc : tcs.keySet()) {
Assertions.assertEquals(tcs.get(tc), VarPoolUtils.findVarPool(tc));
}
}
}

View File

@ -102,6 +102,11 @@ public class K8sTask extends AbstractK8sTask {
return JSONUtils.toJsonString(k8sTaskMainParameters);
}
@Override
protected void dealOutParam(String result) {
this.k8sTaskParameters.dealOutParam(result);
}
public List<NodeSelectorRequirement> convertToNodeSelectorRequirements(List<NodeSelectorExpression> expressions) {
if (CollectionUtils.isEmpty(expressions)) {
return Collections.emptyList();

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.task.k8s;
import static org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
@ -76,6 +78,7 @@ public class K8sTaskTest {
k8sTaskParameters.setArgs(args);
k8sTaskParameters.setCustomizedLabels(labels);
k8sTaskParameters.setNodeSelectors(nodeSelectorExpressions);
k8sTaskParameters.setLocalParams(new ArrayList<>());
k8sTaskParameters.setPullSecret(pullSecret);
TaskExecutionContext taskRequest = new TaskExecutionContext();
taskRequest.setTaskInstanceId(taskInstanceId);
@ -130,4 +133,14 @@ public class K8sTaskTest {
Assertions.assertEquals(expectedList, nodeSelectorRequirements.get(0).getValues());
}
@Test
public void testDealOutParam() {
String result = "key=123" + VAR_DELIMITER;
k8sTask.getParameters().localParams.add(new Property("key", Direct.OUT, DataType.VARCHAR, "value"));
k8sTask.dealOutParam(result);
k8sTask.getParameters().getVarPool().forEach(property -> {
Assertions.assertNotEquals("value", property.getValue());
Assertions.assertEquals("123", property.getValue());
});
}
}

View File

@ -113,7 +113,7 @@ export function useK8s(model: { [field: string]: any }): IJsonItem[] {
field: 'nodeSelectors',
name: 'node_selectors'
}),
...useCustomParams({ model, field: 'localParams', isSimple: true })
...useCustomParams({ model, field: 'localParams', isSimple: false })
]
}