mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-30 03:08:01 +08:00
fix procedure task param pass (#11919)
This commit is contained in:
parent
009cb68011
commit
0deafba12c
@ -2,27 +2,39 @@
|
||||
|
||||
- Execute the stored procedure according to the selected DataSource.
|
||||
|
||||
> Drag from the toolbar ![PNG](https://analysys.github.io/easyscheduler_docs_cn/images/toolbar_PROCEDURE.png) task node into the canvas, as shown in the figure below:
|
||||
> Drag from the `PROCEDURE` task node into the canvas, as shown in the figure below:
|
||||
|
||||
<p align="center">
|
||||
<img src="../../../../img/procedure-en.png" width="80%" />
|
||||
<img src="../../../../img/procedure_edit.png" width="80%" />
|
||||
</p>
|
||||
|
||||
## Task Parameters
|
||||
|
||||
| **Parameter** | **Description** |
|
||||
| ------- | ---------- |
|
||||
| Node Name | Set the name of the task. Node names within a workflow definition are unique. |
|
||||
| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. |
|
||||
| Description | Describes the function of this node. |
|
||||
| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. |
|
||||
| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. |
|
||||
| Task group name | The group in Resources, if not configured, it will not be used. |
|
||||
| Environment Name | Configure the environment in which to run the script. |
|
||||
| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. |
|
||||
| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. |
|
||||
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
|
||||
| DataSource | The DataSource type of the stored procedure supports MySQL and POSTGRESQL, select the corresponding DataSource. |
|
||||
| Method | The method name of the stored procedure. |
|
||||
| Custom parameters | The custom parameter types of the stored procedure support `IN` and `OUT`, and the data types support: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and BOOLEAN. |
|
||||
| Predecessor task | Selecting the predecessor task of the current task will set the selected predecessor task as the upstream of the current task. |
|
||||
| **Parameter** | **Description** |
|
||||
|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| Node Name | Set the name of the task. Node names within a workflow definition are unique. |
|
||||
| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. |
|
||||
| Description | Describes the function of this node. |
|
||||
| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. |
|
||||
| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. |
|
||||
| Task group name | The group in Resources, if not configured, it will not be used. |
|
||||
| Environment Name | Configure the environment in which to run the script. |
|
||||
| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. |
|
||||
| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. |
|
||||
| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. |
|
||||
| DataSource | The DataSource type of the stored procedure supports MySQL, POSTGRESQL, ORACLE. |
|
||||
| SQL Statement | call a stored procedure, such as `call test(${in1},${out1});`. |
|
||||
| Custom parameters | The custom parameter types of the stored procedure support `IN` and `OUT`, and the data types support: VARCHAR, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and BOOLEAN. |
|
||||
| Predecessor task | Selecting the predecessor task of the current task will set the selected predecessor task as the upstream of the current task. |
|
||||
|
||||
## Remark
|
||||
|
||||
- Prepare: Create a stored procedure in the database, such as:
|
||||
|
||||
```
|
||||
CREATE PROCEDURE dolphinscheduler.test(in in1 INT, out out1 INT)
|
||||
begin
|
||||
set out1=in1;
|
||||
END
|
||||
```
|
||||
|
||||
|
@ -1,12 +1,23 @@
|
||||
# 存储过程节点
|
||||
|
||||
- 根据选择的数据源,执行存储过程。
|
||||
> 拖动工具栏中的![PNG](https://analysys.github.io/easyscheduler_docs_cn/images/toolbar_PROCEDURE.png)任务节点到画板中,如下图所示:
|
||||
|
||||
> 拖动工具栏中的`PROCEDURE`任务节点到画板中,如下图所示:
|
||||
|
||||
<p align="center">
|
||||
<img src="../../../../img/procedure_edit.png" width="80%" />
|
||||
</p>
|
||||
|
||||
- 数据源:存储过程的数据源类型支持MySQL和POSTGRESQL两种,选择对应的数据源
|
||||
- 方法:是存储过程的方法名称
|
||||
- 自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型
|
||||
- 前提:在该数据库里面创建存储过程,如:
|
||||
|
||||
```
|
||||
CREATE PROCEDURE dolphinscheduler.test(in in1 INT, out out1 INT)
|
||||
begin
|
||||
set out1=in1;
|
||||
END
|
||||
```
|
||||
|
||||
- 数据源:存储过程的数据源类型支持MySQL、POSTGRESQL、ORACLE,选择对应的数据源
|
||||
- SQL Statement:调用存储过程,如 `call test(${in1},${out1});`;
|
||||
- 自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型;
|
||||
|
||||
|
Binary file not shown.
Before Width: | Height: | Size: 162 KiB |
Binary file not shown.
Before Width: | Height: | Size: 49 KiB After Width: | Height: | Size: 113 KiB |
@ -21,17 +21,8 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
|
||||
import org.apache.dolphinscheduler.spi.utils.StringUtils;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.regex.Matcher;
|
||||
@ -49,7 +40,8 @@ public abstract class AbstractTask {
|
||||
|
||||
public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION");
|
||||
|
||||
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
|
||||
protected final Logger logger =
|
||||
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
|
||||
|
||||
public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
|
||||
|
||||
@ -226,7 +218,11 @@ public abstract class AbstractTask {
|
||||
* @param paramsPropsMap params props map
|
||||
*/
|
||||
public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap,
|
||||
Map<String, Property> paramsPropsMap,int taskInstanceId) {
|
||||
Map<String, Property> paramsPropsMap, int taskInstanceId) {
|
||||
if (paramsPropsMap == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Pattern pattern = Pattern.compile(rgex);
|
||||
Matcher m = pattern.matcher(content);
|
||||
int index = 1;
|
||||
@ -236,14 +232,18 @@ public abstract class AbstractTask {
|
||||
Property prop = paramsPropsMap.get(paramName);
|
||||
|
||||
if (prop == null) {
|
||||
logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
|
||||
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskInstanceId);
|
||||
logger.error(
|
||||
"setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
|
||||
+ " with id: {}. So couldn't put Property in sqlParamsMap.",
|
||||
paramName, taskInstanceId);
|
||||
} else {
|
||||
sqlParamsMap.put(index, prop);
|
||||
index++;
|
||||
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
|
||||
logger.info(
|
||||
"setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.",
|
||||
paramName, content);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,9 @@
|
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.procedure;
|
||||
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
|
||||
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
|
||||
@ -42,8 +45,7 @@ import java.sql.Types;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
|
||||
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* procedure task
|
||||
@ -74,14 +76,16 @@ public class ProcedureTask extends AbstractTask {
|
||||
|
||||
logger.info("procedure task params {}", taskExecutionContext.getTaskParams());
|
||||
|
||||
this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);
|
||||
this.procedureParameters =
|
||||
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);
|
||||
|
||||
// check parameters
|
||||
if (!procedureParameters.checkParameters()) {
|
||||
throw new RuntimeException("procedure task params is not valid");
|
||||
}
|
||||
|
||||
procedureTaskExecutionContext = procedureParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
|
||||
procedureTaskExecutionContext =
|
||||
procedureParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -98,13 +102,22 @@ public class ProcedureTask extends AbstractTask {
|
||||
// load class
|
||||
DbType dbType = DbType.valueOf(procedureParameters.getType());
|
||||
// get datasource
|
||||
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
|
||||
procedureTaskExecutionContext.getConnectionParams());
|
||||
ConnectionParam connectionParam =
|
||||
DataSourceUtils.buildConnectionParams(DbType.valueOf(procedureParameters.getType()),
|
||||
procedureTaskExecutionContext.getConnectionParams());
|
||||
|
||||
// get jdbc connection
|
||||
connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
|
||||
|
||||
Map<Integer, Property> sqlParamsMap = new HashMap<>();
|
||||
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
|
||||
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()
|
||||
: taskExecutionContext.getPrepareParamsMap();
|
||||
if (procedureParameters.getOutProperty() != null) {
|
||||
// set out params before format sql
|
||||
paramsMap.putAll(procedureParameters.getOutProperty());
|
||||
}
|
||||
|
||||
// format sql
|
||||
String proceduerSql = formatSql(sqlParamsMap, paramsMap);
|
||||
// call method
|
||||
stmt = connection.prepareCall(proceduerSql);
|
||||
@ -137,7 +150,8 @@ public class ProcedureTask extends AbstractTask {
|
||||
|
||||
private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {
|
||||
// combining local and global parameters
|
||||
setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
|
||||
setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap,
|
||||
taskExecutionContext.getTaskInstanceId());
|
||||
return procedureParameters.getMethod().replaceAll(rgex, "?");
|
||||
}
|
||||
|
||||
@ -168,8 +182,8 @@ public class ProcedureTask extends AbstractTask {
|
||||
* @return outParameterMap
|
||||
* @throws Exception Exception
|
||||
*/
|
||||
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap
|
||||
, Map<String, Property> totalParamsMap) throws Exception {
|
||||
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt, Map<Integer, Property> paramsMap,
|
||||
Map<String, Property> totalParamsMap) throws Exception {
|
||||
Map<Integer, Property> outParameterMap = new HashMap<>();
|
||||
if (procedureParameters.getLocalParametersMap() == null) {
|
||||
return outParameterMap;
|
||||
@ -180,7 +194,8 @@ public class ProcedureTask extends AbstractTask {
|
||||
for (Map.Entry<Integer, Property> entry : paramsMap.entrySet()) {
|
||||
Property property = entry.getValue();
|
||||
if (property.getDirect().equals(Direct.IN)) {
|
||||
ParameterUtils.setInParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
|
||||
ParameterUtils.setInParameter(index, stmt, property.getType(),
|
||||
totalParamsMap.get(property.getProp()).getValue());
|
||||
} else if (property.getDirect().equals(Direct.OUT)) {
|
||||
setOutParameter(index, stmt, property.getType(), totalParamsMap.get(property.getProp()).getValue());
|
||||
outParameterMap.put(index, property);
|
||||
@ -237,7 +252,8 @@ public class ProcedureTask extends AbstractTask {
|
||||
* @param dataType dataType
|
||||
* @throws SQLException SQLException
|
||||
*/
|
||||
private Object getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
|
||||
private Object getOutputParameter(CallableStatement stmt, int index, String prop,
|
||||
DataType dataType) throws SQLException {
|
||||
Object value = null;
|
||||
switch (dataType) {
|
||||
case VARCHAR:
|
||||
|
Loading…
Reference in New Issue
Block a user