Feature integration seatunnel (#278)

This commit is contained in:
qianmoQ 2023-03-13 20:41:34 +08:00 committed by GitHub
commit 2badcef8ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 320 additions and 11 deletions

View File

@ -9,3 +9,5 @@ spring.datasource.username=root
spring.datasource.password=12345678
datacap.security.secret=DataCapSecretKey
datacap.security.expiration=86400000
# Executor
datacap.executor.seatunnel.home=/opt/lib/seatunnel

View File

@ -45,5 +45,33 @@
"value": [],
"group": "custom"
}
],
"pipelines": [
{
"executor": "Seatunnel",
"type": "FROM",
"fields": [
{
"field": "host",
"required": true
},
{
"field": "database",
"required": true
},
{
"field": "sql",
"required": true
},
{
"field": "username",
"required": true
},
{
"field": "password",
"required": true
}
]
}
]
}

View File

@ -0,0 +1,18 @@
package io.edurt.datacap.server.body;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class PipelineBody
{
private Long from;
private Long to;
private String content;
private String executor;
}

View File

@ -0,0 +1,34 @@
package io.edurt.datacap.server.common;
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.Properties;
public class BeanToPropertiesCommon
{
private BeanToPropertiesCommon()
{}
public static Properties convertBeanToProperties(Object bean)
{
Properties properties = new Properties();
try {
BeanInfo beanInfo = Introspector.getBeanInfo(bean.getClass());
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
String propertyName = propertyDescriptor.getName();
if (!propertyName.equals("class")) {
Method readMethod = propertyDescriptor.getReadMethod();
Object value = readMethod.invoke(bean);
properties.setProperty(propertyName, value.toString());
}
}
}
catch (Exception exception) {
exception.printStackTrace();
}
return properties;
}
}

View File

@ -0,0 +1,26 @@
package io.edurt.datacap.server.controller.user;
import io.edurt.datacap.server.body.PipelineBody;
import io.edurt.datacap.server.common.Response;
import io.edurt.datacap.server.service.PipelineService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController()
@RequestMapping(value = "/api/v1/pipeline")
public class PipelineController
{
private final PipelineService pipelineService;
public PipelineController(PipelineService pipelineService)
{
this.pipelineService = pipelineService;
}
@PostMapping(value = "/create")
public Response<Object> create(PipelineBody configure)
{
return pipelineService.submit(configure);
}
}

View File

@ -20,4 +20,5 @@ public class IConfigure
private String name;
private Date supportTime;
private List<IConfigureField> configures;
private List<IConfigureExecutor> pipelines;
}

View File

@ -0,0 +1,19 @@
package io.edurt.datacap.server.plugin.configure;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.List;
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class IConfigureExecutor
{
private String executor;
private String type;
private List<IConfigureExecutorField> fields;
}

View File

@ -0,0 +1,16 @@
package io.edurt.datacap.server.plugin.configure;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class IConfigureExecutorField
{
private String field;
private boolean required;
}

View File

@ -0,0 +1,9 @@
package io.edurt.datacap.server.service;
import io.edurt.datacap.server.body.PipelineBody;
import io.edurt.datacap.server.common.Response;
public interface PipelineService
{
Response<Object> submit(PipelineBody configure);
}

View File

@ -0,0 +1,103 @@
package io.edurt.datacap.server.service.impl;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import io.edurt.datacap.server.body.PipelineBody;
import io.edurt.datacap.server.common.BeanToPropertiesCommon;
import io.edurt.datacap.server.common.PluginCommon;
import io.edurt.datacap.server.common.Response;
import io.edurt.datacap.server.common.ServiceState;
import io.edurt.datacap.server.entity.SourceEntity;
import io.edurt.datacap.server.plugin.configure.IConfigure;
import io.edurt.datacap.server.plugin.configure.IConfigureExecutor;
import io.edurt.datacap.server.repository.SourceRepository;
import io.edurt.datacap.server.service.PipelineService;
import io.edurt.datacap.spi.executor.Executor;
import io.edurt.datacap.spi.executor.Pipeline;
import io.edurt.datacap.spi.executor.PipelineField;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@Service
public class PipelineServiceImpl
implements PipelineService
{
private final SourceRepository repository;
private final Injector injector;
private final Environment environment;
public PipelineServiceImpl(SourceRepository repository, Injector injector, Environment environment)
{
this.repository = repository;
this.injector = injector;
this.environment = environment;
}
@Override
public Response<Object> submit(PipelineBody configure)
{
Optional<SourceEntity> fromSourceOptional = repository.findById(configure.getFrom());
Optional<SourceEntity> toSourceOptional = repository.findById(configure.getTo());
if (fromSourceOptional.isPresent() && toSourceOptional.isPresent()) {
SourceEntity fromSource = fromSourceOptional.get();
IConfigure fromConfigure = PluginCommon.loadConfigure(fromSource.getProtocol(), fromSource.getType(), fromSource.getType(), environment);
Optional<IConfigureExecutor> fromConfigureExecutor = fromConfigure.getPipelines()
.stream()
.filter(v -> v.getExecutor().equals(configure.getExecutor()))
.findFirst();
if (!fromConfigureExecutor.isPresent()) {
return Response.failure(ServiceState.PLUGIN_NOT_FOUND);
}
SourceEntity toSource = toSourceOptional.get();
IConfigure toConfigure = PluginCommon.loadConfigure(toSource.getProtocol(), toSource.getType(), toSource.getType(), environment);
Optional<IConfigureExecutor> toConfigureExecutor = toConfigure.getPipelines()
.stream()
.filter(v -> v.getExecutor().equals(configure.getExecutor()))
.findFirst();
if (!toConfigureExecutor.isPresent()) {
return Response.failure(ServiceState.PLUGIN_NOT_FOUND);
}
Optional<Executor> executorOptional = injector.getInstance(Key.get(new TypeLiteral<Set<Executor>>() {}))
.stream()
.filter(executor -> executor.name().equals(configure.getExecutor()))
.findFirst();
// FROM source
Properties fromProperties = new Properties();
Properties fromBeanProperties = BeanToPropertiesCommon.convertBeanToProperties(fromSource);
fromConfigureExecutor.get().getFields().forEach(pipelineField -> fromProperties.put(pipelineField.getField(), fromBeanProperties.get(pipelineField.getField())));
PipelineField fromField = PipelineField.builder()
.type(fromSource.getType())
.configure(fromProperties)
.build();
// TO source
Properties toProperties = new Properties();
Properties toBeanProperties = BeanToPropertiesCommon.convertBeanToProperties(toSource);
toConfigureExecutor.get().getFields().forEach(pipelineField -> toProperties.put(pipelineField.getField(), toBeanProperties.get(pipelineField.getField())));
PipelineField toField = PipelineField.builder()
.type(toSource.getType())
.configure(toProperties)
.build();
Pipeline pipeline = Pipeline.builder()
.work(System.getenv("user.dir") + "/" + UUID.randomUUID().toString())
.home(environment.getProperty("datacap.executor.seatunnel.home"))
.from(fromField)
.to(toField)
.build();
executorOptional.get().start(pipeline);
return Response.success(null);
}
return Response.failure(ServiceState.SOURCE_NOT_FOUND);
}
}

View File

@ -11,6 +11,12 @@ export class UserQuestion
type: string;
}
export class UserQuestionItem
{
content: string;
isSelf: boolean;
}
export class ThirdConfigure
{
token = '';

View File

@ -1,6 +1,11 @@
<template>
<div class="layout">
<Layout>
<RouterLink to="/profile/chatgpt">
<Alert style="margin-bottom: 0px;" banner closable type="success">
Support ChatGPT
</Alert>
</RouterLink>
<LayoutHeader @changeLanguage="setLangCondition($event)"/>
<LayoutContent style="background-color: #FFFFFF; padding: 12px; min-height: 500px"/>
<LayoutFooter/>

View File

@ -11,13 +11,28 @@
</template>
<div>
<Layout>
<Content style="padding: 24px 50px;">
<Input v-model="userQuestionResponse" :disabled="!userInfo?.thirdConfigure?.chatgpt?.token" type="textarea" :rows="10"/>
<Content style="padding: 0px 0px 0px 10px">
<div ref="scrollDiv" style="height: 300px; max-height: 300px; overflow: auto; background-color: #f5f7f9">
<List item-layout="vertical">
<ListItem v-for="item in userQuestionItems" :key="item">
<ListItemMeta style="margin-bottom: 0px;">
<template #title>
{{ item.isSelf ? username : 'ChatGPT' }}
</template>
<template #avatar>
<Avatar v-if="item.isSelf" icon="ios-person"></Avatar>
<Avatar v-else icon="md-ionitron" style="background-color: #87d068;"></Avatar>
</template>
</ListItemMeta>
<v-md-preview :text="item.content"/>
</ListItem>
</List>
</div>
</Content>
<Footer>
<Footer style="background-color: #FFFFFF;">
<Row>
<Col span="20">
<Input v-model="userQuestion.question" :disabled="!userInfo?.thirdConfigure?.chatgpt?.token" type="textarea" :autosize="{minRows: 2,maxRows: 5}"/>
<Input v-model="userQuestionContext" :disabled="!userInfo?.thirdConfigure?.chatgpt?.token" type="textarea" :autosize="{minRows: 2,maxRows: 5}"/>
</Col>
<Col span="2" offset="1">
<Button :disabled="!userInfo?.thirdConfigure?.chatgpt?.token" type="primary" icon="md-send"
@ -50,15 +65,20 @@
<script lang="ts">
import {defineComponent} from 'vue';
import UserService from "@/services/UserService";
import {ThirdConfigure, User, UserQuestion} from '@/model/User';
import {ThirdConfigure, User, UserQuestion, UserQuestionItem} from '@/model/User';
import Common from "@/common/Common";
import {AuthResponse} from "@/model/AuthResponse";
const userQuestion = new UserQuestion();
userQuestion.type = 'ChatGPT';
export default defineComponent({
setup()
{
let username;
const authUser = JSON.parse(localStorage.getItem(Common.token) || '{}') as AuthResponse;
if (authUser) {
username = authUser.username;
}
return {
userQuestion
username
}
},
created()
@ -72,12 +92,14 @@ export default defineComponent({
visibleModel: false,
startChatLoading: false,
userInfo: null as User,
userQuestionResponse: null
userQuestionContext: null,
userQuestionItems: null as UserQuestionItem[]
}
},
methods: {
handlerInitialize()
{
this.userQuestionItems = [];
this.loading = true;
UserService.getInfo()
.then(response => {
@ -118,11 +140,24 @@ export default defineComponent({
},
handlerStartChat()
{
const userQuestion = new UserQuestion();
userQuestion.type = 'ChatGPT';
userQuestion.question = this.userQuestionContext;
const question = new UserQuestionItem();
question.content = this.userQuestionContext;
question.isSelf = true;
this.userQuestionItems.push(question);
this.handlerGoBottom();
this.startChatLoading = true;
UserService.startChat(this.userQuestion)
this.userQuestionContext = null;
UserService.startChat(userQuestion)
.then(response => {
if (response.status) {
this.userQuestionResponse = response.data;
const answer = new UserQuestionItem();
answer.content = response.data.toString();
answer.isSelf = false;
this.userQuestionItems.push(answer);
this.handlerGoBottom();
}
else {
this.$Message.error(response.message);
@ -131,6 +166,13 @@ export default defineComponent({
.finally(() => {
this.startChatLoading = false;
});
},
handlerGoBottom()
{
let scrollElem = this.$refs.scrollDiv;
setTimeout(() => {
scrollElem.scrollTo({top: scrollElem.scrollHeight, behavior: 'smooth'});
}, 0);
}
}
});