merge force task success from legen618/dev-FTS

This commit is contained in:
chengshiwen 2020-12-20 15:44:36 +08:00
commit c3b5c5fe40
19 changed files with 492 additions and 18 deletions

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
@ -36,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@ -127,4 +129,30 @@ public class TaskInstanceController extends BaseController {
return returnDataListPaging(result);
}
/**
* change one single task instance's state from FAILURE to FORCED_SUCCESS
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
@ApiOperation(value = "force-success", notes = "FORCE_SINGLE_TASK_SUCCESS")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_INSTANCE_ID", required = true, dataType = "Int", example = "12")
})
@PostMapping(value = "/force-success")
@ResponseStatus(HttpStatus.OK)
@ApiException(FORCE_TASK_SUCCESS_ERROR)
public Result<Object> forceSingleTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "taskInstanceId") Integer taskInstanceId) {
String userNameReplace = StringUtils.replaceNRTtoUnderline(loginUser.getUserName());
String projectNameReplace = StringUtils.replaceNRTtoUnderline(projectName);
logger.info("force task success, login user: {}, project:{}, task instance id:{}",
userNameReplace, projectNameReplace, taskInstanceId);
Map<String, Object> result = taskInstanceService.forceSingleTaskSuccess(loginUser, projectName, taskInstanceId);
return returnDataList(result);
}
}

View File

@ -57,6 +57,16 @@ public class TaskCountDto {
.sum();
}
// remove the specified state
public void removeStateFromCountList(ExecutionStatus status) {
for (TaskStateCount count : this.taskCountDtos) {
if (count.getTaskStateType().equals(status)) {
this.taskCountDtos.remove(count);
break;
}
}
}
public List<TaskStateCount> getTaskCountDtos() {
return taskCountDtos;
}

View File

@ -29,8 +29,9 @@ public enum ExecuteType {
* 3 resume failure
* 4 stop
* 5 pause
* 6 resume from forced success
*/
NONE,REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE;
NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE, RESUME_FROM_FORCED_SUCCESS;
public static ExecuteType getEnum(int value){

View File

@ -196,7 +196,9 @@ public enum Status {
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"),
DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_TENANT_CODE_ERROR(10164, "Please enter the English tenant code", "请输入英文租户编码"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),
@ -247,8 +249,9 @@ public enum Status {
BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"),
TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"),
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
NO_VALID_FORCED_SUCCESS_TASK(50030, "there is no valid forced success node in process instance {0}", "工作流实例[{0}]中不包含有效的强制成功的任务实例"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),

View File

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
@ -283,6 +284,13 @@ public class ExecutorService extends BaseService {
result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
}
break;
case RESUME_FROM_FORCED_SUCCESS:
if (!this.checkValidForcedSuccessTask(processInstanceId)) {
putMsg(result, Status.NO_VALID_FORCED_SUCCESS_TASK, processInstance.getName());
} else {
result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.RESUME_FROM_FORCED_SUCCESS);
}
break;
default:
logger.error("unknown execute type : {}", executeType);
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
@ -330,6 +338,7 @@ public class ExecutorService extends BaseService {
}
break;
case START_FAILURE_TASK_PROCESS:
case RESUME_FROM_FORCED_SUCCESS:
if (executionStatus.typeIsFailure()) {
checkResult = true;
}
@ -610,4 +619,26 @@ public class ExecutorService extends BaseService {
return null;
}
/**
* check if the process instance contains valid forced success task
*
* @param processInstanceId
* @return
*/
private boolean checkValidForcedSuccessTask(int processInstanceId) {
List<Integer> forcedSuccessList = processService.findTaskIdByInstanceState(processInstanceId, ExecutionStatus.FORCED_SUCCESS);
if (forcedSuccessList != null && !forcedSuccessList.isEmpty()) {
return true;
}
List<Integer> failedSubList = processService.findTaskIdByInstanceStatusAndType(processInstanceId,
new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
TaskType.SUB_PROCESS);
for (int i = 0; i < failedSubList.size(); i++) {
if (processService.haveForcedSuccessInSubProcess(failedSubList.get(i))) {
return true;
}
}
return false;
}
}

View File

@ -145,6 +145,51 @@ public class TaskInstanceService extends BaseService {
return result;
}
/**
* change one single task instance's state from failure to forced success
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
public Map<String, Object> forceSingleTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
// check user auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
// check whether the task instance can be found
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
// check whether the task instance state type is failure
if (!task.getState().typeIsFailure()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
}
// change the state of the task instance
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
putMsg(result, Status.SUCCESS);
}
else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
}
return result;
}
/***
* generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name
* @param result exist result map

View File

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.api.service.DataAnalysisService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -114,12 +115,17 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
* @return process instance state count data
*/
public Map<String, Object> countProcessInstanceStateByProject(User loginUser, int projectId, String startDate, String endDate) {
return this.countStateByProject(
Map<String, Object> result = this.countStateByProject(
loginUser,
projectId,
startDate,
endDate,
(start, end, projectIds) -> this.processInstanceMapper.countInstanceStateByUser(start, end, projectIds));
// process state count needs to remove state of forced success
if (result.containsKey(Constants.STATUS) && result.get(Constants.STATUS).equals(Status.SUCCESS)) {
((TaskCountDto)result.get(Constants.DATA_LIST)).removeStateFromCountList(ExecutionStatus.FORCED_SUCCESS);
}
return result;
}
private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate

View File

@ -28,6 +28,14 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import java.util.HashMap;
import java.util.Map;
@ -40,11 +48,16 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* task instance controller test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class TaskInstanceControllerTest {
public class TaskInstanceControllerTest extends AbstractControllerTest {
@InjectMocks
private TaskInstanceController taskInstanceController;
@ -70,4 +83,24 @@ public class TaskInstanceControllerTest {
}
@Test
public void testForceSingleTaskSuccess() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("taskInstanceId","104");
Map<String, Object> mockResult = new HashMap<>(5);
mockResult.put(Constants.STATUS, Status.SUCCESS);
mockResult.put(Constants.MSG, Status.SUCCESS.getMsg());
when(taskInstanceService.forceSingleTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success","test")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
}
}

View File

@ -18,19 +18,24 @@
package org.apache.dolphinscheduler.api.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -82,12 +87,16 @@ public class ExecutorService2Test {
private int processDefinitionId = 1;
private int processInstanceId = 1;
private int tenantId = 1;
private int userId = 1;
private ProcessDefinition processDefinition = new ProcessDefinition();
private ProcessInstance processInstance = new ProcessInstance();
private User loginUser = new User();
private String projectName = "projectName";
@ -107,6 +116,13 @@ public class ExecutorService2Test {
processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId);
// processInstance
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);
// project
project.setName(projectName);
@ -120,6 +136,8 @@ public class ExecutorService2Test {
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefineById(processDefinitionId)).thenReturn(processDefinition);
}
/**
@ -257,6 +275,39 @@ public class ExecutorService2Test {
}
@Test
public void testExecute() {
List<Integer> mockRes = new ArrayList<>();
mockRes.add(1);
mockRes.add(2);
Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class)))
.thenReturn(true);
// check execute type error
processInstance.setState(ExecutionStatus.SUCCESS);
Map<String, Object> checkExeTypeRes = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS);
Assert.assertEquals(Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, checkExeTypeRes.get(Constants.STATUS));
// no valid forced success task
processInstance.setState(ExecutionStatus.FAILURE);
Map<String, Object> noValidTaskRes = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS);
Assert.assertEquals(Status.NO_VALID_FORCED_SUCCESS_TASK, noValidTaskRes.get(Constants.STATUS));
// have forced success in sub-process
Mockito.when(processService.findTaskIdByInstanceStatusAndType(anyInt(), any(ExecutionStatus[].class), any(TaskType.class)))
.thenReturn(mockRes);
Mockito.when(processService.haveForcedSuccessInSubProcess(anyInt()))
.thenReturn(true);
Map<String, Object> successRes1 = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS);
Assert.assertEquals(Status.SUCCESS, successRes1.get(Constants.STATUS));
// test success
Mockito.when(processService.findTaskIdByInstanceState(processInstanceId, ExecutionStatus.FORCED_SUCCESS)).thenReturn(mockRes);
Map<String, Object> successRes = executorService.execute(loginUser, projectName, processInstanceId, ExecuteType.RESUME_FROM_FORCED_SUCCESS);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
verify(processService, times(2)).createCommand(any(Command.class));
}
private List<Server> getMasterServersList() {
List<Server> masterServerList = new ArrayList<>();
Server masterServer1 = new Server();

View File

@ -214,4 +214,48 @@ public class TaskInstanceServiceTest {
result.put(Constants.MSG, status.getMsg());
}
}
}
@Test
public void forceSingleTaskSuccess() {
User user = getAdminUser();
String projectName = "test";
Project project = getProject(projectName);
int taskId = 1;
TaskInstance task = getTaskInstance();
Map<String, Object> mockSuccess = new HashMap<>(5);
putMsg(mockSuccess, Status.SUCCESS);
when(projectMapper.queryByName(projectName)).thenReturn(project);
// user auth failed
Map<String, Object> mockFailure = new HashMap<>(5);
putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectName);
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockFailure);
Map<String, Object> authFailRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
Assert.assertNotSame(Status.SUCCESS, authFailRes.get(Constants.STATUS));
// test task not found
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockSuccess);
when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null);
Map<String, Object> taskNotFoundRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS));
// test task instance state error
task.setState(ExecutionStatus.SUCCESS);
when(taskInstanceMapper.selectById(1)).thenReturn(task);
Map<String, Object> taskStateErrorRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskStateErrorRes.get(Constants.STATUS));
// test error
task.setState(ExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(0);
Map<String, Object> errorRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR, errorRes.get(Constants.STATUS));
// test success
task.setState(ExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(1);
Map<String, Object> successRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
}

View File

@ -39,6 +39,7 @@ public enum CommandType {
* 8 pause a process
* 9 stop a process
* 10 recover waiting thread
* 11 resume process from forced-success task nodes
*/
START_PROCESS(0, "start a new process"),
START_CURRENT_TASK_PROCESS(1, "start a new process from current nodes"),
@ -50,7 +51,8 @@ public enum CommandType {
REPEAT_RUNNING(7, "repeat running a process"),
PAUSE(8, "pause a process"),
STOP(9, "stop a process"),
RECOVER_WAITTING_THREAD(10, "recover waiting thread");
RECOVER_WAITTING_THREAD(10, "recover waiting thread"),
RESUME_FROM_FORCED_SUCCESS(11, "resume process from forced-success task nodes");
CommandType(int code, String descp){
this.code = code;

View File

@ -41,6 +41,7 @@ public enum ExecutionStatus {
* 10 waiting thread
* 11 waiting depend node complete
* 12 delay execution
* 13 forced success
*/
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
@ -54,7 +55,8 @@ public enum ExecutionStatus {
KILL(9, "kill"),
WAITTING_THREAD(10, "waiting thread"),
WAITTING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution");
DELAY_EXECUTION(12, "delay execution"),
FORCED_SUCCESS(13, "forced success");
ExecutionStatus(int code, String descp) {
this.code = code;
@ -79,7 +81,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsSuccess() {
return this == SUCCESS;
return this == SUCCESS || this == FORCED_SUCCESS;
}
/**

View File

@ -54,6 +54,7 @@ public class BusinessTimeUtils {
case RECOVER_SUSPENDED_PROCESS:
case START_FAILURE_TASK_PROCESS:
case REPEAT_RUNNING:
case RESUME_FROM_FORCED_SUCCESS:
case SCHEDULER:
default:
businessDate = addDays(new Date(), -1);

View File

@ -72,4 +72,12 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("startTime") Date startTime,
@Param("endTime") Date endTime
);
List<Integer> queryTaskByPIdAndStatusAndType(@Param("processInstanceId") Integer processInstanceId,
@Param("states") int[] stateArray,
@Param("taskType") String taskType);
List<Integer> queryTaskBySubProcessTaskIdAndStatusAndType(@Param("subProcessTaskId") Integer subProcessTaskId,
@Param("states") int[] stateArray,
@Param("taskType") String taskType);
}

View File

@ -152,4 +152,27 @@
</if>
order by instance.start_time desc
</select>
<select id="queryTaskByPIdAndStatusAndType" resultType="java.lang.Integer">
select id from t_ds_task_instance
where process_instance_id = #{processInstanceId}
and task_type = #{taskType}
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
and flag = 1
</select>
<select id="queryTaskBySubProcessTaskIdAndStatusAndType" resultType="java.lang.Integer">
select id from t_ds_task_instance
where process_instance_id =
(select process_instance_id from t_ds_relation_process_instance where parent_task_instance_id = #{subProcessTaskId})
and state in
<foreach collection="states" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
<if test="taskType != null">
and task_type = #{taskType}
</if>
and flag = 1
</select>
</mapper>

View File

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.junit.Assert;
import org.junit.Test;
@ -36,6 +37,7 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -55,20 +57,38 @@ public class TaskInstanceMapperTest {
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessInstanceMapMapper processInstanceMapMapper;
/**
* insert
*
* @return TaskInstance
*/
private TaskInstance insertOne(){
private TaskInstance insertOne() {
//insertOne
return insertOne("us task", 1, ExecutionStatus.RUNNING_EXECUTION, TaskType.SHELL.toString());
}
/**
* construct a task instance and then insert
*
* @param taskName
* @param processInstanceId
* @param state
* @param taskType
* @return
*/
private TaskInstance insertOne(String taskName, int processInstanceId, ExecutionStatus state, String taskType) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setFlag(Flag.YES);
taskInstance.setName("ut task");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setName(taskName);
taskInstance.setState(state);
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskJson("{}");
taskInstance.setTaskType(TaskType.SHELL.toString());
taskInstance.setProcessInstanceId(processInstanceId);
taskInstance.setTaskType(taskType);
taskInstanceMapper.insert(taskInstance);
return taskInstance;
}
@ -90,7 +110,7 @@ public class TaskInstanceMapperTest {
* test delete
*/
@Test
public void testDelete(){
public void testDelete() {
TaskInstance taskInstance = insertOne();
int delete = taskInstanceMapper.deleteById(taskInstance.getId());
Assert.assertEquals(1, delete);
@ -149,7 +169,7 @@ public class TaskInstanceMapperTest {
taskInstanceMapper.deleteById(task2.getId());
taskInstanceMapper.deleteById(task.getId());
Assert.assertNotEquals(taskInstances.size(), 0);
Assert.assertNotEquals(taskInstances1.size(), 0 );
Assert.assertNotEquals(taskInstances1.size(), 0);
}
/**
@ -298,4 +318,53 @@ public class TaskInstanceMapperTest {
Assert.assertNotEquals(taskInstanceIPage.getTotal(), 0);
}
}
@Test
public void testQueryTaskByPIdAndStatusAndType() {
// insert three task instances with the same process instance id
List<TaskInstance> taskList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String name = "ut task" + String.valueOf(i);
taskList.add(insertOne(name, 66, ExecutionStatus.FAILURE, TaskType.SUB_PROCESS.toString()));
}
// test query result
List<Integer> resultArray = taskInstanceMapper.queryTaskByPIdAndStatusAndType(66,
new int[] {ExecutionStatus.FAILURE.ordinal(), ExecutionStatus.KILL.ordinal(), ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()},
TaskType.SUB_PROCESS.toString());
Assert.assertEquals(3, resultArray.size());
// delete
for (int i = 0; i < 3; i++) {
taskInstanceMapper.deleteById(taskList.get(i));
}
}
@Test
public void testQueryTaskBySubProcessTaskIdAndStatusAndType() {
TaskInstance parentTask = insertOne("parent-task", 66, ExecutionStatus.FAILURE, TaskType.SUB_PROCESS.toString());
ProcessInstanceMap processInstanceMap = new ProcessInstanceMap();
processInstanceMap.setParentProcessInstanceId(66);
processInstanceMap.setParentTaskInstanceId(parentTask.getId());
processInstanceMap.setProcessInstanceId(67);
processInstanceMapMapper.insert(processInstanceMap);
TaskInstance subTask1 = insertOne("sub1", 67, ExecutionStatus.SUCCESS, TaskType.SHELL.toString());
TaskInstance subTask2 = insertOne("sub2", 67, ExecutionStatus.FORCED_SUCCESS, TaskType.SHELL.toString());
// test query result
List<Integer> resultList = taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(parentTask.getId(),
new int[] {ExecutionStatus.FORCED_SUCCESS.ordinal()},
null);
Assert.assertEquals(1, resultList.size());
Assert.assertEquals(subTask2.getId(), resultList.get(0).intValue());
// delete
taskInstanceMapper.deleteById(parentTask.getId());
processInstanceMapMapper.deleteById(processInstanceMap.getId());
taskInstanceMapper.deleteById(subTask1.getId());
taskInstanceMapper.deleteById(subTask2.getId());
}
}

View File

@ -414,7 +414,7 @@ public class MasterExecThread implements Runnable {
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getName(), dag)) {
continue;
}
if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
if (task.getState().typeIsFailure() && !task.taskCanRetry() && !task.isConditionsTask()) {
errorTaskList.put(task.getName(), task);
}
}
@ -987,6 +987,7 @@ public class MasterExecThread implements Runnable {
// updateProcessInstance completed task status
// failure priority is higher than pause
// if a task fails, other suspended tasks need to be reset kill
// check if there exists forced success nodes in errorTaskList
if (errorTaskList.size() > 0) {
for (Map.Entry<String, TaskInstance> entry : completeTaskList.entrySet()) {
TaskInstance completeTask = entry.getValue();
@ -996,6 +997,22 @@ public class MasterExecThread implements Runnable {
processService.updateTaskInstance(completeTask);
}
}
for (Map.Entry<String, TaskInstance> entry : errorTaskList.entrySet()) {
TaskInstance errorTask = entry.getValue();
TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId());
if (currentTask == null) {
continue;
}
// for nodes that have been forced success
if (errorTask.getState().typeIsFailure() && currentTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
// update state in this thread and remove from errorTaskList
errorTask.setState(currentTask.getState());
logger.info("task: {} has been forced success, remove it from error task list", errorTask.getName());
errorTaskList.remove(errorTask.getName());
// submit post nodes
submitPostNode(errorTask.getName());
}
}
}
if (canSubmitTaskToQueue()) {
submitStandByTask();
@ -1096,6 +1113,18 @@ public class MasterExecThread implements Runnable {
int length = readyToSubmitTaskQueue.size();
for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
// stop tasks which is retrying if forced success happens
if (task.taskCanRetry()) {
TaskInstance tmpTask = processService.findTaskInstanceById(task.getId());
if (tmpTask != null && tmpTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
task.setState(tmpTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
continue;
}
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
if (retryTaskIntervalOverTime(task)) {

View File

@ -81,6 +81,8 @@ public class AlertManager {
return "pause";
case STOP:
return "stop";
case RESUME_FROM_FORCED_SUCCESS:
return "resume from forced success";
default:
return "unknown type";
}

View File

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.process;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -262,6 +264,7 @@ public class ProcessService {
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
cmdTypeMap.put(CommandType.RESUME_FROM_FORCED_SUCCESS, 1);
CommandType commandType = command.getCommandType();
if (cmdTypeMap.containsKey(commandType)) {
@ -772,6 +775,31 @@ public class ProcessService {
break;
case SCHEDULER:
break;
case RESUME_FROM_FORCED_SUCCESS:
List<Integer> failedSubList = this.findTaskIdByInstanceStatusAndType(processInstance.getId(),
new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
TaskType.SUB_PROCESS);
for (int i = 0; i < failedSubList.size(); i++) {
// if there exists forced success in the sub_process
if (haveForcedSuccessInSubProcess(failedSubList.get(i))) {
// change sub_process task's state into submitted_success
TaskInstance taskInstance = this.findTaskInstanceById(failedSubList.get(i));
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
}
}
/**
* set resume node list to null
* 1. we can have a complete dag in the ExecThread so that it can restore the previous context
* 2. each time the operation is done the state of process will be reasonable as usual
*/
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(null)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
default:
break;
}
@ -779,6 +807,30 @@ public class ProcessService {
return processInstance;
}
/**
* recursively check if a sub process node contains forced success node
* @param taskInstanceId task instance id
* @return true or false
*/
public boolean haveForcedSuccessInSubProcess(int taskInstanceId) {
List<Integer> forcedSuccessList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId,
new ExecutionStatus[]{ExecutionStatus.FORCED_SUCCESS},
null);
if (forcedSuccessList != null && !forcedSuccessList.isEmpty()) {
return true;
}
List<Integer> childSubList = this.findTaskIdInSubProcessByStatusAndType(taskInstanceId,
new ExecutionStatus[]{ExecutionStatus.FAILURE, ExecutionStatus.KILL, ExecutionStatus.NEED_FAULT_TOLERANCE},
TaskType.SUB_PROCESS);
for (Integer child : childSubList) {
if (haveForcedSuccessInSubProcess(child)) {
return true;
}
}
return false;
}
/**
* return complement data if the process start with complement data
*
@ -1362,6 +1414,40 @@ public class ProcessService {
return taskInstanceMapper.queryTaskByProcessIdAndState(instanceId, state.ordinal());
}
/**
* get id list by task state and type
* @param processInstanceId process instance id
* @param states task instance state array
* @param taskType task type
* @return task instance id list
*/
public List<Integer> findTaskIdByInstanceStatusAndType(int processInstanceId, ExecutionStatus[] states, TaskType taskType) {
int[] statesArray = new int[states.length];
for (int i = 0; i < states.length; i++) {
statesArray[i] = states[i].ordinal();
}
return taskInstanceMapper.queryTaskByPIdAndStatusAndType(processInstanceId, statesArray, taskType.toString());
}
/**
* get tasks in sub_process by sub_process task id and state and type
* if param type is null, it queries all types
* @param taskId task instance id
* @param states task instance state array
* @param taskType task type
* @return task instance id list
*/
public List<Integer> findTaskIdInSubProcessByStatusAndType(int taskId, ExecutionStatus[] states, TaskType taskType) {
int[] statesArray = new int[states.length];
for (int i = 0; i < states.length; i++) {
statesArray[i] = states[i].ordinal();
}
if (taskType == null) {
return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, statesArray, null);
}
return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, statesArray, taskType.toString());
}
/**
* find valid task list by process definition id
*