merge from dev..
Commit: fa892b3762
@@ -50,7 +50,7 @@ Easy Scheduler

### Recent Development Plan

EasyScheduler's work plan: <a href="https://github.com/analysys/EasyScheduler/projects/1" target="_blank">Development Plan</a>. The "In Develop" card holds the features for version 1.0.2; the TODO card holds pending items (including feature ideas).
EasyScheduler's work plan: <a href="https://github.com/analysys/EasyScheduler/projects/1" target="_blank">Development Plan</a>. The "In Develop" card holds the features for version 1.1.0; the TODO card holds pending items (including feature ideas).

### Contributing Code
@@ -24,7 +24,7 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.*;
import cn.escheduler.dao.model.User;
import io.swagger.annotations.Api;
import io.swagger.annotations.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

@@ -38,9 +38,9 @@ import static cn.escheduler.api.enums.Status.*;
/**
* execute task controller
* execute process controller
*/
@ApiIgnore
@Api(tags = "PROCESS_INSTANCE_EXECUTOR_TAG", position = 1)
@RestController
@RequestMapping("projects/{projectName}/executors")
public class ExecutorController extends BaseController {

@@ -53,10 +53,27 @@ public class ExecutorController extends BaseController {
/**
* execute process instance
*/
@ApiOperation(value = "startProcessInstance", notes= "RUN_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType ="FailureStrategy"),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType ="String"),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType ="TaskDependType"),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType ="CommandType"),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE",required = true, dataType ="WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID",required = true, dataType ="Int", example = "100"),
@ApiImplicitParam(name = "receivers", value = "RECEIVERS",dataType ="String" ),
@ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ),
@ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"),
})
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
public Result startProcessInstance(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable String projectName,
public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processDefinitionId") int processDefinitionId,
@RequestParam(value = "scheduleTime", required = false) String scheduleTime,
@RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy,

@@ -102,10 +119,15 @@ public class ExecutorController extends BaseController {
* @param processInstanceId
* @return
*/
@ApiOperation(value = "execute", notes= "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType")
})
@PostMapping(value = "/execute")
@ResponseStatus(HttpStatus.OK)
public Result execute(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable String projectName,
public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam("processInstanceId") Integer processInstanceId,
@RequestParam("executeType") ExecuteType executeType
) {

@@ -127,9 +149,13 @@ public class ExecutorController extends BaseController {
* @param processDefinitionId
* @return
*/
@ApiOperation(value = "startCheckProcessDefinition", notes= "START_CHECK_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100")
})
@PostMapping(value = "/start-check")
@ResponseStatus(HttpStatus.OK)
public Result startCheckProcessDefinition(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result startCheckProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionId") int processDefinitionId) {
logger.info("login user {}, check process definition", loginUser.getUserName(), processDefinitionId);
try {

@@ -149,9 +175,16 @@ public class ExecutorController extends BaseController {
* @param processDefinitionId
* @return
*/
@ApiIgnore
@ApiOperation(value = "getReceiverCc", notes= "GET_RECEIVER_CC_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/get-receiver-cc")
@ResponseStatus(HttpStatus.OK)
public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result getReceiverCc(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId,
@RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) {
logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName());
@@ -219,6 +219,11 @@ public final class Constants {
*/
public static final String SEMICOLON = ";";
/**
* DOT .
*/
public static final String DOT = ".";
/**
* ZOOKEEPER_SESSION_TIMEOUT
*/

@@ -883,6 +888,11 @@ public final class Constants {
*/
public static final String LOGIN_USER_KEY_TAB_USERNAME = "login.user.keytab.username";
/**
* default worker group id
*/
public static final int DEFAULT_WORKER_ID = -1;
/**
* loginUserFromKeytab path
*/
@@ -24,20 +24,17 @@ public interface ITaskQueue {
/**
* take out all the elements
*
* this method has deprecated
* use checkTaskExists instead
*
* @param key
* @return
*/
@Deprecated
List<String> getAllTasks(String key);
/**
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
boolean checkTaskExists(String key, String task);

@@ -54,10 +51,10 @@ public interface ITaskQueue {
* an element pops out of the queue
*
* @param key queue name
* @param remove whether remove the element
* @param n how many elements to poll
* @return
*/
String poll(String key, boolean remove);
List<String> poll(String key, int n);
/**
* remove a element from queue
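For illustration only (not part of the diff): a minimal sketch of how a worker might consume the new batch poll declared above. The queue key, the task-string layout and the removeNode call come from this interface; the batch size of 4 and the class name are arbitrary assumptions.

import cn.escheduler.common.Constants;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;

import java.util.List;

public class BatchPollSketch {
    public static void main(String[] args) {
        ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();

        // entries look like ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}[_host1,host2,...]
        List<String> tasks = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 4);

        for (String task : tasks) {
            // the new poll has no remove flag, so the consumer presumably removes what it actually takes
            taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, task);
        }
    }
}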
@@ -42,7 +42,7 @@ public class TaskQueueFactory {
public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
// queueImplValue = StringUtils.trim(queueImplValue);
// queueImplValue = IpUtils.trim(queueImplValue);
// if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) {
// logger.info("task queue impl use reids ");
@@ -19,6 +19,8 @@ package cn.escheduler.common.queue;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

@@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
/**
* A singleton of a task queue implemented with zookeeper

@@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* @param key task queue name
* @return
*/
@Deprecated
@Override
public List<String> getAllTasks(String key) {
try {

@@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
@Override

@@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* add task to tasks queue
*
* @param key task queue name
* @param value ${priority}_${processInstanceId}_${taskId}
* @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/
@Override
public void add(String key, String value) {

@@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result);
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
@@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
/**
* An element pops out of the queue <p>
* note:
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
*
* processInstancePriority_processInstanceId_taskInstancePriority_taskId (compared high to low)
* processInstancePriority_processInstanceId_taskInstancePriority_taskId_workerHostId1,workerHostId2,... (compared high to low)
* @param key task queue name
* @param remove whether remove the element
* @return the task id to be executed
* @param tasksNum how many elements to poll
* @return the task ids to be executed
*/
@Override
public String poll(String key, boolean remove) {
public List<String> poll(String key, int tasksNum) {
try{
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
@@ -149,55 +144,79 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
if(list != null && list.size() > 0){
String workerIp = OSUtils.getHost();
String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
int size = list.size();
String formatTargetTask = null;
String targetTaskKey = null;
Set<String> taskTreeSet = new TreeSet<>();
for (int i = 0; i < size; i++) {
String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
if(taskDetailArrs.length == 4){
//compatibility with entries written by earlier versions
if(taskDetailArrs.length >= 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
if(i > 0){
int result = formatTask.compareTo(formatTargetTask);
if(result < 0){
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
if(taskDetailArrs.length > 4){
String taskHosts = taskDetailArrs[4];
//task can assign to any worker host if equals default ip value of worker server
if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){
String[] taskHostsArr = taskHosts.split(Constants.COMMA);
if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
continue;
}
}
}
taskTreeSet.add(formatTask);
}
}
List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet);
logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
return taskslist;
}else{
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
}
}else{
logger.error("task queue poll error, task detail :{} , please check!", taskDetail);
}
}
if(formatTargetTask != null){
String taskIdPath = tasksQueuePath + targetTaskKey;
logger.info("consume task {}", taskIdPath);
String[] vals = targetTaskKey.split(Constants.UNDERLINE);
if(remove){
removeNode(key, targetTaskKey);
}
logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1);
return targetTaskKey;
}else{
logger.error("should not go here, task queue poll error, please check!");
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
return null;
return new ArrayList<String>();
}

/**
* get task list from tree set
*
* @param tasksNum
* @param taskTreeSet
*/
public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
Iterator<String> iterator = taskTreeSet.iterator();
int j = 0;
List<String> taskslist = new ArrayList<>(tasksNum);
while(iterator.hasNext()){
if(j++ < tasksNum){
String task = iterator.next();
taskslist.add(task);
}
}
return taskslist;
}

@Override
public void removeNode(String key, String nodeValue){
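For illustration only (not part of the diff): a self-contained sketch of why the zero-padded format string above lets a plain java.util.TreeSet hand back the highest-priority task first — lexicographic order on the padded keys matches the intended priority order. The four sample entries mirror the ones used in TaskQueueImplTest further down; the class name is an assumption.

import java.util.TreeSet;

public class QueueOrderSketch {
    public static void main(String[] args) {
        TreeSet<String> taskTreeSet = new TreeSet<>();
        // ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
        taskTreeSet.add(String.format("%s_%010d_%s_%010d", "1", 1L, "1", 1L));
        taskTreeSet.add(String.format("%s_%010d_%s_%010d", "0", 1L, "1", 1L));
        taskTreeSet.add(String.format("%s_%010d_%s_%010d", "1", 1L, "0", 1L));
        taskTreeSet.add(String.format("%s_%010d_%s_%010d", "1", 2L, "1", 1L));

        // smallest string == highest priority: prints 0_0000000001_1_0000000001
        System.out.println(taskTreeSet.first());
    }
}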
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* ip utils
*/
public class IpUtils {

private static final Logger logger = LoggerFactory.getLogger(IpUtils.class);
public static final String DOT = ".";

/**
* ip str to long <p>
*
* @param ipStr ip string
*/
public static Long ipToLong(String ipStr) {
String[] ipSet = ipStr.split("\\" + DOT);
return Long.parseLong(ipSet[0]) << 24 | Long.parseLong(ipSet[1]) << 16 | Long.parseLong(ipSet[2]) << 8 | Long.parseLong(ipSet[3]);
}

/**
* long to ip
* @param ipLong the long number converted from IP
* @return String
*/
public static String longToIp(long ipLong) {
long[] ipNumbers = new long[4];
long tmp = 0xFF;
ipNumbers[0] = ipLong >> 24 & tmp;
ipNumbers[1] = ipLong >> 16 & tmp;
ipNumbers[2] = ipLong >> 8 & tmp;
ipNumbers[3] = ipLong & tmp;

StringBuilder sb = new StringBuilder(16);
sb.append(ipNumbers[0]).append(DOT)
.append(ipNumbers[1]).append(DOT)
.append(ipNumbers[2]).append(DOT)
.append(ipNumbers[3]);
return sb.toString();
}

public static void main(String[] args){
long ipLong = ipToLong("11.3.4.5");
logger.info(longToIp(ipLong));
}
}
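For illustration only (not part of the diff): a quick round-trip check of the two conversions above, using the well-known loopback value; the wrapper class is an assumption.

import cn.escheduler.common.utils.IpUtils;

public class IpUtilsSketch {
    public static void main(String[] args) {
        long ip = IpUtils.ipToLong("127.0.0.1");   // 2130706433
        String back = IpUtils.longToIp(ip);        // "127.0.0.1"
        System.out.println(ip + " -> " + back);
    }
}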
@@ -312,7 +312,11 @@ public abstract class AbstractZKClient {
childrenList = zkClient.getChildren().forPath(masterZNodeParentPath);
}
} catch (Exception e) {
// logger.warn(e.getMessage());
if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
logger.warn(e.getMessage(),e);
}
return childrenList.size();
}
return childrenList.size();
@@ -37,6 +37,12 @@ public class OSUtilsTest {
// static HardwareAbstractionLayer hal = si.getHardware();

@Test
public void getHost(){
logger.info(OSUtils.getHost());
}

@Test
public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239
@@ -17,12 +17,15 @@
package cn.escheduler.common.queue;

import cn.escheduler.common.Constants;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

import static org.junit.Assert.assertEquals;
@@ -34,59 +37,62 @@ public class TaskQueueImplTest {

private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class);

ITaskQueue tasksQueue = null;

@Test
public void testTaskQueue(){

ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
@Before
public void before(){
tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
}

@After
public void after(){
//clear all data
tasksQueue.delete();
}

@Test
public void testAdd(){

//add
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775");

List<String> tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);

if(tasks.size() < 0){
return;
}

//pop
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node1,"1");
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node2,"2");
String node1 = tasks.get(0);

//sadd
String task1 = "1.1.1.1-1-mr";
String task2 = "1.1.1.2-2-mr";
String task3 = "1.1.1.3-3-mr";
String task4 = "1.1.1.4-4-mr";
String task5 = "1.1.1.5-5-mr";
assertEquals(node1,"0_0000000001_1_0000000001");

tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task
tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);

Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5);
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
//srem
tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5);
//smembers
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4);
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
if(tasks.size() < 0){
return;
}

String node2 = tasks.get(0);
assertEquals(node2,"0_0000000001_1_0000000001");

}

/**
* test one million data from zookeeper queue
*/
@Test
public void extremeTest(){
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
int total = 30 * 10000;

for(int i = 0; i < total; i++)

@@ -99,14 +105,9 @@ public class TaskQueueImplTest {
}
}

String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0);
assertEquals(node1,"0");

//clear all data
tasksQueue.delete();

}

}
@@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.task.subprocess.SubProcessParameters;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.mapper.*;
@@ -1015,11 +1016,58 @@ public class ProcessDao extends AbstractBaseDao {
*
* processInstancePriority_processInstanceId_taskInstancePriority_taskId (compared high to low)
*
* @param task
* @param taskInstance
* @return
*/
private String taskZkInfo(TaskInstance task) {
return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId();
private String taskZkInfo(TaskInstance taskInstance) {

int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);

StringBuilder sb = new StringBuilder(100);

sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
.append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getId()).append(Constants.UNDERLINE);

if(taskWorkerGroupId > 0){
//not to find data from db
WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
if(workerGroup == null ){
logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());

sb.append(Constants.DEFAULT_WORKER_ID);
return sb.toString();
}

String ips = workerGroup.getIpList();

if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
sb.append(Constants.DEFAULT_WORKER_ID);
return sb.toString();
}

StringBuilder ipSb = new StringBuilder(100);
String[] ipArray = ips.split(COMMA);

for (String ip : ipArray) {
long ipLong = IpUtils.ipToLong(ip);
ipSb.append(ipLong).append(COMMA);
}

if(ipSb.length() > 0) {
ipSb.deleteCharAt(ipSb.length() - 1);
}

sb.append(ipSb);
}else{
sb.append(Constants.DEFAULT_WORKER_ID);
}

return sb.toString();
}

/**

@@ -1683,5 +1731,24 @@ public class ProcessDao extends AbstractBaseDao {
}

/**
* get task worker group id
*
* @param taskInstance
* @return
*/
public int getTaskWorkerGroupId(TaskInstance taskInstance) {
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return Constants.DEFAULT_WORKER_ID;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();

taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
return taskWorkerGroupId;
}

}
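For illustration only (not part of the diff): a sketch of the value shape taskZkInfo above produces, using hypothetical values — priority ordinals 0 and 1, process instance id 1, task id 2, and a worker group whose ip_list is 192.168.1.1 (3232235777 once converted by IpUtils.ipToLong). The class name is an assumption.

import cn.escheduler.common.utils.IpUtils;

public class TaskZkInfoSketch {
    public static void main(String[] args) {
        // with a resolvable worker group: priorities/ids plus the long-encoded host list
        String withWorkerGroup = "0_1_1_2_" + IpUtils.ipToLong("192.168.1.1"); // "0_1_1_2_3232235777"
        // with no usable worker group the tail is Constants.DEFAULT_WORKER_ID (-1)
        String anyWorker = "0_1_1_2_-1";
        System.out.println(withWorkerGroup + " | " + anyWorker);
    }
}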
@@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable {
if(Stopper.isRunning()) {
// send heartbeat to zk
if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
logger.error("master send heartbeat to zk failed");
logger.error("master send heartbeat to zk failed: can't find zookeeper regist path of master server");
return;
}
@@ -25,8 +25,9 @@ import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient;
import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,15 +99,7 @@ public class FetchTaskThread implements Runnable{
*/
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){

int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return false;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();

taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);

if(taskWorkerGroupId <= 0){
return true;
@@ -117,43 +110,47 @@ public class FetchTaskThread implements Runnable{
return true;
}
String ips = workerGroup.getIpList();
if(ips == null){
if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
}
String[] ipArray = ips.split(",");
String[] ipArray = ips.split(Constants.COMMA);
List<String> ipList = Arrays.asList(ipArray);
return ipList.contains(host);
}

@Override
public void run() {

while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
if(OSUtils.checkResource(this.conf, false)) {

ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;

//check memory and cpu usage and threads
if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {

//whether have tasks, if no tasks , no need lock //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
if(tasksQueueList.size() > 0){
// creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
mutex.acquire();

ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;

for (int i = 0; i < taskNum; i++) {

int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums);
continue;
}

// task instance id str
String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);

if (!StringUtils.isEmpty(taskQueueStr )) {
for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isNotBlank(taskQueueStr )) {

if (!checkThreadCount(poolExecutor)) {
break;
}

String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
@@ -177,6 +174,7 @@ public class FetchTaskThread implements Runnable{
logger.error("task instance is null. task id : {} ", taskId);
continue;
}

if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}

@@ -191,7 +189,6 @@ public class FetchTaskThread implements Runnable{
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());

// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());

@@ -223,16 +220,17 @@ public class FetchTaskThread implements Runnable{
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));

}
}

}

}

Thread.sleep(Constants.SLEEP_TIME_MILLIS);

}catch (Exception e){
logger.error("fetch task thread exception : " + e.getMessage(),e);
}
finally {
}finally {
if (mutex != null){
try {
mutex.release();
@@ -247,4 +245,18 @@ public class FetchTaskThread implements Runnable{
}
}
}

/**
*
* @param poolExecutor
* @return
*/
private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
return false;
}
return true;
}
}
@@ -387,7 +387,7 @@ public class SqlTask extends AbstractTask {
String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
if(!(Boolean) mailResult.get(Constants.STATUS)){
if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){
throw new RuntimeException("send mail failed!");
}
}else{
@@ -6,7 +6,7 @@
<div class="row-title">
<div class="left">
<span class="sp">IP: {{item.host}}</span>
<span class="sp">{{$t('Port')}}: {{item.port}}</span>
<span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
<span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span>
</div>
<div class="right">

@@ -6,7 +6,7 @@
<div class="row-title">
<div class="left">
<span class="sp">IP: {{item.host}}</span>
<span class="sp">{{$t('Port')}}: {{item.port}}</span>
<span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
<span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span>
</div>
<div class="right">
@@ -37,7 +37,7 @@ let warningTypeList = [
]

const isEmial = (val) => {
let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
return regEmail.test(val)
}

@@ -131,7 +131,7 @@
}
},
_verification () {
let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
// Mobile phone number regular
let regPhone = /^1(3|4|5|6|7|8)\d{9}$/; // eslint-disable-line
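For illustration only (not part of the diff): the only change to the two email patterns above is that the character class now allows a hyphen where the old class just repeated the underscore. A quick Java re-check of the corrected pattern, with a hypothetical hyphenated address and an assumed class name:

import java.util.regex.Pattern;

public class EmailRegexSketch {
    public static void main(String[] args) {
        // corrected pattern: the character class now accepts '-' as a separator
        Pattern fixed = Pattern.compile(
            "^([a-zA-Z0-9]+[_|\\-|\\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\\-|\\.]?)*[a-zA-Z0-9]+\\.[a-zA-Z]{2,3}$");
        // a hyphenated address that the old pattern rejected
        System.out.println(fixed.matcher("first-last@mail-server.example.com").matches()); // true
    }
}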
@@ -330,7 +330,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT
sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties
sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
#sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties