[FIX-#6007]Wrong complement date (#6026)

* [FIX-#6007]Wrong complement date

* [style]Wrong complement date
This commit is contained in:
linquan 2021-08-24 15:06:28 +08:00 committed by GitHub
parent 75f15df361
commit 7b8579310f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 122 additions and 43 deletions

View File

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
@ -221,6 +222,19 @@ public class TaskExecutionContext implements Serializable {
*/
private String varPool;
/**
* business param
*/
private Map<String, Property> paramsMap;
public Map<String, Property> getParamsMap() {
return paramsMap;
}
public void setParamsMap(Map<String, Property> paramsMap) {
this.paramsMap = paramsMap;
}
/**
* procedure TaskExecutionContext
*/

View File

@ -17,6 +17,10 @@
package org.apache.dolphinscheduler.server.worker.runner;
import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@ -153,6 +157,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
// task init
task.init();
preBuildBusinessParams();
//init varPool
task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
@ -182,6 +187,23 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
}
private void preBuildBusinessParams() {
Map<String, Property> paramsMap = new HashMap<>();
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
Date date = taskExecutionContext.getScheduleTime();
if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
}
String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_DATETIME);
paramsMap.put(Constants.PARAMETER_DATETIME, p);
}
taskExecutionContext.setParamsMap(paramsMap);
}
/**
* when task finish, clear execute path.
*/
@ -227,7 +249,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
return globalParamsMap;
}
/**
* kill task
*/

View File

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
@ -56,6 +57,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -155,6 +157,12 @@ public class DataxTask extends AbstractTask {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
// run datax procesDataSourceService.s
String jsonFilePath = buildDataxJsonFile(paramsMap);

View File

@ -30,7 +30,10 @@ import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.commons.collections.MapUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -81,12 +84,16 @@ public class FlinkTask extends AbstractYarnTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
logger.info("param Map : {}", paramsMap);
if (paramsMap != null) {
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
logger.info("param args : {}", args);
}
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
logger.info("param args : {}", args);
flinkParameters.setMainArgs(args);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.Charsets;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
@ -49,6 +50,7 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -138,6 +140,12 @@ public class HttpTask extends AbstractTask {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
List<HttpProperty> httpPropertyList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) {

View File

@ -31,7 +31,10 @@ import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.commons.collections.MapUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -85,14 +88,18 @@ public class MapReduceTask extends AbstractYarnTask {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
if (paramsMap != null) {
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
mapreduceParameters.setMainArgs(args);
if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap));
mapreduceParameters.setOthers(others);
}
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
mapreduceParameters.setMainArgs(args);
if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap));
mapreduceParameters.setOthers(others);
}
}

View File

@ -30,6 +30,9 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.commons.collections.MapUtils;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@ -118,6 +121,12 @@ public class PythonTask extends AbstractTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
try {
rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
@ -125,10 +134,8 @@ public class PythonTask extends AbstractTask {
catch (StringIndexOutOfBoundsException e) {
logger.error("setShareVar field format error, raw python script : {}", rawPythonScript);
}
if (paramsMap != null) {
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);

View File

@ -17,14 +17,10 @@
package org.apache.dolphinscheduler.server.worker.task.shell;
import static java.util.Calendar.DAY_OF_MONTH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
@ -34,6 +30,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.commons.collections.MapUtils;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
@ -41,7 +39,6 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@ -164,21 +161,11 @@ public class ShellTask extends AbstractTask {
private String parseScript(String script) {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
if (paramsMap == null) {
paramsMap = new HashMap<>();
}
Date date = taskExecutionContext.getScheduleTime();
if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
}
String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_DATETIME);
paramsMap.put(Constants.PARAMETER_DATETIME, p);
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}

View File

@ -30,7 +30,10 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.commons.collections.MapUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -110,12 +113,14 @@ public class SparkTask extends AbstractYarnTask {
// replace placeholder, and combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
String command = null;
if (null != paramsMap) {
command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
logger.info("spark task command: {}", command);

View File

@ -169,9 +169,15 @@ public class SqlTask extends AbstractTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
// spell SQL according to the final user-defined variable
if (paramsMap == null) {
if (MapUtils.isEmpty(paramsMap)) {
sqlBuilder.append(sql);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}

View File

@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.apache.commons.collections.MapUtils;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@ -74,8 +77,14 @@ public class SqoopTask extends AbstractYarnTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(sqoopTaskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) {
paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
}
if (paramsMap != null) {
if (MapUtils.isNotEmpty(paramsMap)) {
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
logger.info("sqoop script: {}", resultScripts);
return resultScripts;