Fix taskInstance's host is not worker nettyServer address (#10926)

* Fix taskInstance's host is not worker nettyServer address

* Remove unnecessary mock
This commit is contained in:
Wenjun Ruan 2022-07-13 20:46:33 +08:00 committed by GitHub
parent 9f34a837b8
commit df0416c193
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 11 deletions

View File

@ -60,7 +60,9 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteResultCommand.class);
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
channel,
taskExecuteResultMessage.getMessageSenderAddress());
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
taskResultEvent.getTaskInstanceId());

View File

@ -57,7 +57,9 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage);
TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage,
channel,
taskExecuteRunningMessage.getMessageSenderAddress());
taskEventService.addEvent(taskEvent);
}

View File

@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
@ -106,7 +105,7 @@ public class TaskEvent {
return event;
}
public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) {
public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel, String workerAddress) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
@ -115,12 +114,12 @@ public class TaskEvent {
event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath());
event.setChannel(channel);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
event.setWorkerAddress(workerAddress);
event.setEvent(TaskEventType.RUNNING);
return event;
}
public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) {
public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel, String workerAddress) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
@ -133,7 +132,7 @@ public class TaskEvent {
event.setAppIds(command.getAppIds());
event.setVarPool(command.getVarPool());
event.setChannel(channel);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
event.setWorkerAddress(workerAddress);
event.setEvent(TaskEventType.RESULT);
return event;
}

View File

@ -75,8 +75,6 @@ public class TaskResponseServiceTest {
public void before() {
taskEventService.start();
Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));
TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
@ -88,7 +86,9 @@ public class TaskResponseServiceTest {
taskExecuteRunningMessage.setHost("127.*.*.*");
taskExecuteRunningMessage.setStartTime(new Date());
ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage,
channel,
taskExecuteRunningMessage.getMessageSenderAddress());
TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234),
NetUtils.getAddr(5678),
@ -100,7 +100,9 @@ public class TaskResponseServiceTest {
taskExecuteResultMessage.setVarPool("varPol");
taskExecuteResultMessage.setAppIds("ids");
taskExecuteResultMessage.setProcessId(1);
resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
resultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
channel,
taskExecuteResultMessage.getMessageSenderAddress());
taskInstance = new TaskInstance();
taskInstance.setId(22);