mirror of
https://gitee.com/devlive-community/datacap.git
synced 2024-12-01 19:48:14 +08:00
[Core] [DataSet] Support custom life cycle
This commit is contained in:
parent
2d2359ef60
commit
215b02d67a
@ -4,5 +4,6 @@ public enum CreatedMode
|
||||
{
|
||||
CREATE_TABLE,
|
||||
CREATE_COLUMN,
|
||||
MODIFY_COLUMN
|
||||
MODIFY_COLUMN,
|
||||
MODIFY_LIFECYCLE
|
||||
}
|
||||
|
@ -437,6 +437,7 @@ public class DataSetServiceImpl
|
||||
TableBuilder.Companion.PARTITION_BY(columnEntities.stream().filter(DataSetColumnEntity::isPartitionKey).map(DataSetColumnEntity::getName).collect(Collectors.toList()));
|
||||
TableBuilder.Companion.PRIMARY_KEY(columnEntities.stream().filter(DataSetColumnEntity::isPrimaryKey).map(DataSetColumnEntity::getName).collect(Collectors.toList()));
|
||||
TableBuilder.Companion.SAMPLING_KEY(columnEntities.stream().filter(DataSetColumnEntity::isSamplingKey).map(DataSetColumnEntity::getName).collect(Collectors.toList()));
|
||||
TableBuilder.Companion.ADD_LIFECYCLE(String.format("`%s` + INTERVAL %s %s", entity.getLifeCycleColumn(), entity.getLifeCycle(), entity.getLifeCycleType()));
|
||||
String sql = TableBuilder.Companion.SQL();
|
||||
log.info("Create table sql \n {} \n on dataset [ {} ]", sql, entity.getName());
|
||||
|
||||
@ -512,6 +513,23 @@ public class DataSetServiceImpl
|
||||
Response response = plugin.execute(sql);
|
||||
Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage());
|
||||
}
|
||||
|
||||
createdModels.stream()
|
||||
.filter(item -> item.getMode().equals(CreatedMode.MODIFY_LIFECYCLE))
|
||||
.findFirst()
|
||||
.ifPresent(item -> {
|
||||
TableBuilder.Companion.BEGIN();
|
||||
TableBuilder.Companion.MODIFY_LIFECYCLE(tableName);
|
||||
TableBuilder.Companion.LIFECYCLE(String.format("`%s` + INTERVAL %s %s", item.getColumn().getName(), item.getColumn().getLength(), item.getColumn().getDefaultValue()));
|
||||
String sql = TableBuilder.Companion.SQL();
|
||||
log.info("Modify lifecycle sql \n {} \n on dataset [ {} ] id [ {} ]", sql, entity.getName(), entity.getId());
|
||||
Plugin plugin = getOutputPlugin();
|
||||
SourceEntity source = getOutputSource();
|
||||
plugin.connect(source.toConfigure());
|
||||
Response response = plugin.execute(sql);
|
||||
Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage());
|
||||
});
|
||||
|
||||
completeState(entity, DataSetState.TABLE_SUCCESS);
|
||||
}
|
||||
catch (Exception e) {
|
||||
@ -759,6 +777,15 @@ public class DataSetServiceImpl
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (entity.getLifeCycleColumn() != null) {
|
||||
DataSetColumnEntity column = DataSetColumnEntity.builder()
|
||||
.name(entity.getLifeCycleColumn())
|
||||
.length(Integer.parseInt(entity.getLifeCycle()))
|
||||
.defaultValue(entity.getLifeCycleType())
|
||||
.build();
|
||||
models.add(new CreatedModel(column, CreatedMode.MODIFY_LIFECYCLE));
|
||||
}
|
||||
return models;
|
||||
}
|
||||
|
||||
|
@ -120,6 +120,22 @@ abstract class AbstractSql<T> {
|
||||
return getSelf()
|
||||
}
|
||||
|
||||
fun MODIFY_LIFECYCLE(table: String?): T {
|
||||
sql().statementType = StatementType.MODIFY_LIFECYCLE
|
||||
sql().tables.add(table)
|
||||
return getSelf()
|
||||
}
|
||||
|
||||
fun LIFECYCLE(lifecycle: String?): T {
|
||||
sql().lifecycle = lifecycle
|
||||
return getSelf()
|
||||
}
|
||||
|
||||
fun ADD_LIFECYCLE(lifecycle: String?): T {
|
||||
sql().lifecycle = lifecycle
|
||||
return getSelf()
|
||||
}
|
||||
|
||||
fun CREATE_COLUMN(table: String?): T {
|
||||
sql().statementType = StatementType.CREATE_COLUMN
|
||||
sql().tables.add(table)
|
||||
@ -545,6 +561,7 @@ abstract class AbstractSql<T> {
|
||||
val primaryKey: MutableList<String?> = ArrayList()
|
||||
val samplingKey: MutableList<String?> = ArrayList()
|
||||
var formatEngine: EngineType? = EngineType.MYSQL
|
||||
var lifecycle: String? = null
|
||||
|
||||
init {
|
||||
// Prevent Synthetic Access
|
||||
@ -706,6 +723,9 @@ abstract class AbstractSql<T> {
|
||||
if (samplingKey.isNotEmpty()) {
|
||||
sqlClause(builder, "SAMPLE BY", samplingKey, "(", ")", ", ")
|
||||
}
|
||||
if (lifecycle != null) {
|
||||
sqlClause(builder, "TTL", listOf(lifecycle), "", "", ", ")
|
||||
}
|
||||
if (end) {
|
||||
builder.append(";")
|
||||
}
|
||||
@ -744,6 +764,15 @@ abstract class AbstractSql<T> {
|
||||
return builder.toString()
|
||||
}
|
||||
|
||||
private fun modifyLifecycleSQL(builder: SafeAppendable): String {
|
||||
sqlClause(builder, "ALTER TABLE", tables, "", "", "")
|
||||
sqlClause(builder, "MODIFY TTL", listOf(lifecycle), "", "", ",\n")
|
||||
if (end) {
|
||||
builder.append(";")
|
||||
}
|
||||
return builder.toString()
|
||||
}
|
||||
|
||||
fun sql(a: Appendable): String? {
|
||||
val builder = SafeAppendable(a)
|
||||
|
||||
@ -760,6 +789,7 @@ abstract class AbstractSql<T> {
|
||||
StatementType.CREATE_COLUMN -> createColumnSQL(builder)
|
||||
StatementType.DROP_COLUMN -> dropColumnSQL(builder)
|
||||
StatementType.MODIFY_COLUMN -> modifyColumnSQL(builder)
|
||||
StatementType.MODIFY_LIFECYCLE -> modifyLifecycleSQL(builder)
|
||||
else -> throw SqlException("Unsupported statement type: [ $statementType ]")
|
||||
}
|
||||
return answer
|
||||
|
@ -1,5 +1,5 @@
|
||||
package io.edurt.datacap.sql
|
||||
|
||||
enum class StatementType {
|
||||
DELETE, INSERT, SELECT, UPDATE, ALTER, SHOW, TRUNCATE, DROP, CREATE_TABLE, CREATE_COLUMN, DROP_COLUMN, MODIFY_COLUMN
|
||||
DELETE, INSERT, SELECT, UPDATE, ALTER, SHOW, TRUNCATE, DROP, CREATE_TABLE, CREATE_COLUMN, DROP_COLUMN, MODIFY_COLUMN, MODIFY_LIFECYCLE
|
||||
}
|
||||
|
@ -26,6 +26,18 @@ class TableBuilder {
|
||||
sql().CREATE_TABLE(table)
|
||||
}
|
||||
|
||||
fun MODIFY_LIFECYCLE(table: String?) {
|
||||
sql().MODIFY_LIFECYCLE(table)
|
||||
}
|
||||
|
||||
fun LIFECYCLE(lifecycle: String?) {
|
||||
sql().LIFECYCLE(lifecycle)
|
||||
}
|
||||
|
||||
fun ADD_LIFECYCLE(lifecycle: String?) {
|
||||
sql().ADD_LIFECYCLE(lifecycle)
|
||||
}
|
||||
|
||||
fun COLUMNS(values: List<String>) {
|
||||
sql().COLUMNS(values)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user