[Plugin] [Kafka] Improve test cases (#303)

qianmoQ 2023-03-23 22:46:31 +08:00 committed by GitHub
commit 86618dcb49
7 changed files with 216 additions and 11 deletions

View File

@@ -13,14 +13,14 @@ public class OptionalUtilsTest
@Test
public void isEmpty()
{
- Assert.assertTrue(OptionalUtils.isEmpty(empty));
+ Assert.assertFalse(OptionalUtils.isEmpty(empty));
Assert.assertFalse(OptionalUtils.isEmpty(notEmpty));
}
@Test
public void isNotEmpty()
{
- Assert.assertFalse(OptionalUtils.isNotEmpty(empty));
+ Assert.assertTrue(OptionalUtils.isNotEmpty(empty));
Assert.assertTrue(OptionalUtils.isNotEmpty(notEmpty));
}
}

View File

@@ -47,7 +47,7 @@ hide:
<a href="https://github.com/EdurtIO/datacap" target="_blank" title="Join Us On GitHub" class="md-button md-button--primary">
Join Us On GitHub
</a>
<a href="http://139.198.158.103:9096/" target="_blank" title="View online examples" class="md-button md-button--primary">
<a href="http://try.datacap.edurt.io/" target="_blank" title="View online examples" class="md-button md-button--primary">
View online examples
</a>
<p/><p/><p/><p/>

View File

@@ -6,6 +6,7 @@ import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.edurt.datacap.spi.Plugin;
import io.edurt.datacap.spi.model.Configure;
+ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -36,7 +37,7 @@ public class IgnitePluginTest
if (pluginOptional.isPresent()) {
Plugin plugin = pluginOptional.get();
plugin.connect(configure);
- System.out.println(plugin.execute("SELECT TABLE_NAME FROM SYS.TABLES"));
+ Assert.assertNotNull(plugin.execute(plugin.validator()).getConnection());
plugin.destroy();
}
}

View File

@@ -15,6 +15,7 @@
<properties>
<kafka.version>2.8.0</kafka.version>
+ <testcontainers.version>1.17.6</testcontainers.version>
<plugin.name>native-kafka</plugin.name>
</properties>
@@ -41,6 +42,21 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,150 @@
package io.edurt.datacap.plugin.natived.kafka;
import com.github.dockerjava.api.command.InspectContainerResponse;
import lombok.SneakyThrows;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
public class KafkaContainer
extends GenericContainer<KafkaContainer>
{
private static final DockerImageName DEFAULT_IMAGE_NAME =
DockerImageName.parse("confluentinc/cp-kafka");
public static final int KAFKA_PORT = 9094;
public static final int ZOOKEEPER_PORT = 2181;
private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
protected String externalZookeeperConnect = null;
public KafkaContainer(final DockerImageName dockerImageName)
{
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
withExposedPorts(KAFKA_PORT);
// Use two listeners with different names; this forces Kafka to communicate
// with itself via the internal listener when KAFKA_INTER_BROKER_LISTENER_NAME
// is set, since otherwise Kafka would try to use the advertised listener
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
withEnv("KAFKA_BROKER_ID", "1");
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
}
public KafkaContainer withEmbeddedZookeeper()
{
externalZookeeperConnect = null;
return self();
}
public KafkaContainer withExternalZookeeper(String connectString)
{
externalZookeeperConnect = connectString;
return self();
}
public String getBootstrapServers()
{
return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), getMappedPort(KAFKA_PORT));
}
@Override
protected void configure()
{
withEnv(
"KAFKA_ADVERTISED_LISTENERS",
String.format(
"BROKER://%s:9092",
getNetwork() != null ? getNetworkAliases().get(1) : "localhost"));
String command = "#!/bin/bash\n";
if (externalZookeeperConnect != null) {
withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
}
else {
addExposedPort(ZOOKEEPER_PORT);
withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n";
command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
command += "zookeeper-server-start zookeeper.properties &\n";
}
// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
// Run the original command
command += "/etc/confluent/docker/run \n";
withCommand("sh", "-c", command);
}
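// Once the container is running, the externally mapped port is known, so the
// broker's advertised.listeners is rewritten below via kafka-configs to expose
// both the host-reachable PLAINTEXT address and the internal BROKER listener.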
@Override
@SneakyThrows
protected void containerIsStarted(InspectContainerResponse containerInfo)
{
String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo);
ExecResult result =
execInContainer(
"kafka-configs",
"--alter",
"--bootstrap-server",
brokerAdvertisedListener,
"--entity-type",
"brokers",
"--entity-name",
getEnvMap().get("KAFKA_BROKER_ID"),
"--add-config",
"advertised.listeners=["
+ String.join(",", getBootstrapServers(), brokerAdvertisedListener)
+ "]");
if (result.getExitCode() != 0) {
throw new IllegalStateException(result.toString());
}
}
protected String brokerAdvertisedListener(InspectContainerResponse containerInfo)
{
return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
}
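// Scans all network interfaces and returns the last non-loopback IPv4 address
// found, so that the advertised PLAINTEXT listener is reachable from the host.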
public String getLinuxLocalIp()
{
String ip = "";
try {
Enumeration<NetworkInterface> networkInterfaces =
NetworkInterface.getNetworkInterfaces();
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = networkInterfaces.nextElement();
Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress inetAddress = inetAddresses.nextElement();
if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
ip = inetAddress.getHostAddress();
}
}
}
}
catch (SocketException ex) {
ex.printStackTrace();
}
return ip;
}
}
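As a standalone note, this helper can also be exercised outside the plugin test; a minimal sketch, assuming an explicit image tag (the "7.3.2" tag is an illustrative choice, not part of this commit):

// Start the container with its embedded Zookeeper and print the address a
// Kafka client would use as bootstrap.servers.
try (KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.3.2")).withEmbeddedZookeeper()) {
    kafka.start();
    // Returns "PLAINTEXT://<host-ip>:<mapped-port>"
    System.out.println(kafka.getBootstrapServers());
}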

View File

@@ -1,43 +1,76 @@
package io.edurt.datacap.plugin.natived.kafka;
+ import com.google.common.collect.Lists;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.edurt.datacap.spi.Plugin;
import io.edurt.datacap.spi.model.Configure;
+ import io.edurt.datacap.spi.model.Response;
+ import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+ import org.testcontainers.containers.Network;
+ import org.testcontainers.containers.output.Slf4jLogConsumer;
+ import org.testcontainers.lifecycle.Startables;
+ import org.testcontainers.utility.DockerImageName;
+ import org.testcontainers.utility.DockerLoggerFactory;
import java.util.Optional;
import java.util.Set;
+ import java.util.stream.Stream;
+ @Slf4j
public class KafkaPluginTest
{
+ private static final String KAFKA_HOST = "kafkaCluster";
+ private static final int KAFKA_PORT = 9093;
+ private static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
+ private Network network;
+ private KafkaContainer kafkaContainer;
private Injector injector;
private Configure configure;
+ private Optional<Plugin> pluginOptional;
@Before
public void before()
{
+ network = Network.newNetwork();
+ kafkaContainer = new KafkaContainer(KAFKA_IMAGE_NAME)
+ .withNetwork(network)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME.asCanonicalNameString())));
+ kafkaContainer.setPortBindings(
+ Lists.newArrayList(String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
injector = Guice.createInjector(new KafkaPluginModule());
configure = new Configure();
configure.setHost("localhost:9092");
configure.setHost(kafkaContainer.getBootstrapServers());
+ Set<Plugin> plugins = injector.getInstance(Key.get(new TypeLiteral<Set<Plugin>>() {}));
+ pluginOptional = plugins.stream()
+ .filter(v -> v.name().equalsIgnoreCase("Kafka"))
+ .findFirst();
}
@Test
- public void test()
+ public void testValidator()
{
- Set<Plugin> plugins = injector.getInstance(Key.get(new TypeLiteral<Set<Plugin>>() {}));
- Optional<Plugin> pluginOptional = plugins.stream()
- .filter(v -> v.name().equalsIgnoreCase("Kafka"))
- .findFirst();
if (pluginOptional.isPresent()) {
Plugin plugin = pluginOptional.get();
plugin.connect(configure);
- Assert.assertNotNull(plugin.execute(plugin.validator()).getConnection());
+ Response response = plugin.execute(plugin.validator());
plugin.destroy();
+ log.info("Kafka plugin validation response {}", response);
+ Assert.assertTrue(response.getIsSuccessful());
}
}
}
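For completeness, broker reachability could also be asserted straight against the kafka-clients API this module already depends on; a minimal sketch (this hypothetical testBrokerReachable method and its 10-second timeout are illustrative assumptions, not part of the commit):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

@Test
public void testBrokerReachable()
        throws Exception
{
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
    try (AdminClient client = AdminClient.create(props)) {
        // listTopics() performs a real round trip to the broker, so a non-null
        // result shows the advertised listener rewrite in KafkaContainer works
        Assert.assertNotNull(client.listTopics().names().get(10, TimeUnit.SECONDS));
    }
}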

View File

@@ -177,6 +177,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>findbugs</artifactId>