blob: 1b213faed7a5c940c5a33387a439558e22a37415 [file] [log] [blame]
"""Unit tests for Superset"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import json
import logging
import os
import unittest
from flask_appbuilder.security.sqla import models as ab_models
from superset import app, appbuilder, cli, db, security, sm
from superset.connectors.druid.models import DruidCluster, DruidDatasource
from superset.connectors.sqla.models import SqlaTable
from superset.models import core as models
from superset.security import sync_role_definitions
os.environ['SUPERSET_CONFIG'] = 'tests.superset_test_config'
BASE_DIR = app.config.get('BASE_DIR')
class SupersetTestCase(unittest.TestCase):
requires_examples = False
examples_loaded = False
def __init__(self, *args, **kwargs):
if (
self.requires_examples and
not os.environ.get('SOLO_TEST') and
not os.environ.get('examples_loaded')
):
logging.info('Loading examples')
cli.load_examples(load_test_data=True)
logging.info('Done loading examples')
sync_role_definitions()
os.environ['examples_loaded'] = '1'
else:
sync_role_definitions()
super(SupersetTestCase, self).__init__(*args, **kwargs)
self.client = app.test_client()
self.maxDiff = None
gamma_sqllab_role = sm.add_role('gamma_sqllab')
for perm in sm.find_role('Gamma').permissions:
sm.add_permission_role(gamma_sqllab_role, perm)
db_perm = self.get_main_database(sm.get_session).perm
security.merge_perm(sm, 'database_access', db_perm)
db_pvm = sm.find_permission_view_menu(
view_menu_name=db_perm, permission_name='database_access')
gamma_sqllab_role.permissions.append(db_pvm)
for perm in sm.find_role('sql_lab').permissions:
sm.add_permission_role(gamma_sqllab_role, perm)
admin = appbuilder.sm.find_user('admin')
if not admin:
appbuilder.sm.add_user(
'admin', 'admin', ' user', 'admin@fab.org',
appbuilder.sm.find_role('Admin'),
password='general')
gamma = appbuilder.sm.find_user('gamma')
if not gamma:
appbuilder.sm.add_user(
'gamma', 'gamma', 'user', 'gamma@fab.org',
appbuilder.sm.find_role('Gamma'),
password='general')
gamma2 = appbuilder.sm.find_user('gamma2')
if not gamma2:
appbuilder.sm.add_user(
'gamma2', 'gamma2', 'user', 'gamma2@fab.org',
appbuilder.sm.find_role('Gamma'),
password='general')
gamma_sqllab_user = appbuilder.sm.find_user('gamma_sqllab')
if not gamma_sqllab_user:
appbuilder.sm.add_user(
'gamma_sqllab', 'gamma_sqllab', 'user', 'gamma_sqllab@fab.org',
gamma_sqllab_role, password='general')
alpha = appbuilder.sm.find_user('alpha')
if not alpha:
appbuilder.sm.add_user(
'alpha', 'alpha', 'user', 'alpha@fab.org',
appbuilder.sm.find_role('Alpha'),
password='general')
sm.get_session.commit()
# create druid cluster and druid datasources
session = db.session
cluster = (
session.query(DruidCluster)
.filter_by(cluster_name='druid_test')
.first()
)
if not cluster:
cluster = DruidCluster(cluster_name='druid_test')
session.add(cluster)
session.commit()
druid_datasource1 = DruidDatasource(
datasource_name='druid_ds_1',
cluster_name='druid_test',
)
session.add(druid_datasource1)
druid_datasource2 = DruidDatasource(
datasource_name='druid_ds_2',
cluster_name='druid_test',
)
session.add(druid_datasource2)
session.commit()
def get_table(self, table_id):
return db.session.query(SqlaTable).filter_by(
id=table_id).first()
def get_or_create(self, cls, criteria, session):
obj = session.query(cls).filter_by(**criteria).first()
if not obj:
obj = cls(**criteria)
return obj
def login(self, username='admin', password='general'):
resp = self.get_resp(
'/login/',
data=dict(username=username, password=password))
self.assertIn('Welcome', resp)
def get_slice(self, slice_name, session):
slc = (
session.query(models.Slice)
.filter_by(slice_name=slice_name)
.one()
)
session.expunge_all()
return slc
def get_table_by_name(self, name):
return db.session.query(SqlaTable).filter_by(
table_name=name).first()
def get_druid_ds_by_name(self, name):
return db.session.query(DruidDatasource).filter_by(
datasource_name=name).first()
def get_resp(
self, url, data=None, follow_redirects=True, raise_on_error=True):
"""Shortcut to get the parsed results while following redirects"""
if data:
resp = self.client.post(
url, data=data, follow_redirects=follow_redirects)
else:
resp = self.client.get(url, follow_redirects=follow_redirects)
if raise_on_error and resp.status_code > 400:
raise Exception(
'http request failed with code {}'.format(resp.status_code))
return resp.data.decode('utf-8')
def get_json_resp(
self, url, data=None, follow_redirects=True, raise_on_error=True):
"""Shortcut to get the parsed results while following redirects"""
resp = self.get_resp(url, data, follow_redirects, raise_on_error)
return json.loads(resp)
def get_main_database(self, session):
return (
db.session.query(models.Database)
.filter_by(database_name='main')
.first()
)
def get_access_requests(self, username, ds_type, ds_id):
DAR = models.DatasourceAccessRequest
return (
db.session.query(DAR)
.filter(
DAR.created_by == sm.find_user(username=username),
DAR.datasource_type == ds_type,
DAR.datasource_id == ds_id,
)
.first()
)
def logout(self):
self.client.get('/logout/', follow_redirects=True)
def grant_public_access_to_table(self, table):
public_role = appbuilder.sm.find_role('Public')
perms = db.session.query(ab_models.PermissionView).all()
for perm in perms:
if (perm.permission.name == 'datasource_access' and
perm.view_menu and table.perm in perm.view_menu.name):
appbuilder.sm.add_permission_role(public_role, perm)
def revoke_public_access_to_table(self, table):
public_role = appbuilder.sm.find_role('Public')
perms = db.session.query(ab_models.PermissionView).all()
for perm in perms:
if (perm.permission.name == 'datasource_access' and
perm.view_menu and table.perm in perm.view_menu.name):
appbuilder.sm.del_permission_role(public_role, perm)
def run_sql(self, sql, client_id, user_name=None, raise_on_error=False):
if user_name:
self.logout()
self.login(username=(user_name if user_name else 'admin'))
dbid = self.get_main_database(db.session).id
resp = self.get_json_resp(
'/superset/sql_json/',
raise_on_error=False,
data=dict(database_id=dbid, sql=sql, select_as_create_as=False,
client_id=client_id),
)
if raise_on_error and 'error' in resp:
raise Exception('run_sql failed')
return resp
def test_gamma_permissions(self):
def assert_can_read(view_menu):
self.assertIn(('can_show', view_menu), gamma_perm_set)
self.assertIn(('can_list', view_menu), gamma_perm_set)
def assert_can_write(view_menu):
self.assertIn(('can_add', view_menu), gamma_perm_set)
self.assertIn(('can_download', view_menu), gamma_perm_set)
self.assertIn(('can_delete', view_menu), gamma_perm_set)
self.assertIn(('can_edit', view_menu), gamma_perm_set)
def assert_cannot_write(view_menu):
self.assertNotIn(('can_add', view_menu), gamma_perm_set)
self.assertNotIn(('can_download', view_menu), gamma_perm_set)
self.assertNotIn(('can_delete', view_menu), gamma_perm_set)
self.assertNotIn(('can_edit', view_menu), gamma_perm_set)
self.assertNotIn(('can_save', view_menu), gamma_perm_set)
def assert_can_all(view_menu):
assert_can_read(view_menu)
assert_can_write(view_menu)
gamma_perm_set = set()
for perm in sm.find_role('Gamma').permissions:
gamma_perm_set.add((perm.permission.name, perm.view_menu.name))
# check read only perms
assert_can_read('TableModelView')
assert_cannot_write('DruidColumnInlineView')
# make sure that user can create slices and dashboards
assert_can_all('SliceModelView')
assert_can_all('DashboardModelView')
self.assertIn(('can_add_slices', 'Superset'), gamma_perm_set)
self.assertIn(('can_copy_dash', 'Superset'), gamma_perm_set)
self.assertIn(('can_activity_per_day', 'Superset'), gamma_perm_set)
self.assertIn(('can_created_dashboards', 'Superset'), gamma_perm_set)
self.assertIn(('can_created_slices', 'Superset'), gamma_perm_set)
self.assertIn(('can_csv', 'Superset'), gamma_perm_set)
self.assertIn(('can_dashboard', 'Superset'), gamma_perm_set)
self.assertIn(('can_explore', 'Superset'), gamma_perm_set)
self.assertIn(('can_explore_json', 'Superset'), gamma_perm_set)
self.assertIn(('can_fave_dashboards', 'Superset'), gamma_perm_set)
self.assertIn(('can_fave_slices', 'Superset'), gamma_perm_set)
self.assertIn(('can_save_dash', 'Superset'), gamma_perm_set)
self.assertIn(('can_slice', 'Superset'), gamma_perm_set)