From 2df6ecb2fcbf68ca7072867431d64f226ac910e6 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Thu, 22 Oct 2020 19:57:30 +0800 Subject: [PATCH] the alert module support service. --- dolphinscheduler-alert/pom.xml | 4 + .../dolphinscheduler/alert/AlertServer.java | 52 +++++++- .../processor/AlertRequestProcessor.java | 67 +++++++++++ .../alert/runner/AlertSender.java | 113 ++++++++++++++---- dolphinscheduler-remote/pom.xml | 4 + .../remote/command/CommandType.java | 2 +- .../alert/AlertSendRequestCommand.java | 80 +++++++++++++ .../alert/AlertSendResponseCommand.java | 76 ++++++++++++ .../service/alert/AlertClientService.java | 96 +++++++++++++++ .../service/alert/AlertClientServiceTest.java | 58 +++++++++ .../spi/alert/AlertResult.java | 2 + 11 files changed, 526 insertions(+), 28 deletions(-) create mode 100644 dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java diff --git a/dolphinscheduler-alert/pom.xml b/dolphinscheduler-alert/pom.xml index e79c9a2b20..abae84515a 100644 --- a/dolphinscheduler-alert/pom.xml +++ b/dolphinscheduler-alert/pom.xml @@ -35,6 +35,10 @@ org.apache.dolphinscheduler dolphinscheduler-spi + + org.apache.dolphinscheduler + dolphinscheduler-remote + junit junit diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index ccd359da18..45184f03a4 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.alert; import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader; import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig; +import org.apache.dolphinscheduler.alert.processor.AlertRequestProcessor; import org.apache.dolphinscheduler.alert.runner.AlertSender; import org.apache.dolphinscheduler.alert.utils.Constants; import org.apache.dolphinscheduler.alert.utils.PropertyUtils; @@ -28,6 +29,9 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.entity.Alert; +import org.apache.dolphinscheduler.remote.NettyRemotingServer; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.util.List; @@ -63,6 +67,11 @@ public class AlertServer { public static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository"; + /** + * netty server + */ + private NettyRemotingServer server; + private static class AlertServerHolder { private static final AlertServer INSTANCE = new AlertServer(); } @@ -96,11 +105,21 @@ public class AlertServer { } } - public void start() { + /** + * init netty remoting server + */ + private void initRemoteServer() { + NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(50501); + this.server = new NettyRemotingServer(serverConfig); + this.server.registerProcessor(CommandType.ALERT_SEND_REQUEST, new AlertRequestProcessor(alertDao, alertPluginManager, pluginDao)); + this.server.start(); + } - initPlugin(); - - logger.info("alert server ready start "); + /** + * Cyclic alert info sending alert + */ + private void runSender() { while (Stopper.isRunning()) { try { Thread.sleep(Constants.ALERT_SCAN_INTERVAL); @@ -118,10 +137,35 @@ public class AlertServer { } } + /** + * start + */ + public void start() { + initPlugin(); + initRemoteServer(); + logger.info("alert server ready start "); + runSender(); + } + + /** + * stop + */ + public void stop() { + this.server.close(); + logger.info("alert server shut down"); + } + public static void main(String[] args) { System.out.println(System.getProperty("user.dir")); AlertServer alertServer = AlertServer.getInstance(); alertServer.start(); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + alertServer.stop(); + } + }); + } } diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java new file mode 100644 index 0000000000..5e8a8f89d6 --- /dev/null +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.processor; + +import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.alert.runner.AlertSender; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.PluginDao; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand; +import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; + +/** + * alert request processor + */ +public class AlertRequestProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class); + private AlertDao alertDao; + private PluginDao pluginDao; + private AlertPluginManager alertPluginManager; + + public AlertRequestProcessor(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) { + this.alertDao = alertDao; + this.pluginDao = pluginDao; + this.alertPluginManager = alertPluginManager; + } + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.ALERT_SEND_REQUEST == command.getType(), + String.format("invalid command type : %s", command.getType())); + + AlertSendRequestCommand alertSendRequestCommand = JsonSerializer.deserialize( + command.getBody(), AlertSendRequestCommand.class); + logger.info("received command : {}", alertSendRequestCommand); + + AlertSender alertSender = new AlertSender(alertDao, alertPluginManager, pluginDao); + AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertSendRequestCommand.getGroupId(), alertSendRequestCommand.getTitle(), alertSendRequestCommand.getContent()); + channel.writeAndFlush(alertSendResponseCommand.convert2Command(command.getOpaque())); + + } +} diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java index 4714a76ff1..341cd5f88a 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java @@ -23,11 +23,13 @@ import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; +import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; import org.apache.dolphinscheduler.spi.alert.AlertChannel; import org.apache.dolphinscheduler.spi.alert.AlertData; import org.apache.dolphinscheduler.spi.alert.AlertInfo; import org.apache.dolphinscheduler.spi.alert.AlertResult; +import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -49,6 +51,13 @@ public class AlertSender { this.alertPluginManager = alertPluginManager; } + public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) { + super(); + this.alertDao = alertDao; + this.pluginDao = pluginDao; + this.alertPluginManager = alertPluginManager; + } + public AlertSender(List alertList, AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) { super(); this.alertList = alertList; @@ -71,33 +80,91 @@ public class AlertSender { for (AlertPluginInstance instance : alertInstanceList) { - String pluginName = pluginDao.getPluginDefineById(instance.getPluginDefineId()).getPluginName(); - String pluginInstanceName = instance.getInstanceName(); - AlertInfo alertInfo = new AlertInfo(); - alertInfo.setAlertData(alertData); - alertInfo.setAlertParams(instance.getPluginInstanceParams()); - AlertChannel alertChannel = alertPluginManager.getAlertChannelMap().get(pluginName); - if (alertChannel == null) { - alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "Alert send error, not found plugin " + pluginName, alert.getId()); - logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginName); - continue; - } + AlertResult alertResult = getAlertResult(instance, alertData); + AlertStatus alertStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE; + alertDao.updateAlert(alertStatus, alertResult.getMessage(), alert.getId()); - AlertResult alertResult = alertChannel.process(alertInfo); - - if (alertResult == null) { - alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "alert send error", alert.getId()); - logger.info("Alert Plugin {} send error : return value is null", pluginInstanceName); - } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) { - alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, String.valueOf(alertResult.getMessage()), alert.getId()); - logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage()); - } else { - alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, alertResult.getMessage(), alert.getId()); - logger.info("Alert Plugin {} send success", pluginInstanceName); - } } } } + /** + * sync send alert handler + * @param alertGroupId alertGroupId + * @param title title + * @param content content + * @return AlertSendResponseCommand + */ + public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content) { + + List alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); + AlertData alertData = new AlertData(); + alertData.setContent(title) + .setTitle(content); + + boolean sendResponseStatus = true; + List sendResponseResults = new ArrayList<>(); + + if (alertInstanceList == null || alertInstanceList.size() == 0) { + sendResponseStatus = false; + AlertResult alertResult = new AlertResult(); + String message = String.format("Alert GroupId %s send error : not found alert instance",alertGroupId); + alertResult.setStatus("false"); + alertResult.setMessage(message); + sendResponseResults.add(alertResult); + logger.error("Alert GroupId {} send error : not found alert instance", alertGroupId); + } + + for (AlertPluginInstance instance : alertInstanceList) { + + AlertResult alertResult = getAlertResult(instance, alertData); + sendResponseStatus = sendResponseStatus && Boolean.parseBoolean(String.valueOf(alertResult.getStatus())); + sendResponseResults.add(alertResult); + } + + return new AlertSendResponseCommand(sendResponseStatus,sendResponseResults); + } + + /** + * alert result expansion + * @param instance instance + * @param alertData alertData + * @return AlertResult + */ + private AlertResult getAlertResult(AlertPluginInstance instance, AlertData alertData) { + String pluginName = pluginDao.getPluginDefineById(instance.getPluginDefineId()).getPluginName(); + AlertChannel alertChannel = alertPluginManager.getAlertChannelMap().get(pluginName); + AlertResult alertResultExtend = new AlertResult(); + String pluginInstanceName = instance.getInstanceName(); + if (alertChannel == null) { + String message = String.format("Alert Plugin %s send error : return value is null",pluginInstanceName); + alertResultExtend.setStatus("false"); + alertResultExtend.setMessage(message); + logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginName); + } + + AlertInfo alertInfo = new AlertInfo(); + alertInfo.setAlertData(alertData); + alertInfo.setAlertParams(instance.getPluginInstanceParams()); + AlertResult alertResult = alertChannel.process(alertInfo); + + if (alertResult == null) { + String message = String.format("Alert Plugin %s send error : return value is null",pluginInstanceName); + alertResultExtend.setStatus("false"); + alertResultExtend.setMessage(message); + logger.info("Alert Plugin {} send error : return value is null", pluginInstanceName); + } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) { + alertResultExtend.setStatus("false"); + alertResultExtend.setMessage(alertResult.getMessage()); + logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage()); + } else { + String message = String.format("Alert Plugin %s send success",pluginInstanceName); + alertResultExtend.setStatus("true"); + alertResultExtend.setMessage(message); + logger.info("Alert Plugin {} send success", pluginInstanceName); + } + return alertResultExtend; + } + } diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml index 4d398f3069..bc71241076 100644 --- a/dolphinscheduler-remote/pom.xml +++ b/dolphinscheduler-remote/pom.xml @@ -35,6 +35,10 @@ + + org.apache.dolphinscheduler + dolphinscheduler-spi + io.netty netty-all diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index d1ffc65f57..753216995e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG, /** * alert send request */ ALERT_SEND_REQUEST, /** * alert send response */ ALERT_SEND_RESPONSE; } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java new file mode 100644 index 0000000000..da56b0dc6b --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command.alert; + +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; + +import java.io.Serializable; + +public class AlertSendRequestCommand implements Serializable { + + private int groupId; + + private String title; + + private String content; + + public int getGroupId() { + return groupId; + } + + public void setGroupId(int groupId) { + this.groupId = groupId; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getContent() { + return content; + } + + public void setContent(String content) { + this.content = content; + } + + public AlertSendRequestCommand(){ + + } + + public AlertSendRequestCommand(int groupId, String title, String content) { + this.groupId = groupId; + this.title = title; + this.content = content; + } + + /** + * package request command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.ALERT_SEND_REQUEST); + byte[] body = JsonSerializer.serialize(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java new file mode 100644 index 0000000000..fbde4cfe51 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command.alert; + +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.spi.alert.AlertResult; + +import java.io.Serializable; +import java.util.List; + +public class AlertSendResponseCommand implements Serializable { + + /** + * true:All alert are successful, + * false:As long as one alert fails + */ + private boolean alertStatus; + + private List alertResults; + + public boolean getAlertStatus() { + return alertStatus; + } + + public void setAlertStatus(boolean alertStatus) { + this.alertStatus = alertStatus; + } + + public List getAlertResults() { + return alertResults; + } + + public void setAlertResults(List alertResults) { + this.alertResults = alertResults; + } + + public AlertSendResponseCommand() { + + } + + public AlertSendResponseCommand(boolean alertStatus, List alertResults) { + this.alertStatus = alertStatus; + this.alertResults = alertResults; + } + + /** + * package response command + * + * @param opaque request unique identification + * @return command + */ + public Command convert2Command(long opaque) { + Command command = new Command(opaque); + command.setType(CommandType.ALERT_SEND_RESPONSE); + byte[] body = JsonSerializer.serialize(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java new file mode 100644 index 0000000000..9581413c39 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.alert; + +import org.apache.dolphinscheduler.remote.NettyRemotingClient; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand; +import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AlertClientService { + + private static final Logger logger = LoggerFactory.getLogger(AlertClientService.class); + + private final NettyClientConfig clientConfig; + + private final NettyRemotingClient client; + + private volatile boolean isRunning; + + /** + * request time out + */ + private static final long ALERT_REQUEST_TIMEOUT = 10 * 1000L; + + /** + * alert client + */ + public AlertClientService() { + this.clientConfig = new NettyClientConfig(); + this.client = new NettyRemotingClient(clientConfig); + this.isRunning = true; + } + + /** + * close + */ + public void close() { + this.client.close(); + this.isRunning = false; + logger.info("alter client closed"); + } + + /** + * alert sync send data + * @param host host + * @param port port + * @param groupId groupId + * @param title title + * @param content content + * @return AlertSendResponseCommand + */ + public AlertSendResponseCommand sendAlert(String host, int port, int groupId, String title, String content) { + logger.info("sync alert send, host : {}, port : {}, groupId : {}, title : {} ", host, port, groupId, title); + AlertSendRequestCommand request = new AlertSendRequestCommand(groupId, title, content); + final Host address = new Host(host, port); + try { + Command command = request.convert2Command(); + Command response = this.client.sendSync(address, command, ALERT_REQUEST_TIMEOUT); + if (response != null) { + AlertSendResponseCommand sendResponse = JsonSerializer.deserialize( + response.getBody(), AlertSendResponseCommand.class); + return sendResponse; + } + } catch (Exception e) { + logger.error("sync alert send error", e); + } finally { + this.client.closeChannel(address); + } + return null; + } + + public boolean isRunning() { + return isRunning; + } +} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java new file mode 100644 index 0000000000..98f2b14d54 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.alert; + +import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; + +/** + * alert client service test + */ +public class AlertClientServiceTest { + + private static final Logger logger = LoggerFactory.getLogger(AlertClientServiceTest.class); + + + @Test + public void testSendAlert(){ + String host; + int port = 50501; + int groupId = 1; + String title = "test-title"; + String content = "test-content"; + AlertClientService alertClient = new AlertClientService(); + + // alter server does not exist + host = "128.0.10.1"; + AlertSendResponseCommand alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content); + Assert.assertNull(alertSendResponseCommand); + + host = "127.0.0.1"; + AlertSendResponseCommand alertSendResponseCommand_1 = alertClient.sendAlert(host, port, groupId, title, content); + + if (Objects.nonNull(alertClient) && alertClient.isRunning()) { + alertClient.close(); + } + + } +} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java index a327d09403..f91db97651 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java @@ -38,4 +38,6 @@ public class AlertResult { public void setMessage(String message) { this.message = message; } + + }