mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-11-30 11:17:54 +08:00
[fix][Python] Support same task name in project defferent process definition (#10428)
close: #10431
(cherry picked from commit b86dc53ad1
)
This commit is contained in:
parent
ba2f2ba64d
commit
476f2395dc
@ -166,7 +166,7 @@ public class PythonGateway {
|
||||
return taskDefinitionService.genTaskCodeList(genNum);
|
||||
}
|
||||
|
||||
public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
|
||||
public Map<String, Long> getCodeAndVersion(String projectName, String processDefinitionName, String taskName) throws CodeGenerateUtils.CodeGenerateException {
|
||||
Project project = projectMapper.queryByName(projectName);
|
||||
Map<String, Long> result = new HashMap<>();
|
||||
// project do not exists, mean task not exists too, so we should directly return init value
|
||||
@ -175,7 +175,15 @@ public class PythonGateway {
|
||||
result.put("version", 0L);
|
||||
return result;
|
||||
}
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
|
||||
|
||||
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName);
|
||||
if (processDefinition == null) {
|
||||
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
|
||||
logger.error(msg);
|
||||
throw new IllegalArgumentException(msg);
|
||||
}
|
||||
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskName);
|
||||
if (taskDefinition == null) {
|
||||
result.put("code", CodeGenerateUtils.getInstance().genCode());
|
||||
result.put("version", 0L);
|
||||
@ -520,7 +528,7 @@ public class PythonGateway {
|
||||
result.put("processDefinitionCode", processDefinition.getCode());
|
||||
|
||||
if (taskName != null) {
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskName);
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, processDefinition.getCode(), taskName);
|
||||
result.put("taskDefinitionCode", taskDefinition.getCode());
|
||||
}
|
||||
return result;
|
||||
|
@ -60,10 +60,12 @@ public interface TaskDefinitionService {
|
||||
*
|
||||
* @param loginUser login user
|
||||
* @param projectCode project code
|
||||
* @param processCode process code
|
||||
* @param taskName task name
|
||||
*/
|
||||
Map<String, Object> queryTaskDefinitionByName(User loginUser,
|
||||
long projectCode,
|
||||
long processCode,
|
||||
String taskName);
|
||||
|
||||
/**
|
||||
|
@ -270,10 +270,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
||||
*
|
||||
* @param loginUser login user
|
||||
* @param projectCode project code
|
||||
* @param processCode process code
|
||||
* @param taskName task name
|
||||
*/
|
||||
@Override
|
||||
public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, String taskName) {
|
||||
public Map<String, Object> queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, String taskName) {
|
||||
Project project = projectMapper.queryByCode(projectCode);
|
||||
//check user access for project
|
||||
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
|
||||
@ -281,7 +282,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
||||
return result;
|
||||
}
|
||||
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
|
||||
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName);
|
||||
if (taskDefinition == null) {
|
||||
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName);
|
||||
} else {
|
||||
|
@ -0,0 +1,123 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.dolphinscheduler.api.python;
|
||||
|
||||
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
|
||||
import org.apache.dolphinscheduler.dao.entity.Project;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
|
||||
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* python gate test
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class PythonGatewayTest {
|
||||
|
||||
@InjectMocks
|
||||
private PythonGateway pythonGateway;
|
||||
|
||||
@Mock
|
||||
private ProjectMapper projectMapper;
|
||||
|
||||
@Mock
|
||||
private ProcessDefinitionMapper processDefinitionMapper;
|
||||
|
||||
@Mock
|
||||
private TaskDefinitionMapper taskDefinitionMapper;
|
||||
|
||||
@Test
|
||||
public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException {
|
||||
Project project = getTestProject();
|
||||
Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
|
||||
|
||||
ProcessDefinition processDefinition = getTestProcessDefinition();
|
||||
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition);
|
||||
|
||||
TaskDefinition taskDefinition = getTestTaskDefinition();
|
||||
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition);
|
||||
|
||||
Map<String, Long> result = pythonGateway.getCodeAndVersion(project.getName(), processDefinition.getName(), taskDefinition.getName());
|
||||
Assert.assertEquals(result.get("code").longValue(), taskDefinition.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDependentInfo() {
|
||||
Project project = getTestProject();
|
||||
Mockito.when(projectMapper.queryByName(project.getName())).thenReturn(project);
|
||||
|
||||
ProcessDefinition processDefinition = getTestProcessDefinition();
|
||||
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), processDefinition.getName())).thenReturn(processDefinition);
|
||||
|
||||
TaskDefinition taskDefinition = getTestTaskDefinition();
|
||||
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processDefinition.getCode(), taskDefinition.getName())).thenReturn(taskDefinition);
|
||||
|
||||
Map<String, Object> result = pythonGateway.getDependentInfo(project.getName(), processDefinition.getName(), taskDefinition.getName());
|
||||
Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
|
||||
}
|
||||
|
||||
private Project getTestProject() {
|
||||
Project project = new Project();
|
||||
project.setName("ut-project");
|
||||
project.setUserId(111);
|
||||
project.setCode(1L);
|
||||
project.setCreateTime(new Date());
|
||||
project.setUpdateTime(new Date());
|
||||
return project;
|
||||
}
|
||||
|
||||
private ProcessDefinition getTestProcessDefinition() {
|
||||
ProcessDefinition processDefinition = new ProcessDefinition();
|
||||
processDefinition.setCode(1L);
|
||||
processDefinition.setName("ut-process-definition");
|
||||
processDefinition.setProjectCode(1L);
|
||||
processDefinition.setUserId(111);
|
||||
processDefinition.setUpdateTime(new Date());
|
||||
processDefinition.setCreateTime(new Date());
|
||||
return processDefinition;
|
||||
}
|
||||
|
||||
private TaskDefinition getTestTaskDefinition() {
|
||||
TaskDefinition taskDefinition = new TaskDefinition();
|
||||
taskDefinition.setCode(888888L);
|
||||
taskDefinition.setName("ut-task-definition");
|
||||
taskDefinition.setProjectCode(1L);
|
||||
taskDefinition.setTaskType("SHELL");
|
||||
taskDefinition.setUserId(111);
|
||||
taskDefinition.setResourceIds("1");
|
||||
taskDefinition.setWorkerGroup("default");
|
||||
taskDefinition.setEnvironmentCode(1L);
|
||||
taskDefinition.setVersion(1);
|
||||
taskDefinition.setCreateTime(new Date());
|
||||
taskDefinition.setUpdateTime(new Date());
|
||||
return taskDefinition;
|
||||
}
|
||||
|
||||
}
|
@ -145,7 +145,7 @@ public class TaskDefinitionServiceImplTest {
|
||||
public void queryTaskDefinitionByName() {
|
||||
String taskName = "task";
|
||||
long projectCode = 1L;
|
||||
|
||||
long processCode = 1L;
|
||||
Project project = getProject(projectCode);
|
||||
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
|
||||
|
||||
@ -157,11 +157,11 @@ public class TaskDefinitionServiceImplTest {
|
||||
putMsg(result, Status.SUCCESS, projectCode);
|
||||
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
|
||||
|
||||
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), taskName))
|
||||
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName))
|
||||
.thenReturn(new TaskDefinition());
|
||||
|
||||
Map<String, Object> relation = taskDefinitionService
|
||||
.queryTaskDefinitionByName(loginUser, projectCode, taskName);
|
||||
.queryTaskDefinitionByName(loginUser, projectCode, processCode, taskName);
|
||||
|
||||
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
|
||||
}
|
||||
|
@ -41,10 +41,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
|
||||
* query task definition by name
|
||||
*
|
||||
* @param projectCode projectCode
|
||||
* @param processCode processCode
|
||||
* @param name name
|
||||
* @return task definition
|
||||
*/
|
||||
TaskDefinition queryByName(@Param("projectCode") long projectCode,
|
||||
@Param("processCode") long processCode,
|
||||
@Param("name") String name);
|
||||
|
||||
/**
|
||||
|
@ -23,12 +23,24 @@
|
||||
worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
|
||||
resource_ids, create_time, update_time, task_group_id,task_group_priority
|
||||
</sql>
|
||||
<sql id="baseSqlV2">
|
||||
${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id,
|
||||
${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
|
||||
${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
|
||||
${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id,
|
||||
${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max
|
||||
</sql>
|
||||
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
|
||||
select
|
||||
<include refid="baseSql"/>
|
||||
from t_ds_task_definition
|
||||
WHERE project_code = #{projectCode}
|
||||
and name = #{name}
|
||||
<include refid="baseSqlV2">
|
||||
<property name="alias" value="td"/>
|
||||
</include>
|
||||
from t_ds_task_definition td
|
||||
join t_ds_process_task_relation ptr on ptr.project_code = td.project_code
|
||||
where td.project_code = #{projectCode}
|
||||
and td.name = #{name}
|
||||
and ptr.process_definition_code = #{processCode}
|
||||
and td.code = ptr.post_task_code
|
||||
</select>
|
||||
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
|
||||
select
|
||||
|
@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper;
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils;
|
||||
import org.apache.dolphinscheduler.dao.BaseDaoTest;
|
||||
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
|
||||
import org.apache.dolphinscheduler.dao.entity.User;
|
||||
|
||||
@ -36,6 +37,9 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
|
||||
@Autowired
|
||||
private TaskDefinitionMapper taskDefinitionMapper;
|
||||
|
||||
@Autowired
|
||||
private ProcessTaskRelationMapper processTaskRelationMapper;
|
||||
|
||||
@Autowired
|
||||
private UserMapper userMapper;
|
||||
|
||||
@ -60,6 +64,24 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
|
||||
return taskDefinition;
|
||||
}
|
||||
|
||||
/**
|
||||
* insert
|
||||
*
|
||||
* @return ProcessDefinition
|
||||
*/
|
||||
private ProcessTaskRelation insertTaskRelation(long postTaskCode) {
|
||||
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
|
||||
processTaskRelation.setName("def 1");
|
||||
processTaskRelation.setProjectCode(1L);
|
||||
processTaskRelation.setProcessDefinitionCode(1L);
|
||||
processTaskRelation.setPostTaskCode(postTaskCode);
|
||||
processTaskRelation.setPreTaskCode(0L);
|
||||
processTaskRelation.setUpdateTime(new Date());
|
||||
processTaskRelation.setCreateTime(new Date());
|
||||
processTaskRelationMapper.insert(processTaskRelation);
|
||||
return processTaskRelation;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsert() {
|
||||
TaskDefinition taskDefinition = insertOne();
|
||||
@ -69,7 +91,8 @@ public class TaskDefinitionMapperTest extends BaseDaoTest {
|
||||
@Test
|
||||
public void testQueryByDefinitionName() {
|
||||
TaskDefinition taskDefinition = insertOne();
|
||||
TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode()
|
||||
ProcessTaskRelation processTaskRelation = insertTaskRelation(taskDefinition.getCode());
|
||||
TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode(), processTaskRelation.getProcessDefinitionCode()
|
||||
, taskDefinition.getName());
|
||||
|
||||
Assert.assertNotNull(result);
|
||||
|
@ -273,7 +273,7 @@ class Task(Base):
|
||||
# TODO get code from specific project process definition and task name
|
||||
gateway = launch_gateway()
|
||||
result = gateway.entry_point.getCodeAndVersion(
|
||||
self.process_definition._project, self.name
|
||||
self.process_definition._project, self.process_definition.name, self.name
|
||||
)
|
||||
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
|
||||
# gateway_result_checker(result)
|
||||
|
Loading…
Reference in New Issue
Block a user