diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java index 383424f71a..d4a14d2bd7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java @@ -52,6 +52,8 @@ public class ParameterUtils { private static final String DATE_START_PATTERN = "^[0-9]"; + private static final char PARAM_REPLACE_CHAR = '?'; + private ParameterUtils() { throw new UnsupportedOperationException("Construct ParameterUtils"); } @@ -152,6 +154,64 @@ public class ParameterUtils { } } + public static String expandListParameter(Map params, String sql) { + Map expandMap = new HashMap<>(); + if (params == null || params.isEmpty()) { + return sql; + } + String[] split = sql.split("\\?"); + if (split.length == 0) { + return sql; + } + StringBuilder ret = new StringBuilder(split[0]); + int index = 1; + for (int i = 1; i < split.length; i++) { + Property property = params.get(i); + String value = property.getValue(); + if (DataType.LIST.equals(property.getType())) { + List valueList = JSONUtils.toList(value, Object.class); + if (valueList.isEmpty() && StringUtils.isNotBlank(value)) { + valueList.add(value); + } + for (int j = 0; j < valueList.size(); j++) { + ret.append(PARAM_REPLACE_CHAR); + if (j != valueList.size() - 1) { + ret.append(","); + } + } + for (Object v : valueList ) { + Property newProperty = new Property(); + if (v instanceof Integer) { + newProperty.setType(DataType.INTEGER); + } else if (v instanceof Long) { + newProperty.setType(DataType.LONG); + } else if (v instanceof Float) { + newProperty.setType(DataType.FLOAT); + } else if (v instanceof Double) { + newProperty.setType(DataType.DOUBLE); + } else { + newProperty.setType(DataType.VARCHAR); + } + newProperty.setValue(v.toString()); + newProperty.setProp(property.getProp()); + newProperty.setDirect(property.getDirect()); + expandMap.put(index++, newProperty); + } + } else { + ret.append(PARAM_REPLACE_CHAR); + expandMap.put(index++, property); + } + ret.append(split[i]); + } + if (PARAM_REPLACE_CHAR == sql.charAt(sql.length() - 1)) { + ret.append(PARAM_REPLACE_CHAR); + expandMap.put(index, params.get(split.length)); + } + params.clear(); + params.putAll(expandMap); + return ret.toString(); + } + /** * curing user define parameters * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtilsTest.java new file mode 100644 index 0000000000..ddee405935 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtilsTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.parser; + +import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class ParameterUtilsTest { + + @Test + public void expandListParameter() { + Map params = new HashMap<>(); + params.put(1, new Property(null, null, DataType.LIST, JSONUtils.toJsonString(Lists.newArrayList("c1", "c2", "c3")))); + params.put(2, new Property(null, null, DataType.DATE, "2020-06-30")); + params.put(3, new Property(null, null, DataType.LIST, JSONUtils.toJsonString(Lists.newArrayList(3.1415, 2.44, 3.44)))); + String sql = ParameterUtils.expandListParameter(params, "select * from test where col1 in (?) and date=? and col2 in (?)"); + Assert.assertEquals("select * from test where col1 in (?,?,?) and date=? and col2 in (?,?,?)", sql); + Assert.assertEquals(7, params.size()); + + Map params2 = new HashMap<>(); + params2.put(1, new Property(null, null, DataType.LIST, JSONUtils.toJsonString(Lists.newArrayList("c1")))); + params2.put(2, new Property(null, null, DataType.DATE, "2020-06-30")); + String sql2 = ParameterUtils.expandListParameter(params2, "select * from test where col1 in (?) and date=?"); + Assert.assertEquals("select * from test where col1 in (?) and date=?", sql2); + Assert.assertEquals(2, params2.size()); + + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index c090cf9a30..16dfec861c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -369,7 +369,6 @@ public class SqlTask extends AbstractTaskExecutor { } catch (Exception exception) { throw new TaskException("SQL task prepareStatementAndBind error", exception); } - } /** @@ -429,9 +428,10 @@ public class SqlTask extends AbstractTaskExecutor { sql = replaceOriginalValue(sql, rgexo, paramsMap); // replace the ${} of the SQL statement with the Placeholder String formatSql = sql.replaceAll(rgex, "?"); + // Convert the list parameter + formatSql = ParameterUtils.expandListParameter(sqlParamsMap, formatSql); sqlBuilder.append(formatSql); - - // print repalce sql + // print replace sql printReplacedSql(sql, formatSql, rgex, sqlParamsMap); return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); }