[DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue (#6406)

* [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue

* [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue

* [DS-6403][fix] Put DB queries ahead before submit to TaskPriorityQueue

* checkstyle

Co-authored-by: caishunfeng <534328519@qq.com>
This commit is contained in:
caishunfeng 2021-09-29 14:04:35 +08:00 committed by GitHub
parent 3e8fe68128
commit ea6503b997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 490 additions and 415 deletions

View File

@ -25,9 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;

View File

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
/**
* task instance state manager

View File

@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Map;
import java.util.Map.Entry;

View File

@ -18,32 +18,8 @@
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@ -52,22 +28,12 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
@ -161,11 +127,10 @@ public class TaskPriorityQueueConsumer extends Thread {
protected boolean dispatch(TaskPriority taskPriority) {
boolean result = false;
try {
int taskInstanceId = taskPriority.getTaskId();
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
if (taskInstanceIsFinalState(taskInstanceId)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to dispatch anymore
return true;
} else {
@ -188,222 +153,4 @@ public class TaskPriorityQueueConsumer extends Thread {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
return taskInstance.getState().typeIsFinished();
}
/**
* get TaskExecutionContext
*
* @param taskInstanceId taskInstanceId
* @return TaskExecutionContext
*/
protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId) {
TaskInstance taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstanceId);
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null,
taskInstance.getId());
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setResources(getResourceFullNames(taskInstance));
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext();
SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
// SQL task
if (TaskType.SQL.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
}
// DATAX task
if (TaskType.DATAX.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
}
// procedure task
if (TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setProcedureTaskRelation(procedureTaskExecutionContext, taskInstance);
}
if (TaskType.SQOOP.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
}
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
.buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
.buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
.buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
.create();
}
/**
* set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext
* @param taskInstance taskInstance
*/
private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) {
ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
int datasourceId = procedureParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
}
/**
* set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskInstance taskInstance
*/
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
DataSource dbSource = processService.findDataSourceById(dataxParameters.getDataSource());
DataSource dbTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
if (dbSource != null) {
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
}
if (dbTarget != null) {
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
}
}
/**
* set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
* @param taskInstance taskInstance
*/
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) {
SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
// sqoop job type is template set task relation
if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
if (dataSource != null) {
sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
}
if (dataTarget != null) {
sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
}
}
}
/**
* set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
* @param taskInstance taskInstance
*/
private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) {
SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
int datasourceId = sqlParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
// whether udf type
boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
&& !StringUtils.isEmpty(sqlParameters.getUdfs());
if (udfTypeFlag) {
String[] udfFunIds = sqlParameters.getUdfs().split(",");
int[] udfFunIdsArray = new int[udfFunIds.length];
for (int i = 0; i < udfFunIds.length; i++) {
udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
}
List<UdfFunc> udfFuncList = processService.queryUdfFunListByIds(udfFunIdsArray);
UdfFuncRequest udfFuncRequest;
Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
for (UdfFunc udfFunc : udfFuncList) {
udfFuncRequest = JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
udfFuncRequestMap.put(udfFuncRequest, tenantCode);
}
sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
}
}
/**
* whehter tenant is null
*
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if (tenant == null) {
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;
}
return false;
}
/**
* get resource map key is full name and value is tenantCode
*/
protected Map<String, String> getResourceFullNames(TaskInstance taskInstance) {
Map<String, String> resourcesMap = new HashMap<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskInstance.getTaskType(), taskInstance.getTaskParams());
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
// filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(oldVersionResources)) {
oldVersionResources.forEach(t -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
}
// get the resource id in order to get the resource names in batch
Stream<Integer> resourceIdStream = projectResourceFiles.stream().map(ResourceInfo::getId);
Set<Integer> resourceIdsSet = resourceIdStream.collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
List<Resource> resources = processService.listResourceByIds(resourceIds);
resources.forEach(t -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
}
}
}
return resourcesMap;
}
}

View File

@ -35,12 +35,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.register.RegistryConnectListener;
import org.apache.dolphinscheduler.spi.register.RegistryConnectState;
@ -357,7 +357,6 @@ public class MasterRegistryClient {
registryClient.releaseLock(registryClient.getMasterLockPath());
}
/**
* registry
*/

View File

@ -17,8 +17,47 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
import org.apache.commons.lang.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,6 +76,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessInstance processInstance;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
/**
* pause task, common tasks donot need this.
*
@ -109,4 +150,222 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
public String getType() {
return null;
}
/**
* get TaskExecutionContext
*
* @param taskInstance taskInstance
* @return TaskExecutionContext
*/
protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance) {
processService.setTaskInstanceDetail(taskInstance);
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null,
taskInstance.getId());
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setResources(getResourceFullNames(taskInstance));
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext();
SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
// SQL task
if (TaskType.SQL.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setSQLTaskRelation(sqlTaskExecutionContext, taskInstance);
}
// DATAX task
if (TaskType.DATAX.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
}
// procedure task
if (TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setProcedureTaskRelation(procedureTaskExecutionContext, taskInstance);
}
if (TaskType.SQOOP.getDesc().equalsIgnoreCase(taskInstance.getTaskType())) {
setSqoopTaskRelation(sqoopTaskExecutionContext, taskInstance);
}
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildSQLTaskRelatedInfo(sqlTaskExecutionContext)
.buildDataxTaskRelatedInfo(dataxTaskExecutionContext)
.buildProcedureTaskRelatedInfo(procedureTaskExecutionContext)
.buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext)
.create();
}
/**
* set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext
* @param taskInstance taskInstance
*/
private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) {
ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
int datasourceId = procedureParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
}
/**
* set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskInstance taskInstance
*/
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
DataSource dbSource = processService.findDataSourceById(dataxParameters.getDataSource());
DataSource dbTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
if (dbSource != null) {
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
dataxTaskExecutionContext.setSourcetype(dbSource.getType().getCode());
dataxTaskExecutionContext.setSourceConnectionParams(dbSource.getConnectionParams());
}
if (dbTarget != null) {
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
dataxTaskExecutionContext.setTargetType(dbTarget.getType().getCode());
dataxTaskExecutionContext.setTargetConnectionParams(dbTarget.getConnectionParams());
}
}
/**
* set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
* @param taskInstance taskInstance
*/
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) {
SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
// sqoop job type is template set task relation
if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
if (dataSource != null) {
sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
}
if (dataTarget != null) {
sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
}
}
}
/**
* set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
* @param taskInstance taskInstance
*/
private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) {
SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
int datasourceId = sqlParameters.getDatasource();
DataSource datasource = processService.findDataSourceById(datasourceId);
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
// whether udf type
boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
&& !StringUtils.isEmpty(sqlParameters.getUdfs());
if (udfTypeFlag) {
String[] udfFunIds = sqlParameters.getUdfs().split(",");
int[] udfFunIdsArray = new int[udfFunIds.length];
for (int i = 0; i < udfFunIds.length; i++) {
udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
}
List<UdfFunc> udfFuncList = processService.queryUdfFunListByIds(udfFunIdsArray);
UdfFuncRequest udfFuncRequest;
Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
for (UdfFunc udfFunc : udfFuncList) {
udfFuncRequest = JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
udfFuncRequestMap.put(udfFuncRequest, tenantCode);
}
sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
}
}
/**
* whehter tenant is null
*
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if (tenant == null) {
logger.error("tenant not exists,process instance id : {},task instance id : {}",
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
return true;
}
return false;
}
/**
* get resource map key is full name and value is tenantCode
*/
protected Map<String, String> getResourceFullNames(TaskInstance taskInstance) {
Map<String, String> resourcesMap = new HashMap<>();
AbstractParameters baseParam = TaskParametersUtils.getParameters(taskInstance.getTaskType(), taskInstance.getTaskParams());
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles = baseParam.getResourceFilesList();
if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
// filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(oldVersionResources)) {
oldVersionResources.forEach(t -> resourcesMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
}
// get the resource id in order to get the resource names in batch
Stream<Integer> resourceIdStream = projectResourceFiles.stream().map(ResourceInfo::getId);
Set<Integer> resourceIdsSet = resourceIdStream.collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(resourceIdsSet)) {
Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
List<Resource> resources = processService.listResourceByIds(resourceIds);
resources.forEach(t -> resourcesMap.put(t.getFullName(), processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
}
}
}
return resourcesMap;
}
}

View File

@ -30,10 +30,10 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.commons.lang.StringUtils;
@ -62,8 +62,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
*/
protected Logger logger = LoggerFactory.getLogger(getClass());
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
this.processInstance = processInstance;
@ -124,12 +122,16 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
return true;
} catch (Exception e) {
logger.error("submit task Exception: ", e);
logger.error("task error : %s", JSONUtils.toJsonString(taskInstance));
logger.error("task error : {}", JSONUtils.toJsonString(taskInstance));
return false;
}
}

View File

@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
@ -66,7 +65,6 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
private TaskDefinition taskDefinition;

View File

@ -73,7 +73,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
ProcessInstance processInstance;
TaskDefinition taskDefinition;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
boolean allDependentItemFinished;

View File

@ -23,8 +23,6 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import java.util.concurrent.locks.Lock;
@ -45,8 +43,6 @@ public class SubTaskProcessor extends BaseTaskProcessor {
*/
private final Lock runLock = new ReentrantLock();
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;

View File

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils;
@ -53,7 +52,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
TaskDefinition taskDefinition;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
/**

View File

@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.nio.file.Path;
import java.nio.file.Paths;

View File

@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.commons.lang.StringUtils;

View File

@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -42,6 +41,7 @@ import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;

View File

@ -30,12 +30,12 @@ import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;

View File

@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
@ -44,6 +43,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;

View File

@ -23,11 +23,11 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;

View File

@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Calendar;
import java.util.Date;

View File

@ -21,28 +21,21 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@ -327,113 +320,6 @@ public class TaskPriorityQueueConsumerTest {
Assert.assertFalse(res);
}
@Test
public void testGetTaskExecutionContext() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
processDefinition.setProjectCode(1L);
taskInstance.setProcessDefine(processDefinition);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1);
Assert.assertNotNull(taskExecutionContext);
}
@Test
public void testGetResourceFullNames() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
// task node
Map<String, String> map = taskPriorityQueueConsumer.getResourceFullNames(taskInstance);
List<Resource> resourcesList = new ArrayList<Resource>();
Resource resource = new Resource();
resource.setFileName("fileName");
resourcesList.add(resource);
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new Integer[]{123});
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(), ResourceType.FILE);
Assert.assertNotNull(map);
}
@Test
public void testVerifyTenantIsNull() {
Tenant tenant = null;
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
taskInstance.setProcessInstance(processInstance);
boolean res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant, taskInstance);
Assert.assertTrue(res);
tenant = new Tenant();
tenant.setId(1);
tenant.setTenantCode("journey");
tenant.setDescription("journey");
tenant.setQueueId(1);
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
res = taskPriorityQueueConsumer.verifyTenantIsNull(tenant, taskInstance);
Assert.assertFalse(res);
}
@Test
public void testSetDataxTaskRelation() throws Exception {
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSource.setConnectionParams("");
dataSource.setType(DbType.MYSQL);
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
taskPriorityQueueConsumer.setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
Assert.assertEquals(1, dataxTaskExecutionContext.getDataSourceId());
Assert.assertEquals(1, dataxTaskExecutionContext.getDataTargetId());
}
@Test
public void testRun() throws Exception {
TaskInstance taskInstance = new TaskInstance();

View File

@ -26,11 +26,11 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.junit.Assert;
import org.junit.Ignore;

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@Ignore
public class CommonTaskProcessorTest {
@Autowired
private CommonTaskProcessor commonTaskProcessor;
@Autowired
private ProcessService processService;
@Test
public void testGetTaskExecutionContext() throws Exception {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setTenantId(1);
processInstance.setCommandType(CommandType.START_PROCESS);
taskInstance.setProcessInstance(processInstance);
taskInstance.setState(ExecutionStatus.DELAY_EXECUTION);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
processDefinition.setProjectCode(1L);
taskInstance.setProcessDefine(processDefinition);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);
Assert.assertNotNull(taskExecutionContext);
}
@Test
public void testGetResourceFullNames() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setProcessInstancePriority(Priority.MEDIUM);
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
// task node
Map<String, String> map = commonTaskProcessor.getResourceFullNames(taskInstance);
List<Resource> resourcesList = new ArrayList<Resource>();
Resource resource = new Resource();
resource.setFileName("fileName");
resourcesList.add(resource);
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new Integer[]{123});
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(), ResourceType.FILE);
Assert.assertNotNull(map);
}
@Test
public void testVerifyTenantIsNull() {
Tenant tenant = null;
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType(TaskType.SHELL.getDesc());
taskInstance.setProcessInstanceId(1);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
taskInstance.setProcessInstance(processInstance);
boolean res = commonTaskProcessor.verifyTenantIsNull(tenant, taskInstance);
Assert.assertTrue(res);
tenant = new Tenant();
tenant.setId(1);
tenant.setTenantCode("journey");
tenant.setDescription("journey");
tenant.setQueueId(1);
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
res = commonTaskProcessor.verifyTenantIsNull(tenant, taskInstance);
Assert.assertFalse(res);
}
@Test
public void testSetDataxTaskRelation() throws Exception {
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskParams("{\"dataSource\":1,\"dataTarget\":1}");
DataSource dataSource = new DataSource();
dataSource.setId(1);
dataSource.setConnectionParams("");
dataSource.setType(DbType.MYSQL);
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
commonTaskProcessor.setDataxTaskRelation(dataxTaskExecutionContext, taskInstance);
Assert.assertEquals(1, dataxTaskExecutionContext.getDataSourceId());
Assert.assertEquals(1, dataxTaskExecutionContext.getDataTargetId());
}
}

View File

@ -24,9 +24,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.mockito.Mockito;
/**

View File

@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.nio.file.Path;
import java.nio.file.Paths;

View File

@ -29,12 +29,12 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Date;
import java.util.concurrent.ExecutorService;

View File

@ -26,11 +26,9 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -129,6 +127,7 @@ import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.facebook.presto.jdbc.internal.guava.collect.Lists;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
@ -1535,19 +1534,29 @@ public class ProcessService {
if (taskInstance == null) {
return null;
}
setTaskInstanceDetail(taskInstance);
return taskInstance;
}
/**
* package task instanceassociate processInstance and processDefine
*
* @param taskInstance taskInstance
* @return task instance
*/
public void setTaskInstanceDetail(TaskInstance taskInstance) {
// get process instance
ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.getProcessDefinitionVersion());
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
updateTaskDefinitionResources(taskDefinition);
taskInstance.setTaskDefine(taskDefinition);
return taskInstance;
}
/**

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Map;
import java.util.Objects;
@ -45,6 +47,11 @@ public class TaskPriority implements Comparable<TaskPriority> {
*/
private int taskId;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* groupName
*/
@ -116,6 +123,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
this.context = context;
}
public TaskExecutionContext getTaskExecutionContext() {
return taskExecutionContext;
}
public void setTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
@Override
public int compareTo(TaskPriority other) {
if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {

View File

@ -15,14 +15,14 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
package org.apache.dolphinscheduler.service.queue.entity;
import java.io.Serializable;
/**
* master/worker task transport
* master/worker task transport
*/
public class DependenceTaskExecutionContext implements Serializable{
public class DependenceTaskExecutionContext implements Serializable {
private String dependence;
@ -36,8 +36,8 @@ public class DependenceTaskExecutionContext implements Serializable{
@Override
public String toString() {
return "DependenceTaskExecutionContext{" +
"dependence='" + dependence + '\'' +
'}';
return "DependenceTaskExecutionContext{"
+ "dependence='" + dependence + '\''
+ '}';
}
}

View File

@ -15,18 +15,18 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
package org.apache.dolphinscheduler.service.queue.entity;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import java.io.Serializable;
import java.util.Date;