From e31b48fdb21d0257440b67a1b8b7f0912281237c Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 17 Mar 2020 11:52:31 +0800 Subject: [PATCH] =?UTF-8?q?1=EF=BC=8Cget=20workergroup=20from=20zk=20modif?= =?UTF-8?q?y=202=EF=BC=8CSpringConnectionFactory=20repeat=20load=20modify?= =?UTF-8?q?=20(#2201)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format * TaskAckProcessor modify * TaskAckProcessor modify * task prioriry refator * remove ITaskQueue * task prioriry refator * remove ITaskQueue * TaskPriority refactor * remove logs * WorkerServer refactor * MasterSchedulerService modify * WorkerConfig listen port modify * modify master and worker listen port * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * cancelTaskInstance set TaskExecutionContext host,logPath,executePath * Encapsulate the parameters required by sqltask * 1,Encapsulate the parameters required by sqltask 2,SQLTask optimization * AbstractTask modify * ProcedureTask optimization * MasterSchedulerService modify * TaskUpdateQueueConsumer modify * test * DataxTask process run debug * DataxTask process run debug * add protobuf dependency,MR、Spark task etc need this * TaskUpdateQueueConsumer modify * TaskExecutionContextBuilder set TaskInstance workgroup * WorkerGroupService queryAllGroup modify query available work group * 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify Co-authored-by: qiaozhanwei --- .../api/service/WorkerGroupService.java | 25 +++++++++++++------ .../datasource/SpringConnectionFactory.java | 1 - .../builder/TaskExecutionContextBuilder.java | 5 +--- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 6384e38026..8317768783 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -33,10 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * work group service @@ -184,10 +181,22 @@ public class WorkerGroupService extends BaseService { * @return all worker group list */ public Map queryAllGroup() { - Map result = new HashMap<>(5); - String WORKER_PATH = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker"; - List workerGroupList = zookeeperCachedOperator.getChildrenKeys(WORKER_PATH); - result.put(Constants.DATA_LIST, workerGroupList); + Map result = new HashMap<>(); + String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker"; + List workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); + + // available workerGroup list + List availableWorkerGroupList = new ArrayList<>(); + + for (String workerGroup : workerGroupList){ + String workerGroupPath= workerPath + "/" + workerGroup; + List childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); + if (CollectionUtils.isNotEmpty(childrenNodes)){ + availableWorkerGroupList.add(workerGroup); + } + } + + result.put(Constants.DATA_LIST, availableWorkerGroupList); putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java index 5788b9d1f7..cb9f22eb54 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java @@ -123,7 +123,6 @@ public class SpringConnectionFactory { @Bean public SqlSessionFactory sqlSessionFactory() throws Exception { MybatisConfiguration configuration = new MybatisConfiguration(); - configuration.addMappers("org.apache.dolphinscheduler.dao.mapper"); configuration.setMapUnderscoreToCamelCase(true); configuration.setCacheEnabled(false); configuration.setCallSettersOnNulls(true); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index a5e426e55a..08c105ac40 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -17,15 +17,12 @@ package org.apache.dolphinscheduler.server.builder; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import java.util.List; - /** * TaskExecutionContext builder */ @@ -51,7 +48,7 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setLogPath(taskInstance.getLogPath()); taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); - taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); + taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup()); return this; }