Fixed a NullPointerException when calling nodetool enablethrift

patch by David Capwell; reviewed by Ekaterina Dimitrova, Jordan West, Yifan Cai for CASSANDRA-16127
diff --git a/bootstrap_test.py b/bootstrap_test.py
index 526992d..1cda916 100644
--- a/bootstrap_test.py
+++ b/bootstrap_test.py
@@ -835,10 +835,17 @@
         shutil.rmtree(commitlog_dir)
 
     @since('2.2')
+    @pytest.mark.ported_to_in_jvm # see org.apache.cassandra.distributed.test.BootstrapBinaryDisabledTest
     def test_bootstrap_binary_disabled(self):
         """
-        Test binary while bootstrapping and streaming fails
-        @jira_ticket CASSANDRA-14526, CASSANDRA-14525
+        Test binary while bootstrapping and streaming fails.
+
+        This test was ported to jvm-dtest org.apache.cassandra.distributed.test.BootstrapBinaryDisabledTest,
+        as of this writing there are a few limitations with jvm-dtest which require this test to
+        stay, namely vnode support (ci also tests under different configs).  Once jvm-dtest supports
+        vnodes, this test can go away in favor of that class.
+
+        @jira_ticket CASSANDRA-14526, CASSANDRA-14525, CASSANDRA-16127
         """
         config = {'authenticator': 'org.apache.cassandra.auth.PasswordAuthenticator',
                   'authorizer': 'org.apache.cassandra.auth.CassandraAuthorizer',
@@ -871,9 +878,6 @@
         node2.start(jvm_args=["-Dcassandra.ring_delay_ms=5000"])
         self.assert_log_had_msg(node2, 'Some data streaming failed')
 
-        if self.cluster.version() >= LooseVersion('4.0'):
-            self.assert_log_had_msg(node2, 'Not starting client transports as bootstrap has not completed')
-
         try:
             node2.nodetool('join')
             pytest.fail('nodetool should have errored and failed to join ring')
diff --git a/client_network_stop_start_test.py b/client_network_stop_start_test.py
new file mode 100644
index 0000000..6f472df
--- /dev/null
+++ b/client_network_stop_start_test.py
@@ -0,0 +1,112 @@
+import logging
+import os
+import os.path
+import pytest
+import shutil
+import string
+import time
+
+from ccmlib.node import TimeoutError
+from distutils.version import LooseVersion
+from dtest import Tester
+from tools import sslkeygen
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+# see https://issues.apache.org/jira/browse/CASSANDRA-16127
+class TestClientNetworkStopStart(Tester):
+
+    def _normalize(self, a):
+        return a.translate(str.maketrans(dict.fromkeys(string.whitespace)))
+
+    def _in(self, a, b):
+        return self._normalize(a) in self._normalize(b)
+
+    def _assert_client_active_msg(self, name, enabled, out):
+        expected = "{} active: {}".format(name, str(enabled).lower())
+        actived = "actived" if enabled else "deactivated"
+        assert self._in(expected, out), "{} is expected to be {} ({}) but was not found in output: {}".format(name, actived, str(enabled).lower(), out)
+
+    def _assert_watch_log_for(self, node_or_cluster, to_watch, assert_msg=None):
+        if assert_msg is None:
+            assert_msg = "Unable to locate '{}'".format(to_watch)
+        nodelist_fn = getattr(node_or_cluster, "nodelist", None)
+        logger.debug("watching for '{}'".format(to_watch))
+        start = time.perf_counter()
+        if callable(nodelist_fn):
+            for node in nodelist_fn():
+                assert node.watch_log_for_no_errors(to_watch), assert_msg
+        else:
+            assert node_or_cluster.watch_log_for_no_errors(to_watch), assert_msg
+        logger.debug("Completed watching for '{}'; took {}s".format(to_watch, time.perf_counter() - start))
+
+    def _assert_binary_actually_found(self, node_or_cluster):
+        # ccm will silently move on if the logs don't have CQL in time, which then leads to
+        # flaky tests; to avoid that force waiting to be correct and assert the log was seen.
+        logger.debug("Verifying that the CQL log was seen and that ccm didn't return early...")
+        self._assert_watch_log_for(node_or_cluster, "Starting listening for CQL clients on", "Binary didn't start...")
+
+    def _assert_client_enable(self, node, native_enabled=True, thrift_enabled=False):
+        out = node.nodetool("info")
+        self._assert_client_active_msg("Native Transport", native_enabled, out.stdout)
+        if node.get_cassandra_version() >= LooseVersion('4.0'):
+            assert "Thrift" not in out.stdout, "Thrift found in output: {}".format(out.stdout)
+        else:
+            self._assert_client_active_msg("Thrift", thrift_enabled, out.stdout)
+
+    def _assert_startup(self, node_or_cluster):
+        """Checks to see if the startup message was found"""
+        self._assert_watch_log_for(node_or_cluster, "Startup complete", "Unable to find startup message, either the process crashed or is missing CASSANDRA-16127")
+
+    @since(['2.2', '3.0.23', '3.11.9'])
+    def test_defaults(self):
+        """Tests default configurations have the correct client network setup"""
+        cluster = self.cluster
+        logger.debug("Starting cluster..")
+        cluster.populate(1).start(wait_for_binary_proto=True)
+        self._assert_binary_actually_found(cluster)
+        self._assert_startup(cluster)
+        node = cluster.nodelist()[0]
+        self._assert_client_enable(node)
+
+    @since(['2.2', '3.0.23', '3.11.9'], max_version='3.11.x')
+    def test_hsha_defaults(self):
+        """Enables hsha"""
+        cluster = self.cluster
+        logger.debug("Starting cluster..")
+        cluster.set_configuration_options(values={
+            'rpc_server_type': 'hsha',
+            # NOTE: rpc_min_threads=1 appears to cause a deadlock, so use a higher minimum
+            'rpc_min_threads': 16,
+            'rpc_max_threads': 2048
+        })
+        cluster.populate(1).start(wait_for_binary_proto=True)
+        self._assert_binary_actually_found(cluster)
+        self._assert_startup(cluster)
+        node = cluster.nodelist()[0]
+        self._assert_client_enable(node)
+
+    @since(['2.2', '3.0.23', '3.11.9'], max_version='3.11.x')
+    def test_hsha_with_ssl(self):
+        """Enables hsha with ssl"""
+        cluster = self.cluster
+        logger.debug("Starting cluster..")
+        cred = sslkeygen.generate_credentials("127.0.0.1")
+        cluster.set_configuration_options(values={
+            'rpc_server_type': 'hsha',
+            # NOTE: rpc_min_threads=1 appears to cause a deadlock, so use a higher minimum
+            'rpc_min_threads': 16,
+            'rpc_max_threads': 2048,
+            'client_encryption_options': {
+                'enabled': True,
+                'optional': False,
+                'keystore': cred.cakeystore,
+                'keystore_password': 'cassandra'
+            }
+        })
+        cluster.populate(1).start(wait_for_binary_proto=True)
+        self._assert_binary_actually_found(cluster)
+        self._assert_startup(cluster)
+        node = cluster.nodelist()[0]
+        self._assert_client_enable(node)
diff --git a/conftest.py b/conftest.py
index 25cc791..a17e876 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,4 +1,5 @@
 import copy
+import collections
 import inspect
 import logging
 import os
@@ -374,10 +375,30 @@
 
 
 def _skip_msg(current_running_version, since_version, max_version):
-    if loose_version_compare(current_running_version, since_version) < 0:
+    if isinstance(since_version, collections.Sequence):
+        previous = None
+        since_version.sort()
+
+        for i in range(1, len(since_version) + 1):
+            sv = since_version[-i]
+            if loose_version_compare(current_running_version, sv) >= 0:
+                if not previous:
+                    if max_version and loose_version_compare(current_running_version, max_version) > 0:
+                        return "%s > %s" % (current_running_version, max_version)
+                    return None
+
+                if loose_version_compare(current_running_version, previous) < 0:
+                    return None
+
+            previous = LooseVersion('.'.join([str(s) for s in sv.version[:-1]]))
+
+        # no matches found, so fail
         return "%s < %s" % (current_running_version, since_version)
-    if max_version and loose_version_compare(current_running_version, max_version) > 0:
-        return "%s > %s" % (current_running_version, max_version)
+    else:
+        if loose_version_compare(current_running_version, since_version) < 0:
+            return "%s < %s" % (current_running_version, since_version)
+        if max_version and loose_version_compare(current_running_version, max_version) > 0:
+            return "%s > %s" % (current_running_version, max_version)
 
 
 @pytest.fixture(autouse=True)
@@ -388,8 +409,11 @@
         if max_version_str:
             max_version = LooseVersion(max_version_str)
 
-        since_str = request.node.get_closest_marker('since').args[0]
-        since = LooseVersion(since_str)
+        since_str_or_list = request.node.get_closest_marker('since').args[0]
+        if not isinstance(since_str_or_list, str) and isinstance(since_str_or_list, collections.Sequence):
+            since = [LooseVersion(since_str) for since_str in since_str_or_list]
+        else:
+            since = LooseVersion(since_str_or_list)
         # For upgrade tests don't run the test if any of the involved versions
         # are excluded by the annotation
         if hasattr(request.cls, "UPGRADE_PATH"):
@@ -414,6 +438,51 @@
             if skip_msg:
                 pytest.skip(skip_msg)
 
+def _skip_ported_msg(current_running_version, ported_from_version):
+    if loose_version_compare(current_running_version, ported_from_version) >= 0:
+        return "ported to in-JVM from %s >= %s" % (ported_from_version, current_running_version)
+
+
+@pytest.fixture(autouse=True)
+def fixture_ported_to_in_jvm(request, fixture_dtest_setup):
+    """
+    Adds a new mark called 'ported_to_in_jvm' which denotes that a test was ported to jvm-dtest.
+
+    As of this point in time there are weaknesses of jvm-dtest which require these tests to still
+    be run in the cases not covered by jvm-dtest; namely vnode.
+    """
+    marker = request.node.get_closest_marker('ported_to_in_jvm')
+    if marker and not request.config.getoption("--use-vnodes"):
+
+        if not marker.args:
+            pytest.skip("ported to in-jvm")
+
+        from_str = marker.args[0]
+        ported_from_version = LooseVersion(from_str)
+
+        # For upgrade tests don't run the test if any of the involved versions
+        # are excluded by the annotation
+        if hasattr(request.cls, "UPGRADE_PATH"):
+            upgrade_path = request.cls.UPGRADE_PATH
+            ccm_repo_cache_dir, _ = ccmlib.repository.setup(upgrade_path.starting_meta.version)
+            starting_version = get_version_from_build(ccm_repo_cache_dir)
+            skip_msg = _skip_ported_msg(starting_version, ported_from_version)
+            if skip_msg:
+                pytest.skip(skip_msg)
+            ccm_repo_cache_dir, _ = ccmlib.repository.setup(upgrade_path.upgrade_meta.version)
+            ending_version = get_version_from_build(ccm_repo_cache_dir)
+            skip_msg = _skip_ported_msg(ending_version, ported_from_version)
+            if skip_msg:
+                pytest.skip(skip_msg)
+        else:
+            # For regular tests the value in the current cluster actually means something so we should
+            # use that to check.
+            # Use cassandra_version_from_build as it's guaranteed to be a LooseVersion
+            # whereas cassandra_version may be a string if set in the cli options
+            current_running_version = fixture_dtest_setup.dtest_config.cassandra_version_from_build
+            skip_msg = _skip_ported_msg(current_running_version, ported_from_version)
+            if skip_msg:
+                pytest.skip(skip_msg)
 
 @pytest.fixture(autouse=True)
 def fixture_skip_version(request, fixture_dtest_setup):