diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index b0a5625341..b6ad565894 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,6 +17,10 @@ package org.apache.dolphinscheduler.server.worker.runner; +import static java.util.Calendar.DAY_OF_MONTH; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -163,6 +167,8 @@ public class TaskExecuteThread implements Runnable, Delayed { taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); + preBuildBusinessParams(); + TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); if (null == taskChannel) { throw new PluginNotFoundException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType())); @@ -359,4 +365,21 @@ public class TaskExecuteThread implements Runnable, Delayed { } return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); } + + private void preBuildBusinessParams() { + Map paramsMap = new HashMap<>(); + // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job + if (taskExecutionContext.getScheduleTime() != null) { + Date date = taskExecutionContext.getScheduleTime(); + if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) { + date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1); + } + String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME); + Property p = new Property(); + p.setValue(dateTime); + p.setProp(Constants.PARAMETER_DATETIME); + paramsMap.put(Constants.PARAMETER_DATETIME, p); + } + taskExecutionContext.setParamsMap(paramsMap); + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index 03462b8c49..01457da642 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.python; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.util.MapUtils; import org.apache.dolphinscheduler.spi.task.AbstractParameters; import org.apache.dolphinscheduler.spi.task.Property; import org.apache.dolphinscheduler.spi.task.TaskConstants; @@ -27,6 +28,8 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils; import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; + +import java.util.HashMap; import java.util.Map; /** @@ -51,7 +54,6 @@ public class PythonTask extends AbstractTaskExecutor { private TaskRequest taskRequest; - /** * constructor * @@ -101,7 +103,7 @@ public class PythonTask extends AbstractTaskExecutor { } catch (Exception e) { logger.error("python task failure", e); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); - throw new TaskException("run python task error",e); + throw new TaskException("run python task error", e); } } @@ -150,6 +152,7 @@ public class PythonTask extends AbstractTaskExecutor { /** * build command + * * @return raw python script * @throws Exception exception */ @@ -157,10 +160,14 @@ public class PythonTask extends AbstractTaskExecutor { String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); // replace placeholder - Map paramsMap = ParamUtils.convert(taskRequest,pythonParameters); - if (paramsMap != null){ - rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); + Map paramsMap = ParamUtils.convert(taskRequest, pythonParameters); + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); } + if (MapUtils.isNotEmpty(taskRequest.getParamsMap())) { + paramsMap.putAll(taskRequest.getParamsMap()); + } + rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); logger.info("raw python script : {}", pythonParameters.getRawScript()); logger.info("task dir : {}", taskDir); @@ -168,5 +175,4 @@ public class PythonTask extends AbstractTaskExecutor { return rawPythonScript; } - } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java index ea6de203f6..a5f43762fc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.sqoop; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.sqoop.generator.SqoopJobGenerator; import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; +import org.apache.dolphinscheduler.plugin.task.util.MapUtils; import org.apache.dolphinscheduler.spi.task.AbstractParameters; import org.apache.dolphinscheduler.spi.task.Property; import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils; @@ -27,6 +28,7 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils; import org.apache.dolphinscheduler.spi.task.request.SqoopTaskRequest; import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import java.util.HashMap; import java.util.Map; /** @@ -53,7 +55,7 @@ public class SqoopTask extends AbstractYarnTask { public void init() { logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams()); sqoopParameters = - JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class); + JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class); //check sqoop task params if (null == sqoopParameters) { throw new IllegalArgumentException("Sqoop Task params is null"); @@ -73,13 +75,17 @@ public class SqoopTask extends AbstractYarnTask { // combining local and global parameters Map paramsMap = ParamUtils.convert(sqoopTaskExecutionContext, getParameters()); - if (paramsMap != null) { - String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); - logger.info("sqoop script: {}", resultScripts); - return resultScripts; + if (MapUtils.isEmpty(paramsMap)) { + paramsMap = new HashMap<>(); + } + if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) { + paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap()); } - return null; + String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); + logger.info("sqoop script: {}", resultScripts); + return resultScripts; + } @Override