cherry-pick [Fix][Data Quality]Dataquality Spark Get Datasource Password Error When Password Has $ #13643

This commit is contained in:
旺阳 2023-02-28 18:50:44 +08:00 committed by zhuangchong
parent e84a80f6d4
commit 76a1a6dfcb
5 changed files with 142 additions and 34 deletions

View File

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.data.quality.config.ValidateResult;
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment;
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchReader;
import org.apache.dolphinscheduler.data.quality.utils.ConfigUtils;
import org.apache.dolphinscheduler.data.quality.utils.ParserUtils;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
@ -79,7 +80,7 @@ public class JdbcReader implements BatchReader {
.option(URL, config.getString(URL))
.option(DB_TABLE, config.getString(TABLE))
.option(USER, config.getString(USER))
.option(PASSWORD, config.getString(PASSWORD))
.option(PASSWORD, ParserUtils.decode(config.getString(PASSWORD)))
.option(DRIVER, config.getString(DRIVER));
Config jdbcConfig = ConfigUtils.extractSubConfig(config, JDBC + DOTS, false);

View File

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.data.quality.config.Config;
import org.apache.dolphinscheduler.data.quality.config.ValidateResult;
import org.apache.dolphinscheduler.data.quality.execution.SparkRuntimeEnvironment;
import org.apache.dolphinscheduler.data.quality.flow.batch.BatchWriter;
import org.apache.dolphinscheduler.data.quality.utils.ParserUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@ -75,13 +76,13 @@ public class JdbcWriter implements BatchWriter {
}
data.write()
.format(JDBC)
.option(DRIVER,config.getString(DRIVER))
.option(URL,config.getString(URL))
.option(DB_TABLE, config.getString(TABLE))
.option(USER, config.getString(USER))
.option(PASSWORD, config.getString(PASSWORD))
.mode(config.getString(SAVE_MODE))
.save();
.format(JDBC)
.option(DRIVER, config.getString(DRIVER))
.option(URL, config.getString(URL))
.option(DB_TABLE, config.getString(TABLE))
.option(USER, config.getString(USER))
.option(PASSWORD, ParserUtils.decode(config.getString(PASSWORD)))
.mode(config.getString(SAVE_MODE))
.save();
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.utils;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.URLDecoder;
import java.net.URLEncoder;
import lombok.extern.slf4j.Slf4j;
/**
* ParserUtil
*/
@Slf4j
public class ParserUtils {
private ParserUtils() {
throw new UnsupportedOperationException("Construct ParserUtils");
}
public static String encode(String str) {
String rs = str;
try {
rs = URLEncoder.encode(str, UTF_8.toString());
} catch (Exception e) {
log.error("encode str exception!", e);
}
return rs;
}
public static String decode(String str) {
String rs = str;
try {
rs = URLDecoder.decode(str, UTF_8.toString());
} catch (Exception e) {
log.error("decode str exception!", e);
}
return rs;
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.data.quality.utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class ParserUtilsTest {
@Test
public void testParserUtils() {
String testStr = "aaa$bbb$ccc%ddd^eee#fff";
String encode = ParserUtils.encode(testStr);
String decode = ParserUtils.decode(encode);
Assertions.assertEquals(testStr, decode);
String blank = "";
Assertions.assertEquals(ParserUtils.encode(blank), blank);
Assertions.assertEquals(ParserUtils.decode(blank), blank);
Assertions.assertNull(ParserUtils.encode(null));
Assertions.assertNull(ParserUtils.decode(null));
}
}

View File

@ -60,6 +60,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConst
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.USER;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.data.quality.utils.ParserUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
@ -115,12 +116,14 @@ public class RuleParserUtils {
sourceBaseConfig.setType(dataQualityTaskExecutionContext.getSourceConnectorType());
Map<String,Object> config = new HashMap<>();
if (sourceDataSource != null) {
config.put(DATABASE,sourceDataSource.getDatabase());
config.put(TABLE,inputParameterValue.get(SRC_TABLE));
config.put(URL,DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getSourceType()),sourceDataSource));
config.put(USER,sourceDataSource.getUser());
config.put(PASSWORD,sourceDataSource.getPassword());
config.put(DRIVER, DataSourceUtils.getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getSourceType())));
config.put(DATABASE, sourceDataSource.getDatabase());
config.put(TABLE, inputParameterValue.get(SRC_TABLE));
config.put(URL, DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getSourceType()),
sourceDataSource));
config.put(USER, sourceDataSource.getUser());
config.put(PASSWORD, ParserUtils.encode(sourceDataSource.getPassword()));
config.put(DRIVER, DataSourceUtils
.getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getSourceType())));
String outputTable = sourceDataSource.getDatabase() + "_" + inputParameterValue.get(SRC_TABLE);
config.put(OUTPUT_TABLE,outputTable);
inputParameterValue.put(SRC_TABLE,outputTable);
@ -140,12 +143,14 @@ public class RuleParserUtils {
targetBaseConfig.setType(dataQualityTaskExecutionContext.getTargetConnectorType());
Map<String,Object> config = new HashMap<>();
if (targetDataSource != null) {
config.put(DATABASE,targetDataSource.getDatabase());
config.put(TABLE,inputParameterValue.get(TARGET_TABLE));
config.put(URL,DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getTargetType()),targetDataSource));
config.put(USER,targetDataSource.getUser());
config.put(PASSWORD,targetDataSource.getPassword());
config.put(DRIVER, DataSourceUtils.getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getTargetType())));
config.put(DATABASE, targetDataSource.getDatabase());
config.put(TABLE, inputParameterValue.get(TARGET_TABLE));
config.put(URL, DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getTargetType()),
targetDataSource));
config.put(USER, targetDataSource.getUser());
config.put(PASSWORD, ParserUtils.encode(targetDataSource.getPassword()));
config.put(DRIVER, DataSourceUtils
.getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getTargetType())));
String outputTable = targetDataSource.getDatabase() + "_" + inputParameterValue.get(TARGET_TABLE);
config.put(OUTPUT_TABLE,outputTable);
inputParameterValue.put(TARGET_TABLE,outputTable);
@ -266,13 +271,15 @@ public class RuleParserUtils {
Map<String,Object> config = new HashMap<>();
if (writerDataSource != null) {
config.put(DATABASE,writerDataSource.getDatabase());
config.put(TABLE,dataQualityTaskExecutionContext.getWriterTable());
config.put(URL,DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getWriterType()),writerDataSource));
config.put(USER,writerDataSource.getUser());
config.put(PASSWORD,writerDataSource.getPassword());
config.put(DRIVER, DataSourceUtils.getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getWriterType())));
config.put(SQL,sql);
config.put(DATABASE, writerDataSource.getDatabase());
config.put(TABLE, dataQualityTaskExecutionContext.getWriterTable());
config.put(URL, DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getWriterType()),
writerDataSource));
config.put(USER, writerDataSource.getUser());
config.put(PASSWORD, ParserUtils.encode(writerDataSource.getPassword()));
config.put(DRIVER, DataSourceUtils
.getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getWriterType())));
config.put(SQL, sql);
}
writerConfig.setConfig(config);
writerConfigList.add(writerConfig);
@ -332,12 +339,14 @@ public class RuleParserUtils {
Map<String,Object> config = new HashMap<>();
if (writerDataSource != null) {
config.put(DATABASE,writerDataSource.getDatabase());
config.put(TABLE,dataQualityTaskExecutionContext.getStatisticsValueTable());
config.put(URL,DataSourceUtils.getJdbcUrl(DbType.of(dataQualityTaskExecutionContext.getStatisticsValueType()),writerDataSource));
config.put(USER,writerDataSource.getUser());
config.put(PASSWORD,writerDataSource.getPassword());
config.put(DRIVER, DataSourceUtils.getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getWriterType())));
config.put(DATABASE, writerDataSource.getDatabase());
config.put(TABLE, dataQualityTaskExecutionContext.getStatisticsValueTable());
config.put(URL, DataSourceUtils.getJdbcUrl(
DbType.of(dataQualityTaskExecutionContext.getStatisticsValueType()), writerDataSource));
config.put(USER, writerDataSource.getUser());
config.put(PASSWORD, ParserUtils.encode(writerDataSource.getPassword()));
config.put(DRIVER, DataSourceUtils
.getDatasourceDriver(DbType.of(dataQualityTaskExecutionContext.getWriterType())));
}
baseConfig.setConfig(config);