blob: df45ccf533530f9e6e11557498228e9b45fd2f8c [file] [log] [blame]
"""A collection of ORM sqlalchemy models for Superset"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from copy import copy, deepcopy
from datetime import date, datetime
import functools
import json
import logging
import textwrap
from flask import escape, g, Markup, request
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from future.standard_library import install_aliases
import numpy
import pandas as pd
import sqlalchemy as sqla
from sqlalchemy import (
Boolean, Column, create_engine, Date, DateTime, ForeignKey, Integer,
MetaData, select, String, Table, Text,
)
from sqlalchemy.engine import url
from sqlalchemy.engine.url import make_url
from sqlalchemy.orm import relationship, subqueryload
from sqlalchemy.orm.session import make_transient
from sqlalchemy.pool import NullPool
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.sql import text
from sqlalchemy.sql.expression import TextAsFrom
from sqlalchemy_utils import EncryptedType
from superset import app, db, db_engine_specs, sm, utils
from superset.connectors.connector_registry import ConnectorRegistry
from superset.models.helpers import AuditMixinNullable, ImportMixin, set_perm
from superset.viz import viz_types
install_aliases()
from urllib import parse # noqa
config = app.config
custom_password_store = config.get('SQLALCHEMY_CUSTOM_PASSWORD_STORE')
stats_logger = config.get('STATS_LOGGER')
metadata = Model.metadata # pylint: disable=no-member
PASSWORD_MASK = 'X' * 10
def set_related_perm(mapper, connection, target): # noqa
src_class = target.cls_model
id_ = target.datasource_id
if id_:
ds = db.session.query(src_class).filter_by(id=int(id_)).first()
if ds:
target.perm = ds.perm
class Url(Model, AuditMixinNullable):
"""Used for the short url feature"""
__tablename__ = 'url'
id = Column(Integer, primary_key=True)
url = Column(Text)
class KeyValue(Model):
"""Used for any type of key-value store"""
__tablename__ = 'keyvalue'
id = Column(Integer, primary_key=True)
value = Column(Text, nullable=False)
class CssTemplate(Model, AuditMixinNullable):
"""CSS templates for dashboards"""
__tablename__ = 'css_templates'
id = Column(Integer, primary_key=True)
template_name = Column(String(250))
css = Column(Text, default='')
slice_user = Table('slice_user', metadata,
Column('id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('ab_user.id')),
Column('slice_id', Integer, ForeignKey('slices.id')))
class Slice(Model, AuditMixinNullable, ImportMixin):
"""A slice is essentially a report or a view on data"""
__tablename__ = 'slices'
id = Column(Integer, primary_key=True)
slice_name = Column(String(250))
datasource_id = Column(Integer)
datasource_type = Column(String(200))
datasource_name = Column(String(2000))
viz_type = Column(String(250))
params = Column(Text)
description = Column(Text)
cache_timeout = Column(Integer)
perm = Column(String(1000))
owners = relationship(sm.user_model, secondary=slice_user)
export_fields = ('slice_name', 'datasource_type', 'datasource_name',
'viz_type', 'params', 'cache_timeout')
def __repr__(self):
return self.slice_name
@property
def cls_model(self):
return ConnectorRegistry.sources[self.datasource_type]
@property
def datasource(self):
return self.get_datasource
def clone(self):
return Slice(
slice_name=self.slice_name,
datasource_id=self.datasource_id,
datasource_type=self.datasource_type,
datasource_name=self.datasource_name,
viz_type=self.viz_type,
params=self.params,
description=self.description,
cache_timeout=self.cache_timeout)
@datasource.getter
@utils.memoized
def get_datasource(self):
return (
db.session.query(self.cls_model)
.filter_by(id=self.datasource_id)
.first()
)
@renders('datasource_name')
def datasource_link(self):
# pylint: disable=no-member
datasource = self.datasource
return datasource.link if datasource else None
@property
def datasource_edit_url(self):
# pylint: disable=no-member
datasource = self.datasource
return datasource.url if datasource else None
@property
@utils.memoized
def viz(self):
d = json.loads(self.params)
viz_class = viz_types[self.viz_type]
# pylint: disable=no-member
return viz_class(self.datasource, form_data=d)
@property
def description_markeddown(self):
return utils.markdown(self.description)
@property
def data(self):
"""Data used to render slice in templates"""
d = {}
self.token = ''
try:
d = self.viz.data
self.token = d.get('token')
except Exception as e:
logging.exception(e)
d['error'] = str(e)
return {
'datasource': self.datasource_name,
'description': self.description,
'description_markeddown': self.description_markeddown,
'edit_url': self.edit_url,
'form_data': self.form_data,
'slice_id': self.id,
'slice_name': self.slice_name,
'slice_url': self.slice_url,
}
@property
def json_data(self):
return json.dumps(self.data)
@property
def form_data(self):
form_data = {}
try:
form_data = json.loads(self.params)
except Exception as e:
logging.error("Malformed json in slice's params")
logging.exception(e)
form_data.update({
'slice_id': self.id,
'viz_type': self.viz_type,
'datasource': str(self.datasource_id) + '__' + self.datasource_type,
})
if self.cache_timeout:
form_data['cache_timeout'] = self.cache_timeout
return form_data
@property
def slice_url(self):
"""Defines the url to access the slice"""
form_data = {'slice_id': self.id}
return (
'/superset/explore/{obj.datasource_type}/'
'{obj.datasource_id}/?form_data={params}'.format(
obj=self, params=parse.quote(json.dumps(form_data))))
@property
def slice_id_url(self):
return (
'/superset/{slc.datasource_type}/{slc.datasource_id}/{slc.id}/'
).format(slc=self)
@property
def edit_url(self):
return '/slicemodelview/edit/{}'.format(self.id)
@property
def slice_link(self):
url = self.slice_url
name = escape(self.slice_name)
return Markup('<a href="{url}">{name}</a>'.format(**locals()))
def get_viz(self, force=False):
"""Creates :py:class:viz.BaseViz object from the url_params_multidict.
:return: object of the 'viz_type' type that is taken from the
url_params_multidict or self.params.
:rtype: :py:class:viz.BaseViz
"""
slice_params = json.loads(self.params)
slice_params['slice_id'] = self.id
slice_params['json'] = 'false'
slice_params['slice_name'] = self.slice_name
slice_params['viz_type'] = self.viz_type if self.viz_type else 'table'
return viz_types[slice_params.get('viz_type')](
self.datasource,
form_data=slice_params,
force=force,
)
@classmethod
def import_obj(cls, slc_to_import, slc_to_override, import_time=None):
"""Inserts or overrides slc in the database.
remote_id and import_time fields in params_dict are set to track the
slice origin and ensure correct overrides for multiple imports.
Slice.perm is used to find the datasources and connect them.
:param Slice slc_to_import: Slice object to import
:param Slice slc_to_override: Slice to replace, id matches remote_id
:returns: The resulting id for the imported slice
:rtype: int
"""
session = db.session
make_transient(slc_to_import)
slc_to_import.dashboards = []
slc_to_import.alter_params(
remote_id=slc_to_import.id, import_time=import_time)
slc_to_import = slc_to_import.copy()
params = slc_to_import.params_dict
slc_to_import.datasource_id = ConnectorRegistry.get_datasource_by_name(
session, slc_to_import.datasource_type, params['datasource_name'],
params['schema'], params['database_name']).id
if slc_to_override:
slc_to_override.override(slc_to_import)
session.flush()
return slc_to_override.id
session.add(slc_to_import)
logging.info('Final slice: {}'.format(slc_to_import.to_json()))
session.flush()
return slc_to_import.id
sqla.event.listen(Slice, 'before_insert', set_related_perm)
sqla.event.listen(Slice, 'before_update', set_related_perm)
dashboard_slices = Table(
'dashboard_slices', metadata,
Column('id', Integer, primary_key=True),
Column('dashboard_id', Integer, ForeignKey('dashboards.id')),
Column('slice_id', Integer, ForeignKey('slices.id')),
)
dashboard_user = Table(
'dashboard_user', metadata,
Column('id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('ab_user.id')),
Column('dashboard_id', Integer, ForeignKey('dashboards.id')),
)
class Dashboard(Model, AuditMixinNullable, ImportMixin):
"""The dashboard object!"""
__tablename__ = 'dashboards'
id = Column(Integer, primary_key=True)
dashboard_title = Column(String(500))
position_json = Column(Text)
description = Column(Text)
css = Column(Text)
json_metadata = Column(Text)
slug = Column(String(255), unique=True)
slices = relationship(
'Slice', secondary=dashboard_slices, backref='dashboards')
owners = relationship(sm.user_model, secondary=dashboard_user)
export_fields = ('dashboard_title', 'position_json', 'json_metadata',
'description', 'css', 'slug')
def __repr__(self):
return self.dashboard_title
@property
def table_names(self):
# pylint: disable=no-member
return ', '.join(
{'{}'.format(s.datasource.full_name) for s in self.slices})
@property
def url(self):
if self.json_metadata:
# add default_filters to the preselect_filters of dashboard
json_metadata = json.loads(self.json_metadata)
default_filters = json_metadata.get('default_filters')
if default_filters:
filters = parse.quote(default_filters.encode('utf8'))
return '/superset/dashboard/{}/?preselect_filters={}'.format(
self.slug or self.id, filters)
return '/superset/dashboard/{}/'.format(self.slug or self.id)
@property
def datasources(self):
return {slc.datasource for slc in self.slices}
@property
def sqla_metadata(self):
# pylint: disable=no-member
metadata = MetaData(bind=self.get_sqla_engine())
return metadata.reflect()
def dashboard_link(self):
title = escape(self.dashboard_title)
return Markup(
'<a href="{self.url}">{title}</a>'.format(**locals()))
@property
def data(self):
positions = self.position_json
if positions:
positions = json.loads(positions)
return {
'id': self.id,
'metadata': self.params_dict,
'css': self.css,
'dashboard_title': self.dashboard_title,
'slug': self.slug,
'slices': [slc.data for slc in self.slices],
'position_json': positions,
}
@property
def params(self):
return self.json_metadata
@params.setter
def params(self, value):
self.json_metadata = value
@property
def position_array(self):
if self.position_json:
return json.loads(self.position_json)
return []
@classmethod
def import_obj(cls, dashboard_to_import, import_time=None):
"""Imports the dashboard from the object to the database.
Once dashboard is imported, json_metadata field is extended and stores
remote_id and import_time. It helps to decide if the dashboard has to
be overridden or just copies over. Slices that belong to this
dashboard will be wired to existing tables. This function can be used
to import/export dashboards between multiple superset instances.
Audit metadata isn't copied over.
"""
def alter_positions(dashboard, old_to_new_slc_id_dict):
""" Updates slice_ids in the position json.
Sample position json:
[{
"col": 5,
"row": 10,
"size_x": 4,
"size_y": 2,
"slice_id": "3610"
}]
"""
position_array = dashboard.position_array
for position in position_array:
if 'slice_id' not in position:
continue
old_slice_id = int(position['slice_id'])
if old_slice_id in old_to_new_slc_id_dict:
position['slice_id'] = '{}'.format(
old_to_new_slc_id_dict[old_slice_id])
dashboard.position_json = json.dumps(position_array)
logging.info('Started import of the dashboard: {}'
.format(dashboard_to_import.to_json()))
session = db.session
logging.info('Dashboard has {} slices'
.format(len(dashboard_to_import.slices)))
# copy slices object as Slice.import_slice will mutate the slice
# and will remove the existing dashboard - slice association
slices = copy(dashboard_to_import.slices)
old_to_new_slc_id_dict = {}
new_filter_immune_slices = []
new_timed_refresh_immune_slices = []
new_expanded_slices = {}
i_params_dict = dashboard_to_import.params_dict
remote_id_slice_map = {
slc.params_dict['remote_id']: slc
for slc in session.query(Slice).all()
if 'remote_id' in slc.params_dict
}
for slc in slices:
logging.info('Importing slice {} from the dashboard: {}'.format(
slc.to_json(), dashboard_to_import.dashboard_title))
remote_slc = remote_id_slice_map.get(slc.id)
new_slc_id = Slice.import_obj(slc, remote_slc, import_time=import_time)
old_to_new_slc_id_dict[slc.id] = new_slc_id
# update json metadata that deals with slice ids
new_slc_id_str = '{}'.format(new_slc_id)
old_slc_id_str = '{}'.format(slc.id)
if ('filter_immune_slices' in i_params_dict and
old_slc_id_str in i_params_dict['filter_immune_slices']):
new_filter_immune_slices.append(new_slc_id_str)
if ('timed_refresh_immune_slices' in i_params_dict and
old_slc_id_str in
i_params_dict['timed_refresh_immune_slices']):
new_timed_refresh_immune_slices.append(new_slc_id_str)
if ('expanded_slices' in i_params_dict and
old_slc_id_str in i_params_dict['expanded_slices']):
new_expanded_slices[new_slc_id_str] = (
i_params_dict['expanded_slices'][old_slc_id_str])
# override the dashboard
existing_dashboard = None
for dash in session.query(Dashboard).all():
if ('remote_id' in dash.params_dict and
dash.params_dict['remote_id'] ==
dashboard_to_import.id):
existing_dashboard = dash
dashboard_to_import.id = None
alter_positions(dashboard_to_import, old_to_new_slc_id_dict)
dashboard_to_import.alter_params(import_time=import_time)
if new_expanded_slices:
dashboard_to_import.alter_params(
expanded_slices=new_expanded_slices)
if new_filter_immune_slices:
dashboard_to_import.alter_params(
filter_immune_slices=new_filter_immune_slices)
if new_timed_refresh_immune_slices:
dashboard_to_import.alter_params(
timed_refresh_immune_slices=new_timed_refresh_immune_slices)
new_slices = session.query(Slice).filter(
Slice.id.in_(old_to_new_slc_id_dict.values())).all()
if existing_dashboard:
existing_dashboard.override(dashboard_to_import)
existing_dashboard.slices = new_slices
session.flush()
return existing_dashboard.id
else:
# session.add(dashboard_to_import) causes sqlachemy failures
# related to the attached users / slices. Creating new object
# allows to avoid conflicts in the sql alchemy state.
copied_dash = dashboard_to_import.copy()
copied_dash.slices = new_slices
session.add(copied_dash)
session.flush()
return copied_dash.id
@classmethod
def export_dashboards(cls, dashboard_ids):
copied_dashboards = []
datasource_ids = set()
for dashboard_id in dashboard_ids:
# make sure that dashboard_id is an integer
dashboard_id = int(dashboard_id)
copied_dashboard = (
db.session.query(Dashboard)
.options(subqueryload(Dashboard.slices))
.filter_by(id=dashboard_id).first()
)
make_transient(copied_dashboard)
for slc in copied_dashboard.slices:
datasource_ids.add((slc.datasource_id, slc.datasource_type))
# add extra params for the import
slc.alter_params(
remote_id=slc.id,
datasource_name=slc.datasource.name,
schema=slc.datasource.name,
database_name=slc.datasource.database.name,
)
copied_dashboard.alter_params(remote_id=dashboard_id)
copied_dashboards.append(copied_dashboard)
eager_datasources = []
for dashboard_id, dashboard_type in datasource_ids:
eager_datasource = ConnectorRegistry.get_eager_datasource(
db.session, dashboard_type, dashboard_id)
eager_datasource.alter_params(
remote_id=eager_datasource.id,
database_name=eager_datasource.database.name,
)
make_transient(eager_datasource)
eager_datasources.append(eager_datasource)
return json.dumps({
'dashboards': copied_dashboards,
'datasources': eager_datasources,
}, cls=utils.DashboardEncoder, indent=4)
class Database(Model, AuditMixinNullable, ImportMixin):
"""An ORM object that stores Database related information"""
__tablename__ = 'dbs'
type = 'table'
__table_args__ = (UniqueConstraint('database_name'),)
id = Column(Integer, primary_key=True)
verbose_name = Column(String(250), unique=True)
# short unique name, used in permissions
database_name = Column(String(250), unique=True)
sqlalchemy_uri = Column(String(1024))
password = Column(EncryptedType(String(1024), config.get('SECRET_KEY')))
cache_timeout = Column(Integer)
select_as_create_table_as = Column(Boolean, default=False)
expose_in_sqllab = Column(Boolean, default=False)
allow_run_sync = Column(Boolean, default=True)
allow_run_async = Column(Boolean, default=False)
allow_ctas = Column(Boolean, default=False)
allow_dml = Column(Boolean, default=False)
force_ctas_schema = Column(String(250))
extra = Column(Text, default=textwrap.dedent("""\
{
"metadata_params": {},
"engine_params": {}
}
"""))
perm = Column(String(1000))
impersonate_user = Column(Boolean, default=False)
export_fields = ('database_name', 'sqlalchemy_uri', 'cache_timeout',
'expose_in_sqllab', 'allow_run_sync', 'allow_run_async',
'allow_ctas', 'extra')
export_children = ['tables']
def __repr__(self):
return self.verbose_name if self.verbose_name else self.database_name
@property
def name(self):
return self.verbose_name if self.verbose_name else self.database_name
@property
def data(self):
return {
'name': self.database_name,
'backend': self.backend,
}
@property
def unique_name(self):
return self.database_name
@property
def backend(self):
url = make_url(self.sqlalchemy_uri_decrypted)
return url.get_backend_name()
@classmethod
def get_password_masked_url_from_uri(cls, uri):
url = make_url(uri)
return cls.get_password_masked_url(url)
@classmethod
def get_password_masked_url(cls, url):
url_copy = deepcopy(url)
if url_copy.password is not None and url_copy.password != PASSWORD_MASK:
url_copy.password = PASSWORD_MASK
return url_copy
def set_sqlalchemy_uri(self, uri):
conn = sqla.engine.url.make_url(uri.strip())
if conn.password != PASSWORD_MASK and not custom_password_store:
# do not over-write the password with the password mask
self.password = conn.password
conn.password = PASSWORD_MASK if conn.password else None
self.sqlalchemy_uri = str(conn) # hides the password
def get_effective_user(self, url, user_name=None):
"""
Get the effective user, especially during impersonation.
:param url: SQL Alchemy URL object
:param user_name: Default username
:return: The effective username
"""
effective_username = None
if self.impersonate_user:
effective_username = url.username
if user_name:
effective_username = user_name
elif (
hasattr(g, 'user') and hasattr(g.user, 'username') and
g.user.username is not None
):
effective_username = g.user.username
return effective_username
@utils.memoized(
watch=('impersonate_user', 'sqlalchemy_uri_decrypted', 'extra'))
def get_sqla_engine(self, schema=None, nullpool=True, user_name=None):
extra = self.get_extra()
url = make_url(self.sqlalchemy_uri_decrypted)
url = self.db_engine_spec.adjust_database_uri(url, schema)
effective_username = self.get_effective_user(url, user_name)
# If using MySQL or Presto for example, will set url.username
# If using Hive, will not do anything yet since that relies on a
# configuration parameter instead.
self.db_engine_spec.modify_url_for_impersonation(
url,
self.impersonate_user,
effective_username)
masked_url = self.get_password_masked_url(url)
logging.info('Database.get_sqla_engine(). Masked URL: {0}'.format(masked_url))
params = extra.get('engine_params', {})
if nullpool:
params['poolclass'] = NullPool
# If using Hive, this will set hive.server2.proxy.user=$effective_username
configuration = {}
configuration.update(
self.db_engine_spec.get_configuration_for_impersonation(
str(url),
self.impersonate_user,
effective_username))
if configuration:
params['connect_args'] = {'configuration': configuration}
return create_engine(url, **params)
def get_reserved_words(self):
return self.get_dialect().preparer.reserved_words
def get_quoter(self):
return self.get_dialect().identifier_preparer.quote
def get_df(self, sql, schema):
sql = sql.strip().strip(';')
eng = self.get_sqla_engine(schema=schema)
df = pd.read_sql(sql, eng)
def needs_conversion(df_series):
if df_series.empty:
return False
if isinstance(df_series[0], (list, dict)):
return True
return False
for k, v in df.dtypes.iteritems():
if v.type == numpy.object_ and needs_conversion(df[k]):
df[k] = df[k].apply(utils.json_dumps_w_dates)
return df
def compile_sqla_query(self, qry, schema=None):
eng = self.get_sqla_engine(schema=schema)
compiled = qry.compile(eng, compile_kwargs={'literal_binds': True})
return '{}'.format(compiled)
def select_star(
self, table_name, schema=None, limit=100, show_cols=False,
indent=True, latest_partition=True):
"""Generates a ``select *`` statement in the proper dialect"""
return self.db_engine_spec.select_star(
self, table_name, schema=schema, limit=limit, show_cols=show_cols,
indent=indent, latest_partition=latest_partition)
def wrap_sql_limit(self, sql, limit=1000):
qry = (
select('*')
.select_from(
TextAsFrom(text(sql), ['*'])
.alias('inner_qry'),
).limit(limit)
)
return self.compile_sqla_query(qry)
def safe_sqlalchemy_uri(self):
return self.sqlalchemy_uri
@property
def inspector(self):
engine = self.get_sqla_engine()
return sqla.inspect(engine)
def all_table_names(self, schema=None, force=False):
if not schema:
tables_dict = self.db_engine_spec.fetch_result_sets(
self, 'table', force=force)
return tables_dict.get('', [])
return sorted(
self.db_engine_spec.get_table_names(schema, self.inspector))
def all_view_names(self, schema=None, force=False):
if not schema:
views_dict = self.db_engine_spec.fetch_result_sets(
self, 'view', force=force)
return views_dict.get('', [])
views = []
try:
views = self.inspector.get_view_names(schema)
except Exception:
pass
return views
def all_schema_names(self):
return sorted(self.db_engine_spec.get_schema_names(self.inspector))
@property
def db_engine_spec(self):
return db_engine_specs.engines.get(
self.backend, db_engine_specs.BaseEngineSpec)
@classmethod
def get_db_engine_spec_for_backend(cls, backend):
return db_engine_specs.engines.get(backend, db_engine_specs.BaseEngineSpec)
def grains(self):
"""Defines time granularity database-specific expressions.
The idea here is to make it easy for users to change the time grain
form a datetime (maybe the source grain is arbitrary timestamps, daily
or 5 minutes increments) to another, "truncated" datetime. Since
each database has slightly different but similar datetime functions,
this allows a mapping between database engines and actual functions.
"""
return self.db_engine_spec.time_grains
def grains_dict(self):
return {grain.name: grain for grain in self.grains()}
def get_extra(self):
extra = {}
if self.extra:
try:
extra = json.loads(self.extra)
except Exception as e:
logging.error(e)
return extra
def get_table(self, table_name, schema=None):
extra = self.get_extra()
meta = MetaData(**extra.get('metadata_params', {}))
return Table(
table_name, meta,
schema=schema or None,
autoload=True,
autoload_with=self.get_sqla_engine())
def get_columns(self, table_name, schema=None):
return self.inspector.get_columns(table_name, schema)
def get_indexes(self, table_name, schema=None):
return self.inspector.get_indexes(table_name, schema)
def get_pk_constraint(self, table_name, schema=None):
return self.inspector.get_pk_constraint(table_name, schema)
def get_foreign_keys(self, table_name, schema=None):
return self.inspector.get_foreign_keys(table_name, schema)
@property
def sqlalchemy_uri_decrypted(self):
conn = sqla.engine.url.make_url(self.sqlalchemy_uri)
if custom_password_store:
conn.password = custom_password_store(conn)
else:
conn.password = self.password
return str(conn)
@property
def sql_url(self):
return '/superset/sql/{}/'.format(self.id)
def get_perm(self):
return (
'[{obj.database_name}].(id:{obj.id})').format(obj=self)
def has_table(self, table):
engine = self.get_sqla_engine()
return engine.has_table(
table.table_name, table.schema or None)
@utils.memoized
def get_dialect(self):
sqla_url = url.make_url(self.sqlalchemy_uri_decrypted)
return sqla_url.get_dialect()()
sqla.event.listen(Database, 'after_insert', set_perm)
sqla.event.listen(Database, 'after_update', set_perm)
class Log(Model):
"""ORM object used to log Superset actions to the database"""
__tablename__ = 'logs'
id = Column(Integer, primary_key=True)
action = Column(String(512))
user_id = Column(Integer, ForeignKey('ab_user.id'))
dashboard_id = Column(Integer)
slice_id = Column(Integer)
json = Column(Text)
user = relationship(sm.user_model, backref='logs', foreign_keys=[user_id])
dttm = Column(DateTime, default=datetime.utcnow)
dt = Column(Date, default=date.today())
duration_ms = Column(Integer)
referrer = Column(String(1024))
@classmethod
def log_this(cls, f):
"""Decorator to log user actions"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
start_dttm = datetime.now()
user_id = None
if g.user:
user_id = g.user.get_id()
d = request.form.to_dict() or {}
# request parameters can overwrite post body
request_params = request.args.to_dict()
d.update(request_params)
d.update(kwargs)
slice_id = d.get('slice_id')
try:
slice_id = int(
slice_id or json.loads(d.get('form_data')).get('slice_id'))
except (ValueError, TypeError):
slice_id = 0
params = ''
try:
params = json.dumps(d)
except Exception:
pass
stats_logger.incr(f.__name__)
value = f(*args, **kwargs)
sesh = db.session()
log = cls(
action=f.__name__,
json=params,
dashboard_id=d.get('dashboard_id'),
slice_id=slice_id,
duration_ms=(
datetime.now() - start_dttm).total_seconds() * 1000,
referrer=request.referrer[:1000] if request.referrer else None,
user_id=user_id)
sesh.add(log)
sesh.commit()
return value
return wrapper
class FavStar(Model):
__tablename__ = 'favstar'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('ab_user.id'))
class_name = Column(String(50))
obj_id = Column(Integer)
dttm = Column(DateTime, default=datetime.utcnow)
class DatasourceAccessRequest(Model, AuditMixinNullable):
"""ORM model for the access requests for datasources and dbs."""
__tablename__ = 'access_request'
id = Column(Integer, primary_key=True)
datasource_id = Column(Integer)
datasource_type = Column(String(200))
ROLES_BLACKLIST = set(config.get('ROBOT_PERMISSION_ROLES', []))
@property
def cls_model(self):
return ConnectorRegistry.sources[self.datasource_type]
@property
def username(self):
return self.creator()
@property
def datasource(self):
return self.get_datasource
@datasource.getter
@utils.memoized
def get_datasource(self):
# pylint: disable=no-member
ds = db.session.query(self.cls_model).filter_by(
id=self.datasource_id).first()
return ds
@property
def datasource_link(self):
return self.datasource.link # pylint: disable=no-member
@property
def roles_with_datasource(self):
action_list = ''
perm = self.datasource.perm # pylint: disable=no-member
pv = sm.find_permission_view_menu('datasource_access', perm)
for r in pv.role:
if r.name in self.ROLES_BLACKLIST:
continue
url = (
'/superset/approve?datasource_type={self.datasource_type}&'
'datasource_id={self.datasource_id}&'
'created_by={self.created_by.username}&role_to_grant={r.name}'
.format(**locals())
)
href = '<a href="{}">Grant {} Role</a>'.format(url, r.name)
action_list = action_list + '<li>' + href + '</li>'
return '<ul>' + action_list + '</ul>'
@property
def user_roles(self):
action_list = ''
for r in self.created_by.roles: # pylint: disable=no-member
url = (
'/superset/approve?datasource_type={self.datasource_type}&'
'datasource_id={self.datasource_id}&'
'created_by={self.created_by.username}&role_to_extend={r.name}'
.format(**locals())
)
href = '<a href="{}">Extend {} Role</a>'.format(url, r.name)
if r.name in self.ROLES_BLACKLIST:
href = '{} Role'.format(r.name)
action_list = action_list + '<li>' + href + '</li>'
return '<ul>' + action_list + '</ul>'