blob: d175d02267fcdc97b27150db82e3c19b3fbc7dcc [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: skip-file
import json
import logging
import re
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime, timedelta
from distutils.version import LooseVersion
from multiprocessing.pool import ThreadPool
from typing import Any, cast, Dict, Iterable, List, Optional, Set, Tuple, Union
import pandas as pd
import sqlalchemy as sa
from dateutil.parser import parse as dparse
from flask import escape, Markup
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from flask_appbuilder.security.sqla.models import User
from flask_babel import lazy_gettext as _
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
Integer,
String,
Table,
Text,
UniqueConstraint,
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import backref, relationship, Session
from sqlalchemy.sql import expression
from sqlalchemy_utils import EncryptedType
from superset import conf, db, is_feature_enabled, security_manager
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.constants import NULL_STRING
from superset.exceptions import SupersetException
from superset.models.core import Database
from superset.models.helpers import AuditMixinNullable, ImportExportMixin, QueryResult
from superset.typing import FilterValues, Granularity, Metric, QueryObjectDict
from superset.utils import core as utils
from superset.utils.date_parser import parse_human_datetime, parse_human_timedelta
try:
import requests
from pydruid.client import PyDruid
from pydruid.utils.aggregators import count
from pydruid.utils.dimensions import (
MapLookupExtraction,
RegexExtraction,
RegisteredLookupExtraction,
TimeFormatExtraction,
)
from pydruid.utils.filters import Bound, Dimension, Filter
from pydruid.utils.having import Aggregation, Having
from pydruid.utils.postaggregator import (
Const,
Field,
HyperUniqueCardinality,
Postaggregator,
Quantile,
Quantiles,
)
except ImportError:
pass
try:
from superset.utils.core import DimSelector, DTTM_ALIAS, FilterOperator, flasher
except ImportError:
pass
DRUID_TZ = conf.get("DRUID_TZ")
POST_AGG_TYPE = "postagg"
metadata = Model.metadata # pylint: disable=no-member
logger = logging.getLogger(__name__)
try:
# Postaggregator might not have been imported.
class JavascriptPostAggregator(Postaggregator):
def __init__(self, name: str, field_names: List[str], function: str) -> None:
self.post_aggregator = {
"type": "javascript",
"fieldNames": field_names,
"name": name,
"function": function,
}
self.name = name
class CustomPostAggregator(Postaggregator):
"""A way to allow users to specify completely custom PostAggregators"""
def __init__(self, name: str, post_aggregator: Dict[str, Any]) -> None:
self.name = name
self.post_aggregator = post_aggregator
except NameError:
pass
# Function wrapper because bound methods cannot
# be passed to processes
def _fetch_metadata_for(datasource: "DruidDatasource") -> Optional[Dict[str, Any]]:
return datasource.latest_metadata()
class DruidCluster(Model, AuditMixinNullable, ImportExportMixin):
"""ORM object referencing the Druid clusters"""
__tablename__ = "clusters"
type = "druid"
id = Column(Integer, primary_key=True)
verbose_name = Column(String(250), unique=True)
# short unique name, used in permissions
cluster_name = Column(String(250), unique=True, nullable=False)
broker_host = Column(String(255))
broker_port = Column(Integer, default=8082)
broker_endpoint = Column(String(255), default="druid/v2")
metadata_last_refreshed = Column(DateTime)
cache_timeout = Column(Integer)
broker_user = Column(String(255))
broker_pass = Column(EncryptedType(String(255), conf.get("SECRET_KEY")))
export_fields = [
"cluster_name",
"broker_host",
"broker_port",
"broker_endpoint",
"cache_timeout",
"broker_user",
]
update_from_object_fields = export_fields
export_children = ["datasources"]
def __repr__(self) -> str:
return self.verbose_name if self.verbose_name else self.cluster_name
def __html__(self) -> str:
return self.__repr__()
@property
def data(self) -> Dict[str, Any]:
return {"id": self.id, "name": self.cluster_name, "backend": "druid"}
@staticmethod
def get_base_url(host: str, port: int) -> str:
if not re.match("http(s)?://", host):
host = "http://" + host
url = "{0}:{1}".format(host, port) if port else host
return url
def get_base_broker_url(self) -> str:
base_url = self.get_base_url(self.broker_host, self.broker_port)
return f"{base_url}/{self.broker_endpoint}"
def get_pydruid_client(self) -> "PyDruid":
cli = PyDruid(
self.get_base_url(self.broker_host, self.broker_port), self.broker_endpoint
)
if self.broker_user and self.broker_pass:
cli.set_basic_auth_credentials(self.broker_user, self.broker_pass)
return cli
def get_datasources(self) -> List[str]:
endpoint = self.get_base_broker_url() + "/datasources"
auth = requests.auth.HTTPBasicAuth(self.broker_user, self.broker_pass)
return json.loads(requests.get(endpoint, auth=auth).text)
def get_druid_version(self) -> str:
endpoint = self.get_base_url(self.broker_host, self.broker_port) + "/status"
auth = requests.auth.HTTPBasicAuth(self.broker_user, self.broker_pass)
return json.loads(requests.get(endpoint, auth=auth).text)["version"]
@property # type: ignore
@utils.memoized
def druid_version(self) -> str:
return self.get_druid_version()
def refresh_datasources(
self,
datasource_name: Optional[str] = None,
merge_flag: bool = True,
refresh_all: bool = True,
) -> None:
"""Refresh metadata of all datasources in the cluster
If ``datasource_name`` is specified, only that datasource is updated
"""
ds_list = self.get_datasources()
denylist = conf.get("DRUID_DATA_SOURCE_DENYLIST", [])
ds_refresh: List[str] = []
if not datasource_name:
ds_refresh = list(filter(lambda ds: ds not in denylist, ds_list))
elif datasource_name not in denylist and datasource_name in ds_list:
ds_refresh.append(datasource_name)
else:
return
self.refresh(ds_refresh, merge_flag, refresh_all)
def refresh(
self, datasource_names: List[str], merge_flag: bool, refresh_all: bool
) -> None:
"""
Fetches metadata for the specified datasources and
merges to the Superset database
"""
session = db.session
ds_list = (
session.query(DruidDatasource)
.filter(DruidDatasource.cluster_id == self.id)
.filter(DruidDatasource.datasource_name.in_(datasource_names))
)
ds_map = {ds.name: ds for ds in ds_list}
for ds_name in datasource_names:
datasource = ds_map.get(ds_name, None)
if not datasource:
datasource = DruidDatasource(datasource_name=ds_name)
with session.no_autoflush:
session.add(datasource)
flasher(_("Adding new datasource [{}]").format(ds_name), "success")
ds_map[ds_name] = datasource
elif refresh_all:
flasher(_("Refreshing datasource [{}]").format(ds_name), "info")
else:
del ds_map[ds_name]
continue
datasource.cluster = self
datasource.merge_flag = merge_flag
session.flush()
# Prepare multithreaded executation
pool = ThreadPool()
ds_refresh = list(ds_map.values())
metadata = pool.map(_fetch_metadata_for, ds_refresh)
pool.close()
pool.join()
for i in range(0, len(ds_refresh)):
datasource = ds_refresh[i]
cols = metadata[i]
if cols:
col_objs_list = (
session.query(DruidColumn)
.filter(DruidColumn.datasource_id == datasource.id)
.filter(DruidColumn.column_name.in_(cols.keys()))
)
col_objs = {col.column_name: col for col in col_objs_list}
for col in cols:
if col == "__time": # skip the time column
continue
col_obj = col_objs.get(col)
if not col_obj:
col_obj = DruidColumn(
datasource_id=datasource.id, column_name=col
)
with session.no_autoflush:
session.add(col_obj)
col_obj.type = cols[col]["type"]
col_obj.datasource = datasource
if col_obj.type == "STRING":
col_obj.groupby = True
col_obj.filterable = True
datasource.refresh_metrics()
session.commit()
@hybrid_property
def perm(self) -> str:
return f"[{self.cluster_name}].(id:{self.id})"
@perm.expression # type: ignore
def perm(cls) -> str: # pylint: disable=no-self-argument
return "[" + cls.cluster_name + "].(id:" + expression.cast(cls.id, String) + ")"
def get_perm(self) -> str:
return self.perm # type: ignore
@property
def name(self) -> str:
return self.verbose_name or self.cluster_name
@property
def unique_name(self) -> str:
return self.verbose_name or self.cluster_name
sa.event.listen(DruidCluster, "after_insert", security_manager.set_perm)
sa.event.listen(DruidCluster, "after_update", security_manager.set_perm)
class DruidColumn(Model, BaseColumn):
"""ORM model for storing Druid datasource column metadata"""
__tablename__ = "columns"
__table_args__ = (UniqueConstraint("column_name", "datasource_id"),)
datasource_id = Column(Integer, ForeignKey("datasources.id"))
# Setting enable_typechecks=False disables polymorphic inheritance.
datasource = relationship(
"DruidDatasource",
backref=backref("columns", cascade="all, delete-orphan"),
enable_typechecks=False,
)
dimension_spec_json = Column(Text)
export_fields = [
"datasource_id",
"column_name",
"is_active",
"type",
"groupby",
"filterable",
"description",
"dimension_spec_json",
"verbose_name",
]
update_from_object_fields = export_fields
export_parent = "datasource"
def __repr__(self) -> str:
return self.column_name or str(self.id)
@property
def expression(self) -> str:
return self.dimension_spec_json
@property
def dimension_spec(self) -> Optional[Dict[str, Any]]:
if self.dimension_spec_json:
return json.loads(self.dimension_spec_json)
return None
def get_metrics(self) -> Dict[str, "DruidMetric"]:
metrics = {
"count": DruidMetric(
metric_name="count",
verbose_name="COUNT(*)",
metric_type="count",
json=json.dumps({"type": "count", "name": "count"}),
)
}
return metrics
def refresh_metrics(self) -> None:
"""Refresh metrics based on the column metadata"""
metrics = self.get_metrics()
dbmetrics = (
db.session.query(DruidMetric)
.filter(DruidMetric.datasource_id == self.datasource_id)
.filter(DruidMetric.metric_name.in_(metrics.keys()))
)
dbmetrics = {metric.metric_name: metric for metric in dbmetrics}
for metric in metrics.values():
dbmetric = dbmetrics.get(metric.metric_name)
if dbmetric:
for attr in ["json", "metric_type"]:
setattr(dbmetric, attr, getattr(metric, attr))
else:
with db.session.no_autoflush:
metric.datasource_id = self.datasource_id
db.session.add(metric)
class DruidMetric(Model, BaseMetric):
"""ORM object referencing Druid metrics for a datasource"""
__tablename__ = "metrics"
__table_args__ = (UniqueConstraint("metric_name", "datasource_id"),)
datasource_id = Column(Integer, ForeignKey("datasources.id"))
# Setting enable_typechecks=False disables polymorphic inheritance.
datasource = relationship(
"DruidDatasource",
backref=backref("metrics", cascade="all, delete-orphan"),
enable_typechecks=False,
)
json = Column(Text, nullable=False)
export_fields = [
"metric_name",
"verbose_name",
"metric_type",
"datasource_id",
"json",
"description",
"d3format",
"warning_text",
]
update_from_object_fields = export_fields
export_parent = "datasource"
@property
def expression(self) -> Column:
return self.json
@property
def json_obj(self) -> Dict[str, Any]:
try:
obj = json.loads(self.json)
except Exception:
obj = {}
return obj
@property
def perm(self) -> Optional[str]:
return (
("{parent_name}.[{obj.metric_name}](id:{obj.id})").format(
obj=self, parent_name=self.datasource.full_name
)
if self.datasource
else None
)
def get_perm(self) -> Optional[str]:
return self.perm
druiddatasource_user = Table(
"druiddatasource_user",
metadata,
Column("id", Integer, primary_key=True),
Column("user_id", Integer, ForeignKey("ab_user.id")),
Column("datasource_id", Integer, ForeignKey("datasources.id")),
)
class DruidDatasource(Model, BaseDatasource):
"""ORM object referencing Druid datasources (tables)"""
__tablename__ = "datasources"
__table_args__ = (UniqueConstraint("datasource_name", "cluster_id"),)
type = "druid"
query_language = "json"
cluster_class = DruidCluster
columns: List[DruidColumn] = []
metrics: List[DruidMetric] = []
metric_class = DruidMetric
column_class = DruidColumn
owner_class = security_manager.user_model
baselink = "druiddatasourcemodelview"
# Columns
datasource_name = Column(String(255), nullable=False)
is_hidden = Column(Boolean, default=False)
filter_select_enabled = Column(Boolean, default=True) # override default
fetch_values_from = Column(String(100))
cluster_id = Column(Integer, ForeignKey("clusters.id"), nullable=False)
cluster = relationship(
"DruidCluster", backref="datasources", foreign_keys=[cluster_id]
)
owners = relationship(
owner_class, secondary=druiddatasource_user, backref="druiddatasources"
)
export_fields = [
"datasource_name",
"is_hidden",
"description",
"default_endpoint",
"cluster_id",
"offset",
"cache_timeout",
"params",
"filter_select_enabled",
]
update_from_object_fields = export_fields
export_parent = "cluster"
export_children = ["columns", "metrics"]
@property
def cluster_name(self) -> str:
cluster = (
self.cluster
or db.session.query(DruidCluster).filter_by(id=self.cluster_id).one()
)
return cluster.cluster_name
@property
def database(self) -> DruidCluster:
return self.cluster
@property
def connection(self) -> str:
return str(self.database)
@property
def num_cols(self) -> List[str]:
return [c.column_name for c in self.columns if c.is_numeric]
@property
def name(self) -> str:
return self.datasource_name
@property
def datasource_type(self) -> str:
return self.type
@property
def schema(self) -> Optional[str]:
ds_name = self.datasource_name or ""
name_pieces = ds_name.split(".")
if len(name_pieces) > 1:
return name_pieces[0]
else:
return None
def get_schema_perm(self) -> Optional[str]:
"""Returns schema permission if present, cluster one otherwise."""
return security_manager.get_schema_perm(self.cluster, self.schema)
def get_perm(self) -> str:
return ("[{obj.cluster_name}].[{obj.datasource_name}]" "(id:{obj.id})").format(
obj=self
)
def update_from_object(self, obj: Dict[str, Any]) -> None:
raise NotImplementedError()
@property
def link(self) -> Markup:
name = escape(self.datasource_name)
return Markup(f'<a href="{self.url}">{name}</a>')
@property
def full_name(self) -> str:
return utils.get_datasource_full_name(self.cluster_name, self.datasource_name)
@property
def time_column_grains(self) -> Dict[str, List[str]]:
return {
"time_columns": [
"all",
"5 seconds",
"30 seconds",
"1 minute",
"5 minutes",
"30 minutes",
"1 hour",
"6 hour",
"1 day",
"7 days",
"week",
"week_starting_sunday",
"week_ending_saturday",
"month",
"quarter",
"year",
],
"time_grains": ["now"],
}
def __repr__(self) -> str:
return self.datasource_name
@renders("datasource_name")
def datasource_link(self) -> str:
url = f"/superset/explore/{self.type}/{self.id}/"
name = escape(self.datasource_name)
return Markup(f'<a href="{url}">{name}</a>')
def get_metric_obj(self, metric_name: str) -> Dict[str, Any]:
return [m.json_obj for m in self.metrics if m.metric_name == metric_name][0]
def latest_metadata(self) -> Optional[Dict[str, Any]]:
"""Returns segment metadata from the latest segment"""
logger.info("Syncing datasource [{}]".format(self.datasource_name))
client = self.cluster.get_pydruid_client()
try:
results = client.time_boundary(datasource=self.datasource_name)
except IOError:
results = None
if results:
max_time = results[0]["result"]["maxTime"]
max_time = dparse(max_time)
else:
max_time = datetime.now()
# Query segmentMetadata for 7 days back. However, due to a bug,
# we need to set this interval to more than 1 day ago to exclude
# realtime segments, which triggered a bug (fixed in druid 0.8.2).
# https://groups.google.com/forum/#!topic/druid-user/gVCqqspHqOQ
lbound = (max_time - timedelta(days=7)).isoformat()
if LooseVersion(self.cluster.druid_version) < LooseVersion("0.8.2"):
rbound = (max_time - timedelta(1)).isoformat()
else:
rbound = max_time.isoformat()
segment_metadata = None
try:
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=lbound + "/" + rbound,
merge=self.merge_flag,
analysisTypes=[],
)
except Exception as ex:
logger.warning("Failed first attempt to get latest segment")
logger.exception(ex)
if not segment_metadata:
# if no segments in the past 7 days, look at all segments
lbound = datetime(1901, 1, 1).isoformat()[:10]
if LooseVersion(self.cluster.druid_version) < LooseVersion("0.8.2"):
rbound = datetime.now().isoformat()
else:
rbound = datetime(2050, 1, 1).isoformat()[:10]
try:
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=lbound + "/" + rbound,
merge=self.merge_flag,
analysisTypes=[],
)
except Exception as ex:
logger.warning("Failed 2nd attempt to get latest segment")
logger.exception(ex)
if segment_metadata:
return segment_metadata[-1]["columns"]
return None
def refresh_metrics(self) -> None:
for col in self.columns:
col.refresh_metrics()
@classmethod
def sync_to_db_from_config(
cls,
druid_config: Dict[str, Any],
user: User,
cluster: DruidCluster,
refresh: bool = True,
) -> None:
"""Merges the ds config from druid_config into one stored in the db."""
session = db.session
datasource = (
session.query(cls).filter_by(datasource_name=druid_config["name"]).first()
)
# Create a new datasource.
if not datasource:
datasource = cls(
datasource_name=druid_config["name"],
cluster=cluster,
owners=[user],
changed_by_fk=user.id,
created_by_fk=user.id,
)
session.add(datasource)
elif not refresh:
return
dimensions = druid_config["dimensions"]
col_objs = (
session.query(DruidColumn)
.filter(DruidColumn.datasource_id == datasource.id)
.filter(DruidColumn.column_name.in_(dimensions))
)
col_objs = {col.column_name: col for col in col_objs}
for dim in dimensions:
col_obj = col_objs.get(dim, None)
if not col_obj:
col_obj = DruidColumn(
datasource_id=datasource.id,
column_name=dim,
groupby=True,
filterable=True,
# TODO: fetch type from Hive.
type="STRING",
datasource=datasource,
)
session.add(col_obj)
# Import Druid metrics
metric_objs = (
session.query(DruidMetric)
.filter(DruidMetric.datasource_id == datasource.id)
.filter(
DruidMetric.metric_name.in_(
spec["name"] for spec in druid_config["metrics_spec"]
)
)
)
metric_objs = {metric.metric_name: metric for metric in metric_objs}
for metric_spec in druid_config["metrics_spec"]:
metric_name = metric_spec["name"]
metric_type = metric_spec["type"]
metric_json = json.dumps(metric_spec)
if metric_type == "count":
metric_type = "longSum"
metric_json = json.dumps(
{"type": "longSum", "name": metric_name, "fieldName": metric_name}
)
metric_obj = metric_objs.get(metric_name, None)
if not metric_obj:
metric_obj = DruidMetric(
metric_name=metric_name,
metric_type=metric_type,
verbose_name="%s(%s)" % (metric_type, metric_name),
datasource=datasource,
json=metric_json,
description=(
"Imported from the airolap config dir for %s"
% druid_config["name"]
),
)
session.add(metric_obj)
session.commit()
@staticmethod
def time_offset(granularity: Granularity) -> int:
if granularity == "week_ending_saturday":
return 6 * 24 * 3600 * 1000 # 6 days
return 0
@classmethod
def get_datasource_by_name(
cls, session: Session, datasource_name: str, schema: str, database_name: str
) -> Optional["DruidDatasource"]:
query = (
session.query(cls)
.join(DruidCluster)
.filter(cls.datasource_name == datasource_name)
.filter(DruidCluster.cluster_name == database_name)
)
return query.first()
# uses https://en.wikipedia.org/wiki/ISO_8601
# http://druid.io/docs/0.8.0/querying/granularities.html
# TODO: pass origin from the UI
@staticmethod
def granularity(
period_name: str, timezone: Optional[str] = None, origin: Optional[str] = None
) -> Union[Dict[str, str], str]:
if not period_name or period_name == "all":
return "all"
iso_8601_dict = {
"5 seconds": "PT5S",
"30 seconds": "PT30S",
"1 minute": "PT1M",
"5 minutes": "PT5M",
"30 minutes": "PT30M",
"1 hour": "PT1H",
"6 hour": "PT6H",
"one day": "P1D",
"1 day": "P1D",
"7 days": "P7D",
"week": "P1W",
"week_starting_sunday": "P1W",
"week_ending_saturday": "P1W",
"month": "P1M",
"quarter": "P3M",
"year": "P1Y",
}
granularity = {"type": "period"}
if timezone:
granularity["timeZone"] = timezone
if origin:
dttm = parse_human_datetime(origin)
assert dttm
granularity["origin"] = dttm.isoformat()
if period_name in iso_8601_dict:
granularity["period"] = iso_8601_dict[period_name]
if period_name in ("week_ending_saturday", "week_starting_sunday"):
# use Sunday as start of the week
granularity["origin"] = "2016-01-03T00:00:00"
elif not isinstance(period_name, str):
granularity["type"] = "duration"
granularity["duration"] = period_name
elif period_name.startswith("P"):
# identify if the string is the iso_8601 period
granularity["period"] = period_name
else:
granularity["type"] = "duration"
granularity["duration"] = (
parse_human_timedelta(period_name).total_seconds() # type: ignore
* 1000
)
return granularity
@staticmethod
def get_post_agg(mconf: Dict[str, Any]) -> "Postaggregator":
"""
For a metric specified as `postagg` returns the
kind of post aggregation for pydruid.
"""
if mconf.get("type") == "javascript":
return JavascriptPostAggregator(
name=mconf.get("name", ""),
field_names=mconf.get("fieldNames", []),
function=mconf.get("function", ""),
)
elif mconf.get("type") == "quantile":
return Quantile(mconf.get("name", ""), mconf.get("probability", ""))
elif mconf.get("type") == "quantiles":
return Quantiles(mconf.get("name", ""), mconf.get("probabilities", ""))
elif mconf.get("type") == "fieldAccess":
return Field(mconf.get("name"))
elif mconf.get("type") == "constant":
return Const(mconf.get("value"), output_name=mconf.get("name", ""))
elif mconf.get("type") == "hyperUniqueCardinality":
return HyperUniqueCardinality(mconf.get("name"))
elif mconf.get("type") == "arithmetic":
return Postaggregator(
mconf.get("fn", "/"), mconf.get("fields", []), mconf.get("name", "")
)
else:
return CustomPostAggregator(mconf.get("name", ""), mconf)
@staticmethod
def find_postaggs_for(
postagg_names: Set[str], metrics_dict: Dict[str, DruidMetric]
) -> List[DruidMetric]:
"""Return a list of metrics that are post aggregations"""
postagg_metrics = [
metrics_dict[name]
for name in postagg_names
if metrics_dict[name].metric_type == POST_AGG_TYPE
]
# Remove post aggregations that were found
for postagg in postagg_metrics:
postagg_names.remove(postagg.metric_name)
return postagg_metrics
@staticmethod
def recursive_get_fields(_conf: Dict[str, Any]) -> List[str]:
_type = _conf.get("type")
_field = _conf.get("field")
_fields = _conf.get("fields")
field_names = []
if _type in ["fieldAccess", "hyperUniqueCardinality", "quantile", "quantiles"]:
field_names.append(_conf.get("fieldName", ""))
if _field:
field_names += DruidDatasource.recursive_get_fields(_field)
if _fields:
for _f in _fields:
field_names += DruidDatasource.recursive_get_fields(_f)
return list(set(field_names))
@staticmethod
def resolve_postagg(
postagg: DruidMetric,
post_aggs: Dict[str, Any],
agg_names: Set[str],
visited_postaggs: Set[str],
metrics_dict: Dict[str, DruidMetric],
) -> None:
mconf = postagg.json_obj
required_fields = set(
DruidDatasource.recursive_get_fields(mconf) + mconf.get("fieldNames", [])
)
# Check if the fields are already in aggs
# or is a previous postagg
required_fields = set(
field
for field in required_fields
if field not in visited_postaggs and field not in agg_names
)
# First try to find postaggs that match
if len(required_fields) > 0:
missing_postaggs = DruidDatasource.find_postaggs_for(
required_fields, metrics_dict
)
for missing_metric in required_fields:
agg_names.add(missing_metric)
for missing_postagg in missing_postaggs:
# Add to visited first to avoid infinite recursion
# if post aggregations are cyclicly dependent
visited_postaggs.add(missing_postagg.metric_name)
for missing_postagg in missing_postaggs:
DruidDatasource.resolve_postagg(
missing_postagg,
post_aggs,
agg_names,
visited_postaggs,
metrics_dict,
)
post_aggs[postagg.metric_name] = DruidDatasource.get_post_agg(postagg.json_obj)
@staticmethod
def metrics_and_post_aggs(
metrics: List[Metric], metrics_dict: Dict[str, DruidMetric]
) -> Tuple["OrderedDict[str, Any]", "OrderedDict[str, Any]"]:
# Separate metrics into those that are aggregations
# and those that are post aggregations
saved_agg_names = set()
adhoc_agg_configs = []
postagg_names = []
for metric in metrics:
if isinstance(metric, dict) and utils.is_adhoc_metric(metric):
adhoc_agg_configs.append(metric)
elif isinstance(metric, str):
if metrics_dict[metric].metric_type != POST_AGG_TYPE:
saved_agg_names.add(metric)
else:
postagg_names.append(metric)
# Create the post aggregations, maintain order since postaggs
# may depend on previous ones
post_aggs: "OrderedDict[str, Postaggregator]" = OrderedDict()
visited_postaggs = set()
for postagg_name in postagg_names:
postagg = metrics_dict[postagg_name]
visited_postaggs.add(postagg_name)
DruidDatasource.resolve_postagg(
postagg, post_aggs, saved_agg_names, visited_postaggs, metrics_dict
)
aggs = DruidDatasource.get_aggregations(
metrics_dict, saved_agg_names, adhoc_agg_configs
)
return aggs, post_aggs
def values_for_column(self, column_name: str, limit: int = 10000) -> List[Any]:
"""Retrieve some values for the given column"""
logger.info(
"Getting values for columns [{}] limited to [{}]".format(column_name, limit)
)
# TODO: Use Lexicographic TopNMetricSpec once supported by PyDruid
if self.fetch_values_from:
from_dttm = parse_human_datetime(self.fetch_values_from)
assert from_dttm
else:
from_dttm = datetime(1970, 1, 1)
qry = dict(
datasource=self.datasource_name,
granularity="all",
intervals=from_dttm.isoformat() + "/" + datetime.now().isoformat(),
aggregations=dict(count=count("count")),
dimension=column_name,
metric="count",
threshold=limit,
)
client = self.cluster.get_pydruid_client()
client.topn(**qry)
df = client.export_pandas()
return df[column_name].to_list()
def get_query_str(
self,
query_obj: QueryObjectDict,
phase: int = 1,
client: Optional["PyDruid"] = None,
) -> str:
return self.run_query(client=client, phase=phase, **query_obj)
def _add_filter_from_pre_query_data(
self, df: pd.DataFrame, dimensions: List[Any], dim_filter: "Filter"
) -> "Filter":
ret = dim_filter
if not df.empty:
new_filters = []
for unused, row in df.iterrows():
fields = []
for dim in dimensions:
f = None
# Check if this dimension uses an extraction function
# If so, create the appropriate pydruid extraction object
if isinstance(dim, dict) and "extractionFn" in dim:
(col, extraction_fn) = DruidDatasource._create_extraction_fn(
dim
)
dim_val = dim["outputName"]
f = Filter(
dimension=col,
value=row[dim_val],
extraction_function=extraction_fn,
)
elif isinstance(dim, dict):
dim_val = dim["outputName"]
if dim_val:
f = Dimension(dim_val) == row[dim_val]
else:
f = Dimension(dim) == row[dim]
if f:
fields.append(f)
if len(fields) > 1:
term = Filter(type="and", fields=fields)
new_filters.append(term)
elif fields:
new_filters.append(fields[0])
if new_filters:
ff = Filter(type="or", fields=new_filters)
if not dim_filter:
ret = ff
else:
ret = Filter(type="and", fields=[ff, dim_filter])
return ret
@staticmethod
def druid_type_from_adhoc_metric(adhoc_metric: Dict[str, Any]) -> str:
column_type = adhoc_metric["column"]["type"].lower()
aggregate = adhoc_metric["aggregate"].lower()
if aggregate == "count":
return "count"
if aggregate == "count_distinct":
return "hyperUnique" if column_type == "hyperunique" else "cardinality"
else:
return column_type + aggregate.capitalize()
@staticmethod
def get_aggregations(
metrics_dict: Dict[str, Any],
saved_metrics: Set[str],
adhoc_metrics: Optional[List[Dict[str, Any]]] = None,
) -> "OrderedDict[str, Any]":
"""
Returns a dictionary of aggregation metric names to aggregation json objects
:param metrics_dict: dictionary of all the metrics
:param saved_metrics: list of saved metric names
:param adhoc_metrics: list of adhoc metric names
:raise SupersetException: if one or more metric names are not aggregations
"""
if not adhoc_metrics:
adhoc_metrics = []
aggregations = OrderedDict()
invalid_metric_names = []
for metric_name in saved_metrics:
if metric_name in metrics_dict:
metric = metrics_dict[metric_name]
if metric.metric_type == POST_AGG_TYPE:
invalid_metric_names.append(metric_name)
else:
aggregations[metric_name] = metric.json_obj
else:
invalid_metric_names.append(metric_name)
if len(invalid_metric_names) > 0:
raise SupersetException(
_("Metric(s) {} must be aggregations.").format(invalid_metric_names)
)
for adhoc_metric in adhoc_metrics:
aggregations[adhoc_metric["label"]] = {
"fieldName": adhoc_metric["column"]["column_name"],
"fieldNames": [adhoc_metric["column"]["column_name"]],
"type": DruidDatasource.druid_type_from_adhoc_metric(adhoc_metric),
"name": adhoc_metric["label"],
}
return aggregations
def get_dimensions(
self, columns: List[str], columns_dict: Dict[str, DruidColumn]
) -> List[Union[str, Dict[str, Any]]]:
dimensions = []
columns = [col for col in columns if col in columns_dict]
for column_name in columns:
col = columns_dict.get(column_name)
dim_spec = col.dimension_spec if col else None
dimensions.append(dim_spec or column_name)
return dimensions
def intervals_from_dttms(self, from_dttm: datetime, to_dttm: datetime) -> str:
# Couldn't find a way to just not filter on time...
from_dttm = from_dttm or datetime(1901, 1, 1)
to_dttm = to_dttm or datetime(2101, 1, 1)
# add tzinfo to native datetime with config
from_dttm = from_dttm.replace(tzinfo=DRUID_TZ)
to_dttm = to_dttm.replace(tzinfo=DRUID_TZ)
return "{}/{}".format(
from_dttm.isoformat() if from_dttm else "",
to_dttm.isoformat() if to_dttm else "",
)
@staticmethod
def _dimensions_to_values(
dimensions: List[Union[Dict[str, str], str]]
) -> List[Union[Dict[str, str], str]]:
"""
Replace dimensions specs with their `dimension`
values, and ignore those without
"""
values: List[Union[Dict[str, str], str]] = []
for dimension in dimensions:
if isinstance(dimension, dict):
if "extractionFn" in dimension:
values.append(dimension)
elif "dimension" in dimension:
values.append(dimension["dimension"])
else:
values.append(dimension)
return values
@staticmethod
def sanitize_metric_object(metric: Metric) -> None:
"""
Update a metric with the correct type if necessary.
:param dict metric: The metric to sanitize
"""
if (
utils.is_adhoc_metric(metric)
and metric["column"]["type"].upper() == "FLOAT" # type: ignore
):
metric["column"]["type"] = "DOUBLE" # type: ignore
def run_query( # druid
self,
metrics: List[Metric],
granularity: str,
from_dttm: datetime,
to_dttm: datetime,
columns: Optional[List[str]] = None,
groupby: Optional[List[str]] = None,
filter: Optional[List[Dict[str, Any]]] = None,
is_timeseries: Optional[bool] = True,
timeseries_limit: Optional[int] = None,
timeseries_limit_metric: Optional[Metric] = None,
row_limit: Optional[int] = None,
row_offset: Optional[int] = None,
inner_from_dttm: Optional[datetime] = None,
inner_to_dttm: Optional[datetime] = None,
orderby: Optional[Any] = None,
extras: Optional[Dict[str, Any]] = None,
phase: int = 2,
client: Optional["PyDruid"] = None,
order_desc: bool = True,
is_rowcount: bool = False,
apply_fetch_values_predicate: bool = False,
) -> str:
"""Runs a query against Druid and returns a dataframe."""
# is_rowcount and apply_fetch_values_predicate is only
# supported on SQL connector
if is_rowcount:
raise SupersetException("is_rowcount is not supported on Druid connector")
if apply_fetch_values_predicate:
raise SupersetException(
"apply_fetch_values_predicate is not supported on Druid connector"
)
# TODO refactor into using a TBD Query object
client = client or self.cluster.get_pydruid_client()
row_limit = row_limit or conf.get("ROW_LIMIT")
if row_offset:
raise SupersetException("Offset not implemented for Druid connector")
if not is_timeseries:
granularity = "all"
if granularity == "all":
phase = 1
inner_from_dttm = inner_from_dttm or from_dttm
inner_to_dttm = inner_to_dttm or to_dttm
timezone = from_dttm.replace(tzinfo=DRUID_TZ).tzname() if from_dttm else None
query_str = ""
metrics_dict = {m.metric_name: m for m in self.metrics}
columns_dict = {c.column_name: c for c in self.columns}
if self.cluster and LooseVersion(
self.cluster.get_druid_version()
) < LooseVersion("0.11.0"):
for metric in metrics:
self.sanitize_metric_object(metric)
if timeseries_limit_metric:
self.sanitize_metric_object(timeseries_limit_metric)
aggregations, post_aggs = DruidDatasource.metrics_and_post_aggs(
metrics, metrics_dict
)
# the dimensions list with dimensionSpecs expanded
dimensions = self.get_dimensions(groupby, columns_dict) if groupby else []
extras = extras or {}
qry = dict(
datasource=self.datasource_name,
dimensions=dimensions,
aggregations=aggregations,
granularity=DruidDatasource.granularity(
granularity, timezone=timezone, origin=extras.get("druid_time_origin")
),
post_aggregations=post_aggs,
intervals=self.intervals_from_dttms(from_dttm, to_dttm),
)
if is_timeseries:
qry["context"] = dict(skipEmptyBuckets=True)
filters = (
DruidDatasource.get_filters(filter, self.num_cols, columns_dict)
if filter
else None
)
if filters:
qry["filter"] = filters
if "having_druid" in extras:
having_filters = self.get_having_filters(extras["having_druid"])
if having_filters:
qry["having"] = having_filters
else:
having_filters = None
order_direction = "descending" if order_desc else "ascending"
if columns:
columns.append("__time")
del qry["post_aggregations"]
del qry["aggregations"]
del qry["dimensions"]
qry["columns"] = columns
qry["metrics"] = []
qry["granularity"] = "all"
qry["limit"] = row_limit
client.scan(**qry)
elif not groupby and not having_filters:
logger.info("Running timeseries query for no groupby values")
del qry["dimensions"]
client.timeseries(**qry)
elif not having_filters and order_desc and (groupby and len(groupby) == 1):
dim = list(qry["dimensions"])[0]
logger.info("Running two-phase topn query for dimension [{}]".format(dim))
pre_qry = deepcopy(qry)
order_by: Optional[str] = None
if timeseries_limit_metric:
order_by = utils.get_metric_name(timeseries_limit_metric)
aggs_dict, post_aggs_dict = DruidDatasource.metrics_and_post_aggs(
[timeseries_limit_metric], metrics_dict
)
if phase == 1:
pre_qry["aggregations"].update(aggs_dict)
pre_qry["post_aggregations"].update(post_aggs_dict)
else:
pre_qry["aggregations"] = aggs_dict
pre_qry["post_aggregations"] = post_aggs_dict
else:
agg_keys = qry["aggregations"].keys()
order_by = list(agg_keys)[0] if agg_keys else None
# Limit on the number of timeseries, doing a two-phases query
pre_qry["granularity"] = "all"
pre_qry["threshold"] = min(row_limit, timeseries_limit or row_limit)
pre_qry["metric"] = order_by
pre_qry["dimension"] = self._dimensions_to_values(qry["dimensions"])[0]
del pre_qry["dimensions"]
client.topn(**pre_qry)
logger.info("Phase 1 Complete")
if phase == 2:
query_str += "// Two phase query\n// Phase 1\n"
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2
)
query_str += "\n"
if phase == 1:
return query_str
query_str += "// Phase 2 (built based on phase one's results)\n"
df = client.export_pandas()
if df is None:
df = pd.DataFrame()
qry["filter"] = self._add_filter_from_pre_query_data(
df, [pre_qry["dimension"]], filters
)
qry["threshold"] = timeseries_limit or 1000
if row_limit and granularity == "all":
qry["threshold"] = row_limit
qry["dimension"] = dim
del qry["dimensions"]
qry["metric"] = list(qry["aggregations"].keys())[0]
client.topn(**qry)
logger.info("Phase 2 Complete")
elif having_filters or groupby:
# If grouping on multiple fields or using a having filter
# we have to force a groupby query
logger.info("Running groupby query for dimensions [{}]".format(dimensions))
if timeseries_limit and is_timeseries:
logger.info("Running two-phase query for timeseries")
pre_qry = deepcopy(qry)
pre_qry_dims = self._dimensions_to_values(qry["dimensions"])
# Can't use set on an array with dicts
# Use set with non-dict items only
non_dict_dims = list(
set([x for x in pre_qry_dims if not isinstance(x, dict)])
)
dict_dims = [x for x in pre_qry_dims if isinstance(x, dict)]
pre_qry["dimensions"] = non_dict_dims + dict_dims # type: ignore
order_by = None
if metrics:
order_by = utils.get_metric_name(metrics[0])
else:
order_by = pre_qry_dims[0] # type: ignore
if timeseries_limit_metric:
order_by = utils.get_metric_name(timeseries_limit_metric)
aggs_dict, post_aggs_dict = DruidDatasource.metrics_and_post_aggs(
[timeseries_limit_metric], metrics_dict
)
if phase == 1:
pre_qry["aggregations"].update(aggs_dict)
pre_qry["post_aggregations"].update(post_aggs_dict)
else:
pre_qry["aggregations"] = aggs_dict
pre_qry["post_aggregations"] = post_aggs_dict
# Limit on the number of timeseries, doing a two-phases query
pre_qry["granularity"] = "all"
pre_qry["limit_spec"] = {
"type": "default",
"limit": min(timeseries_limit, row_limit),
"intervals": self.intervals_from_dttms(
inner_from_dttm, inner_to_dttm
),
"columns": [{"dimension": order_by, "direction": order_direction}],
}
client.groupby(**pre_qry)
logger.info("Phase 1 Complete")
query_str += "// Two phase query\n// Phase 1\n"
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2
)
query_str += "\n"
if phase == 1:
return query_str
query_str += "// Phase 2 (built based on phase one's results)\n"
df = client.export_pandas()
if df is None:
df = pd.DataFrame()
qry["filter"] = self._add_filter_from_pre_query_data(
df, pre_qry["dimensions"], filters
)
qry["limit_spec"] = None
if row_limit:
dimension_values = self._dimensions_to_values(dimensions)
qry["limit_spec"] = {
"type": "default",
"limit": row_limit,
"columns": [
{
"dimension": (
utils.get_metric_name(metrics[0])
if metrics
else dimension_values[0]
),
"direction": order_direction,
}
],
}
client.groupby(**qry)
logger.info("Query Complete")
query_str += json.dumps(client.query_builder.last_query.query_dict, indent=2)
return query_str
@staticmethod
def homogenize_types(df: pd.DataFrame, columns: Iterable[str]) -> pd.DataFrame:
"""Converting all columns to strings
When grouping by a numeric (say FLOAT) column, pydruid returns
strings in the dataframe. This creates issues downstream related
to having mixed types in the dataframe
Here we replace None with <NULL> and make the whole series a
str instead of an object.
"""
df[columns] = df[columns].fillna(NULL_STRING).astype("unicode")
return df
def query(self, query_obj: QueryObjectDict) -> QueryResult:
qry_start_dttm = datetime.now()
client = self.cluster.get_pydruid_client()
query_str = self.get_query_str(client=client, query_obj=query_obj, phase=2)
df = client.export_pandas()
if df is None:
df = pd.DataFrame()
if df.empty:
return QueryResult(
df=df, query=query_str, duration=datetime.now() - qry_start_dttm
)
df = self.homogenize_types(df, query_obj.get("groupby", []))
df.columns = [
DTTM_ALIAS if c in ("timestamp", "__time") else c for c in df.columns
]
is_timeseries = (
query_obj["is_timeseries"] if "is_timeseries" in query_obj else True
)
if not is_timeseries and DTTM_ALIAS in df.columns:
del df[DTTM_ALIAS]
# Reordering columns
cols: List[str] = []
if DTTM_ALIAS in df.columns:
cols += [DTTM_ALIAS]
cols += query_obj.get("groupby") or []
cols += query_obj.get("columns") or []
cols += query_obj.get("metrics") or []
cols = utils.get_metric_names(cols)
cols = [col for col in cols if col in df.columns]
df = df[cols]
time_offset = DruidDatasource.time_offset(query_obj["granularity"])
def increment_timestamp(ts: str) -> datetime:
dt = parse_human_datetime(ts).replace(tzinfo=DRUID_TZ)
return dt + timedelta(milliseconds=time_offset)
if DTTM_ALIAS in df.columns and time_offset:
df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(increment_timestamp)
return QueryResult(
df=df, query=query_str, duration=datetime.now() - qry_start_dttm
)
@staticmethod
def _create_extraction_fn(
dim_spec: Dict[str, Any]
) -> Tuple[
str,
Union[
"MapLookupExtraction",
"RegexExtraction",
"RegisteredLookupExtraction",
"TimeFormatExtraction",
],
]:
extraction_fn = None
if dim_spec and "extractionFn" in dim_spec:
col = dim_spec["dimension"]
fn = dim_spec["extractionFn"]
ext_type = fn.get("type")
if ext_type == "lookup" and fn["lookup"].get("type") == "map":
replace_missing_values = fn.get("replaceMissingValueWith")
retain_missing_values = fn.get("retainMissingValue", False)
injective = fn.get("isOneToOne", False)
extraction_fn = MapLookupExtraction(
fn["lookup"]["map"],
replace_missing_values=replace_missing_values,
retain_missing_values=retain_missing_values,
injective=injective,
)
elif ext_type == "regex":
extraction_fn = RegexExtraction(fn["expr"])
elif ext_type == "registeredLookup":
extraction_fn = RegisteredLookupExtraction(fn.get("lookup"))
elif ext_type == "timeFormat":
extraction_fn = TimeFormatExtraction(
fn.get("format"), fn.get("locale"), fn.get("timeZone")
)
else:
raise Exception(_("Unsupported extraction function: " + ext_type))
return (col, extraction_fn)
@classmethod
def get_filters(
cls,
raw_filters: List[Dict[str, Any]],
num_cols: List[str],
columns_dict: Dict[str, DruidColumn],
) -> "Filter":
"""Given Superset filter data structure, returns pydruid Filter(s)"""
filters = None
for flt in raw_filters:
col: Optional[str] = flt.get("col")
op: Optional[str] = flt["op"].upper() if "op" in flt else None
eq: Optional[FilterValues] = flt.get("val")
if (
not col
or not op
or (
eq is None
and op
not in (
FilterOperator.IS_NULL.value,
FilterOperator.IS_NOT_NULL.value,
)
)
):
continue
# Check if this dimension uses an extraction function
# If so, create the appropriate pydruid extraction object
column_def = columns_dict.get(col)
dim_spec = column_def.dimension_spec if column_def else None
extraction_fn = None
if dim_spec and "extractionFn" in dim_spec:
(col, extraction_fn) = DruidDatasource._create_extraction_fn(dim_spec)
cond = None
is_numeric_col = col in num_cols
is_list_target = op in (
FilterOperator.IN.value,
FilterOperator.NOT_IN.value,
)
eq = cls.filter_values_handler(
eq,
is_list_target=is_list_target,
target_column_type=utils.GenericDataType.NUMERIC
if is_numeric_col
else utils.GenericDataType.STRING,
)
# For these two ops, could have used Dimension,
# but it doesn't support extraction functions
if op == FilterOperator.EQUALS.value:
cond = Filter(
dimension=col, value=eq, extraction_function=extraction_fn
)
elif op == FilterOperator.NOT_EQUALS.value:
cond = ~Filter(
dimension=col, value=eq, extraction_function=extraction_fn
)
elif is_list_target:
eq = cast(List[Any], eq)
fields = []
# ignore the filter if it has no value
if not len(eq):
continue
# if it uses an extraction fn, use the "in" operator
# as Dimension isn't supported
elif extraction_fn is not None:
cond = Filter(
dimension=col,
values=eq,
type="in",
extraction_function=extraction_fn,
)
elif len(eq) == 1:
cond = Dimension(col) == eq[0]
else:
for s in eq:
fields.append(Dimension(col) == s)
cond = Filter(type="or", fields=fields)
if op == FilterOperator.NOT_IN.value:
cond = ~cond
elif op == FilterOperator.REGEX.value:
cond = Filter(
extraction_function=extraction_fn,
type="regex",
pattern=eq,
dimension=col,
)
# For the ops below, could have used pydruid's Bound,
# but it doesn't support extraction functions
elif op == FilterOperator.GREATER_THAN_OR_EQUALS.value:
cond = Bound(
extraction_function=extraction_fn,
dimension=col,
lowerStrict=False,
upperStrict=False,
lower=eq,
upper=None,
ordering=cls._get_ordering(is_numeric_col),
)
elif op == FilterOperator.LESS_THAN_OR_EQUALS.value:
cond = Bound(
extraction_function=extraction_fn,
dimension=col,
lowerStrict=False,
upperStrict=False,
lower=None,
upper=eq,
ordering=cls._get_ordering(is_numeric_col),
)
elif op == FilterOperator.GREATER_THAN.value:
cond = Bound(
extraction_function=extraction_fn,
lowerStrict=True,
upperStrict=False,
dimension=col,
lower=eq,
upper=None,
ordering=cls._get_ordering(is_numeric_col),
)
elif op == FilterOperator.LESS_THAN.value:
cond = Bound(
extraction_function=extraction_fn,
upperStrict=True,
lowerStrict=False,
dimension=col,
lower=None,
upper=eq,
ordering=cls._get_ordering(is_numeric_col),
)
elif op == FilterOperator.IS_NULL.value:
cond = Filter(dimension=col, value="")
elif op == FilterOperator.IS_NOT_NULL.value:
cond = ~Filter(dimension=col, value="")
if filters:
filters = Filter(type="and", fields=[cond, filters])
else:
filters = cond
return filters
@staticmethod
def _get_ordering(is_numeric_col: bool) -> str:
return "numeric" if is_numeric_col else "lexicographic"
def _get_having_obj(self, col: str, op: str, eq: str) -> "Having":
cond = None
if op == FilterOperator.EQUALS.value:
if col in self.column_names:
cond = DimSelector(dimension=col, value=eq)
else:
cond = Aggregation(col) == eq
elif op == FilterOperator.GREATER_THAN.value:
cond = Aggregation(col) > eq
elif op == FilterOperator.LESS_THAN.value:
cond = Aggregation(col) < eq
return cond
def get_having_filters(
self, raw_filters: List[Dict[str, Any]]
) -> Optional["Having"]:
filters = None
reversed_op_map = {
FilterOperator.NOT_EQUALS.value: FilterOperator.EQUALS.value,
FilterOperator.GREATER_THAN_OR_EQUALS.value: FilterOperator.LESS_THAN.value,
FilterOperator.LESS_THAN_OR_EQUALS.value: FilterOperator.GREATER_THAN.value,
}
for flt in raw_filters:
if not all(f in flt for f in ["col", "op", "val"]):
continue
col = flt["col"]
op = flt["op"]
eq = flt["val"]
cond = None
if op in [
FilterOperator.EQUALS.value,
FilterOperator.GREATER_THAN.value,
FilterOperator.LESS_THAN.value,
]:
cond = self._get_having_obj(col, op, eq)
elif op in reversed_op_map:
cond = ~self._get_having_obj(col, reversed_op_map[op], eq)
if filters:
filters = filters & cond
else:
filters = cond
return filters
@classmethod
def query_datasources_by_name(
cls,
session: Session,
database: Database,
datasource_name: str,
schema: Optional[str] = None,
) -> List["DruidDatasource"]:
return []
def external_metadata(self) -> List[Dict[str, Any]]:
self.merge_flag = True
latest_metadata = self.latest_metadata() or {}
return [{"name": k, "type": v.get("type")} for k, v in latest_metadata.items()]
sa.event.listen(DruidDatasource, "after_insert", security_manager.set_perm)
sa.event.listen(DruidDatasource, "after_update", security_manager.set_perm)