import time
import pytest
import logging

from cassandra import Unauthorized
from ccmlib.common import is_win
from ccmlib.node import Node

from dtest_setup_overrides import DTestSetupOverrides

from dtest import Tester
from tools.assertions import assert_all, assert_invalid
from tools.misc import ImmutableMapping

since = pytest.mark.since
logger = logging.getLogger(__name__)


@pytest.mark.upgrade_test
@since('2.2')
class TestAuthUpgrade(Tester):

    @pytest.fixture(scope='function', autouse=True)
    def fixture_dtest_setup_overrides(self, dtest_config):
        dtest_setup_overrides = DTestSetupOverrides()
        dtest_setup_overrides.cluster_options = ImmutableMapping({'authenticator': 'PasswordAuthenticator',
                                                               'authorizer': 'CassandraAuthorizer'})
        return dtest_setup_overrides

    @pytest.fixture(autouse=True)
    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
        fixture_dtest_setup.ignore_log_patterns = (
            # This one occurs if we do a non-rolling upgrade, the node
            # it's trying to send the migration to hasn't started yet,
            # and when it does, it gets replayed and everything is fine.
            r'Can\'t send migration request: node.*is down',
    )

    @since('2.2', max_version='2.2.X')
    def test_upgrade_to_22(self):
        self.do_upgrade_with_internal_auth("github:apache/cassandra-2.2")

    @since('3.0', max_version='3.0.X')
    @pytest.mark.no_offheap_memtables
    def test_upgrade_to_30(self):
        self.do_upgrade_with_internal_auth("github:apache/cassandra-3.0")

    @since('2.2', max_version='3.X')
    def test_upgrade_legacy_table(self):
        """
        Upgrade with bringing up the legacy tables after the newer nodes (without legacy tables)
        were started.

        @jira_ticket CASSANDRA-12813
        """

        cluster = self.cluster

        # Forcing cluster version on purpose
        cluster.set_install_dir(version="2.1.16")
        cluster.populate(3).start()

        node1, node2, node3 = cluster.nodelist()

        # Wait for default user to get created on one of the nodes
        time.sleep(15)

        # Upgrade to current version
        for node in [node1, node2, node3]:
            node.drain()
            node.watch_log_for("DRAINED")
            node.stop(gently=True)
            self.set_node_to_current_version(node)

        cluster.start()

        # Make sure the system_auth table will get replicated to the node that we're going to replace
        session = self.patient_cql_connection(node1, user='cassandra', password='cassandra')
        session.execute("ALTER KEYSPACE system_auth WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };")
        cluster.repair()

        cluster.stop()

        # Replace the node
        cluster.seeds.remove(node1)
        cluster.remove(node1)

        replacement_address = node1.address()
        replacement_node = Node('replacement', cluster=self.cluster, auto_bootstrap=True,
                                thrift_interface=(replacement_address, 9160),
                                storage_interface=(replacement_address, 7000),
                                jmx_port='7400', remote_debug_port='0', initial_token=None,
                                binary_interface=(replacement_address, 9042))
        self.set_node_to_current_version(replacement_node)

        cluster.add(replacement_node, True)
        replacement_node.start(wait_for_binary_proto=True)

        node2.start(wait_for_binary_proto=True)
        node3.start(wait_for_binary_proto=True)

        replacement_node.watch_log_for('Initializing system_auth.credentials')
        replacement_node.watch_log_for('Initializing system_auth.permissions')
        replacement_node.watch_log_for('Initializing system_auth.users')

        cluster.repair()
        replacement_node.watch_log_for('Repair command')

        # Should succeed. Will throw an NPE on pre-12813 code.
        self.patient_cql_connection(replacement_node, user='cassandra', password='cassandra')

    def do_upgrade_with_internal_auth(self, target_version):
        """
        Tests upgrade between 2.1->2.2 & 2.1->3.0 as the schema and apis around authn/authz changed

        @jira_ticket CASSANDRA-7653
        """
        cluster = self.cluster

        # Forcing cluster version on purpose
        cluster.set_install_dir(version="github:apache/cassandra-2.1")
        cluster.populate(3).start()

        node1, node2, node3 = cluster.nodelist()

        # wait for default superuser creation
        # The log message
        # node.watch_log_for('Created default superuser')
        # will only appear on one of the three nodes, and we don't know
        # which ahead of time. Grepping all three in parallel is unpleasant.
        # See auth_test and auth_roles test for instances of this as well.
        # Should be fixed by C*-6177
        time.sleep(15)

        session = self.patient_cql_connection(node1, user='cassandra', password='cassandra')
        session.execute("CREATE USER klaus WITH PASSWORD '12345' SUPERUSER")
        session.execute("CREATE USER michael WITH PASSWORD '54321' NOSUPERUSER")
        session.execute("CREATE KEYSPACE ks WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}")
        session.execute("CREATE TABLE ks.cf1 (id int primary key, val int)")
        session.execute("CREATE TABLE ks.cf2 (id int primary key, val int)")
        session.execute("GRANT MODIFY ON ks.cf1 TO michael")
        session.execute("GRANT SELECT ON ks.cf2 TO michael")

        self.check_permissions(node1, False)
        session.cluster.shutdown()
        # upgrade node1 to 2.2
        self.upgrade_to_version(target_version, node1)
        # run the permissions checking queries on the upgraded node
        # this will be using the legacy tables as the conversion didn't complete
        # but the output format should be updated on the upgraded node
        self.check_permissions(node1, True)
        # and check on those still on the old version
        self.check_permissions(node2, False)
        self.check_permissions(node3, False)

        # now upgrade the remaining nodes
        self.upgrade_to_version(target_version, node2)
        self.upgrade_to_version(target_version, node3)

        self.check_permissions(node2, True)
        self.check_permissions(node3, True)

        # we should now be able to drop the old auth tables
        session = self.patient_cql_connection(node1, user='cassandra', password='cassandra')
        session.execute('DROP TABLE system_auth.users', timeout=60)
        session.execute('DROP TABLE system_auth.credentials', timeout=60)
        session.execute('DROP TABLE system_auth.permissions', timeout=60)
        # and we should still be able to authenticate and check authorization
        self.check_permissions(node1, True)
        logger.debug('Test completed successfully')

    def check_permissions(self, node, upgraded):
        # use an exclusive connection to ensure we only talk to the specified node
        klaus = self.patient_exclusive_cql_connection(node, user='klaus', password='12345', timeout=20)
        # klaus is a superuser, so should be able to list all permissions
        # the output of LIST PERMISSIONS changes slightly with #7653 adding
        # a new role column to results, so we need to tailor our check
        # based on whether the node has been upgraded or not
        if not upgraded:
            assert_all(klaus,
                       'LIST ALL PERMISSIONS',
                       [['michael', '<table ks.cf1>', 'MODIFY'],
                        ['michael', '<table ks.cf2>', 'SELECT']],
                       timeout=60)
        else:
            assert_all(klaus,
                       'LIST ALL PERMISSIONS',
                       [['michael', 'michael', '<table ks.cf1>', 'MODIFY'],
                        ['michael', 'michael', '<table ks.cf2>', 'SELECT']],
                       timeout=60)

        klaus.cluster.shutdown()

        michael = self.patient_exclusive_cql_connection(node, user='michael', password='54321')
        michael.execute('INSERT INTO ks.cf1 (id, val) VALUES (0,0)')
        michael.execute('SELECT * FROM ks.cf2')
        assert_invalid(michael,
                       'SELECT * FROM ks.cf1',
                       'User michael has no SELECT permission on <table ks.cf1> or any of its parents',
                       Unauthorized)
        michael.cluster.shutdown()

    def upgrade_to_version(self, tag, node):
        format_args = {'node': node.name, 'tag': tag}
        logger.debug('Upgrading node {node} to {tag}'.format(**format_args))
        # drain and shutdown
        node.drain()
        node.watch_log_for("DRAINED")
        node.stop(wait_other_notice=False)
        logger.debug('{node} stopped'.format(**format_args))

        # Ignore errors before upgrade on Windows
        if is_win():
            node.mark_log_for_errors()

        # Update Cassandra Directory
        logger.debug('Updating version to tag {tag}'.format(**format_args))
        node.set_install_dir(version=tag, verbose=True)
        logger.debug('Set new cassandra dir for {node}: {tag}'.format(**format_args))

        # Restart node on new version
        logger.debug('Starting {node} on new version ({tag})'.format(**format_args))
        # Setup log4j / logback again (necessary moving from 2.0 -> 2.1):
        node.set_log_level("INFO")
        node.start()
        # wait for the conversion of legacy data to either complete or fail
        # (because not enough upgraded nodes are available yet)
        logger.debug('Waiting for conversion of legacy data to complete or fail')
        node.watch_log_for('conversion of legacy permissions')

        logger.debug('Running upgradesstables')
        node.nodetool('upgradesstables -a')
        logger.debug('Upgrade of {node} complete'.format(**format_args))
