diff --git a/README.md b/README.md index 9a6859ac..905e10ee 100644 --- a/README.md +++ b/README.md @@ -210,6 +210,7 @@ Open the DingTalk (left) or WeChat(right) software and scan the following QR cod [![Jetbrains](https://img.shields.io/badge/Development-Jetbrains-brightgreen?style=flat-square)](https://www.jetbrains.com/) [![App Store](https://img.shields.io/badge/App%20Store-Rainbond-brightgreen?style=flat-square)](https://www.rainbond.com/) +[![View UI Plus](https://img.shields.io/badge/UI-View%20UI%20Plus-brightgreen?style=flat-square)](https://www.iviewui.com/view-ui-plus) ## Installation and Configuration diff --git a/README.zh_CN.md b/README.zh_CN.md index a48ccab9..435b44d6 100644 --- a/README.zh_CN.md +++ b/README.zh_CN.md @@ -210,6 +210,7 @@ DataCap 可以从任何使用 SQL 的数据存储或数据引擎(ClickHouse、 [![Jetbrains](https://img.shields.io/badge/Development-Jetbrains-brightgreen?style=flat-square)](https://www.jetbrains.com/) [![App Store](https://img.shields.io/badge/App%20Store-Rainbond-brightgreen?style=flat-square)](https://www.rainbond.com/) +[![View UI Plus](https://img.shields.io/badge/UI-View%20UI%20Plus-brightgreen?style=flat-square)](https://www.iviewui.com/view-ui-plus) ## 安装和配置 diff --git a/configure/etc/conf/application.properties b/configure/etc/conf/application.properties index f0b7a4dc..dbc276a8 100644 --- a/configure/etc/conf/application.properties +++ b/configure/etc/conf/application.properties @@ -75,3 +75,5 @@ datacap.cache.expiration=5 datacap.pipeline.maxRunning=100 # Maximum number of pipeline queue datacap.pipeline.maxQueue=200 +# When the service is restarted, the status of the pipeline with status RUNNING is reset. +datacap.pipeline.reset=STOPPED diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/user/PipelineController.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/user/PipelineController.java index fe03f71d..e98750cc 100644 --- a/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/user/PipelineController.java +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/controller/user/PipelineController.java @@ -6,6 +6,7 @@ import io.edurt.datacap.service.body.PipelineBody; import io.edurt.datacap.service.entity.PipelineEntity; import io.edurt.datacap.service.repository.PipelineRepository; import io.edurt.datacap.service.service.PipelineService; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; @@ -13,6 +14,8 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import java.util.List; + @RestController() @RequestMapping(value = "/api/v1/pipeline") public class PipelineController @@ -39,4 +42,10 @@ public class PipelineController { return service.stop(id); } + + @GetMapping(value = "/log/{id}") + public CommonResponse> log(@PathVariable Long id) + { + return service.log(id); + } } diff --git a/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/PipelineResetRunner.java b/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/PipelineResetRunner.java new file mode 100644 index 00000000..d7a96926 --- /dev/null +++ b/core/datacap-server/src/main/java/io/edurt/datacap/server/runner/PipelineResetRunner.java @@ -0,0 +1,71 @@ +package io.edurt.datacap.server.runner; + +import com.clearspring.analytics.util.Lists; +import io.edurt.datacap.service.entity.PipelineEntity; +import io.edurt.datacap.service.repository.PipelineRepository; +import io.edurt.datacap.service.service.PipelineService; +import io.edurt.datacap.spi.executor.PipelineState; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.EnumUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +import javax.persistence.EntityManager; +import javax.transaction.Transactional; + +import java.util.List; + +@Slf4j +@Component +public class PipelineResetRunner + implements ApplicationRunner +{ + @Value(value = "${datacap.pipeline.reset}") + private String state; + + private final EntityManager entityManager; + private final PipelineRepository repository; + private final PipelineService service; + + public PipelineResetRunner(EntityManager entityManager, PipelineRepository repository, PipelineService service) + { + this.entityManager = entityManager; + this.repository = repository; + this.service = service; + } + + @Override + @Transactional + public void run(ApplicationArguments args) + { + PipelineState resetState = EnumUtils.getEnum(PipelineState.class, state); + if (resetState == null) { + log.warn("Pipeline reset state is not supported: [ {} ] Reset to [ STOPPED ]", state); + resetState = PipelineState.STOPPED; + } + + List states = Lists.newArrayList(); + states.add(PipelineState.RUNNING); + states.add(PipelineState.CREATED); + states.add(PipelineState.QUEUE); + List pipelines = repository.findAllByStateIn(states); + for (PipelineEntity pipeline : pipelines) { + log.info("Reset pipeline [ {} ] user [ {} ]", pipeline.getName(), pipeline.getUser().getUsername()); + pipeline.setState(resetState); + // If it is reset to the running state, execute it again + if (resetState.equals(PipelineState.RUNNING) + || resetState.equals(PipelineState.CREATED) + || resetState.equals(PipelineState.QUEUE)) { + service.submit(pipeline.entityToBody()); + } + else { + pipeline.setMessage("Reset to [ " + resetState + " ]"); + repository.save(pipeline); + } + entityManager.flush(); + } + entityManager.clear(); + } +} diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/PipelineEntity.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/PipelineEntity.java index b3335f9e..85d4179a 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/PipelineEntity.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/entity/PipelineEntity.java @@ -3,6 +3,8 @@ package io.edurt.datacap.service.entity; import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIncludeProperties; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.edurt.datacap.service.body.PipelineBody; +import io.edurt.datacap.service.body.PipelineFieldBody; import io.edurt.datacap.service.converter.PropertiesConverter; import io.edurt.datacap.spi.executor.PipelineState; import lombok.Data; @@ -91,7 +93,6 @@ public class PipelineEntity @JoinTable(name = "pipeline_user_relation", joinColumns = @JoinColumn(name = "pipeline_id"), inverseJoinColumns = @JoinColumn(name = "user_id")) - @JsonIncludeProperties(value = {"id", "username"}) private UserEntity user; public long getElapsed() @@ -104,4 +105,28 @@ public class PipelineEntity } return elapsed; } + + /** + * Converts a PipelineEntity object to a PipelineBody object. + * + * @return the converted PipelineBody object + */ + public PipelineBody entityToBody() + { + PipelineFieldBody from = PipelineFieldBody.builder() + .id(this.getFrom().getId()) + .configures(this.getFromConfigures()) + .build(); + PipelineFieldBody to = PipelineFieldBody.builder() + .id(this.getTo().getId()) + .configures(this.getToConfigures()) + .build(); + return PipelineBody.builder() + .id(this.getId()) + .content(this.getContent()) + .from(from) + .to(to) + .executor(this.getExecutor()) + .build(); + } } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/PipelineRepository.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/PipelineRepository.java index 62815096..1d118a6b 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/PipelineRepository.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/repository/PipelineRepository.java @@ -2,12 +2,17 @@ package io.edurt.datacap.service.repository; import io.edurt.datacap.service.entity.PipelineEntity; import io.edurt.datacap.service.entity.UserEntity; +import io.edurt.datacap.spi.executor.PipelineState; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.repository.PagingAndSortingRepository; +import java.util.List; + public interface PipelineRepository extends PagingAndSortingRepository { Page findAllByUser(UserEntity user, Pageable pageable); + + List findAllByStateIn(List state); } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/PipelineService.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/PipelineService.java index d116e0d5..6f077a7f 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/PipelineService.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/PipelineService.java @@ -3,10 +3,14 @@ package io.edurt.datacap.service.service; import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.service.body.PipelineBody; +import java.util.List; + public interface PipelineService extends BaseService { CommonResponse submit(PipelineBody configure); CommonResponse stop(Long id); + + CommonResponse> log(Long id); } diff --git a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PipelineServiceImpl.java b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PipelineServiceImpl.java index 7a56b216..a7d0f75d 100644 --- a/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PipelineServiceImpl.java +++ b/core/datacap-service/src/main/java/io/edurt/datacap/service/service/impl/PipelineServiceImpl.java @@ -1,5 +1,6 @@ package io.edurt.datacap.service.service.impl; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.inject.Injector; import com.google.inject.Key; @@ -8,7 +9,6 @@ import io.edurt.datacap.common.enums.ServiceState; import io.edurt.datacap.common.response.CommonResponse; import io.edurt.datacap.common.utils.BeanToPropertiesUtils; import io.edurt.datacap.service.body.PipelineBody; -import io.edurt.datacap.service.body.PipelineFieldBody; import io.edurt.datacap.service.common.PluginUtils; import io.edurt.datacap.service.configure.IConfigure; import io.edurt.datacap.service.configure.IConfigureExecutor; @@ -28,13 +28,17 @@ import io.edurt.datacap.spi.executor.PipelineResponse; import io.edurt.datacap.spi.executor.PipelineState; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.core.env.Environment; +import org.springframework.data.repository.PagingAndSortingRepository; import org.springframework.stereotype.Service; import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; import java.sql.Timestamp; import java.util.List; import java.util.Locale; @@ -229,7 +233,7 @@ public class PipelineServiceImpl .poll(); if (ObjectUtils.isNotEmpty(entity)) { log.info("Extract tasks from the queue [ {} ] and start execution", entity.getName()); - this.submit(entityToBody(entity)); + this.submit(entity.entityToBody()); } else { log.warn("The queue extraction task failed. Please check whether there are tasks in the queue. The current number of queue tasks: [ {} ]", initializer.getTaskQueue().size()); @@ -261,6 +265,7 @@ public class PipelineServiceImpl service.shutdownNow(); } entity.setState(PipelineState.STOPPED); + entity.setMessage(null); this.repository.save(entity); // Consume queue data for execution @@ -268,12 +273,70 @@ public class PipelineServiceImpl PipelineEntity queueEntity = initializer.getTaskQueue() .poll(); if (queueEntity != null) { - this.submit(entityToBody(queueEntity)); + this.submit(entity.entityToBody()); } } return CommonResponse.success(true); } + /** + * Retrieves the log for a given pipeline ID. + * + * @param id the ID of the pipeline + * @return a response containing the log lines as a list of strings + */ + @Override + public CommonResponse> log(Long id) + { + Optional pipelineOptional = this.repository.findById(id); + if (!pipelineOptional.isPresent()) { + return CommonResponse.failure(String.format("Pipeline [ %s ] not found", id)); + } + + PipelineEntity entity = pipelineOptional.get(); + if (entity.getState().equals(PipelineState.QUEUE) + || entity.getState().equals(PipelineState.CREATED)) { + return CommonResponse.failure(String.format("Pipeline [ %s ] is not running", entity.getName())); + } + + List lines = Lists.newArrayList(); + try (FileInputStream stream = new FileInputStream(new File(String.format("%s/%s.log", entity.getWork(), entity.getName())))) { + IOUtils.readLines(stream, "UTF-8") + .forEach(lines::add); + } + catch (IOException e) { + log.error("Failed to read pipeline [ {} ] log ", entity.getName(), e); + } + return CommonResponse.success(lines); + } + + @Override + public CommonResponse deleteById(PagingAndSortingRepository repository, Long id) + { + Optional pipelineOptional = this.repository.findById(id); + if (!pipelineOptional.isPresent()) { + return CommonResponse.failure(String.format("Pipeline [ %s ] not found", id)); + } + + PipelineEntity entity = pipelineOptional.get(); + log.info("Delete pipeline [ {} ] work home", entity.getName()); + try { + FileUtils.deleteDirectory(new File(entity.getWork())); + } + catch (IOException e) { + log.warn("Failed to delete pipeline [ {} ] work home {}", entity.getName(), e); + } + return PipelineService.super.deleteById(repository, id); + } + + /** + * Merges the properties of a source entity with a list of fields and a configuration. + * + * @param entity the source entity + * @param fields the list of fields + * @param configure the configuration + * @return the merged properties + */ private Properties merge(SourceEntity entity, List fields, Properties configure) { Properties properties = new Properties(); @@ -289,6 +352,13 @@ public class PipelineServiceImpl return properties; } + /** + * Sets the property value for the given field. + * + * @param field the field to set the property value for + * @param properties the properties object to store the property + * @param configure the configuration properties object + */ private void setProperty(IConfigureExecutorField field, Properties properties, Properties configure) { Object value = "None"; @@ -310,29 +380,4 @@ public class PipelineServiceImpl } properties.put(field.getField(), value); } - - /** - * Converts a PipelineEntity object to a PipelineBody object. - * - * @param entity the PipelineEntity object to convert - * @return the converted PipelineBody object - */ - private PipelineBody entityToBody(PipelineEntity entity) - { - PipelineFieldBody from = PipelineFieldBody.builder() - .id(entity.getFrom().getId()) - .configures(entity.getFromConfigures()) - .build(); - PipelineFieldBody to = PipelineFieldBody.builder() - .id(entity.getTo().getId()) - .configures(entity.getToConfigures()) - .build(); - return PipelineBody.builder() - .id(entity.getId()) - .content(entity.getContent()) - .from(from) - .to(to) - .executor(entity.getExecutor()) - .build(); - } } diff --git a/core/datacap-web/package.json b/core/datacap-web/package.json index 8ecdb7f7..a33b2d04 100644 --- a/core/datacap-web/package.json +++ b/core/datacap-web/package.json @@ -18,6 +18,7 @@ "@types/watermark-dom": "^2.3.1", "ag-grid-community": "^29.3.5", "ag-grid-vue3": "^29.3.5", + "ansi_up": "^6.0.2", "axios": "^0.27.2", "core-js": "^3.8.3", "echarts": "^5.4.0", diff --git a/core/datacap-web/src/i18n/langs/en/pipeline.ts b/core/datacap-web/src/i18n/langs/en/pipeline.ts index 4b02fc4d..bfb009c4 100644 --- a/core/datacap-web/src/i18n/langs/en/pipeline.ts +++ b/core/datacap-web/src/i18n/langs/en/pipeline.ts @@ -5,7 +5,8 @@ export default { success: 'Success', failure: 'Failure', stop: 'Stopped', - timeout: 'Timeout' + timeout: 'Timeout', + queue: 'Queue' }, delete: { deleteTip1: 'This action cannot be undone.', diff --git a/core/datacap-web/src/i18n/langs/zhCn/pipeline.ts b/core/datacap-web/src/i18n/langs/zhCn/pipeline.ts index 89247e0e..61daf191 100644 --- a/core/datacap-web/src/i18n/langs/zhCn/pipeline.ts +++ b/core/datacap-web/src/i18n/langs/zhCn/pipeline.ts @@ -5,7 +5,8 @@ export default { success: '运行成功', failure: '运行失败', stop: '已停止', - timeout: '运行超时' + timeout: '运行超时', + queue: '排队中' }, delete: { deleteTip1: '此操作无法撤消.', diff --git a/core/datacap-web/src/services/user/PipelineService.ts b/core/datacap-web/src/services/user/PipelineService.ts index bb31be06..b19bdf3f 100644 --- a/core/datacap-web/src/services/user/PipelineService.ts +++ b/core/datacap-web/src/services/user/PipelineService.ts @@ -21,6 +21,11 @@ class PipelineService return new HttpCommon().post(`${baseUrl}/submit`, configure); } + logger(id: number): Promise + { + return new HttpCommon().get(`${baseUrl}/log/${id}`); + } + getByName(name: string): Promise { return Promise.resolve(undefined); diff --git a/core/datacap-web/src/views/admin/pipeline/AdminPipeline.vue b/core/datacap-web/src/views/admin/pipeline/AdminPipeline.vue index 1eef8620..edf3c8f7 100644 --- a/core/datacap-web/src/views/admin/pipeline/AdminPipeline.vue +++ b/core/datacap-web/src/views/admin/pipeline/AdminPipeline.vue @@ -48,13 +48,23 @@ + + +