blob: f2c89b21dc24cee446f59376d9f913de1ed5efae [file] [log] [blame]
import configparser
import copy
import logging
import os
import re
import subprocess
import sys
import threading
import time
import traceback
from distutils.version import LooseVersion
import pytest
import cassandra
from subprocess import CalledProcessError
from flaky import flaky
from cassandra import ConsistencyLevel, OperationTimedOut
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import ExecutionProfile
from cassandra.policies import RetryPolicy, RoundRobinPolicy
from ccmlib.node import ToolError, TimeoutError
from tools.misc import retry_till_success
# Directory name used for saved logs; LAST_LOG below is the "last" entry inside it.
LOG_SAVED_DIR = "logs"
# NOTE(review): this `except` has no matching `try:` (nor a handler body) in the
# visible text; the guarded statement -- presumably the creation of
# LOG_SAVED_DIR -- appears to be missing. TODO confirm against the upstream file.
except OSError:
LAST_LOG = os.path.join(LOG_SAVED_DIR, "last")
LAST_TEST_DIR = 'last_test_dir'
config = configparser.RawConfigParser()
# NOTE(review): the condition below has unbalanced parentheses, so part of the
# expression was lost. It presumably counted the files parsed by `config` from
# ~/.cassandra-dtest (after user-directory expansion) -- TODO confirm.
if len('~/.cassandra-dtest'))) > 0:
# DEFAULT_DIR is taken from the [main] section's `default_dir` option when present.
# NOTE(review): no fallback assignment of DEFAULT_DIR is visible here -- TODO confirm.
if config.has_option('main', 'default_dir'):
DEFAULT_DIR = os.path.expanduser(config.get('main', 'default_dir'))
# True only when the RUN_STATIC_UPGRADE_MATRIX environment variable is "yes" or
# "true" (compared case-insensitively); unset counts as False.
RUN_STATIC_UPGRADE_MATRIX = os.environ.get('RUN_STATIC_UPGRADE_MATRIX', '').lower() in ('yes', 'true')
MAJOR_VERSION_4 = LooseVersion('4.0')
logger = logging.getLogger(__name__)
def get_sha(repo_dir):
    """
    Build a slug identifying the commit currently checked out in repo_dir.

    The slug is 'github:apache/<sha>' by default. When the LOCAL_GIT_REPO
    environment variable is set, it is 'local:<LOCAL_GIT_REPO>:<sha>' instead.

    :param repo_dir: directory in which `git rev-parse HEAD` is run
    :return: the slug as a str, or None when repo_dir is not a git repository
    :raises CalledProcessError: when git fails for any other reason
    """
    try:
        # stderr is captured so that git's explanation is available on failure;
        # CalledProcessError's own message only carries the exit status.
        output = subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=repo_dir,
                                         stderr=subprocess.PIPE)
    except CalledProcessError as e:
        stderr = e.stderr or b''
        if isinstance(stderr, bytes):
            stderr = stderr.decode('utf-8', 'replace')
        # NOTE(review): the original condition was garbled in this copy of the
        # file; it looked for 'Not a git repository'. The match is done
        # case-insensitively because git's wording varies in capitalisation.
        if 'not a git repository' in '{} {}'.format(e, stderr).lower():
            # we tried to get a sha, but repo_dir isn't a git repo. No big deal, must just be
            # working from a non-git install.
            return None
        # git call failed for some unknown reason
        raise

    # check_output returns bytes; decode so the slug does not contain "b'...'"
    if isinstance(output, bytes):
        output = output.decode('utf-8')
    sha = output.strip()

    prefix = 'github:apache/'
    local_repo_location = os.environ.get('LOCAL_GIT_REPO')
    if local_repo_location is not None:
        # local: slugs take the form 'local:/some/path/to/cassandra/:branch_name_or_sha'
        prefix = 'local:{}:'.format(local_repo_location)
    return "{}{}".format(prefix, sha)
# copy the initial environment variables so we can reset them later:
# (the snapshot is taken once, when this module is imported)
initial_environment = copy.deepcopy(os.environ)
class DtestTimeoutError(Exception):
    """Dtest-specific timeout exception; adds nothing to Exception but its type."""
# Emitted once at import time, at debug level, to record the driver version.
logger.debug("Python driver version in use: {}".format(cassandra.__version__))
class FlakyRetryPolicy(RetryPolicy):
    """
    A retry policy that retries 5 times by default, but can be configured to
    retry more times.
    """

    def __init__(self, max_retries=5):
        """
        :param max_retries: number of retries allowed before the error is rethrown
        """
        self.max_retries = max_retries

    def _retry_or_rethrow(self, message, retry_num):
        """
        Shared decision for all three callbacks: retry while retry_num is below
        max_retries, otherwise rethrow.

        :param message: debug-log prefix; the attempt number is appended to it
        :param retry_num: the 'retry_num' keyword argument received by the callback
        :return: (self.RETRY, None) or (self.RETHROW, None)
        """
        if retry_num < self.max_retries:
            logger.debug("%s%s", message, retry_num)
            return (self.RETRY, None)
        return (self.RETHROW, None)

    def on_read_timeout(self, *args, **kwargs):
        """Retry a timed-out read until max_retries is reached; needs 'retry_num' in kwargs."""
        return self._retry_or_rethrow("Retrying read after timeout. Attempt #", kwargs['retry_num'])

    def on_write_timeout(self, *args, **kwargs):
        """Retry a timed-out write until max_retries is reached; needs 'retry_num' in kwargs."""
        return self._retry_or_rethrow("Retrying write after timeout. Attempt #", kwargs['retry_num'])

    def on_unavailable(self, *args, **kwargs):
        """Retry after an unavailable error until max_retries is reached; needs 'retry_num' in kwargs."""
        return self._retry_or_rethrow("Retrying request after UE. Attempt #", kwargs['retry_num'])
class Runner(threading.Thread):
    """
    Daemon thread that calls func(0), func(1), func(2), ... until it is stopped
    or func raises.

    The first exception raised by func ends the loop and is stored; it is
    re-raised in the calling thread by check() or stop().
    """

    def __init__(self, func):
        """
        :param func: callable taking the iteration counter as its only argument
        """
        # Thread.__init__ must run before the `daemon` attribute can be assigned.
        super().__init__()
        self.__func = func
        self.__error = None
        self.__stopped = False
        self.daemon = True

    def run(self):
        """Thread body: invoke func with an increasing counter until stopped or failed."""
        i = 0
        while True:
            if self.__stopped:
                return
            try:
                self.__func(i)
            except Exception as e:
                # NOTE(review): ending the loop on the first failure is
                # reconstructed -- the statement after this assignment was lost
                # in this copy of the file. Confirm against upstream.
                self.__error = e
                return
            i = i + 1

    def stop(self):
        """
        Ask the loop to finish, wait for the thread to end, then re-raise any
        exception that func raised. Calling stop() again is a no-op.
        """
        if self.__stopped:
            return

        self.__stopped = True
        # pytests may appear to hang forever waiting for cluster tear down. are all driver session objects shutdown?
        # to debug hang you can add the following at the top of the test
        #     import faulthandler
        #     faulthandler.enable()
        #
        # and then when the hang occurs send a SIGABRT to the pytest process (e.g. kill -SIGABRT <pytest_pid>)
        # this will print a python thread dump of all currently alive threads
        self.join()
        if self.__error is not None:
            raise self.__error

    def check(self):
        """Re-raise the exception captured from func, if any; otherwise do nothing."""
        if self.__error is not None:
            raise self.__error
def make_execution_profile(retry_policy=FlakyRetryPolicy(), consistency_level=ConsistencyLevel.ONE, **kwargs):
    """
    Build an ExecutionProfile from the given retry policy and consistency level.

    :param retry_policy: retry policy for the profile (a FlakyRetryPolicy by default)
    :param consistency_level: consistency level for the profile (ONE by default)
    :param kwargs: any further keyword arguments, forwarded to ExecutionProfile
    :return: the new ExecutionProfile
    """
    # NOTE(review): both ExecutionProfile(...) calls were truncated in this copy
    # of the file. Forwarding consistency_level/kwargs and defaulting the load
    # balancing policy to RoundRobinPolicy (imported by this module) when the
    # caller gives none is a reconstruction -- confirm against upstream.
    if 'load_balancing_policy' in kwargs:
        return ExecutionProfile(retry_policy=retry_policy,
                                consistency_level=consistency_level,
                                **kwargs)
    else:
        return ExecutionProfile(retry_policy=retry_policy,
                                consistency_level=consistency_level,
                                load_balancing_policy=RoundRobinPolicy(),
                                **kwargs)
def running_in_docker():
    """
    Report whether the marker file '/.dockerenv' exists.

    :return: True if '/.dockerenv' is a regular file, False otherwise
    """
    return os.path.isfile('/.dockerenv')
def cleanup_docker_environment_before_test_execution():
# NOTE(review): the four prose lines below read like the function's docstring
# whose triple quotes were lost in this copy of the file -- TODO confirm.
perform a bunch of system cleanup operations, like kill any instances that might be
hanging around incorrectly from a previous run, sync the disk, and clear swap.
Ideally we would also drop the page cache, but as docker isn't running in privileged
mode there is no way for us to do this.
# attempt to wack all existing running Cassandra processes forcefully to get us into a clean state
# (the shell pipeline's stdout and stderr are discarded)
p_kill = subprocess.Popen('ps aux | grep -ie CassandraDaemon | grep java | grep -v grep | awk \'{print $2}\' | xargs kill -9',
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True)
# NOTE(review): none of the three Popen handles (p_kill, p_sync, p_swap) is
# waited on in the visible text, although the comment below talks about
# timeouts; the waiting statements appear to be missing -- TODO confirm.
# explicitly call "sync" to flush everything that might be pending from a previous test
# so tests are less likely to hit a very slow fsync during the test by starting from a 'known' state
# note: to mitigate this further the docker image is mounting /tmp as a volume, which gives
# us an ext4 mount which should talk directly to the underlying device on the host, skipping
# the aufs pain that we get with anything else running in the docker image. Originally,
# I had a timeout of 120 seconds (2 minutes), 300 seconds (5 minutes) but sync was still occasionally timing out.
p_sync = subprocess.Popen('sudo /bin/sync', shell=True)
# turn swap off and back on to make sure it's fully cleared if anything happened to swap
# from a previous test run
p_swap = subprocess.Popen('sudo /sbin/swapoff -a && sudo /sbin/swapon -a', shell=True)
def test_failure_due_to_timeout(err, *args):
    """
    check if we should rerun a test with the flaky plugin or not.
    for now, only run if we failed the test for one of the following
    three exceptions: cassandra.OperationTimedOut, ccm.node.ToolError,
    and ccm.node.TimeoutError.

    - cassandra.OperationTimedOut will be thrown when a cql query made thru
    the python-driver times out.
    - ccm.node.ToolError will be thrown when an invocation of a "tool"
    (in the case of dtests this will almost always invoking stress).
    - ccm.node.TimeoutError will be thrown when a blocking ccm operation
    on a individual node times out. In most cases this tends to be something
    like watch_log_for hitting the timeout before the desired pattern is seen
    in the node's logs.

    if we failed for one of these reasons - and we're running in docker - run
    the same "cleanup" logic we run before test execution and test setup begins
    and for good measure introduce a 2 second sleep. why 2 seconds? because it's
    magic :) - ideally this gets the environment back into a good state and makes
    the rerun of flaky tests likely to succeed if they failed in the first place
    due to environmental issues.

    :param err: sequence whose first element is the exception class of the failure
    :param args: accepted and ignored
    :return: True if the test should be rerun, False otherwise
    """
    # TimeoutError here is the ccmlib class imported by this module, not the builtin.
    if not issubclass(err[0], (OperationTimedOut, ToolError, TimeoutError)):
        return False

    if running_in_docker():
        # The two statements below were lost in this copy of the file; they are
        # restored from the description in the docstring above.
        cleanup_docker_environment_before_test_execution()
        time.sleep(2)
    return True
# Base test class. NOTE(review): indentation and several statements of this
# class were lost in this copy of the file; the notes below mark the gaps.
class Tester(object):
def __getattribute__(self, name):
# NOTE(review): the `try:` pairing with the `except AttributeError` below is not
# visible. As written, the lookup is done on the instance first and, on
# AttributeError, on the object stored as `fixture_dtest_setup`.
return object.__getattribute__(self, name)
except AttributeError:
fixture_dtest_setup = object.__getattribute__(self, 'fixture_dtest_setup')
return object.__getattribute__(fixture_dtest_setup , name)
# Autouse, function-scoped fixture: stores the `fixture_dtest_setup` fixture and
# its `dtest_config` on the test instance.
@pytest.fixture(scope='function', autouse=True)
def set_dtest_setup_on_function(self, fixture_dtest_setup):
self.fixture_dtest_setup = fixture_dtest_setup
self.dtest_config = fixture_dtest_setup.dtest_config
return None
def set_node_to_current_version(self, node):
# Reads CASSANDRA_VERSION from the environment.
# NOTE(review): `node` is never used in the visible lines and the `if version:`
# branch has no body of its own, so statements (and probably an `else:`) are
# missing around the CASSANDRA_DIR assignment -- TODO confirm.
version = os.environ.get('CASSANDRA_VERSION')
if version:
os.environ['CASSANDRA_DIR'] = self.dtest_config.cassandra_dir
def go(self, func):
# Wraps func in a Runner and returns it.
# NOTE(review): no call starting the runner is visible here -- TODO confirm.
runner = Runner(func)
return runner
def assert_log_had_msg(self, node, msg, timeout=600, **kwargs):
# NOTE(review): the five prose lines below read like this method's docstring
# whose triple quotes were lost -- TODO confirm.
Wrapper for ccmlib.node.Node#watch_log_for to cause an assertion failure when a log message isn't found
within the timeout.
:param node: Node which logs we should watch
:param msg: String message we expect to see in the logs.
:param timeout: Seconds to wait for msg to appear
# NOTE(review): the `try:` before this call is not visible, and the handler on
# the next line is garbled: the call that received the formatted message
# (presumably a test-failure helper) is missing -- TODO confirm.
node.watch_log_for(msg, timeout=timeout, **kwargs)
except TimeoutError:"Log message was not seen within timeout:\n{0}".format(msg))
def get_eager_protocol_version(cassandra_version):
    """
    Returns the highest protocol version accepted
    by the given C* version

    :param cassandra_version: version to look up; it is compared against
        LooseVersion instances
    :return: 5 for >= 4.0, 4 for >= 3.0, 3 for >= 2.1, 2 for >= 2.0, otherwise 1
    """
    # Ordered from newest to oldest: the first minimum that is met wins.
    minimum_versions = (('4.0', 5), ('3.0', 4), ('2.1', 3), ('2.0', 2))
    for minimum, protocol_version in minimum_versions:
        if LooseVersion(minimum) <= cassandra_version:
            return protocol_version
    return 1
# We default to UTF8Type because it's simpler to use in tests
def create_cf(session, name, key_type="varchar", speculative_retry=None, read_repair=None, compression=None,
              gc_grace=None, columns=None, validation="UTF8Type", compact_storage=False, compaction_strategy='SizeTieredCompactionStrategy',
              primary_key=None, clustering=None):
    """
    Build a CREATE COLUMNFAMILY statement from the arguments, execute it and
    then check that the table can be selected from.

    :param session: driver session used to execute the statements
    :param name: table name
    :param key_type: CQL type of the `key` column
    :param speculative_retry: value for the speculative_retry option, if given
    :param read_repair: value used for both read_repair_chance and
        dclocal_read_repair_chance, if given
    :param compression: compressor name without the 'Compressor' suffix; when
        None, compression is explicitly set to {}
    :param gc_grace: value for gc_grace_seconds, if given
    :param columns: mapping of extra column name -> CQL type. When empty or None
        the table gets the fixed layout (key, c varchar, v varchar) with
        PRIMARY KEY(key, c)
    :param validation: accepted for compatibility; not used by this function
    :param compact_storage: whether to create the table WITH COMPACT STORAGE
    :param compaction_strategy: compaction class name; '' selects
        SizeTieredCompactionStrategy
    :param primary_key: PRIMARY KEY clause content, only used together with columns
    :param clustering: CLUSTERING ORDER BY clause content, if given
    """
    strategy = 'SizeTieredCompactionStrategy' if compaction_strategy == '' else compaction_strategy
    compaction_fragment = "compaction = {'class': '%s', 'enabled': 'true'}" % strategy

    additional_columns = ""
    if columns is not None:
        additional_columns = "".join(", {} {}".format(k, v) for k, v in columns.items())

    if additional_columns == "":
        query = 'CREATE COLUMNFAMILY %s (key %s, c varchar, v varchar, PRIMARY KEY(key, c)) WITH comment=\'test cf\'' % (name, key_type)
    elif primary_key:
        query = 'CREATE COLUMNFAMILY %s (key %s%s, PRIMARY KEY(%s)) WITH comment=\'test cf\'' % (name, key_type, additional_columns, primary_key)
    else:
        query = 'CREATE COLUMNFAMILY %s (key %s PRIMARY KEY%s) WITH comment=\'test cf\'' % (name, key_type, additional_columns)

    query = '%s AND %s' % (query, compaction_fragment)

    if clustering:
        query = '%s AND CLUSTERING ORDER BY (%s)' % (query, clustering)

    if compression is not None:
        query = '%s AND compression = { \'sstable_compression\': \'%sCompressor\' }' % (query, compression)
    else:
        # if a compression option is omitted, C* will default to lz4 compression
        query += ' AND compression = {}'

    if read_repair is not None:
        query = '%s AND read_repair_chance=%f AND dclocal_read_repair_chance=%f' % (query, read_repair, read_repair)
    if gc_grace is not None:
        query = '%s AND gc_grace_seconds=%d' % (query, gc_grace)
    if speculative_retry is not None:
        query = '%s AND speculative_retry=\'%s\'' % (query, speculative_retry)

    if compact_storage:
        # NOTE(review): the body of this branch was lost in this copy of the
        # file; the clause below is reconstructed from CQL syntax -- confirm.
        query += ' AND COMPACT STORAGE'

    try:
        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
    except cassandra.AlreadyExists:
        logger.warning('AlreadyExists executing create cf query \'%s\'' % query)

    # Going to ignore OperationTimedOut from create CF, so need to validate it was indeed created
    session.execute('SELECT * FROM %s LIMIT 1' % name)
def create_cf_simple(session, name, query):
    """
    Execute a caller-supplied CREATE statement and then check that the table
    can be selected from.

    :param session: driver session used to execute the statements
    :param name: table name, used for the verification SELECT
    :param query: the complete CREATE statement to execute
    """
    try:
        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
    except cassandra.AlreadyExists:
        logger.warning('AlreadyExists executing create cf query \'%s\'' % query)

    # Going to ignore OperationTimedOut from create CF, so need to validate it was indeed created
    session.execute('SELECT * FROM %s LIMIT 1' % name)
def create_ks(session, name, rf):
    """
    Create a keyspace and switch the session to it.

    :param session: driver session used to execute the statements
    :param name: keyspace name
    :param rf: an int replication factor (SimpleStrategy), or a non-empty
        mapping of datacenter name -> replication factor (NetworkTopologyStrategy)
    """
    query = 'CREATE KEYSPACE %s WITH replication={%s}'
    if isinstance(rf, int):
        # we assume simpleStrategy
        query = query % (name, "'class':'SimpleStrategy', 'replication_factor':%d" % rf)
    else:
        # `>= 0` could never fail; an empty mapping has to be rejected here.
        assert len(rf) > 0, "At least one datacenter/rf pair is needed"
        # we assume networkTopologyStrategy
        options = ', '.join('\'%s\':%d' % (d, r) for d, r in rf.items())
        query = query % (name, "'class':'NetworkTopologyStrategy', %s" % options)

    try:
        retry_till_success(session.execute, query=query, timeout=120, bypassed_exception=cassandra.OperationTimedOut)
    except cassandra.AlreadyExists:
        logger.warning('AlreadyExists executing create ks query \'%s\'' % query)

    # Also validates it was indeed created even though we ignored OperationTimedOut
    # Might happen some of the time because CircleCI disk IO is unreliable and hangs randomly
    session.execute('USE {}'.format(name))
def get_auth_provider(user, password):
    """
    Wrap the credentials in a PlainTextAuthProvider.

    :param user: user name
    :param password: password
    :return: a PlainTextAuthProvider built from the two values
    """
    return PlainTextAuthProvider(username=user, password=password)
def make_auth(user, password):
    """
    Build a callback that supplies the same credentials for every node.

    :param user: user name
    :param password: password
    :return: a function taking a node ip (ignored) and returning
        {'username': user, 'password': password}
    """
    def private_auth(node_ip):
        return {'username': user, 'password': password}
    return private_auth
def data_size(node, ks, cf):
    """
    Return the size in bytes for given table in a node.
    This gets the size from nodetool cfstats output.
    This might break if the format of nodetool cfstats change
    as it is looking for specific text "Space used (total)" in output.
    @param node: Node in which table size to be checked for
    @param ks: Keyspace name for the table
    @param cf: table name
    @return: data size in bytes, as a float
    @raise RuntimeError: if the output does not contain exactly one
        "Space used (total)" line, or that line is not in the expected format
    """
    cfstats = node.nodetool("cfstats {}.{}".format(ks, cf))[0]

    # Tabs are removed before matching, so tab-indented lines still match.
    stats_lines = [line for line in (raw.replace('\t', '') for raw in cfstats.split('\n'))
                   if line.startswith('Space used (total)')]
    if len(stats_lines) != 1:
        msg = ('Expected output from `nodetool cfstats` to contain exactly 1 '
               'line starting with "Space used (total)". Found:\n') + cfstats
        raise RuntimeError(msg)

    # Expected tokens: 'Space', 'used', '(total):', '<number>'
    space_used_line = stats_lines[0].split()
    if len(space_used_line) != 4:
        msg = ('Expected format for `Space used (total)` in nodetool cfstats is `Space used (total): <number>`.'
               'Found:\n') + stats_lines[0]
        raise RuntimeError(msg)
    return float(space_used_line[3])
def get_port_from_node(node):
    """
    Return the port that this node is listening on.
    We only use this to connect the native driver,
    so we only care about the binary port.

    :param node: object whose `network_interfaces['binary']` is an (ip, port) pair
    :return: the second element of the binary interface entry
    :raises RuntimeError: if the binary interface cannot be read
    """
    try:
        return node.network_interfaces['binary'][1]
    except Exception as e:
        # getattr keeps the error message itself from failing when the node has
        # no network_interfaces attribute at all.
        interfaces = getattr(node, 'network_interfaces', None)
        raise RuntimeError("No network interface defined on this node object. {}".format(interfaces)) from e
def get_ip_from_node(node):
    """
    Return the address of the node's binary interface, falling back to the
    thrift interface when the binary entry is empty or None.

    :param node: object whose `network_interfaces` maps 'binary' and 'thrift'
        to (ip, port) pairs
    :return: the first element of the chosen interface entry
    """
    if node.network_interfaces['binary']:
        node_ip = node.network_interfaces['binary'][0]
    else:
        node_ip = node.network_interfaces['thrift'][0]
    return node_ip
def kill_windows_cassandra_procs():
# On Windows, forcefully terminate any leftover previously running cassandra processes. This is a temporary
# workaround until we can determine the cause of intermittent hung-open tests and file-handles.
# NOTE(review): several structural lines are missing from this copy of the
# file: the `try:` lines pairing with the two `except` clauses below, the body
# of the NoSuchProcess handler, and the statement that actually terminates the
# process after the "Killing." message. `is_win` is defined outside the text
# visible here. TODO confirm all of this against upstream.
if is_win():
# psutil is imported lazily; the ImportError handler below only logs when it is unavailable.
import psutil
for proc in psutil.process_iter():
pinfo = proc.as_dict(attrs=['pid', 'name', 'cmdline'])
except psutil.NoSuchProcess:
# A process counts as Cassandra when it is java.exe with a '-Dcassandra' argument.
if (pinfo['name'] == 'java.exe' and '-Dcassandra' in pinfo['cmdline']):
print('Found running cassandra process with pid: ' + str(pinfo['pid']) + '. Killing.')
except ImportError:
logger.debug("WARN: psutil not installed. Cannot detect and kill "
"running cassandra processes - you may see cascading dtest failures.")
class MultiError(Exception):
    """
    Extends Exception to provide reporting multiple exceptions at once.
    """

    def __init__(self, exceptions, tracebacks):
        """
        :param exceptions: list of exception objects
        :param tracebacks: list of traceback texts, in the same order as exceptions
        """
        # Initialise the base class so that `args` is populated.
        super().__init__(exceptions, tracebacks)
        # an exception and the corresponding traceback should be found at the same
        # position in their respective lists, otherwise __str__ will be incorrect
        self.exceptions = exceptions
        self.tracebacks = tracebacks

    def __str__(self):
        """Render every exception followed by its traceback between BEGIN/END banners."""
        parts = ["\n****************************** BEGIN MultiError ******************************\n"]
        for (exc, tb) in zip(self.exceptions, self.tracebacks):
            parts.append(str(exc))
            parts.append(str(tb) + "\n")
        parts.append("****************************** END MultiError ******************************")
        return "".join(parts)
def run_scenarios(scenarios, handler, deferred_exceptions=tuple()):
    """
    Runs multiple scenarios from within a single test method.

    "Scenarios" are mini-tests where a common procedure can be reused with several different configurations.
    They are intended for situations where complex/expensive setup isn't required and some shared state is acceptable (or trivial to reset).

    Arguments: scenarios should be an iterable, handler should be a callable, and deferred_exceptions should be a tuple of exceptions which
    are safe to delay until the scenarios are all run. For each item in scenarios, handler(item) will be called in turn.

    Exceptions which occur will be bundled up and raised as a single MultiError exception, either when: a) all scenarios have run,
    or b) on the first exception encountered which is not whitelisted in deferred_exceptions.
    """
    def describe(exc, scenario):
        # Rebuild the exception with the scenario in its message; fall back to
        # RuntimeError for exception types that cannot be built from one string.
        message = 'encountered {} {} running scenario:\n {}\n'.format(exc.__class__.__name__, str(exc), scenario)
        try:
            return type(exc)(message)
        except Exception:
            return RuntimeError(message)

    # Materialise once so that len() also works for generators.
    scenarios = list(scenarios)
    total = len(scenarios)
    errors = []
    tracebacks = []

    for i, scenario in enumerate(scenarios, 1):
        logger.debug("running scenario {}/{}: {}".format(i, total, scenario))

        # NOTE(review): the handler call and the traceback capture were lost in
        # this copy of the file; they are restored from the docstring and from
        # the way MultiError pairs exceptions with tracebacks -- confirm.
        try:
            handler(scenario)
        except deferred_exceptions as e:
            tracebacks.append(traceback.format_exc())
            errors.append(describe(e, scenario))
            logger.debug("scenario {}/{} encountered a deferrable exception, continuing".format(i, total))
        except Exception as e:
            # catch-all for any exceptions not intended to be deferred
            tracebacks.append(traceback.format_exc())
            errors.append(describe(e, scenario))
            logger.debug("scenario {}/{} encountered a non-deferrable exception, aborting".format(i, total))
            raise MultiError(errors, tracebacks)

    if errors:
        raise MultiError(errors, tracebacks)