[python] Support using full-name definitions for resources (#10551)

This commit is contained in:
陈家名 2022-06-23 22:59:59 +08:00 committed by GitHub
parent 393cb648f9
commit 7970573365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 136 additions and 3 deletions

View File

@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -557,6 +558,28 @@ public class PythonGateway {
return result;
}
/**
 * Query a file resource by its full name and return its identifying information.
 * The Python API calls this so tasks can reference resources by full name.
 *
 * @param userName name of the user who queries the resource
 * @param fullName full name of the resource
 * @return map holding the resource id under key {@code id} and its full name under key {@code name}
 * @throws IllegalArgumentException if the query does not succeed or does not yield a resource
 */
public Map<String, Object> queryResourcesFileInfo(String userName, String fullName) {
    User user = usersService.queryUser(userName);
    Result<Object> resourceResponse = resourceService.queryResource(user, fullName, null, ResourceType.FILE);
    Object data = resourceResponse.getData();
    // A success code with a missing or unexpected payload is reported the same way as a failed
    // lookup, so callers get one clear error instead of a NullPointerException/ClassCastException.
    if (resourceResponse.getCode() != Status.SUCCESS.getCode() || !(data instanceof Resource)) {
        String msg = String.format("Can not find valid resource by name %s", fullName);
        logger.error(msg);
        throw new IllegalArgumentException(msg);
    }
    Resource resource = (Resource) data;
    Map<String, Object> result = new HashMap<>();
    result.put("id", resource.getId());
    result.put("name", resource.getFullName());
    return result;
}
@PostConstruct
public void init() {
if (pythonGatewayConfiguration.getEnabled()) {

View File

@ -17,13 +17,20 @@
package org.apache.dolphinscheduler.api.python;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ResourcesService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -53,6 +60,12 @@ public class PythonGatewayTest {
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Mock
private ResourcesService resourcesService;
@Mock
private UsersService usersService;
@Test
public void testGetCodeAndVersion() throws CodeGenerateUtils.CodeGenerateException {
Project project = getTestProject();
@ -83,6 +96,37 @@ public class PythonGatewayTest {
Assert.assertEquals((long) result.get("taskDefinitionCode"), taskDefinition.getCode());
}
/**
 * A successful resource query must be translated into a map exposing the resource id and full name.
 */
@Test
public void testQueryResourcesFileInfo() {
    User user = getTestUser();
    Resource resource = getTestResource();
    Mockito.when(usersService.queryUser(user.getUserName())).thenReturn(user);

    Result<Object> mockResult = new Result<>();
    mockResult.setCode(Status.SUCCESS.getCode());
    mockResult.setData(resource);
    Mockito.when(resourcesService.queryResource(user, resource.getFullName(), null, ResourceType.FILE))
        .thenReturn(mockResult);

    Map<String, Object> result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName());

    // JUnit's assertEquals signature is (expected, actual); keeping that order yields correct failure messages.
    Assert.assertEquals(resource.getId(), (int) result.get("id"));
    Assert.assertEquals(resource.getFullName(), result.get("name"));
}
/**
 * Build the file resource fixture (id 1, full name "/dev/test.py") used by the tests.
 */
private Resource getTestResource() {
    Resource fileResource = new Resource();
    fileResource.setFullName("/dev/test.py");
    fileResource.setType(ResourceType.FILE);
    fileResource.setId(1);
    return fileResource;
}
/**
 * Build the user fixture (id 1, user name "ut-user") used by the tests.
 */
private User getTestUser() {
    User testUser = new User();
    testUser.setUserName("ut-user");
    testUser.setId(1);
    return testUser;
}
private Project getTestProject() {
Project project = new Project();
project.setName("ut-project");

View File

@ -100,3 +100,9 @@ class Time(str):
FMT_STD_TIME = "%H:%M:%S"
FMT_NO_COLON_TIME = "%H%M%S"
class ResourceKey(str):
    """Key names used when describing a resource."""

    # Key under which a resource's id is stored.
    ID = "id"

View File

@ -405,6 +405,7 @@ class ProcessDefinition(Base):
json.dumps(self.task_relation_json),
json.dumps(self.task_definition_json),
None,
None,
)
return self._process_definition_code

View File

@ -22,6 +22,7 @@ from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from pydolphinscheduler.constants import (
Delimiter,
ResourceKey,
TaskFlag,
TaskPriority,
TaskTimeoutFlag,
@ -155,7 +156,7 @@ class Task(Base):
# Attribute for task param
self.local_params = local_params or []
self.resource_list = resource_list or []
self._resource_list = resource_list or []
self.dependence = dependence or {}
self.wait_start_timeout = wait_start_timeout or {}
self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
@ -170,6 +171,22 @@ class Task(Base):
"""Set attribute process_definition."""
self._process_definition = process_definition
@property
def resource_list(self) -> List:
    """Get task define attribute `resource_list`.

    Each entry of the list given at construction time is normalized to a
    ``{ResourceKey.ID: <id>}`` dict:

    * a ``str`` entry is resolved to its id through :meth:`query_resource`;
    * a ``dict`` entry holding a non-``None`` id is used as is (deprecated, a
      warning is logged);
    * any other entry is skipped.

    Duplicated ids are dropped and the first-seen order is kept, so the result
    is deterministic for a given input.
    """
    resource_ids = []
    seen = set()
    for resource in self._resource_list:
        if isinstance(resource, str):
            resource_id = self.query_resource(resource).get(ResourceKey.ID)
        elif isinstance(resource, dict) and resource.get(ResourceKey.ID) is not None:
            logger.warning(
                "`resource_list` should be defined using List[str] with resource paths, "
                "the use of ids to define resources will be removed in version 3.2.0."
            )
            resource_id = resource.get(ResourceKey.ID)
        else:
            continue
        # De-duplicate without a bare set so the output order does not depend on hashing.
        if resource_id not in seen:
            seen.add(resource_id)
            resource_ids.append(resource_id)
    return [{ResourceKey.ID: resource_id} for resource_id in resource_ids]
@property
def condition_result(self) -> Dict:
"""Get attribute condition_result."""
@ -278,3 +295,10 @@ class Task(Base):
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
# gateway_result_checker(result)
return result.get("code"), result.get("version")
def query_resource(self, full_name):
    """Ask the java gateway for the info (resource id, name) of the resource with the given full name."""
    entry_point = launch_gateway().entry_point
    query = entry_point.queryResourcesFileInfo
    # The lookup is done on behalf of the user of the process definition this task is attached to.
    user_name = self.process_definition.user.name
    return query(user_name, full_name)

View File

@ -54,7 +54,7 @@ TEST_TASK_RELATION_SIZE = 0
},
{
"localParams": ["foo", "bar"],
"resourceList": ["foo", "bar"],
"resourceList": [{"id": 1}],
"dependence": {"foo", "bar"},
"waitStartTimeout": {"foo", "bar"},
"conditionResult": {"foo": ["bar"]},
@ -62,7 +62,11 @@ TEST_TASK_RELATION_SIZE = 0
),
],
)
def test_property_task_params(attr, expect):
@patch(
"pydolphinscheduler.core.task.Task.query_resource",
return_value=({"id": 1, "name": "foo"}),
)
def test_property_task_params(mock_resource, attr, expect):
"""Test class task property."""
task = testTask(
"test-property-task-params",
@ -241,3 +245,34 @@ def test_add_duplicate(caplog):
re.findall("already in process definition", caplog.text),
]
)
@pytest.mark.parametrize(
    "resources, expect",
    [
        (
            ["/dev/test.py"],
            [{"id": 1}],
        ),
        (
            ["/dev/test.py", {"id": 2}],
            [{"id": 1}, {"id": 2}],
        ),
    ],
)
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    return_value=(123, 1),
)
@patch(
    "pydolphinscheduler.core.task.Task.query_resource",
    return_value=({"id": 1, "name": "/dev/test.py"}),
)
def test_python_resource_list(mock_resource, mock_code_version, resources, expect):
    """Test python task resource list.

    Resource paths are resolved to ids through the patched ``query_resource``,
    while id dicts are passed through unchanged.
    """
    # Stacked ``patch`` decorators inject their mocks bottom-up, so the mock for
    # ``query_resource`` (closest to the function) is the first positional argument.
    task = Task(
        name="python_resource_list.",
        task_type="PYTHON",
        resource_list=resources,
    )
    assert task.resource_list == expect