[Core] [Fs] Support local fs

This commit is contained in:
qianmoQ 2023-12-03 13:25:32 +08:00
parent f9bb6b30ae
commit 3845ffaff7
11 changed files with 257 additions and 2 deletions

View File

@ -0,0 +1,21 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap</artifactId>
<version>1.18.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>datacap-fs-local</artifactId>
<description>DataCap - File system for local</description>
<dependencies>
<dependency>
<groupId>io.edurt.datacap</groupId>
<artifactId>datacap-fs-spi</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,53 @@
package io.edurt.datacap.fs;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
public class IOUtils
{
private IOUtils()
{
}
/**
* Copies a file from the source path to the target path.
*
* @param source the path of the file to be copied
* @param target the path where the file should be copied to
* @param createdDir a flag indicating whether the parent directories of the target path should be created if they do not exist
* @return true if the file was successfully copied, false otherwise
*/
public static boolean copy(String source, String target, boolean createdDir)
{
try {
Path targetPath = Path.of(target);
if (createdDir) {
Files.createDirectories(targetPath.getParent());
}
Files.copy(Path.of(source), targetPath, StandardCopyOption.REPLACE_EXISTING);
return true;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Returns a Reader object for the specified source.
*
* @param source the path of the file to be read
* @return a Reader object for reading the file contents
*/
public static Reader reader(String source)
{
try {
return Files.newBufferedReader(Path.of(source));
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,54 @@
package io.edurt.datacap.fs;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
@Slf4j
public class LocalFs
implements Fs
{
@Override
public FsResponse writer(FsRequest request)
{
log.info("LocalFs writer origin path [ {} ]", request.getLocalPath());
String targetPath = String.join(File.separator, request.getEndpoint(), request.getBucket(), request.getFileName());
FsResponse response = FsResponse.builder()
.origin(request.getLocalPath())
.remote(targetPath)
.successful(true)
.build();
log.info("LocalFs writer target path [ {} ]", targetPath);
try {
IOUtils.copy(request.getLocalPath(), targetPath, true);
log.info("LocalFs writer [ {} ] successfully", targetPath);
}
catch (Exception e) {
log.error("LocalFs writer error", e);
response.setSuccessful(false);
response.setMessage(e.getMessage());
}
return response;
}
@Override
public FsResponse reader(FsRequest request)
{
String targetPath = String.join(File.separator, request.getEndpoint(), request.getBucket(), request.getFileName());
log.info("LocalFs reader origin path [ {} ]", targetPath);
FsResponse response = FsResponse.builder()
.remote(targetPath)
.successful(true)
.build();
try {
response.setContext(IOUtils.reader(targetPath));
log.info("LocalFs reader [ {} ] successfully", targetPath);
}
catch (Exception e) {
log.error("LocalFs reader error", e);
response.setSuccessful(false);
response.setMessage(e.getMessage());
}
return response;
}
}

View File

@ -0,0 +1,14 @@
package io.edurt.datacap.fs;
import com.google.inject.multibindings.Multibinder;
public class LocalModule
extends FsModule
{
@Override
protected void configure()
{
Multibinder.newSetBinder(this.binder(), Fs.class)
.addBinding().to(LocalFs.class);
}
}

View File

@ -0,0 +1 @@
io.edurt.datacap.fs.LocalModule

View File

@ -0,0 +1,79 @@
package io.edurt.datacap.fs;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.util.Optional;
import java.util.Set;
@Slf4j
public class LocalFsTest
{
private Injector injector;
private FsRequest request;
@Before
public void before()
{
injector = Guice.createInjector(new FsManager());
request = FsRequest.builder()
.access(null)
.secret(null)
.endpoint(String.join(File.separator, System.getProperty("user.dir"), "data"))
.bucket("tmp")
.localPath(String.join(File.separator, System.getProperty("user.dir"), "src/main/java/io/edurt/datacap/fs/LocalFs.java"))
.fileName("LocalFs.java")
.build();
}
@Test
public void test()
{
Set<Fs> sets = injector.getInstance(Key.get(new TypeLiteral<Set<Fs>>() {}));
Assert.assertEquals("Local", sets.stream().findFirst().get().name());
}
@Test
public void writer()
{
Optional<Fs> optional = injector.getInstance(Key.get(new TypeLiteral<Set<Fs>>() {})).stream().findFirst();
if (optional.isPresent()) {
Assert.assertEquals("Local", optional.get().name());
}
FsResponse response = optional.get().writer(request);
Assert.assertEquals(true, response.isSuccessful());
}
@Test
public void reader()
{
Optional<Fs> optional = injector.getInstance(Key.get(new TypeLiteral<Set<Fs>>() {})).stream().findFirst();
if (optional.isPresent()) {
Assert.assertEquals("Local", optional.get().name());
}
FsResponse response = optional.get().reader(request);
Assert.assertEquals(true, response.isSuccessful());
log.info("====== [ {} ] ======", response.getRemote());
try (BufferedReader reader = new BufferedReader(response.getContext())) {
String line;
while ((line = reader.readLine()) != null) {
log.info(line);
}
}
catch (IOException e) {
log.error("Reader error", e);
}
}
}

View File

@ -0,0 +1,28 @@
package io.edurt.datacap.fs;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Set;
public class LocalModuleTest
{
private Injector injector;
@Before
public void before()
{
injector = Guice.createInjector(new FsManager());
}
@Test
public void test()
{
Assert.assertNotNull(injector.getInstance(Key.get(new TypeLiteral<Set<Fs>>() {})));
}
}

View File

@ -6,7 +6,7 @@ public interface Fs
{ {
return this.getClass() return this.getClass()
.getSimpleName() .getSimpleName()
.replace("FileSystem", ""); .replace("Fs", "");
} }
default String description() default String description()

View File

@ -18,4 +18,5 @@ public class FsRequest
private String endpoint; private String endpoint;
private String bucket; private String bucket;
private String localPath; private String localPath;
private String fileName;
} }

View File

@ -6,6 +6,8 @@ import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.ToString; import lombok.ToString;
import java.io.Reader;
@Data @Data
@Builder @Builder
@ToString @ToString
@ -16,5 +18,6 @@ public class FsResponse
private String origin; private String origin;
private String remote; private String remote;
private String message; private String message;
private Reader context;
private boolean successful; private boolean successful;
} }

View File

@ -73,6 +73,7 @@
<module>shaded/datacap-shaded-ydb</module> <module>shaded/datacap-shaded-ydb</module>
<module>shaded/datacap-shaded-pinot</module> <module>shaded/datacap-shaded-pinot</module>
<module>fs/datacap-fs-spi</module> <module>fs/datacap-fs-spi</module>
<module>fs/datacap-fs-local</module>
</modules> </modules>
<name>datacap</name> <name>datacap</name>
@ -154,7 +155,7 @@
<plugin.maven.gpg.version>1.6</plugin.maven.gpg.version> <plugin.maven.gpg.version>1.6</plugin.maven.gpg.version>
<plugin.maven.nexus.version>1.6.13</plugin.maven.nexus.version> <plugin.maven.nexus.version>1.6.13</plugin.maven.nexus.version>
<plugin.maven.dokka.version>1.9.10</plugin.maven.dokka.version> <plugin.maven.dokka.version>1.9.10</plugin.maven.dokka.version>
<environment.compile.java.version>1.8</environment.compile.java.version> <environment.compile.java.version>11</environment.compile.java.version>
</properties> </properties>
<dependencies> <dependencies>