#203 add sqoop task (#1974)

* add sqoop task

* add sqoop task

* add sqoop task test

* add sqoop task test

* add sqoop task test

* modify add sqoop task test

* modify sqoop task test

* modify sqoop task test

* modify sqoop task test

* modify sqoop task test 2

* modify sqoop task test 3

* modify sqoop task test 3

* modify sqoop task test 3

* modify sqoop task test 4
孙朝和 2020-02-24 10:30:17 +08:00 committed by GitHub
parent 5ecd3b30b9
commit 3d6eee5d80
34 changed files with 2910 additions and 3 deletions

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;

public enum QueryType {

    FORM,
    SQL;

    public static QueryType getEnum(int value) {
        for (QueryType e : QueryType.values()) {
            if (e.ordinal() == value) {
                return e;
            }
        }
        // for values out of enum scope
        return null;
    }
}
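
For reference, getEnum maps an ordinal back to its constant and falls back to null for out-of-range values; a minimal, hypothetical usage sketch (the caller below is illustrative only — MysqlSourceGenerator later in this diff compares against QueryType.FORM.ordinal()):

    QueryType form = QueryType.getEnum(0);   // QueryType.FORM
    QueryType sql  = QueryType.getEnum(1);   // QueryType.SQL
    QueryType none = QueryType.getEnum(99);  // null: no matching constant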

@@ -34,6 +34,7 @@ public enum TaskType {
* 8 FLINK
* 9 HTTP
* 10 DATAX
* 11 SQOOP
*/
SHELL(0, "shell"),
SQL(1, "sql"),
@@ -45,7 +46,8 @@ public enum TaskType {
DEPENDENT(7, "dependent"),
FLINK(8, "flink"),
HTTP(9, "http"),
DATAX(10, "datax");
DATAX(10, "datax"),
SQOOP(11, "sqoop");
TaskType(int code, String descp){
this.code = code;

@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.sqoop;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* sqoop parameters
*/
public class SqoopParameters extends AbstractParameters {
/**
* model type
*/
private String modelType;
/**
* concurrency
*/
private int concurrency;
/**
* source type
*/
private String sourceType;
/**
* target type
*/
private String targetType;
/**
* source params
*/
private String sourceParams;
/**
* target params
*/
private String targetParams;
public String getModelType() {
return modelType;
}
public void setModelType(String modelType) {
this.modelType = modelType;
}
public int getConcurrency() {
return concurrency;
}
public void setConcurrency(int concurrency) {
this.concurrency = concurrency;
}
public String getSourceType() {
return sourceType;
}
public void setSourceType(String sourceType) {
this.sourceType = sourceType;
}
public String getTargetType() {
return targetType;
}
public void setTargetType(String targetType) {
this.targetType = targetType;
}
public String getSourceParams() {
return sourceParams;
}
public void setSourceParams(String sourceParams) {
this.sourceParams = sourceParams;
}
public String getTargetParams() {
return targetParams;
}
public void setTargetParams(String targetParams) {
this.targetParams = targetParams;
}
@Override
public boolean checkParameters() {
        return StringUtils.isNotEmpty(modelType)
                && concurrency != 0
                && StringUtils.isNotEmpty(sourceType)
                && StringUtils.isNotEmpty(targetType)
                && StringUtils.isNotEmpty(sourceParams)
                && StringUtils.isNotEmpty(targetParams);
}
@Override
public List<String> getResourceFilesList() {
return new ArrayList<>();
}
}
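
The nested source/target parameters travel inside SqoopParameters as raw JSON strings and are only deserialized by the concrete generators further down; a minimal sketch of populating and validating the entity (field values are made up for illustration):

    SqoopParameters params = new SqoopParameters();
    params.setModelType("import");
    params.setConcurrency(1);
    params.setSourceType("MYSQL");
    params.setTargetType("HIVE");
    params.setSourceParams("{\"srcDatasource\":2,\"srcTable\":\"person_2\"}");
    params.setTargetParams("{\"hiveDatabase\":\"stg\",\"hiveTable\":\"person_internal\"}");
    boolean valid = params.checkParameters(); // true only once all six fields are set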

@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.sqoop.sources;
/**
* source hdfs parameter
*/
public class SourceHdfsParameter {
/**
* export dir
*/
private String exportDir;
public String getExportDir() {
return exportDir;
}
public void setExportDir(String exportDir) {
this.exportDir = exportDir;
}
}

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.sqoop.sources;
/**
* source hive parameter
*/
public class SourceHiveParameter {
/**
* hive database
*/
private String hiveDatabase;
/**
* hive table
*/
private String hiveTable;
/**
* hive partition key
*/
private String hivePartitionKey;
/**
* hive partition value
*/
private String hivePartitionValue;
public String getHiveDatabase() {
return hiveDatabase;
}
public void setHiveDatabase(String hiveDatabase) {
this.hiveDatabase = hiveDatabase;
}
public String getHiveTable() {
return hiveTable;
}
public void setHiveTable(String hiveTable) {
this.hiveTable = hiveTable;
}
public String getHivePartitionKey() {
return hivePartitionKey;
}
public void setHivePartitionKey(String hivePartitionKey) {
this.hivePartitionKey = hivePartitionKey;
}
public String getHivePartitionValue() {
return hivePartitionValue;
}
public void setHivePartitionValue(String hivePartitionValue) {
this.hivePartitionValue = hivePartitionValue;
}
}

@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.sqoop.sources;
import org.apache.dolphinscheduler.common.process.Property;
import java.util.List;
/**
* source mysql parameter
*/
public class SourceMysqlParameter {
/**
* src datasource
*/
private int srcDatasource;
/**
* src table
*/
private String srcTable;
/**
* src query type
*/
private int srcQueryType;
/**
* src query sql
*/
private String srcQuerySql;
/**
* src column type
*/
private int srcColumnType;
/**
* src columns
*/
private String srcColumns;
/**
* src condition list
*/
private List<Property> srcConditionList;
/**
* map column hive
*/
private List<Property> mapColumnHive;
/**
* map column java
*/
private List<Property> mapColumnJava;
public int getSrcDatasource() {
return srcDatasource;
}
public void setSrcDatasource(int srcDatasource) {
this.srcDatasource = srcDatasource;
}
public String getSrcTable() {
return srcTable;
}
public void setSrcTable(String srcTable) {
this.srcTable = srcTable;
}
public int getSrcQueryType() {
return srcQueryType;
}
public void setSrcQueryType(int srcQueryType) {
this.srcQueryType = srcQueryType;
}
public String getSrcQuerySql() {
return srcQuerySql;
}
public void setSrcQuerySql(String srcQuerySql) {
this.srcQuerySql = srcQuerySql;
}
public int getSrcColumnType() {
return srcColumnType;
}
public void setSrcColumnType(int srcColumnType) {
this.srcColumnType = srcColumnType;
}
public String getSrcColumns() {
return srcColumns;
}
public void setSrcColumns(String srcColumns) {
this.srcColumns = srcColumns;
}
public List<Property> getSrcConditionList() {
return srcConditionList;
}
public void setSrcConditionList(List<Property> srcConditionList) {
this.srcConditionList = srcConditionList;
}
public List<Property> getMapColumnHive() {
return mapColumnHive;
}
public void setMapColumnHive(List<Property> mapColumnHive) {
this.mapColumnHive = mapColumnHive;
}
public List<Property> getMapColumnJava() {
return mapColumnJava;
}
public void setMapColumnJava(List<Property> mapColumnJava) {
this.mapColumnJava = mapColumnJava;
}
}

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.sqoop.targets;
/**
* target hdfs parameter
*/
public class TargetHdfsParameter {
/**
* target dir
*/
private String targetPath;
/**
* delete target dir
*/
private boolean deleteTargetDir;
/**
* file type
*/
private String fileType;
/**
* compression codec
*/
private String compressionCodec;
/**
* fields terminated
*/
private String fieldsTerminated;
/**
* lines terminated
*/
private String linesTerminated;
public String getTargetPath() {
return targetPath;
}
public void setTargetPath(String targetPath) {
this.targetPath = targetPath;
}
public boolean isDeleteTargetDir() {
return deleteTargetDir;
}
public void setDeleteTargetDir(boolean deleteTargetDir) {
this.deleteTargetDir = deleteTargetDir;
}
public String getFileType() {
return fileType;
}
public void setFileType(String fileType) {
this.fileType = fileType;
}
public String getCompressionCodec() {
return compressionCodec;
}
public void setCompressionCodec(String compressionCodec) {
this.compressionCodec = compressionCodec;
}
public String getFieldsTerminated() {
return fieldsTerminated;
}
public void setFieldsTerminated(String fieldsTerminated) {
this.fieldsTerminated = fieldsTerminated;
}
public String getLinesTerminated() {
return linesTerminated;
}
public void setLinesTerminated(String linesTerminated) {
this.linesTerminated = linesTerminated;
}
}

@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.sqoop.targets;
/**
* target hive parameter
*/
public class TargetHiveParameter {
/**
* hive database
*/
private String hiveDatabase;
/**
* hive table
*/
private String hiveTable;
/**
* create hive table
*/
private boolean createHiveTable;
/**
* drop delimiter
*/
private boolean dropDelimiter;
/**
* hive overwrite
*/
private boolean hiveOverWrite;
/**
* replace delimiter
*/
private String replaceDelimiter;
/**
* hive partition key
*/
private String hivePartitionKey;
/**
* hive partition value
*/
private String hivePartitionValue;
public String getHiveDatabase() {
return hiveDatabase;
}
public void setHiveDatabase(String hiveDatabase) {
this.hiveDatabase = hiveDatabase;
}
public String getHiveTable() {
return hiveTable;
}
public void setHiveTable(String hiveTable) {
this.hiveTable = hiveTable;
}
public boolean isCreateHiveTable() {
return createHiveTable;
}
public void setCreateHiveTable(boolean createHiveTable) {
this.createHiveTable = createHiveTable;
}
public boolean isDropDelimiter() {
return dropDelimiter;
}
public void setDropDelimiter(boolean dropDelimiter) {
this.dropDelimiter = dropDelimiter;
}
public boolean isHiveOverWrite() {
return hiveOverWrite;
}
public void setHiveOverWrite(boolean hiveOverWrite) {
this.hiveOverWrite = hiveOverWrite;
}
public String getReplaceDelimiter() {
return replaceDelimiter;
}
public void setReplaceDelimiter(String replaceDelimiter) {
this.replaceDelimiter = replaceDelimiter;
}
public String getHivePartitionKey() {
return hivePartitionKey;
}
public void setHivePartitionKey(String hivePartitionKey) {
this.hivePartitionKey = hivePartitionKey;
}
public String getHivePartitionValue() {
return hivePartitionValue;
}
public void setHivePartitionValue(String hivePartitionValue) {
this.hivePartitionValue = hivePartitionValue;
}
}

@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.sqoop.targets;
/**
* target mysql parameter
*/
public class TargetMysqlParameter {
/**
* target datasource
*/
private int targetDatasource;
/**
* target table
*/
private String targetTable;
/**
* target columns
*/
private String targetColumns;
/**
* fields terminated
*/
private String fieldsTerminated;
/**
* lines terminated
*/
private String linesTerminated;
/**
* pre query
*/
private String preQuery;
/**
* is update
*/
private boolean isUpdate;
/**
* target update key
*/
private String targetUpdateKey;
/**
* target update mode
*/
private String targetUpdateMode;
public int getTargetDatasource() {
return targetDatasource;
}
public void setTargetDatasource(int targetDatasource) {
this.targetDatasource = targetDatasource;
}
public String getTargetTable() {
return targetTable;
}
public void setTargetTable(String targetTable) {
this.targetTable = targetTable;
}
public String getTargetColumns() {
return targetColumns;
}
public void setTargetColumns(String targetColumns) {
this.targetColumns = targetColumns;
}
public String getFieldsTerminated() {
return fieldsTerminated;
}
public void setFieldsTerminated(String fieldsTerminated) {
this.fieldsTerminated = fieldsTerminated;
}
public String getLinesTerminated() {
return linesTerminated;
}
public void setLinesTerminated(String linesTerminated) {
this.linesTerminated = linesTerminated;
}
public String getPreQuery() {
return preQuery;
}
public void setPreQuery(String preQuery) {
this.preQuery = preQuery;
}
public boolean isUpdate() {
return isUpdate;
}
public void setUpdate(boolean update) {
isUpdate = update;
}
public String getTargetUpdateKey() {
return targetUpdateKey;
}
public void setTargetUpdateKey(String targetUpdateKey) {
this.targetUpdateKey = targetUpdateKey;
}
public String getTargetUpdateMode() {
return targetUpdateMode;
}
public void setTargetUpdateMode(String targetUpdateMode) {
this.targetUpdateMode = targetUpdateMode;
}
}

@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +72,8 @@ public class TaskParametersUtils {
return JSONUtils.parseObject(parameter, HttpParameters.class);
case DATAX:
return JSONUtils.parseObject(parameter, DataxParameters.class);
case SQOOP:
return JSONUtils.parseObject(parameter, SqoopParameters.class);
default:
return null;
}

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
/**
* entity test utils
*/
public class EntityTestUtils {
private static final Map<String, Object> OBJECT_MAP = new HashMap<>();
private static final String SKIP_METHOD = "getClass,notify,notifyAll,wait,equals,hashCode,clone";
static {
OBJECT_MAP.put("java.lang.Long", 1L);
OBJECT_MAP.put("java.lang.String", "test");
OBJECT_MAP.put("java.lang.Integer", 1);
OBJECT_MAP.put("int", 1);
OBJECT_MAP.put("long", 1L);
OBJECT_MAP.put("java.util.Date", new Date());
OBJECT_MAP.put("char", '1');
OBJECT_MAP.put("java.util.Map", new HashMap());
OBJECT_MAP.put("boolean", true);
}
public static void run(List<Class> classList)
throws IllegalAccessException, InvocationTargetException, InstantiationException {
for (Class temp : classList) {
Object tempInstance = new Object();
Constructor[] constructors = temp.getConstructors();
for (Constructor constructor : constructors) {
final Class<?>[] parameterTypes = constructor.getParameterTypes();
if (parameterTypes.length == 0) {
tempInstance = constructor.newInstance();
} else {
Object[] objects = new Object[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i++) {
objects[i] = OBJECT_MAP.get(parameterTypes[i].getName());
}
tempInstance = constructor.newInstance(objects);
}
}
Method[] methods = temp.getMethods();
for (final Method method : methods) {
                if (SKIP_METHOD.contains(method.getName())) {
                    // skip Object's built-in methods but keep iterating; a break here
                    // would silently stop testing the remaining getters and setters
                    continue;
                }
final Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length != 0) {
Object[] objects = new Object[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i++) {
objects[i] = OBJECT_MAP.get(parameterTypes[i].getName());
}
method.invoke(tempInstance, objects);
} else {
method.invoke(tempInstance);
}
}
}
}
}

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* sqoop parameter entity test
*/
public class SqoopParameterEntityTest {
@Test
public void testEntity(){
try {
List<Class> classList = new ArrayList<>();
classList.add(SourceMysqlParameter.class);
classList.add(SourceHiveParameter.class);
classList.add(SourceHdfsParameter.class);
classList.add(SqoopParameters.class);
classList.add(TargetMysqlParameter.class);
classList.add(TargetHiveParameter.class);
classList.add(TargetHdfsParameter.class);
EntityTestUtils.run(classList);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}

@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
@@ -198,6 +199,9 @@ public abstract class AbstractTask {
case DATAX:
paramsClass = DataxParameters.class;
break;
case SQOOP:
paramsClass = SqoopParameters.class;
break;
default:
logger.error("not support this task type: {}", taskType);
throw new IllegalArgumentException("not support this task type");

@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.task.python.PythonTask;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;
import org.slf4j.Logger;
/**
@@ -68,6 +69,8 @@ public class TaskManager {
return new HttpTask(props, logger);
case DATAX:
return new DataxTask(props, logger);
case SQOOP:
return new SqoopTask(props, logger);
default:
logger.error("unsupport task type: {}", taskType);
throw new IllegalArgumentException("not support task type");

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop;
import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.slf4j.Logger;
import java.util.Map;
/**
 * sqoop task, implemented on top of AbstractYarnTask
 */
public class SqoopTask extends AbstractYarnTask {
private SqoopParameters sqoopParameters;
public SqoopTask(TaskProps props, Logger logger){
super(props,logger);
}
@Override
public void init() throws Exception {
logger.info("sqoop task params {}", taskProps.getTaskParams());
sqoopParameters =
JSON.parseObject(taskProps.getTaskParams(),SqoopParameters.class);
        if (!sqoopParameters.checkParameters()) {
            throw new RuntimeException("sqoop task params are not valid");
        }
}
@Override
protected String buildCommand() throws Exception {
//get sqoop scripts
SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters);
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
sqoopParameters.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if(paramsMap != null){
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
logger.info("sqoop script: {}", resultScripts);
return resultScripts;
}
return null;
}
@Override
public AbstractParameters getParameters() {
return sqoopParameters;
}
}
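
buildCommand() first assembles the raw sqoop script from the generators, then substitutes parameter placeholders via ParameterUtils.convertParameterPlaceholders. A simplified stand-in for that substitution step, using java.util.Map (a sketch of the idea only, not the actual ParameterUtils implementation):

    // replaces each ${key} token in the script with its resolved value
    static String convertPlaceholders(String script, Map<String, String> params) {
        String result = script;
        for (Map.Entry<String, String> entry : params.entrySet()) {
            result = result.replace("${" + entry.getKey() + "}", entry.getValue());
        }
        return result;
    }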

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* common script generator
*/
public class CommonGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
public String generate(SqoopParameters sqoopParameters) {
StringBuilder result = new StringBuilder();
try{
result.append("sqoop ")
.append(sqoopParameters.getModelType());
if(sqoopParameters.getConcurrency() >0){
result.append(" -m ")
.append(sqoopParameters.getConcurrency());
}
}catch (Exception e){
logger.error(e.getMessage());
}
return result.toString();
}
}
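
So for an import job with a concurrency of 1, the common section comes out as "sqoop import -m 1"; a quick illustration:

    SqoopParameters params = new SqoopParameters();
    params.setModelType("import");
    params.setConcurrency(1);
    String common = new CommonGenerator().generate(params); // "sqoop import -m 1"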

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
/**
* Source Generator Interface
*/
public interface ISourceGenerator {
    /**
     * generate the source script
     * @param sqoopParameters sqoop params
     * @return the source section of the sqoop command
     */
String generate(SqoopParameters sqoopParameters);
}

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
/**
* Target Generator Interface
*/
public interface ITargetGenerator {
    /**
     * generate the target script
     * @param sqoopParameters sqoop params
     * @return the target section of the sqoop command
     */
String generate(SqoopParameters sqoopParameters);
}

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HiveSourceGenerator;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.MysqlSourceGenerator;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets.HdfsTargetGenerator;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets.HiveTargetGenerator;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets.MysqlTargetGenerator;
/**
* Sqoop Job Scripts Generator
*/
public class SqoopJobGenerator {
private static final String MYSQL = "MYSQL";
private static final String HIVE = "HIVE";
private static final String HDFS = "HDFS";
/**
* target script generator
*/
private ITargetGenerator targetGenerator;
/**
* source script generator
*/
private ISourceGenerator sourceGenerator;
/**
* common script generator
*/
private CommonGenerator commonGenerator;
public SqoopJobGenerator(){
commonGenerator = new CommonGenerator();
}
private void createSqoopJobGenerator(String sourceType,String targetType){
sourceGenerator = createSourceGenerator(sourceType);
targetGenerator = createTargetGenerator(targetType);
}
    /**
     * get the final sqoop scripts
     * @param sqoopParameters sqoop params
     * @return the full sqoop command, or null if the source or target type is unsupported
     */
public String generateSqoopJob(SqoopParameters sqoopParameters){
createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType());
if(sourceGenerator == null || targetGenerator == null){
return null;
}
return commonGenerator.generate(sqoopParameters)
+ sourceGenerator.generate(sqoopParameters)
+ targetGenerator.generate(sqoopParameters);
}
    /**
     * get the source generator
     * @param sourceType source type: MYSQL, HIVE or HDFS
     * @return the matching source generator, or null if unsupported
     */
private ISourceGenerator createSourceGenerator(String sourceType){
switch (sourceType){
case MYSQL:
return new MysqlSourceGenerator();
case HIVE:
return new HiveSourceGenerator();
case HDFS:
return new HdfsSourceGenerator();
default:
return null;
}
}
    /**
     * get the target generator
     * @param targetType target type: MYSQL, HIVE or HDFS
     * @return the matching target generator, or null if unsupported
     */
private ITargetGenerator createTargetGenerator(String targetType){
switch (targetType){
case MYSQL:
return new MysqlTargetGenerator();
case HIVE:
return new HiveTargetGenerator();
case HDFS:
return new HdfsTargetGenerator();
default:
return null;
}
}
}
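
The generator is a small factory/dispatcher: the source and target type strings pick the concrete generators, and the final script is the concatenation of the common, source and target sections. A usage sketch (assuming sqoopParameters was parsed from the task JSON, as in SqoopTask above):

    SqoopJobGenerator generator = new SqoopJobGenerator();
    // "MYSQL"/"HIVE"/"HDFS" resolve to the matching generators; any other
    // type leaves a generator null and generateSqoopJob() returns null
    String script = generator.generateSqoopJob(sqoopParameters);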

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* hdfs source generator
*/
public class HdfsSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
StringBuilder result = new StringBuilder();
try{
SourceHdfsParameter sourceHdfsParameter
= JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHdfsParameter.class);
if(sourceHdfsParameter != null){
if(StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())){
result.append(" --export-dir ")
.append(sourceHdfsParameter.getExportDir());
}else{
throw new Exception("--export-dir is null");
}
}
}catch (Exception e){
logger.error("get hdfs source failed",e);
}
return result.toString();
}
}

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* hive source generator
*/
public class HiveSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
StringBuilder sb = new StringBuilder();
try{
SourceHiveParameter sourceHiveParameter
= JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHiveParameter.class);
if(sourceHiveParameter != null){
if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())){
sb.append(" --hcatalog-database ").append(sourceHiveParameter.getHiveDatabase());
}
if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())){
sb.append(" --hcatalog-table ").append(sourceHiveParameter.getHiveTable());
}
if(StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())&&
StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())){
sb.append(" --hcatalog-partition-keys ").append(sourceHiveParameter.getHivePartitionKey())
.append(" --hcatalog-partition-values ").append(sourceHiveParameter.getHivePartitionValue());
}
}
}catch (Exception e){
logger.error(e.getMessage());
}
return sb.toString();
}
}

@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.QueryType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* mysql source generator
*/
public class MysqlSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
StringBuilder result = new StringBuilder();
try {
SourceMysqlParameter sourceMysqlParameter
= JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class);
if(sourceMysqlParameter != null){
ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
DataSource dataSource= processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
if(baseDataSource != null){
result.append(" --connect ")
.append(baseDataSource.getJdbcUrl())
.append(" --username ")
.append(baseDataSource.getUser())
.append(" --password ")
.append(baseDataSource.getPassword());
if(sourceMysqlParameter.getSrcQueryType() == QueryType.FORM.ordinal()){
if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())){
result.append(" --table ").append(sourceMysqlParameter.getSrcTable());
}
if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())){
result.append(" --columns ").append(sourceMysqlParameter.getSrcColumns());
}
}else if(sourceMysqlParameter.getSrcQueryType() == QueryType.SQL.ordinal()){
                        if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())) {
                            String srcQuery = sourceMysqlParameter.getSrcQuerySql();
                            // sqoop free-form queries must carry the $CONDITIONS token
                            if (srcQuery.toLowerCase().contains("where")) {
                                srcQuery += " AND $CONDITIONS";
                            } else {
                                srcQuery += " WHERE $CONDITIONS";
                            }
                            result.append(" --query '").append(srcQuery).append("'");
}
}
                    List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
                    if (mapColumnHive != null && !mapColumnHive.isEmpty()) {
                        StringBuilder columnMap = new StringBuilder();
                        for (Property item : mapColumnHive) {
                            // accumulate every mapping; plain assignment here would keep only the last entry
                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
                        }
                        if (columnMap.length() > 0) {
                            result.append(" --map-column-hive ")
                                    .append(columnMap.substring(0, columnMap.length() - 1));
                        }
                    }
                    List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
                    if (mapColumnJava != null && !mapColumnJava.isEmpty()) {
                        StringBuilder columnMap = new StringBuilder();
                        for (Property item : mapColumnJava) {
                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
                        }
                        if (columnMap.length() > 0) {
                            result.append(" --map-column-java ")
                                    .append(columnMap.substring(0, columnMap.length() - 1));
                        }
                    }
}
}
}catch (Exception e){
logger.error(e.getMessage());
}
return result.toString();
}
}
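
Note the $CONDITIONS handling above: sqoop requires that token in free-form queries so it can partition the work across mappers, so the generator appends it with AND when the query already contains a WHERE clause and with WHERE otherwise. For example:

    // "SELECT * FROM person_2"               -> " --query 'SELECT * FROM person_2 WHERE $CONDITIONS'"
    // "SELECT * FROM person_2 WHERE age > 1" -> " --query 'SELECT * FROM person_2 WHERE age > 1 AND $CONDITIONS'"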

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* hdfs target generator
*/
public class HdfsTargetGenerator implements ITargetGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
StringBuilder result = new StringBuilder();
try{
TargetHdfsParameter targetHdfsParameter =
JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHdfsParameter.class);
if(targetHdfsParameter != null){
if(StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())){
result.append(" --target-dir ").append(targetHdfsParameter.getTargetPath());
}
if(StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())){
result.append(" --compression-codec ").append(targetHdfsParameter.getCompressionCodec());
}
if(StringUtils.isNotEmpty(targetHdfsParameter.getFileType())){
result.append(" ").append(targetHdfsParameter.getFileType());
}
if(targetHdfsParameter.isDeleteTargetDir()){
result.append(" --delete-target-dir");
}
if(StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())){
result.append(" --fields-terminated-by '").append(targetHdfsParameter.getFieldsTerminated()).append("'");
}
if(StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())){
result.append(" --lines-terminated-by '").append(targetHdfsParameter.getLinesTerminated()).append("'");
}
result.append(" --null-non-string 'NULL' --null-string 'NULL'");
}
}catch(Exception e){
logger.error(e.getMessage());
}
return result.toString();
}
}

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* hive target generator
*/
public class HiveTargetGenerator implements ITargetGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
StringBuilder result = new StringBuilder();
try{
TargetHiveParameter targetHiveParameter =
JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHiveParameter.class);
if(targetHiveParameter != null){
result.append(" --hive-import ");
if(StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())&&
StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())){
result.append(" --hive-table ")
.append(targetHiveParameter.getHiveDatabase())
.append(".")
.append(targetHiveParameter.getHiveTable());
}
if(targetHiveParameter.isCreateHiveTable()){
result.append(" --create-hive-table");
}
if(targetHiveParameter.isDropDelimiter()){
result.append(" --hive-drop-import-delims");
}
if(targetHiveParameter.isHiveOverWrite()){
result.append(" --hive-overwrite -delete-target-dir");
}
if(StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())){
result.append(" --hive-delims-replacement ").append(targetHiveParameter.getReplaceDelimiter());
}
if(StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())&&
StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())){
result.append(" --hive-partition-key ")
.append(targetHiveParameter.getHivePartitionKey())
.append(" --hive-partition-value ")
.append(targetHiveParameter.getHivePartitionValue());
}
}
}catch(Exception e){
logger.error(e.getMessage());
}
return result.toString();
}
}

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* mysql target generator
*/
public class MysqlTargetGenerator implements ITargetGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String generate(SqoopParameters sqoopParameters) {
StringBuilder result = new StringBuilder();
try{
TargetMysqlParameter targetMysqlParameter =
JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetMysqlParameter.class);
if(targetMysqlParameter != null && targetMysqlParameter.getTargetDatasource() != 0){
ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
DataSource dataSource= processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
// get datasource
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
if(baseDataSource != null){
result.append(" --connect ")
.append(baseDataSource.getJdbcUrl())
.append(" --username ")
.append(baseDataSource.getUser())
.append(" --password ")
.append(baseDataSource.getPassword())
.append(" --table ")
.append(targetMysqlParameter.getTargetTable());
if(StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())){
result.append(" --columns ").append(targetMysqlParameter.getTargetColumns());
}
if(StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())){
result.append(" --fields-terminated-by '").append(targetMysqlParameter.getFieldsTerminated()).append("'");
}
if(StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())){
result.append(" --lines-terminated-by '").append(targetMysqlParameter.getLinesTerminated()).append("'");
}
if(targetMysqlParameter.isUpdate()){
if(StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey())&&
StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())){
result.append(" --update-key ").append(targetMysqlParameter.getTargetUpdateKey())
.append(" --update-mode ").append(targetMysqlParameter.getTargetUpdateMode());
}
}
}
}
}catch (Exception e){
logger.error(e.getMessage());
}
return result.toString();
}
}

@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.util.*;
/**
* sqoop task test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class SqoopTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SqoopTaskTest.class);
private ProcessService processService;
private ApplicationContext applicationContext;
private SqoopTask sqoopTask;
@Before
public void before() throws Exception{
processService = Mockito.mock(ProcessService.class);
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams("{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
sqoopTask = new SqoopTask(props,logger);
sqoopTask.init();
}

@Test
public void testGenerator(){
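// case 1: import MYSQL -> HDFS, whole-table form mode (srcQueryType 0)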
String data1 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters sqoopParameters1 = JSONObject.parseObject(data1,SqoopParameters.class);
SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters1);
String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
Assert.assertEquals(expected, script);
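// case 2: export HDFS -> MYSQL, updating on the id key with allowinsert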
String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters sqoopParameters2 = JSONObject.parseObject(data2,SqoopParameters.class);
String script2 = generator.generateSqoopJob(sqoopParameters2);
String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
Assert.assertEquals(expected2, script2);
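// case 3: export HIVE -> MYSQL through the HCatalog partition options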
String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters sqoopParameters3 = JSONObject.parseObject(data3,SqoopParameters.class);
String script3 = generator.generateSqoopJob(sqoopParameters3);
String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
Assert.assertEquals(expected3, script3);
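// case 4: import MYSQL -> HIVE from a free-form query (srcQueryType 1)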
String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
SqoopParameters sqoopParameters4 = JSONObject.parseObject(data4,SqoopParameters.class);
String script4 = generator.generateSqoopJob(sqoopParameters4);
String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
Assert.assertEquals(expected4, script4);
}

private DataSource getDataSource() {
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
dataSource.setConnectionParams(
"{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}");
dataSource.setUserId(1);
return dataSource;
}

@Test
public void testGetParameters() {
Assert.assertNotNull(sqoopTask.getParameters());
}

/**
* Method: init
*/
@Test
public void testInit(){
try {
sqoopTask.init();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}
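
The doubly escaped taskParams strings above decode in two stages: the outer JSON maps onto SqoopParameters, while sourceParams and targetParams are themselves JSON strings that are parsed a second time against the type-specific parameter beans. A minimal stand-alone sketch of that round trip (not part of this commit; it assumes the parameter beans expose the usual getters such as getSourceParams, getSrcTable, and getTargetPath, which this diff does not show):

import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter;

public class SqoopParamsDecodeExample {
    public static void main(String[] args) {
        // outer payload as the front-end form emits it, trimmed to the essentials
        String taskParams = "{\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\","
                + "\"sourceParams\":\"{\\\"srcTable\\\":\\\"person_2\\\"}\","
                + "\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\"}\"}";
        SqoopParameters outer = JSONObject.parseObject(taskParams, SqoopParameters.class);
        // the nested strings are decoded against the bean matching sourceType/targetType
        SourceMysqlParameter source =
                JSONObject.parseObject(outer.getSourceParams(), SourceMysqlParameter.class);
        TargetHdfsParameter target =
                JSONObject.parseObject(outer.getTargetParams(), TargetHdfsParameter.class);
        System.out.println(source.getSrcTable() + " -> " + target.getTargetPath());
    }
}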

View File

@@ -283,6 +283,10 @@ let tasksType = {
'DATAX': {
desc: 'DataX',
color: '#1fc747'
},
'SQOOP': {
desc: 'SQOOP',
color: '#E46F13'
}
}

View File

@@ -104,6 +104,9 @@
.icos-DATAX {
background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%;
}
.icos-SQOOP {
background: url("../img/toolbar_SQOOP.png") no-repeat 50% 50%;
}
.toolbar {
width: 60px;
height: 100%;

View File

@@ -204,6 +204,13 @@
ref="DATAX"
:backfill-item="backfillItem">
</m-datax>
<m-sqoop
v-if="taskType === 'SQOOP'"
@on-params="_onParams"
@on-cache-params="_onCacheParams"
ref="SQOOP"
:backfill-item="backfillItem">
</m-sqoop>
</div>
</div>
<div class="bottom-box">
@@ -229,6 +236,7 @@
import mDependent from './tasks/dependent'
import mHttp from './tasks/http'
import mDatax from './tasks/datax'
import mSqoop from './tasks/sqoop'
import mSubProcess from './tasks/sub_process'
import mSelectInput from './_source/selectInput'
import mTimeoutAlarm from './_source/timeoutAlarm'
@@ -589,6 +597,7 @@
mDependent,
mHttp,
mDatax,
mSqoop,
mSelectInput,
mTimeoutAlarm,
mPriority,

View File

@@ -0,0 +1,981 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<div class="sql-model">
<m-list-box>
<div slot="text">{{$t('Direct')}}</div>
<div slot="content">
<x-select
style="width: 130px;"
v-model="modelType"
:disabled="isDetails">
<x-option
v-for="city in modelTypeList"
:key="city.code"
:value="city.code"
:label="city.code">
</x-option>
</x-select>
</div>
</m-list-box>
<template>
<m-list-box>
<div slot="text" style="font-weight:bold">{{$t('Data Source')}}</div>
</m-list-box>
<hr style="margin-left: 60px;">
<m-list-box>
<div slot="text">{{$t('Type')}}</div>
<div slot="content">
<x-select
style="width: 130px;"
v-model="sourceType"
:disabled="isDetails"
@on-change="_handleSourceTypeChange">
<x-option
v-for="city in sourceTypeList"
:key="city.code"
:value="city.code"
:label="city.code">
</x-option>
</x-select>
</div>
</m-list-box>
<template v-if="sourceType ==='MYSQL'">
<m-list-box>
<div slot="text">{{$t('Datasource')}}</div>
<div slot="content">
<m-datasource
ref="refSourceDs"
@on-dsData="_onSourceDsData"
:data="{ type:'MYSQL',datasource:srcDatasource }"
>
</m-datasource>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('ModelType')}}</div>
<div slot="content">
<x-radio-group v-model="srcQueryType" @on-change="_handleQueryType">
<x-radio label="0">{{$t('Form')}}</x-radio>
<x-radio label="1">SQL</x-radio>
</x-radio-group>
</div>
</m-list-box>
<template v-if="sourceMysqlParams.srcQueryType=='0'">
<m-list-box>
<div slot="text">{{$t('Table')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="sourceMysqlParams.srcTable"
:placeholder="$t('Please enter Mysql Table(required)')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('ColumnType')}}</div>
<div slot="content">
<x-radio-group v-model="sourceMysqlParams.srcColumnType">
<x-radio label="0">{{$t('All Columns')}}</x-radio>
<x-radio label="1">{{$t('Some Columns')}}</x-radio>
</x-radio-group>
</div>
</m-list-box>
<m-list-box v-if="sourceMysqlParams.srcColumnType=='1'">
<div slot="text">{{$t('Column')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="sourceMysqlParams.srcColumns"
:placeholder="$t('Please enter Columns (Comma separated)')">
</x-input>
</div>
</m-list-box>
</template>
</template>
</template>
<template v-if="sourceType=='HIVE'">
<m-list-box>
<div slot="text">{{$t('Database')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="sourceHiveParams.hiveDatabase"
:placeholder="$t('Please enter Hive Database(required)')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Table')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="sourceHiveParams.hiveTable"
:placeholder="$t('Please enter Hive Table(required)')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Hive partition Keys')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="sourceHiveParams.hivePartitionKey"
:placeholder="$t('Please enter Hive Partition Keys')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Hive partition Values')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="sourceHiveParams.hivePartitionValue"
:placeholder="$t('Please enter Hive Partition Values')">
</x-input>
</div>
</m-list-box>
</template>
<template v-if="sourceType=='HDFS'">
<m-list-box>
<div slot="text">{{$t('Export Dir')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="sourceHdfsParams.exportDir"
:placeholder="$t('Please enter Export Dir(required)')">
</x-input>
</div>
</m-list-box>
</template>
<m-list-box v-show="srcQueryType === '1' && sourceType ==='MYSQL'">
<div slot="text">{{$t('SQL Statement')}}</div>
<div slot="content">
<div class="from-mirror">
<textarea
id="code-sql-mirror"
name="code-sql-mirror"
style="opacity: 0;">
</textarea>
</div>
</div>
</m-list-box>
<template>
<m-list-box v-show="sourceType === 'MYSQL'">
<div slot="text">{{$t('Map Column Hive')}}</div>
<div slot="content">
<m-local-params
ref="refMapColumnHiveParams"
@on-local-params="_onMapColumnHive"
:udp-list="sourceMysqlParams.mapColumnHive"
:hide="false">
</m-local-params>
</div>
</m-list-box>
<m-list-box v-show="sourceType === 'MYSQL'">
<div slot="text">{{$t('Map Column Java')}}</div>
<div slot="content">
<m-local-params
ref="refMapColumnJavaParams"
@on-local-params="_onMapColumnJava"
:udp-list="sourceMysqlParams.mapColumnJava"
:hide="false">
</m-local-params>
</div>
</m-list-box>
</template>
<m-list-box>
<div slot="text" style="font-weight:bold">{{$t('Data Target')}}</div>
</m-list-box>
<hr style="margin-left: 60px;">
<m-list-box>
<div slot="text">{{$t('Type')}}</div>
<div slot="content">
<x-select
style="width: 130px;"
v-model="targetType"
:disabled="isDetails">
<x-option
v-for="city in targetTypeList"
:key="city.code"
:value="city.code"
:label="city.code">
</x-option>
</x-select>
</div>
</m-list-box>
<div v-show="targetType==='HIVE'">
<m-list-box>
<div slot="text">{{$t('Database')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHiveParams.hiveDatabase"
:placeholder="$t('Please enter Hive Database(required)')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Table')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHiveParams.hiveTable"
:placeholder="$t('Please enter Hive Table(required)')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('CreateHiveTable')}}</div>
<div slot="content">
<x-switch v-model="targetHiveParams.createHiveTable"></x-switch>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('DropDelimiter')}}</div>
<div slot="content">
<x-switch v-model="targetHiveParams.dropDelimiter"></x-switch>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('OverWriteSrc')}}</div>
<div slot="content">
<x-switch v-model="targetHiveParams.hiveOverWrite"></x-switch>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('ReplaceDelimiter')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHiveParams.replaceDelimiter"
:placeholder="$t('Please enter Replace Delimiter')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Hive partition Keys')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHiveParams.hivePartitionKey"
:placeholder="$t('Please enter Hive Partition Keys')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Hive partition Values')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHiveParams.hivePartitionValue"
:placeholder="$t('Please enter Hive Partition Values')">
</x-input>
</div>
</m-list-box>
</div>
<div v-show="targetType==='HDFS'">
<m-list-box>
<div slot="text">{{$t('Target Dir')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHdfsParams.targetPath"
:placeholder="$t('Please enter Target Dir(required)')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('DeleteTargetDir')}}</div>
<div slot="content">
<x-switch v-model="targetHdfsParams.deleteTargetDir"></x-switch>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('CompressionCodec')}}</div>
<div slot="content">
<x-radio-group v-model="targetHdfsParams.compressionCodec">
<x-radio label="snappy">snappy</x-radio>
<x-radio label="lzo">lzo</x-radio>
<x-radio label="gzip">gzip</x-radio>
<x-radio label="">no</x-radio>
</x-radio-group>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('FileType')}}</div>
<div slot="content">
<x-radio-group v-model="targetHdfsParams.fileType">
<x-radio label="--as-avrodatafile">avro</x-radio>
<x-radio label="--as-sequencefile">sequence</x-radio>
<x-radio label="--as-textfile">text</x-radio>
<x-radio label="--as-parquetfile">parquet</x-radio>
</x-radio-group>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('FieldsTerminated')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHdfsParams.fieldsTerminated"
:placeholder="$t('Please enter Fields Terminated')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('LinesTerminated')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetHdfsParams.linesTerminated"
:placeholder="$t('Please enter Lines Terminated')">
</x-input>
</div>
</m-list-box>
</div>
<div v-show="targetType==='MYSQL'">
<m-list-box>
<div slot="text">{{$t('Datasource')}}</div>
<div slot="content">
<m-datasource
ref="refTargetDs"
@on-dsData="_onTargetDsData"
:data="{ type:type,datasource:targetDatasource }"
>
</m-datasource>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Table')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetMysqlParams.targetTable"
:placeholder="$t('Please enter Mysql Table(required)')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Column')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetMysqlParams.targetColumns"
:placeholder="$t('Please enter Columns (Comma separated)')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('FieldsTerminated')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetMysqlParams.fieldsTerminated"
:placeholder="$t('Please enter Fields Terminated')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('LinesTerminated')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetMysqlParams.linesTerminated"
:placeholder="$t('Please enter Lines Terminated')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('IsUpdate')}}</div>
<div slot="content">
<x-switch v-model="targetMysqlParams.isUpdate"></x-switch>
</div>
</m-list-box>
<m-list-box v-show="targetMysqlParams.isUpdate">
<div slot="text">{{$t('UpdateKey')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="targetMysqlParams.targetUpdateKey"
:placeholder="$t('Please enter Update Key')">
</x-input>
</div>
</m-list-box>
<m-list-box v-show="targetMysqlParams.isUpdate">
<div slot="text">{{$t('UpdateMode')}}</div>
<div slot="content">
<x-radio-group v-model="targetMysqlParams.targetUpdateMode">
<x-radio label="updateonly">{{$t('OnlyUpdate')}}</x-radio>
<x-radio label="allowinsert">{{$t('AllowInsert')}}</x-radio>
</x-radio-group>
</div>
</m-list-box>
</div>
<m-list-box>
<div slot="text">{{$t('Concurrency')}}</div>
<div slot="content">
<x-input
:disabled="isDetails"
type="text"
v-model="concurrency"
:placeholder="$t('Please enter Concurrency')">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Custom Parameters')}}</div>
<div slot="content">
<m-local-params
ref="refLocalParams"
@on-local-params="_onLocalParams"
:udp-list="localParams"
:hide="false">
</m-local-params>
</div>
</m-list-box>
</div>
</template>
<script>
import _ from 'lodash'
import i18n from '@/module/i18n'
import mListBox from './_source/listBox'
import mDatasource from './_source/datasource'
import mLocalParams from './_source/localParams'
import disabledState from '@/module/mixin/disabledState'
import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror'
let editor
export default {
name: 'sqoop',
data () {
return {
/**
* Customer Params
*/
localParams: [],
/**
* mysql query type
*/
srcQueryType:'1',
/**
* source data source
*/
srcDatasource:'',
/**
* target data source
*/
targetDatasource:'',
/**
* concurrency
*/
concurrency:1,
/**
* direct model type
*/
modelType:'import',
modelTypeList: [{ code: 'import' }, { code: 'export' }],
sourceTypeList:[
{
code:"MYSQL"
},
{
code:"HDFS"
},
{
code:"HIVE"
}
],
targetTypeList:[
{
code:"HIVE"
},
{
code:"HDFS"
}
],
sourceType:"MYSQL",
targetType:"HDFS",
sourceMysqlParams:{
srcDatasource:-1,
srcTable:"",
srcQueryType:"1",
srcQuerySql:'',
srcColumnType:"0",
srcColumns:"",
srcConditionList:[],
mapColumnHive:[],
mapColumnJava:[]
},
sourceHdfsParams:{
exportDir:""
},
sourceHiveParams:{
hiveDatabase:"",
hiveTable:"",
hivePartitionKey:"",
hivePartitionValue:""
},
targetHdfsParams:{
targetPath:"",
deleteTargetDir:true,
fileType:"--as-avrodatafile",
compressionCodec:"snappy",
fieldsTerminated:"",
linesTerminated:"",
},
targetMysqlParams:{
targetDatasource:-1,
targetTable:"",
targetColumns:"",
fieldsTerminated:"",
linesTerminated:"",
preQuery:"",
isUpdate:false,
targetUpdateKey:"",
targetUpdateMode:"allowinsert"
},
targetHiveParams:{
hiveDatabase:"",
hiveTable:"",
createHiveTable:false,
dropDelimiter:false,
hiveOverWrite:true,
replaceDelimiter:"",
hivePartitionKey:"",
hivePartitionValue:""
}
}
},
mixins: [disabledState],
props: {
backfillItem: Object
},
methods: {
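// keep the stringified srcQueryType in sync with the Form/SQL radio group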
_handleQueryType(o){
this.sourceMysqlParams.srcQueryType = this.srcQueryType
},
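// when the source type changes, recompute the valid targets and select the first one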
_handleSourceTypeChange(a){
this._getTargetTypeList(a.label)
this.targetType = this.targetTypeList[0].code
},
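// MYSQL can feed HIVE or HDFS; HDFS and HIVE can only be exported to MYSQL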
_getTargetTypeList(data){
switch(data){
case 'MYSQL':
this.targetTypeList = [
{
code:"HIVE"
},
{
code:"HDFS"
}
]
break;
case 'HDFS':
this.targetTypeList = [
{
code:"MYSQL"
}
]
break;
case 'HIVE':
this.targetTypeList = [
{
code:"MYSQL"
}
]
break;
default:
this.targetTypeList = [
{
code:"HIVE"
},
{
code:"HDFS"
}
]
break;
}
},
_onMapColumnHive (a) {
this.sourceMysqlParams.mapColumnHive = a
},
_onMapColumnJava (a) {
this.sourceMysqlParams.mapColumnJava = a
},
/**
* return data source
*/
_onSourceDsData (o) {
this.sourceMysqlParams.srcDatasource = o.datasource
},
/**
* return data source
*/
_onTargetDsData (o) {
this.targetMysqlParams.targetDatasource = o.datasource
},
/**
* stringify the source params
*/
_handleSourceParams() {
let params = null
switch(this.sourceType){
case "MYSQL":
this.sourceMysqlParams.srcQuerySql = editor.getValue()
params = JSON.stringify(this.sourceMysqlParams)
break;
case "ORACLE":
params = JSON.stringify(this.sourceOracleParams)
break;
case "HDFS":
params = JSON.stringify(this.sourceHdfsParams)
break;
case "HIVE":
params = JSON.stringify(this.sourceHiveParams)
break;
default:
params = "";
break;
}
return params
},
/**
* stringify the target params
*/
_handleTargetParams() {
let params = null
switch(this.targetType){
case "HIVE":
params = JSON.stringify(this.targetHiveParams)
break;
case "HDFS":
params = JSON.stringify(this.targetHdfsParams)
break;
case "MYSQL":
params = JSON.stringify(this.targetMysqlParams)
break;
default:
params = "";
break;
}
return params
},
/**
* get source params by source type
*/
_getSourceParams(data) {
switch(this.sourceType){
case "MYSQL":
this.sourceMysqlParams = JSON.parse(data)
this.srcDatasource = this.sourceMysqlParams.srcDatasource
break;
case "ORACLE":
this.sourceOracleParams = JSON.parse(data)
break;
case "HDFS":
this.sourceHdfsParams = JSON.parse(data)
break;
case "HIVE":
this.sourceHiveParams = JSON.parse(data)
break;
default:
break;
}
},
/**
* get target params by target type
*/
_getTargetParams(data) {
switch(this.targetType){
case "HIVE":
this.targetHiveParams = JSON.parse(data)
break;
case "HDFS":
this.targetHdfsParams = JSON.parse(data)
break;
case "MYSQL":
this.targetMysqlParams = JSON.parse(data)
this.targetDatasource = this.targetMysqlParams.targetDatasource
break;
default:
break;
}
},
/**
* verification
*/
_verification () {
switch(this.sourceType){
case "MYSQL":
if (!this.$refs.refSourceDs._verifDatasource()) {
return false
}
if(this.srcQueryType === '1'){
if (!editor.getValue()) {
this.$message.warning(`${i18n.$t('Please enter a SQL Statement(required)')}`)
return false
}
}else{
if (this.sourceMysqlParams.srcTable === "") {
this.$message.warning(`${i18n.$t('Please enter Mysql Table(required)')}`)
return false
}
if(this.sourceMysqlParams.srcColumnType === "1" && this.sourceMysqlParams.srcColumns === ""){
this.$message.warning(`${i18n.$t('Please enter Columns (Comma separated)')}`)
return false
}
}
break;
case "HDFS":
if(this.sourceHdfsParams.exportDir === ""){
this.$message.warning(`${i18n.$t('Please enter Export Dir(required)')}`)
return false
}
break;
case "HIVE":
if(this.sourceHiveParams.hiveDatabase === ""){
this.$message.warning(`${i18n.$t('Please enter Hive Database(required)')}`)
return false
}
if(this.sourceHiveParams.hiveTable === ""){
this.$message.warning(`${i18n.$t('Please enter Hive Table(required)')}`)
return false
}
break;
default:
break;
}
switch(this.targetType){
case "HIVE":
if(this.targetHiveParams.hiveDatabase === ""){
this.$message.warning(`${i18n.$t('Please enter Hive Database(required)')}`)
return false
}
if(this.targetHiveParams.hiveTable === ""){
this.$message.warning(`${i18n.$t('Please enter Hive Table(required)')}`)
return false
}
break;
case "HDFS":
if(this.targetHdfsParams.targetPath === ""){
this.$message.warning(`${i18n.$t('Please enter Target Dir(required)')}`)
return false
}
break;
case "MYSQL":
if (!this.$refs.refTargetDs._verifDatasource()) {
return false
}
if(this.targetMysqlParams.targetTable === ""){
this.$message.warning(`${i18n.$t('Please enter Mysql Table(required)')}`)
return false
}
break;
default:
break;
}
// storage
this.$emit('on-params', {
concurrency:this.concurrency,
modelType:this.modelType,
sourceType:this.sourceType,
targetType:this.targetType,
sourceParams:this._handleSourceParams(),
targetParams:this._handleTargetParams(),
localParams:this.localParams
})
return true
},
/**
* Processing code highlighting
*/
_handlerEditor () {
editor = codemirror('code-sql-mirror', {
mode: 'sql',
readOnly: this.isDetails
})
this.keypress = () => {
if (!editor.getOption('readOnly')) {
editor.showHint({
completeSingle: false
})
}
}
// Monitor keyboard
editor.on('keypress', this.keypress)
editor.setValue(this.sourceMysqlParams.srcQuerySql)
return editor
},
/**
* return localParams
*/
_onLocalParams (a) {
this.localParams = a
},
},
watch: {
// Watch the cacheParams and forward them to the parent form
cacheParams (val) {
this.$emit('on-cache-params', val);
}
},
created () {
let o = this.backfillItem
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
this.concurrency = o.params.concurrency || 1
this.modelType = o.params.modelType
this.sourceType = o.params.sourceType
this._getTargetTypeList(this.sourceType)
this.targetType = o.params.targetType
this._getSourceParams(o.params.sourceParams)
this._getTargetParams(o.params.targetParams)
this.localParams = o.params.localParams
}
},
mounted () {
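// defer CodeMirror creation until the textarea is rendered, then restore the saved query type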
setTimeout(() => {
this._handlerEditor()
}, 200)
setTimeout(() => {
this.srcQueryType = this.sourceMysqlParams.srcQueryType
}, 500)
},
destroyed () {
/**
* Destroy the editor instance
*/
if (editor) {
editor.toTextArea() // Uninstall
editor.off('keypress', this.keypress)
}
},
computed: {
},
components: { mListBox, mDatasource, mLocalParams}
}
</script>
<style lang="scss" rel="stylesheet/scss">
.requiredIcon {
color: #ff0000;
padding-right: 4px;
}
</style>

Binary file not shown (new image, 1.1 KiB)

View File

@@ -519,5 +519,54 @@ export default {
'SpeedRecord': 'speed(record count)',
'0 means unlimited by byte': '0 means unlimited',
'0 means unlimited by count': '0 means unlimited',
'Modify User': 'Modify User'
'Modify User': 'Modify User',
'Please enter Mysql Database(required)': 'Please enter Mysql Database(required)',
'Please enter Mysql Table(required)': 'Please enter Mysql Table(required)',
'Please enter Columns (Comma separated)': 'Please enter Columns (Comma separated)',
'Please enter Target Dir(required)': 'Please enter Target Dir(required)',
'Please enter Export Dir(required)': 'Please enter Export Dir(required)',
'Please enter Hive Database(required)': 'Please enter Hive Database(required)',
'Please enter Hive Table(required)': 'Please enter Hive Table(required)',
'Please enter Hive Partition Keys': 'Please enter Hive Partition Keys',
'Please enter Hive Partition Values': 'Please enter Hive Partition Values',
'Please enter Replace Delimiter': 'Please enter Replace Delimiter',
'Please enter Fields Terminated': 'Please enter Fields Terminated',
'Please enter Lines Terminated': 'Please enter Lines Terminated',
'Please enter Concurrency': 'Please enter Concurrency',
'Please enter Update Key': 'Please enter Update Key',
'Direct': 'Direct',
'Type': 'Type',
'ModelType': 'ModelType',
'ColumnType': 'ColumnType',
'Database': 'Database',
'Column': 'Column',
'Map Column Hive': 'Map Column Hive',
'Map Column Java': 'Map Column Java',
'Export Dir': 'Export Dir',
'Hive partition Keys': 'Hive partition Keys',
'Hive partition Values': 'Hive partition Values',
'FieldsTerminated': 'FieldsTerminated',
'LinesTerminated': 'LinesTerminated',
'IsUpdate': 'IsUpdate',
'UpdateKey': 'UpdateKey',
'UpdateMode': 'UpdateMode',
'Target Dir': 'Target Dir',
'DeleteTargetDir': 'DeleteTargetDir',
'FileType': 'FileType',
'CompressionCodec': 'CompressionCodec',
'CreateHiveTable': 'CreateHiveTable',
'DropDelimiter': 'DropDelimiter',
'OverWriteSrc': 'OverWriteSrc',
'ReplaceDelimiter': 'ReplaceDelimiter',
'Concurrency': 'Concurrency',
'Form': 'Form',
'OnlyUpdate': 'OnlyUpdate',
'AllowInsert': 'AllowInsert',
'Data Source': 'Data Source',
'Data Target': 'Data Target',
'All Columns': 'All Columns',
'Some Columns': 'Some Columns'
}

View File

@@ -519,5 +519,51 @@ export default {
'SpeedRecord': '限流(记录数)',
'0 means unlimited by byte': 'KB，0代表不限制',
'0 means unlimited by count': '0代表不限制',
'Modify User': '修改用户'
'Modify User': '修改用户',
'Please enter Mysql Database(required)': '请输入Mysql数据库(必填)',
'Please enter Mysql Table(required)': '请输入Mysql表名(必填)',
'Please enter Columns (Comma separated)': '请输入列名（用 , 隔开）',
'Please enter Target Dir(required)': '请输入目标路径(必填)',
'Please enter Export Dir(required)': '请输入数据源路径(必填)',
'Please enter Hive Database(required)': '请输入Hive数据库(必填)',
'Please enter Hive Table(required)': '请输入Hive表名(必填)',
'Please enter Hive Partition Keys': '请输入分区键',
'Please enter Hive Partition Values': '请输入分区值',
'Please enter Replace Delimiter': '请输入替换分隔符',
'Please enter Fields Terminated': '请输入列分隔符',
'Please enter Lines Terminated': '请输入行分隔符',
'Please enter Concurrency': '请输入并发度',
'Please enter Update Key': '请输入更新列',
'Direct': '流向',
'Type': '类型',
'ModelType': '模式',
'ColumnType': '列类型',
'Database': '数据库',
'Column': '列',
'Map Column Hive': 'Hive类型映射',
'Map Column Java': 'Java类型映射',
'Export Dir': '数据源路径',
'Hive partition Keys': 'Hive 分区键',
'Hive partition Values': 'Hive 分区值',
'FieldsTerminated': '列分隔符',
'LinesTerminated': '行分隔符',
'IsUpdate': '是否更新',
'UpdateKey': '更新列',
'UpdateMode': '更新类型',
'Target Dir': '目标路径',
'DeleteTargetDir': '是否删除目录',
'FileType': '保存格式',
'CompressionCodec': '压缩类型',
'CreateHiveTable': '是否创建新表',
'DropDelimiter': '是否删除分隔符',
'OverWriteSrc': '是否覆盖数据源',
'ReplaceDelimiter': '替换分隔符',
'Concurrency': '并发度',
'Form': '表单',
'OnlyUpdate': '只更新',
'AllowInsert': '无更新便插入',
'Data Source': '数据来源',
'Data Target': '数据目的',
'All Columns': '全表导入',
'Some Columns': '选择列'
}

View File

@@ -685,6 +685,7 @@
<include>**/common/threadutils/*.java</include>
<include>**/common/graph/*.java</include>
<include>**/common/queue/*.java</include>
<include>**/common/task/SqoopParameterEntityTest.java</include>
<include>**/api/utils/CheckUtilsTest.java</include>
<include>**/api/utils/FileUtilsTest.java</include>
<include>**/api/utils/FourLetterWordTest.java</include>
@@ -727,6 +728,7 @@
<include>**/alert/template/AlertTemplateFactoryTest.java</include>
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
</includes>
<!-- <skip>true</skip> -->