feat(plugin): adapter core

This commit is contained in:
qianmoQ 2024-11-21 17:29:14 +08:00
parent ad21fed0be
commit 9dc64f0c9b
65 changed files with 803 additions and 1049 deletions

View File

@ -1 +1,2 @@
datacap-plugin-mysql=plugin/datacap-plugin-mysql/pom.xml
datacap-convert-txt=convert/datacap-convert-txt/pom.xml

View File

@ -30,6 +30,11 @@
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-common</artifactId>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -1,8 +1,6 @@
package io.edurt.datacap.convert
import com.google.inject.Injector
import com.google.inject.Key
import com.google.inject.TypeLiteral
import io.edurt.datacap.plugin.PluginManager
import java.util.*
object ConvertFilter
@ -10,17 +8,14 @@ object ConvertFilter
/**
* Finds a file in the injector by name.
*
* @param injector the injector to search in
* @param pluginManager PluginManager
* @param name the name of the file to find
* @return an Optional containing the found File, or an empty Optional if not found
*/
@JvmStatic
fun filter(injector: Injector, name: String): Optional<Convert>
fun filter(pluginManager: PluginManager, name: String): Optional<ConvertService>
{
return injector.getInstance(Key.get(object : TypeLiteral<Set<Convert>>()
{}))
.stream()
.filter { it.name() == name }
.findFirst()
return pluginManager.getPlugin(name)
.map { it.getService(ConvertService::class.java) }
}
}

View File

@ -1,23 +0,0 @@
package io.edurt.datacap.convert
import com.google.inject.AbstractModule
import io.edurt.datacap.common.utils.DateUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory.getLogger
import java.util.*
class ConvertManager : AbstractModule()
{
private val log: Logger = getLogger(ConvertManager::class.java)
private var externalModules: Iterable<ConvertModule> = ServiceLoader.load(ConvertModule::class.java)
override fun configure()
{
log.info("========== Loading convert start ==========")
externalModules.forEach { module ->
log.info("Installing convert [ {} ] join time [ {} ]", module.name(), DateUtils.formatYMDHMSWithInterval())
install(module)
}
log.info("========== Loading convert end ==========")
}
}

View File

@ -1,11 +0,0 @@
package io.edurt.datacap.convert
import com.google.inject.AbstractModule
abstract class ConvertModule : AbstractModule()
{
fun name(): String = this.javaClass
.simpleName
.removeSuffix("Convert")
.removeSuffix("Module")
}

View File

@ -2,8 +2,9 @@ package io.edurt.datacap.convert
import io.edurt.datacap.convert.model.ConvertRequest
import io.edurt.datacap.convert.model.ConvertResponse
import io.edurt.datacap.plugin.Service
interface Convert
interface ConvertService : Service
{
fun name(): String
{

View File

@ -4,7 +4,7 @@ import com.google.inject.Guice
import org.junit.Assert
import org.junit.Test
class ConvertFilterTest
class ConvertServiceFilterTest
{
private val injector = Guice.createInjector(ConvertManager())

View File

@ -7,14 +7,14 @@ import com.google.inject.TypeLiteral
import org.junit.Assert
import org.junit.Test
class ConvertModuleTest
class ConvertServiceModuleTest
{
private val injector: Injector = Guice.createInjector(ConvertManager())
@Test
fun test()
{
injector.getInstance(Key.get(object : TypeLiteral<Set<Convert>>()
injector.getInstance(Key.get(object : TypeLiteral<Set<ConvertService>>()
{}))
.stream()
.findFirst()

View File

@ -6,8 +6,8 @@ class TestConvertModule : ConvertModule()
{
override fun configure()
{
Multibinder.newSetBinder(this.binder(), Convert::class.java)
Multibinder.newSetBinder(this.binder(), ConvertService::class.java)
.addBinding()
.to(TestConvert::class.java)
.to(TestConvertService::class.java)
}
}

View File

@ -3,7 +3,7 @@ package io.edurt.datacap.convert
import io.edurt.datacap.convert.model.ConvertRequest
import io.edurt.datacap.convert.model.ConvertResponse
class TestConvert : Convert
class TestConvertService : ConvertService
{
override fun format(request: ConvertRequest): ConvertResponse
{

View File

@ -0,0 +1,5 @@
package io.edurt.datacap.convert
import io.edurt.datacap.plugin.Plugin
class TxtPlugin : Plugin()

View File

@ -1,10 +1,8 @@
package io.edurt.datacap.convert.txt
package io.edurt.datacap.convert
import com.google.common.base.Preconditions.checkArgument
import com.google.common.base.Preconditions.checkState
import io.edurt.datacap.common.utils.DateUtils
import io.edurt.datacap.convert.Convert
import io.edurt.datacap.convert.FileConvert
import io.edurt.datacap.convert.model.ConvertRequest
import io.edurt.datacap.convert.model.ConvertResponse
import org.apache.commons.io.FileUtils
@ -15,7 +13,7 @@ import java.io.IOException
import java.io.InputStreamReader
import java.util.Objects.requireNonNull
class TxtConvert : Convert
class TxtService : ConvertService
{
private val log = getLogger(this::class.java)

View File

@ -1,15 +0,0 @@
package io.edurt.datacap.convert.txt
import com.google.inject.multibindings.Multibinder
import io.edurt.datacap.convert.Convert
import io.edurt.datacap.convert.ConvertModule
class TxtModule : ConvertModule()
{
override fun configure()
{
Multibinder.newSetBinder(this.binder(), Convert::class.java)
.addBinding()
.to(TxtConvert::class.java)
}
}

View File

@ -0,0 +1 @@
io.edurt.datacap.convert.TxtPlugin

View File

@ -0,0 +1 @@
io.edurt.datacap.convert.TxtService

View File

@ -1,4 +1,4 @@
package io.edurt.datacap.convert.txt
package io.edurt.datacap.convert
import com.google.inject.Guice
import com.google.inject.Injector

View File

@ -1,4 +1,4 @@
package io.edurt.datacap.convert.txt
package io.edurt.datacap.convert
import com.google.inject.Guice
import com.google.inject.Injector

View File

@ -50,16 +50,6 @@
<artifactId>datacap-captcha</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-fs-spi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-scheduler-spi</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -1,48 +0,0 @@
package io.edurt.datacap.common.utils;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.edurt.datacap.fs.Fs;
import io.edurt.datacap.scheduler.Scheduler;
import java.util.Optional;
import java.util.Set;
public class SpiUtils
{
private SpiUtils()
{
}
/**
* Finds a specific Fs object by name.
*
* @param injector the injector used for dependency injection
* @param name the name of the Fs object to find
* @return an Optional containing the found Fs object, or an empty Optional if not found
*/
public static Optional<Fs> findFs(Injector injector, String name)
{
Optional<Fs> optionalFs = injector.getInstance(Key.get(new TypeLiteral<Set<Fs>>() {}))
.stream()
.filter(item -> item.name().equalsIgnoreCase(name))
.findFirst();
return optionalFs;
}
/**
* Finds a schedule in the injector by name.
*
* @param injector the injector to search in
* @param name the name of the schedule to find
* @return an Optional containing the found Scheduler, or an empty Optional if not found
*/
public static Optional<Scheduler> findSchedule(Injector injector, String name)
{
return injector.getInstance(Key.get(new TypeLiteral<Set<Scheduler>>() {}))
.stream()
.filter(item -> item.name().equalsIgnoreCase(name))
.findFirst();
}
}

View File

@ -2,7 +2,9 @@ package io.edurt.datacap.plugin;
public enum PluginType
{
CONNECTOR("Connector");
CONNECTOR("Connector"),
EXECUTOR("Executor"),
SCHEDULER("Scheduler");
private String name;

View File

@ -250,4 +250,18 @@ public class PluginPathUtils
return Optional.empty();
}
}
/**
* 安全地将子路径添加到项目根目录
* Safely append a sub-path to the project root directory
*
* @param subPath 要添加的子路径
* @return 组合后的路径
*/
public static Path appendPath(String subPath)
{
Path root = findProjectRoot();
String safePath = subPath.startsWith("/") ? subPath.substring(1) : subPath;
return root.resolve(safePath);
}
}

View File

@ -1,6 +1,7 @@
package io.edurt.datacap.server.controller;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.plugin.PluginMetadata;
import io.edurt.datacap.service.body.FilterBody;
import io.edurt.datacap.service.body.adhoc.Adhoc;
import io.edurt.datacap.service.entity.DataSetColumnEntity;
@ -78,7 +79,7 @@ public class DataSetController
}
@GetMapping(value = "getActuators")
public CommonResponse<Set<String>> getActuators()
public CommonResponse<Set<PluginMetadata>> getActuators()
{
return service.getActuators();
}

View File

@ -5,11 +5,11 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.executor.Executor;
import io.edurt.datacap.executor.ExecutorService;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.plugin.PluginMetadata;
import io.edurt.datacap.plugin.PluginType;
import io.edurt.datacap.scheduler.Scheduler;
import io.edurt.datacap.scheduler.SchedulerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@ -37,15 +37,15 @@ public class PluginController
public CommonResponse<Map<String, Set<String>>> getPlugins()
{
Map<String, Set<String>> plugins = Maps.newHashMap();
Set<String> executors = injector.getInstance(Key.get(new TypeLiteral<Set<Executor>>() {}))
Set<String> executors = injector.getInstance(Key.get(new TypeLiteral<Set<ExecutorService>>() {}))
.stream()
.map(Executor::name)
.map(ExecutorService::name)
.collect(Collectors.toSet());
plugins.put("executor", executors);
Set<String> schedulers = injector.getInstance(Key.get(new TypeLiteral<Set<Scheduler>>() {}))
Set<String> schedulers = injector.getInstance(Key.get(new TypeLiteral<Set<SchedulerService>>() {}))
.stream()
.map(Scheduler::name)
.map(SchedulerService::name)
.collect(Collectors.toSet());
plugins.put("scheduler", schedulers);
return CommonResponse.success(plugins);

View File

@ -112,6 +112,11 @@
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-convert-spi</artifactId>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-fs-spi</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Sql Parser -->
<dependency>
<groupId>io.edurt.datacap</groupId>

View File

@ -1,15 +1,15 @@
package io.edurt.datacap.service.audit;
import com.google.inject.Injector;
import io.edurt.datacap.common.enums.State;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.common.utils.CodeUtils;
import io.edurt.datacap.common.utils.DateUtils;
import io.edurt.datacap.common.utils.SpiUtils;
import io.edurt.datacap.convert.ConvertFilter;
import io.edurt.datacap.convert.model.ConvertRequest;
import io.edurt.datacap.convert.model.ConvertResponse;
import io.edurt.datacap.fs.FsRequest;
import io.edurt.datacap.fs.FsService;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.service.common.FolderUtils;
import io.edurt.datacap.service.entity.ExecuteEntity;
import io.edurt.datacap.service.entity.PluginAuditEntity;
@ -49,14 +49,14 @@ public class AuditPluginHandler
private final PluginAuditRepository pluginAuditRepository;
private final SourceRepository sourceRepository;
private final InitializerConfigure initializer;
private final Injector injector;
private final PluginManager pluginManager;
public AuditPluginHandler(PluginAuditRepository pluginAuditRepository, SourceRepository sourceRepository, InitializerConfigure initializer, Injector injector)
public AuditPluginHandler(PluginAuditRepository pluginAuditRepository, SourceRepository sourceRepository, InitializerConfigure initializer, PluginManager pluginManager)
{
this.pluginAuditRepository = pluginAuditRepository;
this.sourceRepository = sourceRepository;
this.initializer = initializer;
this.injector = injector;
this.pluginManager = pluginManager;
}
@Pointcut("@annotation(auditPlugin)")
@ -111,7 +111,7 @@ public class AuditPluginHandler
String workHome = FolderUtils.getWorkHome(initializer.getDataHome(), user.getUsername(), String.join(File.separator, "adhoc", uniqueId));
log.info("Writer file to folder [ {} ] on [ {} ]", workHome, pluginAudit.getId());
try {
ConvertFilter.filter(injector, "Json")
ConvertFilter.filter(pluginManager, "Json")
.ifPresent(it -> {
try {
FileUtils.forceMkdir(new File(workHome));
@ -139,8 +139,12 @@ public class AuditPluginHandler
fsRequest.setEndpoint(initializer.getFsConfigure().getEndpoint());
fsRequest.setFileName(String.join(File.separator, user.getUsername(), DateUtils.formatYMD(), String.join(File.separator, "adhoc", uniqueId), "result.csv"));
}
SpiUtils.findFs(injector, initializer.getFsConfigure().getType())
.map(v -> v.writer(fsRequest));
pluginManager.getPlugin(initializer.getFsConfigure().getType())
.map(v -> {
FsService fsService = v.getService(FsService.class);
return fsService.writer(fsRequest);
});
log.info("Delete temp file [ {} ] on [ {} ] state [ {} ]", tempFile, pluginAudit.getId(), Files.deleteIfExists(tempFile.toPath()));
log.info("Writer file to folder [ {} ] on [ {} ] completed", workHome, pluginAudit.getId());
pluginAudit.setHome(workHome);

View File

@ -1,9 +1,9 @@
package io.edurt.datacap.service.initializer;
import com.google.inject.Injector;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.common.utils.SpiUtils;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.scheduler.SchedulerRequest;
import io.edurt.datacap.scheduler.SchedulerService;
import io.edurt.datacap.service.enums.SyncMode;
import io.edurt.datacap.service.initializer.job.DatasetJob;
import io.edurt.datacap.service.repository.DataSetRepository;
@ -19,14 +19,14 @@ import org.springframework.stereotype.Service;
public class DatasetSchedulerInitializer
implements CommandLineRunner
{
private final Injector injector;
private final PluginManager pluginManager;
private final DataSetRepository repository;
private final DataSetService service;
private final Scheduler scheduler;
public DatasetSchedulerInitializer(Injector injector, DataSetRepository repository, DataSetService service, Scheduler scheduler)
public DatasetSchedulerInitializer(PluginManager pluginManager, DataSetRepository repository, DataSetService service, Scheduler scheduler)
{
this.injector = injector;
this.pluginManager = pluginManager;
this.repository = repository;
this.service = service;
this.scheduler = scheduler;
@ -37,24 +37,26 @@ public class DatasetSchedulerInitializer
throws Exception
{
log.info("Start dataset scheduler initializer");
// this.repository.findAllBySyncMode(SyncMode.TIMING)
// .forEach(item -> {
// log.info("Dataset [ {} ] will be scheduled", item.getName());
// SpiUtils.findSchedule(this.injector, item.getScheduler())
// .ifPresent(scheduler -> {
// SchedulerRequest request = new SchedulerRequest();
// request.setName(item.getId().toString());
// request.setGroup("datacap");
// request.setExpression(item.getExpression());
// request.setJobId(String.valueOf(item.getId()));
// request.setCreateBeforeDelete(true);
// if (scheduler.name().equals("Default")) {
// request.setJob(new DatasetJob());
// request.setScheduler(this.scheduler);
// }
// scheduler.initialize(request);
// });
// });
this.repository.findAllBySyncMode(SyncMode.TIMING)
.forEach(item -> {
log.info("Dataset [ {} ] will be scheduled", item.getName());
pluginManager.getPlugin(item.getScheduler())
.ifPresent(scheduler -> {
SchedulerRequest request = new SchedulerRequest();
request.setName(item.getId().toString());
request.setGroup("datacap");
request.setExpression(item.getExpression());
request.setJobId(String.valueOf(item.getId()));
request.setCreateBeforeDelete(true);
SchedulerService schedulerService = scheduler.getService(SchedulerService.class);
if (schedulerService.name().equals("Default")) {
request.setJob(new DatasetJob());
request.setScheduler(this.scheduler);
}
schedulerService.initialize(request);
});
});
log.info("End dataset scheduler initializer");
}
}

View File

@ -1,6 +1,7 @@
package io.edurt.datacap.service.service;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.plugin.PluginMetadata;
import io.edurt.datacap.service.body.FilterBody;
import io.edurt.datacap.service.body.adhoc.Adhoc;
import io.edurt.datacap.service.entity.DataSetColumnEntity;
@ -23,7 +24,7 @@ public interface DataSetService
CommonResponse<Object> adhoc(String code, Adhoc configure);
CommonResponse<Set<String>> getActuators();
CommonResponse<Set<PluginMetadata>> getActuators();
CommonResponse<DataSetEntity> getInfo(String code);

View File

@ -3,9 +3,6 @@ package io.edurt.datacap.service.service.impl;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.common.enums.DataSetState;
import io.edurt.datacap.common.response.CommonResponse;
@ -15,9 +12,7 @@ import io.edurt.datacap.common.sql.configure.SqlColumn;
import io.edurt.datacap.common.sql.configure.SqlOperator;
import io.edurt.datacap.common.sql.configure.SqlOrder;
import io.edurt.datacap.common.sql.configure.SqlType;
import io.edurt.datacap.common.utils.SpiUtils;
import io.edurt.datacap.executor.Executor;
import io.edurt.datacap.executor.ExecutorUtils;
import io.edurt.datacap.executor.ExecutorService;
import io.edurt.datacap.executor.common.RunEngine;
import io.edurt.datacap.executor.common.RunMode;
import io.edurt.datacap.executor.common.RunProtocol;
@ -27,15 +22,16 @@ import io.edurt.datacap.executor.configure.ExecutorConfigure;
import io.edurt.datacap.executor.configure.ExecutorRequest;
import io.edurt.datacap.executor.configure.ExecutorResponse;
import io.edurt.datacap.executor.configure.OriginColumn;
import io.edurt.datacap.scheduler.Scheduler;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.plugin.PluginMetadata;
import io.edurt.datacap.scheduler.SchedulerRequest;
import io.edurt.datacap.scheduler.SchedulerService;
import io.edurt.datacap.service.adapter.PageRequestAdapter;
import io.edurt.datacap.service.body.FilterBody;
import io.edurt.datacap.service.body.PipelineFieldBody;
import io.edurt.datacap.service.body.adhoc.Adhoc;
import io.edurt.datacap.service.common.ConfigureUtils;
import io.edurt.datacap.service.common.FolderUtils;
import io.edurt.datacap.service.common.PluginUtils;
import io.edurt.datacap.service.configure.IConfigurePipelineType;
import io.edurt.datacap.service.entity.DataSetColumnEntity;
import io.edurt.datacap.service.entity.DataSetEntity;
@ -81,7 +77,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -97,17 +92,17 @@ public class DataSetServiceImpl
public final DataSetColumnRepository columnRepository;
private final DataSetRepository repository;
private final DatasetHistoryRepository historyRepository;
private final Injector injector;
private final PluginManager pluginManager;
private final InitializerConfigure initializerConfigure;
private final org.quartz.Scheduler scheduler;
private final Environment environment;
public DataSetServiceImpl(DataSetRepository repository, DataSetColumnRepository columnRepository, DatasetHistoryRepository historyRepository, Injector injector, InitializerConfigure initializerConfigure, org.quartz.Scheduler scheduler, Environment environment)
public DataSetServiceImpl(DataSetRepository repository, DataSetColumnRepository columnRepository, DatasetHistoryRepository historyRepository, PluginManager pluginManager, InitializerConfigure initializerConfigure, org.quartz.Scheduler scheduler, Environment environment)
{
this.repository = repository;
this.columnRepository = columnRepository;
this.historyRepository = historyRepository;
this.injector = injector;
this.pluginManager = pluginManager;
this.initializerConfigure = initializerConfigure;
this.scheduler = scheduler;
this.environment = environment;
@ -117,7 +112,7 @@ public class DataSetServiceImpl
public CommonResponse<DataSetEntity> saveOrUpdate(DataSetEntity configure)
{
UserEntity user = UserDetailsService.getUser();
ExecutorService service = Executors.newSingleThreadExecutor();
java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor();
service.submit(() -> {
configure.setUser(user);
completeState(configure, DataSetState.METADATA_START);
@ -133,7 +128,7 @@ public class DataSetServiceImpl
if (!entity.isPresent()) {
return CommonResponse.failure(String.format("DataSet [ %s ] not found", id));
}
ExecutorService service = Executors.newSingleThreadExecutor();
java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor();
service.submit(() -> startBuild(entity.get(), false));
return CommonResponse.success(entity);
}
@ -156,7 +151,7 @@ public class DataSetServiceImpl
if (!entityOptional.isPresent()) {
return CommonResponse.failure(String.format("DataSet [ %s ] not found", id));
}
ExecutorService service = Executors.newSingleThreadExecutor();
java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor();
DataSetEntity entity = entityOptional.get();
service.submit(() -> syncData(entity, service));
return CommonResponse.success(true);
@ -167,7 +162,7 @@ public class DataSetServiceImpl
{
return repository.findByCode(code)
.map(item -> {
ExecutorService service = Executors.newSingleThreadExecutor();
java.util.concurrent.ExecutorService service = Executors.newSingleThreadExecutor();
service.submit(() -> clearData(item, service));
return CommonResponse.success(true);
})
@ -246,32 +241,33 @@ public class DataSetServiceImpl
String sql = new SqlBuilder(body).getSql();
log.info("Execute SQL: {} for DataSet [ {} ]", sql, code);
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name());
if (!pluginOptional.isPresent()) {
throw new IllegalArgumentException(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType()));
}
PluginService plugin = pluginOptional.get();
return pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType())
.map(plugin -> {
PluginService pluginService = plugin.getService(PluginService.class);
Configure targetConfigure = new Configure();
targetConfigure.setHost(initializerConfigure.getDataSetConfigure().getHost());
targetConfigure.setPort(Integer.valueOf(initializerConfigure.getDataSetConfigure().getPort()));
targetConfigure.setUsername(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getUsername()));
targetConfigure.setPassword(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getPassword()));
targetConfigure.setDatabase(Optional.ofNullable(database));
targetConfigure.setInjector(injector);
plugin.connect(targetConfigure);
Response response = plugin.execute(sql);
targetConfigure.setPluginManager(pluginManager);
Response response = pluginService.execute(targetConfigure, sql);
response.setContent(sql);
return CommonResponse.success(response);
})
.orElseGet(() -> CommonResponse.failure(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType())));
})
.orElseGet(() -> CommonResponse.failure(String.format("DataSet [ %s ] not found", code)));
}
@Override
public CommonResponse<Set<String>> getActuators()
public CommonResponse<Set<PluginMetadata>> getActuators()
{
Set<String> actuators = Sets.newHashSet();
this.injector.getInstance(Key.get(new TypeLiteral<Set<Scheduler>>() {}))
.forEach(item -> actuators.add(item.name()));
Set<PluginMetadata> actuators = Sets.newHashSet();
this.pluginManager.getPluginInfos()
.stream()
.filter(item -> item.getType().equals(io.edurt.datacap.plugin.PluginType.EXECUTOR))
.forEach(actuators::add);
return CommonResponse.success(actuators);
}
@ -338,7 +334,7 @@ public class DataSetServiceImpl
targetConfigure.setUsername(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getUsername()));
targetConfigure.setPassword(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getPassword()));
targetConfigure.setDatabase(Optional.ofNullable(database));
targetConfigure.setInjector(injector);
targetConfigure.setPluginManager(pluginManager);
return targetConfigure;
}
@ -402,11 +398,9 @@ public class DataSetServiceImpl
private void createTable(DataSetEntity entity)
{
try {
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name());
if (!pluginOptional.isPresent()) {
throw new IllegalArgumentException(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType()));
}
PluginService plugin = pluginOptional.get();
pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType())
.ifPresentOrElse(plugin -> {
PluginService pluginService = plugin.getService(PluginService.class);
String database = initializerConfigure.getDataSetConfigure().getDatabase();
String originTableName = entity.getTableName();
String tableDefaultEngine = initializerConfigure.getDataSetConfigure().getTableDefaultEngine();
@ -448,9 +442,7 @@ public class DataSetServiceImpl
targetConfigure.setUsername(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getUsername()));
targetConfigure.setPassword(Optional.ofNullable(initializerConfigure.getDataSetConfigure().getPassword()));
targetConfigure.setDatabase(Optional.ofNullable(database));
plugin.connect(targetConfigure);
Response response = plugin.execute(sql);
plugin.destroy();
Response response = pluginService.execute(targetConfigure, sql);
if (response.getIsSuccessful()) {
entity.setTableName(originTableName);
entity.setMessage(null);
@ -459,6 +451,13 @@ public class DataSetServiceImpl
else {
throw new RuntimeException(response.getMessage());
}
},
() -> {
String type = initializerConfigure.getDataSetConfigure().getType();
String errorMessage = String.format("Plugin [ %s ] not found", type);
log.error(errorMessage);
throw new IllegalArgumentException(errorMessage);
});
}
catch (Exception e) {
log.warn("Create dataset [ {} ] ", entity.getName(), e);
@ -524,10 +523,9 @@ public class DataSetServiceImpl
TableBuilder.Companion.LIFECYCLE(String.format("`%s` + INTERVAL %s %s", item.getColumn().getName(), item.getColumn().getLength(), item.getColumn().getDefaultValue()));
String sql = TableBuilder.Companion.SQL();
log.info("Modify lifecycle sql \n {} \n on dataset [ {} ] id [ {} ]", sql, entity.getName(), entity.getId());
PluginService plugin = getOutputPlugin();
PluginService pluginService = getOutputPlugin();
SourceEntity source = getOutputSource();
plugin.connect(source.toConfigure());
Response response = plugin.execute(sql);
Response response = pluginService.execute(source.toConfigure(), sql);
Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage());
});
@ -547,24 +545,32 @@ public class DataSetServiceImpl
}
}
private void syncData(DataSetEntity entity, ExecutorService service)
private PluginService getOutputPlugin()
{
return pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType())
.map(plugin -> plugin.getService(PluginService.class))
.orElseThrow(() -> new RuntimeException("Not found service from %s" + initializerConfigure.getDataSetConfigure().getType()));
}
private void syncData(DataSetEntity entity, java.util.concurrent.ExecutorService service)
{
DatasetHistoryEntity history = new DatasetHistoryEntity();
try {
SourceEntity source = entity.getSource();
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(injector, source.getType(), source.getProtocol());
if (pluginOptional.isEmpty()) {
throw new IllegalArgumentException(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType()));
}
pluginManager.getPlugin(source.getType())
.ifPresentOrElse(plugin -> {
history.setState(RunState.CREATED);
history.setCreateTime(new Date());
history.setQuery(entity.getQuery());
history.setDataset(entity);
historyRepository.save(history);
Executor executor = ExecutorUtils.findOne(this.injector, entity.getExecutor());
PluginService inputPlugin = pluginOptional.get();
PluginService inputPlugin = plugin.getService(PluginService.class);
pluginManager.getPlugin(entity.getExecutor())
.ifPresent(executor -> {
ExecutorService executorService = executor.getService(ExecutorService.class);
Set<OriginColumn> originColumns = columnRepository.findAllByDataset(entity)
.stream()
.filter(item -> !item.isVirtualColumn())
@ -578,11 +584,13 @@ public class DataSetServiceImpl
IConfigurePipelineType.INPUT, entity.getExecutor(), entity.getQuery(), inputFieldBody);
Set<String> inputOptions = ConfigureUtils.convertOptions(source, environment, entity.getExecutor(), IConfigurePipelineType.INPUT);
Configure inputConfigure = source.toConfigure();
inputConfigure.setInjector(injector);
inputConfigure.setPluginManager(pluginManager);
ExecutorConfigure input = new ExecutorConfigure(source.getType(), inputProperties, inputOptions, RunProtocol.valueOf(source.getProtocol()),
inputPlugin, entity.getQuery(), database, entity.getTableName(), inputConfigure, originColumns);
PluginService outputPlugin = PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name()).orElseGet(null);
pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType())
.ifPresent(_plugin -> {
PluginService outputPlugin = _plugin.getService(PluginService.class);
SourceEntity outputSource = new SourceEntity();
outputSource.setType(initializerConfigure.getDataSetConfigure().getType());
outputSource.setDatabase(initializerConfigure.getDataSetConfigure().getDatabase());
@ -609,7 +617,7 @@ public class DataSetServiceImpl
String workHome = FolderUtils.getWorkHome(initializerConfigure.getDataHome(), entity.getUser().getUsername(), String.join(File.separator, "dataset", entity.getExecutor().toLowerCase(), taskName));
ExecutorRequest request = new ExecutorRequest(taskName, entity.getUser().getUsername(), input, output,
environment.getProperty(String.format("datacap.executor.%s.home", entity.getExecutor().toLowerCase())),
workHome, this.injector, 600,
workHome, this.pluginManager, 600,
RunWay.valueOf(environment.getProperty("datacap.executor.way")),
RunMode.valueOf(environment.getProperty("datacap.executor.mode")),
environment.getProperty("datacap.executor.startScript"),
@ -617,7 +625,8 @@ public class DataSetServiceImpl
history.setState(RunState.RUNNING);
historyRepository.save(history);
ExecutorResponse response = executor.start(request);
ExecutorResponse response = executorService.start(request);
history.setUpdateTime(new Date());
history.setElapsed((history.getUpdateTime().getTime() - history.getCreateTime().getTime()) / 1000);
history.setMode(QueryMode.SYNC);
@ -626,6 +635,12 @@ public class DataSetServiceImpl
Preconditions.checkArgument(response.getSuccessful(), response.getMessage());
this.flushTableMetadata(entity, outputPlugin, database, requireNonNull(output.getOriginConfigure()));
});
});
},
() -> {
throw new IllegalArgumentException(String.format("Plugin [ %s ] not found", initializerConfigure.getDataSetConfigure().getType()));
});
}
catch (Exception e) {
log.warn("Sync data for dataset [ {} ] failed", entity.getName(), e);
@ -647,7 +662,7 @@ public class DataSetServiceImpl
{
log.info("Start schedule for dataset [ {} ] id [ {} ]", entity.getName(), entity.getId());
if (entity.getSyncMode().equals(SyncMode.TIMING)) {
SpiUtils.findSchedule(this.injector, entity.getScheduler())
pluginManager.getPlugin(entity.getScheduler())
.ifPresent(scheduler -> {
SchedulerRequest request = new SchedulerRequest();
request.setName(entity.getId().toString());
@ -655,31 +670,36 @@ public class DataSetServiceImpl
request.setExpression(entity.getExpression());
request.setJobId(String.valueOf(entity.getId()));
request.setCreateBeforeDelete(true);
if (scheduler.name().equals("Default")) {
SchedulerService schedulerService = scheduler.getService(SchedulerService.class);
if (schedulerService.name().equals("Default")) {
request.setJob(new DatasetJob());
request.setScheduler(this.scheduler);
}
scheduler.initialize(request);
schedulerService.initialize(request);
});
}
else {
SpiUtils.findSchedule(this.injector, entity.getScheduler())
pluginManager.getPlugin(entity.getScheduler())
.ifPresent(scheduler -> {
SchedulerRequest request = new SchedulerRequest();
request.setName(entity.getId().toString());
request.setGroup("datacap");
if (scheduler.name().equals("Default")) {
SchedulerService schedulerService = scheduler.getService(SchedulerService.class);
if (schedulerService.name().equals("Default")) {
request.setScheduler(this.scheduler);
}
scheduler.stop(request);
schedulerService.stop(request);
});
}
}
private void clearData(DataSetEntity entity, ExecutorService service)
private void clearData(DataSetEntity entity, java.util.concurrent.ExecutorService service)
{
try {
PluginService plugin = PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name()).orElseGet(null);
pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType())
.ifPresent(plugin -> {
SourceEntity source = new SourceEntity();
source.setType(initializerConfigure.getDataSetConfigure().getType());
source.setDatabase(initializerConfigure.getDataSetConfigure().getDatabase());
@ -689,7 +709,7 @@ public class DataSetServiceImpl
source.setPassword(initializerConfigure.getDataSetConfigure().getPassword());
source.setProtocol(PluginType.JDBC.name());
plugin.connect(source.toConfigure());
PluginService pluginService = plugin.getService(PluginService.class);
SqlBody body = SqlBody.builder()
.type(SqlType.TRUNCATE)
.database(initializerConfigure.getDataSetConfigure().getDatabase())
@ -697,9 +717,10 @@ public class DataSetServiceImpl
.build();
String sql = new SqlBuilder(body).getSql();
log.info("Clear data for dataset [ {} ] id [ {} ] sql \n {}", entity.getName(), entity.getId(), sql);
Response response = plugin.execute(sql);
Response response = pluginService.execute(source.toConfigure(), sql);
Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage());
this.flushTableMetadata(entity, plugin, initializerConfigure.getDataSetConfigure().getDatabase(), source.toConfigure());
this.flushTableMetadata(entity, pluginService, initializerConfigure.getDataSetConfigure().getDatabase(), source.toConfigure());
});
}
catch (Exception e) {
log.warn("Clear data for dataset [ {} ] failed", entity.getName(), e);
@ -713,11 +734,11 @@ public class DataSetServiceImpl
* Flushes the table metadata for a given dataset entity using the specified plugin and database configuration.
*
* @param entity the dataset entity to flush
* @param plugin the plugin used to connect
* @param pluginService the plugin used to connect
* @param database the database name
* @param configure the configuration settings
*/
private void flushTableMetadata(DataSetEntity entity, PluginService plugin, String database, Configure configure)
private void flushTableMetadata(DataSetEntity entity, PluginService pluginService, String database, Configure configure)
{
// Get the total number of rows and the total size of the dataset
log.info("Get the total number of rows and the total size of the dataset [ {} ]", entity.getName());
@ -733,9 +754,8 @@ public class DataSetServiceImpl
.condition(" AND ")
.build());
configure.setFormat("None");
configure.setInjector(injector);
plugin.connect(configure);
Response outputResponse = plugin.execute(builder.getSql());
configure.setPluginManager(pluginManager);
Response outputResponse = pluginService.execute(configure, builder.getSql());
if (outputResponse.getIsSuccessful()) {
Object columnData = outputResponse.getColumns().get(0);
if (columnData instanceof List<?>) {
@ -817,15 +837,16 @@ public class DataSetServiceImpl
*/
private boolean checkTableExists(DataSetEntity entity)
{
return pluginManager.getPlugin(initializerConfigure.getDataSetConfigure().getType())
.map(plugin -> {
try {
PluginService plugin = getOutputPlugin();
PluginService pluginService = plugin.getService(PluginService.class);
SourceEntity source = getOutputSource();
Configure configure = source.toConfigure();
configure.setInjector(injector);
plugin.connect(configure);
configure.setPluginManager(pluginManager);
String sql = String.format("SHOW CREATE TABLE `%s`.`%s`", initializerConfigure.getDataSetConfigure().getDatabase(), entity.getTableName());
log.info("Check table exists for dataset [ {} ] id [ {} ] sql \n {}", entity.getName(), entity.getId(), sql);
Response response = plugin.execute(sql);
Response response = pluginService.execute(configure, sql);
Preconditions.checkArgument(response.getIsSuccessful(), response.getMessage());
return true;
}
@ -833,16 +854,8 @@ public class DataSetServiceImpl
log.warn("Check table exists for dataset [ {} ] failed", entity.getName(), e);
return false;
}
}
/**
* Retrieves the output plugin using the injector and initializer configuration data set type.
*
* @return an instance of the output plugin, or null if not found
*/
private PluginService getOutputPlugin()
{
return PluginUtils.getPluginByNameAndType(injector, initializerConfigure.getDataSetConfigure().getType(), PluginType.JDBC.name()).orElseGet(null);
})
.orElse(false);
}
/**

View File

@ -1,6 +1,5 @@
package io.edurt.datacap.service.service.impl;
import com.google.inject.Injector;
import io.edurt.datacap.common.enums.ServiceState;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.common.sql.SqlBuilder;
@ -10,7 +9,6 @@ import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.service.audit.AuditPlugin;
import io.edurt.datacap.service.body.ExecuteDslBody;
import io.edurt.datacap.service.entity.ExecuteEntity;
import io.edurt.datacap.service.initializer.InitializerConfigure;
import io.edurt.datacap.service.repository.SourceRepository;
import io.edurt.datacap.service.service.ExecuteService;
import io.edurt.datacap.spi.PluginService;
@ -28,18 +26,14 @@ import java.util.Optional;
public class ExecuteServiceImpl
implements ExecuteService
{
private final Injector injector;
private final SourceRepository sourceRepository;
private final Environment environment;
private final InitializerConfigure initializerConfigure;
private final PluginManager pluginManager;
public ExecuteServiceImpl(Injector injector, SourceRepository sourceRepository, Environment environment, InitializerConfigure initializerConfigure, PluginManager pluginManager)
public ExecuteServiceImpl(SourceRepository sourceRepository, Environment environment, PluginManager pluginManager)
{
this.injector = injector;
this.sourceRepository = sourceRepository;
this.environment = environment;
this.initializerConfigure = initializerConfigure;
this.pluginManager = pluginManager;
}
@ -62,9 +56,8 @@ public class ExecuteServiceImpl
.env(Optional.ofNullable(entity.getConfigures()))
.format(configure.getFormat())
.usedConfig(entity.isUsedConfig())
.injector(plugin.getInjector())
.pluginManager(pluginManager)
.build();
_configure.setInjector(injector);
if (entity.isUsedConfig()) {
_configure.setUsername(Optional.of(entity.getUser().getUsername()));
String configHome = environment.getProperty("datacap.config.data");

View File

@ -6,7 +6,7 @@ import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.common.utils.BeanToPropertiesUtils;
import io.edurt.datacap.executor.Executor;
import io.edurt.datacap.executor.ExecutorService;
import io.edurt.datacap.executor.common.RunEngine;
import io.edurt.datacap.executor.common.RunMode;
import io.edurt.datacap.executor.common.RunState;
@ -49,7 +49,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Service
@ -157,9 +156,9 @@ public class PipelineServiceImpl
log.info("Pipeline containers is not full, submit to executor [ {} ]", pipelineName);
pipelineEntity.setState(RunState.RUNNING);
repository.save(pipelineEntity);
Optional<Executor> executorOptional = injector.getInstance(Key.get(new TypeLiteral<Set<Executor>>() {}))
Optional<ExecutorService> executorOptional = injector.getInstance(Key.get(new TypeLiteral<Set<ExecutorService>>() {}))
.stream()
.filter(executor -> executor.name().equals(configure.getExecutor()))
.filter(executorService -> executorService.name().equals(configure.getExecutor()))
.findFirst();
try {
@ -176,7 +175,7 @@ public class PipelineServiceImpl
environment.getProperty("datacap.executor.startScript"),
RunEngine.valueOf(environment.getProperty("datacap.executor.engine")));
final ExecutorService executorService = Executors.newCachedThreadPool();
final java.util.concurrent.ExecutorService executorService = Executors.newCachedThreadPool();
PipelineEntity finalPipelineEntity = pipelineEntity;
executorService.submit(() -> {
initializer.getTaskExecutors()
@ -224,7 +223,7 @@ public class PipelineServiceImpl
return CommonResponse.failure(String.format("Pipeline [ %s ] is already stopped", entity.getName()));
}
ExecutorService service = initializer.getTaskExecutors()
java.util.concurrent.ExecutorService service = initializer.getTaskExecutors()
.get(entity.getName());
if (service != null) {
service.shutdownNow();

View File

@ -1,15 +1,15 @@
package io.edurt.datacap.service.service.impl;
import com.google.inject.Injector;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.common.utils.DateUtils;
import io.edurt.datacap.common.utils.SpiUtils;
import io.edurt.datacap.convert.ConvertFilter;
import io.edurt.datacap.convert.ConvertService;
import io.edurt.datacap.convert.model.ConvertRequest;
import io.edurt.datacap.convert.model.ConvertResponse;
import io.edurt.datacap.fs.FsRequest;
import io.edurt.datacap.fs.FsResponse;
import io.edurt.datacap.fs.FsService;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.service.activity.HeatmapActivity;
import io.edurt.datacap.service.adapter.PageRequestAdapter;
import io.edurt.datacap.service.body.FilterBody;
@ -43,13 +43,13 @@ public class PluginAuditServiceImpl
{
private final PluginAuditRepository pluginAuditRepository;
private final InitializerConfigure initializer;
private final Injector injector;
private final PluginManager pluginManager;
public PluginAuditServiceImpl(PluginAuditRepository pluginAuditRepository, InitializerConfigure initializer, Injector injector)
public PluginAuditServiceImpl(PluginAuditRepository pluginAuditRepository, InitializerConfigure initializer, PluginManager pluginManager)
{
this.pluginAuditRepository = pluginAuditRepository;
this.initializer = initializer;
this.injector = injector;
this.pluginManager = pluginManager;
}
@Override
@ -116,15 +116,18 @@ public class PluginAuditServiceImpl
fsRequest.setEndpoint(initializer.getFsConfigure().getEndpoint());
fsRequest.setFileName(String.join(File.separator, value.getUser().getUsername(), DateUtils.formatYMD(), String.join(File.separator, "adhoc", code), "result.csv"));
}
FsResponse fsResponse = SpiUtils.findFs(injector, initializer.getFsConfigure().getType())
.map(v -> v.reader(fsRequest))
.get();
ConvertFilter.filter(injector, "Json")
pluginManager.getPlugin(initializer.getFsConfigure().getType())
.ifPresent(plugin -> {
FsService fsService = plugin.getService(FsService.class);
FsResponse fsResponse = fsService.reader(fsRequest);
pluginManager.getPlugin("Json")
.ifPresent(it -> {
ConvertRequest request = new ConvertRequest();
request.setStream(fsResponse.getContext());
ConvertService convertService = it.getService(ConvertService.class);
ConvertResponse _response = it.formatStream(request);
ConvertResponse _response = convertService.formatStream(request);
if (_response.getSuccessful()) {
response.setHeaders(_response.getHeaders()
.stream()
@ -133,6 +136,7 @@ public class PluginAuditServiceImpl
response.setColumns(_response.getColumns());
}
});
});
return CommonResponse.success(response);
})
.orElseGet(() -> CommonResponse.failure(String.format("Not found [ %s ] history", code)));

View File

@ -4,9 +4,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.common.enums.NodeType;
import io.edurt.datacap.common.enums.ServiceState;
@ -15,6 +12,7 @@ import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.common.utils.CodeUtils;
import io.edurt.datacap.common.utils.JsonUtils;
import io.edurt.datacap.executor.common.RunState;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.service.adapter.PageRequestAdapter;
import io.edurt.datacap.service.body.FilterBody;
import io.edurt.datacap.service.body.SharedSourceBody;
@ -59,13 +57,11 @@ import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@ -86,10 +82,10 @@ public class SourceServiceImpl
private final ColumnRepository columnHandler;
private final TemplateSqlRepository templateHandler;
private final ScheduledHistoryRepository scheduledHistoryHandler;
private final Injector injector;
private final PluginManager pluginManager;
private final Environment environment;
public SourceServiceImpl(SourceRepository sourceRepository, UserRepository userRepository, ScheduledHistoryRepository scheduledHistoryRepository, DatabaseRepository databaseHandler, TableRepository tableHandler, ColumnRepository columnHandler, TemplateSqlRepository templateHandler, ScheduledHistoryRepository scheduledHistoryHandler, Injector injector, Environment environment)
public SourceServiceImpl(SourceRepository sourceRepository, UserRepository userRepository, ScheduledHistoryRepository scheduledHistoryRepository, DatabaseRepository databaseHandler, TableRepository tableHandler, ColumnRepository columnHandler, TemplateSqlRepository templateHandler, ScheduledHistoryRepository scheduledHistoryHandler, PluginManager pluginManager, Environment environment)
{
this.sourceRepository = sourceRepository;
this.userRepository = userRepository;
@ -99,7 +95,7 @@ public class SourceServiceImpl
this.columnHandler = columnHandler;
this.templateHandler = templateHandler;
this.scheduledHistoryHandler = scheduledHistoryHandler;
this.injector = injector;
this.pluginManager = pluginManager;
this.environment = environment;
}
@ -152,13 +148,10 @@ public class SourceServiceImpl
@Override
public CommonResponse<Object> testConnection(SourceEntity configure)
{
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, configure.getType(), configure.getProtocol());
if (!pluginOptional.isPresent()) {
return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND);
}
return pluginManager.getPlugin(configure.getType())
.map(plugin -> {
Configure _configure = new Configure();
PluginService plugin = pluginOptional.get();
PluginService pluginService = plugin.getService(PluginService.class);
_configure.setHost(configure.getHost());
_configure.setPort(configure.getPort());
_configure.setUsername(Optional.ofNullable(configure.getUsername()));
@ -167,13 +160,15 @@ public class SourceServiceImpl
_configure.setDatabase(_database);
_configure.setEnv(Optional.ofNullable(configure.getConfigures()));
_configure.setSsl(Optional.ofNullable(configure.getSsl()));
plugin.connect(_configure);
io.edurt.datacap.spi.model.Response response = plugin.execute(plugin.validator());
plugin.destroy();
pluginService.connect(_configure);
io.edurt.datacap.spi.model.Response response = pluginService.execute(pluginService.validator());
pluginService.destroy();
if (response.getIsSuccessful()) {
return CommonResponse.success(response);
}
return CommonResponse.failure(ServiceState.PLUGIN_EXECUTE_FAILED, response.getMessage());
})
.orElseGet(() -> CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND));
}
@Override
@ -207,19 +202,19 @@ public class SourceServiceImpl
public CommonResponse<Map<String, List<PluginEntity>>> getPlugins()
{
Map<String, List<PluginEntity>> pluginMap = new ConcurrentHashMap<>();
this.injector.getInstance(Key.get(new TypeLiteral<Set<PluginService>>() {})).stream().forEach(plugin -> {
PluginEntity entity = new PluginEntity();
entity.setName(plugin.name());
entity.setDescription(plugin.description());
entity.setType(plugin.type().name());
entity.setConfigure(PluginUtils.loadYamlConfigure(plugin.type().name(), plugin.name(), plugin.name(), environment));
List<PluginEntity> plugins = pluginMap.get(plugin.type().name());
if (ObjectUtils.isEmpty(plugins)) {
plugins = new ArrayList<>();
}
plugins.add(entity);
pluginMap.put(plugin.type().name(), plugins);
});
// this.injector.getInstance(Key.get(new TypeLiteral<Set<PluginService>>() {})).stream().forEach(plugin -> {
// PluginEntity entity = new PluginEntity();
// entity.setName(plugin.name());
// entity.setDescription(plugin.description());
// entity.setType(plugin.type().name());
// entity.setConfigure(PluginUtils.loadYamlConfigure(plugin.type().name(), plugin.name(), plugin.name(), environment));
// List<PluginEntity> plugins = pluginMap.get(plugin.type().name());
// if (ObjectUtils.isEmpty(plugins)) {
// plugins = new ArrayList<>();
// }
// plugins.add(entity);
// pluginMap.put(plugin.type().name(), plugins);
// });
return CommonResponse.success(pluginMap);
}
@ -251,11 +246,8 @@ public class SourceServiceImpl
@Override
public CommonResponse<Object> testConnectionV2(SourceBody configure)
{
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, configure.getName(), configure.getType());
if (!pluginOptional.isPresent()) {
return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND);
}
return pluginManager.getPlugin(configure.getName())
.map(plugin -> {
// Check configure
IConfigure iConfigure = PluginUtils.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment);
if (ObjectUtils.isEmpty(iConfigure) || iConfigure.getConfigures().size() != configure.getConfigure().getConfigures().size()) {
@ -268,7 +260,7 @@ public class SourceServiceImpl
return CommonResponse.failure(ServiceState.PLUGIN_CONFIGURE_REQUIRED, ConfigureUtils.preparedMessage(requiredMismatchConfigures));
}
PluginService plugin = pluginOptional.get();
PluginService pluginService = plugin.getService(PluginService.class);
// The filter parameter value is null data
List<IConfigureField> applyConfigures = ConfigureUtils.filterNotEmpty(configure.getConfigure().getConfigures());
Configure _configure = ConfigureUtils.preparedConfigure(applyConfigures);
@ -281,24 +273,23 @@ public class SourceServiceImpl
_configure.setHome(cacheHome);
_configure.setUsername(Optional.of(UserDetailsService.getUser().getUsername()));
}
_configure.setInjector(injector);
plugin.connect(_configure);
io.edurt.datacap.spi.model.Response response = plugin.execute(plugin.validator());
_configure.setPluginManager(pluginManager);
pluginService.connect(_configure);
Response response = pluginService.execute(pluginService.validator());
if (response.getIsSuccessful()) {
plugin.destroy();
pluginService.destroy();
return CommonResponse.success(response);
}
return CommonResponse.failure(ServiceState.PLUGIN_EXECUTE_FAILED, response.getMessage());
})
.orElseGet(() -> CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND));
}
@Override
public CommonResponse<SourceEntity> saveOrUpdateV2(SourceBody configure)
{
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, configure.getName(), configure.getType());
if (!pluginOptional.isPresent()) {
return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND);
}
return pluginManager.getPlugin(configure.getName())
.map(plugin -> {
// Check configure
IConfigure iConfigure = PluginUtils.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment);
if (ObjectUtils.isEmpty(iConfigure) || iConfigure.getConfigures().size() != configure.getConfigure().getConfigures().size()) {
@ -367,6 +358,8 @@ public class SourceServiceImpl
// Start sync metadata
this.syncMetadata(source.getId());
return CommonResponse.success(source);
})
.orElseGet(() -> CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND));
}
@Override
@ -407,22 +400,18 @@ public class SourceServiceImpl
ScheduledHistoryEntity scheduledHistory = ScheduledHistoryEntity.builder().name(String.format("Sync source [ %s ]", entity.getName())).scheduled(scheduled).source(entity).state(RunState.RUNNING).build();
scheduledHistoryHandler.save(scheduledHistory);
log.info("==================== Sync metadata [ {} ] started =================", entity.getName());
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, entity.getType(), entity.getProtocol());
if (pluginOptional.isEmpty()) {
log.warn("The source [ {} ] protocol [ {} ] is not available", entity.getName(), entity.getProtocol());
}
else {
pluginManager.getPlugin(entity.getType())
.ifPresent(plugin -> {
try {
PluginService plugin = pluginOptional.get();
PluginService pluginService = plugin.getService(PluginService.class);
Configure pConfigure = entity.toConfigure();
pConfigure.setInjector(injector);
plugin.connect(pConfigure);
Response response = plugin.execute(plugin.validator());
pConfigure.setPluginManager(pluginManager);
Response response = pluginService.execute(pConfigure, pluginService.validator());
if (!response.getIsSuccessful()) {
log.error("The source [ {} ] not available", entity.getName());
}
else {
this.startSyncDatabase(entity, plugin, databaseCache, databaseTableCache, tableCache, tableColumnCache, databaseAddedCount, databaseUpdatedCount, databaseRemovedCount, tableAddedCount, tableUpdatedCount, tableRemovedCount, columnAddedCount, columnUpdatedCount, columnRemovedCount);
this.startSyncDatabase(entity, pluginService, databaseCache, databaseTableCache, tableCache, tableColumnCache, databaseAddedCount, databaseUpdatedCount, databaseRemovedCount, tableAddedCount, tableUpdatedCount, tableRemovedCount, columnAddedCount, columnUpdatedCount, columnRemovedCount);
}
scheduledHistory.setState(RunState.SUCCESS);
}
@ -431,7 +420,7 @@ public class SourceServiceImpl
scheduledHistory.setState(RunState.FAILURE);
scheduledHistory.setMessage(ExceptionUtils.getStackTrace(e));
}
}
});
log.info("==================== Sync metadata [ {} ] finished =================", entity.getName());
Properties info = new Properties();
info.put("databaseAddedCount", databaseAddedCount.get());
@ -545,7 +534,7 @@ public class SourceServiceImpl
}
else {
Configure pConfigure = entity.toConfigure();
pConfigure.setInjector(injector);
pConfigure.setPluginManager(pluginManager);
plugin.connect(pConfigure);
Response response = plugin.execute(getSqlContext(template, null));
if (!response.getIsSuccessful()) {
@ -617,7 +606,7 @@ public class SourceServiceImpl
}
else {
Configure pConfigure = entity.toConfigure();
pConfigure.setInjector(injector);
pConfigure.setPluginManager(pluginManager);
plugin.connect(pConfigure);
Response response = plugin.execute(getSqlContext(template, null));
if (!response.getIsSuccessful()) {
@ -710,7 +699,7 @@ public class SourceServiceImpl
}
else {
Configure pConfigure = entity.toConfigure();
pConfigure.setInjector(injector);
pConfigure.setPluginManager(pluginManager);
plugin.connect(pConfigure);
Response response = plugin.execute(getSqlContext(template, null));
if (!response.getIsSuccessful()) {

View File

@ -1,7 +1,6 @@
package io.edurt.datacap.service.service.impl;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import io.edurt.datacap.common.enums.ServiceState;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.common.sql.SqlBuilder;
@ -11,14 +10,13 @@ import io.edurt.datacap.common.sql.configure.SqlOperator;
import io.edurt.datacap.common.sql.configure.SqlOrder;
import io.edurt.datacap.common.sql.configure.SqlType;
import io.edurt.datacap.common.utils.CSVUtils;
import io.edurt.datacap.common.utils.SpiUtils;
import io.edurt.datacap.fs.Fs;
import io.edurt.datacap.fs.FsRequest;
import io.edurt.datacap.fs.FsResponse;
import io.edurt.datacap.fs.FsService;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.service.body.ExportBody;
import io.edurt.datacap.service.body.TableBody;
import io.edurt.datacap.service.body.TableFilter;
import io.edurt.datacap.service.common.PluginUtils;
import io.edurt.datacap.service.entity.BaseEntity;
import io.edurt.datacap.service.entity.ColumnEntity;
import io.edurt.datacap.service.entity.DatabaseEntity;
@ -69,16 +67,16 @@ import java.util.stream.Collectors;
public class TableServiceImpl
implements TableService
{
private final Injector injector;
private final PluginManager pluginManager;
private final TableRepository repository;
private final DatabaseRepository databaseRepository;
private final ColumnRepository columnRepository;
private final InitializerConfigure initializerConfigure;
private final HttpServletRequest request;
public TableServiceImpl(Injector injector, TableRepository repository, DatabaseRepository databaseRepository, ColumnRepository columnRepository, InitializerConfigure initializerConfigure, HttpServletRequest request)
public TableServiceImpl(PluginManager pluginManager, TableRepository repository, DatabaseRepository databaseRepository, ColumnRepository columnRepository, InitializerConfigure initializerConfigure, HttpServletRequest request)
{
this.injector = injector;
this.pluginManager = pluginManager;
this.repository = repository;
this.databaseRepository = databaseRepository;
this.columnRepository = columnRepository;
@ -100,37 +98,38 @@ public class TableServiceImpl
return repository.findByCode(code)
.map(table -> {
SourceEntity source = table.getDatabase().getSource();
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, source.getType(), source.getProtocol());
if (!pluginOptional.isPresent()) {
return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND);
}
PluginService plugin = pluginOptional.get();
pluginManager.getPlugin(source.getType())
.map(plugin -> {
PluginService pluginService = plugin.getService(PluginService.class);
if (configure.getType().equals(SqlType.SELECT)) {
return this.fetchSelect(plugin, table, source, configure);
return this.fetchSelect(pluginService, table, source, configure);
}
else if (configure.getType().equals(SqlType.INSERT)) {
return this.fetchInsert(plugin, table, source, configure);
return this.fetchInsert(pluginService, table, source, configure);
}
else if (configure.getType().equals(SqlType.UPDATE)) {
return this.fetchUpdate(plugin, table, source, configure);
return this.fetchUpdate(pluginService, table, source, configure);
}
else if (configure.getType().equals(SqlType.DELETE)) {
return this.fetchDelete(plugin, table, source, configure);
return this.fetchDelete(pluginService, table, source, configure);
}
else if (configure.getType().equals(SqlType.ALTER)) {
return this.fetchAlter(plugin, table, source, configure);
return this.fetchAlter(pluginService, table, source, configure);
}
else if (configure.getType().equals(SqlType.SHOW)) {
return this.fetchShowCreateTable(plugin, table, source, configure);
return this.fetchShowCreateTable(pluginService, table, source, configure);
}
else if (configure.getType().equals(SqlType.TRUNCATE)) {
return this.fetchTruncateTable(plugin, table, source, configure);
return this.fetchTruncateTable(pluginService, table, source, configure);
}
else if (configure.getType().equals(SqlType.DROP)) {
return this.fetchDropTable(plugin, table, source, configure);
return this.fetchDropTable(pluginService, table, source, configure);
}
return CommonResponse.failure(String.format("Not implemented yet [ %s ]", configure.getType()));
})
.orElseGet(() -> CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND));
return CommonResponse.failure(ServiceState.PLUGIN_NOT_FOUND);
})
.orElse(CommonResponse.failure(String.format("Table [ %s ] not found", code)));
}
@ -144,16 +143,12 @@ public class TableServiceImpl
}
SourceEntity source = table.getDatabase().getSource();
Optional<PluginService> optionalPlugin = PluginUtils.getPluginByNameAndType(this.injector, source.getType(), source.getProtocol());
if (!optionalPlugin.isPresent()) {
return CommonResponse.failure(String.format("Plugin [ %s ] not found", source.getType()));
}
Optional<Fs> optionalFs = SpiUtils.findFs(injector, initializerConfigure.getFsConfigure().getType());
if (!optionalFs.isPresent()) {
return CommonResponse.failure(String.format("Not found Fs [ %s ]", initializerConfigure.getFsConfigure().getType()));
}
return pluginManager.getPlugin(source.getType())
.map(plugin -> {
PluginService pluginService = plugin.getService(PluginService.class);
return pluginManager.getPlugin(initializerConfigure.getFsConfigure().getType())
.map(fsPlugin -> {
FsService fsService = fsPlugin.getService(FsService.class);
UserEntity user = UserDetailsService.getUser();
String endpoint = String.join(File.separator, initializerConfigure.getDataHome(), user.getUsername(), "export");
log.info("Export data user [ {} ] home [ {} ]", user.getUsername(), endpoint);
@ -168,7 +163,7 @@ public class TableServiceImpl
.type(SqlType.SELECT)
.pagination(pagination)
.build();
CommonResponse<Object> tempResponse = this.fetchSelect(optionalPlugin.get(), table, source, tableFilter);
CommonResponse<Object> tempResponse = this.fetchSelect(pluginService, table, source, tableFilter);
if (tempResponse.getStatus()) {
Response pluginResponse = (Response) tempResponse.getData();
try {
@ -182,7 +177,7 @@ public class TableServiceImpl
.stream(Files.newInputStream(tempFile.toPath()))
.fileName(fileName)
.build();
optionalFs.get().writer(fsRequest);
fsService.writer(fsRequest);
if (initializerConfigure.getFsConfigure().getType().equals("Local")) {
String address = request.getRequestURL()
.toString()
@ -196,16 +191,17 @@ public class TableServiceImpl
}
}
return CommonResponse.failure(ServiceState.REQUEST_EXCEPTION);
})
.orElseGet(() -> CommonResponse.failure(String.format("Not found Fs [ %s ]", initializerConfigure.getFsConfigure().getType())));
})
.orElseGet(() -> CommonResponse.failure(String.format("Plugin [ %s ] not found", source.getType())));
}
@Override
public Object dataDownload(String username, String filename)
{
Optional<Fs> optionalFs = SpiUtils.findFs(injector, initializerConfigure.getFsConfigure().getType());
if (!optionalFs.isPresent()) {
return CommonResponse.failure(String.format("Not found Fs [ %s ]", initializerConfigure.getFsConfigure().getType()));
}
return pluginManager.getPlugin(initializerConfigure.getFsConfigure().getType())
.map(plugin -> {
String endpoint = String.join(File.separator, initializerConfigure.getDataHome(), username, "export");
log.info("Download data user [ {} ] home [ {} ]", username, endpoint);
String filePath = String.join(File.separator, endpoint, filename);
@ -217,7 +213,9 @@ public class TableServiceImpl
.bucket(initializerConfigure.getFsConfigure().getBucket())
.fileName(filename)
.build();
FsResponse response = optionalFs.get().reader(fsRequest);
FsService fsService = plugin.getService(FsService.class);
FsResponse response = fsService.reader(fsRequest);
try {
Resource resource = new FileSystemResource(response.getRemote());
HttpHeaders headers = new HttpHeaders();
@ -231,6 +229,8 @@ public class TableServiceImpl
catch (IOException e) {
throw new RuntimeException(e);
}
})
.orElseThrow(() -> new RuntimeException("Not found plugin"));
}
@Override
@ -243,19 +243,21 @@ public class TableServiceImpl
DatabaseEntity database = optionalDatabase.get();
SourceEntity source = database.getSource();
PluginService plugin = PluginUtils.getPluginByNameAndType(this.injector, source.getType(), source.getProtocol()).get();
return pluginManager.getPlugin(source.getType())
.map(plugin -> {
PluginService pluginService = plugin.getService(PluginService.class);
TableBuilder.Companion.BEGIN();
TableBuilder.Companion.CREATE_TABLE(String.format("`%s`.`%s`", database.getName(), configure.getName()));
TableBuilder.Companion.COLUMNS(configure.getColumns().stream().map(item -> item.toColumnVar()).collect(Collectors.toList()));
String sql = TableBuilder.Companion.SQL();
log.info("Create table sql \n {} \n on database [ {} ]", sql, database.getName());
Configure pConfigure = source.toConfigure();
pConfigure.setInjector(injector);
plugin.connect(pConfigure);
Response response = plugin.execute(sql);
pConfigure.setPluginManager(pluginManager);
Response response = pluginService.execute(pConfigure, sql);
response.setContent(sql);
plugin.destroy();
return CommonResponse.success(response);
})
.orElseGet(() -> CommonResponse.failure("Not found plugin"));
}
@Override
@ -264,7 +266,8 @@ public class TableServiceImpl
return repository.findByCode(code)
.map(table -> {
SourceEntity source = table.getDatabase().getSource();
PluginService plugin = PluginUtils.getPluginByNameAndType(this.injector, source.getType(), source.getProtocol()).get();
pluginManager.getPlugin(source.getType())
.map(plugin -> {
AtomicReference<String> atomicReference = new AtomicReference<>(null);
if (configure.getType().equals(SqlType.CREATE)) {
ColumnBuilder.Companion.BEGIN();
@ -303,14 +306,16 @@ public class TableServiceImpl
}
else {
Configure pConfigure = source.toConfigure();
pConfigure.setInjector(injector);
plugin.connect(pConfigure);
response = plugin.execute(atomicReference.get());
pConfigure.setPluginManager(pluginManager);
PluginService pluginService = plugin.getService(PluginService.class);
response = pluginService.execute(pConfigure, atomicReference.get());
response.setContent(atomicReference.get());
plugin.destroy();
}
return CommonResponse.success(response);
})
.orElseGet(() -> CommonResponse.failure(""));
return CommonResponse.failure("");
})
.orElse(CommonResponse.failure(String.format("Table [ %s ] not found", code)));
}
@ -330,7 +335,7 @@ public class TableServiceImpl
int totalRows = Integer.parseInt(table.getRows());
Configure countConfigure = source.toConfigure();
countConfigure.setFormat("None");
countConfigure.setInjector(injector);
countConfigure.setPluginManager(pluginManager);
plugin.connect(countConfigure);
SqlBody countBody = SqlBody.builder()
.type(SqlType.SELECT)
@ -403,7 +408,7 @@ public class TableServiceImpl
SqlBuilder builder = new SqlBuilder(body);
String sql = builder.getSql();
Configure pConfigure = source.toConfigure();
pConfigure.setInjector(injector);
pConfigure.setPluginManager(pluginManager);
plugin.connect(pConfigure);
Response response = plugin.execute(sql);
response.setContent(sql);
@ -431,7 +436,7 @@ public class TableServiceImpl
try {
Configure updateConfigure = source.toConfigure();
updateConfigure.setFormat("None");
updateConfigure.setInjector(injector);
updateConfigure.setPluginManager(pluginManager);
plugin.connect(updateConfigure);
List<String> allSql = Lists.newArrayList();
// Gets the auto-increment column for the current row
@ -489,7 +494,7 @@ public class TableServiceImpl
try {
Configure updateConfigure = source.toConfigure();
updateConfigure.setFormat("None");
updateConfigure.setInjector(injector);
updateConfigure.setPluginManager(pluginManager);
plugin.connect(updateConfigure);
List<String> allSql = Lists.newArrayList();
configure.getColumns().forEach(v -> {
@ -527,7 +532,7 @@ public class TableServiceImpl
try {
Configure updateConfigure = source.toConfigure();
updateConfigure.setFormat("None");
updateConfigure.setInjector(injector);
updateConfigure.setPluginManager(pluginManager);
plugin.connect(updateConfigure);
List<String> allSql = Lists.newArrayList();
configure.getColumns().forEach(v -> {
@ -561,7 +566,7 @@ public class TableServiceImpl
try {
Configure alterConfigure = source.toConfigure();
alterConfigure.setFormat("None");
alterConfigure.setInjector(injector);
alterConfigure.setPluginManager(pluginManager);
plugin.connect(alterConfigure);
SqlBody body = SqlBody.builder()
.type(SqlType.ALTER)
@ -590,7 +595,7 @@ public class TableServiceImpl
try {
Configure alterConfigure = source.toConfigure();
alterConfigure.setFormat("None");
alterConfigure.setInjector(injector);
alterConfigure.setPluginManager(pluginManager);
plugin.connect(alterConfigure);
SqlBody body = SqlBody.builder()
.type(SqlType.SHOW)
@ -618,7 +623,7 @@ public class TableServiceImpl
try {
Configure alterConfigure = source.toConfigure();
alterConfigure.setFormat("None");
alterConfigure.setInjector(injector);
alterConfigure.setPluginManager(pluginManager);
plugin.connect(alterConfigure);
SqlBody body = SqlBody.builder()
.type(SqlType.TRUNCATE)
@ -646,7 +651,7 @@ public class TableServiceImpl
try {
Configure alterConfigure = source.toConfigure();
alterConfigure.setFormat("None");
alterConfigure.setInjector(injector);
alterConfigure.setPluginManager(pluginManager);
plugin.connect(alterConfigure);
SqlBody body = SqlBody.builder()
.type(SqlType.DROP)
@ -680,7 +685,7 @@ public class TableServiceImpl
.stream()
.filter(item -> item.getIsKey().equals("PRI"))
.collect(Collectors.toList());
if (originColumns.size() > 0) {
if (!originColumns.isEmpty()) {
// If the table contains a primary key, update the data using the primary key as a condition
originColumns.forEach(item -> wheres.add(SqlColumn.builder()
.column(item.getName())

View File

@ -1,13 +1,13 @@
package io.edurt.datacap.service.service.impl;
import com.google.inject.Injector;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.common.utils.CodeUtils;
import io.edurt.datacap.common.utils.SpiUtils;
import io.edurt.datacap.common.utils.UrlUtils;
import io.edurt.datacap.fs.FsRequest;
import io.edurt.datacap.fs.FsResponse;
import io.edurt.datacap.fs.FsService;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.service.body.UploadBody;
import io.edurt.datacap.service.entity.convert.AvatarEntity;
import io.edurt.datacap.service.enums.UploadMode;
@ -28,13 +28,13 @@ import java.io.IOException;
public class UploadServiceImpl
implements UploadService
{
private final Injector injector;
private final PluginManager pluginManager;
private final HttpServletRequest request;
private final InitializerConfigure initializer;
public UploadServiceImpl(Injector injector, HttpServletRequest request, InitializerConfigure initializer)
public UploadServiceImpl(PluginManager pluginManager, HttpServletRequest request, InitializerConfigure initializer)
{
this.injector = injector;
this.pluginManager = pluginManager;
this.request = request;
this.initializer = initializer;
}
@ -49,9 +49,10 @@ public class UploadServiceImpl
.build();
try {
FsRequest fsRequest = getFsRequest(configure.getFile(), configure);
SpiUtils.findFs(injector, initializer.getFsConfigure().getType())
.ifPresent(fs -> {
FsResponse response = fs.writer(fsRequest);
pluginManager.getPlugin(initializer.getFsConfigure().getType())
.ifPresent(plugin -> {
FsService fsService = plugin.getService(FsService.class);
FsResponse response = fsService.writer(fsRequest);
entity.setPath(response.getRemote());
entity.setLocal(response.getRemote());
if (initializer.getFsConfigure().getType().equals("Local")) {

View File

@ -2,7 +2,6 @@ package io.edurt.datacap.service.service.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Injector;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.common.enums.ServiceState;
import io.edurt.datacap.common.response.CommonResponse;
@ -10,9 +9,10 @@ import io.edurt.datacap.common.response.JwtResponse;
import io.edurt.datacap.common.utils.CodeUtils;
import io.edurt.datacap.common.utils.JsonUtils;
import io.edurt.datacap.common.utils.NullAwareBeanUtils;
import io.edurt.datacap.common.utils.SpiUtils;
import io.edurt.datacap.fs.FsRequest;
import io.edurt.datacap.fs.FsResponse;
import io.edurt.datacap.fs.FsService;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.service.adapter.PageRequestAdapter;
import io.edurt.datacap.service.audit.AuditUserLog;
import io.edurt.datacap.service.body.FilterBody;
@ -80,9 +80,9 @@ public class UserServiceImpl
private final RedisTemplate redisTemplate;
private final Environment environment;
private final InitializerConfigure initializerConfigure;
private final Injector injector;
private final PluginManager pluginManager;
public UserServiceImpl(UserRepository userRepository, RoleRepository roleRepository, SourceRepository sourceRepository, MenuRepository menuRepository, PasswordEncoder encoder, AuthenticationManager authenticationManager, JwtService jwtService, RedisTemplate redisTemplate, Environment environment, InitializerConfigure initializerConfigure, Injector injector)
public UserServiceImpl(UserRepository userRepository, RoleRepository roleRepository, SourceRepository sourceRepository, MenuRepository menuRepository, PasswordEncoder encoder, AuthenticationManager authenticationManager, JwtService jwtService, RedisTemplate redisTemplate, Environment environment, InitializerConfigure initializerConfigure, PluginManager pluginManager)
{
this.userRepository = userRepository;
this.roleRepository = roleRepository;
@ -94,7 +94,7 @@ public class UserServiceImpl
this.redisTemplate = redisTemplate;
this.environment = environment;
this.initializerConfigure = initializerConfigure;
this.injector = injector;
this.pluginManager = pluginManager;
}
@Override
@ -304,8 +304,9 @@ public class UserServiceImpl
@Override
public CommonResponse<FsResponse> uploadAvatar(MultipartFile file)
{
return SpiUtils.findFs(injector, initializerConfigure.getFsConfigure().getType())
.map(fs -> {
return pluginManager.getPlugin(initializerConfigure.getFsConfigure().getType())
.map(plugin -> {
UserEntity user = UserDetailsService.getUser();
try {
String avatarPath = initializerConfigure.getAvatarPath();
@ -319,7 +320,9 @@ public class UserServiceImpl
.stream(file.getInputStream())
.fileName(String.format("%s.png", user.getId()))
.build();
FsResponse response = fs.writer(fsRequest);
FsService fsService = plugin.getService(FsService.class);
FsResponse response = fsService.writer(fsRequest);
UserEntity entity = userRepository.findById(user.getId()).get();
Map<String, String> avatar = Maps.newConcurrentMap();
avatar.put("fsType", initializerConfigure.getFsConfigure().getType());

View File

@ -1,10 +1,10 @@
package io.edurt.datacap.spi.adapter;
import com.google.common.base.Preconditions;
import com.google.inject.Injector;
import io.edurt.datacap.convert.ConvertFilter;
import io.edurt.datacap.convert.model.ConvertRequest;
import io.edurt.datacap.convert.model.ConvertResponse;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.spi.model.Response;
import java.util.List;
@ -13,9 +13,9 @@ public interface Adapter
{
Response handlerExecute(String content);
default List<Object> handlerFormatter(Injector injector, String format, List<String> headers, List<Object> columns)
default List<Object> handlerFormatter(PluginManager pluginManager, String format, List<String> headers, List<Object> columns)
{
return ConvertFilter.filter(injector, format)
return ConvertFilter.filter(pluginManager, format)
.map(file -> {
ConvertRequest request = new ConvertRequest();
request.setHeaders(headers);

View File

@ -98,7 +98,7 @@ public class JdbcAdapter
finally {
response.setHeaders(headers);
response.setTypes(types);
response.setColumns(handlerFormatter(configure.getInjector(), configure.getFormat(), headers, columns));
response.setColumns(handlerFormatter(configure.getPluginManager(), configure.getFormat(), headers, columns));
response.setIsSuccessful(Boolean.TRUE);
}
}

View File

@ -77,24 +77,6 @@ public class JdbcConnection
try {
this.jdbcConfigure = (JdbcConfigure) getConfigure();
this.response = getResponse();
// Remove org.apache.pinot.client.PinotDriver and net.suteren.jdbc.influxdb.InfluxDbDriver
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
if (driver instanceof org.apache.pinot.client.PinotDriver
|| driver instanceof net.suteren.jdbc.influxdb.InfluxDbDriver) {
DriverManager.deregisterDriver(driver);
log.info("Deregistered driver {}", driver);
}
}
// Manually loading and registering the driver
Driver driver = (Driver) Class.forName(this.jdbcConfigure.getJdbcDriver())
.getDeclaredConstructor()
.newInstance();
DriverManager.registerDriver(driver);
String url = formatJdbcUrl();
log.info("Connection driver {}", this.jdbcConfigure.getJdbcDriver());
log.info("Connection url {}", url);

View File

@ -1,7 +1,7 @@
package io.edurt.datacap.spi.model;
import com.google.inject.Injector;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.plugin.PluginManager;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -19,7 +19,7 @@ import java.util.Optional;
@SuppressFBWarnings(value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class Configure
{
private Injector injector;
private PluginManager pluginManager;
private String host;
private Integer port;
private Optional<String> username = Optional.empty();

View File

@ -51,7 +51,7 @@ import com.google.inject.Guice
import com.google.inject.Injector
import com.google.inject.Key
import com.google.inject.TypeLiteral
import io.edurt.datacap.fs.Fs
import io.edurt.datacap.fs.FsService
import io.edurt.datacap.fs.FsManager
import io.edurt.datacap.fs.FsRequest
import org.junit.Assert.assertTrue

View File

@ -51,7 +51,7 @@ import com.google.inject.Guice
import com.google.inject.Injector
import com.google.inject.Key
import com.google.inject.TypeLiteral
import io.edurt.datacap.fs.Fs
import io.edurt.datacap.fs.FsService
import io.edurt.datacap.fs.FsManager
import io.edurt.datacap.fs.FsRequest
import org.junit.Assert.assertTrue

View File

@ -51,7 +51,7 @@ import com.google.inject.Guice
import com.google.inject.Injector
import com.google.inject.Key
import com.google.inject.TypeLiteral
import io.edurt.datacap.fs.Fs
import io.edurt.datacap.fs.FsService
import io.edurt.datacap.fs.FsManager
import io.edurt.datacap.fs.FsRequest
import org.junit.Assert.assertTrue

View File

@ -51,7 +51,7 @@ import com.google.inject.Guice
import com.google.inject.Injector
import com.google.inject.Key
import com.google.inject.TypeLiteral
import io.edurt.datacap.fs.Fs
import io.edurt.datacap.fs.FsService
import io.edurt.datacap.fs.FsManager
import io.edurt.datacap.fs.FsRequest
import org.junit.Before

View File

@ -51,7 +51,7 @@ import com.google.inject.Guice
import com.google.inject.Injector
import com.google.inject.Key
import com.google.inject.TypeLiteral
import io.edurt.datacap.fs.Fs
import io.edurt.datacap.fs.FsService
import io.edurt.datacap.fs.FsManager
import io.edurt.datacap.fs.FsRequest
import org.junit.Assert.assertTrue

View File

@ -30,6 +30,11 @@
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-spi</artifactId>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -1,24 +0,0 @@
package io.edurt.datacap.executor
import com.google.inject.AbstractModule
import org.slf4j.LoggerFactory.getLogger
import java.time.LocalDateTime
import java.util.*
class ExecutorManager : AbstractModule {
private val log = getLogger(this.javaClass)
private var externalModules: Iterable<ExecutorModule>? = null
constructor() {
this.externalModules = ServiceLoader.load(ExecutorModule::class.java)
}
override fun configure() {
log.info("================ Executor started ================")
externalModules !!.forEach { module ->
log.info("Install Executor [ {} ] Join time [ {} ]", module.name(), LocalDateTime.now())
this.install(module)
}
log.info("================ Executor end ================")
}
}

View File

@ -1,12 +0,0 @@
package io.edurt.datacap.executor
import com.google.inject.AbstractModule
abstract class ExecutorModule : AbstractModule() {
open fun name(): String {
return this.javaClass
.simpleName
.removeSuffix("Module")
.removeSuffix("Executor")
}
}

View File

@ -2,9 +2,12 @@ package io.edurt.datacap.executor
import io.edurt.datacap.executor.configure.ExecutorRequest
import io.edurt.datacap.executor.configure.ExecutorResponse
import io.edurt.datacap.plugin.Service
interface Executor {
fun name(): String {
interface ExecutorService : Service
{
fun name(): String
{
return this.javaClass
.simpleName
.removeSuffix("Executor")

View File

@ -1,13 +0,0 @@
package io.edurt.datacap.executor
import com.google.inject.Injector
import com.google.inject.Key
import com.google.inject.TypeLiteral
object ExecutorUtils {
@JvmStatic
fun findOne(injector: Injector, name: String): Executor {
return injector.getInstance(Key.get(object : TypeLiteral<Set<Executor>>() {}))
.last { it.name() == name }
}
}

View File

@ -1,10 +1,10 @@
package io.edurt.datacap.executor.configure
import com.google.inject.Injector
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.edurt.datacap.executor.common.RunEngine
import io.edurt.datacap.executor.common.RunMode
import io.edurt.datacap.executor.common.RunWay
import io.edurt.datacap.plugin.PluginManager
@SuppressFBWarnings(value = ["EI_EXPOSE_REP", "EI_EXPOSE_REP2"])
data class ExecutorRequest(
@ -14,7 +14,7 @@ data class ExecutorRequest(
var output: ExecutorConfigure,
var executorHome: String? = null,
var workHome: String? = null,
var injector: Injector? = null,
var pluginManager: PluginManager? = null,
var timeout: Long = 600,
var runWay: RunWay = RunWay.LOCAL,
var runMode: RunMode = RunMode.CLIENT,

View File

@ -20,5 +20,10 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -1,29 +0,0 @@
package io.edurt.datacap.fs;
import com.google.inject.AbstractModule;
import lombok.extern.slf4j.Slf4j;
import java.util.ServiceLoader;
@Slf4j
public class FsManager
extends AbstractModule
{
private final Iterable<FsModule> externalModules;
public FsManager()
{
this.externalModules = ServiceLoader.load(FsModule.class);
}
@Override
protected void configure()
{
log.info("Loading fs start ...");
for (FsModule module : this.externalModules) {
log.info("Install fs [ {} ]", module.name());
this.install(module);
}
log.info("Loading fs end ...");
}
}

View File

@ -1,15 +0,0 @@
package io.edurt.datacap.fs;
import com.google.inject.AbstractModule;
public abstract class FsModule
extends AbstractModule
{
String name()
{
return this.getClass()
.getSimpleName()
.replace("Fs", "")
.replace("Module", "");
}
}

View File

@ -1,6 +1,9 @@
package io.edurt.datacap.fs;
public interface Fs
import io.edurt.datacap.plugin.Service;
public interface FsService
extends Service
{
default String name()
{

View File

@ -1,28 +0,0 @@
package io.edurt.datacap.fs;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Set;
public class FsManagerTest
{
private Injector injector;
@Before
public void before()
{
injector = Guice.createInjector(new FsManager());
}
@Test
public void test()
{
Assert.assertNotNull(injector.getInstance(Key.get(new TypeLiteral<Set<Fs>>() {})));
}
}

View File

@ -1,19 +0,0 @@
package io.edurt.datacap.fs;
public class TestFs
implements Fs
{
@Override
public FsResponse writer(FsRequest request)
{
return FsResponse.builder()
.build();
}
@Override
public FsResponse reader(FsRequest request)
{
return FsResponse.builder()
.build();
}
}

View File

@ -1,13 +0,0 @@
package io.edurt.datacap.fs;
import com.google.inject.multibindings.Multibinder;
public class TestFsModule
extends FsModule
{
protected void configure()
{
Multibinder.newSetBinder(this.binder(), Fs.class)
.addBinding().to(TestFs.class);
}
}

View File

@ -1,15 +1,6 @@
package io.edurt.datacap.plugin;
import io.edurt.datacap.spi.PluginService;
import java.util.Set;
public class MySQLPlugin
extends Plugin
{
@Override
public Set<Class<? extends Service>> getServiceTypes()
{
return Set.of(Service.class, PluginService.class);
}
}

View File

@ -5,4 +5,15 @@ import io.edurt.datacap.spi.PluginService;
public class MySQLService
implements PluginService
{
@Override
public String connectType()
{
return "mysql";
}
@Override
public String driver()
{
return "com.mysql.cj.jdbc.Driver";
}
}

View File

@ -91,12 +91,12 @@
<module>parser/datacap-parser-spi</module>
<!-- <module>parser/datacap-parser-trino</module>-->
<!-- <module>parser/datacap-parser-mysql</module>-->
<!-- <module>scheduler/datacap-scheduler-spi</module>-->
<module>scheduler/datacap-scheduler-spi</module>
<!-- <module>scheduler/datacap-scheduler-local</module>-->
<module>notify/datacap-notify-spi</module>
<!-- <module>notify/datacap-notify-dingtalk</module>-->
<module>convert/datacap-convert-spi</module>
<!-- <module>convert/datacap-convert-txt</module>-->
<module>convert/datacap-convert-txt</module>
<!-- <module>convert/datacap-convert-json</module>-->
<!-- <module>convert/datacap-convert-none</module>-->
<!-- <module>convert/datacap-convert-csv</module>-->

View File

@ -35,6 +35,11 @@
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>

View File

@ -1,25 +0,0 @@
package io.edurt.datacap.scheduler
import com.google.inject.AbstractModule
import org.slf4j.LoggerFactory
import java.time.LocalTime
import java.util.*
class ScheduleManager : AbstractModule {
private val log = LoggerFactory.getLogger(this.javaClass)
private var externalModules: Iterable<SchedulerModule>? = null
constructor() {
this.externalModules = ServiceLoader.load(SchedulerModule::class.java)
}
override fun configure() {
log.info("================ Scheduler started ================")
externalModules !!.forEach { module ->
log.info("Install scheduler [ {} ] Join time [ {} ]", module.name(), LocalTime.now())
this.install(module)
}
log.info("================ Scheduler end ================")
}
}

View File

@ -1,12 +0,0 @@
package io.edurt.datacap.scheduler
import com.google.inject.AbstractModule
abstract class SchedulerModule : AbstractModule() {
open fun name(): String {
return this.javaClass
.simpleName
.removeSuffix("Module")
.removeSuffix("Scheduler")
}
}

View File

@ -1,7 +1,11 @@
package io.edurt.datacap.scheduler
interface Scheduler {
fun name(): String {
import io.edurt.datacap.plugin.Service
interface SchedulerService : Service
{
fun name(): String
{
return this.javaClass
.simpleName
.removeSuffix("Scheduler")