| import os |
| import os.path |
| import re |
| |
| from ccmlib.node import Node |
| from dtest import DISABLE_VNODES, Tester, create_ks, debug |
| from tools.assertions import assert_almost_equal |
| from tools.data import create_c1c2_table, insert_c1c2, query_c1c2 |
| from tools.decorators import since |
| from tools.jmxutils import (JolokiaAgent, make_mbean, |
| remove_perf_disable_shared_mem) |
| from tools.misc import new_node |
| from compaction_test import grep_sstables_in_each_level |
| |
| |
| @since('3.2') |
| class TestDiskBalance(Tester): |
| """ |
| @jira_ticket CASSANDRA-6696 |
| """ |
| |
def disk_balance_stress_test(self):
    """
    Write 50k keys with stress (RF 3, compaction disabled) on a 4 node
    cluster, flush, and check that every node has balanced data directories.
    """
    cluster = self.cluster
    if not DISABLE_VNODES:
        cluster.set_configuration_options(values={'num_tokens': 256})
    cluster.populate(4).start(wait_for_binary_proto=True)

    stress_args = ['write', 'n=50k', 'no-warmup', '-rate', 'threads=100',
                   '-schema', 'replication(factor=3)',
                   'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)']
    cluster.nodes['node1'].stress(stress_args)
    cluster.flush()

    # every node of the cluster is expected to have balanced data directories
    for current in cluster.nodelist():
        self.assert_balanced(current)
| |
def disk_balance_bootstrap_test(self):
    """
    Populate a 4 node cluster with stress, start an additional node and
    check that the new node has balanced data directories.
    """
    # apparently we have legitimate errors in the log when bootstrapping (see bootstrap_test.py)
    self.allow_log_errors = True

    cluster = self.cluster
    if not DISABLE_VNODES:
        cluster.set_configuration_options(values={'num_tokens': 256})
    cluster.populate(4).start(wait_for_binary_proto=True)

    stress_args = ['write', 'n=50k', 'no-warmup', '-rate', 'threads=100',
                   '-schema', 'replication(factor=3)',
                   'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)']
    cluster.nodes['node1'].stress(stress_args)
    cluster.flush()

    # only the freshly started node is checked here
    joining_node = new_node(cluster)
    joining_node.start(wait_for_binary_proto=True)
    self.assert_balanced(joining_node)
| |
| |
def disk_balance_replace_same_address_test(self):
    """
    Run the node replacement scenario with the replacement node reusing the
    address of the node it replaces.
    """
    self._test_disk_balance_replace(same_address=True)
| |
def disk_balance_replace_different_address_test(self):
    """
    Run the node replacement scenario with the replacement node started on
    a different address than the node it replaces.
    """
    self._test_disk_balance_replace(same_address=False)
| |
def _test_disk_balance_replace(self, same_address):
    """
    Hard-stop and remove node2 of a populated 4 node cluster, start a
    replacement node for it and check that the replacement has balanced
    data directories.

    @param same_address when True the replacement reuses node2's address,
                        otherwise it is started on 127.0.0.5
    """
    debug("Creating cluster")
    cluster = self.cluster
    if not DISABLE_VNODES:
        cluster.set_configuration_options(values={'num_tokens': 256})
    # apparently we have legitimate errors in the log when bootstrapping (see bootstrap_test.py)
    self.allow_log_errors = True
    cluster.populate(4).start(wait_for_binary_proto=True)

    debug("Populating")
    stress_args = ['write', 'n=50k', 'no-warmup', '-rate', 'threads=100',
                   '-schema', 'replication(factor=3)',
                   'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)']
    cluster.nodes['node1'].stress(stress_args)
    cluster.flush()

    debug("Stopping and removing node2")
    replaced_node = cluster.nodes['node2']
    replaced_node.stop(gently=False)
    self.cluster.remove(replaced_node)

    if same_address:
        replacement_address = replaced_node.address()
    else:
        replacement_address = '127.0.0.5'

    debug("Starting replacement node")
    replacement = Node('node5',
                       cluster=self.cluster,
                       auto_bootstrap=True,
                       thrift_interface=None,
                       storage_interface=(replacement_address, 7000),
                       jmx_port='7500',
                       remote_debug_port='0',
                       initial_token=None,
                       binary_interface=(replacement_address, 9042))
    self.cluster.add(replacement, False)

    # the replace flag always names the address of the removed node,
    # whichever address the replacement itself listens on
    replace_flag = "-Dcassandra.replace_address_first_boot={}".format(replaced_node.address())
    replacement.start(jvm_args=[replace_flag],
                      wait_for_binary_proto=True,
                      wait_other_notice=True)

    debug("Checking replacement node is balanced")
    self.assert_balanced(replacement)
| |
def disk_balance_decommission_test(self):
    """
    Populate a 4 node cluster (RF 2, compaction disabled), decommission
    node4, run 'relocatesstables' on the nodes and check that their data
    directories are balanced.
    """
    cluster = self.cluster
    if not DISABLE_VNODES:
        cluster.set_configuration_options(values={'num_tokens': 256})
    cluster.populate(4).start(wait_for_binary_proto=True)

    stress_args = ['write', 'n=50k', 'no-warmup', '-rate', 'threads=100',
                   '-schema', 'replication(factor=2)',
                   'compaction(strategy=SizeTieredCompactionStrategy,enabled=false)']
    cluster.nodes['node1'].stress(stress_args)
    cluster.flush()

    leaving_node = cluster.nodes['node4']
    leaving_node.decommission()

    # NOTE(review): both loops visit whatever nodelist() returns after the
    # decommission - confirm whether node4 is still meant to be included.
    # All relocations are issued before the first balance check.
    for current in cluster.nodelist():
        current.nodetool('relocatesstables')

    for current in cluster.nodelist():
        self.assert_balanced(current)
| |
def blacklisted_directory_test(self):
    """
    Start a single node with 3 data directories, insert and flush rows,
    then query every key three times: before the 'data0' directory is
    marked unwritable through the BlacklistedDirectories mbean, right
    after it, and once more after 'nodetool relocatesstables'.
    """
    num_keys = 10000

    cluster = self.cluster
    cluster.set_datadir_count(3)
    cluster.populate(1)
    [node] = cluster.nodelist()
    # done before the node is started, ahead of the Jolokia agent use below
    remove_perf_disable_shared_mem(node)
    cluster.start(wait_for_binary_proto=True)

    session = self.patient_cql_connection(node)
    create_ks(session, 'ks', 1)
    create_c1c2_table(self, session)
    insert_c1c2(session, n=num_keys)
    node.flush()

    def query_all_keys():
        # range() instead of xrange(): xrange only exists on Python 2,
        # while range() works on both Python 2 and Python 3
        for key in range(num_keys):
            query_c1c2(session, key)

    query_all_keys()

    node.compact()
    mbean = make_mbean('db', type='BlacklistedDirectories')
    with JolokiaAgent(node) as jmx:
        jmx.execute_method(mbean, 'markUnwritable', [os.path.join(node.get_path(), 'data0')])

    # the same keys are queried again with one directory marked unwritable ...
    query_all_keys()

    node.nodetool('relocatesstables')

    # ... and once more after the sstables were relocated
    query_all_keys()
| |
def alter_replication_factor_test(self):
    """
    Write to keyspace1 with RF 1 on a 3 node cluster, alter the keyspace to
    RF 2, write 100k more keys and check that every node has balanced data
    directories.
    """
    cluster = self.cluster
    if not DISABLE_VNODES:
        cluster.set_configuration_options(values={'num_tokens': 256})
    cluster.populate(3).start(wait_for_binary_proto=True)
    writer = cluster.nodes['node1']

    # a single key written with the RF 1 schema; presumably only there so
    # that stress sets up keyspace1 before it is altered
    initial_write = ['write', 'n=1', 'no-warmup', '-rate', 'threads=100', '-schema', 'replication(factor=1)']
    writer.stress(initial_write)
    cluster.flush()

    session = self.patient_cql_connection(writer)
    alter_statement = "ALTER KEYSPACE keyspace1 WITH replication = {'class':'SimpleStrategy', 'replication_factor':2}"
    session.execute(alter_statement)

    # the bulk of the data is written after the RF change
    writer.stress(['write', 'n=100k', 'no-warmup', '-rate', 'threads=100'])
    cluster.flush()

    for current in cluster.nodelist():
        self.assert_balanced(current)
| |
def assert_balanced(self, node):
    """
    Assert that the keyspace1.standard1 sstables of a node take up almost
    the same number of bytes in each of its data directories.

    @param node the node to check; node.name is passed to
                assert_almost_equal as the error message
    """
    # one total per data directory; uses the builtin sum() instead of a
    # local accumulator that shadowed it
    bytes_per_directory = [
        sum(os.path.getsize(sstable) for sstable in sstabledir)
        for sstabledir in node.get_sstables_per_data_directory('keyspace1', 'standard1')
    ]
    # error=0.1 is the tolerance handed to assert_almost_equal
    assert_almost_equal(*bytes_per_directory, error=0.1, error_message=node.name)
| |
@since('3.10')
def disk_balance_after_boundary_change_stcs_test(self):
    """
    Run the disk boundary change scenario with lcs=False
    (SizeTieredCompactionStrategy).

    @jira_ticket CASSANDRA-13948
    """
    self._disk_balance_after_boundary_change_test(lcs=False)
| |
@since('3.10')
def disk_balance_after_boundary_change_lcs_test(self):
    """
    Run the disk boundary change scenario with lcs=True
    (LeveledCompactionStrategy).

    @jira_ticket CASSANDRA-13948
    """
    self._disk_balance_after_boundary_change_test(lcs=True)
| |
def _disk_balance_after_boundary_change_test(self, lcs):
    """
    @jira_ticket CASSANDRA-13948

    - Start a single node with 5 data directories and write data in 10
      flushes with compaction disabled
    - Bootstrap node2 so that the disk boundaries of node1 change
    - Enable compaction on node1 and check its disks are balanced
    - Decommission node1 so that the disk boundaries of node2 change
    - Enable compaction on node2 and check its disks are balanced

    @param lcs True to create the table with LeveledCompactionStrategy,
               False for SizeTieredCompactionStrategy
    """
    num_disks = 5
    num_flushes = 10
    keys_per_flush = 10000
    total_keys = num_flushes * keys_per_flush

    cluster = self.cluster
    if not DISABLE_VNODES:
        cluster.set_configuration_options(values={'num_tokens': 1024})
    cluster.set_datadir_count(num_disks)
    # one compactor per data directory
    cluster.set_configuration_options(values={'concurrent_compactors': num_disks})

    debug("Starting node1 with {} data dirs and concurrent_compactors".format(num_disks))
    cluster.populate(1).start(wait_for_binary_proto=True)
    [node1] = cluster.nodelist()

    session = self.patient_cql_connection(node1)
    # reduce system_distributed and system_traces RF to 1 so we don't require forceful decommission
    session.execute("ALTER KEYSPACE system_distributed WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")
    session.execute("ALTER KEYSPACE system_traces WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':'1'};")

    if lcs:
        compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1"
    else:
        compaction_opts = "SizeTieredCompactionStrategy"
    debug("Writing {} keys in {} flushes (compaction_opts={})".format(total_keys, num_flushes, compaction_opts))

    # each round writes the next consecutive block of keys_per_flush keys
    # and flushes it to disk
    for flush_index in range(num_flushes):
        first_key = flush_index * keys_per_flush + 1
        last_key = (flush_index + 1) * keys_per_flush
        debug("Writing keys {}..{} and flushing".format(first_key, last_key))
        node1.stress(['write', 'n={}'.format(keys_per_flush), "no-warmup", "cl=ALL", "-pop",
                      "seq={}..{}".format(first_key, last_key), "-rate", "threads=1", "-schema", "replication(factor=1)",
                      "compaction(strategy={},enabled=false)".format(compaction_opts)])
        node1.nodetool('flush keyspace1 standard1')

    # Add a new node, so disk boundaries will change
    debug("Bootstrap node2 and flush")
    node2 = new_node(cluster, bootstrap=True)
    node2.start(wait_for_binary_proto=True,
                jvm_args=["-Dcassandra.migration_task_wait_in_seconds=10"],
                set_migration_task=False)
    node2.flush()

    self._assert_balanced_after_boundary_change(node1, total_keys, lcs)

    # removing node1 changes the boundaries again, this time for node2
    debug("Decommissioning node1")
    node1.decommission()
    node1.stop()

    self._assert_balanced_after_boundary_change(node2, total_keys, lcs)
| |
@since('3.10')
def disk_balance_after_joining_ring_stcs_test(self):
    """
    Run the joining-ring scenario with lcs=False
    (SizeTieredCompactionStrategy).

    @jira_ticket CASSANDRA-13948
    """
    self._disk_balance_after_joining_ring_test(lcs=False)
| |
@since('3.10')
def disk_balance_after_joining_ring_lcs_test(self):
    """
    Run the joining-ring scenario with lcs=True
    (LeveledCompactionStrategy).

    @jira_ticket CASSANDRA-13948
    """
    self._disk_balance_after_joining_ring_test(lcs=True)
| |
def _disk_balance_after_joining_ring_test(self, lcs):
    """
    @jira_ticket CASSANDRA-13948

    - Start a 3 node cluster with 5 data directories per node and write
      data in 10 flushes with compaction disabled
    - Stop node1
    - Start node1 without joining gossip and without loading ring state, so
      its disk boundaries will not reflect the actual ring state
    - Join node1 to the ring to make its disk boundaries change
    - Enable compaction on node1 and check its disks are balanced

    @param lcs True to create the table with LeveledCompactionStrategy,
               False for SizeTieredCompactionStrategy
    """
    num_disks = 5
    num_flushes = 10
    keys_per_flush = 10000
    total_keys = num_flushes * keys_per_flush

    cluster = self.cluster
    if not DISABLE_VNODES:
        cluster.set_configuration_options(values={'num_tokens': 1024})
    cluster.set_datadir_count(num_disks)
    # one compactor per data directory
    cluster.set_configuration_options(values={'concurrent_compactors': num_disks})

    debug("Starting 3 nodes with {} data dirs and concurrent_compactors".format(num_disks))
    cluster.populate(3).start(wait_for_binary_proto=True)
    node1 = cluster.nodelist()[0]

    if lcs:
        compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1"
    else:
        compaction_opts = "SizeTieredCompactionStrategy"
    debug("Writing {} keys in {} flushes (compaction_opts={})".format(total_keys, num_flushes, compaction_opts))

    # each round writes the next consecutive block of keys_per_flush keys
    # and flushes it to disk
    for flush_index in range(num_flushes):
        first_key = flush_index * keys_per_flush + 1
        last_key = (flush_index + 1) * keys_per_flush
        debug("Writing keys {}..{} and flushing".format(first_key, last_key))
        node1.stress(['write', 'n={}'.format(keys_per_flush), "no-warmup", "cl=ALL", "-pop",
                      "seq={}..{}".format(first_key, last_key), "-rate", "threads=1", "-schema", "replication(factor=1)",
                      "compaction(strategy={},enabled=false)".format(compaction_opts)])
        node1.nodetool('flush keyspace1 standard1')

    debug("Stopping node1")
    node1.stop()

    debug("Starting node1 without joining ring")
    survey_jvm_args = ["-Dcassandra.load_ring_state=false", "-Dcassandra.write_survey=true"]
    node1.start(wait_for_binary_proto=True,
                wait_other_notice=False,
                join_ring=False,
                jvm_args=survey_jvm_args)

    debug("Joining node1 to the ring")
    # Need to run join twice - one to join ring, another to leave write survey mode
    node1.nodetool("join")
    node1.nodetool("join")

    self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
| |
def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
    """
    Run cleanup and compactions on a node whose disk boundaries changed,
    then check that its data directories are balanced and that the data
    can be read back.

    With lcs=True it additionally checks that the sstable counts reported
    by grep_sstables_in_each_level add up to the number of sstables on disk
    and that the log has no "is already present on leveled manifest" line.

    @param node the node to compact and check
    @param total_keys number of keys to read back with stress (1..total_keys)
    @param lcs whether the table uses LeveledCompactionStrategy
    """
    debug("Cleanup {}".format(node.name))
    node.cleanup()

    debug("Enabling compactions on {} now that boundaries changed".format(node.name))
    node.nodetool('enableautocompaction')

    debug("Waiting for compactions on {}".format(node.name))
    node.wait_for_compactions()

    debug("Disabling compactions on {} should not block forever".format(node.name))
    node.nodetool('disableautocompaction')

    debug("Major compact {} and check disks are balanced".format(node.name))
    node.compact()

    node.wait_for_compactions()
    self.assert_balanced(node)

    debug("Reading data back ({} keys)".format(total_keys))
    # NOTE(review): the range below is spelled with three dots while the
    # write path uses "seq={}..{}" - left untouched, confirm stress accepts both
    node.stress(['read', 'n={}'.format(total_keys), "no-warmup", "cl=ALL", "-pop", "seq=1...{}".format(total_keys), "-rate", "threads=1"])

    if lcs:
        output = grep_sstables_in_each_level(node, "standard1")
        debug("SSTables in each level: {}".format(output))

        # [0, ?/, 0, 0, 0, 0...]
        level_pattern = re.compile(r'(\d+)(/\d+)?,\s(\d+).*')
        match = level_pattern.search(output)
        # fail with the unparsable output instead of an AttributeError on None
        self.assertIsNotNone(match, "Could not parse sstables per level from: {}".format(output))

        # adds up the first two counts the pattern finds
        # (presumably L0 and L1 - TODO confirm against the helper's output)
        cs_count = int(match.group(1)) + int(match.group(3))
        sstable_count = len(node.get_sstables('keyspace1', 'standard1'))
        debug("Checking that compaction strategy sstable # ({}) is equal to actual # ({})".format(cs_count, sstable_count))
        self.assertEqual(sstable_count, cs_count,
                         "{}: {} sstables on disk but compaction strategy reports {}".format(node.name, sstable_count, cs_count))
        self.assertFalse(node.grep_log("is already present on leveled manifest"),
                         "{}: found 'is already present on leveled manifest' in the log".format(node.name))