[Core] Add a data source scan task (#409)

This commit is contained in:
qianmoQ 2023-08-09 11:25:01 +08:00 committed by GitHub
commit 08edf35f4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 260 additions and 119 deletions

View File

@ -0,0 +1,60 @@
package io.edurt.datacap.server.configure;
import com.google.inject.Injector;
import io.edurt.datacap.schedule.ScheduledCronRegistrar;
import io.edurt.datacap.server.scheduled.SourceScheduledRunnable;
import io.edurt.datacap.server.scheduled.source.CheckScheduledRunnable;
import io.edurt.datacap.service.repository.ScheduledRepository;
import io.edurt.datacap.service.repository.SourceRepository;
import io.edurt.datacap.service.repository.TemplateSqlRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ScheduleRunnerConfigure
implements CommandLineRunner
{
private final Injector injector;
private final ScheduledRepository scheduledRepository;
private final SourceRepository sourceRepository;
private final TemplateSqlRepository templateSqlRepository;
private final ScheduledCronRegistrar scheduledCronRegistrar;
private final RedisTemplate redisTemplate;
private final Environment environment;
public ScheduleRunnerConfigure(Injector injector, ScheduledRepository scheduledRepository, SourceRepository sourceRepository, TemplateSqlRepository templateSqlRepository, ScheduledCronRegistrar scheduledCronRegistrar, RedisTemplate redisTemplate, Environment environment)
{
this.injector = injector;
this.scheduledRepository = scheduledRepository;
this.sourceRepository = sourceRepository;
this.templateSqlRepository = templateSqlRepository;
this.scheduledCronRegistrar = scheduledCronRegistrar;
this.redisTemplate = redisTemplate;
this.environment = environment;
}
@Override
public void run(String... args)
{
this.scheduledRepository.findAllByActiveIsTrueAndIsSystemIsTrue()
.forEach(task -> {
log.info("Add new task " + task.getName() + " to scheduler");
switch (task.getType()) {
case SOURCE_SYNCHRONIZE:
SourceScheduledRunnable scheduled = new SourceScheduledRunnable(task.getName(), this.injector, this.sourceRepository, templateSqlRepository, redisTemplate, environment);
this.scheduledCronRegistrar.addCronTask(scheduled, task.getExpression());
break;
case SOURCE_CHECK:
CheckScheduledRunnable runnable = new CheckScheduledRunnable(task.getName(), this.injector, this.sourceRepository);
this.scheduledCronRegistrar.addCronTask(runnable, task.getExpression());
break;
default:
log.warn("Unsupported task type " + task.getType());
}
});
}
}

View File

@ -1,48 +0,0 @@
package io.edurt.datacap.server.configure;
import com.google.inject.Injector;
import io.edurt.datacap.schedule.ScheduledCronRegistrar;
import io.edurt.datacap.server.scheduled.SourceScheduledRunnable;
import io.edurt.datacap.service.repository.ScheduledTaskRepository;
import io.edurt.datacap.service.repository.SourceRepository;
import io.edurt.datacap.service.repository.TemplateSqlRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ScheduleTaskRunnerConfigure
implements CommandLineRunner
{
private final Injector injector;
private final ScheduledTaskRepository scheduledTaskRepository;
private final SourceRepository sourceRepository;
private final TemplateSqlRepository templateSqlRepository;
private final ScheduledCronRegistrar scheduledCronRegistrar;
private final RedisTemplate redisTemplate;
private final Environment environment;
public ScheduleTaskRunnerConfigure(Injector injector, ScheduledTaskRepository scheduledTaskRepository, SourceRepository sourceRepository, TemplateSqlRepository templateSqlRepository, ScheduledCronRegistrar scheduledCronRegistrar, RedisTemplate redisTemplate, Environment environment)
{
this.injector = injector;
this.scheduledTaskRepository = scheduledTaskRepository;
this.sourceRepository = sourceRepository;
this.templateSqlRepository = templateSqlRepository;
this.scheduledCronRegistrar = scheduledCronRegistrar;
this.redisTemplate = redisTemplate;
this.environment = environment;
}
@Override
public void run(String... args)
{
this.scheduledTaskRepository.findAllByActiveIsTrueAndIsSystemIsTrue().forEach(task -> {
log.info("Add new task " + task.getName() + " to scheduler");
SourceScheduledRunnable scheduled = new SourceScheduledRunnable(task.getName(), this.injector, this.sourceRepository, templateSqlRepository, redisTemplate, environment);
this.scheduledCronRegistrar.addCronTask(scheduled, task.getExpression());
});
}
}

View File

@ -0,0 +1,32 @@
package io.edurt.datacap.server.controller.admin;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.service.body.FilterBody;
import io.edurt.datacap.service.entity.PageEntity;
import io.edurt.datacap.service.entity.ScheduledEntity;
import io.edurt.datacap.service.repository.ScheduledRepository;
import io.edurt.datacap.service.service.ScheduledService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/api/v1/admin/schedule")
public class ScheduleController
{
private final ScheduledRepository scheduledRepository;
private final ScheduledService scheduledService;
public ScheduleController(ScheduledRepository scheduledRepository, ScheduledService scheduledService)
{
this.scheduledRepository = scheduledRepository;
this.scheduledService = scheduledService;
}
@PostMapping(value = "list")
public CommonResponse<PageEntity<ScheduledEntity>> getAllByFilter(@RequestBody FilterBody filter)
{
return scheduledService.getAll(scheduledRepository, filter);
}
}

View File

@ -1,32 +0,0 @@
package io.edurt.datacap.server.controller.admin;
import io.edurt.datacap.common.response.CommonResponse;
import io.edurt.datacap.service.body.FilterBody;
import io.edurt.datacap.service.entity.PageEntity;
import io.edurt.datacap.service.entity.ScheduledTaskEntity;
import io.edurt.datacap.service.repository.ScheduledTaskRepository;
import io.edurt.datacap.service.service.ScheduledTaskService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/api/v1/admin/schedule")
public class ScheduleTaskController
{
private final ScheduledTaskRepository scheduledTaskRepository;
private final ScheduledTaskService scheduledTaskService;
public ScheduleTaskController(ScheduledTaskRepository scheduledTaskRepository, ScheduledTaskService scheduledTaskService)
{
this.scheduledTaskRepository = scheduledTaskRepository;
this.scheduledTaskService = scheduledTaskService;
}
@PostMapping(value = "list")
public CommonResponse<PageEntity<ScheduledTaskEntity>> getAllByFilter(@RequestBody FilterBody filter)
{
return scheduledTaskService.getAll(scheduledTaskRepository, filter);
}
}

View File

@ -0,0 +1,79 @@
package io.edurt.datacap.server.scheduled.source;
import com.google.inject.Injector;
import io.edurt.datacap.schedule.ScheduledRunnable;
import io.edurt.datacap.service.common.PluginUtils;
import io.edurt.datacap.service.entity.SourceEntity;
import io.edurt.datacap.service.repository.SourceRepository;
import io.edurt.datacap.spi.Plugin;
import io.edurt.datacap.spi.model.Configure;
import io.edurt.datacap.spi.model.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Optional;
@Slf4j
public class CheckScheduledRunnable
extends ScheduledRunnable
{
private final Injector injector;
private final SourceRepository sourceRepository;
public CheckScheduledRunnable(String name, Injector injector, SourceRepository sourceRepository)
{
super(name);
this.injector = injector;
this.sourceRepository = sourceRepository;
}
@Override
public void run()
{
log.info("==================== {} started =================", this.getName());
this.sourceRepository.findAll()
.forEach(entity -> {
log.info("Before check source {}", entity.getName());
Optional<Plugin> pluginOptional = PluginUtils.getPluginByNameAndType(this.injector, entity.getType(), entity.getProtocol());
if (!pluginOptional.isPresent()) {
log.warn("Check scheduled task <{}> source {} protocol {} is not available", this.getName(), entity.getType(), entity.getProtocol());
}
else {
Plugin plugin = pluginOptional.get();
plugin.connect(getConfigure(entity));
Response response = plugin.execute(plugin.validator());
if (response.getIsSuccessful()) {
entity.setAvailable(true);
if (response.getColumns().get(0) instanceof ArrayList) {
ArrayList versions = (ArrayList) response.getColumns().get(0);
entity.setVersion(versions.get(0).toString());
}
else {
entity.setVersion(response.getColumns().get(0).toString());
}
}
else {
entity.setAvailable(false);
entity.setVersion(null);
entity.setMessage(response.getMessage());
}
this.sourceRepository.save(entity);
}
});
}
private Configure getConfigure(SourceEntity entity)
{
Configure configure = new Configure();
configure.setHost(entity.getHost());
configure.setPort(entity.getPort());
configure.setUsername(Optional.ofNullable(entity.getUsername()));
configure.setPassword(Optional.ofNullable(entity.getPassword()));
Optional<String> database = StringUtils.isNotEmpty(entity.getDatabase()) ? Optional.ofNullable(entity.getDatabase()) : Optional.empty();
configure.setDatabase(database);
configure.setSsl(Optional.ofNullable(entity.getSsl()));
configure.setEnv(Optional.ofNullable(entity.getConfigures()));
return configure;
}
}

View File

@ -5,3 +5,14 @@ ALTER TABLE `source`
ADD COLUMN `available` BOOLEAN DEFAULT TRUE, ADD COLUMN `available` BOOLEAN DEFAULT TRUE,
ADD COLUMN `message` TEXT DEFAULT NULL, ADD COLUMN `message` TEXT DEFAULT NULL,
ADD COLUMN `update_time` DATETIME DEFAULT NULL; ADD COLUMN `update_time` DATETIME DEFAULT NULL;
RENAME TABLE `scheduled_task` TO `datacap_scheduled`;
ALTER TABLE `datacap_scheduled`
ADD COLUMN `type` VARCHAR(255) DEFAULT NULL;
UPDATE `datacap_scheduled`
SET `type` = 'SOURCE_SYNCHRONIZE';
INSERT INTO datacap.datacap_scheduled (name, description, expression, active, is_system, type)
VALUES ('Check source available', 'Check the availability of the data source every 1 hour', '0 0 * * * ?', 1, 1, 'SOURCE_CHECK');

View File

@ -2,14 +2,21 @@ package io.edurt.datacap.service.entity;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.edurt.datacap.service.enums.ScheduledType;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString; import lombok.ToString;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
import javax.persistence.Column; import javax.persistence.Column;
import javax.persistence.Entity; import javax.persistence.Entity;
import javax.persistence.EntityListeners;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue; import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType; import javax.persistence.GenerationType;
import javax.persistence.Id; import javax.persistence.Id;
@ -23,9 +30,10 @@ import java.sql.Timestamp;
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@Entity @Entity
@Table(name = "scheduled_task") @Table(name = "datacap_scheduled")
@EntityListeners(AuditingEntityListener.class)
@SuppressFBWarnings(value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) @SuppressFBWarnings(value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class ScheduledTaskEntity public class ScheduledEntity
{ {
@Id @Id
@GeneratedValue(strategy = GenerationType.IDENTITY) @GeneratedValue(strategy = GenerationType.IDENTITY)
@ -46,11 +54,17 @@ public class ScheduledTaskEntity
@Column(name = "is_system") @Column(name = "is_system")
private boolean isSystem; private boolean isSystem;
@Column(name = "create_time", columnDefinition = "datetime(5) default CURRENT_TIMESTAMP()") @Column(name = "type")
@Enumerated(EnumType.STRING)
private ScheduledType type;
@Column(name = "create_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@CreatedDate
private Timestamp createTime; private Timestamp createTime;
@Column(name = "update_time", columnDefinition = "datetime(5) default CURRENT_TIMESTAMP()") @Column(name = "update_time")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@LastModifiedDate
private Timestamp updateTime; private Timestamp updateTime;
} }

View File

@ -0,0 +1,7 @@
package io.edurt.datacap.service.enums;
public enum ScheduledType
{
SOURCE_CHECK, // Check that the data source is available
SOURCE_SYNCHRONIZE // Synchronize table structure
}

View File

@ -0,0 +1,12 @@
package io.edurt.datacap.service.repository;
import io.edurt.datacap.service.entity.ScheduledEntity;
import org.springframework.data.repository.PagingAndSortingRepository;
import java.util.List;
public interface ScheduledRepository
extends PagingAndSortingRepository<ScheduledEntity, Long>
{
List<ScheduledEntity> findAllByActiveIsTrueAndIsSystemIsTrue();
}

View File

@ -1,12 +0,0 @@
package io.edurt.datacap.service.repository;
import io.edurt.datacap.service.entity.ScheduledTaskEntity;
import org.springframework.data.repository.PagingAndSortingRepository;
import java.util.List;
public interface ScheduledTaskRepository
extends PagingAndSortingRepository<ScheduledTaskEntity, Long>
{
List<ScheduledTaskEntity> findAllByActiveIsTrueAndIsSystemIsTrue();
}

View File

@ -0,0 +1,8 @@
package io.edurt.datacap.service.service;
import io.edurt.datacap.service.entity.ScheduledEntity;
public interface ScheduledService
extends BaseService<ScheduledEntity>
{
}

View File

@ -1,8 +0,0 @@
package io.edurt.datacap.service.service;
import io.edurt.datacap.service.entity.ScheduledTaskEntity;
public interface ScheduledTaskService
extends BaseService<ScheduledTaskEntity>
{
}

View File

@ -0,0 +1,10 @@
package io.edurt.datacap.service.service.impl;
import io.edurt.datacap.service.service.ScheduledService;
import org.springframework.stereotype.Service;
@Service
public class ScheduledServiceImpl
implements ScheduledService
{
}

View File

@ -1,10 +0,0 @@
package io.edurt.datacap.service.service.impl;
import io.edurt.datacap.service.service.ScheduledTaskService;
import org.springframework.stereotype.Service;
@Service
public class ScheduledTaskServiceImpl
implements ScheduledTaskService
{
}

View File

@ -1,6 +1,8 @@
<template> <template>
<div> <div>
<Card style="width:100%" :title="$t('common.source')"> <Card style="width:100%"
:title="$t('common.source')"
dis-hover>
<template #extra> <template #extra>
<Tooltip> <Tooltip>
<template #content>{{ $t('common.create') }}</template> <template #content>{{ $t('common.create') }}</template>
@ -13,7 +15,7 @@
</template> </template>
<template #type="{ row }"> <template #type="{ row }">
<Tooltip transfer :content="row.type"> <Tooltip transfer :content="row.type">
<Avatar :src="'/static/images/plugin/' + row.type.split(' ')[0] + '.png'" size="small" /> <Avatar :src="'/static/images/plugin/' + row.type.split(' ')[0] + '.png'" size="small"/>
</Tooltip> </Tooltip>
</template> </template>
<template #host="{ row }"> <template #host="{ row }">
@ -22,14 +24,30 @@
<template #version="{ row }"> <template #version="{ row }">
<Tag v-if="row.version" <Tag v-if="row.version"
color="primary"> color="primary">
{{ row.version }} <Ellipsis :length="10"
:text="row.version"
tooltip
transfer>
</Ellipsis>
</Tag> </Tag>
</template> </template>
<template #available="{ row }"> <template #available="{ row }">
<Button :type="row.available ? 'success' : 'error'" <Button :type="row.available ? 'success' : 'error'"
:icon="row.available ? 'md-checkmark-circle' : 'md-close-circle'"
shape="circle" shape="circle"
size="small"> size="small"
style="padding: 0; height: auto;">
<Tooltip v-if="!row.available"
:content="row.message"
max-width="auto"
transfer>
<Icon type="md-close-circle"
size="25">
</Icon>
</Tooltip>
<Icon v-else
type="md-checkmark-circle"
size="25">
</Icon>
</Button> </Button>
</template> </template>
<template #public="{ row }"> <template #public="{ row }">