mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-04 21:28:00 +08:00
Merge branch 'dev' into dev
This commit is contained in:
commit
5caa6e5d9d
@ -120,19 +120,31 @@ public class TaskPriorityQueueConsumer extends Thread{
|
||||
Boolean result = false;
|
||||
while (Stopper.isRunning()){
|
||||
try {
|
||||
result = dispatcher.dispatch(executionContext);
|
||||
result = dispatcher.dispatch(executionContext);
|
||||
} catch (ExecuteException e) {
|
||||
logger.error("dispatch error",e);
|
||||
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
|
||||
}
|
||||
|
||||
if (result){
|
||||
if (result || taskInstanceIsFinalState(taskInstanceId)){
|
||||
break;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* taskInstance is final state
|
||||
* success,failure,kill,stop,pause,threadwaiting is final state
|
||||
* @param taskInstanceId taskInstanceId
|
||||
* @return taskInstance is final state
|
||||
*/
|
||||
public Boolean taskInstanceIsFinalState(int taskInstanceId){
|
||||
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
|
||||
return taskInstance.getState().typeIsFinished();
|
||||
}
|
||||
|
||||
/**
|
||||
* get TaskExecutionContext
|
||||
* @param taskInstanceId taskInstanceId
|
||||
|
@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
|
||||
import org.apache.dolphinscheduler.common.model.TaskNode;
|
||||
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
|
||||
import org.apache.dolphinscheduler.common.thread.Stopper;
|
||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
|
||||
@ -35,9 +36,12 @@ import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheMan
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
/**
|
||||
@ -53,6 +57,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
|
||||
|
||||
private NettyExecutorManager nettyExecutorManager;
|
||||
|
||||
|
||||
/**
|
||||
* zookeeper register center
|
||||
*/
|
||||
private ZookeeperRegistryCenter zookeeperRegistryCenter;
|
||||
|
||||
/**
|
||||
* constructor of MasterTaskExecThread
|
||||
* @param taskInstance task instance
|
||||
@ -61,6 +71,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
|
||||
super(taskInstance);
|
||||
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
|
||||
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
|
||||
this.zookeeperRegistryCenter = SpringApplicationContext.getBean(ZookeeperRegistryCenter.class);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -175,6 +186,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
|
||||
}
|
||||
alreadyKilled = true;
|
||||
|
||||
String taskInstanceWorkerGroup = taskInstance.getWorkerGroup();
|
||||
|
||||
// not exists
|
||||
if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){
|
||||
taskInstance.setState(ExecutionStatus.KILL);
|
||||
taskInstance.setEndTime(new Date());
|
||||
processService.updateTaskInstance(taskInstance);
|
||||
return;
|
||||
}
|
||||
|
||||
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
|
||||
killCommand.setTaskInstanceId(taskInstance.getId());
|
||||
|
||||
@ -185,10 +206,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
|
||||
|
||||
nettyExecutorManager.executeDirectly(executionContext);
|
||||
|
||||
logger.info("master add kill task :{} id:{} to kill queue",
|
||||
logger.info("master kill taskInstance name :{} taskInstance id:{}",
|
||||
taskInstance.getName(), taskInstance.getId() );
|
||||
}
|
||||
|
||||
/**
|
||||
* whether exists valid worker group
|
||||
* @param taskInstanceWorkerGroup taskInstanceWorkerGroup
|
||||
* @return whether exists
|
||||
*/
|
||||
public Boolean existsValidWorkerGroup(String taskInstanceWorkerGroup){
|
||||
Set<String> workerGroups = zookeeperRegistryCenter.getWorkerGroupDirectly();
|
||||
// not worker group
|
||||
if (CollectionUtils.isEmpty(workerGroups)){
|
||||
return false;
|
||||
}
|
||||
|
||||
// has worker group , but not taskInstance assigned worker group
|
||||
if (!workerGroups.contains(taskInstanceWorkerGroup)){
|
||||
return false;
|
||||
}
|
||||
Set<String> workers = zookeeperRegistryCenter.getWorkerGroupNodesDirectly(taskInstanceWorkerGroup);
|
||||
if (CollectionUtils.isEmpty(workers)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* get task timeout parameter
|
||||
* @return TaskTimeoutParameter
|
||||
|
@ -63,7 +63,8 @@ mailPassword="xxxxxxxxxx"
|
||||
# TLS mail protocol support
|
||||
starttlsEnable="false"
|
||||
|
||||
sslTrust="xxxxxxxxxx"
|
||||
#note: sslTrust is the same as mailServerHost
|
||||
sslTrust="smtp.exmail.qq.com"
|
||||
|
||||
# SSL mail protocol support
|
||||
# note: The SSL protocol is enabled by default.
|
||||
|
@ -0,0 +1,262 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.consumer;
|
||||
|
||||
import org.apache.dolphinscheduler.common.enums.CommandType;
|
||||
import org.apache.dolphinscheduler.common.enums.DbType;
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.common.enums.Priority;
|
||||
import org.apache.dolphinscheduler.dao.entity.*;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
|
||||
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
|
||||
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
|
||||
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
|
||||
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class,
|
||||
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
|
||||
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
|
||||
public class TaskPriorityQueueConsumerTest {
|
||||
|
||||
|
||||
@Autowired
|
||||
private TaskPriorityQueue taskPriorityQueue;
|
||||
|
||||
@Autowired
|
||||
private TaskPriorityQueueConsumer taskPriorityQueueConsumer;
|
||||
|
||||
@Autowired
|
||||
private ProcessService processService;
|
||||
|
||||
@Autowired
|
||||
private ExecutorDispatcher dispatcher;
|
||||
|
||||
@Before
|
||||
public void init(){
|
||||
|
||||
Tenant tenant = new Tenant();
|
||||
tenant.setId(1);
|
||||
tenant.setTenantCode("journey");
|
||||
tenant.setTenantName("journey");
|
||||
tenant.setDescription("journey");
|
||||
tenant.setQueueId(1);
|
||||
tenant.setCreateTime(new Date());
|
||||
tenant.setUpdateTime(new Date());
|
||||
|
||||
Mockito.when(processService.getTenantForProcess(1,2)).thenReturn(tenant);
|
||||
|
||||
Mockito.when(processService.queryUserQueueByProcessInstanceId(1)).thenReturn("default");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSHELLTask() throws Exception {
|
||||
TaskInstance taskInstance = new TaskInstance();
|
||||
taskInstance.setId(1);
|
||||
taskInstance.setTaskType("SHELL");
|
||||
taskInstance.setProcessDefinitionId(1);
|
||||
taskInstance.setProcessInstanceId(1);
|
||||
taskInstance.setState(ExecutionStatus.KILL);
|
||||
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
|
||||
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
|
||||
taskInstance.setWorkerGroup("default");
|
||||
taskInstance.setExecutorId(2);
|
||||
|
||||
ProcessInstance processInstance = new ProcessInstance();
|
||||
processInstance.setTenantId(1);
|
||||
processInstance.setCommandType(CommandType.START_PROCESS);
|
||||
taskInstance.setProcessInstance(processInstance);
|
||||
|
||||
ProcessDefinition processDefinition = new ProcessDefinition();
|
||||
processDefinition.setUserId(2);
|
||||
processDefinition.setProjectId(1);
|
||||
taskInstance.setProcessDefine(processDefinition);
|
||||
|
||||
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
|
||||
taskPriorityQueue.put("2_1_2_1_default");
|
||||
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSQLTask() throws Exception {
|
||||
TaskInstance taskInstance = new TaskInstance();
|
||||
taskInstance.setId(1);
|
||||
taskInstance.setTaskType("SQL");
|
||||
taskInstance.setProcessDefinitionId(1);
|
||||
taskInstance.setProcessInstanceId(1);
|
||||
taskInstance.setState(ExecutionStatus.KILL);
|
||||
taskInstance.setTaskJson("{\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-3655\",\"maxRetryTimes\":0,\"name\":\"UDF测试\",\"params\":\"{\\\"postStatements\\\":[],\\\"connParams\\\":\\\"\\\",\\\"receiversCc\\\":\\\"\\\",\\\"udfs\\\":\\\"1\\\",\\\"type\\\":\\\"HIVE\\\",\\\"title\\\":\\\"test\\\",\\\"sql\\\":\\\"select id,name,ds,zodia(ds) from t_journey_user\\\",\\\"preStatements\\\":[],\\\"sqlType\\\":0,\\\"receivers\\\":\\\"825193156@qq.com\\\",\\\"datasource\\\":3,\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQL\"}");
|
||||
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
|
||||
taskInstance.setWorkerGroup("default");
|
||||
taskInstance.setExecutorId(2);
|
||||
|
||||
ProcessInstance processInstance = new ProcessInstance();
|
||||
processInstance.setTenantId(1);
|
||||
processInstance.setCommandType(CommandType.START_PROCESS);
|
||||
taskInstance.setProcessInstance(processInstance);
|
||||
|
||||
ProcessDefinition processDefinition = new ProcessDefinition();
|
||||
processDefinition.setUserId(2);
|
||||
processDefinition.setProjectId(1);
|
||||
taskInstance.setProcessDefine(processDefinition);
|
||||
|
||||
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
|
||||
taskPriorityQueue.put("2_1_2_1_default");
|
||||
|
||||
DataSource dataSource = new DataSource();
|
||||
dataSource.setId(1);
|
||||
dataSource.setName("sqlDatasource");
|
||||
dataSource.setType(DbType.MYSQL);
|
||||
dataSource.setUserId(2);
|
||||
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
|
||||
dataSource.setCreateTime(new Date());
|
||||
dataSource.setUpdateTime(new Date());
|
||||
|
||||
Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource);
|
||||
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDataxTask() throws Exception {
|
||||
TaskInstance taskInstance = new TaskInstance();
|
||||
taskInstance.setId(1);
|
||||
taskInstance.setTaskType("DATAX");
|
||||
taskInstance.setProcessDefinitionId(1);
|
||||
taskInstance.setProcessInstanceId(1);
|
||||
taskInstance.setState(ExecutionStatus.KILL);
|
||||
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-97625\",\"maxRetryTimes\":0,\"name\":\"MySQL数据相互导入\",\"params\":\"{\\\"targetTable\\\":\\\"pv2\\\",\\\"postStatements\\\":[],\\\"jobSpeedRecord\\\":1000,\\\"customConfig\\\":0,\\\"dtType\\\":\\\"MYSQL\\\",\\\"dsType\\\":\\\"MYSQL\\\",\\\"jobSpeedByte\\\":0,\\\"dataSource\\\":80,\\\"dataTarget\\\":80,\\\"sql\\\":\\\"SELECT dt,count FROM pv\\\",\\\"preStatements\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"DATAX\",\"workerGroup\":\"default\"}");
|
||||
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
|
||||
taskInstance.setWorkerGroup("default");
|
||||
taskInstance.setExecutorId(2);
|
||||
|
||||
ProcessInstance processInstance = new ProcessInstance();
|
||||
processInstance.setTenantId(1);
|
||||
processInstance.setCommandType(CommandType.START_PROCESS);
|
||||
taskInstance.setProcessInstance(processInstance);
|
||||
|
||||
ProcessDefinition processDefinition = new ProcessDefinition();
|
||||
processDefinition.setUserId(2);
|
||||
processDefinition.setProjectId(1);
|
||||
taskInstance.setProcessDefine(processDefinition);
|
||||
|
||||
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
|
||||
taskPriorityQueue.put("2_1_2_1_default");
|
||||
|
||||
|
||||
|
||||
DataSource dataSource = new DataSource();
|
||||
dataSource.setId(80);
|
||||
dataSource.setName("datax");
|
||||
dataSource.setType(DbType.MYSQL);
|
||||
dataSource.setUserId(2);
|
||||
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
|
||||
dataSource.setCreateTime(new Date());
|
||||
dataSource.setUpdateTime(new Date());
|
||||
|
||||
Mockito.when(processService.findDataSourceById(80)).thenReturn(dataSource);
|
||||
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSqoopTask() throws Exception {
|
||||
TaskInstance taskInstance = new TaskInstance();
|
||||
taskInstance.setId(1);
|
||||
taskInstance.setTaskType("SQOOP");
|
||||
taskInstance.setProcessDefinitionId(1);
|
||||
taskInstance.setProcessInstanceId(1);
|
||||
taskInstance.setState(ExecutionStatus.KILL);
|
||||
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-63634\",\"maxRetryTimes\":0,\"name\":\"MySQL数据导入HDSF\",\"params\":\"{\\\"sourceType\\\":\\\"MYSQL\\\",\\\"targetType\\\":\\\"HDFS\\\",\\\"targetParams\\\":\\\"{\\\\\\\"targetPath\\\\\\\":\\\\\\\"/test/datatest\\\\\\\",\\\\\\\"deleteTargetDir\\\\\\\":true,\\\\\\\"fileType\\\\\\\":\\\\\\\"--as-textfile\\\\\\\",\\\\\\\"compressionCodec\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"fieldsTerminated\\\\\\\":\\\\\\\",\\\\\\\",\\\\\\\"linesTerminated\\\\\\\":\\\\\\\"\\\\\\\\\\\\\\\\n\\\\\\\"}\\\",\\\"modelType\\\":\\\"import\\\",\\\"sourceParams\\\":\\\"{\\\\\\\"srcType\\\\\\\":\\\\\\\"MYSQL\\\\\\\",\\\\\\\"srcDatasource\\\\\\\":1,\\\\\\\"srcTable\\\\\\\":\\\\\\\"t_ds_user\\\\\\\",\\\\\\\"srcQueryType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcQuerySql\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcColumnType\\\\\\\":\\\\\\\"0\\\\\\\",\\\\\\\"srcColumns\\\\\\\":\\\\\\\"\\\\\\\",\\\\\\\"srcConditionList\\\\\\\":[],\\\\\\\"mapColumnHive\\\\\\\":[],\\\\\\\"mapColumnJava\\\\\\\":[]}\\\",\\\"localParams\\\":[],\\\"concurrency\\\":1}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SQOOP\",\"workerGroup\":\"default\"}");
|
||||
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
|
||||
taskInstance.setWorkerGroup("default");
|
||||
taskInstance.setExecutorId(2);
|
||||
|
||||
ProcessInstance processInstance = new ProcessInstance();
|
||||
processInstance.setTenantId(1);
|
||||
processInstance.setCommandType(CommandType.START_PROCESS);
|
||||
taskInstance.setProcessInstance(processInstance);
|
||||
|
||||
ProcessDefinition processDefinition = new ProcessDefinition();
|
||||
processDefinition.setUserId(2);
|
||||
processDefinition.setProjectId(1);
|
||||
taskInstance.setProcessDefine(processDefinition);
|
||||
|
||||
Mockito.when(processService.getTaskInstanceDetailByTaskId(1)).thenReturn(taskInstance);
|
||||
taskPriorityQueue.put("2_1_2_1_default");
|
||||
|
||||
|
||||
|
||||
DataSource dataSource = new DataSource();
|
||||
dataSource.setId(1);
|
||||
dataSource.setName("datax");
|
||||
dataSource.setType(DbType.MYSQL);
|
||||
dataSource.setUserId(2);
|
||||
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
|
||||
dataSource.setCreateTime(new Date());
|
||||
dataSource.setUpdateTime(new Date());
|
||||
|
||||
Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource);
|
||||
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTaskInstanceIsFinalState(){
|
||||
TaskInstance taskInstance = new TaskInstance();
|
||||
taskInstance.setId(1);
|
||||
taskInstance.setTaskType("SHELL");
|
||||
taskInstance.setProcessDefinitionId(1);
|
||||
taskInstance.setProcessInstanceId(1);
|
||||
taskInstance.setState(ExecutionStatus.KILL);
|
||||
taskInstance.setTaskJson("{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-55201\",\"maxRetryTimes\":0,\"name\":\"测试任务\",\"params\":\"{\\\"rawScript\\\":\\\"echo \\\\\\\"测试任务\\\\\\\"\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
|
||||
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
|
||||
taskInstance.setWorkerGroup("default");
|
||||
taskInstance.setExecutorId(2);
|
||||
|
||||
|
||||
Mockito.when( processService.findTaskInstanceById(1)).thenReturn(taskInstance);
|
||||
|
||||
taskPriorityQueueConsumer.taskInstanceIsFinalState(1);
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
|
||||
import org.apache.dolphinscheduler.server.registry.DependencyConfig;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
|
||||
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
|
||||
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
|
||||
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class,
|
||||
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
|
||||
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class})
|
||||
public class MasterTaskExecThreadTest {
|
||||
|
||||
@Test
|
||||
public void testExistsValidWorkerGroup1(){
|
||||
ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class);
|
||||
Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(null);
|
||||
MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null);
|
||||
masterTaskExecThread.existsValidWorkerGroup("default");
|
||||
}
|
||||
@Test
|
||||
public void testExistsValidWorkerGroup2(){
|
||||
ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class);
|
||||
Set<String> workerGorups = new HashSet<>();
|
||||
workerGorups.add("test1");
|
||||
workerGorups.add("test2");
|
||||
|
||||
Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(workerGorups);
|
||||
MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null);
|
||||
masterTaskExecThread.existsValidWorkerGroup("default");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExistsValidWorkerGroup3(){
|
||||
ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class);
|
||||
Set<String> workerGorups = new HashSet<>();
|
||||
workerGorups.add("test1");
|
||||
|
||||
Mockito.when(zookeeperRegistryCenter.getWorkerGroupDirectly()).thenReturn(workerGorups);
|
||||
Mockito.when(zookeeperRegistryCenter.getWorkerGroupNodesDirectly("test1")).thenReturn(workerGorups);
|
||||
MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(null);
|
||||
masterTaskExecThread.existsValidWorkerGroup("test1");
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.registry;
|
||||
import org.apache.dolphinscheduler.dao.AlertDao;
|
||||
import org.apache.dolphinscheduler.dao.mapper.*;
|
||||
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
|
||||
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
|
||||
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
@ -144,4 +147,9 @@ public class DependencyConfig {
|
||||
public TaskResponseService taskResponseService(){
|
||||
return Mockito.mock(TaskResponseService.class);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TaskPriorityQueue taskPriorityQueue(){
|
||||
return new TaskPriorityQueueImpl();
|
||||
}
|
||||
}
|
||||
|
2
pom.xml
2
pom.xml
@ -778,6 +778,8 @@
|
||||
<include>**/server/log/TaskLogDiscriminatorTest.java</include>
|
||||
<include>**/server/log/TaskLogFilterTest.java</include>
|
||||
<include>**/server/log/WorkerLogFilterTest.java</include>
|
||||
<include>**/server/master/consumer/TaskPriorityQueueConsumerTest.java</include>
|
||||
<include>**/server/master/runner/MasterTaskExecThreadTest.java</include>
|
||||
<include>**/server/master/dispatch/executor/NettyExecutorManagerTest.java</include>
|
||||
<include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
|
||||
<include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
|
||||
|
4
script/env/dolphinscheduler_env.sh
vendored
4
script/env/dolphinscheduler_env.sh
vendored
@ -23,4 +23,6 @@ export PYTHON_HOME=/opt/soft/python
|
||||
export JAVA_HOME=/opt/soft/java
|
||||
export HIVE_HOME=/opt/soft/hive
|
||||
export FLINK_HOME=/opt/soft/flink
|
||||
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$PATH
|
||||
export DATAX_HOME=/opt/soft/dataX
|
||||
|
||||
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH
|
||||
|
Loading…
Reference in New Issue
Block a user