[Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code (#4765)

* [Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code
Shiwen Cheng 2021-02-18 19:27:13 +08:00 committed by GitHub
parent 070424fc78
commit dc55b5ba6f
11 changed files with 1283 additions and 950 deletions
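
Every service touched by this commit follows the same pattern: the existing @Service class is split into a plain interface that keeps only the public method signatures, while the Spring bean moves to a new *ServiceImpl class in the impl package and keeps its @Autowired fields and method bodies essentially unchanged. A minimal sketch of that shape, condensed from the MonitorService diff below (imports elided, only queryDatabaseState shown):

// Before: one Spring bean exposes its implementation directly
@Service
public class MonitorService extends BaseService {

    @Autowired
    private MonitorDBDao monitorDBDao;

    public Map<String, Object> queryDatabaseState(User loginUser) {
        Map<String, Object> result = new HashMap<>();
        result.put(Constants.DATA_LIST, monitorDBDao.queryDatabaseState());
        putMsg(result, Status.SUCCESS);
        return result;
    }
}

// After: callers depend on a narrow interface ...
public interface MonitorService {

    Map<String, Object> queryDatabaseState(User loginUser);
}

// ... and the Spring bean becomes an impl class with the same body
@Service
public class MonitorServiceImpl extends BaseService implements MonitorService {

    @Autowired
    private MonitorDBDao monitorDBDao;

    public Map<String, Object> queryDatabaseState(User loginUser) {
        Map<String, Object> result = new HashMap<>();
        result.put(Constants.DATA_LIST, monitorDBDao.queryDatabaseState());
        putMsg(result, Status.SUCCESS);
        return result;
    }
}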

File: org/apache/dolphinscheduler/api/service/MonitorService.java

@ -14,143 +14,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
/**
* monitor service
*/
@Service
public interface MonitorService {
public class MonitorService extends BaseService {
@Autowired
private ZookeeperMonitor zookeeperMonitor;
@Autowired
private MonitorDBDao monitorDBDao;
/**
* query database state
*
* @param loginUser login user
* @return data base state
*/
public Map<String,Object> queryDatabaseState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<MonitorRecord> monitorRecordList = monitorDBDao.queryDatabaseState();
result.put(Constants.DATA_LIST, monitorRecordList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query master list
*
* @param loginUser login user
* @return master information list
*/
public Map<String,Object> queryMaster(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<Server> masterServers = getServerListFromZK(true);
result.put(Constants.DATA_LIST, masterServers);
putMsg(result,Status.SUCCESS);
return result;
}
/**
* query zookeeper state
*
* @param loginUser login user
* @return zookeeper information list
*/
public Map<String,Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<ZookeeperRecord> zookeeperRecordList = zookeeperMonitor.zookeeperInfoList();
result.put(Constants.DATA_LIST, zookeeperRecordList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query worker list
*
* @param loginUser login user
* @return worker information list
*/
public Map<String,Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<WorkerServerModel> workerServers = getServerListFromZK(false)
.stream()
.map((Server server) -> {
WorkerServerModel model = new WorkerServerModel();
model.setId(server.getId());
model.setHost(server.getHost());
model.setPort(server.getPort());
model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
model.setResInfo(server.getResInfo());
model.setCreateTime(server.getCreateTime());
model.setLastHeartbeatTime(server.getLastHeartbeatTime());
return model;
})
.collect(Collectors.toList());
Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
.stream()
.collect(Collectors.toMap(
(WorkerServerModel worker) -> {
String[] s = worker.getZkDirectories().iterator().next().split("/");
return s[s.length - 1];
}
, Function.identity()
, (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
return oldOne;
}));
result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
putMsg(result,Status.SUCCESS);
return result;
}
public List<Server> getServerListFromZK(boolean isMaster) {
checkNotNull(zookeeperMonitor);
ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
return zookeeperMonitor.getServersList(zkNodeType);
}
/**
* query database state
*
* @param loginUser login user
* @return data base state
*/
Map<String,Object> queryDatabaseState(User loginUser);
/**
* query master list
*
* @param loginUser login user
* @return master information list
*/
Map<String,Object> queryMaster(User loginUser);
/**
* query zookeeper state
*
* @param loginUser login user
* @return zookeeper information list
*/
Map<String,Object> queryZookeeperState(User loginUser);
/**
* query worker list
*
* @param loginUser login user
* @return worker information list
*/
Map<String,Object> queryWorker(User loginUser);
List<Server> getServerListFromZK(boolean isMaster);
}

File: org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -17,157 +17,26 @@
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* process instance service
*/
@Service
public interface ProcessInstanceService {
public class ProcessInstanceService extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class);
@Autowired
ProjectMapper projectMapper;
@Autowired
ProjectService projectService;
@Autowired
ProcessService processService;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessDefinitionMapper processDefineMapper;
@Autowired
ProcessDefinitionService processDefinitionService;
@Autowired
ProcessDefinitionVersionService processDefinitionVersionService;
@Autowired
ExecutorService execService;
@Autowired
TaskInstanceMapper taskInstanceMapper;
@Autowired
LoggerService loggerService;
@Autowired
UsersService usersService;
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
*/
public Map<String, Object> queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime) {
Map<String, Object> queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime);
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
if (0 > size) {
putMsg(result, Status.NEGTIVE_SIZE_NUMBER_ERROR, size);
return result;
}
if (Objects.isNull(startTime)) {
putMsg(result, Status.DATA_IS_NULL, Constants.START_TIME);
return result;
}
Date start = DateUtils.stringToDate(startTime);
if (Objects.isNull(endTime)) {
putMsg(result, Status.DATA_IS_NULL, Constants.END_TIME);
return result;
}
Date end = DateUtils.stringToDate(endTime);
if (start == null || end == null) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
return result;
}
if (start.getTime() > end.getTime()) {
putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR, startTime, endTime);
return result;
}
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS);
result.put(DATA_LIST, processInstances);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query process instance by id
@ -177,24 +46,7 @@ public class ProcessInstanceService extends BaseService {
* @param processId process instance id
* @return process instance detail
*/
public Map<String, Object> queryProcessInstanceById(User loginUser, String projectName, Integer processId) {
Map<String, Object> queryProcessInstanceById(User loginUser, String projectName, Integer processId);
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
result.put(DATA_LIST, processInstance);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* paging query process instance list, filtering according to project, process definition, time range, keyword, process status
@ -211,64 +63,10 @@ public class ProcessInstanceService extends BaseService {
* @param endDate end time
* @return process instance list
*/
public Map<String, Object> queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId,
String startDate, String endDate,
String searchVal, String executorName, ExecutionStatus stateType, String host,
Integer pageNo, Integer pageSize) {
Map<String, Object> queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId,
String startDate, String endDate,
String searchVal, String executorName, ExecutionStatus stateType, String host,
Integer pageNo, Integer pageSize);
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
int[] statusArray = null;
// filter by state
if (stateType != null) {
statusArray = new int[]{stateType.ordinal()};
}
Date start = null;
Date end = null;
try {
if (StringUtils.isNotEmpty(startDate)) {
start = DateUtils.getScheduleDate(startDate);
}
if (StringUtils.isNotEmpty(endDate)) {
end = DateUtils.getScheduleDate(endDate);
}
} catch (Exception e) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
return result;
}
Page<ProcessInstance> page = new Page<>(pageNo, pageSize);
PageInfo pageInfo = new PageInfo<ProcessInstance>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<ProcessInstance> processInstanceList =
processInstanceMapper.queryProcessInstanceListPaging(page,
project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end);
List<ProcessInstance> processInstances = processInstanceList.getRecords();
for (ProcessInstance processInstance : processInstances) {
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()));
User executor = usersService.queryUser(processInstance.getExecutorId());
if (null != executor) {
processInstance.setExecutorName(executor.getUserName());
}
}
pageInfo.setTotalCount((int) processInstanceList.getTotal());
pageInfo.setLists(processInstances);
result.put(DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query task list by process instance id
@ -279,71 +77,9 @@ public class ProcessInstanceService extends BaseService {
* @return task list for the process instance
* @throws IOException io exception
*/
public Map<String, Object> queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException {
Map<String, Object> queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException;
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Map<String, DependResult> parseLogForDependentResult(String log) throws IOException;
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
addDependResultForTaskList(taskInstanceList);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
resultMap.put(TASK_LIST, taskInstanceList);
result.put(DATA_LIST, resultMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* add dependent result for dependent task
*/
private void addDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
for (TaskInstance taskInstance : taskInstanceList) {
if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) {
Result<String> logResult = loggerService.queryLog(
taskInstance.getId(), 0, 4098);
if (logResult.getCode() == Status.SUCCESS.ordinal()) {
String log = logResult.getData();
Map<String, DependResult> resultMap = parseLogForDependentResult(log);
taskInstance.setDependentResult(JSONUtils.toJsonString(resultMap));
}
}
}
}
public Map<String, DependResult> parseLogForDependentResult(String log) throws IOException {
Map<String, DependResult> resultMap = new HashMap<>();
if (StringUtils.isEmpty(log)) {
return resultMap;
}
BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(log.getBytes(
StandardCharsets.UTF_8)), StandardCharsets.UTF_8));
String line;
while ((line = br.readLine()) != null) {
if (line.contains(DEPENDENT_SPLIT)) {
String[] tmpStringArray = line.split(":\\|\\|");
if (tmpStringArray.length != 2) {
continue;
}
String dependResultString = tmpStringArray[1];
String[] dependStringArray = dependResultString.split(",");
if (dependStringArray.length != 2) {
continue;
}
String key = dependStringArray[0].trim();
DependResult dependResult = DependResult.valueOf(dependStringArray[1].trim());
resultMap.put(key, dependResult);
}
}
return resultMap;
}
/**
* query sub process instance detail info by task id
@ -353,38 +89,7 @@ public class ProcessInstanceService extends BaseService {
* @param taskId task id
* @return sub process instance detail
*/
public Map<String, Object> querySubProcessInstanceByTaskId(User loginUser, String projectName, Integer taskId) {
Map<String, Object> querySubProcessInstanceByTaskId(User loginUser, String projectName, Integer taskId);
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
if (taskInstance == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
return result;
}
if (!taskInstance.isSubProcess()) {
putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName());
return result;
}
ProcessInstance subWorkflowInstance = processService.findSubProcessInstance(
taskInstance.getProcessInstanceId(), taskInstance.getId());
if (subWorkflowInstance == null) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
return result;
}
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("subProcessInstanceId", subWorkflowInstance.getId());
result.put(DATA_LIST, dataMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* update process instance
@ -401,97 +106,9 @@ public class ProcessInstanceService extends BaseService {
* @return update result code
* @throws ParseException parse exception for json parse
*/
public Map<String, Object> updateProcessInstance(User loginUser, String projectName, Integer processInstanceId,
String processInstanceJson, String scheduleTime, Boolean syncDefine,
Flag flag, String locations, String connects) throws ParseException {
Map<String, Object> updateProcessInstance(User loginUser, String projectName, Integer processInstanceId,
String processInstanceJson, String scheduleTime, Boolean syncDefine,
Flag flag, String locations, String connects) throws ParseException;
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
//check project permission
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
//check process instance exists
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
//check process instance status
if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
Date schedule = null;
schedule = processInstance.getScheduleTime();
if (scheduleTime != null) {
schedule = DateUtils.getScheduleDate(scheduleTime);
}
processInstance.setScheduleTime(schedule);
processInstance.setLocations(locations);
processInstance.setConnects(connects);
String globalParams = null;
String originDefParams = null;
int timeout = processInstance.getTimeout();
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
if (StringUtils.isNotEmpty(processInstanceJson)) {
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
Map<String, Object> checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
List<Property> globalParamList = processData.getGlobalParams();
Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
processInstance.getCmdTypeIfComplement(), schedule);
timeout = processData.getTimeout();
processInstance.setTimeout(timeout);
Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
if (tenant != null) {
processInstance.setTenantCode(tenant.getTenantCode());
}
// get the processinstancejson before saving,and then save the name and taskid
String oldJson = processInstance.getProcessInstanceJson();
if (StringUtils.isNotEmpty(oldJson)) {
processInstanceJson = processService.changeJson(processData,oldJson);
}
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
if (Boolean.TRUE.equals(syncDefine)) {
processDefinition.setProcessDefinitionJson(processInstanceJson);
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(timeout);
processDefinition.setUpdateTime(new Date());
// add process definition version
long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
processDefinition.setVersion(version);
updateDefine = processDefineMapper.updateById(processDefinition);
}
if (update > 0 && updateDefine > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* query parent process instance detail info by sub process instance id
@ -501,37 +118,7 @@ public class ProcessInstanceService extends BaseService {
* @param subId sub process id
* @return parent instance detail
*/
public Map<String, Object> queryParentInstanceBySubId(User loginUser, String projectName, Integer subId) {
Map<String, Object> queryParentInstanceBySubId(User loginUser, String projectName, Integer subId);
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId);
if (subInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId);
return result;
}
if (subInstance.getIsSubProcess() == Flag.NO) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName());
return result;
}
ProcessInstance parentWorkflowInstance = processService.findParentProcessInstance(subId);
if (parentWorkflowInstance == null) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST);
return result;
}
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("parentWorkflowInstance", parentWorkflowInstance.getId());
result.put(DATA_LIST, dataMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* delete process instance by id, at the same time delete task instance and their mapping relation data
@ -541,38 +128,7 @@ public class ProcessInstanceService extends BaseService {
* @param processInstanceId process instance id
* @return delete result code
*/
@Transactional(rollbackFor = RuntimeException.class)
Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId);
public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
processService.removeTaskLogFile(processInstanceId);
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
}
return result;
}
/**
* view process instance variables
@ -580,71 +136,7 @@ public class ProcessInstanceService extends BaseService {
* @param processInstanceId process instance id
* @return variables data
*/
public Map<String, Object> viewVariables(Integer processInstanceId) {
Map<String, Object> viewVariables(Integer processInstanceId);
Map<String, Object> result = new HashMap<>();
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
if (processInstance == null) {
throw new RuntimeException("workflow instance is null");
}
Map<String, String> timeParams = BusinessTimeUtils
.getBusinessTime(processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
String workflowInstanceJson = processInstance.getProcessInstanceJson();
ProcessData workflowData = JSONUtils.parseObject(workflowInstanceJson, ProcessData.class);
String userDefinedParams = processInstance.getGlobalParams();
// global params
List<Property> globalParams = new ArrayList<>();
if (userDefinedParams != null && userDefinedParams.length() > 0) {
globalParams = JSONUtils.toList(userDefinedParams, Property.class);
}
List<TaskNode> taskNodeList = workflowData.getTasks();
// global param string
String globalParamStr = JSONUtils.toJsonString(globalParams);
globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams);
globalParams = JSONUtils.toList(globalParamStr, Property.class);
for (Property property : globalParams) {
timeParams.put(property.getProp(), property.getValue());
}
// local params
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>();
for (TaskNode taskNode : taskNodeList) {
String parameter = taskNode.getParams();
Map<String, String> map = JSONUtils.toMap(parameter);
String localParams = map.get(LOCAL_PARAMS);
if (localParams != null && !localParams.isEmpty()) {
localParams = ParameterUtils.convertParameterPlaceholders(localParams, timeParams);
List<Property> localParamsList = JSONUtils.toList(localParams, Property.class);
Map<String, Object> localParamsMap = new HashMap<>();
localParamsMap.put("taskType", taskNode.getType());
localParamsMap.put("localParamsList", localParamsList);
if (CollectionUtils.isNotEmpty(localParamsList)) {
localUserDefParams.put(taskNode.getName(), localParamsMap);
}
}
}
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(GLOBAL_PARAMS, globalParams);
resultMap.put(LOCAL_PARAMS, localUserDefParams);
result.put(DATA_LIST, resultMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* encapsulation gantt structure
@ -653,67 +145,7 @@ public class ProcessInstanceService extends BaseService {
* @return gantt tree data
* @throws Exception exception when json parse
*/
public Map<String, Object> viewGantt(Integer processInstanceId) throws Exception {
Map<String, Object> viewGantt(Integer processInstanceId) throws Exception;
Map<String, Object> result = new HashMap<>();
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
if (processInstance == null) {
throw new RuntimeException("workflow instance is null");
}
GanttDto ganttDto = new GanttDto();
DAG<String, TaskNode, TaskNodeRelation> dag = processInstance2DAG(processInstance);
//topological sort
List<String> nodeList = dag.topologicalSort();
ganttDto.setTaskNames(nodeList);
List<Task> taskList = new ArrayList<>();
for (String node : nodeList) {
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node);
if (taskInstance == null) {
continue;
}
Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
Task task = new Task();
task.setTaskName(taskInstance.getName());
task.getStartDate().add(startTime.getTime());
task.getEndDate().add(endTime.getTime());
task.setIsoStart(startTime);
task.setIsoEnd(endTime);
task.setStatus(taskInstance.getState().toString());
task.setExecutionDate(taskInstance.getStartTime());
task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
taskList.add(task);
}
ganttDto.setTasks(taskList);
result.put(DATA_LIST, ganttDto);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* process instance to DAG
*
* @param processInstance input process instance
* @return process instance dag.
*/
private static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(ProcessInstance processInstance) {
String processDefinitionJson = processInstance.getProcessInstanceJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
return DagHelper.buildDagGraph(processDag);
}
/**
* query process instance by processDefinitionId and stateArray
@ -721,9 +153,7 @@ public class ProcessInstanceService extends BaseService {
* @param states states array
* @return process instance list
*/
public List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) {
List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states);
return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states);
}
/**
* query process instance by processDefinitionId
@ -731,8 +161,6 @@ public class ProcessInstanceService extends BaseService {
* @param size size
* @return process instance list
*/
public List<ProcessInstance> queryByProcessDefineId(int processDefinitionId,int size) {
List<ProcessInstance> queryByProcessDefineId(int processDefinitionId,int size);
return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size);
}
}

File: org/apache/dolphinscheduler/api/service/QueueService.java

@ -14,43 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* queue service
*/
@Service
public interface QueueService {
public class QueueService extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(QueueService.class);
@Autowired
private QueueMapper queueMapper;
@Autowired
private UserMapper userMapper;
/**
* query queue list
@ -58,18 +33,7 @@ public class QueueService extends BaseService {
* @param loginUser login user
* @return queue list
*/
public Map<String, Object> queryList(User loginUser) {
Map<String, Object> queryList(User loginUser);
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
List<Queue> queueList = queueMapper.selectList(null);
result.put(Constants.DATA_LIST, queueList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query queue list paging
@ -80,26 +44,7 @@ public class QueueService extends BaseService {
* @param pageSize page size
* @return queue list
*/
public Map<String, Object> queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize);
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
Page<Queue> page = new Page(pageNo, pageSize);
IPage<Queue> queueList = queueMapper.queryQueuePaging(page, searchVal);
Integer count = (int) queueList.getTotal();
PageInfo<Queue> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount(count);
pageInfo.setLists(queueList.getRecords());
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* create queue
@ -109,45 +54,7 @@ public class QueueService extends BaseService {
* @param queueName queue name
* @return create result
*/
public Map<String, Object> createQueue(User loginUser, String queue, String queueName) {
Map<String, Object> createQueue(User loginUser, String queue, String queueName);
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
if (StringUtils.isEmpty(queue)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
return result;
}
if (StringUtils.isEmpty(queueName)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
return result;
}
if (checkQueueNameExist(queueName)) {
putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
return result;
}
if (checkQueueExist(queue)) {
putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
return result;
}
Queue queueObj = new Queue();
Date now = new Date();
queueObj.setQueue(queue);
queueObj.setQueueName(queueName);
queueObj.setCreateTime(now);
queueObj.setUpdateTime(now);
queueMapper.insert(queueObj);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* update queue
@ -158,66 +65,7 @@ public class QueueService extends BaseService {
* @param queueName queue name
* @return update result code
*/
public Map<String, Object> updateQueue(User loginUser, int id, String queue, String queueName) {
Map<String, Object> updateQueue(User loginUser, int id, String queue, String queueName);
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
if (StringUtils.isEmpty(queue)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
return result;
}
if (StringUtils.isEmpty(queueName)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
return result;
}
Queue queueObj = queueMapper.selectById(id);
if (queueObj == null) {
putMsg(result, Status.QUEUE_NOT_EXIST, id);
return result;
}
// whether queue value or queueName is changed
if (queue.equals(queueObj.getQueue()) && queueName.equals(queueObj.getQueueName())) {
putMsg(result, Status.NEED_NOT_UPDATE_QUEUE);
return result;
}
// check queue name is exist
if (!queueName.equals(queueObj.getQueueName())
&& checkQueueNameExist(queueName)) {
putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
return result;
}
// check queue value is exist
if (!queue.equals(queueObj.getQueue()) && checkQueueExist(queue)) {
putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
return result;
}
// check old queue using by any user
if (checkIfQueueIsInUsing(queueObj.getQueueName(), queueName)) {
//update user related old queue
Integer relatedUserNums = userMapper.updateUserQueue(queueObj.getQueueName(), queueName);
logger.info("old queue have related {} user, exec update user success.", relatedUserNums);
}
// update queue
Date now = new Date();
queueObj.setQueue(queue);
queueObj.setQueueName(queueName);
queueObj.setUpdateTime(now);
queueMapper.updateById(queueObj);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* verify queue and queueName
@ -226,69 +74,6 @@ public class QueueService extends BaseService {
* @param queueName queue name
* @return true if the queue name not exists, otherwise return false
*/
public Result verifyQueue(String queue, String queueName) {
Result<Object> verifyQueue(String queue, String queueName);
Result result = new Result();
if (StringUtils.isEmpty(queue)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue");
return result;
}
if (StringUtils.isEmpty(queueName)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName");
return result;
}
if (checkQueueNameExist(queueName)) {
logger.error("queue name {} has exist, can't create again.", queueName);
putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
return result;
}
if (checkQueueExist(queue)) {
logger.error("queue value {} has exist, can't create again.", queue);
putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
return result;
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check queue exist
* if exists return true, not exists return false
* check queue exist
*
* @param queue queue
* @return true if the queue not exists, otherwise return false
*/
private boolean checkQueueExist(String queue) {
return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(queue, null));
}
/**
* check queue name exist
* if exists return true, not exists return false
*
* @param queueName queue name
* @return true if the queue name not exists, otherwise return false
*/
private boolean checkQueueNameExist(String queueName) {
return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(null, queueName));
}
/**
* check old queue name using by any user
* if need to update user
*
* @param oldQueue old queue name
* @param newQueue new queue name
* @return true if need to update user
*/
private boolean checkIfQueueIsInUsing (String oldQueue, String newQueue) {
return !oldQueue.equals(newQueue) && CollectionUtils.isNotEmpty(userMapper.queryUserListByQueue(oldQueue));
}
}

File: org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerServerModel;
import org.apache.dolphinscheduler.dao.MonitorDBDao;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
/**
* monitor service impl
*/
@Service
public class MonitorServiceImpl extends BaseService implements MonitorService {
@Autowired
private ZookeeperMonitor zookeeperMonitor;
@Autowired
private MonitorDBDao monitorDBDao;
/**
* query database state
*
* @param loginUser login user
* @return data base state
*/
public Map<String,Object> queryDatabaseState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<MonitorRecord> monitorRecordList = monitorDBDao.queryDatabaseState();
result.put(Constants.DATA_LIST, monitorRecordList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query master list
*
* @param loginUser login user
* @return master information list
*/
public Map<String,Object> queryMaster(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<Server> masterServers = getServerListFromZK(true);
result.put(Constants.DATA_LIST, masterServers);
putMsg(result,Status.SUCCESS);
return result;
}
/**
* query zookeeper state
*
* @param loginUser login user
* @return zookeeper information list
*/
public Map<String,Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<ZookeeperRecord> zookeeperRecordList = zookeeperMonitor.zookeeperInfoList();
result.put(Constants.DATA_LIST, zookeeperRecordList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query worker list
*
* @param loginUser login user
* @return worker information list
*/
public Map<String,Object> queryWorker(User loginUser) {
Map<String, Object> result = new HashMap<>();
List<WorkerServerModel> workerServers = getServerListFromZK(false)
.stream()
.map((Server server) -> {
WorkerServerModel model = new WorkerServerModel();
model.setId(server.getId());
model.setHost(server.getHost());
model.setPort(server.getPort());
model.setZkDirectories(Sets.newHashSet(server.getZkDirectory()));
model.setResInfo(server.getResInfo());
model.setCreateTime(server.getCreateTime());
model.setLastHeartbeatTime(server.getLastHeartbeatTime());
return model;
})
.collect(Collectors.toList());
Map<String, WorkerServerModel> workerHostPortServerMapping = workerServers
.stream()
.collect(Collectors.toMap(
(WorkerServerModel worker) -> {
String[] s = worker.getZkDirectories().iterator().next().split("/");
return s[s.length - 1];
}
, Function.identity()
, (WorkerServerModel oldOne, WorkerServerModel newOne) -> {
oldOne.getZkDirectories().addAll(newOne.getZkDirectories());
return oldOne;
}));
result.put(Constants.DATA_LIST, workerHostPortServerMapping.values());
putMsg(result,Status.SUCCESS);
return result;
}
public List<Server> getServerListFromZK(boolean isMaster) {
checkNotNull(zookeeperMonitor);
ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER;
return zookeeperMonitor.getServersList(zkNodeType);
}
}

File: org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -0,0 +1,741 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* process instance service impl
*/
@Service
public class ProcessInstanceServiceImpl extends BaseService implements ProcessInstanceService {
@Autowired
ProjectMapper projectMapper;
@Autowired
ProjectService projectService;
@Autowired
ProcessService processService;
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessDefinitionMapper processDefineMapper;
@Autowired
ProcessDefinitionService processDefinitionService;
@Autowired
ProcessDefinitionVersionService processDefinitionVersionService;
@Autowired
ExecutorService execService;
@Autowired
TaskInstanceMapper taskInstanceMapper;
@Autowired
LoggerService loggerService;
@Autowired
UsersService usersService;
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
*/
public Map<String, Object> queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
if (0 > size) {
putMsg(result, Status.NEGTIVE_SIZE_NUMBER_ERROR, size);
return result;
}
if (Objects.isNull(startTime)) {
putMsg(result, Status.DATA_IS_NULL, Constants.START_TIME);
return result;
}
Date start = DateUtils.stringToDate(startTime);
if (Objects.isNull(endTime)) {
putMsg(result, Status.DATA_IS_NULL, Constants.END_TIME);
return result;
}
Date end = DateUtils.stringToDate(endTime);
if (start == null || end == null) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result;
}
if (start.getTime() > end.getTime()) {
putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR, startTime, endTime);
return result;
}
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS);
result.put(DATA_LIST, processInstances);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query process instance by id
*
* @param loginUser login user
* @param projectName project name
* @param processId process instance id
* @return process instance detail
*/
public Map<String, Object> queryProcessInstanceById(User loginUser, String projectName, Integer processId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
result.put(DATA_LIST, processInstance);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* paging query process instance list, filtering according to project, process definition, time range, keyword, process status
*
* @param loginUser login user
* @param projectName project name
* @param pageNo page number
* @param pageSize page size
* @param processDefineId process definition id
* @param searchVal search value
* @param stateType state type
* @param host host
* @param startDate start time
* @param endDate end time
* @return process instance list
*/
public Map<String, Object> queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId,
String startDate, String endDate,
String searchVal, String executorName, ExecutionStatus stateType, String host,
Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
int[] statusArray = null;
// filter by state
if (stateType != null) {
statusArray = new int[]{stateType.ordinal()};
}
Date start = null;
Date end = null;
try {
if (StringUtils.isNotEmpty(startDate)) {
start = DateUtils.getScheduleDate(startDate);
}
if (StringUtils.isNotEmpty(endDate)) {
end = DateUtils.getScheduleDate(endDate);
}
} catch (Exception e) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result;
}
Page<ProcessInstance> page = new Page<>(pageNo, pageSize);
PageInfo<ProcessInstance> pageInfo = new PageInfo<>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<ProcessInstance> processInstanceList =
processInstanceMapper.queryProcessInstanceListPaging(page,
project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end);
List<ProcessInstance> processInstances = processInstanceList.getRecords();
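// enrich each instance with a human-readable duration and the executor's user name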
for (ProcessInstance processInstance : processInstances) {
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()));
User executor = usersService.queryUser(processInstance.getExecutorId());
if (null != executor) {
processInstance.setExecutorName(executor.getUserName());
}
}
pageInfo.setTotalCount((int) processInstanceList.getTotal());
pageInfo.setLists(processInstances);
result.put(DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query task list by process instance id
*
* @param loginUser login user
* @param projectName project name
* @param processId process instance id
* @return task list for the process instance
* @throws IOException io exception
*/
public Map<String, Object> queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
addDependResultForTaskList(taskInstanceList);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
resultMap.put(TASK_LIST, taskInstanceList);
result.put(DATA_LIST, resultMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* add the dependent result for each DEPENDENT-type task by parsing its task log
*
* @param taskInstanceList task instance list
* @throws IOException if reading the queried log content fails
*/
private void addDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
for (TaskInstance taskInstance : taskInstanceList) {
if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) {
Result<String> logResult = loggerService.queryLog(
taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT);
if (logResult.getCode() == Status.SUCCESS.ordinal()) {
String log = logResult.getData();
Map<String, DependResult> resultMap = parseLogForDependentResult(log);
taskInstance.setDependentResult(JSONUtils.toJsonString(resultMap));
}
}
}
}
public Map<String, DependResult> parseLogForDependentResult(String log) throws IOException {
Map<String, DependResult> resultMap = new HashMap<>();
if (StringUtils.isEmpty(log)) {
return resultMap;
}
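// illustrative (assumed) log format: a dependent task writes lines such as
// "... dependent item complete :|| taskA,SUCCESS"
// everything after the ":||" split token is parsed as "<dependent key>,<DependResult>"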
// use try-with-resources so the reader over the log content is always closed
try (BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(log.getBytes(
StandardCharsets.UTF_8)), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
if (!line.contains(DEPENDENT_SPLIT)) {
continue;
}
String[] tmpStringArray = line.split(":\\|\\|");
if (tmpStringArray.length != 2) {
continue;
}
String dependResultString = tmpStringArray[1];
String[] dependStringArray = dependResultString.split(",");
if (dependStringArray.length != 2) {
continue;
}
String key = dependStringArray[0].trim();
DependResult dependResult = DependResult.valueOf(dependStringArray[1].trim());
resultMap.put(key, dependResult);
}
}
return resultMap;
}
/**
* query sub process instance detail info by task id
*
* @param loginUser login user
* @param projectName project name
* @param taskId task id
* @return sub process instance detail
*/
public Map<String, Object> querySubProcessInstanceByTaskId(User loginUser, String projectName, Integer taskId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
if (taskInstance == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
return result;
}
if (!taskInstance.isSubProcess()) {
putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName());
return result;
}
ProcessInstance subWorkflowInstance = processService.findSubProcessInstance(
taskInstance.getProcessInstanceId(), taskInstance.getId());
if (subWorkflowInstance == null) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
return result;
}
Map<String, Object> dataMap = new HashMap<>();
dataMap.put(Constants.SUBPROCESS_INSTANCE_ID, subWorkflowInstance.getId());
result.put(DATA_LIST, dataMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* update process instance
*
* @param loginUser login user
* @param projectName project name
* @param processInstanceJson process instance json
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
* @param flag flag
* @param locations locations
* @param connects connects
* @return update result code
* @throws ParseException parse exception for json parse
*/
public Map<String, Object> updateProcessInstance(User loginUser, String projectName, Integer processInstanceId,
String processInstanceJson, String scheduleTime, Boolean syncDefine,
Flag flag, String locations, String connects) throws ParseException {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
//check project permission
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
//check process instance exists
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
//check process instance status
if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
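// apply the directly editable fields: schedule time, node locations and connects (presumably front-end canvas layout data)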
Date schedule = processInstance.getScheduleTime();
if (scheduleTime != null) {
schedule = DateUtils.getScheduleDate(scheduleTime);
}
processInstance.setScheduleTime(schedule);
processInstance.setLocations(locations);
processInstance.setConnects(connects);
String globalParams = null;
String originDefParams = null;
int timeout = processInstance.getTimeout();
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
if (StringUtils.isNotEmpty(processInstanceJson)) {
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
Map<String, Object> checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson);
if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) {
// propagate the validation error instead of returning an empty result
return checkFlowJson;
}
originDefParams = JSONUtils.toJsonString(processData.getGlobalParams());
List<Property> globalParamList = processData.getGlobalParams();
Map<String, String> globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
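// "cure" the global params: resolve the built-in time parameters based on the command type and schedule time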
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
processInstance.getCmdTypeIfComplement(), schedule);
timeout = processData.getTimeout();
processInstance.setTimeout(timeout);
Tenant tenant = processService.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
if (tenant != null) {
processInstance.setTenantCode(tenant.getTenantCode());
}
// merge with the previously saved instance json so existing task names and ids are preserved
String oldJson = processInstance.getProcessInstanceJson();
if (StringUtils.isNotEmpty(oldJson)) {
processInstanceJson = processService.changeJson(processData,oldJson);
}
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
int update = processService.updateProcessInstance(processInstance);
int updateDefine = 1;
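// when syncDefine is true, write the edited workflow json back to the process definition and record a new definition version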
if (Boolean.TRUE.equals(syncDefine)) {
processDefinition.setProcessDefinitionJson(processInstanceJson);
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(timeout);
processDefinition.setUpdateTime(new Date());
// add process definition version
long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition);
processDefinition.setVersion(version);
updateDefine = processDefineMapper.updateById(processDefinition);
}
if (update > 0 && updateDefine > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
}
return result;
}
/**
* query parent process instance detail info by sub process instance id
*
* @param loginUser login user
* @param projectName project name
* @param subId sub process id
* @return parent instance detail
*/
public Map<String, Object> queryParentInstanceBySubId(User loginUser, String projectName, Integer subId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId);
if (subInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId);
return result;
}
if (subInstance.getIsSubProcess() == Flag.NO) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName());
return result;
}
ProcessInstance parentWorkflowInstance = processService.findParentProcessInstance(subId);
if (parentWorkflowInstance == null) {
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST);
return result;
}
Map<String, Object> dataMap = new HashMap<>();
dataMap.put(Constants.PARENT_WORKFLOW_INSTANCE, parentWorkflowInstance.getId());
result.put(DATA_LIST, dataMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* delete process instance by id; the related task instances and their mapping relation data are deleted at the same time
*
* @param loginUser login user
* @param projectName project name
* @param processInstanceId process instance id
* @return delete result code
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
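// remove the task log files first, presumably so no orphan logs are left behind on the logger server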
processService.removeTaskLogFile(processInstanceId);
// cascade delete: the instance record, its sub-process instances and the parent/sub mapping records
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
}
return result;
}
/**
* view process instance variables
*
* @param processInstanceId process instance id
* @return variables data
*/
public Map<String, Object> viewVariables(Integer processInstanceId) {
Map<String, Object> result = new HashMap<>();
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
if (processInstance == null) {
throw new RuntimeException("workflow instance does not exist, processInstanceId: " + processInstanceId);
}
Map<String, String> timeParams = BusinessTimeUtils
.getBusinessTime(processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
String workflowInstanceJson = processInstance.getProcessInstanceJson();
ProcessData workflowData = JSONUtils.parseObject(workflowInstanceJson, ProcessData.class);
String userDefinedParams = processInstance.getGlobalParams();
// global params
List<Property> globalParams = new ArrayList<>();
if (StringUtils.isNotEmpty(userDefinedParams)) {
globalParams = JSONUtils.toList(userDefinedParams, Property.class);
}
List<TaskNode> taskNodeList = workflowData.getTasks();
// serialize the global params and substitute business-time placeholders into their values
String globalParamStr = JSONUtils.toJsonString(globalParams);
globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams);
globalParams = JSONUtils.toList(globalParamStr, Property.class);
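// make the resolved global params available as substitution variables for the task-level (local) params below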
for (Property property : globalParams) {
timeParams.put(property.getProp(), property.getValue());
}
// local params
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>();
for (TaskNode taskNode : taskNodeList) {
String parameter = taskNode.getParams();
Map<String, String> map = JSONUtils.toMap(parameter);
String localParams = map.get(LOCAL_PARAMS);
if (localParams != null && !localParams.isEmpty()) {
localParams = ParameterUtils.convertParameterPlaceholders(localParams, timeParams);
List<Property> localParamsList = JSONUtils.toList(localParams, Property.class);
Map<String, Object> localParamsMap = new HashMap<>();
localParamsMap.put(Constants.TASK_TYPE, taskNode.getType());
localParamsMap.put(Constants.LOCAL_PARAMS_LIST, localParamsList);
if (CollectionUtils.isNotEmpty(localParamsList)) {
localUserDefParams.put(taskNode.getName(), localParamsMap);
}
}
}
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(GLOBAL_PARAMS, globalParams);
resultMap.put(LOCAL_PARAMS, localUserDefParams);
result.put(DATA_LIST, resultMap);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* encapsulation gantt structure
*
* @param processInstanceId process instance id
* @return gantt tree data
* @throws Exception exception when json parse
*/
public Map<String, Object> viewGantt(Integer processInstanceId) throws Exception {
Map<String, Object> result = new HashMap<>();
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
if (processInstance == null) {
throw new RuntimeException("workflow instance does not exist, processInstanceId: " + processInstanceId);
}
GanttDto ganttDto = new GanttDto();
DAG<String, TaskNode, TaskNodeRelation> dag = processInstance2DAG(processInstance);
//topological sort
List<String> nodeList = dag.topologicalSort();
ganttDto.setTaskNames(nodeList);
List<Task> taskList = new ArrayList<>();
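// build one gantt task entry per DAG node that has a corresponding task instance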
for (String node : nodeList) {
TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node);
if (taskInstance == null) {
continue;
}
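// tasks that have not started or finished yet are drawn up to the current time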
Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime();
Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime();
Task task = new Task();
task.setTaskName(taskInstance.getName());
task.getStartDate().add(startTime.getTime());
task.getEndDate().add(endTime.getTime());
task.setIsoStart(startTime);
task.setIsoEnd(endTime);
task.setStatus(taskInstance.getState().toString());
task.setExecutionDate(taskInstance.getStartTime());
task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime()));
taskList.add(task);
}
ganttDto.setTasks(taskList);
result.put(DATA_LIST, ganttDto);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* process instance to DAG
*
* @param processInstance input process instance
* @return process instance dag.
*/
private static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(ProcessInstance processInstance) {
String processDefinitionJson = processInstance.getProcessInstanceJson();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
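// build the DAG nodes and edges from each task node's declared predecessor tasks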
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);
return DagHelper.buildDagGraph(processDag);
}
/**
* query process instance by processDefinitionId and stateArray
* @param processDefinitionId processDefinitionId
* @param states states array
* @return process instance list
*/
public List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) {
return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states);
}
/**
* query process instance by processDefinitionId
* @param processDefinitionId processDefinitionId
* @param size size
* @return process instance list
*/
public List<ProcessInstance> queryByProcessDefineId(int processDefinitionId, int size) {
return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size);
}
}
View File
@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.QueueService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* queue service impl
*/
@Service
public class QueueServiceImpl extends BaseService implements QueueService {
private static final Logger logger = LoggerFactory.getLogger(QueueServiceImpl.class);
@Autowired
private QueueMapper queueMapper;
@Autowired
private UserMapper userMapper;
/**
* query queue list
*
* @param loginUser login user
* @return queue list
*/
public Map<String, Object> queryList(User loginUser) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
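// a null query wrapper means no filter conditions: return every queue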
List<Queue> queueList = queueMapper.selectList(null);
result.put(Constants.DATA_LIST, queueList);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query queue list paging
*
* @param loginUser login user
* @param pageNo page number
* @param searchVal search value
* @param pageSize page size
* @return queue list
*/
public Map<String, Object> queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
Page<Queue> page = new Page<>(pageNo, pageSize);
IPage<Queue> queueList = queueMapper.queryQueuePaging(page, searchVal);
Integer count = (int) queueList.getTotal();
PageInfo<Queue> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount(count);
pageInfo.setLists(queueList.getRecords());
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* create queue
*
* @param loginUser login user
* @param queue queue
* @param queueName queue name
* @return create result
*/
public Map<String, Object> createQueue(User loginUser, String queue, String queueName) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
if (StringUtils.isEmpty(queue)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE);
return result;
}
if (StringUtils.isEmpty(queueName)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME);
return result;
}
if (checkQueueNameExist(queueName)) {
putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
return result;
}
if (checkQueueExist(queue)) {
putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
return result;
}
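// all validations passed: persist the new queue with creation and update timestamps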
Queue queueObj = new Queue();
Date now = new Date();
queueObj.setQueue(queue);
queueObj.setQueueName(queueName);
queueObj.setCreateTime(now);
queueObj.setUpdateTime(now);
queueMapper.insert(queueObj);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* update queue
*
* @param loginUser login user
* @param queue queue
* @param id queue id
* @param queueName queue name
* @return update result code
*/
public Map<String, Object> updateQueue(User loginUser, int id, String queue, String queueName) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
if (StringUtils.isEmpty(queue)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE);
return result;
}
if (StringUtils.isEmpty(queueName)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME);
return result;
}
Queue queueObj = queueMapper.selectById(id);
if (queueObj == null) {
putMsg(result, Status.QUEUE_NOT_EXIST, id);
return result;
}
// if neither the queue value nor the queue name changed, there is nothing to update
if (queue.equals(queueObj.getQueue()) && queueName.equals(queueObj.getQueueName())) {
putMsg(result, Status.NEED_NOT_UPDATE_QUEUE);
return result;
}
// check whether the new queue name already exists
if (!queueName.equals(queueObj.getQueueName())
&& checkQueueNameExist(queueName)) {
putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
return result;
}
// check whether the new queue value already exists
if (!queue.equals(queueObj.getQueue()) && checkQueueExist(queue)) {
putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
return result;
}
// if any user still references the old queue name, migrate those users to the new queue
if (checkIfQueueIsInUsing(queueObj.getQueueName(), queueName)) {
// update the queue recorded on the related users
Integer relatedUserNums = userMapper.updateUserQueue(queueObj.getQueueName(), queueName);
logger.info("the old queue name was referenced by {} user(s); their queue has been updated.", relatedUserNums);
}
// update queue
Date now = new Date();
queueObj.setQueue(queue);
queueObj.setQueueName(queueName);
queueObj.setUpdateTime(now);
queueMapper.updateById(queueObj);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* verify queue and queueName
*
* @param queue queue
* @param queueName queue name
* @return verify result; SUCCESS only when neither the queue value nor the queue name already exists
*/
public Result<Object> verifyQueue(String queue, String queueName) {
Result<Object> result = new Result<>();
if (StringUtils.isEmpty(queue)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE);
return result;
}
if (StringUtils.isEmpty(queueName)) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME);
return result;
}
if (checkQueueNameExist(queueName)) {
putMsg(result, Status.QUEUE_NAME_EXIST, queueName);
return result;
}
if (checkQueueExist(queue)) {
putMsg(result, Status.QUEUE_VALUE_EXIST, queue);
return result;
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check whether the queue value already exists
*
* @param queue queue value
* @return true if the queue value exists, otherwise false
*/
private boolean checkQueueExist(String queue) {
return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(queue, null));
}
/**
* check whether the queue name already exists
*
* @param queueName queue name
* @return true if the queue name exists, otherwise false
*/
private boolean checkQueueNameExist(String queueName) {
return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(null, queueName));
}
/**
* check whether the old queue name is still used by any user,
* in which case those users need to be migrated to the new queue
*
* @param oldQueue old queue name
* @param newQueue new queue name
* @return true if the queue name changed and at least one user still uses the old name
*/
private boolean checkIfQueueIsInUsing(String oldQueue, String newQueue) {
return !oldQueue.equals(newQueue) && CollectionUtils.isNotEmpty(userMapper.queryUserListByQueue(oldQueue));
}
}
View File
@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.MonitorServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.model.Server;
@ -43,7 +44,8 @@ public class MonitorServiceTest {
private static final Logger logger = LoggerFactory.getLogger(MonitorServiceTest.class);
@InjectMocks
-private MonitorService monitorService;
+private MonitorServiceImpl monitorService;
@Mock
private MonitorDBDao monitorDBDao;
View File
@ -22,6 +22,7 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
+import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@ -68,7 +69,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
public class ProcessInstanceServiceTest {
@InjectMocks
-ProcessInstanceService processInstanceService;
+ProcessInstanceServiceImpl processInstanceService;
@Mock
ProjectMapper projectMapper;
View File
@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.impl.QueueServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
@ -50,7 +51,7 @@ public class QueueServiceTest {
private static final Logger logger = LoggerFactory.getLogger(QueueServiceTest.class);
@InjectMocks
-private QueueService queueService;
+private QueueServiceImpl queueService;
@Mock
private QueueMapper queueMapper;
@Mock
View File
@ -752,9 +752,17 @@ public final class Constants {
public static final String SUBTRACT_STRING = "-";
public static final String GLOBAL_PARAMS = "globalParams";
public static final String LOCAL_PARAMS = "localParams";
+public static final String LOCAL_PARAMS_LIST = "localParamsList";
+public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId";
public static final String PROCESS_INSTANCE_STATE = "processInstanceState";
+public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
+public static final String TASK_TYPE = "taskType";
public static final String TASK_LIST = "taskList";
public static final String RWXR_XR_X = "rwxr-xr-x";
+public static final String QUEUE = "queue";
+public static final String QUEUE_NAME = "queueName";
+public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0;
+public static final int LOG_QUERY_LIMIT = 4096;
/**
* master/worker server use for zk
@ -1010,11 +1018,13 @@
*/
public static final String PLUGIN_JAR_SUFFIX = ".jar";
-public static final int NORAML_NODE_STATUS = 0;
+public static final int NORMAL_NODE_STATUS = 0;
public static final int ABNORMAL_NODE_STATUS = 1;
public static final String START_TIME = "start time";
public static final String END_TIME = "end time";
+public static final String START_END_DATE = "startDate,endDate";
/**
* system line separator
*/
View File
@ -19,12 +19,13 @@ package org.apache.dolphinscheduler.server.registry;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
-import java.util.Date;
-import java.util.Set;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
+import java.util.Date;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,7 +57,7 @@ public class HeartBeatTask extends Thread {
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
double loadAverage = OSUtils.loadAverage();
-int status = Constants.NORAML_NODE_STATUS;
+int status = Constants.NORMAL_NODE_STATUS;
if (availablePhysicalMemorySize < reservedMemory
|| loadAverage > maxCpuloadAvg) {