[DSIP-82][Master/Worker] Use FAILOVER_FINISH_NODES to avoid duplicate workflow/task when failover (#16821)

Wenjun Ruan 2024-11-20 16:04:11 +08:00 committed by GitHub
parent bec678a8b9
commit 9856453cd7
40 changed files with 304 additions and 775 deletions

View File

@ -294,7 +294,6 @@ Location: `master-server/conf/application.yaml`
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max disk usage; the master server can execute workflows only when its disk usage is below this value. | | master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max disk usage; the master server can execute workflows only when its disk usage is below this value. |
| master.failover-interval | 10 | failover interval, the unit is minute | | master.failover-interval | 10 | failover interval, the unit is minute |
| master.kill-application-when-task-failover | true | whether to kill yarn/k8s application when failover taskInstance | | master.kill-application-when-task-failover | true | whether to kill yarn/k8s application when failover taskInstance |
| master.registry-disconnect-strategy.strategy | stop | Used when the master disconnects from the registry. Default value: stop. Optional values: stop, waiting |
| master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnects from the registry and the disconnect strategy is waiting. The master waits up to this long to reconnect to the registry; if it still cannot connect after that, it stops itself. If the value is 0s, the master waits indefinitely | | master.registry-disconnect-strategy.max-waiting-time | 100s | Used when the master disconnects from the registry and the disconnect strategy is waiting. The master waits up to this long to reconnect to the registry; if it still cannot connect after that, it stops itself. If the value is 0s, the master waits indefinitely |
| master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory | | master.worker-group-refresh-interval | 10s | The interval to refresh worker group from db to memory |
| master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` | | master.command-fetch-strategy.type | ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED` |

View File

@ -33,4 +33,5 @@ This document records the incompatible updates between each version. You need to
* Uniformly name `process` in code as `workflow` ([#16515])(https://github.com/apache/dolphinscheduler/pull/16515) * Uniformly name `process` in code as `workflow` ([#16515])(https://github.com/apache/dolphinscheduler/pull/16515)
* Deprecated upgrade code of 1.x and 2.x ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543) * Deprecated upgrade code of 1.x and 2.x ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794) * Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)

View File

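@ -276,30 +276,28 @@ The common.properties configuration file currently mainly configures hadoop/s3/yarn/applicationId
Location: `master-server/conf/application.yaml` Location: `master-server/conf/application.yaml`
| Parameter | Default value | Description | | Parameter | Default value | Description |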
|-----------------------------------------------------------------------------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------| |-----------------------------------------------------------------------------|------------------------------|-----------------------------------------------------------------------------------------|
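| master.listen-port | 5678 | master listen port | | master.listen-port | 5678 | master listen port |
| master.pre-exec-threads | 10 | number of master pre-execution threads, used to limit the number of commands processed in parallel | | master.pre-exec-threads | 10 | number of master pre-execution threads, used to limit the number of commands processed in parallel |
| master.exec-threads | 100 | number of master worker threads, used to limit the number of workflow instances running in parallel | | master.exec-threads | 100 | number of master worker threads, used to limit the number of workflow instances running in parallel |
| master.dispatch-task-number | 3 | number of tasks the master dispatches per batch | | master.dispatch-task-number | 3 | number of tasks the master dispatches per batch |
| master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | The Master uses each Worker's dynamic CPU/memory/thread pool usage to calculate the Worker's load; Workers with lower load have a higher chance of being assigned tasks | | master.worker-load-balancer-configuration-properties.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | The Master uses each Worker's dynamic CPU/memory/thread pool usage to calculate the Worker's load; Workers with lower load have a higher chance of being assigned tasks |
| master.max-heartbeat-interval | 10s | master max heartbeat interval | | master.max-heartbeat-interval | 10s | master max heartbeat interval |
| master.task-commit-retry-times | 5 | task commit retry times | | master.task-commit-retry-times | 5 | task commit retry times |
| master.task-commit-interval | 1000 | task commit interval, in milliseconds | | master.task-commit-interval | 1000 | task commit interval, in milliseconds |
| master.state-wheel-interval | 5 | interval for polling state checks | | master.state-wheel-interval | 5 | interval for polling state checks |
| master.server-load-protection.enabled | true | whether to enable the server load protection strategy | | master.server-load-protection.enabled | true | whether to enable the server load protection strategy |
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | Master max system CPU usage; the master can schedule tasks only when the current system CPU usage is below this value. Default 0.7: up to 70% of the operating system's CPU is used | | master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | Master max system CPU usage; the master can schedule tasks only when the current system CPU usage is below this value. Default 0.7: up to 70% of the operating system's CPU is used |
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | Master max JVM CPU usage; the master can schedule tasks only when the current JVM CPU usage is below this value. Default 0.7: up to 70% of the JVM CPU is used | | master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | Master max JVM CPU usage; the master can schedule tasks only when the current JVM CPU usage is below this value. Default 0.7: up to 70% of the JVM CPU is used |
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Master max system memory usage; the master can schedule tasks only when the current system memory usage is below this value. Default 0.7: up to 70% of the operating system's memory is used | | master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | Master max system memory usage; the master can schedule tasks only when the current system memory usage is below this value. Default 0.7: up to 70% of the operating system's memory is used |
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max system disk usage; the master can schedule tasks only when the current system disk usage is below this value. Default 0.7: up to 70% of the operating system's disk space is used | | master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Master max system disk usage; the master can schedule tasks only when the current system disk usage is below this value. Default 0.7: up to 70% of the operating system's disk space is used |
| master.failover-interval | 10 | failover interval, in minutes | | master.failover-interval | 10 | failover interval, in minutes |
| master.kill-application-when-task-failover | true | whether to kill the yarn or k8s application when a task instance fails over | | master.kill-application-when-task-failover | true | whether to kill the yarn or k8s application when a task instance fails over |
| master.registry-disconnect-strategy.strategy | stop | Strategy used when the Master loses connection to the registry. Default: stop. Optional values: stop, waiting | | master.master.worker-group-refresh-interval | 10s | interval for periodically syncing workerGroup from the database to memory |
| master.registry-disconnect-strategy.max-waiting-time | 100s | Reconnect time after the Master loses connection to the registry; takes effect only when strategy is waiting. The Master tries to reconnect within the given time and stops itself if reconnection fails within that time; while reconnecting, the Master discards the workflows currently being executed. A value of 0 means wait indefinitely | | master.command-fetch-strategy.type | ID_SLOT_BASED | Command fetch strategy; currently only `ID_SLOT_BASED` is supported |
| master.master.worker-group-refresh-interval | 10s | interval for periodically syncing workerGroup from the database to memory | | master.command-fetch-strategy.config.id-step | 1 | auto-increment step of the id of t_ds_command in the database |
| master.command-fetch-strategy.type | ID_SLOT_BASED | Command fetch strategy; currently only `ID_SLOT_BASED` is supported | | master.command-fetch-strategy.config.fetch-size | 10 | number of commands the master fetches |
| master.command-fetch-strategy.config.id-step | 1 | auto-increment step of the id of t_ds_command in the database |
| master.command-fetch-strategy.config.fetch-size | 10 | number of commands the master fetches |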
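## Worker Server related configuration ## Worker Server related configuration
@ -319,7 +317,6 @@ The common.properties configuration file currently mainly configures hadoop/s3/yarn/applicationId
| worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Worker max system disk usage; the worker can accept tasks only when the current system disk usage is below this value. Default 0.7: up to 70% of the operating system's disk space is used | | worker.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | Worker max system disk usage; the worker can accept tasks only when the current system disk usage is below this value. Default 0.7: up to 70% of the operating system's disk space is used |
| worker.alert-listen-host | localhost | alert listen host | | worker.alert-listen-host | localhost | alert listen host |
| worker.alert-listen-port | 50052 | alert listen port | | worker.alert-listen-port | 50052 | alert listen port |
| worker.registry-disconnect-strategy.strategy | stop | Strategy used when the Worker loses connection to the registry. Default: stop. Optional values: stop, waiting |
| worker.registry-disconnect-strategy.max-waiting-time | 100s | Reconnect time after the Worker loses connection to the registry; takes effect only when strategy is waiting. The Worker tries to reconnect within the given time and stops itself if reconnection fails within that time; while reconnecting, the Worker discards and kills the tasks being executed. A value of 0 means wait indefinitely | | worker.registry-disconnect-strategy.max-waiting-time | 100s | Reconnect time after the Worker loses connection to the registry; takes effect only when strategy is waiting. The Worker tries to reconnect within the given time and stops itself if reconnection fails within that time; while reconnecting, the Worker discards and kills the tasks being executed. A value of 0 means wait indefinitely |
| worker.task-execute-threads-full-policy | REJECT | If REJECT, when the number of tasks in the Worker's waiting queue reaches exec-threads, the Worker rejects newly received tasks and the Master re-dispatches them; if CONTINUE, the Worker accepts the task and puts it in the waiting queue until an idle thread executes it | | worker.task-execute-threads-full-policy | REJECT | If REJECT, when the number of tasks in the Worker's waiting queue reaches exec-threads, the Worker rejects newly received tasks and the Master re-dispatches them; if CONTINUE, the Worker accepts the task and puts it in the waiting queue until an idle thread executes it |
| worker.tenant-config.auto-create-tenant-enabled | true | A tenant corresponds to a system user, which the worker uses to submit jobs. If the system does not have that user, it is created automatically when the parameter worker.tenant.auto.create is true. | | worker.tenant-config.auto-create-tenant-enabled | true | A tenant corresponds to a system user, which the worker uses to submit jobs. If the system does not have that user, it is created automatically when the parameter worker.tenant.auto.create is true. |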

View File

@ -31,4 +31,5 @@
* Uniformly name `process` in code as `workflow` ([#16515])(https://github.com/apache/dolphinscheduler/pull/16515) * Uniformly name `process` in code as `workflow` ([#16515])(https://github.com/apache/dolphinscheduler/pull/16515)
* Deprecated upgrade code of 1.x and 2.x ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543) * Deprecated upgrade code of 1.x and 2.x ([#16543])(https://github.com/apache/dolphinscheduler/pull/16543)
* Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794) * Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* Remove the `registry-disconnect-strategy` configuration in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)

View File

@ -20,12 +20,17 @@ package org.apache.dolphinscheduler.server.master.cluster;
import org.apache.dolphinscheduler.common.enums.ServerStatus; import org.apache.dolphinscheduler.common.enums.ServerStatus;
import lombok.Data; import lombok.Data;
import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@Data @Data
@ToString
@SuperBuilder @SuperBuilder
public abstract class BaseServerMetadata implements IClusters.IServerMetadata { public abstract class BaseServerMetadata implements IClusters.IServerMetadata {
// The server startup time in milliseconds.
private final long serverStartupTime;
private final String address; private final String address;
private final double cpuUsage; private final double cpuUsage;

View File

@ -46,12 +46,16 @@ public class ClusterStateMonitors {
log.info("ClusterStateMonitors started..."); log.info("ClusterStateMonitors started...");
} }
void masterRemoved(MasterServerMetadata masterServer) { void masterRemoved(final MasterServerMetadata masterServer) {
systemEventBus.publish(MasterFailoverEvent.of(masterServer.getAddress(), new Date())); // We set a delay of 30 seconds for the master failover event
// If the master can reconnect to registry within 30 seconds, the master will skip failover.
systemEventBus.publish(MasterFailoverEvent.of(masterServer, new Date(), 30_000));
} }
void workerRemoved(WorkerServerMetadata workerServer) { void workerRemoved(final WorkerServerMetadata workerServer) {
systemEventBus.publish(WorkerFailoverEvent.of(workerServer.getAddress(), new Date())); // We set a delay of 30 seconds for the worker failover event
// If the worker can reconnect to registry within 30 seconds, the worker will skip failover.
systemEventBus.publish(WorkerFailoverEvent.of(workerServer, new Date(), 30_000));
} }
} }
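
Reviewer note: the hunk above publishes failover events with a 30,000 ms delay so that a server which reconnects in time can be skipped. How `AbstractDelayEvent` implements the delay is not shown in this diff. The following is only a minimal sketch of the general idea using the JDK's `DelayQueue`; `DelayedFailoverSketch` and `DelayedFailoverEvent` are hypothetical names, not classes from this commit.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedFailoverSketch {

    // Hypothetical stand-in for a delayed system event; NOT the project's AbstractDelayEvent.
    static final class DelayedFailoverEvent implements Delayed {

        private final String serverAddress;
        private final long readyAtNanos;

        DelayedFailoverEvent(final String serverAddress, final long delayMillis) {
            this.serverAddress = serverAddress;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        String getServerAddress() {
            return serverAddress;
        }

        // Remaining delay; zero or negative means the event may be consumed.
        @Override
        public long getDelay(final TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        // Events with less remaining delay sort first.
        @Override
        public int compareTo(final Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    public static void main(final String[] args) {
        final DelayQueue<DelayedFailoverEvent> queue = new DelayQueue<>();

        // A 30 second delay: the event is queued but not yet visible to consumers.
        queue.put(new DelayedFailoverEvent("192.168.0.1:5678", 30_000L));
        System.out.println("ready immediately? " + (queue.poll() != null));

        // A zero delay: the event is available right away.
        queue.put(new DelayedFailoverEvent("192.168.0.2:5678", 0L));
        final DelayedFailoverEvent ready = queue.poll();
        System.out.println("ready event for " + ready.getServerAddress());
    }
}
```

The point of the delay is that the consumer only sees the event after the grace period, at which time it can re-check the registry before deciding whether failover is still needed.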

View File

@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.master.cluster;
import org.apache.dolphinscheduler.common.enums.ServerStatus; import org.apache.dolphinscheduler.common.enums.ServerStatus;
import java.util.List; import java.util.List;
import java.util.Optional;
public interface IClusters<S extends IClusters.IServerMetadata> { public interface IClusters<S extends IClusters.IServerMetadata> {
List<S> getServers(); List<S> getServers();
Optional<S> getServer(final String address);
void registerListener(IClustersChangeListener<S> listener); void registerListener(IClustersChangeListener<S> listener);
interface IServerMetadata { interface IServerMetadata {

View File

@ -26,6 +26,7 @@ import org.apache.commons.collections4.list.UnmodifiableList;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -50,6 +51,11 @@ public class MasterClusters extends AbstractClusterSubscribeListener<MasterServe
return UnmodifiableList.unmodifiableList(new ArrayList<>(masterServerMap.values())); return UnmodifiableList.unmodifiableList(new ArrayList<>(masterServerMap.values()));
} }
@Override
public Optional<MasterServerMetadata> getServer(final String address) {
return Optional.ofNullable(masterServerMap.get(address));
}
public List<MasterServerMetadata> getNormalServers() { public List<MasterServerMetadata> getNormalServers() {
List<MasterServerMetadata> normalMasterServers = masterServerMap.values() List<MasterServerMetadata> normalMasterServers = masterServerMap.values()
.stream() .stream()
@ -59,12 +65,12 @@ public class MasterClusters extends AbstractClusterSubscribeListener<MasterServe
} }
@Override @Override
public void registerListener(IClustersChangeListener<MasterServerMetadata> listener) { public void registerListener(final IClustersChangeListener<MasterServerMetadata> listener) {
masterClusterChangeListeners.add(listener); masterClusterChangeListeners.add(listener);
} }
@Override @Override
MasterServerMetadata parseServerFromHeartbeat(String masterHeartBeatJson) { MasterServerMetadata parseServerFromHeartbeat(final String masterHeartBeatJson) {
MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(masterHeartBeatJson, MasterHeartBeat.class); MasterHeartBeat masterHeartBeat = JSONUtils.parseObject(masterHeartBeatJson, MasterHeartBeat.class);
if (masterHeartBeat == null) { if (masterHeartBeat == null) {
return null; return null;
@ -73,7 +79,7 @@ public class MasterClusters extends AbstractClusterSubscribeListener<MasterServe
} }
@Override @Override
public void onServerAdded(MasterServerMetadata masterServer) { public void onServerAdded(final MasterServerMetadata masterServer) {
masterServerMap.put(masterServer.getAddress(), masterServer); masterServerMap.put(masterServer.getAddress(), masterServer);
for (IClustersChangeListener<MasterServerMetadata> listener : masterClusterChangeListeners) { for (IClustersChangeListener<MasterServerMetadata> listener : masterClusterChangeListeners) {
listener.onServerAdded(masterServer); listener.onServerAdded(masterServer);
@ -81,7 +87,7 @@ public class MasterClusters extends AbstractClusterSubscribeListener<MasterServe
} }
@Override @Override
public void onServerRemove(MasterServerMetadata masterServer) { public void onServerRemove(final MasterServerMetadata masterServer) {
masterServerMap.remove(masterServer.getAddress()); masterServerMap.remove(masterServer.getAddress());
for (IClustersChangeListener<MasterServerMetadata> listener : masterClusterChangeListeners) { for (IClustersChangeListener<MasterServerMetadata> listener : masterClusterChangeListeners) {
listener.onServerRemove(masterServer); listener.onServerRemove(masterServer);
@ -89,7 +95,7 @@ public class MasterClusters extends AbstractClusterSubscribeListener<MasterServe
} }
@Override @Override
public void onServerUpdate(MasterServerMetadata masterServer) { public void onServerUpdate(final MasterServerMetadata masterServer) {
masterServerMap.put(masterServer.getAddress(), masterServer); masterServerMap.put(masterServer.getAddress(), masterServer);
for (IClustersChangeListener<MasterServerMetadata> listener : masterClusterChangeListeners) { for (IClustersChangeListener<MasterServerMetadata> listener : masterClusterChangeListeners) {
listener.onServerUpdate(masterServer); listener.onServerUpdate(masterServer);

View File

@ -21,15 +21,18 @@ import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.experimental.SuperBuilder; import lombok.experimental.SuperBuilder;
@Data @Data
@ToString(callSuper = true)
@SuperBuilder @SuperBuilder
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
public class MasterServerMetadata extends BaseServerMetadata implements Comparable<MasterServerMetadata> { public class MasterServerMetadata extends BaseServerMetadata implements Comparable<MasterServerMetadata> {
public static MasterServerMetadata parseFromHeartBeat(MasterHeartBeat masterHeartBeat) { public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
return MasterServerMetadata.builder() return MasterServerMetadata.builder()
.serverStartupTime(masterHeartBeat.getStartupTime())
.address(masterHeartBeat.getHost() + ":" + masterHeartBeat.getPort()) .address(masterHeartBeat.getHost() + ":" + masterHeartBeat.getPort())
.cpuUsage(masterHeartBeat.getCpuUsage()) .cpuUsage(masterHeartBeat.getCpuUsage())
.memoryUsage(masterHeartBeat.getMemoryUsage()) .memoryUsage(masterHeartBeat.getMemoryUsage())
@ -39,7 +42,7 @@ public class MasterServerMetadata extends BaseServerMetadata implements Comparab
// Use the master address to sort the master server // Use the master address to sort the master server
@Override @Override
public int compareTo(MasterServerMetadata o) { public int compareTo(final MasterServerMetadata o) {
return this.getAddress().compareTo(o.getAddress()); return this.getAddress().compareTo(o.getAddress());
} }

View File

@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -53,6 +54,11 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.values())); return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.values()));
} }
@Override
public Optional<WorkerServerMetadata> getServer(final String address) {
return Optional.ofNullable(workerMapping.get(address));
}
public List<String> getWorkerServerAddressByGroup(String workerGroup) { public List<String> getWorkerServerAddressByGroup(String workerGroup) {
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) { if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet())); return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));

View File

@ -38,8 +38,9 @@ public class WorkerServerMetadata extends BaseServerMetadata {
private final double taskThreadPoolUsage; private final double taskThreadPoolUsage;
public static WorkerServerMetadata parseFromHeartBeat(WorkerHeartBeat workerHeartBeat) { public static WorkerServerMetadata parseFromHeartBeat(final WorkerHeartBeat workerHeartBeat) {
return WorkerServerMetadata.builder() return WorkerServerMetadata.builder()
.serverStartupTime(workerHeartBeat.getStartupTime())
.address(workerHeartBeat.getHost() + ":" + workerHeartBeat.getPort()) .address(workerHeartBeat.getHost() + ":" + workerHeartBeat.getPort())
.cpuUsage(workerHeartBeat.getCpuUsage()) .cpuUsage(workerHeartBeat.getCpuUsage())
.memoryUsage(workerHeartBeat.getMemoryUsage()) .memoryUsage(workerHeartBeat.getMemoryUsage())

View File

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.config; package org.apache.dolphinscheduler.server.master.config;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties; import org.apache.dolphinscheduler.server.master.cluster.loadbalancer.WorkerLoadBalancerConfigurationProperties;
@ -60,8 +59,6 @@ public class MasterConfig implements Validator {
private MasterServerLoadProtection serverLoadProtection = new MasterServerLoadProtection(); private MasterServerLoadProtection serverLoadProtection = new MasterServerLoadProtection();
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L); private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
private CommandFetchStrategy commandFetchStrategy = new CommandFetchStrategy(); private CommandFetchStrategy commandFetchStrategy = new CommandFetchStrategy();
@ -120,7 +117,6 @@ public class MasterConfig implements Validator {
"\n workflow-event-bus-fire-thread-count -> " + workflowEventBusFireThreadCount + "\n workflow-event-bus-fire-thread-count -> " + workflowEventBusFireThreadCount +
"\n max-heartbeat-interval -> " + maxHeartbeatInterval + "\n max-heartbeat-interval -> " + maxHeartbeatInterval +
"\n server-load-protection -> " + serverLoadProtection + "\n server-load-protection -> " + serverLoadProtection +
"\n registry-disconnect-strategy -> " + registryDisconnectStrategy +
"\n master-address -> " + masterAddress + "\n master-address -> " + masterAddress +
"\n master-registry-path: " + masterRegistryPath + "\n master-registry-path: " + masterRegistryPath +
"\n worker-group-refresh-interval: " + workerGroupRefreshInterval + "\n worker-group-refresh-interval: " + workerGroupRefreshInterval +

View File

@ -27,8 +27,14 @@ public abstract class AbstractSystemEvent extends AbstractDelayEvent {
super(delayTime); super(delayTime);
} }
/**
* The event happen time.
*/
public abstract Date getEventTime(); public abstract Date getEventTime();
/**
* The event type.
*/
public abstract SystemEventType getEventType(); public abstract SystemEventType getEventType();
} }

View File

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.engine.system.event;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.server.master.cluster.MasterServerMetadata;
import java.util.Date; import java.util.Date;
import lombok.Getter; import lombok.Getter;
@ -26,20 +28,23 @@ import lombok.Getter;
@Getter @Getter
public class MasterFailoverEvent extends AbstractSystemEvent { public class MasterFailoverEvent extends AbstractSystemEvent {
private final String masterAddress; private final MasterServerMetadata masterServerMetadata;
private final Date eventTime; private final Date eventTime;
private MasterFailoverEvent(final String masterAddress, private MasterFailoverEvent(final MasterServerMetadata masterServerMetadata,
final Date eventTime) { final Date eventTime,
super(eventTime.getTime()); final long delayTime) {
this.masterAddress = masterAddress; super(delayTime);
this.masterServerMetadata = masterServerMetadata;
this.eventTime = eventTime; this.eventTime = eventTime;
} }
public static MasterFailoverEvent of(final String masterAddress, final Date eventTime) { public static MasterFailoverEvent of(final MasterServerMetadata masterServerMetadata,
checkNotNull(masterAddress); final Date eventTime,
final long delayTime) {
checkNotNull(masterServerMetadata);
checkNotNull(eventTime); checkNotNull(eventTime);
return new MasterFailoverEvent(masterAddress, eventTime); return new MasterFailoverEvent(masterServerMetadata, eventTime, delayTime);
} }
@Override @Override
@ -50,8 +55,9 @@ public class MasterFailoverEvent extends AbstractSystemEvent {
@Override @Override
public String toString() { public String toString() {
return "MasterFailoverEvent{" + return "MasterFailoverEvent{" +
"masterAddress='" + masterAddress + '\'' + "masterServerMetadata='" + masterServerMetadata + '\'' +
", eventTime=" + eventTime + ", eventTime=" + eventTime +
", delayTime=" + delayTime +
'}'; '}';
} }
} }

View File

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.engine.system.event;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.server.master.cluster.WorkerServerMetadata;
import java.util.Date; import java.util.Date;
import lombok.Getter; import lombok.Getter;
@ -26,20 +28,23 @@ import lombok.Getter;
@Getter @Getter
public class WorkerFailoverEvent extends AbstractSystemEvent { public class WorkerFailoverEvent extends AbstractSystemEvent {
private final String workerAddress; private final WorkerServerMetadata workerServerMetadata;
private final Date eventTime; private final Date eventTime;
private WorkerFailoverEvent(final String workerAddress, private WorkerFailoverEvent(final WorkerServerMetadata workerServerMetadata,
final Date eventTime) { final Date eventTime,
super(eventTime.getTime()); final long delayTime) {
this.workerAddress = workerAddress; super(delayTime);
this.workerServerMetadata = workerServerMetadata;
this.eventTime = eventTime; this.eventTime = eventTime;
} }
public static WorkerFailoverEvent of(final String workerAddress, final Date eventTime) { public static WorkerFailoverEvent of(final WorkerServerMetadata workerServerMetadata,
checkNotNull(workerAddress); final Date eventTime,
final long delayTime) {
checkNotNull(workerServerMetadata);
checkNotNull(eventTime); checkNotNull(eventTime);
return new WorkerFailoverEvent(workerAddress, eventTime); return new WorkerFailoverEvent(workerServerMetadata, eventTime, delayTime);
} }
@Override @Override
@ -50,8 +55,9 @@ public class WorkerFailoverEvent extends AbstractSystemEvent {
@Override @Override
public String toString() { public String toString() {
return "WorkerFailoverEvent{" + return "WorkerFailoverEvent{" +
"workerAddress='" + workerAddress + '\'' + "workerServerMetadata='" + workerServerMetadata + '\'' +
", eventTime=" + eventTime + ", eventTime=" + eventTime +
", delayTime=" + delayTime +
'}'; '}';
} }
} }
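
Reviewer note: because the failover events now carry the full server metadata (including `serverStartupTime`) rather than just an address, a handler can tell a server that merely reconnected from one that restarted at the same address. The sketch below illustrates that comparison in isolation; `FailoverDecisionSketch`, `ServerInfo`, and `Decision` are hypothetical names introduced for illustration and are not part of this commit.

```java
import java.util.Optional;

public class FailoverDecisionSketch {

    // Hypothetical, simplified view of a server's registry metadata.
    static final class ServerInfo {

        final String address;
        final long startupTime;

        ServerInfo(final String address, final long startupTime) {
            this.address = address;
            this.startupTime = startupTime;
        }
    }

    enum Decision {
        SKIP,                      // same instance is back: it only lost the registry briefly
        FAILOVER_OLD_INCARNATION,  // a new instance started at the same address
        FAILOVER_DEAD_SERVER       // nothing is registered at that address
    }

    // Compare the metadata captured when the server was removed with what is registered now.
    static Decision decide(final ServerInfo removed, final Optional<ServerInfo> currentlyAlive) {
        if (!currentlyAlive.isPresent()) {
            return Decision.FAILOVER_DEAD_SERVER;
        }
        if (currentlyAlive.get().startupTime == removed.startupTime) {
            return Decision.SKIP;
        }
        return Decision.FAILOVER_OLD_INCARNATION;
    }

    public static void main(final String[] args) {
        final ServerInfo removed = new ServerInfo("192.168.0.1:5678", 1_000L);

        System.out.println(decide(removed, Optional.empty()));
        // prints FAILOVER_DEAD_SERVER
        System.out.println(decide(removed, Optional.of(new ServerInfo("192.168.0.1:5678", 1_000L))));
        // prints SKIP
        System.out.println(decide(removed, Optional.of(new ServerInfo("192.168.0.1:5678", 2_000L))));
        // prints FAILOVER_OLD_INCARNATION
    }
}
```

An address alone cannot distinguish the last two cases, which is presumably why the event payload changed from `String` to the metadata object.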

View File

@ -22,6 +22,10 @@ import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
import org.apache.dolphinscheduler.server.master.cluster.ClusterManager;
import org.apache.dolphinscheduler.server.master.cluster.MasterServerMetadata;
import org.apache.dolphinscheduler.server.master.cluster.WorkerServerMetadata;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository; import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent; import org.apache.dolphinscheduler.server.master.engine.system.event.GlobalMasterFailoverEvent;
import org.apache.dolphinscheduler.server.master.engine.system.event.MasterFailoverEvent; import org.apache.dolphinscheduler.server.master.engine.system.event.MasterFailoverEvent;
@ -29,11 +33,11 @@ import org.apache.dolphinscheduler.server.master.engine.system.event.WorkerFailo
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.time.StopWatch;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -49,6 +53,9 @@ public class FailoverCoordinator implements IFailoverCoordinator {
@Autowired @Autowired
private RegistryClient registryClient; private RegistryClient registryClient;
@Autowired
private ClusterManager clusterManager;
@Autowired @Autowired
private IWorkflowRepository workflowRepository; private IWorkflowRepository workflowRepository;
@ -65,17 +72,23 @@ public class FailoverCoordinator implements IFailoverCoordinator {
private WorkflowFailover workflowFailover; private WorkflowFailover workflowFailover;
@Override @Override
public void globalMasterFailover(GlobalMasterFailoverEvent globalMasterFailoverEvent) { public void globalMasterFailover(final GlobalMasterFailoverEvent globalMasterFailoverEvent) {
final StopWatch failoverTimeCost = StopWatch.createStarted(); final StopWatch failoverTimeCost = StopWatch.createStarted();
log.info("Global master failover starting"); log.info("Global master failover starting");
final List<MasterFailoverEvent> masterFailoverEvents = workflowInstanceDao.queryNeedFailoverMasters() final List<String> masterAddressWhichContainsUnFinishedWorkflow =
.stream() workflowInstanceDao.queryNeedFailoverMasters();
.map(masterAddress -> MasterFailoverEvent.of(masterAddress, globalMasterFailoverEvent.getEventTime())) for (final String masterAddress : masterAddressWhichContainsUnFinishedWorkflow) {
.collect(Collectors.toList()); final Optional<MasterServerMetadata> aliveMasterOptional =
clusterManager.getMasterClusters().getServer(masterAddress);
if (CollectionUtils.isNotEmpty(masterFailoverEvents)) { if (aliveMasterOptional.isPresent()) {
log.info("There are {} masters need to failover", masterFailoverEvents.size()); final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
masterFailoverEvents.forEach(this::failoverMaster); log.info("The master[{}] is alive, do global master failover on it", aliveMasterServerMetadata);
doMasterFailover(aliveMasterServerMetadata.getAddress(),
aliveMasterServerMetadata.getServerStartupTime());
} else {
log.info("The master[{}] is not alive, do global master failover on it", masterAddress);
doMasterFailover(masterAddress, globalMasterFailoverEvent.getEventTime().getTime());
}
} }
failoverTimeCost.stop(); failoverTimeCost.stop();
@ -84,16 +97,55 @@ public class FailoverCoordinator implements IFailoverCoordinator {
@Override @Override
public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) { public void failoverMaster(final MasterFailoverEvent masterFailoverEvent) {
final StopWatch failoverTimeCost = StopWatch.createStarted(); final MasterServerMetadata masterServerMetadata = masterFailoverEvent.getMasterServerMetadata();
final String masterAddress = masterFailoverEvent.getMasterAddress(); log.info("Master[{}] failover starting", masterServerMetadata);
log.info("Master[{}] failover starting", masterAddress);
final Optional<MasterServerMetadata> aliveMasterOptional =
clusterManager.getMasterClusters().getServer(masterServerMetadata.getAddress());
if (aliveMasterOptional.isPresent()) {
final MasterServerMetadata aliveMasterServerMetadata = aliveMasterOptional.get();
if (aliveMasterServerMetadata.getServerStartupTime() == masterServerMetadata.getServerStartupTime()) {
log.info("The master[{}] is alive, maybe it reconnect to registry skip failover", masterServerMetadata);
} else {
log.info("The master[{}] is alive, but the startup time is different, will failover on {}",
masterServerMetadata,
aliveMasterServerMetadata);
doMasterFailover(aliveMasterServerMetadata.getAddress(),
aliveMasterServerMetadata.getServerStartupTime());
}
} else {
log.info("The master[{}] is not alive, will failover", masterServerMetadata);
doMasterFailover(masterServerMetadata.getAddress(), masterServerMetadata.getServerStartupTime());
}
}
/**
* Do master failover.
* <p> Fails over the workflows scheduled by the given master whose start time (or restart time, if set) is before the given masterStartupTime.
*/
private void doMasterFailover(final String masterAddress, final long masterStartupTime) {
// We take a lock so that multiple masters do not run failover at the same time.
// Once a workflow has been failed over, its state is changed to FAILOVER.
// Once the FAILOVER workflow has been refired, its host is changed to the new master and it gets a new
// start time.
// So failing over the same master multiple times causes no problem.
final StopWatch failoverTimeCost = StopWatch.createStarted();
registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath()); registryClient.getLock(RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath());
try { try {
final List<WorkflowInstance> needFailoverWorkflows = getFailoverWorkflowsForMaster(masterFailoverEvent); final String failoverFinishedNodePath =
RegistryUtils.getFailoverFinishedNodePath(masterAddress, masterStartupTime);
if (registryClient.exists(failoverFinishedNodePath)) {
log.error("The master[{}-{}] is exist at: {}, means it has already been failovered, skip failover",
masterAddress,
masterStartupTime,
failoverFinishedNodePath);
return;
}
final List<WorkflowInstance> needFailoverWorkflows =
getFailoverWorkflowsForMaster(masterAddress, new Date(masterStartupTime));
needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow); needFailoverWorkflows.forEach(workflowFailover::failoverWorkflow);
failoverTimeCost.stop(); failoverTimeCost.stop();
registryClient.persist(failoverFinishedNodePath, String.valueOf(System.currentTimeMillis()));
log.info("Master[{}] failover {} workflows finished, cost: {}/ms", log.info("Master[{}] failover {} workflows finished, cost: {}/ms",
masterAddress, masterAddress,
needFailoverWorkflows.size(), needFailoverWorkflows.size(),
@ -103,10 +155,11 @@ public class FailoverCoordinator implements IFailoverCoordinator {
} }
} }
private List<WorkflowInstance> getFailoverWorkflowsForMaster(final MasterFailoverEvent masterFailoverEvent) { private List<WorkflowInstance> getFailoverWorkflowsForMaster(final String masterAddress,
final Date masterCrashTime) {
// todo: use page query // todo: use page query
final List<WorkflowInstance> workflowInstances = workflowInstanceDao.queryNeedFailoverWorkflowInstances( final List<WorkflowInstance> workflowInstances =
masterFailoverEvent.getMasterAddress()); workflowInstanceDao.queryNeedFailoverWorkflowInstances(masterAddress);
return workflowInstances.stream() return workflowInstances.stream()
.filter(workflowInstance -> { .filter(workflowInstance -> {
@ -117,25 +170,49 @@ public class FailoverCoordinator implements IFailoverCoordinator {
// todo: If the first time run workflow have the restartTime, then we can only check this // todo: If the first time run workflow have the restartTime, then we can only check this
final Date restartTime = workflowInstance.getRestartTime(); final Date restartTime = workflowInstance.getRestartTime();
if (restartTime != null) { if (restartTime != null) {
return restartTime.before(masterFailoverEvent.getEventTime()); return restartTime.before(masterCrashTime);
} }
final Date startTime = workflowInstance.getStartTime(); final Date startTime = workflowInstance.getStartTime();
return startTime.before(masterFailoverEvent.getEventTime()); return startTime.before(masterCrashTime);
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@Override @Override
public void failoverWorker(final WorkerFailoverEvent workerFailoverEvent) { public void failoverWorker(final WorkerFailoverEvent workerFailoverEvent) {
final WorkerServerMetadata workerServerMetadata = workerFailoverEvent.getWorkerServerMetadata();
log.info("Worker[{}] failover starting", workerServerMetadata);
final Optional<WorkerServerMetadata> aliveWorkerOptional =
clusterManager.getWorkerClusters().getServer(workerServerMetadata.getAddress());
if (aliveWorkerOptional.isPresent()) {
final WorkerServerMetadata aliveWorkerServerMetadata = aliveWorkerOptional.get();
if (aliveWorkerServerMetadata.getServerStartupTime() == workerServerMetadata.getServerStartupTime()) {
log.info("The worker[{}] is alive, maybe it reconnect to registry skip failover", workerServerMetadata);
} else {
log.info("The worker[{}] is alive, but the startup time is different, will failover on {}",
workerServerMetadata,
aliveWorkerServerMetadata);
doWorkerFailover(aliveWorkerServerMetadata.getAddress(),
aliveWorkerServerMetadata.getServerStartupTime());
}
} else {
log.info("The worker[{}] is not alive, will failover", workerServerMetadata);
doWorkerFailover(workerServerMetadata.getAddress(), workerServerMetadata.getServerStartupTime());
}
}
private void doWorkerFailover(final String workerAddress, final long workerCrashTime) {
final StopWatch failoverTimeCost = StopWatch.createStarted(); final StopWatch failoverTimeCost = StopWatch.createStarted();
final String workerAddress = workerFailoverEvent.getWorkerAddress(); final List<ITaskExecutionRunnable> needFailoverTasks =
log.info("Worker[{}] failover starting", workerAddress); getFailoverTaskForWorker(workerAddress, new Date(workerCrashTime));
final List<ITaskExecutionRunnable> needFailoverTasks = getFailoverTaskForWorker(workerFailoverEvent);
needFailoverTasks.forEach(taskFailover::failoverTask); needFailoverTasks.forEach(taskFailover::failoverTask);
registryClient.persist(
RegistryUtils.getFailoverFinishedNodePath(workerAddress, workerCrashTime),
String.valueOf(System.currentTimeMillis()));
failoverTimeCost.stop(); failoverTimeCost.stop();
log.info("Worker[{}] failover {} tasks finished, cost: {}/ms", log.info("Worker[{}] failover {} tasks finished, cost: {}/ms",
workerAddress, workerAddress,
@ -143,9 +220,8 @@ public class FailoverCoordinator implements IFailoverCoordinator {
failoverTimeCost.getTime()); failoverTimeCost.getTime());
} }
private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final WorkerFailoverEvent workerFailoverEvent) { private List<ITaskExecutionRunnable> getFailoverTaskForWorker(final String workerAddress,
final String workerAddress = workerFailoverEvent.getWorkerAddress(); final Date workerCrashTime) {
final Date workerCrashTime = workerFailoverEvent.getEventTime();
return workflowRepository.getAll() return workflowRepository.getAll()
.stream() .stream()
.map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph) .map(IWorkflowExecutionRunnable::getWorkflowExecutionGraph)
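
The guard in `doMasterFailover` above can be illustrated in isolation. The following is a minimal sketch, not the project's implementation: it assumes an in-memory map in place of the registry and a local `ReentrantLock` in place of `MASTER_FAILOVER_LOCK`, and the class and field names (`FailoverOnceSketch`, `failoverLog`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the coordinator; only the marker-node guard is modelled.
class FailoverOnceSketch {

    // Assumption: an in-memory map replaces the real registry.
    private final Map<String, String> registry = new ConcurrentHashMap<>();
    // Assumption: a local lock replaces the distributed MASTER_FAILOVER_LOCK.
    private final ReentrantLock failoverLock = new ReentrantLock();
    // Records which master incarnations were actually failed over.
    final List<String> failoverLog = new ArrayList<>();

    // Returns true if failover ran, false if a finished marker already existed.
    boolean doMasterFailover(String masterAddress, long masterStartupTime) {
        String markerPath = "/nodes/failover-finish-nodes/" + masterAddress + "-" + masterStartupTime;
        failoverLock.lock();
        try {
            if (registry.containsKey(markerPath)) {
                return false; // already failed over, skip
            }
            failoverLog.add(masterAddress + "-" + masterStartupTime);
            // Persist the marker only after the failover work is done.
            registry.put(markerPath, String.valueOf(System.currentTimeMillis()));
            return true;
        } finally {
            failoverLock.unlock();
        }
    }

    public static void main(String[] args) {
        FailoverOnceSketch coordinator = new FailoverOnceSketch();
        System.out.println(coordinator.doMasterFailover("10.0.0.1:5678", 1000L));
        System.out.println(coordinator.doMasterFailover("10.0.0.1:5678", 1000L));
        System.out.println(coordinator.doMasterFailover("10.0.0.1:5678", 2000L));
    }
}
```

Because the marker key includes the startup time, a restarted master at the same address counts as a new incarnation and can be failed over again.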

View File

@ -20,17 +20,17 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class MasterConnectionStateListener implements ConnectionListener { public class MasterConnectionStateListener implements ConnectionListener {
private final MasterConnectStrategy masterConnectStrategy; private final RegistryClient registryClient;
public MasterConnectionStateListener(@NonNull MasterConnectStrategy masterConnectStrategy) { public MasterConnectionStateListener(final RegistryClient registryClient) {
this.masterConnectStrategy = masterConnectStrategy; this.registryClient = registryClient;
} }
@Override @Override
@ -43,12 +43,13 @@ public class MasterConnectionStateListener implements ConnectionListener {
case SUSPENDED: case SUSPENDED:
break; break;
case RECONNECTED: case RECONNECTED:
masterConnectStrategy.reconnect(); log.warn("Master reconnect to registry");
break; break;
case DISCONNECTED: case DISCONNECTED:
masterConnectStrategy.disconnect(); registryClient.getStoppable().stop("Master disconnected from registry, will stop myself");
break; break;
default: default:
log.warn("Unknown connection state: {}", state);
} }
} }
} }

View File

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection; import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
@ -78,16 +79,27 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
} }
@Override @Override
public void writeHeartBeat(MasterHeartBeat masterHeartBeat) { public void writeHeartBeat(final MasterHeartBeat masterHeartBeat) {
final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(masterHeartBeat);
if (registryClient.exists(failoverNodePath)) {
log.warn("The master: {} is under {}, means it has been failover will close myself",
masterHeartBeat,
failoverNodePath);
registryClient
.getStoppable()
.stop("The master exist: " + failoverNodePath + ", means it has been failover will close myself");
return;
}
String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat); String masterHeartBeatJson = JSONUtils.toJsonString(masterHeartBeat);
registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson); registryClient.persistEphemeral(heartBeatPath, masterHeartBeatJson);
MasterServerMetrics.incMasterHeartbeatCount(); MasterServerMetrics.incMasterHeartbeatCount();
log.debug("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}", log.debug("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}",
heartBeatPath, masterHeartBeatJson); heartBeatPath,
masterHeartBeatJson);
} }
private ServerStatus getServerStatus(SystemMetrics systemMetrics, private ServerStatus getServerStatus(final SystemMetrics systemMetrics,
MasterServerLoadProtection masterServerLoadProtection) { final MasterServerLoadProtection masterServerLoadProtection) {
return masterServerLoadProtection.isOverload(systemMetrics) ? ServerStatus.BUSY : ServerStatus.NORMAL; return masterServerLoadProtection.isOverload(systemMetrics) ? ServerStatus.BUSY : ServerStatus.NORMAL;
} }
} }
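
The heartbeat check above acts as a fence: a master that has already been failed over stops itself instead of writing another heartbeat. A rough sketch of that control flow follows, assuming an in-memory set in place of the registry and a boolean flag in place of `getStoppable().stop(...)`; all names in it are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the heartbeat fence; not the real MasterHeartBeatTask.
class HeartBeatFencingSketch {

    // Assumption: marker nodes are kept in memory instead of the registry.
    private final Set<String> failoverFinishedNodes = new HashSet<>();
    boolean stopped = false;
    int heartbeatsWritten = 0;

    private static String markerPath(String address, long startupTime) {
        return "/nodes/failover-finish-nodes/" + address + "-" + startupTime;
    }

    // Simulates another master finishing failover for this server incarnation.
    void markFailoverFinished(String address, long startupTime) {
        failoverFinishedNodes.add(markerPath(address, startupTime));
    }

    // Writes a heartbeat unless this incarnation has already been failed over.
    void writeHeartBeat(String address, long startupTime) {
        if (failoverFinishedNodes.contains(markerPath(address, startupTime))) {
            stopped = true; // stands in for registryClient.getStoppable().stop(...)
            return;
        }
        heartbeatsWritten++;
    }

    public static void main(String[] args) {
        HeartBeatFencingSketch master = new HeartBeatFencingSketch();
        master.writeHeartBeat("10.0.0.1:5678", 1000L);
        master.markFailoverFinished("10.0.0.1:5678", 1000L);
        master.writeHeartBeat("10.0.0.1:5678", 1000L);
        System.out.println(master.stopped + " " + master.heartbeatsWritten);
    }
}
```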

View File

@ -53,9 +53,6 @@ public class MasterRegistryClient implements AutoCloseable {
@Autowired @Autowired
private MetricsProvider metricsProvider; private MetricsProvider metricsProvider;
@Autowired
private MasterConnectStrategy masterConnectStrategy;
private MasterHeartBeatTask masterHeartBeatTask; private MasterHeartBeatTask masterHeartBeatTask;
public void start() { public void start() {
@ -63,7 +60,7 @@ public class MasterRegistryClient implements AutoCloseable {
this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, metricsProvider, registryClient); this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, metricsProvider, registryClient);
// master registry // master registry
registry(); registry();
registryClient.addConnectionStateListener(new MasterConnectionStateListener(masterConnectStrategy)); registryClient.addConnectionStateListener(new MasterConnectionStateListener(registryClient));
} catch (Exception e) { } catch (Exception e) {
throw new RegistryException("Master registry client start up error", e); throw new RegistryException("Master registry client start up error", e);
} }

View File

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* This strategy will stop the master server, when disconnected from {@link org.apache.dolphinscheduler.registry.api.Registry}.
*/
@Service
@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true)
@Slf4j
public class MasterStopStrategy implements MasterConnectStrategy {
@Autowired
private RegistryClient registryClient;
@Autowired
private MasterConfig masterConfig;
@Override
public void disconnect() {
registryClient.getStoppable()
.stop("Master disconnected from registry, will stop myself due to the stop strategy");
}
@Override
public void reconnect() {
log.warn("The current connect strategy is stop, so the master will not reconnect to registry");
}
@Override
public StrategyType getStrategyType() {
return StrategyType.STOP;
}
}

View File

@ -1,116 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.lifecycle.ServerStatus;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* This strategy will change the server status to {@link ServerStatus#WAITING} when disconnect from {@link Registry}.
*/
@Service
@ConditionalOnProperty(prefix = "master.registry-disconnect-strategy", name = "strategy", havingValue = "waiting")
@Slf4j
public class MasterWaitingStrategy implements MasterConnectStrategy {
@Autowired
private MasterConfig masterConfig;
@Autowired
private RegistryClient registryClient;
@Autowired
private IWorkflowRepository IWorkflowRepository;
@Override
public void disconnect() {
try {
ServerLifeCycleManager.toWaiting();
clearMasterResource();
Duration maxWaitingTime = masterConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
try {
log.info("Master disconnect from registry will try to reconnect in {} s",
maxWaitingTime.getSeconds());
registryClient.connectUntilTimeout(maxWaitingTime);
} catch (RegistryException ex) {
throw new ServerLifeCycleException(
String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
}
} catch (ServerLifeCycleException e) {
String errorMessage = String.format(
"Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
ServerLifeCycleManager.getServerStatus());
log.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
} catch (RegistryException ex) {
String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server";
log.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
} catch (Exception ex) {
String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server";
log.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
}
}
@Override
public void reconnect() {
if (ServerLifeCycleManager.isRunning()) {
log.info("no need to reconnect, as the current server status is running");
} else {
try {
ServerLifeCycleManager.recoverFromWaiting();
log.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
String errorMessage =
String.format(
"Recover from waiting failed, the current server status is %s, will stop the server",
ServerLifeCycleManager.getServerStatus());
log.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
}
}
}
@Override
public StrategyType getStrategyType() {
return StrategyType.WAITING;
}
private void clearMasterResource() {
log.warn("Master clear workflow event queue due to lost registry connection");
IWorkflowRepository.clear();
log.warn("Master clear workflow instance cache due to lost registry connection");
}
}

View File

@ -102,9 +102,6 @@ master:
max-system-memory-usage-percentage-thresholds: 0.7 max-system-memory-usage-percentage-thresholds: 0.7
# Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow. # Master max disk usage, when the master's disk usage is smaller than this value, master server can execute workflow.
max-disk-usage-percentage-thresholds: 0.7 max-disk-usage-percentage-thresholds: 0.7
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: stop
worker-group-refresh-interval: 10s worker-group-refresh-interval: 10s
command-fetch-strategy: command-fetch-strategy:
type: ID_SLOT_BASED type: ID_SLOT_BASED

View File

@ -67,9 +67,6 @@ master:
memory-usage-weight: 40 memory-usage-weight: 40
cpu-usage-weight: 30 cpu-usage-weight: 30
task-thread-pool-usage-weight: 30 task-thread-pool-usage-weight: 30
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: stop
worker-group-refresh-interval: 10s worker-group-refresh-interval: 10s
command-fetch-strategy: command-fetch-strategy:
type: ID_SLOT_BASED type: ID_SLOT_BASED

View File

@ -26,6 +26,4 @@ public interface ConnectStrategy {
void reconnect(); void reconnect();
StrategyType getStrategyType();
} }

View File

@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api;
import java.time.Duration;
import lombok.Data;
@Data
public class ConnectStrategyProperties {
private StrategyType strategy = StrategyType.STOP;
private Duration maxWaitingTime = Duration.ofSeconds(0);
}

View File

@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.NonNull; import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -67,6 +68,10 @@ public class RegistryClient {
if (!registry.exists(RegistryNodeType.ALERT_SERVER.getRegistryPath())) { if (!registry.exists(RegistryNodeType.ALERT_SERVER.getRegistryPath())) {
registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false); registry.put(RegistryNodeType.ALERT_SERVER.getRegistryPath(), EMPTY, false);
} }
if (!registry.exists(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath())) {
registry.put(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath(), EMPTY, false);
}
cleanHistoryFailoverFinishedNodes();
} }
public boolean isConnected() { public boolean isConnected() {
@ -168,6 +173,11 @@ public class RegistryClient {
registry.put(key, value, true); registry.put(key, value, true);
} }
public void persist(String key, String value) {
log.info("persist key: {}, value: {}", key, value);
registry.put(key, value, false);
}
public void remove(String key) { public void remove(String key) {
registry.delete(key); registry.delete(key);
} }
@ -222,4 +232,27 @@ public class RegistryClient {
private Collection<String> getServerNodes(RegistryNodeType nodeType) { private Collection<String> getServerNodes(RegistryNodeType nodeType) {
return getChildrenKeys(nodeType.getRegistryPath()); return getChildrenKeys(nodeType.getRegistryPath());
} }
private void cleanHistoryFailoverFinishedNodes() {
// Clean the historical failover-finished nodes
// whose failover finished more than 1 week ago
final Collection<String> failoverFinishedNodes =
registry.children(RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath());
if (CollectionUtils.isEmpty(failoverFinishedNodes)) {
return;
}
for (final String failoverFinishedNode : failoverFinishedNodes) {
try {
final String failoverFinishTime = registry.get(failoverFinishedNode);
if (System.currentTimeMillis() - Long.parseLong(failoverFinishTime) > TimeUnit.DAYS.toMillis(7)) {
registry.delete(failoverFinishedNode);
log.info(
"Clear the failover finished node: {} which failover time is before the current time minus 1 week",
failoverFinishedNode);
}
} catch (Exception ex) {
log.error("Failed to clean the failoverFinishedNode: {}", failoverFinishedNode, ex);
}
}
}
} }
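
The one-week retention rule in `cleanHistoryFailoverFinishedNodes` can be sketched on its own. This example assumes the markers are held in a plain map keyed by full node path, with the finish timestamp in milliseconds as the value; the class and method names are hypothetical, and the current time is passed in so the rule is easy to check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the retention rule; the map stands in for the registry.
class FailoverMarkerCleanupSketch {

    static final long RETENTION_MILLIS = TimeUnit.DAYS.toMillis(7);

    // Removes markers whose stored finish time is more than 7 days before nowMillis.
    // Returns the removed paths; unparsable values are left in place.
    static List<String> cleanExpired(Map<String, String> markers, long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = markers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            String path = entry.getKey();
            try {
                long finishedAt = Long.parseLong(entry.getValue());
                if (nowMillis - finishedAt > RETENTION_MILLIS) {
                    it.remove();
                    removed.add(path);
                }
            } catch (NumberFormatException ex) {
                // Mirrors the diff: a bad node is skipped, not fatal.
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        Map<String, String> markers = new HashMap<>();
        markers.put("/nodes/failover-finish-nodes/10.0.0.1:5678-1", "0");
        markers.put("/nodes/failover-finish-nodes/10.0.0.2:5678-1", String.valueOf(5 * day));
        System.out.println(cleanExpired(markers, 10 * day));
    }
}
```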

View File

@ -1,25 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api;
public enum StrategyType {
STOP,
WAITING,
;
}

View File

@ -24,6 +24,8 @@ import lombok.Getter;
@AllArgsConstructor @AllArgsConstructor
public enum RegistryNodeType { public enum RegistryNodeType {
FAILOVER_FINISH_NODES("FailoverFinishNodes", "/nodes/failover-finish-nodes"),
MASTER("Master", "/nodes/master"), MASTER("Master", "/nodes/master"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"), MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"), MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"),

View File

@ -15,10 +15,19 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.registry; package org.apache.dolphinscheduler.registry.api.utils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategy; import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
public interface MasterConnectStrategy extends ConnectStrategy { public class RegistryUtils {
public static String getFailoverFinishedNodePath(final BaseHeartBeat baseHeartBeat) {
return getFailoverFinishedNodePath(baseHeartBeat.getHost() + ":" + baseHeartBeat.getPort(),
baseHeartBeat.getStartupTime());
}
public static String getFailoverFinishedNodePath(final String masterAddress, final long masterStartupTime) {
return RegistryNodeType.FAILOVER_FINISH_NODES.getRegistryPath() + "/" + masterAddress + "-" + masterStartupTime;
}
} }
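
For reference, the marker path produced by `getFailoverFinishedNodePath` has the shape `<root>/<host:port>-<startupTimeMillis>`. The snippet below is a standalone sketch of that concatenation; the class name and the sample address and timestamps are made up for illustration.

```java
// Hypothetical standalone copy of the path layout shown in the diff.
class FailoverFinishedPathSketch {

    // Same value as RegistryNodeType.FAILOVER_FINISH_NODES in the diff.
    static final String FAILOVER_FINISH_NODES = "/nodes/failover-finish-nodes";

    // Builds "<root>/<host:port>-<startupTimeMillis>".
    static String failoverFinishedNodePath(String serverAddress, long serverStartupTime) {
        return FAILOVER_FINISH_NODES + "/" + serverAddress + "-" + serverStartupTime;
    }

    public static void main(String[] args) {
        // Two incarnations of the same address get distinct marker nodes.
        System.out.println(failoverFinishedNodePath("192.168.1.10:5678", 1732089851000L));
        System.out.println(failoverFinishedNodePath("192.168.1.10:5678", 1732093451000L));
    }
}
```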

View File

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.config; package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -46,7 +45,6 @@ public class WorkerConfig implements Validator {
private Duration maxHeartbeatInterval = Duration.ofSeconds(10); private Duration maxHeartbeatInterval = Duration.ofSeconds(10);
private int hostWeight = 100; private int hostWeight = 100;
private WorkerServerLoadProtection serverLoadProtection = new WorkerServerLoadProtection(); private WorkerServerLoadProtection serverLoadProtection = new WorkerServerLoadProtection();
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
/** /**
* This field doesn't need to set at config file, it will be calculated by workerIp:listenPort * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
@ -90,7 +88,6 @@ public class WorkerConfig implements Validator {
"\n host-weight -> " + hostWeight + "\n host-weight -> " + hostWeight +
"\n tenantConfig -> " + tenantConfig + "\n tenantConfig -> " + tenantConfig +
"\n server-load-protection -> " + serverLoadProtection + "\n server-load-protection -> " + serverLoadProtection +
"\n registry-disconnect-strategy -> " + registryDisconnectStrategy +
"\n task-execute-threads-full-policy: " + taskExecuteThreadsFullPolicy + "\n task-execute-threads-full-policy: " + taskExecuteThreadsFullPolicy +
"\n address -> " + workerAddress + "\n address -> " + workerAddress +
"\n registry-path: " + workerRegistryPath + "\n registry-path: " + workerRegistryPath +

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.registry.api.ConnectStrategy;
public interface WorkerConnectStrategy extends ConnectStrategy {
}

View File

@ -20,21 +20,17 @@ package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class WorkerConnectionStateListener implements ConnectionListener { public class WorkerConnectionStateListener implements ConnectionListener {
private final WorkerConfig workerConfig; private final RegistryClient registryClient;
private final WorkerConnectStrategy workerConnectStrategy;
public WorkerConnectionStateListener(@NonNull WorkerConfig workerConfig, public WorkerConnectionStateListener(final RegistryClient registryClient) {
@NonNull WorkerConnectStrategy workerConnectStrategy) { this.registryClient = registryClient;
this.workerConfig = workerConfig;
this.workerConnectStrategy = workerConnectStrategy;
} }
@Override @Override
@ -47,10 +43,10 @@ public class WorkerConnectionStateListener implements ConnectionListener {
case SUSPENDED: case SUSPENDED:
break; break;
case RECONNECTED: case RECONNECTED:
workerConnectStrategy.reconnect(); log.warn("Worker reconnect to registry");
break; break;
case DISCONNECTED: case DISCONNECTED:
workerConnectStrategy.disconnect(); registryClient.getStoppable().stop("Worker disconnected from registry, will stop myself");
default: default:
} }
} }

@ -46,7 +46,6 @@ import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Slf4j @Slf4j
@ -62,10 +61,6 @@ public class WorkerRegistryClient implements AutoCloseable {
@Autowired @Autowired
private RegistryClient registryClient; private RegistryClient registryClient;
@Autowired
@Lazy
private WorkerConnectStrategy workerConnectStrategy;
@Autowired @Autowired
private MetricsProvider metricsProvider; private MetricsProvider metricsProvider;
@ -83,8 +78,7 @@ public class WorkerRegistryClient implements AutoCloseable {
public void start() { public void start() {
try { try {
registry(); registry();
registryClient.addConnectionStateListener( registryClient.addConnectionStateListener(new WorkerConnectionStateListener(registryClient));
new WorkerConnectionStateListener(workerConfig, workerConnectStrategy));
} catch (Exception ex) { } catch (Exception ex) {
throw new RegistryException("Worker registry client start up error", ex); throw new RegistryException("Worker registry client start up error", ex);
} }

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@Service
@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = "strategy", havingValue = "stop", matchIfMissing = true)
@Slf4j
public class WorkerStopStrategy implements WorkerConnectStrategy {
@Autowired
public RegistryClient registryClient;
@Override
public void disconnect() {
registryClient.getStoppable()
.stop("Worker disconnected from registry, will stop myself due to the stop strategy");
}
@Override
public void reconnect() {
log.warn("The current connect strategy is stop, so the worker will not reconnect to registry");
}
@Override
public StrategyType getStrategyType() {
return StrategyType.STOP;
}
}

@ -1,131 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import java.time.Duration;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@Service
@ConditionalOnProperty(prefix = "worker.registry-disconnect-strategy", name = "strategy", havingValue = "waiting")
@Slf4j
public class WorkerWaitingStrategy implements WorkerConnectStrategy {
@Autowired
private WorkerConfig workerConfig;
@Autowired
private RegistryClient registryClient;
@Autowired
private MessageRetryRunner messageRetryRunner;
@Autowired
private WorkerTaskExecutorThreadPool workerManagerThread;
public WorkerWaitingStrategy(@NonNull WorkerConfig workerConfig,
@NonNull RegistryClient registryClient,
@NonNull MessageRetryRunner messageRetryRunner,
@NonNull WorkerTaskExecutorThreadPool workerManagerThread) {
this.workerConfig = workerConfig;
this.registryClient = registryClient;
this.messageRetryRunner = messageRetryRunner;
this.workerManagerThread = workerManagerThread;
}
@Override
public void disconnect() {
try {
ServerLifeCycleManager.toWaiting();
clearWorkerResource();
Duration maxWaitingTime = workerConfig.getRegistryDisconnectStrategy().getMaxWaitingTime();
try {
log.info("Worker disconnect from registry will try to reconnect in {} s",
maxWaitingTime.getSeconds());
registryClient.connectUntilTimeout(maxWaitingTime);
} catch (RegistryException ex) {
throw new ServerLifeCycleException(
String.format("Waiting to reconnect to registry in %s failed", maxWaitingTime), ex);
}
} catch (ServerLifeCycleException e) {
String errorMessage = String.format(
"Disconnect from registry and change the current status to waiting error, the current server state is %s, will stop the current server",
ServerLifeCycleManager.getServerStatus());
log.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
} catch (RegistryException ex) {
String errorMessage = "Disconnect from registry and waiting to reconnect failed, will stop the server";
log.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
} catch (Exception ex) {
String errorMessage = "Disconnect from registry and get an unknown exception, will stop the server";
log.error(errorMessage, ex);
registryClient.getStoppable().stop(errorMessage);
}
}
@Override
public void reconnect() {
if (ServerLifeCycleManager.isRunning()) {
log.info("no need to reconnect, as the current server status is running");
} else {
try {
ServerLifeCycleManager.recoverFromWaiting();
log.info("Recover from waiting success, the current server status is {}",
ServerLifeCycleManager.getServerStatus());
} catch (Exception e) {
String errorMessage =
String.format(
"Recover from waiting failed, the current server status is %s, will stop the server",
ServerLifeCycleManager.getServerStatus());
log.error(errorMessage, e);
registryClient.getStoppable().stop(errorMessage);
}
}
}
@Override
public StrategyType getStrategyType() {
return StrategyType.WAITING;
}
private void clearWorkerResource() {
workerManagerThread.clearTask();
WorkerTaskExecutorHolder.clear();
log.warn("Worker server clear the tasks due to lost connection from registry");
messageRetryRunner.clearMessage();
log.warn("Worker server clear the retry message due to lost connection from registry");
}
}

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.utils.RegistryUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection; import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtection;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
@ -81,14 +82,25 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
} }
@Override @Override
public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) { public void writeHeartBeat(final WorkerHeartBeat workerHeartBeat) {
final String failoverNodePath = RegistryUtils.getFailoverFinishedNodePath(workerHeartBeat);
if (registryClient.exists(failoverNodePath)) {
log.warn("The worker: {} is under {}, means it has been failover will close myself",
workerHeartBeat,
failoverNodePath);
registryClient
.getStoppable()
.stop("The worker exist: " + failoverNodePath + ", means it has been failover will close myself");
return;
}
String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat); String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
String workerRegistryPath = workerConfig.getWorkerRegistryPath(); String workerRegistryPath = workerConfig.getWorkerRegistryPath();
registryClient.persistEphemeral(workerRegistryPath, workerHeartBeatJson); registryClient.persistEphemeral(workerRegistryPath, workerHeartBeatJson);
WorkerServerMetrics.incWorkerHeartbeatCount(); WorkerServerMetrics.incWorkerHeartbeatCount();
log.debug( log.debug(
"Success write worker group heartBeatInfo into registry, workerRegistryPath: {} workerHeartBeatInfo: {}", "Success write worker group heartBeatInfo into registry, workerRegistryPath: {} workerHeartBeatInfo: {}",
workerRegistryPath, workerHeartBeatJson); workerRegistryPath,
workerHeartBeatJson);
} }
private ServerStatus getServerStatus(SystemMetrics systemMetrics, private ServerStatus getServerStatus(SystemMetrics systemMetrics,

@ -59,9 +59,6 @@ worker:
max-system-memory-usage-percentage-thresholds: 0.7 max-system-memory-usage-percentage-thresholds: 0.7
# Worker max disk usage, when the worker's disk usage is smaller than this value, worker server can be dispatched tasks. # Worker max disk usage, when the worker's disk usage is smaller than this value, worker server can be dispatched tasks.
max-disk-usage-percentage-thresholds: 0.7 max-disk-usage-percentage-thresholds: 0.7
registry-disconnect-strategy:
# The disconnect strategy: stop, waiting
strategy: stop
task-execute-threads-full-policy: REJECT task-execute-threads-full-policy: REJECT
tenant-config: tenant-config:
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -28,8 +28,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* worker registry test * worker registry test
@ -37,24 +35,23 @@ import org.slf4j.LoggerFactory;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class WorkerConnectionStateListenerTest { public class WorkerConnectionStateListenerTest {
private static final Logger log = LoggerFactory.getLogger(WorkerConnectionStateListenerTest.class);
@InjectMocks @InjectMocks
private WorkerConnectionStateListener workerConnectionStateListener; private WorkerConnectionStateListener workerConnectionStateListener;
@Mock @Mock
private WorkerConfig workerConfig; private RegistryClient registryClient;
@Mock
private WorkerConnectStrategy workerConnectStrategy;
@Test @Test
public void testWorkerConnectionStateListener() { public void testWorkerConnectionStateListener() {
Mockito.when(registryClient.getStoppable()).thenReturn(cause -> {
// do nothing
});
workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED); workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED);
workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED); workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED);
Mockito.verify(workerConnectStrategy, times(1)).reconnect();
workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED); workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED);
Mockito.verify(registryClient, times(0)).getStoppable();
workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED); workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED);
Mockito.verify(workerConnectStrategy, times(1)).disconnect(); Mockito.verify(registryClient, times(1)).getStoppable();
} }
} }

@ -63,8 +63,6 @@ public class WorkerRegistryClientTest {
@Mock @Mock
private WorkerTaskExecutorThreadPool workerManagerThread; private WorkerTaskExecutorThreadPool workerManagerThread;
@Mock @Mock
private WorkerConnectStrategy workerConnectStrategy;
@Mock
private IStoppable stoppable; private IStoppable stoppable;
@Test @Test

@ -1,187 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import java.time.Duration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* worker registry test
*/
@ExtendWith(MockitoExtension.class)
public class WorkerStrategyTest {
private static final Logger log = LoggerFactory.getLogger(WorkerStrategyTest.class);
@Mock
private RegistryClient registryClient;
@Mock
private IStoppable stoppable;
@Mock
private WorkerConfig workerConfig;
@Mock
private WorkerRpcServer workerRpcServer;
@Mock
private MessageRetryRunner messageRetryRunner;
@Mock
private WorkerTaskExecutorThreadPool workerManagerThread;
@Mock
private ConnectStrategyProperties connectStrategyProperties;
@Test
public void testWorkerStopStrategy() {
given(registryClient.getStoppable())
.willReturn(stoppable);
WorkerStopStrategy workerStopStrategy = new WorkerStopStrategy();
workerStopStrategy.registryClient = registryClient;
workerStopStrategy.reconnect();
workerStopStrategy.disconnect();
Assertions.assertEquals(workerStopStrategy.getStrategyType(), StrategyType.STOP);
}
@Test
public void testWorkerWaitingStrategyreconnect() {
WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
workerConfig,
registryClient,
messageRetryRunner,
workerManagerThread);
Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
try (
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
Mockito.mockStatic(ServerLifeCycleManager.class)) {
serverLifeCycleManagerMockedStatic
.when(() -> ServerLifeCycleManager.isRunning())
.thenReturn(true);
workerWaitingStrategy.reconnect();
}
try (
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
Mockito.mockStatic(ServerLifeCycleManager.class)) {
doNothing().when(stoppable).stop(anyString());
given(registryClient.getStoppable())
.willReturn(stoppable);
serverLifeCycleManagerMockedStatic
.when(() -> ServerLifeCycleManager.recoverFromWaiting())
.thenThrow(new ServerLifeCycleException(""));
workerWaitingStrategy.reconnect();
}
try (
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
Mockito.mockStatic(ServerLifeCycleManager.class)) {
serverLifeCycleManagerMockedStatic
.when(() -> ServerLifeCycleManager.recoverFromWaiting())
.thenAnswer(invocation -> null);
workerWaitingStrategy.reconnect();
}
}
@Test
public void testWorkerWaitingStrategydisconnect() {
WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
workerConfig,
registryClient,
messageRetryRunner,
workerManagerThread);
Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
try (
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
Mockito.mockStatic(ServerLifeCycleManager.class)) {
doNothing().when(stoppable).stop(anyString());
given(registryClient.getStoppable())
.willReturn(stoppable);
serverLifeCycleManagerMockedStatic
.when(() -> ServerLifeCycleManager.toWaiting())
.thenThrow(new ServerLifeCycleException(""));
workerWaitingStrategy.disconnect();
}
try (
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
Mockito.mockStatic(ServerLifeCycleManager.class)) {
given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
Mockito.reset(registryClient);
doNothing().when(registryClient).connectUntilTimeout(any());
serverLifeCycleManagerMockedStatic
.when(() -> ServerLifeCycleManager.toWaiting())
.thenAnswer(invocation -> null);
workerWaitingStrategy.disconnect();
}
try (
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
Mockito.mockStatic(ServerLifeCycleManager.class)) {
given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
Mockito.reset(registryClient);
doNothing().when(stoppable).stop(anyString());
given(registryClient.getStoppable())
.willReturn(stoppable);
Mockito.doThrow(new RegistryException("TEST")).when(registryClient).connectUntilTimeout(any());
serverLifeCycleManagerMockedStatic
.when(() -> ServerLifeCycleManager.toWaiting())
.thenAnswer(invocation -> null);
workerWaitingStrategy.disconnect();
}
try (
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
Mockito.mockStatic(ServerLifeCycleManager.class)) {
Mockito.reset(workerConfig);
given(workerConfig.getRegistryDisconnectStrategy()).willThrow(new NullPointerException(""));
doNothing().when(stoppable).stop(anyString());
given(registryClient.getStoppable())
.willReturn(stoppable);
serverLifeCycleManagerMockedStatic
.when(() -> ServerLifeCycleManager.toWaiting())
.thenAnswer(invocation -> null);
workerWaitingStrategy.disconnect();
}
}
}