| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """Module database.""" |
| from __future__ import annotations |
| |
| import json |
| import re |
| from dataclasses import dataclass |
| |
| from py4j.java_gateway import JavaObject |
| |
| from pydolphinscheduler.java_gateway import gateway |
| from pydolphinscheduler.models.connection import Connection |
| from pydolphinscheduler.models.meta import ModelMeta |
| |
| |
@dataclass
class TaskUsage:
    """Minimal datasource description a task needs, mirroring the web UI.

    Instances are built by ``Datasource.get_task_usage_4j`` from a datasource's
    ``id`` and upper-cased ``type``.
    """

    # Identifier of the datasource.
    id: int
    # Datasource type name.
    type: str
| |
| |
class Datasource(metaclass=ModelMeta):
    """Model datasource, talk to the DolphinScheduler API server and expose the result in Python.

    We use metaclass :class:`pydolphinscheduler.models.meta.ModelMeta` to convert Java Object into Python.
    The code in this class only calls the Java API methods.

    A datasource carries the connection information, which decides the database type and
    the database instance a task runs against.

    :param id_: datasource id, the primary key of table t_ds_datasource.
    :param name: datasource name, part of unique key (``type_``, ``name``) for datasource
        object. Both querying by name only and by (type_ + name) are supported, but the name must be
        unique when you want to query with the name only.
    :param note: datasource description. A note for current datasource.
    :param type_: datasource type, part of unique key (``type_``, ``name``) for datasource.
        It is a datasource type code instead of datasource type name. Optional when you query datasource by
        name only, but required when you create it.
    :param user_id: id of the user who created this datasource.
    :param connection_params: datasource connection detail, including protocol, host, port, schema etc.
        In json format and just like this:

        .. code-block:: json

            {
                "user": "root",
                "password": "mysql",
                "address": "jdbc:mysql://127.0.0.1:3306",
                "database": "test",
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
                "driverClassName": "com.mysql.cj.jdbc.Driver",
                "validationQuery": "select 1"
            }

    """

    # Extracts ``host`` and ``port`` from a ``jdbc:<protocol>://<host>:<port>`` URL prefix.
    _PATTERN = re.compile(r"jdbc:.*://(?P<host>[\w\W]+):(?P<port>\d+)")

    # Lower-case datasource type name -> numeric datasource type code.
    _DATABASE_TYPE_MAP = {
        "mysql": 0,
        "postgresql": 1,
        "hive": 2,
        "spark": 3,
        "clickhouse": 4,
        "oracle": 5,
        "sqlserver": 6,
        "db2": 7,
        "presto": 8,
        "h2": 9,
        "redshift": 10,
        "dameng": 11,
        "starrocks": 12,
    }

    def __init__(
        self,
        type_: str,
        name: str,
        connection_params: str,
        user_id: int | None = None,
        id_: int | None = None,
        note: str | None = None,
    ):
        self.id = id_
        self.name = name
        self.note = note
        # TODO try to handle type_ in metaclass
        # ``type`` and ``type_code`` call ``getDescp()`` / ``getCode()`` on this value,
        # so it has to expose those two methods.
        self.type_: JavaObject = type_
        self.user_id = user_id
        self.connection_params = connection_params

    @classmethod
    def get(
        cls, datasource_name: str, datasource_type: str | None = None
    ) -> Datasource:
        """Get single datasource.

        :param datasource_name: datasource name
        :param datasource_type: datasource type, if not provided, will get datasource by name only
        :raises ValueError: if the gateway returns no datasource for the given name and type.
        """
        datasource = gateway.get_datasource(datasource_name, datasource_type)
        if datasource is None:
            raise ValueError(
                f"Datasource with name: {datasource_name} and type: {datasource_type} not found."
            )
        return datasource

    @classmethod
    def get_task_usage_4j(
        cls, datasource_name: str, datasource_type: str | None = None
    ) -> TaskUsage:
        """Get the necessary information of datasource for task usage in web UI.

        :param datasource_name: datasource name
        :param datasource_type: datasource type, if not provided, will get datasource by name only
        :raises ValueError: propagated from :meth:`get` when the datasource is not found.
        """
        datasource: Datasource = cls.get(datasource_name, datasource_type)
        return TaskUsage(
            id=datasource.id,
            type=datasource.type.upper(),
        )

    @property
    def connection(self) -> Connection:
        """Parse dolphinscheduler connection_params to Connection.

        ``host`` and ``port`` are extracted from the ``jdbcUrl`` entry; ``schema``, ``username``
        and ``password`` come from the ``database``, ``user`` and ``password`` entries.

        :raises ValueError: if ``jdbcUrl`` is missing, is not a string, or does not start with
            a ``jdbc:<protocol>://<host>:<port>`` prefix.
        """
        data = json.loads(self.connection_params)

        jdbc_url = data.get("jdbcUrl")
        # Fail with a clear message instead of the TypeError that ``re`` raises for non-strings.
        if not isinstance(jdbc_url, str):
            raise ValueError(
                f"Datasource {self.name!r} has no string 'jdbcUrl' in its connection_params."
            )

        address_match = self._PATTERN.match(jdbc_url)
        # ``match`` returns None when the URL has a different layout; the URL itself is kept
        # out of the message because it may embed credentials.
        if address_match is None:
            raise ValueError(
                f"Can not parse host and port from 'jdbcUrl' of datasource {self.name!r}, "
                "expected format like 'jdbc:<protocol>://<host>:<port>'."
            )

        return Connection(
            host=address_match.group("host"),
            port=int(address_match.group("port")),
            schema=data.get("database", None),
            username=data.get("user", None),
            password=data.get("password", None),
        )

    @property
    def type(self) -> str:
        """Property datasource type, the value of ``type_.getDescp()``."""
        return self.type_.getDescp()

    @property
    def type_code(self) -> str:
        """Property datasource type code, the value of ``type_.getCode()``."""
        return self.type_.getCode()

    @property
    def host(self) -> str:
        """Property datasource host, such as ``127.0.0.1`` or ``localhost``."""
        return self.connection.host

    @property
    def port(self) -> int:
        """Property datasource port, such as ``3306`` or ``5432``."""
        return int(self.connection.port)

    @property
    def username(self) -> str:
        """Property datasource username, such as ``root`` or ``postgres``."""
        return self.connection.username

    @property
    def password(self) -> str:
        """Property datasource password."""
        return self.connection.password

    @property
    def schema(self) -> str:
        """Property datasource schema."""
        return self.connection.schema