mirror of
https://gitee.com/dolphinscheduler/DolphinScheduler.git
synced 2024-12-02 12:17:43 +08:00
[Improve][python] Support create table syntax and custom sql type param (#9673)
This commit is contained in:
parent
cd82f45d5e
commit
8a8b63cd96
@ -17,6 +17,7 @@
|
||||
|
||||
"""Task sql."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Dict, Optional
|
||||
|
||||
@ -24,6 +25,8 @@ from pydolphinscheduler.constants import TaskType
|
||||
from pydolphinscheduler.core.database import Database
|
||||
from pydolphinscheduler.core.task import Task
|
||||
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
|
||||
class SqlType:
|
||||
"""SQL type, for now it just contain `SELECT` and `NO_SELECT`."""
|
||||
@ -61,6 +64,7 @@ class Sql(Task):
|
||||
name: str,
|
||||
datasource_name: str,
|
||||
sql: str,
|
||||
sql_type: Optional[int] = None,
|
||||
pre_statements: Optional[str] = None,
|
||||
post_statements: Optional[str] = None,
|
||||
display_rows: Optional[int] = 10,
|
||||
@ -69,6 +73,7 @@ class Sql(Task):
|
||||
):
|
||||
super().__init__(name, TaskType.SQL, *args, **kwargs)
|
||||
self.sql = sql
|
||||
self.param_sql_type = sql_type
|
||||
self.datasource_name = datasource_name
|
||||
self.pre_statements = pre_statements or []
|
||||
self.post_statements = post_statements or []
|
||||
@ -76,9 +81,24 @@ class Sql(Task):
|
||||
|
||||
@property
|
||||
def sql_type(self) -> int:
|
||||
"""Judgement sql type, use regexp to check which type of the sql is."""
|
||||
"""Judgement sql type, it will return the SQL type for type `SELECT` or `NOT_SELECT`.
|
||||
|
||||
If `param_sql_type` dot not specific, will use regexp to check
|
||||
which type of the SQL is. But if `param_sql_type` is specific
|
||||
will use the parameter overwrites the regexp way
|
||||
"""
|
||||
if (
|
||||
self.param_sql_type == SqlType.SELECT
|
||||
or self.param_sql_type == SqlType.NOT_SELECT
|
||||
):
|
||||
log.info(
|
||||
"The sql type is specified by a parameter, with value %s",
|
||||
self.param_sql_type,
|
||||
)
|
||||
return self.param_sql_type
|
||||
pattern_select_str = (
|
||||
"^(?!(.* |)insert |(.* |)delete |(.* |)drop |(.* |)update |(.* |)alter ).*"
|
||||
"^(?!(.* |)insert |(.* |)delete |(.* |)drop "
|
||||
"|(.* |)update |(.* |)alter |(.* |)create ).*"
|
||||
)
|
||||
pattern_select = re.compile(pattern_select_str, re.IGNORECASE)
|
||||
if pattern_select.match(self.sql) is None:
|
||||
|
@ -26,24 +26,38 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"sql, sql_type",
|
||||
"sql, param_sql_type, sql_type",
|
||||
[
|
||||
("select 1", SqlType.SELECT),
|
||||
(" select 1", SqlType.SELECT),
|
||||
(" select 1 ", SqlType.SELECT),
|
||||
(" select 'insert' ", SqlType.SELECT),
|
||||
(" select 'insert ' ", SqlType.SELECT),
|
||||
("with tmp as (select 1) select * from tmp ", SqlType.SELECT),
|
||||
("insert into table_name(col1, col2) value (val1, val2)", SqlType.NOT_SELECT),
|
||||
("select 1", None, SqlType.SELECT),
|
||||
(" select 1", None, SqlType.SELECT),
|
||||
(" select 1 ", None, SqlType.SELECT),
|
||||
(" select 'insert' ", None, SqlType.SELECT),
|
||||
(" select 'insert ' ", None, SqlType.SELECT),
|
||||
("with tmp as (select 1) select * from tmp ", None, SqlType.SELECT),
|
||||
(
|
||||
"insert into table_name(select, col2) value ('select', val2)",
|
||||
"insert into table_name(col1, col2) value (val1, val2)",
|
||||
None,
|
||||
SqlType.NOT_SELECT,
|
||||
),
|
||||
("update table_name SET col1=val1 where col1=val2", SqlType.NOT_SELECT),
|
||||
("update table_name SET col1='select' where col1=val2", SqlType.NOT_SELECT),
|
||||
("delete from table_name where id < 10", SqlType.NOT_SELECT),
|
||||
("delete from table_name where id < 10", SqlType.NOT_SELECT),
|
||||
("alter table table_name add column col1 int", SqlType.NOT_SELECT),
|
||||
(
|
||||
"insert into table_name(select, col2) value ('select', val2)",
|
||||
None,
|
||||
SqlType.NOT_SELECT,
|
||||
),
|
||||
("update table_name SET col1=val1 where col1=val2", None, SqlType.NOT_SELECT),
|
||||
(
|
||||
"update table_name SET col1='select' where col1=val2",
|
||||
None,
|
||||
SqlType.NOT_SELECT,
|
||||
),
|
||||
("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
|
||||
("delete from table_name where id < 10", None, SqlType.NOT_SELECT),
|
||||
("alter table table_name add column col1 int", None, SqlType.NOT_SELECT),
|
||||
("create table table_name2 (col1 int)", None, SqlType.NOT_SELECT),
|
||||
("create table table_name2 (col1 int)", SqlType.SELECT, SqlType.SELECT),
|
||||
("select 1", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
|
||||
("create table table_name2 (col1 int)", SqlType.NOT_SELECT, SqlType.NOT_SELECT),
|
||||
("select 1", SqlType.SELECT, SqlType.SELECT),
|
||||
],
|
||||
)
|
||||
@patch(
|
||||
@ -54,11 +68,13 @@ from pydolphinscheduler.tasks.sql import Sql, SqlType
|
||||
"pydolphinscheduler.core.database.Database.get_database_info",
|
||||
return_value=({"id": 1, "type": "mock_type"}),
|
||||
)
|
||||
def test_get_sql_type(mock_datasource, mock_code_version, sql, sql_type):
|
||||
def test_get_sql_type(
|
||||
mock_datasource, mock_code_version, sql, param_sql_type, sql_type
|
||||
):
|
||||
"""Test property sql_type could return correct type."""
|
||||
name = "test_get_sql_type"
|
||||
datasource_name = "test_datasource"
|
||||
task = Sql(name, datasource_name, sql)
|
||||
task = Sql(name, datasource_name, sql, sql_type=param_sql_type)
|
||||
assert (
|
||||
sql_type == task.sql_type
|
||||
), f"Sql {sql} expect sql type is {sql_type} but got {task.sql_type}"
|
||||
|
Loading…
Reference in New Issue
Block a user