import time
from datetime import datetime
from distutils.version import LooseVersion
from threading import Event

from cassandra import ConsistencyLevel as CL
from cassandra import ReadFailure
from cassandra.query import SimpleStatement
from ccmlib.node import Node, TimeoutError
from nose.tools import timed

from dtest import Tester, debug, get_ip_from_node, create_ks
from tools.decorators import no_vnodes, since


class NotificationWaiter(object):
    """
    A helper class for waiting for pushed notifications from
    Cassandra over the native protocol.
    """

    def __init__(self, tester, node, notification_types, keyspace=None):
        """
        `address` should be a ccmlib.node.Node instance
        `notification_types` should be a list of
        "TOPOLOGY_CHANGE", "STATUS_CHANGE", and "SCHEMA_CHANGE".
        """
        self.node = node
        self.address = node.network_interfaces['binary'][0]
        self.notification_types = notification_types
        self.keyspace = keyspace

        # get a single, new connection
        session = tester.patient_exclusive_cql_connection(node)
        connection = session.cluster.connection_factory(self.address, is_control_connection=True)

        # coordinate with an Event
        self.event = Event()

        # the pushed notification
        self.notifications = []

        # register a callback for the notification type
        for notification_type in notification_types:
            connection.register_watcher(notification_type, self.handle_notification, register_timeout=5.0)

    def handle_notification(self, notification):
        """
        Called when a notification is pushed from Cassandra.
        """
        debug("Got {} from {} at {}".format(notification, self.address, datetime.now()))

        if self.keyspace and notification['keyspace'] and self.keyspace != notification['keyspace']:
            return  # we are not interested in this schema change

        self.notifications.append(notification)
        self.event.set()

    def wait_for_notifications(self, timeout, num_notifications=1):
        """
        Waits up to `timeout` seconds for notifications from Cassandra. If
        passed `num_notifications`, stop waiting when that many notifications
        are observed.
        """

        deadline = time.time() + timeout
        while time.time() < deadline:
            self.event.wait(deadline - time.time())
            self.event.clear()
            if len(self.notifications) >= num_notifications:
                break

        return self.notifications

    def clear_notifications(self):
        debug("Clearing notifications...")
        self.notifications = []
        self.event.clear()


class TestPushedNotifications(Tester):
    """
    Tests for pushed native protocol notification from Cassandra.
    """

    @no_vnodes()
    def move_single_node_test(self):
        """
        @jira_ticket CASSANDRA-8516
        Moving a token should result in MOVED_NODE notifications.
        """
        self.cluster.populate(3).start(wait_for_binary_proto=True, wait_other_notice=True)

        waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"])
                   for node in self.cluster.nodes.values()]

        # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are
        # late due to network delays let's block a bit longer
        debug("Waiting for unwanted notifications....")
        waiters[0].wait_for_notifications(timeout=30, num_notifications=2)
        waiters[0].clear_notifications()

        debug("Issuing move command....")
        node1 = self.cluster.nodes.values()[0]
        node1.move("123")

        for waiter in waiters:
            debug("Waiting for notification from {}".format(waiter.address,))
            notifications = waiter.wait_for_notifications(60.0)
            self.assertEquals(1, len(notifications), notifications)
            notification = notifications[0]
            change_type = notification["change_type"]
            address, port = notification["address"]
            self.assertEquals("MOVED_NODE", change_type)
            self.assertEquals(get_ip_from_node(node1), address)

    @no_vnodes()
    def move_single_node_localhost_test(self):
        """
        @jira_ticket  CASSANDRA-10052
        Test that we don't get NODE_MOVED notifications from nodes other than the local one,
        when rpc_address is set to localhost (127.0.0.1).

        To set-up this test we override the rpc_address to "localhost (127.0.0.1)" for all nodes, and
        therefore we must change the rpc port or else processes won't start.
        """
        cluster = self.cluster
        cluster.populate(3)

        self.change_rpc_address_to_localhost()

        cluster.start(wait_for_binary_proto=True, wait_other_notice=True)

        waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"])
                   for node in self.cluster.nodes.values()]

        # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are
        # late due to network delays let's block a bit longer
        debug("Waiting for unwanted notifications...")
        waiters[0].wait_for_notifications(timeout=30, num_notifications=2)
        waiters[0].clear_notifications()

        debug("Issuing move command....")
        node1 = self.cluster.nodes.values()[0]
        node1.move("123")

        for waiter in waiters:
            debug("Waiting for notification from {}".format(waiter.address,))
            notifications = waiter.wait_for_notifications(30.0)
            self.assertEquals(1 if waiter.node is node1 else 0, len(notifications), notifications)

    def restart_node_test(self):
        """
        @jira_ticket CASSANDRA-7816
        Restarting a node should generate exactly one DOWN and one UP notification
        """

        self.cluster.populate(2).start(wait_for_binary_proto=True, wait_other_notice=True)
        node1, node2 = self.cluster.nodelist()

        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications
        # don't confuse the state below.
        debug("Waiting for unwanted notifications...")
        waiter.wait_for_notifications(timeout=30, num_notifications=2)
        waiter.clear_notifications()

        # On versions prior to 2.2, an additional NEW_NODE notification is sent when a node
        # is restarted. This bug was fixed in CASSANDRA-11038 (see also CASSANDRA-11360)
        version = self.cluster.cassandra_version()
        expected_notifications = 2 if version >= '2.2' else 3
        for i in range(5):
            debug("Restarting second node...")
            node2.stop(wait_other_notice=True)
            node2.start(wait_other_notice=True)
            debug("Waiting for notifications from {}".format(waiter.address))
            notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=expected_notifications)
            self.assertEquals(expected_notifications, len(notifications), notifications)
            for notification in notifications:
                self.assertEquals(get_ip_from_node(node2), notification["address"][0])
            self.assertEquals("DOWN", notifications[0]["change_type"])
            if version >= '2.2':
                self.assertEquals("UP", notifications[1]["change_type"])
            else:
                # pre 2.2, we'll receive both a NEW_NODE and an UP notification,
                # but the order is not guaranteed
                self.assertEquals({"NEW_NODE", "UP"}, set(map(lambda n: n["change_type"], notifications[1:])))

            waiter.clear_notifications()

    def restart_node_localhost_test(self):
        """
        Test that we don't get client notifications when rpc_address is set to localhost.
        @jira_ticket  CASSANDRA-10052

        To set-up this test we override the rpc_address to "localhost" for all nodes, and
        therefore we must change the rpc port or else processes won't start.
        """
        cluster = self.cluster
        cluster.populate(2)
        node1, node2 = cluster.nodelist()

        self.change_rpc_address_to_localhost()

        cluster.start(wait_for_binary_proto=True, wait_other_notice=True)

        # register for notification with node1
        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        # restart node 2
        debug("Restarting second node...")
        node2.stop(wait_other_notice=True)
        node2.start(wait_other_notice=True)

        # check that node1 did not send UP or DOWN notification for node2
        debug("Waiting for notifications from {}".format(waiter.address,))
        notifications = waiter.wait_for_notifications(timeout=30.0, num_notifications=2)
        self.assertEquals(0, len(notifications), notifications)

    @since("2.2")
    def add_and_remove_node_test(self):
        """
        Test that NEW_NODE and REMOVED_NODE are sent correctly as nodes join and leave.
        @jira_ticket CASSANDRA-11038
        """
        self.cluster.populate(1).start(wait_for_binary_proto=True)
        node1 = self.cluster.nodelist()[0]

        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications
        # don't confuse the state below
        debug("Waiting for unwanted notifications...")
        waiter.wait_for_notifications(timeout=30, num_notifications=2)
        waiter.clear_notifications()

        session = self.patient_cql_connection(node1)
        # reduce system_distributed RF to 2 so we don't require forceful decommission
        session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")
        session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")

        debug("Adding second node...")
        node2 = Node('node2', self.cluster, True, None, ('127.0.0.2', 7000), '7200', '0', None, binary_interface=('127.0.0.2', 9042))
        self.cluster.add(node2, False)
        node2.start(wait_other_notice=True)
        debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2)
        self.assertEquals(2, len(notifications), notifications)
        for notification in notifications:
            self.assertEquals(get_ip_from_node(node2), notification["address"][0])
            self.assertEquals("NEW_NODE", notifications[0]["change_type"])
            self.assertEquals("UP", notifications[1]["change_type"])

        debug("Removing second node...")
        waiter.clear_notifications()
        node2.decommission()
        node2.stop(gently=False)
        debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2)
        self.assertEquals(2, len(notifications), notifications)
        for notification in notifications:
            self.assertEquals(get_ip_from_node(node2), notification["address"][0])
            self.assertEquals("REMOVED_NODE", notifications[0]["change_type"])
            self.assertEquals("DOWN", notifications[1]["change_type"])

    def change_rpc_address_to_localhost(self):
        """
        change node's 'rpc_address' from '127.0.0.x' to 'localhost (127.0.0.1)', increase port numbers
        """
        cluster = self.cluster

        i = 0
        for node in cluster.nodelist():
            debug('Set 127.0.0.1 to prevent IPv6 java prefs, set rpc_address: localhost in cassandra.yaml')
            if cluster.version() < '4':
                node.network_interfaces['thrift'] = ('127.0.0.1', node.network_interfaces['thrift'][1] + i)
            node.network_interfaces['binary'] = ('127.0.0.1', node.network_interfaces['binary'][1] + i)
            node.import_config_files()  # this regenerates the yaml file and sets 'rpc_address' to the 'thrift' address
            node.set_configuration_options(values={'rpc_address': 'localhost'})
            debug(node.show())
            i += 2

    @since("3.0")
    def schema_changes_test(self):
        """
        @jira_ticket CASSANDRA-10328
        Creating, updating and dropping a keyspace, a table and a materialized view
        will generate the correct schema change notifications.
        """

        self.cluster.populate(2).start(wait_for_binary_proto=True)
        node1, node2 = self.cluster.nodelist()

        session = self.patient_cql_connection(node1)
        waiter = NotificationWaiter(self, node2, ["SCHEMA_CHANGE"], keyspace='ks')

        create_ks(session, 'ks', 3)
        session.execute("create TABLE t (k int PRIMARY KEY , v int)")
        session.execute("alter TABLE t add v1 int;")

        session.execute("create MATERIALIZED VIEW mv as select * from t WHERE v IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY (v, k)")
        session.execute(" alter materialized view mv with min_index_interval = 100")

        session.execute("drop MATERIALIZED VIEW mv")
        session.execute("drop TABLE t")
        session.execute("drop KEYSPACE ks")

        debug("Waiting for notifications from {}".format(waiter.address,))
        notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=8)
        self.assertEquals(8, len(notifications), notifications)
        self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'KEYSPACE'}, notifications[0])
        self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'TABLE', u'table': u't'}, notifications[1])
        self.assertDictContainsSubset({'change_type': u'UPDATED', 'target_type': u'TABLE', u'table': u't'}, notifications[2])
        self.assertDictContainsSubset({'change_type': u'CREATED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[3])
        self.assertDictContainsSubset({'change_type': u'UPDATED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[4])
        self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'TABLE', u'table': u'mv'}, notifications[5])
        self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'TABLE', u'table': u't'}, notifications[6])
        self.assertDictContainsSubset({'change_type': u'DROPPED', 'target_type': u'KEYSPACE'}, notifications[7])


class TestVariousNotifications(Tester):
    """
    Tests for various notifications/messages from Cassandra.
    """

    @since('2.2')
    def tombstone_failure_threshold_message_test(self):
        """
        Ensure nodes return an error message in case of TombstoneOverwhelmingExceptions rather
        than dropping the request. A drop makes the coordinator waits for the specified
        read_request_timeout_in_ms.
        @jira_ticket CASSANDRA-7886
        """

        have_v5_protocol = self.cluster.version() >= LooseVersion('3.10')

        self.allow_log_errors = True
        self.cluster.set_configuration_options(
            values={
                'tombstone_failure_threshold': 500,
                'read_request_timeout_in_ms': 30000,  # 30 seconds
                'range_request_timeout_in_ms': 40000
            }
        )
        self.cluster.populate(3).start()
        node1, node2, node3 = self.cluster.nodelist()
        proto_version = 5 if have_v5_protocol else None
        session = self.patient_cql_connection(node1, protocol_version=proto_version)

        create_ks(session, 'test', 3)
        session.execute(
            "CREATE TABLE test ( "
            "id int, mytext text, col1 int, col2 int, col3 int, "
            "PRIMARY KEY (id, mytext) )"
        )

        # Add data with tombstones
        values = map(lambda i: str(i), range(1000))
        for value in values:
            session.execute(SimpleStatement(
                "insert into test (id, mytext, col1) values (1, '{}', null) ".format(
                    value
                ),
                consistency_level=CL.ALL
            ))

        failure_msg = ("Scanned over.* tombstones.* query aborted")

        @timed(25)
        def read_failure_query():
            try:
                session.execute(SimpleStatement("select * from test where id in (1,2,3,4,5)", consistency_level=CL.ALL))
            except ReadFailure as exc:
                if have_v5_protocol:
                    # at least one replica should have responded with a tombstone error
                    self.assertIsNotNone(exc.error_code_map)
                    self.assertEqual(0x0001, exc.error_code_map.values()[0])
            except Exception:
                raise
            else:
                self.fail('Expected ReadFailure')

        read_failure_query()

        # In almost all cases, we should find the failure message on node1 within a few seconds.
        # If it is not on node1, we grep all logs, as it *absolutely* should be somewhere.
        # If we still cannot find it then, we fail the test, as this is a problem.
        try:
            node1.watch_log_for(failure_msg, timeout=5)
        except TimeoutError:
            failure = (node1.grep_log(failure_msg) or
                       node2.grep_log(failure_msg) or
                       node3.grep_log(failure_msg))

            self.assertTrue(failure, ("Cannot find tombstone failure threshold error in log "
                                      "after failed query"))

        mark1 = node1.mark_log()
        mark2 = node2.mark_log()
        mark3 = node3.mark_log()

        @timed(35)
        def range_request_failure_query():
            try:
                session.execute(SimpleStatement("select * from test", consistency_level=CL.ALL))
            except ReadFailure as exc:
                if have_v5_protocol:
                    # at least one replica should have responded with a tombstone error
                    self.assertIsNotNone(exc.error_code_map)
                    self.assertEqual(0x0001, exc.error_code_map.values()[0])
            except Exception:
                raise
            else:
                self.fail('Expected ReadFailure')

        range_request_failure_query()

        # In almost all cases, we should find the failure message on node1 within a few seconds.
        # If it is not on node1, we grep all logs, as it *absolutely* should be somewhere.
        # If we still cannot find it then, we fail the test, as this is a problem.
        try:
            node1.watch_log_for(failure_msg, from_mark=mark1, timeout=5)
        except TimeoutError:
            failure = (node1.grep_log(failure_msg, from_mark=mark1) or
                       node2.grep_log(failure_msg, from_mark=mark2) or
                       node3.grep_log(failure_msg, from_mark=mark3))

            self.assertTrue(failure, ("Cannot find tombstone failure threshold error in log "
                                      "after range_request_timeout_query"))
