# blob: 4f785cb36a65cfdeb3525fb4cad2e1f33c0a8144 [file] [log] [blame]
from flask.ext.appbuilder import Model
from datetime import timedelta
from flask.ext.appbuilder.models.mixins import AuditMixin
from flask import request, redirect, flash, Response
from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime
from sqlalchemy import create_engine, MetaData, desc
from sqlalchemy import Table as sqlaTable
from sqlalchemy.orm import relationship
from dateutil.parser import parse
from pydruid import client
from pydruid.utils.filters import Dimension, Filter
from pandas import read_sql_query
from sqlalchemy.sql import table, literal_column
from sqlalchemy import select, and_, text, String
from copy import deepcopy, copy
from collections import namedtuple
from datetime import datetime
import logging
import json
import sqlparse
import requests
import textwrap
from panoramix import db, get_session
# Result container returned by the ``query`` methods in this module:
# ``df`` is the resulting dataframe, ``query`` the text of the query that was
# run and ``duration`` the elapsed time (a ``timedelta``).
# The typename matches the variable name so that ``repr()`` is meaningful.
QueryResult = namedtuple('QueryResult', ['df', 'query', 'duration'])
class Queryable(object):
    """Mixin exposing sorted listings of column names.

    The host class must provide a ``columns`` iterable whose items carry
    ``column_name``, ``groupby`` and ``filterable`` attributes.
    """

    @property
    def column_names(self):
        """Sorted names of all columns."""
        names = [col.column_name for col in self.columns]
        names.sort()
        return names

    @property
    def groupby_column_names(self):
        """Sorted names of the columns whose ``groupby`` flag is truthy."""
        names = [col.column_name for col in self.columns if col.groupby]
        names.sort()
        return names

    @property
    def filterable_column_names(self):
        """Sorted names of the columns whose ``filterable`` flag is truthy."""
        names = [col.column_name for col in self.columns if col.filterable]
        names.sort()
        return names
class Database(Model, AuditMixin):
    """A database reachable through a SQLAlchemy URI."""

    __tablename__ = 'dbs'
    id = Column(Integer, primary_key=True)
    database_name = Column(String(255), unique=True)
    sqlalchemy_uri = Column(String(1024))

    def __repr__(self):
        return self.database_name

    def get_sqla_engine(self):
        """Return an engine built from ``sqlalchemy_uri``.

        ``create_engine`` is called on every invocation; nothing is cached
        here.
        """
        engine = create_engine(self.sqlalchemy_uri)
        return engine

    def get_table(self, table_name):
        """Return ``table_name`` reflected from this database.

        The table is autoloaded into a fresh ``MetaData`` using an engine
        obtained from ``get_sqla_engine``.
        """
        metadata = MetaData()
        reflected = sqlaTable(
            table_name,
            metadata,
            autoload=True,
            autoload_with=self.get_sqla_engine(),
        )
        return reflected
class Table(Model, Queryable, AuditMixin):
    """A table of a registered ``Database`` that can be queried.

    Columns (``TableColumn``) and metrics (``SqlMetric``) are reachable as
    ``self.columns`` and ``self.metrics`` through the backrefs declared on
    those models.
    """

    __tablename__ = 'tables'
    id = Column(Integer, primary_key=True)
    table_name = Column(String(255), unique=True)
    main_datetime_column_id = Column(Integer, ForeignKey('table_columns.id'))
    main_datetime_column = relationship(
        'TableColumn', foreign_keys=[main_datetime_column_id])
    default_endpoint = Column(Text)
    database_id = Column(Integer, ForeignKey('dbs.id'), nullable=False)
    database = relationship(
        'Database', backref='tables', foreign_keys=[database_id])

    baselink = "tableview"

    @property
    def name(self):
        """Alias for ``table_name``."""
        return self.table_name

    @property
    def table_link(self):
        """HTML anchor pointing at this table's panoramix page."""
        url = "/panoramix/table/{}/".format(self.id)
        return '<a href="{url}">{self.table_name}</a>'.format(
            url=url, self=self)

    @property
    def metrics_combo(self):
        """``(metric_name, verbose_name)`` pairs sorted by verbose name."""
        return sorted(
            [(m.metric_name, m.verbose_name) for m in self.metrics],
            key=lambda x: x[1])

    def query_bkp(
            self, groupby, metrics,
            granularity,
            from_dttm, to_dttm,
            limit_spec=None,
            filter=None,
            is_timeseries=True,
            timeseries_limit=15, row_limit=None):
        """
        Unused, legacy way of querying by building a SQL string without
        using the sqlalchemy expression API (new approach which supports
        all dialects)

        The time column is hard-coded to ``ds``. Only ``in`` / ``not in``
        filters are applied. ``limit_spec``, ``is_timeseries`` and
        ``row_limit`` are accepted but not used.

        NOTE(review): identifiers and filter values are interpolated into
        the SQL text without escaping; do not feed this untrusted input.

        :param groupby: column names to group by (may be empty or None)
        :param metrics: names of the metrics of this table to select
        :param granularity: ``"all"`` disables the per-``ds`` breakdown
        :param from_dttm: inclusive lower bound applied on ``ds``
        :param to_dttm: exclusive upper bound applied on ``ds``
        :param filter: iterable of ``(column, operator, value)`` tuples;
            ``None`` means no filter
        :param timeseries_limit: when truthy and ``groupby`` is set, restrict
            the result to the top groups ordered by the main metric
        :returns: a ``QueryResult``
        """
        qry_start_dttm = datetime.now()
        # The default is None; iterating it used to raise TypeError.
        filter = filter or []

        metrics_exprs = [
            "{} AS {}".format(m.expression, m.metric_name)
            for m in self.metrics if m.metric_name in metrics]
        if metrics:
            main_metric_expr = [
                m.expression for m in self.metrics
                if m.metric_name == metrics[0]][0]
        else:
            main_metric_expr = "COUNT(*)"

        select_exprs = []
        groupby_exprs = []
        if groupby:
            select_exprs = copy(groupby)
            groupby_exprs = [s for s in groupby]
        select_exprs += metrics_exprs
        if granularity != "all":
            select_exprs += ['ds as timestamp']
            groupby_exprs += ['ds']
        select_exprs = ",\n".join(select_exprs)
        groupby_exprs = ",\n".join(groupby_exprs)

        # Each condition is formatted exactly once. The previous code
        # re-ran ``str.format`` over the joined clause, which re-parsed
        # filter values containing braces.
        where_clause = [
            "ds >= '{from_dttm_iso}'".format(
                from_dttm_iso=from_dttm.isoformat()),
            "ds < '{to_dttm_iso}'".format(
                to_dttm_iso=to_dttm.isoformat()),
        ]
        for col, op, eq in filter:
            if op in ('in', 'not in'):
                l = ", ".join(["'{}'".format(s) for s in eq.split(",")])
                where_clause.append(
                    "{col} {op} ({l})".format(col=col, op=op.upper(), l=l))
        where_clause = " AND\n".join(where_clause)

        limiting_join = ""
        if timeseries_limit and groupby:
            # Join against the top N groups (by main metric) so only those
            # series are returned.
            on_clause = " AND ".join(
                ["{g} = __{g}".format(g=g) for g in groupby])
            inner_select = ", ".join(
                ["{g} as __{g}".format(g=g) for g in groupby])
            inner_groupby_exprs = ", ".join(groupby)
            limiting_join = (
                "JOIN ( \n"
                " SELECT {inner_select} \n"
                " FROM {self.table_name} \n"
                " WHERE \n"
                " {where_clause}\n"
                " GROUP BY {inner_groupby_exprs}\n"
                " ORDER BY {main_metric_expr} DESC\n"
                " LIMIT {timeseries_limit}\n"
                ") z ON {on_clause}\n"
            ).format(
                inner_select=inner_select,
                self=self,
                where_clause=where_clause,
                inner_groupby_exprs=inner_groupby_exprs,
                main_metric_expr=main_metric_expr,
                timeseries_limit=timeseries_limit,
                on_clause=on_clause)

        sql = (
            "SELECT\n"
            " {select_exprs}\n"
            "FROM {self.table_name}\n"
            "{limiting_join}"
            "WHERE\n"
            " {where_clause}\n"
        ).format(
            select_exprs=select_exprs,
            self=self,
            limiting_join=limiting_join,
            where_clause=where_clause)
        if groupby_exprs:
            # An empty GROUP BY clause is not valid SQL, so it is only
            # emitted when there is something to group on.
            sql += (
                "GROUP BY\n"
                " {groupby_exprs}\n"
            ).format(groupby_exprs=groupby_exprs)

        df = read_sql_query(
            sql=sql,
            con=self.database.get_sqla_engine()
        )
        return QueryResult(
            df=df, duration=datetime.now() - qry_start_dttm, query=sql)

    def query(
            self, groupby, metrics,
            granularity,
            from_dttm, to_dttm,
            limit_spec=None,
            filter=None,
            is_timeseries=True,
            timeseries_limit=15, row_limit=None):
        """Query the table through the SQLAlchemy expression API.

        The statement is compiled with literal binds against the engine of
        ``self.database`` and run with ``pandas.read_sql_query``. Only
        ``in`` / ``not in`` filters are applied. ``limit_spec`` and
        ``is_timeseries`` are accepted but not used.

        :param groupby: column names to group by (may be empty or None)
        :param metrics: names of the metrics of this table to select; the
            first one drives the ordering, ``COUNT(*)`` is used when empty
        :param granularity: any value other than ``"all"`` adds the main
            datetime column, labelled ``timestamp``, to the select list and
            the group by
        :param from_dttm: inclusive lower bound on the main datetime column
        :param to_dttm: exclusive upper bound on the main datetime column
        :param filter: iterable of ``(column, operator, value)`` tuples where
            ``value`` is a comma separated string; ``None`` means no filter
        :param timeseries_limit: when truthy and ``groupby`` is set, restrict
            the result to the top groups ordered by the main metric
        :param row_limit: LIMIT applied to the outer query
        :returns: a ``QueryResult`` whose ``query`` is the reindented SQL
        """
        qry_start_dttm = datetime.now()
        # The default is None; iterating it used to raise TypeError.
        filter = filter or []

        timestamp = literal_column(
            self.main_datetime_column.column_name).label('timestamp')
        metrics_exprs = [
            literal_column(m.expression).label(m.metric_name)
            for m in self.metrics if m.metric_name in metrics]

        if metrics:
            main_metric_expr = literal_column(
                [m.expression for m in self.metrics
                 if m.metric_name == metrics[0]][0])
        else:
            main_metric_expr = literal_column("COUNT(*)")

        select_exprs = []
        groupby_exprs = []
        if groupby:
            select_exprs = [literal_column(s) for s in groupby]
            groupby_exprs = [literal_column(s) for s in groupby]
            inner_groupby_exprs = [
                literal_column(s).label('__' + s) for s in groupby]
        if granularity != "all":
            select_exprs += [timestamp]
            groupby_exprs += [timestamp]
        select_exprs += metrics_exprs

        qry = select(select_exprs)
        from_clause = table(self.table_name)
        qry = qry.group_by(*groupby_exprs)

        where_clause_and = [
            timestamp >= from_dttm.isoformat(),
            timestamp < to_dttm.isoformat(),
        ]
        for col, op, eq in filter:
            if op in ('in', 'not in'):
                values = eq.split(",")
                cond = literal_column(col).in_(values)
                if op == 'not in':
                    cond = ~cond
                where_clause_and.append(cond)
        qry = qry.where(and_(*where_clause_and))
        qry = qry.order_by(desc(main_metric_expr))
        qry = qry.limit(row_limit)

        if timeseries_limit and groupby:
            # Join against a subquery returning the top N groups so only
            # those series make it into the result.
            subq = select(inner_groupby_exprs)
            subq = subq.select_from(table(self.table_name))
            subq = subq.where(and_(*where_clause_and))
            subq = subq.group_by(*inner_groupby_exprs)
            subq = subq.order_by(desc(main_metric_expr))
            subq = subq.limit(timeseries_limit)
            on_clause = []
            for gb in groupby:
                on_clause.append(
                    literal_column(gb) == literal_column("__" + gb))
            from_clause = from_clause.join(subq.alias(), and_(*on_clause))

        qry = qry.select_from(from_clause)

        engine = self.database.get_sqla_engine()
        sql = str(qry.compile(engine, compile_kwargs={"literal_binds": True}))
        df = read_sql_query(
            sql=sql,
            con=engine
        )
        sql = sqlparse.format(sql, reindent=True)
        return QueryResult(
            df=df, duration=datetime.now() - qry_start_dttm, query=sql)

    def fetch_metadata(self):
        """Sync columns and metrics with the table reflected from the database.

        For every reflected column a ``TableColumn`` is looked up or created.
        New VARCHAR/STRING columns are flagged as groupable and filterable.
        A ``SqlMetric`` is then created for each aggregate flag set on a
        column (sum, max, min, count distinct), plus a global ``count``
        metric; metrics that already exist for this table are left alone.
        If no main datetime column is set, the first column whose type
        mentions "date" is assigned (this assignment is not committed here).

        When the table cannot be reflected the error is flashed and the
        method returns without changing anything.
        """
        try:
            table = self.database.get_table(self.table_name)
        except Exception as e:
            flash(str(e))
            flash(
                "Table doesn't seem to exist in the specified database, "
                "couldn't fetch column information", "danger")
            return

        TC = TableColumn
        M = SqlMetric
        # (TableColumn flag / metric type, SQL expression template)
        aggregates = (
            ('sum', "SUM({})"),
            ('max', "MAX({})"),
            ('min', "MIN({})"),
            ('count_distinct', "COUNT(DISTINCT {})"),
        )
        metrics = []
        any_date_col = None
        for col in table.columns:
            try:
                datatype = str(col.type)
            except Exception:
                datatype = "UNKNOWN"
            dbcol = (
                db.session
                .query(TC)
                .filter(TC.table == self)
                .filter(TC.column_name == col.name)
                .first()
            )
            db.session.flush()
            if not dbcol:
                dbcol = TableColumn(column_name=col.name)
                if datatype.startswith(('VARCHAR', 'STRING')):
                    dbcol.groupby = True
                    dbcol.filterable = True
            db.session.merge(self)
            self.columns.append(dbcol)

            if not any_date_col and 'date' in datatype.lower():
                any_date_col = dbcol

            for agg, expression in aggregates:
                if getattr(dbcol, agg):
                    metric_name = agg + '__' + dbcol.column_name
                    metrics.append(M(
                        metric_name=metric_name,
                        verbose_name=metric_name,
                        metric_type=agg,
                        expression=expression.format(dbcol.column_name)
                    ))
            dbcol.type = datatype
            db.session.merge(self)
            db.session.commit()

        metrics.append(M(
            metric_name='count',
            verbose_name='COUNT(*)',
            metric_type='count',
            expression="COUNT(*)"
        ))
        for metric in metrics:
            m = (
                db.session.query(M)
                .filter(M.metric_name == metric.metric_name)
                .filter(M.table == self)
                .first()
            )
            if not m:
                # Attach the candidate only when it is really new: setting
                # ``table`` also appends it to ``self.metrics`` through the
                # backref, which can cascade an unwanted duplicate into the
                # session.
                metric.table = self
                db.session.add(metric)
                db.session.commit()
        if not self.main_datetime_column:
            self.main_datetime_column = any_date_col
class SqlMetric(Model, AuditMixin):
    """A named SQL aggregate expression attached to a ``Table``.

    The ``table`` relationship installs a ``metrics`` backref on ``Table``.
    """

    __tablename__ = 'sql_metrics'

    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    verbose_name = Column(String(1024))
    metric_type = Column(String(32))
    table_id = Column(Integer, ForeignKey('tables.id'))
    table = relationship(
        'Table',
        backref='metrics',
        foreign_keys=[table_id],
    )
    # SQL text of the aggregate, e.g. "COUNT(*)".
    expression = Column(Text)
    description = Column(Text)
class TableColumn(Model, AuditMixin):
    """A column of a ``Table`` along with its querying flags.

    The ``table`` relationship installs a ``columns`` backref on ``Table``.
    """

    __tablename__ = 'table_columns'

    id = Column(Integer, primary_key=True)
    table_id = Column(Integer, ForeignKey('tables.id'))
    table = relationship(
        'Table',
        backref='columns',
        foreign_keys=[table_id],
    )
    column_name = Column(String(256))
    is_dttm = Column(Boolean, default=True)
    is_active = Column(Boolean, default=True)
    type = Column(String(32), default='')
    # Whether the column can be grouped by / filtered on.
    groupby = Column(Boolean, default=False)
    # Aggregate flags: which metrics should exist for this column.
    count_distinct = Column(Boolean, default=False)
    sum = Column(Boolean, default=False)
    max = Column(Boolean, default=False)
    min = Column(Boolean, default=False)
    filterable = Column(Boolean, default=False)
    description = Column(Text, default='')

    def __repr__(self):
        return self.column_name
class Cluster(Model, AuditMixin):
    """Connection settings of a Druid cluster (coordinator and broker)."""

    __tablename__ = 'clusters'

    id = Column(Integer, primary_key=True)
    cluster_name = Column(String(255), unique=True)
    coordinator_host = Column(String(256))
    coordinator_port = Column(Integer)
    coordinator_endpoint = Column(String(256))
    broker_host = Column(String(256))
    broker_port = Column(Integer)
    broker_endpoint = Column(String(256))
    metadata_last_refreshed = Column(DateTime)

    def __repr__(self):
        return self.cluster_name

    def get_pydruid_client(self):
        """Return a ``PyDruid`` client targeting this cluster's broker."""
        broker_url = "http://{0}:{1}/".format(
            self.broker_host, self.broker_port)
        return client.PyDruid(broker_url, self.broker_endpoint)

    def refresh_datasources(self):
        """Sync every datasource listed by the coordinator into the db.

        The coordinator's ``datasources`` endpoint is fetched over HTTP, its
        body decoded as JSON, and ``Datasource.sync_to_db`` called for each
        entry.
        """
        endpoint = (
            "http://{self.coordinator_host}:{self.coordinator_port}/"
            "{self.coordinator_endpoint}/datasources"
        ).format(self=self)
        response = requests.get(endpoint)
        for datasource_name in json.loads(response.text):
            Datasource.sync_to_db(datasource_name, self)
class Datasource(Model, AuditMixin, Queryable):
    """A Druid datasource, queried through the broker of its ``Cluster``.

    Columns and metrics are reachable as ``self.columns`` and
    ``self.metrics`` through the backrefs declared on the ``Column`` and
    ``Metric`` models of this module.
    """

    baselink = "datasourcemodelview"

    __tablename__ = 'datasources'
    id = Column(Integer, primary_key=True)
    datasource_name = Column(String(255), unique=True)
    is_featured = Column(Boolean, default=False)
    is_hidden = Column(Boolean, default=False)
    description = Column(Text)
    default_endpoint = Column(Text)
    user_id = Column(Integer, ForeignKey('ab_user.id'))
    owner = relationship('User', backref='datasources', foreign_keys=[user_id])
    cluster_name = Column(
        String(255),
        ForeignKey('clusters.cluster_name'))
    cluster = relationship(
        'Cluster', backref='datasources', foreign_keys=[cluster_name])

    @property
    def metrics_combo(self):
        """``(metric_name, verbose_name)`` pairs sorted by verbose name."""
        return sorted(
            [(m.metric_name, m.verbose_name) for m in self.metrics],
            key=lambda x: x[1])

    @property
    def name(self):
        """Alias for ``datasource_name``."""
        return self.datasource_name

    def __repr__(self):
        return self.datasource_name

    @property
    def datasource_link(self):
        """HTML anchor pointing at this datasource's panoramix page."""
        url = "/panoramix/datasource/{}/".format(self.datasource_name)
        return '<a href="{url}">{self.datasource_name}</a>'.format(
            url=url, self=self)

    def get_metric_obj(self, metric_name):
        """Return the decoded JSON definition of the metric ``metric_name``.

        :raises IndexError: when this datasource has no such metric
        """
        return [
            m.json_obj for m in self.metrics
            if m.metric_name == metric_name
        ][0]

    def latest_metadata(self):
        """Return the column metadata of the most recent segment.

        A time boundary query provides the latest timestamp of the
        datasource; segment metadata is then requested for a two second
        window around it.

        :returns: the ``columns`` entry of the last segment metadata result,
            or ``None`` when either query returns nothing
        """
        client = self.cluster.get_pydruid_client()
        results = client.time_boundary(datasource=self.datasource_name)
        if not results:
            return
        # ``maxTime`` is the latest timestamp; reading ``minTime`` here used
        # to make this method describe the oldest segment instead.
        max_time = parse(results[0]['result']['maxTime'])
        intervals = (max_time - timedelta(seconds=1)).isoformat() + '/'
        intervals += (max_time + timedelta(seconds=1)).isoformat()
        segment_metadata = client.segment_metadata(
            datasource=self.datasource_name,
            intervals=intervals)
        if segment_metadata:
            return segment_metadata[-1]['columns']

    def generate_metrics(self):
        """Generate the metrics of every column of this datasource."""
        for col in self.columns:
            col.generate_metrics()

    @classmethod
    def sync_to_db(cls, name, cluster):
        """Create or update the datasource ``name`` and its columns.

        The datasource is looked up by name and created when missing, then
        attached to ``cluster``. Each column reported by ``latest_metadata``
        is created when missing, has its type refreshed, is flagged as
        groupable and filterable when its type is ``STRING``, and gets its
        metrics generated. Nothing happens past the datasource lookup when
        no metadata is available.

        Note that ``Column`` below resolves at call time to the model class
        defined in this module, not to ``sqlalchemy.Column``.

        :param name: name of the Druid datasource
        :param cluster: the ``Cluster`` the datasource belongs to
        """
        session = get_session()
        datasource = session.query(cls).filter_by(datasource_name=name).first()
        if not datasource:
            datasource = cls(datasource_name=name)
            session.add(datasource)
        datasource.cluster = cluster

        cols = datasource.latest_metadata()
        if not cols:
            return
        for col in cols:
            col_obj = (
                session
                .query(Column)
                .filter_by(datasource_name=name, column_name=col)
                .first()
            )
            datatype = cols[col]['type']
            if not col_obj:
                col_obj = Column(datasource_name=name, column_name=col)
                session.add(col_obj)
            if datatype == "STRING":
                col_obj.groupby = True
                col_obj.filterable = True
            # ``col_obj`` always exists at this point, so the type can be
            # assigned unconditionally.
            col_obj.type = datatype
            col_obj.datasource = datasource
            col_obj.generate_metrics()
        #session.commit()

    def _limit_spec(self, metrics, limit):
        """Build a ``default`` limit spec ordering by the main metric.

        The main metric is the first requested metric, or the name of the
        first metric of the datasource when none was requested.
        """
        if metrics:
            main_metric = metrics[0]
        else:
            # The spec needs the metric *name*; passing the ``Metric``
            # object itself is not serialisable to JSON.
            main_metric = self.metrics[0].metric_name
        return {
            "type": "default",
            "limit": limit,
            "columns": [{
                "dimension": main_metric,
                "direction": "descending",
            }],
        }

    def query(
            self, groupby, metrics,
            granularity,
            from_dttm, to_dttm,
            limit_spec=None,
            filter=None,
            is_timeseries=True,
            timeseries_limit=None,
            row_limit=None):
        """Run a groupby query against Druid and return a ``QueryResult``.

        When ``timeseries_limit`` and ``is_timeseries`` are both truthy the
        query runs in two phases: a first query with granularity ``"all"``
        finds the top dimension combinations, and the second one is filtered
        down to those combinations. ``limit_spec`` is accepted but not used.

        NOTE(review): ``basestring`` only exists on Python 2.

        :param groupby: dimensions to group by
        :param metrics: names of the metrics of this datasource to aggregate
        :param granularity: a granularity string, or a number that gets
            wrapped in a ``duration`` granularity
        :param from_dttm: start of the queried interval
        :param to_dttm: end of the queried interval
        :param filter: iterable of ``(column, operator, value)`` tuples;
            supported operators are ``==``, ``!=``, ``in`` and ``not in``
            (comma separated values); ``None`` means no filter
        :param is_timeseries: enables the two-phase query together with
            ``timeseries_limit``
        :param timeseries_limit: maximum number of series to return
        :param row_limit: maximum number of rows to return
        :returns: a ``QueryResult`` whose ``query`` holds the JSON of the
            queries that were sent
        """
        qry_start_dttm = datetime.now()
        # The default is None; iterating it used to raise TypeError.
        filter = filter or []

        query_str = ""
        aggregations = {
            m.metric_name: m.json_obj
            for m in self.metrics if m.metric_name in metrics
        }
        if not isinstance(granularity, basestring):
            granularity = {"type": "duration", "duration": granularity}

        qry = dict(
            datasource=self.datasource_name,
            dimensions=groupby,
            aggregations=aggregations,
            granularity=granularity,
            intervals=from_dttm.isoformat() + '/' + to_dttm.isoformat(),
        )
        filters = None
        for col, op, eq in filter:
            cond = None
            if op == '==':
                cond = Dimension(col) == eq
            elif op == '!=':
                cond = ~(Dimension(col) == eq)
            elif op in ('in', 'not in'):
                fields = []
                splitted = eq.split(',')
                if len(splitted) > 1:
                    for s in splitted:
                        s = s.strip()
                        fields.append(
                            Filter.build_filter(Dimension(col) == s))
                    cond = Filter(type="or", fields=fields)
                else:
                    cond = Dimension(col) == eq
                if op == 'not in':
                    cond = ~cond
            if filters:
                filters = Filter(type="and", fields=[
                    Filter.build_filter(cond),
                    Filter.build_filter(filters)
                ])
            else:
                filters = cond

        if filters:
            qry['filter'] = filters

        client = self.cluster.get_pydruid_client()
        orig_filters = filters
        if timeseries_limit and is_timeseries:
            # Limit on the number of timeseries, doing a two-phases query
            pre_qry = deepcopy(qry)
            pre_qry['granularity'] = "all"
            pre_qry['limit_spec'] = self._limit_spec(metrics, timeseries_limit)
            client.groupby(**pre_qry)
            query_str += "// Two phase query\n// Phase 1\n"
            query_str += json.dumps(client.query_dict, indent=2) + "\n"
            query_str += "//\nPhase 2 (built based on phase one's results)\n"
            df = client.export_pandas()
            if df is not None and not df.empty:
                dims = qry['dimensions']
                filters = []
                # One filter per row of phase 1: the conjunction of that
                # row's dimension values.
                for index, row in df.iterrows():
                    fields = []
                    for dim in dims:
                        f = Filter.build_filter(Dimension(dim) == row[dim])
                        fields.append(f)
                    if len(fields) > 1:
                        filt = Filter(type="and", fields=fields)
                        filters.append(Filter.build_filter(filt))
                    elif fields:
                        filters.append(fields[0])

                if filters:
                    ff = Filter(type="or", fields=filters)
                    if not orig_filters:
                        qry['filter'] = ff
                    else:
                        qry['filter'] = Filter(type="and", fields=[
                            Filter.build_filter(ff),
                            Filter.build_filter(orig_filters)])
                qry['limit_spec'] = None
        if row_limit:
            qry['limit_spec'] = self._limit_spec(metrics, row_limit)
        client.groupby(**qry)
        query_str += json.dumps(client.query_dict, indent=2)
        df = client.export_pandas()
        return QueryResult(
            df=df,
            query=query_str,
            duration=datetime.now() - qry_start_dttm)
#class Metric(Model, AuditMixin):
class Metric(Model):
    """A Druid aggregation, stored as JSON, attached to a ``Datasource``.

    The ``datasource`` relationship installs a ``metrics`` backref on
    ``Datasource``.
    """

    __tablename__ = 'metrics'

    id = Column(Integer, primary_key=True)
    metric_name = Column(String(512))
    verbose_name = Column(String(1024))
    metric_type = Column(String(32))
    datasource_name = Column(
        String(256),
        ForeignKey('datasources.datasource_name'),
    )
    datasource = relationship('Datasource', backref='metrics')
    # JSON text of the aggregation definition; see ``json_obj``.
    json = Column(Text)
    description = Column(Text)

    @property
    def json_obj(self):
        """The ``json`` field decoded, or ``{}`` when decoding fails."""
        # Inside a method ``json`` is the module, not the column above.
        try:
            return json.loads(self.json)
        except Exception:
            return {}
class Column(Model, AuditMixin):
    """A column of a Druid ``Datasource`` along with its querying flags.

    Once defined, this class shadows the ``Column`` imported from
    sqlalchemy at module level. Inside this class body the name still
    refers to sqlalchemy's ``Column``, because the class name is only bound
    after the body has run.
    """

    __tablename__ = 'columns'

    id = Column(Integer, primary_key=True)
    datasource_name = Column(
        String(256),
        ForeignKey('datasources.datasource_name'),
    )
    datasource = relationship('Datasource', backref='columns')
    column_name = Column(String(256))
    is_active = Column(Boolean, default=True)
    type = Column(String(32))
    groupby = Column(Boolean, default=False)
    count_distinct = Column(Boolean, default=False)
    sum = Column(Boolean, default=False)
    max = Column(Boolean, default=False)
    min = Column(Boolean, default=False)
    filterable = Column(Boolean, default=False)
    description = Column(Text)

    def __repr__(self):
        return self.column_name

    @property
    def isnum(self):
        """True when the column type is LONG, DOUBLE or FLOAT."""
        return self.type in ('LONG', 'DOUBLE', 'FLOAT')

    def generate_metrics(self):
        """Create the metrics implied by this column's aggregate flags.

        Candidates are a global ``count`` metric, then sum / min / max
        metrics for numeric columns that have the matching flag set, then a
        cardinality metric when ``count_distinct`` is set. A candidate is
        added to the session, and committed, only when the lookup below
        finds no metric of that name for the datasource.
        """
        candidates = [Metric(
            metric_name='count',
            verbose_name='COUNT(*)',
            metric_type='count',
            json=json.dumps({'type': 'count', 'name': 'count'})
        )]

        # Somehow we need to reassign this for UDAFs
        if self.type in ('DOUBLE', 'FLOAT'):
            corrected_type = 'DOUBLE'
        else:
            corrected_type = self.type

        # (flag value, metric type, suffix of the Druid aggregator type)
        numeric_aggregates = (
            (self.sum, 'sum', 'Sum'),
            (self.min, 'min', 'Min'),
            (self.max, 'max', 'Max'),
        )
        for enabled, kind, suffix in numeric_aggregates:
            if not (enabled and self.isnum):
                continue
            name = kind + '__' + self.column_name
            candidates.append(Metric(
                metric_name=name,
                metric_type=kind,
                verbose_name='{}({})'.format(kind.upper(), self.column_name),
                json=json.dumps({
                    'type': corrected_type.lower() + suffix,
                    'name': name,
                    'fieldName': self.column_name})
            ))

        if self.count_distinct:
            name = 'count_distinct__' + self.column_name
            candidates.append(Metric(
                metric_name=name,
                verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
                metric_type='count_distinct',
                json=json.dumps({
                    'type': 'cardinality',
                    'name': name,
                    'fieldNames': [self.column_name]})
            ))

        session = get_session()
        for candidate in candidates:
            existing = (
                session.query(Metric)
                .filter(Metric.metric_name == candidate.metric_name)
                .filter(Metric.datasource_name == self.datasource_name)
                .filter(Cluster.cluster_name == self.datasource.cluster_name)
                .first()
            )
            candidate.datasource_name = self.datasource_name
            if not existing:
                session.add(candidate)
                session.commit()