mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-30 03:08:01 +08:00
cherry-pick [Fix-13187] refactor OpenmldbParameters #13190
This commit is contained in:
parent
6b7bcda380
commit
32a76ee419
@ -325,6 +325,14 @@ public class JSONUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static String toPrettyJsonString(Object object) {
|
||||
try {
|
||||
return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(object);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Object json deserialization exception.", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* serialize to json byte
|
||||
*
|
||||
|
@ -17,14 +17,11 @@
|
||||
|
||||
package org.apache.dolphinscheduler.plugin.task.openmldb;
|
||||
|
||||
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
|
||||
import org.apache.dolphinscheduler.plugin.task.python.PythonParameters;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class OpenmldbParameters extends AbstractParameters {
|
||||
public class OpenmldbParameters extends PythonParameters {
|
||||
|
||||
private String zk;
|
||||
private String zkPath;
|
||||
@ -34,11 +31,6 @@ public class OpenmldbParameters extends AbstractParameters {
|
||||
*/
|
||||
private String sql;
|
||||
|
||||
/**
|
||||
* resource list
|
||||
*/
|
||||
private List<ResourceInfo> resourceList;
|
||||
|
||||
public String getZk() {
|
||||
return zk;
|
||||
}
|
||||
@ -71,21 +63,8 @@ public class OpenmldbParameters extends AbstractParameters {
|
||||
this.sql = sql;
|
||||
}
|
||||
|
||||
public List<ResourceInfo> getResourceList() {
|
||||
return resourceList;
|
||||
}
|
||||
|
||||
public void setResourceList(List<ResourceInfo> resourceList) {
|
||||
this.resourceList = resourceList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkParameters() {
|
||||
return StringUtils.isNotEmpty(zk) && StringUtils.isNotEmpty(zkPath) && StringUtils.isNotEmpty(sql);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ResourceInfo> getResourceFilesList() {
|
||||
return this.resourceList;
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
|
||||
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
|
||||
import org.apache.dolphinscheduler.plugin.task.python.PythonTask;
|
||||
@ -42,9 +41,8 @@ import com.google.common.base.Preconditions;
|
||||
public class OpenmldbTask extends PythonTask {
|
||||
|
||||
/**
|
||||
* openmldb parameters
|
||||
* openmldb parameters: cast pythonParameters to OpenmldbParameters
|
||||
*/
|
||||
private OpenmldbParameters openmldbParameters;
|
||||
|
||||
/**
|
||||
* python process(openmldb only supports version 3 by default)
|
||||
@ -63,11 +61,10 @@ public class OpenmldbTask extends PythonTask {
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
logger.info("openmldb task params {}", taskRequest.getTaskParams());
|
||||
pythonParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), OpenmldbParameters.class);
|
||||
|
||||
openmldbParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), OpenmldbParameters.class);
|
||||
|
||||
if (openmldbParameters == null || !openmldbParameters.checkParameters()) {
|
||||
logger.info("Initialize openmldb task params {}", JSONUtils.toPrettyJsonString(pythonParameters));
|
||||
if (pythonParameters == null || !pythonParameters.checkParameters()) {
|
||||
throw new TaskException("openmldb task params is not valid");
|
||||
}
|
||||
}
|
||||
@ -78,11 +75,6 @@ public class OpenmldbTask extends PythonTask {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractParameters getParameters() {
|
||||
return openmldbParameters;
|
||||
}
|
||||
|
||||
/**
|
||||
* build python command file path
|
||||
*
|
||||
@ -100,6 +92,7 @@ public class OpenmldbTask extends PythonTask {
|
||||
*/
|
||||
@Override
|
||||
protected String buildPythonScriptContent() {
|
||||
OpenmldbParameters openmldbParameters = (OpenmldbParameters) pythonParameters;
|
||||
logger.info("raw sql script : {}", openmldbParameters.getSql());
|
||||
|
||||
String rawSQLScript = openmldbParameters.getSql().replaceAll("[\\r]?\\n", "\n");
|
||||
@ -117,6 +110,7 @@ public class OpenmldbTask extends PythonTask {
|
||||
StringBuilder builder = new StringBuilder("import openmldb\nimport sqlalchemy as db\n");
|
||||
|
||||
// connect to openmldb
|
||||
OpenmldbParameters openmldbParameters = (OpenmldbParameters) pythonParameters;
|
||||
builder.append(String.format("engine = db.create_engine('openmldb:///?zk=%s&zkPath=%s')\n",
|
||||
openmldbParameters.getZk(), openmldbParameters.getZkPath()));
|
||||
builder.append("con = engine.connect()\n");
|
||||
|
Loading…
Reference in New Issue
Block a user