add async queue and new a thread reslove taskResponse is faster than taskAck to db (#2297)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

* DataxTask process run debug

* DataxTask process run debug

* add protobuf dependency,MR、Spark task etc need this

* TaskUpdateQueueConsumer modify

* TaskExecutionContextBuilder set TaskInstance workgroup

* WorkerGroupService queryAllGroup modify
query available work group

* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify

* master and worker register ip  use OSUtils.getHost()

* ProcessInstance host set ip:port format

* worker fault tolerance modify

* Constants and .env modify

* master fault tolerant bug modify

* UT add pom.xml

* timing online  modify

* when taskResponse is faster than taskAck to db,task state will error
add async queue and new a thread reslove this problem

* TaskExecutionContext set host

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
This commit is contained in:
qiaozhanwei 2020-03-24 15:12:48 +08:00 committed by GitHub
parent 7da35238e1
commit 62f7d21bda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 373 additions and 11 deletions

View File

@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.manager;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import java.util.Date;
/**
* task event
*/
public class TaskEvent {
public static final String ACK = "ack";
public static final String RESPONSE = "response";
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* worker address
*/
private String workerAddress;
/**
* state
*/
private ExecutionStatus state;
/**
* start time
*/
private Date startTime;
/**
* end time
*/
private Date endTime;
/**
* execute path
*/
private String executePath;
/**
* log path
*/
private String logPath;
/**
* processId
*/
private int processId;
/**
* appIds
*/
private String appIds;
/**
* ack / response
*/
private String type;
/**
* receive ack info
* @param state state
* @param startTime startTime
* @param workerAddress workerAddress
* @param executePath executePath
* @param logPath logPath
* @param taskInstanceId taskInstanceId
* @param type type
*/
public void receiveAck(ExecutionStatus state,
Date startTime,
String workerAddress,
String executePath,
String logPath,
int taskInstanceId,
String type){
this.state = state;
this.startTime = startTime;
this.workerAddress = workerAddress;
this.executePath = executePath;
this.logPath = logPath;
this.taskInstanceId = taskInstanceId;
this.type = type;
}
/**
* receive response info
* @param state state
* @param endTime endTime
* @param processId processId
* @param appIds appIds
* @param taskInstanceId taskInstanceId
* @param type type
*/
public void receiveResponse(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstanceId,
String type){
this.state = state;
this.endTime = endTime;
this.processId = processId;
this.appIds = appIds;
this.taskInstanceId = taskInstanceId;
this.type = type;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getWorkerAddress() {
return workerAddress;
}
public void setWorkerAddress(String workerAddress) {
this.workerAddress = workerAddress;
}
public ExecutionStatus getState() {
return state;
}
public void setState(ExecutionStatus state) {
this.state = state;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public String toString() {
return "TaskEvent{" +
"taskInstanceId=" + taskInstanceId +
", workerAddress='" + workerAddress + '\'' +
", state=" + state +
", startTime=" + startTime +
", endTime=" + endTime +
", executePath='" + executePath + '\'' +
", logPath='" + logPath + '\'' +
", processId=" + processId +
", appIds='" + appIds + '\'' +
", type='" + type + '\'' +
'}';
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.manager;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* task manager
*/
@Component
public class TaskManager {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(TaskManager.class);
/**
* attemptQueue
*/
private final BlockingQueue<TaskEvent> attemptQueue = new LinkedBlockingQueue<>(5000);
/**
* process service
*/
@Autowired
private ProcessService processService;
@PostConstruct
public void init(){
TaskWorker taskWorker = new TaskWorker();
taskWorker.start();
}
/**
* put task to attemptQueue
*
* @param taskEvent taskEvent
*/
public void putTask(TaskEvent taskEvent){
try {
attemptQueue.put(taskEvent);
} catch (InterruptedException e) {
logger.error("put task : {} error :{}",taskEvent,e);
}
}
/**
* task worker thread
*/
class TaskWorker extends Thread {
@Override
public void run() {
while (Stopper.isRunning()){
try {
if (attemptQueue.size() == 0){
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
TaskEvent taskEvent = attemptQueue.take();
persist(taskEvent);
}catch (Exception e){
logger.error("persist task error",e);
}
}
}
/**
* persist taskEvent
* @param taskEvent taskEvent
*/
private void persist(TaskEvent taskEvent){
if (TaskEvent.ACK.equals(taskEvent.getType())){
processService.changeTaskState(taskEvent.getState(),
taskEvent.getStartTime(),
taskEvent.getWorkerAddress(),
taskEvent.getExecutePath(),
taskEvent.getLogPath(),
taskEvent.getTaskInstanceId());
}else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){
processService.changeTaskState(taskEvent.getState(),
taskEvent.getEndTime(),
taskEvent.getProcessId(),
taskEvent.getAppIds(),
taskEvent.getTaskInstanceId());
}
}
}
}

View File

@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.manager.TaskEvent;
import org.apache.dolphinscheduler.server.master.manager.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@ -43,7 +45,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
/**
* process service
*/
private final ProcessService processService;
private final TaskManager taskManager;
/**
* taskInstance cache manager
@ -51,7 +53,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskAckProcessor(){
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.taskManager = SpringApplicationContext.getBean(TaskManager.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
@ -69,15 +71,18 @@ public class TaskAckProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
/**
* change Task state
*/
processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
// TaskEvent
TaskEvent taskEvent = new TaskEvent();
taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId());
taskAckCommand.getTaskInstanceId(),
TaskEvent.ACK);
taskManager.putTask(taskEvent);
}

View File

@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.manager.TaskEvent;
import org.apache.dolphinscheduler.server.master.manager.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@ -42,7 +44,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
/**
* process service
*/
private final ProcessService processService;
private final TaskManager taskManager;
/**
* taskInstance cache manager
@ -50,7 +52,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskResponseProcessor(){
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.taskManager = SpringApplicationContext.getBean(TaskManager.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
@ -70,11 +72,16 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
// TaskEvent
TaskEvent taskEvent = new TaskEvent();
taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId());
responseCommand.getTaskInstanceId(),
TaskEvent.RESPONSE);
taskManager.putTask(taskEvent);
}

View File

@ -24,6 +24,7 @@ import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
@ -86,6 +87,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);