[FIX-10784][Bug] [ALERT-SERVER] FEISHU Plugin might block the whole alert process loop (#10888)

* closed 10784 [Bug] [ALERT-SERVER] FEISHU Plugin might block the whole alert process loop
This commit is contained in:
pinkhello 2022-07-13 14:27:44 +08:00 committed by GitHub
parent 427c58546c
commit 7a15877fa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 40 additions and 2 deletions

View File

@ -25,6 +25,8 @@ import org.springframework.stereotype.Component;
public final class AlertConfig {
private int port;
private int waitTimeout;
public int getPort() {
return port;
}
@ -32,4 +34,13 @@ public final class AlertConfig {
public void setPort(final int port) {
this.port = port;
}
public int getWaitTimeout() {
return waitTimeout;
}
public void setWaitTimeout(final int waitTimeout) {
this.waitTimeout = waitTimeout;
}
}

View File

@ -40,6 +40,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,10 +53,12 @@ public final class AlertSenderService extends Thread {
private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;
private final AlertConfig alertConfig;
public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) {
public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager, AlertConfig alertConfig) {
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
this.alertConfig = alertConfig;
}
@Override
@ -221,9 +225,20 @@ public final class AlertSenderService extends Thread {
AlertInfo alertInfo = new AlertInfo();
alertInfo.setAlertData(alertData);
alertInfo.setAlertParams(paramsMap);
int waitTimeout = alertConfig.getWaitTimeout();
AlertResult alertResult;
try {
alertResult = alertChannel.get().process(alertInfo);
if (waitTimeout <= 0) {
alertResult = alertChannel.get().process(alertInfo);
} else {
CompletableFuture<AlertResult> future =
CompletableFuture.supplyAsync(() -> alertChannel.get().process(alertInfo));
alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
alertResult = new AlertResult("false", e.getMessage());
logger.error("send alert error alert data id :{},", alertData.getId(), e);
Thread.currentThread().interrupt();
} catch (Exception e) {
alertResult = new AlertResult("false", e.getMessage());
logger.error("send alert error alert data id :{},", alertData.getId(), e);

View File

@ -63,6 +63,9 @@ management:
alert:
port: 50052
# Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0
metrics:
enabled: true

View File

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.alert.runner;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.alert.AlertConfig;
import org.apache.dolphinscheduler.alert.AlertPluginManager;
import org.apache.dolphinscheduler.alert.AlertSenderService;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
@ -55,6 +56,8 @@ public class AlertSenderServiceTest {
private PluginDao pluginDao;
@Mock
private AlertPluginManager alertPluginManager;
@Mock
private AlertConfig alertConfig;
@InjectMocks
private AlertSenderService alertSenderService;
@ -73,6 +76,7 @@ public class AlertSenderServiceTest {
//1.alert instance does not exist
when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null);
when(alertConfig.getWaitTimeout()).thenReturn(0);
AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
@ -102,6 +106,7 @@ public class AlertSenderServiceTest {
AlertChannel alertChannelMock = mock(AlertChannel.class);
when(alertChannelMock.process(Mockito.any())).thenReturn(null);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
when(alertConfig.getWaitTimeout()).thenReturn(0);
alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
@ -126,6 +131,7 @@ public class AlertSenderServiceTest {
alertResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName));
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
when(alertConfig.getWaitTimeout()).thenReturn(5000);
alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertTrue(alertSendResponseCommand.getResStatus());

View File

@ -173,6 +173,9 @@ worker:
alert:
port: 50052
# Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0
python-gateway:
# Weather enable python gateway server or not. The default value is true.