[Feature-7346] [Python] Add workflow as code task type spark (#7968)

* add spark task

* fix code format

* add parent class for flink and spark

* modify Engine docstring

* modify docstring of Engine
Devosend 2022-01-15 23:24:55 +08:00 committed by GitHub
parent f4a8502d43
commit 027af091c7
10 changed files with 464 additions and 40 deletions

View File

@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""A example workflow for task spark."""
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
with ProcessDefinition(name="task_spark_example", tenant="tenant_exists") as pd:
task = Spark(
name="task_spark",
main_class="org.apache.spark.examples.SparkPi",
main_package="spark-examples_2.12-3.2.0.jar",
program_type=ProgramType.JAVA,
deploy_mode=DeployMode.LOCAL,
)
pd.run()
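The example assumes the jar spark-examples_2.12-3.2.0.jar has already been uploaded to the DolphinScheduler resource center and that an API server with the Python gateway (see PythonGatewayServer below) is reachable; pd.run() submits the process definition and starts a run through that gateway.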

View File

@ -78,6 +78,7 @@ class TaskType(str):
CONDITIONS = "CONDITIONS"
SWITCH = "SWITCH"
FLINK = "FLINK"
SPARK = "SPARK"
class DefaultTaskCodeNum(str):

View File

@ -0,0 +1,95 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Module engine."""
from typing import Dict, Optional
from py4j.protocol import Py4JJavaError
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.java_gateway import launch_gateway
class ProgramType(str):
"""Type of program engine runs, for now it just contain `JAVA`, `SCALA` and `PYTHON`."""
JAVA = "JAVA"
SCALA = "SCALA"
PYTHON = "PYTHON"
class Engine(Task):
"""Task engine object, declare behavior for engine task to dolphinscheduler.
This is the parent class of spark, flink and mr tasks,
and is used to provide the programType, mainClass and mainJar task parameters for reuse.
"""
def __init__(
self,
name: str,
task_type: str,
main_class: str,
main_package: str,
program_type: Optional[ProgramType] = ProgramType.SCALA,
*args,
**kwargs
):
super().__init__(name, task_type, *args, **kwargs)
self.main_class = main_class
self.main_package = main_package
self.program_type = program_type
self._resource = {}
def get_resource_info(self, program_type, main_package):
"""Get resource info from java gateway, contains resource id, name."""
if self._resource:
return self._resource
else:
gateway = launch_gateway()
try:
self._resource = gateway.entry_point.getResourcesFileInfo(
program_type, main_package
)
# Handle the error when the resource does not exist; for now we just terminate the process.
except Py4JJavaError as ex:
raise PyDSParamException(str(ex.java_exception))
return self._resource
def get_jar_id(self) -> int:
"""Get jar id from java gateway, a wrapper for :func:`get_resource_info`."""
return self.get_resource_info(self.program_type, self.main_package).get("id")
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
"""Override Task.task_params for engine children task.
children task have some specials attribute for task_params, and is odd if we
directly set as python property, so we Override Task.task_params here.
"""
params = super().task_params
custom_params = {
"programType": self.program_type,
"mainClass": self.main_class,
"mainJar": {
"id": self.get_jar_id(),
},
}
params.update(custom_params)
return params
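To illustrate the reuse described in the Engine docstring, a subclass only forwards its task type plus the shared engine parameters to Engine.__init__ and lists its extra attributes in _task_custom_attr, exactly as Flink and Spark do below. The following sketch is illustrative and not part of this commit; the "MR" task type string and the chosen attributes are assumptions.

# Illustrative sketch, not part of this commit: a hypothetical MapReduce-style
# task built on Engine. The "MR" task type string and the attribute names below
# are assumptions for the example.
from typing import Optional

from pydolphinscheduler.core.engine import Engine, ProgramType


class MapReduce(Engine):
    """Hypothetical task that reuses Engine's programType/mainClass/mainJar handling."""

    _task_custom_attr = {"app_name", "main_args"}

    def __init__(
        self,
        name: str,
        main_class: str,
        main_package: str,
        program_type: Optional[ProgramType] = ProgramType.SCALA,
        app_name: Optional[str] = None,
        main_args: Optional[str] = None,
        *args,
        **kwargs
    ):
        super().__init__(name, "MR", main_class, main_package, program_type, *args, **kwargs)
        self.app_name = app_name
        self.main_args = main_args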

View File

@ -17,19 +17,10 @@
"""Task Flink."""
from typing import Dict, Optional
from typing import Optional
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.java_gateway import launch_gateway
class ProgramType(str):
"""Type of program flink runs, for now it just contain `JAVA`, `SCALA` and `PYTHON`."""
JAVA = "JAVA"
SCALA = "SCALA"
PYTHON = "PYTHON"
from pydolphinscheduler.core.engine import Engine, ProgramType
class FlinkVersion(str):
@ -46,12 +37,10 @@ class DeployMode(str):
CLUSTER = "cluster"
class Flink(Task):
class Flink(Engine):
"""Task flink object, declare behavior for flink task to dolphinscheduler."""
_task_custom_attr = {
"main_class",
"main_jar",
"deploy_mode",
"flink_version",
"slot",
@ -59,7 +48,6 @@ class Flink(Task):
"job_manager_memory",
"task_manager_memory",
"app_name",
"program_type",
"parallelism",
"main_args",
"others",
@ -84,10 +72,15 @@ class Flink(Task):
*args,
**kwargs
):
super().__init__(name, TaskType.FLINK, *args, **kwargs)
self.main_class = main_class
self.main_package = main_package
self.program_type = program_type
super().__init__(
name,
TaskType.FLINK,
main_class,
main_package,
program_type,
*args,
**kwargs
)
self.deploy_mode = deploy_mode
self.flink_version = flink_version
self.app_name = app_name
@ -98,20 +91,3 @@ class Flink(Task):
self.parallelism = parallelism
self.main_args = main_args
self.others = others
self._resource = {}
@property
def main_jar(self) -> Dict:
"""Return main package of dict."""
resource_info = self.get_resource_info(self.program_type, self.main_package)
return {"id": resource_info.get("id")}
def get_resource_info(self, program_type, main_package) -> Dict:
"""Get resource info from java gateway, contains resource id, name."""
if not self._resource:
self._resource = launch_gateway().entry_point.getResourcesFileInfo(
program_type,
main_package,
)
return self._resource
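With main_jar and get_resource_info now inherited from Engine, Flink keeps its public constructor signature; an instantiation such as the sketch below (illustrative only; jar, class and tenant are placeholders) produces the same taskParams as before, with mainJar resolved through Engine.get_jar_id.

# Illustrative sketch, not part of this commit: the refactored Flink task is
# declared the same way as before; placeholders must match real resources.
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType

with ProcessDefinition(name="task_flink_example", tenant="tenant_exists") as pd:
    task = Flink(
        name="task_flink",
        main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
        main_package="WordCount.jar",  # must already exist in the resource center
        program_type=ProgramType.JAVA,
        deploy_mode=DeployMode.CLUSTER,
    )
    pd.run()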

View File

@ -0,0 +1,94 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task Spark."""
from typing import Optional
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.engine import Engine, ProgramType
class SparkVersion(str):
"""Spark version, for now it just contain `SPARK1` and `SPARK2`."""
SPARK1 = "SPARK1"
SPARK2 = "SPARK2"
class DeployMode(str):
"""SPARK deploy mode, for now it just contain `LOCAL`, `CLIENT` and `CLUSTER`."""
LOCAL = "local"
CLIENT = "client"
CLUSTER = "cluster"
class Spark(Engine):
"""Task spark object, declare behavior for spark task to dolphinscheduler."""
_task_custom_attr = {
"deploy_mode",
"spark_version",
"driver_cores",
"driver_memory",
"num_executors",
"executor_memory",
"executor_cores",
"app_name",
"main_args",
"others",
}
def __init__(
self,
name: str,
main_class: str,
main_package: str,
program_type: Optional[ProgramType] = ProgramType.SCALA,
deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
spark_version: Optional[SparkVersion] = SparkVersion.SPARK2,
app_name: Optional[str] = None,
driver_cores: Optional[int] = 1,
driver_memory: Optional[str] = "512M",
num_executors: Optional[int] = 2,
executor_memory: Optional[str] = "2G",
executor_cores: Optional[int] = 2,
main_args: Optional[str] = None,
others: Optional[str] = None,
*args,
**kwargs
):
super().__init__(
name,
TaskType.SPARK,
main_class,
main_package,
program_type,
*args,
**kwargs
)
self.deploy_mode = deploy_mode
self.spark_version = spark_version
self.app_name = app_name
self.driver_cores = driver_cores
self.driver_memory = driver_memory
self.num_executors = num_executors
self.executor_memory = executor_memory
self.executor_cores = executor_cores
self.main_args = main_args
self.others = others
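A slightly fuller usage sketch for the new task, exercising the driver and executor settings declared in _task_custom_attr above. It is illustrative only and not part of this commit; the jar, tenant and resource numbers are placeholders.

# Illustrative sketch, not part of this commit: a Spark task with explicit
# deploy mode and resource settings; all values are placeholders.
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark

with ProcessDefinition(name="task_spark_tuning", tenant="tenant_exists") as pd:
    task = Spark(
        name="task_spark_pi",
        main_class="org.apache.spark.examples.SparkPi",
        main_package="spark-examples_2.12-3.2.0.jar",  # must exist in the resource center
        program_type=ProgramType.JAVA,
        deploy_mode=DeployMode.CLUSTER,
        driver_cores=1,
        driver_memory="1G",
        num_executors=4,
        executor_cores=2,
        executor_memory="4G",
        main_args="100",  # forwarded to the main class as application arguments
    )
    pd.run()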

View File

@ -0,0 +1,147 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test Task Engine."""
from unittest.mock import patch
import pytest
from pydolphinscheduler.core.engine import Engine, ProgramType
TEST_ENGINE_TASK_TYPE = "ENGINE"
TEST_MAIN_CLASS = "org.apache.examples.mock.Mock"
TEST_MAIN_PACKAGE = "Mock.jar"
TEST_PROGRAM_TYPE = ProgramType.JAVA
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
@patch(
"pydolphinscheduler.core.engine.Engine.get_resource_info",
return_value=({"id": 1, "name": "mock_name"}),
)
def test_get_jar_detail(mock_resource, mock_code_version):
"""Test :func:`get_jar_id` can return expect value."""
name = "test_get_jar_detail"
task = Engine(
name,
TEST_ENGINE_TASK_TYPE,
TEST_MAIN_CLASS,
TEST_MAIN_PACKAGE,
TEST_PROGRAM_TYPE,
)
assert 1 == task.get_jar_id()
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "test-task-params",
"task_type": "test-engine",
"main_class": "org.apache.examples.mock.Mock",
"main_package": "TestMock.jar",
"program_type": ProgramType.JAVA,
},
{
"mainClass": "org.apache.examples.mock.Mock",
"mainJar": {
"id": 1,
},
"programType": ProgramType.JAVA,
"localParams": [],
"resourceList": [],
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
},
)
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
@patch(
"pydolphinscheduler.core.engine.Engine.get_resource_info",
return_value=({"id": 1, "name": "mock_name"}),
)
def test_property_task_params(mock_resource, mock_code_version, attr, expect):
"""Test task engine task property."""
task = Engine(**attr)
assert expect == task.task_params
@pytest.mark.parametrize(
"attr, expect",
[
(
{
"name": "test-task-test_engine_get_define",
"task_type": "test-engine",
"main_class": "org.apache.examples.mock.Mock",
"main_package": "TestMock.jar",
"program_type": ProgramType.JAVA,
},
{
"code": 123,
"name": "test-task-test_engine_get_define",
"version": 1,
"description": None,
"delayTime": 0,
"taskType": "test-engine",
"taskParams": {
"mainClass": "org.apache.examples.mock.Mock",
"mainJar": {
"id": 1,
},
"programType": ProgramType.JAVA,
"localParams": [],
"resourceList": [],
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0,
},
)
],
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
@patch(
"pydolphinscheduler.core.engine.Engine.get_resource_info",
return_value=({"id": 1, "name": "mock_name"}),
)
def test_engine_get_define(mock_resource, mock_code_version, attr, expect):
"""Test task engine function get_define."""
task = Engine(**attr)
assert task.get_define() == expect

View File

@ -119,6 +119,4 @@ def test_custom_datax_get_define(json_template):
return_value=(code, version),
):
task = CustomDataX(name, json_template)
print(task.get_define())
print(expect)
assert task.get_define() == expect

View File

@ -23,7 +23,7 @@ from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, Prog
@patch(
"pydolphinscheduler.tasks.flink.Flink.get_resource_info",
"pydolphinscheduler.core.engine.Engine.get_resource_info",
return_value=({"id": 1, "name": "test"}),
)
def test_flink_get_define(mock_resource):

View File

@ -0,0 +1,82 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test Task Spark."""
from unittest.mock import patch
from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark, SparkVersion
@patch(
"pydolphinscheduler.core.engine.Engine.get_resource_info",
return_value=({"id": 1, "name": "test"}),
)
def test_spark_get_define(mock_resource):
"""Test task spark function get_define."""
code = 123
version = 1
name = "test_spark_get_define"
main_class = "org.apache.spark.test_main_class"
main_package = "test_main_package"
program_type = ProgramType.JAVA
deploy_mode = DeployMode.LOCAL
expect = {
"code": code,
"name": name,
"version": 1,
"description": None,
"delayTime": 0,
"taskType": "SPARK",
"taskParams": {
"mainClass": main_class,
"mainJar": {
"id": 1,
},
"programType": program_type,
"deployMode": deploy_mode,
"sparkVersion": SparkVersion.SPARK2,
"driverCores": 1,
"driverMemory": "512M",
"numExecutors": 2,
"executorMemory": "2G",
"executorCores": 2,
"appName": None,
"mainArgs": None,
"others": None,
"localParams": [],
"resourceList": [],
"dependence": {},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0,
}
with patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(code, version),
):
task = Spark(name, main_class, main_package, program_type, deploy_mode)
assert task.get_define() == expect

View File

@ -477,7 +477,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
/**
* Get resource by given program type and full name. It return map contain resource id, name.
* Useful in Python API create flink task which need processDefinition information.
* Useful for the Python API when creating a flink or spark task that needs processDefinition information.
*
* @param programType program type one of SCALA, JAVA and PYTHON
* @param fullName full name of the resource
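On the Python side this gateway method is reached through py4j, which is what Engine.get_resource_info above does; a minimal sketch of that client-side call, assuming a running PythonGatewayServer and an uploaded jar (both values are placeholders):

# Minimal sketch of the client-side lookup performed by Engine.get_resource_info;
# the jar name is a placeholder and must already exist in the resource center.
from pydolphinscheduler.java_gateway import launch_gateway

gateway = launch_gateway()
resource = gateway.entry_point.getResourcesFileInfo("JAVA", "spark-examples_2.12-3.2.0.jar")
print(resource.get("id"), resource.get("name"))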