[Improv-14083][Resource Center] Handle deleteSource in OSS / S3 / GCS (#14084)

This commit is contained in:
Rick Cheng 2023-05-22 20:44:43 +08:00 committed by GitHub
parent 9979ea3467
commit 428f1559a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 35 additions and 25 deletions

View File

@ -7,6 +7,7 @@ This document records the incompatible updates between each version. You need to
* Change regex matching sql params in SQL task plugin ([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
* Change the default unix shell executor from sh to bash ([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
* Remove `deleteSource` in `download()` of `StorageOperate` ([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
## 3.0.0

View File

@ -7,6 +7,7 @@
* 更新了SQL任务中用于匹配变量的正则表达式 ([#13378](https://github.com/apache/dolphinscheduler/pull/13378))
* Remove the spark version of spark task ([#11860](https://github.com/apache/dolphinscheduler/pull/11860)).
* Change the default unix shell executor from sh to bash ([#12180](https://github.com/apache/dolphinscheduler/pull/12180)).
* Remove `deleteSource` in `download()` of `StorageOperate` ([#14084](https://github.com/apache/dolphinscheduler/pull/14084))
## 3.0.0

View File

@ -1462,7 +1462,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
log.info("Resource path is {}, download local filename is {}", alias, localFileName);
try {
storageOperate.download(tenantCode, fullName, localFileName, false, true);
storageOperate.download(tenantCode, fullName, localFileName, true);
return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
} catch (IOException e) {
log.error("Download resource error, the path is {}, and local filename is {}, the error message is {}",

View File

@ -183,8 +183,7 @@ public class TaskCacheUtils {
log.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath);
String crcString = "";
try {
storageOperate.download(context.getTenantCode(), resourceCRCWholePath, targetPath, false,
true);
storageOperate.download(context.getTenantCode(), resourceCRCWholePath, targetPath, true);
crcString = FileUtils.readFile2Str(new FileInputStream(targetPath));
fileProperty.setValue(crcString);
} catch (IOException e) {

View File

@ -140,11 +140,10 @@ public interface StorageOperate {
* @param tenantCode
* @param srcFilePath the full path of the srcPath
* @param dstFile
* @param deleteSource
* @param overwrite
* @throws IOException
*/
void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource,
void download(String tenantCode, String srcFilePath, String dstFile,
boolean overwrite) throws IOException;
/**

View File

@ -144,7 +144,7 @@ public class GcsStorageOperator implements Closeable, StorageOperate {
}
@Override
public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean deleteSource,
public void download(String tenantCode, String srcFilePath, String dstFilePath,
boolean overwrite) throws IOException {
File dstFile = new File(dstFilePath);
if (dstFile.isDirectory()) {
@ -201,7 +201,9 @@ public class GcsStorageOperator implements Closeable, StorageOperate {
.setTarget(target)
.build());
gcsStorage.delete(source);
if (deleteSource) {
gcsStorage.delete(source);
}
return true;
}
@ -212,7 +214,12 @@ public class GcsStorageOperator implements Closeable, StorageOperate {
BlobInfo blobInfo = BlobInfo.newBuilder(
BlobId.of(bucketName, dstPath)).build();
gcsStorage.create(blobInfo, Files.readAllBytes(Paths.get(srcFile)));
Path srcPath = Paths.get(srcFile);
gcsStorage.create(blobInfo, Files.readAllBytes(srcPath));
if (deleteSource) {
Files.delete(srcPath);
}
return true;
} catch (Exception e) {
log.error("upload failed,the bucketName is {},the filePath is {}", bucketName, dstPath);

View File

@ -203,7 +203,6 @@ public class GcsStorageOperatorTest {
public void copy() {
boolean isSuccess = false;
doReturn(null).when(gcsStorage).copy(Mockito.any());
doReturn(true).when(gcsStorage).delete(Mockito.any(BlobId.class));
try {
isSuccess = gcsStorageOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false);
} catch (IOException e) {
@ -212,7 +211,6 @@ public class GcsStorageOperatorTest {
Assertions.assertTrue(isSuccess);
verify(gcsStorage, times(1)).copy(Mockito.any());
verify(gcsStorage, times(1)).delete(Mockito.any(BlobId.class));
}
@Test

View File

@ -312,9 +312,9 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
}
@Override
public void download(String tenantCode, String srcHdfsFilePath, String dstFile, boolean deleteSource,
public void download(String tenantCode, String srcHdfsFilePath, String dstFile,
boolean overwrite) throws IOException {
copyHdfsToLocal(srcHdfsFilePath, dstFile, deleteSource, overwrite);
copyHdfsToLocal(srcHdfsFilePath, dstFile, false, overwrite);
}
/**

View File

@ -214,7 +214,7 @@ public class OssStorageOperator implements Closeable, StorageOperate {
}
@Override
public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean deleteSource,
public void download(String tenantCode, String srcFilePath, String dstFilePath,
boolean overwrite) throws IOException {
File dstFile = new File(dstFilePath);
if (dstFile.isDirectory()) {
@ -234,7 +234,7 @@ public class OssStorageOperator implements Closeable, StorageOperate {
} catch (OSSException e) {
throw new IOException(e);
} catch (FileNotFoundException e) {
log.error("cannot fin the destination file {}", dstFilePath);
log.error("cannot find the destination file {}", dstFilePath);
throw e;
}
}
@ -258,7 +258,9 @@ public class OssStorageOperator implements Closeable, StorageOperate {
@Override
public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException {
ossClient.copyObject(bucketName, srcPath, bucketName, dstPath);
ossClient.deleteObject(bucketName, srcPath);
if (deleteSource) {
ossClient.deleteObject(bucketName, srcPath);
}
return true;
}
@ -281,6 +283,9 @@ public class OssStorageOperator implements Closeable, StorageOperate {
boolean overwrite) throws IOException {
try {
ossClient.putObject(bucketName, dstPath, new File(srcFile));
if (deleteSource) {
Files.delete(Paths.get(srcFile));
}
return true;
} catch (OSSException e) {
log.error("upload failed, the bucketName is {}, the filePath is {}", bucketName, dstPath, e);

View File

@ -215,7 +215,6 @@ public class OssStorageOperatorTest {
public void copy() {
boolean isSuccess = false;
doReturn(null).when(ossClientMock).copyObject(anyString(), anyString(), anyString(), anyString());
doReturn(null).when(ossClientMock).deleteObject(anyString(), anyString());
try {
isSuccess = ossOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false);
} catch (IOException e) {
@ -224,7 +223,6 @@ public class OssStorageOperatorTest {
Assertions.assertTrue(isSuccess);
verify(ossClientMock, times(1)).copyObject(anyString(), anyString(), anyString(), anyString());
verify(ossClientMock, times(1)).deleteObject(anyString(), anyString());
}
@Test

View File

@ -202,7 +202,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
}
@Override
public void download(String tenantCode, String srcFilePath, String dstFilePath, boolean deleteSource,
public void download(String tenantCode, String srcFilePath, String dstFilePath,
boolean overwrite) throws IOException {
File dstFile = new File(dstFilePath);
if (dstFile.isDirectory()) {
@ -263,7 +263,9 @@ public class S3StorageOperator implements Closeable, StorageOperate {
@Override
public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException {
s3Client.copyObject(bucketName, srcPath, bucketName, dstPath);
s3Client.deleteObject(bucketName, srcPath);
if (deleteSource) {
s3Client.deleteObject(bucketName, srcPath);
}
return true;
}
@ -287,6 +289,10 @@ public class S3StorageOperator implements Closeable, StorageOperate {
boolean overwrite) throws IOException {
try {
s3Client.putObject(bucketName, dstPath, new File(srcFile));
if (deleteSource) {
Files.delete(Paths.get(srcFile));
}
return true;
} catch (AmazonServiceException e) {
log.error("upload failed,the bucketName is {},the filePath is {}", bucketName, dstPath);

View File

@ -219,7 +219,6 @@ public class S3StorageOperatorTest {
public void copy() {
boolean isSuccess = false;
doReturn(null).when(s3Client).copyObject(anyString(), anyString(), anyString(), anyString());
doNothing().when(s3Client).deleteObject(anyString(), anyString());
try {
isSuccess = s3StorageOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false);
} catch (IOException e) {
@ -228,7 +227,6 @@ public class S3StorageOperatorTest {
Assertions.assertTrue(isSuccess);
verify(s3Client, times(1)).copyObject(anyString(), anyString(), anyString(), anyString());
verify(s3Client, times(1)).deleteObject(anyString(), anyString());
}
@Test

View File

@ -142,8 +142,7 @@ public class TaskExecutionCheckerUtils {
long resourceDownloadStartTime = System.currentTimeMillis();
storageOperate.download(actualTenant, fullName,
execLocalPath + File.separator + fileName, false,
true);
execLocalPath + File.separator + fileName, true);
WorkerServerMetrics
.recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
WorkerServerMetrics.recordWorkerResourceDownloadSize(

View File

@ -178,8 +178,7 @@ public class TaskFilesTransferUtils {
String resourceWholePath =
storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
true);
storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, true);
} catch (IOException ex) {
throw new TaskException("Download file from storage error", ex);
}