[DSIP-35][Alert] Refactor the alert thread model (#15932)

This commit is contained in:
Wenjun Ruan 2024-05-09 12:23:01 +08:00 committed by GitHub
parent bbca37d03e
commit 8d336def61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
99 changed files with 2344 additions and 890 deletions

View File

@ -40,7 +40,7 @@ public final class VoiceAlertChannel implements AlertChannel {
Map<String, String> paramsMap = info.getAlertParams();
if (null == paramsMap) {
return new AlertResult("false", "aliyun-voice params is null");
return new AlertResult(false, "aliyun-voice params is null");
}
VoiceParam voiceParam = buildVoiceParam(paramsMap);
return new VoiceSender(voiceParam).send();

View File

@ -46,7 +46,7 @@ public final class VoiceSender {
public AlertResult send() {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
try {
Client client = createClient(voiceParam.getConnection());
SingleCallByTtsRequest singleCallByTtsRequest = new SingleCallByTtsRequest()
@ -61,7 +61,7 @@ public final class VoiceSender {
}
SingleCallByTtsResponseBody body = response.getBody();
if (body.code.equalsIgnoreCase("ok")) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage(body.getCallId());
} else {
alertResult.setMessage(body.getMessage());

View File

@ -46,7 +46,7 @@ class VoiceSenderTest {
VoiceSender weChatSender = new VoiceSender(voiceParam);
AlertResult alertResult = weChatSender.send();
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
}

View File

@ -35,6 +35,6 @@ public interface AlertChannel {
AlertResult process(AlertInfo info);
default @NonNull AlertResult closeAlert(AlertInfo info) {
return new AlertResult("true", "no need to close alert");
return new AlertResult(true, "no need to close alert");
}
}

View File

@ -53,14 +53,6 @@ public class AlertData {
*/
private String log;
/**
* 0 do not send warning;
* 1 send if process success;
* 2 send if process failed;
* 3 send if process ends, whatever the result;
*/
private int warnType;
/**
* AlertType#code
*/

View File

@ -33,15 +33,19 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class AlertResult {
/**
* todo: use enum
* false or true
*/
private String status;
private boolean success;
/**
* alert result message, each plugin can have its own message
*/
private String message;
public static AlertResult success() {
return new AlertResult(true, null);
}
public static AlertResult fail(String message) {
return new AlertResult(false, message);
}
}

View File

@ -31,7 +31,7 @@ public final class DingTalkAlertChannel implements AlertChannel {
AlertData alertData = alertInfo.getAlertData();
Map<String, String> paramsMap = alertInfo.getAlertParams();
if (null == paramsMap) {
return new AlertResult("false", "ding talk params is null");
return new AlertResult(false, "ding talk params is null");
}
return new DingTalkSender(paramsMap).sendDingTalkMsg(alertData.getTitle(), alertData.getContent());
}

View File

@ -126,7 +126,7 @@ public final class DingTalkSender {
private AlertResult checkSendDingTalkSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
if (null == result) {
alertResult.setMessage("send ding talk msg error");
@ -140,7 +140,7 @@ public final class DingTalkSender {
return alertResult;
}
if (sendMsgResponse.errcode == 0) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("send ding talk msg success");
return alertResult;
}
@ -164,7 +164,7 @@ public final class DingTalkSender {
} catch (Exception e) {
log.info("send ding talk alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
alertResult.setMessage("send ding talk alert fail.");
}
return alertResult;

View File

@ -52,7 +52,7 @@ public class DingTalkSenderTest {
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE, "true");
dingTalkSender = new DingTalkSender(dingTalkConfig);
AlertResult alertResult = dingTalkSender.sendDingTalkMsg("title", "content test");
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertEquals(false, alertResult.isSuccess());
}
}

View File

@ -35,24 +35,20 @@ public final class EmailAlertChannel implements AlertChannel {
AlertData alert = info.getAlertData();
Map<String, String> paramsMap = info.getAlertParams();
if (null == paramsMap) {
return new AlertResult("false", "mail params is null");
return new AlertResult(false, "mail params is null");
}
MailSender mailSender = new MailSender(paramsMap);
AlertResult alertResult = mailSender.sendMails(alert.getTitle(), alert.getContent());
boolean flag;
if (alertResult == null) {
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
alertResult.setMessage("alert send error.");
log.info("alert send error : {}", alertResult.getMessage());
return alertResult;
}
flag = Boolean.parseBoolean(String.valueOf(alertResult.getStatus()));
if (flag) {
if (alertResult.isSuccess()) {
log.info("alert send success");
alertResult.setMessage("email send success.");
} else {

View File

@ -154,7 +154,7 @@ public final class MailSender {
*/
public AlertResult sendMails(List<String> receivers, List<String> receiverCcs, String title, String content) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
// if there is no receivers && no receiversCc, no need to process
if (CollectionUtils.isEmpty(receivers) && CollectionUtils.isEmpty(receiverCcs)) {
@ -201,7 +201,7 @@ public final class MailSender {
attachment(title, content, partContent);
alertResult.setStatus("true");
alertResult.setSuccess(true);
return alertResult;
} catch (Exception e) {
handleException(alertResult, e);
@ -380,7 +380,7 @@ public final class MailSender {
email.setDebug(true);
email.send();
alertResult.setStatus("true");
alertResult.setSuccess(true);
return alertResult;
}

View File

@ -66,7 +66,7 @@ public class EmailAlertChannelTest {
alertInfo.setAlertParams(paramsMap);
AlertResult alertResult = emailAlertChannel.process(alertInfo);
Assertions.assertNotNull(alertResult);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
public String getEmailAlertParams() {

View File

@ -77,7 +77,7 @@ public class MailUtilsTest {
AlertResult alertResult = mailSender.sendMails(
"Mysql Exception",
content);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
@Test
@ -107,7 +107,7 @@ public class MailUtilsTest {
emailConfig.put(MailParamsConstants.NAME_MAIL_PASSWD, "passwd");
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.sendMails(title, content);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
public String list2String() {
@ -142,24 +142,24 @@ public class MailUtilsTest {
emailConfig.put(AlertConstants.NAME_SHOW_TYPE, ShowType.TABLE.getDescp());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.sendMails(title, content);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
@Test
public void testAttachmentFile() throws Exception {
public void testAttachmentFile() {
String content = list2String();
emailConfig.put(AlertConstants.NAME_SHOW_TYPE, ShowType.ATTACHMENT.getDescp());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.sendMails("gaojing", content);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
@Test
public void testTableAttachmentFile() throws Exception {
public void testTableAttachmentFile() {
String content = list2String();
emailConfig.put(AlertConstants.NAME_SHOW_TYPE, ShowType.TABLE_ATTACHMENT.getDescp());
mailSender = new MailSender(emailConfig);
AlertResult alertResult = mailSender.sendMails("gaojing", content);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
}

View File

@ -31,7 +31,7 @@ public final class FeiShuAlertChannel implements AlertChannel {
AlertData alertData = alertInfo.getAlertData();
Map<String, String> paramsMap = alertInfo.getAlertParams();
if (null == paramsMap) {
return new AlertResult("false", "fei shu params is null");
return new AlertResult(false, "fei shu params is null");
}
return new FeiShuSender(paramsMap).sendFeiShuMsg(alertData);
}

View File

@ -80,7 +80,7 @@ public final class FeiShuSender {
public static AlertResult checkSendFeiShuSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
if (org.apache.commons.lang3.StringUtils.isBlank(result)) {
alertResult.setMessage("send fei shu msg error");
@ -95,7 +95,7 @@ public final class FeiShuSender {
return alertResult;
}
if (sendMsgResponse.statusCode == 0) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("send fei shu msg success");
return alertResult;
}
@ -136,7 +136,7 @@ public final class FeiShuSender {
} catch (Exception e) {
log.info("send fei shu alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
alertResult.setMessage("send fei shu alert fail.");
}
return alertResult;

View File

@ -43,7 +43,7 @@ public class FeiShuSenderTest {
alertData.setContent("feishu test content");
FeiShuSender feiShuSender = new FeiShuSender(feiShuConfig);
AlertResult alertResult = feiShuSender.sendFeiShuMsg(alertData);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
@Test
@ -87,12 +87,12 @@ public class FeiShuSenderTest {
FeiShuSender feiShuSender = new FeiShuSender(feiShuConfig);
AlertResult alertResult = feiShuSender.checkSendFeiShuSendMsgResult("");
Assertions.assertFalse(Boolean.valueOf(alertResult.getStatus()));
Assertions.assertFalse(alertResult.isSuccess());
AlertResult alertResult2 = feiShuSender.checkSendFeiShuSendMsgResult("123");
Assertions.assertEquals("send fei shu msg fail", alertResult2.getMessage());
String response = "{\"StatusCode\":\"0\",\"extra\":\"extra\",\"StatusMessage\":\"StatusMessage\"}";
AlertResult alertResult3 = feiShuSender.checkSendFeiShuSendMsgResult(response);
Assertions.assertTrue(Boolean.valueOf(alertResult3.getStatus()));
Assertions.assertTrue(alertResult3.isSuccess());
}
}

View File

@ -31,7 +31,7 @@ public final class HttpAlertChannel implements AlertChannel {
AlertData alertData = alertInfo.getAlertData();
Map<String, String> paramsMap = alertInfo.getAlertParams();
if (null == paramsMap) {
return new AlertResult("false", "http params is null");
return new AlertResult(false, "http params is null");
}
return new HttpSender(paramsMap).send(alertData.getContent());

View File

@ -92,18 +92,18 @@ public final class HttpSender {
}
if (httpRequest == null) {
alertResult.setStatus("false");
alertResult.setSuccess(false);
alertResult.setMessage("Request types are not supported");
return alertResult;
}
try {
String resp = this.getResponseString(httpRequest);
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage(resp);
} catch (Exception e) {
log.error("send http alert msg exception : {}", e.getMessage());
alertResult.setStatus("false");
alertResult.setSuccess(false);
alertResult.setMessage(
String.format("Send http request alert failed: %s", e.getMessage()));
}

View File

@ -62,9 +62,9 @@ public class HttpAlertChannelTest {
// HttpSender(paramsMap).send(alertData.getContent()); already test in HttpSenderTest.sendTest. so we can mock
// it
doReturn(new AlertResult("true", "success")).when(alertChannel).process(any());
doReturn(new AlertResult(true, "success")).when(alertChannel).process(any());
AlertResult alertResult = alertChannel.process(alertInfo);
Assertions.assertEquals("true", alertResult.getStatus());
Assertions.assertTrue(alertResult.isSuccess());
}
/**

View File

@ -46,7 +46,7 @@ public class HttpSenderTest {
HttpSender httpSender = spy(new HttpSender(paramsMap));
doReturn("success").when(httpSender).getResponseString(any());
AlertResult alertResult = httpSender.send("Fault tolerance warning");
Assertions.assertEquals("true", alertResult.getStatus());
Assertions.assertTrue(alertResult.isSuccess());
Assertions.assertTrue(httpSender.getRequestUrl().contains(url));
Assertions.assertTrue(httpSender.getRequestUrl().contains(contentField));
}

View File

@ -30,8 +30,8 @@ public final class PagerDutyAlertChannel implements AlertChannel {
public AlertResult process(AlertInfo alertInfo) {
AlertData alertData = alertInfo.getAlertData();
Map<String, String> alertParams = alertInfo.getAlertParams();
if (alertParams == null || alertParams.size() == 0) {
return new AlertResult("false", "PagerDuty alert params is empty");
if (alertParams == null || alertParams.isEmpty()) {
return new AlertResult(false, "PagerDuty alert params is empty");
}
return new PagerDutySender(alertParams).sendPagerDutyAlter(alertData.getTitle(), alertData.getContent());

View File

@ -53,7 +53,7 @@ public final class PagerDutySender {
public AlertResult sendPagerDutyAlter(String title, String content) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
alertResult.setMessage("send pager duty alert fail.");
try {
@ -83,7 +83,7 @@ public final class PagerDutySender {
String responseContent = EntityUtils.toString(entity, StandardCharsets.UTF_8);
try {
if (statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_ACCEPTED) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("send pager duty alert success");
} else {
alertResult.setMessage(

View File

@ -39,6 +39,6 @@ public class PagerDutySenderTest {
public void testSend() {
PagerDutySender pagerDutySender = new PagerDutySender(pagerDutyConfig);
AlertResult alertResult = pagerDutySender.sendPagerDutyAlter("pagerduty test title", "pagerduty test content");
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
}

View File

@ -31,7 +31,7 @@ public final class PrometheusAlertChannel implements AlertChannel {
AlertData alertData = info.getAlertData();
Map<String, String> paramsMap = info.getAlertParams();
if (null == paramsMap) {
return new AlertResult("false", "prometheus alert manager params is null");
return new AlertResult(false, "prometheus alert manager params is null");
}
return new PrometheusAlertSender(paramsMap).sendMessage(alertData);

View File

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.alert.api.HttpServiceRetryStrategy;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -64,11 +65,10 @@ public class PrometheusAlertSender {
String resp = sendMsg(alertData);
return checkSendAlertManageMsgResult(resp);
} catch (Exception e) {
String errorMsg = String.format("send prometheus alert manager alert error, exception: %s", e.getMessage());
log.error(errorMsg);
log.error("Send prometheus alert manager alert error", e);
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setMessage(errorMsg);
alertResult.setSuccess(false);
alertResult.setMessage(ExceptionUtils.getMessage(e));
}
return alertResult;
}
@ -106,10 +106,10 @@ public class PrometheusAlertSender {
public AlertResult checkSendAlertManageMsgResult(String resp) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
if (Objects.equals(resp, PrometheusAlertConstants.ALERT_SUCCESS)) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("prometheus alert manager send success");
return alertResult;
}

View File

@ -55,17 +55,17 @@ public class PrometheusAlertSenderTest {
" }]");
PrometheusAlertSender sender = new PrometheusAlertSender(config);
AlertResult result = sender.sendMessage(alertData);
Assertions.assertEquals("false", result.getStatus());
Assertions.assertFalse(result.isSuccess());
}
@Test
public void testCheckSendAlertManageMsgResult() {
PrometheusAlertSender prometheusAlertSender = new PrometheusAlertSender(config);
AlertResult alertResult1 = prometheusAlertSender.checkSendAlertManageMsgResult("");
Assertions.assertFalse(Boolean.parseBoolean(alertResult1.getStatus()));
Assertions.assertFalse(alertResult1.isSuccess());
Assertions.assertEquals("prometheus alert manager send fail, resp is ", alertResult1.getMessage());
AlertResult alertResult2 = prometheusAlertSender.checkSendAlertManageMsgResult("alert success");
Assertions.assertTrue(Boolean.parseBoolean(alertResult2.getStatus()));
Assertions.assertTrue(alertResult2.isSuccess());
Assertions.assertEquals("prometheus alert manager send success", alertResult2.getMessage());
}
}

View File

@ -33,7 +33,7 @@ public final class ScriptAlertChannel implements AlertChannel {
AlertData alertData = alertinfo.getAlertData();
Map<String, String> paramsMap = alertinfo.getAlertParams();
if (MapUtils.isEmpty(paramsMap)) {
return new AlertResult("false", "script params is empty");
return new AlertResult(false, "script params is empty");
}
return new ScriptSender(paramsMap).sendScriptAlert(alertData.getTitle(), alertData.getContent());
}

View File

@ -56,7 +56,7 @@ public final class ScriptSender {
}
// If it is another type of alarm script can be added here, such as python
alertResult.setStatus("false");
alertResult.setSuccess(false);
log.error("script type error: {}", scriptType);
alertResult.setMessage("script type error : " + scriptType);
return alertResult;
@ -64,7 +64,7 @@ public final class ScriptSender {
private AlertResult executeShellScript(String title, String content) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
if (Boolean.TRUE.equals(OSUtils.isWindows())) {
alertResult.setMessage("shell script not support windows os");
return alertResult;
@ -111,7 +111,7 @@ public final class ScriptSender {
int exitCode = ProcessUtils.executeScript(cmd);
if (exitCode == 0) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("send script alert msg success");
return alertResult;
}

View File

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.alert.script;
import static org.junit.jupiter.api.Assertions.assertFalse;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import java.util.HashMap;
@ -48,9 +50,9 @@ public class ScriptSenderTest {
ScriptSender scriptSender = new ScriptSender(scriptConfig);
AlertResult alertResult;
alertResult = scriptSender.sendScriptAlert("test title Kris", "test content");
Assertions.assertEquals("true", alertResult.getStatus());
Assertions.assertTrue(alertResult.isSuccess());
alertResult = scriptSender.sendScriptAlert("error msg title", "test content");
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
@Test
@ -58,7 +60,7 @@ public class ScriptSenderTest {
scriptConfig.put(ScriptParamsConstants.NAME_SCRIPT_USER_PARAMS, "' ; calc.exe ; '");
ScriptSender scriptSender = new ScriptSender(scriptConfig);
AlertResult alertResult = scriptSender.sendScriptAlert("test title Kris", "test content");
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
@Test
@ -67,7 +69,7 @@ public class ScriptSenderTest {
ScriptSender scriptSender = new ScriptSender(scriptConfig);
AlertResult alertResult;
alertResult = scriptSender.sendScriptAlert("test user params NPE", "test content");
Assertions.assertEquals("true", alertResult.getStatus());
Assertions.assertTrue(alertResult.isSuccess());
}
@Test
@ -76,7 +78,7 @@ public class ScriptSenderTest {
ScriptSender scriptSender = new ScriptSender(scriptConfig);
AlertResult alertResult;
alertResult = scriptSender.sendScriptAlert("test path NPE", "test content");
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
@Test
@ -85,7 +87,7 @@ public class ScriptSenderTest {
ScriptSender scriptSender = new ScriptSender(scriptConfig);
AlertResult alertResult;
alertResult = scriptSender.sendScriptAlert("test path NPE", "test content");
Assertions.assertEquals("false", alertResult.getStatus());
assertFalse(alertResult.isSuccess());
Assertions.assertTrue(alertResult.getMessage().contains("shell script is invalid, only support .sh file"));
}
@ -95,7 +97,7 @@ public class ScriptSenderTest {
ScriptSender scriptSender = new ScriptSender(scriptConfig);
AlertResult alertResult;
alertResult = scriptSender.sendScriptAlert("test type is error", "test content");
Assertions.assertEquals("false", alertResult.getStatus());
assertFalse(alertResult.isSuccess());
}
}

View File

@ -30,11 +30,11 @@ public final class SlackAlertChannel implements AlertChannel {
public AlertResult process(AlertInfo alertInfo) {
AlertData alertData = alertInfo.getAlertData();
Map<String, String> alertParams = alertInfo.getAlertParams();
if (alertParams == null || alertParams.size() == 0) {
return new AlertResult("false", "Slack alert params is empty");
if (alertParams == null || alertParams.isEmpty()) {
return new AlertResult(false, "Slack alert params is empty");
}
SlackSender slackSender = new SlackSender(alertParams);
String response = slackSender.sendMessage(alertData.getTitle(), alertData.getContent());
return new AlertResult("ok".equals(response) ? "true" : "false", response);
return new AlertResult("ok".equals(response), response);
}
}

View File

@ -30,7 +30,7 @@ public final class TelegramAlertChannel implements AlertChannel {
public AlertResult process(AlertInfo info) {
Map<String, String> alertParams = info.getAlertParams();
if (alertParams == null || alertParams.isEmpty()) {
return new AlertResult("false", "Telegram alert params is empty");
return AlertResult.fail("Telegram alert params is empty");
}
AlertData data = info.getAlertData();
return new TelegramSender(alertParams).sendMessage(data);

View File

@ -105,7 +105,7 @@ public final class TelegramSender {
} catch (Exception e) {
log.warn("send telegram alert msg exception : {}", e.getMessage());
result = new AlertResult();
result.setStatus("false");
result.setSuccess(false);
result.setMessage(String.format("send telegram alert fail. %s", e.getMessage()));
}
return result;
@ -113,7 +113,7 @@ public final class TelegramSender {
private AlertResult parseRespToResult(String resp) {
AlertResult result = new AlertResult();
result.setStatus("false");
result.setSuccess(false);
if (null == resp || resp.isEmpty()) {
result.setMessage("send telegram msg error. telegram server resp is empty");
return result;
@ -127,7 +127,7 @@ public final class TelegramSender {
result.setMessage(String.format("send telegram alert fail. telegram server error_code: %d, description: %s",
response.errorCode, response.description));
} else {
result.setStatus("true");
result.setSuccess(true);
result.setMessage("send telegram msg success.");
}
return result;

View File

@ -52,7 +52,7 @@ public class TelegramSenderTest {
TelegramParamsConstants.NAME_TELEGRAM_BOT_TOKEN, "XXXXXXX");
TelegramSender telegramSender = new TelegramSender(telegramConfig);
AlertResult result = telegramSender.sendMessage(alertData);
Assertions.assertEquals("false", result.getStatus());
Assertions.assertFalse(result.isSuccess());
}
@ -65,7 +65,7 @@ public class TelegramSenderTest {
TelegramParamsConstants.NAME_TELEGRAM_CHAT_ID, "-XXXXXXX");
TelegramSender telegramSender = new TelegramSender(telegramConfig);
AlertResult result = telegramSender.sendMessage(alertData);
Assertions.assertEquals("false", result.getStatus());
Assertions.assertFalse(result.isSuccess());
}
@Test
@ -75,7 +75,7 @@ public class TelegramSenderTest {
alertData.setContent("telegram test content");
TelegramSender telegramSender = new TelegramSender(telegramConfig);
AlertResult result = telegramSender.sendMessage(alertData);
Assertions.assertEquals("false", result.getStatus());
Assertions.assertFalse(result.isSuccess());
}
@ -89,7 +89,7 @@ public class TelegramSenderTest {
TelegramParamsConstants.NAME_TELEGRAM_PARSE_MODE, TelegramAlertConstants.PARSE_MODE_MARKDOWN);
TelegramSender telegramSender = new TelegramSender(telegramConfig);
AlertResult result = telegramSender.sendMessage(alertData);
Assertions.assertEquals("false", result.getStatus());
Assertions.assertFalse(result.isSuccess());
}
@ -102,7 +102,7 @@ public class TelegramSenderTest {
TelegramParamsConstants.NAME_TELEGRAM_PARSE_MODE, TelegramAlertConstants.PARSE_MODE_HTML);
TelegramSender telegramSender = new TelegramSender(telegramConfig);
AlertResult result = telegramSender.sendMessage(alertData);
Assertions.assertEquals("false", result.getStatus());
Assertions.assertFalse(result.isSuccess());
}

View File

@ -33,7 +33,7 @@ public final class WebexTeamsAlertChannel implements AlertChannel {
AlertData alertData = alertInfo.getAlertData();
Map<String, String> alertParams = alertInfo.getAlertParams();
if (MapUtils.isEmpty(alertParams)) {
return new AlertResult("false", "WebexTeams alert params is empty");
return new AlertResult(false, "WebexTeams alert params is empty");
}
return new WebexTeamsSender(alertParams).sendWebexTeamsAlter(alertData);

View File

@ -67,7 +67,7 @@ public final class WebexTeamsSender {
public AlertResult sendWebexTeamsAlter(AlertData alertData) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setSuccess(false);
alertResult.setMessage("send webex teams alert fail.");
try {
@ -93,7 +93,7 @@ public final class WebexTeamsSender {
String responseContent = EntityUtils.toString(entity, StandardCharsets.UTF_8);
try {
if (statusCode == HttpStatus.SC_OK) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("send webex teams alert success");
} else {
alertResult.setMessage(String.format(

View File

@ -85,6 +85,6 @@ public class WebexTeamsSenderTest {
public void testSend() {
WebexTeamsSender webexTeamsSender = new WebexTeamsSender(webexTeamsConfig);
AlertResult alertResult = webexTeamsSender.sendWebexTeamsAlter(alertData);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
}

View File

@ -31,7 +31,7 @@ public final class WeChatAlertChannel implements AlertChannel {
AlertData alertData = info.getAlertData();
Map<String, String> paramsMap = info.getAlertParams();
if (null == paramsMap) {
return new AlertResult("false", "we chat params is null");
return new AlertResult(false, "we chat params is null");
}
return new WeChatSender(paramsMap).sendEnterpriseWeChat(alertData.getTitle(), alertData.getContent());

View File

@ -54,7 +54,6 @@ import lombok.extern.slf4j.Slf4j;
public final class WeChatSender {
private static final String MUST_NOT_NULL = " must not null";
private static final String ALERT_STATUS = "false";
private static final String AGENT_ID_REG_EXP = "{agentId}";
private static final String MSG_REG_EXP = "{msg}";
private static final String USER_REG_EXP = "{toUser}";
@ -178,7 +177,7 @@ public final class WeChatSender {
private static AlertResult checkWeChatSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus(ALERT_STATUS);
alertResult.setSuccess(false);
if (null == result) {
alertResult.setMessage("we chat send fail");
@ -192,11 +191,11 @@ public final class WeChatSender {
return alertResult;
}
if (sendMsgResponse.errcode == 0) {
alertResult.setStatus("true");
alertResult.setSuccess(true);
alertResult.setMessage("we chat alert send success");
return alertResult;
}
alertResult.setStatus(ALERT_STATUS);
alertResult.setSuccess(false);
alertResult.setMessage(sendMsgResponse.getErrmsg());
return alertResult;
}
@ -212,7 +211,7 @@ public final class WeChatSender {
if (null == weChatToken) {
alertResult = new AlertResult();
alertResult.setMessage("send we chat alert fail,get weChat token error");
alertResult.setStatus(ALERT_STATUS);
alertResult.setSuccess(false);
return alertResult;
}
String enterpriseWeChatPushUrlReplace = "";
@ -239,7 +238,7 @@ public final class WeChatSender {
log.info("send we chat alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setMessage("send we chat alert fail");
alertResult.setStatus(ALERT_STATUS);
alertResult.setSuccess(false);
}
return alertResult;
}

View File

@ -71,7 +71,7 @@ public class WeChatSenderTest {
WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.sendEnterpriseWeChat("test", content);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
@Test
@ -79,7 +79,7 @@ public class WeChatSenderTest {
weChatConfig.put(AlertConstants.NAME_SHOW_TYPE, ShowType.TEXT.getDescp());
WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.sendEnterpriseWeChat("test", content);
Assertions.assertEquals("false", alertResult.getStatus());
Assertions.assertFalse(alertResult.isSuccess());
}
}

View File

@ -66,6 +66,12 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-kubernetes-client-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -18,11 +18,7 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.ListenerEventPostService;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
@ -50,14 +46,6 @@ public class AlertServer {
@Autowired
private AlertBootstrapService alertBootstrapService;
@Autowired
private ListenerEventPostService listenerEventPostService;
@Autowired
private AlertRpcServer alertRpcServer;
@Autowired
private AlertPluginManager alertPluginManager;
@Autowired
private AlertRegistryClient alertRegistryClient;
public static void main(String[] args) {
AlertServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
@ -68,27 +56,14 @@ public class AlertServer {
@PostConstruct
public void run() {
log.info("Alert server is staring ...");
alertPluginManager.start();
alertRegistryClient.start();
log.info("AlertServer is staring ...");
alertBootstrapService.start();
listenerEventPostService.start();
alertRpcServer.start();
log.info("Alert server is started ...");
log.info("AlertServer is started ...");
}
@PreDestroy
public void close() {
destroy("alert server destroy");
}
/**
* gracefully stop
*
* @param cause stop cause
*/
public void destroy(String cause) {
String cause = "AlertServer destroy";
try {
// set stop signal is true
// execute only once
@ -96,19 +71,14 @@ public class AlertServer {
log.warn("AlterServer is already stopped");
return;
}
log.info("Alert server is stopping, cause: {}", cause);
try (
AlertRpcServer closedAlertRpcServer = alertRpcServer;
AlertBootstrapService closedAlertBootstrapService = alertBootstrapService;
ListenerEventPostService closedListenerEventPostService = listenerEventPostService;
AlertRegistryClient closedAlertRegistryClient = alertRegistryClient) {
// close resource
}
log.info("AlertServer is stopping, cause: {}", cause);
alertBootstrapService.close();
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
log.info("Alter server stopped, cause: {}", cause);
log.info("AlertServer stopped, cause: {}", cause);
} catch (Exception e) {
log.error("Alert server stop failed, cause: {}", cause, e);
log.error("AlertServer stop failed, cause: {}", cause, e);
}
}
}

View File

@ -43,6 +43,8 @@ public final class AlertConfig implements Validator {
private Duration maxHeartbeatInterval = Duration.ofSeconds(60);
private int senderParallelism = 100;
private String alertServerAddress;
@Override
@ -58,6 +60,10 @@ public final class AlertConfig implements Validator {
errors.rejectValue("max-heartbeat-interval", null, "should be a valid duration");
}
if (senderParallelism <= 0) {
errors.rejectValue("sender-parallelism", null, "should be a positive number");
}
if (StringUtils.isEmpty(alertServerAddress)) {
alertConfig.setAlertServerAddress(NetUtils.getAddr(alertConfig.getPort()));
}

View File

@ -45,6 +45,12 @@ public class AlertServerMetrics {
.register(Metrics.globalRegistry);
}
public void registerSendingAlertGauge(final Supplier<Number> supplier) {
Gauge.builder("ds.alert.sending", supplier)
.description("Number of sending alert")
.register(Metrics.globalRegistry);
}
public static void registerUncachedException(final Supplier<Number> supplier) {
Gauge.builder("ds.alert.uncached.exception", supplier)
.description("number of uncached exception")

View File

@ -36,8 +36,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@Component
public final class AlertPluginManager {
private final PluginDao pluginDao;

View File

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.alert.registry;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.service.AlertHAServer;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
@ -42,12 +43,15 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat>
private final RegistryClient registryClient;
private final MetricsProvider metricsProvider;
private final AlertHAServer alertHAServer;
private final String heartBeatPath;
private final long startupTime;
public AlertHeartbeatTask(AlertConfig alertConfig,
MetricsProvider metricsProvider,
RegistryClient registryClient) {
RegistryClient registryClient,
AlertHAServer alertHAServer) {
super("AlertHeartbeatTask", alertConfig.getMaxHeartbeatInterval().toMillis());
this.startupTime = System.currentTimeMillis();
this.alertConfig = alertConfig;
@ -55,6 +59,7 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat>
this.registryClient = registryClient;
this.heartBeatPath =
RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress();
this.alertHAServer = alertHAServer;
this.processId = OSUtils.getProcessID();
}
@ -70,6 +75,7 @@ public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat>
.memoryUsage(systemMetrics.getSystemMemoryUsedPercentage())
.jvmMemoryUsage(systemMetrics.getJvmMemoryUsedPercentage())
.serverStatus(ServerStatus.NORMAL)
.isActive(alertHAServer.isActive())
.host(NetUtils.getHost())
.port(alertConfig.getPort())
.build();

View File

@ -18,9 +18,9 @@
package org.apache.dolphinscheduler.alert.registry;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.service.AlertHAServer;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import lombok.extern.slf4j.Slf4j;
@ -42,10 +42,12 @@ public class AlertRegistryClient implements AutoCloseable {
private AlertHeartbeatTask alertHeartbeatTask;
@Autowired
private AlertHAServer alertHAServer;
public void start() {
log.info("AlertRegistryClient starting...");
registryClient.getLock(RegistryNodeType.ALERT_LOCK.getRegistryPath());
alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, metricsProvider, registryClient);
alertHeartbeatTask = new AlertHeartbeatTask(alertConfig, metricsProvider, registryClient, alertHAServer);
alertHeartbeatTask.start();
// start heartbeat task
log.info("AlertRegistryClient started...");
@ -55,7 +57,6 @@ public class AlertRegistryClient implements AutoCloseable {
public void close() {
log.info("AlertRegistryClient closing...");
alertHeartbeatTask.shutdown();
registryClient.releaseLock(RegistryNodeType.ALERT_LOCK.getRegistryPath());
log.info("AlertRegistryClient closed...");
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.dolphinscheduler.alert.rpc;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.AlertSender;
import org.apache.dolphinscheduler.extract.alert.IAlertOperator;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendRequest;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;
@ -32,16 +32,15 @@ import org.springframework.stereotype.Service;
public class AlertOperatorImpl implements IAlertOperator {
@Autowired
private AlertBootstrapService alertBootstrapService;
private AlertSender alertSender;
@Override
public AlertSendResponse sendAlert(AlertSendRequest alertSendRequest) {
log.info("Received AlertSendRequest : {}", alertSendRequest);
AlertSendResponse alertSendResponse = alertBootstrapService.syncHandler(
AlertSendResponse alertSendResponse = alertSender.syncHandler(
alertSendRequest.getGroupId(),
alertSendRequest.getTitle(),
alertSendRequest.getContent(),
alertSendRequest.getWarnType());
alertSendRequest.getContent());
log.info("Handle AlertSendRequest finish: {}", alertSendResponse);
return alertSendResponse;
}
@ -49,7 +48,7 @@ public class AlertOperatorImpl implements IAlertOperator {
@Override
public AlertSendResponse sendTestAlert(AlertTestSendRequest alertSendRequest) {
log.info("Received AlertTestSendRequest : {}", alertSendRequest);
AlertSendResponse alertSendResponse = alertBootstrapService.syncTestSend(
AlertSendResponse alertSendResponse = alertSender.syncTestSend(
alertSendRequest.getPluginDefineId(),
alertSendRequest.getPluginInstanceParams());
log.info("Handle AlertTestSendRequest finish: {}", alertSendResponse);

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.commons.collections4.CollectionUtils;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractEventFetcher<T> extends BaseDaemonThread implements EventFetcher<T> {
protected static final int FETCH_SIZE = 100;
protected static final long FETCH_INTERVAL = 5_000;
protected final AlertHAServer alertHAServer;
private final EventPendingQueue<T> eventPendingQueue;
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
private Integer eventOffset;
protected AbstractEventFetcher(String fetcherName,
AlertHAServer alertHAServer,
EventPendingQueue<T> eventPendingQueue) {
super(fetcherName);
this.alertHAServer = alertHAServer;
this.eventPendingQueue = eventPendingQueue;
this.eventOffset = -1;
}
@Override
public synchronized void start() {
if (!runningFlag.compareAndSet(false, true)) {
throw new IllegalArgumentException("AlertEventFetcher is already started");
}
log.info("AlertEventFetcher starting...");
super.start();
log.info("AlertEventFetcher started...");
}
@Override
public void run() {
while (runningFlag.get()) {
try {
if (!alertHAServer.isActive()) {
log.debug("The current node is not active, will not loop Alert");
Thread.sleep(FETCH_INTERVAL);
continue;
}
List<T> pendingEvents = fetchPendingEvent(eventOffset);
if (CollectionUtils.isEmpty(pendingEvents)) {
log.debug("No pending events found");
Thread.sleep(FETCH_INTERVAL);
continue;
}
for (T alert : pendingEvents) {
eventPendingQueue.put(alert);
}
eventOffset = Math.max(eventOffset,
pendingEvents.stream().map(this::getEventOffset).max(Integer::compareTo).get());
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
} catch (Exception ex) {
log.error("AlertEventFetcher error", ex);
}
}
}
protected abstract int getEventOffset(T event);
@Override
public void shutdown() {
if (!runningFlag.compareAndSet(true, false)) {
log.warn("The AlertEventFetcher is not started");
}
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractEventLoop<T> extends BaseDaemonThread implements EventLoop<T> {
private final EventPendingQueue<T> eventPendingQueue;
private final AtomicInteger handlingEventCount;
private final int eventHandleWorkerNum;
private final ThreadPoolExecutor threadPoolExecutor;
private final AtomicBoolean runningFlag = new AtomicBoolean(false);
protected AbstractEventLoop(String name,
ThreadPoolExecutor threadPoolExecutor,
EventPendingQueue<T> eventPendingQueue) {
super(name);
this.handlingEventCount = new AtomicInteger(0);
this.eventHandleWorkerNum = threadPoolExecutor.getMaximumPoolSize();
this.threadPoolExecutor = threadPoolExecutor;
this.eventPendingQueue = eventPendingQueue;
}
@Override
public synchronized void start() {
if (!runningFlag.compareAndSet(false, true)) {
throw new IllegalArgumentException(getClass().getName() + " is already started");
}
log.info("{} starting...", getClass().getName());
super.start();
log.info("{} started...", getClass().getName());
}
@Override
public void run() {
while (runningFlag.get()) {
try {
if (handlingEventCount.get() >= eventHandleWorkerNum) {
log.debug("There is no idle event worker, waiting for a while...");
Thread.sleep(1000);
continue;
}
T pendingEvent = eventPendingQueue.take();
handlingEventCount.incrementAndGet();
CompletableFuture.runAsync(() -> handleEvent(pendingEvent), threadPoolExecutor)
.whenComplete((aVoid, throwable) -> {
if (throwable != null) {
log.error("Handle event: {} error", pendingEvent, throwable);
}
handlingEventCount.decrementAndGet();
});
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
log.error("Loop event thread has been interrupted...");
break;
} catch (Exception ex) {
log.error("Loop event error", ex);
}
}
}
@Override
public int getHandlingEventCount() {
return handlingEventCount.get();
}
@Override
public void shutdown() {
if (!runningFlag.compareAndSet(true, false)) {
log.warn(getClass().getName() + " is not started");
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import java.util.concurrent.LinkedBlockingQueue;
public abstract class AbstractEventPendingQueue<T> implements EventPendingQueue<T> {
private final LinkedBlockingQueue<T> pendingAlertQueue;
private final int capacity;
protected AbstractEventPendingQueue(int capacity) {
this.capacity = capacity;
this.pendingAlertQueue = new LinkedBlockingQueue<>(capacity);
}
@Override
public void put(T alert) throws InterruptedException {
pendingAlertQueue.put(alert);
}
@Override
public T take() throws InterruptedException {
return pendingAlertQueue.take();
}
@Override
public int size() {
return pendingAlertQueue.size();
}
@Override
public int capacity() {
return capacity;
}
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@Slf4j
public abstract class AbstractEventSender<T> implements EventSender<T> {
protected final AlertPluginManager alertPluginManager;
private final long sendEventTimeout;
protected AbstractEventSender(AlertPluginManager alertPluginManager, long sendEventTimeout) {
this.alertPluginManager = alertPluginManager;
this.sendEventTimeout = sendEventTimeout;
}
@Override
public void sendEvent(T event) {
List<AlertPluginInstance> alertPluginInstanceList = getAlertPluginInstanceList(event);
if (CollectionUtils.isEmpty(alertPluginInstanceList)) {
onError(event, "No bind plugin instance found");
return;
}
AlertData alertData = getAlertData(event);
List<AlertSendStatus> alertSendStatuses = new ArrayList<>();
for (AlertPluginInstance instance : alertPluginInstanceList) {
AlertResult alertResult = doSendEvent(instance, alertData);
AlertStatus alertStatus =
alertResult.isSuccess() ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
AlertSendStatus alertSendStatus = AlertSendStatus.builder()
.alertId(getEventId(event))
.alertPluginInstanceId(instance.getId())
.sendStatus(alertStatus)
.log(JSONUtils.toJsonString(alertResult))
.createTime(new Date())
.build();
alertSendStatuses.add(alertSendStatus);
}
long failureCount = alertSendStatuses.stream()
.map(alertSendStatus -> alertSendStatus.getSendStatus() == AlertStatus.EXECUTION_FAILURE)
.count();
long successCount = alertSendStatuses.stream()
.map(alertSendStatus -> alertSendStatus.getSendStatus() == AlertStatus.EXECUTION_SUCCESS)
.count();
if (successCount == 0) {
onError(event, JSONUtils.toJsonString(alertSendStatuses));
} else {
if (failureCount > 0) {
onPartialSuccess(event, JSONUtils.toJsonString(alertSendStatuses));
} else {
onSuccess(event, JSONUtils.toJsonString(alertSendStatuses));
}
}
}
public abstract List<AlertPluginInstance> getAlertPluginInstanceList(T event);
public abstract AlertData getAlertData(T event);
public abstract Integer getEventId(T event);
public abstract void onError(T event, String log);
public abstract void onPartialSuccess(T event, String log);
public abstract void onSuccess(T event, String log);
@Override
public AlertResult doSendEvent(AlertPluginInstance instance, AlertData alertData) {
int pluginDefineId = instance.getPluginDefineId();
Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(pluginDefineId);
if (!alertChannelOptional.isPresent()) {
return AlertResult.fail("Cannot find the alertPlugin: " + pluginDefineId);
}
AlertChannel alertChannel = alertChannelOptional.get();
AlertInfo alertInfo = AlertInfo.builder()
.alertData(alertData)
.alertParams(PluginParamsTransfer.getPluginParamsMap(instance.getPluginInstanceParams()))
.alertPluginInstanceId(instance.getId())
.build();
try {
AlertResult alertResult;
if (sendEventTimeout <= 0) {
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
alertResult = alertChannel.closeAlert(alertInfo);
} else {
alertResult = alertChannel.process(alertInfo);
}
} else {
CompletableFuture<AlertResult> future;
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo));
} else {
future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
}
alertResult = future.get(sendEventTimeout, TimeUnit.MILLISECONDS);
}
checkNotNull(alertResult, "AlertResult cannot be null");
return alertResult;
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
return AlertResult.fail(ExceptionUtils.getMessage(interruptedException));
} catch (Exception e) {
log.error("Send alert data {} failed", alertData, e);
return AlertResult.fail(ExceptionUtils.getMessage(e));
}
}
@Override
public AlertSendResponse syncTestSend(int pluginDefineId, String pluginInstanceParams) {
Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(pluginDefineId);
if (!alertChannelOptional.isPresent()) {
AlertSendResponse.AlertSendResponseResult alertSendResponseResult =
AlertSendResponse.AlertSendResponseResult.fail("Cannot find the alertPlugin: " + pluginDefineId);
return AlertSendResponse.fail(Lists.newArrayList(alertSendResponseResult));
}
AlertData alertData = AlertData.builder()
.title(AlertConstants.TEST_TITLE)
.content(AlertConstants.TEST_CONTENT)
.build();
AlertInfo alertInfo = AlertInfo.builder()
.alertData(alertData)
.alertParams(PluginParamsTransfer.getPluginParamsMap(pluginInstanceParams))
.build();
try {
AlertResult alertResult = alertChannelOptional.get().process(alertInfo);
Preconditions.checkNotNull(alertResult, "AlertResult cannot be null");
if (alertResult.isSuccess()) {
return AlertSendResponse
.success(Lists.newArrayList(AlertSendResponse.AlertSendResponseResult.success()));
}
return AlertSendResponse.fail(
Lists.newArrayList(AlertSendResponse.AlertSendResponseResult.fail(alertResult.getMessage())));
} catch (Exception e) {
log.error("Test send alert error", e);
return new AlertSendResponse(false,
Lists.newArrayList(AlertSendResponse.AlertSendResponseResult.fail(ExceptionUtils.getMessage(e))));
}
}
}

View File

@ -17,350 +17,84 @@
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.collect.Lists;
@Service
/**
* The bootstrap service for alert server. it will start all the necessary component for alert server.
*/
@Slf4j
public final class AlertBootstrapService extends BaseDaemonThread implements AutoCloseable {
@Service
public final class AlertBootstrapService implements AutoCloseable {
@Autowired
private AlertDao alertDao;
@Autowired
private AlertPluginManager alertPluginManager;
@Autowired
private AlertConfig alertConfig;
private final AlertRpcServer alertRpcServer;
public AlertBootstrapService() {
super("AlertBootstrapService");
private final AlertRegistryClient alertRegistryClient;
private final AlertPluginManager alertPluginManager;
private final AlertHAServer alertHAServer;
private final AlertEventFetcher alertEventFetcher;
private final AlertEventLoop alertEventLoop;
private final ListenerEventLoop listenerEventLoop;
private final ListenerEventFetcher listenerEventFetcher;
public AlertBootstrapService(AlertRpcServer alertRpcServer,
AlertRegistryClient alertRegistryClient,
AlertPluginManager alertPluginManager,
AlertHAServer alertHAServer,
AlertEventFetcher alertEventFetcher,
AlertEventLoop alertEventLoop,
ListenerEventLoop listenerEventLoop,
ListenerEventFetcher listenerEventFetcher) {
this.alertRpcServer = alertRpcServer;
this.alertRegistryClient = alertRegistryClient;
this.alertPluginManager = alertPluginManager;
this.alertHAServer = alertHAServer;
this.alertEventFetcher = alertEventFetcher;
this.alertEventLoop = alertEventLoop;
this.listenerEventLoop = listenerEventLoop;
this.listenerEventFetcher = listenerEventFetcher;
}
@Override
public void run() {
log.info("Alert sender thread started");
while (!ServerLifeCycleManager.isStopped()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
if (CollectionUtils.isEmpty(alerts)) {
log.debug("There is not waiting alerts");
continue;
}
AlertServerMetrics.registerPendingAlertGauge(alerts::size);
this.send(alerts);
} catch (Exception e) {
log.error("Alert sender thread meet an exception", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
}
}
log.info("Alert sender thread stopped");
}
public void start() {
log.info("AlertBootstrapService starting...");
alertPluginManager.start();
alertRpcServer.start();
alertRegistryClient.start();
alertHAServer.start();
public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
// get alert group from alert
int alertId = alert.getId();
int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0);
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
if (CollectionUtils.isEmpty(alertInstanceList)) {
log.error("send alert msg fail,no bind plugin instance.");
List<AlertResult> alertResults = Lists.newArrayList(new AlertResult("false",
"no bind plugin instance"));
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, JSONUtils.toJsonString(alertResults), alertId);
continue;
}
AlertData alertData = AlertData.builder()
.id(alertId)
.content(alert.getContent())
.log(alert.getLog())
.title(alert.getTitle())
.warnType(alert.getWarningType().getCode())
.alertType(alert.getAlertType().getCode())
.build();
listenerEventFetcher.start();
alertEventFetcher.start();
int sendSuccessCount = 0;
List<AlertSendStatus> alertSendStatuses = new ArrayList<>();
List<AlertResult> alertResults = new ArrayList<>();
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(alertResult.getStatus())
? AlertStatus.EXECUTION_SUCCESS
: AlertStatus.EXECUTION_FAILURE;
AlertSendStatus alertSendStatus = AlertSendStatus.builder()
.alertId(alertId)
.alertPluginInstanceId(instance.getId())
.sendStatus(sendStatus)
.log(JSONUtils.toJsonString(alertResult))
.createTime(new Date())
.build();
alertSendStatuses.add(alertSendStatus);
if (AlertStatus.EXECUTION_SUCCESS.equals(sendStatus)) {
sendSuccessCount++;
AlertServerMetrics.incAlertSuccessCount();
} else {
AlertServerMetrics.incAlertFailCount();
}
alertResults.add(alertResult);
}
}
AlertStatus alertStatus = AlertStatus.EXECUTION_SUCCESS;
if (sendSuccessCount == 0) {
alertStatus = AlertStatus.EXECUTION_FAILURE;
} else if (sendSuccessCount < alertInstanceList.size()) {
alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
}
// we update the alert first to avoid duplicate key in alertSendStatus
// this may loss the alertSendStatus if the server restart
// todo: use transaction to update these two table
alertDao.updateAlert(alertStatus, JSONUtils.toJsonString(alertResults), alertId);
alertDao.insertAlertSendStatus(alertSendStatuses);
}
}
/**
* sync send alert handler
*
* @param alertGroupId alertGroupId
* @param title title
* @param content content
* @return AlertSendResponseCommand
*/
public AlertSendResponse syncHandler(int alertGroupId, String title, String content, int warnType) {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
AlertData alertData = AlertData.builder()
.content(content)
.title(title)
.warnType(warnType)
.build();
boolean sendResponseStatus = true;
List<AlertSendResponse.AlertSendResponseResult> sendResponseResults = new ArrayList<>();
if (CollectionUtils.isEmpty(alertInstanceList)) {
AlertSendResponse.AlertSendResponseResult alertSendResponseResult =
new AlertSendResponse.AlertSendResponseResult();
String message = String.format("Alert GroupId %s send error : not found alert instance", alertGroupId);
alertSendResponseResult.setSuccess(false);
alertSendResponseResult.setMessage(message);
sendResponseResults.add(alertSendResponseResult);
log.error("Alert GroupId {} send error : not found alert instance", alertGroupId);
return new AlertSendResponse(false, sendResponseResults);
}
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertSendResponse.AlertSendResponseResult alertSendResponseResult =
new AlertSendResponse.AlertSendResponseResult(
Boolean.parseBoolean(alertResult.getStatus()),
alertResult.getMessage());
sendResponseStatus = sendResponseStatus && alertSendResponseResult.isSuccess();
sendResponseResults.add(alertSendResponseResult);
}
}
return new AlertSendResponse(sendResponseStatus, sendResponseResults);
}
/**
* alert result handler
*
* @param instance instance
* @param alertData alertData
* @return AlertResult
*/
private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
String pluginInstanceName = instance.getInstanceName();
int pluginDefineId = instance.getPluginDefineId();
Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
if (!alertChannelOptional.isPresent()) {
String message = String.format("Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s",
pluginInstanceName,
pluginDefineId);
log.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId);
return new AlertResult("false", message);
}
AlertChannel alertChannel = alertChannelOptional.get();
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
String instanceWarnType = WarningType.ALL.getDescp();
if (MapUtils.isNotEmpty(paramsMap)) {
instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp());
}
WarningType warningType = WarningType.of(instanceWarnType);
if (warningType == null) {
String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName);
log.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName);
return new AlertResult("false", message);
}
boolean sendWarning = false;
switch (warningType) {
case ALL:
sendWarning = true;
break;
case SUCCESS:
if (alertData.getWarnType() == WarningType.SUCCESS.getCode()) {
sendWarning = true;
}
break;
case FAILURE:
if (alertData.getWarnType() == WarningType.FAILURE.getCode()) {
sendWarning = true;
}
break;
default:
}
if (!sendWarning) {
String message = String.format(
"Alert Plugin %s send ignore warning type not match: plugin warning type is %s, alert data warning type is %s",
pluginInstanceName, warningType.getCode(), alertData.getWarnType());
log.info(
"Alert Plugin {} send ignore warning type not match: plugin warning type is {}, alert data warning type is {}",
pluginInstanceName, warningType.getCode(), alertData.getWarnType());
return new AlertResult("false", message);
}
AlertInfo alertInfo = AlertInfo.builder()
.alertData(alertData)
.alertParams(paramsMap)
.alertPluginInstanceId(instance.getId())
.build();
int waitTimeout = alertConfig.getWaitTimeout();
try {
AlertResult alertResult;
if (waitTimeout <= 0) {
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
alertResult = alertChannel.closeAlert(alertInfo);
} else {
alertResult = alertChannel.process(alertInfo);
}
} else {
CompletableFuture<AlertResult> future;
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo));
} else {
future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
}
alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
}
if (alertResult == null) {
throw new RuntimeException("Alert result cannot be null");
}
return alertResult;
} catch (InterruptedException e) {
log.error("send alert error alert data id :{},", alertData.getId(), e);
Thread.currentThread().interrupt();
return new AlertResult("false", e.getMessage());
} catch (Exception e) {
log.error("send alert error alert data id :{},", alertData.getId(), e);
return new AlertResult("false", e.getMessage());
}
}
public AlertSendResponse syncTestSend(int pluginDefineId, String pluginInstanceParams) {
boolean sendResponseStatus = true;
List<AlertSendResponse.AlertSendResponseResult> sendResponseResults = new ArrayList<>();
Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(pluginDefineId);
if (!alertChannelOptional.isPresent()) {
String message = String.format("Test send alert error: the channel doesn't exist, pluginDefineId: %s",
pluginDefineId);
AlertSendResponse.AlertSendResponseResult alertSendResponseResult =
new AlertSendResponse.AlertSendResponseResult();
alertSendResponseResult.setSuccess(false);
alertSendResponseResult.setMessage(message);
sendResponseResults.add(alertSendResponseResult);
log.error("Test send alert error : not found plugin {}", pluginDefineId);
return new AlertSendResponse(false, sendResponseResults);
}
AlertChannel alertChannel = alertChannelOptional.get();
Map<String, String> paramsMap = PluginParamsTransfer.getPluginParamsMap(pluginInstanceParams);
AlertData alertData = AlertData.builder()
.title(AlertConstants.TEST_TITLE)
.content(AlertConstants.TEST_CONTENT)
.warnType(WarningType.ALL.getCode())
.build();
AlertInfo alertInfo = AlertInfo.builder()
.alertData(alertData)
.alertParams(paramsMap)
.build();
try {
AlertResult alertResult = alertChannel.process(alertInfo);
if (alertResult != null) {
AlertSendResponse.AlertSendResponseResult alertSendResponseResult =
new AlertSendResponse.AlertSendResponseResult(
Boolean.parseBoolean(alertResult.getStatus()),
alertResult.getMessage());
sendResponseStatus = alertSendResponseResult.isSuccess();
sendResponseResults.add(alertSendResponseResult);
}
} catch (Exception e) {
log.error("Test send alert error", e);
AlertSendResponse.AlertSendResponseResult alertSendResponseResult =
new AlertSendResponse.AlertSendResponseResult();
alertSendResponseResult.setSuccess(false);
alertSendResponseResult.setMessage(e.getMessage());
sendResponseResults.add(alertSendResponseResult);
return new AlertSendResponse(false, sendResponseResults);
}
return new AlertSendResponse(sendResponseStatus, sendResponseResults);
listenerEventLoop.start();
alertEventLoop.start();
log.info("AlertBootstrapService started...");
}
@Override
public void close() {
log.info("Closed AlertBootstrapService...");
}
log.info("AlertBootstrapService stopping...");
try (
AlertRpcServer closedAlertRpcServer = alertRpcServer;
AlertRegistryClient closedAlertRegistryClient = alertRegistryClient) {
// close resource
listenerEventFetcher.shutdown();
alertEventFetcher.shutdown();
listenerEventLoop.shutdown();
alertEventLoop.shutdown();
alertHAServer.shutdown();
}
log.info("AlertBootstrapService stopped...");
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class AlertEventFetcher extends AbstractEventFetcher<Alert> {
private final AlertDao alertDao;
public AlertEventFetcher(AlertHAServer alertHAServer,
AlertDao alertDao,
AlertEventPendingQueue alertEventPendingQueue) {
super("AlertEventFetcher", alertHAServer, alertEventPendingQueue);
this.alertDao = alertDao;
}
@Override
public List<Alert> fetchPendingEvent(int eventOffset) {
return alertDao.listPendingAlerts(eventOffset);
}
@Override
protected int getEventOffset(Alert event) {
return event.getId();
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.dao.entity.Alert;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class AlertEventLoop extends AbstractEventLoop<Alert> {
private final AlertSender alertSender;
public AlertEventLoop(AlertEventPendingQueue alertEventPendingQueue,
AlertSenderThreadPoolFactory alertSenderThreadPoolFactory,
AlertSender alertSender) {
super("AlertEventLoop", alertSenderThreadPoolFactory.getThreadPool(), alertEventPendingQueue);
this.alertSender = alertSender;
AlertServerMetrics.registerPendingAlertGauge(this::getHandlingEventCount);
}
@Override
public void handleEvent(Alert event) {
alertSender.sendEvent(event);
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.springframework.stereotype.Component;
@Component
public class AlertEventPendingQueue extends AbstractEventPendingQueue<Alert> {
public AlertEventPendingQueue(AlertConfig alertConfig) {
super(alertConfig.getSenderParallelism() * 3 + 1);
AlertServerMetrics.registerPendingAlertGauge(this::size);
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class AlertHAServer extends AbstractHAServer {
public AlertHAServer(Registry registry) {
super(registry, RegistryNodeType.ALERT_LOCK.getRegistryPath());
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class AlertSender extends AbstractEventSender<Alert> {
private final AlertDao alertDao;
public AlertSender(AlertDao alertDao,
AlertPluginManager alertPluginManager,
AlertConfig alertConfig) {
super(alertPluginManager, alertConfig.getWaitTimeout());
this.alertDao = alertDao;
}
/**
* sync send alert handler
*
* @param alertGroupId alertGroupId
* @param title title
* @param content content
* @return AlertSendResponseCommand
*/
public AlertSendResponse syncHandler(int alertGroupId, String title, String content) {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
AlertData alertData = AlertData.builder()
.content(content)
.title(title)
.build();
boolean sendResponseStatus = true;
List<AlertSendResponse.AlertSendResponseResult> sendResponseResults = new ArrayList<>();
if (CollectionUtils.isEmpty(alertInstanceList)) {
AlertSendResponse.AlertSendResponseResult alertSendResponseResult =
new AlertSendResponse.AlertSendResponseResult();
String message = String.format("Alert GroupId %s send error : not found alert instance", alertGroupId);
alertSendResponseResult.setSuccess(false);
alertSendResponseResult.setMessage(message);
sendResponseResults.add(alertSendResponseResult);
log.error("Alert GroupId {} send error : not found alert instance", alertGroupId);
return new AlertSendResponse(false, sendResponseResults);
}
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = doSendEvent(instance, alertData);
if (alertResult != null) {
AlertSendResponse.AlertSendResponseResult alertSendResponseResult =
new AlertSendResponse.AlertSendResponseResult(
alertResult.isSuccess(),
alertResult.getMessage());
sendResponseStatus = sendResponseStatus && alertSendResponseResult.isSuccess();
sendResponseResults.add(alertSendResponseResult);
}
}
return new AlertSendResponse(sendResponseStatus, sendResponseResults);
}
@Override
public List<AlertPluginInstance> getAlertPluginInstanceList(Alert event) {
return alertDao.listInstanceByAlertGroupId(event.getAlertGroupId());
}
@Override
public AlertData getAlertData(Alert event) {
return AlertData.builder()
.id(event.getId())
.content(event.getContent())
.log(event.getLog())
.title(event.getTitle())
.alertType(event.getAlertType().getCode())
.build();
}
@Override
public Integer getEventId(Alert event) {
return event.getId();
}
@Override
public void onError(Alert event, String log) {
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, log, event.getId());
}
@Override
public void onPartialSuccess(Alert event, String log) {
alertDao.updateAlert(AlertStatus.EXECUTION_PARTIAL_SUCCESS, log, event.getId());
}
@Override
public void onSuccess(Alert event, String log) {
alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, log, event.getId());
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.stereotype.Component;
@Component
public class AlertSenderThreadPoolFactory {
private final ThreadPoolExecutor threadPool;
public AlertSenderThreadPoolFactory(AlertConfig alertConfig) {
this.threadPool = ThreadUtils.newDaemonFixedThreadExecutor("AlertSenderThread",
alertConfig.getSenderParallelism());
}
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import java.util.List;
/**
* The interface responsible for fetching events.
*
* @param <T> the type of event
*/
public interface EventFetcher<T> {
void start();
List<T> fetchPendingEvent(int eventOffset);
void shutdown();
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
/**
* The interface responsible for consuming event from upstream, e.g {@link EventPendingQueue}.
*
* @param <T> the type of event
*/
public interface EventLoop<T> {
/**
* Start the event loop, once the event loop is started, it will keep consuming event from upstream.
*/
void start();
/**
* Handle the given event.
*/
void handleEvent(T event);
/**
* Get the count of handling event.
*/
int getHandlingEventCount();
/**
* Shutdown the event loop, once the event loop is shutdown, it will stop consuming event from upstream.
*/
void shutdown();
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
/**
* The interface responsible for managing pending events.
*
* @param <T> the type of event
*/
public interface EventPendingQueue<T> {
void put(T alert) throws InterruptedException;
T take() throws InterruptedException;
int size();
int capacity();
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;
public interface EventSender<T> {
void sendEvent(T event);
AlertResult doSendEvent(AlertPluginInstance instance, AlertData alertData);
AlertSendResponse syncTestSend(int pluginDefineId, String pluginInstanceParams);
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ListenerEventFetcher extends AbstractEventFetcher<ListenerEvent> {
private final ListenerEventDao listenerEventDao;
protected ListenerEventFetcher(AlertHAServer alertHAServer,
ListenerEventDao listenerEventDao,
ListenerEventPendingQueue listenerEventPendingQueue) {
super("ListenerEventFetcher", alertHAServer, listenerEventPendingQueue);
this.listenerEventDao = listenerEventDao;
}
@Override
protected int getEventOffset(ListenerEvent event) {
return event.getId();
}
@Override
public List<ListenerEvent> fetchPendingEvent(int eventOffset) {
return listenerEventDao.listingPendingEvents(eventOffset, FETCH_SIZE);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.springframework.stereotype.Component;
@Component
public class ListenerEventLoop extends AbstractEventLoop<ListenerEvent> {
private final ListenerEventSender listenerEventSender;
protected ListenerEventLoop(AlertSenderThreadPoolFactory alertSenderThreadPoolFactory,
ListenerEventSender listenerEventSender,
ListenerEventPendingQueue listenerEventPendingQueue) {
super("ListenerEventLoop", alertSenderThreadPoolFactory.getThreadPool(), listenerEventPendingQueue);
this.listenerEventSender = listenerEventSender;
}
@Override
public void handleEvent(ListenerEvent event) {
listenerEventSender.sendEvent(event);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.springframework.stereotype.Component;
@Component
public class ListenerEventPendingQueue extends AbstractEventPendingQueue<ListenerEvent> {
public ListenerEventPendingQueue(AlertConfig alertConfig) {
super(alertConfig.getSenderParallelism() * 3 + 1);
}
}

View File

@ -1,262 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.AlertSendStatus;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.AbstractListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionDeletedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessEndListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessFailListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessStartListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskEndListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public final class ListenerEventPostService extends BaseDaemonThread implements AutoCloseable {
@Value("${alert.query_alert_threshold:100}")
private Integer QUERY_ALERT_THRESHOLD;
@Autowired
private ListenerEventMapper listenerEventMapper;
@Autowired
private AlertPluginInstanceMapper alertPluginInstanceMapper;
@Autowired
private AlertPluginManager alertPluginManager;
@Autowired
private AlertConfig alertConfig;
public ListenerEventPostService() {
super("ListenerEventPostService");
}
@Override
public void run() {
log.info("listener event post thread started");
while (!ServerLifeCycleManager.isStopped()) {
try {
List<ListenerEvent> listenerEvents = listenerEventMapper
.listingListenerEventByStatus(AlertStatus.WAIT_EXECUTION, QUERY_ALERT_THRESHOLD);
if (CollectionUtils.isEmpty(listenerEvents)) {
log.debug("There is no waiting listener events");
continue;
}
this.send(listenerEvents);
} catch (Exception e) {
log.error("listener event post thread meet an exception", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
}
}
log.info("listener event post thread stopped");
}
public void send(List<ListenerEvent> listenerEvents) {
for (ListenerEvent listenerEvent : listenerEvents) {
int eventId = listenerEvent.getId();
List<AlertPluginInstance> globalAlertInstanceList =
alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList();
if (CollectionUtils.isEmpty(globalAlertInstanceList)) {
log.error("post listener event fail,no bind global plugin instance.");
listenerEventMapper.updateListenerEvent(eventId, AlertStatus.EXECUTION_FAILURE,
"no bind plugin instance", new Date());
continue;
}
AbstractListenerEvent event = generateEventFromContent(listenerEvent);
if (event == null) {
log.error("parse listener event to abstract listener event fail.ed {}", listenerEvent.getContent());
listenerEventMapper.updateListenerEvent(eventId, AlertStatus.EXECUTION_FAILURE,
"parse listener event to abstract listener event failed", new Date());
continue;
}
List<AbstractListenerEvent> events = Lists.newArrayList(event);
AlertData alertData = AlertData.builder()
.id(eventId)
.content(JSONUtils.toJsonString(events))
.log(listenerEvent.getLog())
.title(event.getTitle())
.warnType(WarningType.GLOBAL.getCode())
.alertType(event.getEventType().getCode())
.build();
int sendSuccessCount = 0;
List<AlertSendStatus> failedPostResults = new ArrayList<>();
for (AlertPluginInstance instance : globalAlertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(alertResult.getStatus())
? AlertStatus.EXECUTION_SUCCESS
: AlertStatus.EXECUTION_FAILURE;
if (AlertStatus.EXECUTION_SUCCESS.equals(sendStatus)) {
sendSuccessCount++;
} else {
AlertSendStatus alertSendStatus = AlertSendStatus.builder()
.alertId(eventId)
.alertPluginInstanceId(instance.getId())
.sendStatus(sendStatus)
.log(JSONUtils.toJsonString(alertResult))
.createTime(new Date())
.build();
failedPostResults.add(alertSendStatus);
}
}
}
if (sendSuccessCount == globalAlertInstanceList.size()) {
listenerEventMapper.deleteById(eventId);
} else {
AlertStatus alertStatus =
sendSuccessCount == 0 ? AlertStatus.EXECUTION_FAILURE : AlertStatus.EXECUTION_PARTIAL_SUCCESS;
listenerEventMapper.updateListenerEvent(eventId, alertStatus, JSONUtils.toJsonString(failedPostResults),
new Date());
}
}
}
/**
* alert result handler
*
* @param instance instance
* @param alertData alertData
* @return AlertResult
*/
private @Nullable AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
String pluginInstanceName = instance.getInstanceName();
int pluginDefineId = instance.getPluginDefineId();
Optional<AlertChannel> alertChannelOptional = alertPluginManager.getAlertChannel(instance.getPluginDefineId());
if (!alertChannelOptional.isPresent()) {
String message =
String.format("Global Alert Plugin %s send error: the channel doesn't exist, pluginDefineId: %s",
pluginInstanceName,
pluginDefineId);
log.error("Global Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginDefineId);
return new AlertResult("false", message);
}
AlertChannel alertChannel = alertChannelOptional.get();
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
AlertInfo alertInfo = AlertInfo.builder()
.alertData(alertData)
.alertParams(paramsMap)
.alertPluginInstanceId(instance.getId())
.build();
int waitTimeout = alertConfig.getWaitTimeout();
try {
AlertResult alertResult;
if (waitTimeout <= 0) {
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
alertResult = alertChannel.closeAlert(alertInfo);
} else {
alertResult = alertChannel.process(alertInfo);
}
} else {
CompletableFuture<AlertResult> future;
if (alertData.getAlertType() == AlertType.CLOSE_ALERT.getCode()) {
future = CompletableFuture.supplyAsync(() -> alertChannel.closeAlert(alertInfo));
} else {
future = CompletableFuture.supplyAsync(() -> alertChannel.process(alertInfo));
}
alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
}
if (alertResult == null) {
throw new RuntimeException("Alert result cannot be null");
}
return alertResult;
} catch (InterruptedException e) {
log.error("post listener event error alert data id :{},", alertData.getId(), e);
Thread.currentThread().interrupt();
return new AlertResult("false", e.getMessage());
} catch (Exception e) {
log.error("post listener event error alert data id :{},", alertData.getId(), e);
return new AlertResult("false", e.getMessage());
}
}
private AbstractListenerEvent generateEventFromContent(ListenerEvent listenerEvent) {
String content = listenerEvent.getContent();
switch (listenerEvent.getEventType()) {
case SERVER_DOWN:
return JSONUtils.parseObject(content, ServerDownListenerEvent.class);
case PROCESS_DEFINITION_CREATED:
return JSONUtils.parseObject(content, ProcessDefinitionCreatedListenerEvent.class);
case PROCESS_DEFINITION_UPDATED:
return JSONUtils.parseObject(content, ProcessDefinitionUpdatedListenerEvent.class);
case PROCESS_DEFINITION_DELETED:
return JSONUtils.parseObject(content, ProcessDefinitionDeletedListenerEvent.class);
case PROCESS_START:
return JSONUtils.parseObject(content, ProcessStartListenerEvent.class);
case PROCESS_END:
return JSONUtils.parseObject(content, ProcessEndListenerEvent.class);
case PROCESS_FAIL:
return JSONUtils.parseObject(content, ProcessFailListenerEvent.class);
case TASK_START:
return JSONUtils.parseObject(content, TaskStartListenerEvent.class);
case TASK_END:
return JSONUtils.parseObject(content, TaskEndListenerEvent.class);
case TASK_FAIL:
return JSONUtils.parseObject(content, TaskFailListenerEvent.class);
default:
return null;
}
}
@Override
public void close() {
log.info("Closed ListenerEventPostService...");
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.AbstractListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionCreatedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionDeletedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessDefinitionUpdatedListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessEndListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessFailListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ProcessStartListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskEndListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import java.util.Date;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ListenerEventSender extends AbstractEventSender<ListenerEvent> {
private final ListenerEventDao listenerEventDao;
private final AlertPluginInstanceMapper alertPluginInstanceMapper;
public ListenerEventSender(ListenerEventDao listenerEventDao,
AlertPluginInstanceMapper alertPluginInstanceMapper,
AlertPluginManager alertPluginManager,
AlertConfig alertConfig) {
super(alertPluginManager, alertConfig.getWaitTimeout());
this.listenerEventDao = listenerEventDao;
this.alertPluginInstanceMapper = alertPluginInstanceMapper;
}
private AbstractListenerEvent generateEventFromContent(ListenerEvent listenerEvent) {
String content = listenerEvent.getContent();
AbstractListenerEvent event = null;
switch (listenerEvent.getEventType()) {
case SERVER_DOWN:
event = JSONUtils.parseObject(content, ServerDownListenerEvent.class);
break;
case PROCESS_DEFINITION_CREATED:
event = JSONUtils.parseObject(content, ProcessDefinitionCreatedListenerEvent.class);
break;
case PROCESS_DEFINITION_UPDATED:
event = JSONUtils.parseObject(content, ProcessDefinitionUpdatedListenerEvent.class);
break;
case PROCESS_DEFINITION_DELETED:
event = JSONUtils.parseObject(content, ProcessDefinitionDeletedListenerEvent.class);
break;
case PROCESS_START:
event = JSONUtils.parseObject(content, ProcessStartListenerEvent.class);
break;
case PROCESS_END:
event = JSONUtils.parseObject(content, ProcessEndListenerEvent.class);
break;
case PROCESS_FAIL:
event = JSONUtils.parseObject(content, ProcessFailListenerEvent.class);
break;
case TASK_START:
event = JSONUtils.parseObject(content, TaskStartListenerEvent.class);
break;
case TASK_END:
event = JSONUtils.parseObject(content, TaskEndListenerEvent.class);
break;
case TASK_FAIL:
event = JSONUtils.parseObject(content, TaskFailListenerEvent.class);
break;
default:
throw new IllegalArgumentException("Unsupported event type: " + listenerEvent.getEventType());
}
if (event == null) {
throw new IllegalArgumentException("Failed to parse event from content: " + content);
}
return event;
}
@Override
public List<AlertPluginInstance> getAlertPluginInstanceList(ListenerEvent event) {
return alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList();
}
@Override
public AlertData getAlertData(ListenerEvent listenerEvent) {
AbstractListenerEvent event = generateEventFromContent(listenerEvent);
return AlertData.builder()
.id(listenerEvent.getId())
.content(JSONUtils.toJsonString(Lists.newArrayList(event)))
.log(listenerEvent.getLog())
.title(event.getTitle())
.alertType(event.getEventType().getCode())
.build();
}
@Override
public Integer getEventId(ListenerEvent event) {
return event.getId();
}
@Override
public void onError(ListenerEvent event, String log) {
listenerEventDao.updateListenerEvent(event.getId(), AlertStatus.EXECUTION_FAILURE, log, new Date());
}
@Override
public void onPartialSuccess(ListenerEvent event, String log) {
listenerEventDao.updateListenerEvent(event.getId(), AlertStatus.EXECUTION_PARTIAL_SUCCESS, log, new Date());
}
@Override
public void onSuccess(ListenerEvent event, String log) {
listenerEventDao.updateListenerEvent(event.getId(), AlertStatus.EXECUTION_FAILURE, log, new Date());
}
}

View File

@ -73,7 +73,8 @@ alert:
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0
max-heartbeat-interval: 60s
query_alert_threshold: 100
# The maximum number of alerts that can be processed in parallel
sender-parallelism: 100
registry:
type: zookeeper

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.config;
import static com.google.common.truth.Truth.assertThat;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
@AutoConfigureMockMvc
@SpringBootTest(classes = AlertConfig.class)
class AlertConfigTest {
@Autowired
private AlertConfig alertConfig;
@Test
void testValidate() {
assertThat(alertConfig.getWaitTimeout()).isEqualTo(10);
assertThat(alertConfig.getMaxHeartbeatInterval()).isEqualTo(Duration.ofSeconds(59));
assertThat(alertConfig.getSenderParallelism()).isEqualTo(101);
}
}

View File

@ -24,15 +24,13 @@ import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.AlertSender;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.extract.alert.request.AlertSendResponse;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
@ -42,19 +40,20 @@ import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlertBootstrapServiceTest {
@ExtendWith(MockitoExtension.class)
class AlertSenderTest {
private static final Logger logger = LoggerFactory.getLogger(AlertBootstrapServiceTest.class);
private static final Logger logger = LoggerFactory.getLogger(AlertSenderTest.class);
@Mock
private AlertDao alertDao;
@ -66,7 +65,7 @@ public class AlertBootstrapServiceTest {
private AlertConfig alertConfig;
@InjectMocks
private AlertBootstrapService alertBootstrapService;
private AlertSender alertSender;
private static final String PLUGIN_INSTANCE_PARAMS =
"{\"User\":\"xx\",\"receivers\":\"xx\",\"sender\":\"xx\",\"smtpSslTrust\":\"*\",\"enableSmtpAuth\":\"true\",\"receiverCcs\":null,\"showType\":\"table\",\"starttlsEnable\":\"false\",\"serverPort\":\"25\",\"serverHost\":\"xx\",\"Password\":\"xx\",\"sslEnable\":\"false\"}";
@ -74,25 +73,17 @@ public class AlertBootstrapServiceTest {
private static final String PLUGIN_INSTANCE_NAME = "alert-instance-mail";
private static final String TITLE = "alert mail test TITLE";
private static final String CONTENT = "alert mail test CONTENT";
private static final List<ListenerEvent> EVENTS = new ArrayList<>();
private static final int PLUGIN_DEFINE_ID = 1;
private static final int ALERT_GROUP_ID = 1;
@BeforeEach
public void before() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testSyncHandler() {
void testSyncHandler() {
// 1.alert instance does not exist
when(alertDao.listInstanceByAlertGroupId(ALERT_GROUP_ID)).thenReturn(null);
when(alertConfig.getWaitTimeout()).thenReturn(0);
AlertSendResponse alertSendResponse =
alertBootstrapService.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT, WarningType.ALL.getCode());
AlertSendResponse alertSendResponse = alertSender.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT);
Assertions.assertFalse(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
@ -108,12 +99,7 @@ public class AlertBootstrapServiceTest {
alertInstanceList.add(alertPluginInstance);
when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(alertInstanceList);
String pluginName = "alert-plugin-mail";
PluginDefine pluginDefine = new PluginDefine(pluginName, "1", null);
when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
alertSendResponse =
alertBootstrapService.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT, WarningType.ALL.getCode());
alertSendResponse = alertSender.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT);
Assertions.assertFalse(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
@ -122,37 +108,32 @@ public class AlertBootstrapServiceTest {
AlertChannel alertChannelMock = mock(AlertChannel.class);
when(alertChannelMock.process(Mockito.any())).thenReturn(null);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
when(alertConfig.getWaitTimeout()).thenReturn(0);
alertSendResponse =
alertBootstrapService.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT, WarningType.ALL.getCode());
alertSendResponse = alertSender.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT);
Assertions.assertFalse(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
// 4.abnormal information inside the alert plug-in code
AlertResult alertResult = new AlertResult();
alertResult.setStatus(String.valueOf(false));
alertResult.setSuccess(false);
alertResult.setMessage("Abnormal information inside the alert plug-in code");
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
alertSendResponse =
alertBootstrapService.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT, WarningType.ALL.getCode());
alertSendResponse = alertSender.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT);
Assertions.assertFalse(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
// 5.alert plugin send success
alertResult = new AlertResult();
alertResult.setStatus(String.valueOf(true));
alertResult.setSuccess(true);
alertResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName));
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
when(alertConfig.getWaitTimeout()).thenReturn(5000);
alertSendResponse =
alertBootstrapService.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT, WarningType.ALL.getCode());
alertSendResponse = alertSender.syncHandler(ALERT_GROUP_ID, TITLE, CONTENT);
Assertions.assertTrue(alertSendResponse.isSuccess());
alertSendResponse.getResResults().forEach(result -> logger
.info("alert send response result, status:{}, message:{}", result.isSuccess(), result.getMessage()));
@ -160,17 +141,13 @@ public class AlertBootstrapServiceTest {
}
@Test
public void testRun() {
List<Alert> alertList = new ArrayList<>();
void testRun() {
Alert alert = new Alert();
alert.setId(1);
alert.setAlertGroupId(ALERT_GROUP_ID);
alert.setTitle(TITLE);
alert.setContent(CONTENT);
alert.setWarningType(WarningType.FAILURE);
alertList.add(alert);
// alertSenderService = new AlertSenderService();
int pluginDefineId = 1;
String pluginInstanceParams = "alert-instance-mail-params";
@ -181,25 +158,18 @@ public class AlertBootstrapServiceTest {
alertInstanceList.add(alertPluginInstance);
when(alertDao.listInstanceByAlertGroupId(ALERT_GROUP_ID)).thenReturn(alertInstanceList);
String pluginName = "alert-plugin-mail";
PluginDefine pluginDefine = new PluginDefine(pluginName, "1", null);
when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
AlertResult alertResult = new AlertResult();
alertResult.setStatus(String.valueOf(true));
alertResult.setSuccess(true);
alertResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName));
AlertChannel alertChannelMock = mock(AlertChannel.class);
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
Assertions.assertTrue(Boolean.parseBoolean(alertResult.getStatus()));
Assertions.assertTrue(alertResult.isSuccess());
when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(new ArrayList<>());
alertBootstrapService.send(alertList);
alertSender.sendEvent(alert);
}
@Test
public void testSendAlert() {
void testSendAlert() {
AlertResult sendResult = new AlertResult();
sendResult.setStatus(String.valueOf(true));
sendResult.setSuccess(true);
sendResult.setMessage(String.format("Alert Plugin %s send success", PLUGIN_INSTANCE_NAME));
AlertChannel alertChannelMock = mock(AlertChannel.class);
when(alertChannelMock.process(Mockito.any())).thenReturn(sendResult);
@ -209,6 +179,6 @@ public class AlertBootstrapServiceTest {
Mockito.mockStatic(PluginParamsTransfer.class);
pluginParamsTransferMockedStatic.when(() -> PluginParamsTransfer.getPluginParamsMap(PLUGIN_INSTANCE_PARAMS))
.thenReturn(paramsMap);
alertBootstrapService.syncTestSend(PLUGIN_DEFINE_ID, PLUGIN_INSTANCE_PARAMS);
alertSender.syncTestSend(PLUGIN_DEFINE_ID, PLUGIN_INSTANCE_PARAMS);
}
}

View File

@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.service.ListenerEventPostService;
import org.apache.dolphinscheduler.alert.service.ListenerEventSender;
import org.apache.dolphinscheduler.common.enums.AlertPluginInstanceType;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.ListenerEventType;
@ -33,7 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.ServerDownListenerEvent;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao;
import org.apache.commons.codec.digest.DigestUtils;
@ -43,39 +43,32 @@ import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.mockito.junit.jupiter.MockitoExtension;
public class ListenerEventPostServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ListenerEventPostServiceTest.class);
@ExtendWith(MockitoExtension.class)
class ListenerEventSenderTest {
@Mock
private ListenerEventMapper listenerEventMapper;
private ListenerEventDao listenerEventDao;
@Mock
private AlertPluginInstanceMapper alertPluginInstanceMapper;
@Mock
private AlertPluginManager alertPluginManager;
@Mock
private AlertConfig alertConfig;
@InjectMocks
private ListenerEventPostService listenerEventPostService;
@BeforeEach
public void before() {
MockitoAnnotations.initMocks(this);
}
private ListenerEventSender listenerEventSender;
@Test
public void testSendServerDownEventSuccess() {
List<ListenerEvent> events = new ArrayList<>();
void testSendServerDownEventSuccess() {
ServerDownListenerEvent serverDownListenerEvent = new ServerDownListenerEvent();
serverDownListenerEvent.setEventTime(new Date());
serverDownListenerEvent.setType("WORKER");
@ -88,7 +81,6 @@ public class ListenerEventPostServiceTest {
successEvent.setEventType(ListenerEventType.SERVER_DOWN);
successEvent.setCreateTime(new Date());
successEvent.setUpdateTime(new Date());
events.add(successEvent);
int pluginDefineId = 1;
String pluginInstanceParams =
@ -103,19 +95,17 @@ public class ListenerEventPostServiceTest {
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()).thenReturn(alertInstanceList);
AlertResult sendResult = new AlertResult();
sendResult.setStatus(String.valueOf(true));
sendResult.setSuccess(true);
sendResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName));
AlertChannel alertChannelMock = mock(AlertChannel.class);
when(alertChannelMock.process(Mockito.any())).thenReturn(sendResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
Assertions.assertTrue(Boolean.parseBoolean(sendResult.getStatus()));
when(listenerEventMapper.deleteById(1)).thenReturn(1);
listenerEventPostService.send(events);
Assertions.assertTrue(sendResult.isSuccess());
listenerEventSender.sendEvent(successEvent);
}
@Test
public void testSendServerDownEventFailed() {
List<ListenerEvent> events = new ArrayList<>();
void testSendServerDownEventFailed() {
ServerDownListenerEvent serverDownListenerEvent = new ServerDownListenerEvent();
serverDownListenerEvent.setEventTime(new Date());
serverDownListenerEvent.setType("WORKER");
@ -128,7 +118,6 @@ public class ListenerEventPostServiceTest {
successEvent.setEventType(ListenerEventType.SERVER_DOWN);
successEvent.setCreateTime(new Date());
successEvent.setUpdateTime(new Date());
events.add(successEvent);
int pluginDefineId = 1;
String pluginInstanceParams =
@ -143,12 +132,12 @@ public class ListenerEventPostServiceTest {
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()).thenReturn(alertInstanceList);
AlertResult sendResult = new AlertResult();
sendResult.setStatus(String.valueOf(false));
sendResult.setSuccess(false);
sendResult.setMessage(String.format("Alert Plugin %s send failed", pluginInstanceName));
AlertChannel alertChannelMock = mock(AlertChannel.class);
when(alertChannelMock.process(Mockito.any())).thenReturn(sendResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
Assertions.assertFalse(Boolean.parseBoolean(sendResult.getStatus()));
listenerEventPostService.send(events);
Assertions.assertFalse(sendResult.isSuccess());
listenerEventSender.sendEvent(successEvent);
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.dao.entity.Alert;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import lombok.SneakyThrows;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class AlertEventPendingQueueTest {
private AlertEventPendingQueue alertEventPendingQueue;
private static final int QUEUE_SIZE = 10;
@BeforeEach
public void before() {
AlertConfig alertConfig = new AlertConfig();
alertConfig.setSenderParallelism(QUEUE_SIZE);
this.alertEventPendingQueue = new AlertEventPendingQueue(alertConfig);
}
@SneakyThrows
@Test
void put() {
for (int i = 0; i < alertEventPendingQueue.capacity(); i++) {
alertEventPendingQueue.put(new Alert());
}
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
try {
alertEventPendingQueue.put(new Alert());
System.out.println(alertEventPendingQueue.size());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
assertThrowsExactly(ConditionTimeoutException.class,
() -> await()
.timeout(Duration.ofSeconds(2))
.until(completableFuture::isDone));
}
@Test
void take() {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
try {
alertEventPendingQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
assertThrowsExactly(ConditionTimeoutException.class,
() -> await()
.timeout(Duration.ofSeconds(2))
.until(completableFuture::isDone));
}
@SneakyThrows
@Test
void size() {
for (int i = 0; i < alertEventPendingQueue.capacity(); i++) {
alertEventPendingQueue.put(new Alert());
assertThat(alertEventPendingQueue.size()).isEqualTo(i + 1);
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.service;
import static com.google.common.truth.Truth.assertThat;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import java.util.concurrent.ThreadPoolExecutor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class AlertSenderThreadPoolFactoryTest {
private final AlertConfig alertConfig = new AlertConfig();
private final AlertSenderThreadPoolFactory alertSenderThreadPoolFactory =
new AlertSenderThreadPoolFactory(alertConfig);
@Test
void getThreadPool() {
ThreadPoolExecutor threadPool = alertSenderThreadPoolFactory.getThreadPool();
assertThat(threadPool.getCorePoolSize()).isEqualTo(alertConfig.getSenderParallelism());
assertThat(threadPool.getMaximumPoolSize()).isEqualTo(alertConfig.getSenderParallelism());
}
}

View File

@ -0,0 +1,107 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
spring:
profiles:
active: postgresql
jackson:
time-zone: UTC
date-format: "yyyy-MM-dd HH:mm:ss"
banner:
charset: UTF-8
datasource:
driver-class-name: org.postgresql.Driver
url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
username: root
password: root
hikari:
connection-test-query: select 1
pool-name: DolphinScheduler
# Mybatis-plus configuration, you don't need to change it
mybatis-plus:
mapper-locations: classpath:org/apache/dolphinscheduler/dao/mapper/*Mapper.xml
type-aliases-package: org.apache.dolphinscheduler.dao.entity
configuration:
cache-enabled: false
call-setters-on-nulls: true
map-underscore-to-camel-case: true
jdbc-type-for-null: NULL
global-config:
db-config:
id-type: auto
banner: false
server:
port: 50053
management:
endpoints:
web:
exposure:
include: health,metrics,prometheus
endpoint:
health:
enabled: true
show-details: always
health:
db:
enabled: true
defaults:
enabled: false
metrics:
tags:
application: ${spring.application.name}
alert:
port: 50052
# Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 10
max-heartbeat-interval: 59s
# The maximum number of alerts that can be processed in parallel
sender-parallelism: 101
registry:
type: zookeeper
zookeeper:
namespace: dolphinscheduler
connect-string: localhost:2181
retry-policy:
base-sleep-time: 60ms
max-sleep: 300ms
max-retries: 5
session-timeout: 30s
connection-timeout: 9s
block-until-connected: 600ms
digest: ~
metrics:
enabled: true
# Override by profile
---
spring:
config:
activate:
on-profile: mysql
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler
username: root
password: root

View File

@ -20,6 +20,6 @@ package org.apache.dolphinscheduler.common.enums;
public enum ServerStatus {
NORMAL,
BUSY
BUSY,
}

View File

@ -24,4 +24,9 @@ import lombok.experimental.SuperBuilder;
@NoArgsConstructor
public class AlertServerHeartBeat extends BaseHeartBeat implements HeartBeat {
/**
* If the alert server is active or standby
*/
private boolean isActive;
}

View File

@ -63,8 +63,7 @@ import com.google.common.collect.Lists;
@Slf4j
public class AlertDao {
@Value("${alert.query_alert_threshold:100}")
private Integer QUERY_ALERT_THRESHOLD;
private static final Integer QUERY_ALERT_THRESHOLD = 100;
@Value("${alert.alarm-suppression.crash:60}")
private Integer crashAlarmSuppression;
@ -104,8 +103,8 @@ public class AlertDao {
* update alert sending(execution) status
*
* @param alertStatus alertStatus
* @param log alert results json
* @param id id
* @param log alert results json
* @param id id
* @return update alert result
*/
public int updateAlert(AlertStatus alertStatus, String log, int id) {
@ -134,9 +133,9 @@ public class AlertDao {
/**
* add AlertSendStatus
*
* @param sendStatus alert send status
* @param log log
* @param alertId alert id
* @param sendStatus alert send status
* @param log log
* @param alertId alert id
* @param alertPluginInstanceId alert plugin instance id
* @return insert count
*/
@ -192,7 +191,7 @@ public class AlertDao {
* process time out alert
*
* @param processInstance processInstance
* @param projectUser projectUser
* @param projectUser projectUser
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProjectUser projectUser) {
int alertGroupId = processInstance.getWarningGroupId();
@ -238,8 +237,8 @@ public class AlertDao {
* task timeout warn
*
* @param processInstance processInstanceId
* @param taskInstance taskInstance
* @param projectUser projectUser
* @param taskInstance taskInstance
* @param projectUser projectUser
*/
public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance,
ProjectUser projectUser) {
@ -271,10 +270,11 @@ public class AlertDao {
}
/**
* List alerts that are pending for execution
* List pending alerts which id > minAlertId and status = {@link AlertStatus#WAIT_EXECUTION} order by id asc.
*/
public List<Alert> listPendingAlerts() {
return alertMapper.listingAlertByStatus(AlertStatus.WAIT_EXECUTION.getCode(), QUERY_ALERT_THRESHOLD);
public List<Alert> listPendingAlerts(int minAlertId) {
return alertMapper.listingAlertByStatus(minAlertId, AlertStatus.WAIT_EXECUTION.getCode(),
QUERY_ALERT_THRESHOLD);
}
public List<Alert> listAlerts(int processInstanceId) {
@ -283,15 +283,6 @@ public class AlertDao {
return alertMapper.selectList(wrapper);
}
/**
* for test
*
* @return AlertMapper
*/
public AlertMapper getAlertMapper() {
return alertMapper;
}
/**
* list all alert plugin instance by alert group id
*

View File

@ -34,9 +34,10 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface AlertMapper extends BaseMapper<Alert> {
/**
* Query the alert by alertStatus and return limit with default sort.
* Query the alert which id > minAlertId and status = alertStatus order by id asc.
*/
List<Alert> listingAlertByStatus(@Param("alertStatus") int alertStatus, @Param("limit") int limit);
List<Alert> listingAlertByStatus(@Param("minAlertId") int minAlertId, @Param("alertStatus") int alertStatus,
@Param("limit") int limit);
/**
* Insert server crash alert

View File

@ -34,7 +34,8 @@ public interface ListenerEventMapper extends BaseMapper<ListenerEvent> {
void insertServerDownEvent(@Param("event") ListenerEvent event,
@Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime);
List<ListenerEvent> listingListenerEventByStatus(@Param("postStatus") AlertStatus postStatus,
List<ListenerEvent> listingListenerEventByStatus(@Param("minId") int minId,
@Param("postStatus") int postStatus,
@Param("limit") int limit);
void updateListenerEvent(@Param("eventId") int eventId, @Param("postStatus") AlertStatus postStatus,

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import java.util.Date;
import java.util.List;
public interface ListenerEventDao extends IDao<ListenerEvent> {
List<ListenerEvent> listingPendingEvents(int minId, int limit);
void updateListenerEvent(int eventId, AlertStatus alertStatus, String message, Date date);
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao;
import java.util.Date;
import java.util.List;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
@Slf4j
@Repository
public class ListenerEventDaoImpl extends BaseDao<ListenerEvent, ListenerEventMapper> implements ListenerEventDao {
public ListenerEventDaoImpl(@NonNull ListenerEventMapper listenerEventMapper) {
super(listenerEventMapper);
}
@Override
public List<ListenerEvent> listingPendingEvents(int minId, int limit) {
return mybatisMapper.listingListenerEventByStatus(minId, AlertStatus.WAIT_EXECUTION.getCode(), limit);
}
@Override
public void updateListenerEvent(int eventId, AlertStatus alertStatus, String message, Date date) {
mybatisMapper.updateListenerEvent(eventId, alertStatus, message, date);
}
}

View File

@ -55,7 +55,9 @@
select
<include refid="baseSql"/>
from t_ds_alert
where alert_status = #{alertStatus}
where id > #{minAlertId}
and alert_status = #{alertStatus}
order by id asc
limit #{limit}
</select>

View File

@ -60,7 +60,8 @@
select
<include refid="baseSql"/>
from t_ds_listener_event
where post_status = #{postStatus.code}
where id > #{minId} and post_status = #{postStatus}
order by id asc
limit #{limit}
</select>
</mapper>

View File

@ -81,7 +81,7 @@ public class ListenerEventMapperTest extends BaseDaoTest {
ListenerEvent event2 = generateServerDownListenerEvent("192.168.x.2");
listenerEventMapper.batchInsert(Lists.newArrayList(event1, event2));
List<ListenerEvent> listenerEvents =
listenerEventMapper.listingListenerEventByStatus(AlertStatus.WAIT_EXECUTION, 50);
listenerEventMapper.listingListenerEventByStatus(-1, AlertStatus.WAIT_EXECUTION.getCode(), 50);
Assertions.assertEquals(listenerEvents.size(), 2);
}
@ -111,8 +111,10 @@ public class ListenerEventMapperTest extends BaseDaoTest {
ListenerEvent actualAlert = listenerEventMapper.selectById(event.getId());
Assertions.assertNull(actualAlert);
}
/**
* create server down event
*
* @param host worker host
* @return listener event
*/

View File

@ -18,37 +18,23 @@
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.ProfileType;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Alert;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.transaction.annotation.Transactional;
@ActiveProfiles(ProfileType.H2)
@ExtendWith(MockitoExtension.class)
@SpringBootApplication(scanBasePackageClasses = DaoConfiguration.class)
@SpringBootTest(classes = DaoConfiguration.class)
@Transactional
@Rollback
public class AlertDaoTest {
class AlertDaoTest extends BaseDaoTest {
@Autowired
private AlertDao alertDao;
@Test
public void testAlertDao() {
void testAlertDao() {
Alert alert = new Alert();
alert.setTitle("Mysql Exception");
alert.setContent("[\"alarm time2018-02-05\", \"service nameMYSQL_ALTER\", \"alarm nameMYSQL_ALTER_DUMP\", "
@ -57,25 +43,25 @@ public class AlertDaoTest {
alert.setAlertStatus(AlertStatus.WAIT_EXECUTION);
alertDao.addAlert(alert);
List<Alert> alerts = alertDao.listPendingAlerts();
List<Alert> alerts = alertDao.listPendingAlerts(-1);
Assertions.assertNotNull(alerts);
Assertions.assertNotEquals(0, alerts.size());
}
@Test
public void testAddAlertSendStatus() {
void testAddAlertSendStatus() {
int insertCount = alertDao.addAlertSendStatus(AlertStatus.EXECUTION_SUCCESS, "success", 1, 1);
Assertions.assertEquals(1, insertCount);
}
@Test
public void testSendServerStoppedAlert() {
void testSendServerStoppedAlert() {
int alertGroupId = 1;
String host = "127.0.0.998165432";
String serverType = "Master";
alertDao.sendServerStoppedAlert(alertGroupId, host, serverType);
alertDao.sendServerStoppedAlert(alertGroupId, host, serverType);
long count = alertDao.listPendingAlerts()
long count = alertDao.listPendingAlerts(-1)
.stream()
.filter(alert -> alert.getContent().contains(host))
.count();

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import static com.google.common.truth.Truth.assertThat;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.ListenerEventType;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.ListenerEvent;
import org.apache.dolphinscheduler.dao.repository.ListenerEventDao;
import java.util.Date;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
class ListenerEventDaoImplTest extends BaseDaoTest {
@Autowired
private ListenerEventDao listenerEventDao;
@Test
void listingPendingEvents() {
int minId = -1;
int limit = 10;
assertThat(listenerEventDao.listingPendingEvents(minId, limit)).isEmpty();
ListenerEvent listenerEvent = ListenerEvent.builder()
.eventType(ListenerEventType.SERVER_DOWN)
.sign("test")
.createTime(new Date())
.updateTime(new Date())
.postStatus(AlertStatus.WAIT_EXECUTION)
.build();
listenerEventDao.insert(listenerEvent);
listenerEvent = ListenerEvent.builder()
.eventType(ListenerEventType.SERVER_DOWN)
.sign("test")
.createTime(new Date())
.updateTime(new Date())
.postStatus(AlertStatus.EXECUTION_SUCCESS)
.build();
listenerEventDao.insert(listenerEvent);
assertThat(listenerEventDao.listingPendingEvents(minId, limit)).hasSize(1);
}
@Test
void updateListenerEvent() {
ListenerEvent listenerEvent = ListenerEvent.builder()
.eventType(ListenerEventType.SERVER_DOWN)
.sign("test")
.createTime(new Date())
.updateTime(new Date())
.postStatus(AlertStatus.WAIT_EXECUTION)
.build();
listenerEventDao.insert(listenerEvent);
listenerEventDao.updateListenerEvent(listenerEvent.getId(), AlertStatus.EXECUTION_SUCCESS, "test", new Date());
assertThat(listenerEventDao.queryById(listenerEvent.getId()).getPostStatus())
.isEqualTo(AlertStatus.EXECUTION_SUCCESS);
}
}

View File

@ -37,6 +37,14 @@ public class AlertSendResponse {
private List<AlertSendResponseResult> resResults;
public static AlertSendResponse success(List<AlertSendResponseResult> resResults) {
return new AlertSendResponse(true, resResults);
}
public static AlertSendResponse fail(List<AlertSendResponseResult> resResults) {
return new AlertSendResponse(false, resResults);
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@ -46,6 +54,14 @@ public class AlertSendResponse {
private String message;
public static AlertSendResponseResult success() {
return new AlertSendResponseResult(true, null);
}
public static AlertSendResponseResult fail(String message) {
return new AlertSendResponseResult(false, message);
}
}
}

View File

@ -58,7 +58,6 @@ public interface Registry extends Closeable {
String get(String key);
/**
*
* @param key
* @param value
* @param deleteOnDisconnect if true, when the connection state is disconnected, the key will be deleted
@ -67,6 +66,7 @@ public interface Registry extends Closeable {
/**
* This function will delete the keys whose prefix is {@param key}
*
* @param key the prefix of deleted key
* @throws if the key not exists, there is a registryException
*/
@ -90,6 +90,11 @@ public interface Registry extends Closeable {
*/
boolean acquireLock(String key);
/**
* Acquire the lock of the prefix {@param key}, if acquire in the given timeout return true, else return false.
*/
boolean acquireLock(String key, long timeout);
/**
* Release the lock of the prefix {@param key}
*/

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api.ha;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import com.google.common.collect.Lists;
@Slf4j
public abstract class AbstractHAServer implements HAServer {
private final Registry registry;
private final String serverPath;
private ServerStatus serverStatus;
private final List<ServerStatusChangeListener> serverStatusChangeListeners;
public AbstractHAServer(Registry registry, String serverPath) {
this.registry = registry;
this.serverPath = serverPath;
this.serverStatus = ServerStatus.STAND_BY;
this.serverStatusChangeListeners = Lists.newArrayList(new DefaultServerStatusChangeListener());
}
@Override
public void start() {
registry.subscribe(serverPath, event -> {
if (Event.Type.REMOVE.equals(event.type())) {
if (isActive() && !participateElection()) {
statusChange(ServerStatus.STAND_BY);
}
}
});
ScheduledExecutorService electionSelectionThread =
ThreadUtils.newSingleDaemonScheduledExecutorService("election-selection-thread");
electionSelectionThread.schedule(() -> {
if (isActive()) {
return;
}
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
}
}, 10, TimeUnit.SECONDS);
}
@Override
public boolean isActive() {
return ServerStatus.ACTIVE.equals(getServerStatus());
}
@Override
public boolean participateElection() {
return registry.acquireLock(serverPath, 3_000);
}
@Override
public void addServerStatusChangeListener(ServerStatusChangeListener listener) {
serverStatusChangeListeners.add(listener);
}
@Override
public ServerStatus getServerStatus() {
return serverStatus;
}
@Override
public void shutdown() {
if (isActive()) {
registry.releaseLock(serverPath);
}
}
private void statusChange(ServerStatus targetStatus) {
synchronized (this) {
ServerStatus originStatus = serverStatus;
serverStatus = targetStatus;
serverStatusChangeListeners.forEach(listener -> listener.change(originStatus, serverStatus));
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api.ha;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractServerStatusChangeListener implements ServerStatusChangeListener {
@Override
public void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus) {
log.info("The status change from {} to {}.", originStatus, currentStatus);
if (originStatus == HAServer.ServerStatus.ACTIVE) {
if (currentStatus == HAServer.ServerStatus.STAND_BY) {
changeToStandBy();
}
} else if (originStatus == HAServer.ServerStatus.STAND_BY) {
if (currentStatus == HAServer.ServerStatus.ACTIVE) {
changeToActive();
}
}
}
public abstract void changeToActive();
public abstract void changeToStandBy();
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api.ha;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DefaultServerStatusChangeListener extends AbstractServerStatusChangeListener {
@Override
public void changeToActive() {
log.info("The status is active now.");
}
@Override
public void changeToStandBy() {
log.info("The status is standby now.");
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api.ha;
/**
* Interface for HA server, used to select a active server from multiple servers.
* In HA mode, there are multiple servers, only one server is active, others are standby.
*/
public interface HAServer {
/**
* Start the server.
*/
void start();
/**
* Judge whether the server is active.
*
* @return true if the current server is active.
*/
boolean isActive();
/**
* Participate in the election of active server, this method will block until the server is active.
*/
boolean participateElection();
/**
* Add a listener to listen to the status change of the server.
*
* @param listener listener to add.
*/
void addServerStatusChangeListener(ServerStatusChangeListener listener);
/**
* Get the status of the server.
*
* @return the status of the server.
*/
ServerStatus getServerStatus();
/**
* Shutdown the server, release resources.
*/
void shutdown();
enum ServerStatus {
ACTIVE,
STAND_BY,
;
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.registry.api.ha;
public interface ServerStatusChangeListener {
void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus);
}

View File

@ -34,6 +34,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
@ -311,6 +313,35 @@ public class EtcdRegistry implements Registry {
}
}
@Override
public boolean acquireLock(String key, long timeout) {
Lock lockClient = client.getLockClient();
Lease leaseClient = client.getLeaseClient();
// get the lock with a lease
try {
long leaseId = leaseClient.grant(TIME_TO_LIVE_SECONDS).get().getID();
// keep the lease
lockClient.lock(byteSequence(key), leaseId).get(timeout, TimeUnit.MICROSECONDS);
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
}));
// save the leaseId for release Lock
if (null == threadLocalLockMap.get()) {
threadLocalLockMap.set(new HashMap<>());
}
threadLocalLockMap.get().put(key, leaseId);
return true;
} catch (TimeoutException timeoutException) {
log.debug("Acquire lock: {} in {}/ms timeout", key, timeout);
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("etcd get lock error", e);
} catch (ExecutionException e) {
throw new RegistryException("etcd get lock error, lockKey: " + key, e);
}
}
/**
* release the lock by revoking the leaseId
*/

View File

@ -36,6 +36,7 @@ class EtcdKeepAliveLeaseManagerTest {
static Client client;
static EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
@BeforeAll
public static void before() throws Exception {
server = EtcdClusterExtension.builder()
@ -65,8 +66,9 @@ class EtcdKeepAliveLeaseManagerTest {
@AfterAll
public static void after() throws IOException {
try (EtcdCluster closeServer = server.cluster()) {
client.close();
try (
EtcdCluster closeServer = server.cluster();
Client closedClient = client) {
}
}
}

View File

@ -179,6 +179,17 @@ public class JdbcRegistry implements Registry {
}
}
@Override
public boolean acquireLock(String key, long timeout) {
try {
return registryLockManager.acquireLock(key, timeout);
} catch (RegistryException e) {
throw e;
} catch (Exception e) {
throw new RegistryException(String.format("Acquire lock: %s error", key), e);
}
}
@Override
public boolean releaseLock(String key) {
registryLockManager.releaseLock(key);

View File

@ -83,6 +83,30 @@ public class RegistryLockManager implements AutoCloseable {
});
}
/**
* Acquire the lock, if cannot get the lock will await.
*/
public boolean acquireLock(String lockKey, long timeout) throws RegistryException {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < timeout) {
try {
if (lockHoldMap.containsKey(lockKey)) {
return true;
}
JdbcRegistryLock jdbcRegistryLock = jdbcOperator.tryToAcquireLock(lockKey);
if (jdbcRegistryLock != null) {
lockHoldMap.put(lockKey, jdbcRegistryLock);
return true;
}
} catch (SQLException e) {
throw new RegistryException("Acquire the lock: " + lockKey + " error", e);
}
log.debug("Acquire the lock {} failed try again", lockKey);
ThreadUtils.sleep(JdbcRegistryConstant.LOCK_ACQUIRE_INTERVAL);
}
return false;
}
public void releaseLock(String lockKey) {
JdbcRegistryLock jdbcRegistryLock = lockHoldMap.get(lockKey);
if (jdbcRegistryLock != null) {

View File

@ -217,11 +217,41 @@ public final class ZookeeperRegistry implements Registry {
public boolean acquireLock(String key) {
InterProcessMutex interProcessMutex = new InterProcessMutex(client, key);
try {
interProcessMutex.acquire();
if (null == threadLocalLockMap.get()) {
threadLocalLockMap.set(new HashMap<>(3));
if (interProcessMutex.isAcquiredInThisProcess()) {
return true;
}
threadLocalLockMap.get().put(key, interProcessMutex);
Map<String, InterProcessMutex> processMutexMap = threadLocalLockMap.get();
if (null == processMutexMap) {
processMutexMap = new HashMap<>();
threadLocalLockMap.set(processMutexMap);
}
interProcessMutex.acquire();
processMutexMap.put(key, interProcessMutex);
return true;
} catch (Exception e) {
try {
interProcessMutex.release();
throw new RegistryException(String.format("zookeeper get lock: %s error", key), e);
} catch (Exception exception) {
throw new RegistryException(String.format("zookeeper get lock: %s error", key), e);
}
}
}
@Override
public boolean acquireLock(String key, long timeout) {
InterProcessMutex interProcessMutex = new InterProcessMutex(client, key);
try {
if (interProcessMutex.isAcquiredInThisProcess()) {
return true;
}
Map<String, InterProcessMutex> processMutexMap = threadLocalLockMap.get();
if (null == processMutexMap) {
processMutexMap = new HashMap<>();
threadLocalLockMap.set(processMutexMap);
}
interProcessMutex.acquire(timeout, MILLISECONDS);
processMutexMap.put(key, interProcessMutex);
return true;
} catch (Exception e) {
try {
@ -235,13 +265,17 @@ public final class ZookeeperRegistry implements Registry {
@Override
public boolean releaseLock(String key) {
if (null == threadLocalLockMap.get().get(key)) {
Map<String, InterProcessMutex> processMutexMap = threadLocalLockMap.get();
if (processMutexMap == null) {
return true;
}
if (null == processMutexMap.get(key)) {
return false;
}
try {
threadLocalLockMap.get().get(key).release();
threadLocalLockMap.get().remove(key);
if (threadLocalLockMap.get().isEmpty()) {
processMutexMap.get(key).release();
processMutexMap.remove(key);
if (processMutexMap.isEmpty()) {
threadLocalLockMap.remove();
}
} catch (Exception e) {

View File

@ -232,7 +232,8 @@ alert:
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0
max-heartbeat-interval: 60s
query_alert_threshold: 100
# The maximum number of alerts that can be processed in parallel
sender-parallelism: 5
api:
audit-enable: false