mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-30 03:08:01 +08:00
Remove unused dependentResult in TaskInstance (#16236)
This commit is contained in:
parent
d42ac96c55
commit
177a001392
@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.api.utils.Result;
|
|||||||
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
|
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
|
||||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
||||||
import org.apache.dolphinscheduler.dao.entity.User;
|
import org.apache.dolphinscheduler.dao.entity.User;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -122,8 +121,6 @@ public interface ProcessInstanceService {
|
|||||||
long projectCode,
|
long projectCode,
|
||||||
Integer processId) throws IOException;
|
Integer processId) throws IOException;
|
||||||
|
|
||||||
Map<String, DependResult> parseLogForDependentResult(String log) throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* query sub process instance detail info by task id
|
* query sub process instance detail info by task id
|
||||||
*
|
*
|
||||||
|
@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
|
|||||||
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
|
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
|
||||||
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR;
|
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR;
|
||||||
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
|
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
|
||||||
import static org.apache.dolphinscheduler.common.constants.Constants.DEPENDENT_SPLIT;
|
|
||||||
import static org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS;
|
import static org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS;
|
||||||
import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
|
import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
|
||||||
import static org.apache.dolphinscheduler.common.constants.Constants.PROCESS_INSTANCE_STATE;
|
import static org.apache.dolphinscheduler.common.constants.Constants.PROCESS_INSTANCE_STATE;
|
||||||
@ -59,7 +58,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
|
|||||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
|
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
|
||||||
import org.apache.dolphinscheduler.dao.entity.Project;
|
import org.apache.dolphinscheduler.dao.entity.Project;
|
||||||
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
|
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
|
||||||
import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
|
|
||||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
|
||||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
|
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
|
||||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
|
||||||
@ -76,7 +74,6 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
|
|||||||
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
|
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
|
||||||
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
|
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
|
||||||
import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
|
import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
|
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
|
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
|
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
|
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
|
||||||
@ -86,11 +83,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
|
|||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@ -446,11 +438,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
|
|||||||
* @param projectCode project code
|
* @param projectCode project code
|
||||||
* @param processId process instance id
|
* @param processId process instance id
|
||||||
* @return task list for the process instance
|
* @return task list for the process instance
|
||||||
* @throws IOException io exception
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Object> queryTaskListByProcessId(User loginUser, long projectCode,
|
public Map<String, Object> queryTaskListByProcessId(User loginUser, long projectCode,
|
||||||
Integer processId) throws IOException {
|
Integer processId) {
|
||||||
Project project = projectMapper.queryByCode(projectCode);
|
Project project = projectMapper.queryByCode(projectCode);
|
||||||
// check user access for project
|
// check user access for project
|
||||||
Map<String, Object> result =
|
Map<String, Object> result =
|
||||||
@ -471,7 +462,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
|
|||||||
}
|
}
|
||||||
List<TaskInstance> taskInstanceList =
|
List<TaskInstance> taskInstanceList =
|
||||||
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processId, processInstance.getTestFlag());
|
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processId, processInstance.getTestFlag());
|
||||||
addDependResultForTaskList(loginUser, taskInstanceList);
|
|
||||||
Map<String, Object> resultMap = new HashMap<>();
|
Map<String, Object> resultMap = new HashMap<>();
|
||||||
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
|
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
|
||||||
resultMap.put(TASK_LIST, taskInstanceList);
|
resultMap.put(TASK_LIST, taskInstanceList);
|
||||||
@ -541,57 +531,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
|
|||||||
return allDynamicSubWorkflowDtos;
|
return allDynamicSubWorkflowDtos;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* add dependent result for dependent task
|
|
||||||
*/
|
|
||||||
private void addDependResultForTaskList(User loginUser, List<TaskInstance> taskInstanceList) throws IOException {
|
|
||||||
for (TaskInstance taskInstance : taskInstanceList) {
|
|
||||||
if (TaskTypeUtils.isDependentTask(taskInstance.getTaskType())) {
|
|
||||||
log.info("DEPENDENT type task instance need to set dependent result, taskCode:{}, taskInstanceId:{}",
|
|
||||||
taskInstance.getTaskCode(), taskInstance.getId());
|
|
||||||
// TODO The result of dependent item should not be obtained from the log, waiting for optimization.
|
|
||||||
Result<ResponseTaskLog> logResult = loggerService.queryLog(loginUser,
|
|
||||||
taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT);
|
|
||||||
if (logResult.getCode() == Status.SUCCESS.ordinal()) {
|
|
||||||
String log = logResult.getData().getMessage();
|
|
||||||
Map<String, DependResult> resultMap = parseLogForDependentResult(log);
|
|
||||||
taskInstance.setDependentResult(JSONUtils.toJsonString(resultMap));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<String, DependResult> parseLogForDependentResult(String content) throws IOException {
|
|
||||||
Map<String, DependResult> resultMap = new HashMap<>();
|
|
||||||
if (StringUtils.isEmpty(content)) {
|
|
||||||
log.warn("Log content is empty.");
|
|
||||||
return resultMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(content.getBytes(
|
|
||||||
StandardCharsets.UTF_8)), StandardCharsets.UTF_8));
|
|
||||||
String line;
|
|
||||||
while ((line = br.readLine()) != null) {
|
|
||||||
if (line.contains(DEPENDENT_SPLIT)) {
|
|
||||||
String[] tmpStringArray = line.split(":\\|\\|");
|
|
||||||
if (tmpStringArray.length != 2) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
String dependResultString = tmpStringArray[1];
|
|
||||||
String[] dependStringArray = dependResultString.split(",");
|
|
||||||
if (dependStringArray.length != 3) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
String key = dependStringArray[0].trim().split(":")[1].trim();
|
|
||||||
String result = dependStringArray[1].trim().split(":")[1].trim();
|
|
||||||
DependResult dependResult = DependResult.valueOf(result);
|
|
||||||
resultMap.put(key, dependResult);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return resultMap;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* query sub process instance detail info by task id
|
* query sub process instance detail info by task id
|
||||||
*
|
*
|
||||||
|
@ -64,7 +64,6 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
|
|||||||
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
|
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
|
||||||
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
|
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
|
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
|
|
||||||
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
|
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
|
||||||
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
|
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
|
||||||
import org.apache.dolphinscheduler.service.model.TaskNode;
|
import org.apache.dolphinscheduler.service.model.TaskNode;
|
||||||
@ -481,23 +480,6 @@ public class ProcessInstanceServiceTest {
|
|||||||
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
|
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testParseLogForDependentResult() throws IOException {
|
|
||||||
String logString =
|
|
||||||
"[INFO] 2019-03-19 17:11:08.475 org.apache.dolphinscheduler.server.worker.log.TaskLogger:[172]"
|
|
||||||
+ " - [taskAppId=TASK_223_10739_452334] dependent item complete, :|| dependentKey: 223-ALL-day-last1Day, result: SUCCESS, dependentDate: Wed Mar 19 17:10:36 CST 2019\n"
|
|
||||||
+ "[INFO] 2019-03-19 17:11:08.476 org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread:[172]"
|
|
||||||
+ " - task : 223_10739_452334 exit status code : 0\n"
|
|
||||||
+ "[root@node2 current]# ";
|
|
||||||
Map<String, DependResult> resultMap =
|
|
||||||
processInstanceService.parseLogForDependentResult(logString);
|
|
||||||
Assertions.assertEquals(1, resultMap.size());
|
|
||||||
|
|
||||||
resultMap.clear();
|
|
||||||
resultMap = processInstanceService.parseLogForDependentResult("");
|
|
||||||
Assertions.assertEquals(0, resultMap.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQuerySubProcessInstanceByTaskId() {
|
public void testQuerySubProcessInstanceByTaskId() {
|
||||||
long projectCode = 1L;
|
long projectCode = 1L;
|
||||||
|
@ -201,12 +201,6 @@ public class TaskInstance implements Serializable {
|
|||||||
@TableField(exist = false)
|
@TableField(exist = false)
|
||||||
private Priority processInstancePriority;
|
private Priority processInstancePriority;
|
||||||
|
|
||||||
/**
|
|
||||||
* dependent state
|
|
||||||
*/
|
|
||||||
@TableField(exist = false)
|
|
||||||
private String dependentResult;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* workerGroup
|
* workerGroup
|
||||||
*/
|
*/
|
||||||
|
@ -62,7 +62,6 @@ public class TaskInstanceUtils {
|
|||||||
target.setMaxRetryTimes(source.getMaxRetryTimes());
|
target.setMaxRetryTimes(source.getMaxRetryTimes());
|
||||||
target.setRetryInterval(source.getRetryInterval());
|
target.setRetryInterval(source.getRetryInterval());
|
||||||
target.setTaskInstancePriority(source.getTaskInstancePriority());
|
target.setTaskInstancePriority(source.getTaskInstancePriority());
|
||||||
target.setDependentResult(source.getDependentResult());
|
|
||||||
target.setWorkerGroup(source.getWorkerGroup());
|
target.setWorkerGroup(source.getWorkerGroup());
|
||||||
target.setEnvironmentCode(source.getEnvironmentCode());
|
target.setEnvironmentCode(source.getEnvironmentCode());
|
||||||
target.setEnvironmentConfig(source.getEnvironmentConfig());
|
target.setEnvironmentConfig(source.getEnvironmentConfig());
|
||||||
|
Loading…
Reference in New Issue
Block a user