[fix][python] schedule unexpect offline each time update from api (#13301)

* Alway set workflow online before set schedule online
* Avoid return map in interface setScheduleState
This commit is contained in:
Jay Chung 2022-12-30 09:59:16 +08:00 committed by GitHub
parent 8503ee0eed
commit d42f576268
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 71 deletions

View File

@ -193,8 +193,8 @@ public class SchedulerController extends BaseController {
public Result publishScheduleOnline(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
Map<String, Object> result = schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
return returnDataList(result);
schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
return Result.success();
}
/**
@ -215,10 +215,8 @@ public class SchedulerController extends BaseController {
public Result offlineSchedule(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
Map<String, Object> result =
schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
return returnDataList(result);
schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
return Result.success();
}
/**

View File

@ -348,6 +348,8 @@ public class PythonGateway {
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
}
// Always set workflow online to set schedule online
processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE);
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}

View File

@ -128,12 +128,11 @@ public interface SchedulerService {
* @param projectCode project code
* @param id scheduler id
* @param scheduleStatus schedule status
* @return publish result code
*/
Map<String, Object> setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus);
void setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus);
/**
* query schedule

View File

@ -426,57 +426,48 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
*/
@Override
@Transactional
public Map<String, Object> setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus) {
Map<String, Object> result = new HashMap<>();
public void setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus) {
Project project = projectMapper.queryByCode(projectCode);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) {
return result;
}
projectService.checkProjectAndAuthThrowException(loginUser, project, null);
// check schedule exists
Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {
logger.error("Schedule does not exist, scheduleId:{}.", id);
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
throw new ServiceException(Status.SCHEDULE_CRON_NOT_EXISTS, id);
}
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
logger.warn("Schedule state does not need to change due to schedule state is already {}, scheduleId:{}.",
scheduleObj.getReleaseState().getDescp(), scheduleObj.getId());
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
throw new ServiceException(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
logger.error("Process definition does not exist, processDefinitionCode:{}.",
scheduleObj.getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
return result;
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(scheduleObj.getProcessDefinitionCode()));
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
if (processTaskRelations.isEmpty()) {
logger.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinition.getCode());
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.warn("Only process definition state is {} can change schedule state, processDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result;
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
}
// check sub process definition release state
List<Long> subProcessDefineCodes = new ArrayList<>();
@ -496,9 +487,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
logger.warn(
"Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), subProcessDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(subProcessDefinition.getId()));
return result;
}
}
}
@ -510,8 +500,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
if (masterServers.isEmpty()) {
logger.error("Master does not exist.");
putMsg(result, Status.MASTER_NOT_EXISTS);
return result;
throw new ServiceException(Status.MASTER_NOT_EXISTS);
}
// set status
@ -532,20 +521,15 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
deleteSchedule(project.getId(), id);
break;
default:
putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
return result;
throw new ServiceException(Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
}
} catch (Exception e) {
logger.error("Set schedule state to {} error, projectCode:{}, scheduleId:{}.", scheduleStatus.getDescp(),
projectCode, scheduleObj.getId());
Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
: Status.OFFLINE_SCHEDULE_ERROR;
result.put(Constants.STATUS, status);
throw new ServiceException(status, e);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**

View File

@ -120,8 +120,9 @@ public class SchedulerControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "37");
Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
isA(ReleaseState.class))).thenReturn(success());
Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
isA(Integer.class),
isA(ReleaseState.class));
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/online", 123, 37)
.header(SESSION_ID, sessionId)
@ -140,8 +141,9 @@ public class SchedulerControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "28");
Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
isA(ReleaseState.class))).thenReturn(success());
Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
isA(Integer.class),
isA(ReleaseState.class));
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/offline", 123, 28)
.header(SESSION_ID, sessionId)

View File

@ -26,10 +26,11 @@ import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
@ -41,7 +42,8 @@ import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Map;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@ -118,7 +120,6 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
@Test
public void testSetScheduleState() {
Map<String, Object> result;
Project project = getProject();
ProcessDefinition processDefinition = new ProcessDefinition();
@ -130,40 +131,49 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(scheduleMapper.selectById(1)).thenReturn(schedule);
Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
// hash no auth
result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
Mockito.when(projectService.hasProjectAndPerm(user, project, result, null)).thenReturn(true);
// schedule not exists
result = schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS, result.get(Constants.STATUS));
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
// SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE
result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS));
// SCHEDULE_CRON_RELEASE_NEED_NOT_CHANGE
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
});
Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE.getCode(),
((ServiceException) exception).getCode());
// PROCESS_DEFINE_NOT_EXIST
schedule.setProcessDefinitionCode(2);
result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS));
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
schedule.setProcessDefinitionCode(1);
result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
processDefinition.setReleaseState(ReleaseState.ONLINE);
result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
// online also success
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1)).thenReturn(processTaskRelationList);
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_RELEASE.getCode(), ((ServiceException) exception).getCode());
// SUCCESS
result = schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
Assertions.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
Server server = new Server();
List<Server> serverList = new ArrayList<>();
serverList.add(server);
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(serverList);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Assertions.assertDoesNotThrow(() -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
}
@Test
@ -428,6 +438,7 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
private Project getProject() {
Project project = new Project();
project.setId(1);
project.setName(projectName);
project.setCode(projectCode);
project.setUserId(userId);