From 41a6d09c4f960be0f10129f52c60b894adf57a89 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sat, 17 Sep 2022 17:51:40 +0800 Subject: [PATCH] Fix workflow instance may failover many times due to doesn't check the restart time (#11445) (#12010) (cherry picked from commit 0ca308629677d6df2b0d32db4d9d7f3aac78af8d) --- .../master/service/MasterFailoverService.java | 35 ++++++++++++------- .../master/service/FailoverServiceTest.java | 5 ++- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index 9aa3f2d7b5..5e422f7600 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -17,6 +17,11 @@ package org.apache.dolphinscheduler.server.master.service; +import io.micrometer.core.annotation.Counted; +import io.micrometer.core.annotation.Timed; +import lombok.NonNull; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.time.StopWatch; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.NodeType; @@ -31,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; @@ -41,9 +47,9 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.time.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; import java.util.Date; import java.util.List; @@ -51,14 +57,6 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import io.micrometer.core.annotation.Counted; -import io.micrometer.core.annotation.Timed; -import lombok.NonNull; - @Service public class MasterFailoverService { @@ -70,15 +68,19 @@ public class MasterFailoverService { private final NettyExecutorManager nettyExecutorManager; + private final ProcessInstanceExecCacheManager processInstanceExecCacheManager; + public MasterFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, @NonNull ProcessService processService, - @NonNull NettyExecutorManager nettyExecutorManager) { + @NonNull NettyExecutorManager nettyExecutorManager, + @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) { this.registryClient = registryClient; this.masterConfig = masterConfig; this.processService = processService; this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); this.nettyExecutorManager = nettyExecutorManager; + this.processInstanceExecCacheManager = processInstanceExecCacheManager; } @@ -282,6 +284,15 @@ public class MasterFailoverService { // The processInstance is newly created return false; } + if (processInstance.getRestartTime() != null && processInstance.getRestartTime().after(beFailoveredMasterStartupTime)) { + // the processInstance is already be failovered. + return false; + } + + if (processInstanceExecCacheManager.contains(processInstance.getId())) { + // the processInstance is a running process instance in the current master + return false; + } return true; } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index ab71cd75cd..cd26b1a57c 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -86,6 +86,9 @@ public class FailoverServiceTest { @Mock private NettyExecutorManager nettyExecutorManager; + @Mock + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + private static int masterPort = 5678; private static int workerPort = 1234; @@ -104,7 +107,7 @@ public class FailoverServiceTest { given(masterConfig.getListenPort()).willReturn(masterPort); MasterFailoverService masterFailoverService = - new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager); + new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager); WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient, masterConfig, processService,