mirror of
synced 2024-12-01 19:58:29 +08:00
[DSIP-42] Add dolphinscheduler-aws-authentication module (#16043)
This commit is contained in:
@ -120,6 +120,12 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst
| conf.auto | bool | `false` | auto restart, if true, all components will be restarted automatically after the common configuration is updated. if false, you need to restart the components manually. default is false |
| conf.common."alert.rpc.port" | int | `50052` | rpc port |
| conf.common."appId.collect" | string | `"log"` | way to collect applicationId: log, aop |
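| conf.common."aws.credentials.provider.type" | string | `"AWSStaticCredentialsProvider"` | The AWS credentials provider type. Supported values: AWSStaticCredentialsProvider (use the access key and secret key to authenticate), InstanceProfileCredentialsProvider (use the IAM role to authenticate) |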
| conf.common."aws.s3.access.key.id" | string | `"minioadmin"` | The AWS access key. if resource.storage.type=S3, and credentials.provider.type is AWSStaticCredentialsProvider. This configuration is required |
| conf.common."aws.s3.access.key.secret" | string | `"minioadmin"` | The AWS secret access key. if resource.storage.type=S3, and credentials.provider.type is AWSStaticCredentialsProvider. This configuration is required |
| conf.common."aws.s3.bucket.name" | string | `"dolphinscheduler"` | The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name. |
| conf.common."aws.s3.endpoint" | string | `"http://minio:9000"` | You need to set this parameter when using a private cloud S3. If S3 uses a public cloud, you only need to set aws.s3.region, or set this to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn |
| conf.common."aws.s3.region" | string | `"ca-central-1"` | The AWS Region to use. if resource.storage.type=S3, This configuration is required |
| conf.common."conda.path" | string | `"/opt/anaconda3/etc/profile.d/conda.sh"` | set path of conda.sh |
| conf.common."data-quality.jar.dir" | string | `nil` | data quality option |
| conf.common."data.basedir.path" | string | `"/tmp/dolphinscheduler"` | user data local directory path, please make sure the directory exists and have read write permissions |
@ -138,11 +144,6 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst
| conf.common."resource.alibaba.cloud.oss.bucket.name" | string | `"dolphinscheduler"` | oss bucket name, required if you set resource.storage.type=OSS |
| conf.common."resource.alibaba.cloud.oss.endpoint" | string | `"https://oss-cn-hangzhou.aliyuncs.com"` | oss bucket endpoint, required if you set resource.storage.type=OSS |
| conf.common."resource.alibaba.cloud.region" | string | `"cn-hangzhou"` | alibaba cloud region, required if you set resource.storage.type=OSS |
| conf.common."resource.aws.access.key.id" | string | `"minioadmin"` | The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required |
| conf.common."resource.aws.region" | string | `"ca-central-1"` | The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required |
| conf.common."resource.aws.s3.bucket.name" | string | `"dolphinscheduler"` | The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name. |
| conf.common."resource.aws.s3.endpoint" | string | `"http://minio:9000"` | You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn |
| conf.common."resource.aws.secret.access.key" | string | `"minioadmin"` | The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required |
| conf.common."resource.azure.client.id" | string | `"minioadmin"` | azure storage account name, required if you set resource.storage.type=ABS |
| conf.common."resource.azure.client.secret" | string | `"minioadmin"` | azure storage account key, required if you set resource.storage.type=ABS |
| conf.common."resource.azure.subId" | string | `"minioadmin"` | azure storage subId, required if you set resource.storage.type=ABS |
@ -37,7 +37,7 @@ data:
{{- range $key, $value := index .Values.conf "common" }}
{{- if and $.Values.minio.enabled }}
{{- if eq $key "resource.storage.type" }}{{ $value = "S3" }}{{- end }}
{{- if eq $key "resource.aws.s3.endpoint" }}{{ $value = print "http://" (include "dolphinscheduler.minio.fullname" $) ":9000" }}{{- end }}
{{- if eq $key "aws.s3.endpoint" }}{{ $value = print "http://" (include "dolphinscheduler.minio.fullname" $) ":9000" }}{{- end }}
{{- end }}
{{ $key }}={{ $value }}
{{- end }}
@ -250,20 +250,25 @@ conf:
# -- resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path: /dolphinscheduler
# -- The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id: minioadmin
# -- The AWS credentials provider type. Supported values: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
aws.credentials.provider.type: AWSStaticCredentialsProvider
# -- The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key: minioadmin
# -- The AWS access key. if resource.storage.type=S3, and credentials.provider.type is AWSStaticCredentialsProvider. This configuration is required
aws.s3.access.key.id: minioadmin
# -- The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region: ca-central-1
# -- The AWS secret access key. if resource.storage.type=S3, and credentials.provider.type is AWSStaticCredentialsProvider. This configuration is required
aws.s3.access.key.secret: minioadmin
# -- The AWS Region to use. if resource.storage.type=S3, This configuration is required
aws.s3.region: ca-central-1
# -- The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
resource.aws.s3.bucket.name: dolphinscheduler
aws.s3.bucket.name: dolphinscheduler
# -- You need to set this parameter when using a private cloud S3. If S3 uses a public cloud, you only need to set aws.s3.region, or set this to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint: http://minio:9000
aws.s3.endpoint: http://minio:9000
# -- alibaba cloud access key id, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.id: <your-access-key-id>
@ -191,18 +191,20 @@ The default configuration is as follows:
Note that DolphinScheduler also supports zookeeper related configuration through `bin/env/dolphinscheduler_env.sh`.
For ETCD Registry, please see more details on [link](https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/README.md).
For JDBC Registry, please see more details on [link](https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md).
For ETCD Registry, please see more details
on [link](https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/README.md).
For JDBC Registry, please see more details
on [link](https://github.com/apache/dolphinscheduler/blob/dev/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/README.md).
### common.properties [hadoop、s3、yarn config properties]
Currently, common.properties mainly holds Hadoop- and S3A-related configuration. Configuration file location:
| Service | Configuration file |
| Master Server | `master-server/conf/common.properties` |
| Api Server | `api-server/conf/common.properties` |
| Worker Server | `worker-server/conf/common.properties` |
| Api Server | `api-server/conf/common.properties`, `api-server/conf/aws.yaml` |
| Worker Server | `worker-server/conf/common.properties`, `worker-server/conf/aws.yaml` |
| Alert Server | `alert-server/conf/common.properties` |
The default configuration is as follows:
@ -212,10 +214,6 @@ The default configuration is as follows:
| data.basedir.path | /tmp/dolphinscheduler | local directory used to store temp files |
| resource.storage.type | NONE | type of resource files: HDFS, S3, OSS, GCS, ABS, NONE |
| resource.upload.path | /dolphinscheduler | storage path of resource files |
| aws.access.key.id | minioadmin | access key id of S3 |
| aws.secret.access.key | minioadmin | secret access key of S3 |
| aws.region | us-east-1 | region of S3 |
| aws.s3.endpoint | http://minio:9000 | endpoint of S3 |
| hdfs.root.user | hdfs | configure users with corresponding permissions if storage type is HDFS |
| fs.defaultFS | hdfs://mycluster:8020 | If resource.storage.type=S3, then the request url would be similar to 's3a://dolphinscheduler'. Otherwise if resource.storage.type=HDFS and hadoop supports HA, copy core-site.xml and hdfs-site.xml into 'conf' directory |
| hadoop.security.authentication.startup.state | false | whether hadoop grant kerberos permission |
@ -28,74 +28,37 @@ The configuration you may need to change:
## connect AWS S3
if you want to upload resources to `Resource Center` connected to `S3`, you need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`. You can refer to the following:
If you want to upload resources to the `Resource Center` backed by `S3`, you need to configure `api-server/conf/common.properties` and `api-server/conf/aws.yaml`, as well as `worker-server/conf/common.properties` and `worker-server/conf/aws.yaml`. You can refer to the following:
config the following fields
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
## Use HDFS or Remote Object Storage
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: <access.key.id>
access.key.secret: <access.key.secret>
region: <region>
bucket.name: <bucket.name>
endpoint: <endpoint>
After version 3.0.0-alpha, if you want to upload resources to `Resource Center` connected to `HDFS`, you need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`.
## connect OSS S3
if you want to upload resources to `Resource Center` connected to `OSS`, you need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`. You can refer to the following:
config the following fields
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# user data local directory path, please make sure the directory exists and have read write permissions
# resource view suffixes
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS
# resource store on HDFS/S3/OSS path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
# alibaba cloud access key id, required if you set resource.storage.type=OSS
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
@ -107,89 +70,24 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OSS
# alibaba cloud access key id, required if you set resource.storage.type=OBS
## connect OBS S3
if you want to upload resources to `Resource Center` connected to `OBS`, you need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`. You can refer to the following:
config the following fields
# access key id, required if you set resource.storage.type=OBS
# alibaba cloud access key secret, required if you set resource.storage.type=OBS
# access key secret, required if you set resource.storage.type=OBS
# obs bucket name, required if you set resource.storage.type=OBS
# obs bucket endpoint, required if you set resource.storage.type=OBS
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
# whether to startup kerberos
# java.security.krb5.conf path
# login user from keytab username
# login user from keytab path
# kerberos expire time, the unit is hour
# resourcemanager port, the default value is 8088 if not specified
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty
# if resourcemanager HA is enabled or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
# datasource encryption enable
# datasource encryption salt
# data quality jar directory path; the data quality jar is auto-discovered from this directory. Keep it empty if you have not changed anything in
# data-quality, since dolphinscheduler will discover the jar by itself. Change it only if you want to use your own data-quality jar and it is not in the worker-server
# libs directory (but make sure your jar name starts with `dolphinscheduler-data-quality`).
# Network IP gets priority, default inner outer
# Whether hive SQL is executed in the same session
# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
# network interface preferred like eth0, default: empty
# network IP gets priority, default: inner outer
# system env path
# development state
# rpc port
# set path of conda.sh
# Task resource limit state
# way to collect applicationId: log(original regex match), aop
appId.collect: log
> **Note:**
@ -73,14 +73,17 @@ Parameters of restarting the task by interface
## Environment to prepare
Some AWS configuration is required, modify a field in file `common.properties`
Some AWS configuration is required, modify a field in file `aws.yaml`
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=<YOUR AWS ACCESS KEY>
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=<YOUR AWS SECRET KEY>
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=<AWS REGION>
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: <access.key.id>
access.key.secret: <access.key.secret>
region: <region>
endpoint: <endpoint>
@ -35,14 +35,17 @@ The task plugin are shown as follows:
## Environment to prepare
Some AWS configuration is required, modify a field in file `common.properties`
Some AWS configuration is required, modify a field in file `aws.yaml`
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=<YOUR AWS ACCESS KEY>
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=<YOUR AWS SECRET KEY>
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=<AWS REGION>
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: <access.key.id>
access.key.secret: <access.key.secret>
region: <region>
endpoint: <endpoint>
@ -26,77 +26,35 @@ Dolphinscheduler 资源中心使用本地系统默认是开启的,不需要用
## 对接AWS S3
如果需要使用到资源中心的 S3 上传资源,我们需要对以下路径的进行配置:`api-server/conf/common.properties` 和 `worker-server/conf/common.properties`。可参考如下:
如果需要使用到资源中心的 S3 上传资源,我们需要对以下路径的进行配置:`api-server/conf/common.properties`, `api-server/conf/aws.yaml` 和 `worker-server/conf/common.properties`, `worker-server/conf/aws.yaml`。可参考如下:
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
## 对接分布式或远端对象存储
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: <access.key.id>
access.key.secret: <access.key.secret>
region: <region>
bucket.name: <bucket.name>
endpoint: <endpoint>
### 配置 common.properties 文件
## 对接阿里云 OSS
在 3.0.0-alpha 版本之后,如果需要使用到资源中心的 HDFS 或 S3 上传资源,我们需要对以下路径的进行配置:`api-server/conf/common.properties` 和 `worker-server/conf/common.properties`。可参考如下:
如果需要使用到资源中心的 OSS 上传资源,我们需要对以下路径的进行配置:`api-server/conf/common.properties` 和 `worker-server/conf/common.properties`。可参考如下:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# user data local directory path, please make sure the directory exists and have read write permissions
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS
# resource store on HDFS/S3/OSS path, resource file will store to this hadoop hdfs path, self configuration,
# please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
# alibaba cloud access key id, required if you set resource.storage.type=OSS
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
@ -108,87 +66,22 @@ resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OSS
# alibaba cloud access key id, required if you set resource.storage.type=OBS
## 对接华为云 OBS
如果需要使用到资源中心的 OBS 上传资源,我们需要对以下路径的进行配置:`api-server/conf/common.properties` 和 `worker-server/conf/common.properties`。可参考如下:
# access key id, required if you set resource.storage.type=OBS
# alibaba cloud access key secret, required if you set resource.storage.type=OBS
# access key secret, required if you set resource.storage.type=OBS
# oss bucket name, required if you set resource.storage.type=OBS
# oss bucket endpoint, required if you set resource.storage.type=OBS
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
# whether to startup kerberos
# java.security.krb5.conf path
# login user from keytab username
# login user from keytab path
# kerberos expire time, the unit is hour
# resource view suffixes
# resourcemanager port, the default value is 8088 if not specified
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty
# if resourcemanager HA is enabled or not use resourcemanager, please keep the default value;
# If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
# datasource encryption enable
# datasource encryption salt
# data quality jar directory path; the data quality jar is auto-discovered from this directory. Keep it empty if you have not changed anything in
# data-quality, since dolphinscheduler will discover the jar by itself. Change it only if you want to use your own data-quality jar and it is not in the worker-server
# libs directory (but make sure your jar name starts with `dolphinscheduler-data-quality`).
# Network IP gets priority, default inner outer
# Whether hive SQL is executed in the same session
# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions;
# if set false, executing user is the deploy user and doesn't need sudo permissions
# network interface preferred like eth0, default: empty
# network IP gets priority, default: inner outer
# system env path
# development state
# rpc port
# way to collect applicationId: log(original regex match), aop
appId.collect: log
> **注意**:
@ -73,14 +73,17 @@ DolphinScheduler 在 启动DMS 任务后,会跟中DMS任务状态,直至DMS
## 环境配置
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=<YOUR AWS ACCESS KEY>
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=<YOUR AWS SECRET KEY>
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=<AWS REGION>
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: <access.key.id>
access.key.secret: <access.key.secret>
region: <region>
endpoint: <endpoint>
@ -33,14 +33,17 @@ DolphinScheduler SageMaker 组件的功能:
## 环境配置
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=<YOUR AWS ACCESS KEY>
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=<YOUR AWS SECRET KEY>
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=<AWS REGION>
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: <access.key.id>
access.key.secret: <access.key.secret>
region: <region>
endpoint: <endpoint>
@ -56,6 +56,13 @@
@ -57,6 +57,13 @@
@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~ http://www.apache.org/licenses/LICENSE-2.0
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ See the License for the specific language governing permissions and
~ limitations under the License.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@ -0,0 +1,74 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.authentication.aws;
import static org.apache.dolphinscheduler.authentication.aws.AwsConfigurationKeys.AWS_AUTHENTICATION_TYPE;
import java.util.Map;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
public class AWSCredentialsProviderFactor {
public static AWSCredentialsProvider credentialsProvider(Map<String, String> awsProperties) {
String awsAuthenticationType = awsProperties.getOrDefault(
AWSCredentialsProviderType awsCredentialsProviderType =
if (awsCredentialsProviderType == null) {
throw new IllegalArgumentException(
"The aws.credentials.provider.type: " + awsAuthenticationType + " is invalidated");
switch (awsCredentialsProviderType) {
return createAWSStaticCredentialsProvider(awsProperties);
return createInstanceProfileCredentialsProvider();
throw new IllegalArgumentException(
"The aws.credentials.provider.type: " + awsAuthenticationType + " is invalidated");
/**
 * Builds a static credentials provider from the access key id and secret held in the given properties.
 *
 * @param awsProperties map read for {@code AwsConfigurationKeys.AWS_ACCESS_KEY_ID} and
 *                      {@code AwsConfigurationKeys.AWS_SECRET}
 * @return an {@link AWSStaticCredentialsProvider} wrapping the {@link BasicAWSCredentials} built from those two values
 */
private static AWSCredentialsProvider createAWSStaticCredentialsProvider(Map<String, String> awsProperties) {
// NOTE(review): neither lookup is null-checked here; a missing key hands null to BasicAWSCredentials - confirm how that is handled.
String awsAccessKeyId = awsProperties.get(AwsConfigurationKeys.AWS_ACCESS_KEY_ID);
String awsSecretAccessKey = awsProperties.get(AwsConfigurationKeys.AWS_SECRET);
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
AWSStaticCredentialsProvider awsStaticCredentialsProvider =
new AWSStaticCredentialsProvider(basicAWSCredentials);
// Only the fact of creation is logged; the key id and secret are not written to the log.
log.info("AWSStaticCredentialsProvider created successfully");
return awsStaticCredentialsProvider;
private static AWSCredentialsProvider createInstanceProfileCredentialsProvider() {
InstanceProfileCredentialsProvider instanceProfileCredentialsProvider =
log.info("InstanceProfileCredentialsProvider created successfully");
return instanceProfileCredentialsProvider;
@ -0,0 +1,49 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.authentication.aws;
import java.util.Optional;
import lombok.Getter;
public enum AWSCredentialsProviderType {
private final String name;
AWSCredentialsProviderType(String name) {
this.name = name;
/**
 * Looks up the provider type whose name matches the given string, ignoring case.
 *
 * @param name the provider type name to look up; may be {@code null}
 * @return the first matching type, or {@link Optional#empty()} when {@code name} is {@code null}
 *         or matches none of the declared types
 */
public static Optional<AWSCredentialsProviderType> of(String name) {
// A null name cannot match any type, so answer empty instead of failing in the comparison below.
if (name == null) {
return Optional.empty();
for (AWSCredentialsProviderType type : values()) {
// Case-insensitive comparison against each constant's getName() value.
if (type.getName().equalsIgnoreCase(name)) {
return Optional.of(type);
// No declared type matched the requested name.
return Optional.empty();
@ -0,0 +1,53 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.authentication.aws;
import java.util.Map;
import lombok.experimental.UtilityClass;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder;
public class AWSDatabaseMigrationServiceClientFactory {
public AWSDatabaseMigrationService createAWSDatabaseMigrationServiceClient(Map<String, String> awsProperties) {
AWSCredentialsProvider awsCredentialsProvider = AWSCredentialsProviderFactor.credentialsProvider(awsProperties);
Regions regions = Regions.fromName(awsProperties.get(AwsConfigurationKeys.AWS_REGION));
String endpoint = awsProperties.get(AwsConfigurationKeys.AWS_ENDPOINT);
if (endpoint != null && !endpoint.isEmpty()) {
return AWSDatabaseMigrationServiceClientBuilder
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regions.getName()))
} else {
return AWSDatabaseMigrationServiceClientBuilder
@ -0,0 +1,53 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.authentication.aws;
import java.util.Map;
import lombok.experimental.UtilityClass;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
public class AmazonElasticMapReduceClientFactory {
public AmazonElasticMapReduce createAmazonElasticMapReduceClient(Map<String, String> awsProperties) {
AWSCredentialsProvider awsCredentialsProvider = AWSCredentialsProviderFactor.credentialsProvider(awsProperties);
Regions regions = Regions.fromName(awsProperties.get(AwsConfigurationKeys.AWS_REGION));
String endpoint = awsProperties.get(AwsConfigurationKeys.AWS_ENDPOINT);
if (endpoint != null && !endpoint.isEmpty()) {
return AmazonElasticMapReduceClientBuilder
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regions.getName()))
} else {
return AmazonElasticMapReduceClientBuilder
@ -0,0 +1,54 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.authentication.aws;
import java.util.Map;
import lombok.experimental.UtilityClass;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
public class AmazonS3ClientFactory {
public AmazonS3 createAmazonS3Client(Map<String, String> awsProperties) {
AWSCredentialsProvider awsCredentialsProvider = AWSCredentialsProviderFactor.credentialsProvider(awsProperties);
Regions regions = Regions.fromName(awsProperties.get(AwsConfigurationKeys.AWS_REGION));
String endpoint = awsProperties.get(AwsConfigurationKeys.AWS_ENDPOINT);
if (endpoint != null && !endpoint.isEmpty()) {
return AmazonS3ClientBuilder
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regions.getName()))
} else {
return AmazonS3ClientBuilder
@ -0,0 +1,53 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.authentication.aws;
import java.util.Map;
import lombok.experimental.UtilityClass;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sagemaker.AmazonSageMaker;
import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder;
public class AmazonSageMakerClientFactory {
public AmazonSageMaker createAmazonSageMakerClient(Map<String, String> awsProperties) {
AWSCredentialsProvider awsCredentialsProvider = AWSCredentialsProviderFactor.credentialsProvider(awsProperties);
Regions regions = Regions.fromName(awsProperties.get(AwsConfigurationKeys.AWS_REGION));
String endpoint = awsProperties.get(AwsConfigurationKeys.AWS_ENDPOINT);
if (endpoint != null && !endpoint.isEmpty()) {
return AmazonSageMakerClientBuilder
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, regions.getName()))
} else {
return AmazonSageMakerClientBuilder
@ -0,0 +1,28 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.authentication.aws;
/**
 * Keys used to look up AWS settings in the property map handed to the AWS
 * client factories (credentials provider type, region, endpoint and the
 * static access key pair).
 *
 * <p>This is a pure constants holder: it is final and cannot be instantiated.
 */
public final class AwsConfigurationKeys {

    /** Key of the credentials provider type setting. */
    public static final String AWS_AUTHENTICATION_TYPE = "credentials.provider.type";

    /** Key of the AWS region setting. */
    public static final String AWS_REGION = "region";

    /** Key of the service endpoint setting. */
    public static final String AWS_ENDPOINT = "endpoint";

    /** Key of the access key id setting. */
    public static final String AWS_ACCESS_KEY_ID = "access.key.id";

    /** Key of the secret access key setting. */
    public static final String AWS_SECRET = "access.key.secret";

    private AwsConfigurationKeys() {
        // Constants holder; not meant to be instantiated.
    }
}
@ -0,0 +1,47 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.authentication.aws;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.datasync.DataSyncClient;
import java.util.Map;
import lombok.experimental.UtilityClass;
public class DataSyncClientFactory {
public DataSyncClient createDataSyncClient(Map<String, String> awsProperties) {
// todo: upgrade the version of aws sdk
String awsAccessKeyId = awsProperties.get(AwsConfigurationKeys.AWS_ACCESS_KEY_ID);
String awsSecretAccessKey = awsProperties.get(AwsConfigurationKeys.AWS_SECRET);
final AwsBasicCredentials basicAWSCredentials = AwsBasicCredentials.create(awsAccessKeyId, awsSecretAccessKey);
final AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(basicAWSCredentials);
// create a datasync client
return DataSyncClient.builder()
@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: accessKey123
access.key.secret: secretKey123
region: us-east-1
bucket.name: dolphinscheduler
endpoint: http://s3:9000
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: minioadmin
access.key.secret: minioadmin
region: cn-north-1
endpoint: http://localhost:9000
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: minioadmin
access.key.secret: minioadmin
region: cn-north-1
endpoint: http://localhost:9000
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: minioadmin
access.key.secret: minioadmin
region: cn-north-1
endpoint: http://localhost:9000
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: minioadmin
access.key.secret: minioadmin
region: cn-north-1
endpoint: http://localhost:9000
Normal file
Normal file
@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~ http://www.apache.org/licenses/LICENSE-2.0
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ See the License for the specific language governing permissions and
~ limitations under the License.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@ -41,6 +41,10 @@
<!-- dolphinscheduler -->
@ -36,6 +36,7 @@ public final class Constants {
public static final String COMMON_PROPERTIES_PATH = "/common.properties";
public static final String REMOTE_LOGGING_YAML_PATH = "/remote-logging.yaml";
public static final String AWS_YAML_PATH = "/aws.yaml";
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
@ -130,8 +131,7 @@ public final class Constants {
public static final String RESOURCE_STORAGE_TYPE = "resource.storage.type";
public static final String AWS_S3_BUCKET_NAME = "resource.aws.s3.bucket.name";
public static final String AWS_END_POINT = "resource.aws.s3.endpoint";
public static final String AWS_S3_BUCKET_NAME = "aws.s3.bucket.name";
public static final String ALIBABA_CLOUD_OSS_BUCKET_NAME = "resource.alibaba.cloud.oss.bucket.name";
public static final String ALIBABA_CLOUD_OSS_END_POINT = "resource.alibaba.cloud.oss.endpoint";
@ -704,19 +704,8 @@ public final class Constants {
public static final String REMOTE_LOGGING_OSS_ENDPOINT = "remote.logging.oss.endpoint";
* remote logging for S3
public static final String REMOTE_LOGGING_S3_ACCESS_KEY_ID = "remote.logging.s3.access.key.id";
public static final String REMOTE_LOGGING_S3_ACCESS_KEY_SECRET = "remote.logging.s3.access.key.secret";
public static final String REMOTE_LOGGING_S3_BUCKET_NAME = "remote.logging.s3.bucket.name";
public static final String REMOTE_LOGGING_S3_ENDPOINT = "remote.logging.s3.endpoint";
public static final String REMOTE_LOGGING_S3_REGION = "remote.logging.s3.region";
* remote logging for GCS
@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.common.log.remote;
import org.apache.dolphinscheduler.authentication.aws.AmazonS3ClientFactory;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@ -26,41 +27,25 @@ import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
public class S3RemoteLogHandler implements RemoteLogHandler, Closeable {
private String accessKeyId;
private final String bucketName;
private String accessKeySecret;
private String region;
private String bucketName;
private String endPoint;
private AmazonS3 s3Client;
private final AmazonS3 s3Client;
private static S3RemoteLogHandler instance;
private S3RemoteLogHandler() {
accessKeyId = readAccessKeyID();
accessKeySecret = readAccessKeySecret();
region = readRegion();
bucketName = readBucketName();
endPoint = readEndPoint();
s3Client = buildS3Client();
@ -74,23 +59,8 @@ public class S3RemoteLogHandler implements RemoteLogHandler, Closeable {
protected AmazonS3 buildS3Client() {
if (StringUtils.isNotEmpty(endPoint)) {
return AmazonS3ClientBuilder
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
endPoint, Regions.fromName(region).getName()))
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret)))
} else {
return AmazonS3ClientBuilder
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret)))
Map<String, String> awsProperties = PropertyUtils.getByPrefix("aws.s3.", "");
return AmazonS3ClientFactory.createAmazonS3Client(awsProperties);
@ -131,24 +101,8 @@ public class S3RemoteLogHandler implements RemoteLogHandler, Closeable {
protected String readAccessKeyID() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ACCESS_KEY_ID);
protected String readAccessKeySecret() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ACCESS_KEY_SECRET);
protected String readRegion() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_REGION);
protected String readBucketName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_BUCKET_NAME);
protected String readEndPoint() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ENDPOINT);
return PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
public void checkBucketNameExists(String bucketName) {
@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.constants.Constants.AWS_YAML_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.COMMON_PROPERTIES_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.REMOTE_LOGGING_YAML_PATH;
@ -42,7 +43,7 @@ public class PropertyUtils {
private final ImmutablePriorityPropertyDelegate propertyDelegate =
new ImmutablePriorityPropertyDelegate(
new ImmutablePropertyDelegate(COMMON_PROPERTIES_PATH),
new ImmutableYamlDelegate(REMOTE_LOGGING_YAML_PATH));
/**
 * Looks up a property value by key; the key is trimmed before the lookup.
 *
 * @param key the property key; must not be null because it is dereferenced via {@code trim()}
 * @return the value the property delegate returns for the trimmed key
 */
public static String getString(String key) {
return propertyDelegate.get(key.trim());
@ -106,6 +107,19 @@ public class PropertyUtils {
return matchedProperties;
* Get all properties with specified prefix, like: fs., will replace the prefix with newPrefix
public static Map<String, String> getByPrefix(String prefix, String newPrefix) {
Map<String, String> matchedProperties = new HashMap<>();
for (String propName : propertyDelegate.getPropertyKeys()) {
if (propName.startsWith(prefix)) {
matchedProperties.put(propName.replace(prefix, newPrefix), propertyDelegate.get(propName));
return matchedProperties;
public static <T> Set<T> getSet(String key, Function<String, Set<T>> transformFunction, Set<T> defaultValue) {
return propertyDelegate.get(key, transformFunction, defaultValue);
@ -39,17 +39,6 @@ resource.azure.tenant.id=minioadmin
# The query interval
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
# Set this parameter when using a private-cloud S3 service. If you use public-cloud S3, you only need to set resource.aws.region, or you can set this to a public-cloud endpoint such as S3.cn-north-1.amazonaws.com.cn
# alibaba cloud access key id, required if you set resource.storage.type=OSS
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
@ -188,16 +177,7 @@ remote.logging.oss.access.key.secret=<access.key.secret>
# oss endpoint, required if you set remote.logging.target=OSS
# s3 access key id, required if you set remote.logging.target=S3
# s3 access key secret, required if you set remote.logging.target=S3
# s3 bucket name, required if you set remote.logging.target=S3
# s3 endpoint, required if you set remote.logging.target=S3
# s3 region, required if you set remote.logging.target=S3
# the location of the google cloud credential, required if you set remote.logging.target=GCS
# gcs bucket name, required if you set remote.logging.target=GCS
@ -34,18 +34,6 @@ remote-logging:
bucket.name: <bucket.name>
# oss endpoint, required if you set remote-logging.target=OSS
endpoint: <endpoint>
# required if you set remote-logging.target=S3
# s3 access key id, required if you set remote-logging.target=S3
access.key.id: <access.key.id>
# s3 access key secret, required if you set remote-logging.target=S3
access.key.secret: <access.key.secret>
# s3 bucket name, required if you set remote-logging.target=S3
bucket.name: <bucket.name>
# s3 endpoint, required if you set remote-logging.target=S3
endpoint: <endpoint>
# s3 region, required if you set remote-logging.target=S3
region: <region>
# the location of the google cloud credential, required if you set remote-logging.target=GCS
credential: /path/to/credential
@ -17,12 +17,15 @@
package org.apache.dolphinscheduler.common.utils;
import static com.google.common.truth.Truth.assertThat;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -48,4 +51,14 @@ public class PropertyUtilsTest {
}, Sets.newHashSet("docker0"));
Assertions.assertEquals(Sets.newHashSet("docker0"), networkInterface);
// Checks that getByPrefix("resource.aws.", "") returns the matching properties
// with the "resource.aws." prefix removed from each key.
void getByPrefix() {
Map<String, String> awsProperties = PropertyUtils.getByPrefix("resource.aws.", "");
assertThat(awsProperties).containsEntry("access.key.id", "minioadmin");
assertThat(awsProperties).containsEntry("secret.access.key", "minioadmin");
assertThat(awsProperties).containsEntry("region", "cn-north-1");
assertThat(awsProperties).containsEntry("s3.bucket.name", "dolphinscheduler");
assertThat(awsProperties).containsEntry("endpoint", "http://localhost:9000");
@ -45,6 +45,10 @@ resource.azure.tenant.id=minioadmin
# The query interval
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
@ -54,7 +58,7 @@ resource.aws.region=cn-north-1
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
# Set this parameter when using a private-cloud S3 service. If you use public-cloud S3, you only need to set resource.aws.region, or you can set this to a public-cloud endpoint such as S3.cn-north-1.amazonaws.com.cn
# alibaba cloud access key id, required if you set resource.storage.type=OSS
@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: accessKey123
access.key.secret: secretKey123
region: us-east-1
bucket.name: dolphinscheduler
endpoint: http://s3:9000
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: minioadmin
access.key.secret: minioadmin
region: cn-north-1
endpoint: http://localhost:9000
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: minioadmin
access.key.secret: minioadmin
region: cn-north-1
endpoint: http://localhost:9000
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: minioadmin
access.key.secret: minioadmin
region: cn-north-1
endpoint: http://localhost:9000
# The AWS credentials provider type. support: AWSStaticCredentialsProvider, InstanceProfileCredentialsProvider
# AWSStaticCredentialsProvider: use the access key and secret key to authenticate
# InstanceProfileCredentialsProvider: use the IAM role to authenticate
credentials.provider.type: AWSStaticCredentialsProvider
access.key.id: minioadmin
access.key.secret: minioadmin
region: cn-north-1
endpoint: http://localhost:9000
@ -37,17 +37,6 @@ resource.azure.tenant.id=minioadmin
# The query interval
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
# Set this parameter when using a private-cloud S3 service. If you use public-cloud S3, you only need to set resource.aws.region, or you can set this to a public-cloud endpoint such as S3.cn-north-1.amazonaws.com.cn
# alibaba cloud access key id, required if you set resource.storage.type=OSS
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
@ -34,6 +34,7 @@ services:
retries: 120
- ./common.properties:/opt/dolphinscheduler/conf/common.properties
- ./aws.yaml:/opt/dolphinscheduler/conf/aws.yaml
condition: service_healthy
@ -56,6 +56,13 @@
@ -86,6 +86,13 @@
@ -17,19 +17,18 @@
package org.apache.dolphinscheduler.plugin.storage.s3;
import static org.apache.dolphinscheduler.common.constants.Constants.AWS_END_POINT;
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_UDF;
import org.apache.dolphinscheduler.authentication.aws.AmazonS3ClientFactory;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.lang3.StringUtils;
@ -57,11 +56,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
@ -79,71 +74,27 @@ import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
public class S3StorageOperator implements Closeable, StorageOperate {
private String accessKeyId;
private String accessKeySecret;
private String region;
private String bucketName;
private String endPoint;
private AmazonS3 s3Client;
public S3StorageOperator() {
public void init() {
accessKeyId = readAccessKeyID();
accessKeySecret = readAccessKeySecret();
region = readRegion();
bucketName = readBucketName();
endPoint = readEndPoint();
s3Client = buildS3Client();
protected AmazonS3 buildS3Client() {
if (!StringUtils.isEmpty(endPoint)) {
return AmazonS3ClientBuilder
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
endPoint, region))
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret)))
} else {
return AmazonS3ClientBuilder
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret)))
protected String readAccessKeyID() {
return PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
protected String readAccessKeySecret() {
return PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
protected String readRegion() {
return PropertyUtils.getString(TaskConstants.AWS_REGION);
return AmazonS3ClientFactory.createAmazonS3Client(PropertyUtils.getByPrefix("aws.s3.", ""));
protected String readBucketName() {
return PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
protected String readEndPoint() {
return PropertyUtils.getString(AWS_END_POINT);
public void close() throws IOException {
@ -78,13 +78,7 @@ public class S3StorageOperatorTest {
public void setUp() throws Exception {
s3StorageOperator = Mockito.spy(new S3StorageOperator());
@ -96,9 +90,6 @@ public class S3StorageOperatorTest {
public void testInit() {
verify(s3StorageOperator, times(1)).buildS3Client();
Assertions.assertEquals(ACCESS_KEY_ID_MOCK, s3StorageOperator.getAccessKeyId());
Assertions.assertEquals(ACCESS_KEY_SECRET_MOCK, s3StorageOperator.getAccessKeySecret());
Assertions.assertEquals(REGION_MOCK, s3StorageOperator.getRegion());
Assertions.assertEquals(BUCKET_NAME_MOCK, s3StorageOperator.getBucketName());
@ -26,17 +26,6 @@ resource.storage.type=NONE
# Base path on HDFS/S3 under which resource files are stored (user-configurable). Please make sure the directory exists on HDFS and has read/write permissions. "/dolphinscheduler" is recommended
# The AWS access key. This configuration is required if resource.storage.type=S3 or if you use the EMR task
# The AWS secret access key. This configuration is required if resource.storage.type=S3 or if you use the EMR task
# The AWS Region to use. This configuration is required if resource.storage.type=S3 or if you use the EMR task
# The name of the bucket. You need to create it yourself; otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace, so ensure the bucket is given a unique name.
# You need to set this parameter when using a private-cloud S3. If S3 is on a public cloud, you only need to set resource.aws.region, or set this parameter to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
# alibaba cloud access key id, required if you set resource.storage.type=OSS
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
@ -31,22 +31,18 @@
<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/datasync -->
@ -17,13 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.datasync;
import org.apache.dolphinscheduler.authentication.aws.DataSyncClientFactory;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.datasync.DataSyncClient;
import software.amazon.awssdk.services.datasync.model.CancelTaskExecutionRequest;
import software.amazon.awssdk.services.datasync.model.CancelTaskExecutionResponse;
@ -48,6 +44,7 @@ import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Data;
@ -73,16 +70,8 @@ public class DatasyncHook {
protected static DataSyncClient createClient() {
final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
final AwsBasicCredentials basicAWSCredentials = AwsBasicCredentials.create(awsAccessKeyId, awsSecretAccessKey);
final AwsCredentialsProvider awsCredentialsProvider = StaticCredentialsProvider.create(basicAWSCredentials);
// create a datasync client
return DataSyncClient.builder().region(Region.of(awsRegion)).credentialsProvider(awsCredentialsProvider)
Map<String, String> awsProperties = PropertyUtils.getByPrefix("aws.datasync.", "");
return DataSyncClientFactory.createDataSyncClient(awsProperties);
public Boolean createDatasyncTask(DatasyncParameters parameters) {
@ -25,27 +25,23 @@
@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.dms;
import org.apache.dolphinscheduler.authentication.aws.AWSDatabaseMigrationServiceClientFactory;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.commons.io.IOUtils;
@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -37,11 +38,7 @@ import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationService;
import com.amazonaws.services.databasemigrationservice.AWSDatabaseMigrationServiceClientBuilder;
import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskRequest;
import com.amazonaws.services.databasemigrationservice.model.CreateReplicationTaskResult;
import com.amazonaws.services.databasemigrationservice.model.DeleteReplicationTaskRequest;
@ -87,17 +84,8 @@ public class DmsHook {
public static AWSDatabaseMigrationService createClient() {
final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
// create a DMS client
return AWSDatabaseMigrationServiceClientBuilder.standard()
Map<String, String> awsProperties = PropertyUtils.getByPrefix("aws.dms.", "");
return AWSDatabaseMigrationServiceClientFactory.createAWSDatabaseMigrationServiceClient(awsProperties);
public Boolean createReplicationTask() throws Exception {
@ -28,19 +28,20 @@
@ -22,22 +22,19 @@ import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKN
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import org.apache.dolphinscheduler.authentication.aws.AmazonElasticMapReduceClientFactory;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import java.util.Map;
import java.util.TimeZone;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
@ -94,22 +91,8 @@ public abstract class AbstractEmrTask extends AbstractRemoteTask {
return emrParameters;
* create emr client from BasicAWSCredentials
* @return AmazonElasticMapReduce
protected AmazonElasticMapReduce createEmrClient() {
final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
// create an EMR client
return AmazonElasticMapReduceClientBuilder.standard()
Map<String, String> awsProperties = PropertyUtils.getByPrefix("aws.emr.", "");
return AmazonElasticMapReduceClientFactory.createAmazonElasticMapReduceClient(awsProperties);
@ -31,26 +31,23 @@
@ -22,7 +22,9 @@ import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKN
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
import org.apache.dolphinscheduler.authentication.aws.AmazonSageMakerClientFactory;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.sagemaker.param.SagemakerConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
@ -41,11 +43,7 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sagemaker.AmazonSageMaker;
import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder;
import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
@ -186,16 +184,8 @@ public class SagemakerTask extends AbstractRemoteTask {
protected AmazonSageMaker createClient() {
final String awsAccessKeyId = parameters.getUsername();
final String awsSecretAccessKey = parameters.getPassword();
final String awsRegion = parameters.getAwsRegion();
final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretAccessKey);
final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
// create a SageMaker client
return AmazonSageMakerClientBuilder.standard()
Map<String, String> awsProperties = PropertyUtils.getByPrefix("aws.sagemaker.", "");
return AmazonSageMakerClientFactory.createAmazonSageMakerClient(awsProperties);
@ -57,6 +57,13 @@
@ -57,6 +57,7 @@
@ -294,6 +295,12 @@
Reference in New Issue
Block a user