diff --git a/README.md b/README.md index b8c37b62..61e2d0d4 100644 --- a/README.md +++ b/README.md @@ -181,6 +181,9 @@ Here are some of the major database solutions that are supported:   Apache Pinot +   + + Apache Cassandra

diff --git a/core/datacap-server/pom.xml b/core/datacap-server/pom.xml index 6fa0c877..10fd7c31 100644 --- a/core/datacap-server/pom.xml +++ b/core/datacap-server/pom.xml @@ -410,6 +410,12 @@ ${project.version} provided + + io.edurt.datacap + datacap-plugin-cassandra + ${project.version} + provided + io.edurt.datacap diff --git a/core/datacap-server/src/main/etc/conf/plugins/jdbc/cassandra.yaml b/core/datacap-server/src/main/etc/conf/plugins/jdbc/cassandra.yaml new file mode 100644 index 00000000..7021a829 --- /dev/null +++ b/core/datacap-server/src/main/etc/conf/plugins/jdbc/cassandra.yaml @@ -0,0 +1,26 @@ +name: Apache Cassandra +supportTime: '2023-06-07' + +configures: + - field: name + type: String + required: true + message: name is a required field, please be sure to enter + - field: host + type: String + required: true + value: 127.0.0.1 + message: host is a required field, please be sure to enter + - field: port + type: Number + required: true + min: 1 + max: 65535 + value: 9042 + message: port is a required field, please be sure to enter + - field: database + type: String + required: true + value: datacenter + message: database is a required field, please be sure to enter + group: advanced diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/entity/TemplateSqlEntity.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/entity/TemplateSqlEntity.java index 1a3d0e7a..4573d4d8 100644 --- a/core/datacap-server/src/main/java/io/edurt/datacap/server/entity/TemplateSqlEntity.java +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/entity/TemplateSqlEntity.java @@ -12,11 +12,14 @@ import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; +import javax.persistence.PrePersist; +import javax.persistence.PreUpdate; import javax.persistence.Table; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import 
java.sql.Timestamp; +import java.time.LocalDateTime; @Entity @Data @@ -72,4 +75,16 @@ public class TemplateSqlEntity @Column(name = "update_time", columnDefinition = "timestamp not null default current_timestamp") private Timestamp updateTime; + + @PrePersist + void prePersist() + { + createTime = Timestamp.valueOf(LocalDateTime.now()); + } + + @PreUpdate + void preUpdate() + { + updateTime = Timestamp.valueOf(LocalDateTime.now()); + } } diff --git a/core/datacap-server/src/main/resources/schema.sql b/core/datacap-server/src/main/resources/schema.sql index d76d82d9..e4598641 100644 --- a/core/datacap-server/src/main/resources/schema.sql +++ b/core/datacap-server/src/main/resources/schema.sql @@ -136,9 +136,9 @@ CREATE TABLE IF NOT EXISTS template_sql description text NULL, plugin varchar(50) NULL COMMENT 'Using plug-ins', configure text NULL COMMENT 'The template must use the configuration information in the key->value format', - create_time timestamp DEFAULT CURRENT_TIMESTAMP, - update_time timestamp DEFAULT CURRENT_TIMESTAMP, - `system` boolean NULL DEFAULT 0 + create_time timestamp DEFAULT CURRENT_TIMESTAMP, + update_time timestamp DEFAULT CURRENT_TIMESTAMP, + `system` boolean NULL DEFAULT 0 ); TRUNCATE TABLE template_sql; ALTER TABLE template_sql @@ -283,3 +283,25 @@ VALUES ('datacap', '$2a$10$bZ4XBRlYUjKfkBovWT9TuuXlEF7lpRxVrXS8iqyCjCHUqy4RPTL8. 
-- -------------------------------- alter table `audit_plugin` add column `count` bigint default 0; + +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllDatabase', 'SELECT keyspace_name AS name +FROM system_schema.keyspaces', 'Gets a list of all databases', 'Cassandra', '[]', 1); +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllTablesFromDatabase', 'SELECT + table_name AS name +FROM + system_schema.tables +WHERE + keyspace_name = ''${database:String}'' +GROUP BY + table_name', 'Get the data table from the database', 'Cassandra', '[{"column":"database","type":"String","expression":"${database:String}"}]', 1); +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllColumnsFromDatabaseAndTable', 'SELECT + column_name +FROM + system_schema.columns +WHERE + keyspace_name = ''${database:String}'' + and table_name = ''${table:String}''', 'Get the data column from the database and table', 'Cassandra', + '[{"column":"database","type":"String","expression":"${database:String}"},{"column":"table","type":"String","expression":"${table:String}"}]', 1); diff --git a/core/datacap-server/src/main/schema/1.11.0/update.sql b/core/datacap-server/src/main/schema/1.11.0/update.sql index b8ca4e8f..d6070bc5 100644 --- a/core/datacap-server/src/main/schema/1.11.0/update.sql +++ b/core/datacap-server/src/main/schema/1.11.0/update.sql @@ -35,3 +35,25 @@ where url = '/admin/menu'; update `menus` set url = '/system/user' where url = '/admin/user'; + +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllDatabase', 'SELECT keyspace_name AS name +FROM system_schema.keyspaces', 'Gets a list of all databases', 'Cassandra', '[]', 1); +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllTablesFromDatabase', 'SELECT + table_name AS name +FROM + 
system_schema.tables +WHERE + keyspace_name = ''${database:String}'' +GROUP BY + table_name', 'Get the data table from the database', 'Cassandra', '[{"column":"database","type":"String","expression":"${database:String}"}]', 1); +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllColumnsFromDatabaseAndTable', 'SELECT + column_name +FROM + system_schema.columns +WHERE + keyspace_name = ''${database:String}'' + and table_name = ''${table:String}''', 'Get the data column from the database and table', 'Cassandra', + '[{"column":"database","type":"String","expression":"${database:String}"},{"column":"table","type":"String","expression":"${table:String}"}]', 1); diff --git a/core/datacap-server/src/main/schema/datacap.sql b/core/datacap-server/src/main/schema/datacap.sql index e4f6c8c7..32ff8554 100644 --- a/core/datacap-server/src/main/schema/datacap.sql +++ b/core/datacap-server/src/main/schema/datacap.sql @@ -1,5 +1,5 @@ use -datacap; + datacap; -- -------------------------------- -- Table for audit_plugin @@ -7,18 +7,18 @@ datacap; create table audit_plugin ( id bigint auto_increment primary key, - state varchar(255) null, - create_time mediumtext null, - end_time mediumtext null, - plugin_id bigint not null, - content text null, - message text null, + state varchar(255) null, + create_time mediumtext null, + end_time mediumtext null, + plugin_id bigint not null, + content text null, + message text null, elapsed bigint default 0 null, - user_id bigint not null + user_id bigint not null ); create -fulltext index full_text_index_for_content + fulltext index full_text_index_for_content on audit_plugin (content); -- -------------------------------- @@ -27,14 +27,14 @@ fulltext index full_text_index_for_content create table functions ( id bigint auto_increment primary key, - name varchar(255) null comment 'Function name', - content varchar(255) null comment 'Expression of function', - description text null comment 'Function 
description', - plugin varchar(255) null comment 'Trial plug-in, multiple according to, split', - example text null comment 'Function Usage Example', + name varchar(255) null comment 'Function name', + content varchar(255) null comment 'Expression of function', + description text null comment 'Function description', + plugin varchar(255) null comment 'Trial plug-in, multiple according to, split', + example text null comment 'Function Usage Example', create_time datetime default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP, update_time datetime default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP, - type varchar(20) default 'KEYWORDS' null + type varchar(20) default 'KEYWORDS' null ) comment 'Plug-in correlation function'; -- -------------------------------- @@ -43,19 +43,19 @@ create table functions create table pipeline ( id int auto_increment primary key, - name varchar(255) not null, - content text not null, - state varchar(100) null, - message text null, - work text null, + name varchar(255) not null, + content text not null, + state varchar(100) null, + message text null, + work text null, start_time datetime default CURRENT_TIMESTAMP null, - end_time datetime null on update CURRENT_TIMESTAMP, - elapsed bigint null, - user_id int not null, - from_id int not null, - from_configures text null, - to_id int not null, - to_configures text null + end_time datetime null on update CURRENT_TIMESTAMP, + elapsed bigint null, + user_id int not null, + from_id int not null, + from_configures text null, + to_id int not null, + to_configures text null ); -- -------------------------------- @@ -64,9 +64,9 @@ create table pipeline create table role ( id bigint auto_increment primary key, - name varchar(255) null comment ' ', - description varchar(255) null comment ' ', - create_time datetime(5) default CURRENT_TIMESTAMP (5) null + name varchar(255) null comment ' ', + description varchar(255) null comment ' ', + create_time datetime(5) default CURRENT_TIMESTAMP(5) 
null ); INSERT INTO datacap.role (name, description) @@ -80,13 +80,13 @@ VALUES ('User', 'User role'); create table datacap.scheduled_task ( id int auto_increment primary key, - name varchar(255) not null, - description text not null, - expression varchar(100) null, - active tinyint(1) default 1 null, - is_system tinyint(1) default 1 null, - create_time datetime default CURRENT_TIMESTAMP null, - update_time datetime null on update CURRENT_TIMESTAMP + name varchar(255) not null, + description text not null, + expression varchar(100) null, + active tinyint(1) default 1 null, + is_system tinyint(1) default 1 null, + create_time datetime default CURRENT_TIMESTAMP null, + update_time datetime null on update CURRENT_TIMESTAMP ); INSERT INTO datacap.scheduled_task (name, description, expression, active, is_system) @@ -98,16 +98,16 @@ VALUES ('Synchronize table structure', 'Synchronize the table structure of the d create table datacap.snippet ( id bigint auto_increment primary key, - name varchar(255) null comment ' ', - description varchar(255) null comment ' ', - code text null comment ' ', + name varchar(255) null comment ' ', + description varchar(255) null comment ' ', + code text null comment ' ', create_time timestamp default CURRENT_TIMESTAMP not null, update_time timestamp default CURRENT_TIMESTAMP not null, user_id bigint not null ); create -fulltext index full_text_index_for_code + fulltext index full_text_index_for_code on datacap.snippet (code); -- -------------------------------- @@ -116,24 +116,24 @@ fulltext index full_text_index_for_code create table datacap.source ( id bigint auto_increment primary key, - _catalog varchar(255) null, - create_time datetime default CURRENT_TIMESTAMP null, - _database varchar(255) null, - description varchar(255) null, - host varchar(255) not null, - name varchar(255) not null, - password varchar(255) null, - port bigint not null, - protocol varchar(255) null, - username varchar(255) null, - _type varchar(100) not null, - 
`ssl` tinyint(1) default 0 null, - _ssl tinyint(1) default 0 null, - publish tinyint(1) default 0 null, - public tinyint(1) default 0 null, - user_id bigint null, - configure text null, - used_config boolean default false + _catalog varchar(255) null, + create_time datetime default CURRENT_TIMESTAMP null, + _database varchar(255) null, + description varchar(255) null, + host varchar(255) not null, + name varchar(255) not null, + password varchar(255) null, + port bigint not null, + protocol varchar(255) null, + username varchar(255) null, + _type varchar(100) not null, + `ssl` tinyint(1) default 0 null, + _ssl tinyint(1) default 0 null, + publish tinyint(1) default 0 null, + public tinyint(1) default 0 null, + user_id bigint null, + configure text null, + used_config boolean default false ) comment 'The storage is used to query the data connection source'; -- -------------------------------- @@ -142,14 +142,14 @@ create table datacap.source create table datacap.template_sql ( id bigint auto_increment primary key, - name varchar(255) null comment 'Name of template', - content text null, - description text null, - plugin varchar(50) null comment 'Using plug-ins', - configure text null comment 'The template must use the configuration information in the key->value format', - create_time timestamp default CURRENT_TIMESTAMP not null, - update_time timestamp default CURRENT_TIMESTAMP not null, - `system` tinyint(1) default 0 null + name varchar(255) null comment 'Name of template', + content text null, + description text null, + plugin varchar(50) null comment 'Using plug-ins', + configure text null comment 'The template must use the configuration information in the key->value format', + create_time timestamp default CURRENT_TIMESTAMP not null, + update_time timestamp default CURRENT_TIMESTAMP not null, + `system` tinyint(1) default 0 null ) comment 'The system preset SQL template table'; INSERT INTO datacap.template_sql (name, content, description, plugin, configure, 
create_time, update_time, `system`) @@ -223,15 +223,15 @@ VALUES ( 'getAllColumnsFromDatabaseAndTable', 'SHOW COLUMNS FROM ${table:String} create table user_chat ( id int auto_increment primary key, - name varchar(255) not null, - question text not null, - answer text null, - type varchar(100) null, - create_time datetime default CURRENT_TIMESTAMP null, - end_time datetime null on update CURRENT_TIMESTAMP, - elapsed bigint null, - user_id int not null, - is_new tinyint(1) default 1 null + name varchar(255) not null, + question text not null, + answer text null, + type varchar(100) null, + create_time datetime default CURRENT_TIMESTAMP null, + end_time datetime null on update CURRENT_TIMESTAMP, + elapsed bigint null, + user_id int not null, + is_new tinyint(1) default 1 null ); -- -------------------------------- @@ -241,14 +241,14 @@ create table user_log ( id bigint auto_increment primary key, - device varchar(255) null comment 'Login device', - client varchar(255) null comment 'Login client', - ip varchar(100) null comment 'Login ip', - message varchar(225) null comment 'Error message', - state varchar(20) null comment 'Login state', - ua varchar(255) null comment 'Trial plug-in, multiple according to, split', - user_id bigint not null, - create_time datetime(5) default CURRENT_TIMESTAMP (5) null + device varchar(255) null comment 'Login device', + client varchar(255) null comment 'Login client', + ip varchar(100) null comment 'Login ip', + message varchar(225) null comment 'Error message', + state varchar(20) null comment 'Login state', + ua varchar(255) null comment 'Trial plug-in, multiple according to, split', + user_id bigint not null, + create_time datetime(5) default CURRENT_TIMESTAMP(5) null ) comment 'User login log'; -- -------------------------------- @@ -273,10 +273,10 @@ create table datacap.users ( id bigint auto_increment primary key, - username varchar(255) null comment ' ', - password varchar(255) null comment ' ', - create_time datetime(5) 
default CURRENT_TIMESTAMP (5) null, - third_configure text null + username varchar(255) null comment ' ', + password varchar(255) null comment ' ', + create_time datetime(5) default CURRENT_TIMESTAMP(5) null, + third_configure text null ); INSERT INTO datacap.users (username, password) @@ -770,3 +770,25 @@ values ('2', '7'), -- -------------------------------- alter table `audit_plugin` add column `count` bigint(20) default 0; + +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllDatabase', 'SELECT keyspace_name AS name +FROM system_schema.keyspaces', 'Gets a list of all databases', 'Cassandra', '[]', 1); +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllTablesFromDatabase', 'SELECT + table_name AS name +FROM + system_schema.tables +WHERE + keyspace_name = ''${database:String}'' +GROUP BY + table_name', 'Get the data table from the database', 'Cassandra', '[{"column":"database","type":"String","expression":"${database:String}"}]', 1); +INSERT INTO `template_sql` (name, content, description, plugin, configure, `system`) +VALUES ('getAllColumnsFromDatabaseAndTable', 'SELECT + column_name +FROM + system_schema.columns +WHERE + keyspace_name = ''${database:String}'' + and table_name = ''${table:String}''', 'Get the data column from the database and table', 'Cassandra', + '[{"column":"database","type":"String","expression":"${database:String}"},{"column":"table","type":"String","expression":"${table:String}"}]', 1); diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/Adapter.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/Adapter.java index d6d7039d..44a6d956 100644 --- a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/Adapter.java +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/Adapter.java @@ -1,8 +1,17 @@ package io.edurt.datacap.spi.adapter; +import io.edurt.datacap.spi.FormatType; +import 
io.edurt.datacap.spi.formatter.FormatterFactory; import io.edurt.datacap.spi.model.Response; +import java.util.List; + public interface Adapter { Response handlerExecute(String content); + + default Object handlerFormatter(FormatType format, List headers, List columns) + { + return FormatterFactory.createFormatter(format, headers, columns).formatter(); + } } diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/HttpAdapter.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/HttpAdapter.java index 44550f20..3cb1eb59 100644 --- a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/HttpAdapter.java +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/HttpAdapter.java @@ -22,7 +22,7 @@ public class HttpAdapter this.httpConnection = httpConnection; } - protected Object handlerFormatter(FormatType format, List headers, List columns) + public Object handlerFormatter(FormatType format, List headers, List columns) { return FormatterFactory.createFormatter(format, headers, columns).formatter(); } diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcAdapter.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcAdapter.java index 6882ffeb..7cfb5671 100644 --- a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcAdapter.java +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcAdapter.java @@ -33,7 +33,7 @@ public class JdbcAdapter this.jdbcConnection = jdbcConnection; } - protected Object handlerFormatter(FormatType format, List headers, List columns) + public Object handlerFormatter(FormatType format, List headers, List columns) { return FormatterFactory.createFormatter(format, headers, columns).formatter(); } diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/NativeAdapter.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/NativeAdapter.java index 2581fa48..1c8899d5 100644 --- 
a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/NativeAdapter.java +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/NativeAdapter.java @@ -33,7 +33,7 @@ public class NativeAdapter this.parser = parser; } - protected Object handlerFormatter(FormatType format, List headers, List columns) + public Object handlerFormatter(FormatType format, List headers, List columns) { return FormatterFactory.createFormatter(format, headers, columns).formatter(); } diff --git a/core/datacap-web/console-fe/public/static/images/plugin/cassandra.png b/core/datacap-web/console-fe/public/static/images/plugin/cassandra.png new file mode 100644 index 00000000..3b19258d Binary files /dev/null and b/core/datacap-web/console-fe/public/static/images/plugin/cassandra.png differ diff --git a/docs/docs/assets/plugin/cassandra.png b/docs/docs/assets/plugin/cassandra.png new file mode 100644 index 00000000..3b19258d Binary files /dev/null and b/docs/docs/assets/plugin/cassandra.png differ diff --git a/docs/docs/index.md b/docs/docs/index.md index 8c32c35d..0b10ca2f 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -212,6 +212,9 @@ Datacap is fast, lightweight, intuitive system.   Apache Pinot +   + + Apache Cassandra

diff --git a/docs/docs/index.zh.md b/docs/docs/index.zh.md index edba886d..fe0d4342 100644 --- a/docs/docs/index.zh.md +++ b/docs/docs/index.zh.md @@ -212,6 +212,9 @@ Datacap 是快速、轻量级、直观的系统。   Apache Pinot +   + + Apache Cassandra

diff --git a/docs/docs/reference/connectors/cassandra.md b/docs/docs/reference/connectors/cassandra.md new file mode 100644 index 00000000..687bc2c9 --- /dev/null +++ b/docs/docs/reference/connectors/cassandra.md @@ -0,0 +1,53 @@ +--- +title: Apache Cassandra +status: 1.11.0 +--- + + + +#### What is Cassandra? + +Apache Cassandra® powers mission-critical deployments with improved performance and unparalleled levels of scale in the cloud. + +#### Environment + +--- + +!!! note + + If you need to use this data source, you need to upgrade the DataCap service to >= `1.11.x` + +Support Time: `2023-06-07` + +#### Configure + +--- + + +!!! note + + If your plugin service version requires other special configurations, please refer to modifying the configuration file and restarting the DataCap service. + +=== "Configure" + + | Field | Required | Default Value | +    |:------:|:---------------------------------:|:-------------:| + | `Name` | :material-check-circle: { .red } | - | + | `Host` | :material-check-circle: { .red } | `127.0.0.1` | + | `Port` | :material-check-circle: { .red } | `9042` | + +=== "Advanced" + + | Field | Required | Default Value | + |:----------:|:--------------------------------:|:-------------:| + | `Database` | :material-check-circle: { .red } | `datacenter` | + +#### Version (Validation) + +--- + +!!! 
warning + + The online service has not been tested yet, if you have detailed test results, please submit [issues](https://github.com/EdurtIO/datacap/issues/new/choose) to us + +- [x] `0.4.x` diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 18bebd1f..cbafab62 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -64,6 +64,7 @@ extra: status: new: Recently added 1.10.0: 1.10.0 + 1.11.0: 1.11.0 alternate: - name: English link: /en/ @@ -159,6 +160,7 @@ nav: - Sql: reference/admin/template/sql/home.md - Connecting to connectors: - reference/connectors/index.md + - Apache Cassandra: reference/connectors/cassandra.md - JDBC: - Hologres: reference/connectors/jdbc/hologres.md - StarRocks: reference/connectors/jdbc/starrocks.md diff --git a/plugin/datacap-plugin-cassandra/pom.xml b/plugin/datacap-plugin-cassandra/pom.xml new file mode 100644 index 00000000..9da0c94f --- /dev/null +++ b/plugin/datacap-plugin-cassandra/pom.xml @@ -0,0 +1,87 @@ + + + 4.0.0 + + io.edurt.datacap + datacap + 1.11.0-SNAPSHOT + ../../pom.xml + + + datacap-plugin-cassandra + DataCap - Apache Cassandra + + + plugin-cassandra + 4.16.0 + + + + + io.edurt.datacap + datacap-spi + provided + + + commons-beanutils + commons-beanutils + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-simple + test + + + org.jetbrains.kotlin + kotlin-reflect + provided + + + org.testcontainers + testcontainers + test + + + com.datastax.oss + java-driver-core + ${cassandra.driver.version} + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + ${assembly-plugin.version} + + + make-assembly + package + + single + + + + + ${plugin.name} + + ../../configure/assembly/plugin.xml + + ../../dist/plugins + + + + org.jetbrains.dokka + dokka-maven-plugin + + + + + diff --git a/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraAdapter.kt b/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraAdapter.kt new file mode 100644 index 
00000000..6e266dea --- /dev/null +++ b/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraAdapter.kt @@ -0,0 +1,66 @@ +package io.edurt.datacap.plugin.cassandra + +import com.datastax.oss.driver.api.core.cql.ResultSet +import io.edurt.datacap.spi.adapter.Adapter +import io.edurt.datacap.spi.model.Configure +import io.edurt.datacap.spi.model.Response +import io.edurt.datacap.spi.model.Time +import org.slf4j.Logger +import org.slf4j.LoggerFactory.getLogger +import java.lang.Boolean +import java.util.* +import kotlin.Any +import kotlin.Exception +import kotlin.String + +class CassandraAdapter : Adapter { + private val log: Logger = getLogger(CassandraAdapter::class.java) + + private var connection: CassandraConnection? = null + + constructor(connection: CassandraConnection?) : super() { + this.connection = connection + } + + override fun handlerExecute(content: String?): Response { + val processorTime = Time() + processorTime.start = Date().time + val response: Response = this.connection!!.response + val configure: Configure = this.connection!!.configure + if (response.isConnected) { + val headers: MutableList = ArrayList() + val types: MutableList = ArrayList() + val columns: MutableList = ArrayList() + try { + val resultSet: ResultSet = connection?.getSession()!!.execute(content!!) 
+ var isPresent = true + resultSet.forEach { row -> + if (isPresent) { + row.columnDefinitions.forEach { + types.add(it.type.asCql(true, true)) + headers.add(it.name.asCql(true)) + } + isPresent = false + } + val _columns: MutableList = ArrayList() + headers.forEach { + _columns.add(row.getObject(it).toString()) + } + columns.add(handlerFormatter(configure.format, headers, _columns)) + } + response.isSuccessful = Boolean.TRUE + } catch (ex: Exception) { + log.error("Execute content failed content {} exception ", content, ex) + response.isSuccessful = Boolean.FALSE + response.message = ex.message + } finally { + response.headers = headers + response.types = types + response.columns = columns + } + } + processorTime.end = Date().time + response.processor = processorTime + return response + } +} diff --git a/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraConnection.kt b/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraConnection.kt new file mode 100644 index 00000000..db5aa402 --- /dev/null +++ b/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraConnection.kt @@ -0,0 +1,44 @@ +package io.edurt.datacap.plugin.cassandra + +import com.datastax.oss.driver.api.core.CqlSession +import io.edurt.datacap.spi.connection.Connection +import io.edurt.datacap.spi.model.Configure +import io.edurt.datacap.spi.model.Response +import java.lang.Boolean +import java.net.InetSocketAddress +import kotlin.Exception +import kotlin.String +import kotlin.TODO + + +class CassandraConnection : Connection { + private var session: CqlSession? = null + + constructor(configure: Configure, response: Response) : super(configure, response) + + override fun formatJdbcUrl(): String { + return TODO("Provide the return value") + } + + override fun openConnection(): java.sql.Connection? 
{ + try { + this.session = CqlSession.builder() + .addContactPoint(InetSocketAddress(configure?.host, configure!!.port)) + .withLocalDatacenter(configure.database.orElse("datacenter")) + .build() + response?.isConnected = Boolean.TRUE + } catch (ex: Exception) { + response?.isConnected = Boolean.FALSE + response?.message = ex.message + } + return null + } + + fun getSession(): CqlSession? { + return session + } + + override fun destroy() { + this.session?.close() + } +} diff --git a/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraModule.kt b/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraModule.kt new file mode 100644 index 00000000..b5f83a6b --- /dev/null +++ b/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraModule.kt @@ -0,0 +1,22 @@ +package io.edurt.datacap.plugin.cassandra + +import com.google.inject.multibindings.Multibinder +import io.edurt.datacap.spi.AbstractPluginModule +import io.edurt.datacap.spi.Plugin +import io.edurt.datacap.spi.PluginModule +import io.edurt.datacap.spi.PluginType + +class CassandraModule : AbstractPluginModule(), PluginModule { + override fun getType(): PluginType { + return PluginType.JDBC + } + + override fun get(): AbstractPluginModule { + return this + } + + override fun configure() { + Multibinder.newSetBinder(binder(), Plugin::class.java) + .addBinding().to(CassandraPlugin::class.java) + } +} diff --git a/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraPlugin.kt b/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraPlugin.kt new file mode 100644 index 00000000..e1c84d9e --- /dev/null +++ b/plugin/datacap-plugin-cassandra/src/main/kotlin/io/edurt/datacap/plugin/cassandra/CassandraPlugin.kt @@ -0,0 +1,52 @@ +package io.edurt.datacap.plugin.cassandra + +import io.edurt.datacap.spi.Plugin +import 
io.edurt.datacap.spi.model.Configure +import io.edurt.datacap.spi.model.Response +import org.apache.commons.beanutils.BeanUtils.copyProperties +import org.apache.commons.lang3.ObjectUtils.isNotEmpty +import org.slf4j.LoggerFactory.getLogger + +class CassandraPlugin : Plugin { + private val log = getLogger(CassandraPlugin::class.java) + + private var configure: Configure? = null + private var connection: CassandraConnection? = null + private var response: Response? = null + override fun validator(): String { + return "SELECT release_version AS version FROM system.local" + } + + override fun connect(configure: Configure?) { + try { + log.info("Connecting to apache cassandra") + response = Response() + this.configure = Configure() + copyProperties(this.configure, configure) + connection = CassandraConnection(this.configure!!, response!!) + } catch (ex: Exception) { + response!!.isConnected = false + response!!.message = ex.message + } + } + + override fun execute(content: String?): Response { + if (isNotEmpty(connection)) { + log.info("Apache cassandra connection established") + response = connection?.response + val processor = CassandraAdapter(connection) + response = processor.handlerExecute(content) + log.info("Apache cassandra execution completed") + } + destroy() + return response!! 
+ } + + override fun destroy() { + if (isNotEmpty(connection)) { + log.info("Apache cassandra driver destroyed") + connection?.destroy() + configure = null + } + } +} diff --git a/plugin/datacap-plugin-cassandra/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule b/plugin/datacap-plugin-cassandra/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule new file mode 100644 index 00000000..9cb91a15 --- /dev/null +++ b/plugin/datacap-plugin-cassandra/src/main/resources/META-INF/services/io.edurt.datacap.spi.PluginModule @@ -0,0 +1 @@ +io.edurt.datacap.plugin.cassandra.CassandraModule diff --git a/plugin/datacap-plugin-cassandra/src/test/kotlin/io/edurt/datacap/plugin/cassandra/CassandraPluginTest.kt b/plugin/datacap-plugin-cassandra/src/test/kotlin/io/edurt/datacap/plugin/cassandra/CassandraPluginTest.kt new file mode 100644 index 00000000..ebb1171d --- /dev/null +++ b/plugin/datacap-plugin-cassandra/src/test/kotlin/io/edurt/datacap/plugin/cassandra/CassandraPluginTest.kt @@ -0,0 +1,47 @@ +package io.edurt.datacap.plugin.cassandra + +import com.google.inject.Guice +import com.google.inject.Injector +import com.google.inject.Key +import com.google.inject.TypeLiteral +import io.edurt.datacap.spi.Plugin +import io.edurt.datacap.spi.model.Configure +import io.edurt.datacap.spi.model.Response +import org.apache.commons.lang3.ObjectUtils +import org.junit.Assert +import org.junit.Before +import org.junit.Test +import org.slf4j.LoggerFactory + +class CassandraPluginTest { + private val log = LoggerFactory.getLogger(this::class.java) + + private var injector: Injector? = null + private var configure: Configure? = null + + @Before + fun before() { + injector = Guice.createInjector(CassandraModule()) + configure = Configure() + configure?.host = "localhost" + configure?.port = 9042 + } + + @Test + fun test() { + val plugin: Plugin? 
= injector?.getInstance(Key.get(object : TypeLiteral?>() {})) + ?.first { v -> v?.name().equals("Cassandra") } + if (ObjectUtils.isNotEmpty(plugin)) { + plugin?.connect(configure) + val sql = "SELECT keyspace_name FROM system_schema.keyspaces" + val response: Response = plugin!!.execute(sql) + log.info("================ plugin executed information =================") + if (!response.isSuccessful) { + log.error("Message: {}", response.message) + } else { + response.columns.forEach { column -> log.info(column.toString()) } + } + Assert.assertTrue(response.isSuccessful) + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 08a8d9df..a1149a22 100644 --- a/pom.xml +++ b/pom.xml @@ -64,6 +64,7 @@ plugin/datacap-jdbc-hologres plugin/datacap-plugin-pinot plugin/datacap-plugin-mongo-community + plugin/datacap-plugin-cassandra executor/datacap-executor-example executor/datacap-executor-seatunnel shaded/datacap-shaded-ydb