mirror of
https://gitee.com/devlive-community/datacap.git
synced 2024-12-02 12:07:37 +08:00
[Pipeline] Add logging interface and optimize UI
This commit is contained in:
parent
30762ba430
commit
8840dbd8b2
@ -6,6 +6,7 @@ import io.edurt.datacap.service.body.PipelineBody;
|
||||
import io.edurt.datacap.service.entity.PipelineEntity;
|
||||
import io.edurt.datacap.service.repository.PipelineRepository;
|
||||
import io.edurt.datacap.service.service.PipelineService;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.PutMapping;
|
||||
@ -13,6 +14,8 @@ import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@RestController()
|
||||
@RequestMapping(value = "/api/v1/pipeline")
|
||||
public class PipelineController
|
||||
@ -39,4 +42,10 @@ public class PipelineController
|
||||
{
|
||||
return service.stop(id);
|
||||
}
|
||||
|
||||
@GetMapping(value = "/log/{id}")
|
||||
public CommonResponse<List<String>> log(@PathVariable Long id)
|
||||
{
|
||||
return service.log(id);
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package io.edurt.datacap.server.runner;
|
||||
|
||||
import com.clearspring.analytics.util.Lists;
|
||||
import io.edurt.datacap.service.entity.PipelineEntity;
|
||||
import io.edurt.datacap.service.repository.PipelineRepository;
|
||||
import io.edurt.datacap.service.service.PipelineService;
|
||||
@ -45,7 +46,11 @@ public class PipelineResetRunner
|
||||
resetState = PipelineState.STOPPED;
|
||||
}
|
||||
|
||||
List<PipelineEntity> pipelines = repository.findAllByStateIs(PipelineState.RUNNING);
|
||||
List<PipelineState> states = Lists.newArrayList();
|
||||
states.add(PipelineState.RUNNING);
|
||||
states.add(PipelineState.CREATED);
|
||||
states.add(PipelineState.QUEUE);
|
||||
List<PipelineEntity> pipelines = repository.findAllByStateIn(states);
|
||||
for (PipelineEntity pipeline : pipelines) {
|
||||
log.info("Reset pipeline [ {} ] user [ {} ]", pipeline.getName(), pipeline.getUser().getUsername());
|
||||
pipeline.setState(resetState);
|
||||
|
@ -14,5 +14,5 @@ public interface PipelineRepository
|
||||
{
|
||||
Page<PipelineEntity> findAllByUser(UserEntity user, Pageable pageable);
|
||||
|
||||
List<PipelineEntity> findAllByStateIs(PipelineState state);
|
||||
List<PipelineEntity> findAllByStateIn(List<PipelineState> state);
|
||||
}
|
||||
|
@ -3,10 +3,14 @@ package io.edurt.datacap.service.service;
|
||||
import io.edurt.datacap.common.response.CommonResponse;
|
||||
import io.edurt.datacap.service.body.PipelineBody;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface PipelineService
|
||||
extends BaseService
|
||||
{
|
||||
CommonResponse<Object> submit(PipelineBody configure);
|
||||
|
||||
CommonResponse<Boolean> stop(Long id);
|
||||
|
||||
CommonResponse<List<String>> log(Long id);
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package io.edurt.datacap.service.service.impl;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
@ -27,6 +28,7 @@ import io.edurt.datacap.spi.executor.PipelineResponse;
|
||||
import io.edurt.datacap.spi.executor.PipelineState;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.time.DateFormatUtils;
|
||||
@ -34,6 +36,8 @@ import org.springframework.core.env.Environment;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
@ -260,6 +264,7 @@ public class PipelineServiceImpl
|
||||
service.shutdownNow();
|
||||
}
|
||||
entity.setState(PipelineState.STOPPED);
|
||||
entity.setMessage(null);
|
||||
this.repository.save(entity);
|
||||
|
||||
// Consume queue data for execution
|
||||
@ -273,6 +278,37 @@ public class PipelineServiceImpl
|
||||
return CommonResponse.success(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the log for a given pipeline ID.
|
||||
*
|
||||
* @param id the ID of the pipeline
|
||||
* @return a response containing the log lines as a list of strings
|
||||
*/
|
||||
@Override
|
||||
public CommonResponse<List<String>> log(Long id)
|
||||
{
|
||||
Optional<PipelineEntity> pipelineOptional = this.repository.findById(id);
|
||||
if (!pipelineOptional.isPresent()) {
|
||||
return CommonResponse.failure(String.format("Pipeline [ %s ] not found", id));
|
||||
}
|
||||
|
||||
PipelineEntity entity = pipelineOptional.get();
|
||||
if (entity.getState().equals(PipelineState.QUEUE)
|
||||
|| entity.getState().equals(PipelineState.CREATED)) {
|
||||
return CommonResponse.failure(String.format("Pipeline [ %s ] is not running", entity.getName()));
|
||||
}
|
||||
|
||||
List<String> lines = Lists.newArrayList();
|
||||
try (FileInputStream stream = new FileInputStream(new File(String.format("%s/%s.log", entity.getWork(), entity.getName())))) {
|
||||
IOUtils.readLines(stream, "UTF-8")
|
||||
.forEach(lines::add);
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.error("Failed to read pipeline [ {} ] log ", entity.getName(), e);
|
||||
}
|
||||
return CommonResponse.success(lines);
|
||||
}
|
||||
|
||||
private Properties merge(SourceEntity entity, List<IConfigureExecutorField> fields, Properties configure)
|
||||
{
|
||||
Properties properties = new Properties();
|
||||
|
@ -5,7 +5,8 @@ export default {
|
||||
success: 'Success',
|
||||
failure: 'Failure',
|
||||
stop: 'Stopped',
|
||||
timeout: 'Timeout'
|
||||
timeout: 'Timeout',
|
||||
queue: 'Queue'
|
||||
},
|
||||
delete: {
|
||||
deleteTip1: 'This action cannot be undone.',
|
||||
|
@ -5,7 +5,8 @@ export default {
|
||||
success: '运行成功',
|
||||
failure: '运行失败',
|
||||
stop: '已停止',
|
||||
timeout: '运行超时'
|
||||
timeout: '运行超时',
|
||||
queue: '排队中'
|
||||
},
|
||||
delete: {
|
||||
deleteTip1: '此操作无法撤消.',
|
||||
|
@ -55,6 +55,16 @@
|
||||
@click="handlerVisibleMarkdownPreview(row.message, true)">
|
||||
</Button>
|
||||
</Tooltip>
|
||||
<Tooltip :content="$t('common.log')"
|
||||
transfer>
|
||||
<Button shape="circle"
|
||||
:disabled="row.state === 'QUEUE' || row.state === 'CREATED'"
|
||||
type="primary"
|
||||
size="small"
|
||||
icon="md-bulb"
|
||||
@click="handlerVisibleMarkdownPreview(row.message, true)">
|
||||
</Button>
|
||||
</Tooltip>
|
||||
<Tooltip :content="$t('common.stop')"
|
||||
transfer>
|
||||
<Button shape="circle"
|
||||
|
@ -109,6 +109,8 @@ const getText = (i18n: any, origin: string): string => {
|
||||
return i18n.t('pipeline.common.stop');
|
||||
case 'TIMEOUT':
|
||||
return i18n.t('pipeline.common.timeout');
|
||||
case 'QUEUE':
|
||||
return i18n.t('pipeline.common.queue');
|
||||
default:
|
||||
return origin;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user