[Feature] Add project name/workflow definition name/instance id/task name to the built-in variables (#14099)

* [Feature] Add project name/workflow definition name/instance id/task name to the built-in variables
This commit is contained in:
haibingtown 2023-05-25 11:02:39 +08:00 committed by GitHub
parent 41e3ce35a7
commit af7da0630e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 172 additions and 39 deletions

View File

@ -2,11 +2,20 @@
## Basic Built-in Parameter
| Variable | Declaration Method | Meaning |
|--------------------|-------------------------|---------------------------------------------------------------------------------------------|
| system.biz.date | `${system.biz.date}` | The day before the schedule time of the daily scheduling instance, the format is `yyyyMMdd` |
| system.biz.curdate | `${system.biz.curdate}` | The schedule time of the daily scheduling instance, the format is `yyyyMMdd` |
| system.datetime | `${system.datetime}` | The schedule time of the daily scheduling instance, the format is `yyyyMMddHHmmss` |
| Variable | Declaration Method | Meaning |
|---------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------|
| system.biz.date | `${system.biz.date}` | The day before the schedule time of the daily scheduling instance, the format is `yyyyMMdd` |
| system.biz.curdate | `${system.biz.curdate}` | The schedule time of the daily scheduling instance, the format is `yyyyMMdd` |
| system.datetime | `${system.datetime}` | The schedule time of the daily scheduling instance, the format is `yyyyMMddHHmmss` |
| system.task.execute.path | `${system.task.execute.path}` | The absolute path of current executing task |
| system.task.instance.id | `${ssystem.task.instance.id}` | The instance id of current task |
| system.task.definition.name | `${system.task.definition.name}` | The definition name of current task |
| system.task.definition.code | `${system.task.definition.code}` | The definition code of current task |
| system.workflow.instance.id | `${system.workflow.instance.id}` | The instance id of the workflow to which current task belongs |
| system.workflow.definition.name | `${system.workflow.definition.name}` | The definition name of the workflow to which current task belongs |
| system.workflow.definition.code | `${system.workflow.definition.code}` | The definition code of the workflow to which current task belongs |
| system.project.name | `${system.project.name}` | The name of the project to which current task belongs |
| system.project.code | `${system.project.code}` | The code of the project to which current task belongs |
## Extended Built-in Parameter

View File

@ -2,24 +2,20 @@
## 基础内置参数
<table>
<tr><th>变量名</th><th>声明方式</th><th>含义</th></tr>
<tr>
<td>system.biz.date</td>
<td>${system.biz.date}</td>
<td>日常调度实例定时的定时时间前一天,格式为 yyyyMMdd</td>
</tr>
<tr>
<td>system.biz.curdate</td>
<td>${system.biz.curdate}</td>
<td>日常调度实例定时的定时时间,格式为 yyyyMMdd</td>
</tr>
<tr>
<td>system.datetime</td>
<td>${system.datetime}</td>
<td>日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss</td>
</tr>
</table>
| 变量名 | 声明方式 | 含义 |
|---------------------------------|--------------------------------------|----------------------------------|
| system.biz.date | `${system.biz.date}` | 日常调度实例定时的定时时间前一天,格式为 yyyyMMdd |
| system.biz.curdate | `${system.biz.curdate}` | 日常调度实例定时的定时时间,格式为 yyyyMMdd |
| system.datetime | `${system.datetime}` | 日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss |
| system.task.execute.path | `${system.task.execute.path}` | 当前任务执行的绝对路径 |
| system.task.instance.id | `${ssystem.task.instance.id}` | 当前任务实例的ID |
| system.task.definition.name | `${system.task.definition.name}` | 当前任务所属任务定义的名称 |
| system.task.definition.code | `${system.task.definition.code}` | 当前任务所属任务定义的code |
| system.workflow.instance.id | `${system.workflow.instance.id}` | 当前任务所属工作流实例ID |
| system.workflow.definition.name | `${system.workflow.definition.name}` | 当前任务所属工作流定义的名称 |
| system.workflow.definition.code | `${system.workflow.definition.code}` | 当前任务所属工作流定义的code |
| system.project.name | `${system.project.name}` | 当前任务所在项目的名称 |
| system.project.code | `${system.project.code}` | 当前任务所在项目的code |
## 衍生内置参数

View File

@ -17,8 +17,15 @@
package org.apache.dolphinscheduler.service.expand;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_PROJECT_CODE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_PROJECT_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_DEFINITION_CODE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_DEFINITION_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_DEFINITION_CODE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_DEFINITION_NAME;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_WORKFLOW_INSTANCE_ID;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.constants.DateConstants;
@ -123,7 +130,7 @@ public class CuringGlobalParams implements CuringParamsService {
}
/**
* the global parameters and local parameters used in the worker will be prepared here.
* the global parameters and local parameters used in the worker will be prepared here, and built-in parameters.
*
* @param taskInstance
* @param parameters
@ -137,8 +144,6 @@ public class CuringGlobalParams implements CuringParamsService {
// assign value to definedParams here
Map<String, String> globalParamsMap = setGlobalParamsMap(processInstance);
Map<String, Property> globalParams = ParamUtils.getUserDefParamsMap(globalParamsMap);
CommandType commandType = processInstance.getCmdTypeIfComplement();
Date scheduleTime = processInstance.getScheduleTime();
// combining local and global parameters
Map<String, Property> localParams = parameters.getInputLocalParametersMap();
@ -147,24 +152,18 @@ public class CuringGlobalParams implements CuringParamsService {
parameters.setVarPool(taskInstance.getVarPool());
Map<String, Property> varParams = parameters.getVarPoolMap();
if (MapUtils.isEmpty(globalParams) && MapUtils.isEmpty(localParams) && MapUtils.isEmpty(varParams)) {
return null;
}
// if it is a complement,
// you need to pass in the task instance id to locate the time
// of the process instance complement
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
String timeZone = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
Map<String, String> params = BusinessTimeUtils.getBusinessTime(commandType, scheduleTime, timeZone);
if (MapUtils.isNotEmpty(globalParamsMap)) {
params.putAll(globalParamsMap);
}
// built-in params
Map<String, String> params = setBuiltInParamsMap(taskInstance, timeZone);
if (StringUtils.isNotBlank(taskInstance.getExecutePath())) {
params.put(PARAMETER_TASK_EXECUTE_PATH, taskInstance.getExecutePath());
if (MapUtils.isNotEmpty(params)) {
globalParams.putAll(ParamUtils.getUserDefParamsMap(params));
}
params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskInstance.getId()));
if (MapUtils.isNotEmpty(varParams)) {
globalParams.putAll(varParams);
@ -206,6 +205,33 @@ public class CuringGlobalParams implements CuringParamsService {
return globalParams;
}
/**
* build all built-in parameters
* @param taskInstance
* @param timeZone
*/
private Map<String, String> setBuiltInParamsMap(@NonNull TaskInstance taskInstance, String timeZone) {
CommandType commandType = taskInstance.getProcessInstance().getCmdTypeIfComplement();
Date scheduleTime = taskInstance.getProcessInstance().getScheduleTime();
Map<String, String> params = BusinessTimeUtils.getBusinessTime(commandType, scheduleTime, timeZone);
if (StringUtils.isNotBlank(taskInstance.getExecutePath())) {
params.put(PARAMETER_TASK_EXECUTE_PATH, taskInstance.getExecutePath());
}
params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskInstance.getId()));
params.put(PARAMETER_TASK_DEFINITION_NAME, taskInstance.getTaskDefine().getName());
params.put(PARAMETER_TASK_DEFINITION_CODE, Long.toString(taskInstance.getTaskDefine().getCode()));
params.put(PARAMETER_WORKFLOW_INSTANCE_ID, Integer.toString(taskInstance.getProcessInstance().getId()));
params.put(PARAMETER_WORKFLOW_DEFINITION_NAME,
taskInstance.getProcessInstance().getProcessDefinition().getName());
params.put(PARAMETER_WORKFLOW_DEFINITION_CODE,
Long.toString(taskInstance.getProcessInstance().getProcessDefinition().getCode()));
params.put(PARAMETER_PROJECT_NAME, taskInstance.getProcessInstance().getProcessDefinition().getProjectName());
params.put(PARAMETER_PROJECT_CODE,
Long.toString(taskInstance.getProcessInstance().getProcessDefinition().getProjectCode()));
return params;
}
private Map<String, String> setGlobalParamsMap(ProcessInstance processInstance) {
Map<String, String> globalParamsMap = new HashMap<>(16);

View File

@ -17,12 +17,20 @@
package org.apache.dolphinscheduler.service.expand;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
import java.util.ArrayList;
import java.util.Date;
@ -39,6 +47,8 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.google.common.collect.Lists;
@ExtendWith(MockitoExtension.class)
public class CuringGlobalParamsServiceTest {
@ -158,4 +168,60 @@ public class CuringGlobalParamsServiceTest {
CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assertions.assertEquals(result6, JSONUtils.toJsonString(globalParamList));
}
@Test
public void testParamParsingPreparation() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setExecutePath("home/path/execute");
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setName("TaskName-1");
taskDefinition.setCode(1000001l);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(2);
processInstance.setCommandParam("{\"" + Constants.SCHEDULE_TIMEZONE + "\":\"Asia/Shanghai\"}");
processInstance.setHistoryCmd(CommandType.COMPLEMENT_DATA.toString());
Property property = new Property();
property.setDirect(Direct.IN);
property.setProp("global_params");
property.setValue("hello world");
property.setType(DataType.VARCHAR);
List<Property> properties = Lists.newArrayList(property);
processInstance.setGlobalParams(JSONUtils.toJsonString(properties));
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setName("ProcessName-1");
processDefinition.setProjectName("ProjectName-1");
processDefinition.setProjectCode(3000001l);
processDefinition.setCode(200001l);
processInstance.setProcessDefinition(processDefinition);
taskInstance.setProcessDefine(processDefinition);
taskInstance.setProcessInstance(processInstance);
taskInstance.setTaskDefine(taskDefinition);
AbstractParameters parameters = new SubProcessParameters();
Map<String, Property> propertyMap =
dolphinSchedulerCuringGlobalParams.paramParsingPreparation(taskInstance, parameters, processInstance);
Assertions.assertNotNull(propertyMap);
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_INSTANCE_ID).getValue(),
String.valueOf(taskInstance.getId()));
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_EXECUTE_PATH).getValue(),
taskInstance.getExecutePath());
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_INSTANCE_ID).getValue(),
String.valueOf(processInstance.getId()));
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_NAME).getValue(),
processDefinition.getName());
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_PROJECT_NAME).getValue(),
processDefinition.getProjectName());
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_PROJECT_CODE).getValue(),
String.valueOf(processDefinition.getProjectCode()));
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_DEFINITION_CODE).getValue(),
String.valueOf(taskDefinition.getCode()));
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_WORKFLOW_DEFINITION_CODE).getValue(),
String.valueOf(processDefinition.getCode()));
}
}

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.common.constants.DateConstants;
import java.time.Duration;
import java.util.Set;
import java.util.regex.Pattern;
@ -165,17 +167,17 @@ public class TaskConstants {
/**
* system date(yyyyMMddHHmmss)
*/
public static final String PARAMETER_DATETIME = "system.datetime";
public static final String PARAMETER_DATETIME = DateConstants.PARAMETER_DATETIME;
/**
* system date(yyyymmdd) today
*/
public static final String PARAMETER_CURRENT_DATE = "system.biz.curdate";
public static final String PARAMETER_CURRENT_DATE = DateConstants.PARAMETER_CURRENT_DATE;
/**
* system date(yyyymmdd) yesterday
*/
public static final String PARAMETER_BUSINESS_DATE = "system.biz.date";
public static final String PARAMETER_BUSINESS_DATE = DateConstants.PARAMETER_BUSINESS_DATE;
/**
* the absolute path of current executing task
@ -187,6 +189,40 @@ public class TaskConstants {
*/
public static final String PARAMETER_TASK_INSTANCE_ID = "system.task.instance.id";
/**
* the definition code of current task
*/
public static final String PARAMETER_TASK_DEFINITION_CODE = "system.task.definition.code";
/**
* the definition name of current task
*/
public static final String PARAMETER_TASK_DEFINITION_NAME = "system.task.definition.name";
/**
* the instance id of the workflow to which current task belongs
*/
public static final String PARAMETER_WORKFLOW_INSTANCE_ID = "system.workflow.instance.id";
/**
* the definition code of the workflow to which current task belongs
*/
public static final String PARAMETER_WORKFLOW_DEFINITION_CODE = "system.workflow.definition.code";
/**
* the definition name of the workflow to which current task belongs
*/
public static final String PARAMETER_WORKFLOW_DEFINITION_NAME = "system.workflow.definition.name";
/**
* the code of the project to which current task belongs
*/
public static final String PARAMETER_PROJECT_CODE = "system.project.code";
/**
* the name of the project to which current task belongs
*/
public static final String PARAMETER_PROJECT_NAME = "system.project.name";
/**
* month_begin
*/