[Pipeline] Add some fn (#427)

This commit is contained in:
qianmoQ 2023-09-06 18:56:46 +08:00 committed by GitHub
commit f67bd32c19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 316 additions and 35 deletions

View File

@ -210,6 +210,7 @@ Open the DingTalk (left) or WeChat(right) software and scan the following QR cod
[![Jetbrains](https://img.shields.io/badge/Development-Jetbrains-brightgreen?style=flat-square)](https://www.jetbrains.com/)
[![App Store](https://img.shields.io/badge/App%20Store-Rainbond-brightgreen?style=flat-square)](https://www.rainbond.com/)
[![View UI Plus](https://img.shields.io/badge/UI-View%20UI%20Plus-brightgreen?style=flat-square)](https://www.iviewui.com/view-ui-plus)
## Installation and Configuration

View File

@ -210,6 +210,7 @@ DataCap 可以从任何使用 SQL 的数据存储或数据引擎ClickHouse、
[![Jetbrains](https://img.shields.io/badge/Development-Jetbrains-brightgreen?style=flat-square)](https://www.jetbrains.com/)
[![App Store](https://img.shields.io/badge/App%20Store-Rainbond-brightgreen?style=flat-square)](https://www.rainbond.com/)
[![View UI Plus](https://img.shields.io/badge/UI-View%20UI%20Plus-brightgreen?style=flat-square)](https://www.iviewui.com/view-ui-plus)
## 安装和配置

View File

@ -75,3 +75,5 @@ datacap.cache.expiration=5
datacap.pipeline.maxRunning=100
# Maximum number of pipeline queue
datacap.pipeline.maxQueue=200
# When the service is restarted, the status of the pipeline with status RUNNING is reset.
datacap.pipeline.reset=STOPPED

View File

@ -6,6 +6,7 @@ import io.edurt.datacap.service.body.PipelineBody;
import io.edurt.datacap.service.entity.PipelineEntity;
import io.edurt.datacap.service.repository.PipelineRepository;
import io.edurt.datacap.service.service.PipelineService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
@ -13,6 +14,8 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController()
@RequestMapping(value = "/api/v1/pipeline")
public class PipelineController
@ -39,4 +42,10 @@ public class PipelineController
{
return service.stop(id);
}
@GetMapping(value = "/log/{id}")
public CommonResponse<List<String>> log(@PathVariable Long id)
{
return service.log(id);
}
}

View File

@ -0,0 +1,71 @@
package io.edurt.datacap.server.runner;
import com.clearspring.analytics.util.Lists;
import io.edurt.datacap.service.entity.PipelineEntity;
import io.edurt.datacap.service.repository.PipelineRepository;
import io.edurt.datacap.service.service.PipelineService;
import io.edurt.datacap.spi.executor.PipelineState;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.EnumUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;
import java.util.List;
@Slf4j
@Component
public class PipelineResetRunner
implements ApplicationRunner
{
@Value(value = "${datacap.pipeline.reset}")
private String state;
private final EntityManager entityManager;
private final PipelineRepository repository;
private final PipelineService service;
public PipelineResetRunner(EntityManager entityManager, PipelineRepository repository, PipelineService service)
{
this.entityManager = entityManager;
this.repository = repository;
this.service = service;
}
@Override
@Transactional
public void run(ApplicationArguments args)
{
PipelineState resetState = EnumUtils.getEnum(PipelineState.class, state);
if (resetState == null) {
log.warn("Pipeline reset state is not supported: [ {} ] Reset to [ STOPPED ]", state);
resetState = PipelineState.STOPPED;
}
List<PipelineState> states = Lists.newArrayList();
states.add(PipelineState.RUNNING);
states.add(PipelineState.CREATED);
states.add(PipelineState.QUEUE);
List<PipelineEntity> pipelines = repository.findAllByStateIn(states);
for (PipelineEntity pipeline : pipelines) {
log.info("Reset pipeline [ {} ] user [ {} ]", pipeline.getName(), pipeline.getUser().getUsername());
pipeline.setState(resetState);
// If it is reset to the running state, execute it again
if (resetState.equals(PipelineState.RUNNING)
|| resetState.equals(PipelineState.CREATED)
|| resetState.equals(PipelineState.QUEUE)) {
service.submit(pipeline.entityToBody());
}
else {
pipeline.setMessage("Reset to [ " + resetState + " ]");
repository.save(pipeline);
}
entityManager.flush();
}
entityManager.clear();
}
}

View File

@ -3,6 +3,8 @@ package io.edurt.datacap.service.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIncludeProperties;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.service.body.PipelineBody;
import io.edurt.datacap.service.body.PipelineFieldBody;
import io.edurt.datacap.service.converter.PropertiesConverter;
import io.edurt.datacap.spi.executor.PipelineState;
import lombok.Data;
@ -91,7 +93,6 @@ public class PipelineEntity
@JoinTable(name = "pipeline_user_relation",
joinColumns = @JoinColumn(name = "pipeline_id"),
inverseJoinColumns = @JoinColumn(name = "user_id"))
@JsonIncludeProperties(value = {"id", "username"})
private UserEntity user;
public long getElapsed()
@ -104,4 +105,28 @@ public class PipelineEntity
}
return elapsed;
}
/**
* Converts a PipelineEntity object to a PipelineBody object.
*
* @return the converted PipelineBody object
*/
public PipelineBody entityToBody()
{
PipelineFieldBody from = PipelineFieldBody.builder()
.id(this.getFrom().getId())
.configures(this.getFromConfigures())
.build();
PipelineFieldBody to = PipelineFieldBody.builder()
.id(this.getTo().getId())
.configures(this.getToConfigures())
.build();
return PipelineBody.builder()
.id(this.getId())
.content(this.getContent())
.from(from)
.to(to)
.executor(this.getExecutor())
.build();
}
}

View File

@ -2,12 +2,17 @@ package io.edurt.datacap.service.repository;
import io.edurt.datacap.service.entity.PipelineEntity;
import io.edurt.datacap.service.entity.UserEntity;
import io.edurt.datacap.spi.executor.PipelineState;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.PagingAndSortingRepository;
import java.util.List;
public interface PipelineRepository
extends PagingAndSortingRepository<PipelineEntity, Long>
{
Page<PipelineEntity> findAllByUser(UserEntity user, Pageable pageable);
List<PipelineEntity> findAllByStateIn(List<PipelineState> state);
}

View File

@ -3,10 +3,14 @@ package io.edurt.datacap.service.service;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.service.body.PipelineBody;
import java.util.List;
public interface PipelineService
extends BaseService
{
CommonResponse<Object> submit(PipelineBody configure);
CommonResponse<Boolean> stop(Long id);
CommonResponse<List<String>> log(Long id);
}

View File

@ -1,5 +1,6 @@
package io.edurt.datacap.service.service.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -8,7 +9,6 @@ import io.edurt.datacap.common.enums.ServiceState;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.common.utils.BeanToPropertiesUtils;
import io.edurt.datacap.service.body.PipelineBody;
import io.edurt.datacap.service.body.PipelineFieldBody;
import io.edurt.datacap.service.common.PluginUtils;
import io.edurt.datacap.service.configure.IConfigure;
import io.edurt.datacap.service.configure.IConfigureExecutor;
@ -28,13 +28,17 @@ import io.edurt.datacap.spi.executor.PipelineResponse;
import io.edurt.datacap.spi.executor.PipelineState;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.core.env.Environment;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.List;
import java.util.Locale;
@ -229,7 +233,7 @@ public class PipelineServiceImpl
.poll();
if (ObjectUtils.isNotEmpty(entity)) {
log.info("Extract tasks from the queue [ {} ] and start execution", entity.getName());
this.submit(entityToBody(entity));
this.submit(entity.entityToBody());
}
else {
log.warn("The queue extraction task failed. Please check whether there are tasks in the queue. The current number of queue tasks: [ {} ]", initializer.getTaskQueue().size());
@ -261,6 +265,7 @@ public class PipelineServiceImpl
service.shutdownNow();
}
entity.setState(PipelineState.STOPPED);
entity.setMessage(null);
this.repository.save(entity);
// Consume queue data for execution
@ -268,12 +273,70 @@ public class PipelineServiceImpl
PipelineEntity queueEntity = initializer.getTaskQueue()
.poll();
if (queueEntity != null) {
this.submit(entityToBody(queueEntity));
this.submit(entity.entityToBody());
}
}
return CommonResponse.success(true);
}
/**
* Retrieves the log for a given pipeline ID.
*
* @param id the ID of the pipeline
* @return a response containing the log lines as a list of strings
*/
@Override
public CommonResponse<List<String>> log(Long id)
{
Optional<PipelineEntity> pipelineOptional = this.repository.findById(id);
if (!pipelineOptional.isPresent()) {
return CommonResponse.failure(String.format("Pipeline [ %s ] not found", id));
}
PipelineEntity entity = pipelineOptional.get();
if (entity.getState().equals(PipelineState.QUEUE)
|| entity.getState().equals(PipelineState.CREATED)) {
return CommonResponse.failure(String.format("Pipeline [ %s ] is not running", entity.getName()));
}
List<String> lines = Lists.newArrayList();
try (FileInputStream stream = new FileInputStream(new File(String.format("%s/%s.log", entity.getWork(), entity.getName())))) {
IOUtils.readLines(stream, "UTF-8")
.forEach(lines::add);
}
catch (IOException e) {
log.error("Failed to read pipeline [ {} ] log ", entity.getName(), e);
}
return CommonResponse.success(lines);
}
@Override
public CommonResponse<Long> deleteById(PagingAndSortingRepository repository, Long id)
{
Optional<PipelineEntity> pipelineOptional = this.repository.findById(id);
if (!pipelineOptional.isPresent()) {
return CommonResponse.failure(String.format("Pipeline [ %s ] not found", id));
}
PipelineEntity entity = pipelineOptional.get();
log.info("Delete pipeline [ {} ] work home", entity.getName());
try {
FileUtils.deleteDirectory(new File(entity.getWork()));
}
catch (IOException e) {
log.warn("Failed to delete pipeline [ {} ] work home {}", entity.getName(), e);
}
return PipelineService.super.deleteById(repository, id);
}
/**
* Merges the properties of a source entity with a list of fields and a configuration.
*
* @param entity the source entity
* @param fields the list of fields
* @param configure the configuration
* @return the merged properties
*/
private Properties merge(SourceEntity entity, List<IConfigureExecutorField> fields, Properties configure)
{
Properties properties = new Properties();
@ -289,6 +352,13 @@ public class PipelineServiceImpl
return properties;
}
/**
* Sets the property value for the given field.
*
* @param field the field to set the property value for
* @param properties the properties object to store the property
* @param configure the configuration properties object
*/
private void setProperty(IConfigureExecutorField field, Properties properties, Properties configure)
{
Object value = "None";
@ -310,29 +380,4 @@ public class PipelineServiceImpl
}
properties.put(field.getField(), value);
}
/**
* Converts a PipelineEntity object to a PipelineBody object.
*
* @param entity the PipelineEntity object to convert
* @return the converted PipelineBody object
*/
private PipelineBody entityToBody(PipelineEntity entity)
{
PipelineFieldBody from = PipelineFieldBody.builder()
.id(entity.getFrom().getId())
.configures(entity.getFromConfigures())
.build();
PipelineFieldBody to = PipelineFieldBody.builder()
.id(entity.getTo().getId())
.configures(entity.getToConfigures())
.build();
return PipelineBody.builder()
.id(entity.getId())
.content(entity.getContent())
.from(from)
.to(to)
.executor(entity.getExecutor())
.build();
}
}

View File

@ -18,6 +18,7 @@
"@types/watermark-dom": "^2.3.1",
"ag-grid-community": "^29.3.5",
"ag-grid-vue3": "^29.3.5",
"ansi_up": "^6.0.2",
"axios": "^0.27.2",
"core-js": "^3.8.3",
"echarts": "^5.4.0",

View File

@ -5,7 +5,8 @@ export default {
success: 'Success',
failure: 'Failure',
stop: 'Stopped',
timeout: 'Timeout'
timeout: 'Timeout',
queue: 'Queue'
},
delete: {
deleteTip1: 'This action cannot be undone.',

View File

@ -5,7 +5,8 @@ export default {
success: '运行成功',
failure: '运行失败',
stop: '已停止',
timeout: '运行超时'
timeout: '运行超时',
queue: '排队中'
},
delete: {
deleteTip1: '此操作无法撤消.',

View File

@ -21,6 +21,11 @@ class PipelineService
return new HttpCommon().post(`${baseUrl}/submit`, configure);
}
logger(id: number): Promise<ResponseModel>
{
return new HttpCommon().get(`${baseUrl}/log/${id}`);
}
getByName<T>(name: string): Promise<ResponseModel>
{
return Promise.resolve(undefined);

View File

@ -48,13 +48,23 @@
<Tooltip :content="$t('common.error')"
transfer>
<Button shape="circle"
:disabled="row.state !== 'FAILURE'"
:disabled="row.state !== 'FAILURE' && !(row.state == 'STOPPED' && row.message)"
type="error"
size="small"
icon="md-bug"
@click="handlerVisibleMarkdownPreview(row.message, true)">
</Button>
</Tooltip>
<Tooltip :content="$t('common.log')"
transfer>
<Button shape="circle"
:disabled="row.state === 'QUEUE' || row.state === 'CREATED'"
type="primary"
size="small"
icon="md-bulb"
@click="handlerLogger(row, true)">
</Button>
</Tooltip>
<Tooltip :content="$t('common.stop')"
transfer>
<Button shape="circle"
@ -113,6 +123,12 @@
:info="info"
@close="handlerStop(null, false)">
</StopPipeline>
<LoggerPipeline v-if="logger"
:is-visible="logger"
:info="info"
@close="handlerLogger(null, false)">
</LoggerPipeline>
</div>
</template>
@ -128,13 +144,14 @@ import MarkdownPreview from "@/components/common/MarkdownPreview.vue";
import DeletePipeline from "@/views/admin/pipeline/DeletePipeline.vue";
import DetailsPipeline from "@/views/admin/pipeline/DetailPipeline.vue";
import StopPipeline from "@/views/admin/pipeline/StopPipeline.vue";
import LoggerPipeline from "@/views/admin/pipeline/components/LoggerPipeline.vue";
const filter: Filter = new Filter();
const pagination: Pagination = PaginationBuilder.newInstance();
export default defineComponent({
name: 'UserPipelineHome',
components: {StopPipeline, DetailsPipeline, DeletePipeline, MarkdownPreview},
components: {LoggerPipeline, StopPipeline, DetailsPipeline, DeletePipeline, MarkdownPreview},
setup()
{
const i18n = useI18n();
@ -157,7 +174,8 @@ export default defineComponent({
info: null,
// Pipeline detail
detail: false,
stopped: false
stopped: false,
logger: false
}
},
created()
@ -229,6 +247,11 @@ export default defineComponent({
this.handlerInitialize(this.filter);
}
},
handlerLogger(row: any, isOpen: boolean)
{
this.logger = isOpen
this.info = row
},
getStateText(origin: string): string
{
return getText(this.i18n, origin);

View File

@ -109,6 +109,8 @@ const getText = (i18n: any, origin: string): string => {
return i18n.t('pipeline.common.stop');
case 'TIMEOUT':
return i18n.t('pipeline.common.timeout');
case 'QUEUE':
return i18n.t('pipeline.common.queue');
default:
return origin;
}

View File

@ -0,0 +1,85 @@
<template>
<div>
<Modal v-model="visible"
:title="title"
width="80%"
:closable="false"
:maskClosable="false">
<Scroll>
<div v-for="(log, index) in logs" :key="index">
<div v-html="log" style="margin-bottom: 5px; font-size: 16px"></div>
</div>
</Scroll>
<template #footer>
<Button @click="handlerCancel()">
{{ $t('common.cancel') }}
</Button>
</template>
</Modal>
</div>
</template>
<script lang="ts">
import PipelineService from '@/services/user/PipelineService';
import {AnsiUp} from 'ansi_up';
import {defineComponent} from 'vue';
export default defineComponent({
name: 'LoggerPipeline',
props: {
isVisible: {
type: Boolean,
default: () => false
},
info: {
type: Object,
default: () => null
}
},
created()
{
this.handlerInitialize();
},
data()
{
return {
title: null,
logs: []
}
},
methods: {
handlerInitialize()
{
this.title = `[ ${this.info.name} ] ${this.$t('common.log')}`
PipelineService.logger(this.info.id)
.then(response => {
if (response.status) {
const ansi_up = new AnsiUp()
const array = response.data
for (const i in array) {
this.logs[i] = ansi_up.ansi_to_html(array[i])
}
}
else {
this.$Message.error(response.message);
}
})
},
handlerCancel()
{
this.visible = false;
}
},
computed: {
visible: {
get(): boolean
{
return this.isVisible
},
set(value: boolean)
{
this.$emit('close', value)
}
}
},
});
</script>

View File

@ -76,7 +76,7 @@ public class LogbackExecutor
PatternLayoutEncoder encoder = new PatternLayoutEncoder();
ENCODER_CONTAINER.put(this.name, encoder);
encoder.setContext(context);
String pattern = "%date %level [%thread] %logger{10} [%file:%line] %msg%n";
String pattern = "%date %highlight(%-5level) %boldMagenta([%thread]) %cyan([%file:%line]) %msg%n";
encoder.setPattern(pattern);
encoder.setCharset(Charset.forName("utf-8"));
encoder.start();