From 7727bb5c9b0b4999db4ee5d77b91f1dcf5abd129 Mon Sep 17 00:00:00 2001 From: qianmoQ Date: Wed, 10 Apr 2024 23:56:32 +0800 Subject: [PATCH 01/10] [Core] [Dataset] Support building ad hoc queries on datasets --- .../user/v2/SourceV2Controller.java | 6 + .../src/main/schema/2024.03.1/schema.sql | 7 + .../datacap/service/entity/SourceEntity.java | 3 + .../datacap/service/enums/QueryMode.java | 3 +- .../service/repository/SourceRepository.java | 3 + .../service/service/SourceService.java | 2 + .../service/impl/SourceServiceImpl.java | 267 ++----- core/datacap-ui/src/i18n/langs/en/dataset.ts | 6 + .../datacap-ui/src/i18n/langs/zhCn/dataset.ts | 6 + core/datacap-ui/src/router/default.ts | 8 + core/datacap-ui/src/services/source.ts | 5 + core/datacap-ui/src/utils/array.ts | 15 + .../views/components/grid/GridConfigure.ts | 1 + .../src/views/components/grid/GridTable.vue | 6 +- .../views/components/source/SourceSelect.vue | 2 +- .../views/pages/admin/dataset/DatasetInfo.vue | 692 ++++++++++-------- .../src/views/pages/admin/query/QueryHome.vue | 7 +- 17 files changed, 535 insertions(+), 504 deletions(-) create mode 100644 core/datacap-ui/src/utils/array.ts diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/user/v2/SourceV2Controller.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/user/v2/SourceV2Controller.java index 8ba367a2..38a5ae0d 100644 --- a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/user/v2/SourceV2Controller.java +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/user/v2/SourceV2Controller.java @@ -53,6 +53,12 @@ public class SourceV2Controller return this.sourceService.getByIdV2(id); } + @GetMapping(value = "code/{code}") + public CommonResponse getByCode(@PathVariable(value = "code") String code) + { + return this.sourceService.getByCode(code); + } + @PostMapping(value = "getHistory/{id}") public CommonResponse> getHistory(@PathVariable(value = "id") Long id, @RequestBody FilterBody filter) diff --git a/core/datacap-server/src/main/schema/2024.03.1/schema.sql b/core/datacap-server/src/main/schema/2024.03.1/schema.sql index 7de75022..ac89def9 100644 --- a/core/datacap-server/src/main/schema/2024.03.1/schema.sql +++ b/core/datacap-server/src/main/schema/2024.03.1/schema.sql @@ -16,3 +16,10 @@ WHERE `id` = 16; UPDATE `menus` SET `url` = '/admin/query' WHERE `id` = 2; + +ALTER TABLE `datacap_source` + ADD COLUMN `code` VARCHAR(100); + +UPDATE `datacap_source` +SET `code` = REPLACE(UUID(), '-', '') +WHERE `code` IS NULL; diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/SourceEntity.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/SourceEntity.java index 4eedfafe..28a857e0 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/SourceEntity.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/SourceEntity.java @@ -122,6 +122,9 @@ public class SourceEntity @Column(name = "message") private String message; + @Column(name = "code") + private String code; + @Column(name = "create_time") @CreatedDate private Timestamp createTime; diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/enums/QueryMode.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/enums/QueryMode.java index ee5c6d3c..fc8d502d 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/enums/QueryMode.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/enums/QueryMode.java @@ -6,5 +6,6 @@ public enum QueryMode HISTORY, REPORT, SNIPPET, - SYNC + SYNC, + DATASET } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/SourceRepository.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/SourceRepository.java index 2c80ffab..12c3db4d 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/SourceRepository.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/SourceRepository.java @@ -7,6 +7,7 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.repository.PagingAndSortingRepository; import java.util.List; +import java.util.Optional; public interface SourceRepository extends PagingAndSortingRepository @@ -15,6 +16,8 @@ public interface SourceRepository SourceEntity findByName(String name); + Optional findByCode(String code); + Page findAllByUserOrPublishIsTrue(UserEntity user, Pageable pageable); Long countByUserOrPublishIsTrue(UserEntity user); diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/SourceService.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/SourceService.java index 178e1eb3..ca073767 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/SourceService.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/SourceService.java @@ -26,6 +26,8 @@ public interface SourceService CommonResponse getById(Long id); + CommonResponse getByCode(String code); + CommonResponse>> getPlugins(); CommonResponse count(); diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/SourceServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/SourceServiceImpl.java index 803a2b3b..ff34202d 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/SourceServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/SourceServiceImpl.java @@ -65,6 +65,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -107,6 +108,7 @@ public class SourceServiceImpl { configure.setConfigure(JsonUtils.toJSON(configure.getConfigures())); configure.setUser(UserDetailsService.getUser()); + configure.setCode(UUID.randomUUID().toString().replace("-", "")); return CommonResponse.success(this.sourceRepository.save(configure)); } @@ -117,14 +119,12 @@ public class SourceServiceImpl UserEntity user = UserDetailsService.getUser(); Page page = this.sourceRepository.findAllByUserOrPublishIsTrue(user, pageable); // Populate pipeline configuration information - page.getContent() - .stream() - .forEach(item -> { - IConfigure fromConfigure = PluginUtils.loadYamlConfigure(item.getProtocol(), item.getType(), item.getType(), environment); - if (fromConfigure != null) { - item.setPipelines(fromConfigure.getPipelines()); - } - }); + page.getContent().stream().forEach(item -> { + IConfigure fromConfigure = PluginUtils.loadYamlConfigure(item.getProtocol(), item.getType(), item.getType(), environment); + if (fromConfigure != null) { + item.setPipelines(fromConfigure.getPipelines()); + } + }); return CommonResponse.success(PageEntity.build(page)); } @@ -183,6 +183,14 @@ public class SourceServiceImpl return CommonResponse.success(this.sourceRepository.findById(id)); } + @Override + public CommonResponse getByCode(String code) + { + return this.sourceRepository.findByCode(code) + .map(item -> CommonResponse.success(item)) + .orElseGet(() -> CommonResponse.failure(String.format("Source [ %s ] not found", code))); + } + @Override public CommonResponse>> getPlugins() { @@ -361,22 +369,17 @@ public class SourceServiceImpl public CommonResponse> getHistory(Long id, FilterBody filter) { Pageable pageable = PageRequestAdapter.of(filter); - SourceEntity entity = SourceEntity.builder() - .id(id) - .build(); + SourceEntity entity = SourceEntity.builder().id(id).build(); return CommonResponse.success(PageEntity.build(this.scheduledHistoryRepository.findAllBySource(entity, pageable))); } @Override public CommonResponse syncMetadata(Long id) { - return this.sourceRepository.findById(id) - .map(entity -> { - Executors.newSingleThreadExecutor() - .submit(() -> startSyncMetadata(entity, null)); - return CommonResponse.success(entity); - }) - .orElseGet(() -> CommonResponse.failure(String.format("Source [ %s ] not found", id))); + return this.sourceRepository.findById(id).map(entity -> { + Executors.newSingleThreadExecutor().submit(() -> startSyncMetadata(entity, null)); + return CommonResponse.success(entity); + }).orElseGet(() -> CommonResponse.failure(String.format("Source [ %s ] not found", id))); } private void startSyncMetadata(SourceEntity entity, ScheduledEntity scheduled) @@ -394,12 +397,7 @@ public class SourceServiceImpl Map> databaseTableCache = Maps.newHashMap(); Map tableCache = Maps.newHashMap(); Map> tableColumnCache = Maps.newHashMap(); - ScheduledHistoryEntity scheduledHistory = ScheduledHistoryEntity.builder() - .name(String.format("Sync source [ %s ]", entity.getName())) - .scheduled(scheduled) - .source(entity) - .state(RunState.RUNNING) - .build(); + ScheduledHistoryEntity scheduledHistory = ScheduledHistoryEntity.builder().name(String.format("Sync source [ %s ]", entity.getName())).scheduled(scheduled).source(entity).state(RunState.RUNNING).build(); scheduledHistoryHandler.save(scheduledHistory); log.info("==================== Sync metadata [ {} ] started =================", entity.getName()); Optional pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, entity.getType(), entity.getProtocol()); @@ -415,21 +413,7 @@ public class SourceServiceImpl log.error("The source [ {} ] not available", entity.getName()); } else { - this.startSyncDatabase(entity, - plugin, - databaseCache, - databaseTableCache, - tableCache, - tableColumnCache, - databaseAddedCount, - databaseUpdatedCount, - databaseRemovedCount, - tableAddedCount, - tableUpdatedCount, - tableRemovedCount, - columnAddedCount, - columnUpdatedCount, - columnRemovedCount); + this.startSyncDatabase(entity, plugin, databaseCache, databaseTableCache, tableCache, tableColumnCache, databaseAddedCount, databaseUpdatedCount, databaseRemovedCount, tableAddedCount, tableUpdatedCount, tableRemovedCount, columnAddedCount, columnUpdatedCount, columnRemovedCount); } scheduledHistory.setState(RunState.SUCCESS); } @@ -471,22 +455,18 @@ public class SourceServiceImpl if (ObjectUtils.isNotEmpty(entity.getConfigure())) { final String[] content = {entity.getContent()}; List configures = JsonUtils.objectmapper.readValue(entity.getConfigure(), List.class); - map.entrySet() - .forEach(value -> { - Optional sqlConfigure = configures.stream() - .filter(v -> String.valueOf(v.get("column")).equalsIgnoreCase(value.getKey())) - .map(v -> { - SqlConfigure configure = new SqlConfigure(); - configure.setColumn(String.valueOf(v.get("column"))); - configure.setType(Type.valueOf(String.valueOf(v.get("type")))); - configure.setExpression(String.valueOf(v.get("expression"))); - return configure; - }) - .findFirst(); - if (sqlConfigure.isPresent()) { - content[0] = content[0].replace(sqlConfigure.get().getExpression(), String.valueOf(value.getValue())); - } - }); + map.entrySet().forEach(value -> { + Optional sqlConfigure = configures.stream().filter(v -> String.valueOf(v.get("column")).equalsIgnoreCase(value.getKey())).map(v -> { + SqlConfigure configure = new SqlConfigure(); + configure.setColumn(String.valueOf(v.get("column"))); + configure.setType(Type.valueOf(String.valueOf(v.get("type")))); + configure.setExpression(String.valueOf(v.get("expression"))); + return configure; + }).findFirst(); + if (sqlConfigure.isPresent()) { + content[0] = content[0].replace(sqlConfigure.get().getExpression(), String.valueOf(value.getValue())); + } + }); return content[0]; } } @@ -547,21 +527,7 @@ public class SourceServiceImpl * @param columnUpdatedCount the AtomicInteger object representing the count of updated columns * @param columnRemovedCount the AtomicInteger object representing the count of removed columns */ - private void startSyncDatabase(SourceEntity entity, - Plugin plugin, - Map databaseCache, - Map> databaseTableCache, - Map tableCache, - Map> tableColumnCache, - AtomicInteger databaseAddedCount, - AtomicInteger databaseUpdatedCount, - AtomicInteger databaseRemovedCount, - AtomicInteger tableAddedCount, - AtomicInteger tableUpdatedCount, - AtomicInteger tableRemovedCount, - AtomicInteger columnAddedCount, - AtomicInteger columnUpdatedCount, - AtomicInteger columnRemovedCount) + private void startSyncDatabase(SourceEntity entity, Plugin plugin, Map databaseCache, Map> databaseTableCache, Map tableCache, Map> tableColumnCache, AtomicInteger databaseAddedCount, AtomicInteger databaseUpdatedCount, AtomicInteger databaseRemovedCount, AtomicInteger tableAddedCount, AtomicInteger tableUpdatedCount, AtomicInteger tableRemovedCount, AtomicInteger columnAddedCount, AtomicInteger columnUpdatedCount, AtomicInteger columnRemovedCount) { String templateName = "SYSTEM_FOR_GET_ALL_DATABASES"; TemplateSqlEntity template = getTemplate(templateName, entity); @@ -576,29 +542,19 @@ public class SourceServiceImpl } else { List origin = databaseHandler.findAllBySource(entity); - List entities = response.getColumns() - .stream() - .map(item -> { - DatabaseEntity database = DatabaseEntity.builder() - .name(getNodeText(item, NodeType.SCHEMA)) - .catalog(getNodeText(item, NodeType.CATALOG)) - .description(String.format("[ %s ] of [ %s ]", getNodeText(item, NodeType.SCHEMA), getNodeText(item, NodeType.CATALOG))) - .source(entity) - .build(); - Optional optionalDatabase = origin.stream() - .filter(node -> node.getName().equals(database.getName())) - .findAny(); - if (optionalDatabase.isPresent()) { - database.setId(optionalDatabase.get().getId()); - database.setCreateTime(optionalDatabase.get().getCreateTime()); - databaseUpdatedCount.addAndGet(1); - } - else { - databaseAddedCount.addAndGet(1); - } - return database; - }) - .collect(Collectors.toList()); + List entities = response.getColumns().stream().map(item -> { + DatabaseEntity database = DatabaseEntity.builder().name(getNodeText(item, NodeType.SCHEMA)).catalog(getNodeText(item, NodeType.CATALOG)).description(String.format("[ %s ] of [ %s ]", getNodeText(item, NodeType.SCHEMA), getNodeText(item, NodeType.CATALOG))).source(entity).build(); + Optional optionalDatabase = origin.stream().filter(node -> node.getName().equals(database.getName())).findAny(); + if (optionalDatabase.isPresent()) { + database.setId(optionalDatabase.get().getId()); + database.setCreateTime(optionalDatabase.get().getCreateTime()); + databaseUpdatedCount.addAndGet(1); + } + else { + databaseAddedCount.addAndGet(1); + } + return database; + }).collect(Collectors.toList()); // Write the new data retrieved to the database log.info("Added database size [ {} ] to source [ {} ]", entities.size(), entity.getName()); databaseHandler.saveAll(entities); @@ -608,25 +564,12 @@ public class SourceServiceImpl databaseTableCache.put(key, this.tableHandler.findSimpleAllByDatabase(item)); }); // Delete invalid data that no longer exists - List deleteEntities = origin.stream() - .filter(node -> entities.stream().noneMatch(item -> node.getName().equals(item.getName()))) - .collect(Collectors.toList()); + List deleteEntities = origin.stream().filter(node -> entities.stream().noneMatch(item -> node.getName().equals(item.getName()))).collect(Collectors.toList()); log.info("Removed database size [ {} ] from source [ {} ]", deleteEntities.size(), entity.getName()); databaseHandler.deleteAll(deleteEntities); databaseRemovedCount.addAndGet(deleteEntities.size()); } - this.startSyncTable(entity, - plugin, - databaseCache, - databaseTableCache, - tableCache, - tableColumnCache, - tableAddedCount, - tableUpdatedCount, - tableRemovedCount, - columnAddedCount, - columnUpdatedCount, - columnRemovedCount); + this.startSyncTable(entity, plugin, databaseCache, databaseTableCache, tableCache, tableColumnCache, tableAddedCount, tableUpdatedCount, tableRemovedCount, columnAddedCount, columnUpdatedCount, columnRemovedCount); } } @@ -646,18 +589,7 @@ public class SourceServiceImpl * @param columnUpdatedCount the column updated count * @param columnRemovedCount the column removed count */ - private void startSyncTable(SourceEntity entity, - Plugin plugin, - Map databaseCache, - Map> databaseTableCache, - Map tableCache, - Map> tableColumnCache, - AtomicInteger tableAddedCount, - AtomicInteger tableUpdatedCount, - AtomicInteger tableRemovedCount, - AtomicInteger columnAddedCount, - AtomicInteger columnUpdatedCount, - AtomicInteger columnRemovedCount) + private void startSyncTable(SourceEntity entity, Plugin plugin, Map databaseCache, Map> databaseTableCache, Map tableCache, Map> tableColumnCache, AtomicInteger tableAddedCount, AtomicInteger tableUpdatedCount, AtomicInteger tableRemovedCount, AtomicInteger columnAddedCount, AtomicInteger columnUpdatedCount, AtomicInteger columnRemovedCount) { String templateName = "SYSTEM_FOR_GET_ALL_TABLES"; TemplateSqlEntity template = getTemplate(templateName, entity); @@ -671,43 +603,20 @@ public class SourceServiceImpl log.error("The source [ {} ] protocol [ {} ] sync metadata tables [ {} ] failed", entity.getName(), entity.getProtocol(), response.getMessage()); } else { - List entities = response.getColumns() - .stream() - .map(item -> { - String key = String.format("%s_%s", getNodeText(item, NodeType.CATALOG), getNodeText(item, NodeType.SCHEMA)); - DatabaseEntity database = databaseCache.get(key); - String name = getNodeText(item, NodeType.TABLE); - return TableEntity.builder() - .name(name) - .description(String.format("Table [ %s ] of database [ %s ] ", name, getNodeText(item, NodeType.SCHEMA))) - .type(getNodeText(item, NodeType.TYPE)) - .engine(getNodeText(item, NodeType.ENGINE)) - .format(getNodeText(item, NodeType.FORMAT)) - .inCreateTime(getNodeText(item, NodeType.CREATE_TIME)) - .inUpdateTime(getNodeText(item, NodeType.UPDATE_TIME)) - .collation(getNodeText(item, NodeType.COLLATION)) - .rows(getNodeText(item, NodeType.ROWS)) - .comment(getNodeText(item, NodeType.COMMENT)) - .avgRowLength(getNodeText(item, NodeType.AVG_ROW)) - .dataLength(getNodeText(item, NodeType.DATA)) - .indexLength(getNodeText(item, NodeType.INDEX)) - .autoIncrement(getNodeText(item, NodeType.AUTO_INCREMENT)) - .database(database) - .build(); - }) - .collect(Collectors.toList()); + List entities = response.getColumns().stream().map(item -> { + String key = String.format("%s_%s", getNodeText(item, NodeType.CATALOG), getNodeText(item, NodeType.SCHEMA)); + DatabaseEntity database = databaseCache.get(key); + String name = getNodeText(item, NodeType.TABLE); + return TableEntity.builder().name(name).description(String.format("Table [ %s ] of database [ %s ] ", name, getNodeText(item, NodeType.SCHEMA))).type(getNodeText(item, NodeType.TYPE)).engine(getNodeText(item, NodeType.ENGINE)).format(getNodeText(item, NodeType.FORMAT)).inCreateTime(getNodeText(item, NodeType.CREATE_TIME)).inUpdateTime(getNodeText(item, NodeType.UPDATE_TIME)).collation(getNodeText(item, NodeType.COLLATION)).rows(getNodeText(item, NodeType.ROWS)).comment(getNodeText(item, NodeType.COMMENT)).avgRowLength(getNodeText(item, NodeType.AVG_ROW)).dataLength(getNodeText(item, NodeType.DATA)).indexLength(getNodeText(item, NodeType.INDEX)).autoIncrement(getNodeText(item, NodeType.AUTO_INCREMENT)).database(database).build(); + }).collect(Collectors.toList()); - Map> groupEntities = entities - .stream() - .collect(Collectors.groupingBy(item -> String.format("%s_%s", item.getDatabase().getCatalog(), item.getDatabase().getName()))); + Map> groupEntities = entities.stream().collect(Collectors.groupingBy(item -> String.format("%s_%s", item.getDatabase().getCatalog(), item.getDatabase().getName()))); groupEntities.forEach((key, groupItem) -> { // Detect data that needs to be updated List origin = databaseTableCache.get(key); groupItem.forEach(item -> { - Optional optionalTable = origin.stream() - .filter(node -> node.getName().equals(item.getName())) - .findAny(); + Optional optionalTable = origin.stream().filter(node -> node.getName().equals(item.getName())).findAny(); if (optionalTable.isPresent()) { TableEntity node = optionalTable.get(); item.setId(node.getId()); @@ -727,9 +636,7 @@ public class SourceServiceImpl tableColumnCache.put(tableCacheKey, this.columnHandler.findSimpleAllByTable(item)); }); - List deleteEntities = origin.stream() - .filter(node -> groupItem.stream().noneMatch(item -> node.getName().equals(item.getName()))) - .collect(Collectors.toList()); + List deleteEntities = origin.stream().filter(node -> groupItem.stream().noneMatch(item -> node.getName().equals(item.getName()))).collect(Collectors.toList()); log.info("Removed table size [ {} ] from database [ {} ]", deleteEntities.size(), key); tableHandler.deleteAll(deleteEntities); tableRemovedCount.addAndGet(deleteEntities.size()); @@ -751,13 +658,7 @@ public class SourceServiceImpl * @param columnUpdatedCount an atomic counter for tracking the number of columns updated * @param columnRemovedCount an atomic counter for tracking the number of columns removed */ - private void startSyncColumn(SourceEntity entity, - Plugin plugin, - Map tableCache, - Map> tableColumnCache, - AtomicInteger columnAddedCount, - AtomicInteger columnUpdatedCount, - AtomicInteger columnRemovedCount) + private void startSyncColumn(SourceEntity entity, Plugin plugin, Map tableCache, Map> tableColumnCache, AtomicInteger columnAddedCount, AtomicInteger columnUpdatedCount, AtomicInteger columnRemovedCount) { String templateName = "SYSTEM_FOR_GET_ALL_COLUMNS"; TemplateSqlEntity template = getTemplate(templateName, entity); @@ -771,43 +672,21 @@ public class SourceServiceImpl log.error("The source [ {} ] protocol [ {} ] sync metadata columns [ {} ] failed", entity.getName(), entity.getProtocol(), response.getMessage()); } else { - List entities = response.getColumns() - .stream() - .map(item -> { - String key = String.format("%s_%s", getNodeText(item, NodeType.CATALOG), getNodeText(item, NodeType.SCHEMA)); - TableEntity table = tableCache.get(key); - String name = getNodeText(item, NodeType.COLUMN); - ColumnEntity column = ColumnEntity.builder() - .name(name) - .description(String.format("Table [ %s ] of column [ %s ] ", table.getName(), name)) - .type(getNodeText(item, NodeType.COLUMN_TYPE)) - .comment(getNodeText(item, NodeType.COMMENT)) - .defaultValue(getNodeText(item, NodeType.DEFAULT)) - .position(getNodeText(item, NodeType.POSITION)) - .maximumLength(getNodeText(item, NodeType.MAXIMUM_LENGTH)) - .collation(getNodeText(item, NodeType.COLLATION)) - .isKey(getNodeText(item, NodeType.KEY)) - .privileges(getNodeText(item, NodeType.FORMAT)) - .dataType(getNodeText(item, NodeType.DATA_TYPE)) - .extra(getNodeText(item, NodeType.EXTRA)) - .isNullable(getNodeText(item, NodeType.NULLABLE)) - .table(table) - .build(); - return column; - }) - .collect(Collectors.toList()); + List entities = response.getColumns().stream().map(item -> { + String key = String.format("%s_%s", getNodeText(item, NodeType.CATALOG), getNodeText(item, NodeType.SCHEMA)); + TableEntity table = tableCache.get(key); + String name = getNodeText(item, NodeType.COLUMN); + ColumnEntity column = ColumnEntity.builder().name(name).description(String.format("Table [ %s ] of column [ %s ] ", table.getName(), name)).type(getNodeText(item, NodeType.COLUMN_TYPE)).comment(getNodeText(item, NodeType.COMMENT)).defaultValue(getNodeText(item, NodeType.DEFAULT)).position(getNodeText(item, NodeType.POSITION)).maximumLength(getNodeText(item, NodeType.MAXIMUM_LENGTH)).collation(getNodeText(item, NodeType.COLLATION)).isKey(getNodeText(item, NodeType.KEY)).privileges(getNodeText(item, NodeType.FORMAT)).dataType(getNodeText(item, NodeType.DATA_TYPE)).extra(getNodeText(item, NodeType.EXTRA)).isNullable(getNodeText(item, NodeType.NULLABLE)).table(table).build(); + return column; + }).collect(Collectors.toList()); - Map> groupEntities = entities - .stream() - .collect(Collectors.groupingBy(item -> String.format("%s_%s", item.getTable().getDatabase().getName(), item.getTable().getName()))); + Map> groupEntities = entities.stream().collect(Collectors.groupingBy(item -> String.format("%s_%s", item.getTable().getDatabase().getName(), item.getTable().getName()))); groupEntities.forEach((key, groupItem) -> { // Detect data that needs to be updated List origin = tableColumnCache.get(key); groupItem.forEach(item -> { - Optional optionalColumn = origin.stream() - .filter(node -> node.getName().equals(item.getName())) - .findAny(); + Optional optionalColumn = origin.stream().filter(node -> node.getName().equals(item.getName())).findAny(); if (optionalColumn.isPresent()) { ColumnEntity node = optionalColumn.get(); item.setId(node.getId()); @@ -822,9 +701,7 @@ public class SourceServiceImpl log.info("Added column size [ {} ] to table [ {} ]", groupItem.size(), key); columnHandler.saveAll(groupItem); - List deleteEntities = origin.stream() - .filter(node -> groupItem.stream().noneMatch(item -> node.getName().equals(item.getName()))) - .collect(Collectors.toList()); + List deleteEntities = origin.stream().filter(node -> groupItem.stream().noneMatch(item -> node.getName().equals(item.getName()))).collect(Collectors.toList()); log.info("Removed column size [ {} ] from table [ {} ]", deleteEntities.size(), key); columnHandler.deleteAll(deleteEntities); columnRemovedCount.addAndGet(deleteEntities.size()); diff --git a/core/datacap-ui/src/i18n/langs/en/dataset.ts b/core/datacap-ui/src/i18n/langs/en/dataset.ts index d1e276f8..6f927766 100644 --- a/core/datacap-ui/src/i18n/langs/en/dataset.ts +++ b/core/datacap-ui/src/i18n/langs/en/dataset.ts @@ -107,6 +107,11 @@ export default { lifeCycleColumn: 'Lifecycle columns', lifeCycleNumber: 'Lifecycle number', }, + validator: { + duplicateColumn: 'Column name [ $VALUE ] already exists', + specifiedColumn: 'Sort key or primary key must be specified', + specifiedName: 'Name must be specified', + }, tip: { selectExpression: 'Please select the expression', syncData: 'The data synchronization schedule will run in the background, see the logs for the specific synchronization results', @@ -117,5 +122,6 @@ export default { rebuildProgress: 'Rebuilding will only progress unfinished', lifeCycleMustDateColumn: 'The lifecycle must contain a date column', modifyNotSupportDataPreview: 'Data preview is not supported to modify', + publishSuccess: 'Dataset [ $VALUE ] published successfully', } } \ No newline at end of file diff --git a/core/datacap-ui/src/i18n/langs/zhCn/dataset.ts b/core/datacap-ui/src/i18n/langs/zhCn/dataset.ts index 14699c9f..48d57947 100644 --- a/core/datacap-ui/src/i18n/langs/zhCn/dataset.ts +++ b/core/datacap-ui/src/i18n/langs/zhCn/dataset.ts @@ -107,6 +107,11 @@ export default { lifeCycleColumn: '生命周期列', lifeCycleNumber: '生命周期数', }, + validator: { + duplicateColumn: '列名 [ $VALUE ] 已存在', + specifiedColumn: '排序键或主键必须指定', + specifiedName: '数据集名必须指定', + }, tip: { selectExpression: '请选择表达式', syncData: '数据同步计划将在后台运行,具体同步结果请参考日志', @@ -117,5 +122,6 @@ export default { rebuildProgress: '重建只会进行未完成进度', lifeCycleMustDateColumn: '生命周期必须包含一个日期列', modifyNotSupportDataPreview: '修改暂不支持数据预览', + publishSuccess: '数据集 [ $VALUE ] 发布成功', } } \ No newline at end of file diff --git a/core/datacap-ui/src/router/default.ts b/core/datacap-ui/src/router/default.ts index 4c15e84f..e8afb070 100644 --- a/core/datacap-ui/src/router/default.ts +++ b/core/datacap-ui/src/router/default.ts @@ -171,6 +171,14 @@ const createAdminRouter = (router: any) => { }, component: () => import('@/views/pages/admin/dataset/DatasetInfo.vue') }, + { + path: 'dataset/info/source/:sourceCode?', + meta: { + title: 'common.dataset', + isRoot: false + }, + component: () => import('@/views/pages/admin/dataset/DatasetInfo.vue') + }, { path: 'dataset/adhoc/:code', layout: LayoutContainer, diff --git a/core/datacap-ui/src/services/source.ts b/core/datacap-ui/src/services/source.ts index 690da447..a74bb184 100644 --- a/core/datacap-ui/src/services/source.ts +++ b/core/datacap-ui/src/services/source.ts @@ -21,6 +21,11 @@ class SourceService return new HttpUtils().get(DEFAULT_PATH_V1, { page: page, size: size }) } + getByCode(code: string): Promise + { + return new HttpUtils().get(`${ DEFAULT_PATH_V2 }/code/${ code }`) + } + getPlugins(): Promise { return new HttpUtils().get(`${ DEFAULT_PATH_V1 }/plugins`) diff --git a/core/datacap-ui/src/utils/array.ts b/core/datacap-ui/src/utils/array.ts new file mode 100644 index 00000000..88108a80 --- /dev/null +++ b/core/datacap-ui/src/utils/array.ts @@ -0,0 +1,15 @@ +export class ArrayUtils +{ + static findDuplicates(array: any[]): any[] + { + const counts: { [key: string]: number } = {} + const duplicates: string[] = [] + array.forEach(column => { + counts[column.name] = (counts[column.name] || 0) + 1 + if (counts[column.name] === 2) { + duplicates.push(column.name) + } + }) + return duplicates + } +} \ No newline at end of file diff --git a/core/datacap-ui/src/views/components/grid/GridConfigure.ts b/core/datacap-ui/src/views/components/grid/GridConfigure.ts index 55e73e5c..f895e15e 100644 --- a/core/datacap-ui/src/views/components/grid/GridConfigure.ts +++ b/core/datacap-ui/src/views/components/grid/GridConfigure.ts @@ -10,4 +10,5 @@ export interface GridConfigure context?: string sourceId?: number query?: string + code?: string } diff --git a/core/datacap-ui/src/views/components/grid/GridTable.vue b/core/datacap-ui/src/views/components/grid/GridTable.vue index 66baf086..1b4ac569 100644 --- a/core/datacap-ui/src/views/components/grid/GridTable.vue +++ b/core/datacap-ui/src/views/components/grid/GridTable.vue @@ -4,7 +4,7 @@