blob: d04fb001bcb6fff6fa523cc23ad912026867d277 [file] [log] [blame]
import pytest
import glob
import os
import shutil
import time
import logging
import re
import tempfile
import subprocess
import sys
import errno
import pprint
from collections import OrderedDict
from cassandra.cluster import Cluster as PyCluster
from cassandra.cluster import NoHostAvailable
from cassandra.cluster import EXEC_PROFILE_DEFAULT
from cassandra.policies import WhiteListRoundRobinPolicy
from ccmlib.common import is_win
from ccmlib.cluster import Cluster
from dtest import (get_ip_from_node, make_execution_profile, get_auth_provider, get_port_from_node,
get_eager_protocol_version)
from distutils.version import LooseVersion
from tools.context import log_filter
from tools.funcutils import merge_dicts
logger = logging.getLogger(__name__)
def retry_till_success(fun, *args, **kwargs):
    """Invoke ``fun(*args, **kwargs)`` repeatedly until it succeeds.

    Two keyword-only extras are popped before calling ``fun``:
      timeout            -- seconds to keep retrying (default 60)
      bypassed_exception -- exception type that triggers a retry (default Exception)

    Any other exception propagates immediately; once the deadline has passed,
    the bypassed exception is re-raised as well.
    """
    max_wait = kwargs.pop('timeout', 60)
    retried_exc = kwargs.pop('bypassed_exception', Exception)
    give_up_at = time.time() + max_wait
    while True:
        try:
            return fun(*args, **kwargs)
        except retried_exc:
            if time.time() > give_up_at:
                raise
            # short pause so we do not spin at full speed between attempts
            time.sleep(0.25)
class DTestSetup(object):
def __init__(self, dtest_config=None, setup_overrides=None, cluster_name="test"):
self.dtest_config = dtest_config
self.setup_overrides = setup_overrides
self.cluster_name = cluster_name
self.ignore_log_patterns = []
self.cluster = None
self.cluster_options = []
self.replacement_node = None
self.allow_log_errors = False
self.connections = []
self.log_saved_dir = "logs"
try:
os.mkdir(self.log_saved_dir)
except OSError:
pass
self.last_log = os.path.join(self.log_saved_dir, "last")
self.test_path = self.get_test_path()
self.subprocs = []
self.log_watch_thread = None
self.last_test_dir = "last_test_dir"
self.jvm_args = []
self.create_cluster_func = None
self.iterations = 0
def get_test_path(self):
    """Create and return a fresh scratch directory for this test's cluster."""
    path = tempfile.mkdtemp(prefix='dtest-')
    # ccm on cygwin needs absolute path to directory - it crosses from cygwin space into
    # regular Windows space on wmic calls which will otherwise break pathing
    if sys.platform == "cygwin":
        cygpath = subprocess.Popen(["cygpath", "-m", path],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        path = cygpath.communicate()[0].rstrip()
    return path
def glob_data_dirs(self, path, ks="ks"):
result = []
for node in self.cluster.nodelist():
for data_dir in node.data_directories():
ks_dir = os.path.join(data_dir, ks, path)
result.extend(glob.glob(ks_dir))
return result
def begin_active_log_watch(self):
"""
Calls into ccm to start actively watching logs.
In the event that errors are seen in logs, ccm will call back to _log_error_handler.
When the cluster is no longer in use, stop_active_log_watch should be called to end log watching.
(otherwise a 'daemon' thread will (needlessly) run until the process exits).
"""
self.log_watch_thread = self.cluster.actively_watch_logs_for_error(self._log_error_handler, interval=0.25)
def _log_error_handler(self, errordata):
"""
Callback handler used in conjunction with begin_active_log_watch.
When called, prepares exception instance, we will use pytest.fail
to kill the current test being executed and mark it as failed
@param errordata is a dictonary mapping node name to failure list.
"""
# in some cases self.allow_log_errors may get set after proactive log checking has been enabled
# so we need to double-check first thing before proceeding
if self.allow_log_errors:
return
reportable_errordata = OrderedDict()
for nodename, errors in list(errordata.items()):
filtered_errors = list(self.__filter_errors(['\n'.join(msg) for msg in errors]))
if len(filtered_errors) != 0:
reportable_errordata[nodename] = filtered_errors
# no errors worthy of halting the test
if not reportable_errordata:
return
message = "Errors seen in logs for: {nodes}".format(nodes=", ".join(list(reportable_errordata.keys())))
for nodename, errors in list(reportable_errordata.items()):
for error in errors:
message += "\n{nodename}: {error}".format(nodename=nodename, error=error)
logger.debug('Errors were just seen in logs, ending test (if not ending already)!')
pytest.fail("Error details: \n{message}".format(message=message))
def copy_logs(self, directory=None, name=None):
"""Copy the current cluster's log files somewhere, by default to LOG_SAVED_DIR with a name of 'last'"""
if directory is None:
directory = self.log_saved_dir
if name is None:
name = self.last_log
else:
name = os.path.join(directory, name)
if not os.path.exists(directory):
os.mkdir(directory)
logs = [(node.name, node.logfilename(), node.debuglogfilename(), node.gclogfilename(),
node.compactionlogfilename())
for node in self.cluster.nodelist()]
if len(logs) != 0:
basedir = str(int(time.time() * 1000)) + '_' + str(id(self))
logdir = os.path.join(directory, basedir)
os.mkdir(logdir)
for n, log, debuglog, gclog, compactionlog in logs:
if os.path.exists(log):
assert os.path.getsize(log) >= 0
shutil.copyfile(log, os.path.join(logdir, n + ".log"))
if os.path.exists(debuglog):
assert os.path.getsize(debuglog) >= 0
shutil.copyfile(debuglog, os.path.join(logdir, n + "_debug.log"))
if os.path.exists(gclog):
assert os.path.getsize(gclog) >= 0
shutil.copyfile(gclog, os.path.join(logdir, n + "_gc.log"))
if os.path.exists(compactionlog):
assert os.path.getsize(compactionlog) >= 0
shutil.copyfile(compactionlog, os.path.join(logdir, n + "_compaction.log"))
if os.path.exists(name):
os.unlink(name)
if not is_win():
os.symlink(basedir, name)
def cql_connection(self, node, keyspace=None, user=None,
password=None, compression=True, protocol_version=None, port=None, ssl_opts=None, **kwargs):
return self._create_session(node, keyspace, user, password, compression,
protocol_version, port=port, ssl_opts=ssl_opts, **kwargs)
def exclusive_cql_connection(self, node, keyspace=None, user=None,
                             password=None, compression=True, protocol_version=None, port=None, ssl_opts=None,
                             **kwargs):
    """Open a driver session pinned exclusively to ``node`` by way of a
    whitelist round-robin load-balancing policy."""
    pinned_policy = WhiteListRoundRobinPolicy([get_ip_from_node(node)])
    return self._create_session(node, keyspace, user, password, compression,
                                protocol_version, port=port, ssl_opts=ssl_opts,
                                load_balancing_policy=pinned_policy, **kwargs)
def _create_session(self, node, keyspace, user, password, compression, protocol_version,
                    port=None, ssl_opts=None, execution_profiles=None, **kwargs):
    """Build a driver Cluster/session against ``node`` and remember the
    session so cleanup can shut it down. Leftover kwargs configure the
    default execution profile unless explicit profiles are supplied."""
    node_ip = get_ip_from_node(node)
    if not port:
        port = get_port_from_node(node)
    if protocol_version is None:
        # oldest protocol the target cassandra version is known to speak eagerly
        protocol_version = get_eager_protocol_version(node.cluster.version())
    auth_provider = get_auth_provider(user=user, password=password) if user is not None else None
    if execution_profiles:
        profiles = execution_profiles
    else:
        profiles = {EXEC_PROFILE_DEFAULT: make_execution_profile(**kwargs)}
    driver_cluster = PyCluster([node_ip],
                               auth_provider=auth_provider,
                               compression=compression,
                               protocol_version=protocol_version,
                               port=port,
                               ssl_options=ssl_opts,
                               connect_timeout=15,
                               allow_beta_protocol_version=True,
                               execution_profiles=profiles)
    session = driver_cluster.connect(wait_for_all_pools=True)
    if keyspace is not None:
        session.set_keyspace(keyspace)
    self.connections.append(session)
    return session
def patient_cql_connection(self, node, keyspace=None,
                           user=None, password=None, timeout=30, compression=True,
                           protocol_version=None, port=None, ssl_opts=None, **kwargs):
    """
    Returns a connection after it stops throwing NoHostAvailables due to not being ready.
    If the timeout is exceeded, the exception is raised.
    """
    if is_win():
        # windows clusters come up noticeably more slowly
        timeout *= 2
    expected_log_lines = ('Control connection failed to connect, shutting down Cluster:',
                          '[control connection] Error connecting to ')
    # silence the driver's expected connection-retry noise while we wait
    with log_filter('cassandra.cluster', expected_log_lines):
        return retry_till_success(self.cql_connection,
                                  node,
                                  keyspace=keyspace,
                                  user=user,
                                  password=password,
                                  timeout=timeout,
                                  compression=compression,
                                  protocol_version=protocol_version,
                                  port=port,
                                  ssl_opts=ssl_opts,
                                  bypassed_exception=NoHostAvailable,
                                  **kwargs)
def patient_exclusive_cql_connection(self, node, keyspace=None,
                                     user=None, password=None, timeout=30, compression=True,
                                     protocol_version=None, port=None, ssl_opts=None, **kwargs):
    """
    Returns a connection after it stops throwing NoHostAvailables due to not being ready.
    If the timeout is exceeded, the exception is raised.
    """
    if is_win():
        # windows clusters come up noticeably more slowly
        timeout *= 2
    return retry_till_success(self.exclusive_cql_connection,
                              node,
                              keyspace=keyspace,
                              user=user,
                              password=password,
                              timeout=timeout,
                              compression=compression,
                              protocol_version=protocol_version,
                              port=port,
                              ssl_opts=ssl_opts,
                              bypassed_exception=NoHostAvailable,
                              **kwargs)
def check_logs_for_errors(self):
for node in self.cluster.nodelist():
errors = list(self.__filter_errors(
['\n'.join(msg) for msg in node.grep_log_for_errors()]))
if len(errors) != 0:
for error in errors:
print("Unexpected error in {node_name} log, error: \n{error}".format(node_name=node.name, error=error))
return True
def __filter_errors(self, errors):
"""Filter errors, removing those that match self.ignore_log_patterns"""
if not hasattr(self, 'ignore_log_patterns'):
self.ignore_log_patterns = []
for e in errors:
for pattern in self.ignore_log_patterns:
if re.search(pattern, e):
break
else:
yield e
def get_jfr_jvm_args(self):
    """
    @return The JVM arguments required for attaching flight recorder to a Java process.
    """
    return ["-XX:+" + flag for flag in ("UnlockCommercialFeatures", "FlightRecorder")]
def start_jfr_recording(self, nodes):
    """
    Start Java flight recorder provided the cluster was started with the correct jvm arguments.
    """
    for node in nodes:
        jcmd = subprocess.Popen(['jcmd', str(node.pid), 'JFR.start'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, err = jcmd.communicate()
        logger.debug(out)
        logger.debug(err)
def dump_jfr_recording(self, nodes):
    """
    Save Java flight recorder results to file for analyzing with mission control.
    """
    for node in nodes:
        jcmd = subprocess.Popen(['jcmd', str(node.pid), 'JFR.dump',
                                 'recording=1', 'filename=recording_{}.jfr'.format(node.address())],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        out, err = jcmd.communicate()
        logger.debug(out)
        logger.debug(err)
def supports_v5_protocol(self, cluster_version):
    """True when the cluster version (4.0+) can speak native protocol v5."""
    return LooseVersion('4.0') <= cluster_version
def cleanup_last_test_dir(self):
if os.path.exists(self.last_test_dir):
os.remove(self.last_test_dir)
def stop_active_log_watch(self):
"""
Joins the log watching thread, which will then exit.
Should be called after each test, ideally after nodes are stopped but before cluster files are removed.
Can be called multiple times without error.
If not called, log watching thread will remain running until the parent process exits.
"""
self.log_watch_thread.join(timeout=60)
def cleanup_cluster(self, request=None):
    """Stop the ccm cluster and, unless test artifacts are being kept,
    remove it together with its scratch directory (self.test_path).

    @param request optional pytest request; used to detect a failed test
                   for the keep_failed_test_dir option
    """
    with log_filter('cassandra'):  # quiet noise from driver when nodes start going down
        test_failed = request and hasattr(request.node, 'rep_call') and request.node.rep_call.failed
        if self.dtest_config.keep_test_dir or (self.dtest_config.keep_failed_test_dir and test_failed):
            # keep the on-disk state: only stop the nodes (gently so the
            # jacoco agent can flush coverage data, when enabled)
            self.cluster.stop(gently=self.dtest_config.enable_jacoco_code_coverage)
        else:
            # when recording coverage the jvm has to exit normally
            # or the coverage information is not written by the jacoco agent
            # otherwise we can just kill the process
            if self.dtest_config.enable_jacoco_code_coverage:
                self.cluster.stop(gently=True)
            # Cleanup everything:
            try:
                if self.log_watch_thread:
                    self.stop_active_log_watch()
            finally:
                logger.debug("removing ccm cluster {name} at: {path}".format(name=self.cluster.name,
                                                                             path=self.test_path))
                self.cluster.remove()
                logger.debug("clearing ssl stores from [{0}] directory".format(self.test_path))
                for filename in ('keystore.jks', 'truststore.jks', 'ccm_node.cer'):
                    try:
                        os.remove(os.path.join(self.test_path, filename))
                    except OSError as e:
                        # only "no such file" is acceptable; the previous
                        # `assert e.errno == errno.ENOENT` silently vanished
                        # under `python -O`, hiding real removal failures
                        if e.errno != errno.ENOENT:
                            raise
                os.rmdir(self.test_path)
        self.cleanup_last_test_dir()
def cleanup_and_replace_cluster(self):
for con in self.connections:
con.cluster.shutdown()
self.connections = []
self.cleanup_cluster()
self.test_path = self.get_test_path()
self.initialize_cluster(self.create_cluster_func)
def init_default_config(self):
    """Compose and apply the cluster's default cassandra.yaml options:
    aggressive failure detection, generous request timeouts (unless the
    test supplied its own cluster_options), repaired-data tracking on
    4.0+, token count per the vnode setting, and any setup_overrides."""
    # the failure detector can be quite slow in such tests with quick start/stop
    phi_values = {'phi_convict_threshold': 5}
    is_at_least_4_0 = self.cluster.version() >= '4'
    # enable read time tracking of repaired data between replicas by default
    if is_at_least_4_0:
        repaired_data_tracking_values = {'repaired_data_tracking_for_partition_reads_enabled': 'true',
                                         'repaired_data_tracking_for_range_reads_enabled': 'true',
                                         'report_unconfirmed_repaired_data_mismatches': 'true'}
    else:
        repaired_data_tracking_values = {}
    timeout = 15000
    if self.cluster_options is not None and len(self.cluster_options) > 0:
        values = merge_dicts(self.cluster_options, phi_values, repaired_data_tracking_values)
    else:
        values = merge_dicts(phi_values, repaired_data_tracking_values, {
            'read_request_timeout_in_ms': timeout,
            'range_request_timeout_in_ms': timeout,
            'write_request_timeout_in_ms': timeout,
            'truncate_request_timeout_in_ms': timeout,
            'request_timeout_in_ms': timeout
        })
    if self.setup_overrides is not None and len(self.setup_overrides.cluster_options) > 0:
        values = merge_dicts(values, self.setup_overrides.cluster_options)
    if is_at_least_4_0:
        # No more thrift in 4.0, and start_rpc doesn't exists anymore
        values.pop('start_rpc', None)
        values['corrupted_tombstone_strategy'] = 'exception'
    num_tokens = self.dtest_config.num_tokens if self.dtest_config.use_vnodes else 1
    self.cluster.set_configuration_options(values={'num_tokens': num_tokens})
    if self.dtest_config.use_off_heap_memtables:
        self.cluster.set_configuration_options(values={'memtable_allocation_type': 'offheap_objects'})
    self.cluster.set_configuration_options(values)
    logger.debug("Done setting configuration options:\n" + pprint.pformat(self.cluster._config_options, indent=4))
def maybe_setup_jacoco(self, cluster_name='test'):
"""Setup JaCoCo code coverage support"""
if not self.dtest_config.enable_jacoco_code_coverage:
return
# use explicit agent and execfile locations
# or look for a cassandra build if they are not specified
agent_location = os.environ.get('JACOCO_AGENT_JAR',
os.path.join(self.dtest_config.cassandra_dir, 'build/lib/jars/jacocoagent.jar'))
jacoco_execfile = os.environ.get('JACOCO_EXECFILE',
os.path.join(self.dtest_config.cassandra_dir, 'build/jacoco/jacoco.exec'))
if os.path.isfile(agent_location):
logger.debug("Jacoco agent found at {}".format(agent_location))
with open(os.path.join(
self.test_path, cluster_name, 'cassandra.in.sh'), 'w') as f:
f.write('JVM_OPTS="$JVM_OPTS -javaagent:{jar_path}=destfile={exec_file}"'
.format(jar_path=agent_location, exec_file=jacoco_execfile))
if os.path.isfile(jacoco_execfile):
logger.debug("Jacoco execfile found at {}, execution data will be appended".format(jacoco_execfile))
else:
logger.debug("Jacoco execfile will be created at {}".format(jacoco_execfile))
else:
logger.debug("Jacoco agent not found or is not file. Execution will not be recorded.")
@staticmethod
def create_ccm_cluster(dtest_setup):
    """Build (but do not start) a ccm Cluster rooted at dtest_setup.test_path,
    from either a named cassandra_version or a local cassandra_dir build."""
    logger.info("cluster ccm directory: " + dtest_setup.test_path)
    version = dtest_setup.dtest_config.cassandra_version
    if version:
        cluster = Cluster(dtest_setup.test_path, dtest_setup.cluster_name,
                          cassandra_version=version)
    else:
        cluster = Cluster(dtest_setup.test_path, dtest_setup.cluster_name,
                          cassandra_dir=dtest_setup.dtest_config.cassandra_dir)
    cluster.set_datadir_count(dtest_setup.dtest_config.data_dir_count)
    cluster.set_environment_variable('CASSANDRA_LIBJEMALLOC', dtest_setup.dtest_config.jemalloc_path)
    return cluster
def set_cluster_log_levels(self):
"""
The root logger gets configured in the fixture named fixture_logging_setup.
Based on the logging configuration options the user invoked pytest with,
that fixture sets the root logger to that configuration. We then ensure all
Cluster objects we work with "inherit" these logging settings (which we can
lookup off the root logger)
"""
if logging.root.level != 'NOTSET':
log_level = logging.getLevelName(logging.INFO)
else:
log_level = logging.root.level
self.cluster.set_log_level(log_level)
def initialize_cluster(self, create_cluster_func):
"""
This method is responsible for initializing and configuring a ccm
cluster for the next set of tests. This can be called for two
different reasons:
* A class of tests is starting
* A test method failed/errored, so the cluster has been wiped
Subclasses that require custom initialization should generally
do so by overriding post_initialize_cluster().
"""
# connections = []
# cluster_options = []
self.iterations += 1
self.create_cluster_func = create_cluster_func
self.cluster = self.create_cluster_func(self)
self.init_default_config()
self.maybe_setup_jacoco()
self.set_cluster_log_levels()
# cls.init_config()
# write_last_test_file(cls.test_path, cls.cluster)
# cls.post_initialize_cluster()
def reinitialize_cluster_for_different_version(self):
"""
This method is used by upgrade tests to re-init the cluster to work with a specific
version that may not be compatible with the existing configuration options
"""
self.init_default_config()