From 027af091c736a801df3082ea5323d6821a7d64dd Mon Sep 17 00:00:00 2001 From: Devosend Date: Sat, 15 Jan 2022 23:24:55 +0800 Subject: [PATCH] [Feature-7346] [Python]Add workflow as code task type spark (#7968) * add spark task * fix code format * add parent class for flink and spark * modify Engine docstring * modify docstring of Engine --- .../examples/task_spark_example.py | 31 ++++ .../src/pydolphinscheduler/constants.py | 1 + .../src/pydolphinscheduler/core/engine.py | 95 +++++++++++ .../src/pydolphinscheduler/tasks/flink.py | 48 ++---- .../src/pydolphinscheduler/tasks/spark.py | 94 +++++++++++ .../tests/core/test_engine.py | 147 ++++++++++++++++++ .../tests/tasks/test_datax.py | 2 - .../tests/tasks/test_flink.py | 2 +- .../tests/tasks/test_spark.py | 82 ++++++++++ .../server/PythonGatewayServer.java | 2 +- 10 files changed, 464 insertions(+), 40 deletions(-) create mode 100644 dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py new file mode 100644 index 0000000000..6cace77157 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_spark_example.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+"""An example workflow for task spark."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark
+
+with ProcessDefinition(name="task_spark_example", tenant="tenant_exists") as pd:
+    task = Spark(
+        name="task_spark",
+        main_class="org.apache.spark.examples.SparkPi",
+        main_package="spark-examples_2.12-3.2.0.jar",
+        program_type=ProgramType.JAVA,
+        deploy_mode=DeployMode.LOCAL,
+    )
+    pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 4619c91068..7f63a82194 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -78,6 +78,7 @@ class TaskType(str):
     CONDITIONS = "CONDITIONS"
     SWITCH = "SWITCH"
     FLINK = "FLINK"
+    SPARK = "SPARK"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
new file mode 100644
index 0000000000..df84b5ba90
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/engine.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Module engine."""
+
+from typing import Dict, Optional
+
+from py4j.protocol import Py4JJavaError
+
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.exceptions import PyDSParamException
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class ProgramType(str):
+    """Type of program the engine runs, for now it just contains `JAVA`, `SCALA` and `PYTHON`."""
+
+    JAVA = "JAVA"
+    SCALA = "SCALA"
+    PYTHON = "PYTHON"
+
+
+class Engine(Task):
+    """Task engine object, declare behavior for engine task to dolphinscheduler.
+
+    This is the parent class of spark, flink and mr tasks,
+    and is used to provide the programType, mainClass and mainJar task parameters for reuse.
+ """ + + def __init__( + self, + name: str, + task_type: str, + main_class: str, + main_package: str, + program_type: Optional[ProgramType] = ProgramType.SCALA, + *args, + **kwargs + ): + super().__init__(name, task_type, *args, **kwargs) + self.main_class = main_class + self.main_package = main_package + self.program_type = program_type + self._resource = {} + + def get_resource_info(self, program_type, main_package): + """Get resource info from java gateway, contains resource id, name.""" + if self._resource: + return self._resource + else: + gateway = launch_gateway() + try: + self._resource = gateway.entry_point.getResourcesFileInfo( + program_type, main_package + ) + # Handler source do not exists error, for now we just terminate the process. + except Py4JJavaError as ex: + raise PyDSParamException(str(ex.java_exception)) + return self._resource + + def get_jar_id(self) -> int: + """Get jar id from java gateway, a wrapper for :func:`get_resource_info`.""" + return self.get_resource_info(self.program_type, self.main_package).get("id") + + @property + def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: + """Override Task.task_params for engine children task. + + children task have some specials attribute for task_params, and is odd if we + directly set as python property, so we Override Task.task_params here. + """ + params = super().task_params + custom_params = { + "programType": self.program_type, + "mainClass": self.main_class, + "mainJar": { + "id": self.get_jar_id(), + }, + } + params.update(custom_params) + return params diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py index f732a1540c..83cae956a5 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py @@ -17,19 +17,10 @@ """Task Flink.""" -from typing import Dict, Optional +from typing import Optional from pydolphinscheduler.constants import TaskType -from pydolphinscheduler.core.task import Task -from pydolphinscheduler.java_gateway import launch_gateway - - -class ProgramType(str): - """Type of program flink runs, for now it just contain `JAVA`, `SCALA` and `PYTHON`.""" - - JAVA = "JAVA" - SCALA = "SCALA" - PYTHON = "PYTHON" +from pydolphinscheduler.core.engine import Engine, ProgramType class FlinkVersion(str): @@ -46,12 +37,10 @@ class DeployMode(str): CLUSTER = "cluster" -class Flink(Task): +class Flink(Engine): """Task flink object, declare behavior for flink task to dolphinscheduler.""" _task_custom_attr = { - "main_class", - "main_jar", "deploy_mode", "flink_version", "slot", @@ -59,7 +48,6 @@ class Flink(Task): "job_manager_memory", "task_manager_memory", "app_name", - "program_type", "parallelism", "main_args", "others", @@ -84,10 +72,15 @@ class Flink(Task): *args, **kwargs ): - super().__init__(name, TaskType.FLINK, *args, **kwargs) - self.main_class = main_class - self.main_package = main_package - self.program_type = program_type + super().__init__( + name, + TaskType.FLINK, + main_class, + main_package, + program_type, + *args, + **kwargs + ) self.deploy_mode = deploy_mode self.flink_version = flink_version self.app_name = app_name @@ -98,20 +91,3 @@ class Flink(Task): self.parallelism = parallelism self.main_args = main_args self.others = others - self._resource = {} - - @property - def main_jar(self) -> Dict: - """Return main package 
of dict.""" - resource_info = self.get_resource_info(self.program_type, self.main_package) - return {"id": resource_info.get("id")} - - def get_resource_info(self, program_type, main_package) -> Dict: - """Get resource info from java gateway, contains resource id, name.""" - if not self._resource: - self._resource = launch_gateway().entry_point.getResourcesFileInfo( - program_type, - main_package, - ) - - return self._resource diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py new file mode 100644 index 0000000000..565daad71d --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/spark.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task Spark.""" + +from typing import Optional + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.engine import Engine, ProgramType + + +class SparkVersion(str): + """Spark version, for now it just contain `SPARK1` and `SPARK2`.""" + + SPARK1 = "SPARK1" + SPARK2 = "SPARK2" + + +class DeployMode(str): + """SPARK deploy mode, for now it just contain `LOCAL`, `CLIENT` and `CLUSTER`.""" + + LOCAL = "local" + CLIENT = "client" + CLUSTER = "cluster" + + +class Spark(Engine): + """Task spark object, declare behavior for spark task to dolphinscheduler.""" + + _task_custom_attr = { + "deploy_mode", + "spark_version", + "driver_cores", + "driver_memory", + "num_executors", + "executor_memory", + "executor_cores", + "app_name", + "main_args", + "others", + } + + def __init__( + self, + name: str, + main_class: str, + main_package: str, + program_type: Optional[ProgramType] = ProgramType.SCALA, + deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER, + spark_version: Optional[SparkVersion] = SparkVersion.SPARK2, + app_name: Optional[str] = None, + driver_cores: Optional[int] = 1, + driver_memory: Optional[str] = "512M", + num_executors: Optional[int] = 2, + executor_memory: Optional[str] = "2G", + executor_cores: Optional[int] = 2, + main_args: Optional[str] = None, + others: Optional[str] = None, + *args, + **kwargs + ): + super().__init__( + name, + TaskType.SPARK, + main_class, + main_package, + program_type, + *args, + **kwargs + ) + self.deploy_mode = deploy_mode + self.spark_version = spark_version + self.app_name = app_name + self.driver_cores = driver_cores + self.driver_memory = driver_memory + self.num_executors = num_executors + self.executor_memory = executor_memory + self.executor_cores = executor_cores + self.main_args = main_args + self.others = others diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py new 
file mode 100644 index 0000000000..e36c47ba1b --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_engine.py @@ -0,0 +1,147 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task Engine.""" + + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.core.engine import Engine, ProgramType + +TEST_ENGINE_TASK_TYPE = "ENGINE" +TEST_MAIN_CLASS = "org.apache.examples.mock.Mock" +TEST_MAIN_PACKAGE = "Mock.jar" +TEST_PROGRAM_TYPE = ProgramType.JAVA + + +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.core.engine.Engine.get_resource_info", + return_value=({"id": 1, "name": "mock_name"}), +) +def test_get_jar_detail(mock_resource, mock_code_version): + """Test :func:`get_jar_id` can return expect value.""" + name = "test_get_jar_detail" + task = Engine( + name, + TEST_ENGINE_TASK_TYPE, + TEST_MAIN_CLASS, + TEST_MAIN_PACKAGE, + TEST_PROGRAM_TYPE, + ) + assert 1 == task.get_jar_id() + + +@pytest.mark.parametrize( + "attr, expect", + [ + ( + { + "name": "test-task-params", + "task_type": "test-engine", + "main_class": "org.apache.examples.mock.Mock", + "main_package": "TestMock.jar", + "program_type": ProgramType.JAVA, + }, + { + "mainClass": "org.apache.examples.mock.Mock", + "mainJar": { + "id": 1, + }, + "programType": ProgramType.JAVA, + "localParams": [], + "resourceList": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + ) + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.core.engine.Engine.get_resource_info", + return_value=({"id": 1, "name": "mock_name"}), +) +def test_property_task_params(mock_resource, mock_code_version, attr, expect): + """Test task engine task property.""" + task = Engine(**attr) + assert expect == task.task_params + + +@pytest.mark.parametrize( + "attr, expect", + [ + ( + { + "name": "test-task-test_engine_get_define", + "task_type": "test-engine", + "main_class": "org.apache.examples.mock.Mock", + "main_package": "TestMock.jar", + "program_type": ProgramType.JAVA, + }, + { + "code": 123, + "name": "test-task-test_engine_get_define", + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "test-engine", + "taskParams": { + "mainClass": "org.apache.examples.mock.Mock", + "mainJar": { + "id": 1, + }, + "programType": ProgramType.JAVA, + "localParams": [], + "resourceList": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + 
"timeoutNotifyStrategy": None, + "timeout": 0, + }, + ) + ], +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +@patch( + "pydolphinscheduler.core.engine.Engine.get_resource_info", + return_value=({"id": 1, "name": "mock_name"}), +) +def test_engine_get_define(mock_resource, mock_code_version, attr, expect): + """Test task engine function get_define.""" + task = Engine(**attr) + assert task.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py index 7fa4569ad4..9473f57321 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_datax.py @@ -119,6 +119,4 @@ def test_custom_datax_get_define(json_template): return_value=(code, version), ): task = CustomDataX(name, json_template) - print(task.get_define()) - print(expect) assert task.get_define() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py index 743bdae3fa..92ae3ba91f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py @@ -23,7 +23,7 @@ from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, Prog @patch( - "pydolphinscheduler.tasks.flink.Flink.get_resource_info", + "pydolphinscheduler.core.engine.Engine.get_resource_info", return_value=({"id": 1, "name": "test"}), ) def test_flink_get_define(mock_resource): diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py new file mode 100644 index 0000000000..3b0672f963 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_spark.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""Test Task Spark.""" + +from unittest.mock import patch + +from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark, SparkVersion + + +@patch( + "pydolphinscheduler.core.engine.Engine.get_resource_info", + return_value=({"id": 1, "name": "test"}), +) +def test_spark_get_define(mock_resource): + """Test task spark function get_define.""" + code = 123 + version = 1 + name = "test_spark_get_define" + main_class = "org.apache.spark.test_main_class" + main_package = "test_main_package" + program_type = ProgramType.JAVA + deploy_mode = DeployMode.LOCAL + + expect = { + "code": code, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "SPARK", + "taskParams": { + "mainClass": main_class, + "mainJar": { + "id": 1, + }, + "programType": program_type, + "deployMode": deploy_mode, + "sparkVersion": SparkVersion.SPARK2, + "driverCores": 1, + "driverMemory": "512M", + "numExecutors": 2, + "executorMemory": "2G", + "executorCores": 2, + "appName": None, + "mainArgs": None, + "others": None, + "localParams": [], + "resourceList": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + task = Spark(name, main_class, main_package, program_type, deploy_mode) + assert task.get_define() == expect diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java index 9489894f0e..2b062866e8 100644 --- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java +++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java @@ -477,7 +477,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer { /** * Get resource by given program type and full name. It return map contain resource id, name. - * Useful in Python API create flink task which need processDefinition information. + * Useful in Python API create flink or spark task which need processDefinition information. * * @param programType program type one of SCALA, JAVA and PYTHON * @param fullName full name of the resource
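
For readers who want to go beyond the bundled task_spark_example.py, the sketch below shows how the other constructor arguments added in tasks/spark.py might be combined for a cluster submission. It is illustrative only: the jar, main class, tenant and input path are placeholders, the jar is assumed to already exist in the DolphinScheduler resource center (get_resource_info resolves it to a resource id at submit time), and the string passed through `others` is an ordinary spark-submit option rather than anything this patch validates.

    from pydolphinscheduler.core.process_definition import ProcessDefinition
    from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark

    # Illustrative sketch only: all names and resources below are placeholders.
    with ProcessDefinition(name="task_spark_cluster_sketch", tenant="tenant_exists") as pd:
        wordcount = Spark(
            name="task_spark_wordcount",
            main_class="org.apache.spark.examples.JavaWordCount",
            main_package="spark-examples_2.12-3.2.0.jar",  # must be uploaded to the resource center first
            program_type=ProgramType.JAVA,
            deploy_mode=DeployMode.CLUSTER,                 # submit to the cluster instead of running locally
            driver_memory="1G",
            num_executors=4,
            executor_memory="4G",
            executor_cores=2,
            main_args="/tmp/words.txt",                     # forwarded to the main class as program arguments
            others="--queue default",                       # free-form spark-submit options
        )
        pd.run()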
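The new core/engine.py module is also what the commit message means by "add parent class for flink and spark": a JVM-engine task only declares its engine-specific attributes, while programType, mainClass and mainJar (including the resource-id lookup through the Java gateway) come from Engine. As a rough illustration of that reuse, and not something shipped in this patch, a hypothetical MapReduce task could look roughly like the following; the "MR" task type string and the chosen attribute set are assumptions made for the sketch.

    from typing import Optional

    from pydolphinscheduler.core.engine import Engine, ProgramType


    class MapReduce(Engine):
        """Hypothetical engine-style task, shown only to illustrate what Engine factors out."""

        # Engine already contributes programType, mainClass and mainJar to task_params.
        _task_custom_attr = {"app_name", "main_args", "others"}

        def __init__(
            self,
            name: str,
            main_class: str,
            main_package: str,
            program_type: Optional[ProgramType] = ProgramType.SCALA,
            app_name: Optional[str] = None,
            main_args: Optional[str] = None,
            others: Optional[str] = None,
            *args,
            **kwargs
        ):
            # "MR" is a placeholder task type string; it is not defined in constants.TaskType by this patch.
            super().__init__(name, "MR", main_class, main_package, program_type, *args, **kwargs)
            self.app_name = app_name
            self.main_args = main_args
            self.others = others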