[Fs] [Amazon S3] Support reader

This commit is contained in:
qianmoQ 2024-08-25 19:46:33 +08:00
parent e5784b4baf
commit 24089f2143
4 changed files with 103 additions and 2 deletions

View File

@ -41,6 +41,24 @@ class AmazonS3Fs : Fs
override fun reader(request: FsRequest?): FsResponse
{
TODO("Not yet implemented")
requireNotNull(request) { "request must not be null" }
log.info("{} reader origin path [ {} ]", this.name(), request.fileName)
val response = FsResponse.builder()
.remote(request.fileName)
.successful(true)
.build()
try
{
response.context = AmazonS3Utils.reader(request)
log.info("{} reader [ {} ] successfully", this.name(), request.fileName)
}
catch (e: java.lang.Exception)
{
log.error("{} reader error", this.name(), e)
response.isSuccessful = false
response.message = e.message
}
return response
}
}

View File

@ -9,12 +9,12 @@ import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.GetObjectRequest
import com.amazonaws.services.s3.model.ObjectMetadata
import com.amazonaws.services.s3.model.PutObjectRequest
import io.edurt.datacap.fs.FsRequest
import java.io.InputStream
class AmazonS3Utils
{
companion object
@ -75,5 +75,52 @@ class AmazonS3Utils
client.shutdown()
}
}
@JvmStatic
fun reader(request: FsRequest): InputStream
{
val client = getClient(request)
try
{
val getObjectRequest = GetObjectRequest(request.bucket, request.fileName)
val s3Object = client.getObject(getObjectRequest)
val originalStream = s3Object.objectContent
return object : InputStream()
{
override fun read(): Int
{
return originalStream.read()
}
override fun read(b: ByteArray): Int
{
return originalStream.read(b)
}
override fun read(b: ByteArray, off: Int, len: Int): Int
{
return originalStream.read(b, off, len)
}
override fun close()
{
try
{
originalStream.close()
}
finally
{
client.shutdown()
}
}
}
}
catch (e: Exception)
{
client.shutdown()
throw RuntimeException(e)
}
}
}
}

View File

@ -11,7 +11,11 @@ import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
import org.slf4j.LoggerFactory
import java.io.BufferedReader
import java.io.FileInputStream
import java.io.IOException
import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
class AmazonS3FsTest
{
@ -44,4 +48,29 @@ class AmazonS3FsTest
val response = plugin !!.writer(request)
assertTrue(response.isSuccessful)
}
@Test
fun reader()
{
val plugins: Set<Fs?>? = injector?.getInstance(Key.get(object : TypeLiteral<Set<Fs?>?>()
{}))
val plugin: Fs? = plugins?.first { v -> v?.name().equals(name) }
val response = plugin !!.reader(request)
assertTrue(response.isSuccessful)
try
{
BufferedReader(InputStreamReader(response.context, StandardCharsets.UTF_8)).use { reader ->
var line: String?
while ((reader.readLine().also { line = it }) != null)
{
log.info(line)
}
}
}
catch (e: IOException)
{
log.error("Reader error", e)
}
}
}

View File

@ -1,6 +1,7 @@
package io.edurt.datacap.fs.s3
import io.edurt.datacap.fs.FsRequest
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
@ -27,4 +28,10 @@ class AmazonS3UtilsTest
val result = AmazonS3Utils.copy(request, stream, "AmazonS3FsTest.kt")
assertTrue(result != null)
}
@Test
fun reader()
{
assertNotNull(AmazonS3Utils.reader(request))
}
}