mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-04 13:17:50 +08:00
add TaskResponseProcessor
This commit is contained in:
parent
6bf98146b6
commit
63b76d7154
@ -128,4 +128,13 @@ public enum ExecutionStatus {
|
||||
public String getDescp() {
|
||||
return descp;
|
||||
}
|
||||
|
||||
public static ExecutionStatus of(int status){
|
||||
for(ExecutionStatus es : values()){
|
||||
if(es.getCode() == status){
|
||||
return es;
|
||||
}
|
||||
}
|
||||
throw new IllegalArgumentException("invalid status : " + status);
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,11 @@ import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
|
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.OSUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.StringUtils;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
|
||||
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
|
||||
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
@ -90,6 +94,8 @@ public class MasterServer implements IStoppable {
|
||||
@Autowired
|
||||
private SpringApplicationContext springApplicationContext;
|
||||
|
||||
private NettyRemotingServer nettyRemotingServer;
|
||||
|
||||
|
||||
/**
|
||||
* master server startup
|
||||
@ -108,6 +114,15 @@ public class MasterServer implements IStoppable {
|
||||
*/
|
||||
@PostConstruct
|
||||
public void run(){
|
||||
|
||||
//
|
||||
//init remoting server
|
||||
NettyServerConfig serverConfig = new NettyServerConfig();
|
||||
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
|
||||
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
|
||||
this.nettyRemotingServer.start();
|
||||
|
||||
//
|
||||
zkMasterClient.init();
|
||||
|
||||
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
|
||||
|
@ -0,0 +1,171 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.server.master.future;
|
||||
|
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class TaskFuture {
|
||||
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(TaskFuture.class);
|
||||
|
||||
private final static ConcurrentHashMap<Long,TaskFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
|
||||
|
||||
/**
|
||||
* request unique identification
|
||||
*/
|
||||
private final long opaque;
|
||||
|
||||
/**
|
||||
* timeout
|
||||
*/
|
||||
private final long timeoutMillis;
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
private final long beginTimestamp = System.currentTimeMillis();
|
||||
|
||||
/**
|
||||
* response command
|
||||
*/
|
||||
private volatile Command responseCommand;
|
||||
|
||||
private volatile boolean sendOk = true;
|
||||
|
||||
private volatile Throwable cause;
|
||||
|
||||
public TaskFuture(long opaque, long timeoutMillis) {
|
||||
this.opaque = opaque;
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
FUTURE_TABLE.put(opaque, this);
|
||||
}
|
||||
|
||||
/**
|
||||
* wait for response
|
||||
* @return command
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public Command waitResponse() throws InterruptedException {
|
||||
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
return this.responseCommand;
|
||||
}
|
||||
|
||||
/**
|
||||
* put response
|
||||
*
|
||||
* @param responseCommand responseCommand
|
||||
*/
|
||||
public void putResponse(final Command responseCommand) {
|
||||
this.responseCommand = responseCommand;
|
||||
this.latch.countDown();
|
||||
FUTURE_TABLE.remove(opaque);
|
||||
}
|
||||
|
||||
/**
|
||||
* whether timeout
|
||||
* @return timeout
|
||||
*/
|
||||
public boolean isTimeout() {
|
||||
long diff = System.currentTimeMillis() - this.beginTimestamp;
|
||||
return diff > this.timeoutMillis;
|
||||
}
|
||||
|
||||
public static void notify(final Command responseCommand){
|
||||
TaskFuture taskFuture = FUTURE_TABLE.remove(responseCommand.getOpaque());
|
||||
if(taskFuture != null){
|
||||
taskFuture.putResponse(responseCommand);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean isSendOK() {
|
||||
return sendOk;
|
||||
}
|
||||
|
||||
public void setSendOk(boolean sendOk) {
|
||||
this.sendOk = sendOk;
|
||||
}
|
||||
|
||||
public void setCause(Throwable cause) {
|
||||
this.cause = cause;
|
||||
}
|
||||
|
||||
public Throwable getCause() {
|
||||
return cause;
|
||||
}
|
||||
|
||||
public long getOpaque() {
|
||||
return opaque;
|
||||
}
|
||||
|
||||
public long getTimeoutMillis() {
|
||||
return timeoutMillis;
|
||||
}
|
||||
|
||||
public long getBeginTimestamp() {
|
||||
return beginTimestamp;
|
||||
}
|
||||
|
||||
public Command getResponseCommand() {
|
||||
return responseCommand;
|
||||
}
|
||||
|
||||
public void setResponseCommand(Command responseCommand) {
|
||||
this.responseCommand = responseCommand;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ResponseFuture{" +
|
||||
"opaque=" + opaque +
|
||||
", timeoutMillis=" + timeoutMillis +
|
||||
", latch=" + latch +
|
||||
", beginTimestamp=" + beginTimestamp +
|
||||
", responseCommand=" + responseCommand +
|
||||
", sendOk=" + sendOk +
|
||||
", cause=" + cause +
|
||||
'}';
|
||||
}
|
||||
|
||||
/**
|
||||
* scan future table
|
||||
*/
|
||||
public static void scanFutureTable(){
|
||||
final List<TaskFuture> futureList = new LinkedList<>();
|
||||
Iterator<Map.Entry<Long, TaskFuture>> it = FUTURE_TABLE.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Map.Entry<Long, TaskFuture> next = it.next();
|
||||
TaskFuture future = next.getValue();
|
||||
if ((future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
|
||||
futureList.add(future);
|
||||
it.remove();
|
||||
LOGGER.warn("remove timeout request : {}", future);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.processor;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.common.utils.Preconditions;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
|
||||
import org.apache.dolphinscheduler.server.master.future.TaskFuture;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* task response processor
|
||||
*/
|
||||
public class TaskResponseProcessor implements NettyRequestProcessor {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class);
|
||||
|
||||
/**
|
||||
* process service
|
||||
*/
|
||||
private final ProcessService processService;
|
||||
|
||||
public TaskResponseProcessor(ProcessService processService){
|
||||
this.processService = processService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Channel channel, Command command) {
|
||||
Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
|
||||
logger.info("received command : {}", command);
|
||||
ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
|
||||
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
|
||||
TaskFuture.notify(command);
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -16,14 +16,15 @@
|
||||
*/
|
||||
package org.apache.dolphinscheduler.server.master.runner;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.dao.AlertDao;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.dao.utils.BeanContext;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
||||
import org.apache.dolphinscheduler.remote.command.*;
|
||||
import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
|
||||
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
@ -121,31 +122,20 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
|
||||
|
||||
|
||||
// TODO send task to worker
|
||||
public void sendToWorker(String taskInstanceJson){
|
||||
public void sendToWorker(TaskInstance taskInstance){
|
||||
final Address address = new Address("127.0.0.1", 12346);
|
||||
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(taskInstanceJson);
|
||||
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(FastJsonSerializer.serializeToString(taskInstance));
|
||||
try {
|
||||
Command responseCommand = nettyRemotingClient.sendSync(address,
|
||||
taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
|
||||
Command responseCommand = nettyRemotingClient.sendSync(address, taskRequestCommand.convert2Command(), Integer.MAX_VALUE);
|
||||
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(responseCommand.getBody(), ExecuteTaskAckCommand.class);
|
||||
logger.info("taskAckCommand : {}",taskAckCommand);
|
||||
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
|
||||
taskAckCommand.getStartTime(),
|
||||
taskAckCommand.getHost(),
|
||||
taskAckCommand.getExecutePath(),
|
||||
taskAckCommand.getLogPath(),
|
||||
taskInstance.getId());
|
||||
|
||||
logger.info("receive command : {}", responseCommand);
|
||||
|
||||
final CommandType commandType = responseCommand.getType();
|
||||
switch (commandType){
|
||||
case EXECUTE_TASK_ACK:
|
||||
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
|
||||
responseCommand.getBody(), ExecuteTaskAckCommand.class);
|
||||
logger.info("taskAckCommand : {}",taskAckCommand);
|
||||
break;
|
||||
case EXECUTE_TASK_RESPONSE:
|
||||
ExecuteTaskResponseCommand taskResponseCommand = FastJsonSerializer.deserialize(
|
||||
responseCommand.getBody(), ExecuteTaskResponseCommand.class);
|
||||
logger.info("taskResponseCommand : {}",taskResponseCommand);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("unknown commandType");
|
||||
}
|
||||
logger.info("response result : {}",responseCommand);
|
||||
} catch (InterruptedException | RemotingException ex) {
|
||||
logger.error(String.format("send command to : %s error", address), ex);
|
||||
}
|
||||
@ -174,7 +164,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
|
||||
}
|
||||
if(submitDB && !submitQueue){
|
||||
// submit task to queue
|
||||
sendToWorker(JSONObject.toJSONString(task));
|
||||
sendToWorker(task);
|
||||
submitQueue = true;
|
||||
}
|
||||
if(submitDB && submitQueue){
|
||||
|
@ -171,9 +171,8 @@ public class WorkerServer implements IStoppable {
|
||||
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService));
|
||||
this.nettyRemotingServer.start();
|
||||
|
||||
// TODO ,because there is a heartbeat, you can reuse the heartbeat logic,worker registry
|
||||
// this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
|
||||
// this.workerRegistry.registry();
|
||||
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
|
||||
this.workerRegistry.registry();
|
||||
|
||||
this.zkWorkerClient.init();
|
||||
|
||||
|
@ -96,9 +96,6 @@ public class TaskScheduleThread implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
// TODO Need to be removed and kept temporarily update task instance state
|
||||
updateTaskState(taskInstance.getTaskType());
|
||||
|
||||
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
|
||||
|
||||
try {
|
||||
@ -167,31 +164,14 @@ public class TaskScheduleThread implements Runnable {
|
||||
responseCommand.setStatus(task.getExitStatus().getCode());
|
||||
responseCommand.setEndTime(new Date());
|
||||
logger.info("task instance id : {},task final status : {}", taskInstance.getId(), task.getExitStatus());
|
||||
|
||||
}catch (Exception e){
|
||||
logger.error("task scheduler failure", e);
|
||||
kill();
|
||||
|
||||
//TODO Need to be removed and kept temporarily update task instance state
|
||||
processService.changeTaskState(ExecutionStatus.FAILURE,
|
||||
new Date(),
|
||||
taskInstance.getId());
|
||||
|
||||
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
|
||||
responseCommand.setEndTime(new Date());
|
||||
|
||||
} finally {
|
||||
taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
|
||||
}
|
||||
|
||||
logger.info("task instance id : {},task final status : {}",
|
||||
taskInstance.getId(),
|
||||
task.getExitStatus());
|
||||
// update task instance state
|
||||
processService.changeTaskState(task.getExitStatus(),
|
||||
new Date(),
|
||||
taskInstance.getId());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user