blob: 657e61a3bb173dc451fc61c9ce9b0d9f5ac98430 [file] [log] [blame]
from __future__ import division
import errno
import os
import shutil
import time
import uuid
from collections import namedtuple
from itertools import izip as zip
from itertools import repeat
from cassandra import WriteFailure
from cassandra.concurrent import (execute_concurrent,
execute_concurrent_with_args)
from ccmlib.node import Node
from nose.tools import assert_equal, assert_less_equal
from dtest import Tester, create_ks, debug
from tools.data import rows_to_list
from tools.decorators import since
from tools.files import size_of_files_in_dir
from tools.funcutils import get_rate_limited_function
from tools.hacks import advance_to_next_cl_segment
_16_uuid_column_spec = (
'a uuid PRIMARY KEY, b uuid, c uuid, d uuid, e uuid, f uuid, g uuid, '
'h uuid, i uuid, j uuid, k uuid, l uuid, m uuid, n uuid, o uuid, '
'p uuid'
)
def _insert_rows(session, table_name, insert_stmt, values):
prepared_insert = session.prepare(insert_stmt)
values = list(values) # in case values is a generator
execute_concurrent(session, ((prepared_insert, x) for x in values),
concurrency=500, raise_on_first_error=True)
data_loaded = rows_to_list(session.execute('SELECT * FROM ' + table_name))
debug('{n} rows inserted into {table_name}'.format(n=len(data_loaded), table_name=table_name))
# use assert_equal over assert_length_equal to avoid printing out
# potentially large lists
assert_equal(len(values), len(data_loaded))
return data_loaded
def _move_contents(source_dir, dest_dir, verbose=True):
for source_filename in os.listdir(source_dir):
source_path, dest_path = (os.path.join(source_dir, source_filename),
os.path.join(dest_dir, source_filename))
if verbose:
debug('moving {} to {}'.format(source_path, dest_path))
shutil.move(source_path, dest_path)
def _get_16_uuid_insert_stmt(ks_name, table_name):
return (
'INSERT INTO {ks_name}.{table_name} '
'(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) '
'VALUES (uuid(), uuid(), uuid(), uuid(), uuid(), '
'uuid(), uuid(), uuid(), uuid(), uuid(), uuid(), '
'uuid(), uuid(), uuid(), uuid(), uuid())'
).format(ks_name=ks_name, table_name=table_name)
def _get_create_table_statement(ks_name, table_name, column_spec, options=None):
if options:
options_pairs = ('{k}={v}'.format(k=k, v=v) for (k, v) in options.iteritems())
options_string = 'WITH ' + ' AND '.join(options_pairs)
else:
options_string = ''
return (
'CREATE TABLE ' + ks_name + '.' + table_name + ' '
'(' + column_spec + ') ' + options_string
)
def _write_to_cdc_WriteFailure(session, insert_stmt):
prepared = session.prepare(insert_stmt)
start, rows_loaded, error_found = time.time(), 0, False
rate_limited_debug = get_rate_limited_function(debug, 5)
while not error_found:
# We want to fail if inserting data takes too long. Locally this
# takes about 10s, but let's be generous.
assert_less_equal(
(time.time() - start), 600,
"It's taken more than 10 minutes to reach a WriteFailure trying "
'to overrun the space designated for CDC commitlogs. This could '
"be because data isn't being written quickly enough in this "
'environment, or because C* is failing to reject writes when '
'it should.'
)
# If we haven't logged from here in the last 5s, do so.
rate_limited_debug(
' data load step has lasted {s:.2f}s, '
'loaded {r} rows'.format(s=(time.time() - start), r=rows_loaded))
batch_results = list(execute_concurrent(
session,
((prepared, ()) for _ in range(1000)),
concurrency=500,
# Don't propagate errors to the main thread. We expect at least
# one WriteFailure, so we handle it below as part of the
# results recieved from this method.
raise_on_first_error=False
))
# Here, we track the number of inserted values by getting the
# number of successfully completed statements...
rows_loaded += len([br for br in batch_results if br[0]])
# then, we make sure that the only failures are the expected
# WriteFailures.
assert_equal([],
[result for (success, result) in batch_results
if not success and not isinstance(result, WriteFailure)])
# Finally, if we find a WriteFailure, that means we've inserted all
# the CDC data we can and so we flip error_found to exit the loop.
if any(isinstance(result, WriteFailure) for (_, result) in batch_results):
debug("write failed (presumably because we've overrun "
'designated CDC commitlog space) after '
'loading {r} rows in {s:.2f}s'.format(
r=rows_loaded,
s=time.time() - start))
error_found = True
return rows_loaded
_TableInfoNamedtuple = namedtuple('TableInfoNamedtuple', [
# required
'ks_name', 'table_name', 'column_spec',
# optional
'options', 'insert_stmt',
# derived
'name', 'create_stmt'
])
class TableInfo(_TableInfoNamedtuple):
__slots__ = ()
def __new__(cls, ks_name, table_name, column_spec, options=None, insert_stmt=None):
name = ks_name + '.' + table_name
create_stmt = _get_create_table_statement(ks_name, table_name, column_spec, options)
self = super(TableInfo, cls).__new__(
cls,
# required
ks_name=ks_name, table_name=table_name, column_spec=column_spec,
# optional
options=options, insert_stmt=insert_stmt,
# derived
name=name, create_stmt=create_stmt
)
return self
def _set_cdc_on_table(session, table_name, value, ks_name=None):
"""
Uses <session> to set CDC to <value> on <ks_name>.<table_name>.
"""
table_string = ks_name + '.' + table_name if ks_name else table_name
value_string = 'true' if value else 'false'
stmt = 'ALTER TABLE ' + table_string + ' WITH CDC = ' + value_string
debug(stmt)
session.execute(stmt)
def _get_set_cdc_func(session, ks_name, table_name):
"""
Close over a session, keyspace name, and table name and return a function
that takes enables CDC on that keyspace if its argument is truthy and
otherwise disables it.
"""
def set_cdc(value):
return _set_cdc_on_table(
session=session,
ks_name=ks_name, table_name=table_name,
value=value
)
return set_cdc
def _get_commitlog_files(node_path):
commitlog_dir = os.path.join(node_path, 'commitlogs')
return {
os.path.join(commitlog_dir, name)
for name in os.listdir(commitlog_dir)
}
def _get_cdc_raw_files(node_path, cdc_raw_dir_name='cdc_raw'):
commitlog_dir = os.path.join(node_path, cdc_raw_dir_name)
return {
os.path.join(commitlog_dir, name)
for name in os.listdir(commitlog_dir)
}
@since('3.8')
class TestCDC(Tester):
"""
@jira_ticket CASSANDRA-8844
Test the correctness of some features of CDC, Change Data Capture, which
provides a view of the commitlog on tables for which it is enabled.
"""
def _create_temp_dir(self, dir_name, verbose=True):
"""
Create a directory that will be deleted when this test class is torn
down.
"""
if verbose:
debug('creating ' + dir_name)
try:
os.mkdir(dir_name)
except OSError as e:
if e.errno != errno.EEXIST:
debug(dir_name + ' already exists. removing and recreating.')
shutil.rmtree(dir_name)
os.mkdir(dir_name)
else:
raise e
def debug_and_rmtree():
shutil.rmtree(dir_name)
debug(dir_name + ' removed')
self.addCleanup(debug_and_rmtree)
def prepare(self, ks_name,
table_name=None, cdc_enabled_table=None,
gc_grace_seconds=None,
column_spec=None,
configuration_overrides=None,
table_id=None):
"""
Create a 1-node cluster, start it, create a keyspace, and if
<table_name>, create a table in that keyspace. If <cdc_enabled_table>,
that table is created with CDC enabled. If <column_spec>, use that
string to specify the schema of the table -- for example, a valid value
is 'a int PRIMARY KEY, b int'. The <configuration_overrides> is
treated as a dict-like object and passed to
self.cluster.set_configuration_options.
"""
config_defaults = {
'cdc_enabled': True,
# we want to be able to generate new segments quickly
'commitlog_segment_size_in_mb': 2,
}
if configuration_overrides is None:
configuration_overrides = {}
self.cluster.populate(1)
self.cluster.set_configuration_options(dict(config_defaults, **configuration_overrides))
self.cluster.start(wait_for_binary_proto=True)
node = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node)
create_ks(session, ks_name, rf=1)
if table_name is not None:
self.assertIsNotNone(cdc_enabled_table, 'if creating a table in prepare, must specify whether or not CDC is enabled on it')
self.assertIsNotNone(column_spec, 'if creating a table in prepare, must specify its schema')
options = {}
if gc_grace_seconds is not None:
options['gc_grace_seconds'] = gc_grace_seconds
if table_id is not None:
options['id'] = table_id
if cdc_enabled_table:
options['cdc'] = 'true'
stmt = _get_create_table_statement(
ks_name, table_name, column_spec,
options=options
)
debug(stmt)
session.execute(stmt)
return node, session
def _assert_cdc_data_readable_on_round_trip(self, start_with_cdc_enabled):
"""
Parameterized test asserting that data written to a table is still
readable after flipping the CDC flag on that table, then flipping it
again. Starts with CDC enabled if start_with_cdc_enabled, otherwise
starts with it disabled.
"""
ks_name, table_name = 'ks', 'tab'
sequence = [True, False, True] if start_with_cdc_enabled else [False, True, False]
start_enabled, alter_path = sequence[0], list(sequence[1:])
node, session = self.prepare(ks_name=ks_name, table_name=table_name,
cdc_enabled_table=start_enabled,
column_spec='a int PRIMARY KEY, b int')
set_cdc = _get_set_cdc_func(session=session, ks_name=ks_name, table_name=table_name)
insert_stmt = session.prepare('INSERT INTO ' + table_name + ' (a, b) VALUES (?, ?)')
data = tuple(zip(list(range(1000)), list(range(1000))))
execute_concurrent_with_args(session, insert_stmt, data)
# We need data to be in commitlogs, not sstables.
self.assertEqual([], list(node.get_sstables(ks_name, table_name)))
for enable in alter_path:
set_cdc(enable)
self.assertItemsEqual(session.execute('SELECT * FROM ' + table_name), data)
def test_cdc_enabled_data_readable_on_round_trip(self):
"""
Test that data is readable after an enabled->disabled->enabled round
trip.
"""
self._assert_cdc_data_readable_on_round_trip(start_with_cdc_enabled=True)
def test_cdc_disabled_data_readable_on_round_trip(self):
"""
Test that data is readable after an disabled->enabled->disabled round
trip.
"""
self._assert_cdc_data_readable_on_round_trip(start_with_cdc_enabled=False)
def test_insertion_and_commitlog_behavior_after_reaching_cdc_total_space(self):
"""
Test that C* behaves correctly when CDC tables have consumed all the
space available to them. In particular: after writing
cdc_total_space_in_mb MB into CDC commitlogs:
- CDC writes are rejected
- non-CDC writes are accepted
- on flush, CDC commitlogs are copied to cdc_raw
- on flush, non-CDC commitlogs are not copied to cdc_raw
This is a lot of behavior to validate in one test, but we do so to
avoid running multiple tests that each write 1MB of data to fill
cdc_total_space_in_mb.
"""
ks_name = 'ks'
full_cdc_table_info = TableInfo(
ks_name=ks_name, table_name='full_cdc_tab',
column_spec=_16_uuid_column_spec,
insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'full_cdc_tab'),
options={'cdc': 'true'}
)
configuration_overrides = {
# Make CDC space as small as possible so we can fill it quickly.
'cdc_total_space_in_mb': 4,
}
node, session = self.prepare(
ks_name=ks_name,
configuration_overrides=configuration_overrides
)
session.execute(full_cdc_table_info.create_stmt)
# Later, we'll also make assertions about the behavior of non-CDC
# tables, so we create one here.
non_cdc_table_info = TableInfo(
ks_name=ks_name, table_name='non_cdc_tab',
column_spec=_16_uuid_column_spec,
insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'non_cdc_tab')
)
session.execute(non_cdc_table_info.create_stmt)
# We'll also make assertions about the behavior of CDC tables when
# other CDC tables have already filled the designated space for CDC
# commitlogs, so we create the second CDC table here.
empty_cdc_table_info = TableInfo(
ks_name=ks_name, table_name='empty_cdc_tab',
column_spec=_16_uuid_column_spec,
insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'empty_cdc_tab'),
options={'cdc': 'true'}
)
session.execute(empty_cdc_table_info.create_stmt)
# Here, we insert values into the first CDC table until we get a
# WriteFailure. This should happen when the CDC commitlogs take up 1MB
# or more.
debug('flushing non-CDC commitlogs')
node.flush()
# Then, we insert rows into the CDC table until we can't anymore.
debug('beginning data insert to fill CDC commitlogs')
rows_loaded = _write_to_cdc_WriteFailure(session, full_cdc_table_info.insert_stmt)
self.assertLess(0, rows_loaded,
'No CDC rows inserted. This may happen when '
'cdc_total_space_in_mb > commitlog_segment_size_in_mb')
commitlog_dir = os.path.join(node.get_path(), 'commitlogs')
commitlogs_size = size_of_files_in_dir(commitlog_dir)
debug('Commitlog dir ({d}) is {b}B'.format(d=commitlog_dir, b=commitlogs_size))
# We should get a WriteFailure when trying to write to the CDC table
# that's filled the designated CDC space...
with self.assertRaises(WriteFailure):
session.execute(full_cdc_table_info.insert_stmt)
# or any CDC table.
with self.assertRaises(WriteFailure):
session.execute(empty_cdc_table_info.insert_stmt)
# Now we test for behaviors of non-CDC tables when we've exceeded
# cdc_total_space_in_mb.
#
# First, we drain and save the names of all the new discarded CDC
# segments
node.drain()
session.cluster.shutdown()
node.stop()
node.start(wait_for_binary_proto=True)
session = self.patient_cql_connection(node)
pre_non_cdc_write_cdc_raw_segments = _get_cdc_raw_files(node.get_path())
# save the names of all the commitlog segments written up to this
# point:
pre_non_cdc_write_segments = _get_commitlog_files(node.get_path())
# Check that writing to non-CDC tables succeeds even when writes to CDC
# tables are rejected:
non_cdc_prepared_insert = session.prepare(non_cdc_table_info.insert_stmt)
session.execute(non_cdc_prepared_insert, ()) # should not raise an exception
# Check the following property: any new commitlog segments written to
# after cdc_raw has reached its maximum configured size should not be
# moved to cdc_raw, on commitlog discard, because any such commitlog
# segments are written to non-CDC tables.
#
# First, write to non-cdc tables.
start, time_limit = time.time(), 600
rate_limited_debug = get_rate_limited_function(debug, 5)
debug('writing to non-cdc table')
# We write until we get a new commitlog segment.
while _get_commitlog_files(node.get_path()) <= pre_non_cdc_write_segments:
elapsed = time.time() - start
rate_limited_debug(' non-cdc load step has lasted {s:.2f}s'.format(s=elapsed))
self.assertLessEqual(
elapsed, time_limit,
"It's been over a {s}s and we haven't written a new "
"commitlog segment. Something is wrong.".format(s=time_limit)
)
execute_concurrent(
session,
((non_cdc_prepared_insert, ()) for _ in range(1000)),
concurrency=500,
raise_on_first_error=True,
)
# Finally, we check that draining doesn't move any new segments to cdc_raw:
node.drain()
session.cluster.shutdown()
self.assertEqual(pre_non_cdc_write_cdc_raw_segments, _get_cdc_raw_files(node.get_path()))
def _init_new_loading_node(self, ks_name, create_stmt, use_thrift=False):
loading_node = Node(
name='node2',
cluster=self.cluster,
auto_bootstrap=False,
thrift_interface=('127.0.0.2', 9160) if use_thrift else None,
storage_interface=('127.0.0.2', 7000),
jmx_port='7400',
remote_debug_port='0',
initial_token=None,
binary_interface=('127.0.0.2', 9042)
)
debug('adding node')
self.cluster.add(loading_node, is_seed=True)
debug('starting new node')
loading_node.start(wait_for_binary_proto=True)
debug('recreating ks and table')
loading_session = self.patient_exclusive_cql_connection(loading_node)
create_ks(loading_session, ks_name, rf=1)
debug('creating new table')
loading_session.execute(create_stmt)
debug('stopping new node')
loading_node.stop()
loading_session.cluster.shutdown()
return loading_node
def test_cdc_data_available_in_cdc_raw(self):
ks_name = 'ks'
# First, create a new node just for data generation.
generation_node, generation_session = self.prepare(ks_name=ks_name)
cdc_table_info = TableInfo(
ks_name=ks_name, table_name='cdc_tab',
column_spec=_16_uuid_column_spec,
insert_stmt=_get_16_uuid_insert_stmt(ks_name, 'cdc_tab'),
options={
'cdc': 'true',
# give table an explicit id so when we create it again it's the
# same table and we can replay into it
'id': uuid.uuid4()
}
)
# Write until we get a new CL segment to avoid replaying initialization
# mutations from this node's startup into system tables in the other
# node. See CASSANDRA-11811.
advance_to_next_cl_segment(
session=generation_session,
commitlog_dir=os.path.join(generation_node.get_path(), 'commitlogs')
)
generation_session.execute(cdc_table_info.create_stmt)
# insert 10000 rows
inserted_rows = _insert_rows(generation_session, cdc_table_info.name, cdc_table_info.insert_stmt, repeat((), 10000))
# drain the node to guarantee all cl segements will be recycled
debug('draining')
generation_node.drain()
debug('stopping')
# stop the node and clean up all sessions attached to it
generation_node.stop()
generation_session.cluster.shutdown()
# create a new node to use for cdc_raw cl segment replay
loading_node = self._init_new_loading_node(ks_name, cdc_table_info.create_stmt, self.cluster.version() < '4')
# move cdc_raw contents to commitlog directories, then start the
# node again to trigger commitlog replay, which should replay the
# cdc_raw files we moved to commitlogs into memtables.
debug('moving cdc_raw and restarting node')
_move_contents(
os.path.join(generation_node.get_path(), 'cdc_raw'),
os.path.join(loading_node.get_path(), 'commitlogs')
)
loading_node.start(wait_for_binary_proto=True)
debug('node successfully started; waiting on log replay')
loading_node.grep_log('Log replay complete')
debug('log replay complete')
# final assertions
validation_session = self.patient_exclusive_cql_connection(loading_node)
data_in_cdc_table_after_restart = rows_to_list(
validation_session.execute('SELECT * FROM ' + cdc_table_info.name)
)
debug('found {cdc} values in CDC table'.format(
cdc=len(data_in_cdc_table_after_restart)
))
# Then we assert that the CDC data that we expect to be there is there.
# All data that was in CDC tables should have been copied to cdc_raw,
# then used in commitlog replay, so it should be back in the cluster.
self.assertEqual(
inserted_rows,
data_in_cdc_table_after_restart,
# The message on failure is too long, since cdc_data is thousands
# of items, so we print something else here
msg='not all expected data selected'
)