| import time |
| import pytest |
| import re |
| import logging |
| |
| from datetime import datetime |
| from collections import Counter, namedtuple |
| from re import findall, compile |
| from uuid import UUID, uuid1 |
| |
| from cassandra import ConsistencyLevel |
| from cassandra.query import SimpleStatement |
| from cassandra.metadata import Murmur3Token |
| from ccmlib.common import is_win |
| from ccmlib.node import Node, ToolError |
| |
| from dtest import Tester, create_ks, create_cf |
| from tools.assertions import assert_almost_equal, assert_one |
| from tools.data import create_c1c2_table, insert_c1c2 |
| from tools.misc import new_node, ImmutableMapping |
| from tools.jmxutils import make_mbean, JolokiaAgent |
| |
| since = pytest.mark.since |
| logger = logging.getLogger(__name__) |
| |
| |
| class ConsistentState(object): |
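    """ Numeric repair-session state codes as stored in the 'state' column of system.repairs """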
| PREPARING = 0 |
| PREPARED = 1 |
| REPAIRING = 2 |
| FINALIZE_PROMISED = 3 |
| FINALIZED = 4 |
| FAILED = 5 |
| |
| |
| def assert_parent_repair_session_count(nodes, expected): |
| for node in nodes: |
| with JolokiaAgent(node) as jmx: |
| result = jmx.execute_method("org.apache.cassandra.db:type=RepairService", |
| "parentRepairSessionsCount") |
| assert expected == result, "The number of cached ParentRepairSessions should be {} but was {}. " \ |
| "This may mean that PRS objects are leaking on node {}. Check " \ |
| "ActiveRepairService for PRS clean up code.".format(expected, result, node.name) |
| |
| |
| class TestIncRepair(Tester): |
| |
| @pytest.fixture(autouse=True) |
| def fixture_add_additional_log_patterns(self, fixture_dtest_setup): |
| fixture_dtest_setup.ignore_log_patterns = ( |
            r'Can\'t send migration request: node.*is down',
| ) |
| |
| @classmethod |
| def _get_repaired_data(cls, node, keyspace): |
        _sstable_name = compile(r'SSTable: (.+)')
        _repaired_at = compile(r'Repaired at: (\d+)')
        _pending_repair = compile(r'Pending repair: (\-\-|null|[a-f0-9\-]+)')
| _sstable_data = namedtuple('_sstabledata', ('name', 'repaired', 'pending_id')) |
| |
| out = node.run_sstablemetadata(keyspace=keyspace).stdout |
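        # sstablemetadata prints one metadata block per sstable; pull the sstable name,
        # repairedAt time, and pending repair id out of each block with the regexes above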
| |
| def matches(pattern): |
| return filter(None, [pattern.match(l) for l in out.split('\n')]) |
| names = [m.group(1) for m in matches(_sstable_name)] |
| repaired_times = [int(m.group(1)) for m in matches(_repaired_at)] |
| |
| def uuid_or_none(s): |
| return None if s == 'null' or s == '--' else UUID(s) |
| pending_repairs = [uuid_or_none(m.group(1)) for m in matches(_pending_repair)] |
| assert names |
| assert repaired_times |
| assert pending_repairs |
| assert len(names) == len(repaired_times) == len(pending_repairs) |
| return [_sstable_data(*a) for a in zip(names, repaired_times, pending_repairs)] |
| |
| def assertNoRepairedSSTables(self, node, keyspace): |
| """ Checks that no sstables are marked repaired, and none are marked pending repair """ |
| data = self._get_repaired_data(node, keyspace) |
| assert all([t.repaired == 0 for t in data]), '{}'.format(data) |
| assert all([t.pending_id is None for t in data]) |
| |
| def assertAllPendingRepairSSTables(self, node, keyspace, pending_id=None): |
| """ Checks that no sstables are marked repaired, and all are marked pending repair """ |
| data = self._get_repaired_data(node, keyspace) |
| assert all([t.repaired == 0 for t in data]), '{}'.format(data) |
| if pending_id: |
| assert all([t.pending_id == pending_id for t in data]) |
| else: |
| assert all([t.pending_id is not None for t in data]) |
| |
| def assertAllRepairedSSTables(self, node, keyspace): |
| """ Checks that all sstables are marked repaired, and none are marked pending repair """ |
| data = self._get_repaired_data(node, keyspace) |
| assert all([t.repaired > 0 for t in data]), '{}'.format(data) |
| assert all([t.pending_id is None for t in data]), '{}'.format(data) |
| |
| def assertRepairedAndUnrepaired(self, node, keyspace): |
| """ Checks that a node has both repaired and unrepaired sstables for a given keyspace """ |
| data = self._get_repaired_data(node, keyspace) |
| assert any([t.repaired > 0 for t in data]), '{}'.format(data) |
| assert any([t.repaired == 0 for t in data]), '{}'.format(data) |
| assert all([t.pending_id is None for t in data]), '{}'.format(data) |
| |
| @since('4.0') |
| def test_consistent_repair(self): |
        self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false',
                                                                                      'num_tokens': 1,
                                                                                      'commitlog_sync_period_in_ms': 500})
        self.init_default_config()
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| # make data inconsistent between nodes |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)") |
| stmt.consistency_level = ConsistencyLevel.ALL |
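        # rows 0-9 are written at ALL while every node is up; node3 is down for rows 10-19
        # and node1 is down for rows 20-29, so each of those nodes misses one batch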
| for i in range(10): |
| session.execute(stmt, (i, i)) |
| node3.flush() |
| time.sleep(1) |
| node3.stop(gently=False) |
| stmt.consistency_level = ConsistencyLevel.QUORUM |
| |
| session = self.exclusive_cql_connection(node1) |
| for i in range(10): |
| session.execute(stmt, (i + 10, i + 10)) |
| node1.flush() |
| time.sleep(1) |
| node1.stop(gently=False) |
| node3.start(wait_for_binary_proto=True) |
| session = self.exclusive_cql_connection(node2) |
| for i in range(10): |
| session.execute(stmt, (i + 20, i + 20)) |
| node1.start(wait_for_binary_proto=True) |
| |
| # flush and check that no sstables are marked repaired |
| for node in self.cluster.nodelist(): |
| node.flush() |
| self.assertNoRepairedSSTables(node, 'ks') |
| session = self.patient_exclusive_cql_connection(node) |
| results = list(session.execute("SELECT * FROM system.repairs")) |
| assert len(results) == 0, str(results) |
| |
| # disable compaction so we can verify sstables are marked pending repair |
| for node in self.cluster.nodelist(): |
| node.nodetool('disableautocompaction ks tbl') |
| |
| node1.repair(options=['ks']) |
| |
| # check that all participating nodes have the repair recorded in their system |
| # table, that all nodes are listed as participants, and that all sstables are |
| # (still) marked pending repair |
| expected_participants = {n.address() for n in self.cluster.nodelist()} |
| expected_participants_wp = {n.address_and_port() for n in self.cluster.nodelist()} |
| recorded_pending_ids = set() |
| for node in self.cluster.nodelist(): |
| session = self.patient_exclusive_cql_connection(node) |
| results = list(session.execute("SELECT * FROM system.repairs")) |
| assert len(results) == 1 |
| result = results[0] |
| assert set(result.participants) == expected_participants |
| if hasattr(result, "participants_wp"): |
| assert set(result.participants_wp) == expected_participants_wp |
            assert result.state == ConsistentState.FINALIZED  # 4=FINALIZED
| pending_id = result.parent_id |
| self.assertAllPendingRepairSSTables(node, 'ks', pending_id) |
| recorded_pending_ids.add(pending_id) |
| |
| assert len(recorded_pending_ids) == 1 |
| |
| # sstables are compacted out of pending repair by a compaction |
| # task, we disabled compaction earlier in the test, so here we |
| # force the compaction and check that all sstables are promoted |
| for node in self.cluster.nodelist(): |
| node.nodetool('compact ks tbl') |
| self.assertAllRepairedSSTables(node, 'ks') |
| |
| def _make_fake_session(self, keyspace, table): |
| node1 = self.cluster.nodelist()[0] |
| session = self.patient_exclusive_cql_connection(node1) |
| session_id = uuid1() |
| cfid = list(session.execute("SELECT * FROM system_schema.tables WHERE keyspace_name='{}' AND table_name='{}'".format(keyspace, table)))[0].id |
| now = datetime.now() |
| # pulled from a repairs table |
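        # (each entry appears to be a serialized token range: a 4-byte length prefix
        #  \x00\x00\x00\x08 followed by an 8-byte Murmur3 token, for both start and end)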
| ranges = {'\x00\x00\x00\x08K\xc2\xed\\<\xd3{X\x00\x00\x00\x08r\x04\x89[j\x81\xc4\xe6', |
| '\x00\x00\x00\x08r\x04\x89[j\x81\xc4\xe6\x00\x00\x00\x08\xd8\xcdo\x9e\xcbl\x83\xd4', |
| '\x00\x00\x00\x08\xd8\xcdo\x9e\xcbl\x83\xd4\x00\x00\x00\x08K\xc2\xed\\<\xd3{X'} |
| ranges = {bytes(b, "Latin-1") for b in ranges} |
| |
| for node in self.cluster.nodelist(): |
| session = self.patient_exclusive_cql_connection(node) |
| session.execute("INSERT INTO system.repairs " |
| "(parent_id, cfids, coordinator, coordinator_port, last_update, participants, participants_wp, ranges, repaired_at, started_at, state) " |
| "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", |
| [session_id, {cfid}, node1.address(), 7000, now, {n.address() for n in self.cluster.nodelist()}, |
| {str(n.address()) + ":7000" for n in self.cluster.nodelist()}, |
| ranges, now, now, ConsistentState.REPAIRING]) # 2=REPAIRING |
| |
| # as we faked repairs and inserted directly into system.repairs table, the current |
| # implementation in trunk (LocalSessions) only pulls the sessions via callbacks or |
| # from the system.repairs table once at startup. we need to stop and start the nodes |
| # as a way to force the repair sessions to get populated into the correct in-memory objects |
| time.sleep(1) |
| for node in self.cluster.nodelist(): |
| node.flush() |
| node.stop(gently=False) |
| |
| for node in self.cluster.nodelist(): |
| node.start() |
| |
| return session_id |
| |
| @since('4.0') |
| def test_manual_session_fail(self): |
| """ check manual failing of repair sessions via nodetool works properly """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'num_tokens': 1, |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| # make data inconsistent between nodes |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin') |
| assert "no sessions" in out.stdout |
| |
| session_id = self._make_fake_session('ks', 'tbl') |
| |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin') |
| lines = out.stdout.split('\n') |
| assert len(lines) > 1 |
| line = lines[1] |
| assert re.match(str(session_id), line) |
| assert "REPAIRING" in line |
| |
| node1.nodetool("repair_admin cancel --session {}".format(session_id)) |
| |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin list --all') |
| lines = out.stdout.split('\n') |
| assert len(lines) > 1 |
| line = lines[1] |
| assert re.match(str(session_id), line) |
| assert "FAILED" in line |
| |
| @since('4.0') |
| def test_manual_session_cancel_non_coordinator_failure(self): |
| """ check manual failing of repair sessions via a node other than the coordinator fails """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'num_tokens': 1, |
| 'commitlog_sync_period_in_ms': 500}) |
| |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| # make data inconsistent between nodes |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin') |
| assert "no sessions" in out.stdout |
| |
| session_id = self._make_fake_session('ks', 'tbl') |
| |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin') |
| lines = out.stdout.split('\n') |
| assert len(lines) > 1 |
| line = lines[1] |
| assert re.match(str(session_id), line) |
| assert "REPAIRING" in line |
| |
| try: |
| node2.nodetool("repair_admin --cancel --session {}".format(session_id)) |
| self.fail("cancel from a non coordinator should fail") |
| except ToolError: |
| pass # expected |
| |
| # nothing should have changed |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin') |
| lines = out.stdout.split('\n') |
| assert len(lines) > 1 |
| line = lines[1] |
| assert re.match(str(session_id), line) |
| assert "REPAIRING" in line |
| |
| @since('4.0') |
| def test_manual_session_force_cancel(self): |
| """ check manual failing of repair sessions via a non-coordinator works if the --force flag is set """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'num_tokens': 1, |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| # make data inconsistent between nodes |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin') |
| assert "no sessions" in out.stdout |
| |
| session_id = self._make_fake_session('ks', 'tbl') |
| |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin') |
| lines = out.stdout.split('\n') |
| assert len(lines) > 1 |
| line = lines[1] |
| assert re.match(str(session_id), line) |
| assert "REPAIRING" in line |
| |
| node2.nodetool("repair_admin cancel --session {} --force".format(session_id)) |
| |
| for node in self.cluster.nodelist(): |
| out = node.nodetool('repair_admin list --all') |
| lines = out.stdout.split('\n') |
| assert len(lines) > 1 |
| line = lines[1] |
| assert re.match(str(session_id), line) |
| assert "FAILED" in line |
| |
| def test_sstable_marking(self): |
| """ |
| * Launch a three node cluster |
| * Stop node3 |
| * Write 10K rows with stress |
| * Start node3 |
| * Issue an incremental repair, and wait for it to finish |
| * Run sstablemetadata on every node, assert that all sstables are marked as repaired |
| """ |
        # hinted handoff can create SSTables that we don't need after node3 is restarted
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'}) |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| node3.stop(gently=True) |
| |
| node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=3)']) |
| node1.flush() |
| node2.flush() |
| |
| node3.start() |
| if node3.get_cassandra_version() < '2.2': |
| log_file = 'system.log' |
| else: |
| log_file = 'debug.log' |
| node3.watch_log_for("Initializing keyspace1.standard1", filename=log_file) |
| # wait for things to settle before starting repair |
| time.sleep(1) |
| if self.cluster.version() >= "2.2": |
| node3.repair() |
| else: |
| node3.nodetool("repair -par -inc") |
| |
| if self.cluster.version() >= '4.0': |
| # sstables are compacted out of pending repair by a compaction |
| for node in self.cluster.nodelist(): |
| node.nodetool('compact keyspace1 standard1') |
| |
| for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in self.cluster.nodelist()): |
| assert 'Repaired at: 0' not in out |
| |
| def test_multiple_repair(self): |
| """ |
| * Launch a three node cluster |
| * Create a keyspace with RF 3 and a table |
| * Insert 49 rows |
| * Stop node3 |
| * Insert 50 more rows |
| * Restart node3 |
| * Issue an incremental repair on node3 |
| * Stop node2 |
        * Insert a final 50 rows
| * Restart node2 |
| * Issue an incremental repair on node2 |
| * Replace node3 with a new node |
| * Verify data integrity |
| # TODO: Several more verifications of data need to be interspersed throughout the test. The final assertion is insufficient. |
| @jira_ticket CASSANDRA-10644 |
| """ |
| cluster = self.cluster |
| cluster.populate(3).start() |
| node1, node2, node3 = cluster.nodelist() |
| |
| session = self.patient_cql_connection(node1) |
| create_ks(session, 'ks', 3) |
| if cluster.version() < '4.0': |
| create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'}) |
| else: |
| create_cf(session, 'cf', columns={'c1': 'text', 'c2': 'text'}) |
| |
| logger.debug("insert data") |
| |
| insert_c1c2(session, keys=list(range(1, 50)), consistency=ConsistencyLevel.ALL) |
| node1.flush() |
| |
| logger.debug("bringing down node 3") |
| node3.flush() |
| node3.stop(gently=False) |
| |
| logger.debug("inserting additional data into node 1 and 2") |
| insert_c1c2(session, keys=list(range(50, 100)), consistency=ConsistencyLevel.TWO) |
| node1.flush() |
| node2.flush() |
| |
| logger.debug("restarting and repairing node 3") |
| node3.start(wait_for_binary_proto=True) |
| |
| if cluster.version() >= "2.2": |
| node3.repair() |
| else: |
| node3.nodetool("repair -par -inc") |
| |
        # wait for stream handlers to be closed on Windows
        # after the session is finished (see CASSANDRA-10644)
| if is_win: |
| time.sleep(2) |
| |
| logger.debug("stopping node 2") |
| node2.stop(gently=False) |
| |
| logger.debug("inserting data in nodes 1 and 3") |
| insert_c1c2(session, keys=list(range(100, 150)), consistency=ConsistencyLevel.TWO) |
| node1.flush() |
| node3.flush() |
| |
| logger.debug("start and repair node 2") |
| node2.start(wait_for_binary_proto=True) |
| |
| if cluster.version() >= "2.2": |
| node2.repair() |
| else: |
| node2.nodetool("repair -par -inc") |
| |
| logger.debug("replace node and check data integrity") |
| node3.stop(gently=False) |
| node5 = Node('node5', cluster, True, ('127.0.0.5', 9160), ('127.0.0.5', 7000), '7500', '0', None, ('127.0.0.5', 9042)) |
| cluster.add(node5, False, data_center="dc1") |
| node5.start(replace_address='127.0.0.3') |
| |
| assert_one(session, "SELECT COUNT(*) FROM ks.cf LIMIT 200", [149]) |
| |
| def test_sstable_repairedset(self): |
| """ |
| * Launch a two node cluster |
| * Insert data with stress |
| * Stop node2 |
| * Run sstablerepairedset against node2 |
| * Start node2 |
| * Run sstablemetadata on both nodes, pipe to a file |
| * Verify the output of sstablemetadata shows no repairs have occurred |
| * Stop node1 |
| * Insert more data with stress |
| * Start node1 |
| * Issue an incremental repair |
| * Run sstablemetadata on both nodes again, pipe to a new file |
| * Verify repairs occurred and repairedAt was updated |
| """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false'}) |
| self.init_default_config() |
| self.cluster.populate(2).start() |
| node1, node2 = self.cluster.nodelist() |
| node1.stress(['write', 'n=10K', 'no-warmup', '-schema', 'replication(factor=2)', 'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)', '-rate', 'threads=50']) |
| |
| node1.flush() |
| node2.flush() |
| |
| node2.stop(gently=False) |
| |
| node2.run_sstablerepairedset(keyspace='keyspace1') |
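        # node2's sstables are now marked repaired on disk while node1's still show
        # 'Repaired at: 0', which the metadata checks below rely on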
| node2.start(wait_for_binary_proto=True) |
| |
| initialOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout |
| initialOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout |
| |
| matches = findall('(?<=Repaired at:).*', '\n'.join([initialOut1, initialOut2])) |
| logger.debug("Repair timestamps are: {}".format(matches)) |
| |
| uniquematches = set(matches) |
| matchcount = Counter(matches) |
| |
| assert len(uniquematches) >= 2, uniquematches |
| |
        assert max(matchcount.values()) >= 1, matchcount
| |
| assert re.search('Repaired at: 0', '\n'.join([initialOut1, initialOut2])) |
| |
| node1.stop() |
| node2.stress(['write', 'n=15K', 'no-warmup', '-schema', 'replication(factor=2)']) |
| node2.flush() |
| node1.start(wait_for_binary_proto=True) |
| |
| if self.cluster.version() >= "2.2": |
| node1.repair() |
| else: |
| node1.nodetool("repair -par -inc") |
| |
| if self.cluster.version() >= '4.0': |
| # sstables are compacted out of pending repair by a compaction |
| for node in self.cluster.nodelist(): |
| node.nodetool('compact keyspace1 standard1') |
| |
| finalOut1 = node1.run_sstablemetadata(keyspace='keyspace1').stdout |
| if not isinstance(finalOut1, str): |
| finalOut1 = finalOut1 |
| finalOut2 = node2.run_sstablemetadata(keyspace='keyspace1').stdout |
| if not isinstance(finalOut2, str): |
| finalOut2 = finalOut2 |
| |
| matches = findall('(?<=Repaired at:).*', '\n'.join([finalOut1, finalOut2])) |
| |
| logger.debug(matches) |
| |
| uniquematches = set(matches) |
| matchcount = Counter(matches) |
| |
| assert len(uniquematches) >= 2 |
| |
        assert max(matchcount.values()) >= 2
| |
| assert not re.search('Repaired at: 0', '\n'.join([finalOut1, finalOut2])) |
| |
| def test_compaction(self): |
| """ |
| Test we can major compact after an incremental repair |
| * Launch a three node cluster |
| * Create a keyspace with RF 3 and a table |
| * Stop node3 |
| * Insert 100 rows |
| * Restart node3 |
| * Issue an incremental repair |
| * Insert 50 more rows |
| * Perform a major compaction on node3 |
| * Verify all data is present |
| # TODO: I have no idea what this is testing. The assertions do not verify anything meaningful. |
| # TODO: Fix all the string formatting |
| """ |
| cluster = self.cluster |
| cluster.populate(3).start() |
| node1, node2, node3 = cluster.nodelist() |
| |
| session = self.patient_cql_connection(node1) |
| create_ks(session, 'ks', 3) |
| session.execute("create table tab(key int PRIMARY KEY, val int);") |
| |
| node3.stop() |
| |
| for x in range(0, 100): |
| session.execute("insert into tab(key,val) values(" + str(x) + ",0)") |
| node1.flush() |
| |
| node3.start(wait_for_binary_proto=True) |
| |
| if cluster.version() >= "2.2": |
| node3.repair() |
| else: |
| node3.nodetool("repair -par -inc") |
| for x in range(0, 150): |
| session.execute("insert into tab(key,val) values(" + str(x) + ",1)") |
| |
| cluster.flush() |
| |
| node3.nodetool('compact') |
| |
| for x in range(0, 150): |
| assert_one(session, "select val from tab where key =" + str(x), [1]) |
| |
| @since("2.2") |
| def test_multiple_full_repairs_lcs(self): |
| """ |
| @jira_ticket CASSANDRA-11172 - repeated full repairs should not cause infinite loop in getNextBackgroundTask |
| """ |
| cluster = self.cluster |
| cluster.populate(2).start() |
| node1, node2 = cluster.nodelist() |
| for x in range(0, 10): |
| node1.stress(['write', 'n=100k', 'no-warmup', '-rate', 'threads=10', '-schema', 'compaction(strategy=LeveledCompactionStrategy,sstable_size_in_mb=10)', 'replication(factor=2)']) |
| cluster.flush() |
| cluster.wait_for_compactions() |
| node1.nodetool("repair -full keyspace1 standard1") |
| |
| @pytest.mark.env("long") |
| @pytest.mark.skip(reason='hangs CI') |
| def test_multiple_subsequent_repair(self): |
| """ |
| @jira_ticket CASSANDRA-8366 |
| |
| There is an issue with subsequent inc repairs increasing load size. |
| So we perform several repairs and check that the expected amount of data exists. |
| * Launch a three node cluster |
| * Write 5M rows with stress |
| * Wait for minor compactions to finish |
| * Issue an incremental repair on each node, sequentially |
| * Issue major compactions on each node |
| * Sleep for a while so load size can be propagated between nodes |
| * Verify the correct amount of data is on each node |
| """ |
| cluster = self.cluster |
| cluster.populate(3).start() |
| node1, node2, node3 = cluster.nodelist() |
| |
| logger.debug("Inserting data with stress") |
| node1.stress(['write', 'n=5M', 'no-warmup', '-rate', 'threads=10', '-schema', 'replication(factor=3)']) |
| |
| logger.debug("Flushing nodes") |
| cluster.flush() |
| |
| logger.debug("Waiting compactions to finish") |
| cluster.wait_for_compactions() |
| |
| if self.cluster.version() >= '2.2': |
| logger.debug("Repairing node1") |
| node1.nodetool("repair") |
| logger.debug("Repairing node2") |
| node2.nodetool("repair") |
| logger.debug("Repairing node3") |
| node3.nodetool("repair") |
| else: |
| logger.debug("Repairing node1") |
| node1.nodetool("repair -par -inc") |
| logger.debug("Repairing node2") |
| node2.nodetool("repair -par -inc") |
| logger.debug("Repairing node3") |
| node3.nodetool("repair -par -inc") |
| |
| # Using "print" instead of logger.debug() here is on purpose. The compactions |
| # take a long time and don't print anything by default, which can result |
| # in the test being timed out after 20 minutes. These print statements |
| # prevent it from being timed out. |
| print("compacting node1") |
| node1.compact() |
| print("compacting node2") |
| node2.compact() |
| print("compacting node3") |
| node3.compact() |
| |
| # wait some time to be sure the load size is propagated between nodes |
| logger.debug("Waiting for load size info to be propagated between nodes") |
| time.sleep(45) |
| |
| load_size_in_kb = float(sum([n.data_size() for n in [node1, node2, node3]])) |
| load_size = load_size_in_kb / 1024 / 1024 |
| logger.debug("Total Load size: {}GB".format(load_size)) |
| |
        # There is still some overhead, but it's a lot better. We tolerate 25%.
| expected_load_size = 4.5 # In GB |
| assert_almost_equal(load_size, expected_load_size, error=0.25) |
| |
| @pytest.mark.resource_intensive |
| def test_sstable_marking_not_intersecting_all_ranges(self): |
| """ |
| @jira_ticket CASSANDRA-10299 |
| * Launch a four node cluster |
| * Insert data with stress |
| * Issue an incremental repair on each node sequentially |
| * Assert no extra, unrepaired sstables are generated |
| """ |
| cluster = self.cluster |
| cluster.populate(4).start() |
| node1, node2, node3, node4 = cluster.nodelist() |
| |
| logger.debug("Inserting data with stress") |
| node1.stress(['write', 'n=3', 'no-warmup', '-rate', 'threads=1', '-schema', 'replication(factor=3)']) |
| |
| logger.debug("Flushing nodes") |
| cluster.flush() |
| |
| repair_options = '' if self.cluster.version() >= '2.2' else '-inc -par' |
| |
| logger.debug("Repairing node 1") |
| node1.nodetool("repair {}".format(repair_options)) |
| logger.debug("Repairing node 2") |
| node2.nodetool("repair {}".format(repair_options)) |
| logger.debug("Repairing node 3") |
| node3.nodetool("repair {}".format(repair_options)) |
| logger.debug("Repairing node 4") |
| node4.nodetool("repair {}".format(repair_options)) |
| |
| if cluster.version() >= '4.0': |
| # sstables are compacted out of pending repair by a compaction |
| for node in cluster.nodelist(): |
| node.nodetool('compact keyspace1 standard1') |
| |
| for out in (node.run_sstablemetadata(keyspace='keyspace1').stdout for node in cluster.nodelist() if len(node.get_sstables('keyspace1', 'standard1')) > 0): |
| assert 'Repaired at: 0' not in out |
| |
| @pytest.mark.no_vnodes |
| @since('4.0') |
| def test_move(self): |
| """ Test repaired data remains in sync after a move """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(4, tokens=[0, 2**32, 2**48, -(2**32)]).start() |
| node1, node2, node3, node4 = self.cluster.nodelist() |
| |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| |
| # insert some data |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)") |
| for i in range(1000): |
| session.execute(stmt, (i, i)) |
| |
| node1.repair(options=['ks']) |
| |
| for i in range(1000): |
| v = i + 1000 |
| session.execute(stmt, (v, v)) |
| |
| # everything should be in sync |
| for node in self.cluster.nodelist(): |
| result = node.repair(options=['ks', '--validate']) |
| assert "Repaired data is in sync" in result.stdout |
| |
| node2.nodetool('move {}'.format(2**16)) |
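        # node2 moves from its initial token (2**32) down to 2**16, changing the ring boundaries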
| |
| # everything should still be in sync |
| for node in self.cluster.nodelist(): |
| result = node.repair(options=['ks', '--validate']) |
| assert "Repaired data is in sync" in result.stdout |
| |
| @pytest.mark.no_vnodes |
| @since('4.0') |
| def test_decommission(self): |
| """ Test repaired data remains in sync after a decommission """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(4).start() |
| node1, node2, node3, node4 = self.cluster.nodelist() |
| |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| |
| # insert some data |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)") |
| for i in range(1000): |
| session.execute(stmt, (i, i)) |
| |
| node1.repair(options=['ks']) |
| |
| for i in range(1000): |
| v = i + 1000 |
| session.execute(stmt, (v, v)) |
| |
| # everything should be in sync |
| for node in self.cluster.nodelist(): |
| result = node.repair(options=['ks', '--validate']) |
| assert "Repaired data is in sync" in result.stdout |
| |
| node2.nodetool('decommission') |
| |
| # everything should still be in sync |
| for node in [node1, node3, node4]: |
| result = node.repair(options=['ks', '--validate']) |
| assert "Repaired data is in sync" in result.stdout |
| |
| @pytest.mark.no_vnodes |
| @since('4.0') |
| def test_bootstrap(self): |
| """ Test repaired data remains in sync after a bootstrap """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| |
| # insert some data |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)") |
| for i in range(1000): |
| session.execute(stmt, (i, i)) |
| |
| node1.repair(options=['ks']) |
| |
| for i in range(1000): |
| v = i + 1000 |
| session.execute(stmt, (v, v)) |
| |
| # everything should be in sync |
| for node in [node1, node2, node3]: |
| result = node.repair(options=['ks', '--validate']) |
| assert "Repaired data is in sync" in result.stdout |
| |
| node4 = new_node(self.cluster) |
| node4.start(wait_for_binary_proto=True) |
| |
| assert len(self.cluster.nodelist()) == 4 |
| # everything should still be in sync |
| for node in self.cluster.nodelist(): |
| result = node.repair(options=['ks', '--validate']) |
| assert "Repaired data is in sync" in result.stdout |
| |
| @since('4.0') |
| def test_force(self): |
| """ |
| forcing an incremental repair should incrementally repair any nodes |
| that are up, but should not promote the sstables to repaired |
| """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'num_tokens': 1, |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)") |
| stmt.consistency_level = ConsistencyLevel.ALL |
| for i in range(10): |
| session.execute(stmt, (i, i)) |
| |
| node2.stop(wait_other_notice=True) |
| |
| # repair should fail because node2 is down |
| with pytest.raises(ToolError): |
| node1.repair(options=['ks']) |
| |
| # run with force flag |
| node1.repair(options=['ks', '--force']) |
| |
| # ... and verify nothing was promoted to repaired |
| self.assertNoRepairedSSTables(node1, 'ks') |
| self.assertNoRepairedSSTables(node2, 'ks') |
| |
| @since('4.0') |
| def test_force_with_none_down(self): |
| """ |
| if we force an incremental repair, but all the involved nodes are up, |
| we should run normally and promote sstables afterwards |
| """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'num_tokens': 1, |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)") |
| stmt.consistency_level = ConsistencyLevel.ALL |
| for i in range(10): |
| session.execute(stmt, (i, i)) |
| |
| # run with force flag |
| node1.repair(options=['ks', '--force']) |
| |
| # ... and verify everything was still promoted |
| self.assertAllRepairedSSTables(node1, 'ks') |
| self.assertAllRepairedSSTables(node2, 'ks') |
| self.assertAllRepairedSSTables(node3, 'ks') |
| |
| @since('4.0') |
| def test_hosts(self): |
| """ |
| running an incremental repair with hosts specified should incrementally repair |
| the given nodes, but should not promote the sstables to repaired |
| """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'num_tokens': 1, |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)") |
| stmt.consistency_level = ConsistencyLevel.ALL |
| for i in range(10): |
| session.execute(stmt, (i, i)) |
| |
        # run the repair against an explicit list of hosts (node1 and node2 only)
| node1.repair(options=['ks', '-hosts', ','.join([node1.address(), node2.address()])]) |
| |
| # ... and verify nothing was promoted to repaired |
| self.assertNoRepairedSSTables(node1, 'ks') |
| self.assertNoRepairedSSTables(node2, 'ks') |
| |
| @since('4.0') |
| def test_subrange(self): |
| """ |
        running an incremental repair on a single token range should only repair and promote
        the data in that range, leaving each node with a mix of repaired and unrepaired sstables
| """ |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'num_tokens': 1, |
| 'commitlog_sync_period_in_ms': 500, |
| 'partitioner': 'org.apache.cassandra.dht.Murmur3Partitioner'}) |
| self.init_default_config() |
| self.cluster.populate(3).start() |
| node1, node2, node3 = self.cluster.nodelist() |
| |
| session = self.patient_exclusive_cql_connection(node3) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 3}") |
| session.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)") |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k,v) VALUES (%s, %s)") |
| stmt.consistency_level = ConsistencyLevel.ALL |
| for i in range(10): |
| session.execute(stmt, (i, i)) |
| |
| for node in self.cluster.nodelist(): |
| node.flush() |
| self.assertNoRepairedSSTables(node, 'ks') |
| |
| # only repair the partition k=0 |
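        # an INT partition key of 0 serializes to four zero bytes, so this is the token owning k=0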
| token = Murmur3Token.from_key(bytes([0, 0, 0, 0])) |
        # repair only the range (token-1, token], which covers just the partition k=0
| node1.repair(options=['ks', '-st', str(token.value - 1), '-et', str(token.value)]) |
| |
| # verify we have a mix of repaired and unrepaired sstables |
| self.assertRepairedAndUnrepaired(node1, 'ks') |
| self.assertRepairedAndUnrepaired(node2, 'ks') |
| self.assertRepairedAndUnrepaired(node3, 'ks') |
| |
| @since('4.0') |
| def test_repaired_tracking_with_partition_deletes(self): |
| """ |
        check that when tracking repaired data status following a digest mismatch,
        repaired data mismatches are marked as unconfirmed, since sstables may be
        skipped once the partition delete is encountered.
| @jira_ticket CASSANDRA-14145 |
| """ |
| session, node1, node2 = self.setup_for_repaired_data_tracking() |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)") |
| stmt.consistency_level = ConsistencyLevel.ALL |
| for i in range(10): |
| session.execute(stmt, (i, i, i)) |
| |
| for node in self.cluster.nodelist(): |
| node.flush() |
| self.assertNoRepairedSSTables(node, 'ks') |
| |
| node1.repair(options=['ks']) |
| node2.stop(wait_other_notice=True) |
| |
| session.execute("delete from ks.tbl where k = 5") |
| |
| node1.flush() |
| node2.start() |
| |
| # expect unconfirmed inconsistencies as the partition deletes cause some sstables to be skipped |
| with JolokiaAgent(node1) as jmx: |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5", |
| expect_unconfirmed_inconsistencies=True) |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5", |
| expect_unconfirmed_inconsistencies=True) |
| # no digest reads for range queries so blocking read repair metric isn't incremented |
| # *all* sstables are read for partition ranges too, and as the repaired set is still in sync there should |
| # be no inconsistencies |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False) |
| |
| @since('4.0') |
| def test_repaired_tracking_with_varying_sstable_sets(self): |
| """ |
| verify that repaired data digests are computed over the merged data for each replica |
        and that the particular number of sstables on each doesn't affect the comparisons.
        Both replicas start with the same repaired set, comprising 2 sstables. node1's is
        then compacted and additional unrepaired data added (which overwrites some in the
        repaired set). We expect the repaired digests to still match, as the tracking will
        force all sstables containing the partitions to be read.
        There are two variants of this, for single partition slice & names reads and range reads.
| @jira_ticket CASSANDRA-14145 |
| """ |
| session, node1, node2 = self.setup_for_repaired_data_tracking() |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)") |
| stmt.consistency_level = ConsistencyLevel.ALL |
| for i in range(10): |
| session.execute(stmt, (i, i, i)) |
| |
| for node in self.cluster.nodelist(): |
| node.flush() |
| |
| for i in range(10,20): |
| session.execute(stmt, (i, i, i)) |
| |
| for node in self.cluster.nodelist(): |
| node.flush() |
| self.assertNoRepairedSSTables(node, 'ks') |
| |
| node1.repair(options=['ks']) |
| node2.stop(wait_other_notice=True) |
| |
| session.execute("insert into ks.tbl (k, c, v) values (5, 5, 55)") |
| session.execute("insert into ks.tbl (k, c, v) values (15, 15, 155)") |
| node1.flush() |
| node1.compact() |
| node1.compact() |
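        # node1's repaired set has now been compacted and overlaid with unrepaired overwrites,
        # while node2 still has its original repaired sstables; the repaired data digests
        # should nevertheless match when read below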
| node2.start() |
| |
| # we don't expect any inconsistencies as all repaired data is read on both replicas |
| with JolokiaAgent(node1) as jmx: |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5") |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5") |
| # no digest reads for range queries so read repair metric isn't incremented |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", expect_read_repair=False) |
| |
| @since('4.0') |
| def test_repaired_tracking_with_mismatching_replicas(self): |
| """ |
| verify that when replicas have different repaired sets, this can be detected via the digests |
        computed at read time. All nodes start with the same data, but only 1 replica's sstables
        are marked repaired. Then a divergence is introduced by overwriting on 1 replica only, which
        is required to trigger a digest mismatch & full data read (for single partition reads).
        As the repaired sets are different between the replicas, but no other shortcutting occurs
        (no partition tombstones or sstable skipping) and no sstables are involved in a pending
        repair session, we expect confirmed inconsistencies to be reported.
        There are two variants of this, for single partition slice & names reads and range reads.
| @jira_ticket CASSANDRA-14145 |
| """ |
| session, node1, node2 = self.setup_for_repaired_data_tracking() |
| stmt = SimpleStatement("INSERT INTO ks.tbl (k, c, v) VALUES (%s, %s, %s)") |
| stmt.consistency_level = ConsistencyLevel.ALL |
| for i in range(10): |
| session.execute(stmt, (i, i, i)) |
| |
| for node in self.cluster.nodelist(): |
| node.flush() |
| |
| for i in range(10,20): |
| session.execute(stmt, (i, i, i)) |
| |
| for node in self.cluster.nodelist(): |
| node.flush() |
| self.assertNoRepairedSSTables(node, 'ks') |
| |
| # stop node 2 and mark its sstables repaired |
| node2.stop(wait_other_notice=True) |
| node2.run_sstablerepairedset(keyspace='ks') |
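        # sstablerepairedset rewrites the repairedAt field in node2's sstable metadata while the
        # node is offline, so only node2's sstables will claim to be repaired (verified below)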
| # before restarting node2 overwrite some data on node1 to trigger digest mismatches |
| session.execute("insert into ks.tbl (k, c, v) values (5, 5, 55)") |
| node2.start(wait_for_binary_proto=True) |
| |
| out1 = node1.run_sstablemetadata(keyspace='ks').stdout |
| out2 = node2.run_sstablemetadata(keyspace='ks').stdout |
| |
| # verify the repaired at times for the sstables on node1/node2 |
| assert all(t == 0 for t in [int(x) for x in [y.split(' ')[0] for y in findall('(?<=Repaired at: ).*', out1)]]) |
| assert all(t > 0 for t in [int(x) for x in [y.split(' ')[0] for y in findall('(?<=Repaired at: ).*', out2)]]) |
| |
        # we expect inconsistencies due to sstables being marked repaired on one replica only
        # these are marked confirmed because no sessions are pending and no sstables are
        # skipped (there are no partition deletes here to short-circuit the read)
| with JolokiaAgent(node1) as jmx: |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5", |
| expect_confirmed_inconsistencies=True) |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl WHERE k = 5 AND c = 5", |
| expect_confirmed_inconsistencies=True) |
| # no digest reads for range queries so read repair metric isn't incremented |
| self.query_and_check_repaired_mismatches(jmx, session, "SELECT * FROM ks.tbl", |
| expect_confirmed_inconsistencies=True, |
| expect_read_repair=False) |
| |
| @since('4.0') |
| def test_parent_repair_session_cleanup(self): |
| """ |
| Calls incremental repair on 3 node cluster and verifies if all ParentRepairSession objects are cleaned |
| @jira_ticket CASSANDRA-16446 |
| """ |
| self.cluster.populate(3).start() |
| session = self.patient_cql_connection(self.cluster.nodelist()[0]) |
| create_ks(session, 'ks', 2) |
| create_c1c2_table(self, session) |
| |
| for node in self.cluster.nodelist(): |
| node.repair(options=['ks']) |
| |
| assert_parent_repair_session_count(self.cluster.nodelist(), 0) |
| |
| def setup_for_repaired_data_tracking(self): |
| self.fixture_dtest_setup.setup_overrides.cluster_options = ImmutableMapping({'hinted_handoff_enabled': 'false', |
| 'num_tokens': 1, |
| 'commitlog_sync_period_in_ms': 500}) |
| self.init_default_config() |
| self.cluster.populate(2) |
| node1, node2 = self.cluster.nodelist() |
| self.cluster.start() |
| |
| session = self.patient_exclusive_cql_connection(node1) |
| session.execute("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 2}") |
| session.execute("CREATE TABLE ks.tbl (k INT, c INT, v INT, PRIMARY KEY (k,c)) with read_repair='NONE'") |
| return session, node1, node2 |
| |
| def query_and_check_repaired_mismatches(self, jmx, session, query, |
| expect_read_repair=True, |
| expect_unconfirmed_inconsistencies=False, |
| expect_confirmed_inconsistencies=False): |
| |
| rr_count = make_mbean('metrics', type='ReadRepair', name='ReconcileRead') |
| unconfirmed_count = make_mbean('metrics', type='Table,keyspace=ks', name='RepairedDataInconsistenciesUnconfirmed,scope=tbl') |
| confirmed_count = make_mbean('metrics', type='Table,keyspace=ks', name='RepairedDataInconsistenciesConfirmed,scope=tbl') |
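        # snapshot the three counters, run the query at CL.ALL, then check which of them moved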
| |
| rr_before = self.get_attribute_count(jmx, rr_count) |
| uc_before = self.get_attribute_count(jmx, unconfirmed_count) |
| cc_before = self.get_attribute_count(jmx, confirmed_count) |
| |
| stmt = SimpleStatement(query) |
| stmt.consistency_level = ConsistencyLevel.ALL |
| session.execute(stmt) |
| |
| rr_after = self.get_attribute_count(jmx, rr_count) |
| uc_after = self.get_attribute_count(jmx, unconfirmed_count) |
| cc_after = self.get_attribute_count(jmx, confirmed_count) |
| |
| logger.debug("Read Repair Count: {before}, {after}".format(before=rr_before, after=rr_after)) |
| logger.debug("Unconfirmed Inconsistency Count: {before}, {after}".format(before=uc_before, after=uc_after)) |
| logger.debug("Confirmed Inconsistency Count: {before}, {after}".format(before=cc_before, after=cc_after)) |
| |
| if expect_read_repair: |
| assert rr_after > rr_before |
| else: |
| assert rr_after == rr_before |
| |
| if expect_unconfirmed_inconsistencies: |
| assert uc_after > uc_before |
| else: |
| assert uc_after == uc_before |
| |
| if expect_confirmed_inconsistencies: |
| assert cc_after > cc_before |
| else: |
| assert cc_after == cc_before |
| |
| def get_attribute_count(self, jmx, bean): |
        # the MBean may not have been initialized, in which case the Jolokia agent will return
        # an HTTP 404 response. If we receive such, we know that the count can only be 0
| if jmx.has_mbean(bean): |
| return jmx.read_attribute(bean, 'Count') |
| else: |
| return 0 |