| """Unit tests for Superset""" |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| from __future__ import unicode_literals |
| |
| import csv |
| import datetime |
| import doctest |
| import io |
| import json |
| import logging |
| import os |
| import random |
| import string |
| import unittest |
| |
| from flask import escape |
| import pandas as pd |
| import psycopg2 |
| import sqlalchemy as sqla |
| |
| from superset import appbuilder, dataframe, db, jinja_context, sm, sql_lab, utils |
| from superset.connectors.sqla.models import SqlaTable |
| from superset.models import core as models |
| from superset.models.sql_lab import Query |
| from superset.views.core import DatabaseView |
| from .base_tests import SupersetTestCase |
| |
| |
| class CoreTests(SupersetTestCase): |
| |
| requires_examples = True |
| |
| def __init__(self, *args, **kwargs): |
| super(CoreTests, self).__init__(*args, **kwargs) |
| |
| @classmethod |
| def setUpClass(cls): |
| cls.table_ids = {tbl.table_name: tbl.id for tbl in ( |
| db.session |
| .query(SqlaTable) |
| .all() |
| )} |
| |
| def setUp(self): |
| db.session.query(Query).delete() |
| db.session.query(models.DatasourceAccessRequest).delete() |
| db.session.query(models.Log).delete() |
| |
| def tearDown(self): |
| db.session.query(Query).delete() |
| |
| def test_login(self): |
| resp = self.get_resp( |
| '/login/', |
| data=dict(username='admin', password='general')) |
| self.assertIn('Welcome', resp) |
| |
| resp = self.get_resp('/logout/', follow_redirects=True) |
| self.assertIn('User confirmation needed', resp) |
| |
| resp = self.get_resp( |
| '/login/', |
| data=dict(username='admin', password='wrongPassword')) |
| self.assertNotIn('Welcome', resp) |
| self.assertIn('User confirmation needed', resp) |
| |
| def test_welcome(self): |
| self.login() |
| resp = self.client.get('/superset/welcome') |
| assert 'Welcome' in resp.data.decode('utf-8') |
| |
| def test_slice_endpoint(self): |
| self.login(username='admin') |
| slc = self.get_slice('Girls', db.session) |
| resp = self.get_resp('/superset/slice/{}/'.format(slc.id)) |
| assert 'Time Column' in resp |
| assert 'List Roles' in resp |
| |
| # Testing overrides |
| resp = self.get_resp( |
| '/superset/slice/{}/?standalone=true'.format(slc.id)) |
| assert 'List Roles' not in resp |
| |
| def test_cache_key(self): |
| self.login(username='admin') |
| slc = self.get_slice('Girls', db.session) |
| |
| viz = slc.viz |
| qobj = viz.query_obj() |
| cache_key = viz.cache_key(qobj) |
| self.assertEqual(cache_key, viz.cache_key(qobj)) |
| |
| qobj['groupby'] = [] |
| self.assertNotEqual(cache_key, viz.cache_key(qobj)) |
| |
| def test_slice_json_endpoint(self): |
| self.login(username='admin') |
| slc = self.get_slice('Girls', db.session) |
| |
| json_endpoint = ( |
| '/superset/explore_json/{}/{}/' |
| .format(slc.datasource_type, slc.datasource_id) |
| ) |
| resp = self.get_resp(json_endpoint, {'form_data': json.dumps(slc.viz.form_data)}) |
| assert '"Jennifer"' in resp |
| |
| def test_slice_csv_endpoint(self): |
| self.login(username='admin') |
| slc = self.get_slice('Girls', db.session) |
| |
| csv_endpoint = ( |
| '/superset/explore_json/{}/{}/?csv=true' |
| .format(slc.datasource_type, slc.datasource_id) |
| ) |
| resp = self.get_resp(csv_endpoint, {'form_data': json.dumps(slc.viz.form_data)}) |
| assert 'Jennifer,' in resp |
| |
| def test_admin_only_permissions(self): |
| def assert_admin_permission_in(role_name, assert_func): |
| role = sm.find_role(role_name) |
| permissions = [p.permission.name for p in role.permissions] |
| assert_func('can_sync_druid_source', permissions) |
| assert_func('can_approve', permissions) |
| |
| assert_admin_permission_in('Admin', self.assertIn) |
| assert_admin_permission_in('Alpha', self.assertNotIn) |
| assert_admin_permission_in('Gamma', self.assertNotIn) |
| |
| def test_admin_only_menu_views(self): |
| def assert_admin_view_menus_in(role_name, assert_func): |
| role = sm.find_role(role_name) |
| view_menus = [p.view_menu.name for p in role.permissions] |
| assert_func('ResetPasswordView', view_menus) |
| assert_func('RoleModelView', view_menus) |
| assert_func('Security', view_menus) |
| assert_func('UserDBModelView', view_menus) |
| assert_func('SQL Lab', |
| view_menus) |
| |
| assert_admin_view_menus_in('Admin', self.assertIn) |
| assert_admin_view_menus_in('Alpha', self.assertNotIn) |
| assert_admin_view_menus_in('Gamma', self.assertNotIn) |
| |
| def test_save_slice(self): |
| self.login(username='admin') |
| slice_name = 'Energy Sankey' |
| slice_id = self.get_slice(slice_name, db.session).id |
| db.session.commit() |
| copy_name = 'Test Sankey Save' |
| tbl_id = self.table_ids.get('energy_usage') |
| new_slice_name = 'Test Sankey Overwirte' |
| |
| url = ( |
| '/superset/explore/table/{}/?slice_name={}&' |
| 'action={}&datasource_name=energy_usage') |
| |
| form_data = { |
| 'viz_type': 'sankey', |
| 'groupby': 'target', |
| 'metric': 'sum__value', |
| 'row_limit': 5000, |
| 'slice_id': slice_id, |
| } |
| # Changing name and save as a new slice |
| self.get_resp( |
| url.format( |
| tbl_id, |
| copy_name, |
| 'saveas', |
| ), |
| {'form_data': json.dumps(form_data)}, |
| ) |
| slices = db.session.query(models.Slice) \ |
| .filter_by(slice_name=copy_name).all() |
| assert len(slices) == 1 |
| new_slice_id = slices[0].id |
| |
| form_data = { |
| 'viz_type': 'sankey', |
| 'groupby': 'target', |
| 'metric': 'sum__value', |
| 'row_limit': 5000, |
| 'slice_id': new_slice_id, |
| } |
| # Setting the name back to its original name by overwriting new slice |
| self.get_resp( |
| url.format( |
| tbl_id, |
| new_slice_name, |
| 'overwrite', |
| ), |
| {'form_data': json.dumps(form_data)}, |
| ) |
| slc = db.session.query(models.Slice).filter_by(id=new_slice_id).first() |
| assert slc.slice_name == new_slice_name |
| db.session.delete(slc) |
| |
| def test_filter_endpoint(self): |
| self.login(username='admin') |
| slice_name = 'Energy Sankey' |
| slice_id = self.get_slice(slice_name, db.session).id |
| db.session.commit() |
| tbl_id = self.table_ids.get('energy_usage') |
| table = db.session.query(SqlaTable).filter(SqlaTable.id == tbl_id) |
| table.filter_select_enabled = True |
| url = ( |
| '/superset/filter/table/{}/target/?viz_type=sankey&groupby=source' |
| '&metric=sum__value&flt_col_0=source&flt_op_0=in&flt_eq_0=&' |
| 'slice_id={}&datasource_name=energy_usage&' |
| 'datasource_id=1&datasource_type=table') |
| |
| # Changing name |
| resp = self.get_resp(url.format(tbl_id, slice_id)) |
| assert len(resp) > 0 |
| assert 'Carbon Dioxide' in resp |
| |
| def test_slices(self): |
| # Testing by hitting the two supported end points for all slices |
| self.login(username='admin') |
| Slc = models.Slice |
| urls = [] |
| for slc in db.session.query(Slc).all(): |
| urls += [ |
| (slc.slice_name, 'slice_url', slc.slice_url), |
| (slc.slice_name, 'slice_id_url', slc.slice_id_url), |
| ] |
| for name, method, url in urls: |
| logging.info('[{name}]/[{method}]: {url}'.format(**locals())) |
| self.client.get(url) |
| |
| def test_tablemodelview_list(self): |
| self.login(username='admin') |
| |
| url = '/tablemodelview/list/' |
| resp = self.get_resp(url) |
| |
| # assert that a table is listed |
| table = db.session.query(SqlaTable).first() |
| assert table.name in resp |
| assert '/superset/explore/table/{}'.format(table.id) in resp |
| |
| def test_add_slice(self): |
| self.login(username='admin') |
| # assert that /slicemodelview/add responds with 200 |
| url = '/slicemodelview/add' |
| resp = self.client.get(url) |
| self.assertEqual(resp.status_code, 200) |
| |
| def test_get_user_slices(self): |
| self.login(username='admin') |
| userid = appbuilder.sm.find_user('admin').id |
| url = '/sliceaddview/api/read?_flt_0_created_by={}'.format(userid) |
| resp = self.client.get(url) |
| self.assertEqual(resp.status_code, 200) |
| |
| def test_slices_V2(self): |
| # Add explore-v2-beta role to admin user |
| # Test all slice urls as user with with explore-v2-beta role |
| sm.add_role('explore-v2-beta') |
| |
| appbuilder.sm.add_user( |
| 'explore_beta', 'explore_beta', ' user', 'explore_beta@airbnb.com', |
| appbuilder.sm.find_role('explore-v2-beta'), |
| password='general') |
| self.login(username='explore_beta', password='general') |
| |
| Slc = models.Slice |
| urls = [] |
| for slc in db.session.query(Slc).all(): |
| urls += [ |
| (slc.slice_name, 'slice_url', slc.slice_url), |
| ] |
| for name, method, url in urls: |
| print('[{name}]/[{method}]: {url}'.format(**locals())) |
| response = self.client.get(url) |
| |
| def test_dashboard(self): |
| self.login(username='admin') |
| urls = {} |
| for dash in db.session.query(models.Dashboard).all(): |
| urls[dash.dashboard_title] = dash.url |
| for title, url in urls.items(): |
| assert escape(title) in self.client.get(url).data.decode('utf-8') |
| |
| def test_doctests(self): |
| modules = [utils, models, sql_lab] |
| for mod in modules: |
| failed, tests = doctest.testmod(mod) |
| if failed: |
| raise Exception('Failed a doctest') |
| |
| def test_misc(self): |
| assert self.get_resp('/health') == 'OK' |
| assert self.get_resp('/healthcheck') == 'OK' |
| assert self.get_resp('/ping') == 'OK' |
| |
| def test_testconn(self, username='admin'): |
| self.login(username=username) |
| database = self.get_main_database(db.session) |
| |
| # validate that the endpoint works with the password-masked sqlalchemy uri |
| data = json.dumps({ |
| 'uri': database.safe_sqlalchemy_uri(), |
| 'name': 'main', |
| 'impersonate_user': False, |
| }) |
| response = self.client.post( |
| '/superset/testconn', |
| data=data, |
| content_type='application/json') |
| assert response.status_code == 200 |
| assert response.headers['Content-Type'] == 'application/json' |
| |
| # validate that the endpoint works with the decrypted sqlalchemy uri |
| data = json.dumps({ |
| 'uri': database.sqlalchemy_uri_decrypted, |
| 'name': 'main', |
| 'impersonate_user': False, |
| }) |
| response = self.client.post( |
| '/superset/testconn', |
| data=data, |
| content_type='application/json') |
| assert response.status_code == 200 |
| assert response.headers['Content-Type'] == 'application/json' |
| |
| def test_custom_password_store(self): |
| database = self.get_main_database(db.session) |
| conn_pre = sqla.engine.url.make_url(database.sqlalchemy_uri_decrypted) |
| |
| def custom_password_store(uri): |
| return 'password_store_test' |
| |
| models.custom_password_store = custom_password_store |
| conn = sqla.engine.url.make_url(database.sqlalchemy_uri_decrypted) |
| if conn_pre.password: |
| assert conn.password == 'password_store_test' |
| assert conn.password != conn_pre.password |
| # Disable for password store for later tests |
| models.custom_password_store = None |
| |
| def test_databaseview_edit(self, username='admin'): |
| # validate that sending a password-masked uri does not over-write the decrypted |
| # uri |
| self.login(username=username) |
| database = self.get_main_database(db.session) |
| sqlalchemy_uri_decrypted = database.sqlalchemy_uri_decrypted |
| url = 'databaseview/edit/{}'.format(database.id) |
| data = {k: database.__getattribute__(k) for k in DatabaseView.add_columns} |
| data['sqlalchemy_uri'] = database.safe_sqlalchemy_uri() |
| self.client.post(url, data=data) |
| database = self.get_main_database(db.session) |
| self.assertEqual(sqlalchemy_uri_decrypted, database.sqlalchemy_uri_decrypted) |
| |
| def test_warm_up_cache(self): |
| slc = self.get_slice('Girls', db.session) |
| data = self.get_json_resp( |
| '/superset/warm_up_cache?slice_id={}'.format(slc.id)) |
| assert data == [{'slice_id': slc.id, 'slice_name': slc.slice_name}] |
| |
| data = self.get_json_resp( |
| '/superset/warm_up_cache?table_name=energy_usage&db_name=main') |
| assert len(data) == 3 |
| |
| def test_shortner(self): |
| self.login(username='admin') |
| data = ( |
| '//superset/explore/table/1/?viz_type=sankey&groupby=source&' |
| 'groupby=target&metric=sum__value&row_limit=5000&where=&having=&' |
| 'flt_col_0=source&flt_op_0=in&flt_eq_0=&slice_id=78&slice_name=' |
| 'Energy+Sankey&collapsed_fieldsets=&action=&datasource_name=' |
| 'energy_usage&datasource_id=1&datasource_type=table&' |
| 'previous_viz_type=sankey' |
| ) |
| resp = self.client.post('/r/shortner/', data=dict(data=data)) |
| assert '?r=' in resp.data.decode('utf-8') |
| |
| def test_kv(self): |
| self.logout() |
| self.login(username='admin') |
| |
| try: |
| resp = self.client.post('/kv/store/', data=dict()) |
| except Exception: |
| self.assertRaises(TypeError) |
| |
| value = json.dumps({'data': 'this is a test'}) |
| resp = self.client.post('/kv/store/', data=dict(data=value)) |
| self.assertEqual(resp.status_code, 200) |
| kv = db.session.query(models.KeyValue).first() |
| kv_value = kv.value |
| self.assertEqual(json.loads(value), json.loads(kv_value)) |
| |
| resp = self.client.get('/kv/{}/'.format(kv.id)) |
| self.assertEqual(resp.status_code, 200) |
| self.assertEqual( |
| json.loads(value), |
| json.loads(resp.data.decode('utf-8'))) |
| |
| try: |
| resp = self.client.get('/kv/10001/') |
| except Exception: |
| self.assertRaises(TypeError) |
| |
| def test_save_dash(self, username='admin'): |
| self.login(username=username) |
| dash = db.session.query(models.Dashboard).filter_by( |
| slug='births').first() |
| positions = [] |
| for i, slc in enumerate(dash.slices): |
| d = { |
| 'col': 0, |
| 'row': i * 4, |
| 'size_x': 4, |
| 'size_y': 4, |
| 'slice_id': '{}'.format(slc.id)} |
| positions.append(d) |
| data = { |
| 'css': '', |
| 'expanded_slices': {}, |
| 'positions': positions, |
| 'dashboard_title': dash.dashboard_title, |
| } |
| url = '/superset/save_dash/{}/'.format(dash.id) |
| resp = self.get_resp(url, data=dict(data=json.dumps(data))) |
| self.assertIn('SUCCESS', resp) |
| |
| def test_save_dash_with_filter(self, username='admin'): |
| self.login(username=username) |
| dash = db.session.query(models.Dashboard).filter_by( |
| slug='world_health').first() |
| positions = [] |
| for i, slc in enumerate(dash.slices): |
| d = { |
| 'col': 0, |
| 'row': i * 4, |
| 'size_x': 4, |
| 'size_y': 4, |
| 'slice_id': '{}'.format(slc.id)} |
| positions.append(d) |
| |
| filters = {str(dash.slices[0].id): {'region': ['North America']}} |
| default_filters = json.dumps(filters) |
| data = { |
| 'css': '', |
| 'expanded_slices': {}, |
| 'positions': positions, |
| 'dashboard_title': dash.dashboard_title, |
| 'default_filters': default_filters, |
| } |
| |
| url = '/superset/save_dash/{}/'.format(dash.id) |
| resp = self.get_resp(url, data=dict(data=json.dumps(data))) |
| self.assertIn('SUCCESS', resp) |
| |
| updatedDash = db.session.query(models.Dashboard).filter_by( |
| slug='world_health').first() |
| new_url = updatedDash.url |
| self.assertIn('region', new_url) |
| |
| resp = self.get_resp(new_url) |
| self.assertIn('North America', resp) |
| |
| def test_save_dash_with_dashboard_title(self, username='admin'): |
| self.login(username=username) |
| dash = ( |
| db.session.query(models.Dashboard) |
| .filter_by(slug='births') |
| .first() |
| ) |
| origin_title = dash.dashboard_title |
| positions = [] |
| for i, slc in enumerate(dash.slices): |
| d = { |
| 'col': 0, |
| 'row': i * 4, |
| 'size_x': 4, |
| 'size_y': 4, |
| 'slice_id': '{}'.format(slc.id)} |
| positions.append(d) |
| data = { |
| 'css': '', |
| 'expanded_slices': {}, |
| 'positions': positions, |
| 'dashboard_title': 'new title', |
| } |
| url = '/superset/save_dash/{}/'.format(dash.id) |
| self.get_resp(url, data=dict(data=json.dumps(data))) |
| updatedDash = ( |
| db.session.query(models.Dashboard) |
| .filter_by(slug='births') |
| .first() |
| ) |
| self.assertEqual(updatedDash.dashboard_title, 'new title') |
| # # bring back dashboard original title |
| data['dashboard_title'] = origin_title |
| self.get_resp(url, data=dict(data=json.dumps(data))) |
| |
| def test_copy_dash(self, username='admin'): |
| self.login(username=username) |
| dash = db.session.query(models.Dashboard).filter_by( |
| slug='births').first() |
| positions = [] |
| for i, slc in enumerate(dash.slices): |
| d = { |
| 'col': 0, |
| 'row': i * 4, |
| 'size_x': 4, |
| 'size_y': 4, |
| 'slice_id': '{}'.format(slc.id)} |
| positions.append(d) |
| data = { |
| 'css': '', |
| 'duplicate_slices': False, |
| 'expanded_slices': {}, |
| 'positions': positions, |
| 'dashboard_title': 'Copy Of Births', |
| } |
| |
| # Save changes to Births dashboard and retrieve updated dash |
| dash_id = dash.id |
| url = '/superset/save_dash/{}/'.format(dash_id) |
| self.client.post(url, data=dict(data=json.dumps(data))) |
| dash = db.session.query(models.Dashboard).filter_by( |
| id=dash_id).first() |
| orig_json_data = dash.data |
| |
| # Verify that copy matches original |
| url = '/superset/copy_dash/{}/'.format(dash_id) |
| resp = self.get_json_resp(url, data=dict(data=json.dumps(data))) |
| self.assertEqual(resp['dashboard_title'], 'Copy Of Births') |
| self.assertEqual(resp['position_json'], orig_json_data['position_json']) |
| self.assertEqual(resp['metadata'], orig_json_data['metadata']) |
| self.assertEqual(resp['slices'], orig_json_data['slices']) |
| |
| def test_add_slices(self, username='admin'): |
| self.login(username=username) |
| dash = db.session.query(models.Dashboard).filter_by( |
| slug='births').first() |
| new_slice = db.session.query(models.Slice).filter_by( |
| slice_name='Mapbox Long/Lat').first() |
| existing_slice = db.session.query(models.Slice).filter_by( |
| slice_name='Name Cloud').first() |
| data = { |
| 'slice_ids': [new_slice.data['slice_id'], |
| existing_slice.data['slice_id']], |
| } |
| url = '/superset/add_slices/{}/'.format(dash.id) |
| resp = self.client.post(url, data=dict(data=json.dumps(data))) |
| assert 'SLICES ADDED' in resp.data.decode('utf-8') |
| |
| dash = db.session.query(models.Dashboard).filter_by( |
| slug='births').first() |
| new_slice = db.session.query(models.Slice).filter_by( |
| slice_name='Mapbox Long/Lat').first() |
| assert new_slice in dash.slices |
| assert len(set(dash.slices)) == len(dash.slices) |
| |
| # cleaning up |
| dash = db.session.query(models.Dashboard).filter_by( |
| slug='births').first() |
| dash.slices = [ |
| o for o in dash.slices if o.slice_name != 'Mapbox Long/Lat'] |
| db.session.commit() |
| |
| def test_gamma(self): |
| self.login(username='gamma') |
| assert 'List Charts' in self.get_resp('/slicemodelview/list/') |
| assert 'List Dashboard' in self.get_resp('/dashboardmodelview/list/') |
| |
| def test_csv_endpoint(self): |
| self.login('admin') |
| sql = """ |
| SELECT first_name, last_name |
| FROM ab_user |
| WHERE first_name='admin' |
| """ |
| client_id = '{}'.format(random.getrandbits(64))[:10] |
| self.run_sql(sql, client_id, raise_on_error=True) |
| |
| resp = self.get_resp('/superset/csv/{}'.format(client_id)) |
| data = csv.reader(io.StringIO(resp)) |
| expected_data = csv.reader( |
| io.StringIO('first_name,last_name\nadmin, user\n')) |
| |
| self.assertEqual(list(expected_data), list(data)) |
| self.logout() |
| |
| def test_public_user_dashboard_access(self): |
| table = ( |
| db.session |
| .query(SqlaTable) |
| .filter_by(table_name='birth_names') |
| .one() |
| ) |
| # Try access before adding appropriate permissions. |
| self.revoke_public_access_to_table(table) |
| self.logout() |
| |
| resp = self.get_resp('/slicemodelview/list/') |
| self.assertNotIn('birth_names</a>', resp) |
| |
| resp = self.get_resp('/dashboardmodelview/list/') |
| self.assertNotIn('/superset/dashboard/births/', resp) |
| |
| self.grant_public_access_to_table(table) |
| |
| # Try access after adding appropriate permissions. |
| self.assertIn('birth_names', self.get_resp('/slicemodelview/list/')) |
| |
| resp = self.get_resp('/dashboardmodelview/list/') |
| self.assertIn('/superset/dashboard/births/', resp) |
| |
| self.assertIn('Births', self.get_resp('/superset/dashboard/births/')) |
| |
| # Confirm that public doesn't have access to other datasets. |
| resp = self.get_resp('/slicemodelview/list/') |
| self.assertNotIn('wb_health_population</a>', resp) |
| |
| resp = self.get_resp('/dashboardmodelview/list/') |
| self.assertNotIn('/superset/dashboard/world_health/', resp) |
| |
| def test_dashboard_with_created_by_can_be_accessed_by_public_users(self): |
| self.logout() |
| table = ( |
| db.session |
| .query(SqlaTable) |
| .filter_by(table_name='birth_names') |
| .one() |
| ) |
| self.grant_public_access_to_table(table) |
| |
| dash = db.session.query(models.Dashboard).filter_by( |
| slug='births').first() |
| dash.owners = [appbuilder.sm.find_user('admin')] |
| dash.created_by = appbuilder.sm.find_user('admin') |
| db.session.merge(dash) |
| db.session.commit() |
| |
| assert 'Births' in self.get_resp('/superset/dashboard/births/') |
| |
| def test_only_owners_can_save(self): |
| dash = ( |
| db.session |
| .query(models.Dashboard) |
| .filter_by(slug='births') |
| .first() |
| ) |
| dash.owners = [] |
| db.session.merge(dash) |
| db.session.commit() |
| self.test_save_dash('admin') |
| |
| self.logout() |
| self.assertRaises( |
| Exception, self.test_save_dash, 'alpha') |
| |
| alpha = appbuilder.sm.find_user('alpha') |
| |
| dash = ( |
| db.session |
| .query(models.Dashboard) |
| .filter_by(slug='births') |
| .first() |
| ) |
| dash.owners = [alpha] |
| db.session.merge(dash) |
| db.session.commit() |
| self.test_save_dash('alpha') |
| |
| def test_extra_table_metadata(self): |
| self.login('admin') |
| dbid = self.get_main_database(db.session).id |
| self.get_json_resp( |
| '/superset/extra_table_metadata/{dbid}/' |
| 'ab_permission_view/panoramix/'.format(**locals())) |
| |
| def test_process_template(self): |
| maindb = self.get_main_database(db.session) |
| sql = "SELECT '{{ datetime(2017, 1, 1).isoformat() }}'" |
| tp = jinja_context.get_template_processor(database=maindb) |
| rendered = tp.process_template(sql) |
| self.assertEqual("SELECT '2017-01-01T00:00:00'", rendered) |
| |
| def test_get_template_kwarg(self): |
| maindb = self.get_main_database(db.session) |
| s = '{{ foo }}' |
| tp = jinja_context.get_template_processor(database=maindb, foo='bar') |
| rendered = tp.process_template(s) |
| self.assertEqual('bar', rendered) |
| |
| def test_template_kwarg(self): |
| maindb = self.get_main_database(db.session) |
| s = '{{ foo }}' |
| tp = jinja_context.get_template_processor(database=maindb) |
| rendered = tp.process_template(s, foo='bar') |
| self.assertEqual('bar', rendered) |
| |
| def test_templated_sql_json(self): |
| self.login('admin') |
| sql = "SELECT '{{ datetime(2017, 1, 1).isoformat() }}' as test" |
| data = self.run_sql(sql, 'fdaklj3ws') |
| self.assertEqual(data['data'][0]['test'], '2017-01-01T00:00:00') |
| |
| def test_table_metadata(self): |
| maindb = self.get_main_database(db.session) |
| backend = maindb.backend |
| data = self.get_json_resp( |
| '/superset/table/{}/ab_user/null/'.format(maindb.id)) |
| self.assertEqual(data['name'], 'ab_user') |
| assert len(data['columns']) > 5 |
| assert data.get('selectStar').startswith('SELECT') |
| |
| # Engine specific tests |
| if backend in ('mysql', 'postgresql'): |
| self.assertEqual(data.get('primaryKey').get('type'), 'pk') |
| self.assertEqual( |
| data.get('primaryKey').get('column_names')[0], 'id') |
| self.assertEqual(len(data.get('foreignKeys')), 2) |
| if backend == 'mysql': |
| self.assertEqual(len(data.get('indexes')), 7) |
| elif backend == 'postgresql': |
| self.assertEqual(len(data.get('indexes')), 5) |
| |
| def test_fetch_datasource_metadata(self): |
| self.login(username='admin') |
| url = ( |
| '/superset/fetch_datasource_metadata?' + |
| 'datasourceKey=1__table' |
| ) |
| resp = self.get_json_resp(url) |
| keys = [ |
| 'name', 'filterable_cols', 'gb_cols', 'type', 'all_cols', |
| 'order_by_choices', 'metrics_combo', 'granularity_sqla', |
| 'time_grain_sqla', 'id', |
| ] |
| for k in keys: |
| self.assertIn(k, resp.keys()) |
| |
| def test_user_profile(self, username='admin'): |
| self.login(username=username) |
| slc = self.get_slice('Girls', db.session) |
| |
| # Setting some faves |
| url = '/superset/favstar/Slice/{}/select/'.format(slc.id) |
| resp = self.get_json_resp(url) |
| self.assertEqual(resp['count'], 1) |
| |
| dash = ( |
| db.session |
| .query(models.Dashboard) |
| .filter_by(slug='births') |
| .first() |
| ) |
| url = '/superset/favstar/Dashboard/{}/select/'.format(dash.id) |
| resp = self.get_json_resp(url) |
| self.assertEqual(resp['count'], 1) |
| |
| userid = appbuilder.sm.find_user('admin').id |
| resp = self.get_resp('/superset/profile/admin/') |
| self.assertIn('"app"', resp) |
| data = self.get_json_resp('/superset/recent_activity/{}/'.format(userid)) |
| self.assertNotIn('message', data) |
| data = self.get_json_resp('/superset/created_slices/{}/'.format(userid)) |
| self.assertNotIn('message', data) |
| data = self.get_json_resp('/superset/created_dashboards/{}/'.format(userid)) |
| self.assertNotIn('message', data) |
| data = self.get_json_resp('/superset/fave_slices/{}/'.format(userid)) |
| self.assertNotIn('message', data) |
| data = self.get_json_resp('/superset/fave_dashboards/{}/'.format(userid)) |
| self.assertNotIn('message', data) |
| data = self.get_json_resp( |
| '/superset/fave_dashboards_by_username/{}/'.format(username)) |
| self.assertNotIn('message', data) |
| |
| def test_slice_id_is_always_logged_correctly_on_web_request(self): |
| # superset/explore case |
| slc = db.session.query(models.Slice).filter_by(slice_name='Girls').one() |
| qry = db.session.query(models.Log).filter_by(slice_id=slc.id) |
| self.get_resp(slc.slice_url, {'form_data': json.dumps(slc.viz.form_data)}) |
| self.assertEqual(1, qry.count()) |
| |
| def test_slice_id_is_always_logged_correctly_on_ajax_request(self): |
| # superset/explore_json case |
| self.login(username='admin') |
| slc = db.session.query(models.Slice).filter_by(slice_name='Girls').one() |
| qry = db.session.query(models.Log).filter_by(slice_id=slc.id) |
| slc_url = slc.slice_url.replace('explore', 'explore_json') |
| self.get_json_resp(slc_url, {'form_data': json.dumps(slc.viz.form_data)}) |
| self.assertEqual(1, qry.count()) |
| |
| def test_slice_query_endpoint(self): |
| # API endpoint for query string |
| self.login(username='admin') |
| slc = self.get_slice('Girls', db.session) |
| resp = self.get_resp('/superset/slice_query/{}/'.format(slc.id)) |
| assert 'query' in resp |
| assert 'language' in resp |
| self.logout() |
| |
| def test_viz_get_fillna_for_columns(self): |
| slc = self.get_slice('Girls', db.session) |
| q = slc.viz.query_obj() |
| results = slc.viz.datasource.query(q) |
| fillna_columns = slc.viz.get_fillna_for_columns(results.df.columns) |
| self.assertDictEqual( |
| fillna_columns, |
| {'name': ' NULL', 'sum__num': 0}, |
| ) |
| |
| def test_import_csv(self): |
| self.login(username='admin') |
| filename = 'testCSV.csv' |
| table_name = ''.join( |
| random.choice(string.ascii_uppercase) for _ in range(5)) |
| |
| test_file = open(filename, 'w+') |
| test_file.write('a,b\n') |
| test_file.write('john,1\n') |
| test_file.write('paul,2\n') |
| test_file.close() |
| main_db_uri = ( |
| db.session.query(models.Database) |
| .filter_by(database_name='main') |
| .all() |
| ) |
| |
| test_file = open(filename, 'rb') |
| form_data = { |
| 'csv_file': test_file, |
| 'sep': ',', |
| 'name': table_name, |
| 'con': main_db_uri[0].id, |
| 'if_exists': 'append', |
| 'index_label': 'test_label', |
| 'mangle_dupe_cols': False, |
| } |
| url = '/databaseview/list/' |
| add_datasource_page = self.get_resp(url) |
| assert 'Upload a CSV' in add_datasource_page |
| |
| url = '/csvtodatabaseview/form' |
| form_get = self.get_resp(url) |
| assert 'CSV to Database configuration' in form_get |
| |
| try: |
| # ensure uploaded successfully |
| form_post = self.get_resp(url, data=form_data) |
| assert 'CSV file \"testCSV.csv\" uploaded to table' in form_post |
| finally: |
| os.remove(filename) |
| |
| def test_dataframe_timezone(self): |
| tz = psycopg2.tz.FixedOffsetTimezone(offset=60, name=None) |
| data = [ |
| (datetime.datetime(2017, 11, 18, 21, 53, 0, 219225, tzinfo=tz),), |
| (datetime.datetime(2017, 11, 18, 22, 6, 30, 61810, tzinfo=tz),), |
| ] |
| df = dataframe.SupersetDataFrame(pd.DataFrame(data=list(data), |
| columns=['data'])) |
| data = df.data |
| self.assertDictEqual( |
| data[0], |
| {'data': pd.Timestamp('2017-11-18 21:53:00.219225+0100', tz=tz)}, |
| ) |
| self.assertDictEqual( |
| data[1], |
| {'data': pd.Timestamp('2017-11-18 22:06:30.061810+0100', tz=tz)}, |
| ) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |