[Fix-8828] [Master] Assign tasks to worker optimization (#9919)

* fix 9584

* master recall

* fix ut

* update logger

* update delay queue

* fix ut

* remove sleep

Co-authored-by: 进勇 <lijinyong@cai-inc.com>
Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>
This commit is contained in:
JinYong Li 2022-05-31 11:49:54 +08:00 committed by GitHub
parent a8a5da367a
commit 49979c658e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 554 additions and 81 deletions

View File

@ -22,5 +22,5 @@ public enum Event {
DELAY,
RUNNING,
RESULT,
;
WORKER_REJECT
}

View File

@ -97,7 +97,7 @@ public class StateEvent {
public String toString() {
return "State Event :"
+ "key: " + key
+ " type: " + type.toString()
+ " type: " + type
+ " executeStatus: " + executionStatus
+ " task instance id: " + taskInstanceId
+ " process instance id: " + processInstanceId

View File

@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
public class HeartBeat {
private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class);
public static final String COMMA = ",";
private long startupTime;
private long reportTime;
@ -205,18 +204,18 @@ public class HeartBeat {
this.updateServerState();
StringBuilder builder = new StringBuilder(100);
builder.append(cpuUsage).append(COMMA);
builder.append(memoryUsage).append(COMMA);
builder.append(loadAverage).append(COMMA);
builder.append(cpuUsage).append(Constants.COMMA);
builder.append(memoryUsage).append(Constants.COMMA);
builder.append(loadAverage).append(Constants.COMMA);
builder.append(availablePhysicalMemorySize).append(Constants.COMMA);
builder.append(maxCpuloadAvg).append(Constants.COMMA);
builder.append(reservedMemory).append(Constants.COMMA);
builder.append(startupTime).append(Constants.COMMA);
builder.append(reportTime).append(Constants.COMMA);
builder.append(serverStatus).append(COMMA);
builder.append(processId).append(COMMA);
builder.append(workerHostWeight).append(COMMA);
builder.append(workerExecThreadCount).append(COMMA);
builder.append(serverStatus).append(Constants.COMMA);
builder.append(processId).append(Constants.COMMA);
builder.append(workerHostWeight).append(Constants.COMMA);
builder.append(workerExecThreadCount).append(Constants.COMMA);
builder.append(workerWaitingTaskCount);
return builder.toString();

View File

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
@ -96,6 +97,9 @@ public class MasterServer implements IStoppable {
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
private TaskRecallProcessor taskRecallProcessor;
@Autowired
private EventExecuteService eventExecuteService;
@ -126,6 +130,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@ -29,6 +31,8 @@ import org.apache.commons.lang.StringUtils;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -39,6 +43,8 @@ import org.springframework.stereotype.Service;
@Service
public class ExecutorDispatcher implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(ExecutorDispatcher.class);
/**
* netty executor manager
*/
@ -71,30 +77,23 @@ public class ExecutorDispatcher implements InitializingBean {
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
/**
* get executor manager
*/
// get executor manager
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
/**
* host select
*/
// host select
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
+ "current task needs worker group %s to execute",
context.getCommand(),context.getWorkerGroup()));
logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute",
context.getCommand(), context.getWorkerGroup());
return false;
}
context.setHost(host);
executorManager.beforeExecute(context);
try {
/**
* task execute
*/
// task execute
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.commons.collections.CollectionUtils;
@ -68,6 +70,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
@Autowired
private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
@Autowired
private TaskRecallProcessor taskRecallProcessor;
/**
* netty remote client
*/
@ -86,6 +91,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
}
/**
@ -97,25 +103,13 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
/**
* all nodes
*/
// all nodes
Set<String> allNodes = getAllNodes(context);
/**
* fail nodes
*/
// fail nodes
Set<String> failNodeSet = new HashSet<>();
/**
* build command accord executeContext
*/
// build command accord executeContext
Command command = context.getCommand();
/**
* execute task host
*/
// execute task host
Host host = context.getHost();
boolean success = false;
while (!success) {
@ -158,9 +152,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
* @throws ExecuteException if error throws ExecuteException
*/
public void doExecute(final Host host, final Command command) throws ExecuteException {
/**
* retry countdefault retry 3
*/
// retry count, default retry 3
int retryCount = 3;
boolean success = false;
do {
@ -170,7 +162,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
retryCount--;
ThreadUtils.sleep(100);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
} while (retryCount >= 0 && !success);

View File

@ -178,7 +178,7 @@ public class LowerWeightHostManager extends CommonHostManager {
return Optional.of(
new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
heartBeat.getStartupTime()));
heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime()));
}
}

View File

@ -37,10 +37,13 @@ public class HostWeight {
private double currentWeight;
public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, long startTime) {
private final int waitingTaskCount;
public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, int waitingTaskCount, long startTime) {
this.hostWorker = hostWorker;
this.weight = calculateWeight(cpu, memory, loadAverage, startTime);
this.currentWeight = this.weight;
this.waitingTaskCount = waitingTaskCount;
}
public double getWeight() {
@ -63,12 +66,17 @@ public class HostWeight {
return (Host)hostWorker;
}
public int getWaitingTaskCount() {
return waitingTaskCount;
}
@Override
public String toString() {
return "HostWeight{"
+ "hostWorker=" + hostWorker
+ ", weight=" + weight
+ ", currentWeight=" + currentWeight
+ ", waitingTaskCount=" + waitingTaskCount
+ '}';
}

View File

@ -18,6 +18,11 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
/**
* lower weight round robin
@ -35,7 +40,8 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
double totalWeight = 0;
double lowWeight = 0;
HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) {
List<HostWeight> weights = canAssignTaskHost(sources);
for (HostWeight hostWeight : weights) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
@ -45,7 +51,21 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
/**
 * Narrows the candidate hosts to those with the fewest waiting tasks.
 *
 * <p>Hosts whose waiting task count is 0 are returned when at least one exists. Otherwise the
 * host with the smallest waiting task count is returned first, followed by every other host
 * (compared by {@code getHost()}) that reports the same count.
 *
 * @param sources all candidate hosts
 * @return the hosts that may be assigned a task
 */
private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) {
    List<HostWeight> idleHosts = sources.stream()
            .filter(candidate -> candidate.getWaitingTaskCount() == 0)
            .collect(Collectors.toList());
    if (!idleHosts.isEmpty()) {
        return idleHosts;
    }

    HostWeight leastBusy = sources.stream()
            .min(Comparator.comparing(HostWeight::getWaitingTaskCount))
            .get();
    List<HostWeight> candidates = Lists.newArrayList(leastBusy);
    for (HostWeight candidate : sources) {
        boolean sameHost = candidate.getHost().equals(leastBusy.getHost());
        if (!sameHost && candidate.getWaitingTaskCount() == leastBusy.getWaitingTaskCount()) {
            candidates.add(candidate);
        }
    }
    return candidates;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
 * Netty processor for {@link CommandType#TASK_RECALL} commands: turns the received
 * {@link TaskRecallCommand} into a task event and hands it to the {@link TaskEventService}.
 */
@Component
public class TaskRecallProcessor implements NettyRequestProcessor {

    private static final Logger logger = LoggerFactory.getLogger(TaskRecallProcessor.class);

    @Autowired
    private TaskEventService taskEventService;

    /**
     * task recall process
     *
     * @param channel channel the command arrived on; stored in the event that is queued
     * @param command command whose body is a JSON encoded TaskRecallCommand
     * @throws IllegalArgumentException if the command type is not TASK_RECALL
     */
    @Override
    public void process(Channel channel, Command command) {
        // The message template is only formatted when the check fails.
        Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), "invalid command type : %s", command.getType());
        TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
        if (recallCommand == null) {
            // NOTE(review): parseObject presumably yields null for an empty or malformed body - confirm.
            // Without this guard TaskEvent.newRecall would fail with a NullPointerException.
            logger.error("task recall command body can not be parsed, command type : {}", command.getType());
            return;
        }
        logger.info("taskRecallCommand : {}", recallCommand);
        TaskEvent taskEvent = TaskEvent.newRecall(recallCommand, channel);
        taskEventService.addEvent(taskEvent);
    }
}

View File

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
@ -135,6 +136,15 @@ public class TaskEvent {
return event;
}
/**
 * Creates a {@code WORKER_REJECT} event from a task recall command.
 *
 * @param command recall command supplying the task instance id and process instance id
 * @param channel channel stored on the event
 * @return the new event
 */
public static TaskEvent newRecall(TaskRecallCommand command, Channel channel) {
    TaskEvent recallEvent = new TaskEvent();
    recallEvent.setEvent(Event.WORKER_REJECT);
    recallEvent.setChannel(channel);
    recallEvent.setProcessInstanceId(command.getProcessInstanceId());
    recallEvent.setTaskInstanceId(command.getTaskInstanceId());
    return recallEvent;
}
public String getVarPool() {
return varPool;
}

View File

@ -24,9 +24,12 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -133,6 +136,9 @@ public class TaskExecuteThread {
case RESULT:
handleResultEvent(taskEvent, taskInstance);
break;
case WORKER_REJECT:
handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteThread);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
@ -185,7 +191,7 @@ public class TaskExecuteThread {
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
} catch (Exception e) {
logger.error("worker ack master error", e);
logger.error("handle worker ack master error", e);
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
}
@ -216,9 +222,28 @@ public class TaskExecuteThread {
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
} catch (Exception e) {
logger.error("worker response master error", e);
logger.error("handle worker response master error", e);
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
}
/**
 * handle worker reject event: resubmit the task and acknowledge the recall.
 *
 * @param channel channel used to send the ack; no ack is sent when it is null
 * @param taskInstance task instance the event refers to
 * @param executeThread workflow thread asked to resubmit the task; skipped when null
 */
private void handleWorkerRejectEvent(Channel channel, TaskInstance taskInstance, WorkflowExecuteThread executeThread) {
    try {
        if (executeThread != null) {
            executeThread.resubmit(taskInstance.getTaskCode());
        }
        if (channel != null) {
            TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId());
            channel.writeAndFlush(taskRecallAckCommand.convert2Command());
        }
    } catch (Exception e) {
        logger.error("handle worker reject error", e);
        // The channel may be null here as well (the success path checks it), so guard the
        // failure ack to avoid a NullPointerException escaping from the error handler.
        if (channel != null) {
            TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance.getId());
            channel.writeAndFlush(taskRecallAckCommand.convert2Command());
        }
    }
}
}

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import net.bytebuddy.implementation.bytecode.Throw;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
@ -1974,6 +1975,15 @@ public class WorkflowExecuteThread {
}
}
/**
 * Triggers the RESUBMIT action on the active task processor registered for the task code.
 *
 * @param taskCode code of the task to resubmit
 * @throws Exception if no active task processor is registered for the task code
 */
public void resubmit(long taskCode) throws Exception {
    ITaskProcessor processor = activeTaskProcessorMaps.get(taskCode);
    if (processor == null) {
        throw new Exception("resubmit error, taskProcessor is null, task code: " + taskCode);
    }
    processor.action(TaskAction.RESUBMIT);
    logger.debug("RESUBMIT: task code:{}", taskCode);
}
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
// get start params from command param
Map<String, String> startParamMap = new HashMap<>();

View File

@ -159,6 +159,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected abstract boolean submitTask();
/**
* resubmit task
*
* @return result of the resubmission, as decided by the implementing processor
*/
protected abstract boolean resubmitTask();
/**
* run task
*/
@ -188,6 +193,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return run();
case DISPATCH:
return dispatch();
case RESUBMIT:
return resubmit();
default:
logger.error("unknown task action: {}", taskAction);
}
@ -196,6 +203,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return false;
}
/**
* resubmit: delegates to resubmitTask() and returns its result
*/
protected boolean resubmit() {
return resubmitTask();
}
protected boolean submit() {
return submitTask();
}

View File

@ -126,6 +126,11 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
return true;
}
/**
* resubmit task: nothing is done here, success is always reported
*/
@Override
protected boolean resubmitTask() {
return true;
}
@Override
protected boolean dispatchTask() {
return false;

View File

@ -71,6 +71,15 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
/**
 * resubmit task: refreshes the task execution logger and dispatches the task again.
 *
 * @return false when there is no task instance, otherwise the result of dispatchTask()
 */
@Override
protected boolean resubmitTask() {
    boolean hasTaskInstance = this.taskInstance != null;
    if (hasTaskInstance) {
        setTaskExecutionLogger();
        return dispatchTask();
    }
    return false;
}
@Override
public boolean runTask() {
return true;
@ -110,7 +119,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
logger.info("task ready to submit: {}", taskInstance);
logger.debug("task ready to submit: {}", taskInstance.getName());
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),

View File

@ -83,6 +83,11 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
return true;
}
/**
* resubmit task: nothing is done here, success is always reported
*/
@Override
protected boolean resubmitTask() {
return true;
}
@Override
protected boolean dispatchTask() {
return true;

View File

@ -99,6 +99,11 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
return true;
}
/**
* resubmit task: nothing is done here, success is always reported
*/
@Override
protected boolean resubmitTask() {
return true;
}
@Override
protected boolean dispatchTask() {
return true;

View File

@ -92,6 +92,11 @@ public class SubTaskProcessor extends BaseTaskProcessor {
return true;
}
/**
* resubmit task: nothing is done here, success is always reported
*/
@Override
protected boolean resubmitTask() {
return true;
}
@Override
protected boolean dispatchTask() {
return true;

View File

@ -90,6 +90,11 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return true;
}
/**
* resubmit task: nothing is done here, success is always reported
*/
@Override
protected boolean resubmitTask() {
return true;
}
@Override
protected boolean dispatchTask() {
return true;

View File

@ -26,5 +26,6 @@ public enum TaskAction {
TIMEOUT,
SUBMIT,
RUN,
DISPATCH
DISPATCH,
RESUBMIT
}

View File

@ -28,33 +28,36 @@ public class LowerWeightRoundRobinTest {
@Test
public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 1, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, 2, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, 1, System.currentTimeMillis() - 60 * 2 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
Assert.assertEquals("192.158.2.3", result.getHost().getIp());
Assert.assertEquals("192.158.2.3", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
}
@Test
public void testWarmUpSelect() {
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 3 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 11 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;

View File

@ -98,6 +98,16 @@ public enum CommandType {
*/
TASK_KILL_RESPONSE,
/**
* task recall
*/
TASK_RECALL,
/**
* task recall ack
*/
TASK_RECALL_ACK,
/**
* HEART_BEAT
*/

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
 * Acknowledgement of a task recall, carrying the task instance id and a status code.
 */
public class TaskRecallAckCommand implements Serializable {

    private int taskInstanceId;
    private int status;

    /**
     * Creates a command with both fields left at their default value.
     */
    public TaskRecallAckCommand() {
    }

    /**
     * @param status status code of the acknowledgement
     * @param taskInstanceId id of the task instance being acknowledged
     */
    public TaskRecallAckCommand(int status, int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
        this.status = status;
    }

    public int getTaskInstanceId() {
        return this.taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public int getStatus() {
        return this.status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    /**
     * Wraps this object, serialized as JSON bytes, into a TASK_RECALL_ACK command.
     *
     * @return command
     */
    public Command convert2Command() {
        Command ackCommand = new Command();
        ackCommand.setType(CommandType.TASK_RECALL_ACK);
        ackCommand.setBody(JSONUtils.toJsonByteArray(this));
        return ackCommand;
    }

    @Override
    public String toString() {
        return new StringBuilder("TaskRecallAckCommand{")
                .append("taskInstanceId=").append(taskInstanceId)
                .append(", status=").append(status)
                .append('}')
                .toString();
    }
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
 * Task recall command, identifying a task instance, its process instance and a host.
 */
public class TaskRecallCommand implements Serializable {

    /**
     * task instance id
     */
    private int taskInstanceId;

    /**
     * host
     */
    private String host;

    /**
     * process instance id
     */
    private int processInstanceId;

    public int getTaskInstanceId() {
        return this.taskInstanceId;
    }

    public void setTaskInstanceId(int taskInstanceId) {
        this.taskInstanceId = taskInstanceId;
    }

    public String getHost() {
        return this.host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getProcessInstanceId() {
        return this.processInstanceId;
    }

    public void setProcessInstanceId(int processInstanceId) {
        this.processInstanceId = processInstanceId;
    }

    /**
     * Wraps this object, serialized as JSON bytes, into a TASK_RECALL command.
     *
     * @return command
     */
    public Command convert2Command() {
        Command recallCommand = new Command();
        recallCommand.setType(CommandType.TASK_RECALL);
        recallCommand.setBody(JSONUtils.toJsonByteArray(this));
        return recallCommand;
    }

    @Override
    public String toString() {
        return new StringBuilder("TaskRecallCommand{")
                .append("taskInstanceId=").append(taskInstanceId)
                .append(", host='").append(host).append('\'')
                .append(", processInstanceId=").append(processInstanceId)
                .append('}')
                .toString();
    }
}

View File

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@ -110,6 +111,9 @@ public class WorkerServer implements IStoppable {
@Autowired
private TaskKillProcessor taskKillProcessor;
@Autowired
private TaskRecallAckProcessor taskRecallAckProcessor;
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@ -146,7 +150,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);

View File

@ -37,8 +37,9 @@ public class ResponseCache {
return instance;
}
private Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
private Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
private final Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
private final Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
private final Map<Integer,Command> recallCache = new ConcurrentHashMap<>();
/**
* cache response
@ -55,11 +56,27 @@ public class ResponseCache {
case RESULT:
responseCache.put(taskInstanceId, command);
break;
case WORKER_REJECT:
recallCache.put(taskInstanceId, command);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
}
/**
* remove the cached recall command of a task instance
*
* @param taskInstanceId taskInstanceId
*/
public void removeRecallCache(Integer taskInstanceId) {
recallCache.remove(taskInstanceId);
}
/**
* get recall cache
*
* @return the backing recall cache map itself (not a copy), keyed by task instance id
*/
public Map<Integer, Command> getRecallCache() {
return recallCache;
}
/**
* remove running cache
*

View File

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
@ -222,6 +223,14 @@ public class TaskCallbackService {
return taskKillResponseCommand;
}
/**
 * build task recall command
 *
 * @param taskExecutionContext taskExecutionContext, source of the host, process instance id and task instance id
 * @return a new recall command populated from the execution context
 */
private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
    final TaskRecallCommand recallCommand = new TaskRecallCommand();
    recallCommand.setHost(taskExecutionContext.getHost());
    recallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
    recallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
    return recallCommand;
}
/**
* send task execute running command
* todo unified callback command
@ -257,4 +266,13 @@ public class TaskCallbackService {
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext);
send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
}
/**
 * send task recall command
 * <p>
 * The command is first stored in the {@link ResponseCache} under {@link Event#WORKER_REJECT}
 * and then sent for the task instance.
 *
 * @param taskExecutionContext taskExecutionContext of the task being recalled
 */
public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
    TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
    // convert once: the cached command and the sent command are the same instance,
    // and the conversion work is not repeated
    Command recallCommand = taskRecallCommand.convert2Command();
    ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), recallCommand, Event.WORKER_REJECT);
    send(taskExecutionContext.getTaskInstanceId(), recallCommand);
}
}

View File

@ -159,8 +159,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
}
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
@ -174,10 +173,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// submit task to manager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
if (!offer) {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getWaitSubmitQueueSize(), taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendRecallCommand(taskExecutionContext);
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
 * task recall ack processor
 * <p>
 * Handles {@code TASK_RECALL_ACK} commands: when the ack carries the success status code,
 * the recall cache entry of the task instance is removed and
 * {@code TaskCallbackService.remove} is called for it.
 */
@Component
public class TaskRecallAckProcessor implements NettyRequestProcessor {

    private final Logger logger = LoggerFactory.getLogger(TaskRecallAckProcessor.class);

    /**
     * process a task recall ack command
     *
     * @param channel channel the command arrived on (not used here)
     * @param command command, its type must be {@code TASK_RECALL_ACK}
     * @throws IllegalArgumentException if the command type is not {@code TASK_RECALL_ACK}
     */
    @Override
    public void process(Channel channel, Command command) {
        // template overload: the message is only formatted when the check actually fails
        Preconditions.checkArgument(CommandType.TASK_RECALL_ACK == command.getType(),
                "invalid command type : %s", command.getType());

        TaskRecallAckCommand taskRecallAckCommand = JSONUtils.parseObject(command.getBody(), TaskRecallAckCommand.class);
        if (taskRecallAckCommand == null) {
            // nothing can be cleaned up without a parsed ack; leave a trace instead of dropping it silently
            logger.error("task recall ack command is null");
            return;
        }

        // any other status leaves the cache entry untouched
        if (taskRecallAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
            ResponseCache.get().removeRecallCache(taskRecallAckCommand.getTaskInstanceId());
            logger.debug("removeRecallCache: task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
            TaskCallbackService.remove(taskRecallAckCommand.getTaskInstanceId());
            logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
        }
    }
}

View File

@ -82,6 +82,14 @@ public class RetryReportTaskStatusThread implements Runnable {
taskCallbackService.send(taskInstanceId, responseCommand);
}
}
if (!instance.getRecallCache().isEmpty()) {
Map<Integer, Command> recallCache = instance.getRecallCache();
for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
taskCallbackService.send(taskInstanceId, responseCommand);
}
}
} catch (Exception e) {
logger.warn("retry report task status error", e);
}

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -26,10 +27,9 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,7 +47,7 @@ public class WorkerManagerThread implements Runnable {
/**
* task queue
*/
private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
private final BlockingQueue<TaskExecuteThread> waitSubmitQueue;
@Autowired(required = false)
private StorageOperate storageOperate;
@ -63,12 +63,16 @@ public class WorkerManagerThread implements Runnable {
@Autowired
private TaskCallbackService taskCallbackService;
private volatile int workerExecThreads;
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
public WorkerManagerThread(WorkerConfig workerConfig) {
workerExecThreads = workerConfig.getExecThreads();
this.waitSubmitQueue = new DelayQueue<>();
workerExecService = new WorkerExecService(
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
taskExecuteThreadMap
@ -80,12 +84,12 @@ public class WorkerManagerThread implements Runnable {
}
/**
* get delay queue size
* get wait submit queue size
*
* @return queue size
*/
public int getDelayQueueSize() {
return workerExecuteQueue.size();
public int getWaitSubmitQueueSize() {
return waitSubmitQueue.size();
}
/**
@ -102,9 +106,9 @@ public class WorkerManagerThread implements Runnable {
* then send Response to Master, update the execution status of task instance
*/
public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
workerExecuteQueue.stream()
waitSubmitQueue.stream()
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
.forEach(workerExecuteQueue::remove);
.forEach(waitSubmitQueue::remove);
sendTaskKillResponse(taskInstanceId);
}
@ -127,7 +131,14 @@ public class WorkerManagerThread implements Runnable {
* @return submit result
*/
public boolean offer(TaskExecuteThread taskExecuteThread) {
return workerExecuteQueue.offer(taskExecuteThread);
if (waitSubmitQueue.size() > workerExecThreads) {
// if waitSubmitQueue is full, it will wait 1s, then try add
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
if (waitSubmitQueue.size() > workerExecThreads) {
return false;
}
}
return waitSubmitQueue.offer(taskExecuteThread);
}
public void start() {
@ -142,9 +153,15 @@ public class WorkerManagerThread implements Runnable {
TaskExecuteThread taskExecuteThread;
while (Stopper.isRunning()) {
try {
taskExecuteThread = workerExecuteQueue.take();
taskExecuteThread.setStorageOperate(storageOperate);
workerExecService.submit(taskExecuteThread);
if (this.getThreadPoolQueueSize() <= workerExecThreads) {
taskExecuteThread = waitSubmitQueue.take();
taskExecuteThread.setStorageOperate(storageOperate);
workerExecService.submit(taskExecuteThread);
} else {
logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize());
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
} catch (Exception e) {
logger.error("An unexpected interrupt is happened, "
+ "the exception will be ignored and this thread will continue to run", e);