import os
import os.path
import threading
import time
import re
import pytest
import logging
from collections import namedtuple
from threading import Thread
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from ccmlib.node import ToolError
from dtest import FlakyRetryPolicy, Tester, create_ks, create_cf
from tools.data import insert_c1c2, query_c1c2
from tools.jmxutils import JolokiaAgent, make_mbean
from repair_tests.incremental_repair_test import assert_parent_repair_session_count
since = pytest.mark.since
logger = logging.getLogger(__name__)
def _repair_options(version, ks='', cf=None, sequential=True):
"""
Function for assembling appropriate repair CLI options,
based on C* version, as defaults have changed.
@param version The C* version under test (repair defaults changed in 2.2)
@param ks The keyspace to repair
@param cf The table to repair
@param sequential If the repair should be a sequential repair [vs parallel]
"""
opts = []
# since version 2.2 the default is parallel; before that it is sequential
if sequential:
if version >= '2.2':
opts += ['-seq']
else:
if version < '2.2':
opts += ['-par']
# test with full repair
if version >= '2.2':
opts += ['-full']
if ks:
opts += [ks]
if cf:
opts += [cf]
return opts
class BaseRepairTest(Tester):
def check_rows_on_node(self, node_to_check, rows, found=None, missings=None, restart=True):
"""
Function to verify the rows on a given node, without interference
from the other nodes in the cluster
@param node_to_check The given node to check. Should be the node, not the index
@param rows The number of rows we expect
@param found A list of partition keys that we expect to be on the node
@param missings A list of partition keys we expect NOT to be on the node
@param restart Whether or not we should restart the nodes we shut down to perform the assertions. Should only be False if the call to check_rows_on_node is the last line in the test.
"""
if found is None:
found = []
if missings is None:
missings = []
stopped_nodes = []
for node in list(self.cluster.nodes.values()):
if node.is_running() and node is not node_to_check:
stopped_nodes.append(node)
node.stop(wait_other_notice=True)
session = self.patient_exclusive_cql_connection(node_to_check, 'ks')
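# Select up to twice the expected row count so that extra rows are detected as well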
result = list(session.execute("SELECT * FROM cf LIMIT {}".format(rows * 2), timeout=10))
assert len(result) == rows
for k in found:
query_c1c2(session, k, ConsistencyLevel.ONE)
for k in missings:
query = SimpleStatement("SELECT c1, c2 FROM cf WHERE key='k{}'".format(k), consistency_level=ConsistencyLevel.ONE)
res = list(session.execute(query))
assert len([x for x in res if len(x) != 0]) == 0, res
if restart:
for node in stopped_nodes:
node.start(wait_for_binary_proto=True)
def _populate_cluster(self, start=True):
cluster = self.cluster
# Disable hinted handoff and set batch commit log so this doesn't
# interfere with the test (this must be after the populate)
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Starting cluster..")
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
session = self.patient_cql_connection(node1, retry_policy=FlakyRetryPolicy(max_retries=15))
create_ks(session, 'ks', 3)
if cluster.version() < '4.0':
create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'})
else:
create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'})
# Insert 1000 keys, kill node 3, insert 1 key, restart node 3, insert 1000 more keys
logger.debug("Inserting data...")
insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ALL)
node3.flush()
node3.stop(wait_other_notice=True)
insert_c1c2(session, keys=(1000, ), consistency=ConsistencyLevel.TWO)
node3.start(wait_for_binary_proto=True)
insert_c1c2(session, keys=list(range(1001, 2001)), consistency=ConsistencyLevel.ALL)
cluster.flush()
def _repair_and_verify(self, sequential=True):
cluster = self.cluster
node1, node2, node3 = cluster.nodelist()
# Verify that node3 has only 2000 keys
logger.debug("Checking data on node3...")
self.check_rows_on_node(node3, 2000, missings=[1000])
# Verify that node1 has 2001 keys
logger.debug("Checking data on node1...")
self.check_rows_on_node(node1, 2001, found=[1000])
# Verify that node2 has 2001 keys
logger.debug("Checking data on node2...")
self.check_rows_on_node(node2, 2001, found=[1000])
time.sleep(10) # see CASSANDRA-4373
# Run repair
start = time.time()
logger.debug("starting repair...")
node1.repair(_repair_options(self.cluster.version(), ks='ks', sequential=sequential))
logger.debug("Repair time: {end}".format(end=time.time() - start))
# Validate that only one range was transferred
out_of_sync_logs = node1.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex()))
assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs])
valid_out_of_sync_pairs = [{node1.address(), node3.address()},
{node2.address(), node3.address()}]
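# Each matched log line captures the two node addresses (groups 1 and 2, from
# cluster.address_regex()) and the number of out-of-sync ranges (group 3)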
for line, m in out_of_sync_logs:
num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), m.group(2)}
assert int(num_out_of_sync_ranges) == 1, \
"Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges)
assert out_of_sync_nodes in valid_out_of_sync_pairs, str(out_of_sync_nodes)
# Check node3 now has the key
self.check_rows_on_node(node3, 2001, found=[1000], restart=False)
class TestRepair(BaseRepairTest):
@since('4.0')
def test_parent_repair_session_cleanup(self):
"""
Calls range_tombstone_digest with a sequential repair and verifies that
all ParentRepairSession objects are cleaned up
@jira_ticket CASSANDRA-16446
"""
self._range_tombstone_digest(sequential=True)
assert_parent_repair_session_count(self.cluster.nodes.values(), 0)
@since('2.2.1', max_version='4')
def test_no_anticompaction_after_dclocal_repair(self):
"""
* Launch a four node, two DC cluster
* Start a -local repair on node1 in dc1
* Assert that the dc1 nodes see repair messages
* Assert that the dc2 nodes do not see repair messages
* Assert no nodes anticompact
# TODO: Verify the anticompaction with sstablemetadata, not just logs
@jira_ticket CASSANDRA-10422
"""
cluster = self.cluster
logger.debug("Starting cluster..")
cluster.populate([2, 2]).start()
node1_1, node2_1, node1_2, node2_2 = cluster.nodelist()
node1_1.stress(stress_options=['write', 'n=50K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=4)', '-rate', 'threads=50'])
node1_1.nodetool("repair -local keyspace1 standard1")
assert node1_1.grep_log("Not a global repair")
assert node2_1.grep_log("Not a global repair")
# dc2 should not see these messages:
assert not node1_2.grep_log("Not a global repair")
assert not node2_2.grep_log("Not a global repair")
# and no nodes should do anticompaction:
for node in cluster.nodelist():
assert not node.grep_log("Starting anticompaction")
# @pytest.mark.skipif(CASSANDRA_VERSION_FROM_BUILD == '3.9', reason="Test doesn't run on 3.9")
@pytest.mark.skip_version('3.9')
def test_nonexistent_table_repair(self):
"""
* Check that repairing a non-existent table fails
@jira_ticket CASSANDRA-12279
"""
self.fixture_dtest_setup.ignore_log_patterns = [r'Unknown keyspace/cf pair']
cluster = self.cluster
logger.debug('Starting nodes')
cluster.populate(2).start()
node1, _ = cluster.nodelist()
logger.debug('Creating keyspace and tables')
node1.stress(stress_options=['write', 'n=1', 'no-warmup',
'cl=ONE', '-schema', 'replication(factor=2)',
'-rate', 'threads=1'])
logger.debug('Repairing non-existent table')
def repair_non_existent_table():
global nodetool_error
try:
node1.nodetool('repair keyspace1 standard2')
except Exception as e:
nodetool_error = e
# Launch in an external thread so it does not hang the test process
t = Thread(target=repair_non_existent_table)
t.start()
t.join(timeout=60)
assert not t.is_alive(), 'Repair thread on nonexistent table is still running'
if self.cluster.version() >= '2.2':
node1.watch_log_for("Unknown keyspace/cf pair", timeout=60)
# Repair only finishes with error status after CASSANDRA-12508 on 3.0+
if self.cluster.version() >= '3.0':
assert 'nodetool_error' in globals() and isinstance(nodetool_error, ToolError), \
'Repair thread on nonexistent table did not throw exception'
logger.debug(repr(nodetool_error))
assert 'Unknown keyspace/cf pair' in repr(nodetool_error),\
'Repair thread on nonexistent table did not detect the nonexistent table.'
@since('2.2.1', max_version='4')
def test_no_anticompaction_after_hostspecific_repair(self):
"""
* Launch a four node, two DC cluster
* Start a repair on all nodes, by enumerating with -hosts
* Assert all nodes see repair messages
* Assert no nodes anticompact
# TODO: Verify the anticompaction with sstablemetadata, not just logs
@jira_ticket CASSANDRA-10422
"""
cluster = self.cluster
logger.debug("Starting cluster..")
cluster.populate([2, 2]).start()
node1_1, node2_1, node1_2, node2_2 = cluster.nodelist()
node1_1.stress(stress_options=['write', 'n=100K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=4)', '-rate', 'threads=50'])
node1_1.nodetool("repair -hosts 127.0.0.1,127.0.0.2,127.0.0.3,127.0.0.4 keyspace1 standard1")
for node in cluster.nodelist():
assert node.grep_log("Not a global repair")
for node in cluster.nodelist():
assert not node.grep_log("Starting anticompaction")
@since('2.2.4', max_version='4')
def test_no_anticompaction_after_subrange_repair(self):
"""
* Launch a three node, two DC cluster
* Start a repair on a token range
* Assert all nodes see repair messages
* Assert no nodes anticompact
# TODO: Verify the anticompaction with sstablemetadata, not just logs
@jira_ticket CASSANDRA-10422
"""
cluster = self.cluster
logger.debug("Starting cluster..")
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
node1.stress(stress_options=['write', 'n=50K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=3)', '-rate', 'threads=50'])
node1.nodetool("repair -st 0 -et 1000 keyspace1 standard1")
for node in cluster.nodelist():
assert node.grep_log("Not a global repair")
for node in cluster.nodelist():
assert not node.grep_log("Starting anticompaction")
def _get_repaired_data(self, node, keyspace):
"""
Based on incremental_repair_test.py:TestIncRepair implementation.
"""
_sstable_name = re.compile('SSTable: (.+)')
_repaired_at = re.compile(r'Repaired at: (\d+)')
_sstable_data = namedtuple('_sstabledata', ('name', 'repaired'))
out = node.run_sstablemetadata(keyspace=keyspace).stdout
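# sstablemetadata output is expected to contain lines of the form
# 'SSTable: <path>' and 'Repaired at: <timestamp>' (0 means unrepaired)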
def matches(pattern):
return [_f for _f in [pattern.match(l) for l in out.split('\n')] if _f]
names = [m.group(1) for m in matches(_sstable_name)]
repaired_times = [int(m.group(1)) for m in matches(_repaired_at)]
assert names
assert repaired_times
return [_sstable_data(*a) for a in zip(names, repaired_times)]
@since('2.2.10', max_version='4')
def test_no_anticompaction_of_already_repaired(self):
"""
* Launch three node cluster and stress with RF2
* Do incremental repair to have all sstables flagged as repaired
* Stop node2, stress, start again and run full -pr repair
* Verify that none of the already repaired sstables have been anti-compacted again
@jira_ticket CASSANDRA-13153
"""
cluster = self.cluster
logger.debug("Starting cluster..")
# disable JBOD conf since the test expects sstables to be on the same disk
cluster.set_datadir_count(1)
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
# we use RF=2 to make sure that only a subset of sub-ranges is covered when doing -full -pr
node1.stress(stress_options=['write', 'n=50K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=2)', '-rate', 'threads=50'])
# disable compaction to make sure that we won't create any new sstables with repairedAt 0
node1.nodetool('disableautocompaction keyspace1 standard1')
# Do incremental repair of all ranges. All sstables are expected to have repairedAt set afterwards.
node1.nodetool("repair keyspace1 standard1")
meta = self._get_repaired_data(node1, 'keyspace1')
repaired = set([m for m in meta if m.repaired > 0])
assert len(repaired) == len(meta)
# stop node2, stress and start full repair to find out how synced ranges affect repairedAt values
node2.stop(wait_other_notice=True)
node1.stress(stress_options=['write', 'n=40K', 'no-warmup', 'cl=ONE', '-rate', 'threads=50'])
node2.start(wait_for_binary_proto=True)
node1.nodetool("repair -full -pr keyspace1 standard1")
meta = self._get_repaired_data(node1, 'keyspace1')
repairedAfterFull = set([m for m in meta if m.repaired > 0])
# already repaired sstables must remain untouched
assert repaired.intersection(repairedAfterFull) == repaired
@since('2.2.1', '4')
def test_anticompaction_after_normal_repair(self):
"""
* Launch a four node, two DC cluster
* Start a normal repair
* Assert every node anticompacts
@jira_ticket CASSANDRA-10422
"""
cluster = self.cluster
logger.debug("Starting cluster..")
cluster.populate([2, 2]).start()
node1_1, node2_1, node1_2, node2_2 = cluster.nodelist()
node1_1.stress(stress_options=['write', 'n=50K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=4)'])
node1_1.nodetool("repair keyspace1 standard1")
for node in cluster.nodelist():
assert "Starting anticompaction"
def test_simple_sequential_repair(self):
"""
Calls simple repair test with a sequential repair
"""
self._simple_repair(sequential=True)
def test_simple_parallel_repair(self):
"""
Calls simple repair test with a parallel repair
"""
self._simple_repair(sequential=False)
def test_empty_vs_gcable_sequential_repair(self):
"""
Calls empty_vs_gcable repair test with a sequential repair
"""
self._empty_vs_gcable_no_repair(sequential=True)
def test_empty_vs_gcable_parallel_repair(self):
"""
Calls empty_vs_gcable repair test with a parallel repair
"""
self._empty_vs_gcable_no_repair(sequential=False)
def test_range_tombstone_digest_sequential_repair(self):
"""
Calls range_tombstone_digest with a sequential repair
"""
self._range_tombstone_digest(sequential=True)
def test_range_tombstone_digest_parallel_repair(self):
"""
Calls range_tombstone_digest with a parallel repair
"""
self._range_tombstone_digest(sequential=False)
@since('2.1')
def test_shadowed_cell_digest_sequential_repair(self):
"""
Calls _cell_shadowed_by_range_tombstone with sequential repair
"""
self._cell_shadowed_by_range_tombstone(sequential=True)
@since('2.1')
def test_shadowed_cell_digest_parallel_repair(self):
"""
Calls _cell_shadowed_by_range_tombstone with parallel repair
"""
self._cell_shadowed_by_range_tombstone(sequential=False)
@since('3.0')
def test_shadowed_range_tombstone_digest_sequential_repair(self):
"""
Calls _range_tombstone_shadowed_by_range_tombstone with sequential repair
"""
self._range_tombstone_shadowed_by_range_tombstone(sequential=True)
@since('3.0')
def test_shadowed_range_tombstone_digest_parallel_repair(self):
"""
Calls _range_tombstone_shadowed_by_range_tombstone with parallel repair
"""
self._range_tombstone_shadowed_by_range_tombstone(sequential=False)
@pytest.mark.no_vnodes
def test_simple_repair_order_preserving(self):
"""
Calls simple repair test with OPP and sequential repair
@jira_ticket CASSANDRA-5220
"""
self._simple_repair(order_preserving_partitioner=True)
def _simple_repair(self, order_preserving_partitioner=False, sequential=True):
"""
* Configure a three node cluster to not use hinted handoff, and to use batch commitlog
* Launch the cluster
* Create a keyspace at RF 3 and table
* Insert one thousand rows at CL ALL
* Flush on node3 and shut it down
* Insert one row at CL TWO
* Restart node3
* Insert one thousand more rows at CL ALL
* Flush all nodes
* Check node3 only has 2000 keys
* Check node1 and node2 have 2001 keys
* Perform the repair type specified by the parent test
* Assert the appropriate messages are logged
* Assert node3 now has all data
@jira_ticket CASSANDRA-4373
"""
if order_preserving_partitioner:
self.cluster.set_partitioner('org.apache.cassandra.dht.ByteOrderedPartitioner')
self._populate_cluster()
self._repair_and_verify(sequential)
def _empty_vs_gcable_no_repair(self, sequential):
"""
Repairing empty partition and tombstoned partition older than gc grace
should be treated as the same and no repair is necessary.
@jira_ticket CASSANDRA-8979.
"""
cluster = self.cluster
cluster.populate(2)
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
cluster.start()
node1, node2 = cluster.nodelist()
session = self.patient_cql_connection(node1)
# create keyspace with RF=2 to be able to be repaired
create_ks(session, 'ks', 2)
# we create two tables: cf1 has a low gc_grace_seconds so that its data
# becomes gc-able during the test (we don't actually drop the tables),
# while cf2 uses the default gc_grace_seconds.
# compaction is disabled so that data is not purged
query = """
CREATE TABLE cf1 (
key text,
c1 text,
c2 text,
PRIMARY KEY (key, c1)
)
WITH gc_grace_seconds=1
AND compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'};
"""
session.execute(query)
time.sleep(.5)
query = """
CREATE TABLE cf2 (
key text,
c1 text,
c2 text,
PRIMARY KEY (key, c1)
)
WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'};
"""
session.execute(query)
time.sleep(.5)
# take down node2, so that only node1 has gc-able data
node2.stop(wait_other_notice=True)
for cf in ['cf1', 'cf2']:
# insert some data
for i in range(0, 10):
for j in range(0, 1000):
query = SimpleStatement("INSERT INTO {} (key, c1, c2) VALUES ('k{}', 'v{}', 'value')".format(cf, i, j), consistency_level=ConsistencyLevel.ONE)
session.execute(query)
node1.flush()
# delete those data, half with row tombstone, and the rest with cell range tombstones
for i in range(0, 5):
query = SimpleStatement("DELETE FROM {} WHERE key='k{}'".format(cf, i), consistency_level=ConsistencyLevel.ONE)
session.execute(query)
node1.flush()
for i in range(5, 10):
for j in range(0, 1000):
query = SimpleStatement("DELETE FROM {} WHERE key='k{}' AND c1='v{}'".format(cf, i, j), consistency_level=ConsistencyLevel.ONE)
session.execute(query)
node1.flush()
# sleep until gc grace seconds pass so that cf1 can be dropped
time.sleep(2)
# bring up node2 and repair
node2.start(wait_for_binary_proto=True)
node2.repair(_repair_options(self.cluster.version(), ks='ks', sequential=sequential))
# check no rows will be returned
for cf in ['cf1', 'cf2']:
for i in range(0, 10):
query = SimpleStatement("SELECT c1, c2 FROM {} WHERE key='k{}'".format(cf, i), consistency_level=ConsistencyLevel.ALL)
res = list(session.execute(query))
assert len([x for x in res if len(x) != 0]) == 0, res
# check the log to verify no repair happened for the gc-able data
out_of_sync_logs = node2.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync for cf1".format(cluster.address_regex(), cluster.address_regex()))
assert len(out_of_sync_logs) == 0, "GC-able data does not need to be repaired with empty data: " + str([elt[0] for elt in out_of_sync_logs])
# check the log to verify the non-gc-able data was actually repaired
out_of_sync_logs = node2.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync for cf2".format(cluster.address_regex(), cluster.address_regex()))
assert len(out_of_sync_logs) > 0, "Non GC-able data should be repaired"
def _range_tombstone_digest(self, sequential):
"""
Multiple range tombstones for the same partition and interval must not create a digest mismatch as long
as the most recent tombstone is present.
@jira_ticket CASSANDRA-11349.
"""
def withsession(session, node1):
session.execute("delete from table1 where c1 = 'a' and c2 = 'b'")
node1.flush()
# recreate same tombstone (will be flushed by repair, so we end up with 2x on node1 and 1x on node2)
session.execute("delete from table1 where c1 = 'a' and c2 = 'b'")
self._repair_digest(sequential, withsession)
def _cell_shadowed_by_range_tombstone(self, sequential):
"""
Cells shadowed by range tombstones must not affect repairs (given the tombstones are present on all nodes)
@jira_ticket CASSANDRA-11349.
"""
def withSession(session, node1):
session.execute("INSERT INTO table1 (c1, c2, c3, c4) VALUES ('a', 'b', 'c', 1)")
node1.flush()
session.execute("DELETE FROM table1 WHERE c1 = 'a' AND c2 = 'b'")
self._repair_digest(sequential, withSession)
def _range_tombstone_shadowed_by_range_tombstone(self, sequential):
"""
Range tombstones shadowed by other range tombstones must not affect repairs
@jira_ticket CASSANDRA-11349.
"""
def withSession(session, node1):
session.execute("DELETE FROM table1 WHERE c1 = 'a' AND c2 = 'b' AND c3 = 'c'")
node1.flush()
session.execute("DELETE FROM table1 WHERE c1 = 'a' AND c2 = 'b'")
node1.flush()
session.execute("DELETE FROM table1 WHERE c1 = 'a' AND c2 = 'b' AND c3 = 'd'")
node1.flush()
session.execute("DELETE FROM table1 WHERE c1 = 'a' AND c2 = 'b' AND c3 = 'a'")
self._repair_digest(sequential, withSession)
def _repair_digest(self, sequential, populate):
cluster = self.cluster
cluster.populate(2)
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
cluster.start()
node1, node2 = cluster.nodelist()
session = self.patient_cql_connection(node1)
# create keyspace with RF=2 to be able to be repaired
create_ks(session, 'ks', 2)
query = """
CREATE TABLE IF NOT EXISTS table1 (
c1 text,
c2 text,
c3 text,
c4 float,
PRIMARY KEY (c1, c2, c3)
)
WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'};
"""
session.execute(query)
populate(session, node1)
node2.repair(_repair_options(self.cluster.version(), ks='ks', sequential=sequential))
# check the log to verify there was no digest mismatch (no out-of-sync ranges) for table1
out_of_sync_logs = node2.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync for table1".format(cluster.address_regex(), cluster.address_regex()))
assert len(out_of_sync_logs) == 0, "Digest mismatch for range tombstone: {}".format(str([elt[0] for elt in out_of_sync_logs]))
def test_local_dc_repair(self):
"""
* Set up a multi DC cluster
* Perform a -local repair on one DC
* Assert only nodes in that DC are repaired
"""
cluster = self._setup_multi_dc()
node1 = cluster.nodes["node1"]
node2 = cluster.nodes["node2"]
logger.debug("starting repair...")
opts = ["-local"]
opts += _repair_options(self.cluster.version(), ks="ks")
node1.repair(opts)
# Verify that only nodes in dc1 are involved in repair
out_of_sync_logs = node1.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex()))
assert len(out_of_sync_logs) == 1, "Lines matching: {}".format(len(out_of_sync_logs))
line, m = out_of_sync_logs[0]
num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), m.group(2)}
assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges)
valid_out_of_sync_pairs = {node1.address(), node2.address()}
assert out_of_sync_nodes == valid_out_of_sync_pairs, "Unrelated node found in local repair: {}, expected {}".format(out_of_sync_nodes, valid_out_of_sync_pairs)
# Check node2 now has the key
self.check_rows_on_node(node2, 2001, found=[1000], restart=False)
def test_dc_repair(self):
"""
* Set up a multi DC cluster
* Perform a -dc repair on two dc's
* Assert only nodes on those dcs were repaired
"""
cluster = self._setup_multi_dc()
node1 = cluster.nodes["node1"]
node2 = cluster.nodes["node2"]
node3 = cluster.nodes["node3"]
logger.debug("starting repair...")
opts = ["-dc", "dc1", "-dc", "dc2"]
opts += _repair_options(self.cluster.version(), ks="ks")
node1.repair(opts)
# Verify that only nodes in dc1 and dc2 are involved in repair
out_of_sync_logs = node1.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex()))
assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs])
valid_out_of_sync_pairs = [{node1.address(), node2.address()},
{node2.address(), node3.address()}]
for line, m in out_of_sync_logs:
num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), m.group(2)}
assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges)
assert out_of_sync_nodes in valid_out_of_sync_pairs, str(out_of_sync_nodes)
# Check node2 now has the key
self.check_rows_on_node(node2, 2001, found=[1000], restart=False)
def test_dc_parallel_repair(self):
"""
* Set up a multi DC cluster
* Perform a -dc repair on two dc's, with -dcpar
* Assert only nodes on those dcs were repaired
"""
cluster = self._setup_multi_dc()
node1 = cluster.nodes["node1"]
node2 = cluster.nodes["node2"]
node3 = cluster.nodes["node3"]
logger.debug("starting repair...")
opts = ["-dc", "dc1", "-dc", "dc2", "-dcpar"]
opts += _repair_options(self.cluster.version(), ks="ks", sequential=False)
node1.repair(opts)
# Verify that only nodes in dc1 and dc2 are involved in repair
out_of_sync_logs = node1.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex()))
assert len(out_of_sync_logs) == 2, "Lines matching: " + str([elt[0] for elt in out_of_sync_logs])
valid_out_of_sync_pairs = [{node1.address(), node2.address()},
{node2.address(), node3.address()}]
for line, m in out_of_sync_logs:
num_out_of_sync_ranges, out_of_sync_nodes = m.group(3), {m.group(1), m.group(2)}
assert int(num_out_of_sync_ranges) == 1, "Expecting 1 range out of sync for {}, but saw {}".format(out_of_sync_nodes, num_out_of_sync_ranges)
assert out_of_sync_nodes in valid_out_of_sync_pairs, str(out_of_sync_nodes)
# Check node2 now has the key
self.check_rows_on_node(node2, 2001, found=[1000], restart=False)
# Check the repair was a dc parallel repair
if self.cluster.version() >= '2.2':
assert len(node1.grep_log('parallelism: dc_parallel')) == 1, str(node1.grep_log('parallelism'))
else:
assert len(node1.grep_log('parallelism=PARALLEL')) == 1, str(node1.grep_log('parallelism'))
def _setup_multi_dc(self):
"""
Sets up 3 DCs (2 nodes in 'dc1', and one each in 'dc2' and 'dc3').
After set up, node2 in dc1 lacks some data and needs to be repaired.
"""
cluster = self.cluster
# Disable hinted handoff and set batch commit log so this doesn't
# interfere with the test (this must be after the populate)
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Starting cluster..")
# populate 2 nodes in dc1, and one node each in dc2 and dc3
cluster.populate([2, 1, 1]).start()
node1, node2, node3, node4 = cluster.nodelist()
session = self.patient_cql_connection(node1)
session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1, 'dc3':1}")
session.execute("USE ks")
if cluster.version() < '4.0':
create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'})
else:
create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'})
# Insert 1000 keys, kill node 2, insert 1 key, restart node 2, insert 1000 more keys
logger.debug("Inserting data...")
insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ALL)
node2.flush()
node2.stop(wait_other_notice=True)
insert_c1c2(session, keys=(1000, ), consistency=ConsistencyLevel.THREE)
node2.start(wait_for_binary_proto=True)
node1.watch_log_for_alive(node2)
insert_c1c2(session, keys=list(range(1001, 2001)), consistency=ConsistencyLevel.ALL)
cluster.flush()
# Verify that node2 has only 2000 keys and the others have 2001 keys
logger.debug("Checking data...")
self.check_rows_on_node(node2, 2000, missings=[1000])
for node in [node1, node3, node4]:
self.check_rows_on_node(node, 2001, found=[1000])
return cluster
@since('2.2')
def parallel_table_repair_noleak(self):
"""
@jira_ticket CASSANDRA-11215
Tests that multiple parallel repairs on the same table do not
cause reference leaks.
"""
self.fixture_dtest_setup.ignore_log_patterns = [
"Cannot start multiple repair sessions over the same sstables", # The message we are expecting
"Validation failed in", # Expecting validation to fail
"RMI Runtime", # JMX Repair failures
"Session completed with the following error", # The nodetool repair error
"ValidationExecutor", # Errors by the validation executor
"RepairJobTask" # Errors by the repair job task
]
cluster = self.cluster
logger.debug("Starting cluster..")
cluster.populate([3]).start()
node1, node2, node3 = cluster.nodelist()
node1.stress(stress_options=['write', 'n=10k', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=3)', '-rate', 'threads=50'])
# Start multiple repairs in parallel
threads = []
for i in range(3):
t = threading.Thread(target=node1.nodetool, args=("repair keyspace1 standard1",))
threads.append(t)
t.start()
# Wait for the repairs to finish
for t in threads:
t.join()
found_message = False
# All nodes should reject multiple repairs and have no reference leaks
for node in cluster.nodelist():
if len(node.grep_log("Cannot start multiple repair sessions over the same sstables")) > 0:
found_message = True
break
assert found_message
@pytest.mark.no_vnodes
def test_token_range_repair(self):
"""
Test repair using the -st and -et options
* Launch a three node cluster
* Insert some data at RF 2
* Shut down node2, insert more data, restore node2
* Issue a repair on a range that only belongs to node1
* Verify that nodes 1 and 2, and only nodes 1+2, are repaired
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Starting cluster..")
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
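# Without vnodes, node1 primarily owns the token range (node3.initial_token, node1.initial_token]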
self._parameterized_range_repair(repair_opts=['-st', str(node3.initial_token), '-et', str(node1.initial_token)])
@pytest.mark.no_vnodes
def test_token_range_repair_with_cf(self):
"""
@jira_ticket CASSANDRA-11866
Test repair using the -st and -et, and -cf options
* Launch a three node cluster
* Insert some data at RF 2
* Shut down node2, insert more data, restore node2
* Issue a repair on a range that only belongs to node1 on the wrong cf
* Verify that the data did not get repaired
* Issue a repair on a range that belongs to the right cf
* Verify that the data was repaired
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Starting cluster..")
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
# Insert data, kill node 2, insert more data, restart node 2, insert another set of data
logger.debug("Inserting data...")
node1.stress(['write', 'n=1k', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30'])
node2.flush()
node2.stop(wait_other_notice=True)
node1.stress(['write', 'n=1K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=2)', '-rate', 'threads=30', '-pop', 'seq=20..40K'])
node2.start(wait_for_binary_proto=True)
node1.stress(['write', 'n=1K', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30', '-pop', 'seq=40..60K'])
cluster.flush()
# Repair only the range node 1 owns on the wrong CF, assert everything is still broken
opts = ['-st', str(node3.initial_token), '-et', str(node1.initial_token), ]
opts += _repair_options(self.cluster.version(), ks='keyspace1', cf='counter1', sequential=False)
node1.repair(opts)
assert len(node1.grep_log('are consistent for standard1')) == 0, "Nodes 1 and 2 should not be consistent."
assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not have been involved in the repair."
out_of_sync_logs = node1.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex()))
assert len(out_of_sync_logs) == 0, "We repaired the wrong CF == so things should still be broke"
# Repair only the range node 1 owns on the right CF, assert everything is fixed
opts = ['-st', str(node3.initial_token), '-et', str(node1.initial_token), ]
opts += _repair_options(self.cluster.version(), ks='keyspace1', cf='standard1', sequential=False)
node1.repair(opts)
assert len(node1.grep_log('are consistent for standard1')) == 0, "Nodes 1 and 2 should not be consistent."
assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not have been involved in the repair."
out_of_sync_logs = node1.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex()))
_, matches = out_of_sync_logs[0]
out_of_sync_nodes = {matches.group(1), matches.group(2)}
valid_out_of_sync_pairs = [{node1.address(), node2.address()}]
assert out_of_sync_nodes in valid_out_of_sync_pairs, str(out_of_sync_nodes)
@pytest.mark.no_vnodes
def test_partitioner_range_repair(self):
"""
Test repair using the -pr option
* Launch a three node cluster
* Insert some data at RF 2
* Shut down node2, insert more data, restore node2
* Issue a repair on a range that only belongs to node1
* Verify that nodes 1 and 2, and only nodes 1+2, are repaired
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Starting cluster..")
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
self._parameterized_range_repair(repair_opts=['-pr'])
@since('3.10')
@pytest.mark.no_vnodes
def test_pull_repair(self):
"""
Test repair using the --pull option
@jira_ticket CASSANDRA-9876
* Launch a three node cluster
* Insert some data at RF 2
* Shut down node2, insert more data, restore node2
* Issue a pull repair on a range that only belongs to node1
* Verify that nodes 1 and 2, and only nodes 1+2, are repaired
* Verify that node1 only received data
* Verify that node2 only sent data
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Starting cluster..")
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
node1_address = node1.network_interfaces['binary'][0]
node2_address = node2.network_interfaces['binary'][0]
self._parameterized_range_repair(repair_opts=['--pull', '--in-hosts', node1_address + ',' + node2_address, '-st', str(node3.initial_token), '-et', str(node1.initial_token)])
# Node 1 should only receive files (as we ran a pull repair on node1)
assert len(node1.grep_log("Receiving [1-9][0-9]* files")) > 0
assert len(node1.grep_log("sending [1-9][0-9]* files")) == 0
assert len(node1.grep_log("sending 0 files")) > 0
# Node 2 should only send files (as we ran a pull repair on node1)
assert len(node2.grep_log("Receiving [1-9][0-9]* files")) == 0
assert len(node2.grep_log("Receiving 0 files")) > 0
assert len(node2.grep_log("sending [1-9][0-9]* files")) > 0
@since('4.0')
def test_non_replicated_ks_repair(self):
cluster = self.cluster
cluster.populate([2, 2]).start(wait_for_binary_proto=True)
self.fixture_dtest_setup.ignore_log_patterns.extend(["no neighbors to repair with",
"keyspace is skipped since repair was called with --skip-empty"])
_, _, node, _ = cluster.nodelist()
session = self.patient_cql_connection(node)
create_ks(session, "repair1", {'dc1': 2, 'dc2': 0})
create_ks(session, "repair2", {'dc1': 2, 'dc2': 2})
session.execute("create table repair1.t1 (id int primary key, i int)")
session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
session.execute("create table repair2.t2 (id int primary key, i int)")
session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
session.execute("insert into repair1.t1 (id, i) values (1, 1)")
session.execute("insert into repair2.t2 (id, i) values (2, 2)")
node.nodetool("repair --ignore-unreplicated-keyspaces -st 0 -et 1")
assert len(node.grep_log("t2 is fully synced")) > 0
assert len(node.grep_log("in repair1 - unreplicated keyspace is ignored since repair was called with --ignore-unreplicated-keyspaces")) > 0
try:
self.fixture_dtest_setup.ignore_log_patterns.append("Nothing to repair for .+ in repair1")
node.nodetool("repair -st 0 -et 1")
assert False, "repair should fail"
except ToolError:
logger.debug("got expected exception during repair")
@since('4.0')
@pytest.mark.no_vnodes
def test_multiple_ranges_repair(self):
cluster = self.cluster
cluster.populate([3])
node1, node2, node3 = cluster.nodelist()
cluster.start(wait_for_binary_proto=True)
self.fixture_dtest_setup.ignore_log_patterns.extend(["Nothing to repair for"])
session = self.patient_cql_connection(node1)
create_ks(session, "repair1", {'dc1': 2})
session.execute("create table repair1.t1 (id int primary key, i int)")
session.cluster.control_connection.wait_for_schema_agreement(wait_time=120)
session.execute("insert into repair1.t1 (id, i) values (1, 1)")
with JolokiaAgent(node1) as jmx:
repair_mbean = make_mbean('db', 'StorageService')
# 0,1 is replicated, -3074457345618258606:-3074457345618258605 is not:
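# repairAsync(String, Map) takes the keyspace name and an option map; the
# 'ranges' option is a comma-separated list of start:end token pairs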
jmx.execute_method(repair_mbean, 'repairAsync(java.lang.String,java.util.Map)',
["repair1", {"ranges": "0:1,-3074457345618258606:-3074457345618258605"}])
node1.watch_log_for("Nothing to repair for \(-3074457345618258606,-3074457345618258605\] in repair1 - aborting")
assert len(node1.grep_log("fully synced")) == 0
jmx.execute_method(repair_mbean, 'repairAsync(java.lang.String,java.util.Map)',
["repair1", {"ranges": "0:1,-3074457345618258606:-3074457345618258605",
"ignoreUnreplicatedKeyspaces": "true"}])
node1.watch_log_for("Found no neighbors for range \(-3074457345618258606,-3074457345618258605\] for repair1 - ignoring since repairing with --ignore-unreplicated-keyspaces")
node1.watch_log_for("t1 is fully synced")
def _parameterized_range_repair(self, repair_opts):
"""
@param repair_opts A list of strings which represent cli args to nodetool repair
* Launch a three node cluster
* Insert some data at RF 2
* Shut down node2, insert more data, restore node2
* Issue a repair on a range that only belongs to node1, using repair_opts
* Verify that nodes 1 and 2, and only nodes 1+2, are repaired
"""
cluster = self.cluster
node1, node2, node3 = cluster.nodelist()
# Insert data, kill node 2, insert more data, restart node 2, insert another set of data
logger.debug("Inserting data...")
node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30'])
node2.flush()
node2.stop(wait_other_notice=True)
node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=2)', '-rate', 'threads=30', '-pop', 'seq=20..40K'])
node2.start(wait_for_binary_proto=True)
node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30', '-pop', 'seq=40..60K'])
cluster.flush()
# Repair only the range node 1 owns
opts = repair_opts
opts += _repair_options(self.cluster.version(), ks='keyspace1', cf='standard1', sequential=False)
node1.repair(opts)
assert len(node1.grep_log('are consistent for standard1')) == 0, "Nodes 1 and 2 should not be consistent."
assert len(node3.grep_log('Repair command')) == 0, "Node 3 should not have been involved in the repair."
out_of_sync_logs = node1.grep_log(r"{} and {} have ([0-9]+) range\(s\) out of sync".format(cluster.address_regex(), cluster.address_regex()))
_, matches = out_of_sync_logs[0]
out_of_sync_nodes = {matches.group(1), matches.group(2)}
valid_out_of_sync_pairs = [{node1.address(), node2.address()}]
assert out_of_sync_nodes in valid_out_of_sync_pairs, str(out_of_sync_nodes)
@since('2.2')
def test_trace_repair(self):
"""
* Launch a three node cluster
* Insert some data at RF 2
* Shut down node2, insert more data, restore node2
* Issue a repair on to node1, setting job threads to 2 and with tracing enabled
* Check the trace data was written, and that the right job thread count was used
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Starting cluster..")
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
logger.debug("Inserting data...")
node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate', 'threads=30'])
node2.flush()
node2.stop(wait_other_notice=True)
node1.stress(['write', 'n=20K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=2)', '-rate', 'threads=30', '-pop', 'seq=20..40K'])
node2.start(wait_for_binary_proto=True)
cluster.flush()
job_thread_count = '2'
opts = ['-tr', '-j', job_thread_count]
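# -tr enables tracing for the repair; -j sets the number of job threads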
opts += _repair_options(self.cluster.version(), ks='keyspace1', cf='standard1', sequential=False)
node1.repair(opts)
time.sleep(5) # Give the trace table some time to populate
session = self.patient_cql_connection(node1)
rows = list(session.execute("SELECT activity FROM system_traces.events"))
# This check assumes that the only (or at least first) thing to write to `system_traces.events.activity` is
# the repair task triggered in the test.
assert 'job threads: {}'.format(job_thread_count) in rows[0][0], \
'Expected {} job threads in repair options. Instead we saw {}'.format(job_thread_count, rows[0][0])
@since('2.2')
def test_thread_count_repair(self):
"""
* Launch a three node cluster
* Insert some data at RF 2
* Shut down node2, insert more data, restore node2
* Issue a repair on to node1, setting job threads
* Check the right job thread count was used
* Repeat steps 2 through 5 with all job count options
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Starting cluster..")
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
# Valid job thread counts: 1, 2, 3, and 4
for job_thread_count in range(1, 5):
logger.debug("Inserting data...")
node1.stress(['write', 'n=2K', 'no-warmup', 'cl=ALL', '-schema', 'replication(factor=2)', '-rate',
'threads=30', '-pop', 'seq={}..{}K'.format(2 * (job_thread_count - 1), 2 * job_thread_count)])
node2.flush()
node2.stop(wait_other_notice=True)
node1.stress(['write', 'n=2K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=2)', '-rate',
'threads=30', '-pop', 'seq={}..{}K'.format(2 * (job_thread_count), 2 * (job_thread_count + 1))])
node2.start(wait_for_binary_proto=True)
cluster.flush()
session = self.patient_cql_connection(node1)
session.execute("TRUNCATE system_traces.events")
opts = ['-tr', '-j', str(job_thread_count)]
opts += _repair_options(self.cluster.version(), ks='keyspace1', cf='standard1', sequential=False)
node1.repair(opts)
time.sleep(5) # Give the trace table some time to populate
rows = list(session.execute("SELECT activity FROM system_traces.events"))
# This check assumes that the only (or at least first) thing to write to `system_traces.events.activity` is
# the repair task triggered in the test.
assert 'job threads: {}'.format(job_thread_count) in rows[0][0], \
'Expected {} job threads in repair options. Instead we saw {}'.format(job_thread_count, rows[0][0])
@pytest.mark.no_vnodes
def test_multiple_concurrent_repairs(self):
"""
@jira_ticket CASSANDRA-11451
Make sure we can run sub range repairs in parallel - and verify that we actually do repair
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
node2.stop(wait_other_notice=True)
node1.stress(['write', 'n=1M', 'no-warmup', '-schema', 'replication(factor=3)', '-rate', 'threads=30'])
node2.start(wait_for_binary_proto=True)
t1 = threading.Thread(target=node1.nodetool, args=('repair keyspace1 standard1 -full -st {} -et {}'.format(str(node3.initial_token), str(node1.initial_token)),))
t2 = threading.Thread(target=node2.nodetool, args=('repair keyspace1 standard1 -full -st {} -et {}'.format(str(node1.initial_token), str(node2.initial_token)),))
t3 = threading.Thread(target=node3.nodetool, args=('repair keyspace1 standard1 -full -st {} -et {}'.format(str(node2.initial_token), str(node3.initial_token)),))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
node1.stop(wait_other_notice=True)
node3.stop(wait_other_notice=True)
_, _, rc = node2.stress(['read', 'n=1M', 'no-warmup', '-rate', 'threads=30'], whitelist=True)
assert rc == 0
@since('4.0')
def test_wide_row_repair(self):
"""
@jira_ticket CASSANDRA-13899
Make sure compressed vs uncompressed blocks are handled correctly when stream decompressing
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.populate(2).start()
node1, node2 = cluster.nodelist()
node2.stop(wait_other_notice=True)
profile_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml')
logger.info(("yaml = " + profile_path))
node1.stress(['user', 'profile=' + profile_path, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8',
'-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)'])
node2.start(wait_for_binary_proto=True)
node2.repair()
@since('2.1', max_version='4')
def test_dead_coordinator(self):
"""
@jira_ticket CASSANDRA-11824
Make sure parent repair session is cleared out if the repair coordinator dies
"""
cluster = self.cluster
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.populate(3).start()
node1, node2, node3 = cluster.nodelist()
node1.stress(['write', 'n=100k', '-schema', 'replication(factor=3)', '-rate', 'threads=30'])
def run_repair():
try:
if cluster.version() >= "2.2":
node1.repair()
else:
node1.nodetool('repair keyspace1 standard1 -inc -par')
except ToolError:
logger.debug("got expected exception during repair, ignoring")
t1 = threading.Thread(target=run_repair)
t1.start()
if cluster.version() > "2.2":
node2.watch_log_for('Validating ValidationRequest', filename='debug.log')
else:
node1.watch_log_for('requesting merkle trees', filename='system.log')
time.sleep(2)
logger.debug("stopping node1")
node1.stop(gently=False, wait_other_notice=True)
t1.join()
logger.debug("starting node1 - first repair should have failed")
node1.start(wait_for_binary_proto=True)
logger.debug("running second repair")
if cluster.version() >= "2.2":
node1.repair()
else:
node1.nodetool('repair keyspace1 standard1 -inc -par')
@since('2.2')
def test_dead_sync_initiator(self):
"""
@jira_ticket CASSANDRA-12901
"""
self._test_failure_during_repair(phase='sync', initiator=True)
@since('2.2')
def test_dead_sync_participant(self):
"""
@jira_ticket CASSANDRA-12901
"""
self._test_failure_during_repair(phase='sync', initiator=False,)
@since('2.2', max_version='4')
def test_failure_during_anticompaction(self):
"""
@jira_ticket CASSANDRA-12901
"""
self._test_failure_during_repair(phase='anticompaction',)
@since('2.2')
def test_failure_during_validation(self):
"""
@jira_ticket CASSANDRA-12901
"""
self._test_failure_during_repair(phase='validation')
def _test_failure_during_repair(self, phase, initiator=False):
cluster = self.cluster
# We are not interested in specific errors, only that
# the repair session finishes on node failure without hanging
self.fixture_dtest_setup.ignore_log_patterns = [
"Endpoint .* died",
"Streaming error occurred",
"StreamReceiveTask",
"Stream failed",
"Session completed with the following error",
"Repair session .* for range .* failed with error",
"Sync failed between .* and .*",
"failed to send a stream message/file to peer",
"failed to send a stream message/data to peer"
]
# stream session will be closed upon EOF, see CASSANDRA-15666
if cluster.version() >= '4.0':
self.ignore_log_patterns.append("Socket closed before session completion")
self.ignore_log_patterns.append("is finished with state FAILED")
self.ignore_log_patterns.append("stream has been closed")
self.ignore_log_patterns.append("stream operation from .* failed")
# Disable hinted handoff and set batch commit log so this doesn't
# interfere with the test (this must be after the populate)
cluster.set_configuration_options(values={'hinted_handoff_enabled': False})
cluster.set_batch_commitlog(enabled=True)
logger.debug("Setting up cluster..")
cluster.populate(3)
node1, node2, node3 = cluster.nodelist()
node_to_kill = node2 if (phase == 'sync' and initiator) else node3
logger.debug("Setting up byteman on {}".format(node_to_kill.name))
# set up byteman
node_to_kill.byteman_port = '8100'
node_to_kill.import_config_files()
logger.debug("Starting cluster..")
cluster.start(jvm_args=['-Djdk.attach.allowAttachSelf=true'])
# cluster.start()
logger.debug("stopping node3")
node3.stop(gently=False, wait_other_notice=True)
self.patient_exclusive_cql_connection(node1)
logger.debug("inserting data while node3 is down")
node1.stress(stress_options=['write', 'n=1k',
'no-warmup', 'cl=ONE',
'-schema', 'replication(factor=3)',
'-rate', 'threads=10'])
logger.debug("bring back node3")
node3.start(wait_for_binary_proto=True)
if phase == 'sync':
script = 'stream_sleep.btm'
else:
script = 'repair_{}_sleep.btm'.format(phase)
if phase == 'validation':
prefix = '4.0' if cluster.version() >= '4.0' else 'pre4.0'
script = prefix + '/' + script
logger.debug("Submitting byteman script to {}".format(node_to_kill.name))
# Sleep on anticompaction/stream so there will be time for node to be killed
node_to_kill.byteman_submit(['./byteman/{}'.format(script)])
def node1_repair():
global nodetool_error
try:
node1.nodetool('repair keyspace1 standard1')
except Exception as e:
nodetool_error = e
logger.debug("repair node1")
# Launch in an external thread so it does not hang the test process
t = Thread(target=node1_repair)
t.start()
logger.debug("Will kill {} in middle of {}".format(node_to_kill.name, phase))
msg_to_wait = 'streaming plan for Repair'
if phase == 'anticompaction':
msg_to_wait = 'Got anticompaction request'
elif phase == 'validation':
msg_to_wait = 'Validating'
node_to_kill.watch_log_for(msg_to_wait, filename='debug.log')
node_to_kill.stop(gently=False, wait_other_notice=True)
logger.debug("Killed {}, now waiting repair to finish".format(node_to_kill.name))
t.join(timeout=60)
assert not t.is_alive(), 'Repair still running after sync {} was killed'\
.format("initiator" if initiator else "participant")
if cluster.version() < '4.0' or phase != 'sync':
# the log entry we're watching for in the sync task came from the
# anticompaction at the end of the repair, which has been removed in 4.0
node1.watch_log_for('Endpoint .* died', timeout=60)
node1.watch_log_for('Repair command .* finished', timeout=60)
RepairTableContents = namedtuple('RepairTableContents',
['parent_repair_history', 'repair_history'])
@since('2.2')
@pytest.mark.resource_intensive
class TestRepairDataSystemTable(Tester):
"""
@jira_ticket CASSANDRA-5839
Tests the `system_distributed.parent_repair_history` and
`system_distributed.repair_history` tables by writing thousands of records
to a cluster, then ensuring these tables are in valid states before and
after running repair.
"""
@pytest.fixture(scope='function', autouse=True)
def fixture_set_cluster_settings(self, fixture_dtest_setup):
"""
Prepares a cluster for tests of the repair history tables by starting
a 5-node cluster, then inserting 5000 values with RF=3.
"""
self.cluster = fixture_dtest_setup.cluster
self.cluster.populate(5).start()
self.node1 = self.cluster.nodelist()[0]
self.session = fixture_dtest_setup.patient_cql_connection(self.node1)
self.node1.stress(stress_options=['write', 'n=5K', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=3)'])
fixture_dtest_setup.cluster.flush()
def repair_table_contents(self, node, include_system_keyspaces=True):
"""
@param node the node to connect to and query
@param include_system_keyspaces if truthy, return repair information about all keyspaces. If falsey, filter out keyspaces whose name contains 'system'
Return a `RepairTableContents` `namedtuple` containing the rows in
`node`'s `system_distributed.parent_repair_history` and
`system_distributed.repair_history` tables. If `include_system_keyspaces`,
include all results. If not `include_system_keyspaces`, filter out
repair information about system keyspaces, or at least keyspaces with
'system' in their names.
"""
session = self.patient_cql_connection(node)
def execute_with_all(stmt):
return session.execute(SimpleStatement(stmt, consistency_level=ConsistencyLevel.ALL))
parent_repair_history = execute_with_all('SELECT * FROM system_distributed.parent_repair_history;')
repair_history = execute_with_all('SELECT * FROM system_distributed.repair_history;')
if not include_system_keyspaces:
parent_repair_history = [row for row in parent_repair_history
if 'system' not in row.keyspace_name]
repair_history = [row for row in repair_history if
'system' not in row.keyspace_name]
return RepairTableContents(parent_repair_history=parent_repair_history,
repair_history=repair_history)
@pytest.mark.skip(reason='hangs CI')
def test_initial_empty_repair_tables(self):
logger.debug('repair tables:')
logger.debug(self.repair_table_contents(node=self.node1, include_system_keyspaces=False))
repair_tables_dict = self.repair_table_contents(node=self.node1, include_system_keyspaces=False)._asdict()
for table_name, table_contents in list(repair_tables_dict.items()):
assert not table_contents, '{} is non-empty'.format(table_name)
def test_repair_parent_table(self):
"""
Test that `system_distributed.parent_repair_history` is properly populated
after repair by:
- running repair on `node` and
- checking that there are a non-zero number of entries in `parent_repair_history`.
"""
self.node1.repair()
parent_repair_history, _ = self.repair_table_contents(node=self.node1, include_system_keyspaces=False)
assert len(parent_repair_history)
def test_repair_table(self):
"""
Test that `system_distributed.repair_history` is properly populated
after repair by:
- running repair on `node` and
- checking that there are a non-zero number of entries in `repair_history`.
"""
self.node1.repair()
_, repair_history = self.repair_table_contents(node=self.node1, include_system_keyspaces=False)
assert len(repair_history)