Merge branch 'dev' of https://github.com/apache/incubator-dolphinscheduler into dev-kill-yarn-job

This commit is contained in:
Eights-LI 2020-11-16 20:25:37 +08:00
commit 78445084ef
29 changed files with 639 additions and 214 deletions

View File

@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
@ -23,19 +25,30 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
import java.util.Map;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
/**
* task instance controller
*/
@ -69,6 +82,7 @@ public class TaskInstanceController extends BaseController {
@ApiOperation(value = "queryTaskListPaging", notes = "QUERY_TASK_INSTANCE_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "processInstanceName", value = "PROCESS_INSTANCE_NAME", required = false, type = "String"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type = "String"),
@ApiImplicitParam(name = "taskName", value = "TASK_NAME", type = "String"),
@ApiImplicitParam(name = "executorName", value = "EXECUTOR_NAME", type = "String"),
@ -85,6 +99,7 @@ public class TaskInstanceController extends BaseController {
public Result queryTaskListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processInstanceId", required = false, defaultValue = "0") Integer processInstanceId,
@RequestParam(value = "processInstanceName", required = false) String processInstanceName,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "taskName", required = false) String taskName,
@RequestParam(value = "executorName", required = false) String executorName,
@ -95,11 +110,20 @@ public class TaskInstanceController extends BaseController {
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
logger.info("query task instance list, project name:{},process instance:{}, search value:{},task name:{}, executor name: {},state type:{}, host:{}, start:{}, end:{}",
projectName, processInstanceId, searchVal, taskName, executorName, stateType, host, startTime, endTime);
logger.info("query task instance list, projectName:{}, processInstanceId:{}, processInstanceName:{}, search value:{}, taskName:{}, executorName: {}, stateType:{}, host:{}, start:{}, end:{}",
StringUtils.replaceNRTtoUnderline(projectName),
processInstanceId,
StringUtils.replaceNRTtoUnderline(processInstanceName),
StringUtils.replaceNRTtoUnderline(searchVal),
StringUtils.replaceNRTtoUnderline(taskName),
StringUtils.replaceNRTtoUnderline(executorName),
stateType,
StringUtils.replaceNRTtoUnderline(host),
StringUtils.replaceNRTtoUnderline(startTime),
StringUtils.replaceNRTtoUnderline(endTime));
searchVal = ParameterUtils.handleEscapes(searchVal);
Map<String, Object> result = taskInstanceService.queryTaskListPaging(
loginUser, projectName, processInstanceId, taskName, executorName, startTime, endTime, searchVal, stateType, host, pageNo, pageSize);
loginUser, projectName, processInstanceId, processInstanceName, taskName, executorName, startTime, endTime, searchVal, stateType, host, pageNo, pageSize);
return returnDataListPaging(result);
}

View File

@ -499,7 +499,13 @@ public class DataSourceService extends BaseService {
String address = buildAddress(type, host, port, connectType);
Map<String, Object> parameterMap = new LinkedHashMap<String, Object>(6);
String jdbcUrl = address + "/" + database;
String jdbcUrl;
if (DbType.SQLSERVER == type) {
jdbcUrl = address + ";databaseName=" + database;
} else {
jdbcUrl = address + "/" + database;
}
if (Constants.ORACLE.equals(type.name())) {
parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
}

View File

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -69,7 +69,6 @@ public class TaskInstanceService extends BaseService {
@Autowired
UsersService usersService;
/**
* query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
*
@ -87,7 +86,7 @@ public class TaskInstanceService extends BaseService {
* @return task list page
*/
public Map<String, Object> queryTaskListPaging(User loginUser, String projectName,
Integer processInstanceId, String taskName, String executorName, String startDate,
Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate,
String endDate, String searchVal, ExecutionStatus stateType, String host,
Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
@ -124,7 +123,7 @@ public class TaskInstanceService extends BaseService {
int executorId = usersService.getUserIdByName(executorName);
IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
page, project.getId(), processInstanceId, searchVal, taskName, executorId, statusArray, host, start, end
page, project.getId(), processInstanceId, processInstanceName, searchVal, taskName, executorId, statusArray, host, start, end
);
Set<String> exclusionSet = new HashSet<>();
exclusionSet.add(Constants.CLASS);

View File

@ -14,52 +14,60 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* task instance controller test
*/
public class TaskInstanceControllerTest extends AbstractControllerTest{
private static Logger logger = LoggerFactory.getLogger(TaskInstanceControllerTest.class);
@RunWith(MockitoJUnitRunner.Silent.class)
public class TaskInstanceControllerTest {
@InjectMocks
private TaskInstanceController taskInstanceController;
@Mock
private TaskInstanceService taskInstanceService;
@Test
public void testQueryTaskListPaging() throws Exception {
public void testQueryTaskListPaging() {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
//paramsMap.add("processInstanceId","1380");
paramsMap.add("searchVal","");
paramsMap.add("taskName","");
//paramsMap.add("stateType","");
paramsMap.add("startDate","2019-02-26 19:48:00");
paramsMap.add("endDate","2019-02-26 19:48:22");
paramsMap.add("pageNo","1");
paramsMap.add("pageSize","20");
Map<String,Object> result = new HashMap<>();
Integer pageNo = 1;
Integer pageSize = 20;
PageInfo pageInfo = new PageInfo<TaskInstance>(pageNo, pageSize);
result.put(Constants.DATA_LIST, pageInfo);
result.put(Constants.STATUS, Status.SUCCESS);
MvcResult mvcResult = mockMvc.perform(get("/projects/{projectName}/task-instance/list-paging","cxc_1113")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
when(taskInstanceService.queryTaskListPaging(any(), eq(""), eq(1), eq(""), eq(""), eq(""),any(), any(),
eq(""), Mockito.any(), eq("192.168.xx.xx"), any(), any())).thenReturn(result);
Result taskResult = taskInstanceController.queryTaskListPaging(null, "", 1, "", "",
"", "", ExecutionStatus.SUCCESS,"192.168.xx.xx", "2020-01-01 00:00:00", "2020-01-02 00:00:00",pageNo, pageSize);
Assert.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
}
}

View File

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service;
import static org.mockito.ArgumentMatchers.any;
@ -88,7 +89,7 @@ public class TaskInstanceServiceTest {
//project auth fail
when(projectMapper.queryByName(projectName)).thenReturn(null);
when(projectService.checkProjectAndAuth(loginUser, null, projectName)).thenReturn(result);
Map<String, Object> proejctAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, "project_test1", 0, "",
Map<String, Object> proejctAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, "project_test1", 0, "", "",
"test_user", "2019-02-26 19:48:00", "2019-02-26 19:48:22", "", null, "", 1, 20);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS));
@ -107,43 +108,43 @@ public class TaskInstanceServiceTest {
when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""),
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
Map<String, Object> successRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
Map<String, Object> successRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "",
"test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
//executor name empty
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""),
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
Map<String, Object> executorEmptyRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
Map<String, Object> executorEmptyRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "",
"", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.SUCCESS, executorEmptyRes.get(Constants.STATUS));
//executor null
when(usersService.queryUser(loginUser.getId())).thenReturn(null);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(-1);
Map<String, Object> executorNullRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
Map<String, Object> executorNullRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "",
"test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.SUCCESS, executorNullRes.get(Constants.STATUS));
//start/end date null
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""),
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), any(), any())).thenReturn(pageReturn);
Map<String, Object> executorNullDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
Map<String, Object> executorNullDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "",
"", null, null, "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.SUCCESS, executorNullDateRes.get(Constants.STATUS));
//start date error format
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""),
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), any(), any())).thenReturn(pageReturn);
Map<String, Object> executorErrorStartDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
Map<String, Object> executorErrorStartDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "",
"", "error date", null, "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, executorErrorStartDateRes.get(Constants.STATUS));
Map<String, Object> executorErrorEndDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
Map<String, Object> executorErrorEndDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "",
"", null, "error date", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, executorErrorEndDateRes.get(Constants.STATUS));
}

View File

@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.datax;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* DataX parameter
@ -89,6 +90,16 @@ public class DataxParameters extends AbstractParameters {
*/
private int jobSpeedRecord;
/**
* Xms memory
*/
private int xms;
/**
* Xmx memory
*/
private int xmx;
public int getCustomConfig() {
return customConfig;
}
@ -185,6 +196,22 @@ public class DataxParameters extends AbstractParameters {
this.jobSpeedRecord = jobSpeedRecord;
}
public int getXms() {
return xms;
}
public void setXms(int xms) {
this.xms = xms;
}
public int getXmx() {
return xmx;
}
public void setXmx(int xmx) {
this.xmx = xmx;
}
@Override
public boolean checkParameters() {
if (customConfig == Flag.NO.ordinal()) {
@ -204,19 +231,21 @@ public class DataxParameters extends AbstractParameters {
@Override
public String toString() {
return "DataxParameters{" +
"customConfig=" + customConfig +
", json='" + json + '\'' +
", dsType='" + dsType + '\'' +
", dataSource=" + dataSource +
", dtType='" + dtType + '\'' +
", dataTarget=" + dataTarget +
", sql='" + sql + '\'' +
", targetTable='" + targetTable + '\'' +
", preStatements=" + preStatements +
", postStatements=" + postStatements +
", jobSpeedByte=" + jobSpeedByte +
", jobSpeedRecord=" + jobSpeedRecord +
'}';
return "DataxParameters{"
+ "customConfig=" + customConfig
+ ", json='" + json + '\''
+ ", dsType='" + dsType + '\''
+ ", dataSource=" + dataSource
+ ", dtType='" + dtType + '\''
+ ", dataTarget=" + dataTarget
+ ", sql='" + sql + '\''
+ ", targetTable='" + targetTable + '\''
+ ", preStatements=" + preStatements
+ ", postStatements=" + postStatements
+ ", jobSpeedByte=" + jobSpeedByte
+ ", jobSpeedRecord=" + jobSpeedRecord
+ ", xms=" + xms
+ ", xmx=" + xmx
+ '}';
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.junit.Assert;
import org.junit.Test;
public class DataxParametersTest {
/**
* jvm parameters
*/
public static final String JVM_EVN = " --jvm=\"-Xms%sG -Xmx%sG\" ";
@Test
public void testLoadJvmEnv() {
DataxParameters dataxParameters = new DataxParameters();
dataxParameters.setXms(0);
dataxParameters.setXmx(-100);
String actual = loadJvmEnvTest(dataxParameters);
String except = " --jvm=\"-Xms1G -Xmx1G\" ";
Assert.assertEquals(except,actual);
dataxParameters.setXms(13);
dataxParameters.setXmx(14);
actual = loadJvmEnvTest(dataxParameters);
except = " --jvm=\"-Xms13G -Xmx14G\" ";
Assert.assertEquals(except,actual);
}
@Test
public void testToString() {
DataxParameters dataxParameters = new DataxParameters();
dataxParameters.setCustomConfig(0);
dataxParameters.setXms(0);
dataxParameters.setXmx(-100);
dataxParameters.setDataSource(1);
dataxParameters.setDataTarget(1);
dataxParameters.setDsType("MYSQL");
dataxParameters.setDtType("MYSQL");
dataxParameters.setJobSpeedByte(1);
dataxParameters.setJobSpeedRecord(1);
dataxParameters.setJson("json");
String expected = "DataxParameters"
+ "{"
+ "customConfig=0, "
+ "json='json', "
+ "dsType='MYSQL', "
+ "dataSource=1, "
+ "dtType='MYSQL', "
+ "dataTarget=1, "
+ "sql='null', "
+ "targetTable='null', "
+ "preStatements=null, "
+ "postStatements=null, "
+ "jobSpeedByte=1, "
+ "jobSpeedRecord=1, "
+ "xms=0, "
+ "xmx=-100"
+ "}";
Assert.assertEquals(expected,dataxParameters.toString());
}
public String loadJvmEnvTest(DataxParameters dataXParameters) {
int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms();
int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx();
return String.format(JVM_EVN, xms, xmx);
}
}

View File

@ -14,25 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* task instance mapper interface
*/
public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
List<Integer> queryTaskByProcessIdAndState(@Param("processInstanceId") Integer processInstanceId,
@Param("state") Integer state);
@ -61,6 +63,7 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
IPage<TaskInstance> queryTaskInstanceListPaging(IPage<TaskInstance> page,
@Param("projectId") int projectId,
@Param("processInstanceId") Integer processInstanceId,
@Param("processInstanceName") String processInstanceName,
@Param("searchVal") String searchVal,
@Param("taskName") String taskName,
@Param("executorId") int executorId,

View File

@ -128,6 +128,9 @@
<if test="executorId != 0">
and instance.executor_id = #{executorId}
</if>
<if test="processInstanceName != null and processInstanceName != ''">
and process.name like concat('%', #{processInstanceName}, '%')
</if>
order by instance.start_time desc
</select>
</mapper>

View File

@ -286,6 +286,7 @@ public class TaskInstanceMapperTest {
task.getProcessInstanceId(),
"",
"",
"",
0,
new int[0],
"",

View File

@ -14,27 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.datax;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.*;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
@ -46,7 +37,8 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.slf4j.Logger;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
@ -56,25 +48,48 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.*;
import java.util.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* DataX task
*/
public class DataxTask extends AbstractTask {
/**
* jvm parameters
*/
public static final String JVM_EVN = " --jvm=\"-Xms%sG -Xmx%sG\" ";
/**
* python process(datax only supports version 2.7 by default)
*/
private static final String DATAX_PYTHON = "python2.7";
/**
* datax home path
*/
private static final String DATAX_HOME_EVN = "${DATAX_HOME}";
/**
* datax channel count
*/
@ -97,6 +112,7 @@ public class DataxTask extends AbstractTask {
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
@ -104,9 +120,8 @@ public class DataxTask extends AbstractTask {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,logger);
taskExecutionContext, logger);
}
/**
@ -149,9 +164,7 @@ public class DataxTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
}
catch (Exception e) {
logger.error("datax task failure", e);
} catch (Exception e) {
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
@ -189,9 +202,9 @@ public class DataxTask extends AbstractTask {
return fileName;
}
if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){
if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()) {
json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
}else {
} else {
ObjectNode job = JSONUtils.createObjectNode();
job.putArray("content").addAll(buildDataxJobContentJson());
job.set("setting", buildDataxJobSettingJson());
@ -248,7 +261,6 @@ public class DataxTask extends AbstractTask {
readerParam.put("password", dataSourceCfg.getPassword());
readerParam.putArray("connection").addAll(readerConnArr);
ObjectNode reader = JSONUtils.createObjectNode();
reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
reader.set("parameter", readerParam);
@ -277,7 +289,6 @@ public class DataxTask extends AbstractTask {
}
writerParam.putArray("connection").addAll(writerConnArr);
if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) {
ArrayNode preSqlArr = writerParam.putArray("preSql");
for (String preSql : dataXParameters.getPreStatements()) {
@ -368,7 +379,7 @@ public class DataxTask extends AbstractTask {
* @throws Exception if error throws Exception
*/
private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap)
throws Exception {
throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
@ -387,6 +398,7 @@ public class DataxTask extends AbstractTask {
sbr.append(" ");
sbr.append(DATAX_HOME_EVN);
sbr.append(" ");
sbr.append(loadJvmEnv(dataXParameters));
sbr.append(jobConfigFilePath);
// replace placeholder
@ -409,17 +421,19 @@ public class DataxTask extends AbstractTask {
return fileName;
}
public String loadJvmEnv(DataxParameters dataXParameters) {
int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms();
int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx();
return String.format(JVM_EVN, xms, xmx);
}
/**
* parsing synchronized column names in SQL statements
*
* @param dsType
* the database type of the data source
* @param dtType
* the database type of the data target
* @param dataSourceCfg
* the database connection parameters of the data source
* @param sql
* sql for data synchronization
* @param dsType the database type of the data source
* @param dtType the database type of the data target
* @param dataSourceCfg the database connection parameters of the data source
* @param sql sql for data synchronization
* @return Keyword converted column names
*/
private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) {
@ -438,10 +452,8 @@ public class DataxTask extends AbstractTask {
/**
* try grammatical parsing column
*
* @param dbType
* database type
* @param sql
* sql for data synchronization
* @param dbType database type
* @param sql sql for data synchronization
* @return column name array
* @throws RuntimeException if error throws RuntimeException
*/
@ -453,16 +465,16 @@ public class DataxTask extends AbstractTask {
notNull(parser, String.format("database driver [%s] is not support", dbType.toString()));
SQLStatement sqlStatement = parser.parseStatement();
SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement;
SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement;
SQLSelect sqlSelect = sqlSelectStatement.getSelect();
List<SQLSelectItem> selectItemList = null;
if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) {
SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery();
SQLSelectQueryBlock block = (SQLSelectQueryBlock) sqlSelect.getQuery();
selectItemList = block.getSelectList();
} else if (sqlSelect.getQuery() instanceof SQLUnionQuery) {
SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery();
SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight();
SQLUnionQuery unionQuery = (SQLUnionQuery) sqlSelect.getQuery();
SQLSelectQueryBlock block = (SQLSelectQueryBlock) unionQuery.getRight();
selectItemList = block.getSelectList();
}
@ -470,7 +482,7 @@ public class DataxTask extends AbstractTask {
String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));
columnNames = new String[selectItemList.size()];
for (int i = 0; i < selectItemList.size(); i++ ) {
for (int i = 0; i < selectItemList.size(); i++) {
SQLSelectItem item = selectItemList.get(i);
String columnName = null;
@ -479,10 +491,10 @@ public class DataxTask extends AbstractTask {
columnName = item.getAlias();
} else if (item.getExpr() != null) {
if (item.getExpr() instanceof SQLPropertyExpr) {
SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr();
SQLPropertyExpr expr = (SQLPropertyExpr) item.getExpr();
columnName = expr.getName();
} else if (item.getExpr() instanceof SQLIdentifierExpr) {
SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr();
SQLIdentifierExpr expr = (SQLIdentifierExpr) item.getExpr();
columnName = expr.getName();
}
} else {
@ -497,8 +509,7 @@ public class DataxTask extends AbstractTask {
columnNames[i] = columnName;
}
}
catch (Exception e) {
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return null;
}
@ -509,10 +520,8 @@ public class DataxTask extends AbstractTask {
/**
* try to execute sql to resolve column names
*
* @param baseDataSource
* the database connection parameters
* @param sql
* sql for data synchronization
* @param baseDataSource the database connection parameters
* @param sql sql for data synchronization
* @return column name array
*/
public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) {
@ -529,11 +538,10 @@ public class DataxTask extends AbstractTask {
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
columnNames = new String[num];
for (int i = 1; i <= num; i++ ) {
for (int i = 1; i <= num; i++) {
columnNames[i - 1] = md.getColumnName(i);
}
}
catch (SQLException e) {
} catch (SQLException e) {
logger.warn(e.getMessage(), e);
return null;
}

View File

@ -23,7 +23,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* round robin selector
@ -39,82 +39,59 @@ public class RoundRobinSelectorTest {
@Test
public void testSelect1() {
RoundRobinSelector selector = new RoundRobinSelector();
// dismiss of server warm-up time
long startTime = System.currentTimeMillis() - 60 * 10 * 1000;
List<Host> hostOneList = Arrays.asList(
new Host("192.168.1.1", 80, 20, startTime, "kris"),
new Host("192.168.1.2", 80, 10, startTime, "kris"));
List<Host> hostTwoList = Arrays.asList(
new Host("192.168.1.1", 80, 20, startTime, "kris"),
new Host("192.168.1.2", 80, 10, startTime, "kris"),
new Host("192.168.1.3", 80, 10, startTime, "kris"));
Host result;
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.2", result.getIp());
// add new host
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostTwoList);
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostTwoList);
Assert.assertEquals("192.168.1.3", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostTwoList);
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostTwoList);
Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostTwoList);
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostTwoList);
Assert.assertEquals("192.168.1.3", result.getIp());
// remove host3
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(
new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"),
new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
}

View File

@ -14,19 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.datax;
import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
@ -39,6 +33,13 @@ import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -49,7 +50,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* DataxTask Tester.
@ -58,7 +60,13 @@ public class DataxTaskTest {
private static final Logger logger = LoggerFactory.getLogger(DataxTaskTest.class);
private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}";
private static final String CONNECTION_PARAMS = " {\n"
+ " \"user\":\"root\",\n"
+ " \"password\":\"123456\",\n"
+ " \"address\":\"jdbc:mysql://127.0.0.1:3306\",\n"
+ " \"database\":\"test\",\n"
+ " \"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"\n"
+ "}";
private DataxTask dataxTask;
@ -69,7 +77,7 @@ public class DataxTaskTest {
private ApplicationContext applicationContext;
private TaskExecutionContext taskExecutionContext;
private TaskProps props = new TaskProps();
private final TaskProps props = new TaskProps();
@Before
public void before()
@ -97,12 +105,40 @@ public class DataxTaskTest {
props.setTaskTimeout(0);
if (customConfig == 1) {
props.setTaskParams(
"{\"customConfig\":1, \"localParams\":[{\"prop\":\"test\",\"value\":\"38294729\"}],\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"${test}\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}");
"{\n"
+ " \"customConfig\":1,\n"
+ " \"localParams\":[\n"
+ " {\n"
+ " \"prop\":\"test\",\n"
+ " \"value\":\"38294729\"\n"
+ " }\n"
+ " ],\n"
+ " \"json\":\""
+ "{\"job\":{\"setting\":{\"speed\":{\"byte\":1048576},\"errorLimit\":{\"record\":0,\"percentage\":0.02}},\"content\":["
+ "{\"reader\":{\"name\":\"rdbmsreader\",\"parameter\":{\"username\":\"xxx\",\"password\":\"${test}\",\"column\":[\"id\",\"name\"],\"splitPk\":\"pk\",\""
+ "connection\":[{\"querySql\":[\"SELECT * from dual\"],\"jdbcUrl\":[\"jdbc:dm://ip:port/database\"]}],\"fetchSize\":1024,\"where\":\"1 = 1\"}},\""
+ "writer\":{\"name\":\"streamwriter\",\"parameter\":{\"print\":true}}}]}}\"\n"
+ "}");
// "{\"customConfig\":1,\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"xxx\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}");
} else {
props.setTaskParams(
"{\"customConfig\":0,\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"dataSource\":1,\"dsType\":\"MYSQL\",\"dataTarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}");
"{\n"
+ " \"customConfig\":0,\n"
+ " \"targetTable\":\"test\",\n"
+ " \"postStatements\":[\n"
+ " \"delete from test\"\n"
+ " ],\n"
+ " \"jobSpeedByte\":0,\n"
+ " \"jobSpeedRecord\":1000,\n"
+ " \"dtType\":\"MYSQL\",\n"
+ " \"dataSource\":1,\n"
+ " \"dsType\":\"MYSQL\",\n"
+ " \"dataTarget\":2,\n"
+ " \"sql\":\"select 1 as test from dual\",\n"
+ " \"preStatements\":[\n"
+ " \"delete from test\"\n"
+ " ]\n"
+ "}");
}
taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
@ -114,7 +150,6 @@ public class DataxTaskTest {
Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
dataxTaskExecutionContext.setSourcetype(0);
dataxTaskExecutionContext.setTargetType(0);
@ -126,7 +161,6 @@ public class DataxTaskTest {
dataxTask.init();
props.setCmdTypeIfComplement(START_PROCESS);
Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
@ -138,7 +172,6 @@ public class DataxTaskTest {
e.printStackTrace();
}
dataxTask = PowerMockito.spy(new DataxTask(taskExecutionContext, logger));
dataxTask.init();
}
@ -405,4 +438,23 @@ public class DataxTaskTest {
}
}
@Test
public void testLoadJvmEnv() {
DataxTask dataxTask = new DataxTask(null,null);
DataxParameters dataxParameters = new DataxParameters();
dataxParameters.setXms(0);
dataxParameters.setXmx(-100);
String actual = dataxTask.loadJvmEnv(dataxParameters);
String except = " --jvm=\"-Xms1G -Xmx1G\" ";
Assert.assertEquals(except,actual);
dataxParameters.setXms(13);
dataxParameters.setXmx(14);
actual = dataxTask.loadJvmEnv(dataxParameters);
except = " --jvm=\"-Xms13G -Xmx14G\" ";
Assert.assertEquals(except,actual);
}
}

View File

@ -144,6 +144,22 @@
</div>
</m-list-box>
</div>
<div class="clearfix list">
<div class="text-box">
<span>{{$t('Running Memory')}}</span>
</div>
<div class="cont-box">
<span >{{$t('Min Memory')}}</span>
<m-select-input v-model="xms" :list="[1,2,3,4]">
</m-select-input>
<span>&nbsp;&nbsp;&nbsp;G &nbsp;&nbsp;</span>
<span >{{$t('Max Memory')}}</span>
<m-select-input v-model="xmx" :list="[1,2,3,4]">
</m-select-input>
<span>&nbsp;&nbsp;&nbsp;G</span>
</div>
</div>
</div>
</template>
<script>
@ -196,6 +212,10 @@
// Custom parameter
localParams: [],
customConfig: 0,
//jvm memory xms
xms: 1,
//jvm memory xms
xmx: 1,
}
},
mixins: [disabledState],
@ -324,7 +344,9 @@
this.$emit('on-params', {
customConfig: this.customConfig,
json: jsonEditor.getValue(),
localParams: this.localParams
localParams: this.localParams,
xms:+this.xms,
xmx:+this.xmx
})
return true
} else {
@ -358,6 +380,7 @@
return false
}
debugger
// storage
this.$emit('on-params', {
customConfig: this.customConfig,
@ -370,7 +393,9 @@
jobSpeedByte: this.jobSpeedByte * 1024,
jobSpeedRecord: this.jobSpeedRecord,
preStatements: this.preStatements,
postStatements: this.postStatements
postStatements: this.postStatements,
xms:+this.xms,
xmx:+this.xmx
})
return true
}
@ -445,7 +470,9 @@
jobSpeedByte: this.jobSpeedByte * 1024,
jobSpeedRecord: this.jobSpeedRecord,
preStatements: this.preStatements,
postStatements: this.postStatements
postStatements: this.postStatements,
xms: +this.xms,
xmx: +this.xmx,
});
},
_destroyEditor () {
@ -468,6 +495,10 @@
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
// set jvm memory
this.xms = o.params.xms || 1 ;
this.xmx = o.params.xmx || 1 ;
// backfill
if(o.params.customConfig == 0) {
this.customConfig = 0
@ -544,4 +575,4 @@
right: -12px;
top: -16px;
}
</style>
</style>

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<router-view></router-view>
</template>
<script>
export default {
name: 'projects-conditions-index'
}
</script>

View File

@ -68,7 +68,7 @@
import { stateType } from './common'
import mConditions from '@/module/components/conditions/conditions'
export default {
name: 'instance-conditions',
name: 'process-instance-conditions',
data () {
return {
// state(list)

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<m-conditions>
<template slot="search-group">
<div class="list">
<x-button type="ghost" size="small" @click="_ckQuery" icon="ans-icon-search"></x-button>
</div>
<div class="list">
<x-datepicker
ref="datepicker"
@on-change="_onChangeStartStop"
type="daterange"
format="YYYY-MM-DD HH:mm:ss"
placement="bottom-end"
:value="[searchParams.startDate,searchParams.endDate]"
:panelNum="2">
<x-input slot="input" readonly slot-scope="{value}" :value="value" style="width: 310px;" size="small" :placeholder="$t('Select date range')">
<em slot="suffix"
@click.stop="_dateEmpty()"
class="ans-icon-fail-solid"
v-show="value"
style="font-size: 13px;cursor: pointer;margin-top: 1px;">
</em>
</x-input>
</x-datepicker>
</div>
<div class="list">
<x-select style="width: 140px;" @on-change="_onChangeState" :value="searchParams.stateType" >
<x-input slot="trigger" readonly :value="selectedModel ? selectedModel.label : ''" slot-scope="{ selectedModel }" style="width: 140px;" size="small" :placeholder="$t('State')" suffix-icon="ans-icon-arrow-down">
</x-input>
<x-option
v-for="city in stateTypeList"
:key="city.label"
:value="city.code"
:label="city.label">
</x-option>
</x-select>
</div>
<div class="list">
<x-input v-model.trim="searchParams.host" @on-enterkey="_ckQuery" style="width: 140px;" size="small" :placeholder="$t('host')"></x-input>
</div>
<div class="list">
<x-input v-model.trim="searchParams.executorName" @on-enterkey="_ckQuery" style="width: 140px;" size="small" :placeholder="$t('Executor')"></x-input>
</div>
<div class="list">
<x-input v-model.trim="searchParams.processInstanceName" @on-enterkey="_ckQuery" style="width: 160px;" size="small" :placeholder="$t('Process Instance')"></x-input>
</div>
<div class="list">
<x-input v-model.trim="searchParams.searchVal" @on-enterkey="_ckQuery" style="width: 160px;" size="small" :placeholder="$t('name')"></x-input>
</div>
</template>
</m-conditions>
</template>
<script>
import _ from 'lodash'
import { stateType } from './common'
import mConditions from '@/module/components/conditions/conditions'
export default {
name: 'task-instance-conditions',
data () {
return {
// state(list)
stateTypeList: stateType,
searchParams: {
// state
stateType: '',
// start date
startDate: '',
// end date
endDate: '',
// search value
searchVal: '',
// host
host: '',
// executor name
executorName: '',
processInstanceName: ''
}
}
},
props: {},
methods: {
_ckQuery () {
this.$emit('on-query', this.searchParams)
},
/**
* change times
*/
_onChangeStartStop (val) {
this.searchParams.startDate = val[0]
this.searchParams.endDate = val[1]
},
/**
* change state
*/
_onChangeState (val) {
this.searchParams.stateType = val.value
},
/**
* empty date
*/
_dateEmpty () {
this.searchParams.startDate = ''
this.searchParams.endDate = ''
this.$refs.datepicker.empty()
}
},
watch: {
},
created () {
// Routing parameter merging
if (!_.isEmpty(this.$route.query)) {
this.searchParams = _.assign(this.searchParams, this.$route.query)
}
},
mounted () {
},
computed: {
},
components: { mConditions }
}
</script>

View File

@ -16,6 +16,11 @@
*/
<template>
<m-list-construction :title="$t('TreeView')">
<template slot="operation">
<span style=" float: right; padding-right:50px">
<em class="ans-icon-fail-empty" style="font-size:20px " data-container="body" data-toggle="tooltip" :title="$t('Return')" @click="_close()"></em>
</span>
</template>
<template slot="conditions"></template>
<template slot="content">
<div class="tree-view-index-model">
@ -101,6 +106,9 @@
props: {},
methods: {
...mapActions('dag', ['getViewTree']),
_close(){
this.$router.go(-1)
},
/**
* get tree data
*/

View File

@ -16,6 +16,11 @@
*/
<template>
<m-list-construction :title="$t('Cron Manage')">
<template slot="operation">
<span style=" float: right; padding-right:50px">
<em class="ans-icon-fail-empty" style="font-size:20px " data-container="body" data-toggle="tooltip" :title="$t('Return')" @click="_close()"></em>
</span>
</template>
<template slot="content">
<m-list></m-list>
</template>
@ -27,6 +32,11 @@
import mListConstruction from '@/module/components/listConstruction/listConstruction'
export default {
name: 'definition-timing-index',
methods :{
_close(){
this.$router.go(-1)
}
},
components: { mList, mListConstruction, mSecondaryMenu }
}
</script>

View File

@ -55,7 +55,7 @@
import echarts from 'echarts'
import store from '@/conf/home/store'
import mNoData from '@/module/components/noData/noData'
import { stateType } from '@/conf/home/pages/projects/pages/_source/instanceConditions/common'
import { stateType } from '@/conf/home/pages/projects/pages/_source/conditions/instance/common'
export default {
name: 'process-state-count',
data () {

View File

@ -57,7 +57,7 @@
import echarts from 'echarts'
import store from '@/conf/home/store'
import mNoData from '@/module/components/noData/noData'
import { stateType } from '@/conf/home/pages/projects/pages/_source/instanceConditions/common'
import { stateType } from '@/conf/home/pages/projects/pages/_source/conditions/instance/common'
export default {
name: 'task-ctatus-count',

View File

@ -54,7 +54,7 @@
import { pie } from './chartConfig'
import Chart from '@/module/ana-charts'
import mNoData from '@/module/components/noData/noData'
import { stateType } from '@/conf/home/pages/projects/pages/_source/instanceConditions/common'
import { stateType } from '@/conf/home/pages/projects/pages/_source/conditions/instance/common'
export default {
name: 'task-status-count',

View File

@ -46,7 +46,7 @@
import mNoData from '@/module/components/noData/noData'
import mSecondaryMenu from '@/module/components/secondaryMenu/secondaryMenu'
import mListConstruction from '@/module/components/listConstruction/listConstruction'
import mInstanceConditions from '@/conf/home/pages/projects/pages/_source/instanceConditions'
import mInstanceConditions from '@/conf/home/pages/projects/pages/_source/conditions/instance/processInstance'
export default {
name: 'instance-list-index',

View File

@ -20,6 +20,7 @@
<template slot="conditions">
<m-instance-conditions @on-query="_onQuery"></m-instance-conditions>
</template>
<template slot="content">
<template v-if="taskInstanceList.length">
<m-list :task-instance-list="taskInstanceList" :page-no="searchParams.pageNo" :page-size="searchParams.pageSize">
@ -45,7 +46,7 @@
import listUrlParamHandle from '@/module/mixin/listUrlParamHandle'
import mSecondaryMenu from '@/module/components/secondaryMenu/secondaryMenu'
import mListConstruction from '@/module/components/listConstruction/listConstruction'
import mInstanceConditions from '@/conf/home/pages/projects/pages/_source/instanceConditions'
import mInstanceConditions from '@/conf/home/pages/projects/pages/_source/conditions/instance/taskInstance'
export default {
name: 'task-instance-list-index',
@ -72,7 +73,8 @@
// end date
endDate: '',
// Exectuor Name
executorName: ''
executorName: '',
processInstanceName: ''
},
isLeft: true
}

View File

@ -18,6 +18,7 @@
<div class="home-main list-construction-model">
<div class="content-title">
<span>{{title}}</span>
<slot name="operation"></slot>
</div>
<div class="conditions-box">
<slot name="conditions">

View File

@ -661,5 +661,8 @@ export default {
'Batch move': 'Batch move',
Version: 'Version',
'Pre tasks': 'Pre tasks',
'Running Memory':'Running Memory',
'Max Memory':'Max Memory',
'Min Memory':'Min Memory',
'The workflow canvas is abnormal and cannot be saved, please recreate': 'The workflow canvas is abnormal and cannot be saved, please recreate'
}

View File

@ -657,5 +657,8 @@ export default {
'Batch move': '批量移动',
Version: '版本',
'Pre tasks': '前置任务',
'Running Memory':'运行内存',
'Max Memory':'最大内存',
'Min Memory':'最小内存',
'The workflow canvas is abnormal and cannot be saved, please recreate': '该工作流画布异常无法保存请重新创建'
}

View File

@ -746,6 +746,7 @@
<include>**/api/service/WorkerGroupServiceTest.java</include>
<include>**/api/service/WorkFlowLineageServiceTest.java</include>
<include>**/api/controller/ProcessDefinitionControllerTest.java</include>
<include>**/api/controller/TaskInstanceControllerTest.java</include>
<include>**/api/controller/WorkFlowLineageControllerTest.java</include>
<include>**/api/utils/exportprocess/DataSourceParamTest.java</include>
<include>**/api/utils/exportprocess/DependentParamTest.java</include>
@ -758,6 +759,7 @@
<include>**/common/os/OshiTest.java</include>
<include>**/common/os/OSUtilsTest.java</include>
<include>**/common/shell/ShellExecutorTest.java</include>
<include>**/common/task/DataxParametersTest.java</include>
<include>**/common/task/EntityTestUtils.java</include>
<include>**/common/task/FlinkParametersTest.java</include>
<include>**/common/task/HttpParametersTest.java</include>