mirror of
https://gitee.com/devlive-community/datacap.git
synced 2024-12-02 03:57:35 +08:00
[Plugin] Support kafka for native
This commit is contained in:
parent
9c2e3bb95d
commit
6bcffa9b90
@ -143,6 +143,9 @@ Here are some of the major database solutions that are supported:
|
||||
</a>
|
||||
<a href="https://www.alibabacloud.com/zh/product/object-storage-service" target="_blank">
|
||||
<img src="assets/plugin/alioss.png" alt="Aliyun OSS" height="50" />
|
||||
</a>
|
||||
<a href="https://kafka.apache.org" target="_blank">
|
||||
<img src="assets/plugin/kafka.png" alt="Apache Kafka" height="50" />
|
||||
</a>
|
||||
</p>
|
||||
|
||||
|
BIN
assets/plugin/kafka.png
Normal file
BIN
assets/plugin/kafka.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 6.5 KiB |
@ -274,6 +274,11 @@
|
||||
<artifactId>datacap-native-alioss</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.edurt.datacap</groupId>
|
||||
<artifactId>datacap-native-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -0,0 +1,19 @@
|
||||
{
|
||||
"name": "Kafka",
|
||||
"supportTime": "2023-03-06",
|
||||
"configures": [
|
||||
{
|
||||
"field": "name",
|
||||
"type": "String",
|
||||
"required": true,
|
||||
"message": "name is a required field, please be sure to enter"
|
||||
},
|
||||
{
|
||||
"field": "host",
|
||||
"type": "String",
|
||||
"required": true,
|
||||
"value": "127.0.0.1:9092",
|
||||
"message": "host is a required field, please be sure to enter"
|
||||
}
|
||||
]
|
||||
}
|
Binary file not shown.
After Width: | Height: | Size: 2.7 KiB |
71
plugin/datacap-native-kafka/pom.xml
Normal file
71
plugin/datacap-native-kafka/pom.xml
Normal file
@ -0,0 +1,71 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>io.edurt.datacap</groupId>
|
||||
<artifactId>datacap</artifactId>
|
||||
<version>1.7.0-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>datacap-native-kafka</artifactId>
|
||||
<description>DataCap - Kafka</description>
|
||||
|
||||
<properties>
|
||||
<kafka.version>2.8.0</kafka.version>
|
||||
<plugin.name>native-kafka</plugin.name>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.edurt.datacap</groupId>
|
||||
<artifactId>datacap-spi</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.edurt.datacap</groupId>
|
||||
<artifactId>datacap-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-beanutils</groupId>
|
||||
<artifactId>commons-beanutils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.edurt.datacap</groupId>
|
||||
<artifactId>datacap-parser</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-assembly-plugin</artifactId>
|
||||
<version>${assembly-plugin.version}</version>
|
||||
<configuration>
|
||||
<finalName>${plugin.name}</finalName>
|
||||
<descriptors>
|
||||
<descriptor>../../configure/assembly/assembly-plugin.xml</descriptor>
|
||||
</descriptors>
|
||||
<outputDirectory>../../dist/plugins/${plugin.name}</outputDirectory>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>make-assembly</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>single</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
@ -0,0 +1,147 @@
|
||||
package io.edurt.datacap.plugin.natived.kafka;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
||||
import io.edurt.datacap.spi.adapter.NativeAdapter;
|
||||
import io.edurt.datacap.spi.model.Configure;
|
||||
import io.edurt.datacap.spi.model.Response;
|
||||
import io.edurt.datacap.spi.model.Time;
|
||||
import io.edurt.datacap.sql.SqlBase;
|
||||
import io.edurt.datacap.sql.SqlBaseToken;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.kafka.clients.admin.AdminClient;
|
||||
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
||||
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@SuppressFBWarnings(value = {"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", "REC_CATCH_EXCEPTION"},
|
||||
justification = "I prefer to suppress these FindBugs warnings")
|
||||
public class KafkaAdapter
|
||||
extends NativeAdapter
|
||||
{
|
||||
protected KafkaConnection kafkaConnection;
|
||||
private final KafkaParser parser;
|
||||
|
||||
public KafkaAdapter(KafkaConnection kafkaConnection, KafkaParser parser)
|
||||
{
|
||||
super(kafkaConnection, parser);
|
||||
this.kafkaConnection = kafkaConnection;
|
||||
this.parser = parser;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response handlerExecute(String content)
|
||||
{
|
||||
Time processorTime = new Time();
|
||||
processorTime.setStart(new Date().getTime());
|
||||
Response response = this.kafkaConnection.getResponse();
|
||||
Configure configure = this.kafkaConnection.getConfigure();
|
||||
if (response.getIsConnected()) {
|
||||
List<String> headers = new ArrayList<>();
|
||||
List<String> types = new ArrayList<>();
|
||||
List<Object> columns = new ArrayList<>();
|
||||
try {
|
||||
SqlBase sqlBase = this.parser.getSqlBase();
|
||||
if (sqlBase.isSuccessful()) {
|
||||
AdminClient client = this.kafkaConnection.getClient();
|
||||
if (ObjectUtils.isNotEmpty(this.parser.getSqlBase().getColumns())) {
|
||||
headers.addAll(this.parser.getSqlBase().getColumns());
|
||||
}
|
||||
else {
|
||||
headers.add("*");
|
||||
}
|
||||
types.add("String");
|
||||
this.adapter(client, sqlBase)
|
||||
.forEach(column -> columns.add(handlerFormatter(configure.getFormat(), headers, Collections.singletonList(column))));
|
||||
response.setIsSuccessful(Boolean.TRUE);
|
||||
}
|
||||
else {
|
||||
Preconditions.checkArgument(!sqlBase.isSuccessful(), sqlBase.getMessage());
|
||||
}
|
||||
}
|
||||
catch (Exception ex) {
|
||||
log.error("Execute content failed content {} exception ", content, ex);
|
||||
response.setIsSuccessful(Boolean.FALSE);
|
||||
response.setMessage(ex.getMessage());
|
||||
}
|
||||
finally {
|
||||
response.setHeaders(headers);
|
||||
response.setTypes(types);
|
||||
response.setColumns(columns);
|
||||
}
|
||||
}
|
||||
processorTime.setEnd(new Date().getTime());
|
||||
response.setProcessor(processorTime);
|
||||
return response;
|
||||
}
|
||||
|
||||
private List<String> adapter(AdminClient client, SqlBase info)
|
||||
{
|
||||
List<String> array = new ArrayList<>();
|
||||
if (info.getToken() == SqlBaseToken.SHOW) {
|
||||
if (info.getChildToken() == SqlBaseToken.TOPICS) {
|
||||
this.adapterShowTopics(client, array);
|
||||
}
|
||||
else if (info.getChildToken() == SqlBaseToken.CONSUMERS) {
|
||||
this.adapterShowConsumers(client, info, array);
|
||||
}
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
private void adapterShowTopics(AdminClient client, List<String> array)
|
||||
{
|
||||
try {
|
||||
client.listTopics()
|
||||
.listings()
|
||||
.get()
|
||||
.forEach(v -> array.add(v.name()));
|
||||
}
|
||||
catch (Exception e) {
|
||||
Preconditions.checkArgument(false, ExceptionUtils.getMessage(e));
|
||||
}
|
||||
}
|
||||
|
||||
private void adapterShowConsumers(AdminClient client, SqlBase info, List<String> array)
|
||||
{
|
||||
try {
|
||||
if (StringUtils.isNotEmpty(info.getTable())) {
|
||||
client.listConsumerGroups()
|
||||
.all()
|
||||
.get()
|
||||
.parallelStream()
|
||||
.forEach(v -> {
|
||||
try {
|
||||
DescribeConsumerGroupsResult describeConsumerGroupsResult = client.describeConsumerGroups(Collections.singleton(v.groupId()));
|
||||
ConsumerGroupDescription consumerGroupDescription = describeConsumerGroupsResult.all().get().get(v.groupId());
|
||||
if (consumerGroupDescription.members().stream().anyMatch(member ->
|
||||
member.assignment().topicPartitions().stream().anyMatch(tp ->
|
||||
tp.topic().equals(info.getTable().replace("`", ""))))) {
|
||||
array.add(v.groupId());
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
Preconditions.checkArgument(false, ExceptionUtils.getMessage(e));
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
client.listConsumerGroups()
|
||||
.all()
|
||||
.get()
|
||||
.forEach(v -> array.add(v.groupId()));
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
Preconditions.checkArgument(false, ExceptionUtils.getMessage(e));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
package io.edurt.datacap.plugin.natived.kafka;
|
||||
|
||||
import io.edurt.datacap.spi.connection.Connection;
|
||||
import io.edurt.datacap.spi.model.Configure;
|
||||
import io.edurt.datacap.spi.model.Response;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.admin.AdminClient;
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
@Slf4j
|
||||
public class KafkaConnection
|
||||
extends Connection
|
||||
{
|
||||
private Configure configure;
|
||||
private Response response;
|
||||
|
||||
@Getter
|
||||
private AdminClient client;
|
||||
|
||||
public KafkaConnection(Configure configure, Response response)
|
||||
{
|
||||
super(configure, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String formatJdbcUrl()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected java.sql.Connection openConnection()
|
||||
{
|
||||
try {
|
||||
this.configure = getConfigure();
|
||||
this.response = getResponse();
|
||||
log.info("Connection url {}", formatJdbcUrl());
|
||||
Properties properties = new Properties();
|
||||
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configure.getHost());
|
||||
this.client = AdminClient.create(properties);
|
||||
response.setIsConnected(Boolean.TRUE);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
log.error("Connection failed ", ex);
|
||||
response.setIsConnected(Boolean.FALSE);
|
||||
response.setMessage(ex.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy()
|
||||
{
|
||||
this.client.close();
|
||||
log.info("Connection close successful");
|
||||
}
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package io.edurt.datacap.plugin.natived.kafka;
|
||||
|
||||
import io.edurt.datacap.spi.parser.SqlParser;
|
||||
import io.edurt.datacap.sql.SqlBase;
|
||||
import io.edurt.datacap.sql.SqlBaseToken;
|
||||
|
||||
public class KafkaParser
|
||||
extends SqlParser
|
||||
{
|
||||
public KafkaParser(String content)
|
||||
{
|
||||
super(content);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExecuteContext()
|
||||
{
|
||||
SqlBase sqlBase = this.getSqlBase();
|
||||
if (sqlBase.getToken() == SqlBaseToken.SHOW) {
|
||||
return sqlBase.getTable();
|
||||
}
|
||||
else if (sqlBase.getToken() == SqlBaseToken.SELECT) {
|
||||
return sqlBase.getTable();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
package io.edurt.datacap.plugin.natived.kafka;
|
||||
|
||||
import io.edurt.datacap.spi.Plugin;
|
||||
import io.edurt.datacap.spi.PluginType;
|
||||
import io.edurt.datacap.spi.adapter.Adapter;
|
||||
import io.edurt.datacap.spi.connection.JdbcConfigure;
|
||||
import io.edurt.datacap.spi.model.Configure;
|
||||
import io.edurt.datacap.spi.model.Response;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.beanutils.BeanUtils;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
|
||||
@Slf4j
|
||||
public class KafkaPlugin
|
||||
implements Plugin
|
||||
{
|
||||
private Configure configure;
|
||||
private KafkaConnection connection;
|
||||
private Response response;
|
||||
|
||||
@Override
|
||||
public String validator()
|
||||
{
|
||||
return "SHOW TOPICS";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name()
|
||||
{
|
||||
return "Kafka";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description()
|
||||
{
|
||||
return "Integrate Kafka data sources";
|
||||
}
|
||||
|
||||
@Override
|
||||
public PluginType type()
|
||||
{
|
||||
return PluginType.NATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(Configure configure)
|
||||
{
|
||||
try {
|
||||
this.response = new Response();
|
||||
this.configure = new JdbcConfigure();
|
||||
BeanUtils.copyProperties(this.configure, configure);
|
||||
this.connection = new KafkaConnection(this.configure, this.response);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
this.response.setIsConnected(Boolean.FALSE);
|
||||
this.response.setMessage(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Response execute(String content)
|
||||
{
|
||||
if (ObjectUtils.isNotEmpty(this.connection)) {
|
||||
log.info("Execute kafka plugin logic started");
|
||||
this.response = this.connection.getResponse();
|
||||
Adapter processor = new KafkaAdapter(this.connection, new KafkaParser(content));
|
||||
this.response = processor.handlerExecute(content);
|
||||
log.info("Execute kafka plugin logic end");
|
||||
}
|
||||
return this.response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy()
|
||||
{
|
||||
if (ObjectUtils.isNotEmpty(this.connection)) {
|
||||
this.connection.destroy();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
package io.edurt.datacap.plugin.natived.kafka;
|
||||
|
||||
import com.google.inject.multibindings.Multibinder;
|
||||
import io.edurt.datacap.spi.AbstractPluginModule;
|
||||
import io.edurt.datacap.spi.Plugin;
|
||||
import io.edurt.datacap.spi.PluginModule;
|
||||
import io.edurt.datacap.spi.PluginType;
|
||||
|
||||
public class KafkaPluginModule
|
||||
extends AbstractPluginModule
|
||||
implements PluginModule
|
||||
{
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return "Kafka";
|
||||
}
|
||||
|
||||
@Override
|
||||
public PluginType getType()
|
||||
{
|
||||
return PluginType.NATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractPluginModule get()
|
||||
{
|
||||
return this;
|
||||
}
|
||||
|
||||
protected void configure()
|
||||
{
|
||||
Multibinder<String> module = Multibinder.newSetBinder(this.binder(), String.class);
|
||||
module.addBinding().toInstance(this.getClass().getSimpleName());
|
||||
Multibinder<Plugin> plugin = Multibinder.newSetBinder(this.binder(), Plugin.class);
|
||||
plugin.addBinding().to(KafkaPlugin.class);
|
||||
}
|
||||
}
|
@ -0,0 +1 @@
|
||||
io.edurt.datacap.plugin.natived.kafka.KafkaPluginModule
|
@ -0,0 +1,30 @@
|
||||
package io.edurt.datacap.plugin.natived.kafka;
|
||||
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import io.edurt.datacap.spi.Plugin;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public class KafkaPluginModuleTest
|
||||
{
|
||||
private Injector injector;
|
||||
|
||||
@Before
|
||||
public void before()
|
||||
{
|
||||
this.injector = Guice.createInjector(new KafkaPluginModule());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test()
|
||||
{
|
||||
Set<Plugin> plugins = injector.getInstance(Key.get(new TypeLiteral<Set<Plugin>>() {}));
|
||||
Assert.assertTrue(plugins.size() > 0);
|
||||
}
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
package io.edurt.datacap.plugin.natived.kafka;
|
||||
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import io.edurt.datacap.spi.Plugin;
|
||||
import io.edurt.datacap.spi.model.Configure;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
public class KafkaPluginTest
|
||||
{
|
||||
private Injector injector;
|
||||
private Configure configure;
|
||||
|
||||
@Before
|
||||
public void before()
|
||||
{
|
||||
injector = Guice.createInjector(new KafkaPluginModule());
|
||||
configure = new Configure();
|
||||
configure.setHost("localhost:9092");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test()
|
||||
{
|
||||
Set<Plugin> plugins = injector.getInstance(Key.get(new TypeLiteral<Set<Plugin>>() {}));
|
||||
Optional<Plugin> pluginOptional = plugins.stream()
|
||||
.filter(v -> v.name().equalsIgnoreCase("Kafka"))
|
||||
.findFirst();
|
||||
if (pluginOptional.isPresent()) {
|
||||
Plugin plugin = pluginOptional.get();
|
||||
plugin.connect(configure);
|
||||
Assert.assertNotNull(plugin.execute(plugin.validator()).getConnection());
|
||||
plugin.destroy();
|
||||
}
|
||||
}
|
||||
}
|
1
pom.xml
1
pom.xml
@ -18,6 +18,7 @@
|
||||
<module>plugin/datacap-native-alioss</module>
|
||||
<module>plugin/datacap-native-zookeeper</module>
|
||||
<module>plugin/datacap-native-redis</module>
|
||||
<module>plugin/datacap-native-kafka</module>
|
||||
<module>plugin/datacap-http-cratedb</module>
|
||||
<module>plugin/datacap-http-clickhouse</module>
|
||||
<module>plugin/datacap-jdbc-clickhouse</module>
|
||||
|
Loading…
Reference in New Issue
Block a user