Fix worker cannot shutdown due to resource close failed or heart beat check failed (#10979)

* Use try-with-resource to close resource, and add heart error threshold to avoid worker cannot close due to heart beat check failed

* Move heartbeat error threshold to applicaiton.yml

(cherry picked from commit 2be1d4bf0a)
This commit is contained in:
Wenjun Ruan 2022-07-15 20:06:53 +08:00
parent 17f0be5bfb
commit f3250bf5fa
15 changed files with 137 additions and 78 deletions

View File

@ -50,6 +50,7 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
public static final String FOLDER_SEPARATOR = "/";

View File

@ -21,12 +21,18 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.experimental.UtilityClass;
@UtilityClass
public class ThreadUtils {
private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class);
/**
* Wrapper over newDaemonFixedThreadExecutor.
*
@ -35,10 +41,7 @@ public class ThreadUtils {
* @return ExecutorService
*/
public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.build();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
@ -48,8 +51,9 @@ public class ThreadUtils {
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ignore) {
} catch (final InterruptedException interruptedException) {
Thread.currentThread().interrupt();
logger.error("Current thread sleep error", interruptedException);
}
}
}

View File

@ -115,31 +115,27 @@ public class MasterServer implements IStoppable {
* @param cause close cause
*/
public void close(String cause) {
try {
// set stop signal is true
// execute only once
if (!Stopper.stop()) {
logger.warn("MasterServer is already stopped, current cause: {}", cause);
return;
}
// set stop signal is true
// execute only once
if (!Stopper.stop()) {
logger.warn("MasterServer is already stopped, current cause: {}", cause);
return;
}
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
try (MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap;
MasterRPCServer closedRpcServer = masterRPCServer;
MasterRegistryClient closedMasterRegistryClient = masterRegistryClient;
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
SpringApplicationContext closedSpringContext = springApplicationContext) {
logger.info("Master server is stopping, current cause : {}", cause);
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.masterSchedulerBootstrap.close();
this.masterRPCServer.close();
this.masterRegistryClient.closeRegistry();
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
logger.info("MasterServer stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("MasterServer stop failed, current cause: {}", cause, e);
return;
}
logger.info("MasterServer stopped, current cause: {}", cause);
}
@Override

View File

@ -67,6 +67,10 @@ public class MasterConfig implements Validator {
* Master heart beat task execute interval.
*/
private Duration heartbeatInterval = Duration.ofSeconds(10);
/**
* Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
*/
private int heartbeatErrorThreshold = 5;
/**
* task submit max retry times.
*/
@ -129,6 +133,9 @@ public class MasterConfig implements Validator {
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
if (masterConfig.getHeartbeatErrorThreshold() <= 0) {
errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
}
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
}

View File

@ -53,7 +53,7 @@ import com.google.common.collect.Sets;
* <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry.
*/
@Component
public class MasterRegistryClient {
public class MasterRegistryClient implements AutoCloseable {
/**
* logger
@ -108,7 +108,8 @@ public class MasterRegistryClient {
registryClient.setStoppable(stoppable);
}
public void closeRegistry() {
@Override
public void close() {
// TODO unsubscribe MasterRegistryDataListener
deregister();
}
@ -194,7 +195,8 @@ public class MasterRegistryClient {
masterConfig.getReservedMemory(),
Sets.newHashSet(localNodePath),
Constants.MASTER_TYPE,
registryClient);
registryClient,
masterConfig.getHeartbeatErrorThreshold());
// remove before persist
registryClient.remove(localNodePath);

View File

@ -57,7 +57,7 @@ import org.springframework.stereotype.Service;
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@Service
public class MasterSchedulerBootstrap extends BaseDaemonThread {
public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
@ -112,6 +112,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread {
logger.info("Master schedule bootstrap started...");
}
@Override
public void close() {
logger.info("Master schedule bootstrap stopping...");
logger.info("Master schedule bootstrap stopped...");

View File

@ -98,6 +98,8 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
# Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval

View File

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,16 +39,22 @@ public class HeartBeatTask implements Runnable {
private final String serverType;
private final HeartBeat heartBeat;
private final int heartBeatErrorThreshold;
private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
RegistryClient registryClient) {
RegistryClient registryClient,
int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory);
this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public HeartBeatTask(long startupTime,
@ -58,13 +65,14 @@ public class HeartBeatTask implements Runnable {
String serverType,
RegistryClient registryClient,
int workerThreadCount,
int workerWaitingTaskCount
) {
int workerWaitingTaskCount,
int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.workerWaitingTaskCount = workerWaitingTaskCount;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount);
this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public String getHeartBeatInfo() {
@ -88,8 +96,13 @@ public class HeartBeatTask implements Runnable {
for (String heartBeatPath : heartBeatPaths) {
registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat());
}
heartBeatErrorTimes.set(0);
} catch (Throwable ex) {
logger.error("HeartBeat task execute failed", ex);
if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) {
registryClient.getStoppable()
.stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes);
}
}
}
}

View File

@ -24,7 +24,7 @@ import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;
@Component
public class SpringApplicationContext implements ApplicationContextAware {
public class SpringApplicationContext implements ApplicationContextAware, AutoCloseable {
private static ApplicationContext applicationContext;
@ -36,6 +36,7 @@ public class SpringApplicationContext implements ApplicationContextAware {
/**
* Close this application context, destroying all beans in its bean factory.
*/
@Override
public void close() {
((AbstractApplicationContext)applicationContext).close();
}

View File

@ -19,15 +19,41 @@ package org.apache.dolphinscheduler;
import org.apache.curator.test.TestingServer;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationFailedEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import lombok.NonNull;
@SpringBootApplication
public class StandaloneServer {
public class StandaloneServer implements ApplicationListener<ApplicationEvent> {
private static final Logger logger = LoggerFactory.getLogger(StandaloneServer.class);
private static TestingServer zookeeperServer;
public static void main(String[] args) throws Exception {
final TestingServer server = new TestingServer(true);
System.setProperty("registry.zookeeper.connect-string", server.getConnectString());
zookeeperServer = new TestingServer(true);
System.setProperty("registry.zookeeper.connect-string", zookeeperServer.getConnectString());
SpringApplication.run(StandaloneServer.class, args);
}
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
if (event instanceof ApplicationFailedEvent || event instanceof ContextClosedEvent) {
try (TestingServer closedServer = zookeeperServer) {
// close the zookeeper server
logger.info("Receive spring context close event: {}, will closed zookeeper server", event);
} catch (IOException e) {
logger.error("Close zookeeper server error", e);
}
}
}
}

View File

@ -116,6 +116,8 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
# Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
@ -137,6 +139,8 @@ worker:
exec-threads: 10
# worker heartbeat interval
heartbeat-interval: 10s
# Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker tenant auto create

View File

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Collection;
import java.util.Set;
import javax.annotation.PostConstruct;
@ -111,8 +110,7 @@ public class WorkerServer implements IStoppable {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
this.workerRegistryClient.handleDeadServer();
this.workerManagerThread.start();
@ -129,37 +127,24 @@ public class WorkerServer implements IStoppable {
}
public void close(String cause) {
try {
// execute only once
// set stop signal is true
if (!Stopper.stop()) {
logger.warn("WorkerServer is already stopped, current cause: {}", cause);
return;
}
if (!Stopper.stop()) {
logger.warn("WorkerServer is already stopped, current cause: {}", cause);
return;
}
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
WorkerRegistryClient closedRegistryClient = workerRegistryClient;
AlertClientService closedAlertClientService = alertClientService;
SpringApplicationContext closedSpringContext = springApplicationContext;) {
logger.info("Worker server is stopping, current cause : {}", cause);
try {
// thread sleep 3 seconds for thread quitely stop
Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
} catch (Exception e) {
logger.warn("Worker server close wait error", e);
}
// close
this.workerRpcServer.close();
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
// kill running tasks
this.killAllRunningTasks();
// close the application context
this.springApplicationContext.close();
logger.info("Worker server stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("Worker server stop failed, current cause: {}", cause, e);
return;
}
logger.info("Worker server stopped, current cause: {}", cause);
}
@Override

View File

@ -40,6 +40,10 @@ public class WorkerConfig implements Validator {
private int listenPort = 1234;
private int execThreads = 10;
private Duration heartbeatInterval = Duration.ofSeconds(10);
/**
* Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
*/
private int heartbeatErrorThreshold = 5;
private int hostWeight = 100;
private boolean tenantAutoCreate = true;
private boolean tenantDistributedUser = false;
@ -70,6 +74,9 @@ public class WorkerConfig implements Validator {
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
if (workerConfig.getHeartbeatErrorThreshold() <= 0) {
errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value");
}
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
}
}

View File

@ -55,7 +55,7 @@ import com.google.common.collect.Sets;
* worker registry
*/
@Service
public class WorkerRegistryClient {
public class WorkerRegistryClient implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class);
@ -102,15 +102,15 @@ public class WorkerRegistryClient {
long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
workerConfig.getMaxCpuLoadAvg(),
workerConfig.getReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_TYPE,
registryClient,
workerConfig.getExecThreads(),
workerManagerThread.getThreadPoolQueueSize()
);
workerConfig.getMaxCpuLoadAvg(),
workerConfig.getReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_TYPE,
registryClient,
workerConfig.getExecThreads(),
workerManagerThread.getThreadPoolQueueSize(),
workerConfig.getHeartbeatErrorThreshold());
for (String workerZKPath : workerZkPaths) {
// remove before persist
@ -148,8 +148,10 @@ public class WorkerRegistryClient {
logger.error("remove worker zk path exception", ex);
}
this.heartBeatExecutor.shutdownNow();
logger.info("heartbeat executor shutdown");
if (heartBeatExecutor != null) {
heartBeatExecutor.shutdownNow();
logger.info("Heartbeat executor shutdown");
}
registryClient.close();
logger.info("registry client closed");
@ -176,8 +178,9 @@ public class WorkerRegistryClient {
return workerPaths;
}
public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) {
registryClient.handleDeadServer(nodeSet, nodeType, opType);
public void handleDeadServer() {
Set<String> workerZkPaths = getWorkerZkPaths();
registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
}
/**
@ -191,4 +194,9 @@ public class WorkerRegistryClient {
registryClient.setStoppable(stoppable);
}
@Override
public void close() throws IOException {
unRegistry();
}
}

View File

@ -60,6 +60,8 @@ worker:
exec-threads: 100
# worker heartbeat interval
heartbeat-interval: 10s
# Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker tenant auto create