| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """Task sql.""" |
| from __future__ import annotations |
| |
| import logging |
| import re |
| from collections.abc import Sequence |
| |
| from pydolphinscheduler.constants import TaskType |
| from pydolphinscheduler.core.task import BatchTask |
| from pydolphinscheduler.models.datasource import Datasource |
| |
# Module-level logger. Use ``__name__`` (dotted module path) rather than
# ``__file__`` (filesystem path) so the logger participates in the normal
# package logger hierarchy and handler configuration.
log = logging.getLogger(__name__)
| |
| |
class SqlType:
    """SQL statement type constants; for now it just contains ``SELECT`` and ``NOT_SELECT``.

    These string codes are serialized into the task parameters (see
    ``Sql._task_custom_attr``) to tell the scheduler whether the statement
    is a query.
    """

    # Statement is a query that returns rows.
    SELECT = "0"
    # Statement does not return rows (e.g. insert/update/DDL).
    NOT_SELECT = "1"
| |
| |
class Sql(BatchTask):
    """Task SQL object, declare behavior for SQL task to dolphinscheduler.

    It should run a sql job in multiple SQL-like engines, such as:

    - ClickHouse
    - DB2
    - HIVE
    - MySQL
    - Oracle
    - Postgresql
    - Presto
    - SQLServer

    The datasource referenced by ``datasource_name`` contains the connection
    information and decides which database type and database instance will run
    this sql.

    :param name: SQL task name
    :param datasource_name: datasource name in dolphinscheduler, the name must exists and must be ``online``
        datasource instead of ``test``.
    :param sql: SQL statement, the sql script you want to run. Support resource plugin in this parameter.
    :param sql_type: SQL type, whether sql statement is select query or not. If not provided, it will be auto
        detected according to sql statement using :func:`pydolphinscheduler.tasks.sql.Sql.sql_type`, and you
        can also set it manually by ``SqlType.SELECT`` for query statement or ``SqlType.NOT_SELECT`` for not
        query statement.
    :param pre_statements: SQL statements to be executed before the main SQL statement.
    :param post_statements: SQL statements to be executed after the main SQL statement.
    :param display_rows: The number of record rows number to be displayed in the SQL task log, default is 10.
    """

    # Attributes (beyond the base task's) that are serialized into task_params.
    _task_custom_attr = {
        "sql",
        "sql_type",
        "pre_statements",
        "post_statements",
        "display_rows",
    }

    # Resource-plugin support: files with these extensions may be passed as the
    # ``sql`` parameter; their content is loaded into ``ext_attr`` (``_sql``).
    ext: set = {".sql"}
    ext_attr: str = "_sql"

    # A statement matching this pattern is classified as SELECT; statements
    # starting (possibly after a prefix) with insert/delete/drop/update/
    # truncate/alter/create are NOT_SELECT. Compiled once at class creation
    # instead of on every ``sql_type`` access.
    _SELECT_PATTERN = re.compile(
        "^(?!(.* |)insert |(.* |)delete |(.* |)drop "
        "|(.* |)update |(.* |)truncate |(.* |)alter |(.* |)create ).*",
        re.IGNORECASE,
    )

    def __init__(
        self,
        name: str,
        datasource_name: str,
        sql: str,
        datasource_type: str | None = None,
        sql_type: str | None = None,
        pre_statements: str | Sequence[str] | None = None,
        post_statements: str | Sequence[str] | None = None,
        display_rows: int | None = 10,
        *args,
        **kwargs,
    ):
        # Set ``_sql`` before the base __init__ so resource-plugin loading via
        # ``ext_attr`` can find it.
        self._sql = sql
        super().__init__(name, TaskType.SQL, *args, **kwargs)
        self.param_sql_type = sql_type
        self.datasource_name = datasource_name
        self.datasource_type = datasource_type
        self.pre_statements = self.get_stm_list(pre_statements)
        self.post_statements = self.get_stm_list(post_statements)
        self.display_rows = display_rows

    @staticmethod
    def get_stm_list(stm: str | Sequence[str] | None) -> list[str]:
        """Normalize statement(s) to a list of str.

        :param stm: a single statement string, a sequence of statements, or
            ``None``/empty (which yields an empty list).
        :return: statements as a list
        """
        if not stm:
            return []
        elif isinstance(stm, str):
            return [stm]
        return list(stm)

    @property
    def sql_type(self) -> str:
        """Return the SQL type, either ``SqlType.SELECT`` or ``SqlType.NOT_SELECT``.

        If ``param_sql_type`` was not specified, a regexp is used to detect
        which type the SQL statement is; if it was specified, the parameter
        overrides the regexp detection.
        """
        if self.param_sql_type in (SqlType.SELECT, SqlType.NOT_SELECT):
            log.info(
                "The sql type is specified by a parameter, with value %s",
                self.param_sql_type,
            )
            return self.param_sql_type
        if self._SELECT_PATTERN.match(self._sql) is None:
            return SqlType.NOT_SELECT
        return SqlType.SELECT

    @property
    def datasource(self) -> dict:
        """Get the datasource (id and type) task-usage dict for this sql task."""
        datasource_task_u = Datasource.get_task_usage_4j(
            self.datasource_name, self.datasource_type
        )
        return {
            "datasource": datasource_task_u.id,
            "type": datasource_task_u.type,
        }

    @property
    def task_params(self) -> dict:
        """Override Task.task_params for sql task.

        Sql tasks have the special ``datasource`` attribute in task_params,
        which is odd to set directly as a python property, so we override
        Task.task_params here.

        Note: the former ``camel_attr``/``custom_attr`` parameters were
        removed — a property getter is always invoked without arguments, so
        they could never be passed by callers.
        """
        params = super().task_params
        params.update(self.datasource)
        return params