import os
import re
import time
import pytest
import logging
from collections import defaultdict
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from dtest import DtestTimeoutError, Tester, create_ks
since = pytest.mark.since
logger = logging.getLogger(__name__)
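# Regexes used to classify trace events: coordinator replica determination,
# inter-node message sends and responses, commit log appends, and cross-DC
# write forwarding.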
TRACE_DETERMINE_REPLICAS = re.compile('Determining replicas for mutation')
TRACE_SEND_MESSAGE = re.compile(r'Sending (?:MUTATION|MUTATION_REQ|REQUEST_RESPONSE) message to /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)')
TRACE_RESPOND_MESSAGE = re.compile(r'(?:MUTATION|MUTATION_REQ|REQUEST_RESPONSE) message received from /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)')
TRACE_COMMIT_LOG = re.compile('Appending to commitlog')
TRACE_FORWARD_WRITE = re.compile(r'Enqueuing forwarded write to /([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)')
# Some pre-computed murmur 3 hashes; there are no good python murmur3
# hashing libraries :(
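# Each key (1-20) maps to the Murmur3Partitioner token of that key, which the
# tests use to predict replica placement without hashing in python.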
murmur3_hashes = {
5: -7509452495886106294,
10: -6715243485458697746,
16: -5477287129830487822,
13: -5034495173465742853,
11: -4156302194539278891,
1: -4069959284402364209,
19: -3974532302236993209,
8: -3799847372828181882,
2: -3248873570005575792,
4: -2729420104000364805,
18: -2695747960476065067,
15: -1191135763843456182,
20: 1388667306199997068,
7: 1634052884888577606,
6: 2705480034054113608,
9: 3728482343045213994,
14: 4279681877540623768,
17: 5467144456125416399,
12: 8582886034424406875,
3: 9010454139840013625
}
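# Count the trace events currently stored in system_traces.events.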
def query_system_traces_length(session):
return len(list(session.execute("SELECT * FROM system_traces.events")))
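# True when iterable has at least n trailing values and they are all identical.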
def last_n_values_same(n, iterable):
last_n_values = iterable[-n:]
if len(last_n_values) != n:
return False
num_unique_values_in_last_n = len(set(last_n_values))
return num_unique_values_in_last_n == 1
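# Poll the trace event count until it has been stable for several consecutive
# reads, i.e. no more trace events are being written.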
def block_on_trace(session):
results_from_query = []
num_same_results_required = 5
    # We should never hit the timeout, because trace events
    # eventually stop being generated. Add a generous timeout
    # anyway so the polling loop cannot spin forever.
start = time.time()
timeout = start + 180
while not last_n_values_same(num_same_results_required, results_from_query):
results_from_query.append(query_system_traces_length(session))
time.sleep(1)
if time.time() > timeout:
raise DtestTimeoutError()
@pytest.mark.no_vnodes
class TestReplication(Tester):
"""
    This test suite looks at how data is replicated across a cluster
    and which nodes act as coordinator, replicas and forwarders.
"""
def get_replicas_from_trace(self, trace):
"""
        Examine a trace and return a dict describing the coordinator, replicas,
        forwarders and contacted nodes.
"""
coordinator = None
        nodes_sent_write = set()  # Nodes that were sent a write request
        nodes_responded_write = set()  # Nodes that acknowledged a write
replicas_written = set() # Nodes that wrote to their commitlog
forwarders = set() # Nodes that forwarded a write to another node
        nodes_contacted = defaultdict(set)  # node -> set of nodes it contacted
for trace_event in trace.events:
# Step 1, find coordinator node:
activity = trace_event.description
source = trace_event.source
if activity.startswith('Determining replicas for mutation'):
if not coordinator:
coordinator = source
break
if not coordinator:
continue
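        # Second pass over the trace: classify message sends, responses, forwards
        # and commit log writes.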
for trace_event in trace.events:
activity = trace_event.description
source = trace_event.source
# Step 2, find all the nodes that each node talked to:
send_match = TRACE_SEND_MESSAGE.search(activity)
recv_match = TRACE_RESPOND_MESSAGE.search(activity)
if send_match:
node_contacted = send_match.groups()[0]
if source == coordinator:
nodes_sent_write.add(node_contacted)
nodes_contacted[source].add(node_contacted)
elif recv_match:
node_contacted = recv_match.groups()[0]
if source == coordinator:
                    nodes_responded_write.add(node_contacted)
# Step 3, find nodes that forwarded to other nodes:
# (Happens in multi-datacenter clusters)
if source != coordinator:
forward_match = TRACE_FORWARD_WRITE.search(activity)
if forward_match:
forwarding_node = forward_match.groups()[0]
nodes_sent_write.add(forwarding_node)
forwarders.add(forwarding_node)
# Step 4, find nodes who actually wrote data:
if TRACE_COMMIT_LOG.search(activity):
replicas_written.add(source)
return {"coordinator": coordinator,
"forwarders": forwarders,
"replicas": replicas_written,
"nodes_sent_write": nodes_sent_write,
"nodes_responded_write": nodes_responded_write,
"nodes_contacted": nodes_contacted
}
def get_replicas_for_token(self, token, replication_factor,
strategy='SimpleStrategy', nodes=None):
"""
Figure out which node(s) should receive data for a given token and
replication factor
"""
if not nodes:
nodes = self.cluster.nodelist()
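        # Model the ring by pairing each node with its initial token, sorted by token.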
token_ranges = sorted(zip([n.initial_token for n in nodes], nodes))
replicas = []
        # Find first replica: the first node in ring order whose initial token is >= the key's token:
for i, (r, node) in enumerate(token_ranges):
if token <= r:
replicas.append(node.address())
first_ring_position = i
break
else:
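            # The token is greater than every node's initial token, so ownership
            # wraps around to the first node in the ring.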
replicas.append(token_ranges[0][1].address())
first_ring_position = 0
# Find other replicas:
if strategy == 'SimpleStrategy':
for node in nodes[first_ring_position + 1:]:
replicas.append(node.address())
if len(replicas) == replication_factor:
break
if len(replicas) != replication_factor:
                # The token range wrapped around the ring; continue from the first node:
for node in nodes:
replicas.append(node.address())
if len(replicas) == replication_factor:
break
elif strategy == 'NetworkTopologyStrategy':
            # NetworkTopologyStrategy can be broken down into multiple
            # SimpleStrategy calculations, one per datacenter:
for dc, rf in list(replication_factor.items()):
dc_nodes = [n for n in nodes if n.data_center == dc]
replicas.extend(self.get_replicas_for_token(
token, rf, nodes=dc_nodes))
else:
            raise NotImplementedError('replication strategy not implemented: %s' % strategy)
return replicas
def pprint_trace(self, trace):
"""
Pretty print a trace
"""
if logging.root.level == logging.DEBUG:
print(("-" * 40))
for t in trace.events:
print(("%s\t%s\t%s\t%s" % (t.source, t.source_elapsed, t.description, t.thread_name)))
print(("-" * 40))
def test_simple(self):
"""
Test the SimpleStrategy on a 3 node cluster
"""
self.cluster.populate(3).start()
node1 = self.cluster.nodelist()[0]
session = self.patient_exclusive_cql_connection(node1, consistency_level=ConsistencyLevel.ALL)
session.max_trace_wait = 120
replication_factor = 3
create_ks(session, 'test', replication_factor)
session.execute('CREATE TABLE test.test (id int PRIMARY KEY, value text)', trace=False)
for key, token in list(murmur3_hashes.items()):
logger.debug('murmur3 hash key={key},token={token}'.format(key=key, token=token))
query = SimpleStatement("INSERT INTO test (id, value) VALUES ({}, 'asdf')".format(key), consistency_level=ConsistencyLevel.ALL)
future = session.execute_async(query, trace=True)
future.result()
block_on_trace(session)
trace = future.get_query_trace(max_wait=120)
self.pprint_trace(trace)
stats = self.get_replicas_from_trace(trace)
replicas_should_be = set(self.get_replicas_for_token(
token, replication_factor))
logger.debug('\nreplicas should be: %s' % replicas_should_be)
logger.debug('replicas were: %s' % stats['replicas'])
# Make sure the correct nodes are replicas:
assert stats['replicas'] == replicas_should_be
# Make sure that each replica node was contacted and
# acknowledged the write:
assert stats['nodes_sent_write'] == stats['nodes_responded_write']
@pytest.mark.resource_intensive
def test_network_topology(self):
"""
Test the NetworkTopologyStrategy on a 2DC 3:3 node cluster
"""
self.cluster.populate([3, 3]).start()
node1 = self.cluster.nodelist()[0]
        ip_nodes = {node.address(): node for node in self.cluster.nodelist()}
session = self.patient_exclusive_cql_connection(node1, consistency_level=ConsistencyLevel.ALL)
replication_factor = {'dc1': 2, 'dc2': 2}
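        # With RF 2 per DC, the coordinator should contact a single node in the remote
        # DC, which then forwards the write to the other remote replica (CASSANDRA-5632).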
create_ks(session, 'test', replication_factor)
session.execute('CREATE TABLE test.test (id int PRIMARY KEY, value text)', trace=False)
forwarders_used = set()
for key, token in list(murmur3_hashes.items()):
query = SimpleStatement("INSERT INTO test (id, value) VALUES ({}, 'asdf')".format(key), consistency_level=ConsistencyLevel.ALL)
future = session.execute_async(query, trace=True)
future.result()
block_on_trace(session)
trace = future.get_query_trace(max_wait=120)
self.pprint_trace(trace)
stats = self.get_replicas_from_trace(trace)
replicas_should_be = set(self.get_replicas_for_token(
token, replication_factor, strategy='NetworkTopologyStrategy'))
logger.debug('Current token is %s' % token)
logger.debug('\nreplicas should be: %s' % replicas_should_be)
logger.debug('replicas were: %s' % stats['replicas'])
# Make sure the coordinator only talked to a single node in
# the second datacenter - CASSANDRA-5632:
num_in_other_dcs_contacted = 0
for node_contacted in stats['nodes_contacted'][node1.address()]:
if ip_nodes[node_contacted].data_center != node1.data_center:
num_in_other_dcs_contacted += 1
assert num_in_other_dcs_contacted == 1
# Record the forwarder used for each INSERT:
forwarders_used = forwarders_used.union(stats['forwarders'])
try:
# Make sure the correct nodes are replicas:
assert stats['replicas'] == replicas_should_be
# Make sure that each replica node was contacted and
# acknowledged the write:
assert stats['nodes_sent_write'] == stats['nodes_responded_write']
except AssertionError as e:
logger.debug("Failed on key %s and token %s." % (key, token))
raise e
# Given a diverse enough keyset, each node in the second
# datacenter should get a chance to be a forwarder:
assert len(forwarders_used) == 3
class TestSnitchConfigurationUpdate(Tester):
"""
    Test to reproduce CASSANDRA-10238, wherein changing snitch properties to change racks without a restart
    could violate the RF contract.
    Since CASSANDRA-10243 it is no longer possible to change the rack or dc of a live node, so we must specify
    which nodes should be shut down in order to have their rack changed.
"""
@pytest.fixture(autouse=True)
def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
fixture_dtest_setup.ignore_log_patterns = (
"Fatal exception during initialization",
"Cannot start node if snitch's rack(.*) differs from previous rack(.*)",
"Cannot update data center or rack"
)
def check_endpoint_count(self, ks, table, nodes, rf):
"""
        Check the endpoints of a dummy key, expecting their count to equal the sum of the rf values across all DCs.
"""
expected_count = sum([int(r) for d, r in rf.items() if d != 'class'])
for node in nodes:
cmd = "getendpoints {} {} dummy".format(ks, table)
out, err, _ = node.nodetool(cmd)
if len(err.strip()) > 0:
logger.debug("Error running 'nodetool {}': {}".format(cmd, err))
logger.debug("Endpoints for node {}, expected count is {}".format(node.address(), expected_count))
logger.debug(out)
            ips_found = re.findall(r'(\d+\.\d+\.\d+\.\d+)', out)
assert len(ips_found) == expected_count, "wrong number of endpoints found ({}), should be: {}".format(len(ips_found), expected_count)
def wait_for_nodes_on_racks(self, nodes, expected_racks):
"""
        Wait until nodetool status on every node reports the expected set of racks.
"""
regex = re.compile(r"^UN(?:\s*)127\.0\.0(?:.*)\s(.*)$", re.IGNORECASE)
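        # Matches the 'UN' (Up/Normal) status lines for 127.0.0.x nodes in the
        # nodetool status output and captures the trailing rack column.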
for i, node in enumerate(nodes):
wait_expire = time.time() + 120
while time.time() < wait_expire:
out, err, _ = node.nodetool("status")
logger.debug(out)
if len(err.strip()) > 0:
logger.debug("Error trying to run nodetool status: {}".format(err))
racks = []
for line in out.split(os.linesep):
m = regex.match(line)
if m:
racks.append(m.group(1))
                racks.sort()  # order is not deterministic
                if racks == sorted(expected_racks):
# great, the topology change is propagated
logger.debug("Topology change detected on node {}".format(i))
break
else:
logger.debug("Waiting for topology change on node {}".format(i))
time.sleep(5)
else:
raise RuntimeError("Ran out of time waiting for topology to change on node {}".format(i))
def test_rf_collapse_gossiping_property_file_snitch(self):
"""
@jira_ticket CASSANDRA-10238
@jira_ticket CASSANDRA-10242
@jira_ticket CASSANDRA-10243
Confirm that when racks are collapsed using a gossiping property file snitch the RF is not impacted.
"""
self._test_rf_on_snitch_update(nodes=[3], rf={'class': '\'NetworkTopologyStrategy\'', 'dc1': 3},
snitch_class_name='GossipingPropertyFileSnitch',
snitch_config_file='cassandra-rackdc.properties',
snitch_lines_before=lambda i, node: ["dc=dc1", "rack=rack{}".format(i)],
snitch_lines_after=lambda i, node: ["dc=dc1", "rack=rack1"],
final_racks=["rack1", "rack1", "rack1"],
nodes_to_shutdown=[0, 2])
def test_rf_expand_gossiping_property_file_snitch(self):
"""
@jira_ticket CASSANDRA-10238
@jira_ticket CASSANDRA-10242
@jira_ticket CASSANDRA-10243
Confirm that when racks are expanded using a gossiping property file snitch the RF is not impacted.
"""
self._test_rf_on_snitch_update(nodes=[3], rf={'class': '\'NetworkTopologyStrategy\'', 'dc1': 3},
snitch_class_name='GossipingPropertyFileSnitch',
snitch_config_file='cassandra-rackdc.properties',
snitch_lines_before=lambda i, node: ["dc=dc1", "rack=rack1"],
snitch_lines_after=lambda i, node: ["dc=dc1", "rack=rack{}".format(i)],
final_racks=["rack0", "rack1", "rack2"],
nodes_to_shutdown=[0, 2])
@pytest.mark.resource_intensive
def test_rf_collapse_gossiping_property_file_snitch_multi_dc(self):
"""
@jira_ticket CASSANDRA-10238
@jira_ticket CASSANDRA-10242
@jira_ticket CASSANDRA-10243
Confirm that when racks are collapsed using a gossiping property file snitch the RF is not impacted, in a multi-dc environment.
"""
self._test_rf_on_snitch_update(nodes=[3, 3], rf={'class': '\'NetworkTopologyStrategy\'', 'dc1': 3, 'dc2': 3},
snitch_class_name='GossipingPropertyFileSnitch',
snitch_config_file='cassandra-rackdc.properties',
snitch_lines_before=lambda i, node: ["dc={}".format(node.data_center), "rack=rack{}".format(i % 3)],
snitch_lines_after=lambda i, node: ["dc={}".format(node.data_center), "rack=rack1"],
final_racks=["rack1", "rack1", "rack1", "rack1", "rack1", "rack1"],
nodes_to_shutdown=[0, 2, 3, 5])
@pytest.mark.resource_intensive
def test_rf_expand_gossiping_property_file_snitch_multi_dc(self):
"""
@jira_ticket CASSANDRA-10238
@jira_ticket CASSANDRA-10242
@jira_ticket CASSANDRA-10243
Confirm that when racks are expanded using a gossiping property file snitch the RF is not impacted, in a multi-dc environment.
"""
self._test_rf_on_snitch_update(nodes=[3, 3], rf={'class': '\'NetworkTopologyStrategy\'', 'dc1': 3, 'dc2': 3},
snitch_class_name='GossipingPropertyFileSnitch',
snitch_config_file='cassandra-rackdc.properties',
snitch_lines_before=lambda i, node: ["dc={}".format(node.data_center), "rack=rack1"],
snitch_lines_after=lambda i, node: ["dc={}".format(node.data_center), "rack=rack{}".format(i % 3)],
final_racks=["rack0", "rack1", "rack2", "rack0", "rack1", "rack2"],
nodes_to_shutdown=[0, 2, 3, 5])
def test_rf_collapse_property_file_snitch(self):
"""
@jira_ticket CASSANDRA-10238
@jira_ticket CASSANDRA-10242
@jira_ticket CASSANDRA-10243
Confirm that when racks are collapsed using a property file snitch the RF is not impacted.
"""
self._test_rf_on_snitch_update(nodes=[3], rf={'class': '\'NetworkTopologyStrategy\'', 'dc1': 3},
snitch_class_name='PropertyFileSnitch',
snitch_config_file='cassandra-topology.properties',
snitch_lines_before=lambda i, node: ["127.0.0.1=dc1:rack0", "127.0.0.2=dc1:rack1", "127.0.0.3=dc1:rack2"],
snitch_lines_after=lambda i, node: ["default=dc1:rack0"],
final_racks=["rack0", "rack0", "rack0"],
nodes_to_shutdown=[1, 2])
def test_rf_expand_property_file_snitch(self):
"""
@jira_ticket CASSANDRA-10238
@jira_ticket CASSANDRA-10242
@jira_ticket CASSANDRA-10243
Confirm that when racks are expanded using a property file snitch the RF is not impacted.
"""
self._test_rf_on_snitch_update(nodes=[3], rf={'class': '\'NetworkTopologyStrategy\'', 'dc1': 3},
snitch_class_name='PropertyFileSnitch',
snitch_config_file='cassandra-topology.properties',
snitch_lines_before=lambda i, node: ["default=dc1:rack0"],
snitch_lines_after=lambda i, node: ["127.0.0.1=dc1:rack0", "127.0.0.2=dc1:rack1", "127.0.0.3=dc1:rack2"],
final_racks=["rack0", "rack1", "rack2"],
nodes_to_shutdown=[1, 2])
@since('2.0', max_version='2.1.x')
def test_rf_collapse_yaml_file_snitch(self):
"""
@jira_ticket CASSANDRA-10238
@jira_ticket CASSANDRA-10242
@jira_ticket CASSANDRA-10243
Confirm that when racks are collapsed using a yaml file snitch the RF is not impacted.
"""
self._test_rf_on_snitch_update(nodes=[3], rf={'class': '\'NetworkTopologyStrategy\'', 'dc1': 3},
snitch_class_name='YamlFileNetworkTopologySnitch',
snitch_config_file='cassandra-topology.yaml',
snitch_lines_before=lambda i, node: ["topology:",
" - dc_name: dc1",
" racks:",
" - rack_name: rack0",
" nodes:",
" - broadcast_address: 127.0.0.1",
" - rack_name: rack1",
" nodes:",
" - broadcast_address: 127.0.0.2",
" - rack_name: rack2",
" nodes:",
" - broadcast_address: 127.0.0.3"],
snitch_lines_after=lambda i, node: ["topology:",
" - dc_name: dc1",
" racks:",
" - rack_name: rack0",
" nodes:",
" - broadcast_address: 127.0.0.1",
" - broadcast_address: 127.0.0.2",
" - broadcast_address: 127.0.0.3"],
final_racks=["rack0", "rack0", "rack0"],
nodes_to_shutdown=[1, 2])
@since('2.0', max_version='2.1.x')
def test_rf_expand_yaml_file_snitch(self):
"""
@jira_ticket CASSANDRA-10238
@jira_ticket CASSANDRA-10242
@jira_ticket CASSANDRA-10243
Confirm that when racks are expanded using a yaml file snitch the RF is not impacted.
"""
self._test_rf_on_snitch_update(nodes=[3], rf={'class': '\'NetworkTopologyStrategy\'', 'dc1': 3},
snitch_class_name='YamlFileNetworkTopologySnitch',
snitch_config_file='cassandra-topology.yaml',
snitch_lines_before=lambda i, node: ["topology:",
" - dc_name: dc1",
" racks:",
" - rack_name: rack0",
" nodes:",
" - broadcast_address: 127.0.0.1",
" - broadcast_address: 127.0.0.2",
" - broadcast_address: 127.0.0.3"],
snitch_lines_after=lambda i, node: ["topology:",
" - dc_name: dc1",
" racks:",
" - rack_name: rack0",
" nodes:",
" - broadcast_address: 127.0.0.1",
" - rack_name: rack1",
" nodes:",
" - broadcast_address: 127.0.0.2",
" - rack_name: rack2",
" nodes:",
" - broadcast_address: 127.0.0.3"],
final_racks=["rack0", "rack1", "rack2"],
nodes_to_shutdown=[1, 2])
def _test_rf_on_snitch_update(self, nodes, rf, snitch_class_name, snitch_config_file,
snitch_lines_before, snitch_lines_after, final_racks, nodes_to_shutdown):
cluster = self.cluster
cluster.populate(nodes)
cluster.set_configuration_options(
values={'endpoint_snitch': 'org.apache.cassandra.locator.{}'.format(snitch_class_name)}
)
# start with separate racks
for i, node in enumerate(cluster.nodelist()):
with open(os.path.join(node.get_conf_dir(), snitch_config_file), 'w') as topo_file:
for line in snitch_lines_before(i, node):
topo_file.write(line + os.linesep)
cluster.start()
session = self.patient_cql_connection(cluster.nodelist()[0])
        options = ', '.join(['\'{}\': {}'.format(d, r) for d, r in rf.items()])
session.execute("CREATE KEYSPACE testing WITH replication = {{{}}}".format(options))
session.execute("CREATE TABLE testing.rf_test (key text PRIMARY KEY, value text)")
# avoid errors in nodetool calls below checking for the endpoint count
session.cluster.control_connection.wait_for_schema_agreement()
# make sure endpoint count is correct before continuing with the rest of the test
self.check_endpoint_count('testing', 'rf_test', cluster.nodelist(), rf)
for i in nodes_to_shutdown:
node = cluster.nodelist()[i]
logger.debug("Shutting down node {}".format(node.address()))
node.stop(wait_other_notice=True)
logger.debug("Updating snitch file")
for i, node in enumerate(cluster.nodelist()):
with open(os.path.join(node.get_conf_dir(), snitch_config_file), 'w') as topo_file:
for line in snitch_lines_after(i, node):
topo_file.write(line + os.linesep)
        # Wait until the config is reloaded before we restart the nodes; the default check
        # period is 5 seconds, so waiting 10 seconds should be enough.
logger.debug("Waiting 10 seconds to make sure snitch file is reloaded...")
time.sleep(10)
for i in nodes_to_shutdown:
node = cluster.nodelist()[i]
logger.debug("Restarting node {}".format(node.address()))
            # Since CASSANDRA-10242 it is no longer possible to start a node with a
            # different rack unless we specify -Dcassandra.ignore_rack, and since
            # CASSANDRA-9474 it is no longer possible to start a node with a different
            # dc unless we specify -Dcassandra.ignore_dc.
node.start(jvm_args=['-Dcassandra.ignore_rack=true', '-Dcassandra.ignore_dc=true'],
wait_for_binary_proto=True)
self.wait_for_nodes_on_racks(cluster.nodelist(), final_racks)
# nodes have joined racks, check endpoint counts again
self.check_endpoint_count('testing', 'rf_test', cluster.nodelist(), rf)
def test_cannot_restart_with_different_rack(self):
"""
@jira_ticket CASSANDRA-10242
Test that we cannot restart with a different rack if '-Dcassandra.ignore_rack=true' is not specified.
"""
cluster = self.cluster
cluster.populate(1)
cluster.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.{}'
.format('GossipingPropertyFileSnitch')})
node1 = cluster.nodelist()[0]
with open(os.path.join(node1.get_conf_dir(), 'cassandra-rackdc.properties'), 'w') as topo_file:
for line in ["dc={}".format(node1.data_center), "rack=rack1"]:
topo_file.write(line + os.linesep)
logger.debug("Starting node {} with rack1".format(node1.address()))
node1.start(wait_for_binary_proto=True)
logger.debug("Shutting down node {}".format(node1.address()))
node1.stop(wait_other_notice=True)
logger.debug("Updating snitch file with rack2")
for node in cluster.nodelist():
with open(os.path.join(node.get_conf_dir(), 'cassandra-rackdc.properties'), 'w') as topo_file:
for line in ["dc={}".format(node.data_center), "rack=rack2"]:
topo_file.write(line + os.linesep)
logger.debug("Restarting node {} with rack2".format(node1.address()))
mark = node1.mark_log()
node1.start()
# check node not running
logger.debug("Waiting for error message in log file")
if cluster.version() >= '2.2':
node1.watch_log_for("Cannot start node if snitch's rack(.*) differs from previous rack(.*)",
from_mark=mark)
else:
node1.watch_log_for("Fatal exception during initialization", from_mark=mark)
def test_failed_snitch_update_gossiping_property_file_snitch(self):
"""
@jira_ticket CASSANDRA-10243
Test that we cannot change the rack of a live node with GossipingPropertyFileSnitch.
"""
self._test_failed_snitch_update(nodes=[3],
snitch_class_name='GossipingPropertyFileSnitch',
snitch_config_file='cassandra-rackdc.properties',
snitch_lines_before=["dc=dc1", "rack=rack1"],
snitch_lines_after=["dc=dc1", "rack=rack2"],
racks=["rack1", "rack1", "rack1"],
error='')
def test_failed_snitch_update_property_file_snitch(self):
"""
@jira_ticket CASSANDRA-10243
Test that we cannot change the rack of a live node with PropertyFileSnitch.
"""
self._test_failed_snitch_update(nodes=[3],
snitch_class_name='PropertyFileSnitch',
snitch_config_file='cassandra-topology.properties',
snitch_lines_before=["default=dc1:rack1"],
snitch_lines_after=["default=dc1:rack2"],
racks=["rack1", "rack1", "rack1"],
error='Cannot update data center or rack')
@since('2.0', max_version='2.1.x')
def test_failed_snitch_update_yaml_file_snitch(self):
"""
@jira_ticket CASSANDRA-10243
Test that we cannot change the rack of a live node with YamlFileNetworkTopologySnitch.
"""
self._test_failed_snitch_update(nodes=[3],
snitch_class_name='YamlFileNetworkTopologySnitch',
snitch_config_file='cassandra-topology.yaml',
snitch_lines_before=["topology:",
" - dc_name: dc1",
" racks:",
" - rack_name: rack1",
" nodes:",
" - broadcast_address: 127.0.0.1",
" - broadcast_address: 127.0.0.2",
" - broadcast_address: 127.0.0.3"],
snitch_lines_after=["topology:",
" - dc_name: dc1",
" racks:",
" - rack_name: rack2",
" nodes:",
" - broadcast_address: 127.0.0.1",
" - broadcast_address: 127.0.0.2",
" - broadcast_address: 127.0.0.3"],
racks=["rack1", "rack1", "rack1"],
error='Cannot update data center or rack')
def _test_failed_snitch_update(self, nodes, snitch_class_name, snitch_config_file,
snitch_lines_before, snitch_lines_after, racks, error):
cluster = self.cluster
cluster.populate(nodes)
cluster.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.{}'
.format(snitch_class_name)})
# start with initial snitch lines
for node in cluster.nodelist():
with open(os.path.join(node.get_conf_dir(), snitch_config_file), 'w') as topo_file:
for line in snitch_lines_before:
topo_file.write(line + os.linesep)
cluster.start()
# check racks are as specified
self.wait_for_nodes_on_racks(cluster.nodelist(), racks)
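        # Record current log positions so we only search for errors logged after the snitch update.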
marks = [node.mark_log() for node in cluster.nodelist()]
logger.debug("Updating snitch file")
for node in cluster.nodelist():
with open(os.path.join(node.get_conf_dir(), snitch_config_file), 'w') as topo_file:
for line in snitch_lines_after:
topo_file.write(line + os.linesep)
        # Wait until the config is reloaded; the default check period is
        # 5 seconds, so waiting 10 seconds should be enough.
logger.debug("Waiting 10 seconds to make sure snitch file is reloaded...")
time.sleep(10)
# check racks have not changed
self.wait_for_nodes_on_racks(cluster.nodelist(), racks)
# check error in log files if applicable
if error:
for node, mark in zip(cluster.nodelist(), marks):
node.watch_log_for(error, from_mark=mark)
def test_switch_data_center_startup_fails(self):
"""
@jira_ticket CASSANDRA-9474
Confirm that switching data centers fails to bring up the node.
"""
expected_error = (r"Cannot start node if snitch's data center (.*) differs from previous data center (.*)\. "
"Please fix the snitch configuration, decommission and rebootstrap this node or use the flag -Dcassandra.ignore_dc=true.")
self.fixture_dtest_setup.ignore_log_patterns = [expected_error]
cluster = self.cluster
cluster.populate(1)
cluster.set_configuration_options(values={'endpoint_snitch': 'org.apache.cassandra.locator.GossipingPropertyFileSnitch'})
node = cluster.nodelist()[0]
with open(os.path.join(node.get_conf_dir(), 'cassandra-rackdc.properties'), 'w') as topo_file:
topo_file.write("dc=dc9" + os.linesep)
topo_file.write("rack=rack1" + os.linesep)
cluster.start()
node.stop()
with open(os.path.join(node.get_conf_dir(), 'cassandra-rackdc.properties'), 'w') as topo_file:
topo_file.write("dc=dc0" + os.linesep)
topo_file.write("rack=rack1" + os.linesep)
mark = node.mark_log()
node.start()
node.watch_log_for(expected_error, from_mark=mark, timeout=120)