blob: 0cc00bafe4e3571dd128e004da0450a4eddc8e36 [file] [log] [blame]
"""Unit tests for Sql Lab"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from datetime import datetime, timedelta
import json
import unittest
from flask_appbuilder.security.sqla import models as ab_models
from superset import db, utils, appbuilder, sm
from superset.models.sql_lab import Query
from .base_tests import SupersetTestCase
class SqlLabTests(SupersetTestCase):
"""Testings for Sql Lab"""
def __init__(self, *args, **kwargs):
super(SqlLabTests, self).__init__(*args, **kwargs)
def run_some_queries(self):
db.session.query(Query).delete()
db.session.commit()
self.run_sql(
"SELECT * FROM ab_user",
client_id='client_id_1',
user_name='admin')
self.run_sql(
"SELECT * FROM NO_TABLE",
client_id='client_id_3',
user_name='admin')
self.run_sql(
"SELECT * FROM ab_permission",
client_id='client_id_2',
user_name='gamma_sqllab')
self.logout()
def tearDown(self):
db.session.query(Query).delete()
db.session.commit()
self.logout()
def test_sql_json(self):
self.login('admin')
data = self.run_sql('SELECT * FROM ab_user', "1")
self.assertLess(0, len(data['data']))
data = self.run_sql('SELECT * FROM unexistant_table', "2")
self.assertLess(0, len(data['error']))
def test_sql_json_has_access(self):
main_db = self.get_main_database(db.session)
sm.add_permission_view_menu('database_access', main_db.perm)
db.session.commit()
main_db_permission_view = (
db.session.query(ab_models.PermissionView)
.join(ab_models.ViewMenu)
.filter(ab_models.ViewMenu.name == '[main].(id:1)')
.first()
)
astronaut = sm.add_role("Astronaut")
sm.add_permission_role(astronaut, main_db_permission_view)
# Astronaut role is Gamma + sqllab + main db permissions
for perm in sm.find_role('Gamma').permissions:
sm.add_permission_role(astronaut, perm)
for perm in sm.find_role('sql_lab').permissions:
sm.add_permission_role(astronaut, perm)
gagarin = appbuilder.sm.find_user('gagarin')
if not gagarin:
appbuilder.sm.add_user(
'gagarin', 'Iurii', 'Gagarin', 'gagarin@cosmos.ussr',
astronaut,
password='general')
data = self.run_sql('SELECT * FROM ab_user', "3", user_name='gagarin')
db.session.query(Query).delete()
db.session.commit()
self.assertLess(0, len(data['data']))
def test_queries_endpoint(self):
self.run_some_queries()
# Not logged in, should error out
resp = self.client.get('/superset/queries/0')
# Redirects to the login page
self.assertEquals(403, resp.status_code)
# Admin sees queries
self.login('admin')
data = self.get_json_resp('/superset/queries/0')
self.assertEquals(2, len(data))
# Run 2 more queries
self.run_sql("SELECT * FROM ab_user LIMIT 1", client_id='client_id_4')
self.run_sql("SELECT * FROM ab_user LIMIT 2", client_id='client_id_5')
self.login('admin')
data = self.get_json_resp('/superset/queries/0')
self.assertEquals(4, len(data))
now = datetime.now() + timedelta(days=1)
query = db.session.query(Query).filter_by(
sql='SELECT * FROM ab_user LIMIT 1').first()
query.changed_on = now
db.session.commit()
data = self.get_json_resp(
'/superset/queries/{}'.format(
int(utils.datetime_to_epoch(now))-1000))
self.assertEquals(1, len(data))
self.logout()
resp = self.client.get('/superset/queries/0')
# Redirects to the login page
self.assertEquals(403, resp.status_code)
def test_search_query_on_db_id(self):
self.run_some_queries()
self.login('admin')
# Test search queries on database Id
data = self.get_json_resp('/superset/search_queries?database_id=1')
self.assertEquals(3, len(data))
db_ids = [k['dbId'] for k in data]
self.assertEquals([1, 1, 1], db_ids)
resp = self.get_resp('/superset/search_queries?database_id=-1')
data = json.loads(resp)
self.assertEquals(0, len(data))
def test_search_query_on_user(self):
self.run_some_queries()
self.login('admin')
# Test search queries on user Id
user_id = appbuilder.sm.find_user('admin').id
data = self.get_json_resp(
'/superset/search_queries?user_id={}'.format(user_id))
self.assertEquals(2, len(data))
user_ids = {k['userId'] for k in data}
self.assertEquals(set([user_id]), user_ids)
user_id = appbuilder.sm.find_user('gamma_sqllab').id
resp = self.get_resp(
'/superset/search_queries?user_id={}'.format(user_id))
data = json.loads(resp)
self.assertEquals(1, len(data))
self.assertEquals(data[0]['userId'] , user_id)
def test_search_query_on_status(self):
self.run_some_queries()
self.login('admin')
# Test search queries on status
resp = self.get_resp('/superset/search_queries?status=success')
data = json.loads(resp)
self.assertEquals(2, len(data))
states = [k['state'] for k in data]
self.assertEquals(['success', 'success'], states)
resp = self.get_resp('/superset/search_queries?status=failed')
data = json.loads(resp)
self.assertEquals(1, len(data))
self.assertEquals(data[0]['state'], 'failed')
def test_search_query_on_text(self):
self.run_some_queries()
self.login('admin')
url = '/superset/search_queries?search_text=permission'
data = self.get_json_resp(url)
self.assertEquals(1, len(data))
self.assertIn('permission', data[0]['sql'])
def test_search_query_on_time(self):
self.run_some_queries()
self.login('admin')
first_query_time = (
db.session.query(Query)
.filter_by(sql='SELECT * FROM ab_user').one()
).start_time
second_query_time = (
db.session.query(Query)
.filter_by(sql='SELECT * FROM ab_permission').one()
).start_time
# Test search queries on time filter
from_time = 'from={}'.format(int(first_query_time))
to_time = 'to={}'.format(int(second_query_time))
params = [from_time, to_time]
resp = self.get_resp('/superset/search_queries?'+'&'.join(params))
data = json.loads(resp)
self.assertEquals(2, len(data))
for k in data:
self.assertLess(int(first_query_time), k['startDttm'])
self.assertLess(k['startDttm'], int(second_query_time))
def test_alias_duplicate(self):
self.run_sql(
"SELECT username as col, id as col, username FROM ab_user",
client_id='2e2df3',
user_name='admin',
raise_on_error=True)
if __name__ == '__main__':
unittest.main()