diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java index 179ae1bef8..343e8c63dd 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandContext; import org.apache.dolphinscheduler.remote.command.CommandHeader; import org.apache.dolphinscheduler.remote.command.CommandType; import org.slf4j.Logger; @@ -54,16 +55,34 @@ public class NettyDecoder extends ReplayingDecoder { switch (state()){ case MAGIC: checkMagic(in.readByte()); + checkpoint(State.VERSION); + // fallthru + case VERSION: + checkVersion(in.readByte()); checkpoint(State.COMMAND); + // fallthru case COMMAND: commandHeader.setType(in.readByte()); checkpoint(State.OPAQUE); + // fallthru case OPAQUE: commandHeader.setOpaque(in.readLong()); + checkpoint(State.CONTEXT_LENGTH); + // fallthru + case CONTEXT_LENGTH: + commandHeader.setContextLength(in.readInt()); + checkpoint(State.CONTEXT); + // fallthru + case CONTEXT: + byte[] context = new byte[commandHeader.getContextLength()]; + in.readBytes(context); + commandHeader.setContext(context); checkpoint(State.BODY_LENGTH); + // fallthru case BODY_LENGTH: commandHeader.setBodyLength(in.readInt()); checkpoint(State.BODY); + // fallthru case BODY: byte[] body = new byte[commandHeader.getBodyLength()]; in.readBytes(body); @@ -71,6 +90,7 @@ public class NettyDecoder extends ReplayingDecoder { Command packet = new Command(); packet.setType(commandType(commandHeader.getType())); packet.setOpaque(commandHeader.getOpaque()); + packet.setContext(CommandContext.valueOf(commandHeader.getContext())); packet.setBody(body); out.add(packet); // @@ -105,10 +125,23 @@ public class NettyDecoder extends ReplayingDecoder { } } + /** + * check version + * @param version + */ + private void checkVersion(byte version) { + if (version != Command.VERSION) { + throw new IllegalArgumentException("illegal protocol [version]" + version); + } + } + enum State{ MAGIC, + VERSION, COMMAND, OPAQUE, + CONTEXT_LENGTH, + CONTEXT, BODY_LENGTH, BODY; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java index 4e9836a26f..785ee5aaf2 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyEncoder.java @@ -42,11 +42,18 @@ public class NettyEncoder extends MessageToByteEncoder { throw new Exception("encode msg is null"); } out.writeByte(Command.MAGIC); + out.writeByte(Command.VERSION); out.writeByte(msg.getType().ordinal()); out.writeLong(msg.getOpaque()); + writeContext(msg, out); out.writeInt(msg.getBody().length); out.writeBytes(msg.getBody()); } + private void writeContext(Command msg, ByteBuf out) { + byte[] headerBytes = msg.getContext().toBytes(); + out.writeInt(headerBytes.length); + out.writeBytes(headerBytes); + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java index ed46e1ff51..9baa321a9e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java @@ -28,6 +28,7 @@ public class Command implements Serializable { private static final AtomicLong REQUEST_ID = new AtomicLong(1); public static final byte MAGIC = (byte) 0xbabe; + public static final byte VERSION = 0; public Command(){ this.opaque = REQUEST_ID.getAndIncrement(); @@ -47,6 +48,11 @@ public class Command implements Serializable { */ private long opaque; + /** + * request context + */ + private CommandContext context = new CommandContext(); + /** * data body */ @@ -76,6 +82,14 @@ public class Command implements Serializable { this.body = body; } + public CommandContext getContext() { + return context; + } + + public void setContext(CommandContext context) { + this.context = context; + } + @Override public int hashCode() { final int prime = 31; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java new file mode 100644 index 0000000000..c9febee6fc --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * command context + */ +public class CommandContext implements Serializable { + + private Map items = new LinkedHashMap<>(); + + public Map getItems() { + return items; + } + + public void setItems(Map items) { + this.items = items; + } + + public void put(String key, String value) { + items.put(key, value); + } + + public String get(String key) { + return items.get(key); + } + + public byte[] toBytes() { + return JSONUtils.toJsonByteArray(this); + } + + public static CommandContext valueOf(byte[] src) { + return JSONUtils.parseObject(src, CommandContext.class); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java index 78948a5c0c..9e83a426f9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandHeader.java @@ -33,6 +33,16 @@ public class CommandHeader implements Serializable { */ private long opaque; + /** + * context length + */ + private int contextLength; + + /** + * context + */ + private byte[] context; + /** * body length */ @@ -61,4 +71,20 @@ public class CommandHeader implements Serializable { public void setOpaque(long opaque) { this.opaque = opaque; } + + public int getContextLength() { + return contextLength; + } + + public void setContextLength(int contextLength) { + this.contextLength = contextLength; + } + + public byte[] getContext() { + return context; + } + + public void setContext(byte[] context) { + this.context = context; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index a407e4ea90..23255084e0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -50,13 +50,13 @@ import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.TaskPriority; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import java.util.ArrayList; @@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread { * taskUpdateQueue */ @Autowired - private TaskPriorityQueue taskPriorityQueue; + private TaskPriorityQueue taskPriorityQueue; /** * processService @@ -119,7 +119,7 @@ public class TaskPriorityQueueConsumer extends Thread { @Override public void run() { - List failedDispatchTasks = new ArrayList<>(); + List failedDispatchTasks = new ArrayList<>(); while (Stopper.isRunning()) { try { int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); @@ -130,15 +130,14 @@ public class TaskPriorityQueueConsumer extends Thread { continue; } // if not task , blocking here - String taskPriorityInfo = taskPriorityQueue.take(); - TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo); - boolean dispatchResult = dispatch(taskPriority.getTaskId()); + TaskPriority taskPriority = taskPriorityQueue.take(); + boolean dispatchResult = dispatch(taskPriority); if (!dispatchResult) { - failedDispatchTasks.add(taskPriorityInfo); + failedDispatchTasks.add(taskPriority); } } if (!failedDispatchTasks.isEmpty()) { - for (String dispatchFailedTask : failedDispatchTasks) { + for (TaskPriority dispatchFailedTask : failedDispatchTasks) { taskPriorityQueue.put(dispatchFailedTask); } // If there are tasks in a cycle that cannot find the worker group, @@ -157,12 +156,13 @@ public class TaskPriorityQueueConsumer extends Thread { /** * dispatch task * - * @param taskInstanceId taskInstanceId + * @param taskPriority taskPriority * @return result */ - protected boolean dispatch(int taskInstanceId) { + protected boolean dispatch(TaskPriority taskPriority) { boolean result = false; try { + int taskInstanceId = taskPriority.getTaskId(); TaskExecutionContext context = getTaskExecutionContext(taskInstanceId); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index f5c3708af1..fcff67f15b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -16,8 +16,6 @@ */ package org.apache.dolphinscheduler.server.master.runner; -import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; - import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -27,17 +25,15 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; -import java.util.concurrent.Callable; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.dolphinscheduler.common.Constants.*; import java.util.Date; import java.util.concurrent.Callable; @@ -217,14 +213,14 @@ public class MasterBaseTaskExecThread implements Callable { logger.info("task ready to submit: {}", taskInstance); /** - * taskPriorityInfo + * taskPriority */ - String taskPriorityInfo = buildTaskPriorityInfo(processInstance.getProcessInstancePriority().getCode(), + TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(), processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); - taskUpdateQueue.put(taskPriorityInfo); + taskUpdateQueue.put(taskPriority); logger.info(String.format("master submit success, task : %s", taskInstance.getName()) ); return true; }catch (Exception e){ @@ -235,29 +231,22 @@ public class MasterBaseTaskExecThread implements Callable { } /** - * buildTaskPriorityInfo + * buildTaskPriority * * @param processInstancePriority processInstancePriority * @param processInstanceId processInstanceId * @param taskInstancePriority taskInstancePriority * @param taskInstanceId taskInstanceId * @param workerGroup workerGroup - * @return TaskPriorityInfo + * @return TaskPriority */ - private String buildTaskPriorityInfo(int processInstancePriority, - int processInstanceId, - int taskInstancePriority, - int taskInstanceId, - String workerGroup) { - return processInstancePriority + - UNDERLINE + - processInstanceId + - UNDERLINE + - taskInstancePriority + - UNDERLINE + - taskInstanceId + - UNDERLINE + - workerGroup; + private TaskPriority buildTaskPriority(int processInstancePriority, + int processInstanceId, + int taskInstancePriority, + int taskInstanceId, + String workerGroup) { + return new TaskPriority(processInstancePriority, processInstanceId, + taskInstancePriority, taskInstanceId, workerGroup); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 30dd0f9f12..b0e0528c3e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -124,55 +124,60 @@ public class MasterSchedulerService extends Thread { public void run() { logger.info("master scheduler started"); while (Stopper.isRunning()){ - InterProcessMutex mutex = null; try { boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); - if(!runCheckFlag) { + if (!runCheckFlag) { Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; } if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) { - - mutex = zkMasterClient.blockAcquireMutex(); - - int activeCount = masterExecService.getActiveCount(); - // make sure to scan and delete command table in one transaction - Command command = processService.findOneCommand(); - if (command != null) { - logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType()); - - try{ - - ProcessInstance processInstance = processService.handleCommand(logger, - getLocalAddress(), - this.masterConfig.getMasterExecThreads() - activeCount, command); - if (processInstance != null) { - logger.info("start master exec thread , split DAG ..."); - masterExecService.execute( - new MasterExecThread( - processInstance - , processService - , nettyRemotingClient - , alertManager - , masterConfig)); - } - }catch (Exception e){ - logger.error("scan command error ", e); - processService.moveToErrorCommand(command, e.toString()); - } - } else{ - //indicate that no command ,sleep for 1s - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - } + scheduleProcess(); } - } catch (Exception e){ - logger.error("master scheduler thread error",e); - } finally{ - zkMasterClient.releaseMutex(mutex); + } catch (Exception e) { + logger.error("master scheduler thread error", e); } } } + private void scheduleProcess() throws Exception { + InterProcessMutex mutex = null; + try { + mutex = zkMasterClient.blockAcquireMutex(); + + int activeCount = masterExecService.getActiveCount(); + // make sure to scan and delete command table in one transaction + Command command = processService.findOneCommand(); + if (command != null) { + logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType()); + + try { + + ProcessInstance processInstance = processService.handleCommand(logger, + getLocalAddress(), + this.masterConfig.getMasterExecThreads() - activeCount, command); + if (processInstance != null) { + logger.info("start master exec thread , split DAG ..."); + masterExecService.execute( + new MasterExecThread( + processInstance + , processService + , nettyRemotingClient + , alertManager + , masterConfig)); + } + } catch (Exception e) { + logger.error("scan command error ", e); + processService.moveToErrorCommand(command, e.toString()); + } + } else { + //indicate that no command ,sleep for 1s + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + } + } finally { + zkMasterClient.releaseMutex(mutex); + } + } + private String getLocalAddress(){ return NetUtils.getHost() + ":" + masterConfig.getListenPort(); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 049e30e732..8c2321dd8e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; @@ -72,7 +73,7 @@ public class TaskPriorityQueueConsumerTest { @Autowired - private TaskPriorityQueue taskPriorityQueue; + private TaskPriorityQueue taskPriorityQueue; @Autowired private TaskPriorityQueueConsumer taskPriorityQueueConsumer; @@ -142,9 +143,8 @@ public class TaskPriorityQueueConsumerTest { taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); - Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - - taskPriorityQueue.put("2_1_2_1_default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + taskPriorityQueue.put(taskPriority); TimeUnit.SECONDS.sleep(10); @@ -180,7 +180,8 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); - taskPriorityQueue.put("2_1_2_1_default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); dataSource.setId(1); @@ -243,7 +244,8 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); - taskPriorityQueue.put("2_1_2_1_default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); dataSource.setId(80); @@ -310,7 +312,8 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setProjectId(1); taskInstance.setProcessDefine(processDefinition); Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); - taskPriorityQueue.put("2_1_2_1_default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); dataSource.setId(1); @@ -402,7 +405,8 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - taskPriorityQueue.put("2_1_2_1_NoWorkGroup"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup"); + taskPriorityQueue.put(taskPriority); TimeUnit.SECONDS.sleep(10); @@ -455,7 +459,9 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - boolean res = taskPriorityQueueConsumer.dispatch(1); + TaskPriority taskPriority = new TaskPriority(); + taskPriority.setTaskId(1); + boolean res = taskPriorityQueueConsumer.dispatch(taskPriority); Assert.assertFalse(res); } @@ -649,7 +655,8 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - taskPriorityQueue.put("2_1_2_1_NoWorkGroup"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup"); + taskPriorityQueue.put(taskPriority); taskPriorityQueueConsumer.run(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java similarity index 56% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java index 991eeed493..a872f6db9f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java @@ -15,14 +15,15 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.entity; +package org.apache.dolphinscheduler.service.queue; -import static org.apache.dolphinscheduler.common.Constants.*; +import java.util.Map; +import java.util.Objects; /** * task priority info */ -public class TaskPriority { +public class TaskPriority implements Comparable { /** * processInstancePriority @@ -50,9 +51,9 @@ public class TaskPriority { private String groupName; /** - * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName} + * context */ - private String taskPriorityInfo; + private Map context; public TaskPriority(){} @@ -65,15 +66,6 @@ public class TaskPriority { this.taskInstancePriority = taskInstancePriority; this.taskId = taskId; this.groupName = groupName; - this.taskPriorityInfo = this.processInstancePriority + - UNDERLINE + - this.processInstanceId + - UNDERLINE + - this.taskInstancePriority + - UNDERLINE + - this.taskId + - UNDERLINE + - this.groupName; } public int getProcessInstancePriority() { @@ -104,6 +96,10 @@ public class TaskPriority { return taskId; } + public Map getContext() { + return context; + } + public void setTaskId(int taskId) { this.taskId = taskId; } @@ -116,32 +112,61 @@ public class TaskPriority { this.groupName = groupName; } - public String getTaskPriorityInfo() { - return taskPriorityInfo; + public void setContext(Map context) { + this.context = context; } - public void setTaskPriorityInfo(String taskPriorityInfo) { - this.taskPriorityInfo = taskPriorityInfo; - } - - /** - * taskPriorityInfo convert taskPriority - * - * @param taskPriorityInfo taskPriorityInfo - * @return TaskPriority - */ - public static TaskPriority of(String taskPriorityInfo){ - String[] parts = taskPriorityInfo.split(UNDERLINE); - - if (parts.length != 5) { - throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo)); + @Override + public int compareTo(TaskPriority other) { + if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) { + return 1; } - TaskPriority taskPriority = new TaskPriority( - Integer.parseInt(parts[0]), - Integer.parseInt(parts[1]), - Integer.parseInt(parts[2]), - Integer.parseInt(parts[3]), - parts[4]); - return taskPriority; + if (this.getProcessInstancePriority() < other.getProcessInstancePriority()) { + return -1; + } + + if (this.getProcessInstanceId() > other.getProcessInstanceId()) { + return 1; + } + if (this.getProcessInstanceId() < other.getProcessInstanceId()) { + return -1; + } + + if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) { + return 1; + } + if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) { + return -1; + } + + if (this.getTaskId() > other.getTaskId()) { + return 1; + } + if (this.getTaskId() < other.getTaskId()) { + return -1; + } + + return this.getGroupName().compareTo(other.getGroupName()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskPriority that = (TaskPriority) o; + return processInstancePriority == that.processInstancePriority + && processInstanceId == that.processInstanceId + && taskInstancePriority == that.taskInstancePriority + && taskId == that.taskId + && Objects.equals(groupName, that.groupName); + } + + @Override + public int hashCode() { + return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index aefad8499c..694d4c4763 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -17,24 +17,18 @@ package org.apache.dolphinscheduler.service.queue; -import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH; -import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; - import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; -import java.util.Comparator; import java.util.concurrent.PriorityBlockingQueue; import org.springframework.stereotype.Service; - - /** * A singleton of a task queue implemented with zookeeper * tasks queue implementation */ @Service -public class TaskPriorityQueueImpl implements TaskPriorityQueue { +public class TaskPriorityQueueImpl implements TaskPriorityQueue { /** * queue size */ @@ -43,7 +37,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { /** * queue */ - private PriorityBlockingQueue queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + private PriorityBlockingQueue queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE); /** * put task takePriorityInfo @@ -52,7 +46,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { * @throws TaskPriorityQueueException */ @Override - public void put(String taskPriorityInfo) throws TaskPriorityQueueException { + public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException { queue.put(taskPriorityInfo); } @@ -63,7 +57,7 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { * @throws TaskPriorityQueueException */ @Override - public String take() throws TaskPriorityQueueException, InterruptedException { + public TaskPriority take() throws TaskPriorityQueueException, InterruptedException { return queue.take(); } @@ -77,36 +71,4 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { public int size() throws TaskPriorityQueueException { return queue.size(); } - - /** - * TaskInfoComparator - */ - private class TaskInfoComparator implements Comparator { - - /** - * compare o1 o2 - * - * @param o1 o1 - * @param o2 o2 - * @return compare result - */ - @Override - public int compare(String o1, String o2) { - String s1 = o1; - String s2 = o2; - String[] s1Array = s1.split(UNDERLINE); - if (s1Array.length > TASK_INFO_LENGTH) { - // warning: if this length > 5, need to be changed - s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE)); - } - - String[] s2Array = s2.split(UNDERLINE); - if (s2Array.length > TASK_INFO_LENGTH) { - // warning: if this length > 5, need to be changed - s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE)); - } - - return s1.compareTo(s2); - } - } } diff --git a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java new file mode 100644 index 0000000000..151177016f --- /dev/null +++ b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package queue; + +import org.apache.dolphinscheduler.service.queue.TaskPriority; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +public class TaskPriorityTest { + + @Test + public void testSort() { + TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default"); + TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default"); + TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default"); + List taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 1, 0, 0, "default"); + priorityTwo = new TaskPriority(0, 2, 0, 0, "default"); + priorityThree = new TaskPriority(0, 3, 0, 0, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 1, 0, "default"); + priorityTwo = new TaskPriority(0, 0, 2, 0, "default"); + priorityThree = new TaskPriority(0, 0, 3, 0, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 0, 1, "default"); + priorityTwo = new TaskPriority(0, 0, 0, 2, "default"); + priorityThree = new TaskPriority(0, 0, 0, 3, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 0, 0, "default_1"); + priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2"); + priorityThree = new TaskPriority(0, 0, 0, 0, "default_3"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + } +} diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java index ca6c083a67..2c13afa227 100644 --- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java @@ -17,6 +17,7 @@ package queue; +import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; import org.junit.Test; @@ -31,19 +32,16 @@ public class TaskUpdateQueueTest { @Test public void testQueue() throws Exception{ - // ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_${groupName} - /** * 1_1_2_1_default * 1_1_2_2_default * 1_1_0_3_default * 1_1_0_4_default */ - - String taskInfo1 = "1_1_2_1_default"; - String taskInfo2 = "1_1_2_2_default"; - String taskInfo3 = "1_1_0_3_default"; - String taskInfo4 = "1_1_0_4_default"; + TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default"); + TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default"); + TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default"); + TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default"); TaskPriorityQueue queue = new TaskPriorityQueueImpl(); queue.put(taskInfo1); @@ -51,9 +49,9 @@ public class TaskUpdateQueueTest { queue.put(taskInfo3); queue.put(taskInfo4); - assertEquals("1_1_0_3_default", queue.take()); - assertEquals("1_1_0_4_default", queue.take()); - assertEquals("1_1_2_1_default",queue.take()); - assertEquals("1_1_2_2_default",queue.take()); + assertEquals(taskInfo3, queue.take()); + assertEquals(taskInfo4, queue.take()); + assertEquals(taskInfo1, queue.take()); + assertEquals(taskInfo2, queue.take()); } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue index a982dff7c8..24b3bbd481 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/flink.vue @@ -81,6 +81,19 @@
+
+ {{$t('appName')}} + + + + +
{{$t('jobManagerMemory')}} @@ -216,6 +229,8 @@ jobManagerMemory: '1G', // taskManager Memory taskManagerMemory: '2G', + // Flink Job Name + appName: '', // Command line argument mainArgs: '', // Other parameters @@ -288,6 +303,11 @@ return false } + if (!this.appName) { + this.$message.warning(`${i18n.$t('Please enter the job name of Flink')}`) + return false + } + if (!this.jobManagerMemory) { this.$message.warning(`${i18n.$t('Please enter jobManager memory')}`) return false @@ -333,6 +353,7 @@ flinkVersion: this.flinkVersion, slot: this.slot, taskManager: this.taskManager, + appName: this.appName, jobManagerMemory: this.jobManagerMemory, taskManagerMemory: this.taskManagerMemory, mainArgs: this.mainArgs, @@ -468,6 +489,7 @@ localParams: this.localParams, slot: this.slot, taskManager: this.taskManager, + appName: this.appName, jobManagerMemory: this.jobManagerMemory, taskManagerMemory: this.taskManagerMemory, mainArgs: this.mainArgs, diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index 7d449d2be8..2ab56cb5ed 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -108,6 +108,7 @@ export default { 'Driver memory use': 'Driver memory use', 'Please enter driver memory use': 'Please enter driver memory use', 'Number of Executors': 'Number of Executors', + 'Please enter the job name of Flink': 'Please enter the job name of Flink', 'Please enter the number of Executor': 'Please enter the number of Executor', 'Executor memory': 'Executor memory', 'Please enter the Executor memory': 'Please enter the Executor memory', @@ -339,6 +340,7 @@ export default { 'Complement Data': 'Complement Data', slot: 'slot', taskManager: 'taskManager', + appName: 'Flink job name', jobManagerMemory: 'jobManagerMemory', taskManagerMemory: 'taskManagerMemory', 'Scheduling execution': 'Scheduling execution', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 0b46228a31..9198c988d7 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -106,6 +106,7 @@ export default { 'Driver memory use': 'Driver内存数', 'Please enter driver memory use': '请输入Driver内存数', 'Number of Executors': 'Executor数量', + 'Please enter the job name of Flink': '请输入Flink任务名称', 'Please enter the number of Executor': '请输入Executor数量', 'Executor memory': 'Executor内存数', 'Please enter the Executor memory': '请输入Executor内存数', @@ -538,6 +539,7 @@ export default { 'Complement range': '补数范围', slot: 'slot数量', taskManager: 'taskManager数量', + appName: 'Flink任务名称', jobManagerMemory: 'jobManager内存数', taskManagerMemory: 'taskManager内存数', 'Http Url': '请求地址',