blob: a7407a2e38cac737f40e0c909398642daa4cbd93 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Module database."""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from py4j.java_gateway import JavaObject
from pydolphinscheduler.java_gateway import gateway
from pydolphinscheduler.models.connection import Connection
from pydolphinscheduler.models.meta import ModelMeta
@dataclass
class TaskUsage:
"""Class for task usage just like datasource in web ui."""
id: int
type: str
class Datasource(metaclass=ModelMeta):
"""Model datasource, communicate with DolphinScheduler API server and convert Java Object into Python.
We use metaclass :class:`pydolphinscheduler.models.meta.ModelMeta` to convert Java Object into Python.
And code in this class just call Java API method.
You provider database_name contain connection information, it decisions which
database type and database instance would run task.
:param id_: datasource id, the primary key of table t_ds_datasource.
:param name: datasource name, part of unique key (:param:``type_``, :param:``name``) for datasource
object, we support both query the datasource by name or by (type_ + name). But name must be required
unique when you want to query with the name only.
:param note: datasource description. A note for current datasource.
:param type_: datasource type, part of unique key (:param:``type_``, :param:``name``) for datasource.
It is a datasource type code instead of datasource type name. Optional when you query datasource by
name only. But it must be required when you create it.
:param user_id: user id for who create this datasource.
:param connection_params: datasource connection detail, including protocol, host, port, schema etc.
In json format and just like this:
.. code-block:: json
{
"user": "root",
"password": "mysql",
"address": "jdbc:mysql://127.0.0.1:3306",
"database": "test",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
"driverClassName": "com.mysql.cj.jdbc.Driver",
"validationQuery": "select 1"
}
"""
_PATTERN = re.compile("jdbc:.*://(?P<host>[\\w\\W]+):(?P<port>\\d+)")
_DATABASE_TYPE_MAP = dict(
mysql=0,
postgresql=1,
hive=2,
spark=3,
clickhouse=4,
oracle=5,
sqlserver=6,
db2=7,
presto=8,
h2=9,
redshift=10,
dameng=11,
starrocks=12,
)
def __init__(
self,
type_: str,
name: str,
connection_params: str,
user_id: int | None = None,
id_: int | None = None,
note: str | None = None,
):
self.id = id_
self.name = name
self.note = note
# TODO try to handle type_ in metaclass
self.type_: JavaObject = type_
self.user_id = user_id
self.connection_params = connection_params
@classmethod
def get(
cls, datasource_name: str, datasource_type: str | None = None
) -> Datasource:
"""Get single datasource.
:param datasource_name: datasource name
:param datasource_type: datasource type, if not provided, will get datasource by name only
"""
datasource = gateway.get_datasource(datasource_name, datasource_type)
if datasource is None:
raise ValueError(
f"Datasource with name: {datasource_name} and type: {datasource_type} not found."
)
return datasource
@classmethod
def get_task_usage_4j(
cls, datasource_name: str, datasource_type: str | None = None
) -> TaskUsage:
"""Get the necessary information of datasource for task usage in web UI."""
datasource: Datasource = cls.get(datasource_name, datasource_type)
return TaskUsage(
id=datasource.id,
type=datasource.type.upper(),
)
@property
def connection(self) -> Connection:
"""Parse dolphinscheduler connection_params to Connection."""
data = json.loads(self.connection_params)
address_match = self._PATTERN.match(data.get("jdbcUrl", None)).groupdict()
return Connection(
host=address_match.get("host", None),
port=int(address_match.get("port", None)),
schema=data.get("database", None),
username=data.get("user", None),
password=data.get("password", None),
)
@property
def type(self) -> str:
"""Property datasource type."""
return self.type_.getDescp()
@property
def type_code(self) -> str:
"""Property datasource type."""
return self.type_.getCode()
@property
def host(self) -> str:
"""Property datasource host, such as ``127.0.0.1`` or ``localhosts``."""
return self.connection.host
@property
def port(self) -> int:
"""Property datasource host, such as ``3306`` or ``5432``."""
return int(self.connection.port)
@property
def username(self) -> str:
"""Property datasource username, such as ``root`` or ``postgres``."""
return self.connection.username
@property
def password(self) -> str:
"""Property datasource password."""
return self.connection.password
@property
def schema(self) -> str:
"""Property datasource schema."""
return self.connection.schema