Add support for --hosts, --force, and subrange repair to incremental repair
Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13818
diff --git a/repair_tests/incremental_repair_test.py b/repair_tests/incremental_repair_test.py
index b081d44..b99cf9e 100644
--- a/repair_tests/incremental_repair_test.py
+++ b/repair_tests/incremental_repair_test.py
@@ -7,6 +7,7 @@
 from cassandra import ConsistencyLevel
 from cassandra.query import SimpleStatement
+from cassandra.metadata import Murmur3Token
 from ccmlib.common import is_win
 from ccmlib.node import Node, ToolError
 from nose.plugins.attrib import attr
@@ -74,6 +75,13 @@
         self.assertTrue(all([t.repaired > 0 for t in data]), '{}'.format(data))
         self.assertTrue(all([t.pending_id is None for t in data]), '{}'.format(data))
 
+    def assertRepairedAndUnrepaired(self, node, keyspace):
+        """ Checks that a node has both repaired and unrepaired sstables for a given keyspace """
+        data = self._get_repaired_data(node, keyspace)
+        self.assertTrue(any([t.repaired > 0 for t in data]), '{}'.format(data))
+        self.assertTrue(any([t.repaired == 0 for t in data]), '{}'.format(data))
+        self.assertTrue(all([t.pending_id is None for t in data]), '{}'.format(data))
+
     @since('4.0')
     def consistent_repair_test(self):
         cluster = self.cluster
@@ -758,3 +766,104 @@
         for node in self.cluster.nodelist():
             result = node.repair(options=['ks', '--validate'])
             self.assertIn("Repaired data is in sync", result.stdout)
+
+    @since('4.0')
+    def force_test(self):
+        """
+        forcing an incremental repair should incrementally repair any nodes
+        that are up, but should not promote the sstables to repaired
+        """
+        cluster = self.cluster
+        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
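+        # hints are disabled and each node owns a single token, keeping repair as the only anti-entropy path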
+        cluster.populate(3).start()
+        node1, node2, node3 = cluster.nodelist()
+
+        session = self.patient_exclusive_cql_connection(node3)
+        session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
+        session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
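+        # write at ALL so every replica has all rows before node2 is stopped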
+        for i in range(10):
+            session.execute(stmt, (i, i))
+
+        node2.stop()
+
+        # repair should fail because node2 is down
+        with self.assertRaises(ToolError):
+            node1.repair(options=['ks'])
+
+        # run with force flag
+        node1.repair(options=['ks', '--force'])
+
+        # ... and verify nothing was promoted to repaired
+        self.assertNoRepairedSSTables(node1, 'ks')
+        self.assertNoRepairedSSTables(node2, 'ks')
+
+    @since('4.0')
+    def hosts_test(self):
+        """
+        running an incremental repair with hosts specified should incrementally repair
+        the given nodes, but should not promote the sstables to repaired
+        """
+        cluster = self.cluster
+        cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
+        cluster.populate(3).start()
+        node1, node2, node3 = cluster.nodelist()
+
+        session = self.patient_exclusive_cql_connection(node3)
+        session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
+        session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
+        for i in range(10):
+            session.execute(stmt, (i, i))
+
+        # run the repair restricted to node1 and node2 via -hosts
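+        # with only a subset of replicas participating, the sstables can't safely be marked repaired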
+        node1.repair(options=['ks', '-hosts', ','.join([node1.address(), node2.address()])])
+
+        # ... and verify nothing was promoted to repaired
+        self.assertNoRepairedSSTables(node1, 'ks')
+        self.assertNoRepairedSSTables(node2, 'ks')
+
+    @since('4.0')
+    def subrange_test(self):
+        """
+        running an incremental repair on a token subrange should only promote
+        the data covered by that range to repaired, leaving everything else unrepaired
+        """
+        cluster = self.cluster
+        cluster.set_configuration_options(values={'hinted_handoff_enabled': False,
+                                                  'num_tokens': 1,
+                                                  'commitlog_sync_period_in_ms': 500,
+                                                  'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'})
+        cluster.populate(3).start()
+        node1, node2, node3 = cluster.nodelist()
+
+        session = self.patient_exclusive_cql_connection(node3)
+        session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
+        session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
+        stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
+        stmt.consistency_level = ConsistencyLevel.ALL
+        for i in range(10):
+            session.execute(stmt, (i, i))
+
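+        # flush memtables so each node has sstables on disk before checking their repair state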
+        for node in cluster.nodelist():
+            node.flush()
+            self.assertNoRepairedSSTables(node, 'ks')
+
+        # only repair the partition k=0
+        token = Murmur3Token.from_key(str(bytearray([0, 0, 0, 0])))
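+        # from_key takes the serialized partition key: a CQL int is four big-endian bytes, so k=0 is four
+        # zero bytes; pinning the partitioner above keeps this client-side token consistent with the cluster's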
+        # repair only the narrow token range containing that partition
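+        # repair ranges are start-exclusive and end-inclusive, so (token - 1, token] covers exactly k=0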
+        node1.repair(options=['ks', '-st', str(token.value - 1), '-et', str(token.value)])
+
+        # verify we have a mix of repaired and unrepaired sstables
+        self.assertRepairedAndUnrepaired(node1, 'ks')
+        self.assertRepairedAndUnrepaired(node2, 'ks')
+        self.assertRepairedAndUnrepaired(node3, 'ks')