blob: b72b006c601c03190d0839d6f7951c70a3cad57d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
from datetime import datetime
from typing import Any, Callable, Dict, Match, Optional, Pattern, Tuple, Union
from urllib import parse
from flask_babel import gettext as __
from sqlalchemy.dialects.mysql import (
BIT,
DECIMAL,
DOUBLE,
FLOAT,
INTEGER,
LONGTEXT,
MEDIUMINT,
MEDIUMTEXT,
TINYINT,
TINYTEXT,
)
from sqlalchemy.engine.url import URL
from sqlalchemy.types import TypeEngine
from superset.db_engine_specs.base import BaseEngineSpec
from superset.errors import SupersetErrorType
from superset.utils import core as utils
from superset.utils.core import ColumnSpec, GenericDataType
# Regular expressions to catch custom errors
CONNECTION_ACCESS_DENIED_REGEX = re.compile(
"Access denied for user '(?P<username>.*?)'@'(?P<hostname>.*?)'. "
)
CONNECTION_INVALID_HOSTNAME_REGEX = re.compile(
"Unknown MySQL server host '(?P<hostname>.*?)'."
)
CONNECTION_HOST_DOWN_REGEX = re.compile(
"Can't connect to MySQL server on '(?P<hostname>.*?)'."
)
CONNECTION_UNKNOWN_DATABASE_REGEX = re.compile("Unknown database '(?P<database>.*?)'.")
class MySQLEngineSpec(BaseEngineSpec):
engine = "mysql"
engine_name = "MySQL"
max_column_name_length = 64
column_type_mappings: Tuple[
Tuple[
Pattern[str],
Union[TypeEngine, Callable[[Match[str]], TypeEngine]],
GenericDataType,
],
...,
] = (
(re.compile(r"^int.*", re.IGNORECASE), INTEGER(), GenericDataType.NUMERIC,),
(re.compile(r"^tinyint", re.IGNORECASE), TINYINT(), GenericDataType.NUMERIC,),
(
re.compile(r"^mediumint", re.IGNORECASE),
MEDIUMINT(),
GenericDataType.NUMERIC,
),
(re.compile(r"^decimal", re.IGNORECASE), DECIMAL(), GenericDataType.NUMERIC,),
(re.compile(r"^float", re.IGNORECASE), FLOAT(), GenericDataType.NUMERIC,),
(re.compile(r"^double", re.IGNORECASE), DOUBLE(), GenericDataType.NUMERIC,),
(re.compile(r"^bit", re.IGNORECASE), BIT(), GenericDataType.NUMERIC,),
(re.compile(r"^tinytext", re.IGNORECASE), TINYTEXT(), GenericDataType.STRING,),
(
re.compile(r"^mediumtext", re.IGNORECASE),
MEDIUMTEXT(),
GenericDataType.STRING,
),
(re.compile(r"^longtext", re.IGNORECASE), LONGTEXT(), GenericDataType.STRING,),
)
_time_grain_expressions = {
None: "{col}",
"PT1S": "DATE_ADD(DATE({col}), "
"INTERVAL (HOUR({col})*60*60 + MINUTE({col})*60"
" + SECOND({col})) SECOND)",
"PT1M": "DATE_ADD(DATE({col}), "
"INTERVAL (HOUR({col})*60 + MINUTE({col})) MINUTE)",
"PT1H": "DATE_ADD(DATE({col}), " "INTERVAL HOUR({col}) HOUR)",
"P1D": "DATE({col})",
"P1W": "DATE(DATE_SUB({col}, " "INTERVAL DAYOFWEEK({col}) - 1 DAY))",
"P1M": "DATE(DATE_SUB({col}, " "INTERVAL DAYOFMONTH({col}) - 1 DAY))",
"P0.25Y": "MAKEDATE(YEAR({col}), 1) "
"+ INTERVAL QUARTER({col}) QUARTER - INTERVAL 1 QUARTER",
"P1Y": "DATE(DATE_SUB({col}, " "INTERVAL DAYOFYEAR({col}) - 1 DAY))",
"1969-12-29T00:00:00Z/P1W": "DATE(DATE_SUB({col}, "
"INTERVAL DAYOFWEEK(DATE_SUB({col}, "
"INTERVAL 1 DAY)) - 1 DAY))",
}
type_code_map: Dict[int, str] = {} # loaded from get_datatype only if needed
custom_errors = {
CONNECTION_ACCESS_DENIED_REGEX: (
__('Either the username "%(username)s" or the password is incorrect.'),
SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
),
CONNECTION_INVALID_HOSTNAME_REGEX: (
__('Unknown MySQL server host "%(hostname)s".'),
SupersetErrorType.CONNECTION_INVALID_HOSTNAME_ERROR,
),
CONNECTION_HOST_DOWN_REGEX: (
__('The host "%(hostname)s" might be down and can\'t be reached.'),
SupersetErrorType.CONNECTION_HOST_DOWN_ERROR,
),
CONNECTION_UNKNOWN_DATABASE_REGEX: (
__('Unable to connect to database "%(database)s".'),
SupersetErrorType.CONNECTION_UNKNOWN_DATABASE_ERROR,
),
}
@classmethod
def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]:
tt = target_type.upper()
if tt == utils.TemporalType.DATE:
return f"STR_TO_DATE('{dttm.date().isoformat()}', '%Y-%m-%d')"
if tt == utils.TemporalType.DATETIME:
datetime_formatted = dttm.isoformat(sep=" ", timespec="microseconds")
return f"""STR_TO_DATE('{datetime_formatted}', '%Y-%m-%d %H:%i:%s.%f')"""
return None
@classmethod
def adjust_database_uri(
cls, uri: URL, selected_schema: Optional[str] = None
) -> None:
if selected_schema:
uri.database = parse.quote(selected_schema, safe="")
@classmethod
def get_datatype(cls, type_code: Any) -> Optional[str]:
if not cls.type_code_map:
# only import and store if needed at least once
import MySQLdb
ft = MySQLdb.constants.FIELD_TYPE
cls.type_code_map = {
getattr(ft, k): k for k in dir(ft) if not k.startswith("_")
}
datatype = type_code
if isinstance(type_code, int):
datatype = cls.type_code_map.get(type_code)
if datatype and isinstance(datatype, str) and datatype:
return datatype
return None
@classmethod
def epoch_to_dttm(cls) -> str:
return "from_unixtime({col})"
@classmethod
def _extract_error_message(cls, ex: Exception) -> str:
"""Extract error message for queries"""
message = str(ex)
try:
if isinstance(ex.args, tuple) and len(ex.args) > 1:
message = ex.args[1]
except (AttributeError, KeyError):
pass
return message
@classmethod
def get_column_spec( # type: ignore
cls,
native_type: Optional[str],
source: utils.ColumnTypeSource = utils.ColumnTypeSource.GET_TABLE,
column_type_mappings: Tuple[
Tuple[
Pattern[str],
Union[TypeEngine, Callable[[Match[str]], TypeEngine]],
GenericDataType,
],
...,
] = column_type_mappings,
) -> Union[ColumnSpec, None]:
column_spec = super().get_column_spec(native_type)
if column_spec:
return column_spec
return super().get_column_spec(
native_type, column_type_mappings=column_type_mappings
)