diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index bc28c1d5e6..04e818f5c3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -170,18 +170,14 @@ public interface TaskInstanceMapper extends BaseMapper { * find last task instance list in the date interval * * @param taskCodes taskCodes - * @param startTime startTime - * @param endTime endTime * @param testFlag testFlag * @return task instance list */ - List findLastTaskInstances(@Param("taskCodes") Set taskCodes, - @Param("startTime") Date startTime, - @Param("endTime") Date endTime, + List findLastTaskInstances(@Param("processInstanceId") Integer processInstanceId, + @Param("taskCodes") Set taskCodes, @Param("testFlag") int testFlag); - TaskInstance findLastTaskInstance(@Param("taskCode") long depTaskCode, - @Param("startTime") Date startTime, - @Param("endTime") Date endTime, + TaskInstance findLastTaskInstance(@Param("processInstanceId") Integer processInstanceId, + @Param("taskCode") long depTaskCode, @Param("testFlag") int testFlag); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java index b5d41f8783..0156416fd3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import java.util.List; import java.util.Set; @@ -91,20 +90,21 @@ public interface TaskInstanceDao extends IDao { /** * find last task instance list corresponding to taskCodes in the date interval * + * @param processInstanceId Task's parent process instance id * @param taskCodes taskCodes - * @param dateInterval dateInterval * @param testFlag test flag * @return task instance list */ - List queryLastTaskInstanceListIntervalByTaskCodes(Set taskCodes, DateInterval dateInterval, - int testFlag); + List queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId, + Set taskCodes, int testFlag); /** * find last task instance corresponding to taskCode in the date interval + * @param processInstanceId Task's parent process instance id * @param depTaskCode taskCode - * @param dateInterval dateInterval * @param testFlag test flag * @return task instance */ - TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, DateInterval dateInterval, int testFlag); + TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId, + long depTaskCode, int testFlag); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index 9cbb92286c..7e96bcd68b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.commons.lang3.StringUtils; @@ -174,16 +173,15 @@ public class TaskInstanceDaoImpl extends BaseDao queryLastTaskInstanceListIntervalByTaskCodes(Set taskCodes, - DateInterval dateInterval, int testFlag) { - return mybatisMapper.findLastTaskInstances(taskCodes, dateInterval.getStartTime(), dateInterval.getEndTime(), - testFlag); + public List queryLastTaskInstanceListIntervalInProcessInstance(Integer processInstanceId, + Set taskCodes, + int testFlag) { + return mybatisMapper.findLastTaskInstances(processInstanceId, taskCodes, testFlag); } @Override - public TaskInstance queryLastTaskInstanceIntervalByTaskCode(long depTaskCode, DateInterval dateInterval, - int testFlag) { - return mybatisMapper.findLastTaskInstance(depTaskCode, dateInterval.getStartTime(), dateInterval.getEndTime(), - testFlag); + public TaskInstance queryLastTaskInstanceIntervalInProcessInstance(Integer processInstanceId, long depTaskCode, + int testFlag) { + return mybatisMapper.findLastTaskInstance(processInstanceId, depTaskCode, testFlag); } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 8e3674e523..1544d0ed8f 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -350,27 +350,25 @@ select task_code, max(end_time) as max_end_time from t_ds_task_instance where 1=1 and test_flag = #{testFlag} + and instance.process_instance_id = #{processInstanceId} and task_code in #{i} - - and start_time = ]]> #{startTime} and start_time #{endTime} - group by task_code ) t_max - on instance.task_code = t_max.task_code and instance.end_time = t_max.max_end_time + on instance.process_instance_id = t_max.process_instance_id + and instance.task_code = t_max.task_code + and instance.end_time = t_max.max_end_time diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java index 195212ef34..0d343a709f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java @@ -158,10 +158,9 @@ public class DependentExecute { if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_WORKFLOW_CODE) { result = dependResultByProcessInstance(processInstance); } else if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) { - result = dependResultByAllTaskOfProcessInstance(processInstance, dateInterval, testFlag); + result = dependResultByAllTaskOfProcessInstance(processInstance, testFlag); } else { - result = dependResultBySingleTaskInstance(processInstance, dependentItem.getDepTaskCode(), dateInterval, - testFlag); + result = dependResultBySingleTaskInstance(processInstance, dependentItem.getDepTaskCode(), testFlag); } if (result != DependResult.SUCCESS) { break; @@ -194,8 +193,7 @@ public class DependentExecute { * * @return */ - private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance, - DateInterval dateInterval, int testFlag) { + private DependResult dependResultByAllTaskOfProcessInstance(ProcessInstance processInstance, int testFlag) { if (!processInstance.getState().isFinished()) { log.info("Wait for the dependent workflow to complete, processCode: {}, processInstanceId: {}.", processInstance.getProcessDefinitionCode(), processInstance.getId()); @@ -212,8 +210,8 @@ public class DependentExecute { .collect(Collectors.toMap(TaskDefinitionLog::getCode, TaskDefinitionLog::getName)); List taskInstanceList = - taskInstanceDao.queryLastTaskInstanceListIntervalByTaskCodes(taskDefinitionCodeMap.keySet(), - dateInterval, testFlag); + taskInstanceDao.queryLastTaskInstanceListIntervalInProcessInstance(processInstance.getId(), + taskDefinitionCodeMap.keySet(), testFlag); Map taskExecutionStatusMap = taskInstanceList.stream() .filter(taskInstance -> taskInstance.getTaskExecuteType() != TaskExecuteType.STREAM) @@ -245,14 +243,14 @@ public class DependentExecute { * * @param processInstance last process instance in the date interval * @param depTaskCode the dependent task code - * @param dateInterval date interval * @param testFlag test flag * @return depend result */ private DependResult dependResultBySingleTaskInstance(ProcessInstance processInstance, long depTaskCode, - DateInterval dateInterval, int testFlag) { + int testFlag) { TaskInstance taskInstance = - taskInstanceDao.queryLastTaskInstanceIntervalByTaskCode(depTaskCode, dateInterval, testFlag); + taskInstanceDao.queryLastTaskInstanceIntervalInProcessInstance(processInstance.getId(), + depTaskCode, testFlag); if (taskInstance == null) { TaskDefinition taskDefinition = taskDefinitionDao.queryByCode(depTaskCode);