mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-29 18:58:05 +08:00
[Improvement][UT] Improve Worker registry coverage (#15380)
Co-authored-by: fuchanghai <changhaifu@apache.org> Co-authored-by: Eric Gao <ericgao.apache@gmail.com> Co-authored-by: Rick Cheng <rickchengx@gmail.com>
This commit is contained in:
parent
43a06525a2
commit
69676b445c
@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.registry;
|
|||||||
|
|
||||||
import org.apache.dolphinscheduler.registry.api.RegistryClient;
|
import org.apache.dolphinscheduler.registry.api.RegistryClient;
|
||||||
import org.apache.dolphinscheduler.registry.api.StrategyType;
|
import org.apache.dolphinscheduler.registry.api.StrategyType;
|
||||||
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
|
|
||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
@ -34,8 +33,6 @@ public class WorkerStopStrategy implements WorkerConnectStrategy {
|
|||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public RegistryClient registryClient;
|
public RegistryClient registryClient;
|
||||||
@Autowired
|
|
||||||
private WorkerConfig workerConfig;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disconnect() {
|
public void disconnect() {
|
||||||
|
@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThread
|
|||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
|
||||||
|
import lombok.NonNull;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@ -52,6 +53,16 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private WorkerTaskExecutorThreadPool workerManagerThread;
|
private WorkerTaskExecutorThreadPool workerManagerThread;
|
||||||
|
|
||||||
|
public WorkerWaitingStrategy(@NonNull WorkerConfig workerConfig,
|
||||||
|
@NonNull RegistryClient registryClient,
|
||||||
|
@NonNull MessageRetryRunner messageRetryRunner,
|
||||||
|
@NonNull WorkerTaskExecutorThreadPool workerManagerThread) {
|
||||||
|
this.workerConfig = workerConfig;
|
||||||
|
this.registryClient = registryClient;
|
||||||
|
this.messageRetryRunner = messageRetryRunner;
|
||||||
|
this.workerManagerThread = workerManagerThread;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disconnect() {
|
public void disconnect() {
|
||||||
try {
|
try {
|
||||||
|
@ -0,0 +1,60 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.worker.registry;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.registry.api.ConnectionState;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* worker registry test
|
||||||
|
*/
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class WorkerConnectionStateListenerTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(WorkerConnectionStateListenerTest.class);
|
||||||
|
@InjectMocks
|
||||||
|
private WorkerConnectionStateListener workerConnectionStateListener;
|
||||||
|
@Mock
|
||||||
|
private WorkerConfig workerConfig;
|
||||||
|
@Mock
|
||||||
|
private WorkerConnectStrategy workerConnectStrategy;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWorkerConnectionStateListener() {
|
||||||
|
workerConnectionStateListener.onUpdate(ConnectionState.CONNECTED);
|
||||||
|
|
||||||
|
workerConnectionStateListener.onUpdate(ConnectionState.RECONNECTED);
|
||||||
|
Mockito.verify(workerConnectStrategy, times(1)).reconnect();
|
||||||
|
|
||||||
|
workerConnectionStateListener.onUpdate(ConnectionState.SUSPENDED);
|
||||||
|
|
||||||
|
workerConnectionStateListener.onUpdate(ConnectionState.DISCONNECTED);
|
||||||
|
Mockito.verify(workerConnectStrategy, times(1)).disconnect();
|
||||||
|
}
|
||||||
|
}
|
@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.worker.registry;
|
|||||||
|
|
||||||
import static org.mockito.BDDMockito.given;
|
import static org.mockito.BDDMockito.given;
|
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.IStoppable;
|
||||||
|
import org.apache.dolphinscheduler.common.model.Server;
|
||||||
import org.apache.dolphinscheduler.common.utils.NetUtils;
|
import org.apache.dolphinscheduler.common.utils.NetUtils;
|
||||||
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
|
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
|
||||||
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
|
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
|
||||||
@ -29,6 +31,9 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerServerLoadProtecti
|
|||||||
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
|
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
@ -37,6 +42,8 @@ import org.mockito.InjectMocks;
|
|||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* worker registry test
|
* worker registry test
|
||||||
@ -44,26 +51,24 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
|||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
public class WorkerRegistryClientTest {
|
public class WorkerRegistryClientTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
|
||||||
@InjectMocks
|
@InjectMocks
|
||||||
private WorkerRegistryClient workerRegistryClient;
|
private WorkerRegistryClient workerRegistryClient;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private RegistryClient registryClient;
|
private RegistryClient registryClient;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private WorkerConfig workerConfig;
|
private WorkerConfig workerConfig;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private MetricsProvider metricsProvider;
|
private MetricsProvider metricsProvider;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private WorkerTaskExecutorThreadPool workerManagerThread;
|
private WorkerTaskExecutorThreadPool workerManagerThread;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private WorkerConnectStrategy workerConnectStrategy;
|
private WorkerConnectStrategy workerConnectStrategy;
|
||||||
|
@Mock
|
||||||
|
private IStoppable stoppable;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStart() {
|
public void testWorkerRegistryClientbasic() {
|
||||||
|
|
||||||
given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
|
given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
|
||||||
given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
|
given(workerConfig.getMaxHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
|
||||||
@ -75,16 +80,23 @@ public class WorkerRegistryClientTest {
|
|||||||
workerRegistryClient.initWorkRegistry();
|
workerRegistryClient.initWorkRegistry();
|
||||||
workerRegistryClient.start();
|
workerRegistryClient.start();
|
||||||
|
|
||||||
Assertions.assertTrue(true);
|
workerRegistryClient.setRegistryStoppable(stoppable);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnRegistry() {
|
public void testWorkerRegistryClientgetAlertServerAddress() {
|
||||||
|
given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
|
||||||
}
|
.willReturn(new ArrayList<Server>());
|
||||||
|
Assertions.assertEquals(workerRegistryClient.getAlertServerAddress(), Optional.empty());
|
||||||
@Test
|
Mockito.reset(registryClient);
|
||||||
public void testGetWorkerZkPaths() {
|
String host = "test";
|
||||||
|
Integer port = 1;
|
||||||
|
Server server = new Server();
|
||||||
|
server.setHost(host);
|
||||||
|
server.setPort(port);
|
||||||
|
given(registryClient.getServerList(Mockito.any(RegistryNodeType.class)))
|
||||||
|
.willReturn(new ArrayList<Server>(Arrays.asList(server)));
|
||||||
|
Assertions.assertEquals(workerRegistryClient.getAlertServerAddress().get().getAddress(),
|
||||||
|
String.format("%s:%d", host, port));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,187 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.worker.registry;
|
||||||
|
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.BDDMockito.given;
|
||||||
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.IStoppable;
|
||||||
|
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleException;
|
||||||
|
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
|
||||||
|
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
|
||||||
|
import org.apache.dolphinscheduler.registry.api.RegistryClient;
|
||||||
|
import org.apache.dolphinscheduler.registry.api.RegistryException;
|
||||||
|
import org.apache.dolphinscheduler.registry.api.StrategyType;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.MockedStatic;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* worker registry test
|
||||||
|
*/
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
public class WorkerStrategyTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(WorkerStrategyTest.class);
|
||||||
|
@Mock
|
||||||
|
private RegistryClient registryClient;
|
||||||
|
@Mock
|
||||||
|
private IStoppable stoppable;
|
||||||
|
@Mock
|
||||||
|
private WorkerConfig workerConfig;
|
||||||
|
@Mock
|
||||||
|
private WorkerRpcServer workerRpcServer;
|
||||||
|
@Mock
|
||||||
|
private MessageRetryRunner messageRetryRunner;
|
||||||
|
@Mock
|
||||||
|
private WorkerTaskExecutorThreadPool workerManagerThread;
|
||||||
|
@Mock
|
||||||
|
private ConnectStrategyProperties connectStrategyProperties;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWorkerStopStrategy() {
|
||||||
|
given(registryClient.getStoppable())
|
||||||
|
.willReturn(stoppable);
|
||||||
|
WorkerStopStrategy workerStopStrategy = new WorkerStopStrategy();
|
||||||
|
workerStopStrategy.registryClient = registryClient;
|
||||||
|
workerStopStrategy.reconnect();
|
||||||
|
workerStopStrategy.disconnect();
|
||||||
|
Assertions.assertEquals(workerStopStrategy.getStrategyType(), StrategyType.STOP);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWorkerWaitingStrategyreconnect() {
|
||||||
|
WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
|
||||||
|
workerConfig,
|
||||||
|
registryClient,
|
||||||
|
messageRetryRunner,
|
||||||
|
workerManagerThread);
|
||||||
|
Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
|
||||||
|
|
||||||
|
try (
|
||||||
|
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
|
||||||
|
Mockito.mockStatic(ServerLifeCycleManager.class)) {
|
||||||
|
serverLifeCycleManagerMockedStatic
|
||||||
|
.when(() -> ServerLifeCycleManager.isRunning())
|
||||||
|
.thenReturn(true);
|
||||||
|
workerWaitingStrategy.reconnect();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
try (
|
||||||
|
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
|
||||||
|
Mockito.mockStatic(ServerLifeCycleManager.class)) {
|
||||||
|
doNothing().when(stoppable).stop(anyString());
|
||||||
|
given(registryClient.getStoppable())
|
||||||
|
.willReturn(stoppable);
|
||||||
|
serverLifeCycleManagerMockedStatic
|
||||||
|
.when(() -> ServerLifeCycleManager.recoverFromWaiting())
|
||||||
|
.thenThrow(new ServerLifeCycleException(""));
|
||||||
|
workerWaitingStrategy.reconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
try (
|
||||||
|
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
|
||||||
|
Mockito.mockStatic(ServerLifeCycleManager.class)) {
|
||||||
|
serverLifeCycleManagerMockedStatic
|
||||||
|
.when(() -> ServerLifeCycleManager.recoverFromWaiting())
|
||||||
|
.thenAnswer(invocation -> null);
|
||||||
|
workerWaitingStrategy.reconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWorkerWaitingStrategydisconnect() {
|
||||||
|
WorkerWaitingStrategy workerWaitingStrategy = new WorkerWaitingStrategy(
|
||||||
|
workerConfig,
|
||||||
|
registryClient,
|
||||||
|
messageRetryRunner,
|
||||||
|
workerManagerThread);
|
||||||
|
Assertions.assertEquals(workerWaitingStrategy.getStrategyType(), StrategyType.WAITING);
|
||||||
|
|
||||||
|
try (
|
||||||
|
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
|
||||||
|
Mockito.mockStatic(ServerLifeCycleManager.class)) {
|
||||||
|
doNothing().when(stoppable).stop(anyString());
|
||||||
|
given(registryClient.getStoppable())
|
||||||
|
.willReturn(stoppable);
|
||||||
|
serverLifeCycleManagerMockedStatic
|
||||||
|
.when(() -> ServerLifeCycleManager.toWaiting())
|
||||||
|
.thenThrow(new ServerLifeCycleException(""));
|
||||||
|
workerWaitingStrategy.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
try (
|
||||||
|
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
|
||||||
|
Mockito.mockStatic(ServerLifeCycleManager.class)) {
|
||||||
|
given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
|
||||||
|
given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
|
||||||
|
Mockito.reset(registryClient);
|
||||||
|
doNothing().when(registryClient).connectUntilTimeout(any());
|
||||||
|
serverLifeCycleManagerMockedStatic
|
||||||
|
.when(() -> ServerLifeCycleManager.toWaiting())
|
||||||
|
.thenAnswer(invocation -> null);
|
||||||
|
workerWaitingStrategy.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
try (
|
||||||
|
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
|
||||||
|
Mockito.mockStatic(ServerLifeCycleManager.class)) {
|
||||||
|
given(connectStrategyProperties.getMaxWaitingTime()).willReturn(Duration.ofSeconds(1));
|
||||||
|
given(workerConfig.getRegistryDisconnectStrategy()).willReturn(connectStrategyProperties);
|
||||||
|
Mockito.reset(registryClient);
|
||||||
|
doNothing().when(stoppable).stop(anyString());
|
||||||
|
given(registryClient.getStoppable())
|
||||||
|
.willReturn(stoppable);
|
||||||
|
Mockito.doThrow(new RegistryException("TEST")).when(registryClient).connectUntilTimeout(any());
|
||||||
|
serverLifeCycleManagerMockedStatic
|
||||||
|
.when(() -> ServerLifeCycleManager.toWaiting())
|
||||||
|
.thenAnswer(invocation -> null);
|
||||||
|
workerWaitingStrategy.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
try (
|
||||||
|
MockedStatic<ServerLifeCycleManager> serverLifeCycleManagerMockedStatic =
|
||||||
|
Mockito.mockStatic(ServerLifeCycleManager.class)) {
|
||||||
|
Mockito.reset(workerConfig);
|
||||||
|
given(workerConfig.getRegistryDisconnectStrategy()).willThrow(new NullPointerException(""));
|
||||||
|
doNothing().when(stoppable).stop(anyString());
|
||||||
|
given(registryClient.getStoppable())
|
||||||
|
.willReturn(stoppable);
|
||||||
|
serverLifeCycleManagerMockedStatic
|
||||||
|
.when(() -> ServerLifeCycleManager.toWaiting())
|
||||||
|
.thenAnswer(invocation -> null);
|
||||||
|
workerWaitingStrategy.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user