[Core] [Source] Adapter source metadata

This commit is contained in:
qianmoQ 2024-06-10 00:12:04 +08:00
parent a0c0171d2c
commit 51c26a340c

View File

@ -407,13 +407,15 @@ public class SourceServiceImpl
scheduledHistoryHandler.save(scheduledHistory);
log.info("==================== Sync metadata [ {} ] started =================", entity.getName());
Optional<Plugin> pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, entity.getType(), entity.getProtocol());
if (!pluginOptional.isPresent()) {
if (pluginOptional.isEmpty()) {
log.warn("The source [ {} ] protocol [ {} ] is not available", entity.getName(), entity.getProtocol());
}
else {
try {
Plugin plugin = pluginOptional.get();
plugin.connect(entity.toConfigure());
Configure pConfigure = entity.toConfigure();
pConfigure.setInjector(injector);
plugin.connect(pConfigure);
Response response = plugin.execute(plugin.validator());
if (!response.getIsSuccessful()) {
log.error("The source [ {} ] not available", entity.getName());
@ -541,7 +543,9 @@ public class SourceServiceImpl
log.warn("The source [ {} ] protocol [ {} ] template [ {} ] is not available, skip sync database", entity.getName(), entity.getProtocol(), templateName);
}
else {
plugin.connect(entity.toConfigure());
Configure pConfigure = entity.toConfigure();
pConfigure.setInjector(injector);
plugin.connect(pConfigure);
Response response = plugin.execute(getSqlContext(template, null));
if (!response.getIsSuccessful()) {
log.error("The source [ {} ] protocol [ {} ] sync metadata [ {} ] failed", entity.getName(), entity.getProtocol(), response.getMessage());
@ -611,7 +615,9 @@ public class SourceServiceImpl
log.warn("The source [ {} ] protocol [ {} ] template [ {} ] is not available, skip sync table", entity.getName(), entity.getProtocol(), templateName);
}
else {
plugin.connect(entity.toConfigure());
Configure pConfigure = entity.toConfigure();
pConfigure.setInjector(injector);
plugin.connect(pConfigure);
Response response = plugin.execute(getSqlContext(template, null));
if (!response.getIsSuccessful()) {
log.error("The source [ {} ] protocol [ {} ] sync metadata tables [ {} ] failed", entity.getName(), entity.getProtocol(), response.getMessage());
@ -702,7 +708,9 @@ public class SourceServiceImpl
log.warn("The source [ {} ] protocol [ {} ] template [ {} ] is not available, skip sync column", entity.getName(), entity.getProtocol(), templateName);
}
else {
plugin.connect(entity.toConfigure());
Configure pConfigure = entity.toConfigure();
pConfigure.setInjector(injector);
plugin.connect(pConfigure);
Response response = plugin.execute(getSqlContext(template, null));
if (!response.getIsSuccessful()) {
log.error("The source [ {} ] protocol [ {} ] sync metadata columns [ {} ] failed", entity.getName(), entity.getProtocol(), response.getMessage());