# blob: d84b5dcd0f8019b8131782792112be60b129008e [file] [log] [blame]
import os
import random
import re
import string
import tempfile
import time
from distutils.version import LooseVersion
import pytest
import parse
import logging
from dtest import Tester, create_ks
from tools.assertions import assert_length_equal, assert_none, assert_one
# Shorthand for the version-bound mark applied to tests below.
since = pytest.mark.since

logger = logging.getLogger(__name__)

# Compaction strategies the parametrized tests are run against.
strategies = [
    'LeveledCompactionStrategy',
    'SizeTieredCompactionStrategy',
    'DateTieredCompactionStrategy',
]
class TestCompaction(Tester):
    """
    Single-node compaction tests, most of them parametrized over the
    compaction strategies listed in the module-level `strategies`.
    """

    @pytest.fixture(scope='function', autouse=True)
    def fixture_set_cluster_log_level(self, fixture_dtest_setup):
        """Set the cluster log level to DEBUG before every test."""
        # compaction test for version 2.2.2 and above relies on DEBUG log in debug.log
        fixture_dtest_setup.cluster.set_log_level("DEBUG")

    @staticmethod
    def _compaction_log_file(node):
        """Return the name of the log file the tests search for compaction messages."""
        # The tests read system.log before 2.2 and debug.log from 2.2 on.
        if node.get_cassandra_version() < '2.2':
            return 'system.log'
        return 'debug.log'

    @staticmethod
    def _cfstats_int(node, table_name, label):
        """
        Return the integer printed after `label` in the `nodetool cfstats`
        output, searching from the first occurrence of `table_name`.

        Fails the test with the full cfstats output when the table or the
        label cannot be found.
        """
        output = node.nodetool('cfstats').stdout
        table_pos = output.find(table_name)
        assert table_pos != -1, \
            "table {} not found in cfstats output:\n{}".format(table_name, output)
        label_pos = output.find(label, table_pos)
        assert label_pos != -1, \
            "'{}' not found for table {} in cfstats output:\n{}".format(label, table_name, output)
        colon_pos = output.find(":", label_pos)
        line_end = output.find("\n", label_pos)
        if line_end == -1:
            # the value sits on the last line and has no trailing newline
            line_end = len(output)
        return int(output[colon_pos + 1:line_end].strip())

    @pytest.mark.parametrize("strategy", strategies)
    @since('0', max_version='2.2.X')
    def test_compaction_delete(self, strategy):
        """
        Test that executing a delete properly tombstones a row.
        Insert data, delete a partition of data and check that the requisite rows are tombstoned.
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)

        session.execute("create table ks.cf (key int PRIMARY KEY, val int) with compaction = {'class':'" + strategy + "'} and gc_grace_seconds = 30;")

        for x in range(0, 100):
            session.execute('insert into cf (key, val) values (' + str(x) + ',1)')

        node1.flush()
        for x in range(0, 10):
            session.execute('delete from cf where key = ' + str(x))

        node1.flush()
        for x in range(0, 10):
            assert_none(session, 'select * from cf where key = ' + str(x))

        # mkstemp hands back an already-open descriptor together with the path:
        # wrap the descriptor so it gets closed, and remove the file when done.
        fd, jname = tempfile.mkstemp(suffix='.json')
        try:
            with os.fdopen(fd, 'w') as f:
                node1.run_sstable2json(f)

            with open(jname, 'r') as g:
                jsoninfo = g.read()
        finally:
            os.remove(jname)

        numfound = jsoninfo.count("markedForDeleteAt")
        assert numfound == 10

    def test_data_size(self):
        """
        Ensure that data size does not have unwarranted increases after compaction.
        Insert data and check data size before and after a compaction.
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()

        stress_write(node1)
        node1.flush()

        table_name = 'standard1'
        initial_value = self._cfstats_int(node1, table_name, "Space used (live)")

        node1.flush()
        node1.compact()
        node1.wait_for_compactions()

        final_value = self._cfstats_int(node1, table_name, "Space used (live)")

        # allow 5% size increase - if we have few sstables it is not impossible that live size increases *slightly* after compaction
        assert final_value < initial_value * 1.05

    @pytest.mark.parametrize("strategy", strategies)
    def test_bloomfilter_size(self, strategy):
        """
        @jira_ticket CASSANDRA-11344
        Check that the bloom filter space reported by cfstats stays within the
        bounds expected for the strategy: 40000-100000 for LCS, and
        100000-150000 scaled by a size factor derived from the sstable count
        for the other strategies.
        """
        # The strategy arrives as a parametrized argument, not as an attribute
        # of self, so the branch is chosen from the argument alone.
        is_lcs = strategy == "LeveledCompactionStrategy"
        if is_lcs:
            strategy_string = 'strategy=LeveledCompactionStrategy,sstable_size_in_mb=1'
            min_bf_size = 40000
            max_bf_size = 100000
        else:
            if strategy == "DateTieredCompactionStrategy":
                # we want a single sstable, so make sure we don't have a tiny first window
                strategy_string = "strategy=DateTieredCompactionStrategy,base_time_seconds=86400"
            else:
                strategy_string = "strategy={}".format(strategy)
            min_bf_size = 100000
            max_bf_size = 150000

        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()

        for x in range(0, 5):
            node1.stress(['write', 'n=100K', "no-warmup", "cl=ONE", "-rate",
                          "threads=300", "-schema", "replication(factor=1)",
                          "compaction({},enabled=false)".format(strategy_string)])
            node1.flush()

        node1.nodetool('enableautocompaction')
        node1.wait_for_compactions()

        bf_size = self._cfstats_int(node1, 'standard1', "Bloom filter space used")

        # in some rare cases we can end up with more than one sstable per data directory with
        # non-lcs strategies (see CASSANDRA-12323)
        if is_lcs:
            size_factor = 1
        else:
            sstable_count = len(node1.get_sstables('keyspace1', 'standard1'))
            dir_count = len(node1.data_directories())
            logger.debug("sstable_count is: {}".format(sstable_count))
            logger.debug("dir_count is: {}".format(dir_count))
            if node1.get_cassandra_version() < LooseVersion('3.2'):
                size_factor = sstable_count
            else:
                size_factor = sstable_count / float(dir_count)

        logger.debug("bloom filter size is: {}".format(bf_size))
        logger.debug("size factor = {}".format(size_factor))
        assert bf_size >= size_factor * min_bf_size
        assert bf_size <= size_factor * max_bf_size

    @pytest.mark.parametrize("strategy", strategies)
    def test_sstable_deletion(self, strategy):
        """
        Test that sstables are deleted properly when able after compaction.
        Insert data setting gc_grace_seconds to 0, and determine sstable
        is deleted upon data deletion.
        """
        self.skip_if_no_major_compaction(strategy)
        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)
        session.execute("create table cf (key int PRIMARY KEY, val int) with gc_grace_seconds = 0 and compaction= {'class':'" + strategy + "'}")

        for x in range(0, 100):
            session.execute('insert into cf (key, val) values (' + str(x) + ',1)')
        node1.flush()
        for x in range(0, 100):
            session.execute('delete from cf where key = ' + str(x))

        node1.flush()
        node1.nodetool("compact ks cf")
        node1.wait_for_compactions()
        time.sleep(1)
        try:
            for data_dir in node1.data_directories():
                cfs = os.listdir(os.path.join(data_dir, "ks"))
                ssdir = os.listdir(os.path.join(data_dir, "ks", cfs[0]))
                for afile in ssdir:
                    assert "Data" not in afile, afile
        except OSError:
            # pytest.fail works whether or not the base class offers a unittest-style fail()
            pytest.fail("Path to sstables not valid.")

    @pytest.mark.parametrize("strategy", ['DateTieredCompactionStrategy'])
    def test_dtcs_deletion(self, strategy):
        """
        Test that sstables are deleted properly when able after compaction with
        DateTieredCompactionStrategy.
        Insert data setting max_sstable_age_days low, and determine sstable
        is deleted upon data deletion past max_sstable_age_days.
        """
        if strategy != 'DateTieredCompactionStrategy':
            pytest.skip('Not implemented unless DateTieredCompactionStrategy is used')

        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 1)
        # max sstable age is 0.5 minute:
        session.execute("""create table cf (key int PRIMARY KEY, val int) with gc_grace_seconds = 0
and compaction= {'class':'DateTieredCompactionStrategy', 'max_sstable_age_days':0.00035, 'min_threshold':2}""")

        # insert data
        for x in range(0, 300):
            session.execute('insert into cf (key, val) values (' + str(x) + ',1) USING TTL 35')
        node1.flush()
        time.sleep(40)
        expired_sstables = node1.get_sstables('ks', 'cf')
        expected_sstable_count = 1
        if self.cluster.version() > LooseVersion('3.1'):
            expected_sstable_count = cluster.data_dir_count
        assert len(expired_sstables) == expected_sstable_count

        # write a new sstable to make DTCS check for expired sstables:
        for x in range(0, 100):
            session.execute('insert into cf (key, val) values ({}, {})'.format(x, x))
        node1.flush()
        time.sleep(5)
        # we only check every 10 minutes - sstable should still be there:
        remaining_sstables = node1.get_sstables('ks', 'cf')
        for expired_sstable in expired_sstables:
            assert expired_sstable in remaining_sstables

        session.execute("alter table cf with compaction = {'class':'DateTieredCompactionStrategy', 'max_sstable_age_days':0.00035, 'min_threshold':2, 'expired_sstable_check_frequency_seconds':0}")
        time.sleep(1)
        for x in range(0, 100):
            session.execute('insert into cf (key, val) values ({}, {})'.format(x, x))
        node1.flush()
        time.sleep(5)
        # with the check frequency set to 0 the expired sstables must be gone now:
        remaining_sstables = node1.get_sstables('ks', 'cf')
        for expired_sstable in expired_sstables:
            assert expired_sstable not in remaining_sstables

    def test_compaction_throughput(self):
        """
        Test setting compaction throughput.
        Set throughput, insert data and ensure compaction performance corresponds.
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()

        # disableautocompaction only disables compaction for existing tables,
        # so initialize stress tables with stress first
        stress_write(node1, keycount=1)
        node1.nodetool('disableautocompaction')

        stress_write(node1, keycount=200000 * cluster.data_dir_count)

        threshold = "5"
        node1.nodetool('setcompactionthroughput -- ' + threshold)

        node1.flush()
        log_file = self._compaction_log_file(node1)
        mark = node1.mark_log(filename=log_file)
        node1.compact()
        matches = node1.watch_log_for('Compacted', from_mark=mark, filename=log_file)

        stringline = matches[0]

        throughput_pattern = '{}={avgthroughput:f}{units}/s'
        m = parse.search(throughput_pattern, stringline)
        # fail with the offending line instead of an AttributeError on None
        assert m is not None, "no throughput found in compaction log line: {}".format(stringline)
        avgthroughput = m.named['avgthroughput']
        found_units = m.named['units']

        # multipliers that bring the logged value to the MB figure given to nodetool
        unit_conversion_dct = {
            "MB": 1,
            "MiB": 1,
            "KiB": 1. / 1024,
            "GiB": 1024
        }

        units = ['MB'] if cluster.version() < LooseVersion('3.6') else ['KiB', 'MiB', 'GiB']
        assert found_units in units

        logger.debug(avgthroughput)
        avgthroughput_mb = unit_conversion_dct[found_units] * float(avgthroughput)

        # The throughput in the log is computed independently from the throttling and on the output files while
        # throttling is on the input files, so while that throughput shouldn't be higher than the one set in
        # principle, a bit of wiggle room is expected
        assert float(threshold) + 0.5 >= avgthroughput_mb

    @pytest.mark.parametrize("strategy", strategies)
    def test_compaction_strategy_switching(self, strategy):
        """
        Ensure that switching strategies does not result in problems.
        Insert data, switch strategies, then check against data loss.
        """
        # every strategy except the one the table is created with
        other_strategies = [s for s in strategies if s != strategy]
        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()

        for strat in other_strategies:
            session = self.patient_cql_connection(node1)
            create_ks(session, 'ks', 1)

            session.execute("create table ks.cf (key int PRIMARY KEY, val int) with gc_grace_seconds = 0 and compaction= {'class':'" + strategy + "'};")

            for x in range(0, 100):
                session.execute('insert into ks.cf (key, val) values (' + str(x) + ',1)')

            node1.flush()

            for x in range(0, 10):
                session.execute('delete from cf where key = ' + str(x))

            session.execute("alter table ks.cf with compaction = {'class':'" + strat + "'};")

            # keys 10..99 were not deleted and must all still be readable
            for x in range(10, 100):
                assert_one(session, "select * from ks.cf where key =" + str(x), [x, 1])

            for x in range(0, 10):
                assert_none(session, 'select * from cf where key = ' + str(x))

            node1.flush()
            cluster.clear()
            time.sleep(5)
            cluster.start()

    def test_large_compaction_warning(self):
        """
        @jira_ticket CASSANDRA-9643
        Check that we log a warning when the partition size is bigger than compaction_large_partition_warning_threshold_mb
        """
        cluster = self.cluster
        cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
        cluster.populate(1).start()
        [node] = cluster.nodelist()

        session = self.patient_cql_connection(node)
        create_ks(session, 'ks', 1)

        mark = node.mark_log()
        strlen = (1024 * 1024) // 100
        session.execute("CREATE TABLE large(userid text PRIMARY KEY, properties map<int, text>) with compression = {}")
        for i in range(200):  # ensures partition size larger than compaction_large_partition_warning_threshold_mb
            session.execute("UPDATE ks.large SET properties[%i] = '%s' WHERE userid = 'user'" % (i, get_random_word(strlen)))

        ret = list(session.execute("SELECT properties from ks.large where userid = 'user'"))
        assert_length_equal(ret, 1)
        assert 200 == len(list(ret[0][0].keys()))

        node.flush()

        node.nodetool('compact ks large')
        verb = 'Writing' if self.cluster.version() > '2.2' else 'Compacting'
        # raw strings: these are regular expressions containing backslashes
        sizematcher = r'\d+ bytes' if self.cluster.version() < LooseVersion('3.6') else r'\d+\.\d{3}(K|M|G)iB'
        node.watch_log_for(r'{} large partition ks/large:user \({}'.format(verb, sizematcher), from_mark=mark, timeout=180)

        ret = list(session.execute("SELECT properties from ks.large where userid = 'user'"))
        assert_length_equal(ret, 1)
        assert 200 == len(list(ret[0][0].keys()))

    @pytest.mark.parametrize("strategy", strategies)
    def test_disable_autocompaction_nodetool(self, strategy):
        """
        Make sure we can enable/disable compaction using nodetool
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node] = cluster.nodelist()
        session = self.patient_cql_connection(node)
        create_ks(session, 'ks', 1)
        session.execute("CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{'class':'{0}'}}".format(strategy))
        node.nodetool('disableautocompaction ks to_disable')
        for i in range(1000):
            session.execute("insert into to_disable (id, d) values ({0}, '{1}')".format(i, 'hello' * 100))
            if i % 100 == 0:
                node.flush()
        log_file = self._compaction_log_file(node)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
        node.nodetool('enableautocompaction ks to_disable')
        # sleep to allow compactions to start
        time.sleep(2)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy)

    @pytest.mark.parametrize("strategy", strategies)
    def test_disable_autocompaction_schema(self, strategy):
        """
        Make sure we can disable compaction via the schema compaction parameter 'enabled' = false
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node] = cluster.nodelist()
        session = self.patient_cql_connection(node)
        create_ks(session, 'ks', 1)
        session.execute("CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{'class':'{0}', 'enabled':'false'}}".format(strategy))
        for i in range(1000):
            session.execute("insert into to_disable (id, d) values ({0}, '{1}')".format(i, 'hello' * 100))
            if i % 100 == 0:
                node.flush()
        log_file = self._compaction_log_file(node)

        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
        # should still be disabled after restart:
        node.stop()
        node.start(wait_for_binary_proto=True)
        session = self.patient_cql_connection(node)
        session.execute("use ks")
        # sleep to make sure we dont start any logs
        time.sleep(2)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
        node.nodetool('enableautocompaction ks to_disable')
        # sleep to allow compactions to start
        time.sleep(2)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy)

    @pytest.mark.parametrize("strategy", strategies)
    def test_disable_autocompaction_alter(self, strategy):
        """
        Make sure we can enable compaction using an alter-statement
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node] = cluster.nodelist()
        session = self.patient_cql_connection(node)
        create_ks(session, 'ks', 1)
        session.execute("CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{'class':'{0}'}}".format(strategy))
        session.execute("ALTER TABLE to_disable WITH compaction = {{'class':'{0}', 'enabled':'false'}}".format(strategy))
        for i in range(1000):
            session.execute("insert into to_disable (id, d) values ({0}, '{1}')".format(i, 'hello' * 100))
            if i % 100 == 0:
                node.flush()
        log_file = self._compaction_log_file(node)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
        session.execute("ALTER TABLE to_disable WITH compaction = {{'class':'{0}', 'enabled':'true'}}".format(strategy))
        # we need to flush at least once when altering to enable:
        session.execute("insert into to_disable (id, d) values (99, 'hello')")
        node.flush()
        # sleep to allow compactions to start
        time.sleep(2)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy)

    @pytest.mark.parametrize("strategy", strategies)
    def test_disable_autocompaction_alter_and_nodetool(self, strategy):
        """
        Make sure compaction stays disabled after an alter statement where we have disabled using nodetool first
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node] = cluster.nodelist()
        session = self.patient_cql_connection(node)
        create_ks(session, 'ks', 1)
        session.execute("CREATE TABLE to_disable (id int PRIMARY KEY, d TEXT) WITH compaction = {{'class':'{0}'}}".format(strategy))
        node.nodetool('disableautocompaction ks to_disable')
        for i in range(1000):
            session.execute("insert into to_disable (id, d) values ({0}, '{1}')".format(i, 'hello' * 100))
            if i % 100 == 0:
                node.flush()
        log_file = self._compaction_log_file(node)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found compaction log items for {0}'.format(strategy)
        session.execute("ALTER TABLE to_disable WITH compaction = {{'class':'{0}', 'tombstone_threshold':0.9}}".format(strategy))
        session.execute("insert into to_disable (id, d) values (99, 'hello')")
        node.flush()
        time.sleep(2)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) == 0, 'Found log items for {0}'.format(strategy)
        node.nodetool('enableautocompaction ks to_disable')
        # sleep to allow compactions to start
        time.sleep(2)
        assert len(node.grep_log('Compacting.+to_disable', filename=log_file)) > 0, 'Found no log items for {0}'.format(strategy)

    @since('3.7')
    def test_user_defined_compaction(self):
        """
        Test a user defined compaction task by generating a few sstables with cassandra stress
        and autocompaction disabled, and then passing a list of sstable data files directly to nodetool compact.
        Check that after compaction there is only one sstable per node directory. Make sure to use sstableutil
        to list only final files because after a compaction some temporary files may still not have been deleted.
        @jira_ticket CASSANDRA-11765
        """
        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()

        # disableautocompaction only disables compaction for existing tables,
        # so initialize stress tables with stress first
        stress_write(node1, keycount=1)
        node1.nodetool('disableautocompaction')

        stress_write(node1, keycount=500000)
        node1.nodetool('flush keyspace1 standard1')

        sstable_files = ' '.join(node1.get_sstable_data_files('keyspace1', 'standard1'))
        logger.debug('Compacting {}'.format(sstable_files))
        node1.nodetool('compact --user-defined {}'.format(sstable_files))

        sstable_files = node1.get_sstable_data_files('keyspace1', 'standard1')
        assert len(node1.data_directories()) == len(sstable_files), \
            'Expected one sstable data file per node directory but got {}'.format(sstable_files)

    @pytest.mark.parametrize("strategy", ['LeveledCompactionStrategy'])
    @since('3.10')
    def test_fanout_size(self, strategy):
        """
        @jira_ticket CASSANDRA-11550
        Check the per-level sstable counts reported by cfstats for LCS with
        fanout_size 10, then again after altering fanout_size to 5.
        """
        # Decide from the parametrized argument only; a hasattr(self, 'strategy')
        # guard here made the test skip unconditionally.
        if strategy != 'LeveledCompactionStrategy':
            pytest.skip('Not implemented unless LeveledCompactionStrategy is used')

        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()

        stress_write(node1, keycount=1)
        node1.nodetool('disableautocompaction')

        session = self.patient_cql_connection(node1)
        logger.debug("Altering compaction strategy to LCS")
        session.execute("ALTER TABLE keyspace1.standard1 with compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1, 'fanout_size':10};")

        stress_write(node1, keycount=1000000)
        node1.nodetool('flush keyspace1 standard1')

        # trigger the compaction
        node1.compact()

        # check the sstable count in each level
        table_name = 'standard1'
        output = grep_sstables_in_each_level(node1, table_name)

        # [0, ?/10, ?, 0, 0, 0...]
        p = re.compile(r'0,\s(\d+)/10,.*')
        m = p.search(output)
        assert m is not None, "unexpected 'SSTables in each level' output: {}".format(output)
        assert 10 * len(node1.data_directories()) == int(m.group(1))

        logger.debug("Altering the fanout_size")
        session.execute("ALTER TABLE keyspace1.standard1 with compaction={'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb':1, 'fanout_size':5};")

        # trigger the compaction
        node1.compact()

        # check the sstable count in each level again
        output = grep_sstables_in_each_level(node1, table_name)

        # [0, ?/5, ?/25, ?, 0, 0...]
        p = re.compile(r'0,\s(\d+)/5,\s(\d+)/25,.*')
        m = p.search(output)
        assert m is not None, "unexpected 'SSTables in each level' output: {}".format(output)
        assert 5 * len(node1.data_directories()) == int(m.group(1))
        assert 25 * len(node1.data_directories()) == int(m.group(2))

    def skip_if_no_major_compaction(self, strategy):
        """Skip the running test for LeveledCompactionStrategy on clusters older than 2.2."""
        if self.cluster.version() < '2.2' and strategy == 'LeveledCompactionStrategy':
            # reason passed positionally: the keyword was renamed from `msg` to `reason` across pytest versions
            pytest.skip('major compaction not implemented for LCS in this version of Cassandra')
def grep_sstables_in_each_level(node, table_name):
    """
    Return the text following "SSTables in each level:" for `table_name`
    in the output of `nodetool cfstats` run on `node`.

    The search starts at the first occurrence of `table_name` and the value
    runs to the end of that line, stripped of surrounding whitespace. An
    empty string is returned when the table or the label is not present.
    """
    output = node.nodetool('cfstats').stdout

    table_pos = output.find(table_name)
    if table_pos == -1:
        return ''
    label_pos = output.find("SSTables in each level", table_pos)
    if label_pos == -1:
        return ''

    colon_pos = output.find(":", label_pos)
    line_end = output.find("\n", label_pos)
    if line_end == -1:
        # label is on the last line with no trailing newline: take the rest
        line_end = len(output)
    return output[colon_pos + 1:line_end].strip()
def get_random_word(wordLen, population=string.ascii_letters + string.digits):
    """
    Build a random string of int(wordLen) characters, each one picked
    independently from `population` (ASCII letters and digits by default).
    """
    length = int(wordLen)
    picked = (random.choice(population) for _ in range(length))
    return ''.join(picked)
def stress_write(node, keycount=100000):
    """Run a stress 'write' of `keycount` keys (n=<keycount>) on `node`."""
    stress_args = ['write', 'n={}'.format(keycount)]
    node.stress(stress_args)