mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-04 05:09:48 +08:00
updates
This commit is contained in:
parent
658922056a
commit
00c9b80108
@ -34,6 +34,6 @@ public class Stopper {
|
||||
}
|
||||
|
||||
public static final void stop(){
|
||||
signal.getAndSet(true);
|
||||
signal.set(true);
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
|
||||
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
|
||||
@ -33,7 +34,8 @@ import org.apache.dolphinscheduler.remote.future.InvokeCallback;
|
||||
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
|
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
|
||||
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
|
||||
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
@ -64,7 +66,7 @@ public class NettyRemotingClient {
|
||||
/**
|
||||
* channels
|
||||
*/
|
||||
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap(128);
|
||||
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);
|
||||
|
||||
/**
|
||||
* started flag
|
||||
@ -158,17 +160,17 @@ public class NettyRemotingClient {
|
||||
|
||||
/**
|
||||
* async send
|
||||
* @param address address
|
||||
* @param host host
|
||||
* @param command command
|
||||
* @param timeoutMillis timeoutMillis
|
||||
* @param invokeCallback callback function
|
||||
* @throws InterruptedException
|
||||
* @throws RemotingException
|
||||
*/
|
||||
public void sendAsync(final Address address, final Command command,
|
||||
public void sendAsync(final Host host, final Command command,
|
||||
final long timeoutMillis,
|
||||
final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
|
||||
final Channel channel = getChannel(address);
|
||||
final Channel channel = getChannel(host);
|
||||
if (channel == null) {
|
||||
throw new RemotingException("network error");
|
||||
}
|
||||
@ -214,7 +216,7 @@ public class NettyRemotingClient {
|
||||
});
|
||||
} catch (Throwable ex){
|
||||
responseFuture.release();
|
||||
throw new RemotingException(String.format("send command to address: %s failed", address), ex);
|
||||
throw new RemotingException(String.format("send command to host: %s failed", host), ex);
|
||||
}
|
||||
} else{
|
||||
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
|
||||
@ -225,17 +227,17 @@ public class NettyRemotingClient {
|
||||
|
||||
/**
|
||||
* sync send
|
||||
* @param address address
|
||||
* @param host host
|
||||
* @param command command
|
||||
* @param timeoutMillis timeoutMillis
|
||||
* @return command
|
||||
* @throws InterruptedException
|
||||
* @throws RemotingException
|
||||
*/
|
||||
public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
|
||||
final Channel channel = getChannel(address);
|
||||
public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
|
||||
final Channel channel = getChannel(host);
|
||||
if (channel == null) {
|
||||
throw new RemotingException(String.format("connect to : %s fail", address));
|
||||
throw new RemotingException(String.format("connect to : %s fail", host));
|
||||
}
|
||||
final long opaque = command.getOpaque();
|
||||
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
|
||||
@ -250,7 +252,7 @@ public class NettyRemotingClient {
|
||||
}
|
||||
responseFuture.setCause(future.cause());
|
||||
responseFuture.putResponse(null);
|
||||
logger.error("send command {} to address {} failed", command, address);
|
||||
logger.error("send command {} to host {} failed", command, host);
|
||||
}
|
||||
});
|
||||
/**
|
||||
@ -259,49 +261,89 @@ public class NettyRemotingClient {
|
||||
Command result = responseFuture.waitResponse();
|
||||
if(result == null){
|
||||
if(responseFuture.isSendOK()){
|
||||
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
|
||||
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
|
||||
} else{
|
||||
throw new RemotingException(address.toString(), responseFuture.getCause());
|
||||
throw new RemotingException(host.toString(), responseFuture.getCause());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public void send(final Host host, final Command command) throws RemotingException {
|
||||
Channel channel = getChannel(host);
|
||||
if (channel == null) {
|
||||
throw new RemotingException(String.format("connect to : %s fail", host));
|
||||
}
|
||||
try {
|
||||
ChannelFuture future = channel.writeAndFlush(command).await();
|
||||
if (future.isSuccess()) {
|
||||
logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
|
||||
} else {
|
||||
String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
|
||||
logger.error(msg, future.cause());
|
||||
throw new RemotingException(msg);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
|
||||
throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* register processor
|
||||
* @param commandType command type
|
||||
* @param processor processor
|
||||
*/
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
|
||||
this.registerProcessor(commandType, processor, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* register processor
|
||||
*
|
||||
* @param commandType command type
|
||||
* @param processor processor
|
||||
* @param executor thread executor
|
||||
*/
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
|
||||
this.clientHandler.registerProcessor(commandType, processor, executor);
|
||||
}
|
||||
|
||||
/**
|
||||
* get channel
|
||||
* @param address
|
||||
* @param host
|
||||
* @return
|
||||
*/
|
||||
public Channel getChannel(Address address) {
|
||||
Channel channel = channels.get(address);
|
||||
public Channel getChannel(Host host) {
|
||||
Channel channel = channels.get(host);
|
||||
if(channel != null && channel.isActive()){
|
||||
return channel;
|
||||
}
|
||||
return createChannel(address, true);
|
||||
return createChannel(host, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* create channel
|
||||
* @param address address
|
||||
* @param host host
|
||||
* @param isSync sync flag
|
||||
* @return channel
|
||||
*/
|
||||
public Channel createChannel(Address address, boolean isSync) {
|
||||
public Channel createChannel(Host host, boolean isSync) {
|
||||
ChannelFuture future;
|
||||
try {
|
||||
synchronized (bootstrap){
|
||||
future = bootstrap.connect(new InetSocketAddress(address.getHost(), address.getPort()));
|
||||
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
|
||||
}
|
||||
if(isSync){
|
||||
future.sync();
|
||||
}
|
||||
if (future.isSuccess()) {
|
||||
Channel channel = future.channel();
|
||||
channels.put(address, channel);
|
||||
channels.put(host, channel);
|
||||
return channel;
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
logger.info("connect to {} error {}", address, ex);
|
||||
logger.info("connect to {} error {}", host, ex);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -341,10 +383,10 @@ public class NettyRemotingClient {
|
||||
|
||||
/**
|
||||
* close channel
|
||||
* @param address address
|
||||
* @param host host
|
||||
*/
|
||||
public void closeChannel(Address address){
|
||||
Channel channel = this.channels.remove(address);
|
||||
public void closeChannel(Host host){
|
||||
Channel channel = this.channels.remove(host);
|
||||
if(channel != null){
|
||||
channel.close();
|
||||
}
|
||||
|
@ -1 +1 @@
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task request command
*/
public class ExecuteTaskAckCommand implements Serializable {
private int taskInstanceId;
private Date startTime;
private String host;
private int status;
private String logPath;
private String executePath;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.EXECUTE_TASK_ACK);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskAckCommand{" +
"taskInstanceId=" + taskInstanceId +
", startTime=" + startTime +
", host='" + host + '\'' +
", status=" + status +
", logPath='" + logPath + '\'' +
", executePath='" + executePath + '\'' +
'}';
}
}
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task request command
*/
public class ExecuteTaskAckCommand implements Serializable {
private int taskInstanceId;
private Date startTime;
private String host;
private int status;
private String logPath;
private String executePath;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_ACK);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskAckCommand{" +
"taskInstanceId=" + taskInstanceId +
", startTime=" + startTime +
", host='" + host + '\'' +
", status=" + status +
", logPath='" + logPath + '\'' +
", executePath='" + executePath + '\'' +
'}';
}
}
|
@ -1 +1 @@
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
public class ExecuteTaskRequestCommand implements Serializable {
/**
* task instance json
*/
private String taskInfoJson;
public String getTaskInfoJson() {
return taskInfoJson;
}
public void setTaskInfoJson(String taskInfoJson) {
this.taskInfoJson = taskInfoJson;
}
public ExecuteTaskRequestCommand() {
}
instance json
instance json
this.taskInfoJson = taskInfoJson;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskRequestCommand{" +
"taskInfoJson='" + taskInfoJson + '\'' +
'}';
}
}
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* execute task request command
*/
public class ExecuteTaskRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
public ExecuteTaskRequestCommand() {
}
public ExecuteTaskRequestCommand(String taskExecutionContext) {
private String taskInfoJson;
instance json
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "ExecuteTaskRequestCommand{" +
"taskExecutionContext='" + taskExecutionContext + '\'' +
'}';
}
}
|
@ -1 +1 @@
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* package response command
*
* @param opaque request unique identification
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
/**
* execute task response command
*/
public class ExecuteTaskResponseCommand implements Serializable {
public ExecuteTaskResponseCommand() {
}
public ExecuteTaskResponseCommand(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
/**
* task instance id
*/
private int taskInstanceId;
/**
* status
*/
private int status;
/**
* end time
*/
private Date endTime;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
/**
* package response command
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
|
@ -26,9 +26,9 @@ import java.util.Date;
|
||||
public class TaskExecutionContext implements Serializable{
|
||||
|
||||
/**
|
||||
* task instance id
|
||||
* task id
|
||||
*/
|
||||
private Integer taskId;
|
||||
private Integer taskInstanceId;
|
||||
|
||||
|
||||
/**
|
||||
@ -107,12 +107,13 @@ public class TaskExecutionContext implements Serializable{
|
||||
*/
|
||||
private Integer projectId;
|
||||
|
||||
public Integer getTaskId() {
|
||||
return taskId;
|
||||
|
||||
public Integer getTaskInstanceId() {
|
||||
return taskInstanceId;
|
||||
}
|
||||
|
||||
public void setTaskId(Integer taskId) {
|
||||
this.taskId = taskId;
|
||||
public void setTaskInstanceId(Integer taskInstanceId) {
|
||||
this.taskInstanceId = taskInstanceId;
|
||||
}
|
||||
|
||||
public String getTaskName() {
|
||||
@ -230,7 +231,7 @@ public class TaskExecutionContext implements Serializable{
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TaskExecutionContext{" +
|
||||
"taskId=" + taskId +
|
||||
"taskInstanceId=" + taskInstanceId +
|
||||
", taskName='" + taskName + '\'' +
|
||||
", startTime=" + startTime +
|
||||
", taskType='" + taskType + '\'' +
|
||||
|
@ -19,12 +19,19 @@ package org.apache.dolphinscheduler.remote.handler;
|
||||
import io.netty.channel.*;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
|
||||
import org.apache.dolphinscheduler.remote.utils.Constants;
|
||||
import org.apache.dolphinscheduler.remote.utils.Pair;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
/**
|
||||
* netty client request handler
|
||||
@ -44,9 +51,20 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
|
||||
*/
|
||||
private final ExecutorService callbackExecutor;
|
||||
|
||||
/**
|
||||
* processors
|
||||
*/
|
||||
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors;
|
||||
|
||||
/**
|
||||
* default executor
|
||||
*/
|
||||
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
|
||||
|
||||
public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
|
||||
this.nettyRemotingClient = nettyRemotingClient;
|
||||
this.callbackExecutor = callbackExecutor;
|
||||
this.processors = new ConcurrentHashMap();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -71,18 +89,43 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
|
||||
*/
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
processReceived((Command)msg);
|
||||
processReceived(ctx.channel(), (Command)msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* register processor
|
||||
*
|
||||
* @param commandType command type
|
||||
* @param processor processor
|
||||
*/
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
|
||||
this.registerProcessor(commandType, processor, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* register processor
|
||||
*
|
||||
* @param commandType command type
|
||||
* @param processor processor
|
||||
* @param executor thread executor
|
||||
*/
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
|
||||
ExecutorService executorRef = executor;
|
||||
if(executorRef == null){
|
||||
executorRef = defaultExecutor;
|
||||
}
|
||||
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
|
||||
}
|
||||
|
||||
/**
|
||||
* process received logic
|
||||
*
|
||||
* @param responseCommand responseCommand
|
||||
* @param command command
|
||||
*/
|
||||
private void processReceived(final Command responseCommand) {
|
||||
ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
|
||||
private void processReceived(final Channel channel, final Command command) {
|
||||
ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
|
||||
if(future != null){
|
||||
future.setResponseCommand(responseCommand);
|
||||
future.setResponseCommand(command);
|
||||
future.release();
|
||||
if(future.getInvokeCallback() != null){
|
||||
this.callbackExecutor.submit(new Runnable() {
|
||||
@ -92,10 +135,30 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
|
||||
}
|
||||
});
|
||||
} else{
|
||||
future.putResponse(responseCommand);
|
||||
future.putResponse(command);
|
||||
}
|
||||
} else{
|
||||
logger.warn("receive response {}, but not matched any request ", responseCommand);
|
||||
processByCommandType(channel, command);
|
||||
}
|
||||
}
|
||||
|
||||
public void processByCommandType(final Channel channel, final Command command) {
|
||||
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(command.getType());
|
||||
if (pair != null) {
|
||||
Runnable run = () -> {
|
||||
try {
|
||||
pair.getLeft().process(channel, command);
|
||||
} catch (Throwable e) {
|
||||
logger.error(String.format("process command %s exception", command), e);
|
||||
}
|
||||
};
|
||||
try {
|
||||
pair.getRight().submit(run);
|
||||
} catch (RejectedExecutionException e) {
|
||||
logger.warn("thread pool is full, discard command {} from {}", command, ChannelUtils.getRemoteAddress(channel));
|
||||
}
|
||||
} else {
|
||||
logger.warn("receive response {}, but not matched any request ", command);
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,30 +175,4 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
|
||||
ctx.channel().close();
|
||||
}
|
||||
|
||||
/**
|
||||
* channel write changed
|
||||
*
|
||||
* @param ctx channel handler context
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
||||
Channel ch = ctx.channel();
|
||||
ChannelConfig config = ch.config();
|
||||
|
||||
if (!ch.isWritable()) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("{} is not writable, over high water level : {}",
|
||||
new Object[]{ch, config.getWriteBufferHighWaterMark()});
|
||||
}
|
||||
|
||||
config.setAutoRead(false);
|
||||
} else {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("{} is writable, to low water : {}",
|
||||
new Object[]{ch, config.getWriteBufferLowWaterMark()});
|
||||
}
|
||||
config.setAutoRead(true);
|
||||
}
|
||||
}
|
||||
}
|
@ -98,7 +98,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
||||
if(executorRef == null){
|
||||
executorRef = nettyRemotingServer.getDefaultExecutor();
|
||||
}
|
||||
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
|
||||
this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,96 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.remote.utils;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* server address
|
||||
*/
|
||||
public class Address implements Serializable {
|
||||
|
||||
/**
|
||||
* host
|
||||
*/
|
||||
private String host;
|
||||
|
||||
/**
|
||||
* port
|
||||
*/
|
||||
private int port;
|
||||
|
||||
public Address(){
|
||||
//NOP
|
||||
}
|
||||
|
||||
public Address(String host, int port){
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public void setHost(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
public void setPort(int port) {
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((host == null) ? 0 : host.hashCode());
|
||||
result = prime * result + port;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
Address other = (Address) obj;
|
||||
if (host == null) {
|
||||
if (other.host != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!host.equals(other.host)) {
|
||||
return false;
|
||||
}
|
||||
return port == other.port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Address [host=" + host + ", port=" + port + "]";
|
||||
}
|
||||
}
|
@ -49,9 +49,9 @@ public class ChannelUtils {
|
||||
* @param channel channel
|
||||
* @return address
|
||||
*/
|
||||
public static Address toAddress(Channel channel){
|
||||
public static Host toAddress(Channel channel){
|
||||
InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress());
|
||||
return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
|
||||
return new Host(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,14 +14,15 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.remote.utils;
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.host;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
public class Host {
|
||||
/**
|
||||
* server address
|
||||
*/
|
||||
public class Host implements Serializable {
|
||||
|
||||
private String address;
|
||||
|
||||
@ -67,7 +68,7 @@ public class Host {
|
||||
public static Host of(String address){
|
||||
String[] parts = address.split(":");
|
||||
if (parts.length != 2) {
|
||||
throw new IllegalArgumentException(String.format("Address : %s illegal.", address));
|
||||
throw new IllegalArgumentException(String.format("Host : %s illegal.", address));
|
||||
}
|
||||
Host host = new Host(parts[0], Integer.parseInt(parts[1]));
|
||||
return host;
|
@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
|
||||
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
|
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -62,7 +62,7 @@ public class NettyRemotingClientTest {
|
||||
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
|
||||
Command commandPing = Ping.create();
|
||||
try {
|
||||
Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
|
||||
Command response = client.sendSync(new Host("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
|
||||
Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
@ -93,7 +93,7 @@ public class NettyRemotingClientTest {
|
||||
Command commandPing = Ping.create();
|
||||
try {
|
||||
final AtomicLong opaque = new AtomicLong(0);
|
||||
client.sendAsync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
|
||||
client.sendAsync(new Host("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
|
||||
@Override
|
||||
public void operationComplete(ResponseFuture responseFuture) {
|
||||
opaque.set(responseFuture.getOpaque());
|
||||
|
@ -40,7 +40,7 @@ public class TaskExecutionContextBuilder {
|
||||
* @return TaskExecutionContextBuilder
|
||||
*/
|
||||
public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
|
||||
taskExecutionContext.setTaskId(taskInstance.getId());
|
||||
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
|
||||
taskExecutionContext.setTaskName(taskInstance.getName());
|
||||
taskExecutionContext.setStartTime(taskInstance.getStartTime());
|
||||
taskExecutionContext.setTaskType(taskInstance.getTaskType());
|
||||
|
@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
|
||||
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
|
||||
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
|
||||
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
|
||||
@ -106,7 +107,6 @@ public class MasterServer implements IStoppable {
|
||||
public static void main(String[] args) {
|
||||
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
|
||||
new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -121,6 +121,7 @@ public class MasterServer implements IStoppable {
|
||||
serverConfig.setListenPort(45678);
|
||||
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
|
||||
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor(processService));
|
||||
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor(processService));
|
||||
this.nettyRemotingServer.start();
|
||||
|
||||
//
|
||||
|
@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.dispatch;
|
||||
|
||||
|
||||
import org.apache.dolphinscheduler.common.utils.StringUtils;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Service
|
||||
public class ExecutorDispatcher implements InitializingBean {
|
||||
|
||||
@Autowired
|
||||
private NettyExecutorManager nettyExecutorManager;
|
||||
|
||||
@Autowired
|
||||
private RoundRobinHostManager hostManager;
|
||||
|
||||
private final ConcurrentHashMap<ExecutorType, ExecutorManager> executorManagers;
|
||||
|
||||
public ExecutorDispatcher(){
|
||||
this.executorManagers = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
public void dispatch(final ExecutionContext executeContext) throws ExecuteException {
|
||||
ExecutorManager executorManager = this.executorManagers.get(executeContext.getExecutorType());
|
||||
if(executorManager == null){
|
||||
throw new ExecuteException("no ExecutorManager for type : " + executeContext.getExecutorType());
|
||||
}
|
||||
Host host = hostManager.select(executeContext);
|
||||
if (StringUtils.isEmpty(host.getAddress())) {
|
||||
throw new ExecuteException(String.format("fail to execute : %s due to no worker ", executeContext.getContext()));
|
||||
}
|
||||
executeContext.setHost(host);
|
||||
executorManager.beforeExecute(executeContext);
|
||||
try {
|
||||
executorManager.execute(executeContext);
|
||||
} finally {
|
||||
executorManager.afterExecute(executeContext);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
register(ExecutorType.WORKER, nettyExecutorManager);
|
||||
register(ExecutorType.CLIENT, nettyExecutorManager);
|
||||
}
|
||||
|
||||
public void register(ExecutorType type, ExecutorManager executorManager){
|
||||
executorManagers.put(type, executorManager);
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.context;
|
||||
|
||||
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
||||
|
||||
public class ExecutionContext {
|
||||
|
||||
private Host host;
|
||||
|
||||
private final Object context;
|
||||
|
||||
private final ExecutorType executorType;
|
||||
|
||||
public ExecutionContext(Object context, ExecutorType executorType) {
|
||||
this.context = context;
|
||||
this.executorType = executorType;
|
||||
}
|
||||
|
||||
public ExecutorType getExecutorType() {
|
||||
return executorType;
|
||||
}
|
||||
|
||||
public Object getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
public Host getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public void setHost(Host host) {
|
||||
this.host = host;
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.enums;
|
||||
|
||||
|
||||
public enum ExecutorType {
|
||||
|
||||
WORKER,
|
||||
|
||||
CLIENT;
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
|
||||
|
||||
|
||||
public class ExecuteException extends Exception{
|
||||
|
||||
public ExecuteException() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new exception with the specified detail message. The
|
||||
* cause is not initialized, and may subsequently be initialized by
|
||||
* a call to {@link #initCause}.
|
||||
*
|
||||
* @param message the detail message. The detail message is saved for
|
||||
* later retrieval by the {@link #getMessage()} method.
|
||||
*/
|
||||
public ExecuteException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new exception with the specified detail message and
|
||||
* cause. <p>Note that the detail message associated with
|
||||
* {@code cause} is <i>not</i> automatically incorporated in
|
||||
* this exception's detail message.
|
||||
*
|
||||
* @param message the detail message (which is saved for later retrieval
|
||||
* by the {@link #getMessage()} method).
|
||||
* @param cause the cause (which is saved for later retrieval by the
|
||||
* {@link #getCause()} method). (A <tt>null</tt> value is
|
||||
* permitted, and indicates that the cause is nonexistent or
|
||||
* unknown.)
|
||||
* @since 1.4
|
||||
*/
|
||||
public ExecuteException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new exception with the specified cause and a detail
|
||||
* message of <tt>(cause==null ? null : cause.toString())</tt> (which
|
||||
* typically contains the class and detail message of <tt>cause</tt>).
|
||||
* This constructor is useful for exceptions that are little more than
|
||||
* wrappers for other throwables (for example, {@link
|
||||
* java.security.PrivilegedActionException}).
|
||||
*
|
||||
* @param cause the cause (which is saved for later retrieval by the
|
||||
* {@link #getCause()} method). (A <tt>null</tt> value is
|
||||
* permitted, and indicates that the cause is nonexistent or
|
||||
* unknown.)
|
||||
* @since 1.4
|
||||
*/
|
||||
public ExecuteException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new exception with the specified detail message,
|
||||
* cause, suppression enabled or disabled, and writable stack
|
||||
* trace enabled or disabled.
|
||||
*
|
||||
* @param message the detail message.
|
||||
* @param cause the cause. (A {@code null} value is permitted,
|
||||
* and indicates that the cause is nonexistent or unknown.)
|
||||
* @param enableSuppression whether or not suppression is enabled
|
||||
* or disabled
|
||||
* @param writableStackTrace whether or not the stack trace should
|
||||
* be writable
|
||||
* @since 1.7
|
||||
*/
|
||||
protected ExecuteException(String message, Throwable cause,
|
||||
boolean enableSuppression,
|
||||
boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.executor;
|
||||
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
|
||||
|
||||
|
||||
public abstract class AbstractExecutorManager implements ExecutorManager{
|
||||
|
||||
@Override
|
||||
public void beforeExecute(ExecutionContext executeContext) throws ExecuteException {
|
||||
//TODO add time monitor
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterExecute(ExecutionContext executeContext) throws ExecuteException {
|
||||
//TODO add dispatch monitor
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.executor;
|
||||
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
|
||||
|
||||
|
||||
public interface ExecutorManager {
|
||||
|
||||
void beforeExecute(ExecutionContext executeContext) throws ExecuteException;
|
||||
|
||||
void execute(ExecutionContext executeContext) throws ExecuteException;
|
||||
|
||||
void afterExecute(ExecutionContext executeContext) throws ExecuteException;
|
||||
}
|
@ -0,0 +1,144 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.executor;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
@Service
|
||||
public class NettyExecutorManager extends AbstractExecutorManager{
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);
|
||||
|
||||
@Autowired
|
||||
private ZookeeperNodeManager zookeeperNodeManager;
|
||||
|
||||
private final NettyRemotingClient nettyRemotingClient;
|
||||
|
||||
public NettyExecutorManager(){
|
||||
final NettyClientConfig clientConfig = new NettyClientConfig();
|
||||
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(ExecutionContext executeContext) throws ExecuteException {
|
||||
Set<String> allNodes = getAllNodes(executeContext);
|
||||
Set<String> failNodeSet = new HashSet<>();
|
||||
//
|
||||
Command command = buildCommand(executeContext);
|
||||
Host host = executeContext.getHost();
|
||||
boolean success = false;
|
||||
//
|
||||
while (!success) {
|
||||
try {
|
||||
doExecute(host, command);
|
||||
success = true;
|
||||
executeContext.setHost(host);
|
||||
} catch (ExecuteException ex) {
|
||||
logger.error(String.format("execute context : %s error", executeContext.getContext()), ex);
|
||||
try {
|
||||
failNodeSet.add(host.getAddress());
|
||||
Set<String> tmpAllIps = new HashSet<>(allNodes);
|
||||
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
|
||||
if (remained != null && remained.size() > 0) {
|
||||
host = Host.of(remained.iterator().next());
|
||||
logger.error("retry execute context : {} host : {}", executeContext.getContext(), host);
|
||||
} else {
|
||||
throw new ExecuteException("fail after try all nodes");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
throw new ExecuteException("fail after try all nodes");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Command buildCommand(ExecutionContext context) {
|
||||
ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
|
||||
ExecutorType executorType = context.getExecutorType();
|
||||
switch (executorType){
|
||||
case WORKER:
|
||||
TaskExecutionContext taskExecutionContext = (TaskExecutionContext)context.getContext();
|
||||
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
|
||||
break;
|
||||
case CLIENT:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid executor type : " + executorType);
|
||||
|
||||
}
|
||||
return requestCommand.convert2Command();
|
||||
}
|
||||
|
||||
private void doExecute(final Host host, final Command command) throws ExecuteException {
|
||||
int retryCount = 3;
|
||||
boolean success = false;
|
||||
do {
|
||||
try {
|
||||
nettyRemotingClient.send(host, command);
|
||||
success = true;
|
||||
} catch (Exception ex) {
|
||||
logger.error(String.format("send command : %s to %s error", command, host), ex);
|
||||
retryCount--;
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ignore) {}
|
||||
}
|
||||
} while (retryCount >= 0 && !success);
|
||||
|
||||
if (!success) {
|
||||
throw new ExecuteException(String.format("send command : %s to %s error", command, host));
|
||||
}
|
||||
}
|
||||
|
||||
private Set<String> getAllNodes(ExecutionContext context){
|
||||
Set<String> nodes = Collections.EMPTY_SET;
|
||||
ExecutorType executorType = context.getExecutorType();
|
||||
switch (executorType){
|
||||
case WORKER:
|
||||
nodes = zookeeperNodeManager.getWorkerNodes();
|
||||
break;
|
||||
case CLIENT:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid executor type : " + executorType);
|
||||
|
||||
}
|
||||
return nodes;
|
||||
}
|
||||
}
|
@ -15,13 +15,14 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.host;
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.host;
|
||||
|
||||
|
||||
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
|
||||
public interface HostManager {
|
||||
|
||||
Host select(TaskExecutionContext context);
|
||||
Host select(ExecutionContext context);
|
||||
|
||||
}
|
@ -15,11 +15,14 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.host;
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.host;
|
||||
|
||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
||||
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.host.assign.RoundRobinSelector;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
|
||||
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -36,21 +39,36 @@ public class RoundRobinHostManager implements HostManager {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
|
||||
|
||||
@Autowired
|
||||
private RoundRobinSelector<Host> selector;
|
||||
|
||||
@Autowired
|
||||
private ZookeeperNodeManager zookeeperNodeManager;
|
||||
|
||||
private final Selector<Host> selector;
|
||||
|
||||
public RoundRobinHostManager(){
|
||||
this.selector = new RoundRobinSelector<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Host select(TaskExecutionContext context){
|
||||
public Host select(ExecutionContext context){
|
||||
Host host = new Host();
|
||||
Collection<String> nodes = zookeeperNodeManager.getWorkerNodes();
|
||||
Collection<String> nodes = null;
|
||||
ExecutorType executorType = context.getExecutorType();
|
||||
switch (executorType){
|
||||
case WORKER:
|
||||
nodes = zookeeperNodeManager.getWorkerNodes();
|
||||
break;
|
||||
case CLIENT:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid executorType : " + executorType);
|
||||
|
||||
}
|
||||
if(CollectionUtils.isEmpty(nodes)){
|
||||
return host;
|
||||
}
|
||||
List<Host> candidateHosts = new ArrayList<>(nodes.size());
|
||||
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
|
||||
|
||||
return selector.select(candidateHosts);
|
||||
}
|
||||
|
@ -15,7 +15,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.host.assign;
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Random;
|
@ -14,12 +14,14 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.dolphinscheduler.server.master.host.assign;
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
||||
@Service
|
||||
public class RoundRobinSelector<T> implements Selector<T> {
|
||||
|
||||
private final AtomicInteger index = new AtomicInteger(0);
|
@ -15,7 +15,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.host.assign;
|
||||
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.server.master.processor;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.common.utils.Preconditions;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* task ack processor
|
||||
*/
|
||||
public class TaskAckProcessor implements NettyRequestProcessor {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(TaskAckProcessor.class);
|
||||
|
||||
/**
|
||||
* process service
|
||||
*/
|
||||
private final ProcessService processService;
|
||||
|
||||
public TaskAckProcessor(ProcessService processService){
|
||||
this.processService = processService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Channel channel, Command command) {
|
||||
Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
|
||||
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
|
||||
logger.info("taskAckCommand : {}",taskAckCommand);
|
||||
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
|
||||
taskAckCommand.getStartTime(),
|
||||
taskAckCommand.getHost(),
|
||||
taskAckCommand.getExecutePath(),
|
||||
taskAckCommand.getLogPath(),
|
||||
taskAckCommand.getTaskInstanceId());
|
||||
}
|
||||
|
||||
}
|
@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
|
||||
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
|
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
|
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
|
||||
import org.apache.dolphinscheduler.server.master.future.TaskFuture;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -59,7 +58,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
|
||||
logger.info("received command : {}", command);
|
||||
ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
|
||||
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getTaskInstanceId());
|
||||
TaskFuture.notify(command);
|
||||
}
|
||||
|
||||
|
||||
|
@ -20,7 +20,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||
import org.apache.dolphinscheduler.common.utils.FileUtils;
|
||||
import org.apache.dolphinscheduler.common.utils.StringUtils;
|
||||
import org.apache.dolphinscheduler.dao.AlertDao;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.dao.entity.Tenant;
|
||||
@ -32,16 +31,23 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
|
||||
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
|
||||
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
|
||||
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
|
||||
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
import org.apache.dolphinscheduler.service.process.ProcessService;
|
||||
import org.apache.dolphinscheduler.service.queue.ITaskQueue;
|
||||
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
@ -92,9 +98,9 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
|
||||
|
||||
|
||||
/**
|
||||
* netty remoting client
|
||||
* executor dispatcher
|
||||
*/
|
||||
private static final NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(new NettyClientConfig());
|
||||
private ExecutorDispatcher dispatcher;
|
||||
|
||||
/**
|
||||
* constructor of MasterBaseTaskExecThread
|
||||
@ -102,13 +108,14 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
|
||||
* @param processInstance process instance
|
||||
*/
|
||||
public MasterBaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
|
||||
this.processService = BeanContext.getBean(ProcessService.class);
|
||||
this.alertDao = BeanContext.getBean(AlertDao.class);
|
||||
this.processService = SpringApplicationContext.getBean(ProcessService.class);
|
||||
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
|
||||
this.processInstance = processInstance;
|
||||
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
|
||||
this.cancel = false;
|
||||
this.taskInstance = taskInstance;
|
||||
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
|
||||
this.dispatcher = SpringApplicationContext.getBean(ExecutorDispatcher.class);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -126,30 +133,17 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
|
||||
this.cancel = true;
|
||||
}
|
||||
|
||||
|
||||
// TODO send task to worker
|
||||
public void sendToWorker(TaskInstance taskInstance){
|
||||
final Address address = new Address("127.0.0.1", 12346);
|
||||
|
||||
ExecuteTaskRequestCommand taskRequestCommand = new ExecuteTaskRequestCommand(
|
||||
FastJsonSerializer.serializeToString(getTaskExecutionContext(taskInstance)));
|
||||
/**
|
||||
* dispatch task to worker
|
||||
* @param taskInstance
|
||||
*/
|
||||
public void dispatch(TaskInstance taskInstance){
|
||||
TaskExecutionContext context = getTaskExecutionContext(taskInstance);
|
||||
ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
|
||||
try {
|
||||
Command responseCommand = nettyRemotingClient.sendSync(address,
|
||||
taskRequestCommand.convert2Command(), 2000);
|
||||
|
||||
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(
|
||||
responseCommand.getBody(), ExecuteTaskAckCommand.class);
|
||||
|
||||
logger.info("taskAckCommand : {}",taskAckCommand);
|
||||
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
|
||||
taskAckCommand.getStartTime(),
|
||||
taskAckCommand.getHost(),
|
||||
taskAckCommand.getExecutePath(),
|
||||
taskAckCommand.getLogPath(),
|
||||
taskInstance.getId());
|
||||
|
||||
} catch (InterruptedException | RemotingException ex) {
|
||||
logger.error(String.format("send command to : %s error", address), ex);
|
||||
dispatcher.dispatch(executionContext);
|
||||
} catch (ExecuteException e) {
|
||||
logger.error("execute exception", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -239,7 +233,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
|
||||
}
|
||||
if(submitDB && !submitQueue){
|
||||
// submit task to queue
|
||||
sendToWorker(task);
|
||||
dispatch(task);
|
||||
submitQueue = true;
|
||||
}
|
||||
if(submitDB && submitQueue){
|
||||
|
@ -33,10 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||
import org.apache.dolphinscheduler.dao.utils.DagHelper;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
|
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
|
||||
import org.apache.dolphinscheduler.server.utils.AlertManager;
|
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||
|
@ -35,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
||||
@Service
|
||||
public abstract class ZookeeperNodeManager implements InitializingBean {
|
||||
public class ZookeeperNodeManager implements InitializingBean {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class);
|
||||
|
||||
|
@ -47,7 +47,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
|
||||
init();
|
||||
}
|
||||
|
||||
public void init() {
|
||||
|
@ -76,7 +76,7 @@ public class TaskCallbackService {
|
||||
*/
|
||||
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
|
||||
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
|
||||
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
|
||||
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -87,8 +87,7 @@ public class TaskCallbackService {
|
||||
*/
|
||||
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
|
||||
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
|
||||
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(
|
||||
callbackChannel.getOpaque())).addListener(new ChannelFutureListener(){
|
||||
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
|
@ -79,9 +79,9 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
|
||||
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
|
||||
command.getBody(), ExecuteTaskRequestCommand.class);
|
||||
|
||||
String taskInstanceJson = taskRequestCommand.getTaskInfoJson();
|
||||
String contextJson = taskRequestCommand.getTaskExecutionContext();
|
||||
|
||||
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(taskInstanceJson, TaskExecutionContext.class);
|
||||
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
|
||||
|
||||
// local execute path
|
||||
String execLocalPath = getExecLocalPath(taskExecutionContext);
|
||||
@ -92,7 +92,7 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
|
||||
} catch (Exception ex){
|
||||
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
|
||||
}
|
||||
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskId(),
|
||||
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(),
|
||||
new CallbackChannel(channel, command.getOpaque()));
|
||||
|
||||
// submit task
|
||||
@ -110,6 +110,6 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
|
||||
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
|
||||
taskExecutionContext.getProcessDefineId(),
|
||||
taskExecutionContext.getProcessInstanceId(),
|
||||
taskExecutionContext.getTaskId());
|
||||
taskExecutionContext.getTaskInstanceId());
|
||||
}
|
||||
}
|
||||
|
@ -93,12 +93,12 @@ public class TaskScheduleThread implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskId());
|
||||
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
|
||||
|
||||
try {
|
||||
// tell master that task is in executing
|
||||
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext.getTaskType());
|
||||
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskId(), ackCommand);
|
||||
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
|
||||
|
||||
logger.info("script path : {}", taskExecutionContext.getExecutePath());
|
||||
// task node
|
||||
@ -118,7 +118,7 @@ public class TaskScheduleThread implements Runnable {
|
||||
taskExecutionContext.getScheduleTime(),
|
||||
taskExecutionContext.getTaskName(),
|
||||
taskExecutionContext.getTaskType(),
|
||||
taskExecutionContext.getTaskId(),
|
||||
taskExecutionContext.getTaskInstanceId(),
|
||||
CommonUtils.getSystemEnvPath(),
|
||||
taskExecutionContext.getTenantCode(),
|
||||
taskExecutionContext.getQueue(),
|
||||
@ -132,13 +132,13 @@ public class TaskScheduleThread implements Runnable {
|
||||
taskProps.setTaskAppId(String.format("%s_%s_%s",
|
||||
taskExecutionContext.getProcessDefineId(),
|
||||
taskExecutionContext.getProcessInstanceId(),
|
||||
taskExecutionContext.getTaskId()));
|
||||
taskExecutionContext.getTaskInstanceId()));
|
||||
|
||||
// custom logger
|
||||
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
|
||||
taskExecutionContext.getProcessDefineId(),
|
||||
taskExecutionContext.getProcessInstanceId(),
|
||||
taskExecutionContext.getTaskId()));
|
||||
taskExecutionContext.getTaskInstanceId()));
|
||||
|
||||
task = TaskManager.newTask(taskExecutionContext.getTaskType(),
|
||||
taskProps,
|
||||
@ -156,14 +156,14 @@ public class TaskScheduleThread implements Runnable {
|
||||
//
|
||||
responseCommand.setStatus(task.getExitStatus().getCode());
|
||||
responseCommand.setEndTime(new Date());
|
||||
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskId(), task.getExitStatus());
|
||||
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
|
||||
}catch (Exception e){
|
||||
logger.error("task scheduler failure", e);
|
||||
kill();
|
||||
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
|
||||
responseCommand.setEndTime(new Date());
|
||||
} finally {
|
||||
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskId(), responseCommand);
|
||||
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,13 +213,13 @@ public class TaskScheduleThread implements Runnable {
|
||||
return baseLog + Constants.SINGLE_SLASH +
|
||||
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
|
||||
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
|
||||
taskExecutionContext.getTaskId() + ".log";
|
||||
taskExecutionContext.getTaskInstanceId() + ".log";
|
||||
}
|
||||
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
|
||||
baseLog + Constants.SINGLE_SLASH +
|
||||
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
|
||||
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
|
||||
taskExecutionContext.getTaskId() + ".log";
|
||||
taskExecutionContext.getTaskInstanceId() + ".log";
|
||||
}
|
||||
|
||||
/**
|
||||
@ -325,9 +325,9 @@ public class TaskScheduleThread implements Runnable {
|
||||
* @throws Exception exception
|
||||
*/
|
||||
private void checkDownloadPermission(List<String> projectRes) throws Exception {
|
||||
int userId = taskExecutionContext.getExecutorId();
|
||||
int executorId = taskExecutionContext.getExecutorId();
|
||||
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
|
||||
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,userId,logger);
|
||||
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE, processService,resNames,executorId,logger);
|
||||
permissionCheck.checkPermission();
|
||||
}
|
||||
}
|
@ -16,12 +16,11 @@
|
||||
*/
|
||||
package org.apache.dolphinscheduler.service.log;
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants;
|
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
|
||||
import org.apache.dolphinscheduler.remote.command.Command;
|
||||
import org.apache.dolphinscheduler.remote.command.log.*;
|
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
|
||||
import org.apache.dolphinscheduler.remote.utils.Address;
|
||||
import org.apache.dolphinscheduler.remote.utils.Host;
|
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -73,7 +72,7 @@ public class LogClientService {
|
||||
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
|
||||
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
|
||||
String result = "";
|
||||
final Address address = new Address(host, port);
|
||||
final Host address = new Host(host, port);
|
||||
try {
|
||||
Command command = request.convert2Command();
|
||||
Command response = this.client.sendSync(address, command, logRequestTimeout);
|
||||
@ -101,7 +100,7 @@ public class LogClientService {
|
||||
logger.info("view log path {}", path);
|
||||
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
|
||||
String result = "";
|
||||
final Address address = new Address(host, port);
|
||||
final Host address = new Host(host, port);
|
||||
try {
|
||||
Command command = request.convert2Command();
|
||||
Command response = this.client.sendSync(address, command, logRequestTimeout);
|
||||
@ -129,7 +128,7 @@ public class LogClientService {
|
||||
logger.info("log path {}", path);
|
||||
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
|
||||
byte[] result = null;
|
||||
final Address address = new Address(host, port);
|
||||
final Host address = new Host(host, port);
|
||||
try {
|
||||
Command command = request.convert2Command();
|
||||
Command response = this.client.sendSync(address, command, logRequestTimeout);
|
||||
|
Loading…
Reference in New Issue
Block a user