[Feature-16132][Task Plugin] support for oceanbase datax task (#16281)

* Add support for oceanbase's datax task --feat

* fix code style

---------

Co-authored-by: xiangzihao <460888207@qq.com>
This commit is contained in:
xxsc0529 2024-07-16 09:27:05 +08:00 committed by GitHub
parent 8d1680d2f0
commit 1199a916b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 27 additions and 3 deletions

View File

@ -44,6 +44,8 @@ public abstract class BaseConnectionParam implements ConnectionParam {
protected String validationQuery; protected String validationQuery;
protected String compatibleMode;
protected Map<String, String> other; protected Map<String, String> other;
} }

View File

@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.File; import java.io.File;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -420,7 +421,7 @@ public class DataxTask extends AbstractTask {
*/ */
private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg,
String sql) { String sql) {
String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql); String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql, dataSourceCfg.getCompatibleMode());
if (columnNames == null || columnNames.length == 0) { if (columnNames == null || columnNames.length == 0) {
log.info("try to execute sql analysis query column name"); log.info("try to execute sql analysis query column name");
@ -440,11 +441,14 @@ public class DataxTask extends AbstractTask {
* @return column name array * @return column name array
* @throws RuntimeException if error throws RuntimeException * @throws RuntimeException if error throws RuntimeException
*/ */
private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) { private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql, String compatibleMode) {
String[] columnNames; String[] columnNames;
try { try {
SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql); SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
if (StringUtils.isNotBlank(compatibleMode)) {
parser = DataxUtils.getSqlStatementParser(compatibleMode, sql);
}
if (parser == null) { if (parser == null) {
log.warn("database driver [{}] is not support grammatical analysis sql", dbType); log.warn("database driver [{}] is not support grammatical analysis sql", dbType);
return new String[0]; return new String[0];

View File

@ -42,6 +42,8 @@ public class DataxUtils {
public static final String DATAX_READER_PLUGIN_RDBMS = "rdbmsreader"; public static final String DATAX_READER_PLUGIN_RDBMS = "rdbmsreader";
public static final String DATAX_READER_PLUGIN_OCEANBASE = "oceanbasev10reader";
public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter"; public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter";
public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter"; public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter";
@ -55,6 +57,8 @@ public class DataxUtils {
public static final String DATAX_WRITER_PLUGIN_RDBMS = "rdbmswriter"; public static final String DATAX_WRITER_PLUGIN_RDBMS = "rdbmswriter";
public static final String DATAX_WRITER_PLUGIN_OCEANBASE = "oceanbasev10writer";
public static String getReaderPluginName(DbType dbType) { public static String getReaderPluginName(DbType dbType) {
switch (dbType) { switch (dbType) {
case MYSQL: case MYSQL:
@ -67,6 +71,8 @@ public class DataxUtils {
return DATAX_READER_PLUGIN_SQLSERVER; return DATAX_READER_PLUGIN_SQLSERVER;
case CLICKHOUSE: case CLICKHOUSE:
return DATAX_READER_PLUGIN_CLICKHOUSE; return DATAX_READER_PLUGIN_CLICKHOUSE;
case OCEANBASE:
return DATAX_READER_PLUGIN_OCEANBASE;
case HIVE: case HIVE:
case PRESTO: case PRESTO:
default: default:
@ -88,6 +94,8 @@ public class DataxUtils {
return DATAX_WRITER_PLUGIN_CLICKHOUSE; return DATAX_WRITER_PLUGIN_CLICKHOUSE;
case DATABEND: case DATABEND:
return DATAX_WRITER_PLUGIN_DATABEND; return DATAX_WRITER_PLUGIN_DATABEND;
case OCEANBASE:
return DATAX_WRITER_PLUGIN_OCEANBASE;
case HIVE: case HIVE:
case PRESTO: case PRESTO:
default: default:
@ -116,6 +124,13 @@ public class DataxUtils {
} }
} }
public static SQLStatementParser getSqlStatementParser(String compatibleMode, String sql) {
if (compatibleMode.toLowerCase().equals(DbType.ORACLE.getName())) {
return new OracleStatementParser(sql);
}
return new MySqlStatementParser(sql);
}
public static String[] convertKeywordsColumns(DbType dbType, String[] columns) { public static String[] convertKeywordsColumns(DbType dbType, String[] columns) {
if (columns == null) { if (columns == null) {
return null; return null;
@ -152,6 +167,8 @@ public class DataxUtils {
return String.format("`%s`", column); return String.format("`%s`", column);
case DATABEND: case DATABEND:
return String.format("`%s`", column); return String.format("`%s`", column);
case OCEANBASE:
return String.format("`%s`", column);
default: default:
return column; return column;
} }

View File

@ -131,7 +131,8 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
'CLICKHOUSE', 'CLICKHOUSE',
'DATABEND', 'DATABEND',
'HIVE', 'HIVE',
'PRESTO' 'PRESTO',
'OCEANBASE'
] ]
onMounted(() => { onMounted(() => {
initConstants() initConstants()