# Source blob: c4cc6f40639b22e365220e742f5ff8f1c97ce9e2
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from enum import Enum
from typing import Any, Dict, Hashable, List, Optional, Type, Union
from flask_appbuilder.security.sqla.models import User
from sqlalchemy import and_, Boolean, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import foreign, Query, relationship, RelationshipProperty
from superset import security_manager
from superset.constants import NULL_STRING
from superset.models.helpers import AuditMixinNullable, ImportExportMixin, QueryResult
from superset.models.slice import Slice
from superset.typing import FilterValue, FilterValues, QueryObjectDict
from superset.utils import core as utils
# Form-data keys that ``BaseDatasource.data_for_slices`` scans for metric
# references.
METRIC_FORM_DATA_PARAMS = [
    "metric",
    "metrics",
    "metric_2",
    "percent_metrics",
    "secondary_metric",
    "size",
    "timeseries_limit_metric",
    "x",
    "y",
]

# Form-data keys that ``BaseDatasource.data_for_slices`` scans for column
# references.
COLUMN_FORM_DATA_PARAMS = [
    "all_columns",
    "all_columns_x",
    "columns",
    "entity",
    "groupby",
    "order_by_cols",
    "series",
]
class DatasourceKind(str, Enum):
    """String-valued enumeration of the datasource kinds.

    ``BaseDatasource.kind`` reports ``VIRTUAL`` when the datasource has a
    truthy ``sql`` attribute and ``PHYSICAL`` otherwise.
    """

    VIRTUAL = "virtual"
    PHYSICAL = "physical"
class BaseDatasource(
AuditMixinNullable, ImportExportMixin
): # pylint: disable=too-many-public-methods
"""A common interface to objects that are queryable
(tables and datasources)

Deriving classes are expected to fill in the class attributes below and to
override the properties and methods that raise ``NotImplementedError``."""
# ---------------------------------------------------------------
# class attributes to define when deriving BaseDatasource
# ---------------------------------------------------------------
# Table name of the concrete model; the base class leaves it unset.
__tablename__: Optional[str] = None # {connector_name}_datasource
# Interpolated into the edit URL returned by the ``url`` property.
baselink: Optional[str] = None # url portion pointing to ModelView endpoint
@property
def column_class(self) -> Type["BaseColumn"]:
    """Link to the derivative of ``BaseColumn`` used by this datasource.

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
@property
def metric_class(self) -> Type["BaseMetric"]:
    """Link to the derivative of ``BaseMetric`` used by this datasource.

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
# NOTE(review): presumably the user model class used for owners, to be set by
# deriving classes — confirm against the connectors.
owner_class: Optional[User] = None
# Used to do code highlighting when displaying the query in the UI
query_language: Optional[str] = None
# Only some datasources support Row Level Security
is_rls_supported: bool = False
@property
def name(self) -> str:
    """Display name of the datasource.

    Deriving classes provide it either as a Column or as a property pointing
    to one.

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
# ---------------------------------------------------------------
# Columns
# ---------------------------------------------------------------
id = Column(Integer, primary_key=True)
description = Column(Text)
# When set, ``explore_url`` returns this value instead of the generated URL.
default_endpoint = Column(Text)
is_featured = Column(Boolean, default=False) # TODO deprecating
# Exposed to the frontend as both "filter_select" and "filter_select_enabled".
filter_select_enabled = Column(Boolean, default=False)
offset = Column(Integer, default=0)
cache_timeout = Column(Integer)
params = Column(String(1000))
perm = Column(String(1000))
schema_perm = Column(String(1000))
# A truthy value makes ``kind`` report the datasource as virtual.
sql: Optional[str] = None
# Annotation-only declarations: no value is assigned in the base class.
owners: List[User]
# Names of the attributes copied from the UI payload by ``update_from_object``.
update_from_object_fields: List[str]
@property
def kind(self) -> str:
    """Return ``"virtual"`` when ``sql`` is truthy, ``"physical"`` otherwise."""
    member = DatasourceKind.VIRTUAL if self.sql else DatasourceKind.PHYSICAL
    return member.value
@declared_attr
def slices(self) -> RelationshipProperty:
    """Relationship to the ``Slice`` rows that reference this datasource.

    The join condition is passed as a callable and matches slices on both
    ``datasource_id`` and ``datasource_type``.
    """

    def _slice_join_condition():  # type: ignore
        return and_(
            foreign(Slice.datasource_id) == self.id,
            foreign(Slice.datasource_type) == self.type,
        )

    return relationship("Slice", primaryjoin=_slice_join_condition)
# NOTE(review): these are class-level mutable lists, shared by every instance
# unless a deriving class replaces them (presumably with ORM relationships) —
# confirm before relying on the defaults.
columns: List["BaseColumn"] = []
metrics: List["BaseMetric"] = []
@property
def type(self) -> str:
    """Datasource type string, used in ``uid`` and in the explore URL.

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
@property
def uid(self) -> str:
    """Unique id across datasource types, formatted as ``<id>__<type>``"""
    ident, type_ = self.id, self.type
    return f"{ident}__{type_}"
@property
def column_names(self) -> List[str]:
    """Column names in sorted order; a falsy name sorts as the empty string."""
    names = [col.column_name for col in self.columns]
    names.sort(key=lambda name: name or "")
    return names
@property
def columns_types(self) -> Dict[str, str]:
    """Map each column name to that column's ``type`` attribute."""
    types_by_name: Dict[str, str] = {}
    for col in self.columns:
        types_by_name[col.column_name] = col.type
    return types_by_name
@property
def main_dttm_col(self) -> str:
    """Name of the main datetime column; the base class returns a fixed name."""
    default_name = "timestamp"
    return default_name
@property
def datasource_name(self) -> str:
    """Name of the datasource, exposed as "datasource_name" and "table_name".

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
@property
def connection(self) -> Optional[str]:
    """String representing the context of the Datasource

    The base implementation has no connection and returns ``None``.
    """
    no_connection: Optional[str] = None
    return no_connection
@property
def schema(self) -> Optional[str]:
    """String representing the schema of the Datasource (if it applies)

    The base implementation has no schema and returns ``None``.
    """
    no_schema: Optional[str] = None
    return no_schema
@property
def filterable_column_names(self) -> List[str]:
    """Sorted names of the columns whose ``filterable`` flag is truthy.

    A falsy column name sorts as the empty string, matching ``column_names``,
    so a ``None`` name no longer makes the sort raise ``TypeError``.
    """
    return sorted(
        (col.column_name for col in self.columns if col.filterable),
        key=lambda name: name or "",
    )
@property
def dttm_cols(self) -> List[str]:
    """Names of the datetime columns; the base class reports none."""
    return list()
@property
def url(self) -> str:
    """Edit URL of the datasource, built from ``baselink`` and ``id``."""
    return f"/{self.baselink}/edit/{self.id}"
@property
def explore_url(self) -> str:
    """Explore URL: ``default_endpoint`` when truthy, else a generated path."""
    return self.default_endpoint or f"/superset/explore/{self.type}/{self.id}/"
@property
def column_formats(self) -> Dict[str, Optional[str]]:
    """Map metric names to their ``d3format``, skipping metrics without one."""
    formats: Dict[str, Optional[str]] = {}
    for metric in self.metrics:
        if metric.d3format:
            formats[metric.metric_name] = metric.d3format
    return formats
def add_missing_metrics(self, metrics: List["BaseMetric"]) -> None:
    """Append the given metrics whose name the datasource does not have yet.

    Each appended metric gets its ``table_id`` set to this datasource's id.
    Names are tracked as metrics are appended, so a name repeated within
    ``metrics`` is only added once.

    :param metrics: candidate metrics to add
    """
    known_names = {m.metric_name for m in self.metrics}
    for metric in metrics:
        if metric.metric_name in known_names:
            continue
        metric.table_id = self.id
        self.metrics.append(metric)
        # remember the name so a later duplicate in ``metrics`` is skipped
        known_names.add(metric.metric_name)
@property
def short_data(self) -> Dict[str, Any]:
    """Abbreviated data representation of the datasource sent to the frontend"""
    summary: Dict[str, Any] = {}
    summary["edit_url"] = self.url
    summary["id"] = self.id
    summary["uid"] = self.uid
    summary["schema"] = self.schema
    summary["name"] = self.name
    summary["type"] = self.type
    summary["connection"] = self.connection
    summary["creator"] = str(self.created_by)
    return summary
@property
def select_star(self) -> Optional[str]:
    """Value exposed as "select_star" in ``data``; the base class has none."""
    return None
@property
def data(self) -> Dict[str, Any]:
    """Data representation of the datasource sent to the frontend"""
    # self.column_names is already sorted; emit an asc and a desc choice for
    # every column, each keyed by a JSON-encoded [name, ascending] pair
    order_by_choices = []
    for raw_name in self.column_names:
        label = str(raw_name or "")
        for ascending, suffix in ((True, " [asc]"), (False, " [desc]")):
            order_by_choices.append(
                (json.dumps([label, ascending]), label + suffix)
            )

    # metrics are written first, so a column with the same name wins
    verbose_map = {"__timestamp": "Time"}
    for metric in self.metrics:
        verbose_map[metric.metric_name] = metric.verbose_name or metric.metric_name
    for column in self.columns:
        verbose_map[column.column_name] = column.verbose_name or column.column_name

    payload: Dict[str, Any] = {
        # simple fields
        "id": self.id,
        "column_formats": self.column_formats,
        "description": self.description,
        "database": self.database.data,  # pylint: disable=no-member
        "default_endpoint": self.default_endpoint,
        "filter_select": self.filter_select_enabled,  # TODO deprecate
        "filter_select_enabled": self.filter_select_enabled,
        "name": self.name,
        "datasource_name": self.datasource_name,
        "table_name": self.datasource_name,
        "type": self.type,
        "schema": self.schema,
        "offset": self.offset,
        "cache_timeout": self.cache_timeout,
        "params": self.params,
        "perm": self.perm,
        "edit_url": self.url,
        # sqla-specific
        "sql": self.sql,
        # one to many
        "columns": [column.data for column in self.columns],
        "metrics": [metric.data for metric in self.metrics],
        # TODO deprecate, move logic to JS
        "order_by_choices": order_by_choices,
        "owners": [owner.id for owner in self.owners],
        "verbose_map": verbose_map,
        "select_star": self.select_star,
    }
    return payload
def data_for_slices(self, slices: List[Slice]) -> Dict[str, Any]:
    """
    The representation of the datasource containing only the required data
    to render the provided slices.

    Starts from ``self.data``, drops "description", keeps only the metrics
    and columns referenced by the slices' form data, and rebuilds
    "verbose_map" from what is kept.

    Used to reduce the payload when loading a dashboard.

    :param slices: the slices whose ``form_data`` is inspected
    :return: the trimmed datasource payload
    """
    payload = self.data
    wanted_metrics = set()
    wanted_columns = set()

    for chart in slices:
        form_data = chart.form_data

        # pull out all required metrics from the form_data; an ad-hoc metric
        # also requires the column it is computed from
        for key in METRIC_FORM_DATA_PARAMS:
            for metric in utils.get_iterable(form_data.get(key) or []):
                wanted_metrics.add(utils.get_metric_name(metric))
                if utils.is_adhoc_metric(metric):
                    source_column = metric.get("column") or {}
                    wanted_columns.add(source_column.get("column_name"))

        # pull out all required columns from the form_data
        for adhoc in form_data.get("adhoc_filters") or []:
            if adhoc["clause"] == "WHERE" and adhoc.get("subject"):
                wanted_columns.add(adhoc.get("subject"))
        for key in COLUMN_FORM_DATA_PARAMS:
            wanted_columns.update(utils.get_iterable(form_data.get(key) or []))

    kept_metrics = [
        entry for entry in payload["metrics"] if entry["metric_name"] in wanted_metrics
    ]
    kept_columns = [
        entry for entry in payload["columns"] if entry["column_name"] in wanted_columns
    ]

    del payload["description"]
    payload["metrics"] = kept_metrics
    payload["columns"] = kept_columns

    verbose_map = {"__timestamp": "Time"}
    for entry in kept_metrics:
        verbose_map[entry["metric_name"]] = (
            entry["verbose_name"] or entry["metric_name"]
        )
    for entry in kept_columns:
        verbose_map[entry["column_name"]] = (
            entry["verbose_name"] or entry["column_name"]
        )
    payload["verbose_map"] = verbose_map
    return payload
@staticmethod
def filter_values_handler(
    values: Optional[FilterValues],
    target_column_is_numeric: bool = False,
    is_list_target: bool = False,
) -> Optional[FilterValues]:
    """Normalize filter value(s) and coerce them to the shape the target needs.

    :param values: a single filter value, a list/tuple of values, or ``None``
    :param target_column_is_numeric: cast string values with
        ``utils.cast_to_num``
    :param is_list_target: when true the result is a list; otherwise a single
        value (the first element, or ``None`` for an empty sequence)
    :return: the normalized value(s), or ``None`` when ``values`` is ``None``
    """
    if values is None:
        return None

    def _normalize(raw: Optional[FilterValue]) -> Optional[FilterValue]:
        # only strings need cleaning; anything else passes through untouched
        if not isinstance(raw, str):
            return raw
        # backward compatibility with previous <select> components
        cleaned = raw.strip("\t\n'\"")
        if target_column_is_numeric:
            # For backwards compatibility and edge cases
            # where a column data type might have changed
            cleaned = utils.cast_to_num(cleaned)
        if cleaned == NULL_STRING:
            return None
        if cleaned == "<empty string>":
            return ""
        return cleaned

    if isinstance(values, (list, tuple)):
        normalized = [_normalize(item) for item in values]  # type: ignore
    else:
        normalized = _normalize(values)

    result_is_sequence = isinstance(normalized, (tuple, list))
    if is_list_target:
        return normalized if result_is_sequence else [normalized]  # type: ignore
    if result_is_sequence:
        # single-value target: keep the first element, if there is one
        return normalized[0] if normalized else None
    return normalized
def external_metadata(self) -> List[Dict[str, str]]:
    """Returns column information from the external system

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
def get_query_str(self, query_obj: QueryObjectDict) -> str:
    """Returns a query as a string

    This is used to be displayed to the user so that they can
    understand what is taking place behind the scene

    :param query_obj: The dict representation of a query object
    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
def query(self, query_obj: QueryObjectDict) -> QueryResult:
    """Executes the query and returns a dataframe

    query_obj is a dictionary representing Superset's query interface.
    Should return a ``superset.models.helpers.QueryResult``

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
def values_for_column(self, column_name: str, limit: int = 10000) -> List[Any]:
    """Given a column, returns an iterable of distinct values

    This is used to populate the dropdown showing a list of
    values in filters in the explore view

    :param column_name: name of the column to read values from
    :param limit: presumably the maximum number of values to return — the
        base class does not use it; confirm against the implementations
    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
@staticmethod
def default_query(qry: Query) -> Query:
    """Return the given query unchanged; the base class applies no filter."""
    unchanged = qry
    return unchanged
def get_column(self, column_name: Optional[str]) -> Optional["BaseColumn"]:
    """Return the first column named ``column_name``, or ``None``.

    :param column_name: name to look up; a falsy name yields ``None``
    """
    if not column_name:
        return None
    matches = (col for col in self.columns if col.column_name == column_name)
    return next(matches, None)
@staticmethod
def get_fk_many_from_list(
    object_list: List[Any],
    fkmany: List[Column],
    fkmany_class: Type[Union["BaseColumn", "BaseMetric"]],
    key_attr: str,
) -> List[Column]:
    """Update ORM one-to-many list from object list

    Used for syncing metrics and columns using the same code

    :param object_list: dicts describing the desired state
    :param fkmany: the ORM objects currently attached
    :param fkmany_class: class used to create objects for new entries; its
        ``update_from_object_fields`` names the attributes that are synced
    :param key_attr: name of the attribute/key that identifies an entry
    :return: a new list of the kept (and updated) ORM objects followed by the
        newly created ones
    """
    syncable_fields = fkmany_class.update_from_object_fields
    object_dict = {o.get(key_attr): o for o in object_list}

    # delete fks that have been removed
    fkmany = [o for o in fkmany if getattr(o, key_attr) in object_dict]

    # sync existing fks
    for fk in fkmany:
        obj = object_dict.get(getattr(fk, key_attr))
        if obj:
            for attr in syncable_fields:
                setattr(fk, attr, obj.get(attr))

    # create new fks
    orm_keys = {getattr(o, key_attr) for o in fkmany}
    new_fks = []
    for obj in object_list:
        if obj.get(key_attr) in orm_keys:
            continue
        # Entries created in the UI may not carry an "id" at all, so drop it
        # only when present instead of failing with KeyError.
        obj.pop("id", None)
        orm_kwargs = {k: v for k, v in obj.items() if k in syncable_fields}
        new_fks.append(fkmany_class(**orm_kwargs))
    return fkmany + new_fks
def update_from_object(self, obj: Dict[str, Any]) -> None:
    """Update datasource from a data structure

    The UI's table editor crafts a complex data structure that
    contains most of the datasource's properties as well as
    an array of metrics and columns objects. This method
    receives the object from the UI and syncs the datasource to
    match it. Since the fields are different for the different
    connectors, the implementation uses ``update_from_object_fields``
    which can be defined for each connector and
    defines which fields should be synced

    Metrics (and likewise columns) are reset to an empty list when the
    corresponding key is absent from ``obj`` or the class link is falsy.

    :param obj: the data structure received from the UI
    """
    for field in self.update_from_object_fields:
        setattr(self, field, obj.get(field))

    self.owners = obj.get("owners", [])

    # Syncing metrics
    if self.metric_class and "metrics" in obj:
        synced_metrics = self.get_fk_many_from_list(
            obj["metrics"], self.metrics, self.metric_class, "metric_name"
        )
    else:
        synced_metrics = []
    self.metrics = synced_metrics

    # Syncing columns
    if self.column_class and "columns" in obj:
        synced_columns = self.get_fk_many_from_list(
            obj["columns"], self.columns, self.column_class, "column_name"
        )
    else:
        synced_columns = []
    self.columns = synced_columns
def get_extra_cache_keys(  # pylint: disable=no-self-use
    self, query_obj: QueryObjectDict  # pylint: disable=unused-argument
) -> List[Hashable]:
    """Extra keys to include when calculating cache keys.

    Datasources that need additional keys override this method; the base
    implementation contributes none.

    :param query_obj: The dict representation of a query object
    :return: list of keys
    """
    extra_keys: List[Hashable] = []
    return extra_keys
def __hash__(self) -> int:
    """Hash on ``uid`` so that hashing agrees with ``__eq__``."""
    identity = self.uid
    return hash(identity)

def __eq__(self, other: object) -> bool:
    """Two datasources are equal when their ``uid`` values are equal.

    Returns ``NotImplemented`` for objects that are not ``BaseDatasource``
    instances.
    """
    if isinstance(other, BaseDatasource):
        return self.uid == other.uid
    return NotImplemented
def raise_for_access(self) -> None:
    """
    Raise an exception if the user cannot access the resource.

    The check is delegated to ``security_manager.raise_for_access`` with this
    datasource passed as ``datasource``.

    :raises SupersetSecurityException: If the user cannot access the resource
    """
    checked_datasource = self
    security_manager.raise_for_access(datasource=checked_datasource)
class BaseColumn(AuditMixinNullable, ImportExportMixin):
"""Interface for column

Declares the columns shared by connector-specific column models, together
with type-classification helpers and a ``data`` serialization property."""
# Table name of the concrete model; the base class leaves it unset.
__tablename__: Optional[str] = None # {connector_name}_column
id = Column(Integer, primary_key=True)
column_name = Column(String(255), nullable=False)
verbose_name = Column(String(1024))
is_active = Column(Boolean, default=True)
# Matched (upper-cased, by substring) by is_numeric / is_temporal / is_string.
type = Column(String(32))
groupby = Column(Boolean, default=True)
filterable = Column(Boolean, default=True)
description = Column(Text)
# Plain class attribute, not a Column, in the base class.
is_dttm = None
# [optional] Set this to support import/export functionality
export_fields: List[Any] = []
def __repr__(self) -> str:
    """Represent the column by its ``column_name`` converted to a string."""
    label = self.column_name
    return str(label)
# Substrings that mark a column type as numeric (see ``is_numeric``).
num_types = (
    "DOUBLE",
    "FLOAT",
    "INT",
    "BIGINT",
    "NUMBER",
    "LONG",
    "REAL",
    "NUMERIC",
    "DECIMAL",
    "MONEY",
)
# Substrings that mark a column type as temporal (see ``is_temporal``).
date_types = ("DATE", "TIME")
# Substrings that mark a column type as a string (see ``is_string``).
str_types = ("VARCHAR", "STRING", "CHAR")
@property
def is_numeric(self) -> bool:
    """Whether the column type contains one of the ``num_types`` substrings.

    The comparison is case-insensitive and by substring. Always returns a
    real ``bool``: a missing or empty ``type`` yields ``False`` rather than
    leaking ``None`` or ``""`` to the caller.
    """
    if not self.type:
        return False
    upper_type = self.type.upper()
    return any(marker in upper_type for marker in self.num_types)
@property
def is_temporal(self) -> bool:
    """Whether the column type contains one of the ``date_types`` substrings.

    The comparison is case-insensitive and by substring. Always returns a
    real ``bool``: a missing or empty ``type`` yields ``False`` rather than
    leaking ``None`` or ``""`` to the caller.
    """
    if not self.type:
        return False
    upper_type = self.type.upper()
    return any(marker in upper_type for marker in self.date_types)
@property
def is_string(self) -> bool:
    """Whether the column type contains one of the ``str_types`` substrings.

    The comparison is case-insensitive and by substring. Always returns a
    real ``bool``: a missing or empty ``type`` yields ``False`` rather than
    leaking ``None`` or ``""`` to the caller.
    """
    if not self.type:
        return False
    upper_type = self.type.upper()
    return any(marker in upper_type for marker in self.str_types)
@property
def expression(self) -> Column:
    """Expression of the column, included in ``data`` when available.

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
@property
def python_date_format(self) -> Column:
    """Date format of the column, to be supplied by deriving classes.

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
@property
def data(self) -> Dict[str, Any]:
    """Serialize the column to a dict, skipping attributes it does not have."""
    exported = (
        "id",
        "column_name",
        "verbose_name",
        "description",
        "expression",
        "filterable",
        "groupby",
        "is_dttm",
        "type",
    )
    payload: Dict[str, Any] = {}
    for attr in exported:
        if hasattr(self, attr):
            payload[attr] = getattr(self, attr)
    return payload
class BaseMetric(AuditMixinNullable, ImportExportMixin):
"""Interface for Metrics

Declares the columns shared by connector-specific metric models, together
with a ``data`` serialization property."""
# Table name of the concrete model; the base class leaves it unset.
__tablename__: Optional[str] = None # {connector_name}_metric
id = Column(Integer, primary_key=True)
metric_name = Column(String(255), nullable=False)
verbose_name = Column(String(1024))
metric_type = Column(String(32))
description = Column(Text)
# Format string reported per metric by ``BaseDatasource.column_formats``.
d3format = Column(String(128))
warning_text = Column(Text)
# The bare string below is not the class docstring: it is an inert expression
# statement used as a note on what deriving classes should declare.
"""
The interface should also declare a datasource relationship pointing
to a derivative of BaseDatasource, along with a FK
datasource_name = Column(
String(255),
ForeignKey('datasources.datasource_name'))
datasource = relationship(
# needs to be altered to point to {Connector}Datasource
'BaseDatasource',
backref=backref('metrics', cascade='all, delete-orphan'),
enable_typechecks=False)
"""
@property
def perm(self) -> Optional[str]:
    """Permission string of the metric, to be supplied by deriving classes.

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
@property
def expression(self) -> Column:
    """Expression of the metric, included in ``data``.

    :raises NotImplementedError: always; deriving classes must override
    """
    raise NotImplementedError
@property
def data(self) -> Dict[str, Any]:
    """Serialize the metric to a dict; every listed attribute must exist."""
    exported = (
        "id",
        "metric_name",
        "verbose_name",
        "description",
        "expression",
        "warning_text",
        "d3format",
    )
    payload: Dict[str, Any] = {}
    for attr in exported:
        payload[attr] = getattr(self, attr)
    return payload