diff --git a/README.md b/README.md
index 857691e2..3c9bc2e5 100644
--- a/README.md
+++ b/README.md
@@ -143,6 +143,9 @@ Here are some of the major database solutions that are supported:
+
+<img src="assets/plugin/kafka.png" alt="Kafka" />
+
diff --git a/assets/plugin/kafka.png b/assets/plugin/kafka.png
new file mode 100644
index 00000000..7668eed9
Binary files /dev/null and b/assets/plugin/kafka.png differ
diff --git a/core/datacap-server/pom.xml b/core/datacap-server/pom.xml
index 7bad66bd..fd7a8418 100644
--- a/core/datacap-server/pom.xml
+++ b/core/datacap-server/pom.xml
@@ -274,6 +274,11 @@
             <artifactId>datacap-native-alioss</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.edurt.datacap</groupId>
+            <artifactId>datacap-native-kafka</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
diff --git a/core/datacap-server/src/main/etc/conf/plugins/native/kafka.json b/core/datacap-server/src/main/etc/conf/plugins/native/kafka.json
new file mode 100644
index 00000000..5f9c4db7
--- /dev/null
+++ b/core/datacap-server/src/main/etc/conf/plugins/native/kafka.json
@@ -0,0 +1,19 @@
+{
+ "name": "Kafka",
+ "supportTime": "2023-03-06",
+ "configures": [
+ {
+ "field": "name",
+ "type": "String",
+ "required": true,
+ "message": "name is a required field, please be sure to enter"
+ },
+ {
+ "field": "host",
+ "type": "String",
+ "required": true,
+ "value": "127.0.0.1:9092",
+ "message": "host is a required field, please be sure to enter"
+ }
+ ]
+}
diff --git a/core/datacap-web/console-fe/public/static/images/plugin/Kafka.png b/core/datacap-web/console-fe/public/static/images/plugin/Kafka.png
new file mode 100644
index 00000000..72c6a292
Binary files /dev/null and b/core/datacap-web/console-fe/public/static/images/plugin/Kafka.png differ
diff --git a/plugin/datacap-native-kafka/pom.xml b/plugin/datacap-native-kafka/pom.xml
new file mode 100644
index 00000000..790f25ae
--- /dev/null
+++ b/plugin/datacap-native-kafka/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>io.edurt.datacap</groupId>
+        <artifactId>datacap</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>datacap-native-kafka</artifactId>
+    <description>DataCap - Kafka</description>
+
+    <properties>
+        <kafka.version>2.8.0</kafka.version>
+        <plugin.name>native-kafka</plugin.name>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.edurt.datacap</groupId>
+            <artifactId>datacap-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.edurt.datacap</groupId>
+            <artifactId>datacap-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.edurt.datacap</groupId>
+            <artifactId>datacap-parser</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${assembly-plugin.version}</version>
+                <configuration>
+                    <finalName>${plugin.name}</finalName>
+                    <descriptors>
+                        <descriptor>../../configure/assembly/assembly-plugin.xml</descriptor>
+                    </descriptors>
+                    <outputDirectory>../../dist/plugins/${plugin.name}</outputDirectory>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaAdapter.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaAdapter.java
new file mode 100644
index 00000000..10bae61c
--- /dev/null
+++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaAdapter.java
@@ -0,0 +1,150 @@
+package io.edurt.datacap.plugin.natived.kafka;
+
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.edurt.datacap.spi.adapter.NativeAdapter;
+import io.edurt.datacap.spi.model.Configure;
+import io.edurt.datacap.spi.model.Response;
+import io.edurt.datacap.spi.model.Time;
+import io.edurt.datacap.sql.SqlBase;
+import io.edurt.datacap.sql.SqlBaseToken;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+@Slf4j
+@SuppressFBWarnings(value = {"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE", "REC_CATCH_EXCEPTION"},
+ justification = "I prefer to suppress these FindBugs warnings")
+public class KafkaAdapter
+ extends NativeAdapter
+{
+ protected KafkaConnection kafkaConnection;
+ private final KafkaParser parser;
+
+ public KafkaAdapter(KafkaConnection kafkaConnection, KafkaParser parser)
+ {
+ super(kafkaConnection, parser);
+ this.kafkaConnection = kafkaConnection;
+ this.parser = parser;
+ }
+
+ @Override
+ public Response handlerExecute(String content)
+ {
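+        // Parse the statement, dispatch it to the Kafka AdminClient, and format each row into the response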
+ Time processorTime = new Time();
+ processorTime.setStart(new Date().getTime());
+ Response response = this.kafkaConnection.getResponse();
+ Configure configure = this.kafkaConnection.getConfigure();
+ if (response.getIsConnected()) {
+            List<String> headers = new ArrayList<>();
+            List<String> types = new ArrayList<>();
+            List<Object> columns = new ArrayList<>();
+ try {
+ SqlBase sqlBase = this.parser.getSqlBase();
+ if (sqlBase.isSuccessful()) {
+ AdminClient client = this.kafkaConnection.getClient();
+ if (ObjectUtils.isNotEmpty(this.parser.getSqlBase().getColumns())) {
+ headers.addAll(this.parser.getSqlBase().getColumns());
+ }
+ else {
+ headers.add("*");
+ }
+ types.add("String");
+ this.adapter(client, sqlBase)
+ .forEach(column -> columns.add(handlerFormatter(configure.getFormat(), headers, Collections.singletonList(column))));
+ response.setIsSuccessful(Boolean.TRUE);
+ }
+ else {
+                    Preconditions.checkArgument(sqlBase.isSuccessful(), sqlBase.getMessage());
+ }
+ }
+ catch (Exception ex) {
+ log.error("Execute content failed content {} exception ", content, ex);
+ response.setIsSuccessful(Boolean.FALSE);
+ response.setMessage(ex.getMessage());
+ }
+ finally {
+ response.setHeaders(headers);
+ response.setTypes(types);
+ response.setColumns(columns);
+ }
+ }
+ processorTime.setEnd(new Date().getTime());
+ response.setProcessor(processorTime);
+ return response;
+ }
+
+    private List<String> adapter(AdminClient client, SqlBase info)
+    {
+        List<String> array = new ArrayList<>();
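+        // Route SHOW TOPICS and SHOW CONSUMERS to the matching AdminClient lookup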
+ if (info.getToken() == SqlBaseToken.SHOW) {
+ if (info.getChildToken() == SqlBaseToken.TOPICS) {
+ this.adapterShowTopics(client, array);
+ }
+ else if (info.getChildToken() == SqlBaseToken.CONSUMERS) {
+ this.adapterShowConsumers(client, info, array);
+ }
+ }
+ return array;
+ }
+
+    private void adapterShowTopics(AdminClient client, List<String> array)
+ {
+ try {
+ client.listTopics()
+ .listings()
+ .get()
+ .forEach(v -> array.add(v.name()));
+ }
+ catch (Exception e) {
+ Preconditions.checkArgument(false, ExceptionUtils.getMessage(e));
+ }
+ }
+
+    private void adapterShowConsumers(AdminClient client, SqlBase info, List<String> array)
+ {
+ try {
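+            // When a topic is given, keep only the consumer groups with a member assigned to that topic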
+ if (StringUtils.isNotEmpty(info.getTable())) {
+ client.listConsumerGroups()
+ .all()
+ .get()
+ .parallelStream()
+ .forEach(v -> {
+ try {
+ DescribeConsumerGroupsResult describeConsumerGroupsResult = client.describeConsumerGroups(Collections.singleton(v.groupId()));
+ ConsumerGroupDescription consumerGroupDescription = describeConsumerGroupsResult.all().get().get(v.groupId());
+ if (consumerGroupDescription.members().stream().anyMatch(member ->
+ member.assignment().topicPartitions().stream().anyMatch(tp ->
+ tp.topic().equals(info.getTable().replace("`", ""))))) {
+ array.add(v.groupId());
+ }
+ }
+ catch (Exception e) {
+ Preconditions.checkArgument(false, ExceptionUtils.getMessage(e));
+ }
+ });
+ }
+ else {
+ client.listConsumerGroups()
+ .all()
+ .get()
+ .forEach(v -> array.add(v.groupId()));
+ }
+ }
+ catch (Exception e) {
+ Preconditions.checkArgument(false, ExceptionUtils.getMessage(e));
+ }
+ }
+}
diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaConnection.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaConnection.java
new file mode 100644
index 00000000..ae312c9f
--- /dev/null
+++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaConnection.java
@@ -0,0 +1,64 @@
+package io.edurt.datacap.plugin.natived.kafka;
+
+import io.edurt.datacap.spi.connection.Connection;
+import io.edurt.datacap.spi.model.Configure;
+import io.edurt.datacap.spi.model.Response;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+
+import java.util.Properties;
+
+@Slf4j
+public class KafkaConnection
+ extends Connection
+{
+ private Configure configure;
+ private Response response;
+
+ @Getter
+ private AdminClient client;
+
+ public KafkaConnection(Configure configure, Response response)
+ {
+ super(configure, response);
+ }
+
+ @Override
+ protected String formatJdbcUrl()
+ {
+ return null;
+ }
+
+ @Override
+ protected java.sql.Connection openConnection()
+ {
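+        // Kafka exposes no JDBC connection; an AdminClient is created instead, so the java.sql.Connection contract is satisfied with null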
+ try {
+ this.configure = getConfigure();
+ this.response = getResponse();
+ log.info("Connection url {}", formatJdbcUrl());
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configure.getHost());
+ this.client = AdminClient.create(properties);
+ response.setIsConnected(Boolean.TRUE);
+ }
+ catch (Exception ex) {
+ log.error("Connection failed ", ex);
+ response.setIsConnected(Boolean.FALSE);
+ response.setMessage(ex.getMessage());
+ }
+ return null;
+ }
+
+ @Override
+ public void destroy()
+ {
+        // Guard against a failed connect, where no AdminClient was ever created
+        if (this.client != null) {
+            this.client.close();
+            log.info("Connection close successful");
+        }
+ }
+}
diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaParser.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaParser.java
new file mode 100644
index 00000000..2bf46863
--- /dev/null
+++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaParser.java
@@ -0,0 +1,28 @@
+package io.edurt.datacap.plugin.natived.kafka;
+
+import io.edurt.datacap.spi.parser.SqlParser;
+import io.edurt.datacap.sql.SqlBase;
+import io.edurt.datacap.sql.SqlBaseToken;
+
+public class KafkaParser
+ extends SqlParser
+{
+ public KafkaParser(String content)
+ {
+ super(content);
+ }
+
+ @Override
+ public String getExecuteContext()
+ {
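+        // SHOW and SELECT statements both resolve to the referenced table (topic) name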
+ SqlBase sqlBase = this.getSqlBase();
+ if (sqlBase.getToken() == SqlBaseToken.SHOW) {
+ return sqlBase.getTable();
+ }
+ else if (sqlBase.getToken() == SqlBaseToken.SELECT) {
+ return sqlBase.getTable();
+ }
+ return null;
+ }
+}
diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPlugin.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPlugin.java
new file mode 100644
index 00000000..fc4e7873
--- /dev/null
+++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPlugin.java
@@ -0,0 +1,81 @@
+package io.edurt.datacap.plugin.natived.kafka;
+
+import io.edurt.datacap.spi.Plugin;
+import io.edurt.datacap.spi.PluginType;
+import io.edurt.datacap.spi.adapter.Adapter;
+import io.edurt.datacap.spi.connection.JdbcConfigure;
+import io.edurt.datacap.spi.model.Configure;
+import io.edurt.datacap.spi.model.Response;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.lang3.ObjectUtils;
+
+@Slf4j
+public class KafkaPlugin
+ implements Plugin
+{
+ private Configure configure;
+ private KafkaConnection connection;
+ private Response response;
+
+ @Override
+ public String validator()
+ {
+ return "SHOW TOPICS";
+ }
+
+ @Override
+ public String name()
+ {
+ return "Kafka";
+ }
+
+ @Override
+ public String description()
+ {
+ return "Integrate Kafka data sources";
+ }
+
+ @Override
+ public PluginType type()
+ {
+ return PluginType.NATIVE;
+ }
+
+ @Override
+ public void connect(Configure configure)
+ {
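+        // Copy the incoming configure into a JdbcConfigure so the shared connection model can be reused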
+ try {
+ this.response = new Response();
+ this.configure = new JdbcConfigure();
+ BeanUtils.copyProperties(this.configure, configure);
+ this.connection = new KafkaConnection(this.configure, this.response);
+ }
+ catch (Exception ex) {
+ this.response.setIsConnected(Boolean.FALSE);
+ this.response.setMessage(ex.getMessage());
+ }
+ }
+
+ @Override
+ public Response execute(String content)
+ {
+ if (ObjectUtils.isNotEmpty(this.connection)) {
+ log.info("Execute kafka plugin logic started");
+ this.response = this.connection.getResponse();
+ Adapter processor = new KafkaAdapter(this.connection, new KafkaParser(content));
+ this.response = processor.handlerExecute(content);
+ log.info("Execute kafka plugin logic end");
+ }
+ return this.response;
+ }
+
+ @Override
+ public void destroy()
+ {
+ if (ObjectUtils.isNotEmpty(this.connection)) {
+ this.connection.destroy();
+ }
+ }
+}
diff --git a/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModule.java b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModule.java
new file mode 100644
index 00000000..73aba34b
--- /dev/null
+++ b/plugin/datacap-native-kafka/src/main/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModule.java
@@ -0,0 +1,39 @@
+package io.edurt.datacap.plugin.natived.kafka;
+
+import com.google.inject.multibindings.Multibinder;
+import io.edurt.datacap.spi.AbstractPluginModule;
+import io.edurt.datacap.spi.Plugin;
+import io.edurt.datacap.spi.PluginModule;
+import io.edurt.datacap.spi.PluginType;
+
+public class KafkaPluginModule
+ extends AbstractPluginModule
+ implements PluginModule
+{
+ @Override
+ public String getName()
+ {
+ return "Kafka";
+ }
+
+ @Override
+ public PluginType getType()
+ {
+ return PluginType.NATIVE;
+ }
+
+ @Override
+ public AbstractPluginModule get()
+ {
+ return this;
+ }
+
+ protected void configure()
+ {
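+        // Register the module name and the Kafka plugin implementation through Guice multibindings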
+        Multibinder<String> module = Multibinder.newSetBinder(this.binder(), String.class);
+ module.addBinding().toInstance(this.getClass().getSimpleName());
+        Multibinder<Plugin> plugin = Multibinder.newSetBinder(this.binder(), Plugin.class);
+ plugin.addBinding().to(KafkaPlugin.class);
+ }
+}
diff --git a/plugin/datacap-native-kafka/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule b/plugin/datacap-native-kafka/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule
new file mode 100644
index 00000000..00dadf17
--- /dev/null
+++ b/plugin/datacap-native-kafka/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule
@@ -0,0 +1 @@
+io.edurt.datacap.plugin.natived.kafka.KafkaPluginModule
diff --git a/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModuleTest.java b/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModuleTest.java
new file mode 100644
index 00000000..0ca4d8ef
--- /dev/null
+++ b/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginModuleTest.java
@@ -0,0 +1,30 @@
+package io.edurt.datacap.plugin.natived.kafka;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import io.edurt.datacap.spi.Plugin;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class KafkaPluginModuleTest
+{
+ private Injector injector;
+
+ @Before
+ public void before()
+ {
+ this.injector = Guice.createInjector(new KafkaPluginModule());
+ }
+
+ @Test
+ public void test()
+ {
+        Set<Plugin> plugins = injector.getInstance(Key.get(new TypeLiteral<Set<Plugin>>() {}));
+ Assert.assertTrue(plugins.size() > 0);
+ }
+}
\ No newline at end of file
diff --git a/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginTest.java b/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginTest.java
new file mode 100644
index 00000000..c089963b
--- /dev/null
+++ b/plugin/datacap-native-kafka/src/test/java/io/edurt/datacap/plugin/natived/kafka/KafkaPluginTest.java
@@ -0,0 +1,43 @@
+package io.edurt.datacap.plugin.natived.kafka;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import io.edurt.datacap.spi.Plugin;
+import io.edurt.datacap.spi.model.Configure;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.Set;
+
+public class KafkaPluginTest
+{
+ private Injector injector;
+ private Configure configure;
+
+ @Before
+ public void before()
+ {
+ injector = Guice.createInjector(new KafkaPluginModule());
+ configure = new Configure();
+ configure.setHost("localhost:9092");
+ }
+
+ @Test
+ public void test()
+ {
+        Set<Plugin> plugins = injector.getInstance(Key.get(new TypeLiteral<Set<Plugin>>() {}));
+        Optional<Plugin> pluginOptional = plugins.stream()
+ .filter(v -> v.name().equalsIgnoreCase("Kafka"))
+ .findFirst();
+ if (pluginOptional.isPresent()) {
+ Plugin plugin = pluginOptional.get();
+ plugin.connect(configure);
+ Assert.assertNotNull(plugin.execute(plugin.validator()).getConnection());
+ plugin.destroy();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index ae9069b8..82f57069 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,6 +18,7 @@
         <module>plugin/datacap-native-alioss</module>
         <module>plugin/datacap-native-zookeeper</module>
         <module>plugin/datacap-native-redis</module>
+        <module>plugin/datacap-native-kafka</module>
         <module>plugin/datacap-http-cratedb</module>
         <module>plugin/datacap-http-clickhouse</module>
         <module>plugin/datacap-jdbc-clickhouse</module>