[Core] [Executor] Support the execution of pipeline to record logs to the local

This commit is contained in:
qianmoQ 2023-03-14 22:04:41 +08:00
parent 6c795141c9
commit f3536b62ca
53 changed files with 702 additions and 674 deletions

1
.gitignore vendored
View File

@ -41,3 +41,4 @@ tmp/
.vscode
dist/datacap/
list
*.configure

View File

@ -96,6 +96,11 @@
<artifactId>chatgpt-java</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-spi</artifactId>

View File

@ -10,4 +10,6 @@ spring.datasource.password=12345678
datacap.security.secret=DataCapSecretKey
datacap.security.expiration=86400000
# Executor
### If this directory is not set, the system will get the project root directory to build the data subdirectory
datacap.executor.data=
datacap.executor.seatunnel.home=/opt/lib/seatunnel

View File

@ -1,50 +0,0 @@
{
"name": "Default",
"supportTime": "2022-09-22",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 1,
"message": "port is a required field, please be sure to enter"
},
{
"field": "username",
"type": "String",
"group": "authorization"
},
{
"field": "password",
"type": "String",
"group": "authorization"
},
{
"field": "database",
"type": "String",
"message": "database is a required field, please be sure to enter",
"group": "advanced"
},
{
"field": "configures",
"type": "Array",
"value": [],
"group": "custom"
}
]
}

View File

@ -0,0 +1,33 @@
name: Default
supportTime: '2022-09-22'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 1
message: port is a required field, please be sure to enter
- field: username
type: String
group: authorization
- field: password
type: String
group: authorization
- field: database
type: String
message: database is a required field, please be sure to enter
group: advanced
- field: configures
type: Array
value: [ ]
group: custom

View File

@ -1,28 +0,0 @@
{
"name": "ClickHouse",
"supportTime": "2022-11-09",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 8123,
"message": "port is a required field, please be sure to enter"
}
]
}

View File

@ -0,0 +1,58 @@
name: ClickHouse
supportTime: '2022-11-09'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 9000
message: port is a required field, please be sure to enter
pipelines:
- executor: Seatunnel
type: SOURCE
fields:
- field: host
origin: host
required: true
- field: database
origin: database
required: true
- field: sql
origin: context
required: true
- field: username
origin: username
required: true
- field: password
origin: password
required: true
- executor: Seatunnel
type: SINK
fields:
- field: host
origin: host
required: true
- field: database
origin: database
required: true
- field: sql
origin: context
required: true
- field: username
origin: username
required: true
- field: password
origin: password
required: true

View File

@ -1,77 +0,0 @@
{
"name": "ClickHouse",
"supportTime": "2022-09-22",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 9000,
"message": "port is a required field, please be sure to enter"
},
{
"field": "username",
"type": "String",
"group": "authorization"
},
{
"field": "password",
"type": "String",
"group": "authorization"
},
{
"field": "database",
"type": "String",
"group": "advanced"
},
{
"field": "configures",
"type": "Array",
"value": [],
"group": "custom"
}
],
"pipelines": [
{
"executor": "Seatunnel",
"type": "FROM",
"fields": [
{
"field": "host",
"required": true
},
{
"field": "database",
"required": true
},
{
"field": "sql",
"required": true
},
{
"field": "username",
"required": true
},
{
"field": "password",
"required": true
}
]
}
]
}

View File

@ -0,0 +1,71 @@
name: ClickHouse
supportTime: '2022-09-22'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 9000
message: port is a required field, please be sure to enter
- field: username
type: String
group: authorization
- field: password
type: String
group: authorization
- field: database
type: String
group: advanced
- field: configures
type: Array
value: [ ]
group: custom
pipelines:
- executor: Seatunnel
type: SOURCE
fields:
- field: host
required: true
- field: database
required: true
override: true
input: true
- field: sql
origin: context
required: true
override: true
input: true
- field: username
required: true
- field: password
required: false
- executor: Seatunnel
type: SINK
fields:
- field: host
required: true
- field: database
override: true
required: true
- field: table
override: true
required: true
- field: username
required: true
- field: password
required: true
- field: fields
override: true
required: true

View File

@ -1,36 +0,0 @@
{
"name": "DuckDB",
"supportTime": "2023-02-20",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "/root",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 0,
"message": "port is a required field, please be sure to enter"
},
{
"field": "database",
"type": "String",
"required": true,
"value": "local",
"message": "database is a required field, please be sure to enter",
"group": "advanced"
}
]
}

View File

@ -0,0 +1,25 @@
name: DuckDB
supportTime: '2023-02-20'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: "/root"
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 0
message: port is a required field, please be sure to enter
- field: database
type: String
required: true
value: local
message: database is a required field, please be sure to enter
group: advanced

View File

@ -1,62 +0,0 @@
{
"name": "MySQL",
"supportTime": "2022-09-19",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 3306,
"message": "port is a required field, please be sure to enter"
},
{
"field": "username",
"type": "String",
"group": "authorization"
},
{
"field": "password",
"type": "String",
"group": "authorization"
},
{
"field": "ssl",
"type": "Boolean",
"group": "authorization"
},
{
"field": "database",
"type": "String",
"required": true,
"value": "default",
"message": "database is a required field, please be sure to enter",
"group": "advanced"
},
{
"field": "configures",
"type": "Array",
"value": [
{
"field": "useOldAliasMetadataBehavior",
"value": true
}
],
"group": "custom"
}
]
}

View File

@ -0,0 +1,40 @@
name: MySQL
supportTime: '2022-09-19'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 3306
message: port is a required field, please be sure to enter
- field: username
type: String
group: authorization
- field: password
type: String
group: authorization
- field: ssl
type: Boolean
group: authorization
- field: database
type: String
required: true
value: default
message: database is a required field, please be sure to enter
group: advanced
- field: configures
type: Array
value:
- field: useOldAliasMetadataBehavior
value: true
group: custom

View File

@ -1,59 +0,0 @@
{
"name": "OceanBase",
"supportTime": "2022-11-30",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 2881,
"message": "port is a required field, please be sure to enter"
},
{
"field": "username",
"type": "String",
"required": true,
"group": "authorization"
},
{
"field": "password",
"type": "String",
"required": true,
"group": "authorization"
},
{
"field": "database",
"type": "String",
"required": true,
"value": "default",
"message": "database is a required field, please be sure to enter",
"group": "advanced"
},
{
"field": "configures",
"type": "Array",
"value": [
{
"field": "useOldAliasMetadataBehavior",
"value": true
}
],
"group": "custom"
}
]
}

View File

@ -0,0 +1,39 @@
name: OceanBase
supportTime: '2022-11-30'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 2881
message: port is a required field, please be sure to enter
- field: username
type: String
required: true
group: authorization
- field: password
type: String
required: true
group: authorization
- field: database
type: String
required: true
value: default
message: database is a required field, please be sure to enter
group: advanced
- field: configures
type: Array
value:
- field: useOldAliasMetadataBehavior
value: true
group: custom

View File

@ -1,56 +0,0 @@
{
"name": "Presto",
"supportTime": "2022-09-25",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 8080,
"message": "port is a required field, please be sure to enter"
},
{
"field": "username",
"type": "String",
"group": "authorization"
},
{
"field": "password",
"type": "String",
"group": "authorization"
},
{
"field": "database",
"type": "String",
"message": "database is a required field, please be sure to enter",
"group": "advanced"
},
{
"field": "catalog",
"type": "String",
"value": "",
"group": "advanced"
},
{
"field": "configures",
"type": "Array",
"value": [],
"group": "custom"
}
]
}

View File

@ -0,0 +1,37 @@
name: Presto
supportTime: '2022-09-25'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 8080
message: port is a required field, please be sure to enter
- field: username
type: String
group: authorization
- field: password
type: String
group: authorization
- field: database
type: String
message: database is a required field, please be sure to enter
group: advanced
- field: catalog
type: String
value: ''
group: advanced
- field: configures
type: Array
value: [ ]
group: custom

View File

@ -1,33 +0,0 @@
{
"name": "Redis",
"supportTime": "2022-09-26",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 6379,
"message": "port is a required field, please be sure to enter"
},
{
"field": "password",
"type": "String",
"group": "authorization"
}
]
}

View File

@ -0,0 +1,22 @@
name: Redis
supportTime: '2022-09-26'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 6379
message: port is a required field, please be sure to enter
- field: password
type: String
group: authorization

View File

@ -1,52 +0,0 @@
{
"name": "ClickHouse",
"supportTime": "2023-01-29",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 80,
"message": "port is a required field, please be sure to enter"
},
{
"field": "username",
"type": "String",
"required": true,
"group": "authorization"
},
{
"field": "password",
"type": "String",
"required": true,
"group": "authorization"
},
{
"field": "database",
"type": "String",
"message": "database is a required field, please be sure to enter",
"group": "advanced"
},
{
"field": "configures",
"type": "Array",
"value": [],
"group": "custom"
}
]
}

View File

@ -0,0 +1,35 @@
name: Snowflake
supportTime: '2023-01-29'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 80
message: port is a required field, please be sure to enter
- field: username
type: String
required: true
group: authorization
- field: password
type: String
required: true
group: authorization
- field: database
type: String
message: database is a required field, please be sure to enter
group: advanced
- field: configures
type: Array
value: [ ]
group: custom

View File

@ -1,46 +0,0 @@
{
"name": "YDB",
"supportTime": "2023-01-30",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 2136,
"message": "port is a required field, please be sure to enter"
},
{
"field": "username",
"type": "String",
"group": "authorization"
},
{
"field": "password",
"type": "String",
"group": "authorization"
},
{
"field": "database",
"type": "String",
"required": true,
"value": "local",
"message": "database is a required field, please be sure to enter",
"group": "advanced"
}
]
}

View File

@ -0,0 +1,31 @@
name: YDB
supportTime: '2023-01-30'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 2136
message: port is a required field, please be sure to enter
- field: username
type: String
group: authorization
- field: password
type: String
group: authorization
- field: database
type: String
required: true
value: local
message: database is a required field, please be sure to enter
group: advanced

View File

@ -1,39 +0,0 @@
{
"name": "Alioss",
"supportTime": "2023-02-23",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "https://oss-cn-regison.aliyuncs.com",
"message": "host is a required field, please be sure to enter"
},
{
"field": "username",
"type": "String",
"required": true,
"group": "authorization"
},
{
"field": "password",
"type": "String",
"required": true,
"group": "authorization"
},
{
"field": "database",
"type": "String",
"required": true,
"value": "default",
"message": "database is a required field, please be sure to enter",
"group": "advanced"
}
]
}

View File

@ -0,0 +1,26 @@
name: Alioss
supportTime: '2023-02-23'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: https://oss-cn-regison.aliyuncs.com
message: host is a required field, please be sure to enter
- field: username
type: String
required: true
group: authorization
- field: password
type: String
required: true
group: authorization
- field: database
type: String
required: true
value: default
message: database is a required field, please be sure to enter
group: advanced

View File

@ -1,19 +0,0 @@
{
"name": "Kafka",
"supportTime": "2023-03-06",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1:9092",
"message": "host is a required field, please be sure to enter"
}
]
}

View File

@ -0,0 +1,12 @@
name: Kafka
supportTime: '2023-03-06'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1:9092
message: host is a required field, please be sure to enter

View File

@ -1,33 +0,0 @@
{
"name": "Redis",
"supportTime": "2022-12-01",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1",
"message": "host is a required field, please be sure to enter"
},
{
"field": "port",
"type": "Number",
"required": true,
"min": 1,
"max": 65535,
"value": 6379,
"message": "port is a required field, please be sure to enter"
},
{
"field": "password",
"type": "String",
"group": "authorization"
}
]
}

View File

@ -0,0 +1,22 @@
name: Redis
supportTime: '2022-12-01'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1
message: host is a required field, please be sure to enter
- field: port
type: Number
required: true
min: 1
max: 65535
value: 6379
message: port is a required field, please be sure to enter
- field: password
type: String
group: authorization

View File

@ -1,19 +0,0 @@
{
"name": "Zookeeper",
"supportTime": "2023-02-07",
"configures": [
{
"field": "name",
"type": "String",
"required": true,
"message": "name is a required field, please be sure to enter"
},
{
"field": "host",
"type": "String",
"required": true,
"value": "127.0.0.1:2181",
"message": "host is a required field, please be sure to enter"
}
]
}

View File

@ -0,0 +1,12 @@
name: Zookeeper
supportTime: '2023-02-07'
configures:
- field: name
type: String
required: true
message: name is a required field, please be sure to enter
- field: host
type: String
required: true
value: 127.0.0.1:2181
message: host is a required field, please be sure to enter

View File

@ -11,8 +11,8 @@ import lombok.ToString;
@AllArgsConstructor
public class PipelineBody
{
private Long from;
private Long to;
private PipelineFieldBody from;
private PipelineFieldBody to;
private String content;
private String executor;
}

View File

@ -0,0 +1,18 @@
package io.edurt.datacap.server.body;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Properties;
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class PipelineFieldBody
{
private Long id;
private Properties configures;
}

View File

@ -1,5 +1,7 @@
package io.edurt.datacap.server.common;
import org.apache.commons.lang3.ObjectUtils;
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
@ -19,10 +21,16 @@ public class BeanToPropertiesCommon
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
String propertyName = propertyDescriptor.getName();
if (!propertyName.equals("class")) {
Method readMethod = propertyDescriptor.getReadMethod();
Object value = readMethod.invoke(bean);
properties.setProperty(propertyName, value.toString());
if (!propertyDescriptor.getPropertyType().getName().startsWith("io\\.edurt\\.datacap")) {
if (!propertyName.equals("class")) {
Method readMethod = propertyDescriptor.getReadMethod();
if (!readMethod.getGenericReturnType().getTypeName().contains("io.edurt.datacap")) {
Object value = readMethod.invoke(bean);
if (ObjectUtils.isNotEmpty(value)) {
properties.setProperty(propertyName, value.toString());
}
}
}
}
}
}

View File

@ -1,6 +1,8 @@
package io.edurt.datacap.server.common;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -11,12 +13,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import java.io.File;
import java.util.Optional;
import java.util.Set;
@Slf4j
public class PluginCommon
{
private static final ObjectMapper yamlFactory = new ObjectMapper(new YAMLFactory());
private PluginCommon() {}
public static Optional<Plugin> getPluginByName(Injector injector, String pluginName)
@ -31,6 +36,7 @@ public class PluginCommon
return pluginOptional;
}
@Deprecated
public static IConfigure loadConfigure(String type, String plugin, String resource, Environment environment)
{
String root = environment.getProperty("spring.config.location");
@ -55,4 +61,31 @@ public class PluginCommon
}
return configure;
}
public static IConfigure loadYamlConfigure(String type, String plugin, String resource, Environment environment)
{
String root = environment.getProperty("spring.config.location");
if (!resource.endsWith(".yaml")) {
resource = resource + ".yaml";
}
String path = root + String.format("plugins/%s/%s", type.toLowerCase(), resource.toLowerCase());
File file = new File(path);
if (!file.exists()) {
log.warn("Plugin {} type {} configuration file {} not found, load default configuration file", plugin, type, resource);
file = new File(root + "plugins/default.yaml");
}
else {
log.info("Load plugin {} type {} resource {} configure file path {}", plugin, type, resource, path);
}
yamlFactory.findAndRegisterModules();
IConfigure configure = null;
try {
configure = yamlFactory.readValue(file, IConfigure.class);
}
catch (Exception e) {
log.error("Format configuration file, it may be a bug, please submit issues to solve it. plugin {} type {} resource {} configure file path {} message ", plugin, type, resource, path, e);
Preconditions.checkArgument(StringUtils.isNotEmpty(path), "Format configuration file, it may be a bug, please submit issues to solve it.");
}
return configure;
}
}

View File

@ -4,6 +4,8 @@ public enum ServiceState
{
SOURCE_NOT_FOUND(1001, "Source does not exist"),
SOURCE_NOT_SUPPORTED(1002, "The current data source is not supported"),
SOURCE_NOT_SUPPORTED_PIPELINE(1003, "The current data source does not support pipeline"),
SOURCE_NOT_SUPPORTED_PIPELINE_TYPE(1004, "The current data source does not support pipeline type"),
PLUGIN_NOT_FOUND(2001, "Plugin dose not exists"),
PLUGIN_EXECUTE_FAILED(2002, "Plugin execute failed"),
PLUGIN_ONLY_ONE_TEMPLATE(2003, "Plug-ins support only templates with the same name"),

View File

@ -4,6 +4,7 @@ import io.edurt.datacap.server.body.PipelineBody;
import io.edurt.datacap.server.common.Response;
import io.edurt.datacap.server.service.PipelineService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@ -19,7 +20,7 @@ public class PipelineController
}
@PostMapping(value = "/create")
public Response<Object> create(PipelineBody configure)
public Response<Object> create(@RequestBody PipelineBody configure)
{
return pipelineService.submit(configure);
}

View File

@ -14,6 +14,6 @@ import java.util.List;
public class IConfigureExecutor
{
private String executor;
private String type;
private IConfigurePipelineType type;
private List<IConfigureExecutorField> fields;
}

View File

@ -12,5 +12,12 @@ import lombok.ToString;
public class IConfigureExecutorField
{
private String field;
// The default is equal to the filed value, and the custom column name uses
private String origin;
private boolean required;
/**
* If the flag is true, it means that the field is extracted through user configuration, and the default data will be discarded
*/
private boolean override;
private boolean input; // Is it an input parameter
}

View File

@ -0,0 +1,7 @@
package io.edurt.datacap.server.plugin.configure;
public enum IConfigurePipelineType
{
SOURCE,
SINK
}

View File

@ -11,20 +11,32 @@ import io.edurt.datacap.server.common.ServiceState;
import io.edurt.datacap.server.entity.SourceEntity;
import io.edurt.datacap.server.plugin.configure.IConfigure;
import io.edurt.datacap.server.plugin.configure.IConfigureExecutor;
import io.edurt.datacap.server.plugin.configure.IConfigureExecutorField;
import io.edurt.datacap.server.plugin.configure.IConfigurePipelineType;
import io.edurt.datacap.server.repository.SourceRepository;
import io.edurt.datacap.server.security.UserDetailsService;
import io.edurt.datacap.server.service.PipelineService;
import io.edurt.datacap.spi.executor.Executor;
import io.edurt.datacap.spi.executor.Pipeline;
import io.edurt.datacap.spi.executor.PipelineField;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.io.File;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@Service
@Slf4j
public class PipelineServiceImpl
implements PipelineService
{
@ -42,28 +54,39 @@ public class PipelineServiceImpl
@Override
public Response<Object> submit(PipelineBody configure)
{
Optional<SourceEntity> fromSourceOptional = repository.findById(configure.getFrom());
Optional<SourceEntity> toSourceOptional = repository.findById(configure.getTo());
Optional<SourceEntity> fromSourceOptional = repository.findById(configure.getFrom().getId());
Optional<SourceEntity> toSourceOptional = repository.findById(configure.getTo().getId());
if (fromSourceOptional.isPresent() && toSourceOptional.isPresent()) {
SourceEntity fromSource = fromSourceOptional.get();
IConfigure fromConfigure = PluginCommon.loadConfigure(fromSource.getProtocol(), fromSource.getType(), fromSource.getType(), environment);
IConfigure fromConfigure = PluginCommon.loadYamlConfigure(fromSource.getProtocol(), fromSource.getType(), fromSource.getType(), environment);
// Check if Pipeline is supported
if (ObjectUtils.isEmpty(fromConfigure.getPipelines())) {
String message = String.format("Source %s is not supported pipeline, type %s", fromSource.getId(), fromSource.getType());
return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE, message);
}
Optional<IConfigureExecutor> fromConfigureExecutor = fromConfigure.getPipelines()
.stream()
.filter(v -> v.getExecutor().equals(configure.getExecutor()))
.filter(v -> v.getExecutor().equals(configure.getExecutor()) && v.getType().equals(IConfigurePipelineType.SOURCE))
.findFirst();
if (!fromConfigureExecutor.isPresent()) {
return Response.failure(ServiceState.PLUGIN_NOT_FOUND);
String message = String.format("Source %s type %s is not supported pipeline type %s", fromSource.getId(), fromSource.getType(), IConfigurePipelineType.SOURCE);
return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE_TYPE, message);
}
SourceEntity toSource = toSourceOptional.get();
IConfigure toConfigure = PluginCommon.loadConfigure(toSource.getProtocol(), toSource.getType(), toSource.getType(), environment);
IConfigure toConfigure = PluginCommon.loadYamlConfigure(toSource.getProtocol(), toSource.getType(), toSource.getType(), environment);
if (ObjectUtils.isEmpty(toConfigure.getPipelines())) {
String message = String.format("Source %s is not supported pipeline, type %s", toSource.getId(), toSource.getType());
return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE, message);
}
Optional<IConfigureExecutor> toConfigureExecutor = toConfigure.getPipelines()
.stream()
.filter(v -> v.getExecutor().equals(configure.getExecutor()))
.filter(v -> v.getExecutor().equals(configure.getExecutor()) && v.getType().equals(IConfigurePipelineType.SINK))
.findFirst();
if (!toConfigureExecutor.isPresent()) {
return Response.failure(ServiceState.PLUGIN_NOT_FOUND);
String message = String.format("Source %s type %s is not supported pipeline type %s", toSource.getId(), toSource.getType(), IConfigurePipelineType.SINK);
return Response.failure(ServiceState.SOURCE_NOT_SUPPORTED_PIPELINE_TYPE, message);
}
Optional<Executor> executorOptional = injector.getInstance(Key.get(new TypeLiteral<Set<Executor>>() {}))
@ -72,32 +95,94 @@ public class PipelineServiceImpl
.findFirst();
// FROM source
Properties fromProperties = new Properties();
Properties fromBeanProperties = BeanToPropertiesCommon.convertBeanToProperties(fromSource);
fromConfigureExecutor.get().getFields().forEach(pipelineField -> fromProperties.put(pipelineField.getField(), fromBeanProperties.get(pipelineField.getField())));
Properties fromOriginProperties = configure.getFrom().getConfigures();
fromOriginProperties.setProperty("context", configure.getContent());
Properties fromProperties = this.merge(fromSource, fromConfigureExecutor.get().getFields(), fromOriginProperties);
Set<String> fromOptions = new HashSet<>();
fromConfigureExecutor.get()
.getFields()
.stream()
.filter(v -> v.isRequired())
.forEach(v -> fromOptions.add(v.getField()));
PipelineField fromField = PipelineField.builder()
.type(fromSource.getType())
.configure(fromProperties)
.supportOptions(fromOptions)
.build();
// TO source
Properties toProperties = new Properties();
Properties toBeanProperties = BeanToPropertiesCommon.convertBeanToProperties(toSource);
toConfigureExecutor.get().getFields().forEach(pipelineField -> toProperties.put(pipelineField.getField(), toBeanProperties.get(pipelineField.getField())));
Properties toOriginProperties = configure.getTo().getConfigures();
Properties toProperties = this.merge(toSource, toConfigureExecutor.get().getFields(), toOriginProperties);
Set<String> toOptions = new HashSet<>();
toConfigureExecutor.get()
.getFields()
.stream()
.filter(v -> v.isRequired())
.forEach(v -> toOptions.add(v.getField()));
PipelineField toField = PipelineField.builder()
.type(toSource.getType())
.configure(toProperties)
.supportOptions(toOptions)
.build();
String executorHome = environment.getProperty("datacap.executor.data");
if (StringUtils.isEmpty(executorHome)) {
executorHome = String.join(File.separator, System.getProperty("user.dir"), "data");
}
String username = UserDetailsService.getUser().getUsername();
String pipelineHome = DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMddHHmmssSSS");
String work = String.join(File.separator, executorHome, username, pipelineHome);
String pipelineName = String.join("_", username, configure.getExecutor().toLowerCase(), pipelineHome);
try {
FileUtils.forceMkdir(new File(work));
}
catch (Exception e) {
log.warn("Failed to create temporary directory", e);
}
Pipeline pipeline = Pipeline.builder()
.work(System.getenv("user.dir") + "/" + UUID.randomUUID().toString())
.home(environment.getProperty("datacap.executor.seatunnel.home"))
.work(work)
.home(environment.getProperty(String.format("datacap.executor.%s.home", configure.getExecutor().toLowerCase(Locale.ROOT))))
.pipelineName(pipelineName)
.username(UserDetailsService.getUser().getUsername())
.from(fromField)
.to(toField)
.timeout(600)
.build();
executorOptional.get().start(pipeline);
return Response.success(null);
}
return Response.failure(ServiceState.SOURCE_NOT_FOUND);
}
private Properties merge(SourceEntity entity, List<IConfigureExecutorField> fields, Properties configure)
{
Properties properties = new Properties();
Properties convertBeanProperties = BeanToPropertiesCommon.convertBeanToProperties(entity);
for (IConfigureExecutorField field : fields) {
if (field.isOverride()) {
this.setProperty(field, properties, configure);
}
else {
this.setProperty(field, properties, convertBeanProperties);
}
}
return properties;
}
private void setProperty(IConfigureExecutorField field, Properties properties, Properties configure)
{
Object value = "";
if (ObjectUtils.isNotEmpty(field.getOrigin())) {
if (ObjectUtils.isNotEmpty(configure.get(field.getOrigin()))) {
value = configure.get(field.getOrigin());
}
}
else {
if (ObjectUtils.isNotEmpty(configure.get(field.getField()))) {
value = configure.get(field.getField());
}
}
properties.put(field.getField(), value);
}
}

View File

@ -123,7 +123,7 @@ public class SourceServiceImpl
entity.setName(plugin.name());
entity.setDescription(plugin.description());
entity.setType(plugin.type().name());
entity.setConfigure(PluginCommon.loadConfigure(plugin.type().name(), plugin.name(), plugin.name(), environment));
entity.setConfigure(PluginCommon.loadYamlConfigure(plugin.type().name(), plugin.name(), plugin.name(), environment));
List<PluginEntity> plugins = pluginMap.get(plugin.type().name());
if (ObjectUtils.isEmpty(plugins)) {
plugins = new ArrayList<>();
@ -168,7 +168,7 @@ public class SourceServiceImpl
}
// Check configure
IConfigure iConfigure = PluginCommon.loadConfigure(configure.getType(), configure.getName(), configure.getName(), environment);
IConfigure iConfigure = PluginCommon.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment);
if (ObjectUtils.isEmpty(iConfigure) || iConfigure.getConfigures().size() != configure.getConfigure().getConfigures().size()) {
return Response.failure(ServiceState.PLUGIN_CONFIGURE_MISMATCH);
}
@ -201,7 +201,7 @@ public class SourceServiceImpl
}
// Check configure
IConfigure iConfigure = PluginCommon.loadConfigure(configure.getType(), configure.getName(), configure.getName(), environment);
IConfigure iConfigure = PluginCommon.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment);
if (ObjectUtils.isEmpty(iConfigure) || iConfigure.getConfigures().size() != configure.getConfigure().getConfigures().size()) {
return Response.failure(ServiceState.PLUGIN_CONFIGURE_MISMATCH);
}
@ -237,7 +237,7 @@ public class SourceServiceImpl
configure.setName(entity.getType());
configure.setType(entity.getProtocol());
// Load default configure
IConfigure iConfigure = PluginCommon.loadConfigure(configure.getType(), configure.getName(), configure.getName(), environment);
IConfigure iConfigure = PluginCommon.loadYamlConfigure(configure.getType(), configure.getName(), configure.getName(), environment);
configure.setConfigure(IConfigureCommon.preparedConfigure(iConfigure, entity));
entity.setSchema(iConfigure);
return Response.success(entity);

View File

@ -9,12 +9,12 @@ import org.junit.Test;
justification = "I prefer to suppress these FindBugs warnings")
public class PluginCommonTest
{
private String resource = "default.json";
private String resource = "default.yaml";
@Test
public void loadConfigure()
{
IConfigure configure = PluginCommon.loadConfigure("JDBC", "MySQL", resource, null);
IConfigure configure = PluginCommon.loadYamlConfigure("JDBC", "MySQL", resource, null);
Assert.assertNotNull(configure);
}
}

View File

@ -5,7 +5,7 @@ import org.junit.Test;
public class ResourceCommonTest
{
private String resource = "default.json";
private String resource = "default.yaml";
private String resource2 = "empty.json";
@Test

View File

@ -1,2 +0,0 @@
{
}

View File

@ -7,8 +7,6 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.UUID;
@Data
@Builder
@ToString
@ -20,14 +18,9 @@ public class Pipeline
{
private String username;
private String home;
private String pipelineName;
private String work;
private String file;
private long timeout = 600;
private PipelineField from;
private PipelineField to;
public String getFile()
{
return String.join("/", this.getWork(), String.format("%s.configure", UUID.randomUUID().toString()));
}
}

View File

@ -5,6 +5,7 @@ import lombok.Data;
import lombok.ToString;
import java.util.Properties;
import java.util.Set;
@Data
@Builder
@ -13,4 +14,5 @@ public class PipelineField
{
private String type;
private Properties configure;
private Set<String> supportOptions;
}

View File

@ -33,7 +33,8 @@ public class SeatunnelExecutor
public void before(Pipeline configure)
{
JsonFactory jsonFactory = new JsonFactory();
try (JsonGenerator jsonGenerator = jsonFactory.createGenerator(new File(configure.getFile()), JsonEncoding.UTF8)) {
String workFile = String.join(File.separator, configure.getWork(), configure.getPipelineName() + ".configure");
try (JsonGenerator jsonGenerator = jsonFactory.createGenerator(new File(workFile), JsonEncoding.UTF8)) {
jsonGenerator.setPrettyPrinter(new DefaultPrettyPrinter());
jsonGenerator.writeStartObject();
this.writeChild("source", jsonGenerator, configure.getFrom());
@ -51,14 +52,16 @@ public class SeatunnelExecutor
before(configure);
SeaTunnelCommander commander = new SeaTunnelCommander(
configure.getHome() + "/bin",
String.join("/", configure.getWork(), configure.getFile()),
configure.getUsername());
LoggerExecutor loggerExecutor = new LogbackExecutor(configure.getWork(), configure.getUsername());
ShellConfigure shellConfigure = new ShellConfigure();
shellConfigure.setDirectory(configure.getWork());
shellConfigure.setLoggerExecutor(loggerExecutor);
shellConfigure.setCommand(Arrays.asList(commander.toCommand()));
shellConfigure.setTimeout(configure.getTimeout());
String.join(File.separator, configure.getWork(), configure.getPipelineName() + ".configure"),
configure.getPipelineName());
LoggerExecutor loggerExecutor = new LogbackExecutor(configure.getWork(), configure.getPipelineName() + ".log");
ShellConfigure shellConfigure = ShellConfigure.builder()
.directory(configure.getWork())
.loggerExecutor(loggerExecutor)
.command(Arrays.asList(commander.toCommand()))
.timeout(configure.getTimeout())
.username(configure.getUsername())
.build();
ShellCommander shellExecutor = new ProcessBuilderCommander(shellConfigure);
ShellResponse response = shellExecutor.execute();
if (!response.getSuccessful()) {
@ -81,5 +84,6 @@ public class SeatunnelExecutor
}
jsonGenerator.writeEndObject();
}
jsonGenerator.writeEndObject();
}
}

View File

@ -2,26 +2,15 @@ package io.edurt.datacap.executor.connector;
import io.edurt.datacap.spi.executor.PipelineField;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class ConnectorClickHouse
extends Connector
{
private static final Set<String> supportOptions = new HashSet<String>()
{{
add("host");
add("database");
add("sql");
add("username");
add("password");
}};
public ConnectorClickHouse(ConnectorType type, PipelineField configure)
{
super(type, configure, supportOptions);
super(type, configure, configure.getSupportOptions());
}
@Override

View File

@ -5,10 +5,20 @@ import io.edurt.datacap.spi.executor.PipelineField;
import org.junit.Before;
import org.junit.Test;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
public class SeatunnelExecutorTest
{
private static final Set<String> supportOptions = new HashSet<String>()
{{
add("host");
add("database");
add("sql");
add("username");
add("password");
}};
private Pipeline pipeline;
@Before
@ -22,9 +32,11 @@ public class SeatunnelExecutorTest
properties.put("password", "123456");
properties.put("database", "default");
properties.put("sql", "SHOW DATABASES");
PipelineField from = PipelineField.builder()
.type("ClickHouse")
.configure(properties)
.supportOptions(supportOptions)
.build();
pipeline.setFrom(from);
pipeline.setTo(PipelineField.builder().type("Console").build());

View File

@ -2,6 +2,7 @@ package io.edurt.datacap.lib.shell;
import io.edurt.datacap.lib.logger.LoggerExecutor;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@ -9,6 +10,7 @@ import lombok.ToString;
import java.util.List;
@Data
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor

View File

@ -34,6 +34,7 @@ public class ProcessBuilderCommander
{
LoggerExecutor loggerExecutor = this.configure.getLoggerExecutor();
Logger logger = (Logger) loggerExecutor.getLogger();
logger.info("Execute pipeline on username {}", configure.getUsername());
ShellResponse shellResponse = new ShellResponse();
shellResponse.setSuccessful(Boolean.TRUE);
List<String> command = new ArrayList<>();
@ -49,8 +50,9 @@ public class ProcessBuilderCommander
logger.info("Work directory {}", configure.getDirectory());
Map<String, String> environment = this.getEnvironment();
builder.environment().putAll(environment);
logger.info("========== container environment loading ==========");
logger.info("========== container environment start ==========");
environment.keySet().forEach(key -> logger.info("Container environment {}={}", key, environment.get(key)));
logger.info("========== container environment end ==========");
Process process = null;
try {
@ -79,6 +81,9 @@ public class ProcessBuilderCommander
}
}
shellResponse.setCode(process.exitValue());
if (process.exitValue() > 0) {
shellResponse.setSuccessful(Boolean.FALSE);
}
}
catch (Exception ex) {
logger.error("Execute failed ", ex);
@ -89,8 +94,8 @@ public class ProcessBuilderCommander
process.destroy();
}
logger.info("Execute response {}", shellResponse);
logger.info("Execute end destroy logger components");
loggerExecutor.destroy();
logger.info("Execute end destroy logger components successful");
}
return shellResponse;
}