[Plugin] Support apache cassandra

This commit is contained in:
qianmoQ 2023-06-06 22:58:42 +08:00
parent 8ed765177e
commit 26518270a7
25 changed files with 600 additions and 94 deletions

View File

@ -181,6 +181,9 @@ Here are some of the major database solutions that are supported:
</a>&nbsp;
<a href="https://docs.pinot.apache.org/" target="_blank" class="connector-logo-index">
<img src="docs/docs/assets/plugin/pinot.png" alt="Apache Pinot" height=60" />
</a>&nbsp;
<a href="https://cassandra.apache.org/" target="_blank" class="connector-logo-index">
<img src="docs/docs/assets/plugin/cassandra.png" alt="Apache Cassandra" height=60" />
</a>
</p>

View File

@ -410,6 +410,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-plugin-cassandra</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- Executor -->
<dependency>
<groupId>io.edurt.datacap</groupId>

View File

@ -0,0 +1,26 @@
name: Apache Cassandra
supportTime: '2023-06-07'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 9042
message: port is a required field, please be sure to enter
- field: database
type: String
required: true
value: datacenter
message: database is a required field, please be sure to enter
group: advanced

View File

@ -12,11 +12,14 @@ import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.PrePersist;
import javax.persistence.PreUpdate;
import javax.persistence.Table;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.sql.Timestamp;
import java.time.LocalDateTime;
@Entity
@Data
@ -72,4 +75,16 @@ public class TemplateSqlEntity
@Column(name = "update_time", columnDefinition = "timestamp not null default current_timestamp")
private Timestamp updateTime;
@PrePersist
void prePersist()
{
createTime = Timestamp.valueOf(LocalDateTime.now());
}
@PreUpdate
void preUpdate()
{
updateTime = Timestamp.valueOf(LocalDateTime.now());
}
}

View File

@ -283,3 +283,25 @@ VALUES ('datacap', '$2a$10$bZ4XBRlYUjKfkBovWT9TuuXlEF7lpRxVrXS8iqyCjCHUqy4RPTL8.
-- --------------------------------
alter table `audit_plugin`
add column `count` bigint default 0;
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllDatabase', 'SELECT keyspace_name AS name
FROM system_schema.keyspaces', 'Gets a list of all databases', 'Cassandra', '[]', 1);
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllTablesFromDatabase', 'SELECT
table_name AS name
FROM
system_schema.tables
WHERE
keyspace_name = ''${database:String}''
GROUP BY
table_name', 'Get the data table from the database', 'Cassandra', '[{"column":"database","type":"String","expression":"${database:String}"}]', 1);
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllColumnsFromDatabaseAndTable', 'SELECT
column_name
FROM
system_schema.columns
WHERE
keyspace_name = ''${database:String}''
and table_name = ''${table:String}''', 'Get the data column from the database and table', 'Cassandra',
'[{"column":"database","type":"String","expression":"${database:String}"},{"column":"table","type":"String","expression":"${table:String}"}]', 1);

View File

@ -35,3 +35,25 @@ where url = '/admin/menu';
update `menus`
set url = '/system/user'
where url = '/admin/user';
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllDatabase', 'SELECT keyspace_name AS name
FROM system_schema.keyspaces', 'Gets a list of all databases', 'Cassandra', '[]', 1);
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllTablesFromDatabase', 'SELECT
table_name AS name
FROM
system_schema.tables
WHERE
keyspace_name = ''${database:String}''
GROUP BY
table_name', 'Get the data table from the database', 'Cassandra', '[{"column":"database","type":"String","expression":"${database:String}"}]', 1);
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllColumnsFromDatabaseAndTable', 'SELECT
column_name
FROM
system_schema.columns
WHERE
keyspace_name = ''${database:String}''
and table_name = ''${table:String}''', 'Get the data column from the database and table', 'Cassandra',
'[{"column":"database","type":"String","expression":"${database:String}"},{"column":"table","type":"String","expression":"${table:String}"}]', 1);

View File

@ -770,3 +770,25 @@ values ('2', '7'),
-- --------------------------------
alter table `audit_plugin`
add column `count` bigint(20) default 0;
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllDatabase', 'SELECT keyspace_name AS name
FROM system_schema.keyspaces', 'Gets a list of all databases', 'Cassandra', '[]', 1);
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllTablesFromDatabase', 'SELECT
table_name AS name
FROM
system_schema.tables
WHERE
keyspace_name = ''${database:String}''
GROUP BY
table_name', 'Get the data table from the database', 'Cassandra', '[{"column":"database","type":"String","expression":"${database:String}"}]', 1);
INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`)
VALUES ('getAllColumnsFromDatabaseAndTable', 'SELECT
column_name
FROM
system_schema.columns
WHERE
keyspace_name = ''${database:String}''
and table_name = ''${table:String}''', 'Get the data column from the database and table', 'Cassandra',
'[{"column":"database","type":"String","expression":"${database:String}"},{"column":"table","type":"String","expression":"${table:String}"}]', 1);

View File

@ -1,8 +1,17 @@
package io.edurt.datacap.spi.adapter;
import io.edurt.datacap.spi.FormatType;
import io.edurt.datacap.spi.formatter.FormatterFactory;
import io.edurt.datacap.spi.model.Response;
import java.util.List;
public interface Adapter
{
Response handlerExecute(String content);
default Object handlerFormatter(FormatType format, List<String> headers, List<Object> columns)
{
return FormatterFactory.createFormatter(format, headers, columns).formatter();
}
}

View File

@ -22,7 +22,7 @@ public class HttpAdapter
this.httpConnection = httpConnection;
}
protected Object handlerFormatter(FormatType format, List<String> headers, List<Object> columns)
public Object handlerFormatter(FormatType format, List<String> headers, List<Object> columns)
{
return FormatterFactory.createFormatter(format, headers, columns).formatter();
}

View File

@ -33,7 +33,7 @@ public class JdbcAdapter
this.jdbcConnection = jdbcConnection;
}
protected Object handlerFormatter(FormatType format, List<String> headers, List<Object> columns)
public Object handlerFormatter(FormatType format, List<String> headers, List<Object> columns)
{
return FormatterFactory.createFormatter(format, headers, columns).formatter();
}

View File

@ -33,7 +33,7 @@ public class NativeAdapter
this.parser = parser;
}
protected Object handlerFormatter(FormatType format, List<String> headers, List<Object> columns)
public Object handlerFormatter(FormatType format, List<String> headers, List<Object> columns)
{
return FormatterFactory.createFormatter(format, headers, columns).formatter();
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 151 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 151 KiB

View File

@ -212,6 +212,9 @@ Datacap is fast, lightweight, intuitive system.
</a>&nbsp;
<a href="https://docs.pinot.apache.org/" target="_blank" class="connector-logo-index">
<img src="/assets/plugin/pinot.png" alt="Apache Pinot" height=60" />
</a>&nbsp;
<a href="https://cassandra.apache.org/" target="_blank" class="connector-logo-index">
<img src="/assets/plugin/cassandra.png" alt="Apache Cassandra" height=60" />
</a>
</p>

View File

@ -212,6 +212,9 @@ Datacap 是快速、轻量级、直观的系统。
</a>&nbsp;
<a href="https://docs.pinot.apache.org/" target="_blank" class="connector-logo-index">
<img src="/assets/plugin/pinot.png" alt="Apache Pinot" height=60" />
</a>&nbsp;
<a href="https://cassandra.apache.org/" target="_blank" class="connector-logo-index">
<img src="/assets/plugin/cassandra.png" alt="Apache Cassandra" height=60" />
</a>
</p>

View File

@ -0,0 +1,53 @@
---
title: Apache Cassandra
status: 1.11.0
---
<img src="/assets/plugin/cassandra.png" class="connector-logo" style="width: 100px;" />
#### What is Cassandra ?
Apache Cassandra® powers mission-critical deployments with improved performance and unparalleled levels of scale in the cloud.
#### Environment
---
!!! note
If you need to use this data source, you need to upgrade the DataCap service to >= `1.11.x`
Support Time: `2023-06-07`
#### Configure
---
!!! note
If your plugin service version requires other special configurations, please refer to modifying the configuration file and restarting the DataCap service.
=== "Configure"
| Field | Required | Default Value |
|:------:|:---------------------------------:|:-------------:|
| `Name` | :material-check-circle: { .red } | - |
| `Host` | :material-check-circle: { .red } | `127.0.0.1` |
| `Port` | :material-check-circle: { .red } | `9042` |
=== "Advanced"
| Field | Required | Default Value |
|:----------:|:--------------------------------:|:-------------:|
| `Database` | :material-check-circle: { .red } | `datacenter` |
#### Version (Validation)
---
!!! warning
The online service has not been tested yet, if you have detailed test results, please submit [issues](https://github.com/EdurtIO/datacap/issues/new/choose) to us
- [x] `0.4.x`

View File

@ -64,6 +64,7 @@ extra:
status:
new: Recently added
1.10.0: 1.10.0
1.11.0: 1.11.0
alternate:
- name: English
link: /en/
@ -159,6 +160,7 @@ nav:
- Sql: reference/admin/template/sql/home.md
- Connecting to connectors:
- reference/connectors/index.md
- Apache Cassandra: reference/connectors/cassandra.md
- JDBC:
- Hologres: reference/connectors/jdbc/hologres.md
- StarRocks: reference/connectors/jdbc/starrocks.md

View File

@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap</artifactId>
<version>1.11.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>datacap-plugin-cassandra</artifactId>
<description>DataCap - Apache Cassandra</description>
<properties>
<plugin.name>plugin-cassandra</plugin.name>
<cassandra.driver.version>4.16.0</cassandra.driver.version>
</properties>
<dependencies>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${cassandra.driver.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${assembly-plugin.version}</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<finalName>${plugin.name}</finalName>
<descriptors>
<descriptor>../../configure/assembly/plugin.xml</descriptor>
</descriptors>
<outputDirectory>../../dist/plugins</outputDirectory>
</configuration>
</plugin>
<plugin>
<groupId>org.jetbrains.dokka</groupId>
<artifactId>dokka-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,66 @@
package io.edurt.datacap.plugin.cassandra
import com.datastax.oss.driver.api.core.cql.ResultSet
import io.edurt.datacap.spi.adapter.Adapter
import io.edurt.datacap.spi.model.Configure
import io.edurt.datacap.spi.model.Response
import io.edurt.datacap.spi.model.Time
import org.slf4j.Logger
import org.slf4j.LoggerFactory.getLogger
import java.lang.Boolean
import java.util.*
import kotlin.Any
import kotlin.Exception
import kotlin.String
class CassandraAdapter : Adapter {
private val log: Logger = getLogger(CassandraAdapter::class.java)
private var connection: CassandraConnection? = null
constructor(connection: CassandraConnection?) : super() {
this.connection = connection
}
override fun handlerExecute(content: String?): Response {
val processorTime = Time()
processorTime.start = Date().time
val response: Response = this.connection!!.response
val configure: Configure = this.connection!!.configure
if (response.isConnected) {
val headers: MutableList<String> = ArrayList()
val types: MutableList<String> = ArrayList()
val columns: MutableList<Any> = ArrayList()
try {
val resultSet: ResultSet = connection?.getSession()!!.execute(content!!)
var isPresent = true
resultSet.forEach { row ->
if (isPresent) {
row.columnDefinitions.forEach {
types.add(it.type.asCql(true, true))
headers.add(it.name.asCql(true))
}
isPresent = false
}
val _columns: MutableList<Any> = ArrayList()
headers.forEach {
_columns.add(row.getObject(it).toString())
}
columns.add(handlerFormatter(configure.format, headers, _columns))
}
response.isSuccessful = Boolean.TRUE
} catch (ex: Exception) {
log.error("Execute content failed content {} exception ", content, ex)
response.isSuccessful = Boolean.FALSE
response.message = ex.message
} finally {
response.headers = headers
response.types = types
response.columns = columns
}
}
processorTime.end = Date().time
response.processor = processorTime
return response
}
}

View File

@ -0,0 +1,44 @@
package io.edurt.datacap.plugin.cassandra
import com.datastax.oss.driver.api.core.CqlSession
import io.edurt.datacap.spi.connection.Connection
import io.edurt.datacap.spi.model.Configure
import io.edurt.datacap.spi.model.Response
import java.lang.Boolean
import java.net.InetSocketAddress
import kotlin.Exception
import kotlin.String
import kotlin.TODO
class CassandraConnection : Connection {
private var session: CqlSession? = null
constructor(configure: Configure, response: Response) : super(configure, response)
override fun formatJdbcUrl(): String {
return TODO("Provide the return value")
}
override fun openConnection(): java.sql.Connection? {
try {
this.session = CqlSession.builder()
.addContactPoint(InetSocketAddress(configure?.host, configure!!.port))
.withLocalDatacenter(configure.database.orElse("datacenter"))
.build()
response?.isConnected = Boolean.TRUE
} catch (ex: Exception) {
response?.isConnected = Boolean.FALSE
response?.message = ex.message
}
return null
}
fun getSession(): CqlSession? {
return session
}
override fun destroy() {
this.session?.close()
}
}

View File

@ -0,0 +1,22 @@
package io.edurt.datacap.plugin.cassandra
import com.google.inject.multibindings.Multibinder
import io.edurt.datacap.spi.AbstractPluginModule
import io.edurt.datacap.spi.Plugin
import io.edurt.datacap.spi.PluginModule
import io.edurt.datacap.spi.PluginType
class CassandraModule : AbstractPluginModule(), PluginModule {
override fun getType(): PluginType {
return PluginType.JDBC
}
override fun get(): AbstractPluginModule {
return this
}
override fun configure() {
Multibinder.newSetBinder(binder(), Plugin::class.java)
.addBinding().to(CassandraPlugin::class.java)
}
}

View File

@ -0,0 +1,52 @@
package io.edurt.datacap.plugin.cassandra
import io.edurt.datacap.spi.Plugin
import io.edurt.datacap.spi.model.Configure
import io.edurt.datacap.spi.model.Response
import org.apache.commons.beanutils.BeanUtils.copyProperties
import org.apache.commons.lang3.ObjectUtils.isNotEmpty
import org.slf4j.LoggerFactory.getLogger
class CassandraPlugin : Plugin {
private val log = getLogger(CassandraPlugin::class.java)
private var configure: Configure? = null
private var connection: CassandraConnection? = null
private var response: Response? = null
override fun validator(): String {
return "SELECT release_version AS version FROM system.local"
}
override fun connect(configure: Configure?) {
try {
log.info("Connecting to apache cassandra")
response = Response()
this.configure = Configure()
copyProperties(this.configure, configure)
connection = CassandraConnection(this.configure!!, response!!)
} catch (ex: Exception) {
response!!.isConnected = false
response!!.message = ex.message
}
}
override fun execute(content: String?): Response {
if (isNotEmpty(connection)) {
log.info("Apache cassandra connection established")
response = connection?.response
val processor = CassandraAdapter(connection)
response = processor.handlerExecute(content)
log.info("Apache cassandra execution completed")
}
destroy()
return response!!
}
override fun destroy() {
if (isNotEmpty(connection)) {
log.info("Apache cassandra driver destroyed")
connection?.destroy()
configure = null
}
}
}

View File

@ -0,0 +1 @@
io.edurt.datacap.plugin.cassandra.CassandraModule

View File

@ -0,0 +1,47 @@
package io.edurt.datacap.plugin.cassandra
import com.google.inject.Guice
import com.google.inject.Injector
import com.google.inject.Key
import com.google.inject.TypeLiteral
import io.edurt.datacap.spi.Plugin
import io.edurt.datacap.spi.model.Configure
import io.edurt.datacap.spi.model.Response
import org.apache.commons.lang3.ObjectUtils
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import org.slf4j.LoggerFactory
class CassandraPluginTest {
private val log = LoggerFactory.getLogger(this::class.java)
private var injector: Injector? = null
private var configure: Configure? = null
@Before
fun before() {
injector = Guice.createInjector(CassandraModule())
configure = Configure()
configure?.host = "localhost"
configure?.port = 9042
}
@Test
fun test() {
val plugin: Plugin? = injector?.getInstance(Key.get(object : TypeLiteral<Set<Plugin?>?>() {}))
?.first { v -> v?.name().equals("Cassandra") }
if (ObjectUtils.isNotEmpty(plugin)) {
plugin?.connect(configure)
val sql = "SELECT keyspace_name FROM system_schema.keyspaces"
val response: Response = plugin!!.execute(sql)
log.info("================ plugin executed information =================")
if (!response.isSuccessful) {
log.error("Message: {}", response.message)
} else {
response.columns.forEach { column -> log.info(column.toString()) }
}
Assert.assertTrue(response.isSuccessful)
}
}
}

View File

@ -64,6 +64,7 @@
<module>plugin/datacap-jdbc-hologres</module>
<module>plugin/datacap-plugin-pinot</module>
<module>plugin/datacap-plugin-mongo-community</module>
<module>plugin/datacap-plugin-cassandra</module>
<module>executor/datacap-executor-example</module>
<module>executor/datacap-executor-seatunnel</module>
<module>shaded/datacap-shaded-ydb</module>