# blob: a8dcf81e1ebe597fbaeea4d68d71b6d1a4946de0 [file] [log] [blame]
import time
from datetime import datetime
from distutils.version import LooseVersion
from threading import Event
from cassandra import ConsistencyLevel as CL
from cassandra import ReadFailure
from cassandra.query import SimpleStatement
from ccmlib.node import Node, TimeoutError
from nose.tools import timed
from dtest import Tester, debug, get_ip_from_node, create_ks
from tools.decorators import no_vnodes, since
class NotificationWaiter(object):
    """
    A helper class for waiting for pushed notifications from
    Cassandra over the native protocol.

    Every accepted notification is appended to `self.notifications` and
    `self.event` is set so that `wait_for_notifications` can wake up.
    """

    def __init__(self, tester, node, notification_types, keyspace=None):
        """
        `tester` is used to open the connection; it must provide
        `patient_exclusive_cql_connection(node)`.
        `node` should be a ccmlib.node.Node instance
        `notification_types` should be a list of
        "TOPOLOGY_CHANGE", "STATUS_CHANGE", and "SCHEMA_CHANGE".
        `keyspace`, when given, makes the waiter ignore notifications that
        name a different keyspace.
        """
        self.node = node
        self.address = node.network_interfaces['binary'][0]
        self.notification_types = notification_types
        self.keyspace = keyspace

        # get a single, new connection
        session = tester.patient_exclusive_cql_connection(node)
        connection = session.cluster.connection_factory(self.address, is_control_connection=True)

        # coordinate with an Event
        self.event = Event()

        # the pushed notifications, in arrival order
        self.notifications = []

        # register a callback for each notification type
        for notification_type in notification_types:
            connection.register_watcher(notification_type, self.handle_notification, register_timeout=5.0)

    def _is_relevant(self, notification):
        """
        Return True unless a keyspace filter is set and `notification`
        names a different (non-empty) keyspace.

        Notifications without a 'keyspace' entry are always relevant;
        `.get` is used so that they do not raise KeyError when a filter is set.
        """
        if not self.keyspace:
            return True
        notification_keyspace = notification.get('keyspace')
        return not notification_keyspace or notification_keyspace == self.keyspace

    def handle_notification(self, notification):
        """
        Called when a notification is pushed from Cassandra.
        """
        debug("Got {} from {} at {}".format(notification, self.address, datetime.now()))

        if not self._is_relevant(notification):
            return  # we are not interested in this schema change

        self.notifications.append(notification)
        self.event.set()

    def wait_for_notifications(self, timeout, num_notifications=1):
        """
        Waits up to `timeout` seconds for notifications from Cassandra. If
        passed `num_notifications`, stop waiting when that many notifications
        are observed.

        Returns `self.notifications` itself (not a copy), which may hold
        fewer than `num_notifications` entries if the timeout expired.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            self.event.wait(deadline - time.time())
            # clear before checking the count: a notification arriving after
            # the clear sets the event again and wakes the next wait()
            self.event.clear()
            if len(self.notifications) >= num_notifications:
                break

        return self.notifications

    def clear_notifications(self):
        """
        Forget all notifications received so far and reset the event.
        """
        debug("Clearing notifications...")
        self.notifications = []
        self.event.clear()
class TestPushedNotifications(Tester):
    """
    Tests for pushed native protocol notification from Cassandra.
    """

    @no_vnodes()
    def move_single_node_test(self):
        """
        @jira_ticket CASSANDRA-8516

        Moving a token should result in MOVED_NODE notifications.
        """
        self.cluster.populate(3).start(wait_for_binary_proto=True, wait_other_notice=True)

        waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"])
                   for node in self.cluster.nodes.values()]

        # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are
        # late due to network delays let's block a bit longer
        debug("Waiting for unwanted notifications....")
        waiters[0].wait_for_notifications(timeout=30, num_notifications=2)
        waiters[0].clear_notifications()

        debug("Issuing move command....")
        # list() keeps this working when values() returns a view rather than a list
        node1 = list(self.cluster.nodes.values())[0]
        node1.move("123")

        for waiter in waiters:
            debug("Waiting for notification from {}".format(waiter.address))
            notifications = waiter.wait_for_notifications(60.0)
            self.assertEqual(1, len(notifications), notifications)
            notification = notifications[0]
            change_type = notification["change_type"]
            address, _ = notification["address"]
            self.assertEqual("MOVED_NODE", change_type)
            self.assertEqual(get_ip_from_node(node1), address)

    @no_vnodes()
    def move_single_node_localhost_test(self):
        """
        @jira_ticket CASSANDRA-10052

        Test that we don't get NODE_MOVED notifications from nodes other than the local one,
        when rpc_address is set to localhost (127.0.0.1).

        To set-up this test we override the rpc_address to "localhost (127.0.0.1)" for all nodes, and
        therefore we must change the rpc port or else processes won't start.
        """
        cluster = self.cluster
        cluster.populate(3)

        self.change_rpc_address_to_localhost()

        cluster.start(wait_for_binary_proto=True, wait_other_notice=True)

        waiters = [NotificationWaiter(self, node, ["TOPOLOGY_CHANGE"])
                   for node in self.cluster.nodes.values()]

        # The first node sends NEW_NODE for the other 2 nodes during startup, in case they are
        # late due to network delays let's block a bit longer
        debug("Waiting for unwanted notifications...")
        waiters[0].wait_for_notifications(timeout=30, num_notifications=2)
        waiters[0].clear_notifications()

        debug("Issuing move command....")
        # list() keeps this working when values() returns a view rather than a list
        node1 = list(self.cluster.nodes.values())[0]
        node1.move("123")

        for waiter in waiters:
            debug("Waiting for notification from {}".format(waiter.address))
            notifications = waiter.wait_for_notifications(30.0)
            # only the moved node itself is expected to report the move
            expected_count = 1 if waiter.node is node1 else 0
            self.assertEqual(expected_count, len(notifications), notifications)

    def restart_node_test(self):
        """
        @jira_ticket CASSANDRA-7816

        Restarting a node should generate exactly one DOWN and one UP notification
        """
        self.cluster.populate(2).start(wait_for_binary_proto=True, wait_other_notice=True)
        node1, node2 = self.cluster.nodelist()

        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications
        # don't confuse the state below.
        debug("Waiting for unwanted notifications...")
        waiter.wait_for_notifications(timeout=30, num_notifications=2)
        waiter.clear_notifications()

        # On versions prior to 2.2, an additional NEW_NODE notification is sent when a node
        # is restarted. This bug was fixed in CASSANDRA-11038 (see also CASSANDRA-11360)
        version = self.cluster.cassandra_version()
        no_extra_new_node = version >= '2.2'
        expected_notifications = 2 if no_extra_new_node else 3

        for _ in range(5):
            debug("Restarting second node...")
            node2.stop(wait_other_notice=True)
            node2.start(wait_other_notice=True)
            debug("Waiting for notifications from {}".format(waiter.address))
            notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=expected_notifications)
            self.assertEqual(expected_notifications, len(notifications), notifications)
            for notification in notifications:
                self.assertEqual(get_ip_from_node(node2), notification["address"][0])
            self.assertEqual("DOWN", notifications[0]["change_type"])
            if no_extra_new_node:
                self.assertEqual("UP", notifications[1]["change_type"])
            else:
                # pre 2.2, we'll receive both a NEW_NODE and an UP notification,
                # but the order is not guaranteed
                self.assertEqual({"NEW_NODE", "UP"}, {n["change_type"] for n in notifications[1:]})

            waiter.clear_notifications()

    def restart_node_localhost_test(self):
        """
        Test that we don't get client notifications when rpc_address is set to localhost.

        @jira_ticket CASSANDRA-10052

        To set-up this test we override the rpc_address to "localhost" for all nodes, and
        therefore we must change the rpc port or else processes won't start.
        """
        cluster = self.cluster
        cluster.populate(2)
        node1, node2 = cluster.nodelist()

        self.change_rpc_address_to_localhost()

        cluster.start(wait_for_binary_proto=True, wait_other_notice=True)

        # register for notification with node1
        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        # restart node 2
        debug("Restarting second node...")
        node2.stop(wait_other_notice=True)
        node2.start(wait_other_notice=True)

        # check that node1 did not send UP or DOWN notification for node2
        debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=30.0, num_notifications=2)
        self.assertEqual(0, len(notifications), notifications)

    @since("2.2")
    def add_and_remove_node_test(self):
        """
        Test that NEW_NODE and REMOVED_NODE are sent correctly as nodes join and leave.

        @jira_ticket CASSANDRA-11038
        """
        self.cluster.populate(1).start(wait_for_binary_proto=True)
        node1 = self.cluster.nodelist()[0]

        waiter = NotificationWaiter(self, node1, ["STATUS_CHANGE", "TOPOLOGY_CHANGE"])

        # need to block for up to 2 notifications (NEW_NODE and UP) so that these notifications
        # don't confuse the state below
        debug("Waiting for unwanted notifications...")
        waiter.wait_for_notifications(timeout=30, num_notifications=2)
        waiter.clear_notifications()

        session = self.patient_cql_connection(node1)
        # reduce system_distributed and system_traces RF to 1 so we don't require forceful decommission
        session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")
        session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")

        debug("Adding second node...")
        node2 = Node('node2', self.cluster, True, None, ('127.0.0.2', 7000), '7200', '0', None, binary_interface=('127.0.0.2', 9042))
        self.cluster.add(node2, False)
        node2.start(wait_other_notice=True)
        debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2)
        self.assertEqual(2, len(notifications), notifications)
        for notification in notifications:
            self.assertEqual(get_ip_from_node(node2), notification["address"][0])
        self.assertEqual("NEW_NODE", notifications[0]["change_type"])
        self.assertEqual("UP", notifications[1]["change_type"])

        debug("Removing second node...")
        waiter.clear_notifications()
        node2.decommission()
        node2.stop(gently=False)
        debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=2)
        self.assertEqual(2, len(notifications), notifications)
        for notification in notifications:
            self.assertEqual(get_ip_from_node(node2), notification["address"][0])
        self.assertEqual("REMOVED_NODE", notifications[0]["change_type"])
        self.assertEqual("DOWN", notifications[1]["change_type"])

    def change_rpc_address_to_localhost(self):
        """
        change node's 'rpc_address' from '127.0.0.x' to 'localhost (127.0.0.1)', increase port numbers
        """
        cluster = self.cluster

        for index, node in enumerate(cluster.nodelist()):
            # every node now shares 127.0.0.1, so shift each node's ports by a
            # distinct offset (0, 2, 4, ...) to keep them from colliding
            port_offset = 2 * index
            debug('Set 127.0.0.1 to prevent IPv6 java prefs, set rpc_address: localhost in cassandra.yaml')
            if cluster.version() < '4':
                node.network_interfaces['thrift'] = ('127.0.0.1', node.network_interfaces['thrift'][1] + port_offset)
            node.network_interfaces['binary'] = ('127.0.0.1', node.network_interfaces['binary'][1] + port_offset)
            node.import_config_files()  # this regenerates the yaml file and sets 'rpc_address' to the 'thrift' address
            node.set_configuration_options(values={'rpc_address': 'localhost'})
            debug(node.show())

    @since("3.0")
    def schema_changes_test(self):
        """
        @jira_ticket CASSANDRA-10328

        Creating, updating and dropping a keyspace, a table and a materialized view
        will generate the correct schema change notifications.
        """
        self.cluster.populate(2).start(wait_for_binary_proto=True)
        node1, node2 = self.cluster.nodelist()

        # schema changes are issued through node1 and observed on node2
        session = self.patient_cql_connection(node1)
        waiter = NotificationWaiter(self, node2, ["SCHEMA_CHANGE"], keyspace='ks')

        create_ks(session, 'ks', 3)
        session.execute("create TABLE t (k int PRIMARY KEY , v int)")
        session.execute("alter TABLE t add v1 int;")

        session.execute("create MATERIALIZED VIEW mv as select * from t WHERE v IS NOT NULL AND v1 IS NOT NULL PRIMARY KEY (v, k)")
        session.execute(" alter materialized view mv with min_index_interval = 100")

        session.execute("drop MATERIALIZED VIEW mv")
        session.execute("drop TABLE t")
        session.execute("drop KEYSPACE ks")

        # one expected notification per statement above, in execution order
        expected = [
            {'change_type': u'CREATED', 'target_type': u'KEYSPACE'},
            {'change_type': u'CREATED', 'target_type': u'TABLE', u'table': u't'},
            {'change_type': u'UPDATED', 'target_type': u'TABLE', u'table': u't'},
            {'change_type': u'CREATED', 'target_type': u'TABLE', u'table': u'mv'},
            {'change_type': u'UPDATED', 'target_type': u'TABLE', u'table': u'mv'},
            {'change_type': u'DROPPED', 'target_type': u'TABLE', u'table': u'mv'},
            {'change_type': u'DROPPED', 'target_type': u'TABLE', u'table': u't'},
            {'change_type': u'DROPPED', 'target_type': u'KEYSPACE'},
        ]

        debug("Waiting for notifications from {}".format(waiter.address))
        notifications = waiter.wait_for_notifications(timeout=60.0, num_notifications=len(expected))
        self.assertEqual(len(expected), len(notifications), notifications)
        for expected_subset, notification in zip(expected, notifications):
            # subset check: every expected key must be present with the expected value
            # (no expected value is None, so a missing key can never compare equal)
            actual_subset = {key: notification.get(key) for key in expected_subset}
            self.assertEqual(expected_subset, actual_subset, notification)
class TestVariousNotifications(Tester):
    """
    Tests for various notifications/messages from Cassandra.
    """

    @since('2.2')
    def tombstone_failure_threshold_message_test(self):
        """
        Ensure nodes return an error message in case of TombstoneOverwhelmingExceptions rather
        than dropping the request. A drop makes the coordinator wait for the specified
        read_request_timeout_in_ms.
        @jira_ticket CASSANDRA-7886
        """
        have_v5_protocol = self.cluster.version() >= LooseVersion('3.10')

        self.allow_log_errors = True
        self.cluster.set_configuration_options(
            values={
                'tombstone_failure_threshold': 500,
                'read_request_timeout_in_ms': 30000,  # 30 seconds
                'range_request_timeout_in_ms': 40000
            }
        )
        self.cluster.populate(3).start()
        nodes = self.cluster.nodelist()
        node1 = nodes[0]
        proto_version = 5 if have_v5_protocol else None
        session = self.patient_cql_connection(node1, protocol_version=proto_version)

        create_ks(session, 'test', 3)
        session.execute(
            "CREATE TABLE test ( "
            "id int, mytext text, col1 int, col2 int, col3 int, "
            "PRIMARY KEY (id, mytext) )"
        )

        # Add data with tombstones: 1000 null writes against a failure threshold of 500
        for value in range(1000):
            session.execute(SimpleStatement(
                "insert into test (id, mytext, col1) values (1, '{}', null) ".format(value),
                consistency_level=CL.ALL
            ))

        failure_msg = "Scanned over.* tombstones.* query aborted"

        # the limit is below read_request_timeout_in_ms, so a dropped request
        # (coordinator waiting out the timeout) trips @timed
        @timed(25)
        def read_failure_query():
            self._expect_read_failure(session, "select * from test where id in (1,2,3,4,5)", have_v5_protocol)

        read_failure_query()
        self._assert_failure_logged(
            failure_msg, nodes, [None] * len(nodes),
            "Cannot find tombstone failure threshold error in log "
            "after failed query")

        # only look at log lines written after this point for the second query
        marks = [node.mark_log() for node in nodes]

        # same idea, against range_request_timeout_in_ms
        @timed(35)
        def range_request_failure_query():
            self._expect_read_failure(session, "select * from test", have_v5_protocol)

        range_request_failure_query()
        self._assert_failure_logged(
            failure_msg, nodes, marks,
            "Cannot find tombstone failure threshold error in log "
            "after range_request_timeout_query")

    def _expect_read_failure(self, session, query, check_error_codes):
        """
        Run `query` at CL.ALL and fail unless it raises ReadFailure.

        When `check_error_codes` is true, also assert that the first entry of
        the exception's `error_code_map` is 0x0001.
        """
        try:
            session.execute(SimpleStatement(query, consistency_level=CL.ALL))
        except ReadFailure as exc:
            if check_error_codes:
                # at least one replica should have responded with a tombstone error
                self.assertIsNotNone(exc.error_code_map)
                # list() keeps this working when values() returns a view
                self.assertEqual(0x0001, list(exc.error_code_map.values())[0])
        else:
            self.fail('Expected ReadFailure')

    def _assert_failure_logged(self, failure_msg, nodes, marks, error_msg):
        """
        Assert that `failure_msg` shows up in the log of one of `nodes`.

        `marks` holds one log mark per node (None means the whole log).
        In almost all cases, we should find the failure message on the first node within a few seconds.
        If it is not there, we grep all logs, as it *absolutely* should be somewhere.
        If we still cannot find it then, we fail the test, as this is a problem.
        """
        try:
            nodes[0].watch_log_for(failure_msg, from_mark=marks[0], timeout=5)
        except TimeoutError:
            failure = any(node.grep_log(failure_msg, from_mark=mark)
                          for node, mark in zip(nodes, marks))
            self.assertTrue(failure, error_msg)