diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index c120fb0fc1..98e69ab1f4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -139,7 +139,7 @@ public class ExecutorController extends BaseController { @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType, @RequestParam(value = "execType", required = false) CommandType execType, @RequestParam(value = "warningType") WarningType warningType, - @RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId, + @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId, @RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @@ -226,7 +226,7 @@ public class ExecutorController extends BaseController { @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType, @RequestParam(value = "execType", required = false) CommandType execType, @RequestParam(value = "warningType") WarningType warningType, - @RequestParam(value = "warningGroupId", required = false) int warningGroupId, + @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId, @RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 93b3c91248..89fcc5fcf3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -333,7 +333,7 @@ public class PythonGateway { String cronTime, String workerGroup, String warningType, - int warningGroupId, + Integer warningGroupId, Integer timeout ) { User user = usersService.queryUser(userName); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index df44dc1445..9aaabb36b6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -61,7 +61,7 @@ public interface ExecutorService { Map execProcessInstance(User loginUser, long projectCode, long processDefinitionCode, String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, - TaskDependType taskDependType, WarningType warningType, int warningGroupId, + TaskDependType taskDependType, WarningType warningType, Integer warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index a5bfe2780a..41ac529318 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -173,7 +173,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ String cronTime, CommandType commandType, FailureStrategy failureStrategy, String startNodeList, TaskDependType taskDependType, WarningType warningType, - int warningGroupId, RunMode runMode, + Integer warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map startParams, Integer expectedParallelismNumber, @@ -714,7 +714,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ */ private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep, FailureStrategy failureStrategy, String startNodeList, String schedule, - WarningType warningType, int executorId, int warningGroupId, RunMode runMode, + WarningType warningType, int executorId, Integer warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Map startParams, Integer expectedParallelismNumber, int dryRun, int testFlag, ComplementDependentMode complementDependentMode) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java index f55aa9f8be..fcfa068ae4 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java @@ -209,9 +209,9 @@ public class ExecutorControllerTest extends AbstractControllerTest { when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType), - eq(0), eq(null), eq(null), eq("default"), eq(-1L), - eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0), - eq(complementDependentMode))).thenReturn(executeServiceResult); + eq(null), eq(null), eq(null), eq("default"), eq(-1L), + eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0), + eq(complementDependentMode))).thenReturn(executeServiceResult); //When final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index e50fb31ce9..8a8fd243a1 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -220,7 +220,7 @@ public class ExecutorServiceTest { "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.START_PROCESS, null, null, - null, null, 0, + null, null, null, RunMode.RUN_MODE_SERIAL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); @@ -242,7 +242,7 @@ public class ExecutorServiceTest { "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.START_PROCESS, null, "n1,n2", - null, null, 0, + null, null, null, RunMode.RUN_MODE_SERIAL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); @@ -308,7 +308,7 @@ public class ExecutorServiceTest { "{\"complementStartDate\":\"2022-01-07 12:12:12\",\"complementEndDate\":\"2022-01-06 12:12:12\"}", CommandType.COMPLEMENT_DATA, null, null, - null, null, 0, + null, null, null, RunMode.RUN_MODE_SERIAL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); @@ -329,7 +329,7 @@ public class ExecutorServiceTest { "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, null, null, - null, null, 0, + null, null, null, RunMode.RUN_MODE_SERIAL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); @@ -350,7 +350,7 @@ public class ExecutorServiceTest { "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, null, null, - null, null, 0, + null, null, null, RunMode.RUN_MODE_PARALLEL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); @@ -372,7 +372,7 @@ public class ExecutorServiceTest { "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, null, null, - null, null, 0, + null, null, null, RunMode.RUN_MODE_PARALLEL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); @@ -390,7 +390,7 @@ public class ExecutorServiceTest { "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", CommandType.COMPLEMENT_DATA, null, null, - null, null, 0, + null, null, null, RunMode.RUN_MODE_PARALLEL, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 169aa7aab9..dfdf4521a6 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.dolphinscheduler.common.enums.AlertEvent; import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertType; @@ -48,6 +49,8 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -60,6 +63,11 @@ import com.google.common.collect.Lists; @Component public class AlertDao { + /** + * logger of AlertDao + */ + private static final Logger logger = LoggerFactory.getLogger(AlertDao.class); + private static final int QUERY_ALERT_THRESHOLD = 100; @Value("${alert.alarm-suppression.crash:60}") @@ -84,9 +92,16 @@ public class AlertDao { * @return add alert result */ public int addAlert(Alert alert) { + if (null == alert.getAlertGroupId() || NumberUtils.INTEGER_ZERO.equals(alert.getAlertGroupId())) { + logger.warn("the value of alertGroupId is null or 0 "); + return 0; + } + String sign = generateSign(alert); alert.setSign(sign); - return alertMapper.insert(alert); + int count = alertMapper.insert(alert); + logger.info("add alert to db , alert: {}", alert); + return count; } /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java index 6e7414c286..27df7bd573 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java @@ -169,7 +169,7 @@ public class ProcessDefinition { * warningGroupId */ @TableField(exist = false) - private int warningGroupId; + private Integer warningGroupId; /** * execution type @@ -226,5 +226,4 @@ public class ProcessDefinition { return globalParamMap; } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 3d448f2ebf..7716314444 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -716,9 +716,7 @@ public class WorkflowExecuteRunnable implements Callable { checkSerialProcess(processDefinition); } ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - if (processAlertManager.isNeedToSendWarning(processInstance)) { - processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser); - } + processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser); if (processInstance.getState().isSuccess()) { processAlertManager.closeAlert(processInstance); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java index ab8b162c08..2604354fbc 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java @@ -200,7 +200,6 @@ public class ProcessAlertManager { processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId()); alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING); alertDao.addAlert(alert); - logger.info("add alert to db , alert : {}", alert); } catch (Exception e) { logger.error("send alert failed:{} ", e.getMessage()); @@ -217,13 +216,10 @@ public class ProcessAlertManager { public void sendAlertProcessInstance(ProcessInstance processInstance, List taskInstances, ProjectUser projectUser) { - if (!isNeedToSendWarning(processInstance)) { return; } - Alert alert = new Alert(); - String cmdName = getCommandCnName(processInstance.getCommandType()); String success = processInstance.getState().isSuccess() ? "success" : "failed"; alert.setTitle(cmdName + " " + success); @@ -238,7 +234,6 @@ public class ProcessAlertManager { alert.setAlertType(processInstance.getState().isSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS : AlertType.PROCESS_INSTANCE_FAILURE); alertDao.addAlert(alert); - logger.info("add alert to db , alert: {}", alert); } /** @@ -325,7 +320,6 @@ public class ProcessAlertManager { alert.setAlertType(processInstance.getState().isSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS : AlertType.PROCESS_INSTANCE_FAILURE); alertDao.addAlert(alert); - logger.info("add alert to db , alert: {}", alert); } /** @@ -342,7 +336,6 @@ public class ProcessAlertManager { alert.setProcessInstanceId(processInstance.getId()); alert.setAlertType(AlertType.TASK_FAILURE); alertDao.addAlert(alert); - logger.info("add alert to db , alert: {}", alert); } /** @@ -440,6 +433,5 @@ public class ProcessAlertManager { alert.setProcessInstanceId(processInstance.getId()); alert.setAlertType(AlertType.PROCESS_INSTANCE_BLOCKED); alertDao.addAlert(alert); - logger.info("add alert to db, alert: {}", alert); } }