Verify that no migration storm happens during an upgrade from 3.0 to 3.11

patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109
diff --git a/upgrade_tests/upgrade_schema_agreement_test.py b/upgrade_tests/upgrade_schema_agreement_test.py
new file mode 100644
index 0000000..8541411
--- /dev/null
+++ b/upgrade_tests/upgrade_schema_agreement_test.py
@@ -0,0 +1,258 @@
+import re
+import time
+
+from ccmlib.node import TimeoutError
+from dtest import Tester, debug
+
+
+class TestSchemaAgreementUpgrade(Tester):
+    """
+    Verifies that upgrades do not produce migration storms.
+
+    For upgrades from 3.0 to 3.x we have encountered that the beloved 'cdc' column
+    caused rolling upgrades to produce migration storms, because the schema digest calculated
+    by 3.11 differs from that in 3.0, because 3.11 included the 'cdc' column in the digest
+    calculation.
+
+    This test verifies that (after some grace time, i.e. nodes in status NORMAL) neither the
+    upgraded node nor the other nodes perform schema pulls (migration tasks) resulting in a
+    "migration storm".
+
+    A few schema migrations however happen before all nodes are in status NORMAL. This is not ideal,
+    but legit. In an ideal world, those should not happen - but we would have to do the schema
+    changes atomically - and currently there is just nothing like a concurrent schema change.
+
+    The tests run for a quite some time, about 5 minutes per test.
+    But there's some debug output for your entertainment - enjoy.
+
+    The node.start/stop calls pass wait_other_notice=False intentionally, because there seem to
+    be issues in 3.0 to thoroughly detect and _log_ the status change - i.e. the wait_other_notice
+    check doesn't seem to be reliable - at least not in this test.
+    """
+
+    # The number of seconds we wait for schema migration log entries to verify
+    migration_check_time = 30
+
+    def __init__(self, *args, **kwargs):
+        self.ignore_log_patterns = [
+            # This one occurs if we do a non-rolling upgrade, the node
+            # it's trying to send the migration to hasn't started yet,
+            # and when it does, it gets replayed and everything is fine.
+            r'Can\'t send migration request: node.*is down',
+        ]
+
+        Tester.__init__(self, *args, **kwargs)
+
+    def _prepare(self, version, num_nodes=3):
+        cluster = self.cluster
+
+        # Forcing cluster version on purpose
+        cluster.set_install_dir(version=version)
+        cluster.populate(num_nodes).start()
+
+        return cluster
+
+    def _set_verify_log_mark(self, nodes):
+        for node in nodes:
+            node.verify_log_mark = node.mark_log(filename='debug.log')
+
+    def _expect_no_schema_migrations(self, nodes):
+        """
+        Inspects the debug.log files from the given nodes whether any log about schema migration is present.
+        """
+
+        # Verify that there are _no_ log messages like:
+        expressions = [" - [pP]ulling schema from endpoint",
+                       " - [Ss]ubmitting migration task",
+                       " - [Pp]ulled schema from endpoint"]
+        debug("Inspecting log files of {}...".format([n.name for n in nodes]))
+        all_matchings = ""
+        for node in nodes:
+            try:
+                matchings = node.watch_log_for(expressions, from_mark=node.verify_log_mark, timeout=0, filename='debug.log')
+                all_matchings = all_matchings + "\n{}: {}".format(node.name, matchings)
+            except TimeoutError:
+                # good
+                debug("  {}: log files don't show schema migration messages (good)".format(node.name))
+        if all_matchings != "":
+            msg = "Expected no schema migration log entries, but got:{}".format(all_matchings)
+            debug(msg)  # debug message for the validation test case (3.0 vs 3.11.1)
+            self.fail(msg)
+
+    def _wait_for_status_normal(self, node, mark):
+        # Wait until the node is in state NORMAL (otherwise we can expect
+        # schema migrations for the first upgraded node).
+        node.watch_log_for("Node /{} state jump to NORMAL".format(node.address()),
+                           from_mark=mark, timeout=300, filename='debug.log')
+
+    def _bounce_node(self, node):
+        debug("Bouncing {}...".format(node.name))
+        debug("  Stopping...")
+        node.stop(wait_other_notice=False)  # intentionally set to wait_other_notice=False
+        mark = node.mark_log(filename='debug.log')
+        debug("  Starting...")
+        node.start(wait_other_notice=False)  # intentionally set to wait_other_notice=False
+        debug("  Waiting for status NORMAL...")
+        self._wait_for_status_normal(node, mark)
+
+    def _min_version(self, nodes):
+        """
+        Retrieve the minimum major version (x.y) the given nodes run.
+        """
+
+        min_version = 99.9
+        for node in nodes:
+            short_version = node.get_base_cassandra_version()
+            debug("{} is on {} ({})".format(node.name, short_version, node.get_cassandra_version()))
+            if short_version < min_version:
+                min_version = short_version
+        return min_version
+
+    def _upgrade_schema_agreement_test(self, upgrade_path):
+        """
+        Test the upgrade though the specified versions and verify that there is schema agreement.
+        In theory, upgrading a 2-node cluster and verifying that might be sufficient, but the intention
+        of this test is also to catch potential edge cases. Therefore it uses a 3-node cluster and
+        bounces the upgraded nodes for an additional verification.
+
+        The most effective way to check for schema migration is to inspect the debug log file for
+        related messages. However, since schema migrations are normal during upgrades and may happen
+        in a lazy fashion (e.g. potentially delayed by migration-interval), it does not work reliably -
+        i.e. the test would depend heavily on the test environment (hardware).
+
+        Therefore, at most points, we can only efficiently check the schema versions as shown via
+        'nodetool describecluster'.
+
+        If these tests become flaky, tuning the (initial) migration-delay should help.
+
+        :param upgrade_path: two-dimensional array containing the versions and whether each version
+                             logs a '"Gossiping my * schema version' instead of the usual 'Gossiping my
+                             schema version' message, which is only true for 3.11.2 (but not 4.0).
+        """
+
+        # prepare the cluster with initial version from the upgrade path
+        debug('Starting upgrade test with {}'.format(upgrade_path[0][1]))
+        cluster = self._prepare(version=upgrade_path[0][1])
+
+        nodes = self.cluster.nodelist()
+
+        # perform _rolling_ upgrades from one version to another
+        for (gossip_log_with_product_version, version) in upgrade_path[1:]:
+            debug("")
+            debug("Upgrading cluster to {}".format(version))
+            cluster.set_install_dir(version=version)
+
+            for node in nodes:
+                other_nodes = [n for n in nodes if n != node]
+
+                debug("")
+                debug("Stopping {} for upgrade...".format(node.name))
+                # needed to "patch" the config file (especially since 4.0) and get the correct version number
+                node.set_install_dir(version=version)
+                node.stop(wait_other_notice=False)  # intentionally set to wait_other_notice=False
+
+                # remember the logfile-mark when the node was upgraded
+                upgrade_log_mark = node.mark_log(filename='debug.log')
+                debug("Starting upgraded {}...".format(node.name))
+                node.start(wait_other_notice=False)  # intentionally set to wait_other_notice=False
+
+                # wait until the upgraded node is in status NORMAL
+                self._wait_for_status_normal(node, upgrade_log_mark)
+
+                # If it's a 3.11.2 node, check that the correct schema version is announced
+                min_version = self._min_version(nodes)
+                debug("Minimum version: {}".format(min_version))
+                if gossip_log_with_product_version:
+                    # 3.11.2 nodes (and only 3.11.2) indicate whether they announce
+                    # a "3.0 compatible" or "real" "3.11" schema version.
+                    watch_part = "Gossiping my {} schema version".format("3.0 compatible" if min_version == 3.0 else "3.11")
+                    debug("Inspecting log for '{}'...".format(watch_part))
+                    matchings = node.watch_log_for(watch_part, from_mark=upgrade_log_mark, timeout=120, filename='debug.log')
+                    debug("  Found: {}".format(matchings))
+
+                # Only log the schema information for debug purposes here. Primarily want to catch the
+                # schema migration race.
+                for n in nodes:
+                    out, _, _ = n.nodetool("describecluster")
+                    debug("nodetool describecluster of {}:".format(n.name))
+                    debug(out)
+
+                # We expect no schema migrations at this point.
+                self._set_verify_log_mark(other_nodes)
+                debug("  Sleep for {} seconds...".format(self.migration_check_time))
+                time.sleep(self.migration_check_time)
+                self._expect_no_schema_migrations(other_nodes)
+
+                # Try to trigger the schema migration race by bouncing the upgraded node.
+                # Bouncing a nodes causes a new gossip-digest-'generation', which in turn causes
+                # the whole endpoint state to propagate - including the schema version, which, in theory,
+                # should trigger the race.
+                # It is expected, that the _other_ nodes do not try to pull the schema.
+                debug("")
+                debug("Try to trigger schema migration race by bouncing the upgraded node")
+                self._bounce_node(node)
+                self._set_verify_log_mark(other_nodes)
+                debug("  Sleep for {} seconds...".format(self.migration_check_time))
+                time.sleep(self.migration_check_time)
+                self._expect_no_schema_migrations(other_nodes)
+
+                # Even if it was impossible to trigger a schema version race, compare the schema versions.
+                # Kind of a last resort. Sometimes it's difficult to trigger the race. But we know that we
+                # only want to have one schema version.
+                for n in nodes:
+                    out, _, _ = n.nodetool("describecluster")
+                    debug("nodetool describecluster of {}:".format(n.name))
+                    debug(out)
+                    versions = out.split('Schema versions:')[1].strip()
+                    num_schemas = len(re.findall('\[.*?\]', versions))
+                    self.assertEqual(num_schemas, 1, "Multiple schema versions detected on {}: {}".format(n.name, out))
+
+    def upgrade_schema_agreement_30_3112_test(self):
+        """
+        Test the upgrade from 3.0.latest to 3.11.2 and
+        verify schema agreement and no further migrations.
+        """
+        self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.0'],
+                                                          [True, 'alias:apache/cassandra-3.11']])
+
+    def upgrade_schema_agreement_3111_3112_test(self):
+        """
+        Test the upgrade from 3.11.1 to 3.11.2 and
+        verify schema agreement and no further migrations.
+        """
+        self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.11.1'],
+                                                          [True, 'alias:apache/cassandra-3.11']])
+
+    def upgrade_schema_agreement_30latest_3111_test(self):
+        """
+        Cross-check that the dtest still works (it expects the assertion on the log files).
+
+        If this test becomes flaky, check whether the time to verify (self.migration_check_time) the log files is
+        still good enough. However, as long as the vast majority of runs of this tests succeeds, the functionality
+        itself works.
+        """
+        with self.assertRaises(AssertionError, msg="Expected no schema migration log entries for the last {} seconds".format(self.migration_check_time)):
+            self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.0'],
+                                                              [False, 'alias:apache/cassandra-3.11.1']])
+
+    # REVIEWER NOTICE:
+    # The following tests should not make it into the dtest repo. The above tests using cassandra-3.11 cannot work
+    # until the corresponding is committed.
+
+    def upgrade_schema_agreement_ok30_test(self):
+        """
+        Test the upgrade from 3.0 to 3.11.2 and verify that there is no migration storm.
+
+        TO BE CLEAR: THIS TEST IS INTENDED TO FAIL AND NOT INTENDED TO BE COMMITTED!
+        """
+        self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.0'],
+                                                          [True, 'alias:snazy/schema-migration-upgrade-bug-trunk']])
+
+    def upgrade_schema_agreement_ok3112_test(self):
+        """
+        Test the upgrade from 3.0 to 3.11.2 and verify that there is no migration storm.
+
+        TO BE CLEAR: THIS TEST IS INTENDED TO FAIL AND NOT INTENDED TO BE COMMITTED!
+        """
+        self._upgrade_schema_agreement_test(upgrade_path=[[False, 'alias:apache/cassandra-3.11'],
+                                                          [True, 'alias:snazy/schema-migration-upgrade-bug-3.11']])