blob: 6ff11cd6e58fbd1c97735422a71ca3b0d03b6feb [file] [log] [blame]
import binascii
import glob
import os
import stat
import struct
import time
from distutils.version import LooseVersion
import pytest
import logging
from cassandra import WriteTimeout
from cassandra.cluster import NoHostAvailable, OperationTimedOut
from ccmlib.common import is_win
from ccmlib.node import Node, TimeoutError, NodeError
from parse import parse
from dtest import Tester, create_ks
from tools.assertions import (assert_almost_equal, assert_none, assert_one, assert_lists_equal_ignoring_order)
from tools.data import rows_to_list
since = pytest.mark.since
logger = logging.getLogger(__name__)
class TestCommitLog(Tester):
"""
CommitLog Tests
"""
@pytest.fixture(autouse=True)
def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
fixture_dtest_setup.allow_log_errors = True
@pytest.fixture(scope='function', autouse=True)
def fixture_set_cluster_settings(self, fixture_dtest_setup):
if fixture_dtest_setup.dtest_config.cassandra_version_from_build >= '3.0':
fixture_dtest_setup.cluster.set_configuration_options({'enable_materialized_views': 'true'})
fixture_dtest_setup.cluster.populate(1)
[self.node1] = fixture_dtest_setup.cluster.nodelist()
yield
# Some of the tests change commitlog permissions to provoke failure
# so this changes them back so we can delete them.
self._change_commitlog_perms(stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
def prepare(self, configuration=None, create_test_keyspace=True, **kwargs):
if configuration is None:
configuration = {}
default_conf = {'commitlog_sync_period_in_ms': 1000}
set_conf = dict(default_conf, **configuration)
logger.debug('setting commitlog configuration with the following values: {set_conf}'
' and the following kwargs: {kwargs}'.format(set_conf=set_conf, kwargs=kwargs))
self.cluster.set_configuration_options(values=set_conf, **kwargs)
self.cluster.start()
self.session1 = self.patient_cql_connection(self.node1)
if create_test_keyspace:
self.session1.execute("DROP KEYSPACE IF EXISTS ks;")
create_ks(self.session1, 'ks', 1)
self.session1.execute("DROP TABLE IF EXISTS test;")
query = """
CREATE TABLE test (
key int primary key,
col1 int
)
"""
self.session1.execute(query)
def _change_commitlog_perms(self, mod):
for path in self._get_commitlog_paths():
logger.debug('changing permissions to {perms} on {path}'.format(perms=oct(mod), path=path))
os.chmod(path, mod)
commitlogs = glob.glob(path + '/*')
if commitlogs:
logger.debug('changing permissions to {perms} on the following files:'
'\n {files}'.format(perms=oct(mod), files='\n '.join(commitlogs)))
else:
logger.debug(self._change_commitlog_perms.__name__ + ' called on empty commitlog directory '
'{path} with permissions {perms}'.format(path=path, perms=oct(mod)))
for commitlog in commitlogs:
os.chmod(commitlog, mod)
def _get_commitlog_paths(self):
"""
Returns the list of commitlog and cdc paths
"""
# TODO: this does not account for non-default commitlog/cdc paths
# specified in cassandra.yaml
return [d for d in [os.path.join(self.node1.get_path(), 'commitlogs'),
os.path.join(self.node1.get_path(), 'cdc')]
if os.path.isdir(d)]
def _get_commitlog_files(self):
"""
Returns the paths to commitlog files
"""
return [os.path.join(path, filename)
for path in self._get_commitlog_paths()
for filename in os.listdir(path)]
def _segment_size_test(self, segment_size_in_mb, compressed=False):
"""
Execute a basic commitlog test and validate the commitlog files
"""
conf = {'commitlog_segment_size_in_mb': segment_size_in_mb}
if compressed:
conf['commitlog_compression'] = [{'class_name': 'LZ4Compressor'}]
conf['memtable_heap_space_in_mb'] = 512
self.prepare(configuration=conf, create_test_keyspace=False)
segment_size = segment_size_in_mb * 1024 * 1024
self.node1.stress(['write', 'n=150k', 'no-warmup', '-rate', 'threads=25'])
time.sleep(1)
commitlogs = self._get_commitlog_files()
assert len(commitlogs) > 0, 'No commit log files were created'
# the most recently-written segment of the commitlog may be smaller
# than the expected size, so we allow exactly one segment to be smaller
smaller_found = False
for i, f in enumerate(commitlogs):
size = os.path.getsize(f)
size_in_mb = int(size / 1024 / 1024)
logger.debug('segment file {} {}; smaller already found: {}'.format(f, size_in_mb, smaller_found))
if size_in_mb < 1 or size < (segment_size * 0.1):
logger.debug('segment file not yet used; moving to next file')
continue # commitlog not yet used
try:
if compressed:
# if compression is used, we assume there will be at most a 50% compression ratio
assert size < segment_size
assert size > segment_size / 2
else:
# if no compression is used, the size will be close to what we expect
assert_almost_equal(size, segment_size, error=0.05)
except AssertionError as e:
# the last segment may be smaller
if not smaller_found:
assert size <= segment_size
smaller_found = True
else:
raise e
def _provoke_commitlog_failure(self):
"""
Provoke the commitlog failure
"""
logger.debug('Provoking commitlog failure')
# Test things are ok at this point
self.session1.execute("""
INSERT INTO test (key, col1) VALUES (1, 1);
""")
assert_one(
self.session1,
"SELECT * FROM test where key=1;",
[1, 1]
)
self._change_commitlog_perms(0)
# Use stress_process to skip internal error handling in ccm. Grep node logs for specific errors in test method.
self.node1.stress_process(['write', 'n=1M', 'no-warmup', '-col', 'size=FIXED(1000)', '-rate', 'threads=25']).communicate()
@since('3.0.7')
def test_mv_lock_contention_during_replay(self):
"""
Ensure that we don't generate WriteTimeoutExceptions during commitlog
replay due to MV lock contention. Fixed in 3.0.7 and 3.7.
@jira_ticket CASSANDRA-11891
"""
cluster_ver = self.cluster.version()
if LooseVersion('3.1') <= cluster_ver < LooseVersion('3.7'):
pytest.skip("Fixed in 3.0.7 and 3.7")
node1 = self.node1
node1.set_batch_commitlog(enabled=True)
node1.start()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, 'Test', 1)
session.execute("""
CREATE TABLE mytable (
a int,
b int,
c int,
PRIMARY KEY (a, b)
);
""")
session.execute("""
CREATE MATERIALIZED VIEW myview
AS SELECT * FROM mytable
WHERE a IS NOT NULL AND b IS NOT NULL
PRIMARY KEY (a, b);
""")
logger.debug("Insert data")
num_rows = 1024 # maximum number of mutations replayed at once by the commit log
for i in range(num_rows):
session.execute("INSERT INTO Test.mytable (a, b, c) VALUES (0, {i}, {i})".format(i=i))
node1.stop(gently=False)
node1.mark_log_for_errors()
logger.debug("Verify commitlog was written before abrupt stop")
commitlog_files = os.listdir(os.path.join(node1.get_path(), 'commitlogs'))
assert [] != commitlog_files
# set a short timeout to ensure lock contention will generally exceed this
node1.set_configuration_options({'write_request_timeout_in_ms': 30})
logger.debug("Starting node again")
node1.start()
logger.debug("Verify commit log was replayed on startup")
start_time, replay_complete = time.time(), False
while not replay_complete:
matches = node1.grep_log(r".*WriteTimeoutException.*")
assert [] == matches
replay_complete = node1.grep_log("Log replay complete")
assert time.time() - start_time < 120, "Did not finish commitlog replay within 120 seconds"
logger.debug("Reconnecting to node")
session = self.patient_cql_connection(node1)
logger.debug("Make query to ensure data is present")
res = list(session.execute("SELECT * FROM Test.mytable"))
assert num_rows == len(res), res
def test_commitlog_replay_on_startup(self):
"""
Test commit log replay
"""
node1 = self.node1
node1.set_batch_commitlog(enabled=True)
node1.start()
logger.debug("Insert data")
session = self.patient_cql_connection(node1)
create_ks(session, 'Test', 1)
session.execute("""
CREATE TABLE users (
user_name varchar PRIMARY KEY,
password varchar,
gender varchar,
state varchar,
birth_year bigint
);
""")
session.execute("INSERT INTO Test. users (user_name, password, gender, state, birth_year) "
"VALUES('gandalf', 'p@$$', 'male', 'WA', 1955);")
logger.debug("Verify data is present")
session = self.patient_cql_connection(node1)
res = session.execute("SELECT * FROM Test. users")
assert rows_to_list(res) == [['gandalf', 1955, 'male', 'p@$$', 'WA']]
logger.debug("Stop node abruptly")
node1.stop(gently=False)
logger.debug("Verify commitlog was written before abrupt stop")
commitlog_dir = os.path.join(node1.get_path(), 'commitlogs')
commitlog_files = os.listdir(commitlog_dir)
assert len(commitlog_files) > 0
logger.debug("Verify no SSTables were flushed before abrupt stop")
assert 0 == len(node1.get_sstables('test', 'users'))
logger.debug("Verify commit log was replayed on startup")
node1.start()
node1.watch_log_for("Log replay complete")
# Here we verify from the logs that some mutations were replayed
replays = [match_tuple[0] for match_tuple in node1.grep_log(r" \d+ replayed mutations")]
logger.debug('The following log lines indicate that mutations were replayed: {msgs}'.format(msgs=replays))
num_replayed_mutations = [
parse('{} {num_mutations:d} replayed mutations{}', line).named['num_mutations']
for line in replays
]
# assert there were some lines where more than zero mutations were replayed
assert [m for m in num_replayed_mutations if m > 0] != []
logger.debug("Make query and ensure data is present")
session = self.patient_cql_connection(node1)
res = session.execute("SELECT * FROM Test. users")
assert_lists_equal_ignoring_order(rows_to_list(res), [['gandalf', 1955, 'male', 'p@$$', 'WA']])
def test_default_segment_size(self):
"""
Test default commitlog_segment_size_in_mb (32MB)
"""
self._segment_size_test(32)
def test_small_segment_size(self):
"""
Test a small commitlog_segment_size_in_mb (5MB)
"""
self._segment_size_test(5)
@since('2.2')
def test_default_compressed_segment_size(self):
"""
Test default compressed commitlog_segment_size_in_mb (32MB)
"""
self._segment_size_test(32, compressed=True)
@since('2.2')
def test_small_compressed_segment_size(self):
"""
Test a small compressed commitlog_segment_size_in_mb (5MB)
"""
self._segment_size_test(5, compressed=True)
def test_stop_failure_policy(self):
"""
Test the stop commitlog failure policy (default one)
"""
self.prepare()
self._provoke_commitlog_failure()
failure = self.node1.grep_log("Failed .+ commit log segments. Commit disk failure policy is stop; terminating thread")
logger.debug(failure)
assert failure, "Cannot find the commitlog failure message in logs"
assert self.node1.is_running(), "Node1 should still be running"
# Cannot write anymore after the failure
with pytest.raises(NoHostAvailable):
self.session1.execute("""
INSERT INTO test (key, col1) VALUES (2, 2);
""")
# Should not be able to read neither
with pytest.raises(NoHostAvailable):
self.session1.execute("""
"SELECT * FROM test;"
""")
def test_stop_commit_failure_policy(self):
"""
Test the stop_commit commitlog failure policy
"""
self.prepare(configuration={
'commit_failure_policy': 'stop_commit'
})
self.session1.execute("""
INSERT INTO test (key, col1) VALUES (2, 2);
""")
self._provoke_commitlog_failure()
failure = self.node1.grep_log("Failed .+ commit log segments. Commit disk failure policy is stop_commit; terminating thread")
logger.debug(failure)
assert failure, "Cannot find the commitlog failure message in logs"
assert self.node1.is_running(), "Node1 should still be running"
# Cannot write anymore after the failure
logger.debug('attempting to insert to node with failing commitlog; should fail')
with pytest.raises((OperationTimedOut, WriteTimeout)):
self.session1.execute("""
INSERT INTO test (key, col1) VALUES (2, 2);
""")
# Should be able to read
logger.debug('attempting to read from node with failing commitlog; should succeed')
assert_one(
self.session1,
"SELECT * FROM test where key=2;",
[2, 2]
)
def test_die_failure_policy(self):
"""
Test the die commitlog failure policy
"""
self.prepare(configuration={
'commit_failure_policy': 'die'
})
self._provoke_commitlog_failure()
failure = self.node1.grep_log(r"ERROR \[COMMIT-LOG-ALLOCATOR\].+JVM state determined to be unstable. Exiting forcefully")
logger.debug(failure)
assert failure, "Cannot find the commitlog failure message in logs"
assert not self.node1.is_running(), "Node1 should not be running"
def test_ignore_failure_policy(self):
"""
Test the ignore commitlog failure policy
"""
self.prepare(configuration={
'commit_failure_policy': 'ignore'
})
self._provoke_commitlog_failure()
failure = self.node1.grep_log(r"ERROR \[COMMIT-LOG-ALLOCATOR\].+Failed .+ commit log segments")
assert failure, "Cannot find the commitlog failure message in logs"
assert self.node1.is_running(), "Node1 should still be running"
# on Windows, we can't delete the segments if they're chmod to 0 so they'll still be available for use by CLSM,
# and we can still create new segments since os.chmod is limited to stat.S_IWRITE and stat.S_IREAD to set files
# as read-only. New mutations will still be allocated and WriteTimeouts will not be raised. It's sufficient that
# we confirm that a) the node isn't dead (stop) and b) the node doesn't terminate the thread (stop_commit)
query = "INSERT INTO test (key, col1) VALUES (2, 2);"
if is_win():
# We expect this to succeed
self.session1.execute(query)
assert not self.node1.grep_log("terminating thread"), "thread was terminated but CL error should have been ignored."
assert self.node1.is_running(), "Node1 should still be running after an ignore error on CL"
else:
with pytest.raises((OperationTimedOut, WriteTimeout)):
self.session1.execute(query)
# Should not exist
assert_none(self.session1, "SELECT * FROM test where key=2;")
# bring back the node commitlogs
self._change_commitlog_perms(stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
self.session1.execute("""
INSERT INTO test (key, col1) VALUES (3, 3);
""")
assert_one(
self.session1,
"SELECT * FROM test where key=3;",
[3, 3]
)
time.sleep(2)
assert_one(
self.session1,
"SELECT * FROM test where key=2;",
[2, 2]
)
@since('2.2')
def test_bad_crc(self):
"""
if the commit log header crc (checksum) doesn't match the actual crc of the header data,
and the commit_failure_policy is stop, C* shouldn't startup
@jira_ticket CASSANDRA-9749
"""
expected_error = "Exiting due to error while processing commit log during initialization."
self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
expected_error]
node = self.node1
assert isinstance(node, Node)
node.set_configuration_options({'commit_failure_policy': 'stop', 'commitlog_sync_period_in_ms': 1000})
self.cluster.start()
cursor = self.patient_cql_connection(self.cluster.nodelist()[0])
create_ks(cursor, 'ks', 1)
cursor.execute("CREATE TABLE ks.tbl (k INT PRIMARY KEY, v INT)")
for i in range(10):
cursor.execute("INSERT INTO ks.tbl (k, v) VALUES ({0}, {0})".format(i))
results = list(cursor.execute("SELECT * FROM ks.tbl"))
assert len(results) == 10
# with the commitlog_sync_period_in_ms set to 1000,
# this sleep guarantees that the commitlog data is
# actually flushed to disk before we kill -9 it
time.sleep(1)
node.stop(gently=False)
# check that ks.tbl hasn't been flushed
path = node.get_path()
for data_dir in node.data_directories():
ks_dir = os.path.join(data_dir, 'ks')
db_dir = os.listdir(ks_dir)[0]
sstables = len([f for f in os.listdir(os.path.join(ks_dir, db_dir)) if f.endswith('.db')])
assert sstables == 0
# modify the commit log crc values
cl_dir = os.path.join(path, 'commitlogs')
assert len(os.listdir(cl_dir)) > 0
for cl in os.listdir(cl_dir):
# locate the CRC location
with open(os.path.join(cl_dir, cl), 'rb') as f:
f.seek(0)
version = struct.unpack('>i', f.read(4))[0]
crc_pos = 12
if version >= 5:
f.seek(crc_pos)
psize = struct.unpack('>h', f.read(2))[0] & 0xFFFF
crc_pos += 2 + psize
# rewrite it with crap
with open(os.path.join(cl_dir, cl), 'wb') as f:
f.seek(crc_pos)
f.write(struct.pack('>i', 123456))
# verify said crap
with open(os.path.join(cl_dir, cl), 'rb') as f:
f.seek(crc_pos)
crc = struct.unpack('>i', f.read(4))[0]
assert crc == 123456
mark = node.mark_log()
node.start()
node.watch_log_for(expected_error, from_mark=mark)
with pytest.raises((TimeoutError, NodeError)):
node.wait_for_binary_interface(from_mark=mark, timeout=20)
assert not node.is_running()
@since('2.2')
def test_compression_error(self):
"""
@jira_ticket CASSANDRA-7886
if the commit log header refers to an unknown compression class, and
the commit_failure_policy is stop, C* shouldn't start up
"""
expected_error = 'Could not create Compression for type org.apache.cassandra.io.compress.LZ5Compressor'
self.fixture_dtest_setup.ignore_log_patterns = list(self.fixture_dtest_setup.ignore_log_patterns) + [
expected_error]
node = self.node1
assert isinstance(node, Node)
node.set_configuration_options({'commit_failure_policy': 'stop',
'commitlog_compression': [{'class_name': 'LZ4Compressor'}],
'commitlog_sync_period_in_ms': 1000})
self.cluster.start()
cursor = self.patient_cql_connection(self.cluster.nodelist()[0])
create_ks(cursor, 'ks1', 1)
cursor.execute("CREATE TABLE ks1.tbl (k INT PRIMARY KEY, v INT)")
for i in range(10):
cursor.execute("INSERT INTO ks1.tbl (k, v) VALUES ({0}, {0})".format(i))
results = list(cursor.execute("SELECT * FROM ks1.tbl"))
assert len(results) == 10
# with the commitlog_sync_period_in_ms set to 1000,
# this sleep guarantees that the commitlog data is
# actually flushed to disk before we kill -9 it
time.sleep(1)
node.stop(gently=False)
# check that ks1.tbl hasn't been flushed
sstables = 0
for data_dir in node.data_directories():
ks_dir = os.path.join(data_dir, 'ks1')
db_dir = os.listdir(ks_dir)[0]
sstables = sstables + len([f for f in os.listdir(os.path.join(ks_dir, db_dir)) if f.endswith('.db')])
assert sstables == 0
def get_header_crc(header):
"""
When calculating the header crc, C* splits up the 8b id, first adding the 4 least significant
bytes to the crc, then the 5 most significant bytes, so this splits them and calculates the same way
"""
new_header = bytearray(header[:4])
# C* evaluates most and least significant 4 bytes out of order
new_header.extend(header[8:12])
new_header.extend(header[4:8])
# C* evaluates the short parameter length as an int
new_header.extend(b'\x00\x00')
new_header.extend(header[12:14]) # the
new_header.extend(header[14:])
# https://docs.python.org/2/library/binascii.html
# "Changed in version 2.6: The return value is in the range [-2**31, 2**31-1] regardless
# of platform. In the past the value would be signed on some platforms and unsigned on
# others. Use & 0xffffffff on the value if you want it to match Python 3 behavior."
return binascii.crc32(new_header) & 0xffffffff
# modify the compression parameters to look for a compressor that isn't there
# while this scenario is pretty unlikely, if a jar or lib got moved or something,
# you'd have a similar situation, which would be fixable by the user
path = node.get_path()
cl_dir = os.path.join(path, 'commitlogs')
assert len(os.listdir(cl_dir)) > 0
for cl in os.listdir(cl_dir):
# read the header and find the crc location
with open(os.path.join(cl_dir, cl), 'rb') as f:
f.seek(0)
crc_pos = 12
f.seek(crc_pos)
psize = struct.unpack('>h', f.read(2))[0] & 0xFFFF
crc_pos += 2 + psize
header_length = crc_pos
f.seek(crc_pos)
crc = struct.unpack('>i', f.read(4))[0]
# check that we're going this right
f.seek(0)
header_bytes = f.read(header_length)
# https://docs.python.org/2/library/binascii.html
# "Changed in version 2.6: The return value is in the range [-2**31, 2**31-1] regardless
# of platform. In the past the value would be signed on some platforms and unsigned on
# others. Use & 0xffffffff on the value if you want it to match Python 3 behavior."
assert get_header_crc(header_bytes) == (crc & 0xffffffff)
# rewrite it with imaginary compressor
assert 'LZ4Compressor'.encode("ascii") in header_bytes
header_bytes = header_bytes.replace('LZ4Compressor'.encode("ascii"), 'LZ5Compressor'.encode("ascii"))
assert 'LZ4Compressor'.encode("ascii") not in header_bytes
assert 'LZ5Compressor'.encode("ascii") in header_bytes
with open(os.path.join(cl_dir, cl), 'wb') as f:
f.seek(0)
f.write(header_bytes)
f.seek(crc_pos)
f.write(struct.pack('>I', get_header_crc(header_bytes)))
# verify we wrote everything correctly
with open(os.path.join(cl_dir, cl), 'rb') as f:
f.seek(0)
assert f.read(header_length) == header_bytes
f.seek(crc_pos)
crc = struct.unpack('>i', f.read(4))[0]
# https://docs.python.org/2/library/binascii.html
# "Changed in version 2.6: The return value is in the range [-2**31, 2**31-1] regardless
# of platform. In the past the value would be signed on some platforms and unsigned on
# others. Use & 0xffffffff on the value if you want it to match Python 3 behavior."
assert (crc & 0xffffffff) == get_header_crc(header_bytes)
mark = node.mark_log()
node.start()
node.watch_log_for(expected_error, from_mark=mark)
with pytest.raises((TimeoutError, NodeError)):
node.wait_for_binary_interface(from_mark=mark, timeout=20)