| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from datetime import datetime |
| from typing import Any, Dict, List, Optional |
| from urllib import parse |
| |
| import simplejson as json |
| from sqlalchemy.engine.url import make_url, URL |
| |
| from superset.db_engine_specs.base import BaseEngineSpec |
| from superset.utils import core as utils |
| |
| |
| class TrinoEngineSpec(BaseEngineSpec): |
| engine = "trino" |
| engine_name = "Trino" |
| |
| # pylint: disable=line-too-long |
| _time_grain_expressions = { |
| None: "{col}", |
| "PT1S": "date_trunc('second', CAST({col} AS TIMESTAMP))", |
| "PT1M": "date_trunc('minute', CAST({col} AS TIMESTAMP))", |
| "PT1H": "date_trunc('hour', CAST({col} AS TIMESTAMP))", |
| "P1D": "date_trunc('day', CAST({col} AS TIMESTAMP))", |
| "P1W": "date_trunc('week', CAST({col} AS TIMESTAMP))", |
| "P1M": "date_trunc('month', CAST({col} AS TIMESTAMP))", |
| "P0.25Y": "date_trunc('quarter', CAST({col} AS TIMESTAMP))", |
| "P1Y": "date_trunc('year', CAST({col} AS TIMESTAMP))", |
| # "1969-12-28T00:00:00Z/P1W", # Week starting Sunday |
| # "1969-12-29T00:00:00Z/P1W", # Week starting Monday |
| # "P1W/1970-01-03T00:00:00Z", # Week ending Saturday |
| # "P1W/1970-01-04T00:00:00Z", # Week ending Sunday |
| } |
| |
| @classmethod |
| def convert_dttm(cls, target_type: str, dttm: datetime) -> Optional[str]: |
| tt = target_type.upper() |
| if tt == utils.TemporalType.DATE: |
| value = dttm.date().isoformat() |
| return f"from_iso8601_date('{value}')" |
| if tt == utils.TemporalType.TIMESTAMP: |
| value = dttm.isoformat(timespec="microseconds") |
| return f"from_iso8601_timestamp('{value}')" |
| return None |
| |
| @classmethod |
| def epoch_to_dttm(cls) -> str: |
| return "from_unixtime({col})" |
| |
| @classmethod |
| def adjust_database_uri( |
| cls, uri: URL, selected_schema: Optional[str] = None |
| ) -> None: |
| database = uri.database |
| if selected_schema and database: |
| selected_schema = parse.quote(selected_schema, safe="") |
| database = database.split("/")[0] + "/" + selected_schema |
| uri.database = database |
| |
| @classmethod |
| def update_impersonation_config( |
| cls, connect_args: Dict[str, Any], uri: str, username: Optional[str], |
| ) -> None: |
| """ |
| Update a configuration dictionary |
| that can set the correct properties for impersonating users |
| :param connect_args: config to be updated |
| :param uri: URI string |
| :param impersonate_user: Flag indicating if impersonation is enabled |
| :param username: Effective username |
| :return: None |
| """ |
| url = make_url(uri) |
| backend_name = url.get_backend_name() |
| |
| # Must be Trino connection, enable impersonation, and set optional param |
| # auth=LDAP|KERBEROS |
| # Set principal_username=$effective_username |
| if backend_name == "trino" and username is not None: |
| connect_args["user"] = username |
| |
| @classmethod |
| def modify_url_for_impersonation( |
| cls, url: URL, impersonate_user: bool, username: Optional[str] |
| ) -> None: |
| """ |
| Modify the SQL Alchemy URL object with the user to impersonate if applicable. |
| :param url: SQLAlchemy URL object |
| :param impersonate_user: Flag indicating if impersonation is enabled |
| :param username: Effective username |
| """ |
| # Do nothing and let update_impersonation_config take care of impersonation |
| |
| @classmethod |
| def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool: |
| return True |
| |
| @classmethod |
| def estimate_statement_cost( # pylint: disable=too-many-locals |
| cls, statement: str, cursor: Any |
| ) -> Dict[str, Any]: |
| """ |
| Run a SQL query that estimates the cost of a given statement. |
| |
| :param statement: A single SQL statement |
| :param database: Database instance |
| :param cursor: Cursor instance |
| :param username: Effective username |
| :return: JSON response from Trino |
| """ |
| sql = f"EXPLAIN (TYPE IO, FORMAT JSON) {statement}" |
| cursor.execute(sql) |
| |
| # the output from Trino is a single column and a single row containing |
| # JSON: |
| # |
| # { |
| # ... |
| # "estimate" : { |
| # "outputRowCount" : 8.73265878E8, |
| # "outputSizeInBytes" : 3.41425774958E11, |
| # "cpuCost" : 3.41425774958E11, |
| # "maxMemory" : 0.0, |
| # "networkCost" : 3.41425774958E11 |
| # } |
| # } |
| result = json.loads(cursor.fetchone()[0]) |
| return result |
| |
| @classmethod |
| def query_cost_formatter( |
| cls, raw_cost: List[Dict[str, Any]] |
| ) -> List[Dict[str, str]]: |
| """ |
| Format cost estimate. |
| |
| :param raw_cost: JSON estimate from Trino |
| :return: Human readable cost estimate |
| """ |
| |
| def humanize(value: Any, suffix: str) -> str: |
| try: |
| value = int(value) |
| except ValueError: |
| return str(value) |
| |
| prefixes = ["K", "M", "G", "T", "P", "E", "Z", "Y"] |
| prefix = "" |
| to_next_prefix = 1000 |
| while value > to_next_prefix and prefixes: |
| prefix = prefixes.pop(0) |
| value //= to_next_prefix |
| |
| return f"{value} {prefix}{suffix}" |
| |
| cost = [] |
| columns = [ |
| ("outputRowCount", "Output count", " rows"), |
| ("outputSizeInBytes", "Output size", "B"), |
| ("cpuCost", "CPU cost", ""), |
| ("maxMemory", "Max memory", "B"), |
| ("networkCost", "Network cost", ""), |
| ] |
| for row in raw_cost: |
| estimate: Dict[str, float] = row.get("estimate", {}) |
| statement_cost = {} |
| for key, label, suffix in columns: |
| if key in estimate: |
| statement_cost[label] = humanize(estimate[key], suffix).strip() |
| cost.append(statement_cost) |
| |
| return cost |