# pylint: disable=invalid-unary-operand-type
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime, timedelta
import json
import logging
from multiprocessing.pool import ThreadPool
from dateutil.parser import parse as dparse
from flask import escape, Markup
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from flask_babel import lazy_gettext as _
from pydruid.client import PyDruid
from pydruid.utils.aggregators import count
from pydruid.utils.filters import Bound, Dimension, Filter
from pydruid.utils.having import Aggregation
from pydruid.utils.postaggregator import (
Const, Field, HyperUniqueCardinality, Postaggregator, Quantile, Quantiles,
)
import requests
from six import string_types
import sqlalchemy as sa
from sqlalchemy import (
Boolean, Column, DateTime, ForeignKey, Integer, or_, String, Text, UniqueConstraint,
)
from sqlalchemy.orm import backref, relationship
from superset import conf, db, import_util, sm, utils
from superset.connectors.base.models import BaseColumn, BaseDatasource, BaseMetric
from superset.models.helpers import (
AuditMixinNullable, ImportMixin, QueryResult, set_perm,
)
from superset.utils import (
DimSelector, DTTM_ALIAS, flasher, MetricPermException,
)
DRUID_TZ = conf.get('DRUID_TZ')
# Function wrapper because bound methods cannot
# be passed to processes
def _fetch_metadata_for(datasource):
return datasource.latest_metadata()
class JavascriptPostAggregator(Postaggregator):
def __init__(self, name, field_names, function):
self.post_aggregator = {
'type': 'javascript',
'fieldNames': field_names,
'name': name,
'function': function,
}
self.name = name
class CustomPostAggregator(Postaggregator):
"""A way to allow users to specify completely custom PostAggregators"""
def __init__(self, name, post_aggregator):
self.name = name
self.post_aggregator = post_aggregator
class DruidCluster(Model, AuditMixinNullable, ImportMixin):
"""ORM object referencing the Druid clusters"""
__tablename__ = 'clusters'
type = 'druid'
id = Column(Integer, primary_key=True)
verbose_name = Column(String(250), unique=True)
# short unique name, used in permissions
cluster_name = Column(String(250), unique=True)
coordinator_host = Column(String(255))
coordinator_port = Column(Integer, default=8081)
coordinator_endpoint = Column(
String(255), default='druid/coordinator/v1/metadata')
broker_host = Column(String(255))
broker_port = Column(Integer, default=8082)
broker_endpoint = Column(String(255), default='druid/v2')
metadata_last_refreshed = Column(DateTime)
cache_timeout = Column(Integer)
export_fields = ('cluster_name', 'coordinator_host', 'coordinator_port',
'coordinator_endpoint', 'broker_host', 'broker_port',
'broker_endpoint', 'cache_timeout')
export_children = ['datasources']
def __repr__(self):
return self.verbose_name if self.verbose_name else self.cluster_name
def get_pydruid_client(self):
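        """Returns a PyDruid client pointed at this cluster's broker"""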
cli = PyDruid(
'http://{0}:{1}/'.format(self.broker_host, self.broker_port),
self.broker_endpoint)
return cli
def get_datasources(self):
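        """Returns the list of datasource names known to the coordinator"""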
endpoint = (
'http://{obj.coordinator_host}:{obj.coordinator_port}/'
'{obj.coordinator_endpoint}/datasources'
).format(obj=self)
return json.loads(requests.get(endpoint).text)
def get_druid_version(self):
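        """Returns the Druid version reported by the coordinator's /status endpoint"""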
endpoint = (
'http://{obj.coordinator_host}:{obj.coordinator_port}/status'
).format(obj=self)
return json.loads(requests.get(endpoint).text)['version']
def refresh_datasources(
self,
datasource_name=None,
merge_flag=True,
refreshAll=True):
"""Refresh metadata of all datasources in the cluster
If ``datasource_name`` is specified, only that datasource is updated
"""
self.druid_version = self.get_druid_version()
ds_list = self.get_datasources()
blacklist = conf.get('DRUID_DATA_SOURCE_BLACKLIST', [])
ds_refresh = []
if not datasource_name:
ds_refresh = list(filter(lambda ds: ds not in blacklist, ds_list))
elif datasource_name not in blacklist and datasource_name in ds_list:
ds_refresh.append(datasource_name)
else:
return
self.refresh(ds_refresh, merge_flag, refreshAll)
def refresh(self, datasource_names, merge_flag, refreshAll):
"""
        Fetches metadata for the specified datasources and
        merges it into the Superset database
"""
session = db.session
ds_list = (
session.query(DruidDatasource)
.filter(or_(DruidDatasource.datasource_name == name
for name in datasource_names))
)
ds_map = {ds.name: ds for ds in ds_list}
for ds_name in datasource_names:
datasource = ds_map.get(ds_name, None)
if not datasource:
datasource = DruidDatasource(datasource_name=ds_name)
with session.no_autoflush:
session.add(datasource)
flasher(
'Adding new datasource [{}]'.format(ds_name), 'success')
ds_map[ds_name] = datasource
elif refreshAll:
flasher(
'Refreshing datasource [{}]'.format(ds_name), 'info')
else:
del ds_map[ds_name]
continue
datasource.cluster = self
datasource.merge_flag = merge_flag
session.flush()
        # Prepare multithreaded execution
pool = ThreadPool()
ds_refresh = list(ds_map.values())
metadata = pool.map(_fetch_metadata_for, ds_refresh)
pool.close()
pool.join()
for i in range(0, len(ds_refresh)):
datasource = ds_refresh[i]
cols = metadata[i]
if cols:
col_objs_list = (
session.query(DruidColumn)
.filter(DruidColumn.datasource_id == datasource.id)
.filter(or_(DruidColumn.column_name == col for col in cols))
)
col_objs = {col.column_name: col for col in col_objs_list}
for col in cols:
if col == '__time': # skip the time column
continue
col_obj = col_objs.get(col, None)
if not col_obj:
col_obj = DruidColumn(
datasource_id=datasource.id,
column_name=col)
with session.no_autoflush:
session.add(col_obj)
datatype = cols[col]['type']
if datatype == 'STRING':
col_obj.groupby = True
col_obj.filterable = True
if datatype == 'hyperUnique' or datatype == 'thetaSketch':
col_obj.count_distinct = True
# Allow sum/min/max for long or double
if datatype == 'LONG' or datatype == 'DOUBLE':
col_obj.sum = True
col_obj.min = True
col_obj.max = True
col_obj.type = datatype
col_obj.datasource = datasource
datasource.generate_metrics_for(col_objs_list)
session.commit()
@property
def perm(self):
return '[{obj.cluster_name}].(id:{obj.id})'.format(obj=self)
def get_perm(self):
return self.perm
@property
def name(self):
return self.verbose_name if self.verbose_name else self.cluster_name
@property
def unique_name(self):
return self.verbose_name if self.verbose_name else self.cluster_name
class DruidColumn(Model, BaseColumn):
"""ORM model for storing Druid datasource column metadata"""
__tablename__ = 'columns'
__table_args__ = (UniqueConstraint('column_name', 'datasource_id'),)
datasource_id = Column(
Integer,
ForeignKey('datasources.id'))
# Setting enable_typechecks=False disables polymorphic inheritance.
datasource = relationship(
'DruidDatasource',
backref=backref('columns', cascade='all, delete-orphan'),
enable_typechecks=False)
dimension_spec_json = Column(Text)
export_fields = (
'datasource_id', 'column_name', 'is_active', 'type', 'groupby',
'count_distinct', 'sum', 'avg', 'max', 'min', 'filterable',
'description', 'dimension_spec_json', 'verbose_name',
)
export_parent = 'datasource'
def __repr__(self):
return self.column_name
@property
def expression(self):
return self.dimension_spec_json
@property
def dimension_spec(self):
if self.dimension_spec_json:
return json.loads(self.dimension_spec_json)
def get_metrics(self):
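        """Builds the default DruidMetric objects (count, sum, avg, min, max,
        count distinct) that this column's flags allow, keyed by metric name"""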
metrics = {}
metrics['count'] = DruidMetric(
metric_name='count',
verbose_name='COUNT(*)',
metric_type='count',
json=json.dumps({'type': 'count', 'name': 'count'}),
)
# Somehow we need to reassign this for UDAFs
if self.type in ('DOUBLE', 'FLOAT'):
corrected_type = 'DOUBLE'
else:
corrected_type = self.type
if self.sum and self.is_num:
mt = corrected_type.lower() + 'Sum'
name = 'sum__' + self.column_name
metrics[name] = DruidMetric(
metric_name=name,
metric_type='sum',
verbose_name='SUM({})'.format(self.column_name),
json=json.dumps({
'type': mt, 'name': name, 'fieldName': self.column_name}),
)
if self.avg and self.is_num:
mt = corrected_type.lower() + 'Avg'
name = 'avg__' + self.column_name
metrics[name] = DruidMetric(
metric_name=name,
metric_type='avg',
verbose_name='AVG({})'.format(self.column_name),
json=json.dumps({
'type': mt, 'name': name, 'fieldName': self.column_name}),
)
if self.min and self.is_num:
mt = corrected_type.lower() + 'Min'
name = 'min__' + self.column_name
metrics[name] = DruidMetric(
metric_name=name,
metric_type='min',
verbose_name='MIN({})'.format(self.column_name),
json=json.dumps({
'type': mt, 'name': name, 'fieldName': self.column_name}),
)
if self.max and self.is_num:
mt = corrected_type.lower() + 'Max'
name = 'max__' + self.column_name
metrics[name] = DruidMetric(
metric_name=name,
metric_type='max',
verbose_name='MAX({})'.format(self.column_name),
json=json.dumps({
'type': mt, 'name': name, 'fieldName': self.column_name}),
)
if self.count_distinct:
name = 'count_distinct__' + self.column_name
if self.type == 'hyperUnique' or self.type == 'thetaSketch':
metrics[name] = DruidMetric(
metric_name=name,
verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
metric_type=self.type,
json=json.dumps({
'type': self.type,
'name': name,
'fieldName': self.column_name,
}),
)
else:
metrics[name] = DruidMetric(
metric_name=name,
verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
metric_type='count_distinct',
json=json.dumps({
'type': 'cardinality',
'name': name,
'fieldNames': [self.column_name]}),
)
return metrics
def generate_metrics(self):
"""Generate metrics based on the column metadata"""
metrics = self.get_metrics()
dbmetrics = (
db.session.query(DruidMetric)
.filter(DruidMetric.datasource_id == self.datasource_id)
.filter(or_(
DruidMetric.metric_name == m for m in metrics
))
)
dbmetrics = {metric.metric_name: metric for metric in dbmetrics}
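        # Only add metrics that aren't already stored for this datasource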
for metric in metrics.values():
metric.datasource_id = self.datasource_id
if not dbmetrics.get(metric.metric_name, None):
db.session.add(metric)
@classmethod
def import_obj(cls, i_column):
def lookup_obj(lookup_column):
return db.session.query(DruidColumn).filter(
DruidColumn.datasource_id == lookup_column.datasource_id,
DruidColumn.column_name == lookup_column.column_name).first()
return import_util.import_simple_obj(db.session, i_column, lookup_obj)
class DruidMetric(Model, BaseMetric):
"""ORM object referencing Druid metrics for a datasource"""
__tablename__ = 'metrics'
__table_args__ = (UniqueConstraint('metric_name', 'datasource_id'),)
datasource_id = Column(
Integer,
ForeignKey('datasources.id'))
# Setting enable_typechecks=False disables polymorphic inheritance.
datasource = relationship(
'DruidDatasource',
backref=backref('metrics', cascade='all, delete-orphan'),
enable_typechecks=False)
json = Column(Text)
export_fields = (
'metric_name', 'verbose_name', 'metric_type', 'datasource_id',
'json', 'description', 'is_restricted', 'd3format',
)
export_parent = 'datasource'
@property
def expression(self):
return self.json
@property
def json_obj(self):
try:
obj = json.loads(self.json)
except Exception:
obj = {}
return obj
@property
def perm(self):
return (
'{parent_name}.[{obj.metric_name}](id:{obj.id})'
).format(obj=self,
parent_name=self.datasource.full_name,
) if self.datasource else None
@classmethod
def import_obj(cls, i_metric):
def lookup_obj(lookup_metric):
return db.session.query(DruidMetric).filter(
DruidMetric.datasource_id == lookup_metric.datasource_id,
DruidMetric.metric_name == lookup_metric.metric_name).first()
return import_util.import_simple_obj(db.session, i_metric, lookup_obj)
class DruidDatasource(Model, BaseDatasource):
"""ORM object referencing Druid datasources (tables)"""
__tablename__ = 'datasources'
__table_args__ = (UniqueConstraint('datasource_name', 'cluster_name'),)
type = 'druid'
query_langtage = 'json'
cluster_class = DruidCluster
metric_class = DruidMetric
column_class = DruidColumn
baselink = 'druiddatasourcemodelview'
# Columns
datasource_name = Column(String(255))
is_hidden = Column(Boolean, default=False)
fetch_values_from = Column(String(100))
cluster_name = Column(
String(250), ForeignKey('clusters.cluster_name'))
cluster = relationship(
'DruidCluster', backref='datasources', foreign_keys=[cluster_name])
user_id = Column(Integer, ForeignKey('ab_user.id'))
owner = relationship(
sm.user_model,
backref=backref('datasources', cascade='all, delete-orphan'),
foreign_keys=[user_id])
UniqueConstraint('cluster_name', 'datasource_name')
export_fields = (
'datasource_name', 'is_hidden', 'description', 'default_endpoint',
'cluster_name', 'offset', 'cache_timeout', 'params',
)
export_parent = 'cluster'
export_children = ['columns', 'metrics']
@property
def database(self):
return self.cluster
@property
def connection(self):
return str(self.database)
@property
def num_cols(self):
return [c.column_name for c in self.columns if c.is_num]
@property
def name(self):
return self.datasource_name
@property
def schema(self):
ds_name = self.datasource_name or ''
name_pieces = ds_name.split('.')
if len(name_pieces) > 1:
return name_pieces[0]
else:
return None
@property
def schema_perm(self):
"""Returns schema permission if present, cluster one otherwise."""
return utils.get_schema_perm(self.cluster, self.schema)
def get_perm(self):
return (
'[{obj.cluster_name}].[{obj.datasource_name}]'
'(id:{obj.id})').format(obj=self)
@property
def link(self):
name = escape(self.datasource_name)
return Markup('<a href="{self.url}">{name}</a>').format(**locals())
@property
def full_name(self):
return utils.get_datasource_full_name(
self.cluster_name, self.datasource_name)
@property
def time_column_grains(self):
return {
'time_columns': [
'all', '5 seconds', '30 seconds', '1 minute',
'5 minutes', '1 hour', '6 hour', '1 day', '7 days',
'week', 'week_starting_sunday', 'week_ending_saturday',
'month',
],
'time_grains': ['now'],
}
def __repr__(self):
return self.datasource_name
@renders('datasource_name')
def datasource_link(self):
url = '/superset/explore/{obj.type}/{obj.id}/'.format(obj=self)
name = escape(self.datasource_name)
return Markup('<a href="{url}">{name}</a>'.format(**locals()))
def get_metric_obj(self, metric_name):
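        """Returns the parsed JSON definition of the metric with that name"""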
return [
m.json_obj for m in self.metrics
if m.metric_name == metric_name
][0]
@classmethod
def import_obj(cls, i_datasource, import_time=None):
"""Imports the datasource from the object to the database.
        Metrics, columns and the datasource will be overridden if they
        already exist. This function can be used to import/export dashboards
        between multiple superset instances. Audit metadata isn't copied over.
"""
def lookup_datasource(d):
return db.session.query(DruidDatasource).filter(
DruidDatasource.datasource_name == d.datasource_name,
DruidCluster.cluster_name == d.cluster_name,
).first()
def lookup_cluster(d):
return db.session.query(DruidCluster).filter_by(
cluster_name=d.cluster_name).one()
return import_util.import_datasource(
db.session, i_datasource, lookup_cluster, lookup_datasource,
import_time)
@staticmethod
def version_higher(v1, v2):
"""is v1 higher than v2
>>> DruidDatasource.version_higher('0.8.2', '0.9.1')
False
>>> DruidDatasource.version_higher('0.8.2', '0.6.1')
True
>>> DruidDatasource.version_higher('0.8.2', '0.8.2')
False
>>> DruidDatasource.version_higher('0.8.2', '0.9.BETA')
False
>>> DruidDatasource.version_higher('0.8.2', '0.9')
False
"""
def int_or_0(v):
try:
v = int(v)
except (TypeError, ValueError):
v = 0
return v
v1nums = [int_or_0(n) for n in v1.split('.')]
v2nums = [int_or_0(n) for n in v2.split('.')]
v1nums = (v1nums + [0, 0, 0])[:3]
v2nums = (v2nums + [0, 0, 0])[:3]
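        # Compare only the first three numeric components; non-numeric
        # parts (e.g. 'BETA') are treated as 0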
return (
v1nums[0] > v2nums[0] or
(v1nums[0] == v2nums[0] and v1nums[1] > v2nums[1]) or
(v1nums[0] == v2nums[0] and v1nums[1] == v2nums[1] and
v1nums[2] > v2nums[2])
)
def latest_metadata(self):
"""Returns segment metadata from the latest segment"""
logging.info('Syncing datasource [{}]'.format(self.datasource_name))
client = self.cluster.get_pydruid_client()
try:
results = client.time_boundary(datasource=self.datasource_name)
except IOError:
results = None
if results:
max_time = results[0]['result']['maxTime']
max_time = dparse(max_time)
else:
max_time = datetime.now()
        # Query segmentMetadata for the past 7 days. On Druid versions before
        # 0.8.2 the interval has to end more than 1 day in the past to exclude
        # realtime segments, which triggered a bug (fixed in Druid 0.8.2).
        # https://groups.google.com/forum/#!topic/druid-user/gVCqqspHqOQ
lbound = (max_time - timedelta(days=7)).isoformat()
if not self.version_higher(self.cluster.druid_version, '0.8.2'):
rbound = (max_time - timedelta(1)).isoformat()
else:
rbound = max_time.isoformat()
segment_metadata = None
try:
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=lbound + '/' + rbound,
merge=self.merge_flag,
analysisTypes=[])
except Exception as e:
logging.warning('Failed first attempt to get latest segment')
logging.exception(e)
if not segment_metadata:
# if no segments in the past 7 days, look at all segments
lbound = datetime(1901, 1, 1).isoformat()[:10]
if not self.version_higher(self.cluster.druid_version, '0.8.2'):
rbound = datetime.now().isoformat()
else:
rbound = datetime(2050, 1, 1).isoformat()[:10]
try:
segment_metadata = client.segment_metadata(
datasource=self.datasource_name,
intervals=lbound + '/' + rbound,
merge=self.merge_flag,
analysisTypes=[])
except Exception as e:
logging.warning('Failed 2nd attempt to get latest segment')
logging.exception(e)
if segment_metadata:
return segment_metadata[-1]['columns']
def generate_metrics(self):
self.generate_metrics_for(self.columns)
def generate_metrics_for(self, columns):
metrics = {}
for col in columns:
metrics.update(col.get_metrics())
dbmetrics = (
db.session.query(DruidMetric)
.filter(DruidMetric.datasource_id == self.id)
.filter(or_(DruidMetric.metric_name == m for m in metrics))
)
dbmetrics = {metric.metric_name: metric for metric in dbmetrics}
for metric in metrics.values():
metric.datasource_id = self.id
if not dbmetrics.get(metric.metric_name, None):
with db.session.no_autoflush:
db.session.add(metric)
@classmethod
def sync_to_db_from_config(
cls,
druid_config,
user,
cluster,
refresh=True):
"""Merges the ds config from druid_config into one stored in the db."""
session = db.session
datasource = (
session.query(cls)
.filter_by(datasource_name=druid_config['name'])
.first()
)
# Create a new datasource.
if not datasource:
datasource = cls(
datasource_name=druid_config['name'],
cluster=cluster,
owner=user,
changed_by_fk=user.id,
created_by_fk=user.id,
)
session.add(datasource)
elif not refresh:
return
dimensions = druid_config['dimensions']
col_objs = (
session.query(DruidColumn)
.filter(DruidColumn.datasource_id == datasource.id)
.filter(or_(DruidColumn.column_name == dim for dim in dimensions))
)
col_objs = {col.column_name: col for col in col_objs}
for dim in dimensions:
col_obj = col_objs.get(dim, None)
if not col_obj:
col_obj = DruidColumn(
datasource_id=datasource.id,
column_name=dim,
groupby=True,
filterable=True,
# TODO: fetch type from Hive.
type='STRING',
datasource=datasource,
)
session.add(col_obj)
# Import Druid metrics
metric_objs = (
session.query(DruidMetric)
.filter(DruidMetric.datasource_id == datasource.id)
.filter(or_(DruidMetric.metric_name == spec['name']
for spec in druid_config['metrics_spec']))
)
metric_objs = {metric.metric_name: metric for metric in metric_objs}
for metric_spec in druid_config['metrics_spec']:
metric_name = metric_spec['name']
metric_type = metric_spec['type']
metric_json = json.dumps(metric_spec)
if metric_type == 'count':
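                # A 'count' ingestion metric has to be summed (longSum) at
                # query time to yield the actual row count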
metric_type = 'longSum'
metric_json = json.dumps({
'type': 'longSum',
'name': metric_name,
'fieldName': metric_name,
})
metric_obj = metric_objs.get(metric_name, None)
if not metric_obj:
metric_obj = DruidMetric(
metric_name=metric_name,
metric_type=metric_type,
verbose_name='%s(%s)' % (metric_type, metric_name),
datasource=datasource,
json=metric_json,
description=(
'Imported from the airolap config dir for %s' %
druid_config['name']),
)
session.add(metric_obj)
session.commit()
@staticmethod
def time_offset(granularity):
if granularity == 'week_ending_saturday':
return 6 * 24 * 3600 * 1000 # 6 days
return 0
# uses https://en.wikipedia.org/wiki/ISO_8601
# http://druid.io/docs/0.8.0/querying/granularities.html
# TODO: pass origin from the UI
@staticmethod
def granularity(period_name, timezone=None, origin=None):
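        """Maps a Superset period name to a Druid granularity spec.

        e.g. granularity('1 hour', timezone='UTC') returns
        {'type': 'period', 'timeZone': 'UTC', 'period': 'PT1H'}
        """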
if not period_name or period_name == 'all':
return 'all'
iso_8601_dict = {
'5 seconds': 'PT5S',
'30 seconds': 'PT30S',
'1 minute': 'PT1M',
'5 minutes': 'PT5M',
'1 hour': 'PT1H',
'6 hour': 'PT6H',
'one day': 'P1D',
'1 day': 'P1D',
'7 days': 'P7D',
'week': 'P1W',
'week_starting_sunday': 'P1W',
'week_ending_saturday': 'P1W',
'month': 'P1M',
}
granularity = {'type': 'period'}
if timezone:
granularity['timeZone'] = timezone
if origin:
dttm = utils.parse_human_datetime(origin)
granularity['origin'] = dttm.isoformat()
if period_name in iso_8601_dict:
granularity['period'] = iso_8601_dict[period_name]
if period_name in ('week_ending_saturday', 'week_starting_sunday'):
# use Sunday as start of the week
granularity['origin'] = '2016-01-03T00:00:00'
elif not isinstance(period_name, string_types):
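            # A non-string period_name is treated as a duration in milliseconds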
granularity['type'] = 'duration'
granularity['duration'] = period_name
elif period_name.startswith('P'):
# identify if the string is the iso_8601 period
granularity['period'] = period_name
else:
granularity['type'] = 'duration'
granularity['duration'] = utils.parse_human_timedelta(
period_name).total_seconds() * 1000
return granularity
@staticmethod
def get_post_agg(mconf):
"""
For a metric specified as `postagg` returns the
kind of post aggregation for pydruid.
"""
if mconf.get('type') == 'javascript':
return JavascriptPostAggregator(
name=mconf.get('name', ''),
field_names=mconf.get('fieldNames', []),
function=mconf.get('function', ''))
elif mconf.get('type') == 'quantile':
return Quantile(
mconf.get('name', ''),
mconf.get('probability', ''),
)
elif mconf.get('type') == 'quantiles':
return Quantiles(
mconf.get('name', ''),
mconf.get('probabilities', ''),
)
elif mconf.get('type') == 'fieldAccess':
return Field(mconf.get('name'))
elif mconf.get('type') == 'constant':
return Const(
mconf.get('value'),
output_name=mconf.get('name', ''),
)
elif mconf.get('type') == 'hyperUniqueCardinality':
return HyperUniqueCardinality(
mconf.get('name'),
)
elif mconf.get('type') == 'arithmetic':
return Postaggregator(
mconf.get('fn', '/'),
mconf.get('fields', []),
mconf.get('name', ''))
else:
return CustomPostAggregator(
mconf.get('name', ''),
mconf)
@staticmethod
def find_postaggs_for(postagg_names, metrics_dict):
"""Return a list of metrics that are post aggregations"""
postagg_metrics = [
metrics_dict[name] for name in postagg_names
if metrics_dict[name].metric_type == 'postagg'
]
# Remove post aggregations that were found
for postagg in postagg_metrics:
postagg_names.remove(postagg.metric_name)
return postagg_metrics
@staticmethod
def recursive_get_fields(_conf):
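        """Recursively collects the field names an aggregation or post
        aggregation config depends on"""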
_type = _conf.get('type')
_field = _conf.get('field')
_fields = _conf.get('fields')
field_names = []
if _type in ['fieldAccess', 'hyperUniqueCardinality',
'quantile', 'quantiles']:
field_names.append(_conf.get('fieldName', ''))
if _field:
field_names += DruidDatasource.recursive_get_fields(_field)
if _fields:
for _f in _fields:
field_names += DruidDatasource.recursive_get_fields(_f)
return list(set(field_names))
@staticmethod
def resolve_postagg(postagg, post_aggs, agg_names, visited_postaggs, metrics_dict):
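        """Converts a postagg metric to its pydruid form, first resolving
        (depth-first) any post aggregations it depends on"""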
mconf = postagg.json_obj
required_fields = set(
DruidDatasource.recursive_get_fields(mconf)
+ mconf.get('fieldNames', []))
        # Check if the fields are already in aggs
        # or are previous postaggs
required_fields = set([
field for field in required_fields
if field not in visited_postaggs and field not in agg_names
])
# First try to find postaggs that match
if len(required_fields) > 0:
missing_postaggs = DruidDatasource.find_postaggs_for(
required_fields, metrics_dict)
for missing_metric in required_fields:
agg_names.add(missing_metric)
for missing_postagg in missing_postaggs:
                # Add to visited first to avoid infinite recursion
                # if post aggregations are cyclically dependent
visited_postaggs.add(missing_postagg.metric_name)
for missing_postagg in missing_postaggs:
DruidDatasource.resolve_postagg(
missing_postagg, post_aggs, agg_names, visited_postaggs, metrics_dict)
post_aggs[postagg.metric_name] = DruidDatasource.get_post_agg(postagg.json_obj)
@staticmethod
def metrics_and_post_aggs(metrics, metrics_dict):
# Separate metrics into those that are aggregations
# and those that are post aggregations
agg_names = set()
postagg_names = []
for metric_name in metrics:
if metrics_dict[metric_name].metric_type != 'postagg':
agg_names.add(metric_name)
else:
postagg_names.append(metric_name)
# Create the post aggregations, maintain order since postaggs
# may depend on previous ones
post_aggs = OrderedDict()
visited_postaggs = set()
for postagg_name in postagg_names:
postagg = metrics_dict[postagg_name]
visited_postaggs.add(postagg_name)
DruidDatasource.resolve_postagg(
postagg, post_aggs, agg_names, visited_postaggs, metrics_dict)
return list(agg_names), post_aggs
def values_for_column(self,
column_name,
limit=10000):
"""Retrieve some values for the given column"""
# TODO: Use Lexicographic TopNMetricSpec once supported by PyDruid
if self.fetch_values_from:
from_dttm = utils.parse_human_datetime(self.fetch_values_from)
else:
from_dttm = datetime(1970, 1, 1)
qry = dict(
datasource=self.datasource_name,
granularity='all',
intervals=from_dttm.isoformat() + '/' + datetime.now().isoformat(),
aggregations=dict(count=count('count')),
dimension=column_name,
metric='count',
threshold=limit,
)
client = self.cluster.get_pydruid_client()
client.topn(**qry)
df = client.export_pandas()
return [row[column_name] for row in df.to_records(index=False)]
def get_query_str(self, query_obj, phase=1, client=None):
return self.run_query(client=client, phase=phase, **query_obj)
def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
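        """Builds a filter restricting the main query to the dimension
        value combinations returned by the phase-one dataframe"""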
ret = dim_filter
if df is not None and not df.empty:
new_filters = []
for unused, row in df.iterrows():
fields = []
for dim in dimensions:
f = Dimension(dim) == row[dim]
fields.append(f)
if len(fields) > 1:
term = Filter(type='and', fields=fields)
new_filters.append(term)
elif fields:
new_filters.append(fields[0])
if new_filters:
ff = Filter(type='or', fields=new_filters)
if not dim_filter:
ret = ff
else:
ret = Filter(type='and', fields=[ff, dim_filter])
return ret
def run_query( # noqa / druid
self,
groupby, metrics,
granularity,
from_dttm, to_dttm,
filter=None, # noqa
is_timeseries=True,
timeseries_limit=None,
timeseries_limit_metric=None,
row_limit=None,
inner_from_dttm=None, inner_to_dttm=None,
orderby=None,
extras=None, # noqa
select=None, # noqa
columns=None, phase=2, client=None, form_data=None,
order_desc=True):
"""Runs a query against Druid and returns a dataframe.
"""
# TODO refactor into using a TBD Query object
client = client or self.cluster.get_pydruid_client()
if not is_timeseries:
granularity = 'all'
inner_from_dttm = inner_from_dttm or from_dttm
inner_to_dttm = inner_to_dttm or to_dttm
        # attach the configured DRUID_TZ timezone to the naive datetimes
from_dttm = from_dttm.replace(tzinfo=DRUID_TZ)
to_dttm = to_dttm.replace(tzinfo=DRUID_TZ)
timezone = from_dttm.tzname()
query_str = ''
metrics_dict = {m.metric_name: m for m in self.metrics}
columns_dict = {c.column_name: c for c in self.columns}
all_metrics, post_aggs = DruidDatasource.metrics_and_post_aggs(
metrics,
metrics_dict)
aggregations = OrderedDict()
for m in self.metrics:
if m.metric_name in all_metrics:
aggregations[m.metric_name] = m.json_obj
rejected_metrics = [
m.metric_name for m in self.metrics
if m.is_restricted and
m.metric_name in aggregations.keys() and
not sm.has_access('metric_access', m.perm)
]
if rejected_metrics:
raise MetricPermException(
'Access to the metrics denied: ' + ', '.join(rejected_metrics),
)
# the dimensions list with dimensionSpecs expanded
dimensions = []
groupby = [gb for gb in groupby if gb in columns_dict]
for column_name in groupby:
col = columns_dict.get(column_name)
dim_spec = col.dimension_spec
if dim_spec:
dimensions.append(dim_spec)
else:
dimensions.append(column_name)
extras = extras or {}
qry = dict(
datasource=self.datasource_name,
dimensions=dimensions,
aggregations=aggregations,
granularity=DruidDatasource.granularity(
granularity,
timezone=timezone,
origin=extras.get('druid_time_origin'),
),
post_aggregations=post_aggs,
intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
)
filters = DruidDatasource.get_filters(filter, self.num_cols)
if filters:
qry['filter'] = filters
having_filters = self.get_having_filters(extras.get('having_druid'))
if having_filters:
qry['having'] = having_filters
order_direction = 'descending' if order_desc else 'ascending'
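        # Pick the Druid query type: timeseries when there is no grouping,
        # topN when grouping on a single plain dimension in descending order
        # without having filters, and groupby otherwise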
if len(groupby) == 0 and not having_filters:
del qry['dimensions']
client.timeseries(**qry)
elif (
not having_filters and
len(groupby) == 1 and
order_desc and
not isinstance(list(qry.get('dimensions'))[0], dict)
):
dim = list(qry.get('dimensions'))[0]
if timeseries_limit_metric:
order_by = timeseries_limit_metric
else:
order_by = list(qry['aggregations'].keys())[0]
            # Limit the number of timeseries by doing a two-phase query
pre_qry = deepcopy(qry)
pre_qry['granularity'] = 'all'
pre_qry['threshold'] = min(row_limit,
timeseries_limit or row_limit)
pre_qry['metric'] = order_by
pre_qry['dimension'] = dim
del pre_qry['dimensions']
client.topn(**pre_qry)
query_str += '// Two phase query\n// Phase 1\n'
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
query_str += '\n'
if phase == 1:
return query_str
query_str += (
"// Phase 2 (built based on phase one's results)\n")
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
qry['dimensions'], filters)
qry['threshold'] = timeseries_limit or 1000
if row_limit and granularity == 'all':
qry['threshold'] = row_limit
            qry['dimension'] = dim
del qry['dimensions']
qry['metric'] = list(qry['aggregations'].keys())[0]
client.topn(**qry)
elif len(groupby) > 0:
# If grouping on multiple fields or using a having filter
# we have to force a groupby query
if timeseries_limit and is_timeseries:
order_by = metrics[0] if metrics else self.metrics[0]
if timeseries_limit_metric:
order_by = timeseries_limit_metric
                # Limit the number of timeseries by doing a two-phase query
pre_qry = deepcopy(qry)
pre_qry['granularity'] = 'all'
pre_qry['limit_spec'] = {
'type': 'default',
'limit': min(timeseries_limit, row_limit),
'intervals': (
inner_from_dttm.isoformat() + '/' +
inner_to_dttm.isoformat()),
'columns': [{
'dimension': order_by,
'direction': order_direction,
}],
}
client.groupby(**pre_qry)
query_str += '// Two phase query\n// Phase 1\n'
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
query_str += '\n'
if phase == 1:
return query_str
query_str += (
"// Phase 2 (built based on phase one's results)\n")
df = client.export_pandas()
qry['filter'] = self._add_filter_from_pre_query_data(
df,
qry['dimensions'],
filters,
)
qry['limit_spec'] = None
if row_limit:
qry['limit_spec'] = {
'type': 'default',
'limit': row_limit,
'columns': [{
'dimension': (
metrics[0] if metrics else self.metrics[0]),
'direction': order_direction,
}],
}
client.groupby(**qry)
query_str += json.dumps(
client.query_builder.last_query.query_dict, indent=2)
return query_str
def query(self, query_obj):
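        """Runs query_obj against Druid and returns a QueryResult"""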
qry_start_dttm = datetime.now()
client = self.cluster.get_pydruid_client()
query_str = self.get_query_str(
client=client, query_obj=query_obj, phase=2)
df = client.export_pandas()
if df is None or df.size == 0:
raise Exception(_('No data was returned.'))
df.columns = [
DTTM_ALIAS if c == 'timestamp' else c for c in df.columns]
is_timeseries = query_obj['is_timeseries'] \
if 'is_timeseries' in query_obj else True
if (
not is_timeseries and
DTTM_ALIAS in df.columns):
del df[DTTM_ALIAS]
# Reordering columns
cols = []
if DTTM_ALIAS in df.columns:
cols += [DTTM_ALIAS]
cols += [col for col in query_obj['groupby'] if col in df.columns]
cols += [col for col in query_obj['metrics'] if col in df.columns]
df = df[cols]
time_offset = DruidDatasource.time_offset(query_obj['granularity'])
def increment_timestamp(ts):
dt = utils.parse_human_datetime(ts).replace(
tzinfo=DRUID_TZ)
return dt + timedelta(milliseconds=time_offset)
if DTTM_ALIAS in df.columns and time_offset:
df[DTTM_ALIAS] = df[DTTM_ALIAS].apply(increment_timestamp)
return QueryResult(
df=df,
query=query_str,
duration=datetime.now() - qry_start_dttm)
@staticmethod
def get_filters(raw_filters, num_cols): # noqa
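        """Translates Superset filter dicts ({'col', 'op', 'val'}) into a
        single pydruid Filter, AND-ing the individual conditions.

        e.g. a single {'col': 'country', 'op': 'in', 'val': ['US', 'FR']}
        becomes an OR of two Dimension('country') == value filters.
        """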
filters = None
for flt in raw_filters:
if not all(f in flt for f in ['col', 'op', 'val']):
continue
col = flt['col']
op = flt['op']
eq = flt['val']
cond = None
if op in ('in', 'not in'):
eq = [
types.replace('"', '').strip()
if isinstance(types, string_types)
else types
for types in eq]
elif not isinstance(flt['val'], string_types):
eq = eq[0] if eq and len(eq) > 0 else ''
is_numeric_col = col in num_cols
if is_numeric_col:
if op in ('in', 'not in'):
eq = [utils.string_to_num(v) for v in eq]
else:
eq = utils.string_to_num(eq)
if op == '==':
cond = Dimension(col) == eq
elif op == '!=':
cond = Dimension(col) != eq
elif op in ('in', 'not in'):
fields = []
# ignore the filter if it has no value
if not len(eq):
continue
elif len(eq) == 1:
cond = Dimension(col) == eq[0]
else:
for s in eq:
fields.append(Dimension(col) == s)
cond = Filter(type='or', fields=fields)
if op == 'not in':
cond = ~cond
elif op == 'regex':
cond = Filter(type='regex', pattern=eq, dimension=col)
elif op == '>=':
cond = Bound(col, eq, None, alphaNumeric=is_numeric_col)
elif op == '<=':
cond = Bound(col, None, eq, alphaNumeric=is_numeric_col)
elif op == '>':
cond = Bound(
col, eq, None,
lowerStrict=True, alphaNumeric=is_numeric_col,
)
elif op == '<':
cond = Bound(
col, None, eq,
upperStrict=True, alphaNumeric=is_numeric_col,
)
if filters:
filters = Filter(type='and', fields=[
cond,
filters,
])
else:
filters = cond
return filters
def _get_having_obj(self, col, op, eq):
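        """Builds a single pydruid having clause for ==, > or <"""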
cond = None
if op == '==':
if col in self.column_names:
cond = DimSelector(dimension=col, value=eq)
else:
cond = Aggregation(col) == eq
elif op == '>':
cond = Aggregation(col) > eq
elif op == '<':
cond = Aggregation(col) < eq
return cond
def get_having_filters(self, raw_filters):
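        """Translates having filter dicts into pydruid having clauses,
        expressing !=, >= and <= as negations of ==, < and >"""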
filters = None
reversed_op_map = {
'!=': '==',
'>=': '<',
'<=': '>',
}
for flt in raw_filters:
if not all(f in flt for f in ['col', 'op', 'val']):
continue
col = flt['col']
op = flt['op']
eq = flt['val']
cond = None
if op in ['==', '>', '<']:
cond = self._get_having_obj(col, op, eq)
elif op in reversed_op_map:
cond = ~self._get_having_obj(col, reversed_op_map[op], eq)
if filters:
filters = filters & cond
else:
filters = cond
return filters
@classmethod
def query_datasources_by_name(
cls, session, database, datasource_name, schema=None):
return (
session.query(cls)
.filter_by(cluster_name=database.id)
.filter_by(datasource_name=datasource_name)
.all()
)
sa.event.listen(DruidDatasource, 'after_insert', set_perm)
sa.event.listen(DruidDatasource, 'after_update', set_perm)