[fix-#11753] send alert error alert data id (#11774)

* [fix-#11753] send alert error alert data id

Co-authored-by: fuchanghai <changhai.fu@marketingforce.com>
This commit is contained in:
fuchanghai 2022-09-29 15:34:40 +08:00 committed by GitHub
parent 57fafe4256
commit e27c79974d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 34 additions and 30 deletions

View File

@ -139,7 +139,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType, @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType, @RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType, @RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId, @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@ -226,7 +226,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType, @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType, @RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType, @RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false) int warningGroupId, @RequestParam(value = "warningGroupId", required = false) Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,

View File

@ -333,7 +333,7 @@ public class PythonGateway {
String cronTime, String cronTime,
String workerGroup, String workerGroup,
String warningType, String warningType,
int warningGroupId, Integer warningGroupId,
Integer timeout Integer timeout
) { ) {
User user = usersService.queryUser(userName); User user = usersService.queryUser(userName);

View File

@ -61,7 +61,7 @@ public interface ExecutorService {
Map<String, Object> execProcessInstance(User loginUser, long projectCode, Map<String, Object> execProcessInstance(User loginUser, long projectCode,
long processDefinitionCode, String cronTime, CommandType commandType, long processDefinitionCode, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList, FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId, TaskDependType taskDependType, WarningType warningType, Integer warningGroupId,
RunMode runMode, RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Priority processInstancePriority, String workerGroup, Long environmentCode,
Integer timeout, Integer timeout,

View File

@ -173,7 +173,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
String cronTime, CommandType commandType, String cronTime, CommandType commandType,
FailureStrategy failureStrategy, String startNodeList, FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, TaskDependType taskDependType, WarningType warningType,
int warningGroupId, RunMode runMode, Integer warningGroupId, RunMode runMode,
Priority processInstancePriority, String workerGroup, Priority processInstancePriority, String workerGroup,
Long environmentCode, Integer timeout, Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber, Map<String, String> startParams, Integer expectedParallelismNumber,
@ -714,7 +714,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*/ */
private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep, private int createCommand(CommandType commandType, long processDefineCode, TaskDependType nodeDep,
FailureStrategy failureStrategy, String startNodeList, String schedule, FailureStrategy failureStrategy, String startNodeList, String schedule,
WarningType warningType, int executorId, int warningGroupId, RunMode runMode, WarningType warningType, int executorId, Integer warningGroupId, RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Priority processInstancePriority, String workerGroup, Long environmentCode,
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun, Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun,
int testFlag, ComplementDependentMode complementDependentMode) { int testFlag, ComplementDependentMode complementDependentMode) {

View File

@ -209,9 +209,9 @@ public class ExecutorControllerTest extends AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType), eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
eq(0), eq(null), eq(null), eq("default"), eq(-1L), eq(null), eq(null), eq(null), eq("default"), eq(-1L),
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0), eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0),
eq(complementDependentMode))).thenReturn(executeServiceResult); eq(complementDependentMode))).thenReturn(executeServiceResult);
//When //When
final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode) final MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/executors/start-process-instance", projectCode)

View File

@ -220,7 +220,7 @@ public class ExecutorServiceTest {
"{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.START_PROCESS, CommandType.START_PROCESS,
null, null, null, null,
null, null, 0, null, null, null,
RunMode.RUN_MODE_SERIAL, RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
@ -242,7 +242,7 @@ public class ExecutorServiceTest {
"{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.START_PROCESS, CommandType.START_PROCESS,
null, "n1,n2", null, "n1,n2",
null, null, 0, null, null, null,
RunMode.RUN_MODE_SERIAL, RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
@ -308,7 +308,7 @@ public class ExecutorServiceTest {
"{\"complementStartDate\":\"2022-01-07 12:12:12\",\"complementEndDate\":\"2022-01-06 12:12:12\"}", "{\"complementStartDate\":\"2022-01-07 12:12:12\",\"complementEndDate\":\"2022-01-06 12:12:12\"}",
CommandType.COMPLEMENT_DATA, CommandType.COMPLEMENT_DATA,
null, null, null, null,
null, null, 0, null, null, null,
RunMode.RUN_MODE_SERIAL, RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
@ -329,7 +329,7 @@ public class ExecutorServiceTest {
"{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.COMPLEMENT_DATA, CommandType.COMPLEMENT_DATA,
null, null, null, null,
null, null, 0, null, null, null,
RunMode.RUN_MODE_SERIAL, RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
@ -350,7 +350,7 @@ public class ExecutorServiceTest {
"{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.COMPLEMENT_DATA, CommandType.COMPLEMENT_DATA,
null, null, null, null,
null, null, 0, null, null, null,
RunMode.RUN_MODE_PARALLEL, RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
@ -372,7 +372,7 @@ public class ExecutorServiceTest {
"{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.COMPLEMENT_DATA, CommandType.COMPLEMENT_DATA,
null, null, null, null,
null, null, 0, null, null, null,
RunMode.RUN_MODE_PARALLEL, RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);
@ -390,7 +390,7 @@ public class ExecutorServiceTest {
"{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.COMPLEMENT_DATA, CommandType.COMPLEMENT_DATA,
null, null, null, null,
null, null, 0, null, null, null,
RunMode.RUN_MODE_PARALLEL, RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE); ComplementDependentMode.OFF_MODE);

View File

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao; package org.apache.dolphinscheduler.dao;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.dolphinscheduler.common.enums.AlertEvent; import org.apache.dolphinscheduler.common.enums.AlertEvent;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.AlertType;
@ -48,6 +49,8 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -60,6 +63,11 @@ import com.google.common.collect.Lists;
@Component @Component
public class AlertDao { public class AlertDao {
/**
* logger of AlertDao
*/
private static final Logger logger = LoggerFactory.getLogger(AlertDao.class);
private static final int QUERY_ALERT_THRESHOLD = 100; private static final int QUERY_ALERT_THRESHOLD = 100;
@Value("${alert.alarm-suppression.crash:60}") @Value("${alert.alarm-suppression.crash:60}")
@ -84,9 +92,16 @@ public class AlertDao {
* @return add alert result * @return add alert result
*/ */
public int addAlert(Alert alert) { public int addAlert(Alert alert) {
if (null == alert.getAlertGroupId() || NumberUtils.INTEGER_ZERO.equals(alert.getAlertGroupId())) {
logger.warn("the value of alertGroupId is null or 0 ");
return 0;
}
String sign = generateSign(alert); String sign = generateSign(alert);
alert.setSign(sign); alert.setSign(sign);
return alertMapper.insert(alert); int count = alertMapper.insert(alert);
logger.info("add alert to db , alert: {}", alert);
return count;
} }
/** /**

View File

@ -169,7 +169,7 @@ public class ProcessDefinition {
* warningGroupId * warningGroupId
*/ */
@TableField(exist = false) @TableField(exist = false)
private int warningGroupId; private Integer warningGroupId;
/** /**
* execution type * execution type
@ -226,5 +226,4 @@ public class ProcessDefinition {
return globalParamMap; return globalParamMap;
} }
} }

View File

@ -716,9 +716,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
checkSerialProcess(processDefinition); checkSerialProcess(processDefinition);
} }
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
if (processAlertManager.isNeedToSendWarning(processInstance)) { processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
}
if (processInstance.getState().isSuccess()) { if (processInstance.getState().isSuccess()) {
processAlertManager.closeAlert(processInstance); processAlertManager.closeAlert(processInstance);
} }

View File

@ -200,7 +200,6 @@ public class ProcessAlertManager {
processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId()); processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING); alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING);
alertDao.addAlert(alert); alertDao.addAlert(alert);
logger.info("add alert to db , alert : {}", alert);
} catch (Exception e) { } catch (Exception e) {
logger.error("send alert failed:{} ", e.getMessage()); logger.error("send alert failed:{} ", e.getMessage());
@ -217,13 +216,10 @@ public class ProcessAlertManager {
public void sendAlertProcessInstance(ProcessInstance processInstance, public void sendAlertProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances, List<TaskInstance> taskInstances,
ProjectUser projectUser) { ProjectUser projectUser) {
if (!isNeedToSendWarning(processInstance)) { if (!isNeedToSendWarning(processInstance)) {
return; return;
} }
Alert alert = new Alert(); Alert alert = new Alert();
String cmdName = getCommandCnName(processInstance.getCommandType()); String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().isSuccess() ? "success" : "failed"; String success = processInstance.getState().isSuccess() ? "success" : "failed";
alert.setTitle(cmdName + " " + success); alert.setTitle(cmdName + " " + success);
@ -238,7 +234,6 @@ public class ProcessAlertManager {
alert.setAlertType(processInstance.getState().isSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS alert.setAlertType(processInstance.getState().isSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS
: AlertType.PROCESS_INSTANCE_FAILURE); : AlertType.PROCESS_INSTANCE_FAILURE);
alertDao.addAlert(alert); alertDao.addAlert(alert);
logger.info("add alert to db , alert: {}", alert);
} }
/** /**
@ -325,7 +320,6 @@ public class ProcessAlertManager {
alert.setAlertType(processInstance.getState().isSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS alert.setAlertType(processInstance.getState().isSuccess() ? AlertType.PROCESS_INSTANCE_SUCCESS
: AlertType.PROCESS_INSTANCE_FAILURE); : AlertType.PROCESS_INSTANCE_FAILURE);
alertDao.addAlert(alert); alertDao.addAlert(alert);
logger.info("add alert to db , alert: {}", alert);
} }
/** /**
@ -342,7 +336,6 @@ public class ProcessAlertManager {
alert.setProcessInstanceId(processInstance.getId()); alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(AlertType.TASK_FAILURE); alert.setAlertType(AlertType.TASK_FAILURE);
alertDao.addAlert(alert); alertDao.addAlert(alert);
logger.info("add alert to db , alert: {}", alert);
} }
/** /**
@ -440,6 +433,5 @@ public class ProcessAlertManager {
alert.setProcessInstanceId(processInstance.getId()); alert.setProcessInstanceId(processInstance.getId());
alert.setAlertType(AlertType.PROCESS_INSTANCE_BLOCKED); alert.setAlertType(AlertType.PROCESS_INSTANCE_BLOCKED);
alertDao.addAlert(alert); alertDao.addAlert(alert);
logger.info("add alert to db, alert: {}", alert);
} }
} }