From 9dc64f0c9b0338e40b972a42152c1fedf7d891f5 Mon Sep 17 00:00:00 2001 From: qianmoQ Date: Thu, 21 Nov 2024 17:29:14 +0800 Subject: [PATCH] feat(plugin): adapter core --- configure/etc/conf/plugins.properties | 3 +- convert/datacap-convert-spi/pom.xml | 5 + .../io/edurt/datacap/convert/ConvertFilter.kt | 15 +- .../edurt/datacap/convert/ConvertManager.kt | 23 - .../io/edurt/datacap/convert/ConvertModule.kt | 11 - .../convert/{Convert.kt => ConvertService.kt} | 3 +- ...terTest.kt => ConvertServiceFilterTest.kt} | 2 +- ...uleTest.kt => ConvertServiceModuleTest.kt} | 4 +- .../datacap/convert/TestConvertModule.kt | 4 +- .../{TestConvert.kt => TestConvertService.kt} | 2 +- .../io/edurt/datacap/convert/TxtPlugin.kt | 5 + .../{txt/TxtConvert.kt => TxtService.kt} | 6 +- .../io/edurt/datacap/convert/txt/TxtModule.kt | 15 - .../io.edurt.datacap.convert.ConvertModule | 1 - .../services/io.edurt.datacap.plugin.Plugin | 1 + .../services/io.edurt.datacap.plugin.Service | 1 + .../convert/{txt => }/TxtConvertTest.kt | 2 +- .../convert/{txt => }/TxtModuleTest.kt | 2 +- core/datacap-common/pom.xml | 10 - .../edurt/datacap/common/utils/SpiUtils.java | 48 -- .../io/edurt/datacap/plugin/PluginType.java | 4 +- .../datacap/plugin/utils/PluginPathUtils.java | 14 + .../server/controller/DataSetController.java | 3 +- .../server/controller/PluginController.java | 12 +- core/datacap-service/pom.xml | 5 + .../service/audit/AuditPluginHandler.java | 20 +- .../DatasetSchedulerInitializer.java | 48 +- .../service/service/DataSetService.java | 3 +- .../service/impl/DataSetServiceImpl.java | 455 +++++++++--------- .../service/impl/ExecuteServiceImpl.java | 11 +- .../service/impl/PipelineServiceImpl.java | 11 +- .../service/impl/PluginAuditServiceImpl.java | 46 +- .../service/impl/SourceServiceImpl.java | 345 +++++++------ .../service/impl/TableServiceImpl.java | 375 ++++++++------- .../service/impl/UploadServiceImpl.java | 17 +- .../service/service/impl/UserServiceImpl.java | 19 +- .../io/edurt/datacap/spi/adapter/Adapter.java | 6 +- .../datacap/spi/adapter/JdbcAdapter.java | 2 +- .../spi/connection/JdbcConnection.java | 18 - .../io/edurt/datacap/spi/model/Configure.java | 4 +- docs/docs/reference/filesystem/aliyun/home.md | 2 +- docs/docs/reference/filesystem/cos/home.md | 2 +- docs/docs/reference/filesystem/minio/home.md | 2 +- docs/docs/reference/filesystem/qiniu/home.md | 2 +- docs/docs/reference/filesystem/s3/home.md | 2 +- executor/datacap-executor-spi/pom.xml | 5 + .../edurt/datacap/executor/ExecutorManager.kt | 24 - .../edurt/datacap/executor/ExecutorModule.kt | 12 - .../{Executor.kt => ExecutorService.kt} | 11 +- .../edurt/datacap/executor/ExecutorUtils.kt | 13 - .../executor/configure/ExecutorRequest.kt | 4 +- fs/datacap-fs-spi/pom.xml | 5 + .../java/io/edurt/datacap/fs/FsManager.java | 29 -- .../java/io/edurt/datacap/fs/FsModule.java | 15 - .../datacap/fs/{Fs.java => FsService.java} | 5 +- .../io/edurt/datacap/fs/FsManagerTest.java | 28 -- .../test/java/io/edurt/datacap/fs/TestFs.java | 19 - .../io/edurt/datacap/fs/TestFsModule.java | 13 - .../io/edurt/datacap/plugin/MySQLPlugin.java | 9 - .../io/edurt/datacap/plugin/MySQLService.java | 11 + pom.xml | 4 +- scheduler/datacap-scheduler-spi/pom.xml | 5 + .../datacap/scheduler/ScheduleManager.kt | 25 - .../datacap/scheduler/SchedulerModule.kt | 12 - .../{Scheduler.kt => SchedulerService.kt} | 12 +- 65 files changed, 803 insertions(+), 1049 deletions(-) delete mode 100644 convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertManager.kt delete mode 100644 convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertModule.kt rename convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/{Convert.kt => ConvertService.kt} (89%) rename convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/{ConvertFilterTest.kt => ConvertServiceFilterTest.kt} (91%) rename convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/{ConvertModuleTest.kt => ConvertServiceModuleTest.kt} (91%) rename convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/{TestConvert.kt => TestConvertService.kt} (93%) create mode 100644 convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/TxtPlugin.kt rename convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/{txt/TxtConvert.kt => TxtService.kt} (98%) delete mode 100644 convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/txt/TxtModule.kt delete mode 100644 convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.convert.ConvertModule create mode 100644 convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.plugin.Plugin create mode 100644 convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.plugin.Service rename convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/{txt => }/TxtConvertTest.kt (99%) rename convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/{txt => }/TxtModuleTest.kt (94%) delete mode 100644 core/datacap-common/src/main/java/io/edurt/datacap/common/utils/SpiUtils.java delete mode 100644 executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorManager.kt delete mode 100644 executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorModule.kt rename executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/{Executor.kt => ExecutorService.kt} (64%) delete mode 100644 executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorUtils.kt delete mode 100644 fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsManager.java delete mode 100644 fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsModule.java rename fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/{Fs.java => FsService.java} (87%) delete mode 100644 fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/FsManagerTest.java delete mode 100644 fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/TestFs.java delete mode 100644 fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/TestFsModule.java delete mode 100644 scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/ScheduleManager.kt delete mode 100644 scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/SchedulerModule.kt rename scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/{Scheduler.kt => SchedulerService.kt} (53%) diff --git a/configure/etc/conf/plugins.properties b/configure/etc/conf/plugins.properties index c9a4a912..a6db4469 100644 --- a/configure/etc/conf/plugins.properties +++ b/configure/etc/conf/plugins.properties @@ -1 +1,2 @@ -datacap-plugin-mysql=plugin/datacap-plugin-mysql/pom.xml \ No newline at end of file +datacap-plugin-mysql=plugin/datacap-plugin-mysql/pom.xml +datacap-convert-txt=convert/datacap-convert-txt/pom.xml \ No newline at end of file diff --git a/convert/datacap-convert-spi/pom.xml b/convert/datacap-convert-spi/pom.xml index 1798d7c3..89d1374e 100644 --- a/convert/datacap-convert-spi/pom.xml +++ b/convert/datacap-convert-spi/pom.xml @@ -30,6 +30,11 @@ io.edurt.datacap datacap-common + + io.edurt.datacap + datacap-plugin + ${project.version} + diff --git a/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertFilter.kt b/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertFilter.kt index 96a53e67..709cb437 100644 --- a/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertFilter.kt +++ b/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertFilter.kt @@ -1,8 +1,6 @@ package io.edurt.datacap.convert -import com.google.inject.Injector -import com.google.inject.Key -import com.google.inject.TypeLiteral +import io.edurt.datacap.plugin.PluginManager import java.util.* object ConvertFilter @@ -10,17 +8,14 @@ object ConvertFilter /** * Finds a file in the injector by name. * - * @param injector the injector to search in + * @param pluginManager PluginManager * @param name the name of the file to find * @return an Optional containing the found File, or an empty Optional if not found */ @JvmStatic - fun filter(injector: Injector, name: String): Optional + fun filter(pluginManager: PluginManager, name: String): Optional { - return injector.getInstance(Key.get(object : TypeLiteral>() - {})) - .stream() - .filter { it.name() == name } - .findFirst() + return pluginManager.getPlugin(name) + .map { it.getService(ConvertService::class.java) } } } diff --git a/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertManager.kt b/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertManager.kt deleted file mode 100644 index baaa7f3b..00000000 --- a/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertManager.kt +++ /dev/null @@ -1,23 +0,0 @@ -package io.edurt.datacap.convert - -import com.google.inject.AbstractModule -import io.edurt.datacap.common.utils.DateUtils -import org.slf4j.Logger -import org.slf4j.LoggerFactory.getLogger -import java.util.* - -class ConvertManager : AbstractModule() -{ - private val log: Logger = getLogger(ConvertManager::class.java) - private var externalModules: Iterable = ServiceLoader.load(ConvertModule::class.java) - - override fun configure() - { - log.info("========== Loading convert start ==========") - externalModules.forEach { module -> - log.info("Installing convert [ {} ] join time [ {} ]", module.name(), DateUtils.formatYMDHMSWithInterval()) - install(module) - } - log.info("========== Loading convert end ==========") - } -} diff --git a/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertModule.kt b/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertModule.kt deleted file mode 100644 index b2192c09..00000000 --- a/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertModule.kt +++ /dev/null @@ -1,11 +0,0 @@ -package io.edurt.datacap.convert - -import com.google.inject.AbstractModule - -abstract class ConvertModule : AbstractModule() -{ - fun name(): String = this.javaClass - .simpleName - .removeSuffix("Convert") - .removeSuffix("Module") -} diff --git a/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/Convert.kt b/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertService.kt similarity index 89% rename from convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/Convert.kt rename to convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertService.kt index 875acc19..bffae8e7 100644 --- a/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/Convert.kt +++ b/convert/datacap-convert-spi/src/main/kotlin/io/edurt/datacap/convert/ConvertService.kt @@ -2,8 +2,9 @@ package io.edurt.datacap.convert import io.edurt.datacap.convert.model.ConvertRequest import io.edurt.datacap.convert.model.ConvertResponse +import io.edurt.datacap.plugin.Service -interface Convert +interface ConvertService : Service { fun name(): String { diff --git a/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertFilterTest.kt b/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertServiceFilterTest.kt similarity index 91% rename from convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertFilterTest.kt rename to convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertServiceFilterTest.kt index 9628a87e..bcf6d4d7 100644 --- a/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertFilterTest.kt +++ b/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertServiceFilterTest.kt @@ -4,7 +4,7 @@ import com.google.inject.Guice import org.junit.Assert import org.junit.Test -class ConvertFilterTest +class ConvertServiceFilterTest { private val injector = Guice.createInjector(ConvertManager()) diff --git a/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertModuleTest.kt b/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertServiceModuleTest.kt similarity index 91% rename from convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertModuleTest.kt rename to convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertServiceModuleTest.kt index a377c672..f3f6fad5 100644 --- a/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertModuleTest.kt +++ b/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/ConvertServiceModuleTest.kt @@ -7,14 +7,14 @@ import com.google.inject.TypeLiteral import org.junit.Assert import org.junit.Test -class ConvertModuleTest +class ConvertServiceModuleTest { private val injector: Injector = Guice.createInjector(ConvertManager()) @Test fun test() { - injector.getInstance(Key.get(object : TypeLiteral>() + injector.getInstance(Key.get(object : TypeLiteral>() {})) .stream() .findFirst() diff --git a/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvertModule.kt b/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvertModule.kt index 934e0530..b74f8259 100644 --- a/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvertModule.kt +++ b/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvertModule.kt @@ -6,8 +6,8 @@ class TestConvertModule : ConvertModule() { override fun configure() { - Multibinder.newSetBinder(this.binder(), Convert::class.java) + Multibinder.newSetBinder(this.binder(), ConvertService::class.java) .addBinding() - .to(TestConvert::class.java) + .to(TestConvertService::class.java) } } diff --git a/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvert.kt b/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvertService.kt similarity index 93% rename from convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvert.kt rename to convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvertService.kt index 07e8cefe..9a4992cd 100644 --- a/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvert.kt +++ b/convert/datacap-convert-spi/src/test/kotlin/io/edurt/datacap/convert/TestConvertService.kt @@ -3,7 +3,7 @@ package io.edurt.datacap.convert import io.edurt.datacap.convert.model.ConvertRequest import io.edurt.datacap.convert.model.ConvertResponse -class TestConvert : Convert +class TestConvertService : ConvertService { override fun format(request: ConvertRequest): ConvertResponse { diff --git a/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/TxtPlugin.kt b/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/TxtPlugin.kt new file mode 100644 index 00000000..63151320 --- /dev/null +++ b/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/TxtPlugin.kt @@ -0,0 +1,5 @@ +package io.edurt.datacap.convert + +import io.edurt.datacap.plugin.Plugin + +class TxtPlugin : Plugin() diff --git a/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/txt/TxtConvert.kt b/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/TxtService.kt similarity index 98% rename from convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/txt/TxtConvert.kt rename to convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/TxtService.kt index f673d311..5e45fdf2 100644 --- a/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/txt/TxtConvert.kt +++ b/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/TxtService.kt @@ -1,10 +1,8 @@ -package io.edurt.datacap.convert.txt +package io.edurt.datacap.convert import com.google.common.base.Preconditions.checkArgument import com.google.common.base.Preconditions.checkState import io.edurt.datacap.common.utils.DateUtils -import io.edurt.datacap.convert.Convert -import io.edurt.datacap.convert.FileConvert import io.edurt.datacap.convert.model.ConvertRequest import io.edurt.datacap.convert.model.ConvertResponse import org.apache.commons.io.FileUtils @@ -15,7 +13,7 @@ import java.io.IOException import java.io.InputStreamReader import java.util.Objects.requireNonNull -class TxtConvert : Convert +class TxtService : ConvertService { private val log = getLogger(this::class.java) diff --git a/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/txt/TxtModule.kt b/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/txt/TxtModule.kt deleted file mode 100644 index 252a35f9..00000000 --- a/convert/datacap-convert-txt/src/main/kotlin/io/edurt/datacap/convert/txt/TxtModule.kt +++ /dev/null @@ -1,15 +0,0 @@ -package io.edurt.datacap.convert.txt - -import com.google.inject.multibindings.Multibinder -import io.edurt.datacap.convert.Convert -import io.edurt.datacap.convert.ConvertModule - -class TxtModule : ConvertModule() -{ - override fun configure() - { - Multibinder.newSetBinder(this.binder(), Convert::class.java) - .addBinding() - .to(TxtConvert::class.java) - } -} diff --git a/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.convert.ConvertModule b/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.convert.ConvertModule deleted file mode 100644 index de0ac4df..00000000 --- a/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.convert.ConvertModule +++ /dev/null @@ -1 +0,0 @@ -io.edurt.datacap.convert.txt.TxtModule diff --git a/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.plugin.Plugin b/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.plugin.Plugin new file mode 100644 index 00000000..b78a5790 --- /dev/null +++ b/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.plugin.Plugin @@ -0,0 +1 @@ +io.edurt.datacap.convert.TxtPlugin \ No newline at end of file diff --git a/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.plugin.Service b/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.plugin.Service new file mode 100644 index 00000000..da99f920 --- /dev/null +++ b/convert/datacap-convert-txt/src/main/resources/META-INF/services/io.edurt.datacap.plugin.Service @@ -0,0 +1 @@ +io.edurt.datacap.convert.TxtService \ No newline at end of file diff --git a/convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/txt/TxtConvertTest.kt b/convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/TxtConvertTest.kt similarity index 99% rename from convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/txt/TxtConvertTest.kt rename to convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/TxtConvertTest.kt index edbf59ad..0a492459 100644 --- a/convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/txt/TxtConvertTest.kt +++ b/convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/TxtConvertTest.kt @@ -1,4 +1,4 @@ -package io.edurt.datacap.convert.txt +package io.edurt.datacap.convert import com.google.inject.Guice import com.google.inject.Injector diff --git a/convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/txt/TxtModuleTest.kt b/convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/TxtModuleTest.kt similarity index 94% rename from convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/txt/TxtModuleTest.kt rename to convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/TxtModuleTest.kt index b5ae8091..e5c24b97 100644 --- a/convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/txt/TxtModuleTest.kt +++ b/convert/datacap-convert-txt/src/test/kotlin/io/edurt/datacap/convert/TxtModuleTest.kt @@ -1,4 +1,4 @@ -package io.edurt.datacap.convert.txt +package io.edurt.datacap.convert import com.google.inject.Guice import com.google.inject.Injector diff --git a/core/datacap-common/pom.xml b/core/datacap-common/pom.xml index 82da5f2f..a27d5ef6 100644 --- a/core/datacap-common/pom.xml +++ b/core/datacap-common/pom.xml @@ -50,16 +50,6 @@ datacap-captcha ${project.version} - - io.edurt.datacap - datacap-fs-spi - ${project.version} - - - io.edurt.datacap - datacap-scheduler-spi - ${project.version} - diff --git a/core/datacap-common/src/main/java/io/edurt/datacap/common/utils/SpiUtils.java b/core/datacap-common/src/main/java/io/edurt/datacap/common/utils/SpiUtils.java deleted file mode 100644 index 956f7ffa..00000000 --- a/core/datacap-common/src/main/java/io/edurt/datacap/common/utils/SpiUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.edurt.datacap.common.utils; - -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.TypeLiteral; -import io.edurt.datacap.fs.Fs; -import io.edurt.datacap.scheduler.Scheduler; - -import java.util.Optional; -import java.util.Set; - -public class SpiUtils -{ - private SpiUtils() - { - } - - /** - * Finds a specific Fs object by name. - * - * @param injector the injector used for dependency injection - * @param name the name of the Fs object to find - * @return an Optional containing the found Fs object, or an empty Optional if not found - */ - public static Optional findFs(Injector injector, String name) - { - Optional optionalFs = injector.getInstance(Key.get(new TypeLiteral>() {})) - .stream() - .filter(item -> item.name().equalsIgnoreCase(name)) - .findFirst(); - return optionalFs; - } - - /** - * Finds a schedule in the injector by name. - * - * @param injector the injector to search in - * @param name the name of the schedule to find - * @return an Optional containing the found Scheduler, or an empty Optional if not found - */ - public static Optional findSchedule(Injector injector, String name) - { - return injector.getInstance(Key.get(new TypeLiteral>() {})) - .stream() - .filter(item -> item.name().equalsIgnoreCase(name)) - .findFirst(); - } -} diff --git a/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/PluginType.java b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/PluginType.java index 5258c0cf..d134e704 100644 --- a/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/PluginType.java +++ b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/PluginType.java @@ -2,7 +2,9 @@ package io.edurt.datacap.plugin; public enum PluginType { - CONNECTOR("Connector"); + CONNECTOR("Connector"), + EXECUTOR("Executor"), + SCHEDULER("Scheduler"); private String name; diff --git a/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/utils/PluginPathUtils.java b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/utils/PluginPathUtils.java index 0503c2a5..cef1e5fb 100644 --- a/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/utils/PluginPathUtils.java +++ b/core/datacap-plugin/src/main/java/io/edurt/datacap/plugin/utils/PluginPathUtils.java @@ -250,4 +250,18 @@ public class PluginPathUtils return Optional.empty(); } } + + /** + * 安全地将子路径添加到项目根目录 + * Safely append a sub-path to the project root directory + * + * @param subPath 要添加的子路径 + * @return 组合后的路径 + */ + public static Path appendPath(String subPath) + { + Path root = findProjectRoot(); + String safePath = subPath.startsWith("/") ? subPath.substring(1) : subPath; + return root.resolve(safePath); + } } diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/DataSetController.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/DataSetController.java index 8973a795..7cbaf161 100644 --- a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/DataSetController.java +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/DataSetController.java @@ -1,6 +1,7 @@ package io.edurt.datacap.server.controller; import io.edurt.datacap.common.response.CommonResponse; +import io.edurt.datacap.plugin.PluginMetadata; import io.edurt.datacap.service.body.FilterBody; import io.edurt.datacap.service.body.adhoc.Adhoc; import io.edurt.datacap.service.entity.DataSetColumnEntity; @@ -78,7 +79,7 @@ public class DataSetController } @GetMapping(value = "getActuators") - public CommonResponse> getActuators() + public CommonResponse> getActuators() { return service.getActuators(); } diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/PluginController.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/PluginController.java index d6b19414..ff419428 100644 --- a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/PluginController.java +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/PluginController.java @@ -5,11 +5,11 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.TypeLiteral; import io.edurt.datacap.common.response.CommonResponse; -import io.edurt.datacap.executor.Executor; +import io.edurt.datacap.executor.ExecutorService; import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.plugin.PluginMetadata; import io.edurt.datacap.plugin.PluginType; -import io.edurt.datacap.scheduler.Scheduler; +import io.edurt.datacap.scheduler.SchedulerService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -37,15 +37,15 @@ public class PluginController public CommonResponse>> getPlugins() { Map> plugins = Maps.newHashMap(); - Set executors = injector.getInstance(Key.get(new TypeLiteral>() {})) + Set executors = injector.getInstance(Key.get(new TypeLiteral>() {})) .stream() - .map(Executor::name) + .map(ExecutorService::name) .collect(Collectors.toSet()); plugins.put("executor", executors); - Set schedulers = injector.getInstance(Key.get(new TypeLiteral>() {})) + Set schedulers = injector.getInstance(Key.get(new TypeLiteral>() {})) .stream() - .map(Scheduler::name) + .map(SchedulerService::name) .collect(Collectors.toSet()); plugins.put("scheduler", schedulers); return CommonResponse.success(plugins); diff --git a/core/datacap-service/pom.xml b/core/datacap-service/pom.xml index 863f542f..c4935dd5 100644 --- a/core/datacap-service/pom.xml +++ b/core/datacap-service/pom.xml @@ -112,6 +112,11 @@ io.edurt.datacap datacap-convert-spi + + io.edurt.datacap + datacap-fs-spi + ${project.version} + io.edurt.datacap diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/audit/AuditPluginHandler.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/audit/AuditPluginHandler.java index 0c24e015..ab346e65 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/audit/AuditPluginHandler.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/audit/AuditPluginHandler.java @@ -1,15 +1,15 @@ package io.edurt.datacap.service.audit; -import com.google.inject.Injector; import io.edurt.datacap.common.enums.State; import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.common.utils.CodeUtils; import io.edurt.datacap.common.utils.DateUtils; -import io.edurt.datacap.common.utils.SpiUtils; import io.edurt.datacap.convert.ConvertFilter; import io.edurt.datacap.convert.model.ConvertRequest; import io.edurt.datacap.convert.model.ConvertResponse; import io.edurt.datacap.fs.FsRequest; +import io.edurt.datacap.fs.FsService; +import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.service.common.FolderUtils; import io.edurt.datacap.service.entity.ExecuteEntity; import io.edurt.datacap.service.entity.PluginAuditEntity; @@ -49,14 +49,14 @@ public class AuditPluginHandler private final PluginAuditRepository pluginAuditRepository; private final SourceRepository sourceRepository; private final InitializerConfigure initializer; - private final Injector injector; + private final PluginManager pluginManager; - public AuditPluginHandler(PluginAuditRepository pluginAuditRepository, SourceRepository sourceRepository, InitializerConfigure initializer, Injector injector) + public AuditPluginHandler(PluginAuditRepository pluginAuditRepository, SourceRepository sourceRepository, InitializerConfigure initializer, PluginManager pluginManager) { this.pluginAuditRepository = pluginAuditRepository; this.sourceRepository = sourceRepository; this.initializer = initializer; - this.injector = injector; + this.pluginManager = pluginManager; } @Pointcut("@annotation(auditPlugin)") @@ -111,7 +111,7 @@ public class AuditPluginHandler String workHome = FolderUtils.getWorkHome(initializer.getDataHome(), user.getUsername(), String.join(File.separator, "adhoc", uniqueId)); log.info("Writer file to folder [ {} ] on [ {} ]", workHome, pluginAudit.getId()); try { - ConvertFilter.filter(injector, "Json") + ConvertFilter.filter(pluginManager, "Json") .ifPresent(it -> { try { FileUtils.forceMkdir(new File(workHome)); @@ -139,8 +139,12 @@ public class AuditPluginHandler fsRequest.setEndpoint(initializer.getFsConfigure().getEndpoint()); fsRequest.setFileName(String.join(File.separator, user.getUsername(), DateUtils.formatYMD(), String.join(File.separator, "adhoc", uniqueId), "result.csv")); } - SpiUtils.findFs(injector, initializer.getFsConfigure().getType()) - .map(v -> v.writer(fsRequest)); + + pluginManager.getPlugin(initializer.getFsConfigure().getType()) + .map(v -> { + FsService fsService = v.getService(FsService.class); + return fsService.writer(fsRequest); + }); log.info("Delete temp file [ {} ] on [ {} ] state [ {} ]", tempFile, pluginAudit.getId(), Files.deleteIfExists(tempFile.toPath())); log.info("Writer file to folder [ {} ] on [ {} ] completed", workHome, pluginAudit.getId()); pluginAudit.setHome(workHome); diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/DatasetSchedulerInitializer.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/DatasetSchedulerInitializer.java index d6db9de1..5af17ec0 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/DatasetSchedulerInitializer.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/initializer/DatasetSchedulerInitializer.java @@ -1,9 +1,9 @@ package io.edurt.datacap.service.initializer; -import com.google.inject.Injector; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.edurt.datacap.common.utils.SpiUtils; +import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.scheduler.SchedulerRequest; +import io.edurt.datacap.scheduler.SchedulerService; import io.edurt.datacap.service.enums.SyncMode; import io.edurt.datacap.service.initializer.job.DatasetJob; import io.edurt.datacap.service.repository.DataSetRepository; @@ -19,14 +19,14 @@ import org.springframework.stereotype.Service; public class DatasetSchedulerInitializer implements CommandLineRunner { - private final Injector injector; + private final PluginManager pluginManager; private final DataSetRepository repository; private final DataSetService service; private final Scheduler scheduler; - public DatasetSchedulerInitializer(Injector injector, DataSetRepository repository, DataSetService service, Scheduler scheduler) + public DatasetSchedulerInitializer(PluginManager pluginManager, DataSetRepository repository, DataSetService service, Scheduler scheduler) { - this.injector = injector; + this.pluginManager = pluginManager; this.repository = repository; this.service = service; this.scheduler = scheduler; @@ -37,24 +37,26 @@ public class DatasetSchedulerInitializer throws Exception { log.info("Start dataset scheduler initializer"); -// this.repository.findAllBySyncMode(SyncMode.TIMING) -// .forEach(item -> { -// log.info("Dataset [ {} ] will be scheduled", item.getName()); -// SpiUtils.findSchedule(this.injector, item.getScheduler()) -// .ifPresent(scheduler -> { -// SchedulerRequest request = new SchedulerRequest(); -// request.setName(item.getId().toString()); -// request.setGroup("datacap"); -// request.setExpression(item.getExpression()); -// request.setJobId(String.valueOf(item.getId())); -// request.setCreateBeforeDelete(true); -// if (scheduler.name().equals("Default")) { -// request.setJob(new DatasetJob()); -// request.setScheduler(this.scheduler); -// } -// scheduler.initialize(request); -// }); -// }); + this.repository.findAllBySyncMode(SyncMode.TIMING) + .forEach(item -> { + log.info("Dataset [ {} ] will be scheduled", item.getName()); + pluginManager.getPlugin(item.getScheduler()) + .ifPresent(scheduler -> { + SchedulerRequest request = new SchedulerRequest(); + request.setName(item.getId().toString()); + request.setGroup("datacap"); + request.setExpression(item.getExpression()); + request.setJobId(String.valueOf(item.getId())); + request.setCreateBeforeDelete(true); + + SchedulerService schedulerService = scheduler.getService(SchedulerService.class); + if (schedulerService.name().equals("Default")) { + request.setJob(new DatasetJob()); + request.setScheduler(this.scheduler); + } + schedulerService.initialize(request); + }); + }); log.info("End dataset scheduler initializer"); } } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/DataSetService.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/DataSetService.java index 5dafbc83..498ec36f 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/DataSetService.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/DataSetService.java @@ -1,6 +1,7 @@ package io.edurt.datacap.service.service; import io.edurt.datacap.common.response.CommonResponse; +import io.edurt.datacap.plugin.PluginMetadata; import io.edurt.datacap.service.body.FilterBody; import io.edurt.datacap.service.body.adhoc.Adhoc; import io.edurt.datacap.service.entity.DataSetColumnEntity; @@ -23,7 +24,7 @@ public interface DataSetService CommonResponse adhoc(String code, Adhoc configure); - CommonResponse> getActuators(); + CommonResponse> getActuators(); CommonResponse getInfo(String code); diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/DataSetServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/DataSetServiceImpl.java index de6f9dc6..86190c7c 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/DataSetServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/DataSetServiceImpl.java @@ -3,9 +3,6 @@ package io.edurt.datacap.service.service.impl; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.TypeLiteral; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.edurt.datacap.common.enums.DataSetState; import io.edurt.datacap.common.response.CommonResponse; @@ -15,9 +12,7 @@ import io.edurt.datacap.common.sql.configure.SqlColumn; import io.edurt.datacap.common.sql.configure.SqlOperator; import io.edurt.datacap.common.sql.configure.SqlOrder; import io.edurt.datacap.common.sql.configure.SqlType; -import io.edurt.datacap.common.utils.SpiUtils; -import io.edurt.datacap.executor.Executor; -import io.edurt.datacap.executor.ExecutorUtils; +import io.edurt.datacap.executor.ExecutorService; import io.edurt.datacap.executor.common.RunEngine; import io.edurt.datacap.executor.common.RunMode; import io.edurt.datacap.executor.common.RunProtocol; @@ -27,15 +22,16 @@ import io.edurt.datacap.executor.configure.ExecutorConfigure; import io.edurt.datacap.executor.configure.ExecutorRequest; import io.edurt.datacap.executor.configure.ExecutorResponse; import io.edurt.datacap.executor.configure.OriginColumn; -import io.edurt.datacap.scheduler.Scheduler; +import io.edurt.datacap.plugin.PluginManager; +import io.edurt.datacap.plugin.PluginMetadata; import io.edurt.datacap.scheduler.SchedulerRequest; +import io.edurt.datacap.scheduler.SchedulerService; import io.edurt.datacap.service.adapter.PageRequestAdapter; import io.edurt.datacap.service.body.FilterBody; import io.edurt.datacap.service.body.PipelineFieldBody; import io.edurt.datacap.service.body.adhoc.Adhoc; import io.edurt.datacap.service.common.ConfigureUtils; import io.edurt.datacap.service.common.FolderUtils; -import io.edurt.datacap.service.common.PluginUtils; import io.edurt.datacap.service.configure.IConfigurePipelineType; import io.edurt.datacap.service.entity.DataSetColumnEntity; import io.edurt.datacap.service.entity.DataSetEntity; @@ -81,7 +77,6 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -97,17 +92,17 @@ public class DataSetServiceImpl public final DataSetColumnRepository columnRepository; private final DataSetRepository repository; private final DatasetHistoryRepository historyRepository; - private final Injector injector; + private final PluginManager pluginManager; private final InitializerConfigure initializerConfigure; private final org.quartz.Scheduler scheduler; private final Environment environment; - public DataSetServiceImpl(DataSetRepository repository, DataSetColumnRepository columnRepository, DatasetHistoryRepository historyRepository, Injector injector, InitializerConfigure initializerConfigure, org.quartz.Scheduler scheduler, Environment environment) + public DataSetServiceImpl(DataSetRepository repository, DataSetColumnRepository columnRepository, DatasetHistoryRepository historyRepository, PluginManager pluginManager, InitializerConfigure initializerConfigure, org.quartz.Scheduler scheduler, Environment environment) { this.repository = repository; this.columnRepository = columnRepository; this.historyRepository = historyRepository; - this.injector = injector; + this.pluginManager = pluginManager; this.initializerConfigure = initializerConfigure; this.scheduler = scheduler; this.environment = environment; @@ -117,7 +112,7 @@ public class DataSetServiceImpl public CommonResponse saveOrUpdate(DataSetEntity configure) { UserEntity user = UserDetailsService.getUser(); - ExecutorService service = Executors.newSingleThreadExecutor(); + java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor(); service.submit(() -> { configure.setUser(user); completeState(configure, DataSetState.METADATA_START); @@ -133,7 +128,7 @@ public class DataSetServiceImpl if (!entity.isPresent()) { return CommonResponse.failure(String.format("DataSet [ %s ] not found", id)); } - ExecutorService service = Executors.newSingleThreadExecutor(); + java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor(); service.submit(() -> startBuild(entity.get(), false)); return CommonResponse.success(entity); } @@ -156,7 +151,7 @@ public class DataSetServiceImpl if (!entityOptional.isPresent()) { return CommonResponse.failure(String.format("DataSet [ %s ] not found", id)); } - ExecutorService service = Executors.newSingleThreadExecutor(); + java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor(); DataSetEntity entity = entityOptional.get(); service.submit(() -> syncData(entity, service)); return CommonResponse.success(true); @@ -167,7 +162,7 @@ public class DataSetServiceImpl { return repository.findByCode(code) .map(item -> { - ExecutorService service = Executors.newSingleThreadExecutor(); + java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor(); service.submit(() -> clearData(item, service)); return CommonResponse.success(true); }) @@ -246,32 +241,33 @@ public class DataSetServiceImpl String sql = new SqlBuilder(body).getSql(); log.info("Execute SQL: {} for DataSet [ {} ]", sql, code); - Optional pluginOptional = PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name()); - if (!pluginOptional.isPresent()) { - throw new IllegalArgumentException(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType())); - } - PluginService plugin = pluginOptional.get(); - Configure targetConfigure = new Configure(); - targetConfigure.setHost(initializerConfigure.getDataSetConfigure().getHost()); - targetConfigure.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort())); - targetConfigure.setUsername(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getUsername())); - targetConfigure.setPassword(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getPassword())); - targetConfigure.setDatabase(Optional.ofNullable(database)); - targetConfigure.setInjector(injector); - plugin.connect(targetConfigure); - Response response = plugin.execute(sql); - response.setContent(sql); - return CommonResponse.success(response); + return pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType()) + .map(plugin -> { + PluginService pluginService = plugin.getService(PluginService.class); + Configure targetConfigure = new Configure(); + targetConfigure.setHost(initializerConfigure.getDataSetConfigure().getHost()); + targetConfigure.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort())); + targetConfigure.setUsername(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getUsername())); + targetConfigure.setPassword(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getPassword())); + targetConfigure.setDatabase(Optional.ofNullable(database)); + targetConfigure.setPluginManager(pluginManager); + Response response = pluginService.execute(targetConfigure, sql); + response.setContent(sql); + return CommonResponse.success(response); + }) + .orElseGet(() -> CommonResponse.failure(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType()))); }) .orElseGet(() -> CommonResponse.failure(String.format("DataSet [ %s ] not found", code))); } @Override - public CommonResponse> getActuators() + public CommonResponse> getActuators() { - Set actuators = Sets.newHashSet(); - this.injector.getInstance(Key.get(new TypeLiteral>() {})) - .forEach(item -> actuators.add(item.name())); + Set actuators = Sets.newHashSet(); + this.pluginManager.getPluginInfos() + .stream() + .filter(item -> item.getType().equals(io.edurt.datacap.plugin.PluginType.EXECUTOR)) + .forEach(actuators::add); return CommonResponse.success(actuators); } @@ -338,7 +334,7 @@ public class DataSetServiceImpl targetConfigure.setUsername(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getUsername())); targetConfigure.setPassword(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getPassword())); targetConfigure.setDatabase(Optional.ofNullable(database)); - targetConfigure.setInjector(injector); + targetConfigure.setPluginManager(pluginManager); return targetConfigure; } @@ -402,63 +398,66 @@ public class DataSetServiceImpl private void createTable(DataSetEntity entity) { try { - Optional pluginOptional = PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name()); - if (!pluginOptional.isPresent()) { - throw new IllegalArgumentException(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType())); - } - PluginService plugin = pluginOptional.get(); - String database = initializerConfigure.getDataSetConfigure().getDatabase(); - String originTableName = entity.getTableName(); - String tableDefaultEngine = initializerConfigure.getDataSetConfigure().getTableDefaultEngine(); + pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType()) + .ifPresentOrElse(plugin -> { + PluginService pluginService = plugin.getService(PluginService.class); + String database = initializerConfigure.getDataSetConfigure().getDatabase(); + String originTableName = entity.getTableName(); + String tableDefaultEngine = initializerConfigure.getDataSetConfigure().getTableDefaultEngine(); - List columns = Lists.newArrayList(); - List columnEntities = columnRepository.findAllByDataset(entity); - columnEntities.stream() - .filter(item -> !item.isVirtualColumn()) - .forEach(item -> { - Column column = new Column(); - column.setName(item.getName()); - column.setType(getColumnType(item.getType())); - column.setComment(item.getComment()); - column.setLength(item.getLength()); - column.setNullable(item.isNullable()); - column.setDefaultValue(item.getDefaultValue()); - columns.add(column); - }); + List columns = Lists.newArrayList(); + List columnEntities = columnRepository.findAllByDataset(entity); + columnEntities.stream() + .filter(item -> !item.isVirtualColumn()) + .forEach(item -> { + Column column = new Column(); + column.setName(item.getName()); + column.setType(getColumnType(item.getType())); + column.setComment(item.getComment()); + column.setLength(item.getLength()); + column.setNullable(item.isNullable()); + column.setDefaultValue(item.getDefaultValue()); + columns.add(column); + }); - TableBuilder.Companion.BEGIN(); - TableBuilder.Companion.CREATE_TABLE(String.format("`%s`.`%s`", database, originTableName)); - TableBuilder.Companion.COLUMNS(columns.stream().map(Column::toColumnVar).collect(Collectors.toList())); - TableBuilder.Companion.ENGINE(tableDefaultEngine); - TableBuilder.Companion.ORDER_BY(columnEntities.stream().filter(DataSetColumnEntity::isOrderByKey).map(DataSetColumnEntity::getName).collect(Collectors.toList())); - TableBuilder.Companion.PARTITION_BY(columnEntities.stream().filter(DataSetColumnEntity::isPartitionKey).map(DataSetColumnEntity::getName).collect(Collectors.toList())); - TableBuilder.Companion.PRIMARY_KEY(columnEntities.stream().filter(DataSetColumnEntity::isPrimaryKey).map(DataSetColumnEntity::getName).collect(Collectors.toList())); - TableBuilder.Companion.SAMPLING_KEY(columnEntities.stream().filter(DataSetColumnEntity::isSamplingKey).map(DataSetColumnEntity::getName).collect(Collectors.toList())); - if (entity.getLifeCycleColumn() != null - && entity.getLifeCycle() != null - && entity.getLifeCycleType() != null) { - TableBuilder.Companion.ADD_LIFECYCLE(String.format("`%s` + INTERVAL %s %s", entity.getLifeCycleColumn(), entity.getLifeCycle(), entity.getLifeCycleType())); - } - String sql = TableBuilder.Companion.SQL(); - log.info("Create table sql \n {} \n on dataset [ {} ]", sql, entity.getName()); + TableBuilder.Companion.BEGIN(); + TableBuilder.Companion.CREATE_TABLE(String.format("`%s`.`%s`", database, originTableName)); + TableBuilder.Companion.COLUMNS(columns.stream().map(Column::toColumnVar).collect(Collectors.toList())); + TableBuilder.Companion.ENGINE(tableDefaultEngine); + TableBuilder.Companion.ORDER_BY(columnEntities.stream().filter(DataSetColumnEntity::isOrderByKey).map(DataSetColumnEntity::getName).collect(Collectors.toList())); + TableBuilder.Companion.PARTITION_BY(columnEntities.stream().filter(DataSetColumnEntity::isPartitionKey).map(DataSetColumnEntity::getName).collect(Collectors.toList())); + TableBuilder.Companion.PRIMARY_KEY(columnEntities.stream().filter(DataSetColumnEntity::isPrimaryKey).map(DataSetColumnEntity::getName).collect(Collectors.toList())); + TableBuilder.Companion.SAMPLING_KEY(columnEntities.stream().filter(DataSetColumnEntity::isSamplingKey).map(DataSetColumnEntity::getName).collect(Collectors.toList())); + if (entity.getLifeCycleColumn() != null + && entity.getLifeCycle() != null + && entity.getLifeCycleType() != null) { + TableBuilder.Companion.ADD_LIFECYCLE(String.format("`%s` + INTERVAL %s %s", entity.getLifeCycleColumn(), entity.getLifeCycle(), entity.getLifeCycleType())); + } + String sql = TableBuilder.Companion.SQL(); + log.info("Create table sql \n {} \n on dataset [ {} ]", sql, entity.getName()); - Configure targetConfigure = new Configure(); - targetConfigure.setHost(initializerConfigure.getDataSetConfigure().getHost()); - targetConfigure.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort())); - targetConfigure.setUsername(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getUsername())); - targetConfigure.setPassword(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getPassword())); - targetConfigure.setDatabase(Optional.ofNullable(database)); - plugin.connect(targetConfigure); - Response response = plugin.execute(sql); - plugin.destroy(); - if (response.getIsSuccessful()) { - entity.setTableName(originTableName); - entity.setMessage(null); - completeState(entity, DataSetState.TABLE_SUCCESS); - } - else { - throw new RuntimeException(response.getMessage()); - } + Configure targetConfigure = new Configure(); + targetConfigure.setHost(initializerConfigure.getDataSetConfigure().getHost()); + targetConfigure.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort())); + targetConfigure.setUsername(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getUsername())); + targetConfigure.setPassword(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getPassword())); + targetConfigure.setDatabase(Optional.ofNullable(database)); + Response response = pluginService.execute(targetConfigure, sql); + if (response.getIsSuccessful()) { + entity.setTableName(originTableName); + entity.setMessage(null); + completeState(entity, DataSetState.TABLE_SUCCESS); + } + else { + throw new RuntimeException(response.getMessage()); + } + }, + () -> { + String type = initializerConfigure.getDataSetConfigure().getType(); + String errorMessage = String.format("Plugin [ %s ] not found", type); + log.error(errorMessage); + throw new IllegalArgumentException(errorMessage); + }); } catch (Exception e) { log.warn("Create dataset [ {} ] ", entity.getName(), e); @@ -524,10 +523,9 @@ public class DataSetServiceImpl TableBuilder.Companion.LIFECYCLE(String.format("`%s` + INTERVAL %s %s", item.getColumn().getName(), item.getColumn().getLength(), item.getColumn().getDefaultValue())); String sql = TableBuilder.Companion.SQL(); log.info("Modify lifecycle sql \n {} \n on dataset [ {} ] id [ {} ]", sql, entity.getName(), entity.getId()); - PluginService plugin = getOutputPlugin(); + PluginService pluginService = getOutputPlugin(); SourceEntity source = getOutputSource(); - plugin.connect(source.toConfigure()); - Response response = plugin.execute(sql); + Response response = pluginService.execute(source.toConfigure(), sql); Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage()); }); @@ -547,85 +545,102 @@ public class DataSetServiceImpl } } - private void syncData(DataSetEntity entity, ExecutorService service) + private PluginService getOutputPlugin() + { + return pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType()) + .map(plugin -> plugin.getService(PluginService.class)) + .orElseThrow(() -> new RuntimeException("Not found service from %s" + initializerConfigure.getDataSetConfigure().getType())); + } + + private void syncData(DataSetEntity entity, java.util.concurrent.ExecutorService service) { DatasetHistoryEntity history = new DatasetHistoryEntity(); try { SourceEntity source = entity.getSource(); - Optional pluginOptional = PluginUtils.getPluginByNameAndType(injector, source.getType(), source.getProtocol()); - if (pluginOptional.isEmpty()) { - throw new IllegalArgumentException(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType())); - } - history.setState(RunState.CREATED); - history.setCreateTime(new Date()); - history.setQuery(entity.getQuery()); - history.setDataset(entity); - historyRepository.save(history); + pluginManager.getPlugin(source.getType()) + .ifPresentOrElse(plugin -> { + history.setState(RunState.CREATED); + history.setCreateTime(new Date()); + history.setQuery(entity.getQuery()); + history.setDataset(entity); + historyRepository.save(history); - Executor executor = ExecutorUtils.findOne(this.injector, entity.getExecutor()); - PluginService inputPlugin = pluginOptional.get(); - Set originColumns = columnRepository.findAllByDataset(entity) - .stream() - .filter(item -> !item.isVirtualColumn()) - .map(item -> new OriginColumn(item.getName(), item.getOriginal())) - .collect(Collectors.toSet()); - String database = initializerConfigure.getDataSetConfigure().getDatabase(); - Properties originInputProperties = new Properties(); - originInputProperties.put("driver", inputPlugin.driver()); - PipelineFieldBody inputFieldBody = ConfigureUtils.convertFieldBody(source, entity.getExecutor(), IConfigurePipelineType.INPUT, environment, originInputProperties); - Properties inputProperties = ConfigureUtils.convertProperties(source, environment, - IConfigurePipelineType.INPUT, entity.getExecutor(), entity.getQuery(), inputFieldBody); - Set inputOptions = ConfigureUtils.convertOptions(source, environment, entity.getExecutor(), IConfigurePipelineType.INPUT); - Configure inputConfigure = source.toConfigure(); - inputConfigure.setInjector(injector); - ExecutorConfigure input = new ExecutorConfigure(source.getType(), inputProperties, inputOptions, RunProtocol.valueOf(source.getProtocol()), - inputPlugin, entity.getQuery(), database, entity.getTableName(), inputConfigure, originColumns); + PluginService inputPlugin = plugin.getService(PluginService.class); + pluginManager.getPlugin(entity.getExecutor()) + .ifPresent(executor -> { + ExecutorService executorService = executor.getService(ExecutorService.class); - PluginService outputPlugin = PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name()).orElseGet(null); - SourceEntity outputSource = new SourceEntity(); - outputSource.setType(initializerConfigure.getDataSetConfigure().getType()); - outputSource.setDatabase(initializerConfigure.getDataSetConfigure().getDatabase()); - outputSource.setHost(initializerConfigure.getDataSetConfigure().getHost()); - outputSource.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort())); - outputSource.setUsername(initializerConfigure.getDataSetConfigure().getUsername()); - outputSource.setPassword(initializerConfigure.getDataSetConfigure().getPassword()); - outputSource.setProtocol(PluginType.JDBC.name()); - Properties originOutputProperties = new Properties(); - List fields = Lists.newArrayList(); - columnRepository.findAllByDataset(entity) - .forEach(item -> fields.add(item.getName())); - originOutputProperties.put("fields", String.join("\n", fields)); - originOutputProperties.put("database", database); - originOutputProperties.put("table", entity.getTableName()); - PipelineFieldBody outputFieldBody = ConfigureUtils.convertFieldBody(outputSource, entity.getExecutor(), IConfigurePipelineType.OUTPUT, environment, originOutputProperties); - Properties outputProperties = ConfigureUtils.convertProperties(outputSource, environment, - IConfigurePipelineType.OUTPUT, entity.getExecutor(), entity.getQuery(), outputFieldBody); - Set outputOptions = ConfigureUtils.convertOptions(outputSource, environment, entity.getExecutor(), IConfigurePipelineType.OUTPUT); - ExecutorConfigure output = new ExecutorConfigure(outputSource.getType(), outputProperties, outputOptions, RunProtocol.NONE, - outputPlugin, null, null, null, getConfigure(database), Sets.newHashSet()); + Set originColumns = columnRepository.findAllByDataset(entity) + .stream() + .filter(item -> !item.isVirtualColumn()) + .map(item -> new OriginColumn(item.getName(), item.getOriginal())) + .collect(Collectors.toSet()); + String database = initializerConfigure.getDataSetConfigure().getDatabase(); + Properties originInputProperties = new Properties(); + originInputProperties.put("driver", inputPlugin.driver()); + PipelineFieldBody inputFieldBody = ConfigureUtils.convertFieldBody(source, entity.getExecutor(), IConfigurePipelineType.INPUT, environment, originInputProperties); + Properties inputProperties = ConfigureUtils.convertProperties(source, environment, + IConfigurePipelineType.INPUT, entity.getExecutor(), entity.getQuery(), inputFieldBody); + Set inputOptions = ConfigureUtils.convertOptions(source, environment, entity.getExecutor(), IConfigurePipelineType.INPUT); + Configure inputConfigure = source.toConfigure(); + inputConfigure.setPluginManager(pluginManager); + ExecutorConfigure input = new ExecutorConfigure(source.getType(), inputProperties, inputOptions, RunProtocol.valueOf(source.getProtocol()), + inputPlugin, entity.getQuery(), database, entity.getTableName(), inputConfigure, originColumns); - String taskName = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMddHHmmssSSS"); - String workHome = FolderUtils.getWorkHome(initializerConfigure.getDataHome(), entity.getUser().getUsername(), String.join(File.separator, "dataset", entity.getExecutor().toLowerCase(), taskName)); - ExecutorRequest request = new ExecutorRequest(taskName, entity.getUser().getUsername(), input, output, - environment.getProperty(String.format("datacap.executor.%s.home", entity.getExecutor().toLowerCase())), - workHome, this.injector, 600, - RunWay.valueOf(environment.getProperty("datacap.executor.way")), - RunMode.valueOf(environment.getProperty("datacap.executor.mode")), - environment.getProperty("datacap.executor.startScript"), - RunEngine.valueOf(environment.getProperty("datacap.executor.engine"))); + pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType()) + .ifPresent(_plugin -> { + PluginService outputPlugin = _plugin.getService(PluginService.class); + SourceEntity outputSource = new SourceEntity(); + outputSource.setType(initializerConfigure.getDataSetConfigure().getType()); + outputSource.setDatabase(initializerConfigure.getDataSetConfigure().getDatabase()); + outputSource.setHost(initializerConfigure.getDataSetConfigure().getHost()); + outputSource.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort())); + outputSource.setUsername(initializerConfigure.getDataSetConfigure().getUsername()); + outputSource.setPassword(initializerConfigure.getDataSetConfigure().getPassword()); + outputSource.setProtocol(PluginType.JDBC.name()); + Properties originOutputProperties = new Properties(); + List fields = Lists.newArrayList(); + columnRepository.findAllByDataset(entity) + .forEach(item -> fields.add(item.getName())); + originOutputProperties.put("fields", String.join("\n", fields)); + originOutputProperties.put("database", database); + originOutputProperties.put("table", entity.getTableName()); + PipelineFieldBody outputFieldBody = ConfigureUtils.convertFieldBody(outputSource, entity.getExecutor(), IConfigurePipelineType.OUTPUT, environment, originOutputProperties); + Properties outputProperties = ConfigureUtils.convertProperties(outputSource, environment, + IConfigurePipelineType.OUTPUT, entity.getExecutor(), entity.getQuery(), outputFieldBody); + Set outputOptions = ConfigureUtils.convertOptions(outputSource, environment, entity.getExecutor(), IConfigurePipelineType.OUTPUT); + ExecutorConfigure output = new ExecutorConfigure(outputSource.getType(), outputProperties, outputOptions, RunProtocol.NONE, + outputPlugin, null, null, null, getConfigure(database), Sets.newHashSet()); - history.setState(RunState.RUNNING); - historyRepository.save(history); - ExecutorResponse response = executor.start(request); - history.setUpdateTime(new Date()); - history.setElapsed((history.getUpdateTime().getTime() - history.getCreateTime().getTime()) / 1000); - history.setMode(QueryMode.SYNC); - history.setCount(response.getCount()); - history.setState(response.getState()); - Preconditions.checkArgument(response.getSuccessful(), response.getMessage()); + String taskName = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMddHHmmssSSS"); + String workHome = FolderUtils.getWorkHome(initializerConfigure.getDataHome(), entity.getUser().getUsername(), String.join(File.separator, "dataset", entity.getExecutor().toLowerCase(), taskName)); + ExecutorRequest request = new ExecutorRequest(taskName, entity.getUser().getUsername(), input, output, + environment.getProperty(String.format("datacap.executor.%s.home", entity.getExecutor().toLowerCase())), + workHome, this.pluginManager, 600, + RunWay.valueOf(environment.getProperty("datacap.executor.way")), + RunMode.valueOf(environment.getProperty("datacap.executor.mode")), + environment.getProperty("datacap.executor.startScript"), + RunEngine.valueOf(environment.getProperty("datacap.executor.engine"))); - this.flushTableMetadata(entity, outputPlugin, database, requireNonNull(output.getOriginConfigure())); + history.setState(RunState.RUNNING); + historyRepository.save(history); + + ExecutorResponse response = executorService.start(request); + history.setUpdateTime(new Date()); + history.setElapsed((history.getUpdateTime().getTime() - history.getCreateTime().getTime()) / 1000); + history.setMode(QueryMode.SYNC); + history.setCount(response.getCount()); + history.setState(response.getState()); + Preconditions.checkArgument(response.getSuccessful(), response.getMessage()); + + this.flushTableMetadata(entity, outputPlugin, database, requireNonNull(output.getOriginConfigure())); + }); + }); + }, + () -> { + throw new IllegalArgumentException(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType())); + }); } catch (Exception e) { log.warn("Sync data for dataset [ {} ] failed", entity.getName(), e); @@ -647,7 +662,7 @@ public class DataSetServiceImpl { log.info("Start schedule for dataset [ {} ] id [ {} ]", entity.getName(), entity.getId()); if (entity.getSyncMode().equals(SyncMode.TIMING)) { - SpiUtils.findSchedule(this.injector, entity.getScheduler()) + pluginManager.getPlugin(entity.getScheduler()) .ifPresent(scheduler -> { SchedulerRequest request = new SchedulerRequest(); request.setName(entity.getId().toString()); @@ -655,51 +670,57 @@ public class DataSetServiceImpl request.setExpression(entity.getExpression()); request.setJobId(String.valueOf(entity.getId())); request.setCreateBeforeDelete(true); - if (scheduler.name().equals("Default")) { + + SchedulerService schedulerService = scheduler.getService(SchedulerService.class); + if (schedulerService.name().equals("Default")) { request.setJob(new DatasetJob()); request.setScheduler(this.scheduler); } - scheduler.initialize(request); + schedulerService.initialize(request); }); } else { - SpiUtils.findSchedule(this.injector, entity.getScheduler()) + pluginManager.getPlugin(entity.getScheduler()) .ifPresent(scheduler -> { SchedulerRequest request = new SchedulerRequest(); request.setName(entity.getId().toString()); request.setGroup("datacap"); - if (scheduler.name().equals("Default")) { + + SchedulerService schedulerService = scheduler.getService(SchedulerService.class); + if (schedulerService.name().equals("Default")) { request.setScheduler(this.scheduler); } - scheduler.stop(request); + schedulerService.stop(request); }); } } - private void clearData(DataSetEntity entity, ExecutorService service) + private void clearData(DataSetEntity entity, java.util.concurrent.ExecutorService service) { try { - PluginService plugin = PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name()).orElseGet(null); - SourceEntity source = new SourceEntity(); - source.setType(initializerConfigure.getDataSetConfigure().getType()); - source.setDatabase(initializerConfigure.getDataSetConfigure().getDatabase()); - source.setHost(initializerConfigure.getDataSetConfigure().getHost()); - source.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort())); - source.setUsername(initializerConfigure.getDataSetConfigure().getUsername()); - source.setPassword(initializerConfigure.getDataSetConfigure().getPassword()); - source.setProtocol(PluginType.JDBC.name()); + pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType()) + .ifPresent(plugin -> { + SourceEntity source = new SourceEntity(); + source.setType(initializerConfigure.getDataSetConfigure().getType()); + source.setDatabase(initializerConfigure.getDataSetConfigure().getDatabase()); + source.setHost(initializerConfigure.getDataSetConfigure().getHost()); + source.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort())); + source.setUsername(initializerConfigure.getDataSetConfigure().getUsername()); + source.setPassword(initializerConfigure.getDataSetConfigure().getPassword()); + source.setProtocol(PluginType.JDBC.name()); - plugin.connect(source.toConfigure()); - SqlBody body = SqlBody.builder() - .type(SqlType.TRUNCATE) - .database(initializerConfigure.getDataSetConfigure().getDatabase()) - .table(entity.getTableName()) - .build(); - String sql = new SqlBuilder(body).getSql(); - log.info("Clear data for dataset [ {} ] id [ {} ] sql \n {}", entity.getName(), entity.getId(), sql); - Response response = plugin.execute(sql); - Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage()); - this.flushTableMetadata(entity, plugin, initializerConfigure.getDataSetConfigure().getDatabase(), source.toConfigure()); + PluginService pluginService = plugin.getService(PluginService.class); + SqlBody body = SqlBody.builder() + .type(SqlType.TRUNCATE) + .database(initializerConfigure.getDataSetConfigure().getDatabase()) + .table(entity.getTableName()) + .build(); + String sql = new SqlBuilder(body).getSql(); + log.info("Clear data for dataset [ {} ] id [ {} ] sql \n {}", entity.getName(), entity.getId(), sql); + Response response = pluginService.execute(source.toConfigure(), sql); + Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage()); + this.flushTableMetadata(entity, pluginService, initializerConfigure.getDataSetConfigure().getDatabase(), source.toConfigure()); + }); } catch (Exception e) { log.warn("Clear data for dataset [ {} ] failed", entity.getName(), e); @@ -713,11 +734,11 @@ public class DataSetServiceImpl * Flushes the table metadata for a given dataset entity using the specified plugin and database configuration. * * @param entity the dataset entity to flush - * @param plugin the plugin used to connect + * @param pluginService the plugin used to connect * @param database the database name * @param configure the configuration settings */ - private void flushTableMetadata(DataSetEntity entity, PluginService plugin, String database, Configure configure) + private void flushTableMetadata(DataSetEntity entity, PluginService pluginService, String database, Configure configure) { // Get the total number of rows and the total size of the dataset log.info("Get the total number of rows and the total size of the dataset [ {} ]", entity.getName()); @@ -733,9 +754,8 @@ public class DataSetServiceImpl .condition(" AND ") .build()); configure.setFormat("None"); - configure.setInjector(injector); - plugin.connect(configure); - Response outputResponse = plugin.execute(builder.getSql()); + configure.setPluginManager(pluginManager); + Response outputResponse = pluginService.execute(configure, builder.getSql()); if (outputResponse.getIsSuccessful()) { Object columnData = outputResponse.getColumns().get(0); if (columnData instanceof List) { @@ -817,32 +837,25 @@ public class DataSetServiceImpl */ private boolean checkTableExists(DataSetEntity entity) { - try { - PluginService plugin = getOutputPlugin(); - SourceEntity source = getOutputSource(); - Configure configure = source.toConfigure(); - configure.setInjector(injector); - plugin.connect(configure); - String sql = String.format("SHOW CREATE TABLE `%s`.`%s`", initializerConfigure.getDataSetConfigure().getDatabase(), entity.getTableName()); - log.info("Check table exists for dataset [ {} ] id [ {} ] sql \n {}", entity.getName(), entity.getId(), sql); - Response response = plugin.execute(sql); - Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage()); - return true; - } - catch (Exception e) { - log.warn("Check table exists for dataset [ {} ] failed", entity.getName(), e); - return false; - } - } - - /** - * Retrieves the output plugin using the injector and initializer configuration data set type. - * - * @return an instance of the output plugin, or null if not found - */ - private PluginService getOutputPlugin() - { - return PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name()).orElseGet(null); + return pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType()) + .map(plugin -> { + try { + PluginService pluginService = plugin.getService(PluginService.class); + SourceEntity source = getOutputSource(); + Configure configure = source.toConfigure(); + configure.setPluginManager(pluginManager); + String sql = String.format("SHOW CREATE TABLE `%s`.`%s`", initializerConfigure.getDataSetConfigure().getDatabase(), entity.getTableName()); + log.info("Check table exists for dataset [ {} ] id [ {} ] sql \n {}", entity.getName(), entity.getId(), sql); + Response response = pluginService.execute(configure, sql); + Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage()); + return true; + } + catch (Exception e) { + log.warn("Check table exists for dataset [ {} ] failed", entity.getName(), e); + return false; + } + }) + .orElse(false); } /** diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/ExecuteServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/ExecuteServiceImpl.java index e1ec3489..d24ef85f 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/ExecuteServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/ExecuteServiceImpl.java @@ -1,6 +1,5 @@ package io.edurt.datacap.service.service.impl; -import com.google.inject.Injector; import io.edurt.datacap.common.enums.ServiceState; import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.common.sql.SqlBuilder; @@ -10,7 +9,6 @@ import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.service.audit.AuditPlugin; import io.edurt.datacap.service.body.ExecuteDslBody; import io.edurt.datacap.service.entity.ExecuteEntity; -import io.edurt.datacap.service.initializer.InitializerConfigure; import io.edurt.datacap.service.repository.SourceRepository; import io.edurt.datacap.service.service.ExecuteService; import io.edurt.datacap.spi.PluginService; @@ -28,18 +26,14 @@ import java.util.Optional; public class ExecuteServiceImpl implements ExecuteService { - private final Injector injector; private final SourceRepository sourceRepository; private final Environment environment; - private final InitializerConfigure initializerConfigure; private final PluginManager pluginManager; - public ExecuteServiceImpl(Injector injector, SourceRepository sourceRepository, Environment environment, InitializerConfigure initializerConfigure, PluginManager pluginManager) + public ExecuteServiceImpl(SourceRepository sourceRepository, Environment environment, PluginManager pluginManager) { - this.injector = injector; this.sourceRepository = sourceRepository; this.environment = environment; - this.initializerConfigure = initializerConfigure; this.pluginManager = pluginManager; } @@ -62,9 +56,8 @@ public class ExecuteServiceImpl .env(Optional.ofNullable(entity.getConfigures())) .format(configure.getFormat()) .usedConfig(entity.isUsedConfig()) - .injector(plugin.getInjector()) + .pluginManager(pluginManager) .build(); - _configure.setInjector(injector); if (entity.isUsedConfig()) { _configure.setUsername(Optional.of(entity.getUser().getUsername())); String configHome = environment.getProperty("datacap.config.data"); diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PipelineServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PipelineServiceImpl.java index a0d38669..0f37578c 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PipelineServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PipelineServiceImpl.java @@ -6,7 +6,7 @@ import com.google.inject.Key; import com.google.inject.TypeLiteral; import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.common.utils.BeanToPropertiesUtils; -import io.edurt.datacap.executor.Executor; +import io.edurt.datacap.executor.ExecutorService; import io.edurt.datacap.executor.common.RunEngine; import io.edurt.datacap.executor.common.RunMode; import io.edurt.datacap.executor.common.RunState; @@ -49,7 +49,6 @@ import java.util.List; import java.util.Optional; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Service @@ -157,9 +156,9 @@ public class PipelineServiceImpl log.info("Pipeline containers is not full, submit to executor [ {} ]", pipelineName); pipelineEntity.setState(RunState.RUNNING); repository.save(pipelineEntity); - Optional executorOptional = injector.getInstance(Key.get(new TypeLiteral>() {})) + Optional executorOptional = injector.getInstance(Key.get(new TypeLiteral>() {})) .stream() - .filter(executor -> executor.name().equals(configure.getExecutor())) + .filter(executorService -> executorService.name().equals(configure.getExecutor())) .findFirst(); try { @@ -176,7 +175,7 @@ public class PipelineServiceImpl environment.getProperty("datacap.executor.startScript"), RunEngine.valueOf(environment.getProperty("datacap.executor.engine"))); - final ExecutorService executorService = Executors.newCachedThreadPool(); + final java.util.concurrent.ExecutorService executorService = Executors.newCachedThreadPool(); PipelineEntity finalPipelineEntity = pipelineEntity; executorService.submit(() -> { initializer.getTaskExecutors() @@ -224,7 +223,7 @@ public class PipelineServiceImpl return CommonResponse.failure(String.format("Pipeline [ %s ] is already stopped", entity.getName())); } - ExecutorService service = initializer.getTaskExecutors() + java.util.concurrent.ExecutorService service = initializer.getTaskExecutors() .get(entity.getName()); if (service != null) { service.shutdownNow(); diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PluginAuditServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PluginAuditServiceImpl.java index 03f05376..df50e575 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PluginAuditServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PluginAuditServiceImpl.java @@ -1,15 +1,15 @@ package io.edurt.datacap.service.service.impl; -import com.google.inject.Injector; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.common.utils.DateUtils; -import io.edurt.datacap.common.utils.SpiUtils; -import io.edurt.datacap.convert.ConvertFilter; +import io.edurt.datacap.convert.ConvertService; import io.edurt.datacap.convert.model.ConvertRequest; import io.edurt.datacap.convert.model.ConvertResponse; import io.edurt.datacap.fs.FsRequest; import io.edurt.datacap.fs.FsResponse; +import io.edurt.datacap.fs.FsService; +import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.service.activity.HeatmapActivity; import io.edurt.datacap.service.adapter.PageRequestAdapter; import io.edurt.datacap.service.body.FilterBody; @@ -43,13 +43,13 @@ public class PluginAuditServiceImpl { private final PluginAuditRepository pluginAuditRepository; private final InitializerConfigure initializer; - private final Injector injector; + private final PluginManager pluginManager; - public PluginAuditServiceImpl(PluginAuditRepository pluginAuditRepository, InitializerConfigure initializer, Injector injector) + public PluginAuditServiceImpl(PluginAuditRepository pluginAuditRepository, InitializerConfigure initializer, PluginManager pluginManager) { this.pluginAuditRepository = pluginAuditRepository; this.initializer = initializer; - this.injector = injector; + this.pluginManager = pluginManager; } @Override @@ -116,22 +116,26 @@ public class PluginAuditServiceImpl fsRequest.setEndpoint(initializer.getFsConfigure().getEndpoint()); fsRequest.setFileName(String.join(File.separator, value.getUser().getUsername(), DateUtils.formatYMD(), String.join(File.separator, "adhoc", code), "result.csv")); } - FsResponse fsResponse = SpiUtils.findFs(injector, initializer.getFsConfigure().getType()) - .map(v -> v.reader(fsRequest)) - .get(); - ConvertFilter.filter(injector, "Json") - .ifPresent(it -> { - ConvertRequest request = new ConvertRequest(); - request.setStream(fsResponse.getContext()); + pluginManager.getPlugin(initializer.getFsConfigure().getType()) + .ifPresent(plugin -> { + FsService fsService = plugin.getService(FsService.class); + FsResponse fsResponse = fsService.reader(fsRequest); - ConvertResponse _response = it.formatStream(request); - if (_response.getSuccessful()) { - response.setHeaders(_response.getHeaders() - .stream() - .map(String::valueOf) - .collect(Collectors.toList())); - response.setColumns(_response.getColumns()); - } + pluginManager.getPlugin("Json") + .ifPresent(it -> { + ConvertRequest request = new ConvertRequest(); + request.setStream(fsResponse.getContext()); + ConvertService convertService = it.getService(ConvertService.class); + + ConvertResponse _response = convertService.formatStream(request); + if (_response.getSuccessful()) { + response.setHeaders(_response.getHeaders() + .stream() + .map(String::valueOf) + .collect(Collectors.toList())); + response.setColumns(_response.getColumns()); + } + }); }); return CommonResponse.success(response); }) diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/SourceServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/SourceServiceImpl.java index 6735542c..7e1439b8 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/SourceServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/SourceServiceImpl.java @@ -4,9 +4,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Maps; import com.google.common.io.Files; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.TypeLiteral; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.edurt.datacap.common.enums.NodeType; import io.edurt.datacap.common.enums.ServiceState; @@ -15,6 +12,7 @@ import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.common.utils.CodeUtils; import io.edurt.datacap.common.utils.JsonUtils; import io.edurt.datacap.executor.common.RunState; +import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.service.adapter.PageRequestAdapter; import io.edurt.datacap.service.body.FilterBody; import io.edurt.datacap.service.body.SharedSourceBody; @@ -59,13 +57,11 @@ import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Service; import java.io.File; -import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -86,10 +82,10 @@ public class SourceServiceImpl private final ColumnRepository columnHandler; private final TemplateSqlRepository templateHandler; private final ScheduledHistoryRepository scheduledHistoryHandler; - private final Injector injector; + private final PluginManager pluginManager; private final Environment environment; - public SourceServiceImpl(SourceRepository sourceRepository, UserRepository userRepository, ScheduledHistoryRepository scheduledHistoryRepository, DatabaseRepository databaseHandler, TableRepository tableHandler, ColumnRepository columnHandler, TemplateSqlRepository templateHandler, ScheduledHistoryRepository scheduledHistoryHandler, Injector injector, Environment environment) + public SourceServiceImpl(SourceRepository sourceRepository, UserRepository userRepository, ScheduledHistoryRepository scheduledHistoryRepository, DatabaseRepository databaseHandler, TableRepository tableHandler, ColumnRepository columnHandler, TemplateSqlRepository templateHandler, ScheduledHistoryRepository scheduledHistoryHandler, PluginManager pluginManager, Environment environment) { this.sourceRepository = sourceRepository; this.userRepository = userRepository; @@ -99,7 +95,7 @@ public class SourceServiceImpl this.columnHandler = columnHandler; this.templateHandler = templateHandler; this.scheduledHistoryHandler = scheduledHistoryHandler; - this.injector = injector; + this.pluginManager = pluginManager; this.environment = environment; } @@ -152,28 +148,27 @@ public class SourceServiceImpl @Override public CommonResponse testConnection(SourceEntity configure) { - Optional pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, configure.getType(), configure.getProtocol()); - if (!pluginOptional.isPresent()) { - return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND); - } - - Configure _configure = new Configure(); - PluginService plugin = pluginOptional.get(); - _configure.setHost(configure.getHost()); - _configure.setPort(configure.getPort()); - _configure.setUsername(Optional.ofNullable(configure.getUsername())); - _configure.setPassword(Optional.ofNullable(configure.getPassword())); - Optional _database = StringUtils.isNotEmpty(configure.getDatabase()) ? Optional.ofNullable(configure.getDatabase()) : Optional.empty(); - _configure.setDatabase(_database); - _configure.setEnv(Optional.ofNullable(configure.getConfigures())); - _configure.setSsl(Optional.ofNullable(configure.getSsl())); - plugin.connect(_configure); - io.edurt.datacap.spi.model.Response response = plugin.execute(plugin.validator()); - plugin.destroy(); - if (response.getIsSuccessful()) { - return CommonResponse.success(response); - } - return CommonResponse.failure(ServiceState.PLUGIN_EXECUTE_FAILED, response.getMessage()); + return pluginManager.getPlugin(configure.getType()) + .map(plugin -> { + Configure _configure = new Configure(); + PluginService pluginService = plugin.getService(PluginService.class); + _configure.setHost(configure.getHost()); + _configure.setPort(configure.getPort()); + _configure.setUsername(Optional.ofNullable(configure.getUsername())); + _configure.setPassword(Optional.ofNullable(configure.getPassword())); + Optional _database = StringUtils.isNotEmpty(configure.getDatabase()) ? Optional.ofNullable(configure.getDatabase()) : Optional.empty(); + _configure.setDatabase(_database); + _configure.setEnv(Optional.ofNullable(configure.getConfigures())); + _configure.setSsl(Optional.ofNullable(configure.getSsl())); + pluginService.connect(_configure); + io.edurt.datacap.spi.model.Response response = pluginService.execute(pluginService.validator()); + pluginService.destroy(); + if (response.getIsSuccessful()) { + return CommonResponse.success(response); + } + return CommonResponse.failure(ServiceState.PLUGIN_EXECUTE_FAILED, response.getMessage()); + }) + .orElseGet(() -> CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND)); } @Override @@ -207,19 +202,19 @@ public class SourceServiceImpl public CommonResponse>> getPlugins() { Map> pluginMap = new ConcurrentHashMap<>(); - this.injector.getInstance(Key.get(new TypeLiteral>() {})).stream().forEach(plugin -> { - PluginEntity entity = new PluginEntity(); - entity.setName(plugin.name()); - entity.setDescription(plugin.description()); - entity.setType(plugin.type().name()); - entity.setConfigure(PluginUtils.loadYamlConfigure(plugin.type().name(), plugin.name(), plugin.name(), environment)); - List plugins = pluginMap.get(plugin.type().name()); - if (ObjectUtils.isEmpty(plugins)) { - plugins = new ArrayList<>(); - } - plugins.add(entity); - pluginMap.put(plugin.type().name(), plugins); - }); +// this.injector.getInstance(Key.get(new TypeLiteral>() {})).stream().forEach(plugin -> { +// PluginEntity entity = new PluginEntity(); +// entity.setName(plugin.name()); +// entity.setDescription(plugin.description()); +// entity.setType(plugin.type().name()); +// entity.setConfigure(PluginUtils.loadYamlConfigure(plugin.type().name(), plugin.name(), plugin.name(), environment)); +// List plugins = pluginMap.get(plugin.type().name()); +// if (ObjectUtils.isEmpty(plugins)) { +// plugins = new ArrayList<>(); +// } +// plugins.add(entity); +// pluginMap.put(plugin.type().name(), plugins); +// }); return CommonResponse.success(pluginMap); } @@ -251,122 +246,120 @@ public class SourceServiceImpl @Override public CommonResponse testConnectionV2(SourceBody configure) { - Optional pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, configure.getName(), configure.getType()); - if (!pluginOptional.isPresent()) { - return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND); - } + return pluginManager.getPlugin(configure.getName()) + .map(plugin -> { + // Check configure + IConfigure iConfigure = PluginUtils.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment); + if (ObjectUtils.isEmpty(iConfigure) || iConfigure.getConfigures().size() != configure.getConfigure().getConfigures().size()) { + return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_MISMATCH); + } - // Check configure - IConfigure iConfigure = PluginUtils.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment); - if (ObjectUtils.isEmpty(iConfigure) || iConfigure.getConfigures().size() != configure.getConfigure().getConfigures().size()) { - return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_MISMATCH); - } + // Filter required + List requiredMismatchConfigures = configure.getConfigure().getConfigures().stream().filter(v -> v.isRequired()).filter(v -> ObjectUtils.isEmpty(v.getValue())).collect(Collectors.toList()); + if (requiredMismatchConfigures.size() > 0) { + return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_REQUIRED, ConfigureUtils.preparedMessage(requiredMismatchConfigures)); + } - // Filter required - List requiredMismatchConfigures = configure.getConfigure().getConfigures().stream().filter(v -> v.isRequired()).filter(v -> ObjectUtils.isEmpty(v.getValue())).collect(Collectors.toList()); - if (requiredMismatchConfigures.size() > 0) { - return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_REQUIRED, ConfigureUtils.preparedMessage(requiredMismatchConfigures)); - } - - PluginService plugin = pluginOptional.get(); - // The filter parameter value is null data - List applyConfigures = ConfigureUtils.filterNotEmpty(configure.getConfigure().getConfigures()); - Configure _configure = ConfigureUtils.preparedConfigure(applyConfigures); - // Adapter file configure - if (_configure.isUsedConfig()) { - String cacheHome = environment.getProperty("datacap.cache.data"); - if (StringUtils.isEmpty(cacheHome)) { - cacheHome = String.join(File.separator, System.getProperty("user.dir"), "cache"); - } - _configure.setHome(cacheHome); - _configure.setUsername(Optional.of(UserDetailsService.getUser().getUsername())); - } - _configure.setInjector(injector); - plugin.connect(_configure); - io.edurt.datacap.spi.model.Response response = plugin.execute(plugin.validator()); - if (response.getIsSuccessful()) { - plugin.destroy(); - return CommonResponse.success(response); - } - return CommonResponse.failure(ServiceState.PLUGIN_EXECUTE_FAILED, response.getMessage()); + PluginService pluginService = plugin.getService(PluginService.class); + // The filter parameter value is null data + List applyConfigures = ConfigureUtils.filterNotEmpty(configure.getConfigure().getConfigures()); + Configure _configure = ConfigureUtils.preparedConfigure(applyConfigures); + // Adapter file configure + if (_configure.isUsedConfig()) { + String cacheHome = environment.getProperty("datacap.cache.data"); + if (StringUtils.isEmpty(cacheHome)) { + cacheHome = String.join(File.separator, System.getProperty("user.dir"), "cache"); + } + _configure.setHome(cacheHome); + _configure.setUsername(Optional.of(UserDetailsService.getUser().getUsername())); + } + _configure.setPluginManager(pluginManager); + pluginService.connect(_configure); + Response response = pluginService.execute(pluginService.validator()); + if (response.getIsSuccessful()) { + pluginService.destroy(); + return CommonResponse.success(response); + } + return CommonResponse.failure(ServiceState.PLUGIN_EXECUTE_FAILED, response.getMessage()); + }) + .orElseGet(() -> CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND)); } @Override public CommonResponse saveOrUpdateV2(SourceBody configure) { - Optional pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, configure.getName(), configure.getType()); - if (!pluginOptional.isPresent()) { - return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND); - } + return pluginManager.getPlugin(configure.getName()) + .map(plugin -> { + // Check configure + IConfigure iConfigure = PluginUtils.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment); + if (ObjectUtils.isEmpty(iConfigure) || iConfigure.getConfigures().size() != configure.getConfigure().getConfigures().size()) { + return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_MISMATCH); + } - // Check configure - IConfigure iConfigure = PluginUtils.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment); - if (ObjectUtils.isEmpty(iConfigure) || iConfigure.getConfigures().size() != configure.getConfigure().getConfigures().size()) { - return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_MISMATCH); - } + // Filter required + List requiredMismatchConfigures = configure.getConfigure() + .getConfigures() + .stream() + .filter(IConfigureField::isRequired) + .filter(v -> ObjectUtils.isEmpty(v.getValue())) + .collect(Collectors.toList()); + if (!requiredMismatchConfigures.isEmpty()) { + return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_REQUIRED, ConfigureUtils.preparedMessage(requiredMismatchConfigures)); + } - // Filter required - List requiredMismatchConfigures = configure.getConfigure() - .getConfigures() - .stream() - .filter(IConfigureField::isRequired) - .filter(v -> ObjectUtils.isEmpty(v.getValue())) - .collect(Collectors.toList()); - if (!requiredMismatchConfigures.isEmpty()) { - return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_REQUIRED, ConfigureUtils.preparedMessage(requiredMismatchConfigures)); - } - - // The filter parameter value is null data - List applyConfigures = ConfigureUtils.filterNotEmpty(configure.getConfigure().getConfigures()); - SourceEntity source = ConfigureUtils.preparedSourceEntity(applyConfigures); - source.setProtocol(configure.getType()); - source.setType(configure.getName()); - source.setUser(UserDetailsService.getUser()); - if (configure.getId() == null) { - source.setCode(UUID.randomUUID().toString().replace("-", "")); - } - if (ObjectUtils.isNotEmpty(configure.getId()) && configure.getId() > 0) { - source.setId(configure.getId()); - sourceRepository.findById(configure.getId()) - .ifPresent(item -> source.setCode(item.getCode())); - } - if (StringUtils.isNotEmpty(configure.getVersion())) { - source.setVersion(configure.getVersion()); - source.setAvailable(true); - } - else { - source.setAvailable(false); - } - this.sourceRepository.save(source); - // Copy file to local data - if (source.isUsedConfig()) { - String cacheHome = environment.getProperty("datacap.cache.data"); - if (StringUtils.isEmpty(cacheHome)) { - cacheHome = String.join(File.separator, System.getProperty("user.dir"), "cache"); - } - String configHome = environment.getProperty("datacap.config.data"); - if (StringUtils.isEmpty(configHome)) { - configHome = String.join(File.separator, System.getProperty("user.dir"), "config"); - } - File sourceFile = new File(String.join(File.separator, cacheHome, source.getUser().getUsername(), source.getType())); - String destination = String.join(File.separator, configHome, source.getUser().getUsername(), source.getType(), String.valueOf(source.getId())); - File directory = new File(destination); - try { - if (!directory.exists()) { - directory.mkdirs(); - } - for (File file : sourceFile.listFiles()) { - Files.copy(file, new File(String.join(File.separator, destination, file.getName()))); - } - FileUtils.deleteDirectory(sourceFile); - } - catch (Exception e) { - return CommonResponse.failure("Copy failed: " + e.getMessage()); - } - } - // Start sync metadata - this.syncMetadata(source.getId()); - return CommonResponse.success(source); + // The filter parameter value is null data + List applyConfigures = ConfigureUtils.filterNotEmpty(configure.getConfigure().getConfigures()); + SourceEntity source = ConfigureUtils.preparedSourceEntity(applyConfigures); + source.setProtocol(configure.getType()); + source.setType(configure.getName()); + source.setUser(UserDetailsService.getUser()); + if (configure.getId() == null) { + source.setCode(UUID.randomUUID().toString().replace("-", "")); + } + if (ObjectUtils.isNotEmpty(configure.getId()) && configure.getId() > 0) { + source.setId(configure.getId()); + sourceRepository.findById(configure.getId()) + .ifPresent(item -> source.setCode(item.getCode())); + } + if (StringUtils.isNotEmpty(configure.getVersion())) { + source.setVersion(configure.getVersion()); + source.setAvailable(true); + } + else { + source.setAvailable(false); + } + this.sourceRepository.save(source); + // Copy file to local data + if (source.isUsedConfig()) { + String cacheHome = environment.getProperty("datacap.cache.data"); + if (StringUtils.isEmpty(cacheHome)) { + cacheHome = String.join(File.separator, System.getProperty("user.dir"), "cache"); + } + String configHome = environment.getProperty("datacap.config.data"); + if (StringUtils.isEmpty(configHome)) { + configHome = String.join(File.separator, System.getProperty("user.dir"), "config"); + } + File sourceFile = new File(String.join(File.separator, cacheHome, source.getUser().getUsername(), source.getType())); + String destination = String.join(File.separator, configHome, source.getUser().getUsername(), source.getType(), String.valueOf(source.getId())); + File directory = new File(destination); + try { + if (!directory.exists()) { + directory.mkdirs(); + } + for (File file : sourceFile.listFiles()) { + Files.copy(file, new File(String.join(File.separator, destination, file.getName()))); + } + FileUtils.deleteDirectory(sourceFile); + } + catch (Exception e) { + return CommonResponse.failure("Copy failed: " + e.getMessage()); + } + } + // Start sync metadata + this.syncMetadata(source.getId()); + return CommonResponse.success(source); + }) + .orElseGet(() -> CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND)); } @Override @@ -407,31 +400,27 @@ public class SourceServiceImpl ScheduledHistoryEntity scheduledHistory = ScheduledHistoryEntity.builder().name(String.format("Sync source [ %s ]", entity.getName())).scheduled(scheduled).source(entity).state(RunState.RUNNING).build(); scheduledHistoryHandler.save(scheduledHistory); log.info("==================== Sync metadata [ {} ] started =================", entity.getName()); - Optional pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, entity.getType(), entity.getProtocol()); - if (pluginOptional.isEmpty()) { - log.warn("The source [ {} ] protocol [ {} ] is not available", entity.getName(), entity.getProtocol()); - } - else { - try { - PluginService plugin = pluginOptional.get(); - Configure pConfigure = entity.toConfigure(); - pConfigure.setInjector(injector); - plugin.connect(pConfigure); - Response response = plugin.execute(plugin.validator()); - if (!response.getIsSuccessful()) { - log.error("The source [ {} ] not available", entity.getName()); - } - else { - this.startSyncDatabase(entity, plugin, databaseCache, databaseTableCache, tableCache, tableColumnCache, databaseAddedCount, databaseUpdatedCount, databaseRemovedCount, tableAddedCount, tableUpdatedCount, tableRemovedCount, columnAddedCount, columnUpdatedCount, columnRemovedCount); - } - scheduledHistory.setState(RunState.SUCCESS); - } - catch (Exception e) { - log.error("The scheduled task [ {} ] source [ {} ] not available ", scheduled.getName(), entity.getName(), e); - scheduledHistory.setState(RunState.FAILURE); - scheduledHistory.setMessage(ExceptionUtils.getStackTrace(e)); - } - } + pluginManager.getPlugin(entity.getType()) + .ifPresent(plugin -> { + try { + PluginService pluginService = plugin.getService(PluginService.class); + Configure pConfigure = entity.toConfigure(); + pConfigure.setPluginManager(pluginManager); + Response response = pluginService.execute(pConfigure, pluginService.validator()); + if (!response.getIsSuccessful()) { + log.error("The source [ {} ] not available", entity.getName()); + } + else { + this.startSyncDatabase(entity, pluginService, databaseCache, databaseTableCache, tableCache, tableColumnCache, databaseAddedCount, databaseUpdatedCount, databaseRemovedCount, tableAddedCount, tableUpdatedCount, tableRemovedCount, columnAddedCount, columnUpdatedCount, columnRemovedCount); + } + scheduledHistory.setState(RunState.SUCCESS); + } + catch (Exception e) { + log.error("The scheduled task [ {} ] source [ {} ] not available ", scheduled.getName(), entity.getName(), e); + scheduledHistory.setState(RunState.FAILURE); + scheduledHistory.setMessage(ExceptionUtils.getStackTrace(e)); + } + }); log.info("==================== Sync metadata [ {} ] finished =================", entity.getName()); Properties info = new Properties(); info.put("databaseAddedCount", databaseAddedCount.get()); @@ -545,7 +534,7 @@ public class SourceServiceImpl } else { Configure pConfigure = entity.toConfigure(); - pConfigure.setInjector(injector); + pConfigure.setPluginManager(pluginManager); plugin.connect(pConfigure); Response response = plugin.execute(getSqlContext(template, null)); if (!response.getIsSuccessful()) { @@ -617,7 +606,7 @@ public class SourceServiceImpl } else { Configure pConfigure = entity.toConfigure(); - pConfigure.setInjector(injector); + pConfigure.setPluginManager(pluginManager); plugin.connect(pConfigure); Response response = plugin.execute(getSqlContext(template, null)); if (!response.getIsSuccessful()) { @@ -710,7 +699,7 @@ public class SourceServiceImpl } else { Configure pConfigure = entity.toConfigure(); - pConfigure.setInjector(injector); + pConfigure.setPluginManager(pluginManager); plugin.connect(pConfigure); Response response = plugin.execute(getSqlContext(template, null)); if (!response.getIsSuccessful()) { diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/TableServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/TableServiceImpl.java index 9b234cb7..ee022fe6 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/TableServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/TableServiceImpl.java @@ -1,7 +1,6 @@ package io.edurt.datacap.service.service.impl; import com.google.common.collect.Lists; -import com.google.inject.Injector; import io.edurt.datacap.common.enums.ServiceState; import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.common.sql.SqlBuilder; @@ -11,14 +10,13 @@ import io.edurt.datacap.common.sql.configure.SqlOperator; import io.edurt.datacap.common.sql.configure.SqlOrder; import io.edurt.datacap.common.sql.configure.SqlType; import io.edurt.datacap.common.utils.CSVUtils; -import io.edurt.datacap.common.utils.SpiUtils; -import io.edurt.datacap.fs.Fs; import io.edurt.datacap.fs.FsRequest; import io.edurt.datacap.fs.FsResponse; +import io.edurt.datacap.fs.FsService; +import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.service.body.ExportBody; import io.edurt.datacap.service.body.TableBody; import io.edurt.datacap.service.body.TableFilter; -import io.edurt.datacap.service.common.PluginUtils; import io.edurt.datacap.service.entity.BaseEntity; import io.edurt.datacap.service.entity.ColumnEntity; import io.edurt.datacap.service.entity.DatabaseEntity; @@ -69,16 +67,16 @@ import java.util.stream.Collectors; public class TableServiceImpl implements TableService { - private final Injector injector; + private final PluginManager pluginManager; private final TableRepository repository; private final DatabaseRepository databaseRepository; private final ColumnRepository columnRepository; private final InitializerConfigure initializerConfigure; private final HttpServletRequest request; - public TableServiceImpl(Injector injector, TableRepository repository, DatabaseRepository databaseRepository, ColumnRepository columnRepository, InitializerConfigure initializerConfigure, HttpServletRequest request) + public TableServiceImpl(PluginManager pluginManager, TableRepository repository, DatabaseRepository databaseRepository, ColumnRepository columnRepository, InitializerConfigure initializerConfigure, HttpServletRequest request) { - this.injector = injector; + this.pluginManager = pluginManager; this.repository = repository; this.databaseRepository = databaseRepository; this.columnRepository = columnRepository; @@ -100,36 +98,37 @@ public class TableServiceImpl return repository.findByCode(code) .map(table -> { SourceEntity source = table.getDatabase().getSource(); - Optional pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, source.getType(), source.getProtocol()); - if (!pluginOptional.isPresent()) { - return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND); - } - PluginService plugin = pluginOptional.get(); - if (configure.getType().equals(SqlType.SELECT)) { - return this.fetchSelect(plugin, table, source, configure); - } - else if (configure.getType().equals(SqlType.INSERT)) { - return this.fetchInsert(plugin, table, source, configure); - } - else if (configure.getType().equals(SqlType.UPDATE)) { - return this.fetchUpdate(plugin, table, source, configure); - } - else if (configure.getType().equals(SqlType.DELETE)) { - return this.fetchDelete(plugin, table, source, configure); - } - else if (configure.getType().equals(SqlType.ALTER)) { - return this.fetchAlter(plugin, table, source, configure); - } - else if (configure.getType().equals(SqlType.SHOW)) { - return this.fetchShowCreateTable(plugin, table, source, configure); - } - else if (configure.getType().equals(SqlType.TRUNCATE)) { - return this.fetchTruncateTable(plugin, table, source, configure); - } - else if (configure.getType().equals(SqlType.DROP)) { - return this.fetchDropTable(plugin, table, source, configure); - } - return CommonResponse.failure(String.format("Not implemented yet [ %s ]", configure.getType())); + pluginManager.getPlugin(source.getType()) + .map(plugin -> { + PluginService pluginService = plugin.getService(PluginService.class); + if (configure.getType().equals(SqlType.SELECT)) { + return this.fetchSelect(pluginService, table, source, configure); + } + else if (configure.getType().equals(SqlType.INSERT)) { + return this.fetchInsert(pluginService, table, source, configure); + } + else if (configure.getType().equals(SqlType.UPDATE)) { + return this.fetchUpdate(pluginService, table, source, configure); + } + else if (configure.getType().equals(SqlType.DELETE)) { + return this.fetchDelete(pluginService, table, source, configure); + } + else if (configure.getType().equals(SqlType.ALTER)) { + return this.fetchAlter(pluginService, table, source, configure); + } + else if (configure.getType().equals(SqlType.SHOW)) { + return this.fetchShowCreateTable(pluginService, table, source, configure); + } + else if (configure.getType().equals(SqlType.TRUNCATE)) { + return this.fetchTruncateTable(pluginService, table, source, configure); + } + else if (configure.getType().equals(SqlType.DROP)) { + return this.fetchDropTable(pluginService, table, source, configure); + } + return CommonResponse.failure(String.format("Not implemented yet [ %s ]", configure.getType())); + }) + .orElseGet(() -> CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND)); + return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND); }) .orElse(CommonResponse.failure(String.format("Table [ %s ] not found", code))); } @@ -144,93 +143,94 @@ public class TableServiceImpl } SourceEntity source = table.getDatabase().getSource(); - Optional optionalPlugin = PluginUtils.getPluginByNameAndType(this.injector, source.getType(), source.getProtocol()); - if (!optionalPlugin.isPresent()) { - return CommonResponse.failure(String.format("Plugin [ %s ] not found", source.getType())); - } - - Optional optionalFs = SpiUtils.findFs(injector, initializerConfigure.getFsConfigure().getType()); - if (!optionalFs.isPresent()) { - return CommonResponse.failure(String.format("Not found Fs [ %s ]", initializerConfigure.getFsConfigure().getType())); - } - - UserEntity user = UserDetailsService.getUser(); - String endpoint = String.join(File.separator, initializerConfigure.getDataHome(), user.getUsername(), "export"); - log.info("Export data user [ {} ] home [ {} ]", user.getUsername(), endpoint); - String fileName = String.join(".", UUID.randomUUID().toString(), "csv"); - log.info("Export file name [ {} ]", fileName); - Integer count = Integer.MAX_VALUE; - if (configure.getCount() > 0) { - count = configure.getCount().intValue(); - } - Pagination pagination = Pagination.newInstance(count.intValue(), 1, count.intValue()); - TableFilter tableFilter = TableFilter.builder() - .type(SqlType.SELECT) - .pagination(pagination) - .build(); - CommonResponse tempResponse = this.fetchSelect(optionalPlugin.get(), table, source, tableFilter); - if (tempResponse.getStatus()) { - Response pluginResponse = (Response) tempResponse.getData(); - try { - File tempFile = CSVUtils.makeTempCSV(endpoint, fileName, pluginResponse.getHeaders(), pluginResponse.getColumns()); - log.info("Export temp file [ {} ]", tempFile.getAbsolutePath()); - FsRequest fsRequest = FsRequest.builder() - .access(initializerConfigure.getFsConfigure().getAccess()) - .secret(initializerConfigure.getFsConfigure().getSecret()) - .endpoint(endpoint) - .bucket(initializerConfigure.getFsConfigure().getBucket()) - .stream(Files.newInputStream(tempFile.toPath())) - .fileName(fileName) - .build(); - optionalFs.get().writer(fsRequest); - if (initializerConfigure.getFsConfigure().getType().equals("Local")) { - String address = request.getRequestURL() - .toString() - .replace(request.getServletPath().trim(), ""); - String remote = String.join("/", address, "api/v1/table/dataDownload", user.getUsername(), fileName); - return CommonResponse.success(remote); - } - } - catch (Exception ex) { - return CommonResponse.failure(ex.getMessage()); - } - } - return CommonResponse.failure(ServiceState.REQUEST_EXCEPTION); + return pluginManager.getPlugin(source.getType()) + .map(plugin -> { + PluginService pluginService = plugin.getService(PluginService.class); + return pluginManager.getPlugin(initializerConfigure.getFsConfigure().getType()) + .map(fsPlugin -> { + FsService fsService = fsPlugin.getService(FsService.class); + UserEntity user = UserDetailsService.getUser(); + String endpoint = String.join(File.separator, initializerConfigure.getDataHome(), user.getUsername(), "export"); + log.info("Export data user [ {} ] home [ {} ]", user.getUsername(), endpoint); + String fileName = String.join(".", UUID.randomUUID().toString(), "csv"); + log.info("Export file name [ {} ]", fileName); + Integer count = Integer.MAX_VALUE; + if (configure.getCount() > 0) { + count = configure.getCount().intValue(); + } + Pagination pagination = Pagination.newInstance(count.intValue(), 1, count.intValue()); + TableFilter tableFilter = TableFilter.builder() + .type(SqlType.SELECT) + .pagination(pagination) + .build(); + CommonResponse tempResponse = this.fetchSelect(pluginService, table, source, tableFilter); + if (tempResponse.getStatus()) { + Response pluginResponse = (Response) tempResponse.getData(); + try { + File tempFile = CSVUtils.makeTempCSV(endpoint, fileName, pluginResponse.getHeaders(), pluginResponse.getColumns()); + log.info("Export temp file [ {} ]", tempFile.getAbsolutePath()); + FsRequest fsRequest = FsRequest.builder() + .access(initializerConfigure.getFsConfigure().getAccess()) + .secret(initializerConfigure.getFsConfigure().getSecret()) + .endpoint(endpoint) + .bucket(initializerConfigure.getFsConfigure().getBucket()) + .stream(Files.newInputStream(tempFile.toPath())) + .fileName(fileName) + .build(); + fsService.writer(fsRequest); + if (initializerConfigure.getFsConfigure().getType().equals("Local")) { + String address = request.getRequestURL() + .toString() + .replace(request.getServletPath().trim(), ""); + String remote = String.join("/", address, "api/v1/table/dataDownload", user.getUsername(), fileName); + return CommonResponse.success(remote); + } + } + catch (Exception ex) { + return CommonResponse.failure(ex.getMessage()); + } + } + return CommonResponse.failure(ServiceState.REQUEST_EXCEPTION); + }) + .orElseGet(() -> CommonResponse.failure(String.format("Not found Fs [ %s ]", initializerConfigure.getFsConfigure().getType()))); + }) + .orElseGet(() -> CommonResponse.failure(String.format("Plugin [ %s ] not found", source.getType()))); } @Override public Object dataDownload(String username, String filename) { - Optional optionalFs = SpiUtils.findFs(injector, initializerConfigure.getFsConfigure().getType()); - if (!optionalFs.isPresent()) { - return CommonResponse.failure(String.format("Not found Fs [ %s ]", initializerConfigure.getFsConfigure().getType())); - } + return pluginManager.getPlugin(initializerConfigure.getFsConfigure().getType()) + .map(plugin -> { + String endpoint = String.join(File.separator, initializerConfigure.getDataHome(), username, "export"); + log.info("Download data user [ {} ] home [ {} ]", username, endpoint); + String filePath = String.join(File.separator, endpoint, filename); + log.info("Download file path [ {} ]", filePath); + FsRequest fsRequest = FsRequest.builder() + .access(initializerConfigure.getFsConfigure().getAccess()) + .secret(initializerConfigure.getFsConfigure().getSecret()) + .endpoint(endpoint) + .bucket(initializerConfigure.getFsConfigure().getBucket()) + .fileName(filename) + .build(); - String endpoint = String.join(File.separator, initializerConfigure.getDataHome(), username, "export"); - log.info("Download data user [ {} ] home [ {} ]", username, endpoint); - String filePath = String.join(File.separator, endpoint, filename); - log.info("Download file path [ {} ]", filePath); - FsRequest fsRequest = FsRequest.builder() - .access(initializerConfigure.getFsConfigure().getAccess()) - .secret(initializerConfigure.getFsConfigure().getSecret()) - .endpoint(endpoint) - .bucket(initializerConfigure.getFsConfigure().getBucket()) - .fileName(filename) - .build(); - FsResponse response = optionalFs.get().reader(fsRequest); - try { - Resource resource = new FileSystemResource(response.getRemote()); - HttpHeaders headers = new HttpHeaders(); - headers.add(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + resource.getFilename()); - return ResponseEntity.ok() - .headers(headers) - .contentLength(resource.contentLength()) - .contentType(MediaType.APPLICATION_OCTET_STREAM) - .body(resource); - } - catch (IOException e) { - throw new RuntimeException(e); - } + FsService fsService = plugin.getService(FsService.class); + FsResponse response = fsService.reader(fsRequest); + try { + Resource resource = new FileSystemResource(response.getRemote()); + HttpHeaders headers = new HttpHeaders(); + headers.add(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + resource.getFilename()); + return ResponseEntity.ok() + .headers(headers) + .contentLength(resource.contentLength()) + .contentType(MediaType.APPLICATION_OCTET_STREAM) + .body(resource); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }) + .orElseThrow(() -> new RuntimeException("Not found plugin")); } @Override @@ -243,19 +243,21 @@ public class TableServiceImpl DatabaseEntity database = optionalDatabase.get(); SourceEntity source = database.getSource(); - PluginService plugin = PluginUtils.getPluginByNameAndType(this.injector, source.getType(), source.getProtocol()).get(); - TableBuilder.Companion.BEGIN(); - TableBuilder.Companion.CREATE_TABLE(String.format("`%s`.`%s`", database.getName(), configure.getName())); - TableBuilder.Companion.COLUMNS(configure.getColumns().stream().map(item -> item.toColumnVar()).collect(Collectors.toList())); - String sql = TableBuilder.Companion.SQL(); - log.info("Create table sql \n {} \n on database [ {} ]", sql, database.getName()); - Configure pConfigure = source.toConfigure(); - pConfigure.setInjector(injector); - plugin.connect(pConfigure); - Response response = plugin.execute(sql); - response.setContent(sql); - plugin.destroy(); - return CommonResponse.success(response); + return pluginManager.getPlugin(source.getType()) + .map(plugin -> { + PluginService pluginService = plugin.getService(PluginService.class); + TableBuilder.Companion.BEGIN(); + TableBuilder.Companion.CREATE_TABLE(String.format("`%s`.`%s`", database.getName(), configure.getName())); + TableBuilder.Companion.COLUMNS(configure.getColumns().stream().map(item -> item.toColumnVar()).collect(Collectors.toList())); + String sql = TableBuilder.Companion.SQL(); + log.info("Create table sql \n {} \n on database [ {} ]", sql, database.getName()); + Configure pConfigure = source.toConfigure(); + pConfigure.setPluginManager(pluginManager); + Response response = pluginService.execute(pConfigure, sql); + response.setContent(sql); + return CommonResponse.success(response); + }) + .orElseGet(() -> CommonResponse.failure("Not found plugin")); } @Override @@ -264,52 +266,55 @@ public class TableServiceImpl return repository.findByCode(code) .map(table -> { SourceEntity source = table.getDatabase().getSource(); - PluginService plugin = PluginUtils.getPluginByNameAndType(this.injector, source.getType(), source.getProtocol()).get(); - AtomicReference atomicReference = new AtomicReference<>(null); - if (configure.getType().equals(SqlType.CREATE)) { - ColumnBuilder.Companion.BEGIN(); - ColumnBuilder.Companion.CREATE_COLUMN(String.format("`%s`.`%s`", table.getDatabase().getName(), table.getName())); - ColumnBuilder.Companion.COLUMNS(configure.getColumns().stream().map(Column::toColumnVar).collect(Collectors.toList())); - atomicReference.set(ColumnBuilder.Companion.SQL()); - log.info("Create column sql \n {} \n on table [ {} ]", atomicReference.get(), table.getName()); - } - else if (configure.getType().equals(SqlType.DROP)) { - columnRepository.findById(configure.getColumnId()) - .ifPresent(column -> { + pluginManager.getPlugin(source.getType()) + .map(plugin -> { + AtomicReference atomicReference = new AtomicReference<>(null); + if (configure.getType().equals(SqlType.CREATE)) { ColumnBuilder.Companion.BEGIN(); - ColumnBuilder.Companion.DROP_COLUMN(String.format("`%s`.`%s`", table.getDatabase().getName(), table.getName())); - ColumnBuilder.Companion.COLUMNS(Lists.newArrayList(column.getName())); + ColumnBuilder.Companion.CREATE_COLUMN(String.format("`%s`.`%s`", table.getDatabase().getName(), table.getName())); + ColumnBuilder.Companion.COLUMNS(configure.getColumns().stream().map(Column::toColumnVar).collect(Collectors.toList())); atomicReference.set(ColumnBuilder.Companion.SQL()); - }); - log.info("Drop column sql \n {} \n on table [ {} ]", atomicReference.get(), table.getName()); - } - else if (configure.getType().equals(SqlType.MODIFY)) { - ColumnBuilder.Companion.BEGIN(); - ColumnBuilder.Companion.MODIFY_COLUMN(String.format("`%s`.`%s`", table.getDatabase().getName(), table.getName())); - ColumnBuilder.Companion.COLUMNS(configure.getColumns().stream().map(Column::toColumnVar).collect(Collectors.toList())); - atomicReference.set(ColumnBuilder.Companion.SQL()); - log.info("Modify column sql \n {} \n on table [ {} ]", atomicReference.get(), table.getName()); - } - Response response; - if (configure.isPreview()) { - response = Response.builder() - .isSuccessful(true) - .isConnected(true) - .headers(Lists.newArrayList()) - .columns(Lists.newArrayList()) - .types(Lists.newArrayList()) - .content(atomicReference.get()) - .build(); - } - else { - Configure pConfigure = source.toConfigure(); - pConfigure.setInjector(injector); - plugin.connect(pConfigure); - response = plugin.execute(atomicReference.get()); - response.setContent(atomicReference.get()); - plugin.destroy(); - } - return CommonResponse.success(response); + log.info("Create column sql \n {} \n on table [ {} ]", atomicReference.get(), table.getName()); + } + else if (configure.getType().equals(SqlType.DROP)) { + columnRepository.findById(configure.getColumnId()) + .ifPresent(column -> { + ColumnBuilder.Companion.BEGIN(); + ColumnBuilder.Companion.DROP_COLUMN(String.format("`%s`.`%s`", table.getDatabase().getName(), table.getName())); + ColumnBuilder.Companion.COLUMNS(Lists.newArrayList(column.getName())); + atomicReference.set(ColumnBuilder.Companion.SQL()); + }); + log.info("Drop column sql \n {} \n on table [ {} ]", atomicReference.get(), table.getName()); + } + else if (configure.getType().equals(SqlType.MODIFY)) { + ColumnBuilder.Companion.BEGIN(); + ColumnBuilder.Companion.MODIFY_COLUMN(String.format("`%s`.`%s`", table.getDatabase().getName(), table.getName())); + ColumnBuilder.Companion.COLUMNS(configure.getColumns().stream().map(Column::toColumnVar).collect(Collectors.toList())); + atomicReference.set(ColumnBuilder.Companion.SQL()); + log.info("Modify column sql \n {} \n on table [ {} ]", atomicReference.get(), table.getName()); + } + Response response; + if (configure.isPreview()) { + response = Response.builder() + .isSuccessful(true) + .isConnected(true) + .headers(Lists.newArrayList()) + .columns(Lists.newArrayList()) + .types(Lists.newArrayList()) + .content(atomicReference.get()) + .build(); + } + else { + Configure pConfigure = source.toConfigure(); + pConfigure.setPluginManager(pluginManager); + PluginService pluginService = plugin.getService(PluginService.class); + response = pluginService.execute(pConfigure, atomicReference.get()); + response.setContent(atomicReference.get()); + } + return CommonResponse.success(response); + }) + .orElseGet(() -> CommonResponse.failure("")); + return CommonResponse.failure(""); }) .orElse(CommonResponse.failure(String.format("Table [ %s ] not found", code))); } @@ -330,7 +335,7 @@ public class TableServiceImpl int totalRows = Integer.parseInt(table.getRows()); Configure countConfigure = source.toConfigure(); countConfigure.setFormat("None"); - countConfigure.setInjector(injector); + countConfigure.setPluginManager(pluginManager); plugin.connect(countConfigure); SqlBody countBody = SqlBody.builder() .type(SqlType.SELECT) @@ -403,7 +408,7 @@ public class TableServiceImpl SqlBuilder builder = new SqlBuilder(body); String sql = builder.getSql(); Configure pConfigure = source.toConfigure(); - pConfigure.setInjector(injector); + pConfigure.setPluginManager(pluginManager); plugin.connect(pConfigure); Response response = plugin.execute(sql); response.setContent(sql); @@ -431,7 +436,7 @@ public class TableServiceImpl try { Configure updateConfigure = source.toConfigure(); updateConfigure.setFormat("None"); - updateConfigure.setInjector(injector); + updateConfigure.setPluginManager(pluginManager); plugin.connect(updateConfigure); List allSql = Lists.newArrayList(); // Gets the auto-increment column for the current row @@ -489,7 +494,7 @@ public class TableServiceImpl try { Configure updateConfigure = source.toConfigure(); updateConfigure.setFormat("None"); - updateConfigure.setInjector(injector); + updateConfigure.setPluginManager(pluginManager); plugin.connect(updateConfigure); List allSql = Lists.newArrayList(); configure.getColumns().forEach(v -> { @@ -527,7 +532,7 @@ public class TableServiceImpl try { Configure updateConfigure = source.toConfigure(); updateConfigure.setFormat("None"); - updateConfigure.setInjector(injector); + updateConfigure.setPluginManager(pluginManager); plugin.connect(updateConfigure); List allSql = Lists.newArrayList(); configure.getColumns().forEach(v -> { @@ -561,7 +566,7 @@ public class TableServiceImpl try { Configure alterConfigure = source.toConfigure(); alterConfigure.setFormat("None"); - alterConfigure.setInjector(injector); + alterConfigure.setPluginManager(pluginManager); plugin.connect(alterConfigure); SqlBody body = SqlBody.builder() .type(SqlType.ALTER) @@ -590,7 +595,7 @@ public class TableServiceImpl try { Configure alterConfigure = source.toConfigure(); alterConfigure.setFormat("None"); - alterConfigure.setInjector(injector); + alterConfigure.setPluginManager(pluginManager); plugin.connect(alterConfigure); SqlBody body = SqlBody.builder() .type(SqlType.SHOW) @@ -618,7 +623,7 @@ public class TableServiceImpl try { Configure alterConfigure = source.toConfigure(); alterConfigure.setFormat("None"); - alterConfigure.setInjector(injector); + alterConfigure.setPluginManager(pluginManager); plugin.connect(alterConfigure); SqlBody body = SqlBody.builder() .type(SqlType.TRUNCATE) @@ -646,7 +651,7 @@ public class TableServiceImpl try { Configure alterConfigure = source.toConfigure(); alterConfigure.setFormat("None"); - alterConfigure.setInjector(injector); + alterConfigure.setPluginManager(pluginManager); plugin.connect(alterConfigure); SqlBody body = SqlBody.builder() .type(SqlType.DROP) @@ -680,7 +685,7 @@ public class TableServiceImpl .stream() .filter(item -> item.getIsKey().equals("PRI")) .collect(Collectors.toList()); - if (originColumns.size() > 0) { + if (!originColumns.isEmpty()) { // If the table contains a primary key, update the data using the primary key as a condition originColumns.forEach(item -> wheres.add(SqlColumn.builder() .column(item.getName()) diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/UploadServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/UploadServiceImpl.java index bb32eb06..ec98599a 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/UploadServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/UploadServiceImpl.java @@ -1,13 +1,13 @@ package io.edurt.datacap.service.service.impl; -import com.google.inject.Injector; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.common.utils.CodeUtils; -import io.edurt.datacap.common.utils.SpiUtils; import io.edurt.datacap.common.utils.UrlUtils; import io.edurt.datacap.fs.FsRequest; import io.edurt.datacap.fs.FsResponse; +import io.edurt.datacap.fs.FsService; +import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.service.body.UploadBody; import io.edurt.datacap.service.entity.convert.AvatarEntity; import io.edurt.datacap.service.enums.UploadMode; @@ -28,13 +28,13 @@ import java.io.IOException; public class UploadServiceImpl implements UploadService { - private final Injector injector; + private final PluginManager pluginManager; private final HttpServletRequest request; private final InitializerConfigure initializer; - public UploadServiceImpl(Injector injector, HttpServletRequest request, InitializerConfigure initializer) + public UploadServiceImpl(PluginManager pluginManager, HttpServletRequest request, InitializerConfigure initializer) { - this.injector = injector; + this.pluginManager = pluginManager; this.request = request; this.initializer = initializer; } @@ -49,9 +49,10 @@ public class UploadServiceImpl .build(); try { FsRequest fsRequest = getFsRequest(configure.getFile(), configure); - SpiUtils.findFs(injector, initializer.getFsConfigure().getType()) - .ifPresent(fs -> { - FsResponse response = fs.writer(fsRequest); + pluginManager.getPlugin(initializer.getFsConfigure().getType()) + .ifPresent(plugin -> { + FsService fsService = plugin.getService(FsService.class); + FsResponse response = fsService.writer(fsRequest); entity.setPath(response.getRemote()); entity.setLocal(response.getRemote()); if (initializer.getFsConfigure().getType().equals("Local")) { diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/UserServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/UserServiceImpl.java index 80e2a96a..80465fac 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/UserServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/UserServiceImpl.java @@ -2,7 +2,6 @@ package io.edurt.datacap.service.service.impl; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.inject.Injector; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.edurt.datacap.common.enums.ServiceState; import io.edurt.datacap.common.response.CommonResponse; @@ -10,9 +9,10 @@ import io.edurt.datacap.common.response.JwtResponse; import io.edurt.datacap.common.utils.CodeUtils; import io.edurt.datacap.common.utils.JsonUtils; import io.edurt.datacap.common.utils.NullAwareBeanUtils; -import io.edurt.datacap.common.utils.SpiUtils; import io.edurt.datacap.fs.FsRequest; import io.edurt.datacap.fs.FsResponse; +import io.edurt.datacap.fs.FsService; +import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.service.adapter.PageRequestAdapter; import io.edurt.datacap.service.audit.AuditUserLog; import io.edurt.datacap.service.body.FilterBody; @@ -80,9 +80,9 @@ public class UserServiceImpl private final RedisTemplate redisTemplate; private final Environment environment; private final InitializerConfigure initializerConfigure; - private final Injector injector; + private final PluginManager pluginManager; - public UserServiceImpl(UserRepository userRepository, RoleRepository roleRepository, SourceRepository sourceRepository, MenuRepository menuRepository, PasswordEncoder encoder, AuthenticationManager authenticationManager, JwtService jwtService, RedisTemplate redisTemplate, Environment environment, InitializerConfigure initializerConfigure, Injector injector) + public UserServiceImpl(UserRepository userRepository, RoleRepository roleRepository, SourceRepository sourceRepository, MenuRepository menuRepository, PasswordEncoder encoder, AuthenticationManager authenticationManager, JwtService jwtService, RedisTemplate redisTemplate, Environment environment, InitializerConfigure initializerConfigure, PluginManager pluginManager) { this.userRepository = userRepository; this.roleRepository = roleRepository; @@ -94,7 +94,7 @@ public class UserServiceImpl this.redisTemplate = redisTemplate; this.environment = environment; this.initializerConfigure = initializerConfigure; - this.injector = injector; + this.pluginManager = pluginManager; } @Override @@ -304,8 +304,9 @@ public class UserServiceImpl @Override public CommonResponse uploadAvatar(MultipartFile file) { - return SpiUtils.findFs(injector, initializerConfigure.getFsConfigure().getType()) - .map(fs -> { + + return pluginManager.getPlugin(initializerConfigure.getFsConfigure().getType()) + .map(plugin -> { UserEntity user = UserDetailsService.getUser(); try { String avatarPath = initializerConfigure.getAvatarPath(); @@ -319,7 +320,9 @@ public class UserServiceImpl .stream(file.getInputStream()) .fileName(String.format("%s.png", user.getId())) .build(); - FsResponse response = fs.writer(fsRequest); + + FsService fsService = plugin.getService(FsService.class); + FsResponse response = fsService.writer(fsRequest); UserEntity entity = userRepository.findById(user.getId()).get(); Map avatar = Maps.newConcurrentMap(); avatar.put("fsType", initializerConfigure.getFsConfigure().getType()); diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/Adapter.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/Adapter.java index fe248c22..44a57ac7 100644 --- a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/Adapter.java +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/Adapter.java @@ -1,10 +1,10 @@ package io.edurt.datacap.spi.adapter; import com.google.common.base.Preconditions; -import com.google.inject.Injector; import io.edurt.datacap.convert.ConvertFilter; import io.edurt.datacap.convert.model.ConvertRequest; import io.edurt.datacap.convert.model.ConvertResponse; +import io.edurt.datacap.plugin.PluginManager; import io.edurt.datacap.spi.model.Response; import java.util.List; @@ -13,9 +13,9 @@ public interface Adapter { Response handlerExecute(String content); - default List handlerFormatter(Injector injector, String format, List headers, List columns) + default List handlerFormatter(PluginManager pluginManager, String format, List headers, List columns) { - return ConvertFilter.filter(injector, format) + return ConvertFilter.filter(pluginManager, format) .map(file -> { ConvertRequest request = new ConvertRequest(); request.setHeaders(headers); diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcAdapter.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcAdapter.java index e5ea9991..ec1e6444 100644 --- a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcAdapter.java +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/adapter/JdbcAdapter.java @@ -98,7 +98,7 @@ public class JdbcAdapter finally { response.setHeaders(headers); response.setTypes(types); - response.setColumns(handlerFormatter(configure.getInjector(), configure.getFormat(), headers, columns)); + response.setColumns(handlerFormatter(configure.getPluginManager(), configure.getFormat(), headers, columns)); response.setIsSuccessful(Boolean.TRUE); } } diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/connection/JdbcConnection.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/connection/JdbcConnection.java index 9ad52ec0..e1b6a96f 100644 --- a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/connection/JdbcConnection.java +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/connection/JdbcConnection.java @@ -77,24 +77,6 @@ public class JdbcConnection try { this.jdbcConfigure = (JdbcConfigure) getConfigure(); this.response = getResponse(); - - // Remove org.apache.pinot.client.PinotDriver and net.suteren.jdbc.influxdb.InfluxDbDriver - Enumeration drivers = DriverManager.getDrivers(); - while (drivers.hasMoreElements()) { - Driver driver = drivers.nextElement(); - if (driver instanceof org.apache.pinot.client.PinotDriver - || driver instanceof net.suteren.jdbc.influxdb.InfluxDbDriver) { - DriverManager.deregisterDriver(driver); - log.info("Deregistered driver {}", driver); - } - } - - // Manually loading and registering the driver - Driver driver = (Driver) Class.forName(this.jdbcConfigure.getJdbcDriver()) - .getDeclaredConstructor() - .newInstance(); - DriverManager.registerDriver(driver); - String url = formatJdbcUrl(); log.info("Connection driver {}", this.jdbcConfigure.getJdbcDriver()); log.info("Connection url {}", url); diff --git a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/model/Configure.java b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/model/Configure.java index 3ed7e303..0fd0dd1d 100644 --- a/core/datacap-spi/src/main/java/io/edurt/datacap/spi/model/Configure.java +++ b/core/datacap-spi/src/main/java/io/edurt/datacap/spi/model/Configure.java @@ -1,7 +1,7 @@ package io.edurt.datacap.spi.model; -import com.google.inject.Injector; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.edurt.datacap.plugin.PluginManager; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -19,7 +19,7 @@ import java.util.Optional; @SuppressFBWarnings(value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) public class Configure { - private Injector injector; + private PluginManager pluginManager; private String host; private Integer port; private Optional username = Optional.empty(); diff --git a/docs/docs/reference/filesystem/aliyun/home.md b/docs/docs/reference/filesystem/aliyun/home.md index 3670c440..186415b9 100644 --- a/docs/docs/reference/filesystem/aliyun/home.md +++ b/docs/docs/reference/filesystem/aliyun/home.md @@ -51,7 +51,7 @@ import com.google.inject.Guice import com.google.inject.Injector import com.google.inject.Key import com.google.inject.TypeLiteral -import io.edurt.datacap.fs.Fs +import io.edurt.datacap.fs.FsService import io.edurt.datacap.fs.FsManager import io.edurt.datacap.fs.FsRequest import org.junit.Assert.assertTrue diff --git a/docs/docs/reference/filesystem/cos/home.md b/docs/docs/reference/filesystem/cos/home.md index b3177d3a..bb8b6708 100644 --- a/docs/docs/reference/filesystem/cos/home.md +++ b/docs/docs/reference/filesystem/cos/home.md @@ -51,7 +51,7 @@ import com.google.inject.Guice import com.google.inject.Injector import com.google.inject.Key import com.google.inject.TypeLiteral -import io.edurt.datacap.fs.Fs +import io.edurt.datacap.fs.FsService import io.edurt.datacap.fs.FsManager import io.edurt.datacap.fs.FsRequest import org.junit.Assert.assertTrue diff --git a/docs/docs/reference/filesystem/minio/home.md b/docs/docs/reference/filesystem/minio/home.md index 7c52ad92..99acd523 100644 --- a/docs/docs/reference/filesystem/minio/home.md +++ b/docs/docs/reference/filesystem/minio/home.md @@ -51,7 +51,7 @@ import com.google.inject.Guice import com.google.inject.Injector import com.google.inject.Key import com.google.inject.TypeLiteral -import io.edurt.datacap.fs.Fs +import io.edurt.datacap.fs.FsService import io.edurt.datacap.fs.FsManager import io.edurt.datacap.fs.FsRequest import org.junit.Assert.assertTrue diff --git a/docs/docs/reference/filesystem/qiniu/home.md b/docs/docs/reference/filesystem/qiniu/home.md index d9a0f426..d2a21b84 100644 --- a/docs/docs/reference/filesystem/qiniu/home.md +++ b/docs/docs/reference/filesystem/qiniu/home.md @@ -51,7 +51,7 @@ import com.google.inject.Guice import com.google.inject.Injector import com.google.inject.Key import com.google.inject.TypeLiteral -import io.edurt.datacap.fs.Fs +import io.edurt.datacap.fs.FsService import io.edurt.datacap.fs.FsManager import io.edurt.datacap.fs.FsRequest import org.junit.Before diff --git a/docs/docs/reference/filesystem/s3/home.md b/docs/docs/reference/filesystem/s3/home.md index 53585e8f..6511ba4e 100644 --- a/docs/docs/reference/filesystem/s3/home.md +++ b/docs/docs/reference/filesystem/s3/home.md @@ -51,7 +51,7 @@ import com.google.inject.Guice import com.google.inject.Injector import com.google.inject.Key import com.google.inject.TypeLiteral -import io.edurt.datacap.fs.Fs +import io.edurt.datacap.fs.FsService import io.edurt.datacap.fs.FsManager import io.edurt.datacap.fs.FsRequest import org.junit.Assert.assertTrue diff --git a/executor/datacap-executor-spi/pom.xml b/executor/datacap-executor-spi/pom.xml index c89db99f..3af66f17 100644 --- a/executor/datacap-executor-spi/pom.xml +++ b/executor/datacap-executor-spi/pom.xml @@ -30,6 +30,11 @@ io.edurt.datacap datacap-spi + + io.edurt.datacap + datacap-plugin + ${project.version} + diff --git a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorManager.kt b/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorManager.kt deleted file mode 100644 index e0901ea4..00000000 --- a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorManager.kt +++ /dev/null @@ -1,24 +0,0 @@ -package io.edurt.datacap.executor - -import com.google.inject.AbstractModule -import org.slf4j.LoggerFactory.getLogger -import java.time.LocalDateTime -import java.util.* - -class ExecutorManager : AbstractModule { - private val log = getLogger(this.javaClass) - private var externalModules: Iterable? = null - - constructor() { - this.externalModules = ServiceLoader.load(ExecutorModule::class.java) - } - - override fun configure() { - log.info("================ Executor started ================") - externalModules !!.forEach { module -> - log.info("Install Executor [ {} ] Join time [ {} ]", module.name(), LocalDateTime.now()) - this.install(module) - } - log.info("================ Executor end ================") - } -} diff --git a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorModule.kt b/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorModule.kt deleted file mode 100644 index 56b9e5db..00000000 --- a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorModule.kt +++ /dev/null @@ -1,12 +0,0 @@ -package io.edurt.datacap.executor - -import com.google.inject.AbstractModule - -abstract class ExecutorModule : AbstractModule() { - open fun name(): String { - return this.javaClass - .simpleName - .removeSuffix("Module") - .removeSuffix("Executor") - } -} diff --git a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/Executor.kt b/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorService.kt similarity index 64% rename from executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/Executor.kt rename to executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorService.kt index 482b41d8..ce97aeb4 100644 --- a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/Executor.kt +++ b/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorService.kt @@ -2,12 +2,15 @@ package io.edurt.datacap.executor import io.edurt.datacap.executor.configure.ExecutorRequest import io.edurt.datacap.executor.configure.ExecutorResponse +import io.edurt.datacap.plugin.Service -interface Executor { - fun name(): String { +interface ExecutorService : Service +{ + fun name(): String + { return this.javaClass - .simpleName - .removeSuffix("Executor") + .simpleName + .removeSuffix("Executor") } fun start(request: ExecutorRequest): ExecutorResponse diff --git a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorUtils.kt b/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorUtils.kt deleted file mode 100644 index 4e2e3184..00000000 --- a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/ExecutorUtils.kt +++ /dev/null @@ -1,13 +0,0 @@ -package io.edurt.datacap.executor - -import com.google.inject.Injector -import com.google.inject.Key -import com.google.inject.TypeLiteral - -object ExecutorUtils { - @JvmStatic - fun findOne(injector: Injector, name: String): Executor { - return injector.getInstance(Key.get(object : TypeLiteral>() {})) - .last { it.name() == name } - } -} diff --git a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/configure/ExecutorRequest.kt b/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/configure/ExecutorRequest.kt index e08db6dc..0465b2f9 100644 --- a/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/configure/ExecutorRequest.kt +++ b/executor/datacap-executor-spi/src/main/kotlin/io/edurt/datacap/executor/configure/ExecutorRequest.kt @@ -1,10 +1,10 @@ package io.edurt.datacap.executor.configure -import com.google.inject.Injector import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.edurt.datacap.executor.common.RunEngine import io.edurt.datacap.executor.common.RunMode import io.edurt.datacap.executor.common.RunWay +import io.edurt.datacap.plugin.PluginManager @SuppressFBWarnings(value = ["EI_EXPOSE_REP", "EI_EXPOSE_REP2"]) data class ExecutorRequest( @@ -14,7 +14,7 @@ data class ExecutorRequest( var output: ExecutorConfigure, var executorHome: String? = null, var workHome: String? = null, - var injector: Injector? = null, + var pluginManager: PluginManager? = null, var timeout: Long = 600, var runWay: RunWay = RunWay.LOCAL, var runMode: RunMode = RunMode.CLIENT, diff --git a/fs/datacap-fs-spi/pom.xml b/fs/datacap-fs-spi/pom.xml index 2cc85e30..f374701c 100644 --- a/fs/datacap-fs-spi/pom.xml +++ b/fs/datacap-fs-spi/pom.xml @@ -20,5 +20,10 @@ ch.qos.logback logback-classic + + io.edurt.datacap + datacap-plugin + ${project.version} + diff --git a/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsManager.java b/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsManager.java deleted file mode 100644 index ee8cad93..00000000 --- a/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsManager.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.edurt.datacap.fs; - -import com.google.inject.AbstractModule; -import lombok.extern.slf4j.Slf4j; - -import java.util.ServiceLoader; - -@Slf4j -public class FsManager - extends AbstractModule -{ - private final Iterable externalModules; - - public FsManager() - { - this.externalModules = ServiceLoader.load(FsModule.class); - } - - @Override - protected void configure() - { - log.info("Loading fs start ..."); - for (FsModule module : this.externalModules) { - log.info("Install fs [ {} ]", module.name()); - this.install(module); - } - log.info("Loading fs end ..."); - } -} diff --git a/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsModule.java b/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsModule.java deleted file mode 100644 index 7a8edaab..00000000 --- a/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsModule.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.edurt.datacap.fs; - -import com.google.inject.AbstractModule; - -public abstract class FsModule - extends AbstractModule -{ - String name() - { - return this.getClass() - .getSimpleName() - .replace("Fs", "") - .replace("Module", ""); - } -} diff --git a/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/Fs.java b/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsService.java similarity index 87% rename from fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/Fs.java rename to fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsService.java index 2856fde1..4176fbfa 100644 --- a/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/Fs.java +++ b/fs/datacap-fs-spi/src/main/java/io/edurt/datacap/fs/FsService.java @@ -1,6 +1,9 @@ package io.edurt.datacap.fs; -public interface Fs +import io.edurt.datacap.plugin.Service; + +public interface FsService + extends Service { default String name() { diff --git a/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/FsManagerTest.java b/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/FsManagerTest.java deleted file mode 100644 index ba71c0f6..00000000 --- a/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/FsManagerTest.java +++ /dev/null @@ -1,28 +0,0 @@ -package io.edurt.datacap.fs; - -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.TypeLiteral; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.Set; - -public class FsManagerTest -{ - private Injector injector; - - @Before - public void before() - { - injector = Guice.createInjector(new FsManager()); - } - - @Test - public void test() - { - Assert.assertNotNull(injector.getInstance(Key.get(new TypeLiteral>() {}))); - } -} diff --git a/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/TestFs.java b/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/TestFs.java deleted file mode 100644 index b4f88795..00000000 --- a/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/TestFs.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.edurt.datacap.fs; - -public class TestFs - implements Fs -{ - @Override - public FsResponse writer(FsRequest request) - { - return FsResponse.builder() - .build(); - } - - @Override - public FsResponse reader(FsRequest request) - { - return FsResponse.builder() - .build(); - } -} diff --git a/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/TestFsModule.java b/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/TestFsModule.java deleted file mode 100644 index e23201ac..00000000 --- a/fs/datacap-fs-spi/src/test/java/io/edurt/datacap/fs/TestFsModule.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.edurt.datacap.fs; - -import com.google.inject.multibindings.Multibinder; - -public class TestFsModule - extends FsModule -{ - protected void configure() - { - Multibinder.newSetBinder(this.binder(), Fs.class) - .addBinding().to(TestFs.class); - } -} diff --git a/plugin/datacap-plugin-mysql/src/main/java/io/edurt/datacap/plugin/MySQLPlugin.java b/plugin/datacap-plugin-mysql/src/main/java/io/edurt/datacap/plugin/MySQLPlugin.java index 20f817e5..b77d2461 100644 --- a/plugin/datacap-plugin-mysql/src/main/java/io/edurt/datacap/plugin/MySQLPlugin.java +++ b/plugin/datacap-plugin-mysql/src/main/java/io/edurt/datacap/plugin/MySQLPlugin.java @@ -1,15 +1,6 @@ package io.edurt.datacap.plugin; -import io.edurt.datacap.spi.PluginService; - -import java.util.Set; - public class MySQLPlugin extends Plugin { - @Override - public Set> getServiceTypes() - { - return Set.of(Service.class, PluginService.class); - } } diff --git a/plugin/datacap-plugin-mysql/src/main/java/io/edurt/datacap/plugin/MySQLService.java b/plugin/datacap-plugin-mysql/src/main/java/io/edurt/datacap/plugin/MySQLService.java index fdc004a2..c2edcc1d 100644 --- a/plugin/datacap-plugin-mysql/src/main/java/io/edurt/datacap/plugin/MySQLService.java +++ b/plugin/datacap-plugin-mysql/src/main/java/io/edurt/datacap/plugin/MySQLService.java @@ -5,4 +5,15 @@ import io.edurt.datacap.spi.PluginService; public class MySQLService implements PluginService { + @Override + public String connectType() + { + return "mysql"; + } + + @Override + public String driver() + { + return "com.mysql.cj.jdbc.Driver"; + } } diff --git a/pom.xml b/pom.xml index 1dfc7557..b4a27021 100644 --- a/pom.xml +++ b/pom.xml @@ -91,12 +91,12 @@ parser/datacap-parser-spi - + scheduler/datacap-scheduler-spi notify/datacap-notify-spi convert/datacap-convert-spi - + convert/datacap-convert-txt diff --git a/scheduler/datacap-scheduler-spi/pom.xml b/scheduler/datacap-scheduler-spi/pom.xml index 1f7c3c4e..4756ecbf 100644 --- a/scheduler/datacap-scheduler-spi/pom.xml +++ b/scheduler/datacap-scheduler-spi/pom.xml @@ -35,6 +35,11 @@ quartz ${quartz.version} + + io.edurt.datacap + datacap-plugin + ${project.version} + diff --git a/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/ScheduleManager.kt b/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/ScheduleManager.kt deleted file mode 100644 index d14bd169..00000000 --- a/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/ScheduleManager.kt +++ /dev/null @@ -1,25 +0,0 @@ -package io.edurt.datacap.scheduler - -import com.google.inject.AbstractModule -import org.slf4j.LoggerFactory -import java.time.LocalTime -import java.util.* - - -class ScheduleManager : AbstractModule { - private val log = LoggerFactory.getLogger(this.javaClass) - private var externalModules: Iterable? = null - - constructor() { - this.externalModules = ServiceLoader.load(SchedulerModule::class.java) - } - - override fun configure() { - log.info("================ Scheduler started ================") - externalModules !!.forEach { module -> - log.info("Install scheduler [ {} ] Join time [ {} ]", module.name(), LocalTime.now()) - this.install(module) - } - log.info("================ Scheduler end ================") - } -} diff --git a/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/SchedulerModule.kt b/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/SchedulerModule.kt deleted file mode 100644 index 6e9f4711..00000000 --- a/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/SchedulerModule.kt +++ /dev/null @@ -1,12 +0,0 @@ -package io.edurt.datacap.scheduler - -import com.google.inject.AbstractModule - -abstract class SchedulerModule : AbstractModule() { - open fun name(): String { - return this.javaClass - .simpleName - .removeSuffix("Module") - .removeSuffix("Scheduler") - } -} diff --git a/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/Scheduler.kt b/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/SchedulerService.kt similarity index 53% rename from scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/Scheduler.kt rename to scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/SchedulerService.kt index 34bfb9ef..49aacd85 100644 --- a/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/Scheduler.kt +++ b/scheduler/datacap-scheduler-spi/src/main/kotlin/io/edurt/datacap/scheduler/SchedulerService.kt @@ -1,10 +1,14 @@ package io.edurt.datacap.scheduler -interface Scheduler { - fun name(): String { +import io.edurt.datacap.plugin.Service + +interface SchedulerService : Service +{ + fun name(): String + { return this.javaClass - .simpleName - .removeSuffix("Scheduler") + .simpleName + .removeSuffix("Scheduler") } fun initialize(request: SchedulerRequest): SchedulerResponse