mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-01 11:47:51 +08:00
[Improvement] Refactor code to support distributed tracing (#4270)
* Refactor code to support tracing * Extension network protocol, support context and version * Extension master asynchronous queue support context * Extract scan task method from MasterSchedulerService for tracing * fix * fix * add test case * fix * fix Co-authored-by: hailin0 <hailin0@yeah.net>
This commit is contained in:
parent
de1b87f305
commit
3a28a71b00
@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ReplayingDecoder;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandContext;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandHeader;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.slf4j.Logger;
|
||||
@ -54,16 +55,34 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
|
||||
switch (state()){
|
||||
case MAGIC:
|
||||
checkMagic(in.readByte());
|
||||
checkpoint(State.VERSION);
|
||||
// fallthru
|
||||
case VERSION:
|
||||
checkVersion(in.readByte());
|
||||
checkpoint(State.COMMAND);
|
||||
// fallthru
|
||||
case COMMAND:
|
||||
commandHeader.setType(in.readByte());
|
||||
checkpoint(State.OPAQUE);
|
||||
// fallthru
|
||||
case OPAQUE:
|
||||
commandHeader.setOpaque(in.readLong());
|
||||
checkpoint(State.CONTEXT_LENGTH);
|
||||
// fallthru
|
||||
case CONTEXT_LENGTH:
|
||||
commandHeader.setContextLength(in.readInt());
|
||||
checkpoint(State.CONTEXT);
|
||||
// fallthru
|
||||
case CONTEXT:
|
||||
byte[] context = new byte[commandHeader.getContextLength()];
|
||||
in.readBytes(context);
|
||||
commandHeader.setContext(context);
|
||||
checkpoint(State.BODY_LENGTH);
|
||||
// fallthru
|
||||
case BODY_LENGTH:
|
||||
commandHeader.setBodyLength(in.readInt());
|
||||
checkpoint(State.BODY);
|
||||
// fallthru
|
||||
case BODY:
|
||||
byte[] body = new byte[commandHeader.getBodyLength()];
|
||||
in.readBytes(body);
|
||||
@ -71,6 +90,7 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
|
||||
Command packet = new Command();
|
||||
packet.setType(commandType(commandHeader.getType()));
|
||||
packet.setOpaque(commandHeader.getOpaque());
|
||||
packet.setContext(CommandContext.valueOf(commandHeader.getContext()));
|
||||
packet.setBody(body);
|
||||
out.add(packet);
|
||||
//
|
||||
@ -105,10 +125,23 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* check version
|
||||
* @param version
|
||||
*/
|
||||
private void checkVersion(byte version) {
|
||||
if (version != Command.VERSION) {
|
||||
throw new IllegalArgumentException("illegal protocol [version]" + version);
|
||||
}
|
||||
}
|
||||
|
||||
enum State{
|
||||
MAGIC,
|
||||
VERSION,
|
||||
COMMAND,
|
||||
OPAQUE,
|
||||
CONTEXT_LENGTH,
|
||||
CONTEXT,
|
||||
BODY_LENGTH,
|
||||
BODY;
|
||||
}
|
||||
|
@ -42,11 +42,18 @@ public class NettyEncoder extends MessageToByteEncoder<Command> {
|
||||
throw new Exception("encode msg is null");
|
||||
}
|
||||
out.writeByte(Command.MAGIC);
|
||||
out.writeByte(Command.VERSION);
|
||||
out.writeByte(msg.getType().ordinal());
|
||||
out.writeLong(msg.getOpaque());
|
||||
writeContext(msg, out);
|
||||
out.writeInt(msg.getBody().length);
|
||||
out.writeBytes(msg.getBody());
|
||||
}
|
||||
|
||||
private void writeContext(Command msg, ByteBuf out) {
|
||||
byte[] headerBytes = msg.getContext().toBytes();
|
||||
out.writeInt(headerBytes.length);
|
||||
out.writeBytes(headerBytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@ public class Command implements Serializable {
|
||||
private static final AtomicLong REQUEST_ID = new AtomicLong(1);
|
||||
|
||||
public static final byte MAGIC = (byte) 0xbabe;
|
||||
public static final byte VERSION = 0;
|
||||
|
||||
public Command(){
|
||||
this.opaque = REQUEST_ID.getAndIncrement();
|
||||
@ -47,6 +48,11 @@ public class Command implements Serializable {
|
||||
*/
|
||||
private long opaque;
|
||||
|
||||
/**
|
||||
* request context
|
||||
*/
|
||||
private CommandContext context = new CommandContext();
|
||||
|
||||
/**
|
||||
* data body
|
||||
*/
|
||||
@ -76,6 +82,14 @@ public class Command implements Serializable {
|
||||
this.body = body;
|
||||
}
|
||||
|
||||
public CommandContext getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
public void setContext(CommandContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
|
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.remote.command;
|
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* command context
|
||||
*/
|
||||
public class CommandContext implements Serializable {
|
||||
|
||||
private Map<String, String> items = new LinkedHashMap<>();
|
||||
|
||||
public Map<String, String> getItems() {
|
||||
return items;
|
||||
}
|
||||
|
||||
public void setItems(Map<String, String> items) {
|
||||
this.items = items;
|
||||
}
|
||||
|
||||
public void put(String key, String value) {
|
||||
items.put(key, value);
|
||||
}
|
||||
|
||||
public String get(String key) {
|
||||
return items.get(key);
|
||||
}
|
||||
|
||||
public byte[] toBytes() {
|
||||
return JSONUtils.toJsonByteArray(this);
|
||||
}
|
||||
|
||||
public static CommandContext valueOf(byte[] src) {
|
||||
return JSONUtils.parseObject(src, CommandContext.class);
|
||||
}
|
||||
}
|
@ -33,6 +33,16 @@ public class CommandHeader implements Serializable {
|
||||
*/
|
||||
private long opaque;
|
||||
|
||||
/**
|
||||
* context length
|
||||
*/
|
||||
private int contextLength;
|
||||
|
||||
/**
|
||||
* context
|
||||
*/
|
||||
private byte[] context;
|
||||
|
||||
/**
|
||||
* body length
|
||||
*/
|
||||
@ -61,4 +71,20 @@ public class CommandHeader implements Serializable {
|
||||
public void setOpaque(long opaque) {
|
||||
this.opaque = opaque;
|
||||
}
|
||||
|
||||
public int getContextLength() {
|
||||
return contextLength;
|
||||
}
|
||||
|
||||
public void setContextLength(int contextLength) {
|
||||
this.contextLength = contextLength;
|
||||
}
|
||||
|
||||
public byte[] getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
public void setContext(byte[] context) {
|
||||
this.context = context;
|
||||
}
|
||||
}
|
||||
|
@ -50,13 +50,13 @@ import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.entity.TaskPriority;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread {
|
||||
* taskUpdateQueue
|
||||
*/
|
||||
@Autowired
|
||||
private TaskPriorityQueue<String> taskPriorityQueue;
|
||||
private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
|
||||
|
||||
/**
|
||||
* processService
|
||||
@ -119,7 +119,7 @@ public class TaskPriorityQueueConsumer extends Thread {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
List<String> failedDispatchTasks = new ArrayList<>();
|
||||
List<TaskPriority> failedDispatchTasks = new ArrayList<>();
|
||||
while (Stopper.isRunning()) {
|
||||
try {
|
||||
int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
|
||||
@ -130,15 +130,14 @@ public class TaskPriorityQueueConsumer extends Thread {
|
||||
continue;
|
||||
}
|
||||
// if not task , blocking here
|
||||
String taskPriorityInfo = taskPriorityQueue.take();
|
||||
TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
|
||||
boolean dispatchResult = dispatch(taskPriority.getTaskId());
|
||||
TaskPriority taskPriority = taskPriorityQueue.take();
|
||||
boolean dispatchResult = dispatch(taskPriority);
|
||||
if (!dispatchResult) {
|
||||
failedDispatchTasks.add(taskPriorityInfo);
|
||||
failedDispatchTasks.add(taskPriority);
|
||||
}
|
||||
}
|
||||
if (!failedDispatchTasks.isEmpty()) {
|
||||
for (String dispatchFailedTask : failedDispatchTasks) {
|
||||
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
|
||||
taskPriorityQueue.put(dispatchFailedTask);
|
||||
}
|
||||
// If there are tasks in a cycle that cannot find the worker group,
|
||||
@ -157,12 +156,13 @@ public class TaskPriorityQueueConsumer extends Thread {
|
||||
/**
|
||||
* dispatch task
|
||||
*
|
||||
* @param taskInstanceId taskInstanceId
|
||||
* @param taskPriority taskPriority
|
||||
* @return result
|
||||
*/
|
||||
protected boolean dispatch(int taskInstanceId) {
|
||||
protected boolean dispatch(TaskPriority taskPriority) {
|
||||
boolean result = false;
|
||||
try {
|
||||
int taskInstanceId = taskPriority.getTaskId();
|
||||
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
|
||||
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
|
||||
|
||||
|
@ -16,8 +16,6 @@
|
||||
*/
|
||||
package org.apache.dolphinscheduler.server.master.runner;
|
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
|
||||
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
|
||||
import org.apache.dolphinscheduler.common.model.TaskNode;
|
||||
@ -27,17 +25,15 @@ import org.apache.dolphinscheduler.dao.AlertDao;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import static org.apache.dolphinscheduler.common.Constants.*;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.concurrent.Callable;
|
||||
@ -217,14 +213,14 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
|
||||
logger.info("task ready to submit: {}", taskInstance);
|
||||
|
||||
/**
|
||||
* taskPriorityInfo
|
||||
* taskPriority
|
||||
*/
|
||||
String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(),
|
||||
TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
|
||||
processInstance.getId(),
|
||||
taskInstance.getProcessInstancePriority().getCode(),
|
||||
taskInstance.getId(),
|
||||
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
|
||||
taskUpdateQueue.put(taskPriorityInfo);
|
||||
taskUpdateQueue.put(taskPriority);
|
||||
logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
|
||||
return true;
|
||||
}catch (Exception e){
|
||||
@ -235,29 +231,22 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
|
||||
}
|
||||
|
||||
/**
|
||||
* buildTaskPriorityInfo
|
||||
* buildTaskPriority
|
||||
*
|
||||
* @param processInstancePriority processInstancePriority
|
||||
* @param processInstanceId processInstanceId
|
||||
* @param taskInstancePriority taskInstancePriority
|
||||
* @param taskInstanceId taskInstanceId
|
||||
* @param workerGroup workerGroup
|
||||
* @return TaskPriorityInfo
|
||||
* @return TaskPriority
|
||||
*/
|
||||
private String buildTaskPriorityInfo(int processInstancePriority,
|
||||
int processInstanceId,
|
||||
int taskInstancePriority,
|
||||
int taskInstanceId,
|
||||
String workerGroup) {
|
||||
return processInstancePriority +
|
||||
UNDERLINE +
|
||||
processInstanceId +
|
||||
UNDERLINE +
|
||||
taskInstancePriority +
|
||||
UNDERLINE +
|
||||
taskInstanceId +
|
||||
UNDERLINE +
|
||||
workerGroup;
|
||||
private TaskPriority buildTaskPriority(int processInstancePriority,
|
||||
int processInstanceId,
|
||||
int taskInstancePriority,
|
||||
int taskInstanceId,
|
||||
String workerGroup) {
|
||||
return new TaskPriority(processInstancePriority, processInstanceId,
|
||||
taskInstancePriority, taskInstanceId, workerGroup);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -124,55 +124,60 @@ public class MasterSchedulerService extends Thread {
|
||||
public void run() {
|
||||
logger.info("master scheduler started");
|
||||
while (Stopper.isRunning()){
|
||||
InterProcessMutex mutex = null;
|
||||
try {
|
||||
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
|
||||
if(!runCheckFlag) {
|
||||
if (!runCheckFlag) {
|
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
|
||||
continue;
|
||||
}
|
||||
if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
|
||||
|
||||
mutex = zkMasterClient.blockAcquireMutex();
|
||||
|
||||
int activeCount = masterExecService.getActiveCount();
|
||||
// make sure to scan and delete command table in one transaction
|
||||
Command command = processService.findOneCommand();
|
||||
if (command != null) {
|
||||
logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
|
||||
|
||||
try{
|
||||
|
||||
ProcessInstance processInstance = processService.handleCommand(logger,
|
||||
getLocalAddress(),
|
||||
this.masterConfig.getMasterExecThreads() - activeCount, command);
|
||||
if (processInstance != null) {
|
||||
logger.info("start master exec thread , split DAG ...");
|
||||
masterExecService.execute(
|
||||
new MasterExecThread(
|
||||
processInstance
|
||||
, processService
|
||||
, nettyRemotingClient
|
||||
, alertManager
|
||||
, masterConfig));
|
||||
}
|
||||
}catch (Exception e){
|
||||
logger.error("scan command error ", e);
|
||||
processService.moveToErrorCommand(command, e.toString());
|
||||
}
|
||||
} else{
|
||||
//indicate that no command ,sleep for 1s
|
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
|
||||
}
|
||||
scheduleProcess();
|
||||
}
|
||||
} catch (Exception e){
|
||||
logger.error("master scheduler thread error",e);
|
||||
} finally{
|
||||
zkMasterClient.releaseMutex(mutex);
|
||||
} catch (Exception e) {
|
||||
logger.error("master scheduler thread error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleProcess() throws Exception {
|
||||
InterProcessMutex mutex = null;
|
||||
try {
|
||||
mutex = zkMasterClient.blockAcquireMutex();
|
||||
|
||||
int activeCount = masterExecService.getActiveCount();
|
||||
// make sure to scan and delete command table in one transaction
|
||||
Command command = processService.findOneCommand();
|
||||
if (command != null) {
|
||||
logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());
|
||||
|
||||
try {
|
||||
|
||||
ProcessInstance processInstance = processService.handleCommand(logger,
|
||||
getLocalAddress(),
|
||||
this.masterConfig.getMasterExecThreads() - activeCount, command);
|
||||
if (processInstance != null) {
|
||||
logger.info("start master exec thread , split DAG ...");
|
||||
masterExecService.execute(
|
||||
new MasterExecThread(
|
||||
processInstance
|
||||
, processService
|
||||
, nettyRemotingClient
|
||||
, alertManager
|
||||
, masterConfig));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("scan command error ", e);
|
||||
processService.moveToErrorCommand(command, e.toString());
|
||||
}
|
||||
} else {
|
||||
//indicate that no command ,sleep for 1s
|
||||
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
|
||||
}
|
||||
} finally {
|
||||
zkMasterClient.releaseMutex(mutex);
|
||||
}
|
||||
}
|
||||
|
||||
private String getLocalAddress(){
|
||||
return NetUtils.getHost() + ":" + masterConfig.getListenPort();
|
||||
}
|
||||
|
@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
|
||||
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
|
||||
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
|
||||
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
|
||||
@ -72,7 +73,7 @@ public class TaskPriorityQueueConsumerTest {
|
||||
|
||||
|
||||
@Autowired
|
||||
private TaskPriorityQueue taskPriorityQueue;
|
||||
private TaskPriorityQueue<TaskPriority> taskPriorityQueue;
|
||||
|
||||
@Autowired
|
||||
private TaskPriorityQueueConsumer taskPriorityQueueConsumer;
|
||||
@ -142,9 +143,8 @@ public class TaskPriorityQueueConsumerTest {
|
||||
taskInstance.setProcessDefine(processDefinition);
|
||||
|
||||
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
|
||||
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
|
||||
|
||||
taskPriorityQueue.put("2_1_2_1_default");
|
||||
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
|
||||
taskPriorityQueue.put(taskPriority);
|
||||
|
||||
TimeUnit.SECONDS.sleep(10);
|
||||
|
||||
@ -180,7 +180,8 @@ public class TaskPriorityQueueConsumerTest {
|
||||
processDefinition.setProjectId(1);
|
||||
taskInstance.setProcessDefine(processDefinition);
|
||||
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
|
||||
taskPriorityQueue.put("2_1_2_1_default");
|
||||
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
|
||||
taskPriorityQueue.put(taskPriority);
|
||||
|
||||
DataSource dataSource = new DataSource();
|
||||
dataSource.setId(1);
|
||||
@ -243,7 +244,8 @@ public class TaskPriorityQueueConsumerTest {
|
||||
processDefinition.setProjectId(1);
|
||||
taskInstance.setProcessDefine(processDefinition);
|
||||
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
|
||||
taskPriorityQueue.put("2_1_2_1_default");
|
||||
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
|
||||
taskPriorityQueue.put(taskPriority);
|
||||
|
||||
DataSource dataSource = new DataSource();
|
||||
dataSource.setId(80);
|
||||
@ -310,7 +312,8 @@ public class TaskPriorityQueueConsumerTest {
|
||||
processDefinition.setProjectId(1);
|
||||
taskInstance.setProcessDefine(processDefinition);
|
||||
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
|
||||
taskPriorityQueue.put("2_1_2_1_default");
|
||||
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
|
||||
taskPriorityQueue.put(taskPriority);
|
||||
|
||||
DataSource dataSource = new DataSource();
|
||||
dataSource.setId(1);
|
||||
@ -402,7 +405,8 @@ public class TaskPriorityQueueConsumerTest {
|
||||
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
|
||||
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
|
||||
|
||||
taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
|
||||
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
|
||||
taskPriorityQueue.put(taskPriority);
|
||||
|
||||
TimeUnit.SECONDS.sleep(10);
|
||||
|
||||
@ -455,7 +459,9 @@ public class TaskPriorityQueueConsumerTest {
|
||||
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
|
||||
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
|
||||
|
||||
boolean res = taskPriorityQueueConsumer.dispatch(1);
|
||||
TaskPriority taskPriority = new TaskPriority();
|
||||
taskPriority.setTaskId(1);
|
||||
boolean res = taskPriorityQueueConsumer.dispatch(taskPriority);
|
||||
|
||||
Assert.assertFalse(res);
|
||||
}
|
||||
@ -649,7 +655,8 @@ public class TaskPriorityQueueConsumerTest {
|
||||
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
|
||||
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
|
||||
|
||||
taskPriorityQueue.put("2_1_2_1_NoWorkGroup");
|
||||
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
|
||||
taskPriorityQueue.put(taskPriority);
|
||||
|
||||
taskPriorityQueueConsumer.run();
|
||||
|
||||
|
@ -15,14 +15,15 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.entity;
|
||||
package org.apache.dolphinscheduler.service.queue;
|
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.*;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* task priority info
|
||||
*/
|
||||
public class TaskPriority {
|
||||
public class TaskPriority implements Comparable<TaskPriority> {
|
||||
|
||||
/**
|
||||
* processInstancePriority
|
||||
@ -50,9 +51,9 @@ public class TaskPriority {
|
||||
private String groupName;
|
||||
|
||||
/**
|
||||
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
|
||||
* context
|
||||
*/
|
||||
private String taskPriorityInfo;
|
||||
private Map<String, String> context;
|
||||
|
||||
public TaskPriority(){}
|
||||
|
||||
@ -65,15 +66,6 @@ public class TaskPriority {
|
||||
this.taskInstancePriority = taskInstancePriority;
|
||||
this.taskId = taskId;
|
||||
this.groupName = groupName;
|
||||
this.taskPriorityInfo = this.processInstancePriority +
|
||||
UNDERLINE +
|
||||
this.processInstanceId +
|
||||
UNDERLINE +
|
||||
this.taskInstancePriority +
|
||||
UNDERLINE +
|
||||
this.taskId +
|
||||
UNDERLINE +
|
||||
this.groupName;
|
||||
}
|
||||
|
||||
public int getProcessInstancePriority() {
|
||||
@ -104,6 +96,10 @@ public class TaskPriority {
|
||||
return taskId;
|
||||
}
|
||||
|
||||
public Map<String, String> getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
public void setTaskId(int taskId) {
|
||||
this.taskId = taskId;
|
||||
}
|
||||
@ -116,32 +112,61 @@ public class TaskPriority {
|
||||
this.groupName = groupName;
|
||||
}
|
||||
|
||||
public String getTaskPriorityInfo() {
|
||||
return taskPriorityInfo;
|
||||
public void setContext(Map<String, String> context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
public void setTaskPriorityInfo(String taskPriorityInfo) {
|
||||
this.taskPriorityInfo = taskPriorityInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* taskPriorityInfo convert taskPriority
|
||||
*
|
||||
* @param taskPriorityInfo taskPriorityInfo
|
||||
* @return TaskPriority
|
||||
*/
|
||||
public static TaskPriority of(String taskPriorityInfo){
|
||||
String[] parts = taskPriorityInfo.split(UNDERLINE);
|
||||
|
||||
if (parts.length != 5) {
|
||||
throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo));
|
||||
@Override
|
||||
public int compareTo(TaskPriority other) {
|
||||
if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
|
||||
return 1;
|
||||
}
|
||||
TaskPriority taskPriority = new TaskPriority(
|
||||
Integer.parseInt(parts[0]),
|
||||
Integer.parseInt(parts[1]),
|
||||
Integer.parseInt(parts[2]),
|
||||
Integer.parseInt(parts[3]),
|
||||
parts[4]);
|
||||
return taskPriority;
|
||||
if (this.getProcessInstancePriority() < other.getProcessInstancePriority()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (this.getProcessInstanceId() > other.getProcessInstanceId()) {
|
||||
return 1;
|
||||
}
|
||||
if (this.getProcessInstanceId() < other.getProcessInstanceId()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) {
|
||||
return 1;
|
||||
}
|
||||
if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (this.getTaskId() > other.getTaskId()) {
|
||||
return 1;
|
||||
}
|
||||
if (this.getTaskId() < other.getTaskId()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return this.getGroupName().compareTo(other.getGroupName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TaskPriority that = (TaskPriority) o;
|
||||
return processInstancePriority == that.processInstancePriority
|
||||
&& processInstanceId == that.processInstanceId
|
||||
&& taskInstancePriority == that.taskInstancePriority
|
||||
&& taskId == that.taskId
|
||||
&& Objects.equals(groupName, that.groupName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName);
|
||||
}
|
||||
}
|
@ -17,24 +17,18 @@
|
||||
|
||||
package org.apache.dolphinscheduler.service.queue;
|
||||
|
||||
import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH;
|
||||
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
|
||||
|
||||
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* A singleton of a task queue implemented with zookeeper
|
||||
* tasks queue implementation
|
||||
*/
|
||||
@Service
|
||||
public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
|
||||
public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
|
||||
/**
|
||||
* queue size
|
||||
*/
|
||||
@ -43,7 +37,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
|
||||
/**
|
||||
* queue
|
||||
*/
|
||||
private PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
|
||||
private PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE);
|
||||
|
||||
/**
|
||||
* put task takePriorityInfo
|
||||
@ -52,7 +46,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
|
||||
* @throws TaskPriorityQueueException
|
||||
*/
|
||||
@Override
|
||||
public void put(String taskPriorityInfo) throws TaskPriorityQueueException {
|
||||
public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException {
|
||||
queue.put(taskPriorityInfo);
|
||||
}
|
||||
|
||||
@ -63,7 +57,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
|
||||
* @throws TaskPriorityQueueException
|
||||
*/
|
||||
@Override
|
||||
public String take() throws TaskPriorityQueueException, InterruptedException {
|
||||
public TaskPriority take() throws TaskPriorityQueueException, InterruptedException {
|
||||
return queue.take();
|
||||
}
|
||||
|
||||
@ -77,36 +71,4 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
|
||||
public int size() throws TaskPriorityQueueException {
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* TaskInfoComparator
|
||||
*/
|
||||
private class TaskInfoComparator implements Comparator<String> {
|
||||
|
||||
/**
|
||||
* compare o1 o2
|
||||
*
|
||||
* @param o1 o1
|
||||
* @param o2 o2
|
||||
* @return compare result
|
||||
*/
|
||||
@Override
|
||||
public int compare(String o1, String o2) {
|
||||
String s1 = o1;
|
||||
String s2 = o2;
|
||||
String[] s1Array = s1.split(UNDERLINE);
|
||||
if (s1Array.length > TASK_INFO_LENGTH) {
|
||||
// warning: if this length > 5, need to be changed
|
||||
s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE));
|
||||
}
|
||||
|
||||
String[] s2Array = s2.split(UNDERLINE);
|
||||
if (s2Array.length > TASK_INFO_LENGTH) {
|
||||
// warning: if this length > 5, need to be changed
|
||||
s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE));
|
||||
}
|
||||
|
||||
return s1.compareTo(s2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package queue;
|
||||
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TaskPriorityTest {
|
||||
|
||||
@Test
|
||||
public void testSort() {
|
||||
TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default");
|
||||
TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default");
|
||||
TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default");
|
||||
List<TaskPriority> taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
|
||||
Collections.sort(taskPrioritys);
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(priorityOne, priorityTwo, priorityThree),
|
||||
taskPrioritys
|
||||
);
|
||||
|
||||
priorityOne = new TaskPriority(0, 1, 0, 0, "default");
|
||||
priorityTwo = new TaskPriority(0, 2, 0, 0, "default");
|
||||
priorityThree = new TaskPriority(0, 3, 0, 0, "default");
|
||||
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
|
||||
Collections.sort(taskPrioritys);
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(priorityOne, priorityTwo, priorityThree),
|
||||
taskPrioritys
|
||||
);
|
||||
|
||||
priorityOne = new TaskPriority(0, 0, 1, 0, "default");
|
||||
priorityTwo = new TaskPriority(0, 0, 2, 0, "default");
|
||||
priorityThree = new TaskPriority(0, 0, 3, 0, "default");
|
||||
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
|
||||
Collections.sort(taskPrioritys);
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(priorityOne, priorityTwo, priorityThree),
|
||||
taskPrioritys
|
||||
);
|
||||
|
||||
priorityOne = new TaskPriority(0, 0, 0, 1, "default");
|
||||
priorityTwo = new TaskPriority(0, 0, 0, 2, "default");
|
||||
priorityThree = new TaskPriority(0, 0, 0, 3, "default");
|
||||
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
|
||||
Collections.sort(taskPrioritys);
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(priorityOne, priorityTwo, priorityThree),
|
||||
taskPrioritys
|
||||
);
|
||||
|
||||
priorityOne = new TaskPriority(0, 0, 0, 0, "default_1");
|
||||
priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2");
|
||||
priorityThree = new TaskPriority(0, 0, 0, 0, "default_3");
|
||||
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
|
||||
Collections.sort(taskPrioritys);
|
||||
Assert.assertEquals(
|
||||
Arrays.asList(priorityOne, priorityTwo, priorityThree),
|
||||
taskPrioritys
|
||||
);
|
||||
}
|
||||
}
|
@ -17,6 +17,7 @@
|
||||
|
||||
package queue;
|
||||
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriority;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
|
||||
import org.junit.Test;
|
||||
@ -31,19 +32,16 @@ public class TaskUpdateQueueTest {
|
||||
@Test
|
||||
public void testQueue() throws Exception{
|
||||
|
||||
// ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName}
|
||||
|
||||
/**
|
||||
* 1_1_2_1_default
|
||||
* 1_1_2_2_default
|
||||
* 1_1_0_3_default
|
||||
* 1_1_0_4_default
|
||||
*/
|
||||
|
||||
String taskInfo1 = "1_1_2_1_default";
|
||||
String taskInfo2 = "1_1_2_2_default";
|
||||
String taskInfo3 = "1_1_0_3_default";
|
||||
String taskInfo4 = "1_1_0_4_default";
|
||||
TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default");
|
||||
TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default");
|
||||
TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default");
|
||||
TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default");
|
||||
|
||||
TaskPriorityQueue queue = new TaskPriorityQueueImpl();
|
||||
queue.put(taskInfo1);
|
||||
@ -51,9 +49,9 @@ public class TaskUpdateQueueTest {
|
||||
queue.put(taskInfo3);
|
||||
queue.put(taskInfo4);
|
||||
|
||||
assertEquals("1_1_0_3_default", queue.take());
|
||||
assertEquals("1_1_0_4_default", queue.take());
|
||||
assertEquals("1_1_2_1_default",queue.take());
|
||||
assertEquals("1_1_2_2_default",queue.take());
|
||||
assertEquals(taskInfo3, queue.take());
|
||||
assertEquals(taskInfo4, queue.take());
|
||||
assertEquals(taskInfo1, queue.take());
|
||||
assertEquals(taskInfo2, queue.take());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user