blob: 27766c7d34c3985b7bced56a0497ef9be060578c [file] [log] [blame]
"""
This one's called hacks because it provides shared utilities to hack around
weirdnesses in Cassandra.
"""
import os
import time
import logging
from cassandra.concurrent import execute_concurrent
from tools.funcutils import get_rate_limited_function
logger = logging.getLogger(__name__)
def _files_in(directory):
return {
os.path.join(directory, name) for name in os.listdir(directory)
}
def advance_to_next_cl_segment(session, commitlog_dir,
keyspace_name='ks', table_name='junk_table',
timeout=120):
"""
This is a hack to work around problems like CASSANDRA-11811.
The problem happens in commitlog-replaying tests, like the snapshot and CDC
tests. If we replay the first commitlog that's created, we wind up
replaying some mutations that initialize system tables, so this function
advances the node to the next CL by filling up the first one.
"""
session.execute(
'CREATE TABLE {ks}.{tab} ('
'a uuid PRIMARY KEY, b uuid, c uuid, d uuid, '
'e uuid, f uuid, g uuid, h uuid'
')'.format(ks=keyspace_name, tab=table_name)
)
prepared_insert = session.prepare(
'INSERT INTO {ks}.{tab} '
'(a, b, c, d, e, f, g, h) '
'VALUES ('
'uuid(), uuid(), uuid(), uuid(), '
'uuid(), uuid(), uuid(), uuid()'
')'.format(ks=keyspace_name, tab=table_name)
)
# record segments that we want to advance past
initial_cl_files = _files_in(commitlog_dir)
start = time.time()
stop_time = start + timeout
rate_limited_debug_logger = get_rate_limited_function(logger.debug, 5)
logger.debug('attempting to write until we start writing to new CL segments: {}'.format(initial_cl_files))
while _files_in(commitlog_dir) <= initial_cl_files:
elapsed = time.time() - start
rate_limited_debug_logger(' commitlog-advancing load step has lasted {s:.2f}s'.format(s=elapsed))
assert (
time.time() <= stop_time), "It's been over a {s}s and we haven't written a new " + \
"commitlog segment. Something is wrong.".format(s=timeout)
execute_concurrent(
session,
((prepared_insert, ()) for _ in range(1000)),
concurrency=500,
raise_on_first_error=True,
)
logger.debug('present commitlog segments: {}'.format(_files_in(commitlog_dir)))