blob: c48218a54de5aa7f6d01955955ecdb400f6fe587 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=C,R,W
"""Defines the templating context for SQL Lab"""
import inspect
import json
import random
import time
import uuid
from datetime import datetime, timedelta
from typing import Any, List, Optional, Tuple
from dateutil.relativedelta import relativedelta
from flask import g, request
from jinja2.sandbox import SandboxedEnvironment
from superset import app
config = app.config
BASE_CONTEXT = {
"datetime": datetime,
"random": random,
"relativedelta": relativedelta,
"time": time,
"timedelta": timedelta,
"uuid": uuid,
}
BASE_CONTEXT.update(config.get("JINJA_CONTEXT_ADDONS", {}))
def url_param(param: str, default: Optional[str] = None) -> Optional[Any]:
"""Read a url or post parameter and use it in your SQL Lab query
When in SQL Lab, it's possible to add arbitrary URL "query string"
parameters, and use those in your SQL code. For instance you can
alter your url and add `?foo=bar`, as in
`{domain}/superset/sqllab?foo=bar`. Then if your query is something like
SELECT * FROM foo = '{{ url_param('foo') }}', it will be parsed at
runtime and replaced by the value in the URL.
As you create a visualization form this SQL Lab query, you can pass
parameters in the explore view as well as from the dashboard, and
it should carry through to your queries.
:param param: the parameter to lookup
:param default: the value to return in the absence of the parameter
"""
if request.args.get(param):
return request.args.get(param, default)
# Supporting POST as well as get
form_data = request.form.get("form_data")
if isinstance(form_data, str):
form_data = json.loads(form_data)
url_params = form_data.get("url_params") or {}
return url_params.get(param, default)
return default
def current_user_id() -> Optional[int]:
"""The id of the user who is currently logged in"""
if hasattr(g, "user") and g.user:
return g.user.id
return None
def current_username() -> Optional[str]:
"""The username of the user who is currently logged in"""
if g.user:
return g.user.username
return None
def filter_values(column: str, default: Optional[str] = None) -> List[str]:
""" Gets a values for a particular filter as a list
This is useful if:
- you want to use a filter box to filter a query where the name of filter box
column doesn't match the one in the select statement
- you want to have the ability for filter inside the main query for speed
purposes
This searches for "filters" and "extra_filters" in ``form_data`` for a match
Usage example::
SELECT action, count(*) as times
FROM logs
WHERE action in ( {{ "'" + "','".join(filter_values('action_type')) + "'" }} )
GROUP BY action
:param column: column/filter name to lookup
:param default: default value to return if there's no matching columns
:return: returns a list of filter values
"""
form_data = json.loads(request.form.get("form_data", "{}"))
return_val = []
for filter_type in ["filters", "extra_filters"]:
if filter_type not in form_data:
continue
for f in form_data[filter_type]:
if f["col"] == column:
if isinstance(f["val"], list):
for v in f["val"]:
return_val.append(v)
else:
return_val.append(f["val"])
if return_val:
return return_val
if default:
return [default]
else:
return []
class CacheKeyWrapper:
""" Dummy class that exposes a method used to store additional values used in
calculation of query object cache keys"""
def __init__(self, extra_cache_keys: Optional[List[Any]] = None):
self.extra_cache_keys = extra_cache_keys
def cache_key_wrapper(self, key: Any) -> Any:
""" Adds values to a list that is added to the query object used for calculating
a cache key.
This is needed if the following applies:
- Caching is enabled
- The query is dynamically generated using a jinja template
- A username or similar is used as a filter in the query
Example when using a SQL query as a data source ::
SELECT action, count(*) as times
FROM logs
WHERE logged_in_user = '{{ cache_key_wrapper(current_username()) }}'
GROUP BY action
This will ensure that the query results that were cached by `user_1` will
**not** be seen by `user_2`, as the `cache_key` for the query will be
different. ``cache_key_wrapper`` can be used similarly for regular table data
sources by adding a `Custom SQL` filter.
:param key: Any value that should be considered when calculating the cache key
:return: the original value ``key`` passed to the function
"""
if self.extra_cache_keys is not None:
self.extra_cache_keys.append(key)
return key
class BaseTemplateProcessor:
"""Base class for database-specific jinja context
There's this bit of magic in ``process_template`` that instantiates only
the database context for the active database as a ``models.Database``
object binds it to the context object, so that object methods
have access to
that context. This way, {{ hive.latest_partition('mytable') }} just
knows about the database it is operating in.
This means that object methods are only available for the active database
and are given access to the ``models.Database`` object and schema
name. For globally available methods use ``@classmethod``.
"""
engine: Optional[str] = None
def __init__(
self,
database=None,
query=None,
table=None,
extra_cache_keys: Optional[List[Any]] = None,
**kwargs
):
self.database = database
self.query = query
self.schema = None
if query and query.schema:
self.schema = query.schema
elif table:
self.schema = table.schema
self.context = {
"url_param": url_param,
"current_user_id": current_user_id,
"current_username": current_username,
"cache_key_wrapper": CacheKeyWrapper(extra_cache_keys).cache_key_wrapper,
"filter_values": filter_values,
"form_data": {},
}
self.context.update(kwargs)
self.context.update(BASE_CONTEXT)
if self.engine:
self.context[self.engine] = self
self.env = SandboxedEnvironment()
def process_template(self, sql: str, **kwargs) -> str:
"""Processes a sql template
>>> sql = "SELECT '{{ datetime(2017, 1, 1).isoformat() }}'"
>>> process_template(sql)
"SELECT '2017-01-01T00:00:00'"
"""
template = self.env.from_string(sql)
kwargs.update(self.context)
return template.render(kwargs)
class PrestoTemplateProcessor(BaseTemplateProcessor):
"""Presto Jinja context
The methods described here are namespaced under ``presto`` in the
jinja context as in ``SELECT '{{ presto.some_macro_call() }}'``
"""
engine = "presto"
@staticmethod
def _schema_table(
table_name: str, schema: Optional[str]
) -> Tuple[str, Optional[str]]:
if "." in table_name:
schema, table_name = table_name.split(".")
return table_name, schema
def first_latest_partition(self, table_name: str) -> str:
"""
Gets the first value in the array of all latest partitions
:param table_name: table name in the format `schema.table`
:return: the first (or only) value in the latest partition array
:raises IndexError: If no partition exists
"""
return self.latest_partitions(table_name)[0]
def latest_partitions(self, table_name: str) -> List[str]:
"""
Gets the array of all latest partitions
:param table_name: table name in the format `schema.table`
:return: the latest partition array
"""
table_name, schema = self._schema_table(table_name, self.schema)
return self.database.db_engine_spec.latest_partition(
table_name, schema, self.database
)[1]
def latest_sub_partition(self, table_name, **kwargs):
table_name, schema = self._schema_table(table_name, self.schema)
return self.database.db_engine_spec.latest_sub_partition(
table_name=table_name, schema=schema, database=self.database, **kwargs
)
latest_partition = first_latest_partition
class HiveTemplateProcessor(PrestoTemplateProcessor):
engine = "hive"
template_processors = {}
keys = tuple(globals().keys())
for k in keys:
o = globals()[k]
if o and inspect.isclass(o) and issubclass(o, BaseTemplateProcessor):
template_processors[o.engine] = o
def get_template_processor(database, table=None, query=None, **kwargs):
TP = template_processors.get(database.backend, BaseTemplateProcessor)
return TP(database=database, table=table, query=query, **kwargs)