mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-02 12:17:43 +08:00
[DS-6640][WorkerServer] support process update host command type (#6642)
Co-authored-by: caishunfeng <534328519@qq.com>
This commit is contained in:
parent
dd6ed36f65
commit
8850baff07
@ -126,7 +126,7 @@ public enum CommandType {
|
||||
/**
|
||||
* process host update
|
||||
*/
|
||||
PROCESS_HOST_UPDATE_REQUST,
|
||||
PROCESS_HOST_UPDATE_REQUEST,
|
||||
|
||||
/**
|
||||
* process host update response
|
||||
|
@ -56,7 +56,7 @@ public class HostUpdateCommand implements Serializable {
|
||||
*/
|
||||
public Command convert2Command() {
|
||||
Command command = new Command();
|
||||
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
|
||||
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
|
||||
byte[] body = JSONUtils.toJsonByteArray(this);
|
||||
command.setBody(body);
|
||||
return command;
|
||||
|
@ -66,7 +66,7 @@ public class HostUpdateResponseCommand implements Serializable {
|
||||
*/
|
||||
public Command convert2Command() {
|
||||
Command command = new Command();
|
||||
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUST);
|
||||
command.setType(CommandType.PROCESS_HOST_UPDATE_REQUEST);
|
||||
byte[] body = JSONUtils.toJsonByteArray(this);
|
||||
command.setBody(body);
|
||||
return command;
|
||||
|
@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
|
||||
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
|
||||
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
|
||||
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
|
||||
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
|
||||
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
|
||||
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
|
||||
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
|
||||
@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable {
|
||||
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
|
||||
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
|
||||
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
|
||||
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
|
||||
this.nettyRemotingServer.start();
|
||||
|
||||
// worker registry
|
||||
|
@ -51,7 +51,7 @@ public class HostUpdateProcessor implements NettyRequestProcessor {
|
||||
|
||||
@Override
|
||||
public void process(Channel channel, Command command) {
|
||||
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUST == command.getType(), String.format("invalid command type : %s", command.getType()));
|
||||
Preconditions.checkArgument(CommandType.PROCESS_HOST_UPDATE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
|
||||
HostUpdateCommand updateCommand = JSONUtils.parseObject(command.getBody(), HostUpdateCommand.class);
|
||||
logger.info("received host update command : {}", updateCommand);
|
||||
taskCallbackService.changeRemoteChannel(updateCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
|
||||
|
Loading…
Reference in New Issue
Block a user