Add initial tests for CASSANDRA-14145
Patch by Marcus Eriksson & Sam Tunnicliffe; reviewed by Jordan West for CASSANDRA-14145
closes #37
diff --git a/dtest_setup.py b/dtest_setup.py
index b8e1b23..756f542 100644
--- a/dtest_setup.py
+++ b/dtest_setup.py
@@ -391,11 +391,16 @@
# the failure detector can be quite slow in such tests with quick start/stop
phi_values = {'phi_convict_threshold': 5}
+ # enable read time tracking of repaired data between replicas by default
+ repaired_data_tracking_values = {'repaired_data_tracking_for_partition_reads_enabled': 'true',
+ 'repaired_data_tracking_for_range_reads_enabled': 'true',
+ 'report_unconfirmed_repaired_data_mismatches': 'true'}
+
timeout = 15000
if self.cluster_options is not None and len(self.cluster_options) > 0:
- values = merge_dicts(self.cluster_options, phi_values)
+ values = merge_dicts(self.cluster_options, phi_values, repaired_data_tracking_values)
else:
- values = merge_dicts(phi_values, {
+ values = merge_dicts(phi_values, repaired_data_tracking_values, {
'read_request_timeout_in_ms': timeout,
'range_request_timeout_in_ms': timeout,
'write_request_timeout_in_ms': timeout,
diff --git a/repair_tests/incremental_repair_test.py b/repair_tests/incremental_repair_test.py
index a4fa5a9..42c7705 100644
--- a/repair_tests/incremental_repair_test.py
+++ b/repair_tests/incremental_repair_test.py
@@ -18,6 +18,7 @@
from tools.assertions import assert_almost_equal, assert_one
from tools.data import insert_c1c2
from tools.misc import new_node, ImmutableMapping
+from tools.jmxutils import make_mbean, JolokiaAgent, remove_perf_disable_shared_mem
since = pytest.mark.since
logger = logging.getLogger(__name__)
@@ -207,6 +208,7 @@
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
'num_tokens': 1,
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -246,6 +248,7 @@
'num_tokens': 1,
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -289,6 +292,7 @@
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
'num_tokens': 1,
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -332,6 +336,7 @@
"""
# hinted handoff can create SSTable that we don't need after node3 restarted
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -459,6 +464,7 @@
* Verify repairs occurred and repairedAt was updated
"""
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'})
+ self.init_default_config()
self.cluster.populate(2).start()
node1, node2 = self.cluster.nodelist()
node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=2)', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', '-rate', 'threads=50'])
@@ -691,6 +697,7 @@
""" Test repaired data remains in sync after a move """
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start()
node1, node2, node3, node4 = self.cluster.nodelist()
@@ -727,6 +734,7 @@
""" Test repaired data remains in sync after a decommission """
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(4).start()
node1, node2, node3, node4 = self.cluster.nodelist()
@@ -763,6 +771,7 @@
""" Test repaired data remains in sync after a bootstrap """
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -804,6 +813,7 @@
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
'num_tokens': 1,
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -837,6 +847,7 @@
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
'num_tokens': 1,
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -865,6 +876,7 @@
self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
'num_tokens': 1,
'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -893,6 +905,7 @@
'num_tokens': 1,
'commitlog_sync_period_in_ms': 500,
'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'})
+ self.init_default_config()
self.cluster.populate(3).start()
node1, node2, node3 = self.cluster.nodelist()
@@ -918,3 +931,202 @@
self.assertRepairedAndUnrepaired(node1, 'ks')
self.assertRepairedAndUnrepaired(node2, 'ks')
self.assertRepairedAndUnrepaired(node3, 'ks')
+
+ @since('4.0')
+ def test_repaired_tracking_with_partition_deletes(self):
+ """
+ check that when tracking repaired data status following a digest mismatch,
+ repaired data mismatches are marked as unconfirmed, as we may skip sstables
+ after the partition delete is encountered.
+ @jira_ticket CASSANDRA-14145
+ """
+ session, node1, node2 = self.setup_for_repaired_data_tracking()
+ stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)")
+ stmt.consistency_level = ConsistencyLevel.ALL
+ for i in range(10):
+ session.execute(stmt, (i, i, i))
+
+ for node in self.cluster.nodelist():
+ node.flush()
+ self.assertNoRepairedSSTables(node, 'ks')
+
+ node1.repair(options=['ks'])
+ node2.stop(wait_other_notice=True)
+
+ session.execute("delete from ks.tbl where k = 5")  # node2 is stopped here, so the delete can only reach node1
+
+ node1.flush()
+ node2.start(wait_other_notice=True)
+
+ # expect unconfirmed inconsistencies as the partition deletes cause some sstables to be skipped
+ with JolokiaAgent(node1) as jmx:
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5",
+ expect_unconfirmed_inconsistencies=True)
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5",
+ expect_unconfirmed_inconsistencies=True)
+ # no digest reads for range queries so the blocking read repair metric isn't incremented
+ # *all* sstables are read for partition ranges too, and as the repaired set is still in sync there should
+ # be no inconsistencies
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False)
+
+ @since('4.0')
+ def test_repaired_tracking_with_varying_sstable_sets(self):
+ """
+ verify that repaired data digests are computed over the merged data for each replica
+ and that the particular number of sstables on each doesn't affect the comparisons.
+ Both replicas start with the same repaired set, comprising 2 sstables. node1's is
+ then compacted and additional unrepaired data added (which overwrites some in the
+ repaired set). We expect the repaired digests to still match as the tracking will
+ force all sstables containing the partitions to be read.
+ There are two variants of this, for single partition slice & names reads and range reads.
+ @jira_ticket CASSANDRA-14145
+ """
+ session, node1, node2 = self.setup_for_repaired_data_tracking()
+ stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)")
+ stmt.consistency_level = ConsistencyLevel.ALL
+ for i in range(10):
+ session.execute(stmt, (i, i, i))
+
+ for node in self.cluster.nodelist():
+ node.flush()
+
+ for i in range(10,20):
+ session.execute(stmt, (i, i, i))
+
+ for node in self.cluster.nodelist():
+ node.flush()
+ self.assertNoRepairedSSTables(node, 'ks')
+
+ node1.repair(options=['ks'])
+ node2.stop(wait_other_notice=True)
+
+ session.execute("insert into ks.tbl (k, c, v) values (5, 5, 55)")  # node2 is stopped, so these overwrites can only reach node1
+ session.execute("insert into ks.tbl (k, c, v) values (15, 15, 155)")
+ node1.flush()
+ node1.compact()
+ node1.compact()  # NOTE(review): compaction is requested twice in a row; confirm the second call is intentional
+ node2.start(wait_other_notice=True)
+
+ # we don't expect any inconsistencies as all repaired data is read on both replicas
+ with JolokiaAgent(node1) as jmx:
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5")
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5")
+ # no digest reads for range queries so read repair metric isn't incremented
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False)
+
+ @since('4.0')
+ def test_repaired_tracking_with_mismatching_replicas(self):
+ """
+ verify that when replicas have different repaired sets, this can be detected via the digests
+ computed at read time. All nodes start with the same data, but only 1 replica's sstables
+ are marked repaired. Then a divergence is introduced by overwriting on 1 replica only, which
+ is required to trigger a digest mismatch & full data read (for single partition reads).
+ As the repaired sets are different between the replicas, but no other shortcutting occurs
+ (no partition tombstones or sstable skipping) and no sstables are involved in pending repair
+ sessions, we expect confirmed inconsistencies to be reported.
+ There are two variants of this, for single partition slice & names reads and range reads.
+ @jira_ticket CASSANDRA-14145
+ """
+ session, node1, node2 = self.setup_for_repaired_data_tracking()
+ stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)")
+ stmt.consistency_level = ConsistencyLevel.ALL
+ for i in range(10):
+ session.execute(stmt, (i, i, i))
+
+ for node in self.cluster.nodelist():
+ node.flush()
+
+ for i in range(10,20):
+ session.execute(stmt, (i, i, i))
+
+ for node in self.cluster.nodelist():
+ node.flush()
+ self.assertNoRepairedSSTables(node, 'ks')
+
+ # stop node 2 and mark its sstables repaired
+ node2.stop(wait_other_notice=True)
+ node2.run_sstablerepairedset(keyspace='ks')
+ # before restarting node2 overwrite some data on node1 to trigger digest mismatches
+ session.execute("insert into ks.tbl (k, c, v) values (5, 5, 55)")
+ node2.start(wait_for_binary_proto=True)
+
+ out1 = node1.run_sstablemetadata(keyspace='ks').stdout
+ out2 = node2.run_sstablemetadata(keyspace='ks').stdout
+
+ # verify the repaired at times for the sstables: all 0 on node1, all > 0 on node2 (marked repaired above)
+ assert all(t == 0 for t in [int(x) for x in [y.split(' ')[0] for y in findall('(?<=Repaired at: ).*', out1)]])
+ assert all(t > 0 for t in [int(x) for x in [y.split(' ')[0] for y in findall('(?<=Repaired at: ).*', out2)]])
+
+ # we expect inconsistencies due to sstables being marked repaired on one replica only
+ # these are marked confirmed because no sessions are pending & no sstables are
+ # skipped due to partition deletes
+ with JolokiaAgent(node1) as jmx:
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5",
+ expect_confirmed_inconsistencies=True)
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5",
+ expect_confirmed_inconsistencies=True)
+ # no digest reads for range queries so read repair metric isn't incremented
+ self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False)
+
+ def setup_for_repaired_data_tracking(self):  # shared setup: 2-node cluster plus keyspace ks / table ks.tbl; returns (session, node1, node2)
+ self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
+ 'num_tokens': 1,
+ 'commitlog_sync_period_in_ms': 500})
+ self.init_default_config()
+ self.cluster.populate(2)  # populated but not started yet, so node1 can be adjusted before startup
+ node1, node2 = self.cluster.nodelist()
+ remove_perf_disable_shared_mem(node1) # necessary for jmx
+ self.cluster.start()
+
+ session = self.patient_exclusive_cql_connection(node1)  # all statements in these tests are issued through node1
+ session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}")
+ session.execute("CREATE TABLE ks.tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'")
+ return session, node1, node2
+
+ def query_and_check_repaired_mismatches(self, jmx, session, query,  # runs query at CL ALL and asserts how three JMX counters moved
+ expect_read_repair=True,
+ expect_unconfirmed_inconsistencies=False,
+ expect_confirmed_inconsistencies=False):
+
+ rr_count = make_mbean('metrics', type='ReadRepair', name='ReconcileRead')
+ unconfirmed_count = make_mbean('metrics', type='Table,keyspace=ks', name='RepairedDataInconsistenciesUnconfirmed,scope=tbl')
+ confirmed_count = make_mbean('metrics', type='Table,keyspace=ks', name='RepairedDataInconsistenciesConfirmed,scope=tbl')
+
+ rr_before = self.get_attribute_count(jmx, rr_count)  # snapshot all three counters before the query
+ uc_before = self.get_attribute_count(jmx, unconfirmed_count)
+ cc_before = self.get_attribute_count(jmx, confirmed_count)
+
+ stmt = SimpleStatement(query)
+ stmt.consistency_level = ConsistencyLevel.ALL
+ session.execute(stmt)
+
+ rr_after = self.get_attribute_count(jmx, rr_count)  # re-read the same counters after the query
+ uc_after = self.get_attribute_count(jmx, unconfirmed_count)
+ cc_after = self.get_attribute_count(jmx, confirmed_count)
+
+ logger.debug("Read Repair Count: {before}, {after}".format(before=rr_before, after=rr_after))
+ logger.debug("Unconfirmed Inconsistency Count: {before}, {after}".format(before=uc_before, after=uc_after))
+ logger.debug("Confirmed Inconsistency Count: {before}, {after}".format(before=cc_before, after=cc_after))
+
+ if expect_read_repair:  # each counter must grow when its flag is set and stay unchanged otherwise
+ assert rr_after > rr_before
+ else:
+ assert rr_after == rr_before
+
+ if expect_unconfirmed_inconsistencies:
+ assert uc_after > uc_before
+ else:
+ assert uc_after == uc_before
+
+ if expect_confirmed_inconsistencies:
+ assert cc_after > cc_before
+ else:
+ assert cc_after == cc_before
+
+ def get_attribute_count(self, jmx, bean):
+ # the MBean may not have been initialized, in which case the Jolokia agent would return
+ # an HTTP 404 response. Checking has_mbean first lets us treat a missing bean as a count of 0
+ if jmx.has_mbean(bean):
+ return jmx.read_attribute(bean, 'Count')
+ else:
+ return 0