From e6c57430e33df31b56bb0f1a4ecfaecd04e6cfc9 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 27 May 2024 21:53:11 +0800 Subject: [PATCH] [DSIP-44] Set a delay time to TaskExecuteRunnable if it dispatched failed (#16069) --- .../builder/TaskExecutionContextBuilder.java | 1 - .../runner/BaseTaskExecuteRunnable.java | 28 +++ .../runner/DefaultTaskExecuteRunnable.java | 2 +- .../GlobalTaskDispatchWaitingQueue.java | 35 +++- .../GlobalTaskDispatchWaitingQueueLooper.java | 53 +++-- .../PriorityDelayTaskExecuteRunnable.java | 85 -------- .../master/runner/TaskExecuteRunnable.java | 3 +- ...seTaskExecuteRunnableDispatchOperator.java | 14 +- .../master/runner/queue/DelayEntry.java | 82 ++++++++ .../runner/queue/PriorityDelayQueue.java | 41 ++++ .../GlobalTaskDispatchWaitingQueueTest.java | 184 ++++++++++++++++++ .../PriorityDelayTaskExecuteRunnableTest.java | 67 ------- .../master/runner/queue/DelayEntryTest.java | 35 ++++ .../plugin/task/api/TaskExecutionContext.java | 12 +- 14 files changed, 434 insertions(+), 208 deletions(-) delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java delete mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java index 832c1b336b..5990a53e0f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java @@ -66,7 +66,6 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup()); taskExecutionContext.setEnvironmentConfig(taskInstance.getEnvironmentConfig()); taskExecutionContext.setHost(taskInstance.getHost()); - taskExecutionContext.setDelayTime(taskInstance.getDelayTime()); taskExecutionContext.setVarPool(taskInstance.getVarPool()); taskExecutionContext.setDryRun(taskInstance.getDryRun()); taskExecutionContext.setTestFlag(taskInstance.getTestFlag()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java index fefdbf3493..2e41173c4b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java @@ -52,4 +52,32 @@ public abstract class BaseTaskExecuteRunnable implements TaskExecuteRunnable { return taskExecutionContext; } + @Override + public int compareTo(TaskExecuteRunnable other) { + if (other == null) { + return 1; + } + int workflowInstancePriorityCompareResult = workflowInstance.getProcessInstancePriority().getCode() - + other.getWorkflowInstance().getProcessInstancePriority().getCode(); + if (workflowInstancePriorityCompareResult != 0) { + return workflowInstancePriorityCompareResult; + } + + // smaller number, higher priority + int taskInstancePriorityCompareResult = taskInstance.getTaskInstancePriority().getCode() + - other.getTaskInstance().getTaskInstancePriority().getCode(); + if (taskInstancePriorityCompareResult != 0) { + return taskInstancePriorityCompareResult; + } + + // larger number, higher priority + int taskGroupPriorityCompareResult = + taskInstance.getTaskGroupPriority() - other.getTaskInstance().getTaskGroupPriority(); + if (taskGroupPriorityCompareResult != 0) { + return -taskGroupPriorityCompareResult; + } + // earlier submit time, higher priority + return taskInstance.getFirstSubmitTime().compareTo(other.getTaskInstance().getFirstSubmitTime()); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java index c1b13717bd..ba4f447216 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager; -public class DefaultTaskExecuteRunnable extends PriorityDelayTaskExecuteRunnable { +public class DefaultTaskExecuteRunnable extends BaseTaskExecuteRunnable { private final TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java index f03bd6b903..21d6c890f5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java @@ -17,7 +17,8 @@ package org.apache.dolphinscheduler.server.master.runner; -import java.util.concurrent.DelayQueue; +import org.apache.dolphinscheduler.server.master.runner.queue.DelayEntry; +import org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -25,26 +26,42 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** - * The class is used to store {@link TaskExecuteRunnable} which needs to be dispatched. The {@link TaskExecuteRunnable} will be stored in a {@link DelayQueue}, - * if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be consumed by {@link GlobalTaskDispatchWaitingQueueLooper}. + * The class is used to store {@link TaskExecuteRunnable} which needs to be dispatched. The {@link TaskExecuteRunnable} + * will be stored in {@link PriorityDelayQueue}, if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be + * consumed by {@link GlobalTaskDispatchWaitingQueueLooper}. + *

+ * The order of {@link TaskExecuteRunnable} in the {@link PriorityDelayQueue} is determined by {@link TaskExecuteRunnable#compareTo}. */ @Slf4j @Component public class GlobalTaskDispatchWaitingQueue { - private final DelayQueue queue = new DelayQueue<>(); + private final PriorityDelayQueue> priorityDelayQueue = new PriorityDelayQueue<>(); - public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) { - queue.put(priorityTaskExecuteRunnable); + /** + * Submit a {@link TaskExecuteRunnable} with delay time 0, it will be consumed immediately. + */ + public void dispatchTaskExecuteRunnable(TaskExecuteRunnable taskExecuteRunnable) { + dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, 0); } + /** + * Submit a {@link TaskExecuteRunnable} with delay time, if the delay time <= 0 then it can be consumed. + */ + public void dispatchTaskExecuteRunnableWithDelay(TaskExecuteRunnable taskExecuteRunnable, long delayTimeMills) { + priorityDelayQueue.add(new DelayEntry<>(delayTimeMills, taskExecuteRunnable)); + } + + /** + * Consume {@link TaskExecuteRunnable} from the {@link PriorityDelayQueue}, only the delay time <= 0 can be consumed. + */ @SneakyThrows - public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() { - return queue.take(); + public TaskExecuteRunnable takeTaskExecuteRunnable() { + return priorityDelayQueue.take().getData(); } public int getWaitingDispatchTaskNumber() { - return queue.size(); + return priorityDelayQueue.size(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java index eabbdd8e10..5cfb285c28 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java @@ -18,13 +18,11 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory; -import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; @@ -43,10 +41,6 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); - private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new AtomicInteger(); - - private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100; - public GlobalTaskDispatchWaitingQueueLooper() { super("GlobalTaskDispatchWaitingQueueLooper"); } @@ -64,29 +58,34 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple @Override public void run() { - DefaultTaskExecuteRunnable defaultTaskExecuteRunnable; while (RUNNING_FLAG.get()) { - defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable(); - try { - TaskExecutionStatus status = defaultTaskExecuteRunnable.getTaskInstance().getState(); - if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != TaskExecutionStatus.DELAY_EXECUTION) { - log.warn("The TaskInstance {} state is : {}, will not dispatch", - defaultTaskExecuteRunnable.getTaskInstance().getName(), status); - continue; - } + doDispatch(); + } + } - TaskDispatcher taskDispatcher = - taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance()); - taskDispatcher.dispatchTask(defaultTaskExecuteRunnable); - DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0); - } catch (Exception e) { - defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes(); - globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable); - if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() > MAX_DISPATCHED_FAILED_TIMES) { - ThreadUtils.sleep(10 * 1000L); - } - log.error("Dispatch Task: {} failed", defaultTaskExecuteRunnable.getTaskInstance().getName(), e); + void doDispatch() { + final TaskExecuteRunnable taskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable(); + TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance(); + if (taskInstance == null) { + // This case shouldn't happen, but if it does, log an error and continue + log.error("The TaskInstance is null, drop it(This case shouldn't happen)"); + return; + } + try { + TaskExecutionStatus status = taskInstance.getState(); + if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != TaskExecutionStatus.DELAY_EXECUTION) { + log.warn("The TaskInstance {} state is : {}, will not dispatch", taskInstance.getName(), status); + return; } + taskDispatchFactory.getTaskDispatcher(taskInstance).dispatchTask(taskExecuteRunnable); + } catch (Exception e) { + // If dispatch failed, will put the task back to the queue + // The task will be dispatched after waiting time. + // the waiting time will increase multiple of times, but will not exceed 60 seconds + long waitingTimeMills = Math.max( + taskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1_000L, 60_000L); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, waitingTimeMills); + log.error("Dispatch Task: {} failed will retry after: {}/ms", taskInstance.getName(), waitingTimeMills, e); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java deleted file mode 100644 index 255ec6c8ac..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner; - -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; - -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; - -public abstract class PriorityDelayTaskExecuteRunnable extends BaseTaskExecuteRunnable implements Delayed { - - public PriorityDelayTaskExecuteRunnable(ProcessInstance workflowInstance, - TaskInstance taskInstance, - TaskExecutionContext taskExecutionContext) { - super(workflowInstance, taskInstance, taskExecutionContext); - } - - @Override - public long getDelay(TimeUnit unit) { - return unit.convert( - DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getDelayTime() * 60L), - TimeUnit.SECONDS); - } - - @Override - public int compareTo(Delayed o) { - if (o == null) { - return 1; - } - int delayTimeCompareResult = - Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); - if (delayTimeCompareResult != 0) { - return delayTimeCompareResult; - } - PriorityDelayTaskExecuteRunnable other = (PriorityDelayTaskExecuteRunnable) o; - // the smaller dispatch fail times, the higher priority - int dispatchFailTimesCompareResult = taskExecutionContext.getDispatchFailTimes() - - other.getTaskExecutionContext().getDispatchFailTimes(); - if (dispatchFailTimesCompareResult != 0) { - return dispatchFailTimesCompareResult; - } - int workflowInstancePriorityCompareResult = workflowInstance.getProcessInstancePriority().getCode() - - other.getWorkflowInstance().getProcessInstancePriority().getCode(); - if (workflowInstancePriorityCompareResult != 0) { - return workflowInstancePriorityCompareResult; - } - long workflowInstanceIdCompareResult = workflowInstance.getId().compareTo(other.getWorkflowInstance().getId()); - if (workflowInstanceIdCompareResult != 0) { - return workflowInstancePriorityCompareResult; - } - int taskInstancePriorityCompareResult = taskInstance.getTaskInstancePriority().getCode() - - other.getTaskInstance().getTaskInstancePriority().getCode(); - if (taskInstancePriorityCompareResult != 0) { - return taskInstancePriorityCompareResult; - } - // larger number, higher priority - int taskGroupPriorityCompareResult = - taskInstance.getTaskGroupPriority() - other.getTaskInstance().getTaskGroupPriority(); - if (taskGroupPriorityCompareResult != 0) { - return -taskGroupPriorityCompareResult; - } - // The task instance shouldn't be equals - return taskInstance.getId().compareTo(other.getTaskInstance().getId()); - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java index 8f66189617..62617f4aac 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; * This interface is used to define a task which is executing. * todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable */ -public interface TaskExecuteRunnable { +public interface TaskExecuteRunnable extends Comparable { void dispatch(); @@ -40,4 +40,5 @@ public interface TaskExecuteRunnable { TaskInstance getTaskInstance(); TaskExecutionContext getTaskExecutionContext(); + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java index 8fa2e2926d..72073359d3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java @@ -17,14 +17,13 @@ package org.apache.dolphinscheduler.server.master.runner.operator; +import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue; -import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; @Slf4j @@ -43,16 +42,17 @@ public abstract class BaseTaskExecuteRunnableDispatchOperator implements TaskExe @Override public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) { - long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS); TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance(); - if (remainTime > 0) { + long remainTimeMills = + DateUtils.getRemainTime(taskInstance.getFirstSubmitTime(), taskInstance.getDelayTime() * 60L) * 1_000; + if (remainTimeMills > 0) { taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION); taskInstanceDao.updateById(taskInstance); - log.info("Current taskInstance: {} is choose delay execution, delay time: {}/min, remainTime: {}/s", + log.info("Current taskInstance: {} is choose delay execution, delay time: {}/min, remainTime: {}/ms", taskInstance.getName(), taskInstance.getDelayTime(), - remainTime); + remainTimeMills); } - globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(taskExecuteRunnable); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, remainTimeMills); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java new file mode 100644 index 0000000000..da6a750261 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntry.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.queue; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Objects; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +import lombok.Getter; + +import org.jetbrains.annotations.NotNull; + +public class DelayEntry> implements Delayed { + + private final long delayTimeMills; + + private final long triggerTimeMills; + + @Getter + private final V data; + + public DelayEntry(long delayTimeMills, V data) { + this.delayTimeMills = delayTimeMills; + this.triggerTimeMills = System.currentTimeMillis() + delayTimeMills; + this.data = checkNotNull(data, "data is null"); + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + long remainTimeMills = triggerTimeMills - System.currentTimeMillis(); + if (TimeUnit.MILLISECONDS.equals(unit)) { + return remainTimeMills; + } + return unit.convert(remainTimeMills, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + DelayEntry other = (DelayEntry) o; + int delayTimeMillsCompareResult = Long.compare(delayTimeMills, other.delayTimeMills); + if (delayTimeMillsCompareResult != 0) { + return delayTimeMillsCompareResult; + } + + if (data == null || other.data == null) { + return 0; + } + return data.compareTo(other.data); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + DelayEntry that = (DelayEntry) o; + return delayTimeMills == that.delayTimeMills && Objects.equals(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hash(delayTimeMills, data); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java new file mode 100644 index 0000000000..8ed4869625 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/queue/PriorityDelayQueue.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.queue; + +import java.util.concurrent.DelayQueue; + +import lombok.SneakyThrows; + +public class PriorityDelayQueue { + + private final DelayQueue queue = new DelayQueue<>(); + + public void add(V v) { + queue.put(v); + } + + @SneakyThrows + public V take() { + return queue.take(); + } + + public int size() { + return queue.size(); + } + +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java new file mode 100644 index 0000000000..843456b98f --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import static com.google.common.truth.Truth.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; + +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager; + +import org.apache.commons.lang3.time.DateUtils; + +import java.time.Duration; +import java.util.Date; +import java.util.concurrent.CompletableFuture; + +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class GlobalTaskDispatchWaitingQueueTest { + + private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue; + + @BeforeEach + public void setUp() { + globalTaskDispatchWaitingQueue = new GlobalTaskDispatchWaitingQueue(); + } + + @Test + void submitTaskExecuteRunnable() { + TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable(); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable); + Awaitility.await() + .atMost(Duration.ofSeconds(1)) + .untilAsserted( + () -> Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable())); + } + + @Test + void testSubmitTaskExecuteRunnableWithDelay() { + TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable(); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecuteRunnable, 3_000L); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable); + + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()).isNotNull(); + Awaitility.await() + .atLeast(Duration.ofSeconds(2)) + .atMost(Duration.ofSeconds(4)) + .untilAsserted( + () -> Assertions.assertNotNull(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable())); + } + + @Test + void takeTaskExecuteRunnable_NoElementShouldBlock() { + CompletableFuture completableFuture = + CompletableFuture.runAsync(() -> globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable()); + assertThrowsExactly(ConditionTimeoutException.class, + () -> await() + .atLeast(Duration.ofSeconds(2)) + .timeout(Duration.ofSeconds(3)) + .until(completableFuture::isDone)); + } + + @Test + void takeTaskExecuteRunnable_withDifferentTaskInstancePriority() { + TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable(); + taskExecuteRunnable1.getTaskInstance().setId(1); + taskExecuteRunnable1.getTaskInstance().setTaskInstancePriority(Priority.MEDIUM); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1); + + TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable(); + taskExecuteRunnable2.getTaskInstance().setId(2); + taskExecuteRunnable2.getTaskInstance().setTaskInstancePriority(Priority.HIGH); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2); + + TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable(); + taskExecuteRunnable3.getTaskInstance().setId(3); + taskExecuteRunnable3.getTaskInstance().setTaskInstancePriority(Priority.LOW); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3); + + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(2); + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(1); + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(3); + } + + @Test + void takeTaskExecuteRunnable_withDifferentTaskGroupPriority() { + TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable(); + taskExecuteRunnable1.getTaskInstance().setId(1); + taskExecuteRunnable1.getTaskInstance().setTaskGroupPriority(Priority.MEDIUM.getCode()); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1); + + TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable(); + taskExecuteRunnable2.getTaskInstance().setId(2); + taskExecuteRunnable2.getTaskInstance().setTaskGroupPriority(Priority.HIGH.getCode()); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2); + + TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable(); + taskExecuteRunnable3.getTaskInstance().setId(3); + taskExecuteRunnable3.getTaskInstance().setTaskGroupPriority(Priority.LOW.getCode()); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3); + + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(3); + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(1); + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(2); + } + + @Test + void takeTaskExecuteRunnable_withDifferentSubmitTime() { + Date now = new Date(); + + TaskExecuteRunnable taskExecuteRunnable1 = createTaskExecuteRunnable(); + taskExecuteRunnable1.getTaskInstance().setId(1); + taskExecuteRunnable1.getTaskInstance().setFirstSubmitTime(now); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable1); + + TaskExecuteRunnable taskExecuteRunnable2 = createTaskExecuteRunnable(); + taskExecuteRunnable2.getTaskInstance().setId(2); + taskExecuteRunnable2.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now, 1)); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable2); + + TaskExecuteRunnable taskExecuteRunnable3 = createTaskExecuteRunnable(); + taskExecuteRunnable3.getTaskInstance().setId(3); + taskExecuteRunnable3.getTaskInstance().setFirstSubmitTime(DateUtils.addMinutes(now, -1)); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable3); + + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(3); + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(1); + assertThat(globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable().getTaskInstance().getId()) + .isEqualTo(2); + } + + @Test + void getWaitingDispatchTaskNumber() { + Assertions.assertEquals(0, globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber()); + TaskExecuteRunnable taskExecuteRunnable = createTaskExecuteRunnable(); + globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnable(taskExecuteRunnable); + Assertions.assertEquals(1, globalTaskDispatchWaitingQueue.getWaitingDispatchTaskNumber()); + } + + private TaskExecuteRunnable createTaskExecuteRunnable() { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setProcessInstancePriority(Priority.MEDIUM); + + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setTaskInstancePriority(Priority.MEDIUM); + taskInstance.setFirstSubmitTime(new Date()); + + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + + return new DefaultTaskExecuteRunnable(processInstance, taskInstance, taskExecutionContext, + new TaskExecuteRunnableOperatorManager()); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java deleted file mode 100644 index 778884e066..0000000000 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.execute; - -import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.PriorityDelayTaskExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class PriorityDelayTaskExecuteRunnableTest { - - @Test - public void testCompareTo() { - TaskExecuteRunnableOperatorManager taskOperatorManager = new TaskExecuteRunnableOperatorManager(); - - ProcessInstance workflowInstance = new ProcessInstance(); - workflowInstance.setId(1); - workflowInstance.setProcessInstancePriority(Priority.HIGH); - - TaskInstance t1 = new TaskInstance(); - t1.setId(1); - t1.setTaskInstancePriority(Priority.HIGH); - - TaskInstance t2 = new TaskInstance(); - t2.setId(1); - t2.setTaskInstancePriority(Priority.HIGH); - - TaskExecutionContext context1 = new TaskExecutionContext(); - TaskExecutionContext context2 = new TaskExecutionContext(); - PriorityDelayTaskExecuteRunnable p1 = - new DefaultTaskExecuteRunnable(workflowInstance, t1, context1, taskOperatorManager); - PriorityDelayTaskExecuteRunnable p2 = - new DefaultTaskExecuteRunnable(workflowInstance, t2, context2, taskOperatorManager); - - Assertions.assertEquals(0, p1.compareTo(p2)); - - // the higher priority, the higher priority - t2.setTaskInstancePriority(Priority.MEDIUM); - Assertions.assertTrue(p1.compareTo(p2) < 0); - - // the smaller dispatch fail times, the higher priority - context1.setDispatchFailTimes(1); - Assertions.assertTrue(p1.compareTo(p2) > 0); - } - -} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java new file mode 100644 index 0000000000..0e3ab5da36 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/queue/DelayEntryTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.queue; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import com.google.common.truth.Truth; + +class DelayEntryTest { + + @Test + void getDelay() { + DelayEntry delayEntry = new DelayEntry<>(1_000L, "Item"); + Truth.assertThat(delayEntry.getDelay(TimeUnit.NANOSECONDS)) + .isWithin(100) + .of(TimeUnit.NANOSECONDS.convert(1_000L, TimeUnit.MILLISECONDS)); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 4304ad2c13..b825949e88 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -203,11 +203,6 @@ public class TaskExecutionContext implements Serializable { */ private String workerGroup; - /** - * delay execution time. - */ - private int delayTime; - /** * current execution status */ @@ -262,12 +257,9 @@ public class TaskExecutionContext implements Serializable { private boolean logBufferEnable; - /** - * dispatch fail times - */ private int dispatchFailTimes; - public void increaseDispatchFailTimes() { - this.dispatchFailTimes++; + public int increaseDispatchFailTimes() { + return ++dispatchFailTimes; } }