Merge branch 'cassandra-4.1' into trunk
diff --git a/.build/parent-pom-template.xml b/.build/parent-pom-template.xml
index 7200bb4..e4b63bf 100644
--- a/.build/parent-pom-template.xml
+++ b/.build/parent-pom-template.xml
@@ -489,7 +489,7 @@
       <dependency>
         <groupId>org.apache.cassandra</groupId>
         <artifactId>dtest-api</artifactId>
-        <version>0.0.13</version>
+        <version>0.0.15</version>
         <scope>test</scope>
       </dependency>
       <dependency>
diff --git a/build.xml b/build.xml
index a3ef1fc..3e0a8e9 100644
--- a/build.xml
+++ b/build.xml
@@ -207,6 +207,7 @@
         <string>--add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED</string>
         <string>--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED</string>
         <string>--add-exports java.rmi/sun.rmi.server=ALL-UNNAMED</string>
+        <string>--add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED</string>
         <string>--add-exports java.sql/java.sql=ALL-UNNAMED</string>
 
         <string>--add-opens java.base/java.lang.module=ALL-UNNAMED</string>
@@ -237,6 +238,7 @@
         <string>--add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED</string>
         <string>--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED</string>
         <string>--add-exports java.rmi/sun.rmi.server=ALL-UNNAMED</string>
+        <string>--add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED</string>
         <string>--add-exports java.sql/java.sql=ALL-UNNAMED</string>
         <string>--add-exports java.base/java.lang.ref=ALL-UNNAMED</string>
         <string>--add-exports jdk.unsupported/sun.misc=ALL-UNNAMED</string>
@@ -263,6 +265,8 @@
         <string>--add-opens java.base/java.math=ALL-UNNAMED</string>
         <string>--add-opens java.base/java.lang.reflect=ALL-UNNAMED</string>
         <string>--add-opens java.base/java.net=ALL-UNNAMED</string>
+
+        <string>--add-opens java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED</string>
     </resources>
     <pathconvert property="_jvm_args_concat2" refid="_jvm17_arg_items" pathsep=" "/>
     <condition property="java17-jvmargs" value="${_jvm_args_concat2}" else="">
@@ -307,7 +311,8 @@
     </condition>
 
     <!-- needed to compile org.apache.cassandra.utils.JMXServerUtils -->
-    <condition property="jdk11plus-javac-exports" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED" else="">
+    <!-- needed to compile org.apache.cassandra.distributed.impl.Instance -->
+    <condition property="jdk11plus-javac-exports" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED" else="">
         <not>
             <equals arg1="${ant.java.version}" arg2="1.8"/>
         </not>
@@ -1035,6 +1040,7 @@
      <src path="${test.simulator-asm.src}"/>
      <src path="${test.simulator-bootstrap.src}"/>
      <src path="${test.simulator-test.src}"/>
+     <compilerarg line="${jdk11plus-javac-exports}"/>
     </javac>
 
     <!-- Non-java resources needed by the test suite -->
@@ -1811,6 +1817,24 @@
       <echo>"IDE configuration updated for use with JDK11"</echo>
   </target>
 
+  <target name="_maybe_update_idea_to_java17" depends="init" if="java.version.17"> <!-- runs only when java.version.17 is set: retargets the generated IDEA files from JDK 1.8 to JDK 17 -->
+        <replace file="${eclipse.project.name}.iml" token="JDK_1_8" value="JDK_17"/>
+        <replace file=".idea/misc.xml" token="JDK_1_8" value="JDK_17"/>
+        <replace file=".idea/misc.xml" token="1.8" value="17"/>
+        <replaceregexp file=".idea/workspace.xml"
+                       match="name=&quot;VM_PARAMETERS&quot; value=&quot;(.*)"
+                       replace="name=&quot;VM_PARAMETERS&quot; value=&quot;\1 ${java17-jvmargs}"
+                       byline="true"/> <!-- appends the JDK17 JVM flags to each VM_PARAMETERS value, line by line -->
+
+        <echo file=".idea/compiler.xml"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="JavacSettings">
+    <option name="ADDITIONAL_OPTIONS_STRING" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED" />
+  </component>
+</project>]]></echo> <!-- writes .idea/compiler.xml with the same javac exports as the jdk11plus-javac-exports property -->
+        <echo>"IDE configuration updated for use with JDK17"</echo>
+    </target>
+
   <!-- Generate IDEA project description files -->
   <target name="generate-idea-files" depends="init,resolver-dist-lib,gen-cql3-grammar,generate-jflex-java,createVersionPropFile" description="Generate IDEA files">
     <mkdir dir=".idea"/>
@@ -1831,6 +1855,7 @@
   </component>
 </project>]]></echo>
       <antcall target="_maybe_update_idea_to_java11"/>
+      <antcall target="_maybe_update_idea_to_java17"/>
   </target>
 
   <!-- Generate Eclipse project description files -->
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 78e1992..4daa825 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -200,6 +200,8 @@
     DRAIN_EXECUTOR_TIMEOUT_MS("cassandra.drain_executor_timeout_ms", convertToString(TimeUnit.MINUTES.toMillis(5))),
     DROP_OVERSIZED_READ_REPAIR_MUTATIONS("cassandra.drop_oversized_readrepair_mutations"),
     DTEST_API_LOG_TOPOLOGY("cassandra.dtest.api.log.topology"),
+    /** This property indicates if the code is running under the in-jvm dtest framework */
+    DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"),
     ENABLE_DC_LOCAL_COMMIT("cassandra.enable_dc_local_commit", "true"),
     /**
      * Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed.
@@ -263,6 +265,11 @@
      */
     JAVA_LIBRARY_PATH("java.library.path"),
     /**
+     * Controls the distributed garbage collector lease time for JMX objects.
+     * Should only be set by in-jvm dtests.
+     */
+    JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST("java.rmi.dgc.leaseValue"),
+    /**
      * The value of this property represents the host name string
      * that should be associated with remote stubs for locally created remote objects,
      * in order to allow clients to invoke methods on the remote object.
@@ -421,6 +428,11 @@
     /** Platform word size sun.arch.data.model. Examples: "32", "64", "unknown"*/
     SUN_ARCH_DATA_MODEL("sun.arch.data.model"),
     SUN_JAVA_COMMAND("sun.java.command", ""),
+    /**
+     * Controls the JMX server thread pool keep-alive time.
+     * Should only be set by in-jvm dtests.
+     */
+    SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME("sun.rmi.transport.tcp.threadKeepAliveTime"),
     SUN_STDERR_ENCODING("sun.stderr.encoding"),
     SUN_STDOUT_ENCODING("sun.stdout.encoding"),
     SUPERUSER_SETUP_DELAY_MS("cassandra.superuser_setup_delay_ms", "10000"),
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index c290d9e..8f92215 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -148,18 +148,16 @@
 
     public GCInspector()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
         try
         {
             ObjectName gcName = new ObjectName(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*");
-            for (ObjectName name : mbs.queryNames(gcName, null))
+            for (ObjectName name : MBeanWrapper.instance.queryNames(gcName, null))
             {
-                GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(mbs, name.getCanonicalName(), GarbageCollectorMXBean.class);
+                GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(MBeanWrapper.instance.getMBeanServer(), name.getCanonicalName(), GarbageCollectorMXBean.class);
                 gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc)));
             }
             ObjectName me = new ObjectName(MBEAN_NAME);
-            if (!mbs.isRegistered(me))
+            if (!MBeanWrapper.instance.isRegistered(me))
                 MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME));
         }
         catch (MalformedObjectNameException | IOException e)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1d4a0b1..5e66718 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -987,7 +987,8 @@
         completeInitialization();
     }
 
-    private void completeInitialization()
+    @VisibleForTesting
+    public void completeInitialization()
     {
         if (!initialized)
             registerMBeans();
diff --git a/src/java/org/apache/cassandra/utils/JMXServerUtils.java b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
index ba9c704..45caf6e 100644
--- a/src/java/org/apache/cassandra/utils/JMXServerUtils.java
+++ b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
@@ -29,12 +29,14 @@
 import java.net.InetAddress;
 import java.rmi.AccessException;
 import java.rmi.AlreadyBoundException;
+import java.rmi.NoSuchObjectException;
 import java.rmi.NotBoundException;
 import java.rmi.Remote;
 import java.rmi.RemoteException;
 import java.rmi.registry.Registry;
 import java.rmi.server.RMIClientSocketFactory;
 import java.rmi.server.RMIServerSocketFactory;
+import java.rmi.server.UnicastRemoteObject;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -257,7 +259,8 @@
         return env;
     }
 
-    private static void logJmxServiceUrl(InetAddress serverAddress, int port)
+    @VisibleForTesting
+    public static void logJmxServiceUrl(InetAddress serverAddress, int port)
     {
         String urlTemplate = "service:jmx:rmi://%1$s/jndi/rmi://%1$s:%2$d/jmxrmi";
         String hostName;
@@ -330,11 +333,11 @@
      * Better to use the internal API than re-invent the wheel.
      */
     @SuppressWarnings("restriction")
-    private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+    public static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
         private final String lookupName;
         private Remote remoteServerStub;
 
-        JmxRegistry(final int port,
+        public JmxRegistry(final int port,
                     final RMIClientSocketFactory csf,
                     RMIServerSocketFactory ssf,
                     final String lookupName) throws RemoteException {
@@ -367,5 +370,24 @@
         public void setRemoteServerStub(Remote remoteServerStub) {
             this.remoteServerStub = remoteServerStub;
         }
+
+        /**
+         * Shuts this registry down by force-unexporting it from the RMI runtime
+         * (the {@code true} argument below is the "force" flag of unexportObject).
+         * Nothing needs this except the in-jvm dtests, which must stop the registry
+         * so that future cluster builds start with a clean slate; the superclass never
+         * expects to be shut down and therefore offers no support for it.
+         */
+        @VisibleForTesting
+        public void close() {
+            try
+            {
+                UnicastRemoteObject.unexportObject(this, true);
+            }
+            catch (NoSuchObjectException ignored)
+            {
+                // Thrown when this registry is not currently exported, i.e. already closed - nothing left to do
+            }
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
index f003e1a..0ef342d 100644
--- a/src/java/org/apache/cassandra/utils/MBeanWrapper.java
+++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
@@ -19,15 +19,21 @@
 package org.apache.cassandra.utils;
 
 import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
 import java.util.function.Consumer;
 import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.QueryExp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION;
+import static org.apache.cassandra.config.CassandraRelevantProperties.DTEST_IS_IN_JVM_DTEST;
 import static org.apache.cassandra.config.CassandraRelevantProperties.MBEAN_REGISTRATION_CLASS;
 
 /**
@@ -36,23 +42,46 @@
  */
 public interface MBeanWrapper
 {
-    static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
+    Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
 
-    static final MBeanWrapper instance = create();
+    MBeanWrapper instance = create();
 
     static MBeanWrapper create()
     {
+        // In the in-jvm dtest environment always hand out a DelegatingMbeanWrapper, even when the
+        // initial wrapper is a no-op, so that the effective wrapper can be switched later
+        if (DTEST_IS_IN_JVM_DTEST.getBoolean())
+        {
+            return new DelegatingMbeanWrapper(getMBeanWrapper());
+        }
+
+        return getMBeanWrapper(); // outside in-jvm dtests the selected wrapper is used directly
+    }
+
+    static MBeanWrapper getMBeanWrapper() // selects the concrete wrapper from system properties
+    {
         if (ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.getBoolean())
+        {
             return new NoOpMBeanWrapper();
+        }
 
         String klass = MBEAN_REGISTRATION_CLASS.getString();
         if (klass == null)
-            return new PlatformMBeanWrapper();
+        {
+            if (DTEST_IS_IN_JVM_DTEST.getBoolean())
+            {
+                return new NoOpMBeanWrapper(); // in-jvm dtests without an explicit class start with no registration at all
+            }
+            else
+            {
+                return new PlatformMBeanWrapper(); // regular default: the platform MBean server
+            }
+        }
         return FBUtilities.construct(klass, "mbean");
     }
 
     // Passing true for graceful will log exceptions instead of rethrowing them
-    public void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+    void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
     default void registerMBean(Object obj, ObjectName mbeanName)
     {
         registerMBean(obj, mbeanName, OnException.THROW);
@@ -62,7 +91,9 @@
     {
         ObjectName name = create(mbeanName, onException);
         if (name == null)
+        {
             return;
+        }
         registerMBean(obj, name, onException);
     }
     default void registerMBean(Object obj, String mbeanName)
@@ -70,7 +101,7 @@
         registerMBean(obj, mbeanName, OnException.THROW);
     }
 
-    public boolean isRegistered(ObjectName mbeanName, OnException onException);
+    boolean isRegistered(ObjectName mbeanName, OnException onException);
     default boolean isRegistered(ObjectName mbeanName)
     {
         return isRegistered(mbeanName, OnException.THROW);
@@ -80,7 +111,9 @@
     {
         ObjectName name = create(mbeanName, onException);
         if (name == null)
+        {
             return false;
+        }
         return isRegistered(name, onException);
     }
     default boolean isRegistered(String mbeanName)
@@ -88,7 +121,7 @@
         return isRegistered(mbeanName, OnException.THROW);
     }
 
-    public void unregisterMBean(ObjectName mbeanName, OnException onException);
+    void unregisterMBean(ObjectName mbeanName, OnException onException);
     default void unregisterMBean(ObjectName mbeanName)
     {
         unregisterMBean(mbeanName, OnException.THROW);
@@ -98,7 +131,9 @@
     {
         ObjectName name = create(mbeanName, onException);
         if (name == null)
+        {
             return;
+        }
         unregisterMBean(name, onException);
     }
     default void unregisterMBean(String mbeanName)
@@ -119,7 +154,11 @@
         }
     }
 
-    static class NoOpMBeanWrapper implements MBeanWrapper
+    Set<ObjectName> queryNames(ObjectName name, QueryExp query);
+
+    MBeanServer getMBeanServer();
+
+    class NoOpMBeanWrapper implements MBeanWrapper
     {
         public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {}
         public void registerMBean(Object obj, String mbeanName, OnException onException) {}
@@ -127,9 +166,11 @@
         public boolean isRegistered(String mbeanName, OnException onException) { return false; }
         public void unregisterMBean(ObjectName mbeanName, OnException onException) {}
         public void unregisterMBean(String mbeanName, OnException onException) {}
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {return Collections.emptySet(); }
+        public MBeanServer getMBeanServer() { return null; }
     }
 
-    static class PlatformMBeanWrapper implements MBeanWrapper
+    class PlatformMBeanWrapper implements MBeanWrapper
     {
         private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
@@ -168,9 +209,159 @@
                 onException.handler.accept(e);
             }
         }
+
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+        {
+            return mbs.queryNames(name, query);
+        }
+
+        public MBeanServer getMBeanServer()
+        {
+            return mbs;
+        }
     }
 
-    public enum OnException
+    /**
+     * An {@link MBeanWrapper} backed by a private {@link MBeanServer} rather than the platform one.
+     * The server is created with default domain {@code hostname + "-" + id} and lives until {@link #close()}.
+     */
+    class InstanceMBeanWrapper implements MBeanWrapper
+    {
+        private MBeanServer mbs;
+        public final UUID id = UUID.randomUUID();
+
+        public InstanceMBeanWrapper(String hostname)
+        {
+            mbs = MBeanServerFactory.createMBeanServer(hostname + "-" + id);
+        }
+
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.registerMBean(obj, mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public boolean isRegistered(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                return mbs.isRegistered(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+            return false;
+        }
+
+        public void unregisterMBean(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.unregisterMBean(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+        {
+            return mbs.queryNames(name, query);
+        }
+
+        public MBeanServer getMBeanServer()
+        {
+            return mbs;
+        }
+
+        /**
+         * Unregisters every MBean except the server's own delegate, then releases the server.
+         * Calling it again after the server has been released is a no-op.
+         */
+        public void close()
+        {
+            if (mbs == null)
+                return;
+
+            for (ObjectName name : mbs.queryNames(null, null))
+            {
+                try
+                {
+                    if (!name.getCanonicalName().contains("MBeanServerDelegate"))
+                        mbs.unregisterMBean(name);
+                }
+                catch (Throwable e)
+                {
+                    // Best effort: keep unregistering the remaining MBeans, but keep the cause in the log
+                    logger.debug("Could not unregister mbean {}", name.getCanonicalName(), e);
+                }
+            }
+            MBeanServerFactory.releaseMBeanServer(mbs);
+            mbs = null;
+        }
+    }
+
+    /**
+     * An {@link MBeanWrapper} that forwards every call to a replaceable delegate, so that the effective
+     * wrapper can be swapped after {@link MBeanWrapper#instance} has already been initialised.
+     */
+    class DelegatingMbeanWrapper implements MBeanWrapper
+    {
+        MBeanWrapper delegate;
+
+        public DelegatingMbeanWrapper(MBeanWrapper mBeanWrapper)
+        {
+            delegate = mBeanWrapper;
+        }
+
+        public void setDelegate(MBeanWrapper wrapper)
+        {
+            delegate = wrapper;
+        }
+
+        public MBeanWrapper getDelegate()
+        {
+            return delegate;
+        }
+
+        // The OnException policy chosen by the caller is handed straight to the delegate. Going through
+        // the two-argument overloads instead would force THROW on the delegate and then wrap its
+        // RuntimeException a second time when the policy is applied here.
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+        {
+            delegate.registerMBean(obj, mbeanName, onException);
+        }
+
+        public boolean isRegistered(ObjectName mbeanName, OnException onException)
+        {
+            return delegate.isRegistered(mbeanName, onException);
+        }
+
+        public void unregisterMBean(ObjectName mbeanName, OnException onException)
+        {
+            delegate.unregisterMBean(mbeanName, onException);
+        }
+
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+        {
+            return delegate.queryNames(name, query);
+        }
+
+        public MBeanServer getMBeanServer()
+        {
+            return delegate.getMBeanServer();
+        }
+    }
+
+    enum OnException
     {
         THROW(e -> { throw new RuntimeException(e); }),
         LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
diff --git a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
new file mode 100644
index 0000000..62ab88f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.rmi.server.RMIClientSocketFactory;
+import java.util.Objects;
+
+/**
+ * This class is used to override the local address the JMX client calculates when trying to connect,
+ * which can otherwise be influenced by the system property "java.rmi.server.hostname" in strange and
+ * unpredictable ways.
+ * <p>
+ * Instances are serializable, and two factories compare equal when they target the same address.
+ */
+public class RMIClientSocketFactoryImpl implements RMIClientSocketFactory, Serializable
+{
+    // Pinned explicitly so the serialized form does not depend on the compiler-derived default UID
+    private static final long serialVersionUID = 1L;
+
+    private final InetAddress localAddress;
+
+    /**
+     * @param localAddress the address every socket created by this factory connects to
+     */
+    public RMIClientSocketFactoryImpl(InetAddress localAddress)
+    {
+        this.localAddress = localAddress;
+    }
+
+    /**
+     * Opens a socket to the configured address on {@code port}.
+     *
+     * @param host ignored on purpose: the address given at construction time is always used instead
+     * @param port the port to connect to
+     * @return a socket connected to the configured address and {@code port}
+     * @throws IOException if the connection cannot be established
+     */
+    @Override
+    public Socket createSocket(String host, int port) throws IOException
+    {
+        return new Socket(localAddress, port);
+    }
+
+    /** Two factories are equal when they are of the same class and target the same address. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RMIClientSocketFactoryImpl that = (RMIClientSocketFactoryImpl) o;
+        return Objects.equals(localAddress, that.localAddress);
+    }
+
+    /** Consistent with {@link #equals(Object)}: derived from the target address only. */
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(localAddress);
+    }
+}
diff --git a/src/java/org/apache/cassandra/utils/ReflectionUtils.java b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
index 801256d..bf6ad2c 100644
--- a/src/java/org/apache/cassandra/utils/ReflectionUtils.java
+++ b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
@@ -30,10 +30,15 @@
 
     public static Field getModifiersField() throws NoSuchFieldException
     {
+        return getField(Field.class, "modifiers"); // shares getField's lookup, including its fallback for JDKs where getDeclaredField fails
+    }
+
+    public static Field getField(Class<?> clazz, String fieldName) throws NoSuchFieldException
+    {
         // below code works before Java 12
         try
         {
-            return Field.class.getDeclaredField("modifiers");
+            return clazz.getDeclaredField(fieldName);
         }
         catch (NoSuchFieldException e)
         {
@@ -42,10 +47,10 @@
             {
                 Method getDeclaredFields0 = Class.class.getDeclaredMethod("getDeclaredFields0", boolean.class);
                 getDeclaredFields0.setAccessible(true);
-                Field[] fields = (Field[]) getDeclaredFields0.invoke(Field.class, false);
+                Field[] fields = (Field[]) getDeclaredFields0.invoke(clazz, false);
                 for (Field field : fields)
                 {
-                    if ("modifiers".equals(field.getName()))
+                    if (fieldName.equals(field.getName()))
                     {
                         return field;
                     }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 78b14d2..79120a2 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -186,6 +186,8 @@
         private ShutdownExecutor shutdownExecutor = DEFAULT_SHUTDOWN_EXECUTOR;
 
         {
+            // Indicate that we are running in the in-jvm dtest environment
+            CassandraRelevantProperties.DTEST_IS_IN_JVM_DTEST.setBoolean(true);
             // those properties may be set for unit-test optimizations; those should not be used when running dtests
             CassandraRelevantProperties.TEST_FLUSH_LOCAL_SCHEMA_CHANGES.reset();
             CassandraRelevantProperties.NON_GRACEFUL_SHUTDOWN.reset();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java
new file mode 100644
index 0000000..0fc7425
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.rmi.server.RMIServerSocketFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import javax.net.ServerSocketFactory;
+
+/**
+ * This class is used to keep track of RMI servers created during a cluster creation so we can
+ * later close the sockets, which would otherwise be left with a thread running waiting for
+ * connections that would never show up as the server was otherwise closed.
+ */
+class CollectingRMIServerSocketFactoryImpl implements RMIServerSocketFactory
+{
+    private final InetAddress bindAddress;
+    List<ServerSocket> sockets = new ArrayList<>();
+
+    /**
+     * @param bindAddress the local address every created server socket is bound to
+     */
+    public CollectingRMIServerSocketFactoryImpl(InetAddress bindAddress)
+    {
+        this.bindAddress = bindAddress;
+    }
+
+    /**
+     * Creates a server socket bound to the configured address and {@code pPort} with SO_REUSEADDR
+     * enabled, and remembers it so that {@link #close()} can close it later.
+     *
+     * @param pPort the port to bind to
+     * @return the bound server socket
+     * @throws IOException if the socket cannot be configured or bound; the socket is closed first
+     */
+    @Override
+    public ServerSocket createServerSocket(int pPort) throws IOException
+    {
+        ServerSocket result = ServerSocketFactory.getDefault().createServerSocket();
+        try
+        {
+            // SO_REUSEADDR only has a defined effect when it is set before the socket is bound,
+            // so configure the unbound socket first and bind it afterwards
+            result.setReuseAddress(true);
+            result.bind(new InetSocketAddress(bindAddress, pPort), 0);
+        }
+        catch (IOException e)
+        {
+            result.close();
+            throw e;
+        }
+        sockets.add(result);
+        return result;
+    }
+
+    /**
+     * Closes every server socket created by this factory. All sockets are attempted even if some
+     * of them fail to close.
+     *
+     * @throws IOException the first failure, with any later failures attached as suppressed
+     */
+    public void close() throws IOException
+    {
+        IOException failure = null;
+        for (ServerSocket socket : sockets)
+        {
+            try
+            {
+                socket.close();
+            }
+            catch (IOException e)
+            {
+                if (failure == null)
+                    failure = e;
+                else
+                    failure.addSuppressed(e);
+            }
+        }
+        if (failure != null)
+            throw failure;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CollectingRMIServerSocketFactoryImpl that = (CollectingRMIServerSocketFactoryImpl) o;
+        return Objects.equals(bindAddress, that.bindAddress);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(bindAddress);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
index 1ec844e..99ef272 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
@@ -56,6 +56,11 @@
                     {
                         return 9041 + nodeNum;
                     }
+
+                    public int jmxPort(int nodeNum)
+                    {
+                        return 7199 + nodeNum; // one JMX port per node, offset by node number like nativeTransportPort's 9041 + nodeNum
+                    }
                 };
             }
         },
@@ -89,6 +94,11 @@
                     {
                         return 9042;
                     }
+
+                    public int jmxPort(int nodeNum)
+                    {
+                        return 7199; // same JMX port for every node, mirroring the fixed 9042 returned by nativeTransportPort
+                    }
                 };
             }
         };
@@ -100,4 +110,5 @@
     abstract String ipAddress(int nodeNum);
     abstract int storagePort(int nodeNum);
     abstract int nativeTransportPort(int nodeNum);
+    abstract int jmxPort(int nodeNum);
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 435de65..707da49 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -160,6 +160,7 @@
 import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_CASSANDRA_TESTTAG;
 import static org.apache.cassandra.distributed.api.Feature.BLANK_GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.fromCassandraInetAddressAndPort;
@@ -177,6 +178,7 @@
     private volatile boolean initialized = false;
     private volatile boolean internodeMessagingStarted = false;
     private final AtomicLong startedAt = new AtomicLong();
+    private IsolatedJmx isolatedJmx;
 
     @Deprecated
     Instance(IInstanceConfig config, ClassLoader classLoader)
@@ -603,6 +605,9 @@
                                                                                                     config.networkTopology(), config.broadcastAddress());
                 DistributedTestSnitch.assign(config.networkTopology());
 
+                if (config.has(JMX))
+                    startJmx();
+
                 DatabaseDescriptor.daemonInitialization();
                 LoggingSupportFactory.getLoggingSupport().onStartup();
 
@@ -723,6 +728,7 @@
                     StorageService.instance.setNormalModeUnsafe();
                     Gossiper.instance.register(StorageService.instance);
                     StorageService.instance.startSnapshotManager();
+                    StorageService.instance.completeInitialization();
                 }
 
                 // Populate tokenMetadata for the second time,
@@ -759,6 +765,20 @@
         initialized = true;
     }
 
+    private void startJmx()
+    {
+        this.isolatedJmx = new IsolatedJmx(this, inInstancelogger);
+        isolatedJmx.startJmx();
+    }
+
+    private void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+    {
+        // isolatedJmx is only assigned by startJmx(), so it is still null if this instance never got
+        // as far as starting JMX; skip the stop in that case instead of failing shutdown with an NPE.
+        if (config.has(JMX) && isolatedJmx != null)
+            isolatedJmx.stopJmx();
+    }
+
     // Update the messaging versions for all instances
     // that have initialized their configurations.
     private static void propagateMessagingVersions(ICluster cluster)
@@ -900,6 +920,8 @@
 
             // ScheduledExecutors shuts down after MessagingService, as MessagingService may issue tasks to it.
             error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownNowAndWait(1L, MINUTES));
+            
+            error = parallelRun(error, executor, this::stopJmx);
 
             // Make sure any shutdown hooks registered for DeleteOnExit are released to prevent
             // references to the instance class loaders from being held
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 92c56d6..3c515d5 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -41,6 +41,8 @@
 public class InstanceConfig implements IInstanceConfig
 {
     public final int num;
+    private final int jmxPort;
+
     public int num() { return num; }
 
     private final NetworkTopology networkTopology;
@@ -71,7 +73,8 @@
                            String cdc_raw_directory,
                            Collection<String> initial_token,
                            int storage_port,
-                           int native_transport_port)
+                           int native_transport_port,
+                           int jmx_port)
     {
         this.num = num;
         this.networkTopology = networkTopology;
@@ -113,6 +116,7 @@
                 // legacy parameters
                 .forceSet("commitlog_sync_batch_window_in_ms", "1");
         this.featureFlags = EnumSet.noneOf(Feature.class);
+        this.jmxPort = jmx_port;
     }
 
     private InstanceConfig(InstanceConfig copy)
@@ -124,6 +128,7 @@
         this.hostId = copy.hostId;
         this.featureFlags = copy.featureFlags;
         this.broadcastAddressAndPort = copy.broadcastAddressAndPort;
+        this.jmxPort = copy.jmxPort;
     }
 
     @Override
@@ -168,6 +173,12 @@
         return networkTopology().localDC(broadcastAddress());
     }
 
+    @Override
+    public int jmxPort()
+    {
+        return this.jmxPort;
+    }
+
     public InstanceConfig with(Feature featureFlag)
     {
         featureFlags.add(featureFlag);
@@ -267,7 +278,8 @@
                                   String.format("%s/node%d/cdc", root, nodeNum),
                                   tokens,
                                   provisionStrategy.storagePort(nodeNum),
-                                  provisionStrategy.nativeTransportPort(nodeNum));
+                                  provisionStrategy.nativeTransportPort(nodeNum),
+                                  provisionStrategy.jmxPort(nodeNum));
     }
 
     private static String[] datadirs(int datadirCount, Path root, int nodeNum)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
new file mode 100644
index 0000000..e19e29f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+
+import org.slf4j.Logger;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.utils.JMXServerUtils;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.RMIClientSocketFactoryImpl;
+import org.apache.cassandra.utils.ReflectionUtils;
+import sun.rmi.transport.tcp.TCPEndpoint;
+
+import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST;
+import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION;
+import static org.apache.cassandra.config.CassandraRelevantProperties.SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
+
+public class IsolatedJmx
+{
+    private static final int RMI_KEEPALIVE_TIME = 1000;
+
+    private JMXConnectorServer jmxConnectorServer;
+    private JMXServerUtils.JmxRegistry registry;
+    private RMIJRMPServerImpl jmxRmiServer;
+    private MBeanWrapper.InstanceMBeanWrapper wrapper;
+    private RMIClientSocketFactoryImpl clientSocketFactory;
+    private CollectingRMIServerSocketFactoryImpl serverSocketFactory;
+    private Logger inInstancelogger;
+    private IInstanceConfig config;
+
+    public IsolatedJmx(IInstance instance, Logger inInstanceLogger) {
+        this.config = instance.config();
+        this.inInstancelogger = inInstanceLogger;
+    }
+
+    public void startJmx() {
+        try
+        {
+            // Several RMI threads hold references to in-jvm dtest objects, and are, by default, kept
+            // alive for long enough (minutes) to keep classloaders from being collected.
+            // Set these two system properties to a low value to allow cleanup to occur fast enough
+            // for GC to collect our classloaders.
+            JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST.setInt(RMI_KEEPALIVE_TIME);
+            SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME.setInt(RMI_KEEPALIVE_TIME);
+            ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.setBoolean(false);
+            InetAddress addr = config.broadcastAddress().getAddress();
+
+            int jmxPort = config.jmxPort();
+
+            String hostname = addr.getHostAddress();
+            wrapper = new MBeanWrapper.InstanceMBeanWrapper(hostname + ":" + jmxPort);
+            ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(wrapper);
+            Map<String, Object> env = new HashMap<>();
+
+            serverSocketFactory = new CollectingRMIServerSocketFactoryImpl(addr);
+            env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE,
+                    serverSocketFactory);
+            clientSocketFactory = new RMIClientSocketFactoryImpl(addr);
+            env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE,
+                    clientSocketFactory);
+
+            // configure the RMI registry
+            registry = new JMXServerUtils.JmxRegistry(jmxPort,
+                                                      clientSocketFactory,
+                                                      serverSocketFactory,
+                                                      "jmxrmi");
+
+            // Mark the JMX server as a permanently exported object. This allows the JVM to exit with the
+            // server running and also exempts it from the distributed GC scheduler which otherwise would
+            // potentially attempt a full GC every `sun.rmi.dgc.server.gcInterval` millis (default is 3600000ms)
+            // For more background see:
+            //   - CASSANDRA-2967
+            //   - https://www.jclarity.com/2015/01/27/rmi-system-gc-unplugged/
+            //   - https://bugs.openjdk.java.net/browse/JDK-6760712
+            env.put("jmx.remote.x.daemon", "true");
+
+            // Set the port used to create subsequent connections to exported objects over RMI. This simplifies
+            // configuration in firewalled environments, but it can't be used in conjunction with SSL sockets.
+            // See: CASSANDRA-7087
+            int rmiPort = config.jmxPort();
+
+            // We create the underlying RMIJRMPServerImpl so that we can manually bind it to the registry,
+            // rather than specifying a binding address in the JMXServiceURL and letting it be done automatically
+            // when the server is started. The reason for this is that if the registry is configured with SSL
+            // sockets, the JMXConnectorServer acts as its client during the binding which means it needs to
+            // have a truststore configured which contains the registry's certificate. Manually binding removes
+            // this problem.
+            // See CASSANDRA-12109.
+            jmxRmiServer = new RMIJRMPServerImpl(rmiPort, clientSocketFactory, serverSocketFactory,
+                                                 env);
+            JMXServiceURL serviceURL = new JMXServiceURL("rmi", hostname, rmiPort);
+            jmxConnectorServer = new RMIConnectorServer(serviceURL, env, jmxRmiServer, wrapper.getMBeanServer());
+
+            jmxConnectorServer.start();
+
+            registry.setRemoteServerStub(jmxRmiServer.toStub());
+            JMXServerUtils.logJmxServiceUrl(addr, jmxPort);
+            waitForJmxAvailability(hostname, jmxPort, env);
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException("Feature.JMX was enabled but could not be started.", t);
+        }
+    }
+
+    private void waitForJmxAvailability(String hostname, int rmiPort, Map<String, Object> env) throws InterruptedException, MalformedURLException
+    {
+        String url = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname, rmiPort);
+        JMXServiceURL serviceURL = new JMXServiceURL(url);
+        int attempts = 0;
+        Throwable lastThrown = null;
+        while (attempts < 20)
+        {
+            attempts++;
+            try (JMXConnector ignored = JMXConnectorFactory.connect(serviceURL, env))
+            {
+                inInstancelogger.info("Connected to JMX server at {} after {} attempt(s)",
+                                      url, attempts);
+                return;
+            }
+            catch (MalformedURLException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (Throwable thrown)
+            {
+                lastThrown = thrown;
+            }
+            inInstancelogger.info("Could not connect to JMX on {} after {} attempts. Will retry.", url, attempts);
+            Thread.sleep(1000);
+        }
+        throw new RuntimeException("Could not start JMX - unreachable after 20 attempts", lastThrown);
+    }
+
+    public void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+    {
+        if (!config.has(JMX))
+            return;
+        // First, swap the mbean wrapper back to a NoOp wrapper
+        // This prevents later attempts to unregister mbeans from failing in Cassandra code, as we're going to
+        // unregister all of them here
+        ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(new MBeanWrapper.NoOpMBeanWrapper());
+        try
+        {
+            wrapper.close();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close wrapper.", e);
+        }
+        try
+        {
+            jmxConnectorServer.stop();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close jmxConnectorServer.", e);
+        }
+        try
+        {
+            registry.close();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close registry.", e);
+        }
+        try
+        {
+            serverSocketFactory.close();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close serverSocketFactory.", e);
+        }
+        // The TCPEndpoint class holds references to a class in the in-jvm dtest framework
+        // which transitively has a reference to the InstanceClassLoader, so we need to
+        // make sure to remove the reference to them when the instance is shutting down
+        clearMapField(TCPEndpoint.class, null, "localEndpoints");
+        Thread.sleep(2 * RMI_KEEPALIVE_TIME); // Double the keep-alive time to give Distributed GC some time to clean up
+    }
+
+    private <K, V> void clearMapField(Class<?> clazz, Object instance, String mapName)
+    throws IllegalAccessException, NoSuchFieldException {
+        Field mapField = ReflectionUtils.getField(clazz, mapName);
+        mapField.setAccessible(true);
+        Map<K, V> map = (Map<K, V>) mapField.get(instance);
+        // Because multiple instances can be shutting down at once,
+        // synchronize on the map to avoid ConcurrentModificationException
+        synchronized (map)
+        {
+            for (Iterator<Map.Entry<K, V>> it = map.entrySet().iterator(); it.hasNext(); )
+            {
+                it.next();
+                it.remove();
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index a5a2dce..794873f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -26,21 +26,29 @@
 import java.text.SimpleDateFormat;
 import java.util.function.Consumer;
 import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
 
-import org.apache.cassandra.io.util.File;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import com.sun.management.HotSpotDiagnosticMXBean;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.utils.SigarLibrary;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.utils.FBUtilities.now;
+import static org.hamcrest.Matchers.startsWith;
 
 /* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
  * All objects referencing the InstanceClassLoader need to be garbage collected or
@@ -139,6 +147,11 @@
 
     void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws Throwable
     {
+        doTest(numClusterNodes, updater, ignored -> {});
+    }
+
+    void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater, Consumer<Cluster> actionToPerform) throws Throwable
+    {
         for (int loop = 0; loop < numTestLoops; loop++)
         {
             System.out.println(String.format("========== Starting loop %03d ========", loop));
@@ -149,6 +162,7 @@
                 cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." + tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL);
                 cluster.get(1).flush(KEYSPACE);
+                actionToPerform.accept(cluster);
                 if (dumpEveryLoop)
                 {
                     dumpResources(String.format("loop%03d", loop));
@@ -207,4 +221,50 @@
         }
         dumpResources("final-native");
     }
+
+    @Test
+    public void looperJmxTest() throws Throwable
+    {
+        doTest(1, config -> config.with(JMX), cluster -> {
+            // NOTE: At some point, the hostname of the broadcastAddress can be resolved
+            // and then the `getHostString`, which would otherwise return the IP address,
+            // starts returning `localhost` - use `.getAddress().getHostAddress()` to work around this.
+            for (IInvokableInstance instance:cluster.get(1, cluster.size()))
+            {
+                IInstanceConfig config = instance.config();
+                try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
+                {
+                    MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+                    // instances get their default domain set to their IP address, so use it
+                    // to check that we are actually connecting to the correct instance
+                    String defaultDomain = mbsc.getDefaultDomain();
+                    Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        if (forceCollection)
+        {
+            System.runFinalization();
+            System.gc();
+            Thread.sleep(finalWaitMillis);
+        }
+        dumpResources("final-jmx");
+    }
+
+    @Test
+    public void looperEverythingTest() throws Throwable
+    {
+        doTest(1, config -> config.with(Feature.values()));
+        if (forceCollection)
+        {
+            System.runFinalization();
+            System.gc();
+            Thread.sleep(finalWaitMillis);
+        }
+        dumpResources("final-everything");
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
index 1fc2958..71ef4db 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
@@ -27,6 +27,7 @@
 import org.apache.cassandra.db.virtual.LogMessagesTable;
 import org.apache.cassandra.db.virtual.LogMessagesTable.LogMessage;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.shared.WithProperties;
@@ -68,12 +69,17 @@
     public void testMultipleAppendersFailToStartNode() throws Throwable
     {
         LOGBACK_CONFIGURATION_FILE.setString("test/conf/logback-dtest_with_vtable_appender_invalid.xml");
+
+        // NOTE: Because cluster startup is expected to fail in this case, and can leave things in a weird state
+        // for the next test, create without starting, and set failure as shutdown to false,
+        // so the try-with-resources can close instances properly.
         try (WithProperties properties = new WithProperties().set(LOGBACK_CONFIGURATION_FILE, "test/conf/logback-dtest_with_vtable_appender_invalid.xml");
-             Cluster ignored = Cluster.build(1)
-                                      .withConfig(c -> c.with(Feature.values()))
-                                      .start();
-             )
+             Cluster cluster = Cluster.build(1)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        .set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+                                      .createWithoutStarting())
         {
+            cluster.startup();
             fail("Node should not start as there is supposed to be invalid logback configuration file.");
         }
         catch (IllegalStateException ex)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
new file mode 100644
index 0000000..83a35e5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.jmx;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.hamcrest.Matchers.startsWith;
+
+public class JMXFeatureTest extends TestBaseImpl
+{
+
+    /**
+     * Test the in-jvm dtest JMX feature.
+     * - Create a cluster with multiple JMX servers, one per instance
+     * - Test that when connecting, we get the correct MBeanServer by checking the default domain, which is set to the IP of the instance
+     * - Run the test multiple times to ensure cleanup of the JMX servers is complete so the next test can run successfully using the same host/port.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMultipleNetworkInterfacesProvisioning() throws Exception
+    {
+        int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times.
+        Set<String> allInstances = new HashSet<>();
+        for (int i = 0; i < iterations; i++)
+        {
+            try (Cluster cluster = Cluster.build(2)
+                                          .withNodeProvisionStrategy(INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces)
+                                          .withConfig(c -> c.with(Feature.values())).start())
+            {
+                Set<String> instancesContacted = new HashSet<>();
+                for (IInvokableInstance instance : cluster.get(1, 2))
+                {
+                    testInstance(instancesContacted, instance);
+                }
+                Assert.assertEquals("Should have connected with both JMX instances.", 2, instancesContacted.size());
+                allInstances.addAll(instancesContacted);
+            }
+        }
+        Assert.assertEquals("Each instance from each cluster should have been unique", iterations * 2, allInstances.size());
+    }
+
+    @Test
+    public void testOneNetworkInterfaceProvisioning() throws Exception
+    {
+        int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times.
+        Set<String> allInstances = new HashSet<>();
+        for (int i = 0; i < iterations; i++)
+        {
+            try (Cluster cluster = Cluster.build(2)
+                                          .withNodeProvisionStrategy(INodeProvisionStrategy.Strategy.OneNetworkInterface)
+                                          .withConfig(c -> c.with(Feature.values())).start())
+            {
+                Set<String> instancesContacted = new HashSet<>();
+                for (IInvokableInstance instance : cluster.get(1, 2))
+                {
+                    testInstance(instancesContacted, instance);
+                }
+                Assert.assertEquals("Should have connected with both JMX instances.", 2, instancesContacted.size());
+                allInstances.addAll(instancesContacted);
+            }
+        }
+        Assert.assertEquals("Each instance from each cluster should have been unique", iterations * 2, allInstances.size());
+    }
+
+    private void testInstance(Set<String> instancesContacted, IInvokableInstance instance) throws IOException
+    {
+        // NOTE: At some point, the hostname of the broadcastAddress can be resolved
+        // and then the `getHostString`, which would otherwise return the IP address,
+        // starts returning `localhost` - use `.getAddress().getHostAddress()` to work around this.
+        IInstanceConfig config = instance.config();
+        try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
+        {
+            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+            // instances get their default domain set to their IP address, so use it
+            // to check that we are actually connecting to the correct instance
+            String defaultDomain = mbsc.getDefaultDomain();
+            instancesContacted.add(defaultDomain);
+            Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
index b935e24..ece6052 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.distributed.test.jmx;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -31,7 +30,6 @@
 import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXConnectorServer;
 import javax.management.remote.JMXServiceURL;
 
 import com.google.common.collect.ImmutableSet;
@@ -39,11 +37,8 @@
 
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
-import org.apache.cassandra.utils.JMXServerUtils;
-
-import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION;
-import static org.apache.cassandra.cql3.CQLTester.getAutomaticallyAllocatedPort;
 
 public class JMXGetterCheckTest extends TestBaseImpl
 {
@@ -57,20 +52,17 @@
     "org.apache.cassandra.db:type=StorageService:resetLocalSchema" // this will fail when there are no other nodes which can serve schema
     );
 
-    @Test
-    public void test() throws Exception
-    {
-        // start JMX server, which the instance will register with
-        InetAddress loopback = InetAddress.getLoopbackAddress();
-        String jmxHost = loopback.getHostAddress();
-        int jmxPort = getAutomaticallyAllocatedPort(loopback);
-        JMXConnectorServer jmxServer = JMXServerUtils.createJMXServer(jmxPort, true);
-        jmxServer.start();
-        String url = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi";
+    public static final String JMX_SERVICE_URL_FMT = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
 
-        ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.setBoolean(false);
+    @Test
+    public void testGetters() throws Exception
+    {
         try (Cluster cluster = Cluster.build(1).withConfig(c -> c.with(Feature.values())).start())
         {
+            IInvokableInstance instance = cluster.get(1);
+
+            String jmxHost = instance.config().broadcastAddress().getAddress().getHostAddress();
+            String url = String.format(JMX_SERVICE_URL_FMT, jmxHost, instance.config().jmxPort());
             List<Named> errors = new ArrayList<>();
             try (JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(url), null))
             {
@@ -122,7 +114,7 @@
     }
 
     /**
-     * This class is meant to make new errors easier to read, by adding the JMX endpoint, and cleaning up the unneded JMX/Reflection logic cluttering the stacktrace
+     * This class is meant to make new errors easier to read, by adding the JMX endpoint, and cleaning up the unneeded JMX/Reflection logic cluttering the stacktrace
      */
     private static class Named extends RuntimeException
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java b/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
index a8ddfe0..58a3fee 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
@@ -29,7 +29,9 @@
 import java.util.stream.Collectors;
 import javax.management.InstanceAlreadyExistsException;
 import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServer;
 import javax.management.ObjectName;
+import javax.management.QueryExp;
 
 import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
@@ -57,7 +59,6 @@
         MBEAN_REGISTRATION_CLASS.setString(MapMBeanWrapper.class.getName());
         ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.setBoolean(false);
     }
-
     private static volatile Map<String, Collection<String>> SYSTEM_TABLES = null;
     private static Set<String> TABLE_METRIC_NAMES = ImmutableSet.of("WriteLatency");
 
@@ -165,7 +166,7 @@
     {
         inst.runOnInstance(() -> {
             // cast only to make sure it linked properly
-            MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+            MapMBeanWrapper mbeans = getMapMBeanWrapper();
             Assert.assertTrue("Unable to find table mbean for " + keyspace + "." + table,
                               mbeans.isRegistered(ColumnFamilyStore.getTableMBeanName(keyspace, table, false)));
             Assert.assertTrue("Unable to find column family mbean for " + keyspace + "." + table,
@@ -177,7 +178,7 @@
     {
         inst.runOnInstance(() -> {
             // cast only to make sure it linked properly
-            MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+            MapMBeanWrapper mbeans = getMapMBeanWrapper();
             Assert.assertFalse("Found table mbean for " + keyspace + "." + table,
                                mbeans.isRegistered(ColumnFamilyStore.getTableMBeanName(keyspace, table, false)));
             Assert.assertFalse("Found column family mbean for " + keyspace + "." + table,
@@ -189,7 +190,7 @@
     {
         inst.runOnInstance(() -> {
             // cast only to make sure it linked properly
-            MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+            MapMBeanWrapper mbeans = getMapMBeanWrapper();
             String mbean = getTableMetricName(keyspace, table, name);
             Assert.assertTrue("Unable to find metric " + name + " for " + keyspace + "." + table, mbeans.isRegistered(mbean));
 
@@ -203,7 +204,7 @@
     {
         inst.runOnInstance(() -> {
             // cast only to make sure it linked properly
-            MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+            MapMBeanWrapper mbeans = getMapMBeanWrapper();
             String mbean = getTableMetricName(keyspace, table, name);
             Assert.assertFalse("Found metric " + name + " for " + keyspace + "." + table, mbeans.isRegistered(mbean));
 
@@ -226,7 +227,7 @@
     {
         inst.runOnInstance(() -> {
             // cast only to make sure it linked properly
-            MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+            MapMBeanWrapper mbeans = getMapMBeanWrapper();
 
             String keyspaceMBean = getKeyspaceMetricName(keyspace, name);
             Assert.assertFalse("Found keyspace metric " + keyspaceMBean + " for " + keyspace, mbeans.isRegistered(keyspaceMBean));
@@ -243,6 +244,10 @@
         return String.format("org.apache.cassandra.metrics:type=Table,keyspace=%s,scope=%s,name=%s", keyspace, table, name);
     }
 
+    private static MapMBeanWrapper getMapMBeanWrapper() // returns the delegate of MBeanWrapper.instance, viewed as a MapMBeanWrapper
+    {
+        return (MapMBeanWrapper) ((MBeanWrapper.DelegatingMbeanWrapper)MBeanWrapper.instance).getDelegate(); // either cast throws ClassCastException if the runtime type differs
+    }
     public static final class MapMBeanWrapper implements MBeanWrapper
     {
         private final ConcurrentMap<ObjectName, Object> map = new ConcurrentHashMap<>();
@@ -268,5 +273,17 @@
             if (previous == null)
                 onException.handler.accept(new InstanceNotFoundException("MBean " + mbeanName + " was not found"));
         }
+
+        @Override
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query) // stub: both arguments are ignored
+        {
+            return null; // NOTE(review): returns null rather than an empty set -- confirm no caller iterates the result
+        }
+
+        @Override
+        public MBeanServer getMBeanServer() // stub: always returns null
+        {
+            return null; // NOTE(review): presumably no real MBeanServer backs this map-based wrapper -- confirm callers tolerate null
+        }
     }
 }