feat(source): fix source task

This commit is contained in:
qianmoQ 2024-11-24 00:09:56 +08:00
parent 97f1ee0bd8
commit ae8c788176
6 changed files with 432 additions and 87 deletions

View File

@ -49,6 +49,7 @@ public class PluginConfiguration
return pluginManager;
}
// TODO: Delete it
@Bean
public Injector injector()
{

View File

@ -1,8 +1,10 @@
package io.edurt.datacap.server.configure;
import com.google.inject.Injector;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.schedule.ScheduledCronRegistrar;
import io.edurt.datacap.service.entity.ScheduledEntity;
import io.edurt.datacap.service.enums.ScheduledType;
import io.edurt.datacap.service.repository.ScheduledRepository;
import io.edurt.datacap.service.repository.SourceRepository;
import io.edurt.datacap.service.service.SourceService;
@ -12,8 +14,14 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@Slf4j
@Service
@ -21,42 +29,106 @@ import java.util.concurrent.Executors;
public class ScheduleRunnerConfigure
implements CommandLineRunner
{
private final Injector injector;
private final PluginManager pluginManager;
private final ScheduledRepository scheduledRepository;
private final SourceRepository sourceRepository;
private final SourceService sourceService;
private final ScheduledCronRegistrar scheduledCronRegistrar;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ExecutorService executorService;
private final Map<ScheduledType, Function<String, Runnable>> taskFactories;
public ScheduleRunnerConfigure(Injector injector, ScheduledRepository scheduledRepository, SourceRepository sourceRepository, SourceService sourceService, ScheduledCronRegistrar scheduledCronRegistrar)
public ScheduleRunnerConfigure(PluginManager pluginManager,
ScheduledRepository scheduledRepository,
SourceRepository sourceRepository,
SourceService sourceService,
ScheduledCronRegistrar scheduledCronRegistrar)
{
this.injector = injector;
this.pluginManager = pluginManager;
this.scheduledRepository = scheduledRepository;
this.sourceRepository = sourceRepository;
this.sourceService = sourceService;
this.scheduledCronRegistrar = scheduledCronRegistrar;
this.executorService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r, "schedule-runner");
thread.setDaemon(true);
return thread;
});
this.taskFactories = initializeTaskFactories();
}
private Map<ScheduledType, Function<String, Runnable>> initializeTaskFactories()
{
Map<ScheduledType, Function<String, Runnable>> factories = new EnumMap<>(ScheduledType.class);
factories.put(ScheduledType.SOURCE_SYNCHRONIZE,
taskName -> new SyncMetadataScheduledRunnable(
taskName,
pluginManager,
sourceRepository,
sourceService
)
);
factories.put(ScheduledType.SOURCE_CHECK,
taskName -> new CheckScheduledRunnable(
taskName,
pluginManager,
sourceRepository
)
);
return factories;
}
@Override
public void run(String... args)
{
this.scheduledRepository.findAllByActiveIsTrueAndIsSystemIsTrue()
.forEach(task -> {
log.info("Add new task [ {} ] to scheduler", task.getName());
switch (task.getType()) {
case SOURCE_SYNCHRONIZE:
SyncMetadataScheduledRunnable syncMetadataScheduledRunnable = new SyncMetadataScheduledRunnable(task.getName(), injector, sourceRepository, sourceService);
this.scheduledCronRegistrar.addCronTask(syncMetadataScheduledRunnable, task.getExpression());
executorService.submit(syncMetadataScheduledRunnable);
break;
case SOURCE_CHECK:
CheckScheduledRunnable checkScheduledRunnable = new CheckScheduledRunnable(task.getName(), this.injector, this.sourceRepository);
this.scheduledCronRegistrar.addCronTask(checkScheduledRunnable, task.getExpression());
executorService.submit(checkScheduledRunnable);
break;
default:
log.warn("Unsupported task type [ {} ]", task.getType());
}
});
try {
scheduledRepository.findAllByActiveIsTrueAndIsSystemIsTrue()
.forEach(this::scheduleTask);
}
catch (Exception e) {
log.error("Failed to initialize scheduled tasks", e);
}
}
private void scheduleTask(ScheduledEntity task)
{
try {
log.info("Adding new task [ {} ] to scheduler", task.getName());
Function<String, Runnable> taskFactory = taskFactories.get(task.getType());
if (taskFactory != null) {
Runnable runnable = taskFactory.apply(task.getName());
scheduledCronRegistrar.addCronTask(runnable, task.getExpression());
executorService.submit(runnable);
}
else {
log.warn("Unsupported task type [ {} ]", task.getType());
}
}
catch (Exception e) {
log.error("Failed to schedule task [ {} ]", task.getName(), e);
}
}
@PreDestroy
public void shutdown()
{
if (executorService != null && !executorService.isShutdown()) {
try {
log.info("Shutting down schedule runner executor service...");
executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
log.warn("Schedule runner executor service did not terminate in time, forcing shutdown...");
executorService.shutdownNow();
}
}
catch (InterruptedException e) {
log.warn("Schedule runner shutdown interrupted", e);
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -2,11 +2,13 @@ package io.edurt.datacap.service.itransient;
import io.edurt.datacap.common.enums.Type;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor

View File

@ -1,84 +1,216 @@
package io.edurt.datacap.service.source;
import com.google.inject.Injector;
import io.edurt.datacap.plugin.Plugin;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.schedule.ScheduledRunnable;
import io.edurt.datacap.service.common.PluginUtils;
import io.edurt.datacap.service.entity.SourceEntity;
import io.edurt.datacap.service.repository.SourceRepository;
import io.edurt.datacap.spi.PluginService;
import io.edurt.datacap.spi.model.Configure;
import io.edurt.datacap.spi.model.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Optional;
/**
* 数据源健康检查定时任务运行器
* Scheduled task runner for checking data source health status
*
* <p>
* 该类负责定期检查所有数据源的连接状态版本信息等并更新数据源状态
* This class is responsible for periodically checking the connection status,
* version information, etc. of all data sources and updating their status
* </p>
*/
@Slf4j
public class CheckScheduledRunnable
extends ScheduledRunnable
{
private final Injector injector;
private final SourceRepository sourceRepository;
private final PluginManager pluginManager;
public CheckScheduledRunnable(String name, Injector injector, SourceRepository sourceRepository)
/**
* 构造函数
* Constructor
*
* @param name 任务名称 Task name
* @param pluginManager 插件管理器 Plugin manager
* @param sourceRepository 数据源仓库 Source repository
*/
public CheckScheduledRunnable(String name, PluginManager pluginManager, SourceRepository sourceRepository)
{
super(name);
this.injector = injector;
this.pluginManager = pluginManager;
this.sourceRepository = sourceRepository;
}
/**
* 执行定时任务
* Execute the scheduled task
*
* <p>
* 遍历所有数据源并执行健康检查
* Iterate through all data sources and perform health checks
* </p>
*/
@Override
public void run()
{
log.info("==================== {} started =================", this.getName());
this.sourceRepository.findAll()
.forEach(entity -> {
log.info("Before check source {}", entity.getName());
Optional<PluginService> pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, entity.getType(), entity.getProtocol());
if (!pluginOptional.isPresent()) {
log.warn("Check scheduled task <{}> source {} protocol {} is not available", this.getName(), entity.getType(), entity.getProtocol());
}
else {
PluginService plugin = pluginOptional.get();
plugin.connect(getConfigure(entity));
Response response = plugin.execute(plugin.validator());
if (response.getIsSuccessful()) {
entity.setAvailable(true);
if (response.getColumns().isEmpty()) {
entity.setVersion("-");
}
else {
if (response.getColumns().get(0) instanceof ArrayList) {
ArrayList versions = (ArrayList) response.getColumns().get(0);
entity.setVersion(versions.get(0).toString());
}
else {
entity.setVersion(response.getColumns().get(0).toString());
}
}
}
else {
entity.setAvailable(false);
entity.setVersion(null);
entity.setMessage(response.getMessage());
}
this.sourceRepository.save(entity);
}
});
sourceRepository.findAll()
.forEach(this::checkAndUpdateSource);
}
private Configure getConfigure(SourceEntity entity)
/**
* 检查并更新单个数据源状态
* Check and update status for a single data source
*
* @param source 待检查的数据源 The data source to be checked
*/
private void checkAndUpdateSource(SourceEntity source)
{
Configure configure = new Configure();
configure.setHost(entity.getHost());
configure.setPort(entity.getPort());
configure.setUsername(Optional.ofNullable(entity.getUsername()));
configure.setPassword(Optional.ofNullable(entity.getPassword()));
Optional<String> database = StringUtils.isNotEmpty(entity.getDatabase()) ? Optional.ofNullable(entity.getDatabase()) : Optional.empty();
configure.setDatabase(database);
configure.setSsl(Optional.ofNullable(entity.getSsl()));
configure.setEnv(Optional.ofNullable(entity.getConfigures()));
return configure;
log.info("Before check source {}", source.getName());
pluginManager.getPlugin(source.getType())
.ifPresentOrElse(
plugin -> processSourceWithPlugin(source, plugin),
() -> logUnavailableSource(source)
);
}
/**
* 使用插件处理数据源
* Process data source with the corresponding plugin
*
* @param source 数据源 The data source
* @param plugin 对应的插件 The corresponding plugin
*/
private void processSourceWithPlugin(SourceEntity source, Plugin plugin)
{
try {
PluginService pluginService = plugin.getService(PluginService.class);
Response response = executePluginService(source, plugin, pluginService);
updateSourceStatus(source, response);
sourceRepository.save(source);
}
catch (Exception e) {
handleSourceError(source, e);
}
}
/**
* 执行插件服务
* Execute plugin service
*
* @param source 数据源 The data source
* @param plugin 插件 The plugin
* @param pluginService 插件服务 The plugin service
* @return Response 执行响应 Execution response
*/
private Response executePluginService(SourceEntity source, Plugin plugin, PluginService pluginService)
{
return pluginService.execute(
source.toConfigure(pluginManager, plugin),
pluginService.validator()
);
}
/**
* 更新数据源状态
* Update data source status
*
* @param source 数据源 The data source
* @param response 检查响应 Check response
*/
private void updateSourceStatus(SourceEntity source, Response response)
{
source.setAvailable(response.getIsSuccessful());
if (response.getIsSuccessful()) {
updateSourceVersion(source, response);
}
else {
markSourceUnavailable(source, response.getMessage());
}
}
/**
* 更新数据源版本信息
* Update data source version information
*
* <p>
* 处理不同格式的版本信息包括空值数组和普通字符串
* Handle version information in different formats, including null values, arrays, and plain strings
* </p>
*
* @param source 数据源 The data source
* @param response 检查响应 Check response
*/
private void updateSourceVersion(SourceEntity source, Response response)
{
if (response.getColumns().isEmpty()) {
source.setVersion("-");
return;
}
Optional.ofNullable(response.getColumns().get(0))
.map(version -> {
if (version instanceof ArrayList) {
return ((ArrayList<?>) version).get(0).toString();
}
return version.toString();
})
.ifPresent(source::setVersion);
}
/**
* 标记数据源为不可用状态
* Mark data source as unavailable
*
* @param source 数据源 The data source
* @param message 错误消息 Error message
*/
private void markSourceUnavailable(SourceEntity source, String message)
{
source.setAvailable(false);
source.setVersion(null);
source.setMessage(message);
}
/**
* 处理数据源错误
* Handle data source error
*
* <p>
* 记录错误日志并更新数据源状态
* Log error message and update data source status
* </p>
*
* @param source 数据源 The data source
* @param e 异常信息 Exception information
*/
private void handleSourceError(SourceEntity source, Exception e)
{
log.error("Error processing source {}: {}", source.getName(), e.getMessage(), e);
markSourceUnavailable(source, "Error processing source: " + e.getMessage());
sourceRepository.save(source);
}
/**
* 记录不可用数据源日志
* Log unavailable data source
*
* <p>
* 当找不到对应的插件时记录警告日志
* Log warning message when corresponding plugin is not found
* </p>
*
* @param source 数据源 The data source
*/
private void logUnavailableSource(SourceEntity source)
{
log.warn("Check scheduled task <{}> source {} protocol {} is not available",
getName(),
source.getType(),
source.getProtocol());
}
}

View File

@ -1,38 +1,177 @@
package io.edurt.datacap.service.source;
import com.google.inject.Injector;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.plugin.PluginManager;
import io.edurt.datacap.schedule.ScheduledRunnable;
import io.edurt.datacap.service.entity.SourceEntity;
import io.edurt.datacap.service.repository.SourceRepository;
import io.edurt.datacap.service.service.SourceService;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 数据源元数据同步定时任务运行器
* Scheduled task runner for synchronizing data source metadata
*
* <p>
* 该类使用函数式编程方式实现数据源元数据的并行同步
* This class implements parallel synchronization of data source metadata using functional programming
* </p>
*/
@Slf4j
@SuppressFBWarnings(value = {"REC_CATCH_EXCEPTION", "EI_EXPOSE_REP2"})
public class SyncMetadataScheduledRunnable
extends ScheduledRunnable
{
private final Injector injector;
private final SourceRepository sourceHandler;
/**
* 插件管理器 Plugin manager
*/
private final PluginManager pluginManager;
/**
* 数据源仓库 Source repository
*/
private final SourceRepository sourceRepository;
/**
* 数据源服务 Source service
*/
private final SourceService sourceService;
public SyncMetadataScheduledRunnable(String name, Injector injector, SourceRepository sourceHandler, SourceService sourceService)
/**
* 构造函数 Constructor
*/
public SyncMetadataScheduledRunnable(String name,
PluginManager pluginManager,
SourceRepository sourceRepository,
SourceService sourceService)
{
super(name);
this.injector = injector;
this.sourceHandler = sourceHandler;
this.pluginManager = pluginManager;
this.sourceRepository = sourceRepository;
this.sourceService = sourceService;
}
/**
* Executes the run method of the Runnable interface.
* 执行元数据同步任务
* Execute metadata synchronization task
*/
@Override
public void run()
{
sourceHandler.findAll()
.stream()
.parallel()
.forEach(entity -> sourceService.syncMetadata(entity.getId()));
logTaskStart()
.andThen(this::executeSyncTask)
.andThen(this::logTaskEnd)
.accept(getName());
}
/**
* 记录任务开始
* Log task start
*
* @return 返回一个消费者函数 Returns a consumer function
*/
private Consumer<String> logTaskStart()
{
return taskName -> log.info("==================== {} started =================", taskName);
}
/**
* 记录任务结束
* Log task end
*/
private void logTaskEnd(String taskName)
{
log.info("==================== {} ended =================", taskName);
}
/**
* 执行同步任务
* Execute synchronization task
*
* @param taskName 任务名称 Task name
*/
private void executeSyncTask(String taskName)
{
try {
sourceRepository.findAll()
.stream()
.map(this::createSyncOperation)
.collect(Collectors.collectingAndThen(
Collectors.toList(),
this::executeParallelSync
));
}
catch (Exception e) {
log.error("Failed to execute metadata sync task: {}", e.getMessage(), e);
}
}
/**
* 创建同步操作
* Create synchronization operation
*
* @param source 数据源 Source
* @return 返回一个异步操作 Returns an async operation
*/
private CompletableFuture<Void> createSyncOperation(SourceEntity source)
{
return CompletableFuture.runAsync(() ->
Optional.of(source)
.map(SourceEntity::getId)
.ifPresent(this::syncSourceMetadata)
);
}
/**
* 执行并行同步
* Execute parallel synchronization
*
* @param futures 异步操作列表 List of async operations
*/
private Void executeParallelSync(List<CompletableFuture<Void>> futures)
{
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
}
catch (Exception e) {
log.error("Error in parallel execution: {}", e.getMessage(), e);
}
return null;
}
/**
* 同步单个数据源的元数据
* Synchronize metadata for a single data source
*
* @param sourceId 数据源ID Source ID
*/
private void syncSourceMetadata(Long sourceId)
{
Function<Long, String> getSourceName = id ->
sourceRepository.findById(id)
.map(SourceEntity::getName)
.orElse("Unknown");
Consumer<Long> syncOperation = id -> {
String sourceName = getSourceName.apply(id);
try {
log.debug("Starting metadata sync for source: {}", sourceName);
sourceService.syncMetadata(id);
log.debug("Completed metadata sync for source: {}", sourceName);
}
catch (Exception e) {
log.error("Failed to sync metadata for source {}: {}", sourceName, e.getMessage(), e);
}
};
syncOperation.accept(sourceId);
}
}

View File

@ -15,7 +15,6 @@ public interface Adapter
default List<Object> handlerFormatter(PluginManager pluginManager, String format, List<String> headers, List<Object> columns)
{
return ConvertFilter.filter(pluginManager, format)
.map(file -> {
ConvertRequest request = new ConvertRequest();