import logging
import os
import subprocess
from distutils.version import LooseVersion

import pytest
from cassandra import ConsistencyLevel
from ccmlib.common import is_win
from ccmlib.node import handle_external_tool_process, ToolError
import ccmlib.repository

from dtest import Tester, create_ks, create_cf
from tools.assertions import assert_length_equal
from tools.data import insert_c1c2
from tools.jmxutils import (JolokiaAgent, make_mbean,
                            remove_perf_disable_shared_mem)
# Shorthand for the ``since`` pytest marker; it is applied to the test classes
# below with version bounds, e.g. ``@since("2.2", max_version="4")``.
since = pytest.mark.since
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@since("2.2", max_version="4")
class TestDeprecatedRepairAPI(Tester):
    """
    @jira_ticket CASSANDRA-9570

    Test if deprecated repair JMX API runs with expected parameters
    """

    def test_force_repair_async_1(self):
        """
        test forceRepairAsync(String keyspace, boolean isSequential,
                              Collection<String> dataCenters,
                              Collection<String> hosts,
                              boolean primaryRange, boolean fullRepair, String... columnFamilies)
        """
        opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,boolean,java.util.Collection,java.util.Collection,boolean,boolean,[Ljava.lang.String;)",
                                          ['ks', True, [], [], False, False, ["cf"]])
        # ``opt`` is passed as the assertion message so a failure shows every parsed option.
        assert opt["parallelism"] == ("parallel" if is_win() else "sequential"), opt
        assert opt["primary_range"] == "false", opt
        assert opt["incremental"] == "true", opt
        assert opt["job_threads"] == "1", opt
        assert opt["data_centers"] == "[]", opt
        assert opt["hosts"] == "[]", opt
        assert opt["column_families"] == "[cf]", opt

    def test_force_repair_async_2(self):
        """
        test forceRepairAsync(String keyspace, int parallelismDegree,
                              Collection<String> dataCenters,
                              Collection<String> hosts,
                              boolean primaryRange, boolean fullRepair, String... columnFamilies)
        """
        opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,int,java.util.Collection,java.util.Collection,boolean,boolean,[Ljava.lang.String;)",
                                          ['ks', 1, [], [], True, True, []])
        assert opt["parallelism"] == "parallel", opt
        assert opt["primary_range"] == "true", opt
        assert opt["incremental"] == "false", opt
        assert opt["job_threads"] == "1", opt
        assert opt["data_centers"] == "[]", opt
        assert opt["hosts"] == "[]", opt
        assert opt["column_families"] == "[]", opt

    def test_force_repair_async_3(self):
        """
        test forceRepairAsync(String keyspace, boolean isSequential,
                              boolean isLocal, boolean primaryRange,
                              boolean fullRepair, String... columnFamilies)
        """
        opt = self._deprecated_repair_jmx("forceRepairAsync(java.lang.String,boolean,boolean,boolean,boolean,[Ljava.lang.String;)",
                                          ['ks', False, False, False, False, ["cf"]])
        assert opt["parallelism"] == "parallel", opt
        assert opt["primary_range"] == "false", opt
        assert opt["incremental"] == "true", opt
        assert opt["job_threads"] == "1", opt
        assert opt["data_centers"] == "[]", opt
        assert opt["hosts"] == "[]", opt
        assert opt["column_families"] == "[cf]", opt

    def test_force_repair_range_async_1(self):
        """
        test forceRepairRangeAsync(String beginToken, String endToken,
                                   String keyspaceName, boolean isSequential,
                                   Collection<String> dataCenters,
                                   Collection<String> hosts, boolean fullRepair,
                                   String... columnFamilies)
        """
        opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,boolean,java.util.Collection,java.util.Collection,boolean,[Ljava.lang.String;)",
                                          ["0", "1000", "ks", True, ["dc1"], [], False, ["cf"]])
        assert opt["parallelism"] == ("parallel" if is_win() else "sequential"), opt
        assert opt["primary_range"] == "false", opt
        assert opt["incremental"] == "true", opt
        assert opt["job_threads"] == "1", opt
        assert opt["data_centers"] == "[dc1]", opt
        assert opt["hosts"] == "[]", opt
        assert opt["ranges"] == "1", opt
        assert opt["column_families"] == "[cf]", opt

    def test_force_repair_range_async_2(self):
        """
        test forceRepairRangeAsync(String beginToken, String endToken,
                                   String keyspaceName, int parallelismDegree,
                                   Collection<String> dataCenters,
                                   Collection<String> hosts,
                                   boolean fullRepair, String... columnFamilies)
        """
        opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,int,java.util.Collection,java.util.Collection,boolean,[Ljava.lang.String;)",
                                          ["0", "1000", "ks", 2, [], [], True, ["cf"]])
        assert opt["parallelism"] == ("parallel" if is_win() else "dc_parallel"), opt
        assert opt["primary_range"] == "false", opt
        assert opt["incremental"] == "false", opt
        assert opt["job_threads"] == "1", opt
        assert opt["data_centers"] == "[]", opt
        assert opt["hosts"] == "[]", opt
        assert opt["ranges"] == "1", opt
        assert opt["column_families"] == "[cf]", opt

    def test_force_repair_range_async_3(self):
        """
        test forceRepairRangeAsync(String beginToken, String endToken,
                                   String keyspaceName, boolean isSequential,
                                   boolean isLocal, boolean fullRepair,
                                   String... columnFamilies)
        """
        opt = self._deprecated_repair_jmx("forceRepairRangeAsync(java.lang.String,java.lang.String,java.lang.String,boolean,boolean,boolean,[Ljava.lang.String;)",
                                          ["0", "1000", "ks", True, True, True, ["cf"]])
        assert opt["parallelism"] == ("parallel" if is_win() else "sequential"), opt
        assert opt["primary_range"] == "false", opt
        assert opt["incremental"] == "false", opt
        assert opt["job_threads"] == "1", opt
        assert opt["data_centers"] == "[dc1]", opt
        assert opt["hosts"] == "[]", opt
        assert opt["ranges"] == "1", opt
        assert opt["column_families"] == "[cf]", opt

    def _deprecated_repair_jmx(self, method, arguments):
        """
        * Launch a two node, two DC cluster
        * Create a keyspace and table
        * Insert some data
        * Call the deprecated repair JMX API based on the arguments passed into this method
        * Check the node log to see if the correct repair was performed based on the jmx args

        @param method JMX operation signature to invoke on the StorageService mbean
        @param arguments list of argument values passed to that operation
        @return dict of the repair options parsed from node1's log, all as strings, keyed by
                parallelism, primary_range, incremental, job_threads, column_families,
                data_centers, hosts and ranges
        """
        cluster = self.cluster

        logger.debug("Starting cluster..")
        cluster.populate([1, 1])
        node1, node2 = cluster.nodelist()
        # Applied before the node starts so that the Jolokia agent used below can attach.
        remove_perf_disable_shared_mem(node1)
        # The nodes must be running before a CQL session or JMX call can be made.
        cluster.start()
        is_3_10_or_later = cluster.version() >= LooseVersion('3.10')

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 2)
        create_cf(session, 'cf', read_repair=0.0, columns={'c1': 'text', 'c2': 'text'})

        insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ALL)

        # Run repair
        mbean = make_mbean('db', 'StorageService')
        with JolokiaAgent(node1) as jmx:
            # assert repair runs and returns valid cmd number
            assert jmx.execute_method(mbean, method, arguments) == 1
        # wait for log to start
        node1.watch_log_for("Starting repair command")

        # get repair parameters from the log; from 3.10 on a parenthesised token
        # follows the command number, so the pattern has to allow for it.
        # Raw strings keep the regex escapes intact without invalid-escape warnings.
        pattern = (r"Starting repair command #1" + (r" \([^\)]+\)" if is_3_10_or_later else "") +
                   r", repairing keyspace ks with repair options \(parallelism: (?P<parallelism>\w+), primary range: (?P<pr>\w+), "
                   r"incremental: (?P<incremental>\w+), job threads: (?P<jobs>\d+), ColumnFamilies: (?P<cfs>.+), dataCenters: (?P<dc>.+), "
                   r"hosts: (?P<hosts>.+), # of ranges: (?P<ranges>\d+)(, pull repair: (?P<pullrepair>true|false))?\)")
        matches = node1.grep_log(pattern)

        assert_length_equal(matches, 1)
        _, m = matches[0]

        # The pull repair option is only checked for 3.10 and later.
        if is_3_10_or_later:
            assert m.group("pullrepair") == "false", \
                "Pull repair cannot be enabled through the deprecated API so the pull repair option should always be false."

        return {"parallelism": m.group("parallelism"),
                "primary_range": m.group("pr"),
                "incremental": m.group("incremental"),
                "job_threads": m.group("jobs"),
                "column_families": m.group("cfs"),
                "data_centers": m.group("dc"),
                "hosts": m.group("hosts"),
                "ranges": m.group("ranges")}
@since("3.0.16", max_version="4")
class TestDeprecatedRepairNotifications(Tester):
    """
    * @jira_ticket CASSANDRA-13121
    * Test if legacy JMX detects failures in repair jobs launched with the deprecated API.
    * Affects cassandra-3.x clusters when users run JMX from cassandra-2.1 and older to submit repair jobs.
    """

    def get_legacy_environment(self, legacy_version, node_env=None):
        """
        * Set up an environment to run nodetool from cassandra-2.1.

        @param legacy_version version identifier resolved to a directory via ccmlib.repository.directory_name
        @param node_env optional mapping used as the starting environment; it is copied, not modified
        @return dict with CASSANDRA_HOME, cassandra_bin, CASSANDRA_CONF and CLASSPATH pointing at that directory
        """
        # Work on a copy so the caller's mapping is never mutated.
        env = dict(node_env) if node_env is not None else {}

        legacy_dirpath = ccmlib.repository.directory_name(legacy_version)
        env["CASSANDRA_HOME"] = legacy_dirpath

        binpaths = [legacy_dirpath,
                    os.path.join(legacy_dirpath, "build", "classes", "main"),
                    os.path.join(legacy_dirpath, "build", "classes", "thrift")]
        env["cassandra_bin"] = ":".join(binpaths)
        env["CASSANDRA_CONF"] = os.path.join(legacy_dirpath, "conf")

        # Classpath = conf dir, compiled classes, then every jar shipped in lib/.
        lib_dir = os.path.join(legacy_dirpath, "lib")
        classpaths = [env["CASSANDRA_CONF"], env["cassandra_bin"]]
        classpaths.extend(os.path.join(lib_dir, jar)
                          for jar in os.listdir(lib_dir)
                          if jar.endswith(".jar"))
        env['CLASSPATH'] = ":".join(classpaths)
        return env

    def test_deprecated_repair_error_notification(self):
        """
        * Check whether a legacy JMX nodetool understands the
        * notification for a failed repair job.
        """
        # This test intentionally provokes an error in a repair job
        self.fixture_dtest_setup.ignore_log_patterns = [r'Repair failed', r'The current host must be part of the repair']

        # start a 2-node cluster
        logger.debug("Starting cluster...")
        cluster = self.cluster
        # The nodes have to be created before nodelist() can be unpacked into two of them.
        cluster.populate(2)
        node1, node2 = cluster.nodelist()
        cluster.start(wait_for_binary_proto=True, wait_other_notice=True)

        # write some data that could be repaired
        logger.debug("Stressing node1...")
        node1.stress(stress_options=['write', 'n=5000', 'no-warmup', 'cl=ONE', '-schema', 'replication(factor=2)', '-rate', 'threads=5'])

        # set up a legacy repository
        logger.debug("Setting up legacy repository...")
        legacy_version = 'github:apache/cassandra-2.1'
        # directory_name() below only computes a path; setup() is what makes the
        # legacy install available there.
        ccmlib.repository.setup(legacy_version)

        # Run repair with legacy nodetool.
        # The options specified will cause an error, and legacy nodetool should error out.
        logger.debug("Running repair on node1 using legacy nodetool (using options that will cause failure with error)")
        legacy_dirpath = ccmlib.repository.directory_name(legacy_version)
        legacy_nodetool_path = os.path.join(legacy_dirpath, "bin", "nodetool")
        repair_env = self.get_legacy_environment(legacy_version, node_env=node1.get_env())
        repair_args = [legacy_nodetool_path, "-h", "localhost", "-p", str(node1.jmx_port), "repair", "-hosts", ""]
        p = subprocess.Popen(repair_args, env=repair_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

        # nodetool is expected to fail; stderr is taken from either the normal
        # return value or the ToolError raised for the failed process.
        try:
            _, nodetool_stderr, _ = handle_external_tool_process(p, repair_args)
        except ToolError as tool_error:
            nodetool_stderr = tool_error.stderr

        # Check for repair failed message in node1 log
        # (level, thread, timestamp, then a "File.java:line" field before the message)
        repair_failed_logs = node1.grep_log(r"ERROR \[(Repair-Task|Thread)-\d+\] \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} \S+:\d+ - Repair failed")
        assert repair_failed_logs, "Node logs don't have an error message for the failed repair"

        # Check for error and stacktrace in nodetool output
        assert "error" in nodetool_stderr, "Legacy nodetool didn't print an error message for the failed repair"
        assert "-- StackTrace --" in nodetool_stderr, "Legacy nodetool didn't print a stack trace for the failed repair"