import time
from datetime import datetime
from collections import Counter, namedtuple
from re import findall, compile
from unittest import skip
from uuid import UUID, uuid1
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from ccmlib.common import is_win
from ccmlib.node import Node, ToolError
from nose.plugins.attrib import attr
from dtest import Tester, debug, create_ks, create_cf
from tools.assertions import assert_almost_equal, assert_one
from tools.data import insert_c1c2
from tools.decorators import since, no_vnodes
from tools.misc import new_node
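# The integer values below mirror the coordinator-side consistent repair
# session states as they are stored in the `state` column of system.repairs;
# the tests in this module read and write these ordinals directly.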
class ConsistentState(object):
PREPARING = 0
PREPARED = 1
REPAIRING = 2
FINALIZE_PROMISED = 3
FINALIZED = 4
FAILED = 5
class TestIncRepair(Tester):
ignore_log_patterns = (r'Can\'t send migration request: node.*is down',)
@classmethod
def _get_repaired_data(cls, node, keyspace):
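""" Run sstablemetadata for the given keyspace on the node and return one
(name, repaired, pending_id) tuple per sstable found in the output """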
_sstable_name = compile('SSTable: (.+)')
_repaired_at = compile(r'Repaired at: (\d+)')
_pending_repair = compile(r'Pending repair: (\-\-|null|[a-f0-9\-]+)')
_sstable_data = namedtuple('_sstabledata', ('name', 'repaired', 'pending_id'))
out = node.run_sstablemetadata(keyspace=keyspace).stdout
def matches(pattern):
return filter(None, [pattern.match(l) for l in out.split('\n')])
names = [m.group(1) for m in matches(_sstable_name)]
repaired_times = [int(m.group(1)) for m in matches(_repaired_at)]
def uuid_or_none(s):
return None if s == 'null' or s == '--' else UUID(s)
pending_repairs = [uuid_or_none(m.group(1)) for m in matches(_pending_repair)]
assert names
assert repaired_times
assert pending_repairs
assert len(names) == len(repaired_times) == len(pending_repairs)
return [_sstable_data(*a) for a in zip(names, repaired_times, pending_repairs)]
def assertNoRepairedSSTables(self, node, keyspace):
""" Checks that no sstables are marked repaired, and none are marked pending repair """
data = self._get_repaired_data(node, keyspace)
self.assertTrue(all([t.repaired == 0 for t in data]), '{}'.format(data))
self.assertTrue(all([t.pending_id is None for t in data]))
def assertAllPendingRepairSSTables(self, node, keyspace, pending_id=None):
""" Checks that no sstables are marked repaired, and all are marked pending repair """
data = self._get_repaired_data(node, keyspace)
self.assertTrue(all([t.repaired == 0 for t in data]), '{}'.format(data))
if pending_id:
self.assertTrue(all([t.pending_id == pending_id for t in data]))
else:
self.assertTrue(all([t.pending_id is not None for t in data]))
def assertAllRepairedSSTables(self, node, keyspace):
""" Checks that all sstables are marked repaired, and none are marked pending repair """
data = self._get_repaired_data(node, keyspace)
self.assertTrue(all([t.repaired > 0 for t in data]), '{}'.format(data))
self.assertTrue(all([t.pending_id is None for t in data]), '{}'.format(data))
@since('4.0')
def consistent_repair_test(self):
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
# make data inconsistent between nodes
session = self.patient_exclusive_cql_connection(node3)
session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
stmt.consistency_level = ConsistencyLevel.ALL
for i in range(10):
session.execute(stmt, (i, i))
node3.flush()
time.sleep(1)
node3.stop(gently=False)
stmt.consistency_level = ConsistencyLevel.QUORUM
session = self.exclusive_cql_connection(node1)
for i in range(10):
session.execute(stmt, (i + 10, i + 10))
node1.flush()
time.sleep(1)
node1.stop(gently=False)
node3.start(wait_other_notice=True, wait_for_binary_proto=True)
session = self.exclusive_cql_connection(node2)
for i in range(10):
session.execute(stmt, (i + 20, i + 20))
node1.start(wait_other_notice=True, wait_for_binary_proto=True)
# flush and check that no sstables are marked repaired
for node in cluster.nodelist():
node.flush()
self.assertNoRepairedSSTables(node, 'ks')
session = self.patient_exclusive_cql_connection(node)
results = list(session.execute("SELECT * FROM system.repairs"))
self.assertEqual(len(results), 0, str(results))
# disable compaction so we can verify sstables are marked pending repair
for node in cluster.nodelist():
node.nodetool('disableautocompaction ks tbl')
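# run a repair of the whole keyspace from node1; the assertions below expect
# it to register a single finalized incremental repair session, with every
# node listed as a participant, in system.repairs on each node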
node1.repair(options=['ks'])
# check that all participating nodes have the repair recorded in their system
# table, that all nodes are listed as participants, and that all sstables are
# (still) marked pending repair
expected_participants = {n.address() for n in cluster.nodelist()}
recorded_pending_ids = set()
for node in cluster.nodelist():
session = self.patient_exclusive_cql_connection(node)
results = list(session.execute("SELECT * FROM system.repairs"))
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(set(result.participants), expected_participants)
self.assertEqual(result.state, ConsistentState.FINALIZED, "4=FINALIZED")
pending_id = result.parent_id
self.assertAllPendingRepairSSTables(node, 'ks', pending_id)
recorded_pending_ids.add(pending_id)
self.assertEqual(len(recorded_pending_ids), 1)
# sstables are moved out of pending repair by a compaction task; since we
# disabled autocompaction earlier in the test, we force a compaction here
# and check that all sstables are promoted to repaired
for node in cluster.nodelist():
node.nodetool('compact ks tbl')
self.assertAllRepairedSSTables(node, 'ks')
def _make_fake_session(self, keyspace, table):
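""" Insert a fake repair session row (state REPAIRING) into system.repairs
on every node, then restart the cluster so the nodes pick the session up at
startup, and return the fabricated session id """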
node1 = self.cluster.nodelist()[0]
session = self.patient_exclusive_cql_connection(node1)
session_id = uuid1()
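# look up the table's id from the schema tables so the fake session row
# references a real cfid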
cfid = list(session.execute("SELECT * FROM system_schema.tables WHERE keyspace_name='{}' AND table_name='{}'".format(keyspace, table)))[0].id
now = datetime.now()
# pulled from a repairs table
ranges = {'\x00\x00\x00\x08K\xc2\xed\\<\xd3{X\x00\x00\x00\x08r\x04\x89[j\x81\xc4\xe6',
'\x00\x00\x00\x08r\x04\x89[j\x81\xc4\xe6\x00\x00\x00\x08\xd8\xcdo\x9e\xcbl\x83\xd4',
'\x00\x00\x00\x08\xd8\xcdo\x9e\xcbl\x83\xd4\x00\x00\x00\x08K\xc2\xed\\<\xd3{X'}
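# the ranges column is a set of blobs, so wrap the raw byte strings in
# buffer objects so the (Python 2) driver serializes them as blobs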
ranges = {buffer(b) for b in ranges}
for node in self.cluster.nodelist():
session = self.patient_exclusive_cql_connection(node)
session.execute("INSERT INTO system.repairs "
"(parent_id, cfids, coordinator, last_update, participants, ranges, repaired_at, started_at, state) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
[session_id, {cfid}, node1.address(), now, {n.address() for n in self.cluster.nodelist()},
ranges, now, now, ConsistentState.REPAIRING]) # 2=REPAIRING
time.sleep(1)
for node in self.cluster.nodelist():
node.stop(gently=False)
for node in self.cluster.nodelist():
node.start()
return session_id
@since('4.0')
def manual_session_fail_test(self):
""" check manual failing of repair sessions via nodetool works properly """
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
# create the test keyspace and table
session = self.patient_exclusive_cql_connection(node3)
session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin')
self.assertIn("no sessions", out.stdout)
session_id = self._make_fake_session('ks', 'tbl')
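# repair_admin prints a header line followed by one line per session, so
# lines[1] below should describe the fake REPAIRING session on every node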
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin')
lines = out.stdout.split('\n')
self.assertGreater(len(lines), 1)
line = lines[1]
self.assertIn(str(session_id), line)
self.assertIn("REPAIRING", line)
node1.nodetool("repair_admin --cancel {}".format(session_id))
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin --all')
lines = out.stdout.split('\n')
self.assertGreater(len(lines), 1)
line = lines[1]
self.assertIn(str(session_id), line)
self.assertIn("FAILED", line)
@since('4.0')
def manual_session_cancel_non_coordinator_failure_test(self):
""" check manual failing of repair sessions via a node other than the coordinator fails """
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
# create the test keyspace and table
session = self.patient_exclusive_cql_connection(node3)
session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin')
self.assertIn("no sessions", out.stdout)
session_id = self._make_fake_session('ks', 'tbl')
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin')
lines = out.stdout.split('\n')
self.assertGreater(len(lines), 1)
line = lines[1]
self.assertIn(str(session_id), line)
self.assertIn("REPAIRING", line)
try:
node2.nodetool("repair_admin --cancel {}".format(session_id))
self.fail("cancel from a non coordinator should fail")
except ToolError:
pass # expected
# nothing should have changed
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin')
lines = out.stdout.split('\n')
self.assertGreater(len(lines), 1)
line = lines[1]
self.assertIn(str(session_id), line)
self.assertIn("REPAIRING", line)
@since('4.0')
def manual_session_force_cancel_test(self):
""" check manual failing of repair sessions via a non-coordinator works if the --force flag is set """
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'num_tokens': 1, 'commitlog_sync_period_in_ms': 500})
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
# create the test keyspace and table
session = self.patient_exclusive_cql_connection(node3)
session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}")
session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin')
self.assertIn("no sessions", out.stdout)
session_id = self._make_fake_session('ks', 'tbl')
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin')
lines = out.stdout.split('\n')
self.assertGreater(len(lines), 1)
line = lines[1]
self.assertIn(str(session_id), line)
self.assertIn("REPAIRING", line)
node2.nodetool("repair_admin --cancel {} --force".format(session_id))
for node in self.cluster.nodelist():
out = node.nodetool('repair_admin --all')
lines = out.stdout.split('\n')
self.assertGreater(len(lines), 1)
line = lines[1]
self.assertIn(str(session_id), line)
self.assertIn("FAILED", line)
def sstable_marking_test(self):
"""
* Launch a three node cluster
* Stop node3
* Write 10K rows with stress
* Start node3
* Issue an incremental repair, and wait for it to finish
* Run sstablemetadata on every node, assert that all sstables are marked as repaired
"""
cluster = self.cluster
# hinted handoff can create SSTables that we don't need after node3 is restarted
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
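# stop node3 before writing so it misses the stress data and the
# incremental repair below has something to stream to it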
node3.stop(gently=True)
node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=3)'])
node1.flush()
node2.flush()
node3.start(wait_other_notice=True)
if node3.get_cassandra_version() < '2.2':
log_file = 'system.log'
else:
log_file = 'debug.log'
node3.watch_log_for("Initializing keyspace1.standard1", filename=log_file)
# wait for things to settle before starting repair
time.sleep(1)
if cluster.version() >= "2.2":
node3.repair()
else:
node3.nodetool("repair -par -inc")
if cluster.version() >= '4.0':
# sstables are moved out of pending repair by a compaction task
for node in cluster.nodelist():
node.nodetool('compact keyspace1 standard1')
for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in cluster.nodelist()):
self.assertNotIn('Repaired at: 0', out)
def multiple_repair_test(self):
"""
* Launch a three node cluster
* Create a keyspace with RF 3 and a table
* Insert 49 rows
* Stop node3
* Insert 50 more rows
* Restart node3
* Issue an incremental repair on node3
* Stop node2
* Insert a final 50 rows
* Restart node2
* Issue an incremental repair on node2
* Replace node3 with a new node
* Verify data integrity
# TODO: Several more verifications of data need to be interspersed throughout the test. The final assertion is insufficient.
@jira_ticket CASSANDRA-10644
"""
cluster = self.cluster
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
session = self.patient_cql_connection(node1)
create_ks(session, 'ks', 3)
create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'})
debug("insert data")
insert_c1c2(session, keys=range(1, 50), consistency=ConsistencyLevel.ALL)
node1.flush()
debug("bringing down node 3")
node3.flush()
node3.stop(gently=False)
debug("inserting additional data into node 1 and 2")
insert_c1c2(session, keys=range(50, 100), consistency=ConsistencyLevel.TWO)
node1.flush()
node2.flush()
debug("restarting and repairing node 3")
node3.start(wait_for_binary_proto=True)
if cluster.version() >= "2.2":
node3.repair()
else:
node3.nodetool("repair -par -inc")
# wait for stream handlers to be closed on Windows
# after the session is finished (see CASSANDRA-10644)
if is_win:
time.sleep(2)
debug("stopping node 2")
node2.stop(gently=False)
debug("inserting data in nodes 1 and 3")
insert_c1c2(session, keys=range(100, 150), consistency=ConsistencyLevel.TWO)
node1.flush()
node3.flush()
debug("start and repair node 2")
node2.start(wait_for_binary_proto=True)
if cluster.version() >= "2.2":
node2.repair()
else:
node2.nodetool("repair -par -inc")
debug("replace node and check data integrity")
node3.stop(gently=False)
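# add a brand new node5 on 127.0.0.5 and bootstrap it in place of node3 by
# pointing replace_address at node3's address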
node5 = Node('node5', cluster, True, ('127.0.0.5', 9160), ('127.0.0.5', 7000), '7500', '0', None, ('127.0.0.5', 9042))
cluster.add(node5, False)
node5.start(replace_address='127.0.0.3', wait_other_notice=True)
assert_one(session, "SELECT COUNT(*) FROM ks.cf LIMIT 200", [149])
def sstable_repairedset_test(self):
"""
* Launch a two node cluster
* Insert data with stress
* Stop node2
* Run sstablerepairedset against node2
* Start node2
* Run sstablemetadata on both nodes, pipe to a file
* Verify the output of sstablemetadata shows no repairs have occurred
* Stop node1
* Insert more data with stress
* Start node1
* Issue an incremental repair
* Run sstablemetadata on both nodes again, pipe to a new file
* Verify repairs occurred and repairedAt was updated
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.populate(2).start()
node1, node2 = cluster.nodelist()
node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=2)', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', '-rate', 'threads=50'])
node1.flush()
node2.flush()
node2.stop(gently=False)
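# mark all of node2's sstables as repaired on disk while it is down; node1's
# sstables keep repairedAt=0, so the metadata checks below should see both
# zero and non-zero timestamps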
node2.run_sstablerepairedset(keyspace='keyspace1')
node2.start(wait_for_binary_proto=True)
initialOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout
initialOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout
matches = findall('(?<=Repaired at:).*', '\n'.join([initialOut1, initialOut2]))
debug("Repair timestamps are: {}".format(matches))
uniquematches = set(matches)
matchcount = Counter(matches)
self.assertGreaterEqual(len(uniquematches), 2, uniquematches)
self.assertGreaterEqual(max(matchcount.values()), 1, matchcount)
self.assertIn('Repaired at: 0', '\n'.join([initialOut1, initialOut2]))
node1.stop()
node2.stress(['write', 'n=15K', 'no-warmup', '-schema', 'replication(factor=2)'])
node2.flush()
node1.start(wait_for_binary_proto=True)
if cluster.version() >= "2.2":
node1.repair()
else:
node1.nodetool("repair -par -inc")
if cluster.version() >= '4.0':
# sstables are moved out of pending repair by a compaction task
for node in cluster.nodelist():
node.nodetool('compact keyspace1 standard1')
finalOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout
finalOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout
matches = findall('(?<=Repaired at:).*', '\n'.join([finalOut1, finalOut2]))
debug(matches)
uniquematches = set(matches)
matchcount = Counter(matches)
self.assertGreaterEqual(len(uniquematches), 2)
self.assertGreaterEqual(max(matchcount.values()), 2)
self.assertNotIn('Repaired at: 0', '\n'.join([finalOut1, finalOut2]))
def compaction_test(self):
"""
Test we can major compact after an incremental repair
* Launch a three node cluster
* Create a keyspace with RF 3 and a table
* Stop node3
* Insert 100 rows
* Restart node3
* Issue an incremental repair
* Insert 50 more rows
* Perform a major compaction on node3
* Verify all data is present
# TODO: I have no idea what this is testing. The assertions do not verify anything meaningful.
# TODO: Fix all the string formatting
"""
cluster = self.cluster
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
session = self.patient_cql_connection(node1)
create_ks(session, 'ks', 3)
session.execute("create table tab(key int PRIMARY KEY, val int);")
node3.stop()
for x in range(0, 100):
session.execute("insert into tab(key,val) values(" + str(x) + ",0)")
node1.flush()
node3.start(wait_for_binary_proto=True)
if cluster.version() >= "2.2":
node3.repair()
else:
node3.nodetool("repair -par -inc")
for x in range(0, 150):
session.execute("insert into tab(key,val) values(" + str(x) + ",1)")
cluster.flush()
node3.nodetool('compact')
for x in range(0, 150):
assert_one(session, "select val from tab where key =" + str(x), [1])
@since("2.2")
def multiple_full_repairs_lcs_test(self):
"""
@jira_ticket CASSANDRA-11172 - repeated full repairs should not cause infinite loop in getNextBackgroundTask
"""
cluster = self.cluster
cluster.populate(2).start(wait_for_binary_proto=True)
node1, node2 = cluster.nodelist()
for x in xrange(0, 10):
node1.stress(['write', 'n=100k', 'no-warmup', '-rate', 'threads=10', '-schema', 'compaction(strategy=LeveledCompactionStrategy,sstable_size_in_mb=10)', 'replication(factor=2)'])
cluster.flush()
cluster.wait_for_compactions()
node1.nodetool("repair -full keyspace1 standard1")
@attr('long')
@skip('hangs CI')
def multiple_subsequent_repair_test(self):
"""
@jira_ticket CASSANDRA-8366
There is an issue with subsequent incremental repairs increasing load size.
So we perform several repairs and check that the expected amount of data exists.
* Launch a three node cluster
* Write 5M rows with stress
* Wait for minor compactions to finish
* Issue an incremental repair on each node, sequentially
* Issue major compactions on each node
* Sleep for a while so load size can be propagated between nodes
* Verify the correct amount of data is on each node
"""
cluster = self.cluster
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
debug("Inserting data with stress")
node1.stress(['write', 'n=5M', 'no-warmup', '-rate', 'threads=10', '-schema', 'replication(factor=3)'])
debug("Flushing nodes")
cluster.flush()
debug("Waiting compactions to finish")
cluster.wait_for_compactions()
if self.cluster.version() >= '2.2':
debug("Repairing node1")
node1.nodetool("repair")
debug("Repairing node2")
node2.nodetool("repair")
debug("Repairing node3")
node3.nodetool("repair")
else:
debug("Repairing node1")
node1.nodetool("repair -par -inc")
debug("Repairing node2")
node2.nodetool("repair -par -inc")
debug("Repairing node3")
node3.nodetool("repair -par -inc")
# Using "print" instead of debug() here is on purpose. The compactions
# take a long time and don't print anything by default, which can result
# in the test being timed out after 20 minutes. These print statements
# prevent it from being timed out.
print "compacting node1"
node1.compact()
print "compacting node2"
node2.compact()
print "compacting node3"
node3.compact()
# wait some time to be sure the load size is propagated between nodes
debug("Waiting for load size info to be propagated between nodes")
time.sleep(45)
load_size_in_kb = float(sum(map(lambda n: n.data_size(), [node1, node2, node3])))
load_size = load_size_in_kb / 1024 / 1024
debug("Total Load size: {}GB".format(load_size))
# There is still some overhead, but it's a lot better. We tolerate 25%.
expected_load_size = 4.5 # In GB
assert_almost_equal(load_size, expected_load_size, error=0.25)
@attr('resource-intensive')
def sstable_marking_test_not_intersecting_all_ranges(self):
"""
@jira_ticket CASSANDRA-10299
* Launch a four node cluster
* Insert data with stress
* Issue an incremental repair on each node sequentially
* Assert no extra, unrepaired sstables are generated
"""
cluster = self.cluster
cluster.populate(4).start(wait_for_binary_proto=True)
node1, node2, node3, node4 = cluster.nodelist()
debug("Inserting data with stress")
node1.stress(['write', 'n=3', 'no-warmup', '-rate', 'threads=1', '-schema', 'replication(factor=3)'])
debug("Flushing nodes")
cluster.flush()
repair_options = '' if self.cluster.version() >= '2.2' else '-inc -par'
debug("Repairing node 1")
node1.nodetool("repair {}".format(repair_options))
debug("Repairing node 2")
node2.nodetool("repair {}".format(repair_options))
debug("Repairing node 3")
node3.nodetool("repair {}".format(repair_options))
debug("Repairing node 4")
node4.nodetool("repair {}".format(repair_options))
if cluster.version() >= '4.0':
# sstables are moved out of pending repair by a compaction task
for node in cluster.nodelist():
node.nodetool('compact keyspace1 standard1')
for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in cluster.nodelist() if len(node.get_sstables('keyspace1', 'standard1')) > 0):
self.assertNotIn('Repaired at: 0', out)
@no_vnodes()
@since('4.0')
def move_test(self):
""" Test repaired data remains in sync after a move """
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500})
cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start()
node1, node2, node3, node4 = cluster.nodelist()
session = self.patient_exclusive_cql_connection(node3)
session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}")
session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
# insert some data
stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
for i in range(1000):
session.execute(stmt, (i, i))
node1.repair(options=['ks'])
for i in range(1000):
v = i + 1000
session.execute(stmt, (v, v))
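# `nodetool repair --validate` runs a preview over the repaired data set and
# reports whether replicas agree, without streaming anything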
# everything should be in sync
for node in cluster.nodelist():
result = node.repair(options=['ks', '--validate'])
self.assertIn("Repaired data is in sync", result.stdout)
node2.nodetool('move {}'.format(2**16))
# everything should still be in sync
for node in cluster.nodelist():
result = node.repair(options=['ks', '--validate'])
self.assertIn("Repaired data is in sync", result.stdout)
@no_vnodes()
@since('4.0')
def decommission_test(self):
""" Test repaired data remains in sync after a decommission """
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500})
cluster.populate(4).start()
node1, node2, node3, node4 = cluster.nodelist()
session = self.patient_exclusive_cql_connection(node3)
session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}")
session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
# insert some data
stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
for i in range(1000):
session.execute(stmt, (i, i))
node1.repair(options=['ks'])
for i in range(1000):
v = i + 1000
session.execute(stmt, (v, v))
# everything should be in sync
for node in cluster.nodelist():
result = node.repair(options=['ks', '--validate'])
self.assertIn("Repaired data is in sync", result.stdout)
node2.nodetool('decommission')
# everything should still be in sync
for node in [node1, node3, node4]:
result = node.repair(options=['ks', '--validate'])
self.assertIn("Repaired data is in sync", result.stdout)
@no_vnodes()
@since('4.0')
def bootstrap_test(self):
""" Test repaired data remains in sync after a bootstrap """
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False, 'commitlog_sync_period_in_ms': 500})
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
session = self.patient_exclusive_cql_connection(node3)
session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}")
session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
# insert some data
stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)")
for i in range(1000):
session.execute(stmt, (i, i))
node1.repair(options=['ks'])
for i in range(1000):
v = i + 1000
session.execute(stmt, (v, v))
# everything should be in sync
for node in [node1, node2, node3]:
result = node.repair(options=['ks', '--validate'])
self.assertIn("Repaired data is in sync", result.stdout)
node4 = new_node(self.cluster)
node4.start(wait_for_binary_proto=True)
self.assertEqual(len(self.cluster.nodelist()), 4)
# everything should still be in sync
for node in self.cluster.nodelist():
result = node.repair(options=['ks', '--validate'])
self.assertIn("Repaired data is in sync", result.stdout)