feat(plugin): adapter executor -- seatunnel

This commit is contained in:
qianmoQ 2024-11-27 11:18:28 +08:00
parent d19e01ea48
commit 8625ba7b9a
18 changed files with 167 additions and 119 deletions

View File

@ -21,7 +21,10 @@
<div class="flex items-center space-x-4 justify-between">
<!-- Logo and Name -->
<div class="flex flex-col items-center space-y-2 justify-between">
<ShadcnAvatar class="bg-transparent border p-1.5" :src="child.logo" :alt="child.i18nFormat ? $t(child.label) : child.label"/>
<ShadcnAvatar class="bg-transparent"
size="large"
:src="child.logo"
:alt="child.i18nFormat ? $t(child.label) : child.label"/>
<ShadcnText type="h6">
{{ child.i18nFormat ? $t(child.label) : child.label }}

View File

@ -11,13 +11,14 @@
</parent>
<artifactId>datacap-executor-seatunnel</artifactId>
<description>DataCap - Executor seatunnel</description>
<description>DataCap - Executor - Seatunnel</description>
<dependencies>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-executor-spi</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>

View File

@ -41,7 +41,7 @@ public class SeaTunnelCommander
public String toCommand()
{
StringBuffer buffer = new StringBuffer();
StringBuilder buffer = new StringBuilder();
buffer.append(this.bin);
buffer.append("/");
buffer.append(startScript);

View File

@ -1,15 +0,0 @@
package io.edurt.datacap.executor.seatunnel;
import com.google.inject.multibindings.Multibinder;
import io.edurt.datacap.executor.Executor;
import io.edurt.datacap.executor.ExecutorModule;
public class SeatunnelExecutorModule
extends ExecutorModule
{
protected void configure()
{
Multibinder<Executor> plugin = Multibinder.newSetBinder(this.binder(), Executor.class);
plugin.addBinding().to(SeatunnelExecutor.class);
}
}

View File

@ -0,0 +1,14 @@
package io.edurt.datacap.executor.seatunnel;
import io.edurt.datacap.plugin.Plugin;
import io.edurt.datacap.plugin.PluginType;
public class SeatunnelExecutorPlugin
extends Plugin
{
@Override
public PluginType getType()
{
return PluginType.EXECUTOR;
}
}

View File

@ -5,8 +5,7 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.executor.Executor;
import io.edurt.datacap.executor.common.RunProtocol;
import io.edurt.datacap.executor.ExecutorService;
import io.edurt.datacap.executor.common.RunState;
import io.edurt.datacap.executor.configure.ExecutorConfigure;
import io.edurt.datacap.executor.configure.ExecutorRequest;
@ -30,17 +29,13 @@ import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import static java.util.Objects.requireNonNull;
@SuppressFBWarnings(value = {"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"})
@Slf4j
public class SeatunnelExecutor
implements Executor
public class SeatunnelExecutorService
implements ExecutorService
{
@Override
public String name()
{
return "Seatunnel";
}
@Override
public ExecutorResponse start(ExecutorRequest request)
{
@ -104,7 +99,7 @@ public class SeatunnelExecutor
jsonGenerator.writeFieldName(type);
if (ObjectUtils.isNotEmpty(configure)) {
String protocol = configure.getType();
if (configure.getProtocol().equals(RunProtocol.JDBC)) {
if (requireNonNull(configure.getProtocol()).equalsIgnoreCase("jdbc")) {
protocol = "Jdbc";
}
Connector factory = ConnectorFactory.createFormatter(ConnectorType.valueOf(protocol), configure);

View File

@ -1 +0,0 @@
io.edurt.datacap.executor.seatunnel.SeatunnelExecutorModule

View File

@ -0,0 +1 @@
io.edurt.datacap.executor.seatunnel.SeatunnelExecutorPlugin

View File

@ -0,0 +1 @@
io.edurt.datacap.executor.seatunnel.SeatunnelExecutorService

View File

@ -1,44 +0,0 @@
package io.edurt.datacap.executor.seatunnel;
import io.edurt.datacap.executor.configure.ExecutorConfigure;
import io.edurt.datacap.executor.configure.ExecutorRequest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class SeatunnelExecutorTest
{
private static final Set<String> supportOptions = new HashSet<String>()
{{
add("host");
add("database");
add("sql");
add("username");
add("password");
}};
private ExecutorRequest request;
@Before
public void before()
{
Properties properties = new Properties();
properties.put("host", "127.0.0.1");
properties.put("username", "root");
properties.put("password", "123456");
properties.put("database", "default");
properties.put("sql", "SHOW DATABASES");
ExecutorConfigure input = new ExecutorConfigure("ClickHouse", properties, supportOptions);
request = new ExecutorRequest(System.getProperty("user.dir"), input, input);
}
@Test
public void start()
{
Assert.assertNotNull(new SeatunnelExecutor().start(request));
}
}

View File

@ -1,35 +0,0 @@
package io.edurt.datacap.executor.seatunnel.connector;
import com.google.common.collect.Sets;
import io.edurt.datacap.executor.configure.ExecutorConfigure;
import org.junit.Before;
import org.junit.Test;
import java.util.Properties;
import static org.junit.Assert.assertNotNull;
public class ConnectorFactoryTest
{
private ConnectorType type = ConnectorType.ClickHouse;
private ExecutorConfigure input;
@Before
public void before()
{
Properties properties = new Properties();
properties.put("host", "127.0.0.1");
properties.put("username", "root");
properties.put("password", "123456");
properties.put("database", "default");
properties.put("sql", "SHOW DATABASES");
input = new ExecutorConfigure("ClickHouse", properties, Sets.newHashSet());
}
@Test
public void createFormatter()
{
Connector factory = ConnectorFactory.createFormatter(type, this.input);
assertNotNull(factory.formatToMap());
}
}

View File

@ -6,13 +6,6 @@ import io.edurt.datacap.plugin.Service
interface ExecutorService : Service
{
fun name(): String
{
return this.javaClass
.simpleName
.removeSuffix("Executor")
}
fun start(request: ExecutorRequest): ExecutorResponse
fun stop(request: ExecutorRequest): ExecutorResponse

View File

@ -27,12 +27,32 @@ data class ExecutorConfigure(
type: String?,
configure: Properties?,
supportOptions: Set<String> = mutableSetOf()
) : this(type, configure, supportOptions, RunProtocol.NONE.name, null, null, null, null, null)
) : this(
type,
configure,
supportOptions,
RunProtocol.NONE.name,
null,
null,
null,
null,
null
)
constructor(
type: String,
configure: Properties,
supportOptions: Set<String> = mutableSetOf(),
protocol: String
) : this(type, configure, supportOptions, protocol, null, null, null, null, null)
) : this(
type,
configure,
supportOptions,
protocol,
null,
null,
null,
null,
null
)
}

View File

@ -78,7 +78,7 @@
<!-- <module>plugin/datacap-plugin-influxdb</module>-->
<module>executor/datacap-executor-spi</module>
<module>executor/datacap-executor-local</module>
<!-- <module>executor/datacap-executor-seatunnel</module>-->
<module>executor/datacap-executor-seatunnel</module>
<!-- <module>shaded/datacap-shaded-ydb</module>-->
<!-- <module>shaded/datacap-shaded-pinot</module>-->
<module>fs/datacap-fs-spi</module>

View File

@ -43,6 +43,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-executor-seatunnel</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,29 @@
package io.edurt.datacap.test.seatunnel
import io.edurt.datacap.plugin.PluginConfigure
import io.edurt.datacap.plugin.PluginManager
import io.edurt.datacap.plugin.utils.PluginPathUtils
import org.junit.Assert.assertNotNull
import org.junit.Test
class SeatunnelExecutorPluginTest
{
private val pluginManager: PluginManager
private val pluginName = "SeatunnelExecutor"
init
{
val projectRoot = PluginPathUtils.findProjectRoot()
val config = PluginConfigure.builder()
.pluginsDir(projectRoot.resolve("executor/datacap-executor-seatunnel"))
.build()
pluginManager = PluginManager(config).apply { start() }
}
@Test
fun test()
{
assertNotNull(pluginManager.getPlugin(pluginName).get())
}
}

View File

@ -0,0 +1,41 @@
package io.edurt.datacap.test.seatunnel
import io.edurt.datacap.executor.configure.ExecutorConfigure
import io.edurt.datacap.executor.configure.ExecutorRequest
import io.edurt.datacap.executor.seatunnel.SeatunnelExecutorService
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import java.util.*
class SeatunnelExecutorServiceTest
{
private var request: ExecutorRequest? = null
private var supportOptions: MutableSet<String> = mutableSetOf(
"host",
"database",
"sql",
"username",
"password"
)
@Before
fun before()
{
val properties = Properties()
properties["host"] = "127.0.0.1"
properties["username"] = "root"
properties["password"] = "123456"
properties["database"] = "default"
properties["sql"] = "SHOW DATABASES"
val input = ExecutorConfigure("ClickHouse", properties, supportOptions)
request = ExecutorRequest(System.getProperty("user.dir"), input, input)
}
@Test
fun testStart()
{
Assert.assertNotNull(request?.let { SeatunnelExecutorService().start(it) })
}
}

View File

@ -0,0 +1,39 @@
package io.edurt.datacap.test.seatunnel.connector
import com.google.common.collect.Sets
import io.edurt.datacap.executor.configure.ExecutorConfigure
import io.edurt.datacap.executor.seatunnel.connector.ConnectorFactory
import io.edurt.datacap.executor.seatunnel.connector.ConnectorType
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import org.slf4j.LoggerFactory.getLogger
import java.util.*
class ConnectorFactoryTest
{
private val log = getLogger(this::class.java)
private val type = ConnectorType.ClickHouse
private var input: ExecutorConfigure? = null
@Before
fun before()
{
val properties = Properties()
properties["host"] = "127.0.0.1"
properties["username"] = "root"
properties["password"] = "123456"
properties["database"] = "default"
properties["sql"] = "SHOW DATABASES"
input = ExecutorConfigure("ClickHouse", properties, Sets.newHashSet())
}
@Test
fun createFormatter()
{
val factory = ConnectorFactory.createFormatter(type, this.input)
val map = factory.formatToMap()
log.info("Format: {}", map)
Assert.assertNotNull(factory.formatToMap())
}
}