From b86dc53ad10cb5c4d76f5e85c38e5a5659a349fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=AE=B6=E5=90=8D?= <13774486042@163.com> Date: Tue, 14 Jun 2022 13:53:18 +0800 Subject: [PATCH] [fix][Python] Support same task name in project defferent process definition (#10428) close: #10431 --- .../api/python/PythonGateway.java | 14 +- .../api/service/TaskDefinitionService.java | 2 + .../impl/TaskDefinitionServiceImpl.java | 5 +- .../api/python/PythonGatewayTest.java | 123 ++++++++++++++++++ .../TaskDefinitionServiceImplTest.java | 6 +- .../dao/mapper/TaskDefinitionMapper.java | 2 + .../dao/mapper/TaskDefinitionMapper.xml | 20 ++- .../dao/mapper/TaskDefinitionMapperTest.java | 25 +++- .../src/pydolphinscheduler/core/task.py | 2 +- 9 files changed, 185 insertions(+), 14 deletions(-) create mode 100644 dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index e142c421b0..9b89d9636b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -166,7 +166,7 @@ public class PythonGateway { return taskDefinitionService.genTaskCodeList(genNum); } - public Map getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException { + public Map getCodeAndVersion(String projectName, String processDefinitionName, String taskName) throws CodeGenerateUtils.CodeGenerateException { Project project = projectMapper.queryByName(projectName); Map result = new HashMap<>(); // project do not exists, mean task not exists too, so we should directly return init value @@ -175,7 +175,15 @@ public class PythonGateway { result.put("version", 0L); return result; } - TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName); + + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); + if (processDefinition == null) { + String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + + TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName); if (taskDefinition == null) { result.put("code", CodeGenerateUtils.getInstance().genCode()); result.put("version", 0L); @@ -520,7 +528,7 @@ public class PythonGateway { result.put("processDefinitionCode", processDefinition.getCode()); if (taskName != null) { - TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskName); + TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, processDefinition.getCode(), taskName); result.put("taskDefinitionCode", taskDefinition.getCode()); } return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index 5d2ba85b02..a715b68e97 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -60,10 +60,12 @@ public interface TaskDefinitionService { * * @param loginUser login user * @param projectCode project code + * @param processCode process code * @param taskName task name */ Map queryTaskDefinitionByName(User loginUser, long projectCode, + long processCode, String taskName); /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 92fb74d787..30f2d0235c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -272,10 +272,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * * @param loginUser login user * @param projectCode project code + * @param processCode process code * @param taskName task name */ @Override - public Map queryTaskDefinitionByName(User loginUser, long projectCode, String taskName) { + public Map queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, String taskName) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION); @@ -283,7 +284,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName); + TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName); if (taskDefinition == null) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName); } else { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java new file mode 100644 index 0000000000..7d8b6efabc --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/python/PythonGatewayTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.python; + +import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Date; +import java.util.Map; + +/** + * python gate test + */ +@RunWith(MockitoJUnitRunner.class) +public class PythonGatewayTest { + + @InjectMocks + private PythonGateway pythonGateway; + + @Mock + private ProjectMapper projectMapper; + + @Mock + private ProcessDefinitionMapper processDefinitionMapper; + + @Mock + private TaskDefinitionMapper taskDefinitionMapper; + + @Test + public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException { + Project project = getTestProject(); + Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project); + + ProcessDefinition processDefinition = getTestProcessDefinition(); + Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition); + + TaskDefinition taskDefinition = getTestTaskDefinition(); + Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition); + + Map result = pythonGateway.getCodeAndVersion(project.getName(), processDefinition.getName(), taskDefinition.getName()); + Assert.assertEquals(result.get("code").longValue(), taskDefinition.getCode()); + } + + @Test + public void testGetDependentInfo() { + Project project = getTestProject(); + Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project); + + ProcessDefinition processDefinition = getTestProcessDefinition(); + Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition); + + TaskDefinition taskDefinition = getTestTaskDefinition(); + Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition); + + Map result = pythonGateway.getDependentInfo(project.getName(), processDefinition.getName(), taskDefinition.getName()); + Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode()); + } + + private Project getTestProject() { + Project project = new Project(); + project.setName("ut-project"); + project.setUserId(111); + project.setCode(1L); + project.setCreateTime(new Date()); + project.setUpdateTime(new Date()); + return project; + } + + private ProcessDefinition getTestProcessDefinition() { + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setCode(1L); + processDefinition.setName("ut-process-definition"); + processDefinition.setProjectCode(1L); + processDefinition.setUserId(111); + processDefinition.setUpdateTime(new Date()); + processDefinition.setCreateTime(new Date()); + return processDefinition; + } + + private TaskDefinition getTestTaskDefinition() { + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setCode(888888L); + taskDefinition.setName("ut-task-definition"); + taskDefinition.setProjectCode(1L); + taskDefinition.setTaskType("SHELL"); + taskDefinition.setUserId(111); + taskDefinition.setResourceIds("1"); + taskDefinition.setWorkerGroup("default"); + taskDefinition.setEnvironmentCode(1L); + taskDefinition.setVersion(1); + taskDefinition.setCreateTime(new Date()); + taskDefinition.setUpdateTime(new Date()); + return taskDefinition; + } + +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index beea8cc675..6a969b182b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -146,7 +146,7 @@ public class TaskDefinitionServiceImplTest { public void queryTaskDefinitionByName() { String taskName = "task"; long projectCode = 1L; - + long processCode = 1L; Project project = getProject(projectCode); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); @@ -158,11 +158,11 @@ public class TaskDefinitionServiceImplTest { putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION )).thenReturn(result); - Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), taskName)) + Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName)) .thenReturn(new TaskDefinition()); Map relation = taskDefinitionService - .queryTaskDefinitionByName(loginUser, projectCode, taskName); + .queryTaskDefinitionByName(loginUser, projectCode, processCode, taskName); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index 304e623a0a..c426da6b04 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -41,10 +41,12 @@ public interface TaskDefinitionMapper extends BaseMapper { * query task definition by name * * @param projectCode projectCode + * @param processCode processCode * @param name name * @return task definition */ TaskDefinition queryByName(@Param("projectCode") long projectCode, + @Param("processCode") long processCode, @Param("name") String name); /** diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index c78dd5a449..b8c49faa3a 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -23,12 +23,24 @@ worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max + + ${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id, + ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code, + ${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout, + ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id, + ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max +