Resolve AbstractTask code conflicts

This commit is contained in:
zhuangchong 2022-10-27 13:03:33 +08:00
parent 58ef3cccd7
commit 7ca4438682

View File

@ -21,17 +21,8 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
@ -49,7 +40,8 @@ public abstract class AbstractTask {
public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION");
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
protected final Logger logger =
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
@ -106,62 +98,6 @@ public abstract class AbstractTask {
return null;
}
<<<<<<< HEAD
public abstract void handle() throws TaskException;
/**
* cancel application
*
* @param status status
* @throws Exception exception
*/
public void cancelApplication(boolean status) throws Exception {
this.cancel = status;
}
/**
* get application ids
* @return
* @throws IOException
*/
public Set<String> getApplicationIds() throws IOException {
Set<String> appIds = new HashSet<>();
File file = new File(taskRequest.getLogPath());
if (!file.exists()) {
return appIds;
}
/*
* analysis log? get submitted yarn application id
*/
try (
BufferedReader br = new BufferedReader(
new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
String appId = findAppId(line);
if (StringUtils.isNotEmpty(appId)) {
appIds.add(appId);
}
}
}
return appIds;
}
/**
* find app id
*
* @param line line
* @return appid
*/
protected String findAppId(String line) {
Matcher matcher = YARN_APPLICATION_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group();
}
return null;
}
public abstract void handle(TaskCallBack taskCallBack) throws TaskException;
public abstract void cancel() throws TaskException;
@ -282,7 +218,11 @@ public abstract class AbstractTask {
* @param paramsPropsMap params props map
*/
public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap,
Map<String, Property> paramsPropsMap,int taskInstanceId) {
Map<String, Property> paramsPropsMap, int taskInstanceId) {
if (paramsPropsMap == null) {
return;
}
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
@ -292,12 +232,16 @@ public abstract class AbstractTask {
Property prop = paramsPropsMap.get(paramName);
if (prop == null) {
logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskInstanceId);
logger.error(
"setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.",
paramName, taskInstanceId);
} else {
sqlParamsMap.put(index, prop);
index++;
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
logger.info(
"setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.",
paramName, content);
}
}