mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-02 04:08:31 +08:00
Merge pull request #4743 from apache/params-trans
[Feature-3805][mater-worker-ui] The implement of passing variables between tasks
This commit is contained in:
commit
551759965d
@ -221,4 +221,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
|
|||||||
@Param("processDefinitionId") int processDefinitionId,
|
@Param("processDefinitionId") int processDefinitionId,
|
||||||
@Param("states") int[] states);
|
@Param("states") int[] states);
|
||||||
|
|
||||||
|
int updateGlobalParamsById(
|
||||||
|
@Param("globalParams") String globalParams,
|
||||||
|
@Param("id") int id);
|
||||||
}
|
}
|
||||||
|
@ -66,4 +66,4 @@ spring.datasource.password=test
|
|||||||
|
|
||||||
# open PSCache, specify count PSCache for every connection
|
# open PSCache, specify count PSCache for every connection
|
||||||
#spring.datasource.poolPreparedStatements=true
|
#spring.datasource.poolPreparedStatements=true
|
||||||
#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
|
#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
|
@ -219,5 +219,9 @@
|
|||||||
</foreach>
|
</foreach>
|
||||||
order by id asc
|
order by id asc
|
||||||
</select>
|
</select>
|
||||||
|
<update id="updateGlobalParamsById">
|
||||||
|
update t_ds_process_instance
|
||||||
|
set global_params = #{globalParams}
|
||||||
|
where id = #{id}
|
||||||
|
</update>
|
||||||
</mapper>
|
</mapper>
|
||||||
|
@ -68,6 +68,10 @@ public class TaskExecuteResponseCommand implements Serializable {
|
|||||||
* varPool string
|
* varPool string
|
||||||
*/
|
*/
|
||||||
private String varPool;
|
private String varPool;
|
||||||
|
/**
|
||||||
|
* task return result
|
||||||
|
*/
|
||||||
|
private String result;
|
||||||
|
|
||||||
public void setVarPool(String varPool) {
|
public void setVarPool(String varPool) {
|
||||||
this.varPool = varPool;
|
this.varPool = varPool;
|
||||||
@ -139,4 +143,12 @@ public class TaskExecuteResponseCommand implements Serializable {
|
|||||||
+ ", appIds='" + appIds + '\''
|
+ ", appIds='" + appIds + '\''
|
||||||
+ '}';
|
+ '}';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResult(String result) {
|
||||||
|
this.result = result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
|
|||||||
responseCommand.getAppIds(),
|
responseCommand.getAppIds(),
|
||||||
responseCommand.getTaskInstanceId(),
|
responseCommand.getTaskInstanceId(),
|
||||||
responseCommand.getVarPool(),
|
responseCommand.getVarPool(),
|
||||||
channel);
|
channel,
|
||||||
|
responseCommand.getResult()
|
||||||
|
);
|
||||||
taskResponseService.addResponse(taskResponseEvent);
|
taskResponseService.addResponse(taskResponseEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,6 +92,10 @@ public class TaskResponseEvent {
|
|||||||
* channel
|
* channel
|
||||||
*/
|
*/
|
||||||
private Channel channel;
|
private Channel channel;
|
||||||
|
/**
|
||||||
|
* task return result
|
||||||
|
*/
|
||||||
|
private String result;
|
||||||
|
|
||||||
public static TaskResponseEvent newAck(ExecutionStatus state,
|
public static TaskResponseEvent newAck(ExecutionStatus state,
|
||||||
Date startTime,
|
Date startTime,
|
||||||
@ -118,7 +122,8 @@ public class TaskResponseEvent {
|
|||||||
String appIds,
|
String appIds,
|
||||||
int taskInstanceId,
|
int taskInstanceId,
|
||||||
String varPool,
|
String varPool,
|
||||||
Channel channel) {
|
Channel channel,
|
||||||
|
String result) {
|
||||||
TaskResponseEvent event = new TaskResponseEvent();
|
TaskResponseEvent event = new TaskResponseEvent();
|
||||||
event.setState(state);
|
event.setState(state);
|
||||||
event.setEndTime(endTime);
|
event.setEndTime(endTime);
|
||||||
@ -128,6 +133,7 @@ public class TaskResponseEvent {
|
|||||||
event.setEvent(Event.RESULT);
|
event.setEvent(Event.RESULT);
|
||||||
event.setVarPool(varPool);
|
event.setVarPool(varPool);
|
||||||
event.setChannel(channel);
|
event.setChannel(channel);
|
||||||
|
event.setResult(result);
|
||||||
return event;
|
return event;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -226,4 +232,12 @@ public class TaskResponseEvent {
|
|||||||
public void setChannel(Channel channel) {
|
public void setChannel(Channel channel) {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResult(String result) {
|
||||||
|
this.result = result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -165,7 +165,8 @@ public class TaskResponseService {
|
|||||||
taskResponseEvent.getProcessId(),
|
taskResponseEvent.getProcessId(),
|
||||||
taskResponseEvent.getAppIds(),
|
taskResponseEvent.getAppIds(),
|
||||||
taskResponseEvent.getTaskInstanceId(),
|
taskResponseEvent.getTaskInstanceId(),
|
||||||
taskResponseEvent.getVarPool()
|
taskResponseEvent.getVarPool(),
|
||||||
|
taskResponseEvent.getResult()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
|
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
|
||||||
|
@ -22,11 +22,13 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
|
|||||||
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
|
||||||
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
|
||||||
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
|
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
|
||||||
|
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
|
||||||
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
|
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
|
||||||
|
|
||||||
import org.apache.dolphinscheduler.common.Constants;
|
import org.apache.dolphinscheduler.common.Constants;
|
||||||
import org.apache.dolphinscheduler.common.enums.CommandType;
|
import org.apache.dolphinscheduler.common.enums.CommandType;
|
||||||
import org.apache.dolphinscheduler.common.enums.DependResult;
|
import org.apache.dolphinscheduler.common.enums.DependResult;
|
||||||
|
import org.apache.dolphinscheduler.common.enums.Direct;
|
||||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||||
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
|
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
|
||||||
import org.apache.dolphinscheduler.common.enums.Flag;
|
import org.apache.dolphinscheduler.common.enums.Flag;
|
||||||
@ -36,6 +38,7 @@ import org.apache.dolphinscheduler.common.graph.DAG;
|
|||||||
import org.apache.dolphinscheduler.common.model.TaskNode;
|
import org.apache.dolphinscheduler.common.model.TaskNode;
|
||||||
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
|
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
|
||||||
import org.apache.dolphinscheduler.common.process.ProcessDag;
|
import org.apache.dolphinscheduler.common.process.ProcessDag;
|
||||||
|
import org.apache.dolphinscheduler.common.process.Property;
|
||||||
import org.apache.dolphinscheduler.common.thread.Stopper;
|
import org.apache.dolphinscheduler.common.thread.Stopper;
|
||||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
|
||||||
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
|
||||||
@ -67,6 +70,7 @@ import java.util.ArrayList;
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -74,6 +78,7 @@ import java.util.Set;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -491,7 +496,8 @@ public class MasterExecThread implements Runnable {
|
|||||||
*/
|
*/
|
||||||
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
|
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
|
||||||
TaskNode taskNode) {
|
TaskNode taskNode) {
|
||||||
|
//update processInstance for update the globalParams
|
||||||
|
this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId());
|
||||||
TaskInstance taskInstance = findTaskIfExists(nodeName);
|
TaskInstance taskInstance = findTaskIfExists(nodeName);
|
||||||
if (taskInstance == null) {
|
if (taskInstance == null) {
|
||||||
taskInstance = new TaskInstance();
|
taskInstance = new TaskInstance();
|
||||||
@ -540,13 +546,57 @@ public class MasterExecThread implements Runnable {
|
|||||||
} else {
|
} else {
|
||||||
taskInstance.setWorkerGroup(taskWorkerGroup);
|
taskInstance.setWorkerGroup(taskWorkerGroup);
|
||||||
}
|
}
|
||||||
|
//get process global
|
||||||
|
setProcessGlobal(taskNode, taskInstance);
|
||||||
// delay execution time
|
// delay execution time
|
||||||
taskInstance.setDelayTime(taskNode.getDelayTime());
|
taskInstance.setDelayTime(taskNode.getDelayTime());
|
||||||
}
|
}
|
||||||
return taskInstance;
|
return taskInstance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setProcessGlobal(TaskNode taskNode, TaskInstance taskInstance) {
|
||||||
|
String globalParams = this.processInstance.getGlobalParams();
|
||||||
|
if (StringUtils.isNotEmpty(globalParams)) {
|
||||||
|
Map<String, String> globalMap = getGlobalParamMap(globalParams);
|
||||||
|
if (globalMap != null && globalMap.size() != 0) {
|
||||||
|
setGlobalMapToTask(taskNode, taskInstance, globalMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setGlobalMapToTask(TaskNode taskNode, TaskInstance taskInstance, Map<String, String> globalMap) {
|
||||||
|
// the param save in localParams
|
||||||
|
Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
|
||||||
|
Object localParams = result.get(LOCAL_PARAMS);
|
||||||
|
if (localParams != null) {
|
||||||
|
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
|
||||||
|
for (Property info : allParam) {
|
||||||
|
if (info.getDirect().equals(Direct.IN)) {
|
||||||
|
String paramName = info.getProp();
|
||||||
|
String value = globalMap.get(paramName);
|
||||||
|
if (StringUtils.isNotEmpty(value)) {
|
||||||
|
info.setValue(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result.put(LOCAL_PARAMS, allParam);
|
||||||
|
taskNode.setParams(JSONUtils.toJsonString(result));
|
||||||
|
// task instance node json
|
||||||
|
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, String> getGlobalParamMap(String globalParams) {
|
||||||
|
List<Property> propList;
|
||||||
|
Map<String,String> globalParamMap = new HashMap<>();
|
||||||
|
if (StringUtils.isNotEmpty(globalParams)) {
|
||||||
|
propList = JSONUtils.toList(globalParams, Property.class);
|
||||||
|
globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
return globalParamMap;
|
||||||
|
}
|
||||||
|
|
||||||
private void submitPostNode(String parentNodeName) {
|
private void submitPostNode(String parentNodeName) {
|
||||||
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
|
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
|
||||||
List<TaskInstance> taskInstances = new ArrayList<>();
|
List<TaskInstance> taskInstances = new ArrayList<>();
|
||||||
@ -951,6 +1001,7 @@ public class MasterExecThread implements Runnable {
|
|||||||
task.getName(), task.getId(), task.getState());
|
task.getName(), task.getId(), task.getState());
|
||||||
// node success , post node submit
|
// node success , post node submit
|
||||||
if (task.getState() == ExecutionStatus.SUCCESS) {
|
if (task.getState() == ExecutionStatus.SUCCESS) {
|
||||||
|
processInstance = processService.findProcessInstanceById(processInstance.getId());
|
||||||
processInstance.setVarPool(task.getVarPool());
|
processInstance.setVarPool(task.getVarPool());
|
||||||
processService.updateProcessInstance(processInstance);
|
processService.updateProcessInstance(processInstance);
|
||||||
completeTaskList.put(task.getName(), task);
|
completeTaskList.put(task.getName(), task);
|
||||||
|
@ -174,6 +174,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
|
|||||||
responseCommand.setProcessId(task.getProcessId());
|
responseCommand.setProcessId(task.getProcessId());
|
||||||
responseCommand.setAppIds(task.getAppIds());
|
responseCommand.setAppIds(task.getAppIds());
|
||||||
responseCommand.setVarPool(task.getVarPool());
|
responseCommand.setVarPool(task.getVarPool());
|
||||||
|
responseCommand.setResult(task.getResultString());
|
||||||
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
|
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("task scheduler failure", e);
|
logger.error("task scheduler failure", e);
|
||||||
|
@ -54,6 +54,7 @@ import java.util.regex.Pattern;
|
|||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* abstract command executor
|
* abstract command executor
|
||||||
*/
|
*/
|
||||||
@ -84,6 +85,11 @@ public abstract class AbstractCommandExecutor {
|
|||||||
*/
|
*/
|
||||||
protected final List<String> logBuffer;
|
protected final List<String> logBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SHELL result string
|
||||||
|
*/
|
||||||
|
protected String taskResultString;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* taskExecutionContext
|
* taskExecutionContext
|
||||||
*/
|
*/
|
||||||
@ -104,6 +110,10 @@ public abstract class AbstractCommandExecutor {
|
|||||||
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
|
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected AbstractCommandExecutor(List<String> logBuffer) {
|
||||||
|
this.logBuffer = logBuffer;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* build process
|
* build process
|
||||||
*
|
*
|
||||||
@ -223,6 +233,7 @@ public abstract class AbstractCommandExecutor {
|
|||||||
return varPool.toString();
|
return varPool.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* cancel application
|
* cancel application
|
||||||
*
|
*
|
||||||
@ -355,6 +366,7 @@ public abstract class AbstractCommandExecutor {
|
|||||||
varPool.append("$VarPool$");
|
varPool.append("$VarPool$");
|
||||||
} else {
|
} else {
|
||||||
logBuffer.add(line);
|
logBuffer.add(line);
|
||||||
|
taskResultString = line;
|
||||||
lastFlushTime = flush(lastFlushTime);
|
lastFlushTime = flush(lastFlushTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -561,4 +573,12 @@ public abstract class AbstractCommandExecutor {
|
|||||||
protected abstract String commandInterpreter();
|
protected abstract String commandInterpreter();
|
||||||
|
|
||||||
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
|
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
|
||||||
|
|
||||||
|
public String getTaskResultString() {
|
||||||
|
return taskResultString;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTaskResultString(String taskResultString) {
|
||||||
|
this.taskResultString = taskResultString;
|
||||||
|
}
|
||||||
}
|
}
|
@ -63,6 +63,11 @@ public abstract class AbstractTask {
|
|||||||
*/
|
*/
|
||||||
protected int processId;
|
protected int processId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SHELL result string
|
||||||
|
*/
|
||||||
|
protected String resultString;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* other resource manager appId , for example : YARN etc
|
* other resource manager appId , for example : YARN etc
|
||||||
*/
|
*/
|
||||||
@ -167,6 +172,14 @@ public abstract class AbstractTask {
|
|||||||
this.processId = processId;
|
this.processId = processId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getResultString() {
|
||||||
|
return resultString;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResultString(String resultString) {
|
||||||
|
this.resultString = resultString;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get task parameters
|
* get task parameters
|
||||||
*
|
*
|
||||||
|
@ -56,6 +56,9 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
|
|||||||
super(logHandler,taskExecutionContext,logger);
|
super(logHandler,taskExecutionContext,logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ShellCommandExecutor(List<String> logBuffer) {
|
||||||
|
super(logBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String buildCommandFilePath() {
|
protected String buildCommandFilePath() {
|
||||||
|
@ -21,6 +21,7 @@ import static java.util.Calendar.DAY_OF_MONTH;
|
|||||||
|
|
||||||
import org.apache.dolphinscheduler.common.Constants;
|
import org.apache.dolphinscheduler.common.Constants;
|
||||||
import org.apache.dolphinscheduler.common.enums.CommandType;
|
import org.apache.dolphinscheduler.common.enums.CommandType;
|
||||||
|
import org.apache.dolphinscheduler.common.enums.Direct;
|
||||||
import org.apache.dolphinscheduler.common.process.Property;
|
import org.apache.dolphinscheduler.common.process.Property;
|
||||||
import org.apache.dolphinscheduler.common.task.AbstractParameters;
|
import org.apache.dolphinscheduler.common.task.AbstractParameters;
|
||||||
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
|
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
|
||||||
@ -34,6 +35,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
|
|||||||
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
|
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
|
||||||
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
|
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
@ -41,13 +44,13 @@ import java.nio.file.StandardOpenOption;
|
|||||||
import java.nio.file.attribute.FileAttribute;
|
import java.nio.file.attribute.FileAttribute;
|
||||||
import java.nio.file.attribute.PosixFilePermission;
|
import java.nio.file.attribute.PosixFilePermission;
|
||||||
import java.nio.file.attribute.PosixFilePermissions;
|
import java.nio.file.attribute.PosixFilePermissions;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* shell task
|
* shell task
|
||||||
*/
|
*/
|
||||||
@ -102,6 +105,7 @@ public class ShellTask extends AbstractTask {
|
|||||||
setExitStatusCode(commandExecuteResult.getExitStatusCode());
|
setExitStatusCode(commandExecuteResult.getExitStatusCode());
|
||||||
setAppIds(commandExecuteResult.getAppIds());
|
setAppIds(commandExecuteResult.getAppIds());
|
||||||
setProcessId(commandExecuteResult.getProcessId());
|
setProcessId(commandExecuteResult.getProcessId());
|
||||||
|
setResult(shellCommandExecutor.getTaskResultString());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("shell task error", e);
|
logger.error("shell task error", e);
|
||||||
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
|
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
|
||||||
@ -183,4 +187,17 @@ public class ShellTask extends AbstractTask {
|
|||||||
}
|
}
|
||||||
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
|
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setResult(String result) {
|
||||||
|
Map<String, Property> localParams = shellParameters.getLocalParametersMap();
|
||||||
|
List<Map<String, String>> outProperties = new ArrayList<>();
|
||||||
|
Map<String, String> p = new HashMap<>();
|
||||||
|
localParams.forEach((k,v) -> {
|
||||||
|
if (v.getDirect() == Direct.OUT) {
|
||||||
|
p.put(k, result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
outProperties.add(p);
|
||||||
|
resultString = JSONUtils.toJsonString(outProperties);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
|
|||||||
import org.apache.dolphinscheduler.common.Constants;
|
import org.apache.dolphinscheduler.common.Constants;
|
||||||
import org.apache.dolphinscheduler.common.enums.CommandType;
|
import org.apache.dolphinscheduler.common.enums.CommandType;
|
||||||
import org.apache.dolphinscheduler.common.enums.DbType;
|
import org.apache.dolphinscheduler.common.enums.DbType;
|
||||||
|
import org.apache.dolphinscheduler.common.enums.Direct;
|
||||||
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
|
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
|
||||||
import org.apache.dolphinscheduler.common.process.Property;
|
import org.apache.dolphinscheduler.common.process.Property;
|
||||||
import org.apache.dolphinscheduler.common.task.AbstractParameters;
|
import org.apache.dolphinscheduler.common.task.AbstractParameters;
|
||||||
@ -148,7 +149,7 @@ public class SqlTask extends AbstractTask {
|
|||||||
logger);
|
logger);
|
||||||
|
|
||||||
// execute sql task
|
// execute sql task
|
||||||
executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
|
executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs, sqlParameters.getLocalParams());
|
||||||
|
|
||||||
setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
|
setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
|
||||||
|
|
||||||
@ -237,7 +238,8 @@ public class SqlTask extends AbstractTask {
|
|||||||
public void executeFuncAndSql(SqlBinds mainSqlBinds,
|
public void executeFuncAndSql(SqlBinds mainSqlBinds,
|
||||||
List<SqlBinds> preStatementsBinds,
|
List<SqlBinds> preStatementsBinds,
|
||||||
List<SqlBinds> postStatementsBinds,
|
List<SqlBinds> postStatementsBinds,
|
||||||
List<String> createFuncs) {
|
List<String> createFuncs,
|
||||||
|
List<Property> properties) {
|
||||||
Connection connection = null;
|
Connection connection = null;
|
||||||
PreparedStatement stmt = null;
|
PreparedStatement stmt = null;
|
||||||
ResultSet resultSet = null;
|
ResultSet resultSet = null;
|
||||||
@ -253,18 +255,21 @@ public class SqlTask extends AbstractTask {
|
|||||||
preSql(connection, preStatementsBinds);
|
preSql(connection, preStatementsBinds);
|
||||||
stmt = prepareStatementAndBind(connection, mainSqlBinds);
|
stmt = prepareStatementAndBind(connection, mainSqlBinds);
|
||||||
|
|
||||||
|
String result = null;
|
||||||
// decide whether to executeQuery or executeUpdate based on sqlType
|
// decide whether to executeQuery or executeUpdate based on sqlType
|
||||||
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
|
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
|
||||||
// query statements need to be convert to JsonArray and inserted into Alert to send
|
// query statements need to be convert to JsonArray and inserted into Alert to send
|
||||||
resultSet = stmt.executeQuery();
|
resultSet = stmt.executeQuery();
|
||||||
resultProcess(resultSet);
|
result = resultProcess(resultSet);
|
||||||
|
|
||||||
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
|
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
|
||||||
// non query statement
|
// non query statement
|
||||||
stmt.executeUpdate();
|
String updateResult = String.valueOf(stmt.executeUpdate());
|
||||||
|
result = setNonQuerySqlReturn(updateResult, properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
postSql(connection, postStatementsBinds);
|
postSql(connection, postStatementsBinds);
|
||||||
|
this.setResultString(result);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("execute sql error", e);
|
logger.error("execute sql error", e);
|
||||||
@ -274,13 +279,28 @@ public class SqlTask extends AbstractTask {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
|
||||||
|
String result = null;
|
||||||
|
for (Property info :properties) {
|
||||||
|
if (Direct.OUT == info.getDirect()) {
|
||||||
|
List<Map<String,String>> updateRL = new ArrayList<>();
|
||||||
|
Map<String,String> updateRM = new HashMap<>();
|
||||||
|
updateRM.put(info.getProp(),updateResult);
|
||||||
|
updateRL.add(updateRM);
|
||||||
|
result = JSONUtils.toJsonString(updateRL);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* result process
|
* result process
|
||||||
*
|
*
|
||||||
* @param resultSet resultSet
|
* @param resultSet resultSet
|
||||||
* @throws Exception Exception
|
* @throws Exception Exception
|
||||||
*/
|
*/
|
||||||
private void resultProcess(ResultSet resultSet) throws Exception {
|
private String resultProcess(ResultSet resultSet) throws Exception {
|
||||||
ArrayNode resultJSONArray = JSONUtils.createArrayNode();
|
ArrayNode resultJSONArray = JSONUtils.createArrayNode();
|
||||||
ResultSetMetaData md = resultSet.getMetaData();
|
ResultSetMetaData md = resultSet.getMetaData();
|
||||||
int num = md.getColumnCount();
|
int num = md.getColumnCount();
|
||||||
@ -297,13 +317,13 @@ public class SqlTask extends AbstractTask {
|
|||||||
}
|
}
|
||||||
String result = JSONUtils.toJsonString(resultJSONArray);
|
String result = JSONUtils.toJsonString(resultJSONArray);
|
||||||
logger.debug("execute sql : {}", result);
|
logger.debug("execute sql : {}", result);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
|
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
|
||||||
JSONUtils.toJsonString(resultJSONArray));
|
JSONUtils.toJsonString(resultJSONArray));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("sql task sendAttachment error! msg : {} ", e.getMessage());
|
logger.warn("sql task sendAttachment error! msg : {} ", e.getMessage());
|
||||||
}
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -70,7 +70,8 @@ public class TaskResponseServiceTest {
|
|||||||
"ids",
|
"ids",
|
||||||
22,
|
22,
|
||||||
"varPol",
|
"varPol",
|
||||||
channel);
|
channel,
|
||||||
|
"[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]");
|
||||||
|
|
||||||
taskInstance = new TaskInstance();
|
taskInstance = new TaskInstance();
|
||||||
taskInstance.setId(22);
|
taskInstance.setId(22);
|
||||||
|
@ -121,6 +121,11 @@ public class TaskCallbackServiceTest {
|
|||||||
ackCommand.setStartTime(new Date());
|
ackCommand.setStartTime(new Date());
|
||||||
taskCallbackService.sendAck(1, ackCommand.convert2Command());
|
taskCallbackService.sendAck(1, ackCommand.convert2Command());
|
||||||
|
|
||||||
|
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
|
||||||
|
String result = responseCommand.getResult();
|
||||||
|
responseCommand.setResult("return string");
|
||||||
|
taskCallbackService.sendResult(1, responseCommand.convert2Command());
|
||||||
|
|
||||||
Stopper.stop();
|
Stopper.stop();
|
||||||
|
|
||||||
nettyRemotingServer.close();
|
nettyRemotingServer.close();
|
||||||
|
@ -0,0 +1,53 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.worker.task;
|
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||||
|
import org.powermock.modules.junit4.PowerMockRunner;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@RunWith(PowerMockRunner.class)
|
||||||
|
@PrepareForTest({SpringApplicationContext.class})
|
||||||
|
public class AbstractCommandExecutorTest {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutorTest.class);
|
||||||
|
|
||||||
|
private ShellCommandExecutor shellCommandExecutor;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
|
||||||
|
shellCommandExecutor = new ShellCommandExecutor(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetTaskResultString() {
|
||||||
|
shellCommandExecutor.setTaskResultString("shellReturn");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTaskResultString() {
|
||||||
|
logger.info(shellCommandExecutor.getTaskResultString());
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,127 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.dolphinscheduler.server.worker.task;
|
||||||
|
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTaskTest;
|
||||||
|
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.powermock.api.mockito.PowerMockito;
|
||||||
|
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||||
|
import org.powermock.modules.junit4.PowerMockRunner;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* shell task return test.
|
||||||
|
*/
|
||||||
|
@RunWith(PowerMockRunner.class)
|
||||||
|
@PrepareForTest({ShellTask.class})
|
||||||
|
public class ShellTaskReturnTest {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
|
||||||
|
|
||||||
|
private ShellTask shellTask;
|
||||||
|
private ShellCommandExecutor shellCommandExecutor;
|
||||||
|
private TaskExecutionContext taskExecutionContext;
|
||||||
|
private CommandExecuteResult commandExecuteResult;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
|
||||||
|
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
|
||||||
|
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
|
||||||
|
taskExecutionContext = new TaskExecutionContext();
|
||||||
|
taskExecutionContext.setTaskInstanceId(1);
|
||||||
|
taskExecutionContext.setTaskName("kris test");
|
||||||
|
taskExecutionContext.setTaskType("SHELL");
|
||||||
|
taskExecutionContext.setHost("127.0.0.1:1234");
|
||||||
|
taskExecutionContext.setExecutePath("/tmp");
|
||||||
|
taskExecutionContext.setLogPath("/log");
|
||||||
|
taskExecutionContext.setTaskJson(
|
||||||
|
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
|
||||||
|
+ "\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,"
|
||||||
|
+ "\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
|
||||||
|
+ "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\","
|
||||||
|
+ "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
|
||||||
|
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\","
|
||||||
|
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":"
|
||||||
|
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},"
|
||||||
|
+ "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
|
||||||
|
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,"
|
||||||
|
+ "\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
|
||||||
|
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
|
||||||
|
+ "\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
|
||||||
|
taskExecutionContext.setProcessInstanceId(1);
|
||||||
|
taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]");
|
||||||
|
taskExecutionContext.setExecutorId(1);
|
||||||
|
taskExecutionContext.setCmdTypeIfComplement(5);
|
||||||
|
taskExecutionContext.setTenantCode("roo");
|
||||||
|
taskExecutionContext.setScheduleTime(new Date());
|
||||||
|
taskExecutionContext.setQueue("default");
|
||||||
|
taskExecutionContext.setTaskParams(
|
||||||
|
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
|
||||||
|
+
|
||||||
|
"[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
|
||||||
|
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
|
||||||
|
Map<String, String> definedParams = new HashMap<>();
|
||||||
|
definedParams.put("time_gb", "2020-12-16 00:00:00");
|
||||||
|
taskExecutionContext.setDefinedParams(definedParams);
|
||||||
|
PowerMockito.mockStatic(Files.class);
|
||||||
|
PowerMockito.when(Files.exists(Paths.get(anyString()))).thenReturn(true);
|
||||||
|
commandExecuteResult = new CommandExecuteResult();
|
||||||
|
commandExecuteResult.setAppIds("appId");
|
||||||
|
commandExecuteResult.setExitStatusCode(0);
|
||||||
|
commandExecuteResult.setProcessId(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShellReturnString() {
|
||||||
|
shellTask = new ShellTask(taskExecutionContext, logger);
|
||||||
|
shellTask.init();
|
||||||
|
try {
|
||||||
|
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
shellTask.setResult("shell return string");
|
||||||
|
logger.info("shell return string:{}", shellTask.getResultString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetTaskResultString() {
|
||||||
|
shellCommandExecutor.setTaskResultString("shellReturn");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTaskResultString() {
|
||||||
|
logger.info(shellCommandExecutor.getTaskResultString());
|
||||||
|
}
|
||||||
|
}
|
@ -17,13 +17,22 @@
|
|||||||
|
|
||||||
package org.apache.dolphinscheduler.server.worker.task;
|
package org.apache.dolphinscheduler.server.worker.task;
|
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.process.Property;
|
||||||
|
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
|
||||||
|
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
|
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
|
||||||
|
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
|
||||||
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
||||||
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
|
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
|
||||||
|
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
|
||||||
import org.apache.dolphinscheduler.service.alert.AlertClientService;
|
import org.apache.dolphinscheduler.service.alert.AlertClientService;
|
||||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||||
|
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -116,4 +125,70 @@ public class TaskManagerTest {
|
|||||||
taskExecutionContext.setTaskType("XXX");
|
taskExecutionContext.setTaskType("XXX");
|
||||||
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
|
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShellTaskReturnString() {
|
||||||
|
taskExecutionContext.setTaskInstanceId(1);
|
||||||
|
taskExecutionContext.setTaskName("kris test");
|
||||||
|
taskExecutionContext.setTaskType("SHELL");
|
||||||
|
taskExecutionContext.setHost("127.0.0.1:1234");
|
||||||
|
taskExecutionContext.setExecutePath("/tmp");
|
||||||
|
taskExecutionContext.setLogPath("/log");
|
||||||
|
taskExecutionContext.setTaskJson(
|
||||||
|
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
|
||||||
|
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
|
||||||
|
+ "tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\","
|
||||||
|
+ "\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
|
||||||
|
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\","
|
||||||
|
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":"
|
||||||
|
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},"
|
||||||
|
+ "{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
|
||||||
|
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
|
||||||
|
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
|
||||||
|
taskExecutionContext.setProcessInstanceId(1);
|
||||||
|
taskExecutionContext.setGlobalParams("[{\"direct\":\"IN\",\"prop\":\"time_gb\",\"type\":\"VARCHAR\",\"value\":\"2020-12-16 17:18:33\"}]");
|
||||||
|
taskExecutionContext.setExecutorId(1);
|
||||||
|
taskExecutionContext.setCmdTypeIfComplement(5);
|
||||||
|
taskExecutionContext.setTenantCode("roo");
|
||||||
|
taskExecutionContext.setScheduleTime(new Date());
|
||||||
|
taskExecutionContext.setQueue("default");
|
||||||
|
taskExecutionContext.setTaskParams(
|
||||||
|
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
|
||||||
|
+
|
||||||
|
"[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
|
||||||
|
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
|
||||||
|
Map<String, String> definedParams = new HashMap<>();
|
||||||
|
definedParams.put("time_gb", "2020-12-16 00:00:00");
|
||||||
|
taskExecutionContext.setDefinedParams(definedParams);
|
||||||
|
ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
|
||||||
|
shellTask.setResultString("shell return");
|
||||||
|
String shellReturn = shellTask.getResultString();
|
||||||
|
shellTask.init();
|
||||||
|
shellTask.setResult(shellReturn);
|
||||||
|
Assert.assertSame(shellReturn, "shell return");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSqlTaskReturnString() {
|
||||||
|
String params = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\","
|
||||||
|
+ "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}";
|
||||||
|
taskExecutionContext = new TaskExecutionContext();
|
||||||
|
taskExecutionContext.setTaskParams("{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}],"
|
||||||
|
+ "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\","
|
||||||
|
+ "\"sqlType\":1}");
|
||||||
|
taskExecutionContext.setExecutePath("/tmp");
|
||||||
|
taskExecutionContext.setTaskAppId("1");
|
||||||
|
taskExecutionContext.setTenantCode("root");
|
||||||
|
taskExecutionContext.setStartTime(new Date());
|
||||||
|
taskExecutionContext.setTaskTimeout(10000);
|
||||||
|
taskExecutionContext.setLogPath("/tmp/dx");
|
||||||
|
|
||||||
|
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
|
||||||
|
sqlTaskExecutionContext.setConnectionParams(params);
|
||||||
|
taskExecutionContext.setSqlTaskExecutionContext(sqlTaskExecutionContext);
|
||||||
|
SqlTask sqlTask = new SqlTask(taskExecutionContext, logger, null);
|
||||||
|
SqlParameters sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
|
||||||
|
List<Property> properties = sqlParameters.getLocalParams();
|
||||||
|
sqlTask.setNonQuerySqlReturn("sql return", properties);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.task.shell;
|
|||||||
|
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
|
||||||
|
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
|
||||||
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
||||||
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
|
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
|
||||||
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
|
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
|
||||||
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||||
|
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.sql.DriverManager;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
* shell task test.
|
* shell task test.
|
||||||
*/
|
*/
|
||||||
@RunWith(PowerMockRunner.class)
|
@RunWith(PowerMockRunner.class)
|
||||||
@PrepareForTest({ShellTask.class})
|
@PrepareForTest(value = {ShellTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class})
|
||||||
public class ShellTaskTest {
|
public class ShellTaskTest {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
|
private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
|
||||||
@ -57,6 +60,7 @@ public class ShellTaskTest {
|
|||||||
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
|
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
|
||||||
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
|
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
|
||||||
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
|
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
|
||||||
|
shellCommandExecutor.setTaskResultString("shellReturn");
|
||||||
taskExecutionContext = new TaskExecutionContext();
|
taskExecutionContext = new TaskExecutionContext();
|
||||||
taskExecutionContext.setTaskInstanceId(1);
|
taskExecutionContext.setTaskInstanceId(1);
|
||||||
taskExecutionContext.setTaskName("kris test");
|
taskExecutionContext.setTaskName("kris test");
|
||||||
@ -68,7 +72,7 @@ public class ShellTaskTest {
|
|||||||
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
|
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\",\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\""
|
||||||
+
|
+
|
||||||
"tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
|
"tasks-16849\",\"maxRetryTimes\":0,\"name\":\"shell test 001\",\"params\":\"{\\\"rawScript\\\":\\\"#!/bin/sh\\\\necho $[yyyy-MM-dd HH:mm:ss +3]\\\\necho \\\\\\\" ?? "
|
||||||
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":"
|
+ "${time1} \\\\\\\"\\\\necho \\\\\\\" ????? ${time2}\\\\\\\"\\\\n\\\",\\\"localParams\\\":[{\\\"prop\\\":\\\"time1\\\",\\\"direct\\\":\\\"OUT\\\",\\\"type\\\":"
|
||||||
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
|
+ "\\\"VARCHAR\\\",\\\"value\\\":\\\"$[yyyy-MM-dd HH:mm:ss]\\\"},{\\\"prop\\\":\\\"time2\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${time_gb}\\\"}"
|
||||||
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
|
+ "],\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\",\"taskTimeoutParameter\":"
|
||||||
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
|
+ "{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\",\"type\":\"SHELL\",\"workerGroup\":\"default\"}");
|
||||||
@ -82,7 +86,7 @@ public class ShellTaskTest {
|
|||||||
taskExecutionContext.setTaskParams(
|
taskExecutionContext.setTaskParams(
|
||||||
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
|
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
|
||||||
+
|
+
|
||||||
"[{\"prop\":\"time1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
|
"[{\"prop\":\"time1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"$[yyyy-MM-dd HH:mm:ss]\"},{\"prop\":\"time2\",\"direct\":\"IN\",\"type\":\"VARCHAR"
|
||||||
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
|
+ "\",\"value\":\"${time_gb}\"}],\"resourceList\":[]}");
|
||||||
Map<String, String> definedParams = new HashMap<>();
|
Map<String, String> definedParams = new HashMap<>();
|
||||||
definedParams.put("time_gb", "2020-12-16 00:00:00");
|
definedParams.put("time_gb", "2020-12-16 00:00:00");
|
||||||
@ -111,4 +115,13 @@ public class ShellTaskTest {
|
|||||||
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
|
PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
|
||||||
shellTask.handle();
|
shellTask.handle();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetResult() {
|
||||||
|
shellTask = new ShellTask(taskExecutionContext, logger);
|
||||||
|
shellTask.init();
|
||||||
|
String r = "return";
|
||||||
|
shellTask.setResult(r);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -19,10 +19,12 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
|
|||||||
|
|
||||||
import org.apache.dolphinscheduler.common.Constants;
|
import org.apache.dolphinscheduler.common.Constants;
|
||||||
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
|
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
|
||||||
|
import org.apache.dolphinscheduler.dao.AlertDao;
|
||||||
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
|
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
|
||||||
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
|
||||||
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
|
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
|
||||||
import org.apache.dolphinscheduler.service.alert.AlertClientService;
|
import org.apache.dolphinscheduler.service.alert.AlertClientService;
|
||||||
|
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.DriverManager;
|
import java.sql.DriverManager;
|
||||||
@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
* sql task test
|
* sql task test
|
||||||
*/
|
*/
|
||||||
@RunWith(PowerMockRunner.class)
|
@RunWith(PowerMockRunner.class)
|
||||||
@PrepareForTest(value = {SqlTask.class, DriverManager.class})
|
@PrepareForTest(value = {SqlTask.class, DriverManager.class, SpringApplicationContext.class, ParameterUtils.class})
|
||||||
public class SqlTaskTest {
|
public class SqlTaskTest {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class);
|
private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class);
|
||||||
@ -70,7 +72,9 @@ public class SqlTaskTest {
|
|||||||
props.setTaskStartTime(new Date());
|
props.setTaskStartTime(new Date());
|
||||||
props.setTaskTimeout(0);
|
props.setTaskTimeout(0);
|
||||||
props.setTaskParams(
|
props.setTaskParams(
|
||||||
"{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}");
|
"{\"localParams\":[{\"prop\":\"ret\", \"direct\":\"OUT\", \"type\":\"VARCHAR\", \"value\":\"\"}],"
|
||||||
|
+ "\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\","
|
||||||
|
+ "\"sqlType\":1}");
|
||||||
|
|
||||||
taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
|
taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
|
||||||
PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
|
PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
|
||||||
@ -85,6 +89,8 @@ public class SqlTaskTest {
|
|||||||
sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
|
sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
|
||||||
PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext);
|
PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext);
|
||||||
|
|
||||||
|
PowerMockito.mockStatic(SpringApplicationContext.class);
|
||||||
|
PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao());
|
||||||
alertClientService = PowerMockito.mock(AlertClientService.class);
|
alertClientService = PowerMockito.mock(AlertClientService.class);
|
||||||
sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
|
sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
|
||||||
sqlTask.init();
|
sqlTask.init();
|
||||||
@ -95,7 +101,7 @@ public class SqlTaskTest {
|
|||||||
Assert.assertNotNull(sqlTask.getParameters());
|
Assert.assertNotNull(sqlTask.getParameters());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = Exception.class)
|
@Test
|
||||||
public void testHandle() throws Exception {
|
public void testHandle() throws Exception {
|
||||||
Connection connection = PowerMockito.mock(Connection.class);
|
Connection connection = PowerMockito.mock(Connection.class);
|
||||||
PowerMockito.mockStatic(DriverManager.class);
|
PowerMockito.mockStatic(DriverManager.class);
|
||||||
|
@ -24,6 +24,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PRO
|
|||||||
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
|
||||||
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
|
||||||
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
|
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
|
||||||
|
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
|
||||||
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
|
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
|
||||||
|
|
||||||
import static java.util.stream.Collectors.toSet;
|
import static java.util.stream.Collectors.toSet;
|
||||||
@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
|
|||||||
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
|
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
|
||||||
import org.apache.dolphinscheduler.common.enums.CommandType;
|
import org.apache.dolphinscheduler.common.enums.CommandType;
|
||||||
import org.apache.dolphinscheduler.common.enums.CycleEnum;
|
import org.apache.dolphinscheduler.common.enums.CycleEnum;
|
||||||
|
import org.apache.dolphinscheduler.common.enums.Direct;
|
||||||
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
|
||||||
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
|
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
|
||||||
import org.apache.dolphinscheduler.common.enums.Flag;
|
import org.apache.dolphinscheduler.common.enums.Flag;
|
||||||
@ -89,6 +91,7 @@ import java.util.Date;
|
|||||||
import java.util.EnumMap;
|
import java.util.EnumMap;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@ -103,6 +106,8 @@ import org.springframework.stereotype.Component;
|
|||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
import com.cronutils.model.Cron;
|
import com.cronutils.model.Cron;
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -425,7 +430,7 @@ public class ProcessService {
|
|||||||
* recursive query sub process definition id by parent id.
|
* recursive query sub process definition id by parent id.
|
||||||
*
|
*
|
||||||
* @param parentId parentId
|
* @param parentId parentId
|
||||||
* @param ids ids
|
* @param ids ids
|
||||||
*/
|
*/
|
||||||
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
|
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
|
||||||
ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
|
ProcessDefinition processDefinition = processDefineMapper.selectById(parentId);
|
||||||
@ -456,7 +461,7 @@ public class ProcessService {
|
|||||||
* create recovery waiting thread command and delete origin command at the same time.
|
* create recovery waiting thread command and delete origin command at the same time.
|
||||||
* if the recovery command is exists, only update the field update_time
|
* if the recovery command is exists, only update the field update_time
|
||||||
*
|
*
|
||||||
* @param originCommand originCommand
|
* @param originCommand originCommand
|
||||||
* @param processInstance processInstance
|
* @param processInstance processInstance
|
||||||
*/
|
*/
|
||||||
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
|
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
|
||||||
@ -508,7 +513,7 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* get schedule time from command
|
* get schedule time from command
|
||||||
*
|
*
|
||||||
* @param command command
|
* @param command command
|
||||||
* @param cmdParam cmdParam map
|
* @param cmdParam cmdParam map
|
||||||
* @return date
|
* @return date
|
||||||
*/
|
*/
|
||||||
@ -524,8 +529,8 @@ public class ProcessService {
|
|||||||
* generate a new work process instance from command.
|
* generate a new work process instance from command.
|
||||||
*
|
*
|
||||||
* @param processDefinition processDefinition
|
* @param processDefinition processDefinition
|
||||||
* @param command command
|
* @param command command
|
||||||
* @param cmdParam cmdParam map
|
* @param cmdParam cmdParam map
|
||||||
* @return process instance
|
* @return process instance
|
||||||
*/
|
*/
|
||||||
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
|
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
|
||||||
@ -601,7 +606,7 @@ public class ProcessService {
|
|||||||
* use definition creator's tenant.
|
* use definition creator's tenant.
|
||||||
*
|
*
|
||||||
* @param tenantId tenantId
|
* @param tenantId tenantId
|
||||||
* @param userId userId
|
* @param userId userId
|
||||||
* @return tenant
|
* @return tenant
|
||||||
*/
|
*/
|
||||||
public Tenant getTenantForProcess(int tenantId, int userId) {
|
public Tenant getTenantForProcess(int tenantId, int userId) {
|
||||||
@ -624,7 +629,7 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* check command parameters is valid
|
* check command parameters is valid
|
||||||
*
|
*
|
||||||
* @param command command
|
* @param command command
|
||||||
* @param cmdParam cmdParam map
|
* @param cmdParam cmdParam map
|
||||||
* @return whether command param is valid
|
* @return whether command param is valid
|
||||||
*/
|
*/
|
||||||
@ -644,7 +649,7 @@ public class ProcessService {
|
|||||||
* construct process instance according to one command.
|
* construct process instance according to one command.
|
||||||
*
|
*
|
||||||
* @param command command
|
* @param command command
|
||||||
* @param host host
|
* @param host host
|
||||||
* @return process instance
|
* @return process instance
|
||||||
*/
|
*/
|
||||||
private ProcessInstance constructProcessInstance(Command command, String host) {
|
private ProcessInstance constructProcessInstance(Command command, String host) {
|
||||||
@ -686,11 +691,6 @@ public class ProcessService {
|
|||||||
} else {
|
} else {
|
||||||
processInstance = this.findProcessInstanceDetailById(processInstanceId);
|
processInstance = this.findProcessInstanceDetailById(processInstanceId);
|
||||||
// Recalculate global parameters after rerun.
|
// Recalculate global parameters after rerun.
|
||||||
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
|
|
||||||
processDefinition.getGlobalParamMap(),
|
|
||||||
processDefinition.getGlobalParamList(),
|
|
||||||
getCommandTypeIfComplement(processInstance, command),
|
|
||||||
processInstance.getScheduleTime()));
|
|
||||||
}
|
}
|
||||||
processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
|
processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
|
||||||
processInstance.setProcessDefinition(processDefinition);
|
processInstance.setProcessDefinition(processDefinition);
|
||||||
@ -807,7 +807,7 @@ public class ProcessService {
|
|||||||
* return complement data if the process start with complement data
|
* return complement data if the process start with complement data
|
||||||
*
|
*
|
||||||
* @param processInstance processInstance
|
* @param processInstance processInstance
|
||||||
* @param command command
|
* @param command command
|
||||||
* @return command type
|
* @return command type
|
||||||
*/
|
*/
|
||||||
private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
|
private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
|
||||||
@ -822,8 +822,8 @@ public class ProcessService {
|
|||||||
* initialize complement data parameters
|
* initialize complement data parameters
|
||||||
*
|
*
|
||||||
* @param processDefinition processDefinition
|
* @param processDefinition processDefinition
|
||||||
* @param processInstance processInstance
|
* @param processInstance processInstance
|
||||||
* @param cmdParam cmdParam
|
* @param cmdParam cmdParam
|
||||||
*/
|
*/
|
||||||
private void initComplementDataParam(ProcessDefinition processDefinition,
|
private void initComplementDataParam(ProcessDefinition processDefinition,
|
||||||
ProcessInstance processInstance,
|
ProcessInstance processInstance,
|
||||||
@ -895,7 +895,7 @@ public class ProcessService {
|
|||||||
* only the keys doesn't in sub process global would be joined.
|
* only the keys doesn't in sub process global would be joined.
|
||||||
*
|
*
|
||||||
* @param parentGlobalParams parentGlobalParams
|
* @param parentGlobalParams parentGlobalParams
|
||||||
* @param subGlobalParams subGlobalParams
|
* @param subGlobalParams subGlobalParams
|
||||||
* @return global params join
|
* @return global params join
|
||||||
*/
|
*/
|
||||||
private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
|
private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
|
||||||
@ -965,7 +965,7 @@ public class ProcessService {
|
|||||||
* set map {parent instance id, task instance id, 0(child instance id)}
|
* set map {parent instance id, task instance id, 0(child instance id)}
|
||||||
*
|
*
|
||||||
* @param parentInstance parentInstance
|
* @param parentInstance parentInstance
|
||||||
* @param parentTask parentTask
|
* @param parentTask parentTask
|
||||||
* @return process instance map
|
* @return process instance map
|
||||||
*/
|
*/
|
||||||
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
|
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
|
||||||
@ -994,7 +994,7 @@ public class ProcessService {
|
|||||||
* find previous task work process map.
|
* find previous task work process map.
|
||||||
*
|
*
|
||||||
* @param parentProcessInstance parentProcessInstance
|
* @param parentProcessInstance parentProcessInstance
|
||||||
* @param parentTask parentTask
|
* @param parentTask parentTask
|
||||||
* @return process instance map
|
* @return process instance map
|
||||||
*/
|
*/
|
||||||
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
|
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
|
||||||
@ -1020,7 +1020,7 @@ public class ProcessService {
|
|||||||
* create sub work process command
|
* create sub work process command
|
||||||
*
|
*
|
||||||
* @param parentProcessInstance parentProcessInstance
|
* @param parentProcessInstance parentProcessInstance
|
||||||
* @param task task
|
* @param task task
|
||||||
*/
|
*/
|
||||||
public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
|
public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
|
||||||
if (!task.isSubProcess()) {
|
if (!task.isSubProcess()) {
|
||||||
@ -1119,7 +1119,7 @@ public class ProcessService {
|
|||||||
* update sub process definition
|
* update sub process definition
|
||||||
*
|
*
|
||||||
* @param parentProcessInstance parentProcessInstance
|
* @param parentProcessInstance parentProcessInstance
|
||||||
* @param childDefinitionId childDefinitionId
|
* @param childDefinitionId childDefinitionId
|
||||||
*/
|
*/
|
||||||
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
|
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
|
||||||
ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
|
ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
|
||||||
@ -1133,7 +1133,7 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* submit task to mysql
|
* submit task to mysql
|
||||||
*
|
*
|
||||||
* @param taskInstance taskInstance
|
* @param taskInstance taskInstance
|
||||||
* @param processInstance processInstance
|
* @param processInstance processInstance
|
||||||
* @return task instance
|
* @return task instance
|
||||||
*/
|
*/
|
||||||
@ -1187,16 +1187,16 @@ public class ProcessService {
|
|||||||
* return stop if work process state is ready stop
|
* return stop if work process state is ready stop
|
||||||
* if all of above are not satisfied, return submit success
|
* if all of above are not satisfied, return submit success
|
||||||
*
|
*
|
||||||
* @param taskInstance taskInstance
|
* @param taskInstance taskInstance
|
||||||
* @param processInstanceState processInstanceState
|
* @param processInstanceState processInstanceState
|
||||||
* @return process instance state
|
* @return process instance state
|
||||||
*/
|
*/
|
||||||
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
|
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
|
||||||
ExecutionStatus state = taskInstance.getState();
|
ExecutionStatus state = taskInstance.getState();
|
||||||
|
// running, delayed or killed
|
||||||
|
// the task already exists in task queue
|
||||||
|
// return state
|
||||||
if (
|
if (
|
||||||
// running, delayed or killed
|
|
||||||
// the task already exists in task queue
|
|
||||||
// return state
|
|
||||||
state == ExecutionStatus.RUNNING_EXECUTION
|
state == ExecutionStatus.RUNNING_EXECUTION
|
||||||
|| state == ExecutionStatus.DELAY_EXECUTION
|
|| state == ExecutionStatus.DELAY_EXECUTION
|
||||||
|| state == ExecutionStatus.KILL
|
|| state == ExecutionStatus.KILL
|
||||||
@ -1363,7 +1363,7 @@ public class ProcessService {
|
|||||||
* get id list by task state
|
* get id list by task state
|
||||||
*
|
*
|
||||||
* @param instanceId instanceId
|
* @param instanceId instanceId
|
||||||
* @param state state
|
* @param state state
|
||||||
* @return task instance states
|
* @return task instance states
|
||||||
*/
|
*/
|
||||||
public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
|
public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
|
||||||
@ -1418,7 +1418,7 @@ public class ProcessService {
|
|||||||
* find work process map by parent process id and parent task id.
|
* find work process map by parent process id and parent task id.
|
||||||
*
|
*
|
||||||
* @param parentWorkProcessId parentWorkProcessId
|
* @param parentWorkProcessId parentWorkProcessId
|
||||||
* @param parentTaskId parentTaskId
|
* @param parentTaskId parentTaskId
|
||||||
* @return process instance map
|
* @return process instance map
|
||||||
*/
|
*/
|
||||||
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
|
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
|
||||||
@ -1440,7 +1440,7 @@ public class ProcessService {
|
|||||||
* find sub process instance
|
* find sub process instance
|
||||||
*
|
*
|
||||||
* @param parentProcessId parentProcessId
|
* @param parentProcessId parentProcessId
|
||||||
* @param parentTaskId parentTaskId
|
* @param parentTaskId parentTaskId
|
||||||
* @return process instance
|
* @return process instance
|
||||||
*/
|
*/
|
||||||
public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
|
public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
|
||||||
@ -1472,12 +1472,12 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* change task state
|
* change task state
|
||||||
*
|
*
|
||||||
* @param state state
|
* @param state state
|
||||||
* @param startTime startTime
|
* @param startTime startTime
|
||||||
* @param host host
|
* @param host host
|
||||||
* @param executePath executePath
|
* @param executePath executePath
|
||||||
* @param logPath logPath
|
* @param logPath logPath
|
||||||
* @param taskInstId taskInstId
|
* @param taskInstId taskInstId
|
||||||
*/
|
*/
|
||||||
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
|
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
|
||||||
String executePath,
|
String executePath,
|
||||||
@ -1505,12 +1505,12 @@ public class ProcessService {
|
|||||||
* update the process instance
|
* update the process instance
|
||||||
*
|
*
|
||||||
* @param processInstanceId processInstanceId
|
* @param processInstanceId processInstanceId
|
||||||
* @param processJson processJson
|
* @param processJson processJson
|
||||||
* @param globalParams globalParams
|
* @param globalParams globalParams
|
||||||
* @param scheduleTime scheduleTime
|
* @param scheduleTime scheduleTime
|
||||||
* @param flag flag
|
* @param flag flag
|
||||||
* @param locations locations
|
* @param locations locations
|
||||||
* @param connects connects
|
* @param connects connects
|
||||||
* @return update process instance result
|
* @return update process instance result
|
||||||
*/
|
*/
|
||||||
public int updateProcessInstance(Integer processInstanceId, String processJson,
|
public int updateProcessInstance(Integer processInstanceId, String processJson,
|
||||||
@ -1531,25 +1531,85 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* change task state
|
* change task state
|
||||||
*
|
*
|
||||||
* @param state state
|
* @param state state
|
||||||
* @param endTime endTime
|
* @param endTime endTime
|
||||||
* @param taskInstId taskInstId
|
* @param taskInstId taskInstId
|
||||||
* @param varPool varPool
|
* @param varPool varPool
|
||||||
*/
|
*/
|
||||||
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
|
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
|
||||||
Date endTime,
|
Date endTime,
|
||||||
int processId,
|
int processId,
|
||||||
String appIds,
|
String appIds,
|
||||||
int taskInstId,
|
int taskInstId,
|
||||||
String varPool) {
|
String varPool,
|
||||||
|
String result) {
|
||||||
taskInstance.setPid(processId);
|
taskInstance.setPid(processId);
|
||||||
taskInstance.setAppLink(appIds);
|
taskInstance.setAppLink(appIds);
|
||||||
taskInstance.setState(state);
|
taskInstance.setState(state);
|
||||||
taskInstance.setEndTime(endTime);
|
taskInstance.setEndTime(endTime);
|
||||||
taskInstance.setVarPool(varPool);
|
taskInstance.setVarPool(varPool);
|
||||||
|
changeOutParam(result, taskInstance);
|
||||||
saveTaskInstance(taskInstance);
|
saveTaskInstance(taskInstance);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void changeOutParam(String result, TaskInstance taskInstance) {
|
||||||
|
if (StringUtils.isEmpty(result)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<Map<String, String>> workerResultParam = getListMapByString(result);
|
||||||
|
if (CollectionUtils.isEmpty(workerResultParam)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//if the result more than one line,just get the first .
|
||||||
|
Map<String, String> row = workerResultParam.get(0);
|
||||||
|
if (row == null || row.size() == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
TaskNode taskNode = JSONUtils.parseObject(taskInstance.getTaskJson(), TaskNode.class);
|
||||||
|
Map<String, Object> taskParams = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
|
||||||
|
Object localParams = taskParams.get(LOCAL_PARAMS);
|
||||||
|
if (localParams == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
|
||||||
|
List<Property> params4Property = JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
|
||||||
|
Map<String, Property> allParamMap = params4Property.stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
|
||||||
|
|
||||||
|
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
|
||||||
|
for (Property info : allParam) {
|
||||||
|
if (info.getDirect() == Direct.OUT) {
|
||||||
|
String paramName = info.getProp();
|
||||||
|
Property property = allParamMap.get(paramName);
|
||||||
|
if (property == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String value = row.get(paramName);
|
||||||
|
if (StringUtils.isNotEmpty(value)) {
|
||||||
|
property.setValue(value);
|
||||||
|
info.setValue(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taskParams.put(LOCAL_PARAMS, allParam);
|
||||||
|
taskNode.setParams(JSONUtils.toJsonString(taskParams));
|
||||||
|
// task instance node json
|
||||||
|
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
|
||||||
|
String params4ProcessString = JSONUtils.toJsonString(params4Property);
|
||||||
|
int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId());
|
||||||
|
logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Map<String, String>> getListMapByString(String json) {
|
||||||
|
List<Map<String, String>> allParams = new ArrayList<>();
|
||||||
|
ArrayNode paramsByJson = JSONUtils.parseArray(json);
|
||||||
|
Iterator<JsonNode> listIterator = paramsByJson.iterator();
|
||||||
|
while (listIterator.hasNext()) {
|
||||||
|
Map<String, String> param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
|
||||||
|
allParams.add(param);
|
||||||
|
}
|
||||||
|
return allParams;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* convert integer list to string list
|
* convert integer list to string list
|
||||||
*
|
*
|
||||||
@ -1642,7 +1702,7 @@ public class ProcessService {
|
|||||||
* update process instance state by id
|
* update process instance state by id
|
||||||
*
|
*
|
||||||
* @param processInstanceId processInstanceId
|
* @param processInstanceId processInstanceId
|
||||||
* @param executionStatus executionStatus
|
* @param executionStatus executionStatus
|
||||||
* @return update process result
|
* @return update process result
|
||||||
*/
|
*/
|
||||||
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
|
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
|
||||||
@ -1679,7 +1739,7 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* find tenant code by resource name
|
* find tenant code by resource name
|
||||||
*
|
*
|
||||||
* @param resName resource name
|
* @param resName resource name
|
||||||
* @param resourceType resource type
|
* @param resourceType resource type
|
||||||
* @return tenant code
|
* @return tenant code
|
||||||
*/
|
*/
|
||||||
@ -1703,9 +1763,9 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* get dependency cycle by work process define id and scheduler fire time
|
* get dependency cycle by work process define id and scheduler fire time
|
||||||
*
|
*
|
||||||
* @param masterId masterId
|
* @param masterId masterId
|
||||||
* @param processDefinitionId processDefinitionId
|
* @param processDefinitionId processDefinitionId
|
||||||
* @param scheduledFireTime the time the task schedule is expected to trigger
|
* @param scheduledFireTime the time the task schedule is expected to trigger
|
||||||
* @return CycleDependency
|
* @return CycleDependency
|
||||||
* @throws Exception if error throws Exception
|
* @throws Exception if error throws Exception
|
||||||
*/
|
*/
|
||||||
@ -1718,8 +1778,8 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* get dependency cycle list by work process define id list and scheduler fire time
|
* get dependency cycle list by work process define id list and scheduler fire time
|
||||||
*
|
*
|
||||||
* @param masterId masterId
|
* @param masterId masterId
|
||||||
* @param ids ids
|
* @param ids ids
|
||||||
* @param scheduledFireTime the time the task schedule is expected to trigger
|
* @param scheduledFireTime the time the task schedule is expected to trigger
|
||||||
* @return CycleDependency list
|
* @return CycleDependency list
|
||||||
* @throws Exception if error throws Exception
|
* @throws Exception if error throws Exception
|
||||||
@ -1814,8 +1874,8 @@ public class ProcessService {
|
|||||||
* find last running process instance
|
* find last running process instance
|
||||||
*
|
*
|
||||||
* @param definitionId process definition id
|
* @param definitionId process definition id
|
||||||
* @param startTime start time
|
* @param startTime start time
|
||||||
* @param endTime end time
|
* @param endTime end time
|
||||||
* @return process instance
|
* @return process instance
|
||||||
*/
|
*/
|
||||||
public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
|
public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
|
||||||
@ -1915,7 +1975,7 @@ public class ProcessService {
|
|||||||
/**
|
/**
|
||||||
* list unauthorized udf function
|
* list unauthorized udf function
|
||||||
*
|
*
|
||||||
* @param userId user id
|
* @param userId user id
|
||||||
* @param needChecks data source id array
|
* @param needChecks data source id array
|
||||||
* @return unauthorized udf function list
|
* @return unauthorized udf function list
|
||||||
*/
|
*/
|
||||||
|
@ -442,4 +442,26 @@ public class ProcessServiceTest {
|
|||||||
Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson));
|
Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChangeOutParam() {
|
||||||
|
String result = "[{\"d\":\"20210203\"}]";
|
||||||
|
TaskInstance taskInstance = new TaskInstance();
|
||||||
|
taskInstance.setProcessInstanceId(62);
|
||||||
|
taskInstance.setTaskJson("{\"id\":\"tasks-86175\",\"name\":\"wew\",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,"
|
||||||
|
+ "\"retryInterval\":1,\"params\":{\"rawScript\":\"echo 20210203\",\"localParams\":[{\"prop\":\"d\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
|
||||||
|
+ "\"resourceList\":[]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
|
||||||
|
+ "\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"workerGroupId\":null,"
|
||||||
|
+ "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}");
|
||||||
|
ProcessInstance processInstance = new ProcessInstance();
|
||||||
|
processInstance.setId(62);
|
||||||
|
processInstance.setGlobalParams("[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
|
||||||
|
+ "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
|
||||||
|
String params4ProcessString = "[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
|
||||||
|
+ "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20210203\"}]";
|
||||||
|
Mockito.when(processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
|
||||||
|
Mockito.when(this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId())).thenReturn(1);
|
||||||
|
processService.changeOutParam(result,taskInstance);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
BIN
dolphinscheduler-ui.zip
Normal file
BIN
dolphinscheduler-ui.zip
Normal file
Binary file not shown.
@ -178,7 +178,7 @@
|
|||||||
:title="$t('Set the DAG diagram name')"
|
:title="$t('Set the DAG diagram name')"
|
||||||
:visible.sync="dialogVisible"
|
:visible.sync="dialogVisible"
|
||||||
width="auto">
|
width="auto">
|
||||||
<m-udp @onUdp="onUdpDialog" @close="closeDialog"></m-udp>
|
<m-udp ref="mUdp" @onUdp="onUdpDialog" @close="closeDialog"></m-udp>
|
||||||
</el-dialog>
|
</el-dialog>
|
||||||
<el-dialog
|
<el-dialog
|
||||||
:title="$t('Please set the parameters before starting')"
|
:title="$t('Please set the parameters before starting')"
|
||||||
@ -268,7 +268,7 @@
|
|||||||
},
|
},
|
||||||
methods: {
|
methods: {
|
||||||
...mapActions('dag', ['saveDAGchart', 'updateInstance', 'updateDefinition', 'getTaskState', 'switchProcessDefinitionVersion', 'getProcessDefinitionVersionsPage', 'deleteProcessDefinitionVersion']),
|
...mapActions('dag', ['saveDAGchart', 'updateInstance', 'updateDefinition', 'getTaskState', 'switchProcessDefinitionVersion', 'getProcessDefinitionVersionsPage', 'deleteProcessDefinitionVersion']),
|
||||||
...mapMutations('dag', ['addTasks', 'cacheTasks', 'resetParams', 'setIsEditDag', 'setName', 'addConnects']),
|
...mapMutations('dag', ['addTasks', 'cacheTasks', 'resetParams', 'setIsEditDag', 'setName', 'addConnects', 'resetLocalParam']),
|
||||||
startRunning (item, startNodeList, sourceType) {
|
startRunning (item, startNodeList, sourceType) {
|
||||||
this.startData = item
|
this.startData = item
|
||||||
this.startNodeList = startNodeList
|
this.startNodeList = startNodeList
|
||||||
@ -377,7 +377,7 @@
|
|||||||
|
|
||||||
// remove tip state dom
|
// remove tip state dom
|
||||||
$('.w').find('.state-p').html('')
|
$('.w').find('.state-p').html('')
|
||||||
|
const newTask = []
|
||||||
data.forEach(v1 => {
|
data.forEach(v1 => {
|
||||||
idArr.forEach(v2 => {
|
idArr.forEach(v2 => {
|
||||||
if (v2.name === v1.name) {
|
if (v2.name === v1.name) {
|
||||||
@ -387,6 +387,12 @@
|
|||||||
taskList.forEach(item => {
|
taskList.forEach(item => {
|
||||||
if (item.name === v1.name) {
|
if (item.name === v1.name) {
|
||||||
depState = item.state
|
depState = item.state
|
||||||
|
const params = item.taskJson ? JSON.parse(item.taskJson).params : ''
|
||||||
|
let localParam = params.localParams || []
|
||||||
|
newTask.push({
|
||||||
|
id: v2.id,
|
||||||
|
localParam
|
||||||
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
dom.attr('data-state-id', v1.stateId)
|
dom.attr('data-state-id', v1.stateId)
|
||||||
@ -403,6 +409,9 @@
|
|||||||
findComponentDownward(this.$root, `${this.type}-details`)._reset()
|
findComponentDownward(this.$root, `${this.type}-details`)._reset()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!isReset) {
|
||||||
|
this.resetLocalParam(newTask)
|
||||||
|
}
|
||||||
resolve()
|
resolve()
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -550,7 +559,11 @@
|
|||||||
this.$message.warning(`${i18n.$t('Failed to create node to save')}`)
|
this.$message.warning(`${i18n.$t('Failed to create node to save')}`)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
this.dialogVisible = true
|
this.dialogVisible = true
|
||||||
|
this.$nextTick(() => {
|
||||||
|
this.$refs.mUdp.reloadParam()
|
||||||
|
})
|
||||||
},
|
},
|
||||||
/**
|
/**
|
||||||
* Return to the previous child node
|
* Return to the previous child node
|
||||||
|
@ -21,7 +21,7 @@
|
|||||||
:key="item.id"
|
:key="item.id"
|
||||||
@click="_getIndex($index)">
|
@click="_getIndex($index)">
|
||||||
<el-input
|
<el-input
|
||||||
:disabled="isDetails"
|
:disabled="isDetails || item.ifFixed"
|
||||||
type="text"
|
type="text"
|
||||||
size="small"
|
size="small"
|
||||||
v-model="localParamsList[$index].prop"
|
v-model="localParamsList[$index].prop"
|
||||||
@ -68,7 +68,7 @@
|
|||||||
@blur="_handleValue()"
|
@blur="_handleValue()"
|
||||||
:style="inputStyle">
|
:style="inputStyle">
|
||||||
</el-input>
|
</el-input>
|
||||||
<span class="lt-add">
|
<span class="lt-add" v-show="!item.ifFixed">
|
||||||
<a href="javascript:" style="color:red;" @click="!isDetails && _removeUdp($index)" >
|
<a href="javascript:" style="color:red;" @click="!isDetails && _removeUdp($index)" >
|
||||||
<em class="el-icon-delete" :class="_isDetails" data-toggle="tooltip" :title="$t('delete')" ></em>
|
<em class="el-icon-delete" :class="_isDetails" data-toggle="tooltip" :title="$t('delete')" ></em>
|
||||||
</a>
|
</a>
|
||||||
|
@ -46,7 +46,7 @@
|
|||||||
ref="refLocalParams"
|
ref="refLocalParams"
|
||||||
@on-local-params="_onLocalParams"
|
@on-local-params="_onLocalParams"
|
||||||
:udp-list="localParams"
|
:udp-list="localParams"
|
||||||
:hide="false">
|
:hide="true">
|
||||||
</m-local-params>
|
</m-local-params>
|
||||||
</div>
|
</div>
|
||||||
</m-list-box>
|
</m-list-box>
|
||||||
|
@ -142,7 +142,12 @@
|
|||||||
return true
|
return true
|
||||||
},
|
},
|
||||||
_accuStore () {
|
_accuStore () {
|
||||||
this.store.commit('dag/setGlobalParams', _.cloneDeep(this.udpList))
|
const udp = _.cloneDeep(this.udpList)
|
||||||
|
udp.forEach(u => {
|
||||||
|
delete u.ifFixed
|
||||||
|
})
|
||||||
|
this.store.commit('dag/setGlobalParams', udp)
|
||||||
|
|
||||||
this.store.commit('dag/setName', _.cloneDeep(this.name))
|
this.store.commit('dag/setName', _.cloneDeep(this.name))
|
||||||
this.store.commit('dag/setTimeout', _.cloneDeep(this.timeout))
|
this.store.commit('dag/setTimeout', _.cloneDeep(this.timeout))
|
||||||
this.store.commit('dag/setTenantId', _.cloneDeep(this.tenantId))
|
this.store.commit('dag/setTenantId', _.cloneDeep(this.tenantId))
|
||||||
@ -191,6 +196,46 @@
|
|||||||
*/
|
*/
|
||||||
close () {
|
close () {
|
||||||
this.$emit('close')
|
this.$emit('close')
|
||||||
|
},
|
||||||
|
/**
|
||||||
|
* reload localParam
|
||||||
|
*/
|
||||||
|
reloadParam () {
|
||||||
|
const dag = _.cloneDeep(this.store.state.dag)
|
||||||
|
let fixedParam = []
|
||||||
|
const tasks = this.store.state.dag.tasks
|
||||||
|
for (const task of tasks) {
|
||||||
|
const localParam = task.params ? task.params.localParams : []
|
||||||
|
localParam.forEach(l => {
|
||||||
|
if (!fixedParam.some(f => { return f.prop === l.prop })) {
|
||||||
|
fixedParam.push(Object.assign({
|
||||||
|
ifFixed: true
|
||||||
|
}, l))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
let globalParams = _.cloneDeep(dag.globalParams)
|
||||||
|
|
||||||
|
globalParams = globalParams.map(g => {
|
||||||
|
if (fixedParam.some(f => { return g.prop === f.prop })) {
|
||||||
|
fixedParam = fixedParam.filter(f => { return g.prop !== f.prop })
|
||||||
|
return Object.assign(g, {
|
||||||
|
ifFixed: true
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
return g
|
||||||
|
}
|
||||||
|
})
|
||||||
|
let udpList = [...fixedParam, ...globalParams].sort(s => {
|
||||||
|
if (s.ifFixed) {
|
||||||
|
return -1
|
||||||
|
} else {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
})
|
||||||
|
this.udpList = udpList
|
||||||
|
this.udpListCache = udpList
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
watch: {
|
watch: {
|
||||||
@ -203,8 +248,7 @@
|
|||||||
},
|
},
|
||||||
created () {
|
created () {
|
||||||
const dag = _.cloneDeep(this.store.state.dag)
|
const dag = _.cloneDeep(this.store.state.dag)
|
||||||
this.udpList = dag.globalParams
|
|
||||||
this.udpListCache = dag.globalParams
|
|
||||||
this.name = dag.name
|
this.name = dag.name
|
||||||
this.originalName = dag.name
|
this.originalName = dag.name
|
||||||
this.description = dag.description
|
this.description = dag.description
|
||||||
|
@ -162,5 +162,16 @@ export default {
|
|||||||
} else {
|
} else {
|
||||||
state.cacheTasks[payload.id] = payload
|
state.cacheTasks[payload.id] = payload
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
resetLocalParam (state, payload) {
|
||||||
|
const tasks = state.tasks
|
||||||
|
tasks.forEach((task, index) => {
|
||||||
|
payload.forEach(p => {
|
||||||
|
if (p.id === task.id) {
|
||||||
|
tasks[index].params.localParams = p.localParam
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
state.tasks = tasks
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
2
pom.xml
2
pom.xml
@ -925,6 +925,8 @@
|
|||||||
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
|
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
|
||||||
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
|
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
|
||||||
<include>**/server/worker/task/TaskManagerTest.java</include>
|
<include>**/server/worker/task/TaskManagerTest.java</include>
|
||||||
|
<include>**/server/worker/task/AbstractCommandExecutorTest.java</include>
|
||||||
|
<include>**/server/worker/task/ShellTaskReturnTest.java</include>
|
||||||
<include>**/server/worker/EnvFileTest.java</include>
|
<include>**/server/worker/EnvFileTest.java</include>
|
||||||
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
|
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
|
||||||
<include>**/server/worker/runner/WorkerManagerThreadTest.java</include>
|
<include>**/server/worker/runner/WorkerManagerThreadTest.java</include>
|
||||||
|
Loading…
Reference in New Issue
Block a user