[fix][python] schedule unexpect offline each time update from api #13301

This commit is contained in:
Jay Chung 2022-12-30 09:59:16 +08:00 committed by zhuangchong
parent 9564bdeefb
commit 543626aa7a
6 changed files with 106 additions and 93 deletions

View File

@ -185,8 +185,8 @@ public class SchedulerController extends BaseController {
public Result publishScheduleOnline(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
Map<String, Object> result = schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
return returnDataList(result);
schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
return Result.success();
}
/**
@ -207,10 +207,8 @@ public class SchedulerController extends BaseController {
public Result offlineSchedule(@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
Map<String, Object> result =
schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
return returnDataList(result);
schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
return Result.success();
}
/**

View File

@ -338,6 +338,8 @@ public class PythonGateway {
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
}
// Always set workflow online to set schedule online
processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE);
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}

View File

@ -92,12 +92,11 @@ public interface SchedulerService {
* @param projectCode project code
* @param id scheduler id
* @param scheduleStatus schedule status
* @return publish result code
*/
Map<String, Object> setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus);
void setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus);
/**
* query schedule

View File

@ -279,52 +279,48 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
*/
@Override
@Transactional
public Map<String, Object> setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus) {
Map<String, Object> result = new HashMap<>();
public void setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus) {
Project project = projectMapper.queryByCode(projectCode);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result, null);
if (!hasProjectAndPerm) {
return result;
}
projectService.checkProjectAndAuthThrowException(loginUser, project, null);
// check schedule exists
Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
logger.error("Schedule does not exist, scheduleId:{}.", id);
throw new ServiceException(Status.SCHEDULE_CRON_NOT_EXISTS, id);
}
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
logger.warn("Schedule state does not need to change due to schedule state is already {}, scheduleId:{}.",
scheduleObj.getReleaseState().getDescp(), scheduleObj.getId());
throw new ServiceException(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
return result;
logger.error("Process definition does not exist, processDefinitionCode:{}.",
scheduleObj.getProcessDefinitionCode());
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(scheduleObj.getProcessDefinitionCode()));
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
if (processTaskRelations.isEmpty()) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
logger.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinition.getCode());
throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}", processDefinition.getId(),
processDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result;
logger.warn("Only process definition state is {} can change schedule state, processDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
}
// check sub process definition release state
List<Long> subProcessDefineCodes = new ArrayList<>();
@ -338,11 +334,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
* if there is no online process, exit directly
*/
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
subProcessDefinition.getId(), subProcessDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
logger.warn(
"Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), subProcessDefinition.getCode());
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(subProcessDefinition.getId()));
return result;
}
}
}
@ -353,8 +349,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
List<Server> masterServers = monitorService.getServerListFromRegistry(true);
if (masterServers.isEmpty()) {
putMsg(result, Status.MASTER_NOT_EXISTS);
return result;
logger.error("Master does not exist.");
throw new ServiceException(Status.MASTER_NOT_EXISTS);
}
// set status
@ -375,18 +371,13 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
deleteSchedule(project.getId(), id);
break;
default:
putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
return result;
throw new ServiceException(Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
}
} catch (Exception e) {
Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
: Status.OFFLINE_SCHEDULE_ERROR;
result.put(Constants.STATUS, status);
throw new ServiceException(status, e);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**

View File

@ -119,8 +119,9 @@ public class SchedulerControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id","37");
Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
isA(ReleaseState.class))).thenReturn(success());
Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
isA(Integer.class),
isA(ReleaseState.class));
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/online",123, 37)
.header(SESSION_ID, sessionId)
@ -139,8 +140,9 @@ public class SchedulerControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id","28");
Mockito.when(schedulerService.setScheduleState(isA(User.class), isA(Long.class), isA(Integer.class),
isA(ReleaseState.class))).thenReturn(success());
Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
isA(Integer.class),
isA(ReleaseState.class));
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/offline",123, 28)
.header(SESSION_ID, sessionId)

View File

@ -18,12 +18,14 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
@ -43,6 +45,8 @@ import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -83,19 +87,31 @@ public class SchedulerServiceTest {
@Mock
private SchedulerApi schedulerApi;
@Before
public void setUp() {
protected static User user;
protected Exception exception;
private static final String userName = "userName";
private static final String projectName = "projectName";
private static final long projectCode = 1L;
private static final int userId = 1;
private static final String processDefinitionName = "processDefinitionName";
private static final long processDefinitionCode = 2L;
private static final int processDefinitionVersion = 3;
private static final int scheduleId = 3;
private static final long environmentCode = 4L;
private static final String startTime = "2020-01-01 12:13:14";
private static final String endTime = "2020-02-01 12:13:14";
private static final String crontab = "0 0 * * * ? *";
@BeforeEach
public void setUp() {
user = new User();
user.setUserName(userName);
user.setId(userId);
}
@Test
public void testSetScheduleState() {
String projectName = "test";
long projectCode = 1L;
User loginUser = new User();
loginUser.setId(1);
Map<String, Object> result = new HashMap<String, Object>();
Project project = getProject(projectName, projectCode);
Project project = getProject();
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setProjectCode(projectCode);
@ -109,53 +125,58 @@ public class SchedulerServiceTest {
masterServers.add(new Server());
Mockito.when(scheduleMapper.selectById(1)).thenReturn(schedule);
Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(project);
Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
// schedule not exists
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
//hash no auth
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
Mockito.when(projectService.hasProjectAndPerm(loginUser, project, result,null)).thenReturn(true);
//schedule not exists
result = schedulerService.setScheduleState(loginUser, project.getCode(), 2, ReleaseState.ONLINE);
Assert.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS, result.get(Constants.STATUS));
//SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, result.get(Constants.STATUS));
// SCHEDULE_CRON_RELEASE_NEED_NOT_CHANGE
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
});
Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE.getCode(),
((ServiceException) exception).getCode());
//PROCESS_DEFINE_NOT_EXIST
schedule.setProcessDefinitionCode(2);
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, result.get(Constants.STATUS));
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
schedule.setProcessDefinitionCode(1);
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
// online also success
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1)).thenReturn(processTaskRelationList);
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_RELEASE.getCode(), ((ServiceException) exception).getCode());
// SUCCESS
Server server = new Server();
List<Server> serverList = new ArrayList<>();
serverList.add(server);
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(serverList);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition);
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
//set master
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(masterServers);
//SUCCESS
result = schedulerService.setScheduleState(loginUser, project.getCode(), 1, ReleaseState.ONLINE);
Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, result.get(Constants.STATUS));
Assertions.assertDoesNotThrow(() -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
}
private Project getProject(String name, long code) {
private Project getProject() {
Project project = new Project();
project.setName(name);
project.setCode(code);
project.setUserId(1);
project.setId(1);
project.setName(projectName);
project.setCode(projectCode);
project.setUserId(userId);
return project;
}