From 83d53505de6cbd13fb988559dc409091a8d69264 Mon Sep 17 00:00:00 2001 From: Dean Wong Date: Tue, 12 Jan 2021 09:13:27 +0800 Subject: [PATCH] [Feature-#130] pass global param values when starting new process instance (#4372) * [DS-130][feat] pass global param values when starting new process instance add optional param for start-process-instance api reuse command_param in command table for persistence overload curingGlobalParams function in ParameterUtils not adapt the UI code yet * change import order * support datetime expression * print start params * (fix) avoid npe when cmdParam is null Change-Id: I3b4c4b5fa1df316ff221e27146e45d7d4d3a404e --- .../api/controller/ExecutorController.java | 16 ++++++++++----- .../api/service/ExecutorService.java | 13 +++++++++--- .../api/service/ExecutorService2Test.java | 16 +++++++-------- .../dolphinscheduler/common/Constants.java | 2 ++ .../common/utils/ParameterUtilsTest.java | 20 +++++++++++++++++++ .../service/process/ProcessService.java | 19 ++++++++++++++++++ .../service/process/ProcessServiceTest.java | 15 +++++++++++++- 7 files changed, 84 insertions(+), 17 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 20f4285ffa..b093483df1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.User; import io.swagger.annotations.*; import org.apache.dolphinscheduler.common.enums.*; @@ -107,21 +108,26 @@ public class ExecutorController extends BaseController { @RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, - @RequestParam(value = "timeout", required = false) Integer timeout) throws ParseException { + @RequestParam(value = "timeout", required = false) Integer timeout, + @RequestParam(value = "startParams", required = false) String startParams) throws ParseException { logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, " - + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}", + + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}, " + + "startParams: {}", loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, failureStrategy, startNodeList, taskDependType, warningType, workerGroup, receivers, receiversCc, runMode, processInstancePriority, - workerGroup, timeout); + workerGroup, timeout, startParams); if (timeout == null) { timeout = Constants.MAX_TASK_TIMEOUT; } - + Map startParamMap = null; + if (startParams != null) { + startParamMap = JSONUtils.toMap(startParams); + } Map result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy, startNodeList, taskDependType, warningType, - warningGroupId, receivers, receiversCc, runMode, processInstancePriority, workerGroup, timeout); + warningGroupId, receivers, receiversCc, runMode, processInstancePriority, workerGroup, timeout, startParamMap); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 7a0fd0f845..f53d9026f9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; import org.apache.dolphinscheduler.api.enums.ExecuteType; @@ -112,6 +113,7 @@ public class ExecutorService extends BaseService { * @param workerGroup worker group name * @param runMode run mode * @param timeout timeout + * @param startParams the global param values which pass to new process instance * @return execute process instance code * @throws ParseException Parse Exception */ @@ -120,7 +122,8 @@ public class ExecutorService extends BaseService { FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, String receivers, String receiversCc, RunMode runMode, - Priority processInstancePriority, String workerGroup, Integer timeout) throws ParseException { + Priority processInstancePriority, String workerGroup, Integer timeout, + Map startParams) throws ParseException { Map result = new HashMap<>(); // timeout is invalid if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) { @@ -157,7 +160,7 @@ public class ExecutorService extends BaseService { */ int create = this.createCommand(commandType, processDefinitionId, taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode, processInstancePriority, workerGroup); + warningGroupId, runMode, processInstancePriority, workerGroup, startParams); if (create > 0) { /** * according to the process definition ID updateProcessInstance and CC recipient @@ -502,7 +505,8 @@ public class ExecutorService extends BaseService { TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, int executorId, int warningGroupId, - RunMode runMode, Priority processInstancePriority, String workerGroup) throws ParseException { + RunMode runMode, Priority processInstancePriority, String workerGroup, + Map startParams) throws ParseException { /** * instantiate command schedule instance @@ -529,6 +533,9 @@ public class ExecutorService extends BaseService { if (warningType != null) { command.setWarningType(warningType); } + if (startParams != null && startParams.size() > 0) { + cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams)); + } command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setExecutorId(executorId); command.setWarningGroupId(warningGroupId); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index 09b9f7d2e5..93dc3bc727 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -149,7 +149,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); } catch (Exception e) { @@ -169,7 +169,7 @@ public class ExecutorService2Test { null, "n1,n2", null, null, 0, "", "", RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); } catch (Exception e) { @@ -190,7 +190,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); } catch (Exception e) { @@ -210,7 +210,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); } catch (Exception e) { @@ -230,7 +230,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); } catch (Exception e) { @@ -250,7 +250,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(15)).createCommand(any(Command.class)); } catch (Exception e) { @@ -260,14 +260,14 @@ public class ExecutorService2Test { @Test public void testNoMsterServers() throws ParseException { - Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList()); + Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList<>()); Map result = executorService.execProcessInstance(loginUser, projectName, processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, null, null, null, null, 0, "", "", RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null); Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index cd6008df5a..17fa753a37 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -458,6 +458,8 @@ public final class Constants { public static final String CMD_PARAM_START_NODE_NAMES = "StartNodeNameList"; + public static final String CMD_PARAM_START_PARAMS = "StartParams"; + /** * complement data start date */ diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java index 796a5faa45..9ebfd69430 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java @@ -132,6 +132,26 @@ public class ParameterUtilsTest { String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList)); + + Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, ""); + globalParamList.add(testStartParamProperty); + Property testStartParam2Property = new Property("testStartParam2", Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]"); + globalParamList.add(testStartParam2Property); + globalParamMap.put("testStartParam", ""); + globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]"); + + Map startParamMap = new HashMap<>(2); + startParamMap.put("testStartParam", "$[yyyyMMdd]"); + + for (Map.Entry param : globalParamMap.entrySet()) { + String val = startParamMap.get(param.getKey()); + if (val != null) { + param.setValue(val); + } + } + + String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); + Assert.assertTrue(result6.contains("20191220")); } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 14990042bc..d77eb79ba3 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -558,6 +558,25 @@ public class ProcessService { processInstance.setCommandStartTime(command.getStartTime()); processInstance.setLocations(processDefinition.getLocations()); processInstance.setConnects(processDefinition.getConnects()); + + // get start params from command param + Map startParamMap = null; + if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) { + String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS); + startParamMap = JSONUtils.toMap(startParamJson); + } + + // set start param into global params + if (startParamMap != null && startParamMap.size() > 0 + && processDefinition.getGlobalParamMap() != null) { + for (Map.Entry param : processDefinition.getGlobalParamMap().entrySet()) { + String val = startParamMap.get(param.getKey()); + if (val != null) { + param.setValue(val); + } + } + } + // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processDefinition.getGlobalParamMap(), diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 4ac91f017c..db83e71725 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.service.process; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; import org.apache.dolphinscheduler.common.Constants; @@ -234,13 +235,14 @@ public class ProcessServiceTest { processDefinition.setId(123); processDefinition.setName("test"); processDefinition.setVersion(1); - processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" + processDefinition.setProcessDefinitionJson("{\"globalParams\":[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"tasks\":[{\"conditionResult\":" + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); + processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]"); ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(222); Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition); @@ -265,6 +267,17 @@ public class ProcessServiceTest { command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command4.setCommandType(CommandType.REPEAT_RUNNING); Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command4)); + + Command command5 = new Command(); + command5.setProcessDefinitionId(123); + HashMap startParams = new HashMap<>(); + startParams.put("startParam1", "testStartParam1"); + HashMap commandParams = new HashMap<>(); + commandParams.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams)); + command5.setCommandParam(JSONUtils.toJsonString(commandParams)); + command5.setCommandType(CommandType.START_PROCESS); + ProcessInstance processInstance1 = processService.handleCommand(logger, host, validThreadNum, command5); + Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); } @Test