"""Unit tests for Superset"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import json
import logging
import os
import unittest

from flask_appbuilder.security.sqla import models as ab_models

from superset import app, appbuilder, cli, db, security, sm
from superset.connectors.druid.models import DruidCluster, DruidDatasource
from superset.connectors.sqla.models import SqlaTable
from superset.models import core as models
from superset.security import sync_role_definitions

os.environ['SUPERSET_CONFIG'] = 'tests.superset_test_config'

BASE_DIR = app.config.get('BASE_DIR')


class SupersetTestCase(unittest.TestCase):
    requires_examples = False
    examples_loaded = False

    def __init__(self, *args, **kwargs):
        if (
            self.requires_examples and
            not os.environ.get('SOLO_TEST') and
            not os.environ.get('examples_loaded')
        ):
            logging.info('Loading examples')
            cli.load_examples(load_test_data=True)
            logging.info('Done loading examples')
            sync_role_definitions()
            os.environ['examples_loaded'] = '1'
        else:
            sync_role_definitions()
        super(SupersetTestCase, self).__init__(*args, **kwargs)
        self.client = app.test_client()
        self.maxDiff = None

        gamma_sqllab_role = sm.add_role('gamma_sqllab')
        for perm in sm.find_role('Gamma').permissions:
            sm.add_permission_role(gamma_sqllab_role, perm)
        db_perm = self.get_main_database(sm.get_session).perm
        security.merge_perm(sm, 'database_access', db_perm)
        db_pvm = sm.find_permission_view_menu(
            view_menu_name=db_perm, permission_name='database_access')
        gamma_sqllab_role.permissions.append(db_pvm)
        for perm in sm.find_role('sql_lab').permissions:
            sm.add_permission_role(gamma_sqllab_role, perm)

        admin = appbuilder.sm.find_user('admin')
        if not admin:
            appbuilder.sm.add_user(
                'admin', 'admin', ' user', 'admin@fab.org',
                appbuilder.sm.find_role('Admin'),
                password='general')

        gamma = appbuilder.sm.find_user('gamma')
        if not gamma:
            appbuilder.sm.add_user(
                'gamma', 'gamma', 'user', 'gamma@fab.org',
                appbuilder.sm.find_role('Gamma'),
                password='general')

        gamma2 = appbuilder.sm.find_user('gamma2')
        if not gamma2:
            appbuilder.sm.add_user(
                'gamma2', 'gamma2', 'user', 'gamma2@fab.org',
                appbuilder.sm.find_role('Gamma'),
                password='general')

        gamma_sqllab_user = appbuilder.sm.find_user('gamma_sqllab')
        if not gamma_sqllab_user:
            appbuilder.sm.add_user(
                'gamma_sqllab', 'gamma_sqllab', 'user', 'gamma_sqllab@fab.org',
                gamma_sqllab_role, password='general')

        alpha = appbuilder.sm.find_user('alpha')
        if not alpha:
            appbuilder.sm.add_user(
                'alpha', 'alpha', 'user', 'alpha@fab.org',
                appbuilder.sm.find_role('Alpha'),
                password='general')
        sm.get_session.commit()
        # create druid cluster and druid datasources
        session = db.session
        cluster = (
            session.query(DruidCluster)
            .filter_by(cluster_name='druid_test')
            .first()
        )
        if not cluster:
            cluster = DruidCluster(cluster_name='druid_test')
            session.add(cluster)
            session.commit()

            druid_datasource1 = DruidDatasource(
                datasource_name='druid_ds_1',
                cluster_name='druid_test',
            )
            session.add(druid_datasource1)
            druid_datasource2 = DruidDatasource(
                datasource_name='druid_ds_2',
                cluster_name='druid_test',
            )
            session.add(druid_datasource2)
            session.commit()

    def get_table(self, table_id):
        return db.session.query(SqlaTable).filter_by(
            id=table_id).first()

    def get_or_create(self, cls, criteria, session):
        obj = session.query(cls).filter_by(**criteria).first()
        if not obj:
            obj = cls(**criteria)
        return obj

    def login(self, username='admin', password='general'):
        resp = self.get_resp(
            '/login/',
            data=dict(username=username, password=password))
        self.assertIn('Welcome', resp)

    def get_slice(self, slice_name, session):
        slc = (
            session.query(models.Slice)
            .filter_by(slice_name=slice_name)
            .one()
        )
        session.expunge_all()
        return slc

    def get_table_by_name(self, name):
        return db.session.query(SqlaTable).filter_by(
            table_name=name).first()

    def get_druid_ds_by_name(self, name):
        return db.session.query(DruidDatasource).filter_by(
            datasource_name=name).first()

    def get_resp(
            self, url, data=None, follow_redirects=True, raise_on_error=True):
        """Shortcut to get the parsed results while following redirects"""
        if data:
            resp = self.client.post(
                url, data=data, follow_redirects=follow_redirects)
        else:
            resp = self.client.get(url, follow_redirects=follow_redirects)
        if raise_on_error and resp.status_code > 400:
            raise Exception(
                'http request failed with code {}'.format(resp.status_code))
        return resp.data.decode('utf-8')

    def get_json_resp(
            self, url, data=None, follow_redirects=True, raise_on_error=True):
        """Shortcut to get the parsed results while following redirects"""
        resp = self.get_resp(url, data, follow_redirects, raise_on_error)
        return json.loads(resp)

    def get_main_database(self, session):
        return (
            db.session.query(models.Database)
            .filter_by(database_name='main')
            .first()
        )

    def get_access_requests(self, username, ds_type, ds_id):
        DAR = models.DatasourceAccessRequest
        return (
            db.session.query(DAR)
            .filter(
                DAR.created_by == sm.find_user(username=username),
                DAR.datasource_type == ds_type,
                DAR.datasource_id == ds_id,
            )
            .first()
        )

    def logout(self):
        self.client.get('/logout/', follow_redirects=True)

    def grant_public_access_to_table(self, table):
        public_role = appbuilder.sm.find_role('Public')
        perms = db.session.query(ab_models.PermissionView).all()
        for perm in perms:
            if (perm.permission.name == 'datasource_access' and
                    perm.view_menu and table.perm in perm.view_menu.name):
                appbuilder.sm.add_permission_role(public_role, perm)

    def revoke_public_access_to_table(self, table):
        public_role = appbuilder.sm.find_role('Public')
        perms = db.session.query(ab_models.PermissionView).all()
        for perm in perms:
            if (perm.permission.name == 'datasource_access' and
                    perm.view_menu and table.perm in perm.view_menu.name):
                appbuilder.sm.del_permission_role(public_role, perm)

    def run_sql(self, sql, client_id, user_name=None, raise_on_error=False):
        if user_name:
            self.logout()
            self.login(username=(user_name if user_name else 'admin'))
        dbid = self.get_main_database(db.session).id
        resp = self.get_json_resp(
            '/superset/sql_json/',
            raise_on_error=False,
            data=dict(database_id=dbid, sql=sql, select_as_create_as=False,
                      client_id=client_id),
        )
        if raise_on_error and 'error' in resp:
            raise Exception('run_sql failed')
        return resp

    def test_gamma_permissions(self):
        def assert_can_read(view_menu):
            self.assertIn(('can_show', view_menu), gamma_perm_set)
            self.assertIn(('can_list', view_menu), gamma_perm_set)

        def assert_can_write(view_menu):
            self.assertIn(('can_add', view_menu), gamma_perm_set)
            self.assertIn(('can_download', view_menu), gamma_perm_set)
            self.assertIn(('can_delete', view_menu), gamma_perm_set)
            self.assertIn(('can_edit', view_menu), gamma_perm_set)

        def assert_cannot_write(view_menu):
            self.assertNotIn(('can_add', view_menu), gamma_perm_set)
            self.assertNotIn(('can_download', view_menu), gamma_perm_set)
            self.assertNotIn(('can_delete', view_menu), gamma_perm_set)
            self.assertNotIn(('can_edit', view_menu), gamma_perm_set)
            self.assertNotIn(('can_save', view_menu), gamma_perm_set)

        def assert_can_all(view_menu):
            assert_can_read(view_menu)
            assert_can_write(view_menu)

        gamma_perm_set = set()
        for perm in sm.find_role('Gamma').permissions:
            gamma_perm_set.add((perm.permission.name, perm.view_menu.name))

        # check read only perms
        assert_can_read('TableModelView')
        assert_cannot_write('DruidColumnInlineView')

        # make sure that user can create slices and dashboards
        assert_can_all('SliceModelView')
        assert_can_all('DashboardModelView')

        self.assertIn(('can_add_slices', 'Superset'), gamma_perm_set)
        self.assertIn(('can_copy_dash', 'Superset'), gamma_perm_set)
        self.assertIn(('can_activity_per_day', 'Superset'), gamma_perm_set)
        self.assertIn(('can_created_dashboards', 'Superset'), gamma_perm_set)
        self.assertIn(('can_created_slices', 'Superset'), gamma_perm_set)
        self.assertIn(('can_csv', 'Superset'), gamma_perm_set)
        self.assertIn(('can_dashboard', 'Superset'), gamma_perm_set)
        self.assertIn(('can_explore', 'Superset'), gamma_perm_set)
        self.assertIn(('can_explore_json', 'Superset'), gamma_perm_set)
        self.assertIn(('can_fave_dashboards', 'Superset'), gamma_perm_set)
        self.assertIn(('can_fave_slices', 'Superset'), gamma_perm_set)
        self.assertIn(('can_save_dash', 'Superset'), gamma_perm_set)
        self.assertIn(('can_slice', 'Superset'), gamma_perm_set)
