mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-30 19:27:38 +08:00
match frontend and backend for force success
This commit is contained in:
parent
925d88dd5a
commit
af720d3ef1
@ -130,28 +130,28 @@ public class TaskInstanceController extends BaseController {
|
||||
}
|
||||
|
||||
/**
|
||||
* change one single task instance's state from FAILURE to FORCED_SUCCESS
|
||||
* change one task instance's state from FAILURE to FORCED_SUCCESS
|
||||
*
|
||||
* @param loginUser login user
|
||||
* @param projectName project name
|
||||
* @param taskInstanceId task instance id
|
||||
* @return the result code and msg
|
||||
*/
|
||||
@ApiOperation(value = "force-success", notes = "FORCE_SINGLE_TASK_SUCCESS")
|
||||
@ApiOperation(value = "force-success", notes = "FORCE_TASK_SUCCESS")
|
||||
@ApiImplicitParams({
|
||||
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_INSTANCE_ID", required = true, dataType = "Int", example = "12")
|
||||
})
|
||||
@PostMapping(value = "/force-success")
|
||||
@ResponseStatus(HttpStatus.OK)
|
||||
@ApiException(FORCE_TASK_SUCCESS_ERROR)
|
||||
public Result<Object> forceSingleTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
|
||||
public Result<Object> forceTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
|
||||
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
|
||||
@RequestParam(value = "taskInstanceId") Integer taskInstanceId) {
|
||||
String userNameReplace = StringUtils.replaceNRTtoUnderline(loginUser.getUserName());
|
||||
String projectNameReplace = StringUtils.replaceNRTtoUnderline(projectName);
|
||||
logger.info("force task success, login user: {}, project:{}, task instance id:{}",
|
||||
userNameReplace, projectNameReplace, taskInstanceId);
|
||||
Map<String, Object> result = taskInstanceService.forceSingleTaskSuccess(loginUser, projectName, taskInstanceId);
|
||||
Map<String, Object> result = taskInstanceService.forceTaskSuccess(loginUser, projectName, taskInstanceId);
|
||||
return returnDataList(result);
|
||||
}
|
||||
|
||||
|
@ -146,14 +146,14 @@ public class TaskInstanceService extends BaseService {
|
||||
}
|
||||
|
||||
/**
|
||||
* change one single task instance's state from failure to forced success
|
||||
* change one task instance's state from failure to forced success
|
||||
*
|
||||
* @param loginUser login user
|
||||
* @param projectName project name
|
||||
* @param taskInstanceId task instance id
|
||||
* @return the result code and msg
|
||||
*/
|
||||
public Map<String, Object> forceSingleTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
|
||||
public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
|
||||
Map<String, Object> result = new HashMap<>(5);
|
||||
Project project = projectMapper.queryByName(projectName);
|
||||
|
||||
@ -182,8 +182,7 @@ public class TaskInstanceService extends BaseService {
|
||||
int changedNum = taskInstanceMapper.updateById(task);
|
||||
if (changedNum > 0) {
|
||||
putMsg(result, Status.SUCCESS);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
|
||||
}
|
||||
|
||||
|
@ -84,14 +84,14 @@ public class TaskInstanceControllerTest extends AbstractControllerTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForceSingleTaskSuccess() throws Exception {
|
||||
public void testForceTaskSuccess() throws Exception {
|
||||
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
|
||||
paramsMap.add("taskInstanceId","104");
|
||||
|
||||
Map<String, Object> mockResult = new HashMap<>(5);
|
||||
mockResult.put(Constants.STATUS, Status.SUCCESS);
|
||||
mockResult.put(Constants.MSG, Status.SUCCESS.getMsg());
|
||||
when(taskInstanceService.forceSingleTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult);
|
||||
when(taskInstanceService.forceTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult);
|
||||
|
||||
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success","test")
|
||||
.header(SESSION_ID, sessionId)
|
||||
|
@ -216,7 +216,7 @@ public class TaskInstanceServiceTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void forceSingleTaskSuccess() {
|
||||
public void forceTaskSuccess() {
|
||||
User user = getAdminUser();
|
||||
String projectName = "test";
|
||||
Project project = getProject(projectName);
|
||||
@ -231,31 +231,31 @@ public class TaskInstanceServiceTest {
|
||||
Map<String, Object> mockFailure = new HashMap<>(5);
|
||||
putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectName);
|
||||
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockFailure);
|
||||
Map<String, Object> authFailRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
|
||||
Map<String, Object> authFailRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
|
||||
Assert.assertNotSame(Status.SUCCESS, authFailRes.get(Constants.STATUS));
|
||||
|
||||
// test task not found
|
||||
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockSuccess);
|
||||
when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null);
|
||||
Map<String, Object> taskNotFoundRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
|
||||
Map<String, Object> taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
|
||||
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS));
|
||||
|
||||
// test task instance state error
|
||||
task.setState(ExecutionStatus.SUCCESS);
|
||||
when(taskInstanceMapper.selectById(1)).thenReturn(task);
|
||||
Map<String, Object> taskStateErrorRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
|
||||
Map<String, Object> taskStateErrorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
|
||||
Assert.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskStateErrorRes.get(Constants.STATUS));
|
||||
|
||||
// test error
|
||||
task.setState(ExecutionStatus.FAILURE);
|
||||
when(taskInstanceMapper.updateById(task)).thenReturn(0);
|
||||
Map<String, Object> errorRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
|
||||
Map<String, Object> errorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
|
||||
Assert.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR, errorRes.get(Constants.STATUS));
|
||||
|
||||
// test success
|
||||
task.setState(ExecutionStatus.FAILURE);
|
||||
when(taskInstanceMapper.updateById(task)).thenReturn(1);
|
||||
Map<String, Object> successRes = taskInstanceService.forceSingleTaskSuccess(user, projectName, taskId);
|
||||
Map<String, Object> successRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
|
||||
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
|
||||
}
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ public class VarPoolUtils {
|
||||
* @throws ParseException ParseException
|
||||
*/
|
||||
public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException {
|
||||
if (varPool == null || propToValue == null) {
|
||||
if (propToValue == null || StringUtils.isEmpty(varPool)) {
|
||||
return;
|
||||
}
|
||||
String[] splits = varPool.split("\\$VarPool\\$");
|
||||
|
@ -73,11 +73,11 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
|
||||
@Param("endTime") Date endTime
|
||||
);
|
||||
|
||||
List<Integer> queryTaskByPIdAndStatusAndType(@Param("processInstanceId") Integer processInstanceId,
|
||||
@Param("states") int[] stateArray,
|
||||
@Param("taskType") String taskType);
|
||||
List<Integer> queryTaskByProcessIdAndStateAndType(@Param("processInstanceId") Integer processInstanceId,
|
||||
@Param("states") int[] stateArray,
|
||||
@Param("taskType") String taskType);
|
||||
|
||||
List<Integer> queryTaskBySubProcessTaskIdAndStatusAndType(@Param("subProcessTaskId") Integer subProcessTaskId,
|
||||
@Param("states") int[] stateArray,
|
||||
@Param("taskType") String taskType);
|
||||
List<Integer> queryTaskBySubProcessTaskIdAndStateAndType(@Param("subProcessTaskId") Integer subProcessTaskId,
|
||||
@Param("states") int[] stateArray,
|
||||
@Param("taskType") String taskType);
|
||||
}
|
||||
|
@ -152,7 +152,7 @@
|
||||
</if>
|
||||
order by instance.start_time desc
|
||||
</select>
|
||||
<select id="queryTaskByPIdAndStatusAndType" resultType="java.lang.Integer">
|
||||
<select id="queryTaskByProcessIdAndStateAndType" resultType="java.lang.Integer">
|
||||
select id from t_ds_task_instance
|
||||
where process_instance_id = #{processInstanceId}
|
||||
and task_type = #{taskType}
|
||||
@ -162,7 +162,7 @@
|
||||
</foreach>
|
||||
and flag = 1
|
||||
</select>
|
||||
<select id="queryTaskBySubProcessTaskIdAndStatusAndType" resultType="java.lang.Integer">
|
||||
<select id="queryTaskBySubProcessTaskIdAndStateAndType" resultType="java.lang.Integer">
|
||||
select id from t_ds_task_instance
|
||||
where process_instance_id =
|
||||
(select process_instance_id from t_ds_relation_process_instance where parent_task_instance_id = #{subProcessTaskId})
|
||||
|
@ -320,7 +320,7 @@ public class TaskInstanceMapperTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueryTaskByPIdAndStatusAndType() {
|
||||
public void testQueryTaskByProcessIdAndStateAndType() {
|
||||
// insert three task instances with the same process instance id
|
||||
List<TaskInstance> taskList = new ArrayList<>();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
@ -329,7 +329,7 @@ public class TaskInstanceMapperTest {
|
||||
}
|
||||
|
||||
// test query result
|
||||
List<Integer> resultArray = taskInstanceMapper.queryTaskByPIdAndStatusAndType(66,
|
||||
List<Integer> resultArray = taskInstanceMapper.queryTaskByProcessIdAndStateAndType(66,
|
||||
new int[] {ExecutionStatus.FAILURE.ordinal(), ExecutionStatus.KILL.ordinal(), ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal()},
|
||||
TaskType.SUB_PROCESS.toString());
|
||||
Assert.assertEquals(3, resultArray.size());
|
||||
@ -341,7 +341,7 @@ public class TaskInstanceMapperTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueryTaskBySubProcessTaskIdAndStatusAndType() {
|
||||
public void testQueryTaskBySubProcessTaskIdAndStateAndType() {
|
||||
TaskInstance parentTask = insertOne("parent-task", 66, ExecutionStatus.FAILURE, TaskType.SUB_PROCESS.toString());
|
||||
|
||||
ProcessInstanceMap processInstanceMap = new ProcessInstanceMap();
|
||||
@ -354,7 +354,7 @@ public class TaskInstanceMapperTest {
|
||||
TaskInstance subTask2 = insertOne("sub2", 67, ExecutionStatus.FORCED_SUCCESS, TaskType.SHELL.toString());
|
||||
|
||||
// test query result
|
||||
List<Integer> resultList = taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(parentTask.getId(),
|
||||
List<Integer> resultList = taskInstanceMapper.queryTaskBySubProcessTaskIdAndStateAndType(parentTask.getId(),
|
||||
new int[] {ExecutionStatus.FORCED_SUCCESS.ordinal()},
|
||||
null);
|
||||
|
||||
|
@ -1115,9 +1115,9 @@ public class MasterExecThread implements Runnable {
|
||||
TaskInstance task = readyToSubmitTaskQueue.peek();
|
||||
// stop tasks which is retrying if forced success happens
|
||||
if (task.taskCanRetry()) {
|
||||
TaskInstance tmpTask = processService.findTaskInstanceById(task.getId());
|
||||
if (tmpTask != null && tmpTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
|
||||
task.setState(tmpTask.getState());
|
||||
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
|
||||
if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
|
||||
task.setState(retryTask.getState());
|
||||
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
|
||||
removeTaskFromStandbyList(task);
|
||||
completeTaskList.put(task.getName(), task);
|
||||
|
@ -1426,7 +1426,7 @@ public class ProcessService {
|
||||
for (int i = 0; i < states.length; i++) {
|
||||
statesArray[i] = states[i].ordinal();
|
||||
}
|
||||
return taskInstanceMapper.queryTaskByPIdAndStatusAndType(processInstanceId, statesArray, taskType.toString());
|
||||
return taskInstanceMapper.queryTaskByProcessIdAndStateAndType(processInstanceId, statesArray, taskType.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1443,9 +1443,9 @@ public class ProcessService {
|
||||
statesArray[i] = states[i].ordinal();
|
||||
}
|
||||
if (taskType == null) {
|
||||
return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, statesArray, null);
|
||||
return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStateAndType(taskId, statesArray, null);
|
||||
}
|
||||
return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStatusAndType(taskId, statesArray, taskType.toString());
|
||||
return taskInstanceMapper.queryTaskBySubProcessTaskIdAndStateAndType(taskId, statesArray, taskType.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -133,6 +133,10 @@ const runningType = [
|
||||
{
|
||||
desc: `${i18n.$t('Recovery waiting thread')}`,
|
||||
code: 'RECOVER_WAITTING_THREAD'
|
||||
},
|
||||
{
|
||||
desc: `${i18n.$t('Resume from forced success')}`,
|
||||
code: 'RESUME_FROM_FORCED_SUCCESS'
|
||||
}
|
||||
]
|
||||
|
||||
|
@ -79,7 +79,7 @@
|
||||
</el-tooltip>
|
||||
<el-tooltip :content="$t('Resume From Forced Success')" placement="top" :enterable="false">
|
||||
<span>
|
||||
<el-button type="success" size="mini" icon="el-icon-stopwatch" :disabled="scope.row.state !== 'FORCED_SUCCESS'" @click="_resumeFromForcedSuccess(scope.row,scope.$index)" circle></el-button>
|
||||
<el-button type="success" size="mini" icon="el-icon-stopwatch" :disabled="!(scope.row.state === 'FAILURE' || scope.row.state === 'NEED_FAULT_TOLERANCE' || scope.row.state === 'KILL')" @click="_resumeFromForcedSuccess(scope.row,scope.$index)" circle></el-button>
|
||||
</span>
|
||||
</el-tooltip>
|
||||
<el-tooltip :content="scope.row.state === 'STOP' ? $t('Recovery Suspend') : $t('Stop')" placement="top" :enterable="false">
|
||||
|
@ -116,15 +116,14 @@
|
||||
this.logDialog = true
|
||||
},
|
||||
ok () {},
|
||||
|
||||
close () {
|
||||
this.logDialog = false
|
||||
},
|
||||
|
||||
_forceSuccess (item) {
|
||||
this.forceTaskSuccess({ taskInstanceId: item.id }).then(res => {
|
||||
if (res.code === 0) {
|
||||
this.$message.success(res.msg)
|
||||
setTimeout(this._onUpdate, 1000)
|
||||
} else {
|
||||
this.$message.error(res.msg)
|
||||
}
|
||||
@ -132,6 +131,9 @@
|
||||
this.$message.error(e.msg)
|
||||
})
|
||||
},
|
||||
_onUpdate () {
|
||||
this.$emit('on-update')
|
||||
},
|
||||
_go (item) {
|
||||
this.$router.push({ path: `/projects/instance/list/${item.processInstanceId}` })
|
||||
}
|
||||
|
@ -23,7 +23,7 @@
|
||||
|
||||
<template slot="content">
|
||||
<template v-if="taskInstanceList.length">
|
||||
<m-list :task-instance-list="taskInstanceList" :page-no="searchParams.pageNo" :page-size="searchParams.pageSize">
|
||||
<m-list :task-instance-list="taskInstanceList" @on-update="_onUpdate" :page-no="searchParams.pageNo" :page-size="searchParams.pageSize">
|
||||
</m-list>
|
||||
<div class="page-box">
|
||||
<el-pagination
|
||||
@ -126,6 +126,12 @@
|
||||
this.isLoading = false
|
||||
})
|
||||
},
|
||||
/**
|
||||
* update
|
||||
*/
|
||||
_onUpdate () {
|
||||
this._debounceGET()
|
||||
},
|
||||
/**
|
||||
* Anti shake request interface
|
||||
* @desc Prevent functions from being called multiple times
|
||||
|
Loading…
Reference in New Issue
Block a user