This commit is contained in:
break60 2021-01-11 15:36:39 +08:00
commit 45e3084c02
24 changed files with 423 additions and 103 deletions

View File

@ -17,32 +17,33 @@ Dolphin Scheduler Official Website
### Design features:
A distributed and easy-to-extend visual DAG workflow scheduling system. Dedicated to solving the complex dependencies in data processing, making the scheduling system `out of the box` for data processing.
Dolphin Scheduler is a distributed and easy-to-extend visual DAG workflow scheduling system. It dedicates to solving the complex dependencies in data processing to make the scheduling system `out of the box` for the data processing process.
Its main objectives are as follows:
- Associate the Tasks according to the dependencies of the tasks in a DAG graph, which can visualize the running state of task in real time.
- Support for many task types: Shell, MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Sub_Process, Procedure, etc.
- Associate the tasks according to the dependencies of the tasks in a DAG graph, which can visualize the running state of the task in real-time.
- Support many task types: Shell, MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Sub_Process, Procedure, etc.
- Support process scheduling, dependency scheduling, manual scheduling, manual pause/stop/recovery, support for failed retry/alarm, recovery from specified nodes, Kill task, etc.
- Support process priority, task priority and task failover and task timeout alarm/failure
- Support process global parameters and node custom parameter settings
- Support online upload/download of resource files, management, etc. Support online file creation and editing
- Support the priority of process & task, task failover, and task timeout alarm or failure.
- Support process global parameters and node custom parameter settings.
- Support online upload/download of resource files, management, etc. Support online file creation and editing.
- Support task log online viewing and scrolling, online download log, etc.
- Implement cluster HA, decentralize Master cluster and Worker cluster through Zookeeper
- Support online viewing of `Master/Worker` cpu load, memory
- Support process running history tree/gantt chart display, support task status statistics, process status statistics
- Support backfilling data
- Support multi-tenant
- Support internationalization
- There are more waiting partners to explore
- Implement cluster HA, decentralize Master cluster and Worker cluster through Zookeeper.
- Support the viewing of Master/Worker CPU load, memory, and CPU usage metrics.
- Support presenting tree or Gantt chart of workflow history as well as the statistics results of task & process status in each workflow.
- Support backfilling data.
- Support multi-tenant.
- Support internationalization.
- There are more waiting for partners to explore...
### What's in Dolphin Scheduler
Stability | Easy to use | Features | Scalability |
-- | -- | -- | --
Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables and so on at a glance.  |  Support pause, recover operation | support custom task types
HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the api mode operation is provided. | Users on DolphinScheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. " | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline.
Overload processing: Task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured, when too many tasks will be cached in the task queue, will not cause machine jam. | One-click deployment | Supports traditional shell tasks, and also support big data platform task scheduling: MR, Spark, SQL (mysql, postgresql, hive, sparksql), Python, Procedure, Sub_Process | |
Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables, and so on at a glance.  |  Support pause, recover operation | Support custom task types
HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the API mode operation is provided. | Users on Dolphin Scheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline.
Overload processing: Overload processing: By using the task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured. Machine jam can be avoided with high tolerance to numbers of tasks cached in task queue. | One-click deployment | Support traditional shell tasks, and big data platform task scheduling: MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Procedure, Sub_Process | |
### System partial screenshot
@ -55,17 +56,14 @@ Overload processing: Task queue mechanism, the number of schedulable tasks on a
![monitor](https://user-images.githubusercontent.com/59273635/75625839-c698a480-5bfc-11ea-8bbe-895b561b337f.png)
![security](https://user-images.githubusercontent.com/15833811/75236441-bfd2f180-57f8-11ea-88bd-f24311e01b7e.png)
![treeview](https://user-images.githubusercontent.com/15833811/75217191-3fe56100-57d1-11ea-8856-f19180d9a879.png)
### Online Demo
- <a href="http://106.75.43.194:8888" target="_blank">Online Demo</a>
### Recent R&D plan
Work plan of Dolphin Scheduler: [R&D plan](https://github.com/apache/incubator-dolphinscheduler/projects/1), Under the `In Develop` card is what is currently being developed, TODO card is to be done (including feature ideas)
The work plan of Dolphin Scheduler: [R&D plan](https://github.com/apache/incubator-dolphinscheduler/projects/1), which `In Develop` card shows the features that are currently being developed and TODO card lists what needs to be done(including feature ideas).
### How to contribute
Welcome to participate in contributing, please refer to the process of submitting the code:
[[How to contribute](https://dolphinscheduler.apache.org/en-us/docs/development/contribute.html)]
Welcome to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/docs/development/contribute.html)]
### How to Build
@ -82,15 +80,15 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${latest.release
### Thanks
Dolphin Scheduler uses a lot of excellent open source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open source projects of apache, etc.
It is because of the shoulders of these open source projects that the birth of the Dolphin Scheduler is possible. We are very grateful for all the open source software used! We also hope that we will not only be the beneficiaries of open source, but also be open source contributors. We also hope that partners who have the same passion and conviction for open source will join in and contribute to open source!
Dolphin Scheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on.
We would like to express our deep gratitude to all the open-source projects which contribute to making the dream of Dolphin Scheduler comes true. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we expect the partners who have the same passion and conviction to open-source will join in and contribute to the open-source community!
### Get Help
1. Submit an issue
1. Subscribe the mail list : https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html. then send mail to dev@dolphinscheduler.apache.org
1. Slack channel: [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://join.slack.com/share/zt-do3gvfhj-UUhrAX2GxkVX_~JJt1jpKA)
1. Contact WeChat(dailidong66). This is just for Mandarin(CN) discussion.
1. Subscribe to the mail list: https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html, then email dev@dolphinscheduler.apache.org
### License
Please refer to [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) file.
Please refer to the [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) file.

View File

@ -50,10 +50,6 @@ Dolphin Scheduler Official Website
![security](https://user-images.githubusercontent.com/15833811/75209633-baa28200-57b9-11ea-9def-94bef2e212a7.jpg)
### 我要体验
- <a href="http://106.75.43.194:8888" target="_blank">我要体验</a>
### 近期研发计划
@ -86,7 +82,6 @@ Dolphin Scheduler使用了很多优秀的开源项目比如google的guava、g
### 获得帮助
1. 提交issue
1. 先订阅邮件开发列表:[订阅邮件列表](https://dolphinscheduler.apache.org/zh-cn/docs/development/subscribe.html), 订阅成功后发送邮件到dev@dolphinscheduler.apache.org.
1. 联系微信群助手(ID:dailidong66). 微信仅用于中国用户讨论.
### 版权
请参考 [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) 文件.

View File

@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.ArrayList;
@ -372,7 +373,7 @@ public class ProcessDefinitionController extends BaseController {
}
/**
* query datail of process definition
* query datail of process definition by id
*
* @param loginUser login user
* @param projectName project name
@ -396,6 +397,29 @@ public class ProcessDefinitionController extends BaseController {
return returnDataList(result);
}
/**
* query datail of process definition by name
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionName process definition name
* @return process definition detail
*/
@ApiOperation(value = "queryProcessDefinitionByName", notes = "QUERY_PROCESS_DEFINITION_BY_NAME_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionName", value = "PROCESS_DEFINITION_ID", required = true, dataType = "String")
})
@GetMapping(value = "/select-by-name")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR)
public Result<ProcessDefinition> queryProcessDefinitionByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam("processDefinitionName") String processDefinitionName
) {
Map<String, Object> result = processDefinitionService.queryProcessDefinitionByName(loginUser, projectName, processDefinitionName);
return returnDataList(result);
}
/**
* query Process definition list
*

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
@ -36,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@ -127,4 +129,27 @@ public class TaskInstanceController extends BaseController {
return returnDataListPaging(result);
}
/**
* change one task instance's state from FAILURE to FORCED_SUCCESS
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
@ApiOperation(value = "force-success", notes = "FORCE_TASK_SUCCESS")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_INSTANCE_ID", required = true, dataType = "Int", example = "12")
})
@PostMapping(value = "/force-success")
@ResponseStatus(HttpStatus.OK)
@ApiException(FORCE_TASK_SUCCESS_ERROR)
public Result<Object> forceTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "taskInstanceId") Integer taskInstanceId) {
logger.info("force task success, project: {}, task instance id: {}", projectName, taskInstanceId);
Map<String, Object> result = taskInstanceService.forceTaskSuccess(loginUser, projectName, taskInstanceId);
return returnDataList(result);
}
}

View File

@ -57,6 +57,16 @@ public class TaskCountDto {
.sum();
}
// remove the specified state
public void removeStateFromCountList(ExecutionStatus status) {
for (TaskStateCount count : this.taskCountDtos) {
if (count.getTaskStateType().equals(status)) {
this.taskCountDtos.remove(count);
break;
}
}
}
public List<TaskStateCount> getTaskCountDtos() {
return taskCountDtos;
}

View File

@ -30,7 +30,7 @@ public enum ExecuteType {
* 4 stop
* 5 pause
*/
NONE,REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE;
NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE;
public static ExecuteType getEnum(int value){

View File

@ -196,6 +196,9 @@ public enum Status {
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"),
DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
@ -247,7 +250,7 @@ public enum Status {
BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"),
TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"),
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),

View File

@ -95,6 +95,19 @@ public interface ProcessDefinitionService {
String projectName,
Integer processId);
/**
* query datail of process definition
*
* @param loginUser login user
* @param projectName project name
* @param processDefinitionName process definition name
* @return process definition detail
*/
Map<String, Object> queryProcessDefinitionByName(User loginUser,
String projectName,
String processDefinitionName);
/**
* batch copy process definition
*

View File

@ -145,6 +145,50 @@ public class TaskInstanceService extends BaseService {
return result;
}
/**
* change one task instance's state from failure to forced success
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
// check user auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
// check whether the task instance can be found
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
// check whether the task instance state type is failure
if (!task.getState().typeIsFailure()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
}
// change the state of the task instance
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
}
return result;
}
/***
* generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name
* @param result exist result map

View File

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.api.service.DataAnalysisService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -114,12 +115,17 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
* @return process instance state count data
*/
public Map<String, Object> countProcessInstanceStateByProject(User loginUser, int projectId, String startDate, String endDate) {
return this.countStateByProject(
Map<String, Object> result = this.countStateByProject(
loginUser,
projectId,
startDate,
endDate,
(start, end, projectIds) -> this.processInstanceMapper.countInstanceStateByUser(start, end, projectIds));
// process state count needs to remove state of forced success
if (result.containsKey(Constants.STATUS) && result.get(Constants.STATUS).equals(Status.SUCCESS)) {
((TaskCountDto)result.get(Constants.DATA_LIST)).removeStateFromCountList(ExecutionStatus.FORCED_SUCCESS);
}
return result;
}
private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate

View File

@ -349,7 +349,29 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
ProcessDefinition processDefinition = processDefineMapper.selectById(processId);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else {
result.put(Constants.DATA_LIST, processDefinition);
putMsg(result, Status.SUCCESS);
}
return result;
}
@Override
public Map<String, Object> queryProcessDefinitionByName(User loginUser, String projectName, String processDefinitionName) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
return checkResult;
}
ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(),processDefinitionName);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
} else {
result.put(Constants.DATA_LIST, processDefinition);
putMsg(result, Status.SUCCESS);

View File

@ -172,6 +172,7 @@ PROCESS_DEFINITION_ID=process definition id
PROCESS_DEFINITION_IDS=process definition ids
RELEASE_PROCESS_DEFINITION_NOTES=release process definition
QUERY_PROCESS_DEFINITION_BY_ID_NOTES=query process definition by id
QUERY_PROCESS_DEFINITION_BY_NAME_NOTES=query process definition by name
QUERY_PROCESS_DEFINITION_LIST_NOTES=query process definition list
QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=query process definition list paging
QUERY_ALL_DEFINITION_LIST_NOTES=query all definition list

View File

@ -18,8 +18,13 @@
package org.apache.dolphinscheduler.api.controller;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
@ -27,24 +32,28 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* task instance controller test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class TaskInstanceControllerTest {
public class TaskInstanceControllerTest extends AbstractControllerTest {
@InjectMocks
private TaskInstanceController taskInstanceController;
@ -67,7 +76,27 @@ public class TaskInstanceControllerTest {
Result taskResult = taskInstanceController.queryTaskListPaging(null, "", 1, "", "",
"", "", ExecutionStatus.SUCCESS,"192.168.xx.xx", "2020-01-01 00:00:00", "2020-01-02 00:00:00",pageNo, pageSize);
Assert.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
}
@Ignore
@Test
public void testForceTaskSuccess() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("taskInstanceId", "104");
Map<String, Object> mockResult = new HashMap<>(5);
mockResult.put(Constants.STATUS, Status.SUCCESS);
mockResult.put(Constants.MSG, Status.SUCCESS.getMsg());
when(taskInstanceService.forceTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success", "cxc_1113")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
}
}

View File

@ -25,12 +25,14 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -82,12 +84,16 @@ public class ExecutorService2Test {
private int processDefinitionId = 1;
private int processInstanceId = 1;
private int tenantId = 1;
private int userId = 1;
private ProcessDefinition processDefinition = new ProcessDefinition();
private ProcessInstance processInstance = new ProcessInstance();
private User loginUser = new User();
private String projectName = "projectName";
@ -107,6 +113,13 @@ public class ExecutorService2Test {
processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId);
// processInstance
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);
// project
project.setName(projectName);
@ -120,6 +133,8 @@ public class ExecutorService2Test {
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefineById(processDefinitionId)).thenReturn(processDefinition);
}
/**

View File

@ -341,7 +341,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionById(loginUser,
"project_test1", 1);
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit
Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition());
@ -350,6 +350,41 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testQueryProcessDefinitionByName() {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
//project check auth fail
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test_def");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test_def")).thenReturn(null);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test_def");
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit
Mockito.when(processDefineMapper.queryByDefineName(project.getId(),"test")).thenReturn(getProcessDefinition());
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByName(loginUser,
"project_test1", "test");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@Test
public void testBatchCopyProcessDefinition() {

View File

@ -214,4 +214,48 @@ public class TaskInstanceServiceTest {
result.put(Constants.MSG, status.getMsg());
}
}
}
@Test
public void forceTaskSuccess() {
User user = getAdminUser();
String projectName = "test";
Project project = getProject(projectName);
int taskId = 1;
TaskInstance task = getTaskInstance();
Map<String, Object> mockSuccess = new HashMap<>(5);
putMsg(mockSuccess, Status.SUCCESS);
when(projectMapper.queryByName(projectName)).thenReturn(project);
// user auth failed
Map<String, Object> mockFailure = new HashMap<>(5);
putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectName);
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockFailure);
Map<String, Object> authFailRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertNotSame(Status.SUCCESS, authFailRes.get(Constants.STATUS));
// test task not found
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockSuccess);
when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null);
Map<String, Object> taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS));
// test task instance state error
task.setState(ExecutionStatus.SUCCESS);
when(taskInstanceMapper.selectById(1)).thenReturn(task);
Map<String, Object> taskStateErrorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskStateErrorRes.get(Constants.STATUS));
// test error
task.setState(ExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(0);
Map<String, Object> errorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR, errorRes.get(Constants.STATUS));
// test success
task.setState(ExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(1);
Map<String, Object> successRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
}

View File

@ -41,6 +41,7 @@ public enum ExecutionStatus {
* 10 waiting thread
* 11 waiting depend node complete
* 12 delay execution
* 13 forced success
*/
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
@ -54,7 +55,8 @@ public enum ExecutionStatus {
KILL(9, "kill"),
WAITTING_THREAD(10, "waiting thread"),
WAITTING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution");
DELAY_EXECUTION(12, "delay execution"),
FORCED_SUCCESS(13, "forced success");
ExecutionStatus(int code, String descp) {
this.code = code;
@ -79,7 +81,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsSuccess() {
return this == SUCCESS;
return this == SUCCESS || this == FORCED_SUCCESS;
}
/**

View File

@ -63,7 +63,7 @@ public class VarPoolUtils {
* @throws ParseException ParseException
*/
public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException {
if (varPool == null || propToValue == null) {
if (propToValue == null || StringUtils.isEmpty(varPool)) {
return;
}
String[] splits = varPool.split("\\$VarPool\\$");

View File

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -27,6 +27,10 @@ import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -36,9 +40,6 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@ -55,20 +56,38 @@ public class TaskInstanceMapperTest {
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
ProcessInstanceMapMapper processInstanceMapMapper;
/**
* insert
*
* @return TaskInstance
*/
private TaskInstance insertOne(){
private TaskInstance insertOne() {
//insertOne
return insertOne("us task", 1, ExecutionStatus.RUNNING_EXECUTION, TaskType.SHELL.toString());
}
/**
* construct a task instance and then insert
*
* @param taskName
* @param processInstanceId
* @param state
* @param taskType
* @return
*/
private TaskInstance insertOne(String taskName, int processInstanceId, ExecutionStatus state, String taskType) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setFlag(Flag.YES);
taskInstance.setName("ut task");
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setName(taskName);
taskInstance.setState(state);
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setTaskJson("{}");
taskInstance.setTaskType(TaskType.SHELL.toString());
taskInstance.setProcessInstanceId(processInstanceId);
taskInstance.setTaskType(taskType);
taskInstanceMapper.insert(taskInstance);
return taskInstance;
}
@ -90,7 +109,7 @@ public class TaskInstanceMapperTest {
* test delete
*/
@Test
public void testDelete(){
public void testDelete() {
TaskInstance taskInstance = insertOne();
int delete = taskInstanceMapper.deleteById(taskInstance.getId());
Assert.assertEquals(1, delete);
@ -149,7 +168,7 @@ public class TaskInstanceMapperTest {
taskInstanceMapper.deleteById(task2.getId());
taskInstanceMapper.deleteById(task.getId());
Assert.assertNotEquals(taskInstances.size(), 0);
Assert.assertNotEquals(taskInstances1.size(), 0 );
Assert.assertNotEquals(taskInstances1.size(), 0);
}
/**
@ -298,4 +317,4 @@ public class TaskInstanceMapperTest {
Assert.assertNotEquals(taskInstanceIPage.getTotal(), 0);
}
}
}

View File

@ -987,6 +987,7 @@ public class MasterExecThread implements Runnable {
// updateProcessInstance completed task status
// failure priority is higher than pause
// if a task fails, other suspended tasks need to be reset kill
// check if there exists forced success nodes in errorTaskList
if (errorTaskList.size() > 0) {
for (Map.Entry<String, TaskInstance> entry : completeTaskList.entrySet()) {
TaskInstance completeTask = entry.getValue();
@ -996,6 +997,22 @@ public class MasterExecThread implements Runnable {
processService.updateTaskInstance(completeTask);
}
}
for (Map.Entry<String, TaskInstance> entry : errorTaskList.entrySet()) {
TaskInstance errorTask = entry.getValue();
TaskInstance currentTask = processService.findTaskInstanceById(errorTask.getId());
if (currentTask == null) {
continue;
}
// for nodes that have been forced success
if (errorTask.getState().typeIsFailure() && currentTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
// update state in this thread and remove from errorTaskList
errorTask.setState(currentTask.getState());
logger.info("task: {} has been forced success, remove it from error task list", errorTask.getName());
errorTaskList.remove(errorTask.getName());
// submit post nodes
submitPostNode(errorTask.getName());
}
}
}
if (canSubmitTaskToQueue()) {
submitStandByTask();
@ -1096,6 +1113,18 @@ public class MasterExecThread implements Runnable {
int length = readyToSubmitTaskQueue.size();
for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
// stop tasks which is retrying if forced success happens
if (task.taskCanRetry()) {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
continue;
}
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
if (retryTaskIntervalOverTime(task)) {

View File

@ -17,7 +17,7 @@
<template>
<div class="list-model" style="position: relative;">
<div class="table-box">
<el-table :data="list" size="mini" style="width: 100%" @selection-change="_arrDelChange">
<el-table class="fixed" :data="list" size="mini" style="width: 100%" @selection-change="_arrDelChange">
<el-table-column type="selection" width="50"></el-table-column>
<el-table-column type="index" :label="$t('#')" width="50"></el-table-column>
<el-table-column :label="$t('Process Name')" min-width="200">
@ -102,35 +102,34 @@
<div v-show="!scope.row.disabled">
<!--Edit-->
<el-button
type="info"
size="mini"
icon="el-icon-edit-outline"
disabled="true"
circle>
type="info"
size="mini"
icon="el-icon-edit-outline"
disabled="true"
circle>
</el-button>
<!--Rerun-->
<el-tooltip :content="$t('Rerun')" placement="top" :enterable="false">
<span>
<el-button
v-show="buttonType === 'run'"
type="info"
size="mini"
disabled="true"
circle>
<span style="padding: 0 2px">{{scope.row.count}}</span>
</el-button>
</span>
</el-tooltip>
<span>
<el-button
v-show="buttonType === 'run'"
type="info"
size="mini"
disabled="true"
circle>
<span style="padding: 0 2px">{{scope.row.count}}</span>
</el-button>
</span>
<el-button
v-show="buttonType !== 'run'"
type="info"
size="mini"
icon="el-icon-refresh"
disabled="true"
circle>
v-show="buttonType !== 'run'"
type="info"
size="mini"
icon="el-icon-refresh"
disabled="true"
circle>
</el-button>
<!--Store-->
<span>
<el-button
v-show="buttonType === 'store'"
@ -153,13 +152,12 @@
<!--Recovery Suspend/Pause-->
<span>
<el-button
style="padding: 0 3px"
v-show="(scope.row.state === 'PAUSE' || scope.row.state === 'STOP') && buttonType === 'suspend'"
type="warning"
size="mini"
circle
disabled="true">
{{scope.row.count}}
<span style="padding: 0 3px">{{scope.row.count}}</span>
</el-button>
</span>
@ -185,7 +183,19 @@
</el-button>
</span>
<!--delete-->
<!--Stop-->
<span>
<el-button
v-show="scope.row.state !== 'STOP'"
type="warning"
size="mini"
circle
icon="el-icon-video-pause"
disabled="true">
</el-button>
</span>
<!--Delete-->
<el-button
type="danger"
circle
@ -286,7 +296,6 @@
* @param REPEAT_RUNNING
*/
_reRun (item, index) {
console.log(index)
this._countDownFn({
id: item.id,
executeType: 'REPEAT_RUNNING',

View File

@ -213,21 +213,10 @@
.fixed {
table-layout: auto;
tr {
th:last-child,td:last-child {
background: inherit;
width: 230px;
height: 40px;
line-height: 40px;
border-left:1px solid #ecf3ff;
position: absolute;
right: 0;
z-index: 2;
}
td:last-child {
border-bottom:1px solid #ecf3ff;
}
th:nth-last-child(2) {
padding-right: 260px;
.el-button+.el-button {
margin-left: 0;
}
}
}
}

View File

@ -117,15 +117,14 @@
this.logDialog = true
},
ok () {},
close () {
this.logDialog = false
},
_forceSuccess (item) {
this.forceTaskSuccess({ taskInstanceId: item.id }).then(res => {
if (res.code === 0) {
this.$message.success(res.msg)
setTimeout(this._onUpdate, 1000)
} else {
this.$message.error(res.msg)
}
@ -133,6 +132,9 @@
this.$message.error(e.msg)
})
},
_onUpdate () {
this.$emit('on-update')
},
_go (item) {
this.$router.push({ path: `/projects/instance/list/${item.processInstanceId}` })
}

View File

@ -23,7 +23,7 @@
<template slot="content">
<template v-if="taskInstanceList.length">
<m-list :task-instance-list="taskInstanceList" :page-no="searchParams.pageNo" :page-size="searchParams.pageSize">
<m-list :task-instance-list="taskInstanceList" @on-update="_onUpdate" :page-no="searchParams.pageNo" :page-size="searchParams.pageSize">
</m-list>
<div class="page-box">
<el-pagination
@ -126,6 +126,12 @@
this.isLoading = false
})
},
/**
* update
*/
_onUpdate () {
this._debounceGET()
},
/**
* Anti shake request interface
* @desc Prevent functions from being called multiple times