[Feature-#8373][MasterServer] Dependent tasks can re-run automatically in the case of complement (#8496)

* first add feature_8373

* fix code smell

* add blank line

* fix some problems

* fix unit test error
This commit is contained in:
xiangzihao 2022-02-25 10:34:02 +08:00 committed by GitHub
parent e174d7a40e
commit 891a002bea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 443 additions and 32 deletions

View File

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
@ -107,7 +108,9 @@ public class ExecutorController extends BaseController {
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int" , example = "8"),
@ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"),
@ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
})
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
@ -130,7 +133,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) {
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
@ -139,8 +143,15 @@ public class ExecutorController extends BaseController {
if (startParams != null) {
startParamMap = JSONUtils.toMap(startParams);
}
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode,timeout, startParamMap, expectedParallelismNumber, dryRun);
if (complementDependentMode == null) {
complementDependentMode = ComplementDependentMode.OFF_MODE;
}
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, complementDependentMode);
return returnDataList(result);
}
@ -181,7 +192,9 @@ public class ExecutorController extends BaseController {
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8")
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"),
@ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"),
@ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
})
@PostMapping(value = "batch-start-process-instance")
@ResponseStatus(HttpStatus.OK)
@ -204,7 +217,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun) {
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
@ -215,6 +229,10 @@ public class ExecutorController extends BaseController {
startParamMap = JSONUtils.toMap(startParams);
}
if (complementDependentMode == null) {
complementDependentMode = ComplementDependentMode.OFF_MODE;
}
Map<String, Object> result = new HashMap<>();
List<String> processDefinitionCodeArray = Arrays.asList(processDefinitionCodes.split(Constants.COMMA));
List<String> startFailedProcessDefinitionCodeList = new ArrayList<>();
@ -224,7 +242,9 @@ public class ExecutorController extends BaseController {
for (String strProcessDefinitionCode : processDefinitionCodeArray) {
long processDefinitionCode = Long.parseLong(strProcessDefinitionCode);
result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun);
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
complementDependentMode);
if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));

View File

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
@ -63,7 +64,8 @@ public interface ExecutorService {
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun);
int dryRun,
ComplementDependentMode complementDependentMode);
/**
* check whether the process definition can be executed

View File

@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
@ -31,6 +32,8 @@ import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@ -44,6 +47,7 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
@ -66,9 +70,9 @@ import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -97,11 +101,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private MonitorService monitorService;
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private ProcessService processService;
@ -138,7 +140,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode,Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun) {
int dryRun,
ComplementDependentMode complementDependentMode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@ -175,7 +178,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*/
int create = this.createCommand(commandType, processDefinition.getCode(),
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams, expectedParallelismNumber, dryRun);
warningGroupId, runMode, processInstancePriority, workerGroup, environmentCode, startParams,
expectedParallelismNumber, dryRun, complementDependentMode);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
@ -536,7 +540,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId,
RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode,
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun) {
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, ComplementDependentMode complementDependentMode) {
/**
* instantiate command schedule instance
@ -599,7 +603,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (start == null || end == null) {
return 0;
}
return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber);
return createComplementCommandList(start, end, runMode, command, expectedParallelismNumber, complementDependentMode);
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
return processService.createCommand(command);
@ -608,15 +612,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* create complement command
* close left open right
* close left and close right
*
* @param start
* @param end
* @param runMode
* @return
*/
private int createComplementCommandList(Date start, Date end, RunMode runMode, Command command, Integer expectedParallelismNumber) {
private int createComplementCommandList(Date start, Date end, RunMode runMode, Command command,
Integer expectedParallelismNumber, ComplementDependentMode complementDependentMode) {
int createCount = 0;
int dependentProcessDefinitionCreateCount = 0;
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
switch (runMode) {
@ -629,6 +636,17 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(end));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
createCount = processService.createCommand(command);
// dependent process definition
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
+ "dependent complement data", command.getProcessDefinitionCode());
} else {
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
}
break;
}
case RUN_MODE_PARALLEL: {
@ -637,7 +655,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break;
}
LinkedList<Date> listDate = new LinkedList<>();
List<Date> listDate = new ArrayList<>();
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
listDate.addAll(CronUtils.getSelfFireDateList(start, end, schedules));
int listDateSize = listDate.size();
@ -650,7 +668,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
}
logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
// Distribute the number of tasks equally to each command.
// The last command with insufficient quantity will be assigned to the remaining tasks.
int itemsPerCommand = (listDateSize / createCount);
@ -673,6 +691,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
+ "dependent complement data", command.getProcessDefinitionCode());
} else {
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
}
}
}
break;
@ -680,7 +705,81 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
default:
break;
}
logger.info("create complement command count: {}", createCount);
logger.info("create complement command count: {}, create dependent complement command count: {}", createCount
, dependentProcessDefinitionCreateCount);
return createCount;
}
/**
* create complement dependent command
*/
private int createComplementDependentCommand(List<Schedule> schedules, Command command) {
int dependentProcessDefinitionCreateCount = 0;
Command dependentCommand;
try {
dependentCommand = (Command) BeanUtils.cloneBean(command);
} catch (Exception e) {
logger.error("copy dependent command error: ", e);
return dependentProcessDefinitionCreateCount;
}
List<DependentProcessDefinition> dependentProcessDefinitionList =
getComplementDependentDefinitionList(dependentCommand.getProcessDefinitionCode(),
CronUtils.getMaxCycle(schedules.get(0).getCrontab()),
dependentCommand.getWorkerGroup());
dependentCommand.setTaskDependType(TaskDependType.TASK_POST);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
dependentCommand.setProcessDefinitionCode(dependentProcessDefinition.getProcessDefinitionCode());
dependentCommand.setWorkerGroup(dependentProcessDefinition.getWorkerGroup());
Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
dependentProcessDefinitionCreateCount += processService.createCommand(dependentCommand);
}
return dependentProcessDefinitionCreateCount;
}
/**
* get complement dependent process definition list
*/
private List<DependentProcessDefinition> getComplementDependentDefinitionList(long processDefinitionCode,
CycleEnum processDefinitionCycle,
String workerGroup) {
List<DependentProcessDefinition> dependentProcessDefinitionList =
processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
return checkDependentProcessDefinitionValid(dependentProcessDefinitionList,processDefinitionCycle,workerGroup);
}
/**
* Check whether the dependency cycle of the dependent node is consistent with the schedule cycle of
* the dependent process definition and if there is no worker group in the schedule, use the complement selection's
* worker group
*/
private List<DependentProcessDefinition> checkDependentProcessDefinitionValid(List<DependentProcessDefinition> dependentProcessDefinitionList,
CycleEnum processDefinitionCycle,
String workerGroup) {
List<DependentProcessDefinition> validDependentProcessDefinitionList = new ArrayList<>();
List<Long> processDefinitionCodeList = dependentProcessDefinitionList.stream()
.map(DependentProcessDefinition::getProcessDefinitionCode)
.collect(Collectors.toList());
Map<Long, String> processDefinitionWorkerGroupMap = processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
if (dependentProcessDefinition.getDependentCycle() == processDefinitionCycle) {
if (processDefinitionWorkerGroupMap.get(dependentProcessDefinition.getProcessDefinitionCode()) == null) {
dependentProcessDefinition.setWorkerGroup(workerGroup);
}
validDependentProcessDefinitionList.add(dependentProcessDefinition);
}
}
return validDependentProcessDefinitionList;
}
}

View File

@ -179,6 +179,8 @@ PROCESS_INSTANCE_END_TIME=process instance end time
PROCESS_INSTANCE_SIZE=process instance size
PROCESS_INSTANCE_PRIORITY=process instance priority
EXPECTED_PARALLELISM_NUMBER=custom parallelism to set the complement task threads
DRY_RUN=dry run
COMPLEMENT_DEPENDENT_MODE=complement dependent mode
UPDATE_SCHEDULE_NOTES=update schedule
SCHEDULE_ID=schedule id
ONLINE_SCHEDULE_NOTES=online schedule

View File

@ -165,6 +165,8 @@ RECEIVERS_CC=收件人(抄送)
WORKER_GROUP_ID=Worker Server分组ID
PROCESS_INSTANCE_PRIORITY=流程实例优先级
EXPECTED_PARALLELISM_NUMBER=补数任务自定义并行度
DRY_RUN=是否空跑
COMPLEMENT_DEPENDENT_MODE=补数依赖的类型
UPDATE_SCHEDULE_NOTES=更新定时
SCHEDULE_ID=定时ID
ONLINE_SCHEDULE_NOTES=定时上线

View File

@ -66,6 +66,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
final ImmutableMap<String, String> startParams = ImmutableMap.of("start", "params");
final Integer expectedParallelismNumber = 6;
final int dryRun = 7;
final ComplementDependentMode complementDependentMode = ComplementDependentMode.OFF_MODE;
final JsonObject expectResponseContent = gson
.fromJson("{\"code\":0,\"msg\":\"success\",\"data\":\"Test Data\",\"success\":true,\"failed\":false}"
@ -102,7 +103,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun)))
eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(complementDependentMode)))
.thenReturn(executeServiceResult);
//When
@ -141,7 +142,8 @@ public class ExecutorControllerTest extends AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun))).thenReturn(executeServiceResult);
eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun),
eq(complementDependentMode))).thenReturn(executeServiceResult);
//When
final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode)
@ -178,7 +180,8 @@ public class ExecutorControllerTest extends AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun))).thenReturn(executeServiceResult);
eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun),
eq(complementDependentMode))).thenReturn(executeServiceResult);
//When
final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode)
@ -203,7 +206,8 @@ public class ExecutorControllerTest extends AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(null), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
eq(0), eq(null), eq(null), eq("default"), eq(-1L),
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0))).thenReturn(executeServiceResult);
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0),
eq(complementDependentMode))).thenReturn(executeServiceResult);
//When
final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode)

View File

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.service.impl.ExecutorServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
@ -166,7 +167,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@ -184,7 +186,8 @@ public class ExecutorServiceTest {
null, "n1,n2",
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@ -202,7 +205,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
}
@ -219,7 +223,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}
@ -236,7 +241,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP,100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class));
@ -254,7 +260,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 15, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class));
@ -269,7 +276,8 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L,110, null, 0, Constants.DRY_RUN_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
/**
* task node depend type
*/
public enum ComplementDependentMode {
/**
* 0 off mode
* 1 run complement data with all dependent process
*/
OFF_MODE(0,"off mode"),
ALL_DEPENDENT(1,"all dependent");
ComplementDependentMode(int code, String desc) {
this.code = code;
this.desc = desc;
}
@EnumValue
private final int code;
private final String desc;
public int getCode() {
return code;
}
public String getDesc() {
return desc;
}
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.List;
/**
* dependent process definition
*/
public class DependentProcessDefinition {
/**
* process definition code
*/
private long processDefinitionCode;
/**
* process definition name
*/
private String processDefinitionName;
/**
* task definition name
*/
private long taskDefinitionCode;
/**
* task definition params
*/
private String taskParams;
/**
* schedule worker group
*/
private String workerGroup;
/**
* get dependent cycle
* @return CycleEnum
*/
public CycleEnum getDependentCycle() {
DependentParameters dependentParameters = this.getDependentParameters();
List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList();
for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList();
for (DependentItem dependentItem : dependentItemList) {
if (this.getProcessDefinitionCode() == dependentItem.getDefinitionCode()) {
return cycle2CycleEnum(dependentItem.getCycle());
}
}
}
return CycleEnum.DAY;
}
public CycleEnum cycle2CycleEnum(String cycle) {
CycleEnum cycleEnum = null;
switch (cycle) {
case "day":
cycleEnum = CycleEnum.DAY;
break;
case "hour":
cycleEnum = CycleEnum.HOUR;
break;
case "week":
cycleEnum = CycleEnum.WEEK;
break;
case "month":
cycleEnum = CycleEnum.MONTH;
break;
default:
break;
}
return cycleEnum;
}
public DependentParameters getDependentParameters() {
return JSONUtils.parseObject(getDependence(), DependentParameters.class);
}
public String getDependence() {
return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
}
public String getProcessDefinitionName() {
return this.processDefinitionName;
}
public void setprocessDefinitionName(String name) {
this.processDefinitionName = name;
}
public long getProcessDefinitionCode() {
return this.processDefinitionCode;
}
public void setProcessDefinitionCode(long code) {
this.processDefinitionCode = code;
}
public long getTaskDefinitionCode() {
return this.taskDefinitionCode;
}
public void setTaskDefinitionCode(long code) {
this.taskDefinitionCode = code;
}
public String getTaskParams() {
return this.taskParams;
}
public void setTaskParams(String taskParams) {
this.taskParams = taskParams;
}
public String getWorkerGroup() {
return this.workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
}

View File

@ -86,4 +86,12 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
* @return schedule
*/
Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
* query worker group list by process definition code
*
* @param processDefinitionCodeList processDefinitionCodeList
* @return schedule
*/
List<Schedule> querySchedulesByProcessDefinitionCodes(@Param("processDefinitionCodeList") List<Long> processDefinitionCodeList);
}

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
@ -69,4 +70,12 @@ public interface WorkFlowLineageMapper {
*/
List<ProcessLineage> queryProcessLineageByCode(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
/**
* query process definition by name
*
* @return dependent process definition
*/
List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(@Param("code") long code);
}

View File

@ -70,6 +70,17 @@
from t_ds_schedules
where process_definition_code = #{processDefinitionCode}
</select>
<select id="querySchedulesByProcessDefinitionCodes" resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select
<include refid="baseSql"/>
from t_ds_schedules
where process_definition_code in
<foreach collection="processDefinitionCodeList" item="code" index="index" open="(" close=")" separator=",">
#{code}
</foreach>
</select>
<select id="queryReleaseSchedulerListByProcessDefinitionCode"
resultType="org.apache.dolphinscheduler.dao.entity.Schedule">
select

View File

@ -88,4 +88,21 @@
where project_code = #{projectCode}
and process_definition_code = #{processDefinitionCode}
</select>
<select id="queryDependentProcessDefinitionByProcessDefinitionCode" resultType="DependentProcessDefinition">
SELECT
c.code AS process_definition_code
,c.name AS process_definition_name
,a.code AS task_definition_code
,a.task_params
FROM
t_ds_task_definition a
JOIN t_ds_process_task_relation b ON a.code = b.pre_task_code and a.version = b.pre_task_version
JOIN t_ds_process_definition c ON c.code = b.process_definition_code AND c.version = b.process_definition_version AND c.project_code = b.project_code
WHERE 1=1
<if test="code != null and code != ''">
AND a.task_params LIKE concat('%', #{code}, '%')
</if>
AND a.task_type = 'DEPENDENT'
</select>
</mapper>

View File

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.api.service.TenantService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
@ -94,6 +95,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST;
private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
private static final int DEFAULT_DRY_RUN = 0;
private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE = ComplementDependentMode.OFF_MODE;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@ -341,7 +343,8 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
timeout,
null,
null,
DEFAULT_DRY_RUN
DEFAULT_DRY_RUN,
COMPLEMENT_DEPENDENT_MODE
);
}

View File

@ -60,6 +60,7 @@ import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.DqComparisonType;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
import org.apache.dolphinscheduler.dao.entity.DqRule;
@ -114,6 +115,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@ -142,6 +144,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -256,6 +259,9 @@ public class ProcessService {
@Autowired
private TaskGroupMapper taskGroupMapper;
@Autowired
private WorkFlowLineageMapper workFlowLineageMapper;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@ -1900,6 +1906,28 @@ public class ProcessService {
return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
}
/**
* query Schedule by processDefinitionCode
*
* @param processDefinitionCodeList processDefinitionCodeList
* @see Schedule
*/
public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
List<Schedule> processDefinitionScheduleList = scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
Schedule::getWorkerGroup));
}
/**
* query dependent process definition by process definition code
*
* @param processDefinitionCode processDefinitionCode
* @see DependentProcessDefinition
*/
public List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode) {
return workFlowLineageMapper.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode);
}
/**
* query need failover process instance
*