[Bug-7474][MasterServer] fix failover when node host is null (#7475)

* fix failover when node host is null

* add failover execute thread

* worker handle dead server

* fix task instance failover time check

* fix upgrade sql

* failover logic update

Co-authored-by: caishunfeng <534328519@qq.com>
This commit is contained in:
wind 2021-12-28 19:32:10 +08:00 committed by GitHub
parent cc77963522
commit 8808c0a700
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 478 additions and 73 deletions

View File

@ -209,6 +209,11 @@ public final class Constants {
*/
public static final int SOCKET_TIMEOUT = 60 * 1000;
/**
* registry session timeout
*/
public static final int REGISTRY_SESSION_TIMEOUT = 10 * 1000;
/**
* http header
*/

View File

@ -249,6 +249,12 @@ public class ProcessInstance {
*/
private int dryRun;
/**
* re-start time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date restartTime;
public ProcessInstance() {
}
@ -521,6 +527,14 @@ public class ProcessInstance {
this.dryRun = dryRun;
}
public Date getRestartTime() {
return restartTime;
}
public void setRestartTime(Date restartTime) {
this.restartTime = restartTime;
}
/**
* add command to history
*
@ -689,6 +703,10 @@ public class ProcessInstance {
+ ", dryRun='"
+ dryRun
+ '\''
+ '}'
+ ", restartTime='"
+ restartTime
+ '\''
+ '}';
}

View File

@ -53,6 +53,13 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
List<ProcessInstance> queryByHostAndStatus(@Param("host") String host,
@Param("states") int[] stateArray);
/**
* query process instance host by stateArray
* @param stateArray
* @return
*/
List<String> queryNeedFailoverProcessInstanceHost(@Param("states") int[] stateArray);
/**
* query process instance by tenantId and stateArray
*

View File

@ -23,7 +23,8 @@
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, dry_run,next_process_instance_id
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool,
dry_run, next_process_instance_id, restart_time
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
@ -45,6 +46,14 @@
</foreach>
order by id asc
</select>
<select id="queryNeedFailoverProcessInstanceHost" resultType="String">
select distinct host
from t_ds_process_instance
where state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</select>
<select id="queryTopNProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
@ -93,7 +102,8 @@
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id,
restart_time
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0

View File

@ -605,6 +605,7 @@ CREATE TABLE t_ds_process_instance
tenant_id int(11) NOT NULL DEFAULT '-1',
var_pool longtext,
dry_run int NULL DEFAULT 0,
restart_time datetime DEFAULT NULL,
PRIMARY KEY (id)
);

View File

@ -603,6 +603,7 @@ CREATE TABLE `t_ds_process_instance` (
`var_pool` longtext COMMENT 'var_pool',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag0 normal, 1 dry run',
`next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId',
`restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`,`end_time`) USING BTREE

View File

@ -529,6 +529,7 @@ CREATE TABLE t_ds_process_instance (
var_pool text ,
dry_run int DEFAULT '0' ,
next_process_instance_id int DEFAULT '0',
restart_time timestamp DEFAULT NULL ,
PRIMARY KEY (id)
) ;

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
-- uc_dolphin_T_t_ds_process_instance_A_restart_time
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_restart_time;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='restart_time')
THEN
ALTER TABLE t_ds_process_instance ADD COLUMN `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_A_restart_time();
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;

View File

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
delimiter d//
CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
)
RETURNS character varying
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_schema varchar;
BEGIN
---get schema name
v_schema =current_schema();
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL';
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
return SQLERRM;
END;
$BODY$;
select dolphin_update_metadata();
d//

View File

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

View File

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -90,6 +91,9 @@ public class MasterServer implements IStoppable {
@Autowired
private EventExecuteService eventExecuteService;
@Autowired
private FailoverExecuteThread failoverExecuteThread;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
@ -122,6 +126,7 @@ public class MasterServer implements IStoppable {
this.masterSchedulerService.start();
this.eventExecuteService.start();
this.failoverExecuteThread.start();
this.scheduler.start();

View File

@ -40,6 +40,8 @@ public class MasterConfig {
private double maxCpuLoadAvg;
private double reservedMemory;
private boolean taskLogger;
private int failoverInterval;
private boolean killYarnJobWhenTaskFailover;
public int getListenPort() {
return listenPort;
@ -144,4 +146,20 @@ public class MasterConfig {
public void setTaskLogger(boolean taskLogger) {
this.taskLogger = taskLogger;
}
public int getFailoverInterval() {
return failoverInterval;
}
public void setFailoverInterval(int failoverInterval) {
this.failoverInterval = failoverInterval;
}
public boolean isKillYarnJobWhenTaskFailover() {
return killYarnJobWhenTaskFailover;
}
public void setKillYarnJobWhenTaskFailover(boolean killYarnJobWhenTaskFailover) {
this.killYarnJobWhenTaskFailover = killYarnJobWhenTaskFailover;
}
}

View File

@ -43,8 +43,10 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -113,7 +115,6 @@ public class MasterRegistryClient {
String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters
registryClient.getLock(nodeLock);
// master registry
registry();
@ -126,11 +127,6 @@ public class MasterRegistryClient {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// self tolerant
if (registryClient.getActiveMasterNum() == 1) {
removeNodePath(null, NodeType.MASTER, true);
removeNodePath(null, NodeType.WORKER, true);
}
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
logger.error("master start up exception", e);
@ -149,18 +145,57 @@ public class MasterRegistryClient {
}
/**
* remove zookeeper node path
* remove master node path
*
* @param path zookeeper node path
* @param nodeType zookeeper node type
* @param path node path
* @param nodeType node type
* @param failover is failover
*/
public void removeNodePath(String path, NodeType nodeType, boolean failover) {
public void removeMasterNodePath(String path, NodeType nodeType, boolean failover) {
logger.info("{} node deleted : {}", nodeType, path);
String failoverPath = getFailoverLockPath(nodeType);
if (StringUtils.isEmpty(path)) {
logger.error("server down error: empty path: {}, nodeType:{}", path, nodeType);
return;
}
String serverHost = registryClient.getHostByEventDataPath(path);
if (StringUtils.isEmpty(serverHost)) {
logger.error("server down error: unknown path: {}, nodeType:{}", path, nodeType);
return;
}
String failoverPath = getFailoverLockPath(nodeType, serverHost);
try {
registryClient.getLock(failoverPath);
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
// handle dead server
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
//failover server
if (failover) {
failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed, host:{}", nodeType, serverHost, e);
} finally {
registryClient.releaseLock(failoverPath);
}
}
/**
* remove worker node path
*
* @param path node path
* @param nodeType node type
* @param failover is failover
*/
public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
logger.info("{} node deleted : {}", nodeType, path);
try {
String serverHost = null;
if (!StringUtils.isEmpty(path)) {
serverHost = registryClient.getHostByEventDataPath(path);
@ -168,21 +203,37 @@ public class MasterRegistryClient {
logger.error("server down error: unknown path: {}", path);
return;
}
// handle dead server
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
if (!registryClient.exists(path)) {
logger.info("path: {} not exists", path);
// handle dead server
registryClient.handleDeadServer(Collections.singleton(path), nodeType, Constants.ADD_OP);
}
}
//failover server
if (failover) {
failoverServerWhenDown(serverHost, nodeType);
}
} catch (Exception e) {
logger.error("{} server failover failed.", nodeType);
logger.error("failover exception ", e);
} finally {
registryClient.releaseLock(failoverPath);
logger.error("{} server failover failed", nodeType, e);
}
}
private boolean isNeedToHandleDeadServer(String host, NodeType nodeType, Duration sessionTimeout) {
long sessionTimeoutMillis = Math.max(Constants.REGISTRY_SESSION_TIMEOUT, sessionTimeout.toMillis());
List<Server> serverList = registryClient.getServerList(nodeType);
if (CollectionUtils.isEmpty(serverList)) {
return true;
}
Date startupTime = getServerStartupTime(serverList, host);
if (startupTime == null) {
return true;
}
if (System.currentTimeMillis() - startupTime.getTime() > sessionTimeoutMillis) {
return true;
}
return false;
}
/**
* failover server when server down
*
@ -208,12 +259,12 @@ public class MasterRegistryClient {
* @param nodeType zookeeper node type
* @return fail over lock path
*/
private String getFailoverLockPath(NodeType nodeType) {
public String getFailoverLockPath(NodeType nodeType, String host) {
switch (nodeType) {
case MASTER:
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
case WORKER:
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
return Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
default:
return "";
}
@ -222,10 +273,11 @@ public class MasterRegistryClient {
/**
* task needs failover if task start before worker starts
*
* @param workerServers worker servers
* @param taskInstance task instance
* @return true if task instance need fail over
*/
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) {
private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, TaskInstance taskInstance) {
boolean taskNeedFailover = true;
@ -234,13 +286,11 @@ public class MasterRegistryClient {
return false;
}
// if the worker node exists in zookeeper, we must check the task starts after the worker
if (registryClient.checkNodeExists(taskInstance.getHost(), NodeType.WORKER)) {
//if task start after worker starts, there is no need to failover the task.
if (checkTaskAfterWorkerStart(taskInstance)) {
taskNeedFailover = false;
}
//if task start after worker starts, there is no need to failover the task.
if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
taskNeedFailover = false;
}
return taskNeedFailover;
}
@ -250,22 +300,47 @@ public class MasterRegistryClient {
* @param taskInstance task instance
* @return true if task instance start time after worker server start date
*/
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
private boolean checkTaskAfterWorkerStart(List<Server> workerServers, TaskInstance taskInstance) {
if (StringUtils.isEmpty(taskInstance.getHost())) {
return false;
}
Date workerServerStartDate = null;
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
for (Server workerServer : workerServers) {
if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) {
workerServerStartDate = workerServer.getCreateTime();
Date workerServerStartDate = getServerStartupTime(workerServers, taskInstance.getHost());
if (workerServerStartDate != null) {
if (taskInstance.getStartTime() == null) {
return taskInstance.getSubmitTime().after(workerServerStartDate);
} else {
return taskInstance.getStartTime().after(workerServerStartDate);
}
}
return false;
}
/**
* get server startup time
*/
private Date getServerStartupTime(List<Server> servers, String host) {
if (CollectionUtils.isEmpty(servers)) {
return null;
}
Date serverStartupTime = null;
for (Server server : servers) {
if (host.equals(server.getHost() + Constants.COLON + server.getPort())) {
serverStartupTime = server.getCreateTime();
break;
}
}
if (workerServerStartDate != null) {
return taskInstance.getStartTime().after(workerServerStartDate);
return serverStartupTime;
}
/**
* get server startup time
*/
private Date getServerStartupTime(NodeType nodeType, String host) {
if (StringUtils.isEmpty(host)) {
return null;
}
return false;
List<Server> servers = registryClient.getServerList(nodeType);
return getServerStartupTime(servers, host);
}
/**
@ -278,10 +353,13 @@ public class MasterRegistryClient {
* @param workerHost worker host
*/
private void failoverWorker(String workerHost) {
if (StringUtils.isEmpty(workerHost)) {
return;
}
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
@ -297,31 +375,19 @@ public class MasterRegistryClient {
continue;
}
processInstanceCacheMap.put(processInstance.getId(), processInstance);
taskInstance.setProcessInstance(processInstance);
}
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(taskInstance.getState());
workflowExecuteThreadPool.submitStateEvent(stateEvent);
if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
continue;
}
// only failover the task owned myself if worker down.
if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
continue;
}
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
}
@ -333,11 +399,15 @@ public class MasterRegistryClient {
*
* @param masterHost master host
*/
private void failoverMaster(String masterHost) {
public void failoverMaster(String masterHost) {
if (StringUtils.isEmpty(masterHost)) {
return;
}
Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
long startTime = System.currentTimeMillis();
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
@ -347,16 +417,27 @@ public class MasterRegistryClient {
continue;
}
logger.info("failover process instance id: {}", processInstance.getId());
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : validTaskInstanceList) {
if (Constants.NULL.equals(taskInstance.getHost())) {
continue;
}
if (taskInstance.getState().typeIsFinished()) {
continue;
}
if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
continue;
}
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
if (serverStartupTime != null && processInstance.getRestartTime() != null
&& processInstance.getRestartTime().after(serverStartupTime)) {
continue;
}
logger.info("failover process instance id: {}", processInstance.getId());
//updateProcessInstance host is null and insert into command
processService.processNeedFailoverProcessInstances(processInstance);
}
@ -364,6 +445,13 @@ public class MasterRegistryClient {
logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
}
/**
* failover task instance
* <p>
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*/
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) {
if (taskInstance == null) {
logger.error("failover task instance error, taskInstance is null");
@ -376,18 +464,16 @@ public class MasterRegistryClient {
return;
}
if (!checkTaskInstanceNeedFailover(taskInstance)) {
return;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
}
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
@ -466,7 +552,7 @@ public class MasterRegistryClient {
/**
* get local address
*/
private String getLocalAddress() {
public String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}

View File

@ -63,7 +63,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
logger.info("master node added : {}", path);
break;
case REMOVE:
masterRegistryClient.removeNodePath(path, NodeType.MASTER, true);
masterRegistryClient.removeMasterNodePath(path, NodeType.MASTER, true);
break;
default:
break;
@ -78,7 +78,7 @@ public class MasterRegistryDataListener implements SubscribeListener {
break;
case REMOVE:
logger.info("worker node deleted : {}", path);
masterRegistryClient.removeNodePath(path, NodeType.WORKER, true);
masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
break;
default:
break;

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class FailoverExecuteThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class);
@Autowired
private MasterRegistryClient masterRegistryClient;
@Autowired
private RegistryClient registryClient;
@Autowired
private MasterConfig masterConfig;
/**
* process service
*/
@Autowired
private ProcessService processService;
@Override
public synchronized void start() {
super.setName("FailoverExecuteThread");
super.start();
}
@Override
public void run() {
while (Stopper.isRunning()) {
logger.info("failover execute started");
try {
List<String> hosts = getNeedFailoverMasterServers();
if (CollectionUtils.isEmpty(hosts)) {
continue;
}
logger.info("need failover hosts:{}", hosts);
for (String host : hosts) {
String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
try {
registryClient.getLock(failoverPath);
masterRegistryClient.failoverMaster(host);
} catch (Exception e) {
logger.error("{} server failover failed, host:{}", NodeType.MASTER, host, e);
} finally {
registryClient.releaseLock(failoverPath);
}
}
} catch (Exception e) {
logger.error("failover execute error", e);
} finally {
ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60);
}
}
}
private List<String> getNeedFailoverMasterServers() {
// failover myself && failover dead masters
List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
Iterator<String> iterator = hosts.iterator();
while (iterator.hasNext()) {
String host = iterator.next();
if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
if (!host.equals(masterRegistryClient.getLocalAddress())) {
iterator.remove();
}
}
}
return hosts;
}
}

View File

@ -623,6 +623,7 @@ public class WorkflowExecuteThread {
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setEndTime(null);
processService.saveProcessInstance(processInstance);
this.taskInstanceMap.clear();

View File

@ -106,6 +106,10 @@ master:
reserved-memory: 0.3
# use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file
task-logger: true
# failover interval, the unit is minute
failover-interval: 10
# kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true
server:
port: 5679

View File

@ -86,6 +86,7 @@ public class MasterRegistryClientTest {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setHost("127.0.0.1:8080");
processInstance.setRestartTime(new Date());
processInstance.setHistoryCmd("xxx");
processInstance.setCommandType(CommandType.STOP);
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString())).willReturn(Arrays.asList(processInstance));
@ -102,6 +103,7 @@ public class MasterRegistryClientTest {
server.setPort(8080);
server.setCreateTime(new Date());
given(registryClient.getServerList(NodeType.WORKER)).willReturn(Arrays.asList(server));
given(registryClient.getServerList(NodeType.MASTER)).willReturn(Arrays.asList(server));
}
@Test
@ -118,9 +120,9 @@ public class MasterRegistryClientTest {
@Test
public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, false);
masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, true);
//Cannot mock static methods
masterRegistryClient.removeNodePath("/path", NodeType.WORKER, true);
masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, true);
}
}

View File

@ -20,6 +20,7 @@
package org.apache.dolphinscheduler.registry.api;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
@ -43,4 +44,6 @@ public interface Registry extends Closeable {
boolean acquireLock(String key);
boolean releaseLock(String key);
Duration getSessionTimeout();
}

View File

@ -42,6 +42,7 @@ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -233,6 +234,11 @@ public final class ZookeeperRegistry implements Registry {
return true;
}
@Override
public Duration getSessionTimeout() {
return properties.getSessionTimeout();
}
@Override
public void close() {
treeCacheMap.values().forEach(CloseableUtils::closeQuietly);

View File

@ -664,6 +664,7 @@ public class ProcessService {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setRunTimes(1);
processInstance.setMaxTryTimes(0);
processInstance.setCommandParam(command.getCommandParam());
@ -853,6 +854,7 @@ public class ProcessService {
processInstance.setScheduleTime(command.getScheduleTime());
}
processInstance.setHost(host);
processInstance.setRestartTime(new Date());
ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION;
int runTime = processInstance.getRunTimes();
switch (commandType) {
@ -922,6 +924,7 @@ public class ProcessService {
updateTaskInstance(taskInstance);
}
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
processInstance.setEndTime(null);
processInstance.setRunTimes(runTime + 1);
initComplementDataParam(processDefinition, processInstance, cmdParam);
@ -1862,6 +1865,10 @@ public class ProcessService {
return processInstanceMapper.queryByHostAndStatus(host, stateArray);
}
public List<String> queryNeedFailoverProcessInstanceHost() {
return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
}
/**
* process need failover process instance
*

View File

@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@ -310,4 +311,8 @@ public class RegistryClient {
}
}
}
public Duration getSessionTimeout() {
return registry.getSessionTimeout();
}
}

View File

@ -110,6 +110,10 @@ master:
reserved-memory: 0.3
# use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file
task-logger: true
# failover interval, the unit is minute
failover-interval: 10
# kill yarn jon when failover taskInstance, default true
kill-yarn-job-when-task-failover: true
worker:
# worker listener port