blob: 17f51518f5e5e108b9115224c1b9ccfd933153b9 [file] [log] [blame]
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from collections import defaultdict
from datetime import datetime, timedelta
import json
import logging
import pickle
import re
import time
import traceback
from urllib import parse
from flask import (
flash, g, Markup, redirect, render_template, request, Response, url_for,
)
from flask_appbuilder import expose
from flask_appbuilder.actions import action
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_appbuilder.security.decorators import has_access_api
from flask_appbuilder.security.sqla import models as ab_models
from flask_babel import gettext as __
from flask_babel import lazy_gettext as _
import pandas as pd
import sqlalchemy as sqla
from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url
from werkzeug.routing import BaseConverter
from superset import (
app, appbuilder, cache, db, results_backend, security, sm, sql_lab, utils,
viz,
)
from superset.connectors.connector_registry import ConnectorRegistry
from superset.legacy import cast_form_data
import superset.models.core as models
from superset.models.sql_lab import Query
from superset.sql_parse import SupersetQuery
from superset.utils import has_access, merge_extra_filters, QueryStatus
from .base import (
api, BaseSupersetView, CsvResponse, DeleteMixin, get_error_msg,
get_user_roles, json_error_response, SupersetFilter, SupersetModelView,
)
config = app.config
stats_logger = config.get('STATS_LOGGER')
log_this = models.Log.log_this
can_access = utils.can_access
DAR = models.DatasourceAccessRequest
ALL_DATASOURCE_ACCESS_ERR = __(
'This endpoint requires the `all_datasource_access` permission')
DATASOURCE_MISSING_ERR = __('The datasource seems to have been deleted')
ACCESS_REQUEST_MISSING_ERR = __(
'The access requests seem to have been deleted')
USER_MISSING_ERR = __('The user seems to have been deleted')
DATASOURCE_ACCESS_ERR = __("You don't have access to this datasource")
def get_database_access_error_msg(database_name):
return __('This view requires the database %(name)s or '
'`all_datasource_access` permission', name=database_name)
def get_datasource_access_error_msg(datasource_name):
return __('This endpoint requires the datasource %(name)s, database or '
'`all_datasource_access` permission', name=datasource_name)
def json_success(json_msg, status=200):
return Response(json_msg, status=status, mimetype='application/json')
def is_owner(obj, user):
""" Check if user is owner of the slice """
return obj and user in obj.owners
def check_ownership(obj, raise_if_false=True):
"""Meant to be used in `pre_update` hooks on models to enforce ownership
Admin have all access, and other users need to be referenced on either
the created_by field that comes with the ``AuditMixin``, or in a field
named ``owners`` which is expected to be a one-to-many with the User
model. It is meant to be used in the ModelView's pre_update hook in
which raising will abort the update.
"""
if not obj:
return False
security_exception = utils.SupersetSecurityException(
"You don't have the rights to alter [{}]".format(obj))
if g.user.is_anonymous():
if raise_if_false:
raise security_exception
return False
roles = (r.name for r in get_user_roles())
if 'Admin' in roles:
return True
session = db.create_scoped_session()
orig_obj = session.query(obj.__class__).filter_by(id=obj.id).first()
owner_names = (user.username for user in orig_obj.owners)
if (
hasattr(orig_obj, 'created_by') and
orig_obj.created_by and
orig_obj.created_by.username == g.user.username):
return True
if (
hasattr(orig_obj, 'owners') and
g.user and
hasattr(g.user, 'username') and
g.user.username in owner_names):
return True
if raise_if_false:
raise security_exception
else:
return False
class SliceFilter(SupersetFilter):
def apply(self, query, func): # noqa
if self.has_all_datasource_access():
return query
perms = self.get_view_menus('datasource_access')
# TODO(bogdan): add `schema_access` support here
return query.filter(self.model.perm.in_(perms))
class DatabaseFilter(SupersetFilter):
def apply(self, query, func):
if self.has_all_datasource_access():
return query
perms = self.get_view_menus('database_access')
return query.filter(self.model.perm.in_(perms))
class DashboardFilter(SupersetFilter):
"""List dashboards for which users have access to at least one slice"""
def apply(self, query, func): # noqa
if self.has_all_datasource_access():
return query
Slice = models.Slice # noqa
Dash = models.Dashboard # noqa
# TODO(bogdan): add `schema_access` support here
datasource_perms = self.get_view_menus('datasource_access')
slice_ids_qry = (
db.session
.query(Slice.id)
.filter(Slice.perm.in_(datasource_perms))
)
query = query.filter(
Dash.id.in_(
db.session.query(Dash.id)
.distinct()
.join(Dash.slices)
.filter(Slice.id.in_(slice_ids_qry)),
),
)
return query
def generate_download_headers(extension):
filename = datetime.now().strftime('%Y%m%d_%H%M%S')
content_disp = 'attachment; filename={}.{}'.format(filename, extension)
headers = {
'Content-Disposition': content_disp,
}
return headers
class DatabaseView(SupersetModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.Database)
list_title = _('List Databases')
show_title = _('Show Database')
add_title = _('Add Database')
edit_title = _('Edit Database')
list_columns = [
'database_name', 'backend', 'allow_run_sync', 'allow_run_async',
'allow_dml', 'creator', 'modified']
order_columns = [
'database_name', 'allow_run_sync', 'allow_run_async', 'allow_dml',
'modified',
]
add_columns = [
'database_name', 'sqlalchemy_uri', 'cache_timeout', 'extra',
'expose_in_sqllab', 'allow_run_sync', 'allow_run_async',
'allow_ctas', 'allow_dml', 'force_ctas_schema', 'impersonate_user']
search_exclude_columns = (
'password', 'tables', 'created_by', 'changed_by', 'queries',
'saved_queries', )
edit_columns = add_columns
show_columns = [
'tables',
'cache_timeout',
'extra',
'database_name',
'sqlalchemy_uri',
'perm',
'created_by',
'created_on',
'changed_by',
'changed_on',
]
add_template = 'superset/models/database/add.html'
edit_template = 'superset/models/database/edit.html'
base_filters = [['perm', DatabaseFilter, lambda:[]]]
base_order = ('changed_on', 'desc')
description_columns = {
'sqlalchemy_uri': utils.markdown(
'Refer to the '
'[SqlAlchemy docs]'
'(http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html#'
'database-urls) '
'for more information on how to structure your URI.', True),
'expose_in_sqllab': _('Expose this DB in SQL Lab'),
'allow_run_sync': _(
'Allow users to run synchronous queries, this is the default '
'and should work well for queries that can be executed '
'within a web request scope (<~1 minute)'),
'allow_run_async': _(
'Allow users to run queries, against an async backend. '
'This assumes that you have a Celery worker setup as well '
'as a results backend.'),
'allow_ctas': _('Allow CREATE TABLE AS option in SQL Lab'),
'allow_dml': _(
'Allow users to run non-SELECT statements '
'(UPDATE, DELETE, CREATE, ...) '
'in SQL Lab'),
'force_ctas_schema': _(
'When allowing CREATE TABLE AS option in SQL Lab, '
'this option forces the table to be created in this schema'),
'extra': utils.markdown(
'JSON string containing extra configuration elements. '
'The ``engine_params`` object gets unpacked into the '
'[sqlalchemy.create_engine]'
'(http://docs.sqlalchemy.org/en/latest/core/engines.html#'
'sqlalchemy.create_engine) call, while the ``metadata_params`` '
'gets unpacked into the [sqlalchemy.MetaData]'
'(http://docs.sqlalchemy.org/en/rel_1_0/core/metadata.html'
'#sqlalchemy.schema.MetaData) call. ', True),
'impersonate_user': _(
'If Presto, all the queries in SQL Lab are going to be executed as the '
'currently logged on user who must have permission to run them.<br/>'
'If Hive and hive.server2.enable.doAs is enabled, will run the queries as '
'service account, but impersonate the currently logged on user '
'via hive.server2.proxy.user property.'),
}
label_columns = {
'expose_in_sqllab': _('Expose in SQL Lab'),
'allow_ctas': _('Allow CREATE TABLE AS'),
'allow_dml': _('Allow DML'),
'force_ctas_schema': _('CTAS Schema'),
'database_name': _('Database'),
'creator': _('Creator'),
'changed_on_': _('Last Changed'),
'sqlalchemy_uri': _('SQLAlchemy URI'),
'cache_timeout': _('Cache Timeout'),
'extra': _('Extra'),
'allow_run_sync': _('Allow Run Sync'),
'allow_run_async': _('Allow Run Async'),
'impersonate_user': _('Impersonate the logged on user'),
}
def pre_add(self, db):
db.set_sqlalchemy_uri(db.sqlalchemy_uri)
security.merge_perm(sm, 'database_access', db.perm)
for schema in db.all_schema_names():
security.merge_perm(
sm, 'schema_access', utils.get_schema_perm(db, schema))
def pre_update(self, db):
self.pre_add(db)
def _delete(self, pk):
DeleteMixin._delete(self, pk)
appbuilder.add_link(
'Import Dashboards',
label=__('Import Dashboards'),
href='/superset/import_dashboards',
icon='fa-cloud-upload',
category='Manage',
category_label=__('Manage'),
category_icon='fa-wrench',)
appbuilder.add_view(
DatabaseView,
'Databases',
label=__('Databases'),
icon='fa-database',
category='Sources',
category_label=__('Sources'),
category_icon='fa-database',)
class DatabaseAsync(DatabaseView):
list_columns = [
'id', 'database_name',
'expose_in_sqllab', 'allow_ctas', 'force_ctas_schema',
'allow_run_async', 'allow_run_sync', 'allow_dml',
]
appbuilder.add_view_no_menu(DatabaseAsync)
class DatabaseTablesAsync(DatabaseView):
list_columns = ['id', 'all_table_names', 'all_schema_names']
appbuilder.add_view_no_menu(DatabaseTablesAsync)
class AccessRequestsModelView(SupersetModelView, DeleteMixin):
datamodel = SQLAInterface(DAR)
list_columns = [
'username', 'user_roles', 'datasource_link',
'roles_with_datasource', 'created_on']
order_columns = ['created_on']
base_order = ('changed_on', 'desc')
label_columns = {
'username': _('User'),
'user_roles': _('User Roles'),
'database': _('Database URL'),
'datasource_link': _('Datasource'),
'roles_with_datasource': _('Roles to grant'),
'created_on': _('Created On'),
}
appbuilder.add_view(
AccessRequestsModelView,
'Access requests',
label=__('Access requests'),
category='Security',
category_label=__('Security'),
icon='fa-table',)
class SliceModelView(SupersetModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.Slice)
list_title = _('List Slices')
show_title = _('Show Slice')
add_title = _('Add Slice')
edit_title = _('Edit Slice')
can_add = False
label_columns = {
'datasource_link': _('Datasource'),
}
search_columns = (
'slice_name', 'description', 'viz_type', 'owners',
)
list_columns = [
'slice_link', 'viz_type', 'datasource_link', 'creator', 'modified']
order_columns = ['viz_type', 'datasource_link', 'modified']
edit_columns = [
'slice_name', 'description', 'viz_type', 'owners', 'dashboards',
'params', 'cache_timeout']
base_order = ('changed_on', 'desc')
description_columns = {
'description': Markup(
'The content here can be displayed as widget headers in the '
'dashboard view. Supports '
'<a href="https://daringfireball.net/projects/markdown/"">'
'markdown</a>'),
'params': _(
'These parameters are generated dynamically when clicking '
'the save or overwrite button in the explore view. This JSON '
'object is exposed here for reference and for power users who may '
'want to alter specific parameters.',
),
'cache_timeout': _(
'Duration (in seconds) of the caching timeout for this slice.'),
}
base_filters = [['id', SliceFilter, lambda: []]]
label_columns = {
'cache_timeout': _('Cache Timeout'),
'creator': _('Creator'),
'dashboards': _('Dashboards'),
'datasource_link': _('Datasource'),
'description': _('Description'),
'modified': _('Last Modified'),
'owners': _('Owners'),
'params': _('Parameters'),
'slice_link': _('Slice'),
'slice_name': _('Name'),
'table': _('Table'),
'viz_type': _('Visualization Type'),
}
def pre_add(self, obj):
utils.validate_json(obj.params)
def pre_update(self, obj):
utils.validate_json(obj.params)
check_ownership(obj)
def pre_delete(self, obj):
check_ownership(obj)
@expose('/add', methods=['GET', 'POST'])
@has_access
def add(self):
datasources = ConnectorRegistry.get_all_datasources(db.session)
datasources = [
{'value': str(d.id) + '__' + d.type, 'label': repr(d)}
for d in datasources
]
return self.render_template(
'superset/add_slice.html',
bootstrap_data=json.dumps({
'datasources': sorted(datasources, key=lambda d: d['label']),
}),
)
appbuilder.add_view(
SliceModelView,
'Slices',
label=__('Slices'),
icon='fa-bar-chart',
category='',
category_icon='',)
class SliceAsync(SliceModelView): # noqa
list_columns = [
'slice_link', 'viz_type',
'creator', 'modified', 'icons']
label_columns = {
'icons': ' ',
'slice_link': _('Slice'),
}
appbuilder.add_view_no_menu(SliceAsync)
class SliceAddView(SliceModelView): # noqa
list_columns = [
'id', 'slice_name', 'slice_link', 'viz_type',
'owners', 'modified', 'changed_on']
appbuilder.add_view_no_menu(SliceAddView)
class DashboardModelView(SupersetModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.Dashboard)
list_title = _('List Dashboards')
show_title = _('Show Dashboard')
add_title = _('Add Dashboard')
edit_title = _('Edit Dashboard')
list_columns = ['dashboard_link', 'creator', 'modified']
order_columns = ['modified']
edit_columns = [
'dashboard_title', 'slug', 'slices', 'owners', 'position_json', 'css',
'json_metadata']
show_columns = edit_columns + ['table_names']
search_columns = ('dashboard_title', 'slug', 'owners')
add_columns = edit_columns
base_order = ('changed_on', 'desc')
description_columns = {
'position_json': _(
'This json object describes the positioning of the widgets in '
'the dashboard. It is dynamically generated when adjusting '
'the widgets size and positions by using drag & drop in '
'the dashboard view'),
'css': _(
'The css for individual dashboards can be altered here, or '
'in the dashboard view where changes are immediately '
'visible'),
'slug': _('To get a readable URL for your dashboard'),
'json_metadata': _(
'This JSON object is generated dynamically when clicking '
'the save or overwrite button in the dashboard view. It '
'is exposed here for reference and for power users who may '
'want to alter specific parameters.'),
'owners': _('Owners is a list of users who can alter the dashboard.'),
}
base_filters = [['slice', DashboardFilter, lambda: []]]
add_form_query_rel_fields = {
'slices': [['slices', SliceFilter, None]],
}
edit_form_query_rel_fields = add_form_query_rel_fields
label_columns = {
'dashboard_link': _('Dashboard'),
'dashboard_title': _('Title'),
'slug': _('Slug'),
'slices': _('Slices'),
'owners': _('Owners'),
'creator': _('Creator'),
'modified': _('Modified'),
'position_json': _('Position JSON'),
'css': _('CSS'),
'json_metadata': _('JSON Metadata'),
'table_names': _('Underlying Tables'),
}
def pre_add(self, obj):
obj.slug = obj.slug.strip() or None
if obj.slug:
obj.slug = obj.slug.replace(' ', '-')
obj.slug = re.sub(r'[^a-zA-Z0-9\-]+', '', obj.slug)
if g.user not in obj.owners:
obj.owners.append(g.user)
utils.validate_json(obj.json_metadata)
utils.validate_json(obj.position_json)
owners = [o for o in obj.owners]
for slc in obj.slices:
slc.owners = list(set(owners) | set(slc.owners))
def pre_update(self, obj):
check_ownership(obj)
self.pre_add(obj)
def pre_delete(self, obj):
check_ownership(obj)
@action('mulexport', __('Export'), __('Export dashboards?'), 'fa-database')
def mulexport(self, items):
if not isinstance(items, list):
items = [items]
ids = ''.join('&id={}'.format(d.id) for d in items)
return redirect(
'/dashboardmodelview/export_dashboards_form?{}'.format(ids[1:]))
@expose('/export_dashboards_form')
def download_dashboards(self):
if request.args.get('action') == 'go':
ids = request.args.getlist('id')
return Response(
models.Dashboard.export_dashboards(ids),
headers=generate_download_headers('pickle'),
mimetype='application/text')
return self.render_template(
'superset/export_dashboards.html',
dashboards_url='/dashboardmodelview/list',
)
appbuilder.add_view(
DashboardModelView,
'Dashboards',
label=__('Dashboards'),
icon='fa-dashboard',
category='',
category_icon='',)
class DashboardModelViewAsync(DashboardModelView): # noqa
list_columns = ['dashboard_link', 'creator', 'modified', 'dashboard_title']
label_columns = {
'dashboard_link': _('Dashboard'),
'dashboard_title': _('Title'),
'creator': _('Creator'),
'modified': _('Modified'),
}
appbuilder.add_view_no_menu(DashboardModelViewAsync)
class LogModelView(SupersetModelView):
datamodel = SQLAInterface(models.Log)
list_columns = ('user', 'action', 'dttm')
edit_columns = ('user', 'action', 'dttm', 'json')
base_order = ('dttm', 'desc')
label_columns = {
'user': _('User'),
'action': _('Action'),
'dttm': _('dttm'),
'json': _('JSON'),
}
appbuilder.add_view(
LogModelView,
'Action Log',
label=__('Action Log'),
category='Security',
category_label=__('Security'),
icon='fa-list-ol')
@app.route('/health')
def health():
return 'OK'
@app.route('/healthcheck')
def healthcheck():
return 'OK'
@app.route('/ping')
def ping():
return 'OK'
class KV(BaseSupersetView):
"""Used for storing and retrieving key value pairs"""
@log_this
@expose('/store/', methods=['POST'])
def store(self):
try:
value = request.form.get('data')
obj = models.KeyValue(value=value)
db.session.add(obj)
db.session.commit()
except Exception as e:
return json_error_response(e)
return Response(
json.dumps({'id': obj.id}),
status=200)
@log_this
@expose('/<key_id>/', methods=['GET'])
def get_value(self, key_id):
kv = None
try:
kv = db.session.query(models.KeyValue).filter_by(id=key_id).one()
except Exception as e:
return json_error_response(e)
return Response(kv.value, status=200)
appbuilder.add_view_no_menu(KV)
class R(BaseSupersetView):
"""used for short urls"""
@log_this
@expose('/<url_id>')
def index(self, url_id):
url = db.session.query(models.Url).filter_by(id=url_id).first()
if url:
return redirect('/' + url.url)
else:
flash('URL to nowhere...', 'danger')
return redirect('/')
@log_this
@expose('/shortner/', methods=['POST', 'GET'])
def shortner(self):
url = request.form.get('data')
obj = models.Url(url=url)
db.session.add(obj)
db.session.commit()
return('http://{request.headers[Host]}/r/{obj.id}'.format(
request=request, obj=obj))
@expose('/msg/')
def msg(self):
"""Redirects to specified url while flash a message"""
flash(Markup(request.args.get('msg')), 'info')
return redirect(request.args.get('url'))
appbuilder.add_view_no_menu(R)
class Superset(BaseSupersetView):
"""The base views for Superset!"""
@api
@has_access_api
@expose('/update_role/', methods=['POST'])
def update_role(self):
"""Assigns a list of found users to the given role."""
data = request.get_json(force=True)
gamma_role = sm.find_role('Gamma')
username_set = set()
user_data_dict = {}
for user_data in data['users']:
username = user_data['username']
if not username:
continue
user_data_dict[username] = user_data
username_set.add(username)
existing_users = db.session.query(sm.user_model).filter(
sm.user_model.username.in_(username_set)).all()
missing_users = username_set.difference(
set([u.username for u in existing_users]))
logging.info('Missing users: {}'.format(missing_users))
created_users = []
for username in missing_users:
user_data = user_data_dict[username]
user = sm.find_user(email=user_data['email'])
if not user:
logging.info('Adding user: {}.'.format(user_data))
sm.add_user(
username=user_data['username'],
first_name=user_data['first_name'],
last_name=user_data['last_name'],
email=user_data['email'],
role=gamma_role,
)
sm.get_session.commit()
user = sm.find_user(username=user_data['username'])
existing_users.append(user)
created_users.append(user.username)
role_name = data['role_name']
role = sm.find_role(role_name)
role.user = existing_users
sm.get_session.commit()
return self.json_response({
'role': role_name,
'# missing users': len(missing_users),
'# granted': len(existing_users),
'created_users': created_users,
}, status=201)
def json_response(self, obj, status=200):
return Response(
json.dumps(obj, default=utils.json_int_dttm_ser),
status=status,
mimetype='application/json')
@has_access_api
@expose('/datasources/')
def datasources(self):
datasources = ConnectorRegistry.get_all_datasources(db.session)
datasources = [o.short_data for o in datasources]
datasources = sorted(datasources, key=lambda o: o['name'])
return self.json_response(datasources)
@has_access_api
@expose('/override_role_permissions/', methods=['POST'])
def override_role_permissions(self):
"""Updates the role with the give datasource permissions.
Permissions not in the request will be revoked. This endpoint should
be available to admins only. Expects JSON in the format:
{
'role_name': '{role_name}',
'database': [{
'datasource_type': '{table|druid}',
'name': '{database_name}',
'schema': [{
'name': '{schema_name}',
'datasources': ['{datasource name}, {datasource name}']
}]
}]
}
"""
data = request.get_json(force=True)
role_name = data['role_name']
databases = data['database']
db_ds_names = set()
for dbs in databases:
for schema in dbs['schema']:
for ds_name in schema['datasources']:
fullname = utils.get_datasource_full_name(
dbs['name'], ds_name, schema=schema['name'])
db_ds_names.add(fullname)
existing_datasources = ConnectorRegistry.get_all_datasources(db.session)
datasources = [
d for d in existing_datasources if d.full_name in db_ds_names]
role = sm.find_role(role_name)
# remove all permissions
role.permissions = []
# grant permissions to the list of datasources
granted_perms = []
for datasource in datasources:
view_menu_perm = sm.find_permission_view_menu(
view_menu_name=datasource.perm,
permission_name='datasource_access')
# prevent creating empty permissions
if view_menu_perm and view_menu_perm.view_menu:
role.permissions.append(view_menu_perm)
granted_perms.append(view_menu_perm.view_menu.name)
db.session.commit()
return self.json_response({
'granted': granted_perms,
'requested': list(db_ds_names),
}, status=201)
@log_this
@has_access
@expose('/request_access/')
def request_access(self):
datasources = set()
dashboard_id = request.args.get('dashboard_id')
if dashboard_id:
dash = (
db.session.query(models.Dashboard)
.filter_by(id=int(dashboard_id))
.one()
)
datasources |= dash.datasources
datasource_id = request.args.get('datasource_id')
datasource_type = request.args.get('datasource_type')
if datasource_id:
ds_class = ConnectorRegistry.sources.get(datasource_type)
datasource = (
db.session.query(ds_class)
.filter_by(id=int(datasource_id))
.one()
)
datasources.add(datasource)
if request.args.get('action') == 'go':
for datasource in datasources:
access_request = DAR(
datasource_id=datasource.id,
datasource_type=datasource.type)
db.session.add(access_request)
db.session.commit()
flash(__('Access was requested'), 'info')
return redirect('/')
return self.render_template(
'superset/request_access.html',
datasources=datasources,
datasource_names=', '.join([o.name for o in datasources]),
)
@log_this
@has_access
@expose('/approve')
def approve(self):
def clean_fulfilled_requests(session):
for r in session.query(DAR).all():
datasource = ConnectorRegistry.get_datasource(
r.datasource_type, r.datasource_id, session)
user = sm.get_user_by_id(r.created_by_fk)
if not datasource or \
self.datasource_access(datasource, user):
# datasource does not exist anymore
session.delete(r)
session.commit()
datasource_type = request.args.get('datasource_type')
datasource_id = request.args.get('datasource_id')
created_by_username = request.args.get('created_by')
role_to_grant = request.args.get('role_to_grant')
role_to_extend = request.args.get('role_to_extend')
session = db.session
datasource = ConnectorRegistry.get_datasource(
datasource_type, datasource_id, session)
if not datasource:
flash(DATASOURCE_MISSING_ERR, 'alert')
return json_error_response(DATASOURCE_MISSING_ERR)
requested_by = sm.find_user(username=created_by_username)
if not requested_by:
flash(USER_MISSING_ERR, 'alert')
return json_error_response(USER_MISSING_ERR)
requests = (
session.query(DAR)
.filter(
DAR.datasource_id == datasource_id,
DAR.datasource_type == datasource_type,
DAR.created_by_fk == requested_by.id)
.all()
)
if not requests:
flash(ACCESS_REQUEST_MISSING_ERR, 'alert')
return json_error_response(ACCESS_REQUEST_MISSING_ERR)
# check if you can approve
if self.all_datasource_access() or g.user.id == datasource.owner_id:
# can by done by admin only
if role_to_grant:
role = sm.find_role(role_to_grant)
requested_by.roles.append(role)
msg = __(
'%(user)s was granted the role %(role)s that gives access '
'to the %(datasource)s',
user=requested_by.username,
role=role_to_grant,
datasource=datasource.full_name)
utils.notify_user_about_perm_udate(
g.user, requested_by, role, datasource,
'email/role_granted.txt', app.config)
flash(msg, 'info')
if role_to_extend:
perm_view = sm.find_permission_view_menu(
'email/datasource_access', datasource.perm)
role = sm.find_role(role_to_extend)
sm.add_permission_role(role, perm_view)
msg = __('Role %(r)s was extended to provide the access to '
'the datasource %(ds)s', r=role_to_extend,
ds=datasource.full_name)
utils.notify_user_about_perm_udate(
g.user, requested_by, role, datasource,
'email/role_extended.txt', app.config)
flash(msg, 'info')
clean_fulfilled_requests(session)
else:
flash(__('You have no permission to approve this request'),
'danger')
return redirect('/accessrequestsmodelview/list/')
for r in requests:
session.delete(r)
session.commit()
return redirect('/accessrequestsmodelview/list/')
def get_form_data(self):
# get form data from url
if request.args.get('form_data'):
form_data = request.args.get('form_data')
elif request.form.get('form_data'):
# Supporting POST as well as get
form_data = request.form.get('form_data')
else:
form_data = '{}'
d = json.loads(form_data)
if request.args.get('viz_type'):
# Converting old URLs
d = cast_form_data(request.args)
return d
def get_viz(
self,
slice_id=None,
args=None,
datasource_type=None,
datasource_id=None):
if slice_id:
slc = (
db.session.query(models.Slice)
.filter_by(id=slice_id)
.one()
)
return slc.get_viz()
else:
form_data = self.get_form_data()
viz_type = form_data.get('viz_type', 'table')
datasource = ConnectorRegistry.get_datasource(
datasource_type, datasource_id, db.session)
viz_obj = viz.viz_types[viz_type](
datasource,
form_data=form_data,
)
return viz_obj
@has_access
@expose('/slice/<slice_id>/')
def slice(self, slice_id):
viz_obj = self.get_viz(slice_id)
endpoint = (
'/superset/explore/{}/{}?form_data={}'
.format(
viz_obj.datasource.type,
viz_obj.datasource.id,
parse.quote(json.dumps(viz_obj.form_data)),
)
)
if request.args.get('standalone') == 'true':
endpoint += '&standalone=true'
return redirect(endpoint)
def get_query_string_response(self, viz_obj):
try:
query_obj = viz_obj.query_obj()
query = viz_obj.datasource.get_query_str(query_obj)
except Exception as e:
return json_error_response(e)
return Response(
json.dumps({
'query': query,
'language': viz_obj.datasource.query_language,
}),
status=200,
mimetype='application/json')
@log_this
@has_access_api
@expose('/explore_json/<datasource_type>/<datasource_id>/')
def explore_json(self, datasource_type, datasource_id):
try:
viz_obj = self.get_viz(
datasource_type=datasource_type,
datasource_id=datasource_id,
args=request.args)
except Exception as e:
logging.exception(e)
return json_error_response(
utils.error_msg_from_exception(e),
stacktrace=traceback.format_exc())
if not self.datasource_access(viz_obj.datasource):
return json_error_response(DATASOURCE_ACCESS_ERR, status=404)
if request.args.get('csv') == 'true':
return CsvResponse(
viz_obj.get_csv(),
status=200,
headers=generate_download_headers('csv'),
mimetype='application/csv')
if request.args.get('query') == 'true':
return self.get_query_string_response(viz_obj)
payload = {}
try:
payload = viz_obj.get_payload(
force=request.args.get('force') == 'true')
except Exception as e:
logging.exception(e)
return json_error_response(utils.error_msg_from_exception(e))
status = 200
if payload.get('status') == QueryStatus.FAILED:
status = 400
return json_success(viz_obj.json_dumps(payload), status=status)
@expose('/import_dashboards', methods=['GET', 'POST'])
@log_this
def import_dashboards(self):
"""Overrides the dashboards using pickled instances from the file."""
f = request.files.get('file')
if request.method == 'POST' and f:
current_tt = int(time.time())
data = pickle.load(f)
# TODO: import DRUID datasources
for table in data['datasources']:
ds_class = ConnectorRegistry.sources.get(table.type)
ds_class.import_obj(table, import_time=current_tt)
db.session.commit()
for dashboard in data['dashboards']:
models.Dashboard.import_obj(
dashboard, import_time=current_tt)
db.session.commit()
return redirect('/dashboardmodelview/list/')
return self.render_template('superset/import_dashboards.html')
@log_this
@has_access
@expose('/explorev2/<datasource_type>/<datasource_id>/')
def explorev2(self, datasource_type, datasource_id):
return redirect(url_for(
'Superset.explore',
datasource_type=datasource_type,
datasource_id=datasource_id,
**request.args))
@log_this
@has_access
@expose('/explore/<datasource_type>/<datasource_id>/')
def explore(self, datasource_type, datasource_id):
form_data = self.get_form_data()
datasource_id = int(datasource_id)
viz_type = form_data.get('viz_type')
slice_id = form_data.get('slice_id')
user_id = g.user.get_id() if g.user else None
slc = None
if slice_id:
slc = db.session.query(models.Slice).filter_by(id=slice_id).first()
error_redirect = '/slicemodelview/list/'
datasource = ConnectorRegistry.get_datasource(
datasource_type, datasource_id, db.session)
if not datasource:
flash(DATASOURCE_MISSING_ERR, 'danger')
return redirect(error_redirect)
if not self.datasource_access(datasource):
flash(
__(get_datasource_access_error_msg(datasource.name)),
'danger')
return redirect(
'superset/request_access/?'
'datasource_type={datasource_type}&'
'datasource_id={datasource_id}&'
''.format(**locals()))
if not viz_type and datasource.default_endpoint:
return redirect(datasource.default_endpoint)
# slc perms
slice_add_perm = self.can_access('can_add', 'SliceModelView')
slice_overwrite_perm = is_owner(slc, g.user)
slice_download_perm = self.can_access('can_download', 'SliceModelView')
form_data['datasource'] = str(datasource_id) + '__' + datasource_type
# handle save or overwrite
action = request.args.get('action')
if action == 'overwrite' and not slice_overwrite_perm:
return json_error_response(
"You don't have the rights to alter this slice",
status=400)
if action in ('saveas', 'overwrite'):
return self.save_or_overwrite_slice(
request.args,
slc, slice_add_perm,
slice_overwrite_perm,
slice_download_perm,
datasource_id,
datasource_type)
form_data['datasource'] = str(datasource_id) + '__' + datasource_type
# On explore, merge extra filters into the form data
merge_extra_filters(form_data)
standalone = request.args.get('standalone') == 'true'
bootstrap_data = {
'can_add': slice_add_perm,
'can_download': slice_download_perm,
'can_overwrite': slice_overwrite_perm,
'datasource': datasource.data,
'form_data': form_data,
'datasource_id': datasource_id,
'datasource_type': datasource_type,
'slice': slc.data if slc else None,
'standalone': standalone,
'user_id': user_id,
'forced_height': request.args.get('height'),
'common': self.common_bootsrap_payload(),
}
table_name = datasource.table_name \
if datasource_type == 'table' \
else datasource.datasource_name
if slc:
title = '[slice] ' + slc.slice_name
else:
title = '[explore] ' + table_name
return self.render_template(
'superset/basic.html',
bootstrap_data=json.dumps(bootstrap_data),
entry='explore',
title=title,
standalone_mode=standalone)
@api
@has_access_api
@expose('/filter/<datasource_type>/<datasource_id>/<column>/')
def filter(self, datasource_type, datasource_id, column):
"""
Endpoint to retrieve values for specified column.
:param datasource_type: Type of datasource e.g. table
:param datasource_id: Datasource id
:param column: Column name to retrieve values for
:return:
"""
# TODO: Cache endpoint by user, datasource and column
datasource = ConnectorRegistry.get_datasource(
datasource_type, datasource_id, db.session)
if not datasource:
return json_error_response(DATASOURCE_MISSING_ERR)
if not self.datasource_access(datasource):
return json_error_response(DATASOURCE_ACCESS_ERR)
payload = json.dumps(
datasource.values_for_column(column),
default=utils.json_int_dttm_ser)
return json_success(payload)
def save_or_overwrite_slice(
self, args, slc, slice_add_perm, slice_overwrite_perm, slice_download_perm,
datasource_id, datasource_type):
"""Save or overwrite a slice"""
slice_name = args.get('slice_name')
action = args.get('action')
form_data = self.get_form_data()
if action in ('saveas'):
if 'slice_id' in form_data:
form_data.pop('slice_id') # don't save old slice_id
slc = models.Slice(owners=[g.user] if g.user else [])
slc.params = json.dumps(form_data)
slc.datasource_name = args.get('datasource_name')
slc.viz_type = form_data['viz_type']
slc.datasource_type = datasource_type
slc.datasource_id = datasource_id
slc.slice_name = slice_name
if action in ('saveas') and slice_add_perm:
self.save_slice(slc)
elif action == 'overwrite' and slice_overwrite_perm:
self.overwrite_slice(slc)
# Adding slice to a dashboard if requested
dash = None
if request.args.get('add_to_dash') == 'existing':
dash = (
db.session.query(models.Dashboard)
.filter_by(id=int(request.args.get('save_to_dashboard_id')))
.one()
)
flash(
'Slice [{}] was added to dashboard [{}]'.format(
slc.slice_name,
dash.dashboard_title),
'info')
elif request.args.get('add_to_dash') == 'new':
dash = models.Dashboard(
dashboard_title=request.args.get('new_dashboard_name'),
owners=[g.user] if g.user else [])
flash(
'Dashboard [{}] just got created and slice [{}] was added '
'to it'.format(
dash.dashboard_title,
slc.slice_name),
'info')
if dash and slc not in dash.slices:
dash.slices.append(slc)
db.session.commit()
response = {
'can_add': slice_add_perm,
'can_download': slice_download_perm,
'can_overwrite': is_owner(slc, g.user),
'form_data': slc.form_data,
'slice': slc.data,
}
if request.args.get('goto_dash') == 'true':
response.update({'dashboard': dash.url})
return json_success(json.dumps(response))
def save_slice(self, slc):
session = db.session()
msg = 'Slice [{}] has been saved'.format(slc.slice_name)
session.add(slc)
session.commit()
flash(msg, 'info')
def overwrite_slice(self, slc):
session = db.session()
session.merge(slc)
session.commit()
msg = 'Slice [{}] has been overwritten'.format(slc.slice_name)
flash(msg, 'info')
@api
@has_access_api
@expose('/checkbox/<model_view>/<id_>/<attr>/<value>', methods=['GET'])
def checkbox(self, model_view, id_, attr, value):
"""endpoint for checking/unchecking any boolean in a sqla model"""
modelview_to_model = {
'TableColumnInlineView':
ConnectorRegistry.sources['table'].column_class,
}
model = modelview_to_model[model_view]
obj = db.session.query(model).filter_by(id=id_).first()
if obj:
setattr(obj, attr, value == 'true')
db.session.commit()
return json_success('OK')
@api
@has_access_api
@expose('/activity_per_day')
def activity_per_day(self):
"""endpoint to power the calendar heatmap on the welcome page"""
Log = models.Log # noqa
qry = (
db.session
.query(
Log.dt,
sqla.func.count())
.group_by(Log.dt)
.all()
)
payload = {str(time.mktime(dt.timetuple())):
ccount for dt, ccount in qry if dt}
return json_success(json.dumps(payload))
@api
@has_access_api
@expose('/schemas/<db_id>/')
def schemas(self, db_id):
db_id = int(db_id)
database = (
db.session
.query(models.Database)
.filter_by(id=db_id)
.one()
)
schemas = database.all_schema_names()
schemas = self.schemas_accessible_by_user(database, schemas)
return Response(
json.dumps({'schemas': schemas}),
mimetype='application/json')
@api
@has_access_api
@expose('/tables/<db_id>/<schema>/<substr>/')
def tables(self, db_id, schema, substr):
"""Endpoint to fetch the list of tables for given database"""
db_id = int(db_id)
schema = utils.js_string_to_python(schema)
substr = utils.js_string_to_python(substr)
database = db.session.query(models.Database).filter_by(id=db_id).one()
table_names = self.accessible_by_user(
database, database.all_table_names(schema), schema)
view_names = self.accessible_by_user(
database, database.all_view_names(schema), schema)
if substr:
table_names = [tn for tn in table_names if substr in tn]
view_names = [vn for vn in view_names if substr in vn]
max_items = config.get('MAX_TABLE_NAMES') or len(table_names)
total_items = len(table_names) + len(view_names)
max_tables = len(table_names)
max_views = len(view_names)
if total_items and substr:
max_tables = max_items * len(table_names) // total_items
max_views = max_items * len(view_names) // total_items
table_options = [{'value': tn, 'label': tn}
for tn in table_names[:max_tables]]
table_options.extend([{'value': vn, 'label': '[view] {}'.format(vn)}
for vn in view_names[:max_views]])
payload = {
'tableLength': len(table_names) + len(view_names),
'options': table_options,
}
return json_success(json.dumps(payload))
@api
@has_access_api
@expose('/copy_dash/<dashboard_id>/', methods=['GET', 'POST'])
def copy_dash(self, dashboard_id):
"""Copy dashboard"""
session = db.session()
data = json.loads(request.form.get('data'))
dash = models.Dashboard()
original_dash = (
session
.query(models.Dashboard)
.filter_by(id=dashboard_id).first())
dash.owners = [g.user] if g.user else []
dash.dashboard_title = data['dashboard_title']
if data['duplicate_slices']:
# Duplicating slices as well, mapping old ids to new ones
old_to_new_sliceids = {}
for slc in original_dash.slices:
new_slice = slc.clone()
new_slice.owners = [g.user] if g.user else []
session.add(new_slice)
session.flush()
new_slice.dashboards.append(dash)
old_to_new_sliceids['{}'.format(slc.id)] =\
'{}'.format(new_slice.id)
for d in data['positions']:
d['slice_id'] = old_to_new_sliceids[d['slice_id']]
else:
dash.slices = original_dash.slices
dash.params = original_dash.params
self._set_dash_metadata(dash, data)
session.add(dash)
session.commit()
dash_json = json.dumps(dash.data)
session.close()
return json_success(dash_json)
@api
@has_access_api
@expose('/save_dash/<dashboard_id>/', methods=['GET', 'POST'])
def save_dash(self, dashboard_id):
"""Save a dashboard's metadata"""
session = db.session()
dash = (session
.query(models.Dashboard)
.filter_by(id=dashboard_id).first())
check_ownership(dash, raise_if_false=True)
data = json.loads(request.form.get('data'))
self._set_dash_metadata(dash, data)
session.merge(dash)
session.commit()
session.close()
return 'SUCCESS'
@staticmethod
def _set_dash_metadata(dashboard, data):
positions = data['positions']
slice_ids = [int(d['slice_id']) for d in positions]
dashboard.slices = [o for o in dashboard.slices if o.id in slice_ids]
positions = sorted(data['positions'], key=lambda x: int(x['slice_id']))
dashboard.position_json = json.dumps(positions, indent=4, sort_keys=True)
md = dashboard.params_dict
dashboard.css = data['css']
dashboard.dashboard_title = data['dashboard_title']
if 'filter_immune_slices' not in md:
md['filter_immune_slices'] = []
if 'timed_refresh_immune_slices' not in md:
md['timed_refresh_immune_slices'] = []
if 'filter_immune_slice_fields' not in md:
md['filter_immune_slice_fields'] = {}
md['expanded_slices'] = data['expanded_slices']
md['default_filters'] = data.get('default_filters', '')
dashboard.json_metadata = json.dumps(md, indent=4)
@api
@has_access_api
@expose('/add_slices/<dashboard_id>/', methods=['POST'])
def add_slices(self, dashboard_id):
"""Add and save slices to a dashboard"""
data = json.loads(request.form.get('data'))
session = db.session()
Slice = models.Slice # noqa
dash = (
session.query(models.Dashboard).filter_by(id=dashboard_id).first())
check_ownership(dash, raise_if_false=True)
new_slices = session.query(Slice).filter(
Slice.id.in_(data['slice_ids']))
dash.slices += new_slices
session.merge(dash)
session.commit()
session.close()
return 'SLICES ADDED'
@api
@has_access_api
@expose('/testconn', methods=['POST', 'GET'])
def testconn(self):
"""Tests a sqla connection"""
try:
username = g.user.username if g.user is not None else None
uri = request.json.get('uri')
db_name = request.json.get('name')
impersonate_user = request.json.get('impersonate_user')
database = None
if db_name:
database = (
db.session
.query(models.Database)
.filter_by(database_name=db_name)
.first()
)
if database and uri == database.safe_sqlalchemy_uri():
# the password-masked uri was passed
# use the URI associated with this database
uri = database.sqlalchemy_uri_decrypted
configuration = {}
if database and uri:
url = make_url(uri)
db_engine = models.Database.get_db_engine_spec_for_backend(
url.get_backend_name())
db_engine.patch()
masked_url = database.get_password_masked_url_from_uri(uri)
logging.info('Superset.testconn(). Masked URL: {0}'.format(masked_url))
configuration.update(
db_engine.get_configuration_for_impersonation(uri,
impersonate_user,
username),
)
connect_args = (
request.json
.get('extras', {})
.get('engine_params', {})
.get('connect_args', {}))
if configuration:
connect_args['configuration'] = configuration
engine = create_engine(uri, connect_args=connect_args)
engine.connect()
return json_success(json.dumps(engine.table_names(), indent=4))
except Exception as e:
logging.exception(e)
return json_error_response((
'Connection failed!\n\n'
'The error message returned was:\n{}').format(e))
@api
@has_access_api
@expose('/recent_activity/<user_id>/', methods=['GET'])
def recent_activity(self, user_id):
"""Recent activity (actions) for a given user"""
M = models # noqa
qry = (
db.session.query(M.Log, M.Dashboard, M.Slice)
.outerjoin(
M.Dashboard,
M.Dashboard.id == M.Log.dashboard_id,
)
.outerjoin(
M.Slice,
M.Slice.id == M.Log.slice_id,
)
.filter(
sqla.and_(
~M.Log.action.in_(('queries', 'shortner', 'sql_json')),
M.Log.user_id == user_id,
),
)
.order_by(M.Log.dttm.desc())
.limit(1000)
)
payload = []
for log in qry.all():
item_url = None
item_title = None
if log.Dashboard:
item_url = log.Dashboard.url
item_title = log.Dashboard.dashboard_title
elif log.Slice:
item_url = log.Slice.slice_url
item_title = log.Slice.slice_name
payload.append({
'action': log.Log.action,
'item_url': item_url,
'item_title': item_title,
'time': log.Log.dttm,
})
return json_success(
json.dumps(payload, default=utils.json_int_dttm_ser))
@api
@has_access_api
@expose('/csrf_token/', methods=['GET'])
def csrf_token(self):
return Response(
self.render_template('superset/csrf_token.json'),
mimetype='text/json',
)
@api
@has_access_api
@expose('/fave_dashboards_by_username/<username>/', methods=['GET'])
def fave_dashboards_by_username(self, username):
"""This lets us use a user's username to pull favourite dashboards"""
user = sm.find_user(username=username)
return self.fave_dashboards(user.get_id())
@api
@has_access_api
@expose('/fave_dashboards/<user_id>/', methods=['GET'])
def fave_dashboards(self, user_id):
qry = (
db.session.query(
models.Dashboard,
models.FavStar.dttm,
)
.join(
models.FavStar,
sqla.and_(
models.FavStar.user_id == int(user_id),
models.FavStar.class_name == 'Dashboard',
models.Dashboard.id == models.FavStar.obj_id,
),
)
.order_by(
models.FavStar.dttm.desc(),
)
)
payload = []
for o in qry.all():
d = {
'id': o.Dashboard.id,
'dashboard': o.Dashboard.dashboard_link(),
'title': o.Dashboard.dashboard_title,
'url': o.Dashboard.url,
'dttm': o.dttm,
}
if o.Dashboard.created_by:
user = o.Dashboard.created_by
d['creator'] = str(user)
d['creator_url'] = '/superset/profile/{}/'.format(
user.username)
payload.append(d)
return json_success(
json.dumps(payload, default=utils.json_int_dttm_ser))
@api
@has_access_api
@expose('/created_dashboards/<user_id>/', methods=['GET'])
def created_dashboards(self, user_id):
Dash = models.Dashboard # noqa
qry = (
db.session.query(
Dash,
)
.filter(
sqla.or_(
Dash.created_by_fk == user_id,
Dash.changed_by_fk == user_id,
),
)
.order_by(
Dash.changed_on.desc(),
)
)
payload = [{
'id': o.id,
'dashboard': o.dashboard_link(),
'title': o.dashboard_title,
'url': o.url,
'dttm': o.changed_on,
} for o in qry.all()]
return json_success(
json.dumps(payload, default=utils.json_int_dttm_ser))
@api
@has_access_api
@expose('/created_slices/<user_id>/', methods=['GET'])
def created_slices(self, user_id):
"""List of slices created by this user"""
Slice = models.Slice # noqa
qry = (
db.session.query(Slice)
.filter(
sqla.or_(
Slice.created_by_fk == user_id,
Slice.changed_by_fk == user_id,
),
)
.order_by(Slice.changed_on.desc())
)
payload = [{
'id': o.id,
'title': o.slice_name,
'url': o.slice_url,
'dttm': o.changed_on,
} for o in qry.all()]
return json_success(
json.dumps(payload, default=utils.json_int_dttm_ser))
@api
@has_access_api
@expose('/fave_slices/<user_id>/', methods=['GET'])
def fave_slices(self, user_id):
"""Favorite slices for a user"""
qry = (
db.session.query(
models.Slice,
models.FavStar.dttm,
)
.join(
models.FavStar,
sqla.and_(
models.FavStar.user_id == int(user_id),
models.FavStar.class_name == 'slice',
models.Slice.id == models.FavStar.obj_id,
),
)
.order_by(
models.FavStar.dttm.desc(),
)
)
payload = []
for o in qry.all():
d = {
'id': o.Slice.id,
'title': o.Slice.slice_name,
'url': o.Slice.slice_url,
'dttm': o.dttm,
}
if o.Slice.created_by:
user = o.Slice.created_by
d['creator'] = str(user)
d['creator_url'] = '/superset/profile/{}/'.format(
user.username)
payload.append(d)
return json_success(
json.dumps(payload, default=utils.json_int_dttm_ser))
@api
@has_access_api
@expose('/warm_up_cache/', methods=['GET'])
def warm_up_cache(self):
"""Warms up the cache for the slice or table.
Note for slices a force refresh occurs.
"""
slices = None
session = db.session()
slice_id = request.args.get('slice_id')
table_name = request.args.get('table_name')
db_name = request.args.get('db_name')
if not slice_id and not (table_name and db_name):
return json_error_response(__(
'Malformed request. slice_id or table_name and db_name '
'arguments are expected'), status=400)
if slice_id:
slices = session.query(models.Slice).filter_by(id=slice_id).all()
if not slices:
return json_error_response(__(
'Slice %(id)s not found', id=slice_id), status=404)
elif table_name and db_name:
SqlaTable = ConnectorRegistry.sources['table']
table = (
session.query(SqlaTable)
.join(models.Database)
.filter(
models.Database.database_name == db_name or
SqlaTable.table_name == table_name)
).first()
if not table:
return json_error_response(__(
"Table %(t)s wasn't found in the database %(d)s",
t=table_name, s=db_name), status=404)
slices = session.query(models.Slice).filter_by(
datasource_id=table.id,
datasource_type=table.type).all()
for slc in slices:
try:
obj = slc.get_viz()
obj.get_json(force=True)
except Exception as e:
return json_error_response(utils.error_msg_from_exception(e))
return json_success(json.dumps(
[{'slice_id': slc.id, 'slice_name': slc.slice_name}
for slc in slices]))
@expose('/favstar/<class_name>/<obj_id>/<action>/')
def favstar(self, class_name, obj_id, action):
"""Toggle favorite stars on Slices and Dashboard"""
session = db.session()
FavStar = models.FavStar # noqa
count = 0
favs = session.query(FavStar).filter_by(
class_name=class_name, obj_id=obj_id,
user_id=g.user.get_id()).all()
if action == 'select':
if not favs:
session.add(
FavStar(
class_name=class_name,
obj_id=obj_id,
user_id=g.user.get_id(),
dttm=datetime.now(),
),
)
count = 1
elif action == 'unselect':
for fav in favs:
session.delete(fav)
else:
count = len(favs)
session.commit()
return json_success(json.dumps({'count': count}))
@has_access
@expose('/dashboard/<dashboard_id>/')
def dashboard(self, dashboard_id):
"""Server side rendering for a dashboard"""
session = db.session()
qry = session.query(models.Dashboard)
if dashboard_id.isdigit():
qry = qry.filter_by(id=int(dashboard_id))
else:
qry = qry.filter_by(slug=dashboard_id)
dash = qry.one()
datasources = set()
for slc in dash.slices:
datasource = slc.datasource
if datasource:
datasources.add(datasource)
for datasource in datasources:
if datasource and not self.datasource_access(datasource):
flash(
__(get_datasource_access_error_msg(datasource.name)),
'danger')
return redirect(
'superset/request_access/?'
'dashboard_id={dash.id}&'.format(**locals()))
# Hack to log the dashboard_id properly, even when getting a slug
@log_this
def dashboard(**kwargs): # noqa
pass
dashboard(dashboard_id=dash.id)
dash_edit_perm = check_ownership(dash, raise_if_false=False)
dash_save_perm = \
dash_edit_perm and self.can_access('can_save_dash', 'Superset')
standalone_mode = request.args.get('standalone') == 'true'
dashboard_data = dash.data
dashboard_data.update({
'standalone_mode': standalone_mode,
'dash_save_perm': dash_save_perm,
'dash_edit_perm': dash_edit_perm,
})
bootstrap_data = {
'user_id': g.user.get_id(),
'dashboard_data': dashboard_data,
'datasources': {ds.uid: ds.data for ds in datasources},
'common': self.common_bootsrap_payload(),
}
return self.render_template(
'superset/dashboard.html',
entry='dashboard',
standalone_mode=standalone_mode,
title='[dashboard] ' + dash.dashboard_title,
bootstrap_data=json.dumps(bootstrap_data),
)
@has_access
@expose('/sync_druid/', methods=['POST'])
@log_this
def sync_druid_source(self):
"""Syncs the druid datasource in main db with the provided config.
The endpoint takes 3 arguments:
user - user name to perform the operation as
cluster - name of the druid cluster
config - configuration stored in json that contains:
name: druid datasource name
dimensions: list of the dimensions, they become druid columns
with the type STRING
metrics_spec: list of metrics (dictionary). Metric consists of
2 attributes: type and name. Type can be count,
etc. `count` type is stored internally as longSum
other fields will be ignored.
Example: {
'name': 'test_click',
'metrics_spec': [{'type': 'count', 'name': 'count'}],
'dimensions': ['affiliate_id', 'campaign', 'first_seen']
}
"""
payload = request.get_json(force=True)
druid_config = payload['config']
user_name = payload['user']
cluster_name = payload['cluster']
user = sm.find_user(username=user_name)
DruidDatasource = ConnectorRegistry.sources['druid']
DruidCluster = DruidDatasource.cluster_class
if not user:
err_msg = __("Can't find User '%(name)s', please ask your admin "
'to create one.', name=user_name)
logging.error(err_msg)
return json_error_response(err_msg)
cluster = db.session.query(DruidCluster).filter_by(
cluster_name=cluster_name).first()
if not cluster:
err_msg = __("Can't find DruidCluster with cluster_name = "
"'%(name)s'", name=cluster_name)
logging.error(err_msg)
return json_error_response(err_msg)
try:
DruidDatasource.sync_to_db_from_config(
druid_config, user, cluster)
except Exception as e:
logging.exception(utils.error_msg_from_exception(e))
return json_error_response(utils.error_msg_from_exception(e))
return Response(status=201)
@has_access
@expose('/sqllab_viz/', methods=['POST'])
@log_this
def sqllab_viz(self):
SqlaTable = ConnectorRegistry.sources['table']
data = json.loads(request.form.get('data'))
table_name = data.get('datasourceName')
SqlaTable = ConnectorRegistry.sources['table']
table = (
db.session.query(SqlaTable)
.filter_by(table_name=table_name)
.first()
)
if not table:
table = SqlaTable(table_name=table_name)
table.database_id = data.get('dbId')
q = SupersetQuery(data.get('sql'))
table.sql = q.stripped()
db.session.add(table)
cols = []
dims = []
metrics = []
for column_name, config in data.get('columns').items():
is_dim = config.get('is_dim', False)
SqlaTable = ConnectorRegistry.sources['table']
TableColumn = SqlaTable.column_class
SqlMetric = SqlaTable.metric_class
col = TableColumn(
column_name=column_name,
filterable=is_dim,
groupby=is_dim,
is_dttm=config.get('is_date', False),
type=config.get('type', False),
)
cols.append(col)
if is_dim:
dims.append(col)
agg = config.get('agg')
if agg:
if agg == 'count_distinct':
metrics.append(SqlMetric(
metric_name='{agg}__{column_name}'.format(**locals()),
expression='COUNT(DISTINCT {column_name})'
.format(**locals()),
))
else:
metrics.append(SqlMetric(
metric_name='{agg}__{column_name}'.format(**locals()),
expression='{agg}({column_name})'.format(**locals()),
))
if not metrics:
metrics.append(SqlMetric(
metric_name='count'.format(**locals()),
expression='count(*)'.format(**locals()),
))
table.columns = cols
table.metrics = metrics
db.session.commit()
return self.json_response(json.dumps({
'table_id': table.id,
}))
@has_access
@expose('/table/<database_id>/<table_name>/<schema>/')
@log_this
def table(self, database_id, table_name, schema):
schema = utils.js_string_to_python(schema)
mydb = db.session.query(models.Database).filter_by(id=database_id).one()
cols = []
indexes = []
t = mydb.get_columns(table_name, schema)
try:
t = mydb.get_columns(table_name, schema)
indexes = mydb.get_indexes(table_name, schema)
primary_key = mydb.get_pk_constraint(table_name, schema)
foreign_keys = mydb.get_foreign_keys(table_name, schema)
except Exception as e:
return json_error_response(utils.error_msg_from_exception(e))
keys = []
if primary_key and primary_key.get('constrained_columns'):
primary_key['column_names'] = primary_key.pop('constrained_columns')
primary_key['type'] = 'pk'
keys += [primary_key]
for fk in foreign_keys:
fk['column_names'] = fk.pop('constrained_columns')
fk['type'] = 'fk'
keys += foreign_keys
for idx in indexes:
idx['type'] = 'index'
keys += indexes
for col in t:
dtype = ''
try:
dtype = '{}'.format(col['type'])
except Exception:
pass
cols.append({
'name': col['name'],
'type': dtype.split('(')[0] if '(' in dtype else dtype,
'longType': dtype,
'keys': [
k for k in keys
if col['name'] in k.get('column_names')
],
})
tbl = {
'name': table_name,
'columns': cols,
'selectStar': mydb.select_star(
table_name, schema=schema, show_cols=True, indent=True),
'primaryKey': primary_key,
'foreignKeys': foreign_keys,
'indexes': keys,
}
return json_success(json.dumps(tbl))
@has_access
@expose('/extra_table_metadata/<database_id>/<table_name>/<schema>/')
@log_this
def extra_table_metadata(self, database_id, table_name, schema):
schema = utils.js_string_to_python(schema)
mydb = db.session.query(models.Database).filter_by(id=database_id).one()
payload = mydb.db_engine_spec.extra_table_metadata(
mydb, table_name, schema)
return json_success(json.dumps(payload))
@has_access
@expose('/select_star/<database_id>/<table_name>/')
@log_this
def select_star(self, database_id, table_name):
mydb = db.session.query(
models.Database).filter_by(id=database_id).first()
return self.render_template(
'superset/ajah.html',
content=mydb.select_star(table_name, show_cols=True),
)
@expose('/theme/')
def theme(self):
return self.render_template('superset/theme.html')
@has_access_api
@expose('/cached_key/<key>/')
@log_this
def cached_key(self, key):
"""Returns a key from the cache"""
resp = cache.get(key)
if resp:
return resp
return 'nope'
@has_access_api
@expose('/results/<key>/')
@log_this
def results(self, key):
"""Serves a key off of the results backend"""
if not results_backend:
return json_error_response("Results backend isn't configured")
blob = results_backend.get(key)
if not blob:
return json_error_response(
'Data could not be retrieved. '
'You may want to re-run the query.',
status=410,
)
query = db.session.query(Query).filter_by(results_key=key).one()
rejected_tables = self.rejected_datasources(
query.sql, query.database, query.schema)
if rejected_tables:
return json_error_response(get_datasource_access_error_msg(
'{}'.format(rejected_tables)))
payload = utils.zlib_decompress_to_string(blob)
display_limit = app.config.get('DISPLAY_SQL_MAX_ROW', None)
if display_limit:
payload_json = json.loads(payload)
payload_json['data'] = payload_json['data'][:display_limit]
return json_success(
json.dumps(payload_json, default=utils.json_iso_dttm_ser))
@has_access_api
@expose('/stop_query/', methods=['POST'])
@log_this
def stop_query(self):
client_id = request.form.get('client_id')
try:
query = (
db.session.query(Query)
.filter_by(client_id=client_id).one()
)
query.status = utils.QueryStatus.STOPPED
db.session.commit()
except Exception:
pass
return self.json_response('OK')
@has_access_api
@expose('/sql_json/', methods=['POST', 'GET'])
@log_this
def sql_json(self):
"""Runs arbitrary sql and returns and json"""
async = request.form.get('runAsync') == 'true'
sql = request.form.get('sql')
database_id = request.form.get('database_id')
schema = request.form.get('schema') or None
session = db.session()
mydb = session.query(models.Database).filter_by(id=database_id).first()
if not mydb:
json_error_response(
'Database with id {} is missing.'.format(database_id))
rejected_tables = self.rejected_datasources(sql, mydb, schema)
if rejected_tables:
return json_error_response(get_datasource_access_error_msg(
'{}'.format(rejected_tables)))
session.commit()
select_as_cta = request.form.get('select_as_cta') == 'true'
tmp_table_name = request.form.get('tmp_table_name')
if select_as_cta and mydb.force_ctas_schema:
tmp_table_name = '{}.{}'.format(
mydb.force_ctas_schema,
tmp_table_name,
)
query = Query(
database_id=int(database_id),
limit=int(app.config.get('SQL_MAX_ROW', None)),
sql=sql,
schema=schema,
select_as_cta=request.form.get('select_as_cta') == 'true',
start_time=utils.now_as_float(),
tab_name=request.form.get('tab'),
status=QueryStatus.PENDING if async else QueryStatus.RUNNING,
sql_editor_id=request.form.get('sql_editor_id'),
tmp_table_name=tmp_table_name,
user_id=int(g.user.get_id()),
client_id=request.form.get('client_id'),
)
session.add(query)
session.flush()
query_id = query.id
session.commit() # shouldn't be necessary
if not query_id:
raise Exception(_('Query record was not created as expected.'))
logging.info('Triggering query_id: {}'.format(query_id))
# Async request.
if async:
logging.info('Running query on a Celery worker')
# Ignore the celery future object and the request may time out.
try:
sql_lab.get_sql_results.delay(
query_id=query_id, return_results=False,
store_results=not query.select_as_cta, user_name=g.user.username)
except Exception as e:
logging.exception(e)
msg = (
'Failed to start remote query on a worker. '
'Tell your administrator to verify the availability of '
'the message queue.'
)
query.status = QueryStatus.FAILED
query.error_message = msg
session.commit()
return json_error_response('{}'.format(msg))
resp = json_success(json.dumps(
{'query': query.to_dict()}, default=utils.json_int_dttm_ser,
allow_nan=False), status=202)
session.commit()
return resp
# Sync request.
try:
timeout = config.get('SQLLAB_TIMEOUT')
timeout_msg = (
'The query exceeded the {timeout} seconds '
'timeout.').format(**locals())
with utils.timeout(seconds=timeout,
error_message=timeout_msg):
# pylint: disable=no-value-for-parameter
data = sql_lab.get_sql_results(
query_id=query_id, return_results=True)
except Exception as e:
logging.exception(e)
return json_error_response('{}'.format(e))
if data.get('status') == QueryStatus.FAILED:
return json_error_response(payload=data)
return json_success(json.dumps(data, default=utils.json_iso_dttm_ser))
@has_access
@expose('/csv/<client_id>')
@log_this
def csv(self, client_id):
"""Download the query results as csv."""
logging.info('Exporting CSV file [{}]'.format(client_id))
query = (
db.session.query(Query)
.filter_by(client_id=client_id)
.one()
)
rejected_tables = self.rejected_datasources(
query.sql, query.database, query.schema)
if rejected_tables:
flash(get_datasource_access_error_msg('{}'.format(rejected_tables)))
return redirect('/')
blob = None
if results_backend and query.results_key:
logging.info(
'Fetching CSV from results backend '
'[{}]'.format(query.results_key))
blob = results_backend.get(query.results_key)
if blob:
logging.info('Decompressing')
json_payload = utils.zlib_decompress_to_string(blob)
obj = json.loads(json_payload)
columns = [c['name'] for c in obj['columns']]
df = pd.DataFrame.from_records(obj['data'], columns=columns)
logging.info('Using pandas to convert to CSV')
csv = df.to_csv(index=False, **config.get('CSV_EXPORT'))
else:
logging.info('Running a query to turn into CSV')
sql = query.select_sql or query.executed_sql
df = query.database.get_df(sql, query.schema)
# TODO(bkyryliuk): add compression=gzip for big files.
csv = df.to_csv(index=False, **config.get('CSV_EXPORT'))
response = Response(csv, mimetype='text/csv')
response.headers['Content-Disposition'] = (
'attachment; filename={}.csv'.format(query.name))
logging.info('Ready to return response')
return response
@has_access
@expose('/fetch_datasource_metadata')
@log_this
def fetch_datasource_metadata(self):
datasource_id, datasource_type = (
request.args.get('datasourceKey').split('__'))
datasource = ConnectorRegistry.get_datasource(
datasource_type, datasource_id, db.session)
# Check if datasource exists
if not datasource:
return json_error_response(DATASOURCE_MISSING_ERR)
# Check permission for datasource
if not self.datasource_access(datasource):
return json_error_response(DATASOURCE_ACCESS_ERR)
return json_success(json.dumps(datasource.data))
@expose('/queries/<last_updated_ms>')
def queries(self, last_updated_ms):
"""Get the updated queries."""
stats_logger.incr('queries')
if not g.user.get_id():
return json_error_response(
'Please login to access the queries.', status=403)
# Unix time, milliseconds.
last_updated_ms_int = int(float(last_updated_ms)) if last_updated_ms else 0
# UTC date time, same that is stored in the DB.
last_updated_dt = utils.EPOCH + timedelta(seconds=last_updated_ms_int / 1000)
sql_queries = (
db.session.query(Query)
.filter(
Query.user_id == g.user.get_id(),
Query.changed_on >= last_updated_dt,
)
.all()
)
dict_queries = {q.client_id: q.to_dict() for q in sql_queries}
return json_success(
json.dumps(dict_queries, default=utils.json_int_dttm_ser))
@has_access
@expose('/search_queries')
@log_this
def search_queries(self):
"""Search for queries."""
query = db.session.query(Query)
search_user_id = request.args.get('user_id')
database_id = request.args.get('database_id')
search_text = request.args.get('search_text')
status = request.args.get('status')
# From and To time stamp should be Epoch timestamp in seconds
from_time = request.args.get('from')
to_time = request.args.get('to')
if search_user_id:
# Filter on db Id
query = query.filter(Query.user_id == search_user_id)
if database_id:
# Filter on db Id
query = query.filter(Query.database_id == database_id)
if status:
# Filter on status
query = query.filter(Query.status == status)
if search_text:
# Filter on search text
query = query \
.filter(Query.sql.like('%{}%'.format(search_text)))
if from_time:
query = query.filter(Query.start_time > int(from_time))
if to_time:
query = query.filter(Query.start_time < int(to_time))
query_limit = config.get('QUERY_SEARCH_LIMIT', 1000)
sql_queries = (
query.order_by(Query.start_time.asc())
.limit(query_limit)
.all()
)
dict_queries = [q.to_dict() for q in sql_queries]
return Response(
json.dumps(dict_queries, default=utils.json_int_dttm_ser),
status=200,
mimetype='application/json')
@app.errorhandler(500)
def show_traceback(self):
return render_template(
'superset/traceback.html',
error_msg=get_error_msg(),
), 500
@expose('/welcome')
def welcome(self):
"""Personalized welcome page"""
if not g.user or not g.user.get_id():
return redirect(appbuilder.get_url_for_login)
return self.render_template(
'superset/welcome.html', entry='welcome', utils=utils)
@has_access
@expose('/profile/<username>/')
def profile(self, username):
"""User profile page"""
if not username and g.user:
username = g.user.username
user = (
db.session.query(ab_models.User)
.filter_by(username=username)
.one()
)
roles = {}
permissions = defaultdict(set)
for role in user.roles:
perms = set()
for perm in role.permissions:
if perm.permission and perm.view_menu:
perms.add(
(perm.permission.name, perm.view_menu.name),
)
if perm.permission.name in ('datasource_access', 'database_access'):
permissions[perm.permission.name].add(perm.view_menu.name)
roles[role.name] = [
[perm.permission.name, perm.view_menu.name]
for perm in role.permissions
if perm.permission and perm.view_menu
]
payload = {
'user': {
'username': user.username,
'firstName': user.first_name,
'lastName': user.last_name,
'userId': user.id,
'isActive': user.is_active(),
'createdOn': user.created_on.isoformat(),
'email': user.email,
'roles': roles,
'permissions': permissions,
},
'common': self.common_bootsrap_payload(),
}
return self.render_template(
'superset/basic.html',
title=user.username + "'s profile",
navbar_container=True,
entry='profile',
bootstrap_data=json.dumps(payload, default=utils.json_iso_dttm_ser),
)
@has_access
@expose('/sqllab')
def sqllab(self):
"""SQL Editor"""
d = {
'defaultDbId': config.get('SQLLAB_DEFAULT_DBID'),
'common': self.common_bootsrap_payload(),
}
return self.render_template(
'superset/basic.html',
entry='sqllab',
bootstrap_data=json.dumps(d, default=utils.json_iso_dttm_ser),
)
@api
@has_access_api
@expose('/slice_query/<slice_id>/')
def sliceQuery(self, slice_id):
"""
This method exposes an API endpoint to
get the database query string for this slice
"""
viz_obj = self.get_viz(slice_id)
if not self.datasource_access(viz_obj.datasource):
return json_error_response(DATASOURCE_ACCESS_ERR, status=401)
return self.get_query_string_response(viz_obj)
appbuilder.add_view_no_menu(Superset)
class CssTemplateModelView(SupersetModelView, DeleteMixin):
datamodel = SQLAInterface(models.CssTemplate)
list_columns = ['template_name']
edit_columns = ['template_name', 'css']
add_columns = edit_columns
label_columns = {
'template_name': _('Template Name'),
}
class CssTemplateAsyncModelView(CssTemplateModelView):
list_columns = ['template_name', 'css']
appbuilder.add_separator('Sources')
appbuilder.add_view(
CssTemplateModelView,
'CSS Templates',
label=__('CSS Templates'),
icon='fa-css3',
category='Manage',
category_label=__('Manage'),
category_icon='')
appbuilder.add_view_no_menu(CssTemplateAsyncModelView)
appbuilder.add_link(
'SQL Editor',
label=_('SQL Editor'),
href='/superset/sqllab',
category_icon='fa-flask',
icon='fa-flask',
category='SQL Lab',
category_label=__('SQL Lab'),
)
appbuilder.add_link(
'Query Search',
label=_('Query Search'),
href='/superset/sqllab#search',
icon='fa-search',
category_icon='fa-flask',
category='SQL Lab',
category_label=__('SQL Lab'),
)
@app.after_request
def apply_caching(response):
"""Applies the configuration's http headers to all responses"""
for k, v in config.get('HTTP_HEADERS').items():
response.headers[k] = v
return response
# ---------------------------------------------------------------------
# Redirecting URL from previous names
class RegexConverter(BaseConverter):
def __init__(self, url_map, *items):
super(RegexConverter, self).__init__(url_map)
self.regex = items[0]
app.url_map.converters['regex'] = RegexConverter
@app.route('/<regex("panoramix\/.*"):url>')
def panoramix(url): # noqa
return redirect(request.full_path.replace('panoramix', 'superset'))
@app.route('/<regex("caravel\/.*"):url>')
def caravel(url): # noqa
return redirect(request.full_path.replace('caravel', 'superset'))
# ---------------------------------------------------------------------