diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 93b095ddf8..b5dfc8fa39 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -64,7 +64,7 @@ public class ExecutorController extends BaseController { * @param receiversCc receivers cc * @param runMode run mode * @param processInstancePriority process instance priority - * @param workerGroupId worker group id + * @param workerGroup worker group * @param timeout timeout * @return start process result code */ @@ -82,7 +82,7 @@ public class ExecutorController extends BaseController { @ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ), @ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ), @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ), - @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"), + @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String",example = "default"), @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"), }) @PostMapping(value = "start-process-instance") @@ -101,15 +101,15 @@ public class ExecutorController extends BaseController { @RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, - @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, + @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @RequestParam(value = "timeout", required = false) Integer timeout) { try { logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, " - + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroupId: {}, timeout: {}", + + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}", loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, - failureStrategy, startNodeList, taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority, - workerGroupId, timeout); + failureStrategy, startNodeList, taskDependType, warningType, workerGroup,receivers,receiversCc,runMode,processInstancePriority, + workerGroup, timeout); if (timeout == null) { timeout = Constants.MAX_TASK_TIMEOUT; @@ -117,7 +117,7 @@ public class ExecutorController extends BaseController { Map result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy, startNodeList, taskDependType, warningType, - warningGroupId,receivers,receiversCc, runMode,processInstancePriority, workerGroupId, timeout); + warningGroupId,receivers,receiversCc, runMode,processInstancePriority, workerGroup, timeout); return returnDataList(result); } catch (Exception e) { logger.error(Status.START_PROCESS_INSTANCE_ERROR.getMsg(),e); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java index 96038dcf8c..974dc1bf8b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java @@ -70,7 +70,7 @@ public class SchedulerController extends BaseController { * @param processInstancePriority process instance priority * @param receivers receivers * @param receiversCc receivers cc - * @param workerGroupId worker group id + * @param workerGroup worker group * @return create result code */ @ApiOperation(value = "createSchedule", notes= "CREATE_SCHEDULE_NOTES") @@ -96,15 +96,15 @@ public class SchedulerController extends BaseController { @RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy, @RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receiversCc", required = false) String receiversCc, - @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, + @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," + "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}, workGroupId:{}", loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId, - failureStrategy, receivers, receiversCc, processInstancePriority, workerGroupId); + failureStrategy, receivers, receiversCc, processInstancePriority, workerGroup); try { Map result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule, - warningType, warningGroupId, failureStrategy, receivers, receiversCc, processInstancePriority, workerGroupId); + warningType, warningGroupId, failureStrategy, receivers, receiversCc, processInstancePriority, workerGroup); return returnDataList(result); } catch (Exception e) { @@ -124,7 +124,7 @@ public class SchedulerController extends BaseController { * @param warningGroupId warning group id * @param failureStrategy failure strategy * @param receivers receivers - * @param workerGroupId worker group id + * @param workerGroup worker group * @param processInstancePriority process instance priority * @param receiversCc receivers cc * @return update result code @@ -151,16 +151,16 @@ public class SchedulerController extends BaseController { @RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy, @RequestParam(value = "receivers", required = false) String receivers, @RequestParam(value = "receiversCc", required = false) String receiversCc, - @RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId, + @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) { logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " + "failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {},workerGroupId:{}", loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy, - receivers, receiversCc, processInstancePriority, workerGroupId); + receivers, receiversCc, processInstancePriority, workerGroup); try { Map result = schedulerService.updateSchedule(loginUser, projectName, id, schedule, - warningType, warningGroupId, failureStrategy, receivers, receiversCc, null, processInstancePriority, workerGroupId); + warningType, warningGroupId, failureStrategy, receivers, receiversCc, null, processInstancePriority, workerGroup); return returnDataList(result); } catch (Exception e) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java index 8ec1335442..d7c898a29f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java @@ -27,6 +27,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -34,6 +35,7 @@ import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; import springfox.documentation.annotations.ApiIgnore; +import java.util.List; import java.util.Map; /** @@ -46,7 +48,6 @@ public class WorkerGroupController extends BaseController{ private static final Logger logger = LoggerFactory.getLogger(WorkerGroupController.class); - @Autowired WorkerGroupService workerGroupService; @@ -135,6 +136,7 @@ public class WorkerGroupController extends BaseController{ loginUser.getUserName() ); try { + Map result = workerGroupService.queryAllGroup(); return returnDataList(result); }catch (Exception e){ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java index f14d8df097..3e776a5048 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java @@ -96,11 +96,6 @@ public class ProcessMeta { */ private String scheduleProcessInstancePriority; - /** - * worker group id - */ - private Integer scheduleWorkerGroupId; - /** * worker group name */ @@ -229,14 +224,6 @@ public class ProcessMeta { this.scheduleProcessInstancePriority = scheduleProcessInstancePriority; } - public Integer getScheduleWorkerGroupId() { - return scheduleWorkerGroupId; - } - - public void setScheduleWorkerGroupId(int scheduleWorkerGroupId) { - this.scheduleWorkerGroupId = scheduleWorkerGroupId; - } - public String getScheduleWorkerGroupName() { return scheduleWorkerGroupName; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 0389890691..d785686b98 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -85,7 +85,7 @@ public class ExecutorService extends BaseService{ * @param receivers receivers * @param receiversCc receivers cc * @param processInstancePriority process instance priority - * @param workerGroupId worker group id + * @param workerGroup worker group name * @param runMode run mode * @param timeout timeout * @return execute process instance code @@ -96,7 +96,7 @@ public class ExecutorService extends BaseService{ FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, int warningGroupId, String receivers, String receiversCc, RunMode runMode, - Priority processInstancePriority, int workerGroupId, Integer timeout) throws ParseException { + Priority processInstancePriority, String workerGroup, Integer timeout) throws ParseException { Map result = new HashMap<>(5); // timeout is valid if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) { @@ -128,7 +128,7 @@ public class ExecutorService extends BaseService{ */ int create = this.createCommand(commandType, processDefinitionId, taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), - warningGroupId, runMode,processInstancePriority, workerGroupId); + warningGroupId, runMode,processInstancePriority, workerGroup); if(create > 0 ){ /** * according to the process definition ID updateProcessInstance and CC recipient @@ -452,11 +452,29 @@ public class ExecutorService extends BaseService{ * @return * @throws ParseException */ + + /** + * create commonad + * @param commandType command type + * @param processDefineId process define id + * @param nodeDep node dependency + * @param failureStrategy failure strategy + * @param startNodeList start node list + * @param schedule schedule + * @param warningType warning type + * @param executorId executor id + * @param warningGroupId warning group id + * @param runMode run mode + * @param processInstancePriority process instance priority + * @param workerGroup worker group + * @return create command result + * @throws ParseException parse exception + */ private int createCommand(CommandType commandType, int processDefineId, TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, WarningType warningType, - int excutorId, int warningGroupId, - RunMode runMode,Priority processInstancePriority, int workerGroupId) throws ParseException { + int executorId, int warningGroupId, + RunMode runMode,Priority processInstancePriority, String workerGroup) throws ParseException { /** * instantiate command schedule instance @@ -484,10 +502,10 @@ public class ExecutorService extends BaseService{ command.setWarningType(warningType); } command.setCommandParam(JSONUtils.toJson(cmdParam)); - command.setExecutorId(excutorId); + command.setExecutorId(executorId); command.setWarningGroupId(warningGroupId); command.setProcessInstancePriority(processInstancePriority); - command.setWorkerGroupId(workerGroupId); + command.setWorkerGroup(workerGroup); Date start = null; Date end = null; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 22e3593a52..7232c55be8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -561,13 +561,13 @@ public class ProcessDefinitionService extends BaseDAGService { List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); if (!schedules.isEmpty()) { Schedule schedule = schedules.get(0); - WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId()); + /*WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId()); if (null == workerGroup && schedule.getWorkerGroupId() == -1) { workerGroup = new WorkerGroup(); workerGroup.setId(-1); workerGroup.setName(""); - } + }*/ exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId()); @@ -577,11 +577,7 @@ public class ProcessDefinitionService extends BaseDAGService { exportProcessMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy())); exportProcessMeta.setScheduleReleaseState(String.valueOf(ReleaseState.OFFLINE)); exportProcessMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority())); - - if (null != workerGroup) { - exportProcessMeta.setScheduleWorkerGroupId(workerGroup.getId()); - exportProcessMeta.setScheduleWorkerGroupName(workerGroup.getName()); - } + exportProcessMeta.setScheduleWorkerGroupName(schedule.getWorkerGroup()); } //create workflow json file return JSONUtils.toJsonString(exportProcessMeta); @@ -780,15 +776,9 @@ public class ProcessDefinitionService extends BaseDAGService { if (null != processMeta.getScheduleProcessInstancePriority()) { scheduleObj.setProcessInstancePriority(Priority.valueOf(processMeta.getScheduleProcessInstancePriority())); } - if (null != processMeta.getScheduleWorkerGroupId()) { - scheduleObj.setWorkerGroupId(processMeta.getScheduleWorkerGroupId()); - } else { - if (null != processMeta.getScheduleWorkerGroupName()) { - List workerGroups = workerGroupMapper.queryWorkerGroupByName(processMeta.getScheduleWorkerGroupName()); - if(CollectionUtils.isNotEmpty(workerGroups)){ - scheduleObj.setWorkerGroupId(workerGroups.get(0).getId()); - } - } + + if (null != processMeta.getScheduleWorkerGroupName()) { + scheduleObj.setWorkerGroup(processMeta.getScheduleWorkerGroupName()); } return scheduleMapper.insert(scheduleObj); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 4b809a8d01..09b21d4304 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -112,9 +112,9 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); - String workerGroupName = ""; - if(processInstance.getWorkerGroupId() == -1){ - workerGroupName = DEFAULT; + /*String workerGroupName = ""; + if(StringUtils.isBlank(processInstance.getWorkerGroup())){ + workerGroupName = ; }else{ WorkerGroup workerGroup = workerGroupMapper.selectById(processInstance.getWorkerGroupId()); if(workerGroup != null){ @@ -123,7 +123,7 @@ public class ProcessInstanceService extends BaseDAGService { workerGroupName = DEFAULT; } } - processInstance.setWorkerGroupName(workerGroupName); + processInstance.setWorkerGroupName(workerGroupName);*/ ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); processInstance.setReceivers(processDefinition.getReceivers()); processInstance.setReceiversCc(processDefinition.getReceiversCc()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java index 72122100a1..cb07ffbbe3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java @@ -92,7 +92,7 @@ public class SchedulerService extends BaseService { * @param processInstancePriority process instance priority * @param receivers receivers * @param receiversCc receivers cc - * @param workerGroupId worker group id + * @param workerGroup worker group * @return create result code * @throws IOException ioexception */ @@ -106,7 +106,7 @@ public class SchedulerService extends BaseService { String receivers, String receiversCc, Priority processInstancePriority, - int workerGroupId) throws IOException { + String workerGroup) throws IOException { Map result = new HashMap(5); @@ -156,7 +156,7 @@ public class SchedulerService extends BaseService { scheduleObj.setUserName(loginUser.getUserName()); scheduleObj.setReleaseState(ReleaseState.OFFLINE); scheduleObj.setProcessInstancePriority(processInstancePriority); - scheduleObj.setWorkerGroupId(workerGroupId); + scheduleObj.setWorkerGroup(workerGroup); scheduleMapper.insert(scheduleObj); /** @@ -182,7 +182,7 @@ public class SchedulerService extends BaseService { * @param warningType warning type * @param warningGroupId warning group id * @param failureStrategy failure strategy - * @param workerGroupId worker group id + * @param workerGroup worker group * @param processInstancePriority process instance priority * @param receiversCc receiver cc * @param receivers receivers @@ -202,7 +202,7 @@ public class SchedulerService extends BaseService { String receiversCc, ReleaseState scheduleStatus, Priority processInstancePriority, - int workerGroupId) throws IOException { + String workerGroup) throws IOException { Map result = new HashMap(5); Project project = projectMapper.queryByName(projectName); @@ -266,7 +266,7 @@ public class SchedulerService extends BaseService { if (scheduleStatus != null) { schedule.setReleaseState(scheduleStatus); } - schedule.setWorkerGroupId(workerGroupId); + schedule.setWorkerGroup(workerGroup); schedule.setUpdateTime(now); schedule.setProcessInstancePriority(processInstancePriority); scheduleMapper.updateById(schedule); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index c44c446d5c..6384e38026 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -50,6 +51,9 @@ public class WorkerGroupService extends BaseService { @Autowired ProcessInstanceMapper processInstanceMapper; + @Autowired + protected ZookeeperCachedOperator zookeeperCachedOperator; + /** * create or update a worker group * @@ -181,7 +185,8 @@ public class WorkerGroupService extends BaseService { */ public Map queryAllGroup() { Map result = new HashMap<>(5); - List workerGroupList = workerGroupMapper.queryAllWorkerGroup(); + String WORKER_PATH = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker"; + List workerGroupList = zookeeperCachedOperator.getChildrenKeys(WORKER_PATH); result.put(Constants.DATA_LIST, workerGroupList); putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java index 07d7477930..a8777541b7 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java @@ -117,7 +117,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_SERIAL, - Priority.LOW, 0, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); }catch (Exception e){ @@ -138,7 +138,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_SERIAL, - Priority.LOW, 0, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); }catch (Exception e){ @@ -159,7 +159,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_SERIAL, - Priority.LOW, 0, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); }catch (Exception e){ @@ -180,7 +180,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_PARALLEL, - Priority.LOW, 0, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); }catch (Exception e){ @@ -201,7 +201,7 @@ public class ExecutorService2Test { null, null, null, null, 0, "", "", RunMode.RUN_MODE_PARALLEL, - Priority.LOW, 0, 110); + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(15)).createCommand(any(Command.class)); }catch (Exception e){ diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 1e6ee13c57..d434b88b8b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -803,7 +803,7 @@ public class ProcessDefinitionServiceTest { schedule.setProcessInstancePriority(Priority.MEDIUM); schedule.setWarningType(WarningType.NONE); schedule.setWarningGroupId(1); - schedule.setWorkerGroupId(-1); + schedule.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); return schedule; } @@ -822,7 +822,6 @@ public class ProcessDefinitionServiceTest { processMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy())); processMeta.setScheduleReleaseState(String.valueOf(schedule.getReleaseState())); processMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority())); - processMeta.setScheduleWorkerGroupId(schedule.getWorkerGroupId()); processMeta.setScheduleWorkerGroupName("workgroup1"); return processMeta; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index 40efd0a24f..193e0bc006 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -114,9 +114,9 @@ public class TaskNode { private Priority taskInstancePriority; /** - * worker group id + * worker group */ - private int workerGroupId; + private String workerGroup; /** @@ -230,7 +230,7 @@ public class TaskNode { Objects.equals(extras, taskNode.extras) && Objects.equals(runFlag, taskNode.runFlag) && Objects.equals(dependence, taskNode.dependence) && - Objects.equals(workerGroupId, taskNode.workerGroupId) && + Objects.equals(workerGroup, taskNode.workerGroup) && CollectionUtils.equalLists(depList, taskNode.depList); } @@ -310,15 +310,15 @@ public class TaskNode { ", dependence='" + dependence + '\'' + ", taskInstancePriority=" + taskInstancePriority + ", timeout='" + timeout + '\'' + - ", workerGroupId='" + workerGroupId + '\'' + + ", workerGroup='" + workerGroup + '\'' + '}'; } - public int getWorkerGroupId() { - return workerGroupId; + public String getWorkerGroup() { + return workerGroup; } - public void setWorkerGroupId(int workerGroupId) { - this.workerGroupId = workerGroupId; + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index 25667924ac..5a6974803c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -108,13 +108,11 @@ public class Command { @TableField("update_time") private Date updateTime; - /** - * + * worker group */ - @TableField("worker_group_id") - private int workerGroupId; - + @TableField(exist = false) + private String workerGroup; public Command() { this.taskDependType = TaskDependType.TASK_POST; @@ -254,13 +252,12 @@ public class Command { this.updateTime = updateTime; } - - public int getWorkerGroupId() { - return workerGroupId; + public String getWorkerGroup() { + return workerGroup; } - public void setWorkerGroupId(int workerGroupId) { - this.workerGroupId = workerGroupId; + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; } @Override @@ -283,7 +280,7 @@ public class Command { if (executorId != command.executorId) { return false; } - if (workerGroupId != command.workerGroupId) { + if (workerGroup != null ? workerGroup.equals(command.workerGroup) : command.workerGroup == null) { return false; } if (commandType != command.commandType) { @@ -332,10 +329,9 @@ public class Command { result = 31 * result + (startTime != null ? startTime.hashCode() : 0); result = 31 * result + (processInstancePriority != null ? processInstancePriority.hashCode() : 0); result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0); - result = 31 * result + workerGroupId; + result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); return result; } - @Override public String toString() { return "Command{" + @@ -352,7 +348,7 @@ public class Command { ", startTime=" + startTime + ", processInstancePriority=" + processInstancePriority + ", updateTime=" + updateTime + - ", workerGroupId=" + workerGroupId + + ", workerGroup='" + workerGroup + '\'' + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 54c96e932d..77e148a8f0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -189,9 +189,9 @@ public class ProcessInstance { private Priority processInstancePriority; /** - * worker group id + * worker group */ - private int workerGroupId; + private String workerGroup; /** * process timeout for warning @@ -203,12 +203,6 @@ public class ProcessInstance { */ private int tenantId; - /** - * worker group name. for api. - */ - @TableField(exist = false) - private String workerGroupName; - /** * receivers for api */ @@ -527,12 +521,12 @@ public class ProcessInstance { this.duration = duration; } - public int getWorkerGroupId() { - return workerGroupId; + public String getWorkerGroup() { + return workerGroup; } - public void setWorkerGroupId(int workerGroupId) { - this.workerGroupId = workerGroupId; + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; } public int getTimeout() { @@ -552,14 +546,6 @@ public class ProcessInstance { return this.tenantId ; } - public String getWorkerGroupName() { - return workerGroupName; - } - - public void setWorkerGroupName(String workerGroupName) { - this.workerGroupName = workerGroupName; - } - public String getReceivers() { return receivers; } @@ -610,10 +596,9 @@ public class ProcessInstance { ", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' + ", duration=" + duration + ", processInstancePriority=" + processInstancePriority + - ", workerGroupId=" + workerGroupId + + ", workerGroup='" + workerGroup + '\'' + ", timeout=" + timeout + ", tenantId=" + tenantId + - ", workerGroupName='" + workerGroupName + '\'' + ", receivers='" + receivers + '\'' + ", receiversCc='" + receiversCc + '\'' + '}'; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java index cfda49df6e..0cb41080b2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java @@ -122,9 +122,9 @@ public class Schedule { private Priority processInstancePriority; /** - * worker group id + * worker group */ - private int workerGroupId; + private String workerGroup; public int getWarningGroupId() { return warningGroupId; @@ -265,13 +265,12 @@ public class Schedule { this.processInstancePriority = processInstancePriority; } - - public int getWorkerGroupId() { - return workerGroupId; + public String getWorkerGroup() { + return workerGroup; } - public void setWorkerGroupId(int workerGroupId) { - this.workerGroupId = workerGroupId; + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; } @Override @@ -294,7 +293,7 @@ public class Schedule { ", releaseState=" + releaseState + ", warningGroupId=" + warningGroupId + ", processInstancePriority=" + processInstancePriority + - ", workerGroupId=" + workerGroupId + + ", workerGroup='" + workerGroup + '\'' + '}'; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 3fc40ca5a8..0c7074f6df 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -186,15 +186,10 @@ public class TaskInstance implements Serializable { @TableField(exist = false) private String dependentResult; - /** - * worker group id - */ - private int workerGroupId; /** * workerGroup */ - @TableField(exist = false) private String workerGroup; public ProcessInstance getProcessInstance() { @@ -450,14 +445,6 @@ public class TaskInstance implements Serializable { this.processInstancePriority = processInstancePriority; } - public int getWorkerGroupId() { - return workerGroupId; - } - - public void setWorkerGroupId(int workerGroupId) { - this.workerGroupId = workerGroupId; - } - public String getDependentResult() { return dependentResult; } @@ -505,7 +492,6 @@ public class TaskInstance implements Serializable { ", taskInstancePriority=" + taskInstancePriority + ", processInstancePriority=" + processInstancePriority + ", dependentResult='" + dependentResult + '\'' + - ", workerGroupId=" + workerGroupId + ", workerGroup='" + workerGroup + '\'' + '}'; } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java index c35ce7e8ce..6beb652ddf 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.dao.mapper; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.CommandCount; @@ -265,7 +266,7 @@ public class CommandMapperTest { command.setProcessInstancePriority(Priority.MEDIUM); command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00")); command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00")); - command.setWorkerGroupId(-1); + command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); commandMapper.insert(command); return command; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index a05f8dc9ee..df1eac39ac 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -486,8 +486,13 @@ public class MasterExecThread implements Runnable { taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); } - int workerGroupId = taskNode.getWorkerGroupId(); - taskInstance.setWorkerGroupId(workerGroupId); + String processWorkerGroup = processInstance.getWorkerGroup(); + String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup(); + if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) { + taskInstance.setWorkerGroup(processWorkerGroup); + }else { + taskInstance.setWorkerGroup(taskWorkerGroup); + } } return taskInstance; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index ca0ed793f8..81c523c2e2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -427,8 +427,8 @@ public class ProcessService { processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); - int workerGroupId = command.getWorkerGroupId() == 0 ? -1 : command.getWorkerGroupId(); - processInstance.setWorkerGroupId(workerGroupId); + String workerGroup = StringUtils.isBlank(command.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : command.getWorkerGroup(); + processInstance.setWorkerGroup(workerGroup); processInstance.setTimeout(processDefinition.getTimeout()); processInstance.setTenantId(processDefinition.getTenantId()); return processInstance; @@ -964,7 +964,7 @@ public class ProcessService { */ public String taskZkInfo(TaskInstance taskInstance) { - int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance); + String taskWorkerGroup = getTaskWorkerGroup(taskInstance); ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId()); if(processInstance == null){ logger.error("process instance is null. please check the task info, task id: " + taskInstance.getId()); @@ -976,9 +976,10 @@ public class ProcessService { sb.append(processInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE) .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE) .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE) - .append(taskInstance.getId()).append(Constants.UNDERLINE); + .append(taskInstance.getId()).append(Constants.UNDERLINE) + .append(taskInstance.getWorkerGroup()); - if(taskWorkerGroupId > 0){ + /*if(StringUtils.isNotBlank(taskWorkerGroup)){ //not to find data from db WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId); if(workerGroup == null ){ @@ -1012,8 +1013,7 @@ public class ProcessService { sb.append(ipSb); }else{ sb.append(Constants.DEFAULT_WORKER_ID); - } - + }*/ return sb.toString(); } @@ -1689,24 +1689,24 @@ public class ProcessService { } /** - * get task worker group id + * get task worker group * @param taskInstance taskInstance * @return workerGroupId */ - public int getTaskWorkerGroupId(TaskInstance taskInstance) { - int taskWorkerGroupId = taskInstance.getWorkerGroupId(); + public String getTaskWorkerGroup(TaskInstance taskInstance) { + String workerGroup = taskInstance.getWorkerGroup(); - if(taskWorkerGroupId > 0){ - return taskWorkerGroupId; + if(StringUtils.isNotBlank(workerGroup)){ + return workerGroup; } int processInstanceId = taskInstance.getProcessInstanceId(); ProcessInstance processInstance = findProcessInstanceById(processInstanceId); if(processInstance != null){ - return processInstance.getWorkerGroupId(); + return processInstance.getWorkerGroup(); } - logger.info("task : {} will use default worker group id", taskInstance.getId()); - return Constants.DEFAULT_WORKER_ID; + logger.info("task : {} will use default worker group", taskInstance.getId()); + return Constants.DEFAULT_WORKER_GROUP; } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java index d055e2de85..c89b7affb8 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java @@ -98,7 +98,7 @@ public class ProcessScheduleJob implements Job { command.setScheduleTime(scheduledFireTime); command.setStartTime(fireTime); command.setWarningGroupId(schedule.getWarningGroupId()); - command.setWorkerGroupId(schedule.getWorkerGroupId()); + command.setWorkerGroup(schedule.getWorkerGroup()); command.setWarningType(schedule.getWarningType()); command.setProcessInstancePriority(schedule.getProcessInstancePriority()); diff --git a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql index 7fc12900e4..6a5cded591 100644 --- a/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -32,3 +32,67 @@ delimiter ; SELECT uc_dolphin_T_t_ds_process_definition_A_modify_by(); DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_definition_A_modify_by(); +-- ac_dolphin_T_t_ds_process_instance_A_worker_group +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_process_instance_A_worker_group(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_process_instance_A_worker_group() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_process_instance' + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_process_instance_A_worker_group(); +DROP FUNCTION ac_dolphin_T_t_ds_process_instance_A_worker_group(); + + +-- ac_dolphin_T_t_ds_task_instance_A_worker_group +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_task_instance_A_worker_group(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_task_instance_A_worker_group() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_task_instance' + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_task_instance_A_worker_group(); +DROP FUNCTION ac_dolphin_T_t_ds_task_instance_A_worker_group(); + +-- ac_dolphin_T_t_ds_process_instance_A_worker_group +delimiter ; +DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_process_instance_A_worker_group(); +delimiter d// +CREATE FUNCTION ac_dolphin_T_t_ds_schedules_A_worker_group() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_schedules' + AND COLUMN_NAME ='worker_group') + THEN + ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(255) DEFAULT null; + END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select ac_dolphin_T_t_ds_schedules_A_worker_group(); +DROP FUNCTION ac_dolphin_T_t_ds_schedules_A_worker_group(); +