| import json
| import logging
| import os
| import random
| import re
| import subprocess
| import time
|
| import pytest
| |
| from ccmlib import common |
| from ccmlib.node import ToolError |
| |
| from dtest import Tester, create_ks |
| |
| since = pytest.mark.since |
| logger = logging.getLogger(__name__) |
| |
| |
| class TestOfflineTools(Tester): |
| |
| @pytest.fixture(autouse=True) |
| def fixture_add_additional_log_patterns(self, fixture_dtest_setup): |
| fixture_dtest_setup.ignore_log_patterns = (
| # In 2.0, we will get this error log message due to jamm not being
| # in the classpath.
| # The trailing comma is required to make this a one-element tuple
| # rather than a bare string.
| "Unable to initialize MemoryMeter",
| )
| |
| def test_sstablelevelreset(self): |
| """ |
| Insert data and call sstablelevelreset on a series of |
| tables. Confirm level is reset to 0 using its output. |
| Test a variety of possible errors and ensure the response is reasonable.
| @since 2.1.5 |
| @jira_ticket CASSANDRA-7614 |
| """ |
| cluster = self.cluster |
| cluster.populate(1).start(wait_for_binary_proto=True) |
| node1 = cluster.nodelist()[0] |
| |
| # test by trying to run on nonexistent keyspace |
| cluster.stop(gently=False) |
| try:
| node1.run_sstablelevelreset("keyspace1", "standard1")
| except ToolError as e:
| assert re.search("ColumnFamily not found: keyspace1/standard1", str(e))
| # this should return exit code 1
| assert e.exit_status == 1, "Expected sstablelevelreset to have an exit status of 1, but instead the exit status was {}".format(e.exit_status)
| else:
| pytest.fail("Expected a ToolError when running sstablelevelreset on a nonexistent keyspace")
| |
| # now test by generating keyspace but not flushing sstables |
| cluster.start(wait_for_binary_proto=True) |
| node1.stress(['write', 'n=100', 'no-warmup', '-schema', 'replication(factor=1)', |
| '-rate', 'threads=8']) |
| cluster.stop(gently=False) |
| |
| output, error, rc = node1.run_sstablelevelreset("keyspace1", "standard1") |
| self._check_stderr_error(error) |
| assert re.search("Found no sstables, did you give the correct keyspace", output) |
| assert rc == 0, str(rc) |
| |
| # test by writing small amount of data and flushing (all sstables should be level 0) |
| cluster.start(wait_for_binary_proto=True) |
| session = self.patient_cql_connection(node1) |
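| # switch the table to LCS with tiny 1 MB sstables so that subsequent writes span multiple levels quickly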
| session.execute("ALTER TABLE keyspace1.standard1 with compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1};") |
| node1.stress(['write', 'n=1K', 'no-warmup', '-schema', 'replication(factor=1)', |
| '-rate', 'threads=8']) |
| node1.flush() |
| cluster.stop(gently=False) |
| |
| output, error, rc = node1.run_sstablelevelreset("keyspace1", "standard1") |
| self._check_stderr_error(error) |
| assert re.search("since it is already on level 0", output) |
| assert rc == 0, str(rc) |
| |
| # test by loading a large amount of data so we have multiple levels, and checking all levels are 0 at the end
| cluster.start(wait_for_binary_proto=True) |
| node1.stress(['write', 'n=50K', 'no-warmup', '-schema', 'replication(factor=1)', |
| '-rate', 'threads=8']) |
| cluster.flush() |
| self.wait_for_compactions(node1) |
| cluster.stop() |
| |
| initial_levels = self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", column_families=["standard1"])) |
| _, error, rc = node1.run_sstablelevelreset("keyspace1", "standard1") |
| final_levels = self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", column_families=["standard1"])) |
| self._check_stderr_error(error) |
| assert rc == 0, str(rc) |
| |
| logger.debug(initial_levels) |
| logger.debug(final_levels) |
| |
| # let's make sure there was at least L1 before resetting levels
| assert max(initial_levels) > 0 |
| |
| # let's check all sstables are on L0 after sstablelevelreset |
| assert max(final_levels) == 0 |
| |
| def get_levels(self, data):
| """Parse the levels out of sstablemetadata output, one entry per sstable."""
| (out, err, rc) = data
| return list(map(int, re.findall("SSTable Level: ([0-9]+)", out)))
| |
| def wait_for_compactions(self, node):
| """Poll nodetool compactionstats until no compaction tasks are pending."""
| pattern = re.compile("pending tasks: 0")
| while True:
| output, err, _ = node.nodetool("compactionstats")
| if pattern.search(output):
| break
| time.sleep(1)
| |
| def test_sstableofflinerelevel(self): |
| """ |
| Generate sstables of varying levels. |
| Reset sstables to L0 with sstablelevelreset.
| Run sstableofflinerelevel and ensure sstables are promoted correctly.
| Also test a variety of bad inputs, including a nonexistent keyspace and missing sstables.
| @since 2.1.5
| @jira_ticket CASSANDRA-8031
| """ |
| cluster = self.cluster |
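| # disable compaction throughput throttling so the test's compactions complete quickly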
| cluster.set_configuration_options(values={'compaction_throughput_mb_per_sec': 0}) |
| cluster.populate(1).start(wait_for_binary_proto=True) |
| node1 = cluster.nodelist()[0] |
| |
| # NOTE - As of now this does not return when it encounters an exception,
| # which causes the test to hang, so it is temporarily commented out.
| # test by trying to run on nonexistent keyspace
| # cluster.stop(gently=False)
| # output, error, rc = node1.run_sstableofflinerelevel("keyspace1", "standard1", output=True)
| # assert "java.lang.IllegalArgumentException: Unknown keyspace/columnFamily keyspace1.standard1" in error
| # # this should return exit code 1
| # assert rc == 1, str(rc)
| # cluster.start()
| |
| # now test by generating keyspace but not flushing sstables |
| |
| node1.stress(['write', 'n=1', 'no-warmup', |
| '-schema', 'replication(factor=1)', |
| '-col', 'n=FIXED(10)', 'SIZE=FIXED(1024)', |
| '-rate', 'threads=8']) |
| |
| cluster.stop(gently=False) |
| try:
| output, error, _ = node1.run_sstableofflinerelevel("keyspace1", "standard1")
| except ToolError as e:
| assert re.search("No sstables to relevel for keyspace1.standard1", e.stdout)
| assert e.exit_status == 1, str(e.exit_status)
| else:
| pytest.fail("Expected a ToolError when releveling with no flushed sstables")
| |
| # test by flushing (sstable should be level 0) |
| cluster.start(wait_for_binary_proto=True) |
| session = self.patient_cql_connection(node1) |
| logger.debug("Altering compaction strategy to LCS") |
| session.execute("ALTER TABLE keyspace1.standard1 with compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1, 'enabled':'false'};") |
| |
| node1.stress(['write', 'n=1K', 'no-warmup', |
| '-schema', 'replication(factor=1)', |
| '-col', 'n=FIXED(10)', 'SIZE=FIXED(1024)', |
| '-rate', 'threads=8']) |
| |
| node1.flush() |
| cluster.stop() |
| |
| output, _, rc = node1.run_sstableofflinerelevel("keyspace1", "standard1") |
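| # only one sstable was flushed, so the relevel summary should report a single sstable on L0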
| assert re.search("L0=1", output) |
| assert rc == 0, str(rc) |
| |
| cluster.start(wait_for_binary_proto=True) |
| node1.nodetool('enableautocompaction keyspace1 standard1') |
| # test by loading a large amount of data so we have multiple sstables
| # must write enough to create more than just L1 sstables |
| keys = 8 * cluster.data_dir_count |
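| # each key writes 10 columns of 1200 bytes (~12 KB), so 8K keys per data directory
| # is enough data to promote sstables beyond L1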
| node1.stress(['write', 'n={0}K'.format(keys), 'no-warmup', |
| '-schema', 'replication(factor=1)', |
| '-col', 'n=FIXED(10)', 'SIZE=FIXED(1200)', |
| '-rate', 'threads=8']) |
| |
| node1.flush() |
| logger.debug("Waiting for compactions to finish") |
| self.wait_for_compactions(node1) |
| logger.debug("Stopping node") |
| cluster.stop() |
| logger.debug("Done stopping node") |
| |
| # Let's reset all sstables to L0 |
| logger.debug("Getting initial levels") |
| initial_levels = list(self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", column_families=["standard1"]))) |
| assert [] != initial_levels |
| logger.debug('initial_levels:') |
| logger.debug(initial_levels) |
| logger.debug("Running sstablelevelreset") |
| node1.run_sstablelevelreset("keyspace1", "standard1") |
| logger.debug("Getting final levels") |
| final_levels = list(self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", column_families=["standard1"]))) |
| assert [] != final_levels |
| logger.debug('final levels:') |
| logger.debug(final_levels) |
| |
| # let's make sure there were at least 3 levels (L0, L1 and L2)
| assert max(initial_levels) > 1 |
| # let's check all sstables are on L0 after sstablelevelreset |
| assert max(final_levels) == 0 |
| |
| # time to relevel sstables |
| logger.debug("Getting initial levels") |
| initial_levels = self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", column_families=["standard1"])) |
| logger.debug("Running sstableofflinerelevel") |
| output, error, _ = node1.run_sstableofflinerelevel("keyspace1", "standard1") |
| logger.debug("Getting final levels") |
| final_levels = self.get_levels(node1.run_sstablemetadata(keyspace="keyspace1", column_families=["standard1"])) |
| |
| logger.debug(output) |
| logger.debug(error) |
| |
| logger.debug(initial_levels) |
| logger.debug(final_levels) |
| |
| # let's check sstables were promoted after releveling |
| assert max(final_levels) > 1 |
| |
| @since('2.2') |
| def test_sstableverify(self): |
| """ |
| Generate sstables and test that offline verification works correctly.
| Test on bad input: nonexistent keyspace and sstables.
| Test on potential situations: deleted sstables, corrupted sstables.
| """ |
| cluster = self.cluster |
| cluster.populate(3).start(wait_for_binary_proto=True) |
| node1, node2, node3 = cluster.nodelist() |
| |
| # test on nonexistent keyspace |
| try:
| (out, err, rc) = node1.run_sstableverify("keyspace1", "standard1")
| except ToolError as e:
| assert "Unknown keyspace/table keyspace1.standard1" in repr(e)
| assert e.exit_status == 1, str(e.exit_status)
| else:
| pytest.fail("Expected a ToolError when verifying a nonexistent keyspace")
| |
| # test on nonexistent sstables: |
| node1.stress(['write', 'n=100', 'no-warmup', '-schema', 'replication(factor=3)', |
| '-rate', 'threads=8']) |
| (out, err, rc) = node1.run_sstableverify("keyspace1", "standard1") |
| assert rc == 0, str(rc) |
| |
| # Generate multiple sstables and test works properly in the simple case |
| node1.stress(['write', 'n=100K', 'no-warmup', '-schema', 'replication(factor=3)', |
| '-rate', 'threads=8']) |
| node1.flush() |
| node1.stress(['write', 'n=100K', 'no-warmup', '-schema', 'replication(factor=3)', |
| '-rate', 'threads=8']) |
| node1.flush() |
| cluster.stop() |
| |
| (out, error, rc) = node1.run_sstableverify("keyspace1", "standard1") |
| |
| assert rc == 0, str(rc) |
| |
| # STDOUT of the sstableverify command consists of multiple lines which may contain |
| # Java-normalized paths. To later compare these with Python-normalized paths, we |
| # map over each line of out and replace Java-normalized paths with Python equivalents. |
| outlines = [re.sub("(?<=path=').*(?=')", |
| lambda match: os.path.normcase(match.group(0)), |
| line) for line in out.splitlines()] |
| |
| # check output is correct for each sstable |
| sstables = self._get_final_sstables(node1, "keyspace1", "standard1") |
| |
| for sstable in sstables: |
| verified = False |
| hashcomputed = False |
| for line in outlines: |
| if sstable in line: |
| if "Verifying BigTableReader" in line: |
| verified = True |
| elif "Checking computed hash of BigTableReader" in line: |
| hashcomputed = True |
| else: |
| logger.debug(line) |
| |
| logger.debug(verified) |
| logger.debug(hashcomputed) |
| logger.debug(sstable) |
| assert verified and hashcomputed |
| |
| # now try intentionally corrupting an sstable to see if hash computed is different and error recognized |
| sstable1 = sstables[1] |
| with open(sstable1, 'rb') as f: |
| sstabledata = bytearray(f.read()) |
| with open(sstable1, 'wb') as out: |
| position = random.randrange(0, len(sstabledata)) |
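| # flip one random byte (wrapping at 255) so the sstable no longer matches its checksum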
| sstabledata[position] = (sstabledata[position] + 1) % 256 |
| out.write(sstabledata) |
| |
| # use verbose to get some coverage on it |
| try:
| (out, error, rc) = node1.run_sstableverify("keyspace1", "standard1", options=['-v'])
| except ToolError as e:
| # Process sstableverify output to normalize paths in the string to Python casing as above
| error = re.sub("(?<=Corrupted: ).*", lambda match: os.path.normcase(match.group(0)), str(e))
|
| assert re.search("Corrupted: " + sstable1, error)
| assert e.exit_status == 1, str(e.exit_status)
| else:
| pytest.fail("Expected a ToolError when verifying the corrupted sstable")
| |
| def test_sstableexpiredblockers(self): |
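| """Check that sstableexpiredblockers reports which sstable blocks expired sstables from being dropped."""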
| cluster = self.cluster |
| cluster.populate(1).start(wait_for_binary_proto=True) |
| [node1] = cluster.nodelist() |
| session = self.patient_cql_connection(node1) |
| create_ks(session, 'ks', 1) |
| session.execute("create table ks.cf (key int PRIMARY KEY, val int) with gc_grace_seconds=0") |
| # create a blocker: |
| session.execute("insert into ks.cf (key, val) values (1,1)") |
| node1.flush() |
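| # write two sstables containing only tombstones; with gc_grace_seconds=0 they expire
| # immediately, but the overlapping live row flushed above blocks them from being dropped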
| session.execute("delete from ks.cf where key = 2") |
| node1.flush() |
| session.execute("delete from ks.cf where key = 3") |
| node1.flush() |
| out, error, _ = node1.run_sstableexpiredblockers(keyspace="ks", column_family="cf") |
| assert "blocks 2 expired sstables from getting dropped" in out |
| |
| # 4.0 removes backward compatibility with pre-3.0 versions, so testing upgradesstables for
| # paths from those versions to 4.0 is invalid (and can only fail). There isn't currently
| # any difference between the 3.0 and 4.0 sstable formats, but when the version is
| # bumped for 4.0, remove the max_version & add a case for testing a 3.0 -> 4.0 upgrade
| @since('2.2', max_version='3.X') |
| def test_sstableupgrade(self): |
| """ |
| Test that sstableupgrade functions properly offline on a same-version Cassandra sstable;
| a stdout message of "Found 0 sstables that need upgrading." should be returned.
| """ |
| # Set up original node version to test for upgrade |
| cluster = self.cluster |
| testversion = cluster.version() |
| original_install_dir = cluster.get_install_dir() |
| logger.debug('Original install dir: {}'.format(original_install_dir)) |
| |
| # Set up last major version to upgrade from, assuming 2.1 branch is the oldest tested version |
| if testversion < '2.2': |
| # Upgrading from 2.0->2.1 fails due to the jamm 0.2.5->0.3.0 jar update. |
| # ** This will happen again next time jamm version is upgraded. |
| # CCM doesn't handle this upgrade correctly and results in an error when flushing 2.1: |
| # Error opening zip file or JAR manifest missing : /home/mshuler/git/cassandra/lib/jamm-0.2.5.jar |
| # The 2.1 installed jamm version is 0.3.0, but bin/cassandra.in.sh used by nodetool still has 0.2.5 |
| # (when this is fixed in CCM issue #463, install version='github:apache/cassandra-2.0' as below) |
| pytest.skip('Skipping 2.1 test due to jamm.jar version upgrade problem in CCM node configuration.') |
| elif testversion < '3.0': |
| logger.debug('Test version: {} - installing github:apache/cassandra-2.1'.format(testversion)) |
| cluster.set_install_dir(version='github:apache/cassandra-2.1') |
| # As of 3.5, sstable format 'ma' from 3.0 is still the latest - install 2.2 to upgrade from |
| elif testversion < '4.0': |
| logger.debug('Test version: {} - installing github:apache/cassandra-2.2'.format(testversion)) |
| cluster.set_install_dir(version='github:apache/cassandra-2.2') |
| # From 4.0, one can only upgrade from 3.0 |
| else: |
| logger.debug('Test version: {} - installing github:apache/cassandra-3.0'.format(testversion)) |
| cluster.set_install_dir(version='github:apache/cassandra-3.0') |
| |
| # Start up last major version, write out an sstable to upgrade, and stop node |
| cluster.populate(1).start(wait_for_binary_proto=True) |
| [node1] = cluster.nodelist() |
| # Check that node1 is actually what we expect |
| logger.debug('Downgraded install dir: {}'.format(node1.get_install_dir())) |
| session = self.patient_cql_connection(node1) |
| create_ks(session, 'ks', 1) |
| session.execute('create table ks.cf (key int PRIMARY KEY, val int) with gc_grace_seconds=0') |
| session.execute('insert into ks.cf (key, val) values (1,1)') |
| node1.flush() |
| cluster.stop() |
| logger.debug('Beginning ks.cf sstable: {}'.format(node1.get_sstables(keyspace='ks', column_family='cf'))) |
| |
| # Upgrade Cassandra to original testversion and run sstableupgrade |
| cluster.set_install_dir(original_install_dir) |
| # Check that node1 is actually upgraded |
| logger.debug('Upgraded to original install dir: {}'.format(node1.get_install_dir())) |
| # Perform a node start/stop so system tables get internally updated, otherwise we may get "Unknown keyspace/table ks.cf" |
| cluster.start(wait_for_binary_proto=True) |
| node1.flush() |
| cluster.stop() |
| |
| # A bit hacky, but we can only upgrade to 4.0 from 3.0, and both currently use the
| # same major sstable format, so there is no upgrading to do. So on 4.0, we only
| # test that sstableupgrade detects there is no upgrade needed. We'll remove that
| # test if 4.0 introduces a major sstable format change before its release.
| if testversion < '4.0': |
| (out, error, rc) = node1.run_sstableupgrade(keyspace='ks', column_family='cf') |
| out = str(out) |
| error = str(error) |
| logger.debug(out) |
| logger.debug(error) |
| logger.debug('Upgraded ks.cf sstable: {}'.format(node1.get_sstables(keyspace='ks', column_family='cf'))) |
| assert 'Found 1 sstables that need upgrading.' in out
| |
| # Check that sstableupgrade finds no upgrade needed on current version. |
| (out, error, rc) = node1.run_sstableupgrade(keyspace='ks', column_family='cf') |
| out = str(out) |
| error = str(error) |
| logger.debug(out) |
| logger.debug(error) |
| assert 'Found 0 sstables that need upgrading.' in out |
| |
| @since('3.0') |
| def test_sstabledump(self): |
| """ |
| Test that sstabledump functions properly offline to output the contents of a table. |
| """ |
| cluster = self.cluster |
| # disable JBOD conf since the test expects exactly one SSTable to be written. |
| cluster.set_datadir_count(1) |
| cluster.populate(1).start(wait_for_binary_proto=True) |
| [node1] = cluster.nodelist() |
| session = self.patient_cql_connection(node1) |
| create_ks(session, 'ks', 1) |
| session.execute('create table ks.cf (key int PRIMARY KEY, val int) with gc_grace_seconds=0') |
| session.execute('insert into ks.cf (key, val) values (1,1)') |
| |
| # delete a partition and then insert a row to test CASSANDRA-13177 |
| session.execute('DELETE FROM ks.cf WHERE key = 2') |
| session.execute('INSERT INTO ks.cf (key, val) VALUES (2, 2)') |
| |
| node1.flush() |
| cluster.stop() |
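| # run_sstabledump returns one result per sstable; JBOD was disabled above, so exactly one is expected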
| [(out, error, rc)] = node1.run_sstabledump(keyspace='ks', column_families=['cf']) |
| logger.debug(out) |
| logger.debug(error) |
| |
| # Load the json output and check that it contains the inserted key=1 |
| s = json.loads(out) |
| logger.debug(s) |
| assert len(s) == 2 |
| |
| # order the rows so that we have key=1 first, then key=2 |
| row0, row1 = s |
| (row0, row1) = (row0, row1) if row0['partition']['key'] == ['1'] else (row1, row0) |
| |
| assert row0['partition']['key'] == ['1'] |
| |
| assert row1['partition']['key'] == ['2'] |
| assert row1['partition'].get('deletion_info') is not None |
| assert row1.get('rows') is not None |
| |
| # Check that we only get the key back using the enumerate option |
| [(out, error, rc)] = node1.run_sstabledump(keyspace='ks', column_families=['cf'], enumerate_keys=True) |
| logger.debug(out) |
| logger.debug(error) |
| s = json.loads(out) |
| logger.debug(s) |
| assert len(s) == 2 |
| dumped_keys = set(row[0] for row in s) |
| assert {'1', '2'} == dumped_keys |
| |
| def _check_stderr_error(self, error): |
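| """Assert that any stderr output consists only of known benign warnings."""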
| acceptable = ["Max sstable size of", "Consider adding more capacity", "JNA link failure", "Class JavaLaunchHelper is implemented in both"] |
| |
| if len(error) > 0: |
| for line in error.splitlines(): |
| assert any([msg in line for msg in acceptable]), \ |
| 'Found line \n\n"{line}"\n\n in error\n\n{error}'.format(line=line, error=error) |
| |
| def _get_final_sstables(self, node, ks, table): |
| """ |
| Return the node's final sstable data files, excluding any temporary files.
| If sstableutil exists (>= 3.0) then we rely on this tool, since the sstable
| file names no longer contain tmp in their names (CASSANDRA-7066).
| """ |
| # Get all sstable data files |
| allsstables = list(map(os.path.normcase, node.get_sstables(ks, table))) |
| |
| # Remove any temporary files |
| tool_bin = node.get_tool('sstableutil') |
| if os.path.isfile(tool_bin): |
| args = [tool_bin, '--type', 'tmp', ks, table] |
| env = common.make_cassandra_env(node.get_install_cassandra_root(), node.get_node_cassandra_root()) |
| # universal_newlines=True so communicate() returns str rather than bytes
| p = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
| universal_newlines=True)
| (stdout, stderr) = p.communicate()
| tmpsstables = list(map(os.path.normcase, stdout.splitlines()))
| |
| ret = list(set(allsstables) - set(tmpsstables)) |
| else: |
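| # pre-3.0 temporary sstables contain "tmp" in the file name; skip the first 50
| # characters so a "tmp" in the directory path does not trigger a false positive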
| ret = [sstable for sstable in allsstables if "tmp" not in sstable[50:]] |
| |
| return ret |