import re
import time
import pytest
import logging
from ccmlib.node import TimeoutError
from dtest import Tester
# Shorthand alias for the ``since`` pytest marker.
since = pytest.mark.since
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TestSchemaAgreementUpgrade(Tester):
    """
    Verifies that upgrades do not produce migration storms.

    For upgrades from 3.0 to 3.x we have encountered that the beloved 'cdc' column
    caused rolling upgrades to produce migration storms, because the schema digest calculated
    by 3.11 differs from that in 3.0, because 3.11 included the 'cdc' column in the digest.

    This test verifies that (after some grace time, i.e. nodes in status NORMAL) neither the
    upgraded node nor the other nodes perform schema pulls (migration tasks) resulting in a
    "migration storm".

    A few schema migrations however happen before all nodes are in status NORMAL. This is not ideal,
    but legit. In an ideal world, those should not happen - but we would have to do the schema
    changes atomically - and currently there is just nothing like a concurrent schema change.

    The tests run for quite some time, about 5 minutes per test.
    But there's some debug output for your entertainment - enjoy.

    The node.start/stop calls pass wait_other_notice=False intentionally, because there seem to
    be issues in 3.0 to thoroughly detect and _log_ the status change - i.e. the wait_other_notice
    check doesn't seem to be reliable - at least not in this test.
    """

    # NOTE(review): the module defines `since = pytest.mark.since` but no version marker is
    # applied to this class in the reviewed text; a class decorator may be missing - confirm.

    # The number of seconds we wait for schema migration log entries to verify
    migration_check_time = 30

    # NOTE(review): the decorator is inferred from the `fixture_` name and the injected
    # `fixture_dtest_setup` argument; without it nothing invokes this method - confirm.
    @pytest.fixture(autouse=True)
    def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
        """
        Register log patterns that must not fail the test.

        :param fixture_dtest_setup: the dtest setup object whose ``ignore_log_patterns`` is replaced.
        """
        fixture_dtest_setup.ignore_log_patterns = (
            # This one occurs if we do a non-rolling upgrade, the node
            # it's trying to send the migration to hasn't started yet,
            # and when it does, it gets replayed and everything is fine.
            r'Can\'t send migration request: node.*is down',
        )

    def _prepare(self, version, num_nodes=3):
        """
        Set up and start the cluster on the initial version of the upgrade path.

        :param version: the version (e.g. 'alias:apache/cassandra-3.0') the cluster starts with.
        :param num_nodes: number of nodes to populate the cluster with.
        :return: the prepared cluster (``self.cluster``).
        """
        cluster = self.cluster

        # Forcing cluster version on purpose
        # NOTE(review): the reviewed text ignored `version` and `num_nodes`; the two ccmlib calls
        # below are restored from the comment above and the parameter names. The dtest setup may
        # additionally need to be re-initialized for the new version - confirm against dtest.
        cluster.set_install_dir(version=version)
        cluster.populate(num_nodes).start()

        return cluster

    def _set_verify_log_mark(self, nodes):
        """
        Remember the current end of each node's debug.log as ``node.verify_log_mark``.

        The mark is the starting point for ``_expect_no_schema_migrations``.
        """
        for node in nodes:
            node.verify_log_mark = node.mark_log(filename='debug.log')

    def _expect_no_schema_migrations(self, nodes):
        """
        Inspects the debug.log files from the given nodes whether any log about schema migration is present.

        Each node is searched from its ``verify_log_mark`` (see ``_set_verify_log_mark``).

        :raises AssertionError: if any node logged one of the schema migration messages.
        """
        # Verify that there are _no_ log messages like:
        expressions = [" - [pP]ulling schema from endpoint",
                       " - [Ss]ubmitting migration task",
                       " - [Pp]ulled schema from endpoint"]
        logger.debug("Inspecting log files of {}...".format([n.name for n in nodes]))

        all_matchings = ""
        for node in nodes:
            try:
                # timeout=0: do not wait for a match; a TimeoutError here means "nothing found"
                matchings = node.watch_log_for(expressions, from_mark=node.verify_log_mark,
                                               timeout=0, filename='debug.log')
            except TimeoutError:
                # good
                logger.debug(" {}: log files don't show schema migration messages (good)".format(node.name))
            else:
                all_matchings = all_matchings + "\n{}: {}".format(node.name, matchings)

        if all_matchings:
            msg = "Expected no schema migration log entries, but got:{}".format(all_matchings)
            logger.debug(msg)  # debug message for the validation test case (3.0 vs 3.11.1)
            # The validation test case relies on an AssertionError being raised here.
            raise AssertionError(msg)

    def _wait_for_status_normal(self, node, mark):
        """
        Block until ``node`` logs its own jump to state NORMAL.

        :param node: the node whose debug.log is watched.
        :param mark: log mark to start searching from.
        """
        # Wait until the node is in state NORMAL (otherwise we can expect
        # schema migrations for the first upgraded node).
        node.watch_log_for("Node /{} state jump to NORMAL".format(node.address()),
                           from_mark=mark, timeout=300, filename='debug.log')

    def _bounce_node(self, node):
        """
        Stop and start ``node`` and wait until it is back in status NORMAL.
        """
        logger.debug("Bouncing {}...".format(node.name))
        logger.debug(" Stopping...")
        node.stop(wait_other_notice=False)  # intentionally set to wait_other_notice=False
        mark = node.mark_log(filename='debug.log')
        logger.debug(" Starting...")
        node.start(wait_other_notice=False)  # intentionally set to wait_other_notice=False
        logger.debug(" Waiting for status NORMAL...")
        self._wait_for_status_normal(node, mark)

    def _min_version(self, nodes):
        """
        Retrieve the minimum major version (x.y) the given nodes run.

        :return: the smallest base version among ``nodes``, or 99.9 if ``nodes`` is empty.
        """
        min_version = 99.9
        for node in nodes:
            short_version = node.get_base_cassandra_version()
            logger.debug("{} is on {} ({})".format(node.name, short_version, node.get_cassandra_version()))
            min_version = min(min_version, short_version)
        return min_version

    def _upgrade_schema_agreement_test(self, upgrade_path):
        """
        Test the upgrade through the specified versions and verify that there is schema agreement.

        In theory, upgrading a 2-node cluster and verifying that might be sufficient, but the intention
        of this test is also to catch potential edge cases. Therefore it uses a 3-node cluster and
        bounces the upgraded nodes for an additional verification.

        The most effective way to check for schema migration is to inspect the debug log file for
        related messages. However, since schema migrations are normal during upgrades and may happen
        in a lazy fashion (e.g. potentially delayed by migration-interval), it does not work reliably -
        i.e. the test would depend heavily on the test environment (hardware).
        Therefore, at most points, we can only efficiently check the schema versions as shown via
        'nodetool describecluster'.

        If these tests become flaky, tuning the (initial) migration-delay should help.

        :param upgrade_path: two-dimensional array; each entry is ``[flag, version]`` where ``version``
                             is the version to run and ``flag`` tells whether that version logs a
                             'Gossiping my * schema version' instead of the usual 'Gossiping my
                             schema version' message, which is only true for 3.11.2 (but not 4.0).
                             The first entry is the initial version; its flag is not evaluated.
        """
        # prepare the cluster with initial version from the upgrade path
        _, initial_version = upgrade_path[0]
        logger.debug('Starting upgrade test with {}'.format(initial_version))
        cluster = self._prepare(version=initial_version)

        nodes = cluster.nodelist()

        # perform _rolling_ upgrades from one version to another
        for (gossip_log_with_product_version, version) in upgrade_path[1:]:
            logger.debug("Upgrading cluster to {}".format(version))

            for node in nodes:
                other_nodes = [n for n in nodes if n != node]

                logger.debug("Stopping {} for upgrade...".format(node.name))
                # needed to "patch" the config file (especially since 4.0) and get the correct version number
                # NOTE(review): call restored from the comment above; without it `version` is never
                # applied to the node and no upgrade takes place - confirm against ccmlib usage.
                node.set_install_dir(version=version)
                node.stop(wait_other_notice=False)  # intentionally set to wait_other_notice=False

                # remember the logfile-mark when the node was upgraded
                upgrade_log_mark = node.mark_log(filename='debug.log')
                logger.debug("Starting upgraded {}...".format(node.name))
                node.start(wait_other_notice=False)  # intentionally set to wait_other_notice=False

                # wait until the upgraded node is in status NORMAL
                self._wait_for_status_normal(node, upgrade_log_mark)

                # If it's a 3.11.2 node, check that the correct schema version is announced
                min_version = self._min_version(nodes)
                logger.debug("Minimum version: {}".format(min_version))
                if gossip_log_with_product_version:
                    # 3.11.2 nodes (and only 3.11.2) indicate whether they announce
                    # a "3.0 compatible" or "real" "3.11" schema version.
                    watch_part = "Gossiping my {} schema version".format(
                        "3.0 compatible" if min_version == 3.0 else "3.11")
                    logger.debug("Inspecting log for '{}'...".format(watch_part))
                    matchings = node.watch_log_for(watch_part, from_mark=upgrade_log_mark,
                                                   timeout=120, filename='debug.log')
                    logger.debug(" Found: {}".format(matchings))

                # Only log the schema information for debug purposes here. Primarily want to catch the
                # schema migration race.
                for n in nodes:
                    out, _, _ = n.nodetool("describecluster")
                    logger.debug("nodetool describecluster of {}:".format(n.name))
                    logger.debug(out)

                # We expect no schema migrations at this point.
                self._set_verify_log_mark(other_nodes)
                logger.debug(" Sleep for {} seconds...".format(self.migration_check_time))
                time.sleep(self.migration_check_time)
                self._expect_no_schema_migrations(other_nodes)

                # Try to trigger the schema migration race by bouncing the upgraded node.
                # Bouncing a node causes a new gossip-digest-'generation', which in turn causes
                # the whole endpoint state to propagate - including the schema version, which, in theory,
                # should trigger the race.
                # It is expected, that the _other_ nodes do not try to pull the schema.
                logger.debug("Try to trigger schema migration race by bouncing the upgraded node")
                self._bounce_node(node)
                self._set_verify_log_mark(other_nodes)
                logger.debug(" Sleep for {} seconds...".format(self.migration_check_time))
                time.sleep(self.migration_check_time)
                self._expect_no_schema_migrations(other_nodes)

                # Even if it was impossible to trigger a schema version race, compare the schema versions.
                # Kind of a last resort. Sometimes it's difficult to trigger the race. But we know that we
                # only want to have one schema version.
                for n in nodes:
                    out, _, _ = n.nodetool("describecluster")
                    logger.debug("nodetool describecluster of {}:".format(n.name))
                    logger.debug(out)
                    # Each schema version in the output is followed by a bracketed endpoint list.
                    versions = out.split('Schema versions:')[1].strip()
                    num_schemas = len(re.findall(r'\[.*?\]', versions))
                    self.assertEqual(num_schemas, 1,
                                     "Multiple schema versions detected on {}: {}".format(n.name, out))

    def upgrade_schema_agreement_30_3112_test(self):
        """
        Test the upgrade from 3.0.latest to 3.11.2 and
        verify schema agreement and no further migrations.
        """
        self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.0'],
                                                          [True, 'alias:apache/cassandra-3.11']])

    def upgrade_schema_agreement_3111_3112_test(self):
        """
        Test the upgrade from 3.11.1 to 3.11.2 and
        verify schema agreement and no further migrations.
        """
        self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.11.1'],
                                                          [True, 'alias:apache/cassandra-3.11']])

    def upgrade_schema_agreement_30latest_3111_test(self):
        """
        Cross-check that the dtest still works (it expects the assertion on the log files).

        If this test becomes flaky, check whether the time to verify (self.migration_check_time) the log files is
        still good enough. However, as long as the vast majority of runs of this tests succeeds, the functionality
        itself works.
        """
        with self.assertRaises(AssertionError, msg="Expected no schema migration log entries for the last {} seconds".format(self.migration_check_time)):
            self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.0'],
                                                              [False, 'alias:apache/cassandra-3.11.1']])

    # The following tests should not make it into the dtest repo. The above tests using cassandra-3.11 cannot work
    # until the corresponding change is committed.

    def upgrade_schema_agreement_ok30_test(self):
        """
        Test the upgrade from apache/cassandra-3.0 to the snazy/schema-migration-upgrade-bug-trunk branch
        and verify that there is no migration storm.
        """
        self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.0'],
                                                          [True, 'alias:snazy/schema-migration-upgrade-bug-trunk']])

    def upgrade_schema_agreement_ok3112_test(self):
        """
        Test the upgrade from apache/cassandra-3.11 to the snazy/schema-migration-upgrade-bug-3.11 branch
        and verify that there is no migration storm.
        """
        self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.11'],
                                                          [True, 'alias:snazy/schema-migration-upgrade-bug-3.11']])