| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import dataclasses |
| import logging |
| import uuid |
| from contextlib import closing |
| from datetime import datetime |
| from sys import getsizeof |
| from typing import Any, cast, Dict, List, Optional, Tuple, Union |
| |
| import backoff |
| import msgpack |
| import pyarrow as pa |
| import simplejson as json |
| from celery import Task |
| from celery.exceptions import SoftTimeLimitExceeded |
| from flask_babel import gettext as __ |
| from sqlalchemy.orm import Session |
| from werkzeug.local import LocalProxy |
| |
from superset import (
    app,
    results_backend,
    results_backend_use_msgpack,
    security_manager,
)
| from superset.dataframe import df_to_records |
| from superset.db_engine_specs import BaseEngineSpec |
| from superset.errors import ErrorLevel, SupersetError, SupersetErrorType |
| from superset.exceptions import SupersetErrorException, SupersetErrorsException |
| from superset.extensions import celery_app |
| from superset.models.core import Database |
| from superset.models.sql_lab import LimitingFactor, Query |
| from superset.result_set import SupersetResultSet |
| from superset.sql_parse import CtasMethod, ParsedQuery |
| from superset.utils.celery import session_scope |
| from superset.utils.core import ( |
| json_iso_dttm_ser, |
| QuerySource, |
| QueryStatus, |
| zlib_compress, |
| ) |
| from superset.utils.dates import now_as_float |
| from superset.utils.decorators import stats_timing |
| |
| |
| # pylint: disable=unused-argument, redefined-outer-name |
| def dummy_sql_query_mutator( |
| sql: str, |
| user_name: Optional[str], |
| security_manager: LocalProxy, |
| database: Database, |
| ) -> str: |
| """A no-op version of SQL_QUERY_MUTATOR""" |
| return sql |
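# For illustration only, a deployment-specific SQL_QUERY_MUTATOR could append
# an audit comment to every statement, e.g.:
#
#     def sql_query_mutator(sql, user_name, security_manager, database):
#         return f"{sql}\n-- executed by {user_name}"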
| |
| |
| config = app.config |
| stats_logger = config["STATS_LOGGER"] |
| SQLLAB_TIMEOUT = config["SQLLAB_ASYNC_TIME_LIMIT_SEC"] |
| SQLLAB_HARD_TIMEOUT = SQLLAB_TIMEOUT + 60 |
| SQL_MAX_ROW = config["SQL_MAX_ROW"] |
| SQLLAB_CTAS_NO_LIMIT = config["SQLLAB_CTAS_NO_LIMIT"] |
| SQL_QUERY_MUTATOR = config.get("SQL_QUERY_MUTATOR") or dummy_sql_query_mutator |
| log_query = config["QUERY_LOGGER"] |
| logger = logging.getLogger(__name__) |
| cancel_query_key = "cancel_query" |
| |
| |
| class SqlLabException(Exception): |
| pass |
| |
| |
| class SqlLabSecurityException(SqlLabException): |
| pass |
| |
| |
| class SqlLabQueryStoppedException(SqlLabException): |
| pass |
| |
| |
| def handle_query_error( |
| ex: Exception, |
| query: Query, |
| session: Session, |
| payload: Optional[Dict[str, Any]] = None, |
| prefix_message: str = "", |
| ) -> Dict[str, Any]: |
| """Local method handling error while processing the SQL""" |
| payload = payload or {} |
| msg = f"{prefix_message} {str(ex)}".strip() |
| troubleshooting_link = config["TROUBLESHOOTING_LINK"] |
| query.error_message = msg |
| query.status = QueryStatus.FAILED |
| query.tmp_table_name = None |
| |
    # extract DB-specific errors (e.g., an invalid column)
| if isinstance(ex, SupersetErrorException): |
| errors = [ex.error] |
| elif isinstance(ex, SupersetErrorsException): |
| errors = ex.errors |
| else: |
| errors = query.database.db_engine_spec.extract_errors(str(ex)) |
| |
| errors_payload = [dataclasses.asdict(error) for error in errors] |
| if errors: |
| query.set_extra_json_key("errors", errors_payload) |
| |
| session.commit() |
| payload.update({"status": query.status, "error": msg, "errors": errors_payload}) |
| if troubleshooting_link: |
| payload["link"] = troubleshooting_link |
| return payload |
| |
| |
| def get_query_backoff_handler(details: Dict[Any, Any]) -> None: |
| query_id = details["kwargs"]["query_id"] |
| logger.error( |
| "Query with id `%s` could not be retrieved", str(query_id), exc_info=True |
| ) |
| stats_logger.incr("error_attempting_orm_query_{}".format(details["tries"] - 1)) |
    logger.error(
        "Query %s: Sleeping for a second before retrying...",
        str(query_id),
        exc_info=True,
    )
| |
| |
| def get_query_giveup_handler(_: Any) -> None: |
| stats_logger.incr("error_failed_at_getting_orm_query") |
| |
| |
| @backoff.on_exception( |
| backoff.constant, |
| SqlLabException, |
| interval=1, |
| on_backoff=get_query_backoff_handler, |
| on_giveup=get_query_giveup_handler, |
| max_tries=5, |
| ) |
| def get_query(query_id: int, session: Session) -> Query: |
| """attempts to get the query and retry if it cannot""" |
| try: |
| return session.query(Query).filter_by(id=query_id).one() |
| except Exception: |
| raise SqlLabException("Failed at getting query") |
| |
| |
| @celery_app.task( |
| name="sql_lab.get_sql_results", |
| bind=True, |
| time_limit=SQLLAB_HARD_TIMEOUT, |
| soft_time_limit=SQLLAB_TIMEOUT, |
| ) |
| def get_sql_results( # pylint: disable=too-many-arguments |
| ctask: Task, |
| query_id: int, |
| rendered_query: str, |
| return_results: bool = True, |
| store_results: bool = False, |
| user_name: Optional[str] = None, |
| start_time: Optional[float] = None, |
| expand_data: bool = False, |
| log_params: Optional[Dict[str, Any]] = None, |
| ) -> Optional[Dict[str, Any]]: |
| """Executes the sql query returns the results.""" |
    with session_scope(not ctask.request.called_directly) as session:
        try:
| return execute_sql_statements( |
| query_id, |
| rendered_query, |
| return_results, |
| store_results, |
| user_name, |
| session=session, |
| start_time=start_time, |
| expand_data=expand_data, |
| log_params=log_params, |
| ) |
| except Exception as ex: # pylint: disable=broad-except |
| logger.debug("Query %d: %s", query_id, ex) |
| stats_logger.incr("error_sqllab_unhandled") |
| query = get_query(query_id, session) |
| return handle_query_error(ex, query, session) |
| |
| |
| # pylint: disable=too-many-arguments, too-many-locals, too-many-statements |
| def execute_sql_statement( |
| sql_statement: str, |
| query: Query, |
| user_name: Optional[str], |
| session: Session, |
| cursor: Any, |
| log_params: Optional[Dict[str, Any]], |
| apply_ctas: bool = False, |
| ) -> SupersetResultSet: |
| """Executes a single SQL statement""" |
| database = query.database |
| db_engine_spec = database.db_engine_spec |
| parsed_query = ParsedQuery(sql_statement) |
| sql = parsed_query.stripped() |
    # Check whether the query is limited, either by the dropdown or by the SQL
    # itself, by fetching one row more than the limit: if the extra row comes
    # back, more rows exist beyond the limit.
    increased_limit = None if query.limit is None else query.limit + 1
| |
| if not db_engine_spec.is_readonly_query(parsed_query) and not database.allow_dml: |
| raise SupersetErrorException( |
| SupersetError( |
| message=__("Only SELECT statements are allowed against this database."), |
| error_type=SupersetErrorType.DML_NOT_ALLOWED_ERROR, |
| level=ErrorLevel.ERROR, |
| ) |
| ) |
| if apply_ctas: |
| if not query.tmp_table_name: |
| start_dttm = datetime.fromtimestamp(query.start_time) |
| query.tmp_table_name = "tmp_{}_table_{}".format( |
| query.user_id, start_dttm.strftime("%Y_%m_%d_%H_%M_%S") |
| ) |
| sql = parsed_query.as_create_table( |
| query.tmp_table_name, |
| schema_name=query.tmp_schema_name, |
| method=query.ctas_method, |
| ) |
| query.select_as_cta_used = True |
| |
    # Do not apply a limit to CTA queries when SQLLAB_CTAS_NO_LIMIT is set to True
| if db_engine_spec.is_select_query(parsed_query) and not ( |
| query.select_as_cta_used and SQLLAB_CTAS_NO_LIMIT |
| ): |
| if SQL_MAX_ROW and (not query.limit or query.limit > SQL_MAX_ROW): |
| query.limit = SQL_MAX_ROW |
| if query.limit: |
| # We are fetching one more than the requested limit in order |
| # to test whether there are more rows than the limit. |
| # Later, the extra row will be dropped before sending |
| # the results back to the user. |
| sql = database.apply_limit_to_sql(sql, increased_limit, force=True) |
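            # Illustrative example: with a limit of 1000, an engine with LIMIT
            # syntax would execute the statement with LIMIT 1001 applied.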
| |
| # Hook to allow environment-specific mutation (usually comments) to the SQL |
| sql = SQL_QUERY_MUTATOR(sql, user_name, security_manager, database) |
| try: |
| query.executed_sql = sql |
| if log_query: |
| log_query( |
| query.database.sqlalchemy_uri, |
| query.executed_sql, |
| query.schema, |
| user_name, |
| __name__, |
| security_manager, |
| log_params, |
| ) |
| session.commit() |
| with stats_timing("sqllab.query.time_executing_query", stats_logger): |
| logger.debug("Query %d: Running query: %s", query.id, sql) |
| db_engine_spec.execute(cursor, sql, async_=True) |
| logger.debug("Query %d: Handling cursor", query.id) |
| db_engine_spec.handle_cursor(cursor, query, session) |
| |
| with stats_timing("sqllab.query.time_fetching_results", stats_logger): |
| logger.debug( |
| "Query %d: Fetching data for query object: %s", |
| query.id, |
| str(query.to_dict()), |
| ) |
| data = db_engine_spec.fetch_data(cursor, increased_limit) |
| if query.limit is None or len(data) <= query.limit: |
| query.limiting_factor = LimitingFactor.NOT_LIMITED |
            else:
                # drop the extra row fetched via increased_limit
                data = data[:-1]
| except SoftTimeLimitExceeded as ex: |
| logger.warning("Query %d: Time limit exceeded", query.id) |
| logger.debug("Query %d: %s", query.id, ex) |
| raise SupersetErrorException( |
| SupersetError( |
                message=__(
                    "The query was killed after %(sqllab_timeout)s seconds. It might "
                    "be too complex, or the database might be under heavy load.",
                    sqllab_timeout=SQLLAB_TIMEOUT,
                ),
| error_type=SupersetErrorType.SQLLAB_TIMEOUT_ERROR, |
| level=ErrorLevel.ERROR, |
| ) |
| ) |
    except Exception as ex:
        # The query may have been stopped in another thread/worker;
        # stopping raises expected exceptions, which we should skip.
        session.refresh(query)
        if query.status == QueryStatus.STOPPED:
            raise SqlLabQueryStoppedException() from ex

        logger.error("Query %d: %s", query.id, type(ex), exc_info=True)
        logger.debug("Query %d: %s", query.id, ex)
        raise SqlLabException(db_engine_spec.extract_error_message(ex)) from ex
| |
| logger.debug("Query %d: Fetching cursor description", query.id) |
| cursor_description = cursor.description |
| return SupersetResultSet(data, cursor_description, db_engine_spec) |
| |
| |
| def _serialize_payload( |
| payload: Dict[Any, Any], use_msgpack: Optional[bool] = False |
) -> Union[bytes, str]:
    """Serializes the results payload to msgpack bytes or a JSON string."""
    logger.debug("Serializing to msgpack: %r", use_msgpack)
| if use_msgpack: |
| return msgpack.dumps(payload, default=json_iso_dttm_ser, use_bin_type=True) |
| |
| return json.dumps(payload, default=json_iso_dttm_ser, ignore_nan=True) |
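# Illustrative only: _serialize_payload({"a": 1}) yields the JSON string
# '{"a": 1}', while _serialize_payload({"a": 1}, use_msgpack=True) yields the
# equivalent msgpack bytes; datetimes are rendered as ISO-8601 strings via
# json_iso_dttm_ser in both cases.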
| |
| |
| def _serialize_and_expand_data( |
| result_set: SupersetResultSet, |
| db_engine_spec: BaseEngineSpec, |
| use_msgpack: Optional[bool] = False, |
| expand_data: bool = False, |
) -> Tuple[Union[bytes, str], List[Any], List[Any], List[Any]]:
    """Serializes a result set, optionally expanding nested columns."""
    selected_columns = result_set.columns
| all_columns: List[Any] |
| expanded_columns: List[Any] |
| |
| if use_msgpack: |
| with stats_timing( |
| "sqllab.query.results_backend_pa_serialization", stats_logger |
| ): |
| data = ( |
| pa.default_serialization_context() |
| .serialize(result_set.pa_table) |
| .to_buffer() |
| .to_pybytes() |
| ) |
| |
        # expansion is deferred until the data is loaded back from the results backend
| all_columns, expanded_columns = (selected_columns, []) |
| else: |
| df = result_set.to_pandas_df() |
| data = df_to_records(df) or [] |
| |
| if expand_data: |
| all_columns, data, expanded_columns = db_engine_spec.expand_data( |
| selected_columns, data |
| ) |
| else: |
| all_columns = selected_columns |
| expanded_columns = [] |
| |
| return (data, selected_columns, all_columns, expanded_columns) |
| |
| |
| def execute_sql_statements( # pylint: disable=too-many-arguments, too-many-locals, too-many-statements, too-many-branches |
| query_id: int, |
| rendered_query: str, |
| return_results: bool, |
| store_results: bool, |
| user_name: Optional[str], |
| session: Session, |
| start_time: Optional[float], |
| expand_data: bool, |
| log_params: Optional[Dict[str, Any]], |
| ) -> Optional[Dict[str, Any]]: |
| """Executes the sql query returns the results.""" |
| if store_results and start_time: |
        # pending time is only tracked for asynchronous queries
| stats_logger.timing("sqllab.query.time_pending", now_as_float() - start_time) |
| |
| query = get_query(query_id, session) |
| payload: Dict[str, Any] = dict(query_id=query_id) |
| database = query.database |
| db_engine_spec = database.db_engine_spec |
| db_engine_spec.patch() |
| |
| if database.allow_run_async and not results_backend: |
| raise SupersetErrorException( |
| SupersetError( |
| message=__("Results backend is not configured."), |
| error_type=SupersetErrorType.RESULTS_BACKEND_NOT_CONFIGURED_ERROR, |
| level=ErrorLevel.ERROR, |
| ) |
| ) |
| |
| # Breaking down into multiple statements |
| parsed_query = ParsedQuery(rendered_query, strip_comments=True) |
| if not db_engine_spec.run_multiple_statements_as_one: |
| statements = parsed_query.get_statements() |
| logger.info( |
| "Query %s: Executing %i statement(s)", str(query_id), len(statements) |
| ) |
| else: |
| statements = [rendered_query] |
| logger.info("Query %s: Executing query as a single statement", str(query_id)) |
| |
| logger.info("Query %s: Set query to 'running'", str(query_id)) |
| query.status = QueryStatus.RUNNING |
| query.start_running_time = now_as_float() |
| session.commit() |
| |
| # Should we create a table or view from the select? |
| if ( |
| query.select_as_cta |
| and query.ctas_method == CtasMethod.TABLE |
| and not parsed_query.is_valid_ctas() |
| ): |
| raise SupersetErrorException( |
| SupersetError( |
| message=__( |
| "CTAS (create table as select) can only be run with a query where " |
| "the last statement is a SELECT. Please make sure your query has " |
| "a SELECT as its last statement. Then, try running your query " |
| "again." |
| ), |
| error_type=SupersetErrorType.INVALID_CTAS_QUERY_ERROR, |
| level=ErrorLevel.ERROR, |
| ) |
| ) |
| if ( |
| query.select_as_cta |
| and query.ctas_method == CtasMethod.VIEW |
| and not parsed_query.is_valid_cvas() |
| ): |
| raise SupersetErrorException( |
| SupersetError( |
| message=__( |
| "CVAS (create view as select) can only be run with a query with " |
| "a single SELECT statement. Please make sure your query has only " |
| "a SELECT statement. Then, try running your query again." |
| ), |
| error_type=SupersetErrorType.INVALID_CVAS_QUERY_ERROR, |
| level=ErrorLevel.ERROR, |
| ) |
| ) |
| |
| engine = database.get_sqla_engine( |
| schema=query.schema, |
| nullpool=True, |
| user_name=user_name, |
| source=QuerySource.SQL_LAB, |
| ) |
| # Sharing a single connection and cursor across the |
| # execution of all statements (if many) |
| with closing(engine.raw_connection()) as conn: |
| # closing the connection closes the cursor as well |
| cursor = conn.cursor() |
| cancel_query_id = db_engine_spec.get_cancel_query_id(cursor, query) |
| if cancel_query_id is not None: |
| query.set_extra_json_key(cancel_query_key, cancel_query_id) |
| session.commit() |
| statement_count = len(statements) |
| for i, statement in enumerate(statements): |
| # Check if stopped |
| session.refresh(query) |
| if query.status == QueryStatus.STOPPED: |
| payload.update({"status": query.status}) |
| return payload |
| |
| # For CTAS we create the table only on the last statement |
| apply_ctas = query.select_as_cta and ( |
| query.ctas_method == CtasMethod.VIEW |
| or (query.ctas_method == CtasMethod.TABLE and i == len(statements) - 1) |
| ) |
| |
| # Run statement |
| msg = f"Running statement {i+1} out of {statement_count}" |
| logger.info("Query %s: %s", str(query_id), msg) |
| query.set_extra_json_key("progress", msg) |
| session.commit() |
| try: |
| result_set = execute_sql_statement( |
| statement, |
| query, |
| user_name, |
| session, |
| cursor, |
| log_params, |
| apply_ctas, |
| ) |
| except SqlLabQueryStoppedException: |
| payload.update({"status": QueryStatus.STOPPED}) |
| return payload |
| except Exception as ex: # pylint: disable=broad-except |
| msg = str(ex) |
| prefix_message = ( |
| f"[Statement {i+1} out of {statement_count}]" |
| if statement_count > 1 |
| else "" |
| ) |
| payload = handle_query_error( |
| ex, query, session, payload, prefix_message |
| ) |
| return payload |
| |
| # Commit the connection so CTA queries will create the table. |
| conn.commit() |
| |
    # Success: update the query entry in the database
| query.rows = result_set.size |
| query.progress = 100 |
| query.set_extra_json_key("progress", None) |
| if query.select_as_cta: |
| query.select_sql = database.select_star( |
| query.tmp_table_name, |
| schema=query.tmp_schema_name, |
| limit=query.limit, |
| show_cols=False, |
| latest_partition=False, |
| ) |
| query.end_time = now_as_float() |
| |
    # with the msgpack results backend enabled, keep the data as a serialized
    # Arrow table; otherwise expand it into JSON-friendly records
    use_arrow_data = store_results and cast(bool, results_backend_use_msgpack)
| data, selected_columns, all_columns, expanded_columns = _serialize_and_expand_data( |
| result_set, db_engine_spec, use_arrow_data, expand_data |
| ) |
| |
| # TODO: data should be saved separately from metadata (likely in Parquet) |
| payload.update( |
| { |
| "status": QueryStatus.SUCCESS, |
| "data": data, |
| "columns": all_columns, |
| "selected_columns": selected_columns, |
| "expanded_columns": expanded_columns, |
| "query": query.to_dict(), |
| } |
| ) |
| payload["query"]["state"] = QueryStatus.SUCCESS |
| |
| if store_results and results_backend: |
| key = str(uuid.uuid4()) |
| logger.info( |
| "Query %s: Storing results in results backend, key: %s", str(query_id), key |
| ) |
| with stats_timing("sqllab.query.results_backend_write", stats_logger): |
| with stats_timing( |
| "sqllab.query.results_backend_write_serialization", stats_logger |
| ): |
| serialized_payload = _serialize_payload( |
| payload, cast(bool, results_backend_use_msgpack) |
| ) |
| cache_timeout = database.cache_timeout |
| if cache_timeout is None: |
| cache_timeout = config["CACHE_DEFAULT_TIMEOUT"] |
| |
| compressed = zlib_compress(serialized_payload) |
| logger.debug( |
| "*** serialized payload size: %i", getsizeof(serialized_payload) |
| ) |
| logger.debug("*** compressed payload size: %i", getsizeof(compressed)) |
| results_backend.set(key, compressed, cache_timeout) |
| query.results_key = key |
| |
| query.status = QueryStatus.SUCCESS |
| session.commit() |
| |
| if return_results: |
        # since we're returning results, we need to create non-Arrow data
| if use_arrow_data: |
| ( |
| data, |
| selected_columns, |
| all_columns, |
| expanded_columns, |
| ) = _serialize_and_expand_data( |
| result_set, db_engine_spec, False, expand_data |
| ) |
| payload.update( |
| { |
| "data": data, |
| "columns": all_columns, |
| "selected_columns": selected_columns, |
| "expanded_columns": expanded_columns, |
| } |
| ) |
| return payload |
| |
| return None |
| |
| |
| def cancel_query(query: Query, user_name: Optional[str] = None) -> bool: |
| """ |
| Cancel a running query. |
| |
| Note some engines implicitly handle the cancelation of a query and thus no expliicit |
| action is required. |
| |
| :param query: Query to cancel |
| :param user_name: Default username |
| :return: True if query cancelled successfully, False otherwise |
| """ |
| |
| if query.database.db_engine_spec.has_implicit_cancel(): |
| return True |
| |
| cancel_query_id = query.extra.get(cancel_query_key) |
| if cancel_query_id is None: |
| return False |
| |
| engine = query.database.get_sqla_engine( |
| schema=query.schema, |
| nullpool=True, |
| user_name=user_name, |
| source=QuerySource.SQL_LAB, |
| ) |
| |
| with closing(engine.raw_connection()) as conn: |
| with closing(conn.cursor()) as cursor: |
| return query.database.db_engine_spec.cancel_query( |
| cursor, query, cancel_query_id |
| ) |