| import time |
| import pytest |
| import logging |
| |
| from datetime import datetime |
| from distutils.version import LooseVersion |
| from threading import Event |
| |
| from cassandra import ConsistencyLevel as CL |
| from cassandra import ReadFailure |
| from cassandra.query import SimpleStatement |
| from ccmlib.node import Node, TimeoutError |
| |
| from dtest import Tester, get_ip_from_node, create_ks |
| |
| since = pytest.mark.since |
| logger = logging.getLogger(__name__) |
| |
| |
| class NotificationWaiter(object): |
| """ |
| A helper class for waiting for pushed notifications from |
| Cassandra over the native protocol. |
| """ |
| |
| def __init__(self, tester, node, notification_types, keyspace=None): |
| """ |
| `address` should be a ccmlib.node.Node instance |
| `notification_types` should be a list of |
| "TOPOLOGY_CHANGE", "STATUS_CHANGE", and "SCHEMA_CHANGE". |
| """ |
| self.node = node |
| self.address = node.network_interfaces['binary'][0] |
| self.notification_types = notification_types |
| self.keyspace = keyspace |
| |
| # get a single, new connection |
| session = tester.patient_exclusive_cql_connection(node) |
| connection = session.cluster.connection_factory(self.address, is_control_connection=True) |
| |
| # coordinate with an Event |
| self.event = Event() |
| |
| # the pushed notification |
| self.notifications = [] |
| |
| # register a callback for the notification type |
| for notification_type in notification_types: |
| connection.register_watcher(notification_type, self.handle_notification, register_timeout=5.0) |
| |
| def handle_notification(self, notification): |
| """ |
| Called when a notification is pushed from Cassandra. |
| """ |
| logger.debug("Got {} from {} at {}".format(notification, self.address, datetime.now())) |
| |
| if self.keyspace and notification['keyspace'] and self.keyspace != notification['keyspace']: |
| return # we are not interested in this schema change |
| |
| self.notifications.append(notification) |
| self.event.set() |
| |
| def wait_for_notifications(self, timeout, num_notifications=1): |
| """ |
| Waits up to `timeout` seconds for notifications from Cassandra. If |
| passed `num_notifications`, stop waiting when that many notifications |
| are observed. |
| """ |
| |
| deadline = time.time() + timeout |
| while time.time() < deadline: |
| self.event.wait(deadline - time.time()) |
| self.event.clear() |
| if len(self.notifications) >= num_notifications: |
| break |
| |
| return self.notifications |
| |
| def clear_notifications(self): |
| logger.debug("Clearing notifications...") |
| self.notifications = [] |
| self.event.clear() |
| |
| |
| class TestPushedNotifications(Tester): |
| """ |
| Tests for pushed native protocol notification from Cassandra. |
| """ |
| |
| @pytest.mark.no_vnodes |
| def test_move_single_node(self): |
| """ |
| @jira_ticket CASSANDRA-8516 |
| Moving a token should result in MOVED_NODE notifications. |
| """ |
| self.cluster.populate(3).start(wait_for_binary_proto=True, wait_other_notice=True) |
| |
| waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"]) |
| for node in list(self.cluster.nodes.values())] |
| |
| # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are |
| # late due to network delays let's block a bit longer |
| logger.debug("Waiting for unwanted notifications....") |
| waiters[0].wait_for_notifications(timeout=30, num_notifications=2) |
| waiters[0].clear_notifications() |
| |
| logger.debug("Issuing move command....") |
| node1 = list(self.cluster.nodes.values())[0] |
| node1.move("123") |
| |
| for waiter in waiters: |
| logger.debug("Waiting for notification from {}".format(waiter.address,)) |
| notifications = waiter.wait_for_notifications(60.0) |
| assert 1 == len(notifications), notifications |
| notification = notifications[0] |
| change_type = notification["change_type"] |
| address, port = notification["address"] |
| assert "MOVED_NODE" == change_type |
| assert get_ip_from_node(node1) == address |
| |
| @pytest.mark.no_vnodes |
| def test_move_single_node_localhost(self): |
| """ |
| @jira_ticket CASSANDRA-10052 |
| Test that we don't get NODE_MOVED notifications from nodes other than the local one, |
| when rpc_address is set to localhost (127.0.0.1). |
| |
| To set-up this test we override the rpc_address to "localhost (127.0.0.1)" for all nodes, and |
| therefore we must change the rpc port or else processes won't start. |
| """ |
| cluster = self.cluster |
| cluster.populate(3) |
| |
| self.change_rpc_address_to_localhost() |
| |
| cluster.start(wait_for_binary_proto=True, wait_other_notice=True) |
| |
| waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"]) |
| for node in list(self.cluster.nodes.values())] |
| |
| # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are |
| # late due to network delays let's block a bit longer |
| logger.debug("Waiting for unwanted notifications...") |
| waiters[0].wait_for_notifications(timeout=30, num_notifications=2) |
| waiters[0].clear_notifications() |
| |
| logger.debug("Issuing move command....") |
| node1 = list(self.cluster.nodes.values())[0] |
| node1.move("123") |
| |
| for waiter in waiters: |
| logger.debug("Waiting for notification from {}".format(waiter.address,)) |
| notifications = waiter.wait_for_notifications(30.0) |
| assert 1 if waiter.node is node1 else 0 == len(notifications), notifications |
| |
| def test_restart_node(self): |
| """ |
| @jira_ticket CASSANDRA-7816 |
| Restarting a node should generate exactly one DOWN and one UP notification |
| """ |
| self.cluster.populate(2).start(wait_for_binary_proto=True, wait_other_notice=True) |
| node1, node2 = self.cluster.nodelist() |
| |
| waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"]) |
| |
| # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications |
| # don't confuse the state below. |
| logger.debug("Waiting for unwanted notifications...") |
| waiter.wait_for_notifications(timeout=30, num_notifications=2) |
| waiter.clear_notifications() |
| |
| # On versions prior to 2.2, an additional NEW_NODE notification is sent when a node |
| # is restarted. This bug was fixed in CASSANDRA-11038 (see also CASSANDRA-11360) |
| version = self.cluster.cassandra_version() |
| expected_notifications = 2 if version >= '2.2' else 3 |
| for i in range(5): |
| logger.debug("Restarting second node...") |
| node2.stop(wait_other_notice=True) |
| node2.start(wait_other_notice=True) |
| logger.debug("Waiting for notifications from {}".format(waiter.address)) |
| notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=expected_notifications) |
| assert expected_notifications, len(notifications) == notifications |
| for notification in notifications: |
| assert get_ip_from_node(node2) == notification["address"][0] |
| assert "DOWN" == notifications[0]["change_type"] |
| if version >= '2.2': |
| assert "UP" == notifications[1]["change_type"] |
| else: |
| # pre 2.2, we'll receive both a NEW_NODE and an UP notification, |
| # but the order is not guaranteed |
| assert {"NEW_NODE", "UP"} == set([n["change_type"] for n in notifications[1:]]) |
| |
| waiter.clear_notifications() |
| |
| def test_restart_node_localhost(self): |
| """ |
| Test that we don't get client notifications when rpc_address is set to localhost. |
| @jira_ticket CASSANDRA-10052 |
| |
| To set-up this test we override the rpc_address to "localhost" for all nodes, and |
| therefore we must change the rpc port or else processes won't start. |
| """ |
| cluster = self.cluster |
| cluster.populate(2) |
| node1, node2 = cluster.nodelist() |
| |
| self.change_rpc_address_to_localhost() |
| |
| cluster.start(wait_for_binary_proto=True, wait_other_notice=True) |
| |
| # register for notification with node1 |
| waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"]) |
| |
| # restart node 2 |
| logger.debug("Restarting second node...") |
| node2.stop(wait_other_notice=True) |
| node2.start(wait_other_notice=True) |
| |
| # check that node1 did not send UP or DOWN notification for node2 |
| logger.debug("Waiting for notifications from {}".format(waiter.address,)) |
| notifications = waiter.wait_for_notifications(timeout=30.0, num_notifications=2) |
| assert 0 == len(notifications), notifications |
| |
| @since("2.2") |
| def test_add_and_remove_node(self): |
| """ |
| Test that NEW_NODE and REMOVED_NODE are sent correctly as nodes join and leave. |
| @jira_ticket CASSANDRA-11038 |
| """ |
| self.cluster.populate(1).start(wait_for_binary_proto=True) |
| node1 = self.cluster.nodelist()[0] |
| |
| waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"]) |
| |
| # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications |
| # don't confuse the state below |
| logger.debug("Waiting for unwanted notifications...") |
| waiter.wait_for_notifications(timeout=30, num_notifications=2) |
| waiter.clear_notifications() |
| |
| session = self.patient_cql_connection(node1) |
| # reduce system_distributed RF to 2 so we don't require forceful decommission |
| session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};") |
| session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};") |
| |
| logger.debug("Adding second node...") |
| node2 = Node('node2', self.cluster, True, None, ('127.0.0.2', 7000), '7200', '0', None, binary_interface=('127.0.0.2', 9042)) |
| self.cluster.add(node2, False) |
| node2.start(wait_other_notice=True) |
| logger.debug("Waiting for notifications from {}".format(waiter.address)) |
| notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2) |
| assert 2 == len(notifications), notifications |
| for notification in notifications: |
| assert get_ip_from_node(node2) == notification["address"][0] |
| assert "NEW_NODE" == notifications[0]["change_type"] |
| assert "UP" == notifications[1]["change_type"] |
| |
| logger.debug("Removing second node...") |
| waiter.clear_notifications() |
| node2.decommission() |
| node2.stop(gently=False) |
| logger.debug("Waiting for notifications from {}".format(waiter.address)) |
| notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2) |
| assert 2 == len(notifications), notifications |
| for notification in notifications: |
| assert get_ip_from_node(node2) == notification["address"][0] |
| assert "REMOVED_NODE" == notifications[0]["change_type"] |
| assert "DOWN" == notifications[1]["change_type"] |
| |
| def change_rpc_address_to_localhost(self): |
| """ |
| change node's 'rpc_address' from '127.0.0.x' to 'localhost (127.0.0.1)', increase port numbers |
| """ |
| cluster = self.cluster |
| |
| i = 0 |
| for node in cluster.nodelist(): |
| logger.debug('Set 127.0.0.1 to prevent IPv6 java prefs, set rpc_address: localhost in cassandra.yaml') |
| if cluster.version() < '4': |
| node.network_interfaces['thrift'] = ('127.0.0.1', node.network_interfaces['thrift'][1] + i) |
| node.network_interfaces['binary'] = ('127.0.0.1', node.network_interfaces['binary'][1] + i) |
| node.import_config_files() # this regenerates the yaml file and sets 'rpc_address' to the 'thrift' address |
| node.set_configuration_options(values={'rpc_address': 'localhost'}) |
| logger.debug(node.show()) |
| i += 2 |
| |
| @since("3.0") |
| def test_schema_changes(self): |
| """ |
| @jira_ticket CASSANDRA-10328 |
| Creating, updating and dropping a keyspace, a table and a materialized view |
| will generate the correct schema change notifications. |
| """ |
| self.cluster.populate(2).start(wait_for_binary_proto=True) |
| node1, node2 = self.cluster.nodelist() |
| |
| session = self.patient_cql_connection(node1) |
| waiter = NotificationWaiter(self, node2, ["SCHEMA_CHANGE"], keyspace='ks') |
| |
| create_ks(session, 'ks', 3) |
| session.execute("create TABLE t (k int PRIMARY KEY , v int)") |
| session.execute("alter TABLE t add v1 int;") |
| |
| session.execute("create MATERIALIZED VIEW mv as select * from t WHERE v IS NOT NULL AND k IS NOT NULL PRIMARY KEY (v, k)") |
| session.execute(" alter materialized view mv with min_index_interval = 100") |
| |
| session.execute("drop MATERIALIZED VIEW mv") |
| session.execute("drop TABLE t") |
| session.execute("drop KEYSPACE ks") |
| |
| logger.debug("Waiting for notifications from {}".format(waiter.address,)) |
| notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=8) |
| assert 8 == len(notifications), notifications |
| # assert dict contains subset |
| expected = {'change_type': 'CREATED', 'target_type': 'KEYSPACE'} |
| assert set(notifications[0].keys()) >= expected.keys() and {k: notifications[0][k] for k in expected if |
| k in notifications[0]} == expected |
| expected = {'change_type': 'CREATED', 'target_type': 'TABLE', 'table': 't'} |
| assert set(notifications[1].keys()) >= expected.keys() and {k: notifications[1][k] for k in expected if |
| k in notifications[1]} == expected |
| expected = {'change_type': 'UPDATED', 'target_type': 'TABLE', 'table': 't'} |
| assert set(notifications[2].keys()) >= expected.keys() and {k: notifications[2][k] for k in expected if |
| k in notifications[2]} == expected |
| expected = {'change_type': 'CREATED', 'target_type': 'TABLE', 'table': 'mv'} |
| assert set(notifications[3].keys()) >= expected.keys() and {k: notifications[3][k] for k in expected if |
| k in notifications[3]} == expected |
| expected = {'change_type': 'UPDATED', 'target_type': 'TABLE', 'table': 'mv'} |
| assert set(notifications[4].keys()) >= expected.keys() and {k: notifications[4][k] for k in expected if |
| k in notifications[4]} == expected |
| expected = {'change_type': 'DROPPED', 'target_type': 'TABLE', 'table': 'mv'} |
| assert set(notifications[5].keys()) >= expected.keys() and {k: notifications[5][k] for k in expected if |
| k in notifications[5]} == expected |
| expected = {'change_type': 'DROPPED', 'target_type': 'TABLE', 'table': 't'} |
| assert set(notifications[6].keys()) >= expected.keys() and {k: notifications[6][k] for k in expected if |
| k in notifications[6]} == expected |
| expected = {'change_type': 'DROPPED', 'target_type': 'KEYSPACE'} |
| assert set(notifications[7].keys()) >= expected.keys() and {k: notifications[7][k] for k in expected if |
| k in notifications[7]} == expected |
| |
| |
| class TestVariousNotifications(Tester): |
| """ |
| Tests for various notifications/messages from Cassandra. |
| """ |
| |
| @since('2.2') |
| def test_tombstone_failure_threshold_message(self): |
| """ |
| Ensure nodes return an error message in case of TombstoneOverwhelmingExceptions rather |
| than dropping the request. A drop makes the coordinator waits for the specified |
| read_request_timeout_in_ms. |
| @jira_ticket CASSANDRA-7886 |
| """ |
| have_v5_protocol = self.cluster.version() >= LooseVersion('3.10') |
| |
| self.fixture_dtest_setup.allow_log_errors = True |
| self.cluster.set_configuration_options( |
| values={ |
| 'tombstone_failure_threshold': 500, |
| 'read_request_timeout_in_ms': 30000, # 30 seconds |
| 'range_request_timeout_in_ms': 40000 |
| } |
| ) |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| proto_version = 5 if have_v5_protocol else None |
| session = self.patient_cql_connection(node1, protocol_version=proto_version) |
| |
| create_ks(session, 'test', 3) |
| session.execute( |
| "CREATE TABLE test ( " |
| "id int, mytext text, col1 int, col2 int, col3 int, " |
| "PRIMARY KEY (id, mytext) )" |
| ) |
| |
| # Add data with tombstones |
| values = [str(i) for i in range(1000)] |
| for value in values: |
| session.execute(SimpleStatement( |
| "insert into test (id, mytext, col1) values (1, '{}', null) ".format( |
| value |
| ), |
| consistency_level=CL.ALL |
| )) |
| |
| failure_msg = ("Scanned over.* tombstones.* query aborted") |
| |
| @pytest.mark.timeout(25) |
| def read_failure_query(): |
| try: |
| session.execute(SimpleStatement("select * from test where id in (1,2,3,4,5)", consistency_level=CL.ALL)) |
| except ReadFailure as exc: |
| if have_v5_protocol: |
| # at least one replica should have responded with a tombstone error |
| assert exc.error_code_map is not None |
| assert 0x0001 == list(exc.error_code_map.values())[0] |
| except Exception: |
| raise |
| else: |
| self.fail('Expected ReadFailure') |
| |
| read_failure_query() |
| |
| # In almost all cases, we should find the failure message on node1 within a few seconds. |
| # If it is not on node1, we grep all logs, as it *absolutely* should be somewhere. |
| # If we still cannot find it then, we fail the test, as this is a problem. |
| try: |
| node1.watch_log_for(failure_msg, timeout=5) |
| except TimeoutError: |
| failure = (node1.grep_log(failure_msg) or |
| node2.grep_log(failure_msg) or |
| node3.grep_log(failure_msg)) |
| |
| assert failure == "Cannot find tombstone failure threshold error in log after failed query" |
| |
| mark1 = node1.mark_log() |
| mark2 = node2.mark_log() |
| mark3 = node3.mark_log() |
| |
| @pytest.mark.timeout(35) |
| def range_request_failure_query(): |
| try: |
| session.execute(SimpleStatement("select * from test", consistency_level=CL.ALL)) |
| except ReadFailure as exc: |
| if have_v5_protocol: |
| # at least one replica should have responded with a tombstone error |
| assert exc.error_code_map is not None |
| assert 0x0001 == list(exc.error_code_map.values())[0] |
| except Exception: |
| raise |
| else: |
| self.fail('Expected ReadFailure') |
| |
| range_request_failure_query() |
| |
| # In almost all cases, we should find the failure message on node1 within a few seconds. |
| # If it is not on node1, we grep all logs, as it *absolutely* should be somewhere. |
| # If we still cannot find it then, we fail the test, as this is a problem. |
| try: |
| node1.watch_log_for(failure_msg, from_mark=mark1, timeout=5) |
| except TimeoutError: |
| failure = (node1.grep_log(failure_msg, from_mark=mark1) or |
| node2.grep_log(failure_msg, from_mark=mark2) or |
| node3.grep_log(failure_msg, from_mark=mark3)) |
| |
| assert failure == "Cannot find tombstone failure threshold error in log after range_request_timeout_query" |