[Refactor] Migrate all command-related interface functions from ProcessServiceImpl (#12474)

* migrate all command-related interface functions to CommonService
This commit is contained in:
Yann Ann 2022-10-22 12:39:25 +08:00 committed by GitHub
parent 1384d8f4fa
commit b936b882bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 788 additions and 475 deletions

View File

@ -77,6 +77,7 @@ import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeComman
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto; import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -130,6 +131,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Autowired
private CommandService commandService;
@Autowired @Autowired
private ProcessInstanceDao processInstanceDao; private ProcessInstanceDao processInstanceDao;
@ -626,7 +630,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessDefinitionVersion(processVersion); command.setProcessDefinitionVersion(processVersion);
command.setProcessInstanceId(instanceId); command.setProcessInstanceId(instanceId);
command.setTestFlag(testFlag); command.setTestFlag(testFlag);
if (!processService.verifyIsNeedCreateCommand(command)) { if (!commandService.verifyIsNeedCreateCommand(command)) {
logger.warn( logger.warn(
"Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
processDefinitionCode, processVersion, instanceId); processDefinitionCode, processVersion, instanceId);
@ -635,7 +639,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} }
logger.info("Creating command, commandInfo:{}.", command); logger.info("Creating command, commandInfo:{}.", command);
int create = processService.createCommand(command); int create = commandService.createCommand(command);
if (create > 0) { if (create > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.", logger.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.",
@ -784,7 +788,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
} else { } else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command); logger.info("Creating command, commandInfo:{}.", command);
return processService.createCommand(command); return commandService.createCommand(command);
} }
} }
@ -824,26 +828,28 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command); logger.info("Creating command, commandInfo:{}.", command);
createCount = processService.createCommand(command); createCount = commandService.createCommand(command);
if (createCount > 0) if (createCount > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}", logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else } else {
logger.error("Create {} command error, processDefinitionCode:{}", logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
} }
if (startDate != null && endDate != null) { if (startDate != null && endDate != null) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command); logger.info("Creating command, commandInfo:{}.", command);
createCount = processService.createCommand(command); createCount = commandService.createCommand(command);
if (createCount > 0) if (createCount > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}", logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else } else {
logger.error("Create {} command error, processDefinitionCode:{}", logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
// dependent process definition // dependent process definition
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode()); command.getProcessDefinitionCode());
@ -904,12 +910,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
DateUtils.dateToString(listDate.get(endDateIndex))); DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command); logger.info("Creating command, commandInfo:{}.", command);
if (processService.createCommand(command) > 0) if (commandService.createCommand(command) > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}", logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else } else {
logger.error("Create {} command error, processDefinitionCode:{}", logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info( logger.info(
"Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
@ -937,12 +944,13 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate)); cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command); logger.info("Creating command, commandInfo:{}.", command);
if (processService.createCommand(command) > 0) if (commandService.createCommand(command) > 0) {
logger.info("Create {} command complete, processDefinitionCode:{}", logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else } else {
logger.error("Create {} command error, processDefinitionCode:{}", logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode()); command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
} }
} }
} }
@ -985,7 +993,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode())); cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam)); dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating complement dependent command, commandInfo:{}.", command); logger.info("Creating complement dependent command, commandInfo:{}.", command);
dependentProcessDefinitionCreateCount += processService.createCommand(dependentCommand); dependentProcessDefinitionCreateCount += commandService.createCommand(dependentCommand);
} }
return dependentProcessDefinitionCreateCount; return dependentProcessDefinitionCreateCount;

View File

@ -57,6 +57,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList; import java.util.ArrayList;
@ -102,6 +103,9 @@ public class ExecutorServiceTest {
@Mock @Mock
private ProcessService processService; private ProcessService processService;
@Mock
private CommandService commandService;
@Mock @Mock
private ProcessDefinitionMapper processDefinitionMapper; private ProcessDefinitionMapper processDefinitionMapper;
@ -194,8 +198,8 @@ public class ExecutorServiceTest {
.thenReturn(checkProjectAndAuth()); .thenReturn(checkProjectAndAuth());
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition); Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant()); Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null)); doReturn(1).when(commandService).createCommand(argThat(c -> c.getId() == null));
doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null)); doReturn(0).when(commandService).createCommand(argThat(c -> c.getId() != null));
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)) Mockito.when(processService.findProcessInstanceDetailById(processInstanceId))
.thenReturn(Optional.ofNullable(processInstance)); .thenReturn(Optional.ofNullable(processInstance));
@ -230,7 +234,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class)); verify(commandService, times(1)).createCommand(any(Command.class));
} }
@ -253,7 +257,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class)); verify(commandService, times(1)).createCommand(any(Command.class));
} }
@ -320,7 +324,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class)); verify(commandService, times(0)).createCommand(any(Command.class));
} }
/** /**
@ -342,7 +346,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class)); verify(commandService, times(1)).createCommand(any(Command.class));
} }
/** /**
@ -364,7 +368,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class)); verify(commandService, times(31)).createCommand(any(Command.class));
} }
@ -387,7 +391,7 @@ public class ExecutorServiceTest {
Constants.TEST_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class)); verify(commandService, times(15)).createCommand(any(Command.class));
} }
@ -411,7 +415,7 @@ public class ExecutorServiceTest {
@Test @Test
public void testExecuteRepeatRunning() { public void testExecuteRepeatRunning() {
Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
.thenReturn(checkProjectAndAuth()); .thenReturn(checkProjectAndAuth());
Map<String, Object> result = Map<String, Object> result =
@ -421,7 +425,7 @@ public class ExecutorServiceTest {
@Test @Test
public void testOfTestRun() { public void testOfTestRun() {
Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); Mockito.when(commandService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
.thenReturn(checkProjectAndAuth()); .thenReturn(checkProjectAndAuth());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode, Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,

View File

@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils; import org.apache.dolphinscheduler.service.utils.LoggerUtils;
@ -66,6 +67,9 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Autowired
private CommandService commandService;
@Autowired @Autowired
private ProcessInstanceDao processInstanceDao; private ProcessInstanceDao processInstanceDao;
@ -172,6 +176,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
"The workflow instance is already been cached, this case shouldn't be happened"); "The workflow instance is already been cached, this case shouldn't be happened");
} }
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance, WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
commandService,
processService, processService,
processInstanceDao, processInstanceDao,
nettyExecutorManager, nettyExecutorManager,
@ -225,7 +230,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Master handle command {} error ", command.getId(), e); logger.error("Master handle command {} error ", command.getId(), e);
processService.moveToErrorCommand(command, e.toString()); commandService.moveToErrorCommand(command, e.toString());
} finally { } finally {
latch.countDown(); latch.countDown();
} }
@ -254,7 +259,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
int pageNumber = 0; int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum(); int pageSize = masterConfig.getFetchCommandNum();
final List<Command> result = final List<Command> result =
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) { if (CollectionUtils.isNotEmpty(result)) {
logger.info( logger.info(
"Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}", "Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",

View File

@ -76,6 +76,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.expand.CuringParamsService;
@ -127,6 +128,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final ProcessService processService; private final ProcessService processService;
private final CommandService commandService;
private ProcessInstanceDao processInstanceDao; private ProcessInstanceDao processInstanceDao;
private final ProcessAlertManager processAlertManager; private final ProcessAlertManager processAlertManager;
@ -233,6 +236,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/ */
public WorkflowExecuteRunnable( public WorkflowExecuteRunnable(
@NonNull ProcessInstance processInstance, @NonNull ProcessInstance processInstance,
@NonNull CommandService commandService,
@NonNull ProcessService processService, @NonNull ProcessService processService,
@NonNull ProcessInstanceDao processInstanceDao, @NonNull ProcessInstanceDao processInstanceDao,
@NonNull NettyExecutorManager nettyExecutorManager, @NonNull NettyExecutorManager nettyExecutorManager,
@ -241,6 +245,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
@NonNull StateWheelExecuteThread stateWheelExecuteThread, @NonNull StateWheelExecuteThread stateWheelExecuteThread,
@NonNull CuringParamsService curingParamsService) { @NonNull CuringParamsService curingParamsService) {
this.processService = processService; this.processService = processService;
this.commandService = commandService;
this.processInstanceDao = processInstanceDao; this.processInstanceDao = processInstanceDao;
this.processInstance = processInstance; this.processInstance = processInstance;
this.nettyExecutorManager = nettyExecutorManager; this.nettyExecutorManager = nettyExecutorManager;
@ -657,7 +662,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
command.setProcessInstanceId(0); command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
command.setTestFlag(processInstance.getTestFlag()); command.setTestFlag(processInstance.getTestFlag());
return processService.createCommand(command); return commandService.createCommand(command);
} }
private boolean needComplementProcess() { private boolean needComplementProcess() {
@ -750,7 +755,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
command.setProcessDefinitionCode(processDefinition.getCode()); command.setProcessDefinitionCode(processDefinition.getCode());
command.setProcessDefinitionVersion(processDefinition.getVersion()); command.setProcessDefinitionVersion(processDefinition.getVersion());
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command); commandService.createCommand(command);
} }
/** /**

View File

@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -72,6 +73,8 @@ public class WorkflowExecuteRunnableTest {
private ProcessService processService; private ProcessService processService;
private CommandService commandService;
private ProcessInstanceDao processInstanceDao; private ProcessInstanceDao processInstanceDao;
private MasterConfig config; private MasterConfig config;
@ -90,6 +93,7 @@ public class WorkflowExecuteRunnableTest {
config = new MasterConfig(); config = new MasterConfig();
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);
commandService = Mockito.mock(CommandService.class);
processInstanceDao = Mockito.mock(ProcessInstanceDao.class); processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
processInstance = Mockito.mock(ProcessInstance.class); processInstance = Mockito.mock(ProcessInstance.class);
Map<String, String> cmdParam = new HashMap<>(); Map<String, String> cmdParam = new HashMap<>();
@ -105,7 +109,8 @@ public class WorkflowExecuteRunnableTest {
NettyExecutorManager nettyExecutorManager = Mockito.mock(NettyExecutorManager.class); NettyExecutorManager nettyExecutorManager = Mockito.mock(NettyExecutorManager.class);
ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class); ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class);
workflowExecuteThread = Mockito.spy( workflowExecuteThread = Mockito.spy(
new WorkflowExecuteRunnable(processInstance, processService, processInstanceDao, nettyExecutorManager, new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao,
nettyExecutorManager,
processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService)); processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true); dag.setAccessible(true);

View File

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils; import org.apache.dolphinscheduler.scheduler.quartz.utils.QuartzTaskUtils;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -49,6 +50,9 @@ public class ProcessScheduleTask extends QuartzJobBean {
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Autowired
private CommandService commandService;
@Counted(value = "ds.master.quartz.job.executed") @Counted(value = "ds.master.quartz.job.executed")
@Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override @Override
@ -100,7 +104,7 @@ public class ProcessScheduleTask extends QuartzJobBean {
command.setProcessInstancePriority(schedule.getProcessInstancePriority()); command.setProcessInstancePriority(schedule.getProcessInstancePriority());
command.setProcessDefinitionVersion(processDefinition.getVersion()); command.setProcessDefinitionVersion(processDefinition.getVersion());
processService.createCommand(command); commandService.createCommand(command);
} }
private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) { private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) {

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.command;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.List;
/**
* Command Service
*/
public interface CommandService {
/**
* Save error command, and delete original command. If the given command has already been moved into error command,
* will throw {@link java.sql.SQLIntegrityConstraintViolationException ).
* @param command command
* @param message message
*/
void moveToErrorCommand(Command command, String message);
/**
* Create new command
* @param command command
* @return result
*/
int createCommand(Command command);
/**
* Get command page
* @param pageSize page size
* @param pageNumber page number
* @param masterCount master count
* @param thisMasterSlot master slot
* @return command page
*/
List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot);
/**
* check the input command exists in queue list
*
* @param command command
* @return create command result
*/
boolean verifyIsNeedCreateCommand(Command command);
/**
* create recovery waiting thread command when thread pool is not enough for the process instance.
* sub work process instance need not create recovery command.
* create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time
*
* @param originCommand originCommand
* @param processInstance processInstance
*/
void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance);
/**
* create sub work process command
* @param parentProcessInstance parent process instance
* @param childInstance child process instance
* @param instanceMap process instance map
* @param task task instance
* @return command
*/
Command createSubProcessCommand(ProcessInstance parentProcessInstance,
ProcessInstance childInstance,
ProcessInstanceMap instanceMap,
TaskInstance task);
}

View File

@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.command;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.service.utils.ParamUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
/**
* Command Service implementation
*/
@Component
public class CommandServiceImpl implements CommandService {
private final Logger logger = LoggerFactory.getLogger(CommandServiceImpl.class);
@Autowired
private ErrorCommandMapper errorCommandMapper;
@Autowired
private CommandMapper commandMapper;
@Autowired
private ScheduleMapper scheduleMapper;
@Autowired
private ProcessDefinitionMapper processDefineMapper;
@Override
public void moveToErrorCommand(Command command, String message) {
ErrorCommand errorCommand = new ErrorCommand(command, message);
this.errorCommandMapper.insert(errorCommand);
this.commandMapper.deleteById(command.getId());
}
@Override
@Counted("ds.workflow.create.command.count")
public int createCommand(Command command) {
int result = 0;
if (command == null) {
return result;
}
// add command timezone
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
if (schedule != null) {
Map<String, String> commandParams =
StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam())
: new HashMap<>();
commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
command.setCommandParam(JSONUtils.toJsonString(commandParams));
}
command.setId(null);
result = commandMapper.insert(command);
return result;
}
@Override
public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
if (masterCount <= 0) {
return Lists.newArrayList();
}
return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
}
@Override
public boolean verifyIsNeedCreateCommand(Command command) {
boolean isNeedCreate = true;
EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
CommandType commandType = command.getCommandType();
if (!cmdTypeMap.containsKey(commandType)) {
return true;
}
ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam());
int processInstanceId = cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt();
List<Command> commands = commandMapper.selectList(null);
// for all commands
for (Command tmpCommand : commands) {
if (!cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
continue;
}
ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam());
if (tempObj != null
&& processInstanceId == tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
isNeedCreate = false;
break;
}
}
return isNeedCreate;
}
@Override
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
// sub process doesn't need to create wait command
if (processInstance.getIsSubProcess() == Flag.YES) {
if (originCommand != null) {
commandMapper.deleteById(originCommand.getId());
}
return;
}
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId()));
// process instance quit by "waiting thread" state
if (originCommand == null) {
Command command = new Command(
CommandType.RECOVER_WAITING_THREAD,
processInstance.getTaskDependType(),
processInstance.getFailureStrategy(),
processInstance.getExecutorId(),
processInstance.getProcessDefinition().getCode(),
JSONUtils.toJsonString(cmdParam),
processInstance.getWarningType(),
processInstance.getWarningGroupId(),
processInstance.getScheduleTime(),
processInstance.getWorkerGroup(),
processInstance.getEnvironmentCode(),
processInstance.getProcessInstancePriority(),
processInstance.getDryRun(),
processInstance.getId(),
processInstance.getProcessDefinitionVersion(),
processInstance.getTestFlag());
upsertCommand(command);
return;
}
// update the command time if current command is recover from waiting
if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) {
originCommand.setUpdateTime(new Date());
upsertCommand(originCommand);
} else {
// delete old command and create new waiting thread command
commandMapper.deleteById(originCommand.getId());
originCommand.setId(0);
originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
originCommand.setUpdateTime(new Date());
originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
upsertCommand(originCommand);
}
}
private int upsertCommand(@NotNull Command command) {
if (command.getId() != null) {
return commandMapper.updateById(command);
} else {
return commandMapper.insert(command);
}
}
@Override
public Command createSubProcessCommand(ProcessInstance parentProcessInstance, ProcessInstance childInstance,
ProcessInstanceMap instanceMap, TaskInstance task) {
CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
Map<String, Object> subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class);
long childDefineCode = 0L;
if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) {
try {
childDefineCode =
Long.parseLong(
String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)));
} catch (NumberFormatException nfe) {
logger.error("processDefinitionCode is not a number", nfe);
return null;
}
}
ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode);
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> globalMap = ParamUtils.getGlobalParamMap(task.getVarPool());
Map<String, String> fatherParams = new HashMap<>();
if (CollectionUtils.isNotEmpty(allParam)) {
for (Property info : allParam) {
if (Direct.OUT == info.getDirect()) {
continue;
}
fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
}
}
String processParam = ParamUtils.getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
int subProcessInstanceId =
childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId());
return new Command(
commandType,
TaskDependType.TASK_POST,
parentProcessInstance.getFailureStrategy(),
parentProcessInstance.getExecutorId(),
subProcessDefinition.getCode(),
processParam,
parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(),
parentProcessInstance.getScheduleTime(),
task.getWorkerGroup(),
task.getEnvironmentCode(),
parentProcessInstance.getProcessInstancePriority(),
parentProcessInstance.getDryRun(),
subProcessInstanceId,
subProcessDefinition.getVersion(),
parentProcessInstance.getTestFlag());
}
/**
* get sub work flow command type
* child instance exist: child command = fatherCommand
* child instance not exists: child command = fatherCommand[0]
*/
private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
CommandType commandType = parentProcessInstance.getCommandType();
if (childInstance == null) {
String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
}
return commandType;
}
}

View File

@ -68,16 +68,6 @@ public interface ProcessService {
ProcessInstance handleCommand(String host, ProcessInstance handleCommand(String host,
Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException; Command command) throws CronParseException, CodeGenerateUtils.CodeGenerateException;
void moveToErrorCommand(Command command, String message);
int createCommand(Command command);
List<Command> findCommandPage(int pageSize, int pageNumber);
List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot);
boolean verifyIsNeedCreateCommand(Command command);
Optional<ProcessInstance> findProcessInstanceDetailById(int processId); Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
List<TaskDefinition> getTaskNodeListByDefinition(long defineCode); List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);
@ -100,8 +90,6 @@ public interface ProcessService {
void recurseFindSubProcess(long parentCode, List<Long> ids); void recurseFindSubProcess(long parentCode, List<Long> ids);
void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance);
Tenant getTenantForProcess(int tenantId, int userId); Tenant getTenantForProcess(int tenantId, int userId);
Environment findEnvironmentByCode(Long environmentCode); Environment findEnvironmentByCode(Long environmentCode);
@ -116,19 +104,10 @@ public interface ProcessService {
void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task); void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task);
Map<String, String> getGlobalParamMap(String globalParams);
Command createSubProcessCommand(ProcessInstance parentProcessInstance,
ProcessInstance childInstance,
ProcessInstanceMap instanceMap,
TaskInstance task);
TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance); TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance);
TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance); TaskExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance);
int saveCommand(Command command);
boolean saveTaskInstance(TaskInstance taskInstance); boolean saveTaskInstance(TaskInstance taskInstance);
boolean createTaskInstance(TaskInstance taskInstance); boolean createTaskInstance(TaskInstance taskInstance);

View File

@ -22,8 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
@ -61,7 +59,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql;
import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry; import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue; import org.apache.dolphinscheduler.dao.entity.DqTaskStatisticsValue;
import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -126,6 +123,7 @@ import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.exceptions.ServiceException;
@ -138,12 +136,10 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -169,7 +165,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
/** /**
* process relative dao that some mappers in this. * process relative dao that some mappers in this.
@ -285,6 +280,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired @Autowired
private LogClient logClient; private LogClient logClient;
@Autowired
private CommandService commandService;
/** /**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction * handle Command (construct ProcessInstance from Command) , wrapped in transaction
* *
@ -300,7 +298,7 @@ public class ProcessServiceImpl implements ProcessService {
// cannot construct process instance, return null // cannot construct process instance, return null
if (processInstance == null) { if (processInstance == null) {
logger.error("scan command, command parameter is error: {}", command); logger.error("scan command, command parameter is error: {}", command);
moveToErrorCommand(command, "process instance is null"); commandService.moveToErrorCommand(command, "process instance is null");
return null; return null;
} }
processInstance.setCommandType(command.getCommandType()); processInstance.setCommandType(command.getCommandType());
@ -387,101 +385,6 @@ public class ProcessServiceImpl implements ProcessService {
} }
} }
/**
* Save error command, and delete original command. If the given command has already been moved into error command,
* will throw {@link java.sql.SQLIntegrityConstraintViolationException ).
*
* @param command command
* @param message message
*/
@Override
public void moveToErrorCommand(Command command, String message) {
ErrorCommand errorCommand = new ErrorCommand(command, message);
this.errorCommandMapper.insert(errorCommand);
this.commandMapper.deleteById(command.getId());
}
/**
* insert one command
*
* @param command command
* @return create result
*/
@Override
@Counted("ds.workflow.create.command.count")
public int createCommand(Command command) {
int result = 0;
if (command == null) {
return result;
}
// add command timezone
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(command.getProcessDefinitionCode());
if (schedule != null) {
Map<String, String> commandParams =
StringUtils.isNotBlank(command.getCommandParam()) ? JSONUtils.toMap(command.getCommandParam())
: new HashMap<>();
commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
command.setCommandParam(JSONUtils.toJsonString(commandParams));
}
command.setId(null);
result = commandMapper.insert(command);
return result;
}
/**
* get command page
*/
@Override
public List<Command> findCommandPage(int pageSize, int pageNumber) {
return commandMapper.queryCommandPage(pageSize, pageNumber * pageSize);
}
/**
* get command page
*/
@Override
public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
if (masterCount <= 0) {
return Lists.newArrayList();
}
return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
}
/**
* check the input command exists in queue list
*
* @param command command
* @return create command result
*/
@Override
public boolean verifyIsNeedCreateCommand(Command command) {
boolean isNeedCreate = true;
EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
CommandType commandType = command.getCommandType();
if (cmdTypeMap.containsKey(commandType)) {
ObjectNode cmdParamObj = JSONUtils.parseObject(command.getCommandParam());
int processInstanceId = cmdParamObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt();
List<Command> commands = commandMapper.selectList(null);
// for all commands
for (Command tmpCommand : commands) {
if (cmdTypeMap.containsKey(tmpCommand.getCommandType())) {
ObjectNode tempObj = JSONUtils.parseObject(tmpCommand.getCommandParam());
if (tempObj != null
&& processInstanceId == tempObj.path(CMD_PARAM_RECOVER_PROCESS_ID_STRING).asInt()) {
isNeedCreate = false;
break;
}
}
}
}
return isNeedCreate;
}
/** /**
* find process instance detail by id * find process instance detail by id
* *
@ -670,66 +573,6 @@ public class ProcessServiceImpl implements ProcessService {
} }
} }
/**
* create recovery waiting thread command when thread pool is not enough for the process instance.
* sub work process instance need not to create recovery command.
* create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time
*
* @param originCommand originCommand
* @param processInstance processInstance
*/
@Override
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
// sub process doesnot need to create wait command
if (processInstance.getIsSubProcess() == Flag.YES) {
if (originCommand != null) {
commandMapper.deleteById(originCommand.getId());
}
return;
}
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(Constants.CMD_PARAM_RECOVERY_WAITING_THREAD, String.valueOf(processInstance.getId()));
// process instance quit by "waiting thread" state
if (originCommand == null) {
Command command = new Command(
CommandType.RECOVER_WAITING_THREAD,
processInstance.getTaskDependType(),
processInstance.getFailureStrategy(),
processInstance.getExecutorId(),
processInstance.getProcessDefinition().getCode(),
JSONUtils.toJsonString(cmdParam),
processInstance.getWarningType(),
processInstance.getWarningGroupId(),
processInstance.getScheduleTime(),
processInstance.getWorkerGroup(),
processInstance.getEnvironmentCode(),
processInstance.getProcessInstancePriority(),
processInstance.getDryRun(),
processInstance.getId(),
processInstance.getProcessDefinitionVersion(),
processInstance.getTestFlag());
saveCommand(command);
return;
}
// update the command time if current command if recover from waiting
if (originCommand.getCommandType() == CommandType.RECOVER_WAITING_THREAD) {
originCommand.setUpdateTime(new Date());
saveCommand(originCommand);
} else {
// delete old command and create new waiting thread command
commandMapper.deleteById(originCommand.getId());
originCommand.setId(0);
originCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
originCommand.setUpdateTime(new Date());
originCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
originCommand.setProcessInstancePriority(processInstance.getProcessInstancePriority());
saveCommand(originCommand);
}
}
/** /**
* get schedule time from command * get schedule time from command
* *
@ -1445,105 +1288,18 @@ public class ProcessServiceImpl implements ProcessService {
logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId()); logger.info("sub process instance {} status is success, so skip creating command", childInstance.getId());
return; return;
} }
Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task); Command subProcessCommand =
commandService.createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
if (subProcessCommand == null) {
logger.error("create sub process command failed, so skip creating command");
return;
}
updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode()); updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionCode());
initSubInstanceState(childInstance); initSubInstanceState(childInstance);
createCommand(subProcessCommand); commandService.createCommand(subProcessCommand);
logger.info("sub process command created: {} ", subProcessCommand); logger.info("sub process command created: {} ", subProcessCommand);
} }
/**
* complement data needs transform parent parameter to child.
*/
protected String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,
Map<String, String> fatherParams) {
// set sub work process command
String processMapStr = JSONUtils.toJsonString(instanceMap);
Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
if (parentProcessInstance.isComplementData()) {
Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
String scheduleTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
}
if (StringUtils.isNotEmpty(scheduleTime)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime);
}
processMapStr = JSONUtils.toJsonString(cmdParam);
}
if (MapUtils.isNotEmpty(fatherParams)) {
cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
processMapStr = JSONUtils.toJsonString(cmdParam);
}
return processMapStr;
}
@Override
public Map<String, String> getGlobalParamMap(String globalParams) {
List<Property> propList;
Map<String, String> globalParamMap = new HashMap<>();
if (!Strings.isNullOrEmpty(globalParams)) {
propList = JSONUtils.toList(globalParams, Property.class);
globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
}
return globalParamMap;
}
/**
* create sub work process command
*/
@Override
public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
ProcessInstance childInstance,
ProcessInstanceMap instanceMap,
TaskInstance task) {
CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
Map<String, Object> subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class);
long childDefineCode = 0L;
if (subProcessParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)) {
childDefineCode =
Long.parseLong(String.valueOf(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)));
}
ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode);
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> globalMap = this.getGlobalParamMap(task.getVarPool());
Map<String, String> fatherParams = new HashMap<>();
if (CollectionUtils.isNotEmpty(allParam)) {
for (Property info : allParam) {
if (Direct.OUT == info.getDirect()) {
continue;
}
fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
}
}
String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams);
int subProcessInstanceId =
childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId());
return new Command(
commandType,
TaskDependType.TASK_POST,
parentProcessInstance.getFailureStrategy(),
parentProcessInstance.getExecutorId(),
subProcessDefinition.getCode(),
processParam,
parentProcessInstance.getWarningType(),
parentProcessInstance.getWarningGroupId(),
parentProcessInstance.getScheduleTime(),
task.getWorkerGroup(),
task.getEnvironmentCode(),
parentProcessInstance.getProcessInstancePriority(),
parentProcessInstance.getDryRun(),
subProcessInstanceId,
subProcessDefinition.getVersion(),
parentProcessInstance.getTestFlag());
}
/** /**
* initialize sub work flow state * initialize sub work flow state
* child instance state would be initialized when 'recovery from pause/stop/failure' * child instance state would be initialized when 'recovery from pause/stop/failure'
@ -1681,21 +1437,6 @@ public class ProcessServiceImpl implements ProcessService {
return true; return true;
} }
/**
* insert or update command
*
* @param command command
* @return save command result
*/
@Override
public int saveCommand(Command command) {
if (command.getId() != null) {
return commandMapper.updateById(command);
} else {
return commandMapper.insert(command);
}
}
/** /**
* insert or update task instance * insert or update task instance
* *
@ -2112,7 +1853,7 @@ public class ProcessServiceImpl implements ProcessService {
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS); cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority()); cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
cmd.setTestFlag(processInstance.getTestFlag()); cmd.setTestFlag(processInstance.getTestFlag());
createCommand(cmd); commandService.createCommand(cmd);
} }
/** /**

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import static org.apache.dolphinscheduler.common.Constants.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.base.Strings;
/**
* Param Utility class
*/
public class ParamUtils {
/**
* convert globalParams string to global parameter map
* @param globalParams globalParams
* @return parameter map
*/
public static Map<String, String> getGlobalParamMap(String globalParams) {
List<Property> propList;
Map<String, String> globalParamMap = new HashMap<>();
if (!Strings.isNullOrEmpty(globalParams)) {
propList = JSONUtils.toList(globalParams, Property.class);
globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
}
return globalParamMap;
}
/**
* Get sub workflow parameters
* @param instanceMap process instance map
* @param parentProcessInstance parent process instance
* @param fatherParams fatherParams
* @return sub workflow parameters
*/
public static String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,
Map<String, String> fatherParams) {
// set sub work process command
String processMapStr = JSONUtils.toJsonString(instanceMap);
Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);
if (parentProcessInstance.isComplementData()) {
Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
String scheduleTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
}
if (StringUtils.isNotEmpty(scheduleTime)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime);
}
processMapStr = JSONUtils.toJsonString(cmdParam);
}
if (MapUtils.isNotEmpty(fatherParams)) {
cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
processMapStr = JSONUtils.toJsonString(cmdParam);
}
return processMapStr;
}
}

View File

@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.command;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import com.fasterxml.jackson.databind.JsonNode;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class CommandServiceImplTest {
@InjectMocks
private CommandServiceImpl commandService;
@Mock
private CommandMapper commandMapper;
@Mock
private ProcessDefinitionMapper processDefineMapper;
@Mock
private ScheduleMapper scheduleMapper;
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();
parentInstance.setWarningType(WarningType.SUCCESS);
parentInstance.setWarningGroupId(0);
TaskInstance task = new TaskInstance();
task.setTaskParams("{\"processDefinitionCode\":10}}");
task.setId(10);
task.setTaskCode(1L);
task.setTaskDefinitionVersion(1);
ProcessInstance childInstance = null;
ProcessInstanceMap instanceMap = new ProcessInstanceMap();
instanceMap.setParentProcessInstanceId(1);
instanceMap.setParentTaskInstanceId(10);
Command command;
// father history: start; child null == command type: start
parentInstance.setHistoryCmd("START_PROCESS");
parentInstance.setCommandType(CommandType.START_PROCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(10L);
Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition);
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
// father history: start,start failure; child null == command type: start
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
// father history: scheduler,start failure; child null == command type: scheduler
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType());
// father history: complement,start failure; child null == command type: complement
String startString = "2020-01-01 00:00:00";
String endString = "2020-01-10 00:00:00";
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
Map<String, String> complementMap = new HashMap<>();
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
Assertions.assertEquals(startString, DateUtils.dateToString(start));
Assertions.assertEquals(endString, DateUtils.dateToString(end));
// father history: start,failure,start failure; child not null == command type: start failure
childInstance = new ProcessInstance();
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
}
@Test
public void testVerifyIsNeedCreateCommand() {
List<Command> commands = new ArrayList<>();
Command command = new Command();
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\"}");
commands.add(command);
Mockito.when(commandMapper.selectList(null)).thenReturn(commands);
Assertions.assertFalse(commandService.verifyIsNeedCreateCommand(command));
Command command1 = new Command();
command1.setCommandType(CommandType.REPEAT_RUNNING);
command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"222\"}");
Assertions.assertTrue(commandService.verifyIsNeedCreateCommand(command1));
Command command2 = new Command();
command2.setCommandType(CommandType.PAUSE);
Assertions.assertTrue(commandService.verifyIsNeedCreateCommand(command2));
}
@Test
public void testCreateRecoveryWaitingThreadCommand() {
int id = 123;
Mockito.when(commandMapper.deleteById(id)).thenReturn(1);
ProcessInstance subProcessInstance = new ProcessInstance();
subProcessInstance.setIsSubProcess(Flag.YES);
Command originCommand = new Command();
originCommand.setId(id);
commandService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(111);
commandService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
Command recoverCommand = new Command();
recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
commandService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance);
Command repeatRunningCommand = new Command();
recoverCommand.setCommandType(CommandType.REPEAT_RUNNING);
commandService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance);
ProcessInstance subProcessInstance2 = new ProcessInstance();
subProcessInstance2.setId(111);
subProcessInstance2.setIsSubProcess(Flag.NO);
commandService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2);
}
@Test
public void giveNullOriginCommand_thenCreateRecoveryWaitingThreadCommand_expectNoDelete() {
ProcessInstance subProcessInstance = new ProcessInstance();
subProcessInstance.setIsSubProcess(Flag.NO);
subProcessInstance.setId(111);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(111);
processDefinition.setCode(10L);
subProcessInstance.setProcessDefinition(processDefinition);
subProcessInstance.setWarningGroupId(1);
commandService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
Mockito.verify(commandMapper, Mockito.times(0)).deleteById(anyString());
}
@Test
public void testCreateCommand() {
Command command = new Command();
command.setProcessDefinitionCode(123);
command.setCommandParam("{\"ProcessInstanceId\":222}");
command.setCommandType(CommandType.START_PROCESS);
int mockResult = 1;
Mockito.when(commandMapper.insert(command)).thenReturn(mockResult);
int exeMethodResult = commandService.createCommand(command);
Assertions.assertEquals(mockResult, exeMethodResult);
Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
}
@Test
public void testFindCommandPageBySlot() {
int pageSize = 1;
int pageNumber = 0;
int masterCount = 0;
int thisMasterSlot = 2;
List<Command> commandList =
commandService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
Assertions.assertEquals(0, commandList.size());
}
}

View File

@ -24,15 +24,12 @@ import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DqExecuteResult; import org.apache.dolphinscheduler.dao.entity.DqExecuteResult;
@ -42,7 +39,6 @@ import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@ -63,7 +59,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
@ -91,7 +86,6 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -105,8 +99,6 @@ import org.mockito.quality.Strictness;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
/** /**
* process service test * process service test
*/ */
@ -165,132 +157,12 @@ public class ProcessServiceTest {
@Mock @Mock
private DqComparisonTypeMapper dqComparisonTypeMapper; private DqComparisonTypeMapper dqComparisonTypeMapper;
@Mock
private ScheduleMapper scheduleMapper;
@Mock @Mock
CuringParamsService curingGlobalParamsService; CuringParamsService curingGlobalParamsService;
@Mock @Mock
TaskPluginManager taskPluginManager; TaskPluginManager taskPluginManager;
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();
parentInstance.setWarningType(WarningType.SUCCESS);
parentInstance.setWarningGroupId(0);
TaskInstance task = new TaskInstance();
task.setTaskParams("{\"processDefinitionCode\":10}}");
task.setId(10);
task.setTaskCode(1L);
task.setTaskDefinitionVersion(1);
ProcessInstance childInstance = null;
ProcessInstanceMap instanceMap = new ProcessInstanceMap();
instanceMap.setParentProcessInstanceId(1);
instanceMap.setParentTaskInstanceId(10);
Command command;
// father history: start; child null == command type: start
parentInstance.setHistoryCmd("START_PROCESS");
parentInstance.setCommandType(CommandType.START_PROCESS);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(10L);
Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition);
Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition);
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
// father history: start,start failure; child null == command type: start
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
// father history: scheduler,start failure; child null == command type: scheduler
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType());
// father history: complement,start failure; child null == command type: complement
String startString = "2020-01-01 00:00:00";
String endString = "2020-01-10 00:00:00";
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
Map<String, String> complementMap = new HashMap<>();
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString);
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString);
parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText());
Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText());
Assertions.assertEquals(startString, DateUtils.dateToString(start));
Assertions.assertEquals(endString, DateUtils.dateToString(end));
// father history: start,failure,start failure; child not null == command type: start failure
childInstance = new ProcessInstance();
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
}
@Test
public void testVerifyIsNeedCreateCommand() {
List<Command> commands = new ArrayList<>();
Command command = new Command();
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\"}");
commands.add(command);
Mockito.when(commandMapper.selectList(null)).thenReturn(commands);
Assertions.assertFalse(processService.verifyIsNeedCreateCommand(command));
Command command1 = new Command();
command1.setCommandType(CommandType.REPEAT_RUNNING);
command1.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"222\"}");
Assertions.assertTrue(processService.verifyIsNeedCreateCommand(command1));
Command command2 = new Command();
command2.setCommandType(CommandType.PAUSE);
Assertions.assertTrue(processService.verifyIsNeedCreateCommand(command2));
}
@Test
public void testCreateRecoveryWaitingThreadCommand() {
int id = 123;
Mockito.when(commandMapper.deleteById(id)).thenReturn(1);
ProcessInstance subProcessInstance = new ProcessInstance();
subProcessInstance.setIsSubProcess(Flag.YES);
Command originCommand = new Command();
originCommand.setId(id);
processService.createRecoveryWaitingThreadCommand(originCommand, subProcessInstance);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(111);
processService.createRecoveryWaitingThreadCommand(null, subProcessInstance);
Command recoverCommand = new Command();
recoverCommand.setCommandType(CommandType.RECOVER_WAITING_THREAD);
processService.createRecoveryWaitingThreadCommand(recoverCommand, subProcessInstance);
Command repeatRunningCommand = new Command();
recoverCommand.setCommandType(CommandType.REPEAT_RUNNING);
processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance);
ProcessInstance subProcessInstance2 = new ProcessInstance();
subProcessInstance2.setId(111);
subProcessInstance2.setIsSubProcess(Flag.NO);
processService.createRecoveryWaitingThreadCommand(repeatRunningCommand, subProcessInstance2);
}
@Test @Test
public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException { public void testHandleCommand() throws CronParseException, CodeGenerateUtils.CodeGenerateException {
@ -789,19 +661,6 @@ public class ProcessServiceTest {
Assertions.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount()); Assertions.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount());
} }
@Test
public void testCreateCommand() {
Command command = new Command();
command.setProcessDefinitionCode(123);
command.setCommandParam("{\"ProcessInstanceId\":222}");
command.setCommandType(CommandType.START_PROCESS);
int mockResult = 1;
Mockito.when(commandMapper.insert(command)).thenReturn(mockResult);
int exeMethodResult = processService.createCommand(command);
Assertions.assertEquals(mockResult, exeMethodResult);
Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
}
@Test @Test
public void testChangeOutParam() { public void testChangeOutParam() {
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
@ -887,17 +746,6 @@ public class ProcessServiceTest {
Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId()); Assertions.assertEquals(instance.getId(), taskInstanceByIdList.get(0).getId());
} }
@Test
public void testFindCommandPageBySlot() {
int pageSize = 1;
int pageNumber = 0;
int masterCount = 0;
int thisMasterSlot = 2;
List<Command> commandList =
processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
Assertions.assertEquals(0, commandList.size());
}
@Test @Test
public void testFindLastManualProcessInterval() { public void testFindLastManualProcessInterval() {
long definitionCode = 1L; long definitionCode = 1L;

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class ParamUtilsTest {
@Test
public void testGetGlobalParamMap() {
String globalParam = "[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]";
Map<String, String> globalParamMap = ParamUtils.getGlobalParamMap(globalParam);
Assertions.assertEquals(globalParamMap.size(), 1);
Assertions.assertEquals(globalParamMap.get("startParam1"), "");
Map<String, String> emptyParamMap = ParamUtils.getGlobalParamMap(null);
Assertions.assertEquals(emptyParamMap.size(), 0);
}
}