diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java index c0ad88f481..56e7ef2087 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java @@ -14,8 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskInstanceService; @@ -23,19 +25,30 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.User; -import io.swagger.annotations.*; + +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; -import java.util.Map; - -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR; - /** * task instance controller */ @@ -69,6 +82,7 @@ public class TaskInstanceController extends BaseController { @ApiOperation(value = "queryTaskListPaging", notes = "QUERY_TASK_INSTANCE_LIST_PAGING_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = false, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "processInstanceName", value = "PROCESS_INSTANCE_NAME", required = false, type = "String"), @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type = "String"), @ApiImplicitParam(name = "taskName", value = "TASK_NAME", type = "String"), @ApiImplicitParam(name = "executorName", value = "EXECUTOR_NAME", type = "String"), @@ -85,6 +99,7 @@ public class TaskInstanceController extends BaseController { public Result queryTaskListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam(value = "processInstanceId", required = false, defaultValue = "0") Integer processInstanceId, + @RequestParam(value = "processInstanceName", required = false) String processInstanceName, @RequestParam(value = "searchVal", required = false) String searchVal, @RequestParam(value = "taskName", required = false) String taskName, @RequestParam(value = "executorName", required = false) String executorName, @@ -95,11 +110,20 @@ public class TaskInstanceController extends BaseController { @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize) { - logger.info("query task instance list, project name:{},process instance:{}, search value:{},task name:{}, executor name: {},state type:{}, host:{}, start:{}, end:{}", - projectName, processInstanceId, searchVal, taskName, executorName, stateType, host, startTime, endTime); + logger.info("query task instance list, projectName:{}, processInstanceId:{}, processInstanceName:{}, search value:{}, taskName:{}, executorName: {}, stateType:{}, host:{}, start:{}, end:{}", + StringUtils.replaceNRTtoUnderline(projectName), + processInstanceId, + StringUtils.replaceNRTtoUnderline(processInstanceName), + StringUtils.replaceNRTtoUnderline(searchVal), + StringUtils.replaceNRTtoUnderline(taskName), + StringUtils.replaceNRTtoUnderline(executorName), + stateType, + StringUtils.replaceNRTtoUnderline(host), + StringUtils.replaceNRTtoUnderline(startTime), + StringUtils.replaceNRTtoUnderline(endTime)); searchVal = ParameterUtils.handleEscapes(searchVal); Map result = taskInstanceService.queryTaskListPaging( - loginUser, projectName, processInstanceId, taskName, executorName, startTime, endTime, searchVal, stateType, host, pageNo, pageSize); + loginUser, projectName, processInstanceId, processInstanceName, taskName, executorName, startTime, endTime, searchVal, stateType, host, pageNo, pageSize); return returnDataListPaging(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java index 74c2f6908f..d21a64c4b3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java @@ -499,7 +499,13 @@ public class DataSourceService extends BaseService { String address = buildAddress(type, host, port, connectType); Map parameterMap = new LinkedHashMap(6); - String jdbcUrl = address + "/" + database; + String jdbcUrl; + if (DbType.SQLSERVER == type) { + jdbcUrl = address + ";databaseName=" + database; + } else { + jdbcUrl = address + "/" + database; + } + if (Constants.ORACLE.equals(type.name())) { parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java index 695b76b2bc..012af8fd38 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.api.service; +package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -69,7 +69,6 @@ public class TaskInstanceService extends BaseService { @Autowired UsersService usersService; - /** * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging * @@ -87,7 +86,7 @@ public class TaskInstanceService extends BaseService { * @return task list page */ public Map queryTaskListPaging(User loginUser, String projectName, - Integer processInstanceId, String taskName, String executorName, String startDate, + Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate, String endDate, String searchVal, ExecutionStatus stateType, String host, Integer pageNo, Integer pageSize) { Map result = new HashMap<>(); @@ -124,7 +123,7 @@ public class TaskInstanceService extends BaseService { int executorId = usersService.getUserIdByName(executorName); IPage taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging( - page, project.getId(), processInstanceId, searchVal, taskName, executorId, statusArray, host, start, end + page, project.getId(), processInstanceId, processInstanceName, searchVal, taskName, executorId, statusArray, host, start, end ); Set exclusionSet = new HashSet<>(); exclusionSet.add(Constants.CLASS); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java index 368981c0d3..b97c7c192f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskInstanceControllerTest.java @@ -14,52 +14,60 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.controller; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.TaskInstanceService; +import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; + +import java.util.HashMap; +import java.util.Map; + import org.junit.Assert; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.MediaType; -import org.springframework.test.web.servlet.MvcResult; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; /** * task instance controller test */ -public class TaskInstanceControllerTest extends AbstractControllerTest{ - private static Logger logger = LoggerFactory.getLogger(TaskInstanceControllerTest.class); +@RunWith(MockitoJUnitRunner.Silent.class) +public class TaskInstanceControllerTest { + + @InjectMocks + private TaskInstanceController taskInstanceController; + + @Mock + private TaskInstanceService taskInstanceService; @Test - public void testQueryTaskListPaging() throws Exception { + public void testQueryTaskListPaging() { - MultiValueMap paramsMap = new LinkedMultiValueMap<>(); - //paramsMap.add("processInstanceId","1380"); - paramsMap.add("searchVal",""); - paramsMap.add("taskName",""); - //paramsMap.add("stateType",""); - paramsMap.add("startDate","2019-02-26 19:48:00"); - paramsMap.add("endDate","2019-02-26 19:48:22"); - paramsMap.add("pageNo","1"); - paramsMap.add("pageSize","20"); + Map result = new HashMap<>(); + Integer pageNo = 1; + Integer pageSize = 20; + PageInfo pageInfo = new PageInfo(pageNo, pageSize); + result.put(Constants.DATA_LIST, pageInfo); + result.put(Constants.STATUS, Status.SUCCESS); - MvcResult mvcResult = mockMvc.perform(get("/projects/{projectName}/task-instance/list-paging","cxc_1113") - .header(SESSION_ID, sessionId) - .params(paramsMap)) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) - .andReturn(); + when(taskInstanceService.queryTaskListPaging(any(), eq(""), eq(1), eq(""), eq(""), eq(""),any(), any(), + eq(""), Mockito.any(), eq("192.168.xx.xx"), any(), any())).thenReturn(result); + Result taskResult = taskInstanceController.queryTaskListPaging(null, "", 1, "", "", + "", "", ExecutionStatus.SUCCESS,"192.168.xx.xx", "2020-01-01 00:00:00", "2020-01-02 00:00:00",pageNo, pageSize); + Assert.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode()); - Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); - Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); - logger.info(mvcResult.getResponse().getContentAsString()); } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 16547b3fd7..199b34cc1b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.service; import static org.mockito.ArgumentMatchers.any; @@ -88,7 +89,7 @@ public class TaskInstanceServiceTest { //project auth fail when(projectMapper.queryByName(projectName)).thenReturn(null); when(projectService.checkProjectAndAuth(loginUser, null, projectName)).thenReturn(result); - Map proejctAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, "project_test1", 0, "", + Map proejctAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, "project_test1", 0, "", "", "test_user", "2019-02-26 19:48:00", "2019-02-26 19:48:22", "", null, "", 1, 20); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS)); @@ -107,43 +108,43 @@ public class TaskInstanceServiceTest { when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); - when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), + when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), eq(""), eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance); - Map successRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", + Map successRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "", "test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); //executor name empty - when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), + when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), eq(""), eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn); - Map executorEmptyRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", + Map executorEmptyRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "", "", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20); Assert.assertEquals(Status.SUCCESS, executorEmptyRes.get(Constants.STATUS)); //executor null when(usersService.queryUser(loginUser.getId())).thenReturn(null); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(-1); - Map executorNullRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", + Map executorNullRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "", "test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20); Assert.assertEquals(Status.SUCCESS, executorNullRes.get(Constants.STATUS)); //start/end date null - when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), + when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), eq(""), eq(0), Mockito.any(), eq("192.168.xx.xx"), any(), any())).thenReturn(pageReturn); - Map executorNullDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", + Map executorNullDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "", "", null, null, "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20); Assert.assertEquals(Status.SUCCESS, executorNullDateRes.get(Constants.STATUS)); //start date error format - when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), + when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""), eq(""), eq(0), Mockito.any(), eq("192.168.xx.xx"), any(), any())).thenReturn(pageReturn); - Map executorErrorStartDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", + Map executorErrorStartDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "", "", "error date", null, "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, executorErrorStartDateRes.get(Constants.STATUS)); - Map executorErrorEndDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", + Map executorErrorEndDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "", "", "", null, "error date", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, executorErrorEndDateRes.get(Constants.STATUS)); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java index f54e107995..c1f5f1d81c 100755 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java @@ -14,15 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.task.datax; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; /** * DataX parameter @@ -89,6 +90,16 @@ public class DataxParameters extends AbstractParameters { */ private int jobSpeedRecord; + /** + * Xms memory + */ + private int xms; + + /** + * Xmx memory + */ + private int xmx; + public int getCustomConfig() { return customConfig; } @@ -185,6 +196,22 @@ public class DataxParameters extends AbstractParameters { this.jobSpeedRecord = jobSpeedRecord; } + public int getXms() { + return xms; + } + + public void setXms(int xms) { + this.xms = xms; + } + + public int getXmx() { + return xmx; + } + + public void setXmx(int xmx) { + this.xmx = xmx; + } + @Override public boolean checkParameters() { if (customConfig == Flag.NO.ordinal()) { @@ -204,19 +231,21 @@ public class DataxParameters extends AbstractParameters { @Override public String toString() { - return "DataxParameters{" + - "customConfig=" + customConfig + - ", json='" + json + '\'' + - ", dsType='" + dsType + '\'' + - ", dataSource=" + dataSource + - ", dtType='" + dtType + '\'' + - ", dataTarget=" + dataTarget + - ", sql='" + sql + '\'' + - ", targetTable='" + targetTable + '\'' + - ", preStatements=" + preStatements + - ", postStatements=" + postStatements + - ", jobSpeedByte=" + jobSpeedByte + - ", jobSpeedRecord=" + jobSpeedRecord + - '}'; + return "DataxParameters{" + + "customConfig=" + customConfig + + ", json='" + json + '\'' + + ", dsType='" + dsType + '\'' + + ", dataSource=" + dataSource + + ", dtType='" + dtType + '\'' + + ", dataTarget=" + dataTarget + + ", sql='" + sql + '\'' + + ", targetTable='" + targetTable + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + + ", jobSpeedByte=" + jobSpeedByte + + ", jobSpeedRecord=" + jobSpeedRecord + + ", xms=" + xms + + ", xmx=" + xmx + + '}'; } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/DataxParametersTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/DataxParametersTest.java new file mode 100644 index 0000000000..d6e2f69882 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/DataxParametersTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.task; + +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; + +import org.junit.Assert; +import org.junit.Test; + +public class DataxParametersTest { + + /** + * jvm parameters + */ + public static final String JVM_EVN = " --jvm=\"-Xms%sG -Xmx%sG\" "; + + @Test + public void testLoadJvmEnv() { + + DataxParameters dataxParameters = new DataxParameters(); + dataxParameters.setXms(0); + dataxParameters.setXmx(-100); + + String actual = loadJvmEnvTest(dataxParameters); + + String except = " --jvm=\"-Xms1G -Xmx1G\" "; + Assert.assertEquals(except,actual); + + dataxParameters.setXms(13); + dataxParameters.setXmx(14); + actual = loadJvmEnvTest(dataxParameters); + except = " --jvm=\"-Xms13G -Xmx14G\" "; + Assert.assertEquals(except,actual); + + } + + @Test + public void testToString() { + + DataxParameters dataxParameters = new DataxParameters(); + dataxParameters.setCustomConfig(0); + dataxParameters.setXms(0); + dataxParameters.setXmx(-100); + dataxParameters.setDataSource(1); + dataxParameters.setDataTarget(1); + dataxParameters.setDsType("MYSQL"); + dataxParameters.setDtType("MYSQL"); + dataxParameters.setJobSpeedByte(1); + dataxParameters.setJobSpeedRecord(1); + dataxParameters.setJson("json"); + + String expected = "DataxParameters" + + "{" + + "customConfig=0, " + + "json='json', " + + "dsType='MYSQL', " + + "dataSource=1, " + + "dtType='MYSQL', " + + "dataTarget=1, " + + "sql='null', " + + "targetTable='null', " + + "preStatements=null, " + + "postStatements=null, " + + "jobSpeedByte=1, " + + "jobSpeedRecord=1, " + + "xms=0, " + + "xmx=-100" + + "}"; + + Assert.assertEquals(expected,dataxParameters.toString()); + } + + public String loadJvmEnvTest(DataxParameters dataXParameters) { + int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms(); + int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx(); + return String.format(JVM_EVN, xms, xmx); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index ac23b25c9c..b0e9ca7338 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -14,25 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; import org.apache.dolphinscheduler.dao.entity.TaskInstance; + import org.apache.ibatis.annotations.Param; import java.util.Date; import java.util.List; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; + /** * task instance mapper interface */ public interface TaskInstanceMapper extends BaseMapper { - List queryTaskByProcessIdAndState(@Param("processInstanceId") Integer processInstanceId, @Param("state") Integer state); @@ -61,6 +63,7 @@ public interface TaskInstanceMapper extends BaseMapper { IPage queryTaskInstanceListPaging(IPage page, @Param("projectId") int projectId, @Param("processInstanceId") Integer processInstanceId, + @Param("processInstanceName") String processInstanceName, @Param("searchVal") String searchVal, @Param("taskName") String taskName, @Param("executorId") int executorId, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 080b40c0cf..eb34ffcb3a 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -128,6 +128,9 @@ and instance.executor_id = #{executorId} + + and process.name like concat('%', #{processInstanceName}, '%') + order by instance.start_time desc diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index 017527137b..a225f7654c 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -286,6 +286,7 @@ public class TaskInstanceMapperTest { task.getProcessInstanceId(), "", "", + "", 0, new int[0], "", diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 6d701a00a6..caf487947b 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -14,27 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.datax; - -import com.alibaba.druid.sql.ast.SQLStatement; -import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; -import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; -import com.alibaba.druid.sql.ast.statement.*; -import com.alibaba.druid.sql.parser.SQLStatementParser; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.io.FileUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; @@ -46,7 +37,8 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; -import org.slf4j.Logger; + +import org.apache.commons.io.FileUtils; import java.io.File; import java.nio.charset.StandardCharsets; @@ -56,25 +48,48 @@ import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; -import java.sql.*; -import java.util.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.slf4j.Logger; + +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * DataX task */ public class DataxTask extends AbstractTask { + /** + * jvm parameters + */ + public static final String JVM_EVN = " --jvm=\"-Xms%sG -Xmx%sG\" "; /** * python process(datax only supports version 2.7 by default) */ private static final String DATAX_PYTHON = "python2.7"; - /** * datax home path */ private static final String DATAX_HOME_EVN = "${DATAX_HOME}"; - /** * datax channel count */ @@ -97,6 +112,7 @@ public class DataxTask extends AbstractTask { /** * constructor + * * @param taskExecutionContext taskExecutionContext * @param logger logger */ @@ -104,9 +120,8 @@ public class DataxTask extends AbstractTask { super(taskExecutionContext, logger); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext,logger); + taskExecutionContext, logger); } /** @@ -149,9 +164,7 @@ public class DataxTask extends AbstractTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); - } - catch (Exception e) { - logger.error("datax task failure", e); + } catch (Exception e) { setExitStatusCode(Constants.EXIT_CODE_FAILURE); throw e; } @@ -189,9 +202,9 @@ public class DataxTask extends AbstractTask { return fileName; } - if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){ + if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()) { json = dataXParameters.getJson().replaceAll("\\r\\n", "\n"); - }else { + } else { ObjectNode job = JSONUtils.createObjectNode(); job.putArray("content").addAll(buildDataxJobContentJson()); job.set("setting", buildDataxJobSettingJson()); @@ -248,7 +261,6 @@ public class DataxTask extends AbstractTask { readerParam.put("password", dataSourceCfg.getPassword()); readerParam.putArray("connection").addAll(readerConnArr); - ObjectNode reader = JSONUtils.createObjectNode(); reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype()))); reader.set("parameter", readerParam); @@ -277,7 +289,6 @@ public class DataxTask extends AbstractTask { } writerParam.putArray("connection").addAll(writerConnArr); - if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) { ArrayNode preSqlArr = writerParam.putArray("preSql"); for (String preSql : dataXParameters.getPreStatements()) { @@ -368,7 +379,7 @@ public class DataxTask extends AbstractTask { * @throws Exception if error throws Exception */ private String buildShellCommandFile(String jobConfigFilePath, Map paramsMap) - throws Exception { + throws Exception { // generate scripts String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(), @@ -387,6 +398,7 @@ public class DataxTask extends AbstractTask { sbr.append(" "); sbr.append(DATAX_HOME_EVN); sbr.append(" "); + sbr.append(loadJvmEnv(dataXParameters)); sbr.append(jobConfigFilePath); // replace placeholder @@ -409,17 +421,19 @@ public class DataxTask extends AbstractTask { return fileName; } + public String loadJvmEnv(DataxParameters dataXParameters) { + int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms(); + int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx(); + return String.format(JVM_EVN, xms, xmx); + } + /** * parsing synchronized column names in SQL statements * - * @param dsType - * the database type of the data source - * @param dtType - * the database type of the data target - * @param dataSourceCfg - * the database connection parameters of the data source - * @param sql - * sql for data synchronization + * @param dsType the database type of the data source + * @param dtType the database type of the data target + * @param dataSourceCfg the database connection parameters of the data source + * @param sql sql for data synchronization * @return Keyword converted column names */ private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) { @@ -438,10 +452,8 @@ public class DataxTask extends AbstractTask { /** * try grammatical parsing column * - * @param dbType - * database type - * @param sql - * sql for data synchronization + * @param dbType database type + * @param sql sql for data synchronization * @return column name array * @throws RuntimeException if error throws RuntimeException */ @@ -453,16 +465,16 @@ public class DataxTask extends AbstractTask { notNull(parser, String.format("database driver [%s] is not support", dbType.toString())); SQLStatement sqlStatement = parser.parseStatement(); - SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement; + SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement; SQLSelect sqlSelect = sqlSelectStatement.getSelect(); List selectItemList = null; if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) { - SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery(); + SQLSelectQueryBlock block = (SQLSelectQueryBlock) sqlSelect.getQuery(); selectItemList = block.getSelectList(); } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) { - SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery(); - SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight(); + SQLUnionQuery unionQuery = (SQLUnionQuery) sqlSelect.getQuery(); + SQLSelectQueryBlock block = (SQLSelectQueryBlock) unionQuery.getRight(); selectItemList = block.getSelectList(); } @@ -470,7 +482,7 @@ public class DataxTask extends AbstractTask { String.format("select query type [%s] is not support", sqlSelect.getQuery().toString())); columnNames = new String[selectItemList.size()]; - for (int i = 0; i < selectItemList.size(); i++ ) { + for (int i = 0; i < selectItemList.size(); i++) { SQLSelectItem item = selectItemList.get(i); String columnName = null; @@ -479,10 +491,10 @@ public class DataxTask extends AbstractTask { columnName = item.getAlias(); } else if (item.getExpr() != null) { if (item.getExpr() instanceof SQLPropertyExpr) { - SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr(); + SQLPropertyExpr expr = (SQLPropertyExpr) item.getExpr(); columnName = expr.getName(); } else if (item.getExpr() instanceof SQLIdentifierExpr) { - SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr(); + SQLIdentifierExpr expr = (SQLIdentifierExpr) item.getExpr(); columnName = expr.getName(); } } else { @@ -497,8 +509,7 @@ public class DataxTask extends AbstractTask { columnNames[i] = columnName; } - } - catch (Exception e) { + } catch (Exception e) { logger.warn(e.getMessage(), e); return null; } @@ -509,10 +520,8 @@ public class DataxTask extends AbstractTask { /** * try to execute sql to resolve column names * - * @param baseDataSource - * the database connection parameters - * @param sql - * sql for data synchronization + * @param baseDataSource the database connection parameters + * @param sql sql for data synchronization * @return column name array */ public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) { @@ -529,11 +538,10 @@ public class DataxTask extends AbstractTask { ResultSetMetaData md = resultSet.getMetaData(); int num = md.getColumnCount(); columnNames = new String[num]; - for (int i = 1; i <= num; i++ ) { + for (int i = 1; i <= num; i++) { columnNames[i - 1] = md.getColumnName(i); } - } - catch (SQLException e) { + } catch (SQLException e) { logger.warn(e.getMessage(), e); return null; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java index 31eac07270..1c9f4922a7 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java @@ -23,7 +23,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.Arrays; -import java.util.Collections; +import java.util.List; /** * round robin selector @@ -39,82 +39,59 @@ public class RoundRobinSelectorTest { @Test public void testSelect1() { RoundRobinSelector selector = new RoundRobinSelector(); + // dismiss of server warm-up time + long startTime = System.currentTimeMillis() - 60 * 10 * 1000; + List hostOneList = Arrays.asList( + new Host("192.168.1.1", 80, 20, startTime, "kris"), + new Host("192.168.1.2", 80, 10, startTime, "kris")); + + List hostTwoList = Arrays.asList( + new Host("192.168.1.1", 80, 20, startTime, "kris"), + new Host("192.168.1.2", 80, 10, startTime, "kris"), + new Host("192.168.1.3", 80, 10, startTime, "kris")); + Host result; - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostOneList); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostOneList); Assert.assertEquals("192.168.1.2", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostOneList); Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(hostOneList); + Assert.assertEquals("192.168.1.1", result.getIp()); + + result = selector.select(hostOneList); + Assert.assertEquals("192.168.1.2", result.getIp()); + // add new host - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostTwoList); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); - Assert.assertEquals("192.168.1.2", result.getIp()); - - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), - new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); - Assert.assertEquals("192.168.1.1", result.getIp()); - - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), - new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostTwoList); Assert.assertEquals("192.168.1.3", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), - new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostTwoList); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), - new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostTwoList); Assert.assertEquals("192.168.1.2", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), - new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostTwoList); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), - new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostTwoList); Assert.assertEquals("192.168.1.3", result.getIp()); // remove host3 - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostOneList); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostOneList); Assert.assertEquals("192.168.1.2", result.getIp()); - result = selector.select(Arrays.asList( - new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), - new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + result = selector.select(hostOneList); Assert.assertEquals("192.168.1.1", result.getIp()); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java index e5177aa786..8d03c1460a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -14,19 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.task.datax; +import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; @@ -39,6 +33,13 @@ import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.UUID; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -49,7 +50,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import static org.apache.dolphinscheduler.common.enums.CommandType.START_PROCESS; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * DataxTask Tester. @@ -58,7 +60,13 @@ public class DataxTaskTest { private static final Logger logger = LoggerFactory.getLogger(DataxTaskTest.class); - private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"; + private static final String CONNECTION_PARAMS = " {\n" + + " \"user\":\"root\",\n" + + " \"password\":\"123456\",\n" + + " \"address\":\"jdbc:mysql://127.0.0.1:3306\",\n" + + " \"database\":\"test\",\n" + + " \"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"\n" + + "}"; private DataxTask dataxTask; @@ -69,7 +77,7 @@ public class DataxTaskTest { private ApplicationContext applicationContext; private TaskExecutionContext taskExecutionContext; - private TaskProps props = new TaskProps(); + private final TaskProps props = new TaskProps(); @Before public void before() @@ -97,12 +105,40 @@ public class DataxTaskTest { props.setTaskTimeout(0); if (customConfig == 1) { props.setTaskParams( - "{\"customConfig\":1, \"localParams\":[{\"prop\":\"test\",\"value\":\"38294729\"}],\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"${test}\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}"); + "{\n" + + " \"customConfig\":1,\n" + + " \"localParams\":[\n" + + " {\n" + + " \"prop\":\"test\",\n" + + " \"value\":\"38294729\"\n" + + " }\n" + + " ],\n" + + " \"json\":\"" + + "{\"job\":{\"setting\":{\"speed\":{\"byte\":1048576},\"errorLimit\":{\"record\":0,\"percentage\":0.02}},\"content\":[" + + "{\"reader\":{\"name\":\"rdbmsreader\",\"parameter\":{\"username\":\"xxx\",\"password\":\"${test}\",\"column\":[\"id\",\"name\"],\"splitPk\":\"pk\",\"" + + "connection\":[{\"querySql\":[\"SELECT * from dual\"],\"jdbcUrl\":[\"jdbc:dm://ip:port/database\"]}],\"fetchSize\":1024,\"where\":\"1 = 1\"}},\"" + + "writer\":{\"name\":\"streamwriter\",\"parameter\":{\"print\":true}}}]}}\"\n" + + "}"); -// "{\"customConfig\":1,\"json\":\"{\\\"job\\\":{\\\"setting\\\":{\\\"speed\\\":{\\\"byte\\\":1048576},\\\"errorLimit\\\":{\\\"record\\\":0,\\\"percentage\\\":0.02}},\\\"content\\\":[{\\\"reader\\\":{\\\"name\\\":\\\"rdbmsreader\\\",\\\"parameter\\\":{\\\"username\\\":\\\"xxx\\\",\\\"password\\\":\\\"xxx\\\",\\\"column\\\":[\\\"id\\\",\\\"name\\\"],\\\"splitPk\\\":\\\"pk\\\",\\\"connection\\\":[{\\\"querySql\\\":[\\\"SELECT * from dual\\\"],\\\"jdbcUrl\\\":[\\\"jdbc:dm://ip:port/database\\\"]}],\\\"fetchSize\\\":1024,\\\"where\\\":\\\"1 = 1\\\"}},\\\"writer\\\":{\\\"name\\\":\\\"streamwriter\\\",\\\"parameter\\\":{\\\"print\\\":true}}}]}}\"}"); } else { props.setTaskParams( - "{\"customConfig\":0,\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"dataSource\":1,\"dsType\":\"MYSQL\",\"dataTarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); + "{\n" + + " \"customConfig\":0,\n" + + " \"targetTable\":\"test\",\n" + + " \"postStatements\":[\n" + + " \"delete from test\"\n" + + " ],\n" + + " \"jobSpeedByte\":0,\n" + + " \"jobSpeedRecord\":1000,\n" + + " \"dtType\":\"MYSQL\",\n" + + " \"dataSource\":1,\n" + + " \"dsType\":\"MYSQL\",\n" + + " \"dataTarget\":2,\n" + + " \"sql\":\"select 1 as test from dual\",\n" + + " \"preStatements\":[\n" + + " \"delete from test\"\n" + + " ]\n" + + "}"); } taskExecutionContext = Mockito.mock(TaskExecutionContext.class); @@ -114,7 +150,6 @@ public class DataxTaskTest { Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000); Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx"); - DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); dataxTaskExecutionContext.setSourcetype(0); dataxTaskExecutionContext.setTargetType(0); @@ -126,7 +161,6 @@ public class DataxTaskTest { dataxTask.init(); props.setCmdTypeIfComplement(START_PROCESS); - Mockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource()); Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); @@ -138,7 +172,6 @@ public class DataxTaskTest { e.printStackTrace(); } - dataxTask = PowerMockito.spy(new DataxTask(taskExecutionContext, logger)); dataxTask.init(); } @@ -405,4 +438,23 @@ public class DataxTaskTest { } } + @Test + public void testLoadJvmEnv() { + DataxTask dataxTask = new DataxTask(null,null); + DataxParameters dataxParameters = new DataxParameters(); + dataxParameters.setXms(0); + dataxParameters.setXmx(-100); + + String actual = dataxTask.loadJvmEnv(dataxParameters); + + String except = " --jvm=\"-Xms1G -Xmx1G\" "; + Assert.assertEquals(except,actual); + + dataxParameters.setXms(13); + dataxParameters.setXmx(14); + actual = dataxTask.loadJvmEnv(dataxParameters); + except = " --jvm=\"-Xms13G -Xmx14G\" "; + Assert.assertEquals(except,actual); + + } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue index 6ced1ad0e6..42bd2341f2 100755 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue @@ -144,6 +144,22 @@ +
+
+ {{$t('Running Memory')}} +
+
+ {{$t('Min Memory')}} + + +    G    + {{$t('Max Memory')}} + + +    G +
+
+ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/common.js similarity index 100% rename from dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/common.js rename to dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/common.js diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/processInstance.vue similarity index 99% rename from dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/index.vue rename to dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/processInstance.vue index 1ef2e1f3e4..d4c05cf903 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/processInstance.vue @@ -68,7 +68,7 @@ import { stateType } from './common' import mConditions from '@/module/components/conditions/conditions' export default { - name: 'instance-conditions', + name: 'process-instance-conditions', data () { return { // state(list) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/taskInstance.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/taskInstance.vue new file mode 100644 index 0000000000..08f61a8fd0 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/conditions/instance/taskInstance.vue @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/tree/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/tree/index.vue index 4bc102dc24..bbc182514a 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/tree/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/tree/index.vue @@ -16,6 +16,11 @@ */