Add projectCode in t_ds_process_instance and t_ds_task_instance to remove join (#13284)

Wenjun Ruan 2023-01-03 09:52:28 +08:00 committed by GitHub
parent 52134277a3
commit 8a479927f3
34 changed files with 632 additions and 327 deletions
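
The core idea: the paging queries for workflow and task instances previously had to join the definition tables just to filter by project. Denormalizing project_code (together with process_instance_name, executor_name, and tenant_code) onto t_ds_process_instance and t_ds_task_instance turns them into single-table scans. A minimal sketch of the effect (the literal 123 is an arbitrary project code, not taken from the commit):

    -- Before: filtering instances by project required a join to the definition table.
    SELECT instance.*
    FROM t_ds_process_instance instance
    JOIN t_ds_process_definition define ON instance.process_definition_code = define.code
    WHERE define.project_code = 123;

    -- After: project_code lives on the instance row itself, so the join disappears.
    SELECT *
    FROM t_ds_process_instance
    WHERE project_code = 123;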

View File

@ -119,9 +119,21 @@ public class TaskInstanceController extends BaseController {
return result;
}
searchVal = ParameterUtils.handleEscapes(searchVal);
result = taskInstanceService.queryTaskListPaging(loginUser, projectCode, processInstanceId, processInstanceName,
result = taskInstanceService.queryTaskListPaging(
loginUser,
projectCode,
processInstanceId,
processInstanceName,
processDefinitionName,
taskName, executorName, startTime, endTime, searchVal, stateType, host, taskExecuteType, pageNo,
taskName,
executorName,
startTime,
endTime,
searchVal,
stateType,
host,
taskExecuteType,
pageNo,
pageSize);
return result;
}

View File

@ -18,10 +18,12 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -107,12 +109,8 @@ public interface BaseService {
/**
* check and parse date parameters
*
* @param startDateStr start date string
* @param endDateStr end date string
* @return map<status,startDate,endDate>
*/
Map<String, Object> checkAndParseDateParameters(String startDateStr, String endDateStr);
Date checkAndParseDateParameters(String startDateStr) throws ServiceException;
/**
* check checkDescriptionLength

View File

@ -19,6 +19,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -82,18 +83,18 @@ public interface ProcessInstanceService {
* @param otherParamsJson otherParamsJson handle other params
* @return process instance list
*/
Result queryProcessInstanceList(User loginUser,
long projectCode,
long processDefineCode,
String startDate,
String endDate,
String searchVal,
String executorName,
WorkflowExecutionStatus stateType,
String host,
String otherParamsJson,
Integer pageNo,
Integer pageSize);
Result<PageInfo<ProcessInstance>> queryProcessInstanceList(User loginUser,
long projectCode,
long processDefineCode,
String startDate,
String endDate,
String searchVal,
String executorName,
WorkflowExecutionStatus stateType,
String host,
String otherParamsJson,
Integer pageNo,
Integer pageSize);
/**
* paging query process instance list, filtering according to project, process definition, time range, keyword, process status

View File

@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.AuditService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuditOperationType;
import org.apache.dolphinscheduler.common.enums.AuditResourceType;
import org.apache.dolphinscheduler.dao.entity.AuditLog;
@ -34,7 +33,6 @@ import org.apache.dolphinscheduler.dao.mapper.AuditLogMapper;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
@ -85,13 +83,6 @@ public class AuditServiceImpl extends BaseServiceImpl implements AuditService {
Integer pageNo, Integer pageSize) {
Result result = new Result();
Map<String, Object> checkAndParseDateResult = checkAndParseDateParameters(startDate, endDate);
Status resultEnum = (Status) checkAndParseDateResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
putMsg(result, resultEnum);
return result;
}
int[] resourceArray = null;
if (resourceType != null) {
resourceArray = new int[]{resourceType.getCode()};
@ -102,8 +93,8 @@ public class AuditServiceImpl extends BaseServiceImpl implements AuditService {
opsArray = new int[]{operationType.getCode()};
}
Date start = (Date) checkAndParseDateResult.get(Constants.START_TIME);
Date end = (Date) checkAndParseDateResult.get(Constants.END_TIME);
Date start = checkAndParseDateParameters(startDate);
Date end = checkAndParseDateParameters(endDate);
Page<AuditLog> page = new Page<>(pageNo, pageSize);
IPage<AuditLog> logIPage = auditLogMapper.queryAuditLog(page, resourceArray, opsArray, userName, start, end);

View File

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.utils.Result;
@ -31,7 +32,6 @@ import org.apache.commons.lang3.StringUtils;
import java.text.MessageFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -186,38 +186,18 @@ public class BaseServiceImpl implements BaseService {
/**
* check and parse date parameters
*
* @param startDateStr start date string
* @param endDateStr end date string
* @return map<status,startDate,endDate>
*/
@Override
public Map<String, Object> checkAndParseDateParameters(String startDateStr, String endDateStr) {
Map<String, Object> result = new HashMap<>();
public Date checkAndParseDateParameters(String startDateStr) throws ServiceException {
Date start = null;
if (!StringUtils.isEmpty(startDateStr)) {
start = DateUtils.stringToDate(startDateStr);
if (Objects.isNull(start)) {
logger.warn("Parameter startDateStr is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result;
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
}
}
result.put(Constants.START_TIME, start);
Date end = null;
if (!StringUtils.isEmpty(endDateStr)) {
end = DateUtils.stringToDate(endDateStr);
if (Objects.isNull(end)) {
logger.warn("Parameter endDateStr is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result;
}
}
result.put(Constants.END_TIME, end);
putMsg(result, Status.SUCCESS);
return result;
return start;
}
@Override

View File

@ -301,22 +301,24 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
* @return process instance list
*/
@Override
public Result queryProcessInstanceList(User loginUser, long projectCode, long processDefineCode, String startDate,
String endDate, String searchVal, String executorName,
WorkflowExecutionStatus stateType, String host, String otherParamsJson,
Integer pageNo, Integer pageSize) {
public Result<PageInfo<ProcessInstance>> queryProcessInstanceList(User loginUser,
long projectCode,
long processDefineCode,
String startDate,
String endDate,
String searchVal,
String executorName,
WorkflowExecutionStatus stateType,
String host,
String otherParamsJson,
Integer pageNo,
Integer pageSize) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
putMsg(result, resultEnum);
return result;
}
projectService.checkProjectAndAuthThrowException(loginUser, project,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
int[] statusArray = null;
// filter by state
@ -324,21 +326,22 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
statusArray = new int[]{stateType.getCode()};
}
Map<String, Object> checkAndParseDateResult = checkAndParseDateParameters(startDate, endDate);
resultEnum = (Status) checkAndParseDateResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
putMsg(result, resultEnum);
return result;
}
Date start = (Date) checkAndParseDateResult.get(Constants.START_TIME);
Date end = (Date) checkAndParseDateResult.get(Constants.END_TIME);
Date start = checkAndParseDateParameters(startDate);
Date end = checkAndParseDateParameters(endDate);
Page<ProcessInstance> page = new Page<>(pageNo, pageSize);
PageInfo<ProcessInstance> pageInfo = new PageInfo<>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<ProcessInstance> processInstanceList = processInstanceMapper.queryProcessInstanceListPaging(page,
project.getCode(), processDefineCode, searchVal, executorId, statusArray, host, start, end);
IPage<ProcessInstance> processInstanceList = processInstanceMapper.queryProcessInstanceListPaging(
page,
project.getCode(),
processDefineCode,
searchVal,
executorName,
statusArray,
host,
start,
end);
List<ProcessInstance> processInstances = processInstanceList.getRecords();
List<Integer> userIds = Collections.emptyList();
@ -385,6 +388,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessDefinition processDefinition =
processDefineMapper.queryByDefineName(project.getCode(), processInstance.getName());
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProjectCode(project.getCode());
}
Page<ProcessInstance> page =
@ -392,10 +396,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
PageInfo<ProcessInstance> pageInfo =
new PageInfo<>(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize());
IPage<ProcessInstance> processInstanceList = processInstanceMapper.queryProcessInstanceListV2Paging(page,
processInstance.getProcessDefinitionCode(), processInstance.getName(),
workflowInstanceQueryRequest.getStartTime(), workflowInstanceQueryRequest.getEndTime(),
workflowInstanceQueryRequest.getState(), processInstance.getHost());
IPage<ProcessInstance> processInstanceList = processInstanceMapper.queryProcessInstanceListV2Paging(
page,
processInstance.getProjectCode(),
processInstance.getProcessDefinitionCode(),
processInstance.getName(),
workflowInstanceQueryRequest.getStartTime(),
workflowInstanceQueryRequest.getEndTime(),
workflowInstanceQueryRequest.getState(),
processInstance.getHost());
List<ProcessInstance> processInstances = processInstanceList.getRecords();
List<Integer> userIds = Collections.emptyList();

View File

@ -138,38 +138,44 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
putMsg(result, status);
return result;
}
projectService.checkProjectAndAuthThrowException(loginUser, project, TASK_INSTANCE);
int[] statusArray = null;
if (stateType != null) {
statusArray = new int[]{stateType.getCode()};
}
Map<String, Object> checkAndParseDateResult = checkAndParseDateParameters(startDate, endDate);
status = (Status) checkAndParseDateResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
putMsg(result, status);
return result;
}
Date start = (Date) checkAndParseDateResult.get(Constants.START_TIME);
Date end = (Date) checkAndParseDateResult.get(Constants.END_TIME);
Date start = checkAndParseDateParameters(startDate);
Date end = checkAndParseDateParameters(endDate);
Page<TaskInstance> page = new Page<>(pageNo, pageSize);
PageInfo<Map<String, Object>> pageInfo = new PageInfo<>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<TaskInstance> taskInstanceIPage;
if (taskExecuteType == TaskExecuteType.STREAM) {
// stream task without process instance
taskInstanceIPage = taskInstanceMapper.queryStreamTaskInstanceListPaging(
page, project.getCode(), processDefinitionName, searchVal, taskName, executorId, statusArray, host,
taskExecuteType, start, end);
page,
project.getCode(),
processDefinitionName,
searchVal,
taskName,
executorName,
statusArray,
host,
taskExecuteType,
start,
end);
} else {
taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
page, project.getCode(), processInstanceId, processInstanceName, searchVal, taskName, executorId,
statusArray, host, taskExecuteType, start, end);
page,
project.getCode(),
processInstanceId,
processInstanceName,
searchVal,
taskName,
executorName,
statusArray,
host,
taskExecuteType,
start,
end);
}
Set<String> exclusionSet = new HashSet<>();
exclusionSet.add(Constants.CLASS);

View File

@ -185,13 +185,14 @@ public class ProcessInstanceServiceTest {
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
// project auth fail
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
Result proejctAuthFailRes =
processInstanceService.queryProcessInstanceList(loginUser, projectCode, 46, "2020-01-01 00:00:00",
"2020-01-02 00:00:00", "", "test_user", WorkflowExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx", "", 1, 10);
Assertions.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) proejctAuthFailRes.getCode());
when(projectMapper.queryByCode(projectCode)).thenReturn(null);
Mockito.doThrow(new ServiceException()).when(projectService).checkProjectAndAuthThrowException(Mockito.any(),
Mockito.any(), Mockito.any());
Assertions.assertThrows(ServiceException.class, () -> {
processInstanceService.queryProcessInstanceList(loginUser, projectCode, 46, "2020-01-01 00:00:00",
"2020-01-02 00:00:00", "", "test_user", WorkflowExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx", "", 1, 10);
});
Date start = DateUtils.stringToDate("2020-01-01 00:00:00");
Date end = DateUtils.stringToDate("2020-01-02 00:00:00");
@ -204,17 +205,25 @@ public class ProcessInstanceServiceTest {
// data parameter check
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(Mockito.any(), Mockito.any(),
Mockito.any());
when(processDefineMapper.selectById(Mockito.anyInt())).thenReturn(getProcessDefinition());
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(),
eq("192.168.xx.xx"), Mockito.any(), Mockito.any())).thenReturn(pageReturn);
Result dataParameterRes =
processInstanceService.queryProcessInstanceList(loginUser, projectCode, 1, "20200101 00:00:00",
"20200102 00:00:00", "", loginUser.getUserName(), WorkflowExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx", "", 1, 10);
Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), (int) dataParameterRes.getCode());
Assertions.assertThrows(ServiceException.class, () -> processInstanceService.queryProcessInstanceList(
loginUser,
projectCode,
1,
"20200101 00:00:00",
"20200102 00:00:00",
"",
loginUser.getUserName(),
WorkflowExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx",
"",
1,
10));
// project auth success
putMsg(result, Status.SUCCESS, projectCode);
@ -224,7 +233,7 @@ public class ProcessInstanceServiceTest {
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()),
eq(1L), eq(""), eq(-1), Mockito.any(),
eq(1L), eq(""), eq(""), Mockito.any(),
eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
@ -236,7 +245,7 @@ public class ProcessInstanceServiceTest {
// data parameter empty
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()),
eq(1L), eq(""), eq(-1), Mockito.any(),
eq(1L), eq(""), eq(""), Mockito.any(),
eq("192.168.xx.xx"), eq(null), eq(null))).thenReturn(pageReturn);
successRes = processInstanceService.queryProcessInstanceList(loginUser, projectCode, 1, "",
"", "", loginUser.getUserName(), WorkflowExecutionStatus.SUBMITTED_SUCCESS,
@ -255,7 +264,7 @@ public class ProcessInstanceServiceTest {
// executor name empty
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()),
eq(1L), eq(""), eq(0), Mockito.any(),
eq(1L), eq(""), eq("admin"), Mockito.any(),
eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
Result executorEmptyRes =
processInstanceService.queryProcessInstanceList(loginUser, projectCode, 1, "2020-01-01 00:00:00",

View File

@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
@ -107,9 +108,10 @@ public class TaskInstanceServiceTest {
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
// project auth fail
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result);
Result projectAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser,
when(projectMapper.queryByCode(projectCode)).thenReturn(null);
Mockito.doThrow(new ServiceException()).when(projectService).checkProjectAndAuthThrowException(Mockito.any(),
Mockito.any(), Mockito.any());
Assertions.assertThrows(ServiceException.class, () -> taskInstanceService.queryTaskListPaging(loginUser,
projectCode,
0,
"",
@ -123,14 +125,13 @@ public class TaskInstanceServiceTest {
"",
TaskExecuteType.BATCH,
1,
20);
Assertions.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) projectAuthFailRes.getCode());
20));
// data parameter check
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result);
Result dataParameterRes = taskInstanceService.queryTaskListPaging(loginUser,
Assertions.assertThrows(ServiceException.class, () -> taskInstanceService.queryTaskListPaging(loginUser,
projectCode,
1,
"",
@ -144,8 +145,7 @@ public class TaskInstanceServiceTest {
"192.168.xx.xx",
TaskExecuteType.BATCH,
1,
20);
Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), (int) dataParameterRes.getCode());
20));
// project
putMsg(result, Status.SUCCESS, projectCode);
@ -158,12 +158,23 @@ public class TaskInstanceServiceTest {
taskInstanceList.add(taskInstance);
pageReturn.setRecords(taskInstanceList);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result);
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(Mockito.any(), Mockito.any(),
Mockito.any());
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1),
eq(""), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end)))
when(taskInstanceMapper.queryTaskInstanceListPaging(
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any(),
Mockito.any()))
.thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()))
@ -175,9 +186,10 @@ public class TaskInstanceServiceTest {
Assertions.assertEquals(Status.SUCCESS.getCode(), (int) successRes.getCode());
// executor name empty
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1),
when(taskInstanceMapper.queryTaskInstanceListPaging(
Mockito.any(Page.class), eq(project.getCode()), eq(1),
eq(""), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end)))
eq(""), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end)))
.thenReturn(pageReturn);
Result executorEmptyRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "",
"", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx",
@ -196,7 +208,7 @@ public class TaskInstanceServiceTest {
// start/end date null
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1),
eq(""), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), any(), any()))
eq(""), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), any(), any()))
.thenReturn(pageReturn);
Result executorNullDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "",
"", null, null, "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20);
@ -205,18 +217,42 @@ public class TaskInstanceServiceTest {
// start date error format
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1),
eq(""), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), any(), any()))
eq(""), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), any(), any()))
.thenReturn(pageReturn);
Result executorErrorStartDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "",
Assertions.assertThrows(ServiceException.class, () -> taskInstanceService.queryTaskListPaging(
loginUser,
projectCode,
1,
"",
"", "error date", null, "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20);
Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(),
(int) executorErrorStartDateRes.getCode());
Result executorErrorEndDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "",
"", null, "error date", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20);
Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(),
(int) executorErrorEndDateRes.getCode());
"",
"",
"",
"error date",
null,
"",
TaskExecutionStatus.SUCCESS,
"192.168.xx.xx",
TaskExecuteType.BATCH,
1,
20));
Assertions.assertThrows(ServiceException.class, () -> taskInstanceService.queryTaskListPaging(
loginUser,
projectCode,
1,
"",
"",
"",
"",
null,
"error date",
"",
TaskExecutionStatus.SUCCESS,
"192.168.xx.xx",
TaskExecuteType.BATCH,
1,
20));
}
/**

View File

@ -58,24 +58,14 @@ public class ProcessInstance {
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* process definition code
*/
private Long processDefinitionCode;
/**
* process definition version
*/
private int processDefinitionVersion;
/**
* process state
*/
private Long projectCode;
private WorkflowExecutionStatus state;
/**
* state history
*/
private String stateHistory;
/**
@ -88,80 +78,35 @@ public class ProcessInstance {
* recovery flag for failover
*/
private Flag recovery;
/**
* start time
*/
private Date startTime;
/**
* end time
*/
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Date endTime;
/**
* run time
*/
private int runTimes;
/**
* name
*/
private String name;
/**
* host
*/
private String host;
/**
* process definition structure
*/
@TableField(exist = false)
private ProcessDefinition processDefinition;
/**
* process command type
*/
private CommandType commandType;
/**
* command parameters
*/
private String commandParam;
/**
* node depend type
*/
private TaskDependType taskDependType;
/**
* task max try times
*/
private int maxTryTimes;
/**
* failure strategy when task failed.
*/
private FailureStrategy failureStrategy;
/**
* warning type
*/
private WarningType warningType;
/**
* warning group
*/
private Integer warningGroupId;
/**
* schedule time
*/
private Date scheduleTime;
/**
* command start time
*/
private Date commandStartTime;
/**
@ -175,21 +120,12 @@ public class ProcessInstance {
@TableField(exist = false)
private DagData dagData;
/**
* executor id
*/
private int executorId;
/**
* executor name
*/
@TableField(exist = false)
private String executorName;
/**
* tenant code
*/
@TableField(exist = false)
private int tenantId;
private String tenantCode;
/**
@ -248,11 +184,6 @@ public class ProcessInstance {
*/
private int timeout;
/**
* tenant id
*/
private int tenantId;
/**
* varPool string
*/

View File

@ -70,30 +70,16 @@ public class TaskInstance implements Serializable {
*/
private String taskType;
/**
* process instance id
*/
private int processInstanceId;
/**
* task code
*/
private long taskCode;
/**
* task definition version
*/
private int taskDefinitionVersion;
/**
* process instance name
*/
@TableField(exist = false)
private String processInstanceName;
/**
* process definition name
*/
private Long projectCode;
private long taskCode;
private int taskDefinitionVersion;
@TableField(exist = false)
private String processDefinitionName;
@ -269,10 +255,6 @@ public class TaskInstance implements Serializable {
*/
private String varPool;
/**
* executor name
*/
@TableField(exist = false)
private String executorName;
@TableField(exist = false)

View File

@ -101,7 +101,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
* @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
* @param searchVal searchVal
* @param executorId executorId
* @param executorName executorName
* @param statusArray statusArray
* @param host host
* @param startTime startTime
@ -112,7 +112,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("projectCode") Long projectCode,
@Param("processDefinitionCode") Long processDefinitionCode,
@Param("searchVal") String searchVal,
@Param("executorId") Integer executorId,
@Param("executorName") String executorName,
@Param("states") int[] statusArray,
@Param("host") String host,
@Param("startTime") Date startTime,
@ -281,6 +281,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
* @return process instance IPage
*/
IPage<ProcessInstance> queryProcessInstanceListV2Paging(Page<ProcessInstance> page,
@Param("projectCode") Long projectCode,
@Param("processDefinitionCode") Long processDefinitionCode,
@Param("name") String name,
@Param("startTime") String startTime,

View File

@ -136,7 +136,7 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("processInstanceName") String processInstanceName,
@Param("searchVal") String searchVal,
@Param("taskName") String taskName,
@Param("executorId") int executorId,
@Param("executorName") String executorName,
@Param("states") int[] statusArray,
@Param("host") String host,
@Param("taskExecuteType") TaskExecuteType taskExecuteType,
@ -148,7 +148,7 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("processDefinitionName") String processDefinitionName,
@Param("searchVal") String searchVal,
@Param("taskName") String taskName,
@Param("executorId") int executorId,
@Param("executorName") String executorName,
@Param("states") int[] statusArray,
@Param("host") String host,
@Param("taskExecuteType") TaskExecuteType taskExecuteType,

View File

@ -92,6 +92,7 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
taskInstance.setState(TaskExecutionStatus.PAUSE);
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setExecutorName(processInstance.getExecutorName());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());

View File

@ -33,6 +33,8 @@ public class TaskInstanceUtils {
target.setName(source.getName());
target.setTaskType(source.getTaskType());
target.setProcessInstanceId(source.getProcessInstanceId());
target.setProcessInstanceName(source.getProcessInstanceName());
target.setProjectCode(source.getProjectCode());
target.setTaskCode(source.getTaskCode());
target.setTaskDefinitionVersion(source.getTaskDefinitionVersion());
target.setProcessInstanceName(source.getProcessInstanceName());

View File

@ -20,11 +20,11 @@
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper">
<sql id="baseSql">
id
, name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
, name, process_definition_version, process_definition_code, project_code, state, recovery, start_time, end_time, run_times,host,
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, tenant_code, var_pool,
dry_run, test_flag, next_process_instance_id, restart_time, state_history
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
@ -109,40 +109,36 @@
</select>
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run , instance.test_flag
,instance.next_process_instance_id,
restart_time, instance.state_history
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
and define.project_code = #{projectCode}
select
<include refid="baseSql"/>
from t_ds_process_instance
where is_sub_process=0
and project_code = #{projectCode}
<if test="processDefinitionCode != 0">
and instance.process_definition_code = #{processDefinitionCode}
and process_definition_code = #{processDefinitionCode}
</if>
<if test="searchVal != null and searchVal != ''">
and instance.name like concat('%', #{searchVal}, '%')
and name like concat('%', #{searchVal}, '%')
</if>
<if test="startTime != null">
and instance.start_time <![CDATA[ >= ]]> #{startTime}
and start_time <![CDATA[ >= ]]> #{startTime}
</if>
<if test="endTime != null">
and instance.start_time <![CDATA[ <= ]]> #{endTime}
and start_time <![CDATA[ <= ]]> #{endTime}
</if>
<if test="states != null and states.length > 0">
and instance.state in
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
and host like concat('%', #{host}, '%')
</if>
<if test="executorId != 0">
and instance.executor_id = #{executorId}
<if test="executorName != null and executorName != ''">
and executor_name = #{executorName}
</if>
order by instance.start_time desc,instance.end_time desc
order by start_time desc, end_time desc
</select>
<update id="setFailoverByHostAndStateArray">
update t_ds_process_instance
@ -276,35 +272,33 @@
and id <![CDATA[ > ]]> #{id}
order by id asc limit 1
</select>
<select id="queryProcessInstanceListV2Paging"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
<select id="queryProcessInstanceListV2Paging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
SELECT
<include refid="baseSql">
<property name="alias" value="instance"/>
</include>
FROM t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
<include refid="baseSql"/>
FROM t_ds_process_instance
where is_sub_process=0
<if test="projectCode != 0">
and project_code = #{projectCode}
</if>
<if test="processDefinitionCode != 0">
and instance.process_definition_code = #{processDefinitionCode}
and process_definition_code = #{processDefinitionCode}
</if>
<if test="name != null and name != ''">
and instance.name like concat('%', #{name}, '%')
and name like concat('%', #{name}, '%')
</if>
<if test="startTime != null and startTime != ''">
and instance.start_time <![CDATA[ >= ]]> #{startTime}
and start_time <![CDATA[ >= ]]> #{startTime}
</if>
<if test="endTime != null and endTime != ''">
and instance.start_time <![CDATA[ <= ]]> #{endTime}
and start_time <![CDATA[ <= ]]> #{endTime}
</if>
<if test="state != null and state != ''">
and instance.state = #{state}
and state = #{state}
</if>
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
and host like concat('%', #{host}, '%')
</if>
order by instance.start_time desc,instance.id desc
order by start_time desc, id desc
</select>
<select id="countInstanceStateV2" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select t.state, count(0) as count

View File

@ -19,9 +19,9 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper">
<sql id="baseSql">
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
id, name, task_type, process_instance_id, process_instance_name, project_code, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, is_cache, cache_key, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
flag, is_cache, cache_key, retry_interval, max_retry_times, task_instance_priority, worker_group, environment_code, executor_id, executor_name,
first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type
</sql>
<sql id="baseSqlV2">
@ -249,93 +249,80 @@
</select>
<select id="queryTaskInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSqlV2">
<property name="alias" value="instance"/>
</include>
,
process.name as process_instance_name
from t_ds_task_instance instance
left join t_ds_task_definition_log define on define.code=instance.task_code and define.version=instance.task_definition_version
left join t_ds_process_instance process on process.id=instance.process_instance_id
where define.project_code = #{projectCode}
<include refid="baseSql"/>
from t_ds_task_instance
where project_code = #{projectCode}
<if test="startTime != null">
and instance.start_time <![CDATA[ >=]]> #{startTime}
and start_time <![CDATA[ >=]]> #{startTime}
</if>
<if test="endTime != null">
and instance.start_time <![CDATA[ <=]]> #{endTime}
and start_time <![CDATA[ <=]]> #{endTime}
</if>
<if test="processInstanceId != 0">
and instance.process_instance_id = #{processInstanceId}
and process_instance_id = #{processInstanceId}
</if>
<if test="searchVal != null and searchVal != ''">
and instance.name like concat('%', #{searchVal}, '%')
and name like concat('%', #{searchVal}, '%')
</if>
<if test="taskName != null and taskName != ''">
and instance.name=#{taskName}
and name = #{taskName}
</if>
<if test="states != null and states.length != 0">
and instance.state in
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
and host like concat('%', #{host}, '%')
</if>
<if test="taskExecuteType != null">
and instance.task_execute_type = #{taskExecuteType.code}
and task_execute_type = #{taskExecuteType.code}
</if>
<if test="executorId != 0">
and instance.executor_id = #{executorId}
<if test="executorName != null and executorName != ''">
and executor_name = #{executorName}
</if>
<if test="processInstanceName != null and processInstanceName != ''">
and process.name like concat('%', #{processInstanceName}, '%')
and process_instance_name like concat('%', #{processInstanceName}, '%')
</if>
order by instance.submit_time desc
order by submit_time desc
</select>
<select id="queryStreamTaskInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSqlV2">
<property name="alias" value="instance"/>
</include>
,
process.name as process_definition_name
from t_ds_task_instance instance
left join t_ds_task_definition_log define on define.code=instance.task_code and define.version=instance.task_definition_version
left join t_ds_process_task_relation relation on relation.post_task_code = define.code and relation.post_task_version = define.version
left join t_ds_process_definition process on process.code=relation.process_definition_code and process.version = relation.process_definition_version
where define.project_code = #{projectCode}
<include refid="baseSql"/>
from t_ds_task_instance
where project_code = #{projectCode}
<if test="startTime != null">
and instance.start_time <![CDATA[ >=]]> #{startTime}
and start_time <![CDATA[ >=]]> #{startTime}
</if>
<if test="endTime != null">
and instance.start_time <![CDATA[ <=]]> #{endTime}
and start_time <![CDATA[ <=]]> #{endTime}
</if>
<if test="searchVal != null and searchVal != ''">
and instance.name like concat('%', #{searchVal}, '%')
and name like concat('%', #{searchVal}, '%')
</if>
<if test="taskName != null and taskName != ''">
and instance.name=#{taskName}
and name=#{taskName}
</if>
<if test="states != null and states.length != 0">
and instance.state in
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
and host like concat('%', #{host}, '%')
</if>
<if test="taskExecuteType != null">
and instance.task_execute_type = #{taskExecuteType.code}
and task_execute_type = #{taskExecuteType.code}
</if>
<if test="executorId != 0">
and instance.executor_id = #{executorId}
<if test="executorName != null and executorName != ''">
and executor_name = #{executorName}
</if>
<if test="processDefinitionName != null and processDefinitionName != ''">
and process.name like concat('%', #{processDefinitionName}, '%')
and process_instance_name like concat('%', #{processDefinitionName}, '%')
</if>
order by instance.start_time desc
order by start_time desc
</select>
<select id="loadAllInfosNoRelease" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select

View File

@ -145,7 +145,7 @@
AND a.task_type = #{taskType}
</select>
<select id="queryDependentProcessDefinitionByProcessDefinitionCode" resultType="DependentProcessDefinition">
<select id="queryDependentProcessDefinitionByProcessDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.DependentProcessDefinition">
SELECT
c.code AS process_definition_code
,c.name AS process_definition_name

View File

@ -596,6 +596,7 @@ CREATE TABLE t_ds_process_instance
name varchar(255) DEFAULT NULL,
process_definition_version int(11) DEFAULT NULL,
process_definition_code bigint(20) not NULL,
project_code bigint(20) DEFAULT NULL,
state tinyint(4) DEFAULT NULL,
state_history text,
recovery tinyint(4) DEFAULT NULL,
@ -617,6 +618,7 @@ CREATE TABLE t_ds_process_instance
update_time timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_sub_process int(11) DEFAULT '0',
executor_id int(11) NOT NULL,
executor_name varchar(64) DEFAULT NULL,
history_cmd text,
process_instance_priority int(11) DEFAULT '2',
worker_group varchar(64) DEFAULT NULL,
@ -624,6 +626,7 @@ CREATE TABLE t_ds_process_instance
timeout int(11) DEFAULT '0',
next_process_instance_id int(11) DEFAULT '0',
tenant_id int(11) NOT NULL DEFAULT '-1',
tenant_code varchar(64) DEFAULT NULL,
var_pool longtext,
dry_run int NULL DEFAULT 0,
restart_time datetime DEFAULT NULL,
@ -872,6 +875,8 @@ CREATE TABLE t_ds_task_instance
task_code bigint(20) NOT NULL,
task_definition_version int(11) DEFAULT NULL,
process_instance_id int(11) DEFAULT NULL,
process_instance_name varchar(255) DEFAULT NULL,
project_code bigint(20) DEFAULT NULL,
state tinyint(4) DEFAULT NULL,
submit_time datetime DEFAULT NULL,
start_time datetime DEFAULT NULL,
@ -894,6 +899,7 @@ CREATE TABLE t_ds_task_instance
environment_code bigint(20) DEFAULT '-1',
environment_config text DEFAULT '',
executor_id int(11) DEFAULT NULL,
executor_name varchar(64) DEFAULT NULL,
first_submit_time datetime DEFAULT NULL,
delay_time int(4) DEFAULT '0',
task_group_id int(11) DEFAULT NULL,

View File

@ -598,6 +598,7 @@ CREATE TABLE `t_ds_process_instance` (
`name` varchar(255) DEFAULT NULL COMMENT 'process instance name',
`process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code',
`process_definition_version` int(11) DEFAULT '0' COMMENT 'process definition version',
`project_code` bigint(20) DEFAULT NULL COMMENT 'project code',
`state` tinyint(4) DEFAULT NULL COMMENT 'process instance Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
`state_history` text DEFAULT NULL COMMENT 'state history desc',
`recovery` tinyint(4) DEFAULT NULL COMMENT 'process instance failover flag0:normal,1:failover instance',
@ -619,12 +620,14 @@ CREATE TABLE `t_ds_process_instance` (
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`is_sub_process` int(11) DEFAULT '0' COMMENT 'flag, whether the process is sub process',
`executor_id` int(11) NOT NULL COMMENT 'executor id',
`executor_name` varchar(64) DEFAULT NULL COMMENT 'execute user name',
`history_cmd` text COMMENT 'history commands of process instance operation',
`process_instance_priority` int(11) DEFAULT '2' COMMENT 'process instance priority. 0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`tenant_code` varchar(64) DEFAULT NULL COMMENT 'tenant code',
`var_pool` longtext COMMENT 'var_pool',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag0 normal, 1 dry run',
`next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId',
@ -866,6 +869,8 @@ CREATE TABLE `t_ds_task_instance` (
`task_code` bigint(20) NOT NULL COMMENT 'task definition code',
`task_definition_version` int(11) DEFAULT '0' COMMENT 'task definition version',
`process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id',
`process_instance_name` varchar(255) DEFAULT NULL COMMENT 'process instance name',
`project_code` bigint(20) DEFAULT NULL COMMENT 'project code',
`state` tinyint(4) DEFAULT NULL COMMENT 'Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete',
`submit_time` datetime DEFAULT NULL COMMENT 'task submit time',
`start_time` datetime DEFAULT NULL COMMENT 'task start time',
@ -888,6 +893,7 @@ CREATE TABLE `t_ds_task_instance` (
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`environment_config` text COMMENT 'this config contains many environment variables config',
`executor_id` int(11) DEFAULT NULL,
`executor_name` varchar(64) DEFAULT NULL,
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
`var_pool` longtext COMMENT 'var_pool',

View File

@ -524,6 +524,7 @@ CREATE TABLE t_ds_process_instance (
name varchar(255) DEFAULT NULL ,
process_definition_code bigint DEFAULT NULL ,
process_definition_version int DEFAULT NULL ,
project_code bigint DEFAULT NULL ,
state int DEFAULT NULL ,
state_history text,
recovery int DEFAULT NULL ,
@ -546,6 +547,7 @@ CREATE TABLE t_ds_process_instance (
update_time timestamp NULL ,
is_sub_process int DEFAULT '0' ,
executor_id int NOT NULL ,
executor_name varchar(64) DEFAULT NULL,
history_cmd text ,
dependence_schedule_times text ,
process_instance_priority int DEFAULT '2' ,
@ -553,6 +555,7 @@ CREATE TABLE t_ds_process_instance (
environment_code bigint DEFAULT '-1',
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
tenant_code varchar(64) DEFAULT NULL ,
var_pool text ,
dry_run int DEFAULT '0' ,
next_process_instance_id int DEFAULT '0',
@ -766,6 +769,8 @@ CREATE TABLE t_ds_task_instance (
task_code bigint NOT NULL,
task_definition_version int DEFAULT NULL ,
process_instance_id int DEFAULT NULL ,
process_instance_name varchar(255) DEFAULT NULL,
project_code bigint DEFAULT NULL,
state int DEFAULT NULL ,
submit_time timestamp DEFAULT NULL ,
start_time timestamp DEFAULT NULL ,
@ -788,6 +793,7 @@ CREATE TABLE t_ds_task_instance (
environment_code bigint DEFAULT '-1',
environment_config text,
executor_id int DEFAULT NULL ,
executor_name varchar(64) DEFAULT NULL ,
first_submit_time timestamp DEFAULT NULL ,
delay_time int DEFAULT '0' ,
task_group_id int DEFAULT NULL,
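
One point the schema changes leave open: the rewritten paging queries now filter directly on the denormalized columns (where project_code = ...), so on large instance tables an index may be worth adding. A hedged sketch with illustrative index names; nothing below is part of this commit:

    -- Hypothetical supporting indexes, not included in the commit:
    CREATE INDEX idx_process_instance_project_code ON t_ds_process_instance (project_code);
    CREATE INDEX idx_task_instance_project_code ON t_ds_task_instance (project_code);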

View File

@ -218,3 +218,67 @@ d//
delimiter ;
CALL add_t_ds_task_instance_idx_cache_key;
DROP PROCEDURE add_t_ds_task_instance_idx_cache_key;
-- ALTER TABLE `t_ds_process_instance` ADD column `project_code`, `executor_name`, `tenant_code`;
drop PROCEDURE if EXISTS add_t_ds_process_instance_add_project_code;
delimiter d//
CREATE PROCEDURE add_t_ds_process_instance_add_project_code()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='project_code')
THEN
ALTER TABLE t_ds_process_instance ADD `project_code` bigint(20) DEFAULT NULL COMMENT 'project code';
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='executor_name')
THEN
ALTER TABLE t_ds_process_instance ADD `executor_name` varchar(64) DEFAULT NULL COMMENT 'execute user name';
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='tenant_code')
THEN
ALTER TABLE t_ds_process_instance ADD `tenant_code` varchar(64) DEFAULT NULL COMMENT 'tenant code';
END IF;
END;
d//
delimiter ;
CALL add_t_ds_process_instance_add_project_code;
DROP PROCEDURE add_t_ds_process_instance_add_project_code;
-- ALTER TABLE `t_ds_task_instance` ADD column `process_instance_name`, `project_code`, `executor_name`;
drop PROCEDURE if EXISTS add_t_ds_task_instance_add_project_code;
delimiter d//
CREATE PROCEDURE add_t_ds_task_instance_add_project_code()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='process_instance_name')
THEN
ALTER TABLE t_ds_task_instance ADD `process_instance_name` varchar(255) DEFAULT NULL COMMENT 'process instance name';
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='project_code')
THEN
ALTER TABLE t_ds_task_instance ADD `project_code` bigint(20) DEFAULT NULL COMMENT 'project code';
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='executor_name')
THEN
ALTER TABLE t_ds_task_instance ADD `executor_name` varchar(64) DEFAULT NULL COMMENT 'execute user name';
END IF;
END;
d//
delimiter ;
CALL add_t_ds_task_instance_add_project_code;
DROP PROCEDURE add_t_ds_task_instance_add_project_code;
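
Note that this script only adds nullable columns, so rows written before the upgrade still carry NULL in project_code, process_instance_name, and executor_name. The commit also registers a version-keyed DolphinSchedulerUpgrader with the upgrade tool (see DolphinSchedulerManager below), which is the natural place for a backfill. A minimal sketch of what such a backfill could look like, assuming the definition tables are still joinable at upgrade time (illustrative only, not taken from this commit):

    -- Hypothetical MySQL backfill for pre-existing workflow instances:
    UPDATE t_ds_process_instance instance
    JOIN t_ds_process_definition define ON instance.process_definition_code = define.code
    SET instance.project_code = define.project_code
    WHERE instance.project_code IS NULL;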

View File

@ -133,3 +133,77 @@ ALTER TABLE t_ds_task_instance ADD COLUMN IF NOT EXISTS cache_key varchar(200) D
ALTER TABLE t_ds_task_instance DROP COLUMN IF EXISTS cacke_key;
CREATE INDEX IF NOT EXISTS idx_cache_key ON t_ds_task_instance USING Btree("cache_key");
-- add_t_ds_process_instance_add_project_code
delimiter ;
DROP FUNCTION IF EXISTS add_t_ds_process_instance_add_project_code();
delimiter d//
CREATE FUNCTION add_t_ds_process_instance_add_project_code() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='project_code')
THEN
ALTER TABLE t_ds_process_instance ADD COLUMN project_code bigint DEFAULT NULL;
COMMENT ON COLUMN t_ds_process_instance.project_code IS 'project code';
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='executor_name')
THEN
ALTER TABLE t_ds_process_instance ADD COLUMN executor_name varchar(64) DEFAULT NULL;
COMMENT ON COLUMN t_ds_process_instance.executor_name IS 'execute user name';
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='tenant_code')
THEN
ALTER TABLE t_ds_process_instance ADD COLUMN tenant_code varchar(64) DEFAULT NULL;
COMMENT ON COLUMN t_ds_process_instance.tenant_code IS 'tenant code';
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select add_t_ds_process_instance_add_project_code();
DROP FUNCTION add_t_ds_process_instance_add_project_code();
-- add_t_ds_task_instance_add_project_code
delimiter ;
DROP FUNCTION IF EXISTS add_t_ds_task_instance_add_project_code();
delimiter d//
CREATE FUNCTION add_t_ds_task_instance_add_project_code() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='process_instance_name')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN process_instance_name varchar(255) DEFAULT NULL;
COMMENT ON COLUMN t_ds_task_instance.process_instance_name IS 'process instance name';
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='project_code')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN project_code bigint DEFAULT NULL;
COMMENT ON COLUMN t_ds_task_instance.project_code IS 'project code';
END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='executor_name')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN executor_name varchar(64) DEFAULT NULL;
COMMENT ON COLUMN t_ds_task_instance.executor_name IS 'execute user name';
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select add_t_ds_task_instance_add_project_code();
DROP FUNCTION add_t_ds_task_instance_add_project_code();

View File

@ -170,6 +170,7 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
processDefinitionMapper.insert(processDefinition);
ProcessInstance processInstance = insertOne();
processInstance.setProjectCode(processDefinition.getProjectCode());
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
processInstance.setIsSubProcess(Flag.NO);
@ -184,7 +185,7 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
processDefinition.getProjectCode(),
processInstance.getProcessDefinitionCode(),
processInstance.getName(),
0,
"",
stateArray,
processInstance.getHost(),
null,

View File

@ -375,7 +375,7 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
"",
"",
"",
0,
"",
new int[0],
"",
TaskExecuteType.BATCH,

View File

@ -260,6 +260,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
// set process instance id to 0
taskInstance.setProcessInstanceId(0);
taskInstance.setProjectCode(taskDefinition.getProjectCode());
// task instance type
taskInstance.setTaskType(taskDefinition.getTaskType().toUpperCase());
// task instance whether alert

View File

@ -1183,6 +1183,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
// process instance id
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessInstanceName(processInstance.getName());
taskInstance.setProjectCode(processInstance.getProjectCode());
// task instance type
taskInstance.setTaskType(taskNode.getType().toUpperCase());
// task instance whether alert

View File

@ -595,6 +595,7 @@ public class ProcessServiceImpl implements ProcessService {
ProcessInstance processInstance = new ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
processInstance.setProjectCode(processDefinition.getProjectCode());
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "init running");
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
@ -608,6 +609,8 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setTaskDependType(command.getTaskDependType());
processInstance.setFailureStrategy(command.getFailureStrategy());
processInstance.setExecutorId(command.getExecutorId());
processInstance.setExecutorName(Optional.ofNullable(userMapper.selectById(command.getExecutorId()))
.map(User::getUserName).orElse(null));
WarningType warningType = command.getWarningType() == null ? WarningType.NONE : command.getWarningType();
processInstance.setWarningType(warningType);
Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId();
@ -647,6 +650,8 @@ public class ProcessServiceImpl implements ProcessService {
.setEnvironmentCode(Objects.isNull(command.getEnvironmentCode()) ? -1 : command.getEnvironmentCode());
processInstance.setTimeout(processDefinition.getTimeout());
processInstance.setTenantId(processDefinition.getTenantId());
processInstance.setTenantCode(Optional.ofNullable(tenantMapper.queryById(processDefinition.getTenantId()))
.map(Tenant::getTenantCode).orElse(null));
return processInstance;
}

View File

@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
@ -137,6 +138,9 @@ public class ProcessServiceTest {
@Mock
private UserMapper userMapper;
@Mock
private TenantMapper tenantMapper;
@Mock
private TaskInstanceMapper taskInstanceMapper;
@Mock

View File

@ -20,10 +20,17 @@ package org.apache.dolphinscheduler.tools.datasource;
import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.sql.Connection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.sql.DataSource;
@ -38,13 +45,20 @@ public class DolphinSchedulerManager {
private final UpgradeDao upgradeDao;
public DolphinSchedulerManager(DataSource dataSource, List<UpgradeDao> daos) throws Exception {
private Map<String, DolphinSchedulerUpgrader> upgraderMap = new HashMap<>();
public DolphinSchedulerManager(DataSource dataSource, List<UpgradeDao> daos,
List<DolphinSchedulerUpgrader> dolphinSchedulerUpgraders) throws Exception {
final DbType type = getCurrentDbType(dataSource);
upgradeDao = daos.stream()
.filter(it -> it.getDbType() == type)
.findFirst()
.orElseThrow(() -> new RuntimeException(
"Cannot find UpgradeDao implementation for db type: " + type));
if (CollectionUtils.isNotEmpty(dolphinSchedulerUpgraders)) {
upgraderMap = dolphinSchedulerUpgraders.stream()
.collect(Collectors.toMap(DolphinSchedulerUpgrader::getCurrentVersion, Function.identity()));
}
}
private DbType getCurrentDbType(DataSource dataSource) throws Exception {
@ -114,6 +128,10 @@ public class DolphinSchedulerManager {
} else if ("2.0.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerTo200(schemaDir);
}
// after the SQL schema upgrade for this version, run its data upgrader if one is registered
DolphinSchedulerUpgrader dolphinSchedulerUpgrader = upgraderMap.get(schemaVersion);
if (dolphinSchedulerUpgrader != null) {
dolphinSchedulerUpgrader.doUpgrade();
}
version = schemaVersion;
}
}
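The upgraderMap built in the constructor replaces an ever-growing if/else version ladder with a lookup: each DolphinSchedulerUpgrader declares the schema version it belongs to, and the manager runs it right after the SQL upgrade for that version. A standalone sketch of the dispatch idea, with the interface inlined so the sketch compiles on its own:

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Standalone illustration of the lookup-based dispatch; the real interface is
// DolphinSchedulerUpgrader, inlined here for self-containment.
interface VersionedUpgrader {

    String getCurrentVersion();

    void doUpgrade();
}

class UpgraderDispatchSketch {

    private final Map<String, VersionedUpgrader> upgraderMap;

    UpgraderDispatchSketch(List<VersionedUpgrader> upgraders) {
        // one upgrader per version; Collectors.toMap throws on duplicate keys,
        // surfacing a double registration at startup rather than at upgrade time
        this.upgraderMap = upgraders.stream()
                .collect(Collectors.toMap(VersionedUpgrader::getCurrentVersion, Function.identity()));
    }

    void afterSchemaUpgrade(String schemaVersion) {
        VersionedUpgrader upgrader = upgraderMap.get(schemaVersion);
        if (upgrader != null) {
            upgrader.doUpgrade();
        }
    }
}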

View File

@ -17,14 +17,18 @@
package org.apache.dolphinscheduler.tools.datasource;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@ImportAutoConfiguration(DaoConfiguration.class)
@SpringBootApplication
public class InitDolphinScheduler {

View File

@ -17,14 +17,18 @@
package org.apache.dolphinscheduler.tools.datasource;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@ImportAutoConfiguration(DaoConfiguration.class)
@SpringBootApplication
public class UpgradeDolphinScheduler {

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.tools.datasource.upgrader;
public interface DolphinSchedulerUpgrader {
void doUpgrade();
String getCurrentVersion();
}
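Since DolphinSchedulerManager receives List<DolphinSchedulerUpgrader> through its constructor, Spring injects every @Component implementing this interface automatically; wiring a future migration is just a new bean. A hypothetical 3.3.0 upgrader might look like this (class name, version string, and body are invented):

import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

// Hypothetical future upgrader: declaring it as a @Component is all that is
// needed for DolphinSchedulerManager to receive it via constructor injection.
@Slf4j
@Component
public class V330DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {

    @Override
    public void doUpgrade() {
        log.info("Running 3.3.0 data migration (placeholder)");
    }

    @Override
    public String getCurrentVersion() {
        return "3.3.0";
    }
}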

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.tools.datasource.upgrader.v320;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
@Slf4j
@Component
public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Autowired
private TenantMapper tenantMapper;
@Autowired
private UserMapper userMapper;
@Autowired
private TaskInstanceMapper taskInstanceMapper;
@Override
public void doUpgrade() {
upgradeWorkflowInstance();
upgradeTaskInstance();
}
private void upgradeWorkflowInstance() {
// preload id -> code/name lookups once so the per-row backfill avoids extra queries
Map<Integer, String> tenantMap = tenantMapper.selectList(new QueryWrapper<>())
.stream()
.collect(Collectors.toMap(Tenant::getId, Tenant::getTenantCode));
Map<Integer, String> userMap = userMapper.selectList(new QueryWrapper<>())
.stream()
.collect(Collectors.toMap(User::getId, User::getUserName));
while (true) {
LambdaQueryWrapper<ProcessInstance> wrapper = new QueryWrapper<>(new ProcessInstance())
.lambda()
// eq(column, null) renders as "= NULL" and never matches; isNull selects the unmigrated rows
.isNull(ProcessInstance::getProjectCode)
.last("limit 1000");
List<ProcessInstance> needUpdateWorkflowInstance = processInstanceMapper.selectList(wrapper);
if (CollectionUtils.isEmpty(needUpdateWorkflowInstance)) {
return;
}
for (ProcessInstance processInstance : needUpdateWorkflowInstance) {
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
if (processDefinitionLog != null) {
processInstance.setProjectCode(processDefinitionLog.getProjectCode());
processInstance.setTenantCode(tenantMap.get(processDefinitionLog.getTenantId()));
processInstance.setExecutorName(userMap.get(processInstance.getExecutorId()));
} else {
// definition log is gone; use -1 as a sentinel so this row is not re-selected forever
processInstance.setProjectCode(-1L);
}
processInstanceMapper.updateById(processInstance);
}
log.info("Success upgrade workflow instance, current batch size: {}", needUpdateWorkflowInstance.size());
try {
// throttle between batches so the migration does not saturate a live database
Thread.sleep(1000L);
} catch (InterruptedException e) {
// restore the interrupt flag and stop the backfill so shutdown is not delayed
Thread.currentThread().interrupt();
log.error("Upgrade workflow instance interrupted", e);
return;
}
}
}
private void upgradeTaskInstance() {
while (true) {
LambdaQueryWrapper<TaskInstance> wrapper = new QueryWrapper<>(new TaskInstance())
.lambda()
// same NULL-safe filter as above: select the task instances not yet backfilled
.isNull(TaskInstance::getProjectCode)
.last("limit 1000");
List<TaskInstance> taskInstances = taskInstanceMapper.selectList(wrapper);
if (CollectionUtils.isEmpty(taskInstances)) {
return;
}
for (TaskInstance taskInstance : taskInstances) {
ProcessInstance processInstance = processInstanceMapper.selectById(taskInstance.getProcessInstanceId());
if (processInstance == null) {
// parent workflow instance was deleted; mark with the -1 sentinel so the row is not re-selected
taskInstance.setProjectCode(-1L);
} else {
taskInstance.setProjectCode(processInstance.getProjectCode());
taskInstance.setProcessInstanceName(processInstance.getName());
taskInstance.setExecutorName(processInstance.getExecutorName());
}
taskInstanceMapper.updateById(taskInstance);
}
log.info("Success upgrade task instance, current batch size: {}", taskInstances.size());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Upgrade task instance interrupted", e);
return;
}
}
}
@Override
public String getCurrentVersion() {
return "3.2.0";
}
}
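A note on the backfill strategy above: both loops select at most 1000 unmigrated rows per pass, update them, and pause for a second between batches so the migration does not overload a live database. Because every updated row receives a non-null projectCode, including the -1 sentinel for orphaned rows whose definition log or parent workflow instance has been deleted, the null-filtered query eventually comes back empty and each loop terminates.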