[Fix-16705] [Built-in Parameter] Fix project name and definition name variable parameter not resolved (#16715)

This commit is contained in:
小可耐 2024-10-25 15:29:57 +08:00 committed by GitHub
parent cf9d4878e9
commit 5f319e5183
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 186 additions and 9 deletions

View File

@ -21,10 +21,12 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.dolphinscheduler.common.utils.JSONUtils.parseObject;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.ProjectDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionLogDao;
import org.apache.dolphinscheduler.extract.master.command.ICommandParam;
@ -64,12 +66,16 @@ public abstract class AbstractCommandHandler implements ICommandHandler {
@Autowired
protected List<IWorkflowLifecycleListener> workflowLifecycleListeners;
@Autowired
protected ProjectDao projectDao;
@Override
public WorkflowExecutionRunnable handleCommand(final Command command) {
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder = WorkflowExecuteContext.builder()
.withCommand(command);
assembleWorkflowDefinition(workflowExecuteContextBuilder);
assembleProject(workflowExecuteContextBuilder);
assembleWorkflowGraph(workflowExecuteContextBuilder);
assembleWorkflowInstance(workflowExecuteContextBuilder);
assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder);
@ -146,4 +152,12 @@ public abstract class AbstractCommandHandler implements ICommandHandler {
workflowInstance.getTestFlag());
}
protected void assembleProject(
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
final WorkflowDefinition workflowDefinition = workflowExecuteContextBuilder.getWorkflowDefinition();
final Project project = projectDao.queryByCode(workflowDefinition.getProjectCode());
checkArgument(project != null, "Cannot find the project code: " + workflowDefinition.getProjectCode());
workflowExecuteContextBuilder.setProject(project);
}
}

View File

@ -123,6 +123,7 @@ public class RecoverFailureTaskCommandHandler extends AbstractCommandHandler {
.builder()
.workflowExecutionGraph(workflowExecutionGraph)
.workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
.project(workflowExecuteContextBuilder.getProject())
.workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
.taskDefinition(workflowGraph.getTaskNodeByName(task))
.taskInstance(taskInstanceMap.get(task))

View File

@ -98,6 +98,7 @@ public class RunWorkflowCommandHandler extends AbstractCommandHandler {
.builder()
.workflowExecutionGraph(workflowExecutionGraph)
.workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
.project(workflowExecuteContextBuilder.getProject())
.workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
.taskDefinition(workflowGraph.getTaskNodeByName(task))
.workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())

View File

@ -119,6 +119,7 @@ public class WorkflowFailoverCommandHandler extends AbstractCommandHandler {
.builder()
.workflowExecutionGraph(workflowExecutionGraph)
.workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
.project(workflowExecuteContextBuilder.getProject())
.workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
.taskDefinition(workflowGraph.getTaskNodeByName(task))
.taskInstance(taskInstanceMap.get(task))

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.engine.task.runnable;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@ -35,5 +36,6 @@ public class TaskExecutionContextCreateRequest {
private WorkflowInstance workflowInstance;
private TaskDefinition taskDefinition;
private TaskInstance taskInstance;
private Project project;
}

View File

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.engine.task.runnable;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@ -57,6 +58,8 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable {
@Getter
private final WorkflowDefinition workflowDefinition;
@Getter
private final Project project;
@Getter
private final WorkflowInstance workflowInstance;
@Getter
private TaskInstance taskInstance;
@ -70,6 +73,7 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable {
this.workflowExecutionGraph = checkNotNull(taskExecutionRunnableBuilder.getWorkflowExecutionGraph());
this.workflowEventBus = checkNotNull(taskExecutionRunnableBuilder.getWorkflowEventBus());
this.workflowDefinition = checkNotNull(taskExecutionRunnableBuilder.getWorkflowDefinition());
this.project = checkNotNull(taskExecutionRunnableBuilder.getProject());
this.workflowInstance = checkNotNull(taskExecutionRunnableBuilder.getWorkflowInstance());
this.taskDefinition = checkNotNull(taskExecutionRunnableBuilder.getTaskDefinition());
this.taskInstance = taskExecutionRunnableBuilder.getTaskInstance();
@ -144,6 +148,7 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable {
checkState(isTaskInstanceInitialized(), "The task instance is null, can't initialize TaskExecutionContext.");
final TaskExecutionContextCreateRequest request = TaskExecutionContextCreateRequest.builder()
.workflowDefinition(workflowDefinition)
.project(project)
.workflowInstance(workflowInstance)
.taskDefinition(taskDefinition)
.taskInstance(taskInstance)

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.engine.task.runnable;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@ -37,6 +38,7 @@ public class TaskExecutionRunnableBuilder {
private final IWorkflowExecutionGraph workflowExecutionGraph;
private final WorkflowDefinition workflowDefinition;
private final Project project;
private final WorkflowInstance workflowInstance;
private final TaskDefinition taskDefinition;
private final TaskInstance taskInstance;

View File

@ -36,7 +36,9 @@ import org.apache.dolphinscheduler.dao.entity.DqComparisonType;
import org.apache.dolphinscheduler.dao.entity.DqRule;
import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql;
import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
@ -98,6 +100,8 @@ public class TaskExecutionContextFactory {
public TaskExecutionContext createTaskExecutionContext(TaskExecutionContextCreateRequest request) {
TaskInstance taskInstance = request.getTaskInstance();
WorkflowInstance workflowInstance = request.getWorkflowInstance();
WorkflowDefinition workflowDefinition = request.getWorkflowDefinition();
Project project = request.getProject();
ResourceParametersHelper resources = TaskPluginManager.getTaskChannel(taskInstance.getTaskType())
.parseParameters(taskInstance.getTaskParams())
@ -108,8 +112,10 @@ public class TaskExecutionContextFactory {
AbstractParameters baseParam =
TaskPluginManager.parseTaskParameters(taskInstance.getTaskType(), taskInstance.getTaskParams());
Map<String, Property> propertyMap =
curingParamsService.paramParsingPreparation(taskInstance, baseParam, workflowInstance);
curingParamsService.paramParsingPreparation(taskInstance, baseParam, workflowInstance,
project.getName(), workflowDefinition.getName());
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildWorkflowInstanceHost(masterConfig.getMasterAddress())
.buildTaskInstanceRelatedInfo(taskInstance)

View File

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
@ -40,6 +41,8 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext {
private final WorkflowDefinition workflowDefinition;
private final Project project;
private final WorkflowInstance workflowInstance;
private final IWorkflowGraph workflowGraph;
@ -72,6 +75,8 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext {
private List<IWorkflowLifecycleListener> workflowInstanceLifecycleListeners;
private Project project;
public WorkflowExecuteContextBuilder withCommand(Command command) {
this.command = command;
return this;
@ -81,6 +86,7 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext {
return new WorkflowExecuteContext(
command,
workflowDefinition,
project,
workflowInstance,
workflowGraph,
workflowExecutionGraph,

View File

@ -709,4 +709,40 @@ public class WorkflowStartTestCase extends AbstractMasterIntegrationTestCase {
assertThat(workflowRepository.getAll()).isEmpty();
}
@Test
@DisplayName("Test start a workflow which using workflow built in params")
public void testStartWorkflow_usingWorkflowBuiltInParam() {
final String yaml = "/it/start/workflow_with_built_in_param.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getWorkflows().get(0);
final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflow))
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.SUCCESS));
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.hasSize(2)
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("B");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
});
});
assertThat(workflowRepository.getAll()).isEmpty();
}
}

View File

@ -28,6 +28,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@ -118,6 +119,7 @@ class GlobalTaskDispatchWaitingQueueLooperTest {
.taskInstance(taskInstance)
.workflowExecutionGraph(new WorkflowExecutionGraph())
.workflowDefinition(new WorkflowDefinition())
.project(new Project())
.taskDefinition(new TaskDefinition())
.workflowEventBus(new WorkflowEventBus())
.build();

View File

@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@ -194,6 +195,7 @@ class GlobalTaskDispatchWaitingQueueTest {
.taskInstance(taskInstance)
.workflowExecutionGraph(new WorkflowExecutionGraph())
.workflowDefinition(new WorkflowDefinition())
.project(new Project())
.taskDefinition(new TaskDefinition())
.workflowEventBus(new WorkflowEventBus())
.build();

View File

@ -0,0 +1,81 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
workflows:
- name: workflow_with_one_fake_task_success
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with single task
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL
tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":[],"shellScript":"if [ \"${system.project.name}\" = \"MasterIntegrationTest\" ]; then\n exit 0 \nelse\n exit 1\nfi","resourceList":[]}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: B
code: 2
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":[],"shellScript":"if [ \"${system.workflow.definition.name}\" = \"workflow_with_one_fake_task_success\" ]; then\n exit 0 \nelse\n exit 1\nfi","resourceList":[]}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 2
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00

View File

@ -76,11 +76,15 @@ public interface CuringParamsService {
* @param parameters
* @param taskInstance
* @param workflowInstance
* @param projectName
* @param workflowDefinitionName
* @return
*/
Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance,
@NonNull AbstractParameters parameters,
@NonNull WorkflowInstance workflowInstance);
@NonNull WorkflowInstance workflowInstance,
String projectName,
String workflowDefinitionName);
/**
* Parse workflow star parameter

View File

@ -180,12 +180,16 @@ public class CuringParamsServiceImpl implements CuringParamsService {
* @param taskInstance
* @param parameters
* @param workflowInstance
* @param projectName
* @param workflowDefinitionName
* @return
*/
@Override
public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance,
@NonNull AbstractParameters parameters,
@NonNull WorkflowInstance workflowInstance) {
@NonNull WorkflowInstance workflowInstance,
String projectName,
String workflowDefinitionName) {
Map<String, Property> prepareParamsMap = new HashMap<>();
// assign value to definedParams here
@ -205,7 +209,8 @@ public class CuringParamsServiceImpl implements CuringParamsService {
String timeZone = commandParam.getTimeZone();
// built-in params
Map<String, String> builtInParams = setBuiltInParamsMap(taskInstance, workflowInstance, timeZone);
Map<String, String> builtInParams =
setBuiltInParamsMap(taskInstance, workflowInstance, timeZone, projectName, workflowDefinitionName);
// project-level params
Map<String, Property> projectParams = getProjectParameterMap(taskInstance.getProjectCode());
@ -273,10 +278,14 @@ public class CuringParamsServiceImpl implements CuringParamsService {
*
* @param taskInstance
* @param timeZone
* @param projectName
* @param workflowDefinitionName
*/
private Map<String, String> setBuiltInParamsMap(@NonNull TaskInstance taskInstance,
WorkflowInstance workflowInstance,
String timeZone) {
String timeZone,
String projectName,
String workflowDefinitionName) {
CommandType commandType = workflowInstance.getCmdTypeIfComplement();
Date scheduleTime = workflowInstance.getScheduleTime();
@ -289,10 +298,9 @@ public class CuringParamsServiceImpl implements CuringParamsService {
params.put(PARAMETER_TASK_DEFINITION_NAME, taskInstance.getName());
params.put(PARAMETER_TASK_DEFINITION_CODE, Long.toString(taskInstance.getTaskCode()));
params.put(PARAMETER_WORKFLOW_INSTANCE_ID, Integer.toString(taskInstance.getWorkflowInstanceId()));
// todo: set workflow definitionName and projectName
params.put(PARAMETER_WORKFLOW_DEFINITION_NAME, null);
params.put(PARAMETER_WORKFLOW_DEFINITION_NAME, workflowDefinitionName);
params.put(PARAMETER_WORKFLOW_DEFINITION_CODE, Long.toString(workflowInstance.getWorkflowDefinitionCode()));
params.put(PARAMETER_PROJECT_NAME, null);
params.put(PARAMETER_PROJECT_NAME, projectName);
params.put(PARAMETER_PROJECT_CODE, Long.toString(workflowInstance.getProjectCode()));
return params;
}

View File

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.expand;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
@ -209,6 +210,10 @@ public class CuringParamsServiceTest {
workflowDefinition.setProjectCode(3000001L);
workflowDefinition.setCode(200001L);
Project project = new Project();
project.setName("ProjectName");
project.setCode(3000001L);
workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
workflowInstance.setProjectCode(workflowDefinition.getProjectCode());
taskInstance.setTaskCode(taskDefinition.getCode());
@ -221,7 +226,8 @@ public class CuringParamsServiceTest {
Mockito.when(projectParameterMapper.queryByProjectCode(Mockito.anyLong())).thenReturn(Collections.emptyList());
Map<String, Property> propertyMap =
dolphinSchedulerCuringGlobalParams.paramParsingPreparation(taskInstance, parameters, workflowInstance);
dolphinSchedulerCuringGlobalParams.paramParsingPreparation(taskInstance, parameters, workflowInstance,
project.getName(), workflowDefinition.getName());
Assertions.assertNotNull(propertyMap);
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_INSTANCE_ID).getValue(),
String.valueOf(taskInstance.getId()));