Fix Cassandra version families, and more…

 - Remove OVERRIDE_MANIFEST, as it is unused.
 - Upgrade manifest, remove upgrades from unsupported indev versions, and remove upgrades to current if indev is still supported.
 - Correct the enforced protocol version to use v3 for 2.2/3.0 upgrades. (ref: CASSANDRA-15193)
 - Fix broken thrift_upgrade_test.py
 - Fix upgrade_udtfix_test.py (not applicable before 3.11.6, ref: CASSANDRA-15035)
 - Remove hardcoded references to gitbox.a.o
 - Remove unnecessary implicit upgrade scenarios. `--upgrade-target-version-only` only tests upgrades from the target version and to the target version, i.e. those upgrade paths where the target (indev) version was the origin or destination version. Previously the filter was applied only for the version family but not on the variant, pulling in completely unrelated upgrade scenarios.

 patch by Jacek Lewandowski; reviewed by Tomek Łasica, Mick Semb Wever for CASSANDRA-16433
diff --git a/dtest.py b/dtest.py
index e144a58..9747c11 100644
--- a/dtest.py
+++ b/dtest.py
@@ -276,11 +276,11 @@
     Returns the highest protocol version accepted
     by the given C* version
     """
-    if cassandra_version >= '2.2':
+    if LooseVersion('3.0') <= cassandra_version:
         protocol_version = 4
-    elif cassandra_version >= '2.1':
+    elif LooseVersion('2.1') <= cassandra_version:
         protocol_version = 3
-    elif cassandra_version >= '2.0':
+    elif LooseVersion('2.0') <= cassandra_version:
         protocol_version = 2
     else:
         protocol_version = 1
diff --git a/upgrade_tests/README.md b/upgrade_tests/README.md
index f85381e..7dd5dfa 100644
--- a/upgrade_tests/README.md
+++ b/upgrade_tests/README.md
@@ -52,9 +52,10 @@
 
 The second useful concept is that of a __version family__. This simply describes which
 'version line' a particular version belongs to. For example, Cassandra 3.0.7 belongs
-to the '3.0.x' version family. Organizing specific versions into families allows
+to the `CASSANDRA_3_0` version family. Organizing specific versions into families allows
 us to generalize about support, so we can say, for example that: "2.2.x versions should be
-able to upgrade to 3.0.x versions, or 2.2.x versions can skip 3.0.x versions and upgrade directly to a 3.x version".
+able to upgrade to 3.0.x versions, or 2.2.x versions can skip 3.0.x versions and upgrade
+directly to a 3.x version".
 
 ### Code generation
 
diff --git a/upgrade_tests/cql_tests.py b/upgrade_tests/cql_tests.py
index 5c9ec78..12f1d59 100644
--- a/upgrade_tests/cql_tests.py
+++ b/upgrade_tests/cql_tests.py
@@ -29,7 +29,7 @@
 from tools.data import rows_to_list
 from tools.misc import add_skip
 from .upgrade_base import UpgradeTester
-from .upgrade_manifest import build_upgrade_pairs
+from .upgrade_manifest import build_upgrade_pairs, CASSANDRA_4_0
 
 since = pytest.mark.since
 logger = logging.getLogger(__name__)
@@ -39,7 +39,7 @@
 class TestCQL(UpgradeTester):
 
     def is_40_or_greater(self):
-        return self.UPGRADE_PATH.upgrade_meta.family in ('trunk', '4.0')
+        return LooseVersion(self.UPGRADE_PATH.upgrade_meta.family) >= CASSANDRA_4_0
 
     def test_static_cf(self):
         """ Test static CF syntax """
diff --git a/upgrade_tests/storage_engine_upgrade_test.py b/upgrade_tests/storage_engine_upgrade_test.py
index c59984e..8112fa2 100644
--- a/upgrade_tests/storage_engine_upgrade_test.py
+++ b/upgrade_tests/storage_engine_upgrade_test.py
@@ -12,6 +12,7 @@
 from tools.assertions import (assert_all, assert_length_equal, assert_none,
                               assert_one)
 from tools.misc import new_node
+from upgrade_tests.upgrade_manifest import indev_3_0_x, indev_2_1_x
 
 since = pytest.mark.since
 logger = logging.getLogger(__name__)
@@ -39,9 +40,9 @@
 
         # Forcing cluster version on purpose
         if self.dtest_config.cassandra_version_from_build >= MAJOR_VERSION_4:
-            cluster.set_install_dir(version="git:cassandra-3.0")
+            cluster.set_install_dir(version=indev_3_0_x.version)
         else:
-            cluster.set_install_dir(version="git:cassandra-2.1")
+            cluster.set_install_dir(version=indev_2_1_x.version)
         self.fixture_dtest_setup.reinitialize_cluster_for_different_version()
         cluster.populate(1).start()
 
diff --git a/upgrade_tests/thrift_upgrade_test.py b/upgrade_tests/thrift_upgrade_test.py
index 491786e..3da44f7 100644
--- a/upgrade_tests/thrift_upgrade_test.py
+++ b/upgrade_tests/thrift_upgrade_test.py
@@ -15,7 +15,7 @@
                               assert_lists_of_dicts_equal)
 from tools.misc import wait_for_agreement, add_skip
 from .upgrade_base import UpgradeTester
-from .upgrade_manifest import build_upgrade_pairs
+from .upgrade_manifest import build_upgrade_pairs, CASSANDRA_4_0
 
 since = pytest.mark.since
 logger = logging.getLogger(__name__)
@@ -270,7 +270,7 @@
         if node.get_cassandra_version() < '4':
             client = get_thrift_client(host, port)
             _validate_dense_thrift(client, cf='dense_super_1')
-        _validate_dense_cql(cursor, cf='dense_super_1', is_version_4_or_greater=node.get_cassandra_version() >= '4')
+        _validate_dense_cql(cursor, cf='dense_super_1', is_version_4_or_greater=node.get_cassandra_version() >= CASSANDRA_4_0)
 
     def test_dense_supercolumn(self):
         cluster = self.prepare()
@@ -317,7 +317,7 @@
             _validate_dense_thrift(client, cf='dense_super_1')
 
         cursor = self.patient_cql_connection(node, row_factory=dict_factory)
-        _validate_dense_cql(cursor, cf='dense_super_1', is_version_4_or_greater=node.get_cassandra_version() >= '4')
+        _validate_dense_cql(cursor, cf='dense_super_1', is_version_4_or_greater=node.get_cassandra_version() >= CASSANDRA_4_0)
 
     def test_sparse_supercolumn(self):
         cluster = self.prepare()
@@ -355,7 +355,7 @@
         _validate_sparse_thrift(client, cf='sparse_super_2')
 
         self.set_node_to_current_version(node)
-        is_version_4_or_greater = node.get_cassandra_version() >= '4'
+        is_version_4_or_greater = node.get_cassandra_version() >= CASSANDRA_4_0
         #4.0 doesn't support compact storage
         if is_version_4_or_greater:
             cursor.execute("ALTER TABLE ks.sparse_super_2 DROP COMPACT STORAGE;")
@@ -374,7 +374,7 @@
 
 
 @pytest.mark.upgrade_test
-@since('4')
+@since('4.0')
 class TestUpgradeTo40(Tester):
     """
     Thrift is dead in 4.0. However, we still want to ensure users that used thrift
@@ -593,6 +593,7 @@
 
         for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
+            is_version_4_or_greater = node.get_cassandra_version() >= CASSANDRA_4_0
             if not is_version_4_or_greater:
                 client = get_thrift_client(host, port)
                 _validate_dense_thrift(client)
@@ -630,6 +631,7 @@
 
         for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
+            is_version_4_or_greater = node.get_cassandra_version() >= CASSANDRA_4_0
             if not is_version_4_or_greater:
                 client = get_thrift_client(host, port)
                 _validate_dense_thrift(client, cf='dense_super_2')
@@ -670,6 +672,7 @@
 
         for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
+            is_version_4_or_greater = node.get_cassandra_version() >= CASSANDRA_4_0
             if not is_version_4_or_greater:
                 client = get_thrift_client(host, port)
                 _validate_sparse_thrift(client)
@@ -707,6 +710,7 @@
 
         for is_upgraded, cursor in self.do_upgrade(cursor, row_factory=dict_factory, use_thrift=True):
             logger.debug("Querying {} node".format("upgraded" if is_upgraded else "old"))
+            is_version_4_or_greater = node.get_cassandra_version() >= CASSANDRA_4_0
             if not is_version_4_or_greater:
                 client = get_thrift_client(host, port)
                 _validate_sparse_thrift(client, cf='sparse_super_2')
diff --git a/upgrade_tests/upgrade_base.py b/upgrade_tests/upgrade_base.py
index 04c0c18..24ef8af 100644
--- a/upgrade_tests/upgrade_base.py
+++ b/upgrade_tests/upgrade_base.py
@@ -1,3 +1,5 @@
+from distutils.version import LooseVersion
+
 import os
 import sys
 import time
@@ -8,6 +10,8 @@
 
 from ccmlib.common import get_version_from_build, is_win
 
+from .upgrade_manifest import CASSANDRA_4_0
+
 from dtest import Tester, create_ks
 
 logger = logging.getLogger(__name__)
@@ -261,5 +265,5 @@
         return self.UPGRADE_PATH.upgrade_meta.family
 
     def upgrade_is_version_4_or_greater(self):
-        upgrade_version = self.upgrade_version_family()
-        return upgrade_version == 'trunk' or upgrade_version >= '4.0'
+        upgrade_version = LooseVersion(self.upgrade_version_family())
+        return upgrade_version >= CASSANDRA_4_0
diff --git a/upgrade_tests/upgrade_manifest.py b/upgrade_tests/upgrade_manifest.py
index 05e566e..6d4ad5e 100644
--- a/upgrade_tests/upgrade_manifest.py
+++ b/upgrade_tests/upgrade_manifest.py
@@ -19,6 +19,14 @@
 VERSION_FAMILY = None
 CONFIG = None
 
+# TODO add a new item whenever Cassandra is branched and update TRUNK to the version present in trunk
+CASSANDRA_2_0 = '2.0'
+CASSANDRA_2_1 = '2.1'
+CASSANDRA_2_2 = '2.2'
+CASSANDRA_3_0 = '3.0'
+CASSANDRA_3_11 = '3.11'
+CASSANDRA_4_0 = '4.0'
+TRUNK = CASSANDRA_4_0
 
 def is_same_family_current_to_indev(origin, destination):
     """
@@ -76,21 +84,22 @@
     else:
         current_version = get_version_from_build(cassandra_dir)
 
+    # TODO add a new item whenever Cassandra is branched
     if current_version.vstring.startswith('2.0'):
-        version_family = '2.0.x'
+        version_family = CASSANDRA_2_0
     elif current_version.vstring.startswith('2.1'):
-        version_family = '2.1.x'
+        version_family = CASSANDRA_2_1
     elif current_version.vstring.startswith('2.2'):
-        version_family = '2.2.x'
+        version_family = CASSANDRA_2_2
     elif current_version.vstring.startswith('3.0'):
-        version_family = '3.0.x'
-    elif '3.1' <= current_version < '4.0':
-        version_family = '3.x'
-    elif '4.0' <= current_version < '4.1':
-        version_family = 'trunk'
+        version_family = CASSANDRA_3_0
+    elif current_version.vstring.startswith('3.11'):
+        version_family = CASSANDRA_3_11
+    elif current_version.vstring.startswith('4.0'):
+        version_family = CASSANDRA_4_0
     else:
         # when this occurs, it's time to update this manifest a bit!
-        raise RuntimeError("4.1+ not yet supported on upgrade tests!")
+        raise RuntimeError("Testing upgrades from/to version %s is not supported. Please use a custom manifest (see upgrade_manifest.py)" % current_version.vstring)
 
     global VERSION_FAMILY
     VERSION_FAMILY = version_family
@@ -116,6 +125,14 @@
         """
         return self.family == VERSION_FAMILY
 
+    @property
+    def matches_current_env_version_family_and_is_indev(self):
+        """
+        Returns boolean indicating whether this meta matches the current version family of the environment
+        and whether this meta is in indev variant
+        """
+        return self.family == VERSION_FAMILY and self.variant == "indev"
+
     def clone_with_local_env_version(self):
         """
         Returns a new object cloned from this one, with the version replaced with the local env version.
@@ -126,19 +143,20 @@
         return self._replace(version="clone:{}".format(cassandra_dir))
 
 
-indev_2_1_x = VersionMeta(name='indev_2_1_x', family='2.1', variant='indev', version='github:apache/cassandra-2.1', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
-current_2_1_x = VersionMeta(name='current_2_1_x', family='2.1', variant='current', version='2.1.20', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
+# TODO define new versions whenever Cassandra is branched
+indev_2_1_x = VersionMeta(name='indev_2_1_x', family=CASSANDRA_2_1, variant='indev', version='github:apache/cassandra-2.1', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
+current_2_1_x = VersionMeta(name='current_2_1_x', family=CASSANDRA_2_1, variant='current', version='2.1.22', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
 
-indev_2_2_x = VersionMeta(name='indev_2_2_x', family='2.2', variant='indev', version='github:apache/cassandra-2.2', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
-current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2', variant='current', version='2.2.13', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
+indev_2_2_x = VersionMeta(name='indev_2_2_x', family=CASSANDRA_2_2, variant='indev', version='github:apache/cassandra-2.2', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
+current_2_2_x = VersionMeta(name='current_2_2_x', family=CASSANDRA_2_2, variant='current', version='2.2.19', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
 
-indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0', variant='indev', version='github:apache/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,))
-current_3_0_x = VersionMeta(name='current_3_0_x', family='3.0', variant='current', version='3.0.23', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+indev_3_0_x = VersionMeta(name='indev_3_0_x', family=CASSANDRA_3_0, variant='indev', version='github:apache/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+current_3_0_x = VersionMeta(name='current_3_0_x', family=CASSANDRA_3_0, variant='current', version='3.0.24', min_proto_v=3, max_proto_v=4, java_versions=(8,))
 
-indev_3_11_x = VersionMeta(name='indev_3_11_x', family='3.11', variant='indev', version='github:apache/cassandra-3.11', min_proto_v=3, max_proto_v=4, java_versions=(8,))
-current_3_11_x = VersionMeta(name='current_3_11_x', family='3.11', variant='current', version='3.11.9', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+indev_3_11_x = VersionMeta(name='indev_3_11_x', family=CASSANDRA_3_11, variant='indev', version='github:apache/cassandra-3.11', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+current_3_11_x = VersionMeta(name='current_3_11_x', family=CASSANDRA_3_11, variant='current', version='3.11.9', min_proto_v=3, max_proto_v=4, java_versions=(8,))
 
-indev_trunk = VersionMeta(name='indev_trunk', family='trunk', variant='indev', version='github:apache/trunk', min_proto_v=4, max_proto_v=5, java_versions=(8,))
+indev_trunk = VersionMeta(name='indev_trunk', family=TRUNK, variant='indev', version='github:apache/trunk', min_proto_v=4, max_proto_v=5, java_versions=(8,))
 
 
 # MANIFEST maps a VersionMeta representing a line/variant to a list of other VersionMeta's representing supported upgrades
@@ -148,38 +166,18 @@
 #   2) Features exclusive to version B may not work until all nodes are running version B.
 #   3) Nodes upgraded to version B can read data stored by the predecessor version A, and from a data standpoint will function the same as if they always ran version B.
 #   4) If a new sstable format is present in version B, writes will occur in that format after upgrade. Running sstableupgrade on version B will proactively convert version A sstables to version B.
+# TODO define new upgrade scenarios whenever Cassandra is branched
 MANIFEST = {
-    indev_2_1_x: [indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x],
-    current_2_1_x: [indev_2_1_x, indev_2_2_x, current_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x],
-
-    indev_2_2_x: [indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x],
-    current_2_2_x: [indev_2_2_x, indev_3_0_x, current_3_0_x, indev_3_11_x, current_3_11_x],
-
-    indev_3_0_x: [indev_3_11_x, current_3_11_x, indev_trunk],
-    current_3_0_x: [indev_3_0_x, indev_3_11_x, current_3_11_x, indev_trunk],
-
+    current_2_1_x: [indev_2_2_x, indev_3_0_x, indev_3_11_x],
+    current_2_2_x: [indev_2_2_x, indev_3_0_x, indev_3_11_x],
+    current_3_0_x: [indev_3_0_x, indev_3_11_x, indev_trunk],
     current_3_11_x: [indev_3_11_x, indev_trunk],
+
+    indev_2_2_x: [indev_3_0_x, indev_3_11_x],
+    indev_3_0_x: [indev_3_11_x, indev_trunk],
     indev_3_11_x: [indev_trunk]
 }
 
-# Local env and custom path testing instructions. Use these steps to REPLACE the normal upgrade test cases with your own.
-# 1) Add a VersionMeta for each version you wish to test (see examples below). Update the name, family, version, and protocol restrictions as needed. Use a unique name for each VersionMeta.
-# 2) Update OVERRIDE_MANIFEST (see example below).
-# 3) If you want to test using local code, set the version attribute using local slugs in the format 'local:/path/to/cassandra/:branch_name'
-# 4) Run the tests!
-#      To run all, use 'nosetests -v upgrade_tests/'. To run specific tests, use 'nosetests -vs --collect-only' to preview the test names, then run nosetests using the desired test name.
-#      Note that nosetests outputs test names in a format that needs to be tweaked a bit before they will run from the command line.
-custom_1 = VersionMeta(name='custom_branch_1', family='2.1.x', variant='indev', version='local:some_branch', min_proto_v=3, max_proto_v=4, java_versions=(7, 8))
-custom_2 = VersionMeta(name='custom_branch_2', family='2.2.x', variant='indev', version='git:trunk', min_proto_v=3, max_proto_v=4, java_versions=(7, 8))
-custom_3 = VersionMeta(name='custom_branch_3', family='3.0.x', variant='indev', version='git:cassandra-3.5', min_proto_v=3, max_proto_v=4, java_versions=(7, 8))
-custom_4 = VersionMeta(name='custom_branch_4', family='3.x', variant='indev', version='git:cassandra-3.6', min_proto_v=3, max_proto_v=4, java_versions=(7, 8))
-OVERRIDE_MANIFEST = {
-    # EXAMPLE:
-    # custom_1: [custom_2, custom_3],  # creates a test of custom_1 -> custom_2, and another test from custom_1 -> custom_3
-    # custom_3: [custom_4]             # creates a test of custom_3 -> custom_4
-}
-
-
 def _have_common_proto(origin_meta, destination_meta):
     """
     Takes two VersionMeta objects, in order of test from start version to next version.
@@ -194,7 +192,7 @@
     Returns a list of UpgradePath's.
     """
     valid_upgrade_pairs = []
-    manifest = OVERRIDE_MANIFEST or MANIFEST
+    manifest = MANIFEST
 
     configured_strategy = CONFIG.getoption("--upgrade-version-selection").upper()
     version_select_strategy = VersionSelectionStrategies[configured_strategy].value[0]
@@ -215,13 +213,13 @@
 
             # if either origin or destination match version, then do the test
             # the assumption is that a change in 3.0 could break upgrades to trunk, so include those tests as well
-            if filter_for_current_family and not origin_meta.matches_current_env_version_family and not destination_meta.matches_current_env_version_family:
+            if filter_for_current_family and not origin_meta.matches_current_env_version_family_and_is_indev and not destination_meta.matches_current_env_version_family:
                 logger.debug("skipping class creation, origin version {} and destination version {} do not match target version {}, and --upgrade-target-version-only was set".format(origin_meta.name, destination_meta.name, VERSION_FAMILY))
                 continue
 
             path_name = 'Upgrade_' + origin_meta.name + '_To_' + destination_meta.name
 
-            if not (RUN_STATIC_UPGRADE_MATRIX or OVERRIDE_MANIFEST):
+            if not RUN_STATIC_UPGRADE_MATRIX:
                 if destination_meta.matches_current_env_version_family:
                     # looks like this test should actually run in the current env, so let's set the final version to match the env exactly
                     oldmeta = destination_meta
diff --git a/upgrade_tests/upgrade_supercolumns_test.py b/upgrade_tests/upgrade_supercolumns_test.py
index d9fb738..f620e70 100644
--- a/upgrade_tests/upgrade_supercolumns_test.py
+++ b/upgrade_tests/upgrade_supercolumns_test.py
@@ -1,3 +1,5 @@
+from distutils.version import LooseVersion
+
 import os
 import pytest
 import logging
@@ -17,6 +19,8 @@
                                            Mutation, NotFoundException,
                                            SlicePredicate, SliceRange,
                                            SuperColumn)
+from upgrade_tests.upgrade_manifest import indev_2_1_x, indev_2_2_x, indev_3_0_x, indev_3_11_x, indev_trunk, \
+    CASSANDRA_4_0
 
 logger = logging.getLogger(__name__)
 
@@ -50,7 +54,7 @@
             # don't alter ignore_log_patterns on the class, just the obj for this test
             fixture_dtest_setup.ignore_log_patterns += [_known_teardown_race_error]
 
-    def prepare(self, num_nodes=1, cassandra_version="git:cassandra-2.1"):
+    def prepare(self, num_nodes=1, cassandra_version=indev_2_1_x.version):
         cluster = self.cluster
 
         # Forcing cluster version on purpose
@@ -124,26 +128,26 @@
         self.verify_with_thrift()
 
         for version in upgrade_path:
-            if version == 'git:cassandra-4.0' or version == 'git:trunk':
+            if LooseVersion(version.family) >= CASSANDRA_4_0:
                 session.execute("ALTER TABLE supcols.cols DROP COMPACT STORAGE")
-            self.upgrade_to_version(version)
+            self.upgrade_to_version(version.version)
 
             session = self.patient_exclusive_cql_connection(node1)
 
             self.verify_with_cql(session)
 
-            if self.cluster.version() < '4':
+            if self.cluster.version() < CASSANDRA_4_0:
                 node1.nodetool("enablethrift")
                 self.verify_with_thrift()
 
         cluster.remove(node=node1)
 
     def test_upgrade_super_columns_through_all_versions(self):
-        self._upgrade_super_columns_through_versions_test(upgrade_path=['git:cassandra-2.2', 'git:cassandra-3.0',
-                                                                        'git:cassandra-3.11', 'git:trunk'])
+        self._upgrade_super_columns_through_versions_test(upgrade_path=[indev_2_2_x, indev_3_0_x,
+                                                                        indev_3_11_x, indev_trunk])
 
     def test_upgrade_super_columns_through_limited_versions(self):
-        self._upgrade_super_columns_through_versions_test(upgrade_path=['git:cassandra-3.0', 'git:trunk'])
+        self._upgrade_super_columns_through_versions_test(upgrade_path=[indev_3_0_x, indev_trunk])
 
     def upgrade_to_version(self, tag, nodes=None):
         logger.debug('Upgrading to ' + tag)
diff --git a/upgrade_tests/upgrade_through_versions_test.py b/upgrade_tests/upgrade_through_versions_test.py
index d2ae933..a8e6087 100644
--- a/upgrade_tests/upgrade_through_versions_test.py
+++ b/upgrade_tests/upgrade_through_versions_test.py
@@ -1,3 +1,5 @@
+from distutils.version import LooseVersion
+
 import operator
 import os
 import pprint
@@ -22,7 +24,7 @@
 from .upgrade_manifest import (build_upgrade_pairs,
                                current_2_1_x, current_2_2_x, current_3_0_x,
                                indev_3_11_x,
-                               current_3_11_x, indev_trunk)
+                               current_3_11_x, indev_trunk, CASSANDRA_4_0)
 
 logger = logging.getLogger(__name__)
 
@@ -40,18 +42,24 @@
     # 'tester' is a cloned object so we shouldn't be inappropriately sharing anything with another process
     session = tester.patient_cql_connection(tester.node1, keyspace="upgrade", protocol_version=tester.protocol_version)
 
+    running = True
+
     prepared = session.prepare("UPDATE cf SET v=? WHERE k=?")
     prepared.consistency_level = ConsistencyLevel.QUORUM
 
     def handle_sigterm(signum, frame):
-        # need to close queue gracefully if possible, or the data_checker process
-        # can't seem to empty the queue and test failures result.
+        nonlocal running
+        running = False
+
+    def shutdown_gently():
+        logger.info("Data writer process terminating, closing queues")
         to_verify_queue.close()
-        exit(0)
+        verification_done_queue.close()
+        session.shutdown()
 
     signal.signal(signal.SIGTERM, handle_sigterm)
 
-    while True:
+    while running:
         try:
             key = None
 
@@ -69,11 +77,13 @@
             session.execute(prepared, (val, key))
 
             to_verify_queue.put((key, val,))
-        except Exception:
-            logger.debug("Error in data writer process!")
-            to_verify_queue.close()
+        except Exception as ex:
+            logger.error("Error in data writer process!", ex)
+            shutdown_gently()
             raise
 
+    shutdown_gently()
+
 
 def data_checker(tester, to_verify_queue, verification_done_queue):
     """
@@ -88,18 +98,24 @@
     # 'tester' is a cloned object so we shouldn't be inappropriately sharing anything with another process
     session = tester.patient_cql_connection(tester.node1, keyspace="upgrade", protocol_version=tester.protocol_version)
 
+    running = True
+
     prepared = session.prepare("SELECT v FROM cf WHERE k=?")
     prepared.consistency_level = ConsistencyLevel.QUORUM
 
     def handle_sigterm(signum, frame):
-        # need to close queue gracefully if possible, or the data_checker process
-        # can't seem to empty the queue and test failures result.
+        nonlocal running
+        running = False
+
+    def shutdown_gently():
+        logger.info("Data checker process terminating, closing queues")
+        to_verify_queue.close()
         verification_done_queue.close()
-        exit(0)
+        session.shutdown()
 
     signal.signal(signal.SIGTERM, handle_sigterm)
 
-    while True:
+    while running:
         try:
             # here we could block, but if the writer process terminates early with an empty queue
             # we would end up blocking indefinitely
@@ -107,11 +123,12 @@
 
             actual_val = session.execute(prepared, (key,))[0][0]
         except Empty:
-            time.sleep(0.1)  # let's not eat CPU if the queue is empty
+            time.sleep(1)  # let's not eat CPU if the queue is empty
+            logger.info("to_verify_queue is empty: %d" % to_verify_queue.qsize())
             continue
-        except Exception:
-            logger.debug("Error in data verifier process!")
-            verification_done_queue.close()
+        except Exception as ex:
+            logger.error("Error in data checker process!", ex)
+            shutdown_gently()
             raise
         else:
             try:
@@ -122,9 +139,13 @@
                 # and allow dropping some rewritables because we don't want to
                 # rewrite rows in the same sequence as originally written
                 pass
+            except Exception as ex:
+                logger.error("Failed to put into verification_done_queue", ex)
 
         assert expected_val == actual_val, "Data did not match expected value!"
 
+    shutdown_gently()
+
 
 def counter_incrementer(tester, to_verify_queue, verification_done_queue, rewrite_probability=0):
     """
@@ -139,18 +160,24 @@
     # 'tester' is a cloned object so we shouldn't be inappropriately sharing anything with another process
     session = tester.patient_cql_connection(tester.node1, keyspace="upgrade", protocol_version=tester.protocol_version)
 
+    running = True
+
     prepared = session.prepare("UPDATE countertable SET c = c + 1 WHERE k1=?")
     prepared.consistency_level = ConsistencyLevel.QUORUM
 
     def handle_sigterm(signum, frame):
-        # need to close queue gracefully if possible, or the data_checker process
-        # can't seem to empty the queue and test failures result.
+        nonlocal running
+        running = False
+
+    def shutdown_gently():
+        logger.info("Counter incrementer process terminating, closing queues")
         to_verify_queue.close()
-        exit(0)
+        verification_done_queue.close()
+        session.shutdown()
 
     signal.signal(signal.SIGTERM, handle_sigterm)
 
-    while True:
+    while running:
         try:
             key = None
             count = 0  # this will get set to actual last known count if we do a re-write
@@ -167,11 +194,13 @@
             session.execute(prepared, (key))
 
             to_verify_queue.put_nowait((key, count + 1,))
-        except Exception:
-            logger.debug("Error in counter incrementer process!")
-            to_verify_queue.close()
+        except Exception as ex:
+            logger.error("Error in counter incrementer process!", ex)
+            shutdown_gently()
             raise
 
+    shutdown_gently()
+
 
 def counter_checker(tester, to_verify_queue, verification_done_queue):
     """
@@ -186,18 +215,24 @@
     # 'tester' is a cloned object so we shouldn't be inappropriately sharing anything with another process
     session = tester.patient_cql_connection(tester.node1, keyspace="upgrade", protocol_version=tester.protocol_version)
 
+    running = True
+
     prepared = session.prepare("SELECT c FROM countertable WHERE k1=?")
     prepared.consistency_level = ConsistencyLevel.QUORUM
 
     def handle_sigterm(signum, frame):
-        # need to close queue gracefully if possible, or the data_checker process
-        # can't seem to empty the queue and test failures result.
+        nonlocal running
+        running = False
+
+    def shutdown_gently():
+        logger.info("Counter checker process terminating, closing queues")
+        to_verify_queue.close()
         verification_done_queue.close()
-        exit(0)
+        session.shutdown()
 
     signal.signal(signal.SIGTERM, handle_sigterm)
 
-    while True:
+    while running:
         try:
             # here we could block, but if the writer process terminates early with an empty queue
             # we would end up blocking indefinitely
@@ -207,9 +242,9 @@
         except Empty:
             time.sleep(0.1)  # let's not eat CPU if the queue is empty
             continue
-        except Exception:
-            logger.debug("Error in counter verifier process!")
-            verification_done_queue.close()
+        except Exception as ex:
+            logger.error("Error in counter verifier process!", ex)
+            shutdown_gently()
             raise
         else:
             tester.assertEqual(expected_count, actual_count, "Data did not match expected value!")
@@ -223,6 +258,8 @@
                 # rewrite rows in the same sequence as originally written
                 pass
 
+    shutdown_gently()
+
 
 @pytest.mark.upgrade_test
 @pytest.mark.resource_intensive
@@ -368,6 +405,7 @@
             # Stop write processes
             write_proc.terminate()
             # wait for the verification queue's to empty (and check all rows) before continuing
+            self._check_on_subprocs([verify_proc])  # make sure the verification processes are running still
             self._wait_until_queue_condition('writes pending verification', verification_queue, operator.le, 0, max_wait_s=1200)
             self._check_on_subprocs([verify_proc])  # make sure the verification processes are running still
 
@@ -447,7 +485,7 @@
         for node in nodes:
             node.set_install_dir(version=version_meta.version)
             logger.debug("Set new cassandra dir for %s: %s" % (node.name, node.get_install_dir()))
-            if internode_ssl and (version_meta.family == 'trunk' or version_meta.family >= '4.0'):
+            if internode_ssl and (LooseVersion(version_meta.family) >= CASSANDRA_4_0):
                 node.set_configuration_options({'server_encryption_options': {'enabled': True, 'enable_legacy_ssl_storage_port': True}})
 
         # hacky? yes. We could probably extend ccm to allow this publicly.
diff --git a/upgrade_tests/upgrade_udtfix_test.py b/upgrade_tests/upgrade_udtfix_test.py
index a138755..6c97474 100644
--- a/upgrade_tests/upgrade_udtfix_test.py
+++ b/upgrade_tests/upgrade_udtfix_test.py
@@ -4,12 +4,13 @@
 from dtest import RUN_STATIC_UPGRADE_MATRIX, Tester
 from distutils.version import LooseVersion
 from tools.misc import add_skip
-from .upgrade_manifest import build_upgrade_pairs
+from .upgrade_manifest import build_upgrade_pairs, CASSANDRA_3_0
 
 since = pytest.mark.since
 logger = logging.getLogger(__name__)
 
 @pytest.mark.upgrade_test
+@since('3.11.6')
 class UpgradeUDTFixTest(Tester):
     __test__ = False
 
@@ -154,7 +155,7 @@
 
     start_family = spec['UPGRADE_PATH'].starting_meta.family
     upgrade_family = spec['UPGRADE_PATH'].upgrade_meta.family
-    start_family_applies = start_family == '3.0' and (upgrade_family == 'trunk' or LooseVersion(upgrade_family) > '3.0')
+    start_family_applies = start_family == CASSANDRA_3_0
     upgrade_applies_to_env = RUN_STATIC_UPGRADE_MATRIX or start_family_applies
     cls = type(gen_class_name, (UpgradeUDTFixTest,), spec)
     if not upgrade_applies_to_env: