[Improvement-14136][task] Support submitting spark-sql task with the sql file in resource center (#14527)

Co-authored-by: xiangzihao <460888207@qq.com>
Rick Cheng authored 2023-07-19 10:09:30 +08:00, committed by GitHub
parent 758d5af023
commit 9778652f17
11 changed files with 111 additions and 157 deletions
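For orientation before the file diffs: a SPARK task whose program type is SQL can now take its statements either from an inline script (FROM_SCRIPT) or from a file uploaded to the resource center (FROM_FILE). A minimal sketch of the new parameter shape, assuming the plugin classes shown in the diffs below; the package locations, the lombok-generated setters, and the resource path are assumptions for illustration, not part of this commit:

import java.util.Collections;

import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.spark.ProgramType;
import org.apache.dolphinscheduler.plugin.task.spark.SparkConstants;
import org.apache.dolphinscheduler.plugin.task.spark.SparkParameters;

public class SparkSqlFileModeSketch {

    // Illustrative only: a SPARK task configured to run a SQL file from the resource center.
    public static SparkParameters fileModeParameters() {
        SparkParameters params = new SparkParameters();
        params.setProgramType(ProgramType.SQL);               // submitted via ${SPARK_HOME}/bin/spark-sql
        params.setSqlExecutionType(SparkConstants.TYPE_FILE); // "FILE", the new execution type
        ResourceInfo sqlFile = new ResourceInfo();
        sqlFile.setResourceName("sql/demo_query.sql");        // hypothetical resource-center path
        params.setResourceList(Collections.singletonList(sqlFile));
        return params;
    }
}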

View File

@@ -28,6 +28,4 @@ public class HiveCliConstants {
public static final String HIVE_CLI_EXECUTE_FILE = "hive -f";
public static final String HIVE_CLI_EXECUTE_SCRIPT = "hive -e \"%s\"";
}

View File

@@ -101,4 +101,8 @@ public class SparkConstants {
*/
public static final String SPARK_SUBMIT_COMMAND = "${SPARK_HOME}/bin/spark-submit";
public static final String TYPE_SCRIPT = "SCRIPT";
public static final String TYPE_FILE = "FILE";
}

View File

@@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import java.util.ArrayList;
import java.util.List;
/**
* spark parameters
*/
import lombok.Data;
@Data
public class SparkParameters extends AbstractParameters {
/**
@@ -109,134 +109,9 @@ public class SparkParameters extends AbstractParameters {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
/**
* resource list
*/
private List<ResourceInfo> resourceList = new ArrayList<>();
public ResourceInfo getMainJar() {
return mainJar;
}
public void setMainJar(ResourceInfo mainJar) {
this.mainJar = mainJar;
}
public String getMainClass() {
return mainClass;
}
public void setMainClass(String mainClass) {
this.mainClass = mainClass;
}
public String getDeployMode() {
return deployMode;
}
public void setDeployMode(String deployMode) {
this.deployMode = deployMode;
}
public String getMainArgs() {
return mainArgs;
}
public void setMainArgs(String mainArgs) {
this.mainArgs = mainArgs;
}
public int getDriverCores() {
return driverCores;
}
public void setDriverCores(int driverCores) {
this.driverCores = driverCores;
}
public String getDriverMemory() {
return driverMemory;
}
public void setDriverMemory(String driverMemory) {
this.driverMemory = driverMemory;
}
public int getNumExecutors() {
return numExecutors;
}
public void setNumExecutors(int numExecutors) {
this.numExecutors = numExecutors;
}
public int getExecutorCores() {
return executorCores;
}
public void setExecutorCores(int executorCores) {
this.executorCores = executorCores;
}
public String getExecutorMemory() {
return executorMemory;
}
public void setExecutorMemory(String executorMemory) {
this.executorMemory = executorMemory;
}
public String getAppName() {
return appName;
}
public void setAppName(String appName) {
this.appName = appName;
}
public String getYarnQueue() {
return yarnQueue;
}
public void setYarnQueue(String yarnQueue) {
this.yarnQueue = yarnQueue;
}
public String getOthers() {
return others;
}
public void setOthers(String others) {
this.others = others;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
public ProgramType getProgramType() {
return programType;
}
public void setProgramType(ProgramType programType) {
this.programType = programType;
}
public String getRawScript() {
return rawScript;
}
public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}
private String sqlExecutionType;
@Override
public boolean checkParameters() {

View File

@@ -26,6 +26,7 @@ import static org.apache.dolphinscheduler.plugin.task.spark.SparkConstants.SPARK
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
@@ -33,11 +34,13 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@@ -196,8 +199,28 @@ public class SparkTask extends AbstractYarnTask {
// bin/spark-sql -f fileName
if (ProgramType.SQL == programType) {
String sqlContent = "";
String resourceFileName = "";
args.add(SparkConstants.SQL_FROM_FILE);
args.add(generateScriptFile());
if (SparkConstants.TYPE_FILE.equals(sparkParameters.getSqlExecutionType())) {
final List<ResourceInfo> resourceInfos = sparkParameters.getResourceList();
if (resourceInfos.size() > 1) {
log.warn("more than one file detected, using the first one by default");
}
try {
resourceFileName = resourceInfos.get(0).getResourceName();
sqlContent = FileUtils.readFileToString(
new File(String.format("%s/%s", taskExecutionContext.getExecutePath(), resourceFileName)),
StandardCharsets.UTF_8);
} catch (IOException e) {
log.error("failed to read sql content from file {}", resourceFileName, e);
throw new TaskException("read sql content error", e);
}
} else {
sqlContent = sparkParameters.getRawScript();
}
args.add(generateScriptFile(sqlContent));
}
return args;
}
@@ -229,7 +252,7 @@ public class SparkTask extends AbstractYarnTask {
}
}
private String generateScriptFile() {
private String generateScriptFile(String sqlContent) {
String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
@@ -237,10 +260,9 @@ public class SparkTask extends AbstractYarnTask {
Path path = file.toPath();
if (!Files.exists(path)) {
String script = replaceParam(sparkParameters.getRawScript());
sparkParameters.setRawScript(script);
String script = replaceParam(sqlContent);
log.info("raw script : {}", sparkParameters.getRawScript());
log.info("raw script : {}", script);
log.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
@@ -254,7 +276,7 @@ public class SparkTask extends AbstractYarnTask {
}
Files.createFile(path, attr);
}
Files.write(path, sparkParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
Files.write(path, script.getBytes(), StandardOpenOption.APPEND);
} catch (IOException e) {
throw new RuntimeException("generate spark sql script error", e);
}
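Whichever source the SQL comes from, generateScriptFile materializes it into a local script that spark-sql is pointed at with -f (SparkConstants.SQL_FROM_FILE above). A standalone sketch of that step, with illustrative values standing in for what the real code reads from TaskExecutionContext:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

public class ScriptFileSketch {

    public static void main(String[] args) throws Exception {
        String executePath = "/tmp/exec/1"; // illustrative; really taskExecutionContext.getExecutePath()
        String taskAppId = "7_42";          // illustrative; really taskExecutionContext.getTaskAppId()
        String sqlContent = "SELECT 1;";    // read from the resource file, or taken from the raw script

        Path script = Paths.get(executePath, taskAppId + "_node.sql");
        if (!Files.exists(script)) {
            Files.createDirectories(script.getParent()); // ensure the (illustrative) execute path exists
            // RWXR_XR_X in the real code corresponds to "rwxr-xr-x"
            Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
            Files.createFile(script, PosixFilePermissions.asFileAttribute(perms));
        }
        Files.write(script, sqlContent.getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        // the task then runs: ${SPARK_HOME}/bin/spark-sql ... -f /tmp/exec/1/7_42_node.sql
    }
}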

View File

@@ -718,6 +718,9 @@ export default {
zeppelin_username_tips: 'Please enter the zeppelin server username',
zeppelin_password: 'zeppelinPassword',
zeppelin_password_tips: 'Please enter the zeppelin server password',
sql_execution_type: 'SQL Input',
sql_execution_type_from_file: 'FROM_FILE',
sql_execution_type_from_script: 'FROM_SCRIPT',
hive_cli_task_execution_type: 'Hive Cli Task Execution Type',
hive_sql_script: 'Hive SQL Script',
hive_cli_options: 'Hive Cli Options',

View File

@@ -703,6 +703,9 @@ export default {
zeppelin_username_tips: '请输入zeppelin server的登陆用户名',
zeppelin_password: 'zeppelinPassword',
zeppelin_password_tips: '请输入zeppelin server的登陆密码',
sql_execution_type: 'SQL来源',
sql_execution_type_from_file: '选择资源中心文件',
sql_execution_type_from_script: '脚本输入',
hive_cli_task_execution_type: 'Hive Cli 任务类型',
hive_sql_script: 'Hive SQL 脚本',
hive_cli_options: 'Hive Cli 选项',

View File

@@ -25,18 +25,29 @@ export function useHiveCli(model: { [field: string]: any }): IJsonItem[] {
model.hiveCliTaskExecutionType === 'SCRIPT' ? 24 : 0
)
const resourcesRequired = ref(
model.hiveCliTaskExecutionType === 'SCRIPT' ? false : true
model.hiveCliTaskExecutionType !== 'SCRIPT'
)
const resourcesLimit = computed(() =>
model.hiveCliTaskExecutionType === 'SCRIPT' ? -1 : 1
)
const SQL_EXECUTION_TYPES = [
{
label: t('project.node.sql_execution_type_from_script'),
value: 'SCRIPT'
},
{
label: t('project.node.sql_execution_type_from_file'),
value: 'FILE'
}
]
watch(
() => model.hiveCliTaskExecutionType,
() => {
resourcesRequired.value =
model.hiveCliTaskExecutionType === 'SCRIPT' ? false : true
model.hiveCliTaskExecutionType !== 'SCRIPT'
}
)
@@ -45,8 +56,8 @@ export function useHiveCli(model: { [field: string]: any }): IJsonItem[] {
type: 'select',
field: 'hiveCliTaskExecutionType',
span: 12,
name: t('project.node.hive_cli_task_execution_type'),
options: HIVE_CLI_TASK_EXECUTION_TYPES,
name: t('project.node.sql_execution_type'),
options: SQL_EXECUTION_TYPES,
validate: {
trigger: ['input', 'blur'],
required: true
@@ -77,14 +88,3 @@ export function useHiveCli(model: { [field: string]: any }): IJsonItem[] {
...useCustomParams({ model, field: 'localParams', isSimple: false })
]
}
export const HIVE_CLI_TASK_EXECUTION_TYPES = [
{
label: 'FROM_SCRIPT',
value: 'SCRIPT'
},
{
label: 'FROM_FILE',
value: 'FILE'
}
]

View File

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { computed, ref } from 'vue'
import { computed, ref, watch } from 'vue'
import { useI18n } from 'vue-i18n'
import {
useCustomParams,
@@ -39,10 +39,39 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
const mainArgsSpan = computed(() => (model.programType === 'SQL' ? 0 : 24))
const rawScriptSpan = computed(() => (model.programType === 'SQL' ? 24 : 0))
const rawScriptSpan = computed(() => (model.programType === 'SQL' && model.sqlExecutionType === 'SCRIPT' ? 24 : 0))
const showCluster = computed(() => model.programType !== 'SQL')
const resourcesRequired = ref(
model.programType === 'SQL' && model.sqlExecutionType === 'FILE'
)
const resourcesLimit = computed(() =>
model.programType === 'SQL' && model.sqlExecutionType === 'FILE' ? 1 : -1
)
const sqlExecutionTypeSpan = computed(() => (model.programType === 'SQL' ? 12 : 0))
const SQL_EXECUTION_TYPES = [
{
label: t('project.node.sql_execution_type_from_script'),
value: 'SCRIPT'
},
{
label: t('project.node.sql_execution_type_from_file'),
value: 'FILE'
}
]
watch(
() => [model.sqlExecutionType, model.programType],
() => {
resourcesRequired.value =
model.programType === 'SQL' && model.sqlExecutionType === 'FILE'
}
)
return [
{
type: 'select',
@@ -57,6 +86,17 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
}
}
},
{
type: 'select',
field: 'sqlExecutionType',
span: sqlExecutionTypeSpan,
name: t('project.node.sql_execution_type'),
options: SQL_EXECUTION_TYPES,
validate: {
trigger: ['input', 'blur'],
required: true
}
},
{
type: 'input',
field: 'mainClass',
@@ -85,6 +125,9 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
field: 'rawScript',
span: rawScriptSpan,
name: t('project.node.script'),
props: {
language: 'sql'
},
validate: {
trigger: ['input', 'trigger'],
required: true,
@@ -126,7 +169,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
placeholder: t('project.node.option_parameters_tips')
}
},
useResources(),
useResources(24, resourcesRequired, resourcesLimit),
...useCustomParams({ model, field: 'localParams', isSimple: false })
]
}

View File

@@ -71,6 +71,7 @@ export function formatParams(data: INodeData): {
taskParams.numExecutors = data.numExecutors
taskParams.executorMemory = data.executorMemory
taskParams.executorCores = data.executorCores
taskParams.sqlExecutionType = data.sqlExecutionType
}
if (data.taskType === 'FLINK' || data.taskType === 'FLINK_STREAM') {
@@ -325,7 +326,8 @@ export function formatParams(data: INodeData): {
executorMemory: data.executorMemory,
numExecutors: data.numExecutors,
others: data.others,
yarnQueue: data.yarnQueue
yarnQueue: data.yarnQueue,
sqlExecutionType: data.sqlExecutionType
}
}
@@ -726,6 +728,7 @@ export function formatModel(data: ITaskData) {
params.executorMemory = data.taskParams.sparkParameters.executorMemory
params.numExecutors = data.taskParams.sparkParameters.numExecutors
params.others = data.taskParams.sparkParameters.others
params.sqlExecutionType = data.taskParams.sparkParameters.sqlExecutionType
}
if (data.taskParams?.conditionResult?.successNode?.length) {

View File

@@ -52,7 +52,8 @@ export function useSpark({
executorMemory: '2G',
executorCores: 2,
yarnQueue: '',
timeoutNotifyStrategy: ['WARN']
timeoutNotifyStrategy: ['WARN'],
sqlExecutionType: 'SCRIPT'
} as INodeData)
return {

View File

@@ -224,6 +224,7 @@ interface ISparkParameters {
numExecutors?: number
others?: string
yarnQueue?: string
sqlExecutionType?: string
}
interface IRuleParameters {
@@ -348,6 +349,7 @@ interface IRuleParameters {
hiveCliOptions?: string
hiveSqlScript?: string
hiveCliTaskExecutionType?: string
sqlExecutionType?: string
noteId?: string
paragraphId?: string
condaEnvName?: string