feat(plugin): fix not close stream

This commit is contained in:
qianmoQ 2024-11-23 13:50:01 +08:00
parent 39465a6a67
commit cc176e0222
17 changed files with 228 additions and 145 deletions

View File

@ -53,7 +53,7 @@ public abstract class Plugin
* Set plugin class loader
*
* @param classLoader 插件类加载器
* @param classLoader Plugin class loader
* Plugin class loader
*/
public void setPluginClassLoader(PluginClassLoader classLoader)
{
@ -66,9 +66,9 @@ public abstract class Plugin
* Type-safe service binding method
*
* @param service 服务接口类型
* @param service service interface type
* service interface type
* @param implementation 服务实现类型
* @param implementation service implementation type
* service implementation type
*/
@SuppressWarnings("unchecked")
private <T extends Service> void bindService(Class<? extends Service> service, Class<? extends Service> implementation, boolean multiple, String qualifier)
@ -131,7 +131,7 @@ public abstract class Plugin
* Get list of service types to load
*
* @return 服务类型列表
* @return list of service types
* list of service types
*/
public Set<Class<? extends Service>> getServiceTypes()
{
@ -160,10 +160,6 @@ public abstract class Plugin
}
}
/**
* 配置服务
* Configure services
*/
/**
* 配置服务
* Configure services
@ -258,9 +254,9 @@ public abstract class Plugin
* Get service instance
*
* @param serviceClass 服务类型
* @param serviceClass service class type
* service class type
* @return 服务实例
* @return service instance
* service instance
*/
public <T extends Service> T getService(Class<T> serviceClass)
{
@ -288,11 +284,11 @@ public abstract class Plugin
* Get named service instance
*
* @param serviceClass 服务接口类型
* @param serviceClass service interface type
* service interface type
* @param name 服务名称
* @param name service name
* service name
* @return 服务实例
* @return service instance
* service instance
*/
public <T extends Service> T getService(Class<T> serviceClass, String name)
{

View File

@ -24,12 +24,11 @@ public class PluginContextManager
* Set the plugin class loader for the current thread
*
* @param classLoader 插件类加载器
* @param classLoader Plugin class loader
* Plugin class loader
*/
public static void setPluginClassLoader(PluginClassLoader classLoader)
{
log.debug("Setting plugin class loader: {} for plugin: {}",
classLoader, classLoader.getPluginName());
log.debug("Setting plugin class loader: {} for plugin: {}", classLoader, classLoader.getPluginName());
synchronized (contextClassLoaders) {
contextClassLoaders.put(Thread.currentThread(), classLoader);
}
@ -40,7 +39,7 @@ public class PluginContextManager
* Get the plugin class loader for the current thread
*
* @return 插件类加载器
* @return Plugin class loader
* Plugin class loader
*/
public static ClassLoader getPluginClassLoader()
{

View File

@ -8,20 +8,26 @@ import io.edurt.datacap.plugin.loader.PluginClassLoader;
import io.edurt.datacap.plugin.loader.PluginLoaderFactory;
import io.edurt.datacap.plugin.utils.PluginClassLoaderUtils;
import io.edurt.datacap.plugin.utils.VersionUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import java.io.File;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
@ -42,6 +48,7 @@ public class PluginManager
// 运行状态标志
// Running state flag
@Getter
private volatile boolean running;
// 插件安装锁
@ -56,6 +63,14 @@ public class PluginManager
// Default version
private static final String DEFAULT_VERSION = "1.0.0";
// 插件监视器
// Plugin watcher
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r, "plugin-watcher-" + System.currentTimeMillis());
thread.setDaemon(true);
return thread;
});
public PluginManager(PluginConfigure config)
{
this.config = config;
@ -284,10 +299,18 @@ public class PluginManager
{
try {
if (Files.exists(directory)) {
Files.walk(directory)
.sorted((a, b) -> b.compareTo(a))
.map(Path::toFile)
.forEach(File::delete);
try (Stream<Path> pathStream = Files.walk(directory)) {
pathStream.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(file -> {
if (!file.delete()) {
log.warn("Failed to delete file: {}", file.getAbsolutePath());
if (file.exists()) {
file.deleteOnExit();
}
}
});
}
}
}
catch (IOException e) {
@ -302,15 +325,29 @@ public class PluginManager
{
// 验证基本文件结构
// Validate basic file structure
boolean isValid = Files.exists(pluginPath) &&
(Files.exists(pluginPath.resolve("pom.xml")) ||
Files.exists(pluginPath.resolve("plugin.properties")) ||
Files.exists(pluginPath.resolve("META-INF/services")) ||
Files.list(pluginPath).anyMatch(path -> path.toString().endsWith(".class")) ||
Files.list(pluginPath).anyMatch(path -> path.toString().endsWith(".jar")));
if (!Files.exists(pluginPath)) {
throw new IOException("Plugin path does not exist: " + pluginPath);
}
if (!isValid) {
throw new IOException("Invalid plugin structure in: " + pluginPath);
boolean hasRequiredFiles = Files.exists(pluginPath.resolve("pom.xml")) ||
Files.exists(pluginPath.resolve("plugin.properties")) ||
Files.exists(pluginPath.resolve("META-INF/services"));
if (hasRequiredFiles) {
return;
}
// 检查是否存在 class jar 文件
// Check for class or jar files
try (Stream<Path> paths = Files.list(pluginPath)) {
boolean hasClassOrJar = paths.anyMatch(path -> {
String pathStr = path.toString();
return pathStr.endsWith(".class") || pathStr.endsWith(".jar");
});
if (!hasClassOrJar) {
throw new IOException("Invalid plugin structure in: " + pluginPath);
}
}
}
@ -323,21 +360,25 @@ public class PluginManager
}
// 检查是否为 SPI 插件
// Check if it's a SPI plugin
// Check if it's SPI plugin
private boolean isSpiPlugin(Path path)
{
if (Files.isDirectory(path)) {
try {
Path servicesPath = path.resolve("META-INF/services");
return Files.exists(servicesPath) &&
Files.list(servicesPath)
.anyMatch(file -> file.toString().endsWith(Plugin.class.getName()));
}
catch (IOException e) {
return false;
}
if (!Files.isDirectory(path)) {
return false;
}
Path servicesPath = path.resolve("META-INF/services");
if (!Files.exists(servicesPath)) {
return false;
}
try (Stream<Path> serviceFiles = Files.list(servicesPath)) {
return serviceFiles.anyMatch(file -> file.toString().endsWith(Plugin.class.getName()));
}
catch (IOException e) {
log.debug("Failed to check SPI plugin at path: {}", path, e);
return false;
}
return false;
}
// 安装 Properties 类型插件
@ -475,7 +516,12 @@ public class PluginManager
pluginDir.toString(),
k -> {
try {
return PluginClassLoaderUtils.createClassLoader(pluginDir, pluginBaseName, pluginVersion);
return PluginClassLoaderUtils.createClassLoader(
pluginDir,
pluginBaseName,
pluginVersion,
true
);
}
catch (Exception e) {
log.error("Failed to create ClassLoader for plugin: {} at {}", pluginBaseName, pluginDir, e);
@ -491,7 +537,8 @@ public class PluginManager
loader = PluginClassLoaderUtils.createClassLoader(
pluginDir,
pluginBaseName,
pluginVersion
pluginVersion,
true
);
}
@ -587,20 +634,42 @@ public class PluginManager
// Start plugin watcher thread
private void startPluginWatcher()
{
Thread watchThread = new Thread(() -> {
while (running) {
try {
Thread.sleep(config.getScanInterval());
loadPlugins();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
scheduler.scheduleWithFixedDelay(() -> {
try {
loadPlugins();
}
catch (Exception e) {
log.error("Failed to load plugins during watch cycle", e);
}
}, 0, config.getScanInterval(), TimeUnit.MILLISECONDS);
log.info("Started plugin watcher with scan interval: {}ms", config.getScanInterval());
}
// 停止插件监视器线程
// Stop plugin watcher thread
private void stopPluginWatcher()
{
try {
scheduler.shutdown();
if (!scheduler.awaitTermination(config.getScanInterval(), TimeUnit.MILLISECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
log.warn("Plugin watcher scheduler did not terminate");
}
}
});
watchThread.setDaemon(true);
watchThread.start();
}
catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
log.warn("Interrupted while shutting down plugin watcher", e);
}
}
@PreDestroy
public void destroy()
{
stopPluginWatcher();
}
// 获取指定名称的插件

View File

@ -1,5 +1,8 @@
package io.edurt.datacap.plugin;
import lombok.Getter;
@Getter
public enum PluginType
{
CONNECTOR("Connector"),
@ -7,20 +10,10 @@ public enum PluginType
SCHEDULER("Scheduler"),
CONVERT("Convert");
private String name;
private final String name;
PluginType(String name)
{
this.name = name;
}
public String getName()
{
return this.name;
}
public void setName(String name)
{
this.name = name;
}
}

View File

@ -1,5 +1,8 @@
package io.edurt.datacap.plugin;
import lombok.Getter;
@Getter
public enum SpiType
{
COMPILED_POM("CompiledPom"),
@ -10,18 +13,13 @@ public enum SpiType
INJECT("Inject"),
TAR("Tar");
private String name;
private final String name;
SpiType(String name)
{
this.name = name;
}
public String getName()
{
return this.name;
}
public static SpiType fromName(String name)
{
return valueOf(name);

View File

@ -3,11 +3,8 @@ package io.edurt.datacap.plugin.loader;
import io.edurt.datacap.plugin.Plugin;
import io.edurt.datacap.plugin.SpiType;
import lombok.extern.slf4j.Slf4j;
import org.apache.maven.model.Model;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import java.io.File;
import java.io.FileReader;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.net.URLClassLoader;
@ -47,11 +44,6 @@ public class CompiledPomPluginLoader
return List.of();
}
// 读取POM文件
// Read POM file
MavenXpp3Reader reader = new MavenXpp3Reader();
Model model = reader.read(new FileReader(pomFile.toFile()));
// 获取编译后的类路径
// Get compiled classpath
Path targetClasses = path.resolve("target/classes");

View File

@ -11,6 +11,7 @@ import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
public class DirectoryPluginLoader
@ -33,25 +34,28 @@ public class DirectoryPluginLoader
// 创建插件专用类加载器
// Create plugin-specific class loader
@SuppressWarnings("resource")
PluginClassLoader classLoader = PluginClassLoaderUtils.createClassLoader(
path,
pluginName,
version
version,
true
);
List<Plugin> plugins = Files.walk(path)
.filter(p -> p.toString().endsWith(".class"))
.filter(p -> !p.toString().contains("$"))
.map(p -> loadClass(classLoader, p))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
try (Stream<Path> pathStream = Files.walk(path)) {
List<Plugin> plugins = pathStream.filter(p -> p.toString().endsWith(".class"))
.filter(p -> !p.toString().contains("$"))
.map(p -> loadClass(classLoader, p))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
// 设置插件的类加载器
// Set class loader for plugins
plugins.forEach(plugin -> plugin.setPluginClassLoader(classLoader));
// 设置插件的类加载器
// Set class loader for plugins
plugins.forEach(plugin -> plugin.setPluginClassLoader(classLoader));
return plugins;
return plugins;
}
}
catch (Exception e) {
log.error("Failed to load plugins from directory: {}", path, e);

View File

@ -73,7 +73,8 @@ public class InjectPluginLoader
PluginClassLoader classLoader = PluginClassLoaderUtils.createClassLoader(
path,
pluginName,
version
version,
true
);
return PluginContextManager.runWithClassLoader(classLoader, () -> {

View File

@ -3,8 +3,10 @@ package io.edurt.datacap.plugin.loader;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
/**
* 插件专用类加载器
@ -13,6 +15,7 @@ import java.net.URLClassLoader;
@Slf4j
public class PluginClassLoader
extends URLClassLoader
implements AutoCloseable
{
@Getter
private final String name;
@ -32,6 +35,7 @@ public class PluginClassLoader
this.pluginVersion = pluginVersion;
this.parentFirst = parentFirst;
this.name = String.join("-", "loader", pluginName.toLowerCase(), pluginVersion.toLowerCase());
log.debug("Created PluginClassLoader for {} with URLs: {}", pluginName, Arrays.toString(urls));
}
@Override
@ -86,4 +90,18 @@ public class PluginClassLoader
}
}
}
@Override
public void close()
throws IOException
{
try {
super.close();
log.debug("Closed PluginClassLoader for plugin: {}", pluginName);
}
catch (IOException e) {
log.error("Error closing PluginClassLoader for plugin: {}", pluginName, e);
throw e;
}
}
}

View File

@ -32,7 +32,7 @@ public class PluginLoaderFactory
* Register a new plugin loader
*
* @param loader 要注册的加载器
* @param loader the loader to register
* the loader to register
*/
public static void registerLoader(PluginLoader loader)
{
@ -68,9 +68,9 @@ public class PluginLoaderFactory
* Get loader by type
*
* @param type 加载器类型
* @param type loader type
* @return 对应的加载器实例如果未找到则返回null
* @return corresponding loader instance, or null if not found
* loader type
* @return 对应的加载器实例如果未找到则返回 null
* corresponding loader instance, or null if not found
*/
public static PluginLoader getLoader(String type)
{
@ -85,9 +85,9 @@ public class PluginLoaderFactory
* Attempt to load plugins using all registered loaders
*
* @param pluginDir 插件目录
* @param pluginDir plugin directory
* plugin directory
* @return 加载的插件模块列表
* @return list of loaded plugin modules
* list of loaded plugin modules
*/
public static List<Plugin> loadPlugins(Path pluginDir)
{
@ -124,7 +124,7 @@ public class PluginLoaderFactory
* Get all registered loader types
*
* @return 加载器类型列表
* @return list of loader types
* list of loader types
*/
public static List<String> getRegisteredTypes()
{

View File

@ -53,7 +53,8 @@ public class PomPluginLoader
PluginClassLoader classLoader = PluginClassLoaderUtils.createClassLoader(
path,
model.getArtifactId(),
version
version,
true
);
Class<?> pluginClass = classLoader.loadClass(mainClass);

View File

@ -45,7 +45,8 @@ public class SpiPluginLoader
PluginClassLoader classLoader = PluginClassLoaderUtils.createClassLoader(
path,
pluginName,
version
version,
true
);
return PluginContextManager.runWithClassLoader(classLoader, () -> {

View File

@ -17,11 +17,11 @@ public class ServiceAnnotationScanner
* Scan classes with @InjectService annotation in the specified package
*
* @param basePackage 基础包路径
* @param basePackage base package path
* base package path
* @param classLoader 类加载器
* @param classLoader class loader
* class loader
* @return 扫描到的服务类集合
* @return scanned service class collection
* scanned service class collection
*/
public static Set<Class<?>> scanServices(String basePackage, ClassLoader classLoader)
{

View File

@ -15,9 +15,9 @@ public class ServiceBindings
* Add service binding
*
* @param serviceType 服务类型必须继承自Service
* @param serviceType service type (must extend Service)
* service type (must extend Service)
* @param implementationType 实现类型必须继承自Service
* @param implementationType implementation type (must extend Service)
* implementation type (must extend Service)
*/
public void addBinding(Class<? extends Service> serviceType, Class<? extends Service> implementationType)
{
@ -29,7 +29,7 @@ public class ServiceBindings
* Get all bindings
*
* @return 服务绑定映射
* @return service binding mapping
* service binding mapping
*/
public Multimap<Class<? extends Service>, Class<? extends Service>> getBindings()
{

View File

@ -22,13 +22,13 @@ public class ServiceSpiLoader
* Load service implementations, supporting both SPI and annotation methods
*
* @param serviceType 服务类型必须继承自Service
* @param serviceType service type (must extend Service)
* service type (must extend Service)
* @param basePackage 扫描注解的基础包路径
* @param basePackage base package path for annotation scanning
* base package path for annotation scanning
* @param classLoader 类加载器
* @param classLoader class loader
* class loader
* @return 服务绑定
* @return service bindings
* service bindings
*/
public static ServiceBindings loadServices(Class<? extends Service> serviceType, String basePackage, ClassLoader classLoader)
{
@ -47,14 +47,20 @@ public class ServiceSpiLoader
if (serviceInterfaces.length == 0) {
// 如果没有指定接口,使用类实现的所有接口
// If no interface is specified, use all interfaces implemented by the class
addServiceBindings(bindings, (Class<? extends Service>) serviceImpl, serviceType);
@SuppressWarnings("unchecked")
Class<? extends Service> impl = (Class<? extends Service>) serviceImpl;
addServiceBindings(bindings, impl, serviceType);
}
else {
// 添加指定的接口绑定
// Add specified interface bindings
for (Class<?> iface : serviceInterfaces) {
if (Service.class.isAssignableFrom(iface) && serviceType.isAssignableFrom(iface)) {
bindings.addBinding((Class<? extends Service>) iface, (Class<? extends Service>) serviceImpl);
@SuppressWarnings("unchecked")
Class<? extends Service> service = (Class<? extends Service>) iface;
@SuppressWarnings("unchecked")
Class<? extends Service> impl = (Class<? extends Service>) serviceImpl;
bindings.addBinding(service, impl);
log.debug("Added annotated binding: {} -> {}", iface.getName(), serviceImpl.getName());
}
}
@ -91,11 +97,11 @@ public class ServiceSpiLoader
* Load service implementations
*
* @param serviceType 服务类型必须继承自Service
* @param serviceType service type (must extend Service)
* service type (must extend Service)
* @return 服务绑定
* @return service bindings
* service bindings
* @throws IllegalArgumentException 如果服务类型不是Service的子类
* @throws IllegalArgumentException if service type is not a Service subclass
* if service type is not a Service subclass
*/
public static ServiceBindings loadServices(Class<? extends Service> serviceType)
{
@ -107,13 +113,13 @@ public class ServiceSpiLoader
* Load service implementations
*
* @param serviceType 服务类型必须继承自Service
* @param serviceType service type (must extend Service)
* service type (must extend Service)
* @param classLoader 类加载器
* @param classLoader class loader
* class loader
* @return 服务绑定
* @return service bindings
* service bindings
* @throws IllegalArgumentException 如果服务类型不是Service的子类
* @throws IllegalArgumentException if service type is not a Service subclass
* if service type is not a Service subclass
*/
public static ServiceBindings loadServices(Class<? extends Service> serviceType, ClassLoader classLoader)
{
@ -206,7 +212,7 @@ public class ServiceSpiLoader
}
}
catch (IOException e) {
log.error("Error loading services for type: " + serviceType.getName(), e);
log.error("Error loading services for type: {}", serviceType.getName(), e);
}
// 记录找到的所有绑定

View File

@ -43,7 +43,7 @@ public class PluginPathUtils
* Find project root directory
*
* @return 项目根目录的Path对象
* @return Path object of project root directory
* Path object of project root directory
*/
public static Path findProjectRoot()
{
@ -140,9 +140,9 @@ public class PluginPathUtils
* Find project root directory from specified path
*
* @param startPath 开始搜索的路径
* @param startPath path to start search from
* @return 项目根目录路径如果未找到返回null
* @return project root path, null if not found
* path to start search from
* @return 项目根目录路径如果未找到返回 null
* project root path, null if not found
*/
private static Path findRootFromPath(Path startPath)
{

View File

@ -17,6 +17,7 @@ import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Version utility for plugin system
@ -146,18 +147,20 @@ public class VersionUtils
// Look for pom.properties in META-INF/maven directory
Path mavenDir = pluginPath.resolve("META-INF/maven");
if (Files.exists(mavenDir)) {
List<Path> pomProperties = Files.walk(mavenDir)
.filter(p -> p.getFileName().toString().equals("pom.properties"))
.collect(Collectors.toList());
try (Stream<Path> pathStream = Files.walk(mavenDir)) {
List<Path> pomProperties = pathStream
.filter(p -> p.getFileName().toString().equals("pom.properties"))
.collect(Collectors.toList());
for (Path propFile : pomProperties) {
Properties props = new Properties();
try (InputStream is = Files.newInputStream(propFile)) {
props.load(is);
String version = props.getProperty("version");
if (isValidVersion(version)) {
log.debug("Found version {} in pom.properties", version);
return version;
for (Path propFile : pomProperties) {
Properties props = new Properties();
try (InputStream is = Files.newInputStream(propFile)) {
props.load(is);
String version = props.getProperty("version");
if (isValidVersion(version)) {
log.debug("Found version {} in pom.properties", version);
return version;
}
}
}
}
@ -176,14 +179,16 @@ public class VersionUtils
{
try {
if (Files.isDirectory(pluginPath)) {
List<Path> jarFiles = Files.walk(pluginPath)
.filter(path -> path.toString().endsWith(".jar"))
.collect(Collectors.toList());
for (Path jarPath : jarFiles) {
String version = readVersionFromPluginJar(jarPath);
if (version != null) {
return version;
try (Stream<Path> pathStream = Files.walk(pluginPath)) {
List<Path> jarFiles = pathStream
.filter(p -> p.toString().endsWith(".jar"))
.collect(Collectors.toList());
for (Path jarPath : jarFiles) {
String version = readVersionFromPluginJar(jarPath);
if (version != null) {
return version;
}
}
}
}