Fix workflow instance being failed over multiple times because the restart time was not checked (#11445) (#12010)

(cherry picked from commit 0ca3086296)
This commit is contained in:
Wenjun Ruan 2022-09-17 17:51:40 +08:00 committed by GitHub
parent 0237654e96
commit 41a6d09c4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 13 deletions

View File

@ -17,6 +17,11 @@
package org.apache.dolphinscheduler.server.master.service;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
@ -31,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
@ -41,9 +47,9 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@ -51,14 +57,6 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
@Service
public class MasterFailoverService {
@ -70,15 +68,19 @@ public class MasterFailoverService {
private final NettyExecutorManager nettyExecutorManager;
private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager) {
@NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.nettyExecutorManager = nettyExecutorManager;
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
}
@ -282,6 +284,15 @@ public class MasterFailoverService {
// The processInstance is newly created
return false;
}
if (processInstance.getRestartTime() != null && processInstance.getRestartTime().after(beFailoveredMasterStartupTime)) {
// the processInstance has already been failovered.
return false;
}
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
// the processInstance is a running process instance in the current master
return false;
}
return true;
}

View File

@ -86,6 +86,9 @@ public class FailoverServiceTest {
@Mock
private NettyExecutorManager nettyExecutorManager;
@Mock
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private static int masterPort = 5678;
private static int workerPort = 1234;
@ -104,7 +107,7 @@ public class FailoverServiceTest {
given(masterConfig.getListenPort()).willReturn(masterPort);
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager);
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,