Add support for JMX in the in-jvm dtest framework

patch by Doug Rohrer; reviewed by Alex Petrov, Jon Meredith, Francisco Guerrero Hernandez for CASSANDRA-18511
diff --git a/.build/build-resolver.xml b/.build/build-resolver.xml
index 698fb57..99bfe4b 100644
--- a/.build/build-resolver.xml
+++ b/.build/build-resolver.xml
@@ -51,6 +51,8 @@
         <resolver:remoterepos id="all">
             <remoterepo id="resolver-central" url="${artifact.remoteRepository.central}"/>
             <remoterepo id="resolver-apache" url="${artifact.remoteRepository.apache}"/>
+            <!-- Only needed for PR builds - remove before commit -->
+            <!-- <remoterepo id="resolver-apache-snapshot" url="https://repository.apache.org/content/repositories/snapshots" releases="false" snapshots="true" updates="always" checksums="fail" />-->
         </resolver:remoterepos>
 
         <resolver:resolve>
diff --git a/build.xml b/build.xml
index 6985c8c..8f26c5e 100644
--- a/build.xml
+++ b/build.xml
@@ -383,7 +383,7 @@
             <exclusion groupId="org.hamcrest" artifactId="hamcrest-core"/>
           </dependency>
           <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" scope="test"/>
-          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.13" scope="test"/>
+          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.15" scope="test"/>
           <dependency groupId="org.reflections" artifactId="reflections" version="0.10.2" scope="test"/>
           <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" scope="test"/>
           <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.3" scope="provided">
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index 787d79a..0c0f9e4 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -137,18 +137,17 @@
 
     public GCInspector()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
         try
         {
             ObjectName gcName = new ObjectName(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*");
-            for (ObjectName name : mbs.queryNames(gcName, null))
+            for (ObjectName name : MBeanWrapper.instance.queryNames(gcName, null))
             {
-                GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(mbs, name.getCanonicalName(), GarbageCollectorMXBean.class);
+                GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(MBeanWrapper.instance.getMBeanServer(), name.getCanonicalName(), GarbageCollectorMXBean.class);
                 gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc)));
             }
-
-            MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME));
+            ObjectName me = new ObjectName(MBEAN_NAME);
+            if (!MBeanWrapper.instance.isRegistered(me))
+                MBeanWrapper.instance.registerMBean(this, me);
         }
         catch (Exception e)
         {
diff --git a/src/java/org/apache/cassandra/utils/JMXServerUtils.java b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
index fc36c6c..aad4f05 100644
--- a/src/java/org/apache/cassandra/utils/JMXServerUtils.java
+++ b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
@@ -33,6 +33,7 @@
 import java.rmi.registry.Registry;
 import java.rmi.server.RMIClientSocketFactory;
 import java.rmi.server.RMIServerSocketFactory;
+import java.rmi.server.UnicastRemoteObject;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -44,6 +45,7 @@
 import javax.rmi.ssl.SslRMIServerSocketFactory;
 import javax.security.auth.Subject;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -234,7 +236,8 @@
         return env;
     }
 
-    private static void logJmxServiceUrl(InetAddress serverAddress, int port)
+    @VisibleForTesting
+    public static void logJmxServiceUrl(InetAddress serverAddress, int port)
     {
         String urlTemplate = "service:jmx:rmi://%1$s/jndi/rmi://%1$s:%2$d/jmxrmi";
         String hostName;
@@ -284,11 +287,12 @@
      * Better to use the internal API than re-invent the wheel.
      */
     @SuppressWarnings("restriction")
-    private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+    public static class JmxRegistry extends sun.rmi.registry.RegistryImpl
+    {
         private final String lookupName;
         private Remote remoteServerStub;
 
-        JmxRegistry(final int port,
+        public JmxRegistry(final int port,
                     final RMIClientSocketFactory csf,
                     RMIServerSocketFactory ssf,
                     final String lookupName) throws RemoteException {
@@ -321,5 +325,24 @@
         public void setRemoteServerStub(Remote remoteServerStub) {
             this.remoteServerStub = remoteServerStub;
         }
+
+        /**
+         * Closes the underlying JMX registry by unexporting this instance.
+         * There is no reason to do this except for in-jvm dtests where we need
+         * to stop the registry, so we can start with a clean slate for future cluster
+         * builds, and the superclass never expects to be shut down and therefore doesn't
+         * handle this edge case at all.
+         */
+        @VisibleForTesting
+        public void close() {
+            try
+            {
+                UnicastRemoteObject.unexportObject(this, true);
+            }
+            catch (NoSuchObjectException ignored)
+            {
+                // Ignore if it's already unexported
+            }
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
index edee6af..a76a2e7 100644
--- a/src/java/org/apache/cassandra/utils/MBeanWrapper.java
+++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
@@ -19,10 +19,15 @@
 package org.apache.cassandra.utils;
 
 import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
 import java.util.function.Consumer;
 import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.QueryExp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,62 +38,160 @@
  */
 public interface MBeanWrapper
 {
-    static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
+    Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
 
-    static final MBeanWrapper instance = Boolean.getBoolean("org.apache.cassandra.disable_mbean_registration") ?
-                                         new NoOpMBeanWrapper() :
-                                         new PlatformMBeanWrapper();
+    MBeanWrapper instance = create();
+    String IS_DISABLED_MBEAN_REGISTRATION = "org.apache.cassandra.disable_mbean_registration";
+    String DTEST_IS_IN_JVM_DTEST = "org.apache.cassandra.dtest.is_in_jvm_dtest";
+    String MBEAN_REGISTRATION_CLASS = "org.apache.cassandra.mbean_registration_class";
+
+    static MBeanWrapper create()
+    {
+        // If we're running in the in-jvm dtest environment, always use the delegating
+        // mbean wrapper even if we start off with no-op, so it can be switched later
+        if (Boolean.getBoolean(DTEST_IS_IN_JVM_DTEST))
+        {
+            return new DelegatingMbeanWrapper(getMBeanWrapper());
+        }
+
+        return getMBeanWrapper();
+    }
+
+    static MBeanWrapper getMBeanWrapper()
+    {
+        if (Boolean.getBoolean(IS_DISABLED_MBEAN_REGISTRATION))
+        {
+            return new NoOpMBeanWrapper();
+        }
+
+        String klass = System.getProperty(MBEAN_REGISTRATION_CLASS);
+        if (klass == null)
+        {
+            if (Boolean.getBoolean(DTEST_IS_IN_JVM_DTEST))
+            {
+                return new NoOpMBeanWrapper();
+            }
+            else
+            {
+                return new PlatformMBeanWrapper();
+            }
+        }
+        return FBUtilities.construct(klass, "mbean");
+    }
+
+    static ObjectName create(String mbeanName, OnException onException)
+    {
+        try
+        {
+            return new ObjectName(mbeanName);
+        }
+        catch (MalformedObjectNameException e)
+        {
+            onException.handler.accept(e);
+            return null;
+        }
+    }
 
     // Passing true for graceful will log exceptions instead of rethrowing them
-    public void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+    void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+
     default void registerMBean(Object obj, ObjectName mbeanName)
     {
         registerMBean(obj, mbeanName, OnException.THROW);
     }
 
-    public void registerMBean(Object obj, String mbeanName, OnException onException);
+    default void registerMBean(Object obj, String mbeanName, OnException onException)
+    {
+        ObjectName name = create(mbeanName, onException);
+        if (name == null)
+        {
+            return;
+        }
+        registerMBean(obj, name, onException);
+    }
+
     default void registerMBean(Object obj, String mbeanName)
     {
         registerMBean(obj, mbeanName, OnException.THROW);
     }
 
-    public boolean isRegistered(ObjectName mbeanName, OnException onException);
+    boolean isRegistered(ObjectName mbeanName, OnException onException);
+
     default boolean isRegistered(ObjectName mbeanName)
     {
         return isRegistered(mbeanName, OnException.THROW);
     }
 
-    public boolean isRegistered(String mbeanName, OnException onException);
+    default boolean isRegistered(String mbeanName, OnException onException)
+    {
+        ObjectName name = create(mbeanName, onException);
+        if (name == null)
+        {
+            return false;
+        }
+        return isRegistered(name, onException);
+    }
+
     default boolean isRegistered(String mbeanName)
     {
         return isRegistered(mbeanName, OnException.THROW);
     }
 
-    public void unregisterMBean(ObjectName mbeanName, OnException onException);
+    void unregisterMBean(ObjectName mbeanName, OnException onException);
+
     default void unregisterMBean(ObjectName mbeanName)
     {
         unregisterMBean(mbeanName, OnException.THROW);
     }
 
-    public void unregisterMBean(String mbeanName, OnException onException);
+    default void unregisterMBean(String mbeanName, OnException onException)
+    {
+        ObjectName name = create(mbeanName, onException);
+        if (name == null)
+        {
+            return;
+        }
+        unregisterMBean(name, onException);
+    }
+
     default void unregisterMBean(String mbeanName)
     {
         unregisterMBean(mbeanName, OnException.THROW);
     }
 
-    static class NoOpMBeanWrapper implements MBeanWrapper
+    enum OnException
+    {
+        THROW(e -> { throw new RuntimeException(e); }),
+        LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
+        IGNORE(e -> { });
+
+        private Consumer<Exception> handler;
+        OnException(Consumer<Exception> handler)
+        {
+            this.handler = handler;
+        }
+    }
+
+    Set<ObjectName> queryNames(ObjectName name, QueryExp query);
+
+    MBeanServer getMBeanServer();
+
+    class NoOpMBeanWrapper implements MBeanWrapper
     {
         public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {}
         public void registerMBean(Object obj, String mbeanName, OnException onException) {}
-        public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false; }
-        public boolean isRegistered(String mbeanName, OnException onException) { return false; }
+        public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false;}
+        public boolean isRegistered(String mbeanName, OnException onException) { return false;}
         public void unregisterMBean(ObjectName mbeanName, OnException onException) {}
         public void unregisterMBean(String mbeanName, OnException onException) {}
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {return Collections.emptySet(); }
+        public MBeanServer getMBeanServer() { return null; }
     }
 
-    static class PlatformMBeanWrapper implements MBeanWrapper
+    class PlatformMBeanWrapper implements MBeanWrapper
     {
         private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
         public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
         {
             try
@@ -101,18 +204,6 @@
             }
         }
 
-        public void registerMBean(Object obj, String mbeanName, OnException onException)
-        {
-            try
-            {
-                mbs.registerMBean(obj, new ObjectName(mbeanName));
-            }
-            catch (Exception e)
-            {
-                onException.handler.accept(e);
-            }
-        }
-
         public boolean isRegistered(ObjectName mbeanName, OnException onException)
         {
             try
@@ -126,11 +217,57 @@
             return false;
         }
 
-        public boolean isRegistered(String mbeanName, OnException onException)
+        public void unregisterMBean(ObjectName mbeanName, OnException onException)
         {
             try
             {
-                return mbs.isRegistered(new ObjectName(mbeanName));
+                mbs.unregisterMBean(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+        {
+            return mbs.queryNames(name, query);
+        }
+
+        public MBeanServer getMBeanServer()
+        {
+            return mbs;
+        }
+
+    }
+
+    class InstanceMBeanWrapper implements MBeanWrapper
+    {
+        private MBeanServer mbs;
+        public final UUID id = UUID.randomUUID();
+
+        public InstanceMBeanWrapper(String hostname)
+        {
+            mbs = MBeanServerFactory.createMBeanServer(hostname + "-" + id);
+        }
+
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.registerMBean(obj, mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public boolean isRegistered(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                return mbs.isRegistered(mbeanName);
             }
             catch (Exception e)
             {
@@ -151,29 +288,100 @@
             }
         }
 
-        public void unregisterMBean(String mbeanName, OnException onException)
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+        {
+            return mbs.queryNames(name, query);
+        }
+
+        public MBeanServer getMBeanServer()
+        {
+            return mbs;
+        }
+
+        public void close()
+        {
+            mbs.queryNames(null, null).forEach(name -> {
+                try
+                {
+                    if (!name.getCanonicalName().contains("MBeanServerDelegate"))
+                    {
+                        mbs.unregisterMBean(name);
+                    }
+                }
+                catch (Throwable e)
+                {
+                    logger.debug("Could not unregister mbean {}", name.getCanonicalName());
+                }
+            });
+            MBeanServerFactory.releaseMBeanServer(mbs);
+            mbs = null;
+        }
+    }
+
+    class DelegatingMbeanWrapper implements MBeanWrapper
+    {
+        MBeanWrapper delegate;
+
+        public DelegatingMbeanWrapper(MBeanWrapper mBeanWrapper)
+        {
+            delegate = mBeanWrapper;
+        }
+
+        public MBeanWrapper getDelegate()
+        {
+            return delegate;
+        }
+
+        public void setDelegate(MBeanWrapper wrapper)
+        {
+            delegate = wrapper;
+        }
+
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
         {
             try
             {
-                mbs.unregisterMBean(new ObjectName(mbeanName));
+                delegate.registerMBean(obj, mbeanName);
             }
             catch (Exception e)
             {
                 onException.handler.accept(e);
             }
         }
-    }
 
-    public enum OnException
-    {
-        THROW(e -> { throw new RuntimeException(e); }),
-        LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
-        IGNORE(e -> {});
-
-        private Consumer<Exception> handler;
-        OnException(Consumer<Exception> handler)
+        public boolean isRegistered(ObjectName mbeanName, OnException onException)
         {
-            this.handler = handler;
+            try
+            {
+                return delegate.isRegistered(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+            return false;
+        }
+
+        public void unregisterMBean(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                delegate.unregisterMBean(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+        {
+            return delegate.queryNames(name, query);
+        }
+
+        public MBeanServer getMBeanServer()
+        {
+            return delegate.getMBeanServer();
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
new file mode 100644
index 0000000..62ab88f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.rmi.server.RMIClientSocketFactory;
+import java.util.Objects;
+
+/**
+ * This class is used to override the local address the JMX client calculates when trying to connect,
+ * which can otherwise be influenced by the system property "java.rmi.server.hostname" in strange and
+ * unpredictable ways.
+ */
+public class RMIClientSocketFactoryImpl implements RMIClientSocketFactory, Serializable
+{
+    private final InetAddress localAddress;
+
+    public RMIClientSocketFactoryImpl(InetAddress localAddress)
+    {
+        this.localAddress = localAddress;
+    }
+
+    @Override
+    public Socket createSocket(String host, int port) throws IOException
+    {
+        return new Socket(localAddress, port);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RMIClientSocketFactoryImpl that = (RMIClientSocketFactoryImpl) o;
+        return Objects.equals(localAddress, that.localAddress);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(localAddress);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 6f861d6..cffbbc4 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -77,6 +77,8 @@
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 import org.reflections.Reflections;
 
+import static org.apache.cassandra.utils.MBeanWrapper.DTEST_IS_IN_JVM_DTEST;
+
 /**
  * AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}.
  *
@@ -142,6 +144,9 @@
 
     private volatile Thread.UncaughtExceptionHandler previousHandler = null;
 
+    {
+        System.setProperty(DTEST_IS_IN_JVM_DTEST, "true");
+    }
     protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
     {
         private final int generation;
@@ -390,6 +395,16 @@
         return instances.get(node - 1).coordinator();
     }
 
+    public List<I> get(int... nodes)
+    {
+        if (nodes == null || nodes.length == 0)
+            throw new IllegalArgumentException("No nodes provided");
+        List<I> list = new ArrayList<>(nodes.length);
+        for (int i : nodes)
+            list.add(get(i));
+        return list;
+    }
+
     /**
      * WARNING: we index from 1 here, for consistency with inet address!
      */
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java
new file mode 100644
index 0000000..5e67eaf
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.rmi.server.RMIServerSocketFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import javax.net.ServerSocketFactory;
+
+
+/**
+ * This class is used to keep track of RMI servers created during a cluster creation so we can
+ * later close the sockets, which would otherwise be left with a thread running waiting for
+ * connections that would never show up as the server was otherwise closed.
+ */
+class CollectingRMIServerSocketFactoryImpl implements RMIServerSocketFactory
+{
+    private final InetAddress bindAddress;
+    List<ServerSocket> sockets = new ArrayList<>();
+
+    public CollectingRMIServerSocketFactoryImpl(InetAddress bindAddress)
+    {
+        this.bindAddress = bindAddress;
+    }
+
+    @Override
+    public ServerSocket createServerSocket(int pPort) throws IOException
+    {
+        ServerSocket result = ServerSocketFactory.getDefault().createServerSocket(pPort, 0, bindAddress);
+        try
+        {
+            result.setReuseAddress(true);
+        }
+        catch (SocketException e)
+        {
+            result.close();
+            throw e;
+        }
+        sockets.add(result);
+        return result;
+    }
+
+
+    public void close() throws IOException
+    {
+        for (ServerSocket socket : sockets)
+        {
+            socket.close();
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CollectingRMIServerSocketFactoryImpl that = (CollectingRMIServerSocketFactoryImpl) o;
+        return Objects.equals(bindAddress, that.bindAddress);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(bindAddress);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index fbef883..cca6296 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -44,6 +44,9 @@
 import javax.management.Notification;
 import javax.management.NotificationListener;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.SharedExecutorPool;
@@ -123,13 +126,16 @@
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 
 public class Instance extends IsolatedExecutor implements IInvokableInstance
 {
+    private Logger inInstancelogger; // Defer creation until running in the instance context
     public final IInstanceConfig config;
     private volatile boolean initialized = false;
+    private IsolatedJmx isolatedJmx;
 
     // should never be invoked directly, so that it is instantiated on other class loader;
     // only visible for inheritance
@@ -503,6 +509,7 @@
     public void startup(ICluster cluster)
     {
         sync(() -> {
+            inInstancelogger = LoggerFactory.getLogger(Instance.class);
             try
             {
                 if (config.has(GOSSIP))
@@ -518,6 +525,9 @@
                 assert config.networkTopology().contains(config.broadcastAddress());
                 DistributedTestSnitch.assign(config.networkTopology());
 
+                if (config.has(JMX))
+                    startJmx();
+
                 DatabaseDescriptor.daemonInitialization();
                 FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
                 DatabaseDescriptor.createAllDirectories();
@@ -604,6 +614,20 @@
         initialized = true;
     }
 
+    private void startJmx()
+    {
+        isolatedJmx = new IsolatedJmx(this, inInstancelogger);
+        isolatedJmx.startJmx();
+    }
+
+    private void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+    {
+        if (config.has(JMX))
+        {
+            isolatedJmx.stopJmx();
+        }
+    }
+
     private void mkdirs()
     {
         new File(config.getString("saved_caches_directory")).mkdirs();
@@ -758,6 +782,8 @@
                                 CommitLog.instance::shutdownBlocking
             );
 
+            error = parallelRun(error, executor, this::stopJmx);
+
             Throwables.maybeFail(error);
         }).apply(isolatedExecutor);
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index e546962..7f6af4b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -46,6 +46,8 @@
     private static final Logger logger = LoggerFactory.getLogger(InstanceConfig.class);
 
     public final int num;
+    private final int jmxPort;
+
     public int num() { return num; }
 
     private final NetworkTopology networkTopology;
@@ -72,7 +74,8 @@
                            String commitlog_directory,
                            String hints_directory,
                            String cdc_raw_directory,
-                           String initial_token)
+                           String initial_token,
+                           int jmx_port)
     {
         this.num = num;
         this.networkTopology = networkTopology;
@@ -110,6 +113,7 @@
                 // legacy parameters
                 .forceSet("commitlog_sync_batch_window_in_ms", 1.0);
         this.featureFlags = EnumSet.noneOf(Feature.class);
+        this.jmxPort = jmx_port;
     }
 
     private InstanceConfig(InstanceConfig copy)
@@ -121,6 +125,7 @@
         this.hostId = copy.hostId;
         this.featureFlags = copy.featureFlags;
         this.broadcastAddressAndPort = copy.broadcastAddressAndPort;
+        this.jmxPort = copy.jmxPort;
     }
 
 
@@ -161,6 +166,12 @@
         return networkTopology().localDC(broadcastAddress());
     }
 
+    @Override
+    public int jmxPort()
+    {
+        return this.jmxPort;
+    }
+
     public InstanceConfig with(Feature featureFlag)
     {
         featureFlags.add(featureFlag);
@@ -251,7 +262,8 @@
                                   String.format("%s/node%d/commitlog", root, nodeNum),
                                   String.format("%s/node%d/hints", root, nodeNum),
                                   String.format("%s/node%d/cdc", root, nodeNum),
-                                  token);
+                                  token,
+                                  7199);
     }
 
     private static String[] datadirs(int datadirCount, File root, int nodeNum)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index 53c1ad5..2b4fc87 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -52,7 +52,7 @@
 {
     final ExecutorService isolatedExecutor;
     private final String name;
-    private final ClassLoader classLoader;
+    final ClassLoader classLoader;
     private final Method deserializeOnInstance;
 
     IsolatedExecutor(String name, ClassLoader classLoader)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
new file mode 100644
index 0000000..b3d0659
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+
+import org.slf4j.Logger;
+
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.utils.JMXServerUtils;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.RMIClientSocketFactoryImpl;
+import sun.rmi.transport.tcp.TCPEndpoint;
+
+import static org.apache.cassandra.distributed.api.Feature.JMX;
+import static org.apache.cassandra.utils.MBeanWrapper.IS_DISABLED_MBEAN_REGISTRATION;
+
+public class IsolatedJmx
+{
+    /** Controls the JMX server threadpool keap-alive time. */
+    private static final String SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME = "sun.rmi.transport.tcp.threadKeepAliveTime";
+    /** Controls the distributed garbage collector lease time for JMX objects. */
+    private static final String JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST ="java.rmi.dgc.leaseValue";
+    private static final int RMI_KEEPALIVE_TIME = 1000;
+
+    private JMXConnectorServer jmxConnectorServer;
+    private JMXServerUtils.JmxRegistry registry;
+    private RMIJRMPServerImpl jmxRmiServer;
+    private MBeanWrapper.InstanceMBeanWrapper wrapper;
+    private RMIClientSocketFactoryImpl clientSocketFactory;
+    private CollectingRMIServerSocketFactoryImpl serverSocketFactory;
+    private Logger inInstancelogger;
+    private IInstanceConfig config;
+
+    public IsolatedJmx(Instance instance, Logger inInstancelogger) {
+        this.inInstancelogger = inInstancelogger;
+        config = instance.config();
+    }
+
+    public void startJmx()
+    {
+        try
+        {
+            // Several RMI threads hold references to in-jvm dtest objects, and are, by default, kept
+            // alive for long enough (minutes) to keep classloaders from being collected.
+            // Set these two system properties to a low value to allow cleanup to occur fast enough
+            // for GC to collect our classloaders.
+            System.setProperty(JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST, String.valueOf(RMI_KEEPALIVE_TIME));
+            System.setProperty(SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME, String.valueOf(RMI_KEEPALIVE_TIME));
+            System.setProperty(IS_DISABLED_MBEAN_REGISTRATION, "false");
+            InetAddress addr = config.broadcastAddress().getAddress();
+
+            int jmxPort = config.jmxPort();
+
+            String hostname = addr.getHostAddress();
+            wrapper = new MBeanWrapper.InstanceMBeanWrapper(hostname + ":" + jmxPort);
+            ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(wrapper);
+            Map<String, Object> env = new HashMap<>();
+
+            serverSocketFactory = new CollectingRMIServerSocketFactoryImpl(addr);
+            env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE,
+                    serverSocketFactory);
+            clientSocketFactory = new RMIClientSocketFactoryImpl(addr);
+            env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE,
+                    clientSocketFactory);
+
+            // configure the RMI registry
+            registry = new JMXServerUtils.JmxRegistry(jmxPort,
+                                                      clientSocketFactory,
+                                                      serverSocketFactory,
+                                                      "jmxrmi");
+
+            // Mark the JMX server as a permanently exported object. This allows the JVM to exit with the
+            // server running and also exempts it from the distributed GC scheduler which otherwise would
+            // potentially attempt a full GC every `sun.rmi.dgc.server.gcInterval` millis (default is 3600000ms)
+            // For more background see:
+            //   - CASSANDRA-2967
+            //   - https://www.jclarity.com/2015/01/27/rmi-system-gc-unplugged/
+            //   - https://bugs.openjdk.java.net/browse/JDK-6760712
+            env.put("jmx.remote.x.daemon", "true");
+
+            // Set the port used to create subsequent connections to exported objects over RMI. This simplifies
+            // configuration in firewalled environments, but it can't be used in conjuction with SSL sockets.
+            // See: CASSANDRA-7087
+            int rmiPort = config.jmxPort();
+
+            // We create the underlying RMIJRMPServerImpl so that we can manually bind it to the registry,
+            // rather then specifying a binding address in the JMXServiceURL and letting it be done automatically
+            // when the server is started. The reason for this is that if the registry is configured with SSL
+            // sockets, the JMXConnectorServer acts as its client during the binding which means it needs to
+            // have a truststore configured which contains the registry's certificate. Manually binding removes
+            // this problem.
+            // See CASSANDRA-12109.
+            jmxRmiServer = new RMIJRMPServerImpl(rmiPort, clientSocketFactory, serverSocketFactory,
+                                                 env);
+            JMXServiceURL serviceURL = new JMXServiceURL("rmi", hostname, rmiPort);
+            jmxConnectorServer = new RMIConnectorServer(serviceURL, env, jmxRmiServer, wrapper.getMBeanServer());
+
+            jmxConnectorServer.start();
+
+            registry.setRemoteServerStub(jmxRmiServer.toStub());
+            JMXServerUtils.logJmxServiceUrl(addr, jmxPort);
+            waitForJmxAvailability(hostname, jmxPort, env);
+        }
+        catch (Throwable e)
+        {
+            throw new RuntimeException("Feature.JMX was enabled but could not be started.", e);
+        }
+    }
+
+
+    private void waitForJmxAvailability(String hostname, int rmiPort, Map<String, Object> env) throws InterruptedException, MalformedURLException
+    {
+        String url = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname, rmiPort);
+        JMXServiceURL serviceURL = new JMXServiceURL(url);
+        int attempts = 0;
+        Throwable lastThrown = null;
+        while (attempts < 20)
+        {
+            attempts++;
+            try (JMXConnector ignored = JMXConnectorFactory.connect(serviceURL, env))
+            {
+                inInstancelogger.info("Connected to JMX server at {} after {} attempt(s)",
+                                      url, attempts);
+                return;
+            }
+            catch (MalformedURLException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (Throwable thrown)
+            {
+                lastThrown = thrown;
+            }
+            inInstancelogger.info("Could not connect to JMX on {} after {} attempts. Will retry.", url, attempts);
+            Thread.sleep(1000);
+        }
+        throw new RuntimeException("Could not start JMX - unreachable after 20 attempts", lastThrown);
+    }
+
+    public void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+    {
+        if (!config.has(JMX))
+            return;
+        // First, swap the mbean wrapper back to a NoOp wrapper
+        // This prevents later attempts to unregister mbeans from failing in Cassandra code, as we're going to
+        // unregister all of them here
+        ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(new MBeanWrapper.NoOpMBeanWrapper());
+        try
+        {
+            wrapper.close();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close wrapper.", e);
+        }
+        try
+        {
+            jmxConnectorServer.stop();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close jmxConnectorServer.", e);
+        }
+        try
+        {
+            registry.close();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close registry.", e);
+        }
+        try
+        {
+            serverSocketFactory.close();
+        }
+        catch (Throwable e)
+        {
+            inInstancelogger.warn("failed to close serverSocketFactory.", e);
+        }
+        // The TCPEndpoint class holds references to a class in the in-jvm dtest framework
+        // which transitively has a reference to the InstanceClassLoader, so we need to
+        // make sure to remove the reference to them when the instance is shutting down
+        clearMapField(TCPEndpoint.class, null, "localEndpoints");
+        Thread.sleep(2 * RMI_KEEPALIVE_TIME); // Double the keep-alive time to give Distributed GC some time to clean up
+    }
+
+    private <K, V> void clearMapField(Class<?> clazz, Object instance, String mapName)
+    throws IllegalAccessException, NoSuchFieldException {
+        Field mapField = clazz.getDeclaredField(mapName);
+        mapField.setAccessible(true);
+        Map<K, V> map = (Map<K, V>) mapField.get(instance);
+        // Because multiple instances can be shutting down at once,
+        // synchronize on the map to avoid ConcurrentModificationException
+        synchronized (map)
+        {
+            for (Iterator<Map.Entry<K, V>> it = map.entrySet().iterator(); it.hasNext(); )
+            {
+                it.next();
+                it.remove();
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 1c4850a..177cbb9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -28,7 +28,10 @@
 import java.time.Instant;
 import java.util.function.Consumer;
 import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
 
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -36,14 +39,19 @@
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SigarLibrary;
 
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.hamcrest.Matchers.startsWith;
 
 /* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
  * All objects referencing the InstanceClassLoader need to be garbage collected or
@@ -142,6 +150,11 @@
 
     void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws Throwable
     {
+        doTest(numClusterNodes, updater, ignored -> {});
+    }
+
+    void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater, Consumer<Cluster> actionToPerform) throws Throwable
+    {
         for (int loop = 0; loop < numTestLoops; loop++)
         {
             System.out.println(String.format("========== Starting loop %03d ========", loop));
@@ -155,6 +168,7 @@
                 cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." + tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL);
                 cluster.get(1).callOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(KEYSPACE).flush()));
+                actionToPerform.accept(cluster);
                 if (dumpEveryLoop)
                 {
                     dumpResources(String.format("loop%03d", loop));
@@ -213,4 +227,50 @@
         }
         dumpResources("final-native");
     }
+
+    @Test
+    public void looperJmxTest() throws Throwable
+    {
+        doTest(1, config -> config.with(JMX), cluster -> {
+            // NOTE: At some point, the hostname of the broadcastAddress can be resolved
+            // and then the `getHostString`, which would otherwise return the IP address,
+            // starts returning `localhost` - use `.getAddress().getHostAddress()` to work around this.
+            for (IInvokableInstance instance:cluster.get(1, cluster.size()))
+            {
+                IInstanceConfig config = instance.config();
+                try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
+                {
+                    MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+                    // instances get their default domain set to their IP address, so us it
+                    // to check that we are actually connecting to the correct instance
+                    String defaultDomain = mbsc.getDefaultDomain();
+                    Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        if (forceCollection)
+        {
+            System.runFinalization();
+            System.gc();
+            Thread.sleep(finalWaitMillis);
+        }
+        dumpResources("final-jmx");
+    }
+
+    @Test
+    public void looperEverythingTest() throws Throwable
+    {
+        doTest(1, config -> config.with(Feature.values()));
+        if (forceCollection)
+        {
+            System.runFinalization();
+            System.gc();
+            Thread.sleep(finalWaitMillis);
+        }
+        dumpResources("final-everything");
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
new file mode 100644
index 0000000..1c38bd1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.jmx;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.hamcrest.Matchers.startsWith;
+
+public class JMXFeatureTest extends TestBaseImpl
+{
+    /**
+     * Test the in-jvm dtest JMX feature.
+     * - Create a cluster with multiple JMX servers, one per instance
+     * - Test that when connecting, we get the correct MBeanServer by checking the default domain, which is set to the IP of the instance
+     * - Run the test multiple times to ensure cleanup of the JMX servers is complete so the next test can run successfully using the same host/port.
+     * NOTE: In later versions of Cassandra, there is also a `testOneNetworkInterfaceProvisioning` that leverages the ability to specify
+     * ports in addition to IP/Host for binding, but this version does not support that feature. Keeping the test name the same
+     * so that it's consistent across versions.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMultipleNetworkInterfacesProvisioning() throws Exception
+    {
+        int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times.
+        Set<String> allInstances = new HashSet<>();
+        for (int i = 0; i < iterations; i++)
+        {
+            try (Cluster cluster = Cluster.build(2).withConfig(c -> c.with(Feature.values())).start())
+            {
+                Set<String> instancesContacted = new HashSet<>();
+                for (IInvokableInstance instance : cluster.get(1, 2))
+                {
+                    testInstance(instancesContacted, instance);
+                }
+                Assert.assertEquals("Should have connected with both JMX instances.", 2, instancesContacted.size());
+                allInstances.addAll(instancesContacted);
+            }
+        }
+        Assert.assertEquals("Each instance from each cluster should have been unique", iterations * 2, allInstances.size());
+    }
+
+    private void testInstance(Set<String> instancesContacted, IInvokableInstance instance) throws IOException
+    {
+        IInstanceConfig config = instance.config();
+        try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
+        {
+            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+            // instances get their default domain set to their IP address, so us it
+            // to check that we are actually connecting to the correct instance
+            String defaultDomain = mbsc.getDefaultDomain();
+            instancesContacted.add(defaultDomain);
+            Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
new file mode 100644
index 0000000..a3f74df
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.jmx;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.management.JMRuntimeException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+public class JMXGetterCheckTest extends TestBaseImpl
+{
+    private static final Set<String> IGNORE_ATTRIBUTES = ImmutableSet.of(
+    "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost" // throws unsupported saying the feature was removed... dropped in CASSANDRA-15375
+    );
+    private static final Set<String> IGNORE_OPERATIONS = ImmutableSet.of(
+    "org.apache.cassandra.db:type=StorageService:stopDaemon", // halts the instance, which then causes the JVM to exit
+    "org.apache.cassandra.db:type=StorageService:drain", // don't drain, it stops things which can cause other APIs to be unstable as we are in a stopped state
+    "org.apache.cassandra.db:type=StorageService:stopGossiping", // if we stop gossip this can cause other issues, so avoid
+    "org.apache.cassandra.db:type=StorageService:resetLocalSchema", // this will fail when there are no other nodes which can serve schema
+    "org.apache.cassandra.db:type=HintedHandoffManager:listEndpointsPendingHints", // this will fail because it only exists to match an old, deprecated mbean and just throws an UnsportedOperationException
+    "org.apache.cassandra.db:type=StorageService:decommission" // Don't decommission nodes! Note that in future versions of C* this is unnecessary because decommission takes an argument.
+    );
+
+    public static final String JMX_SERVICE_URL_FMT = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+
+    @Test
+    public void testGetters() throws Exception
+    {
+        try (Cluster cluster = Cluster.build(1).withConfig(c -> c.with(Feature.values())).start())
+        {
+            IInvokableInstance instance = cluster.get(1);
+
+            String jmxHost = instance.config().broadcastAddress().getAddress().getHostAddress();
+            String url = String.format(JMX_SERVICE_URL_FMT, jmxHost, instance.config().jmxPort());
+            List<Named> errors = new ArrayList<>();
+            try (JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(url), null))
+            {
+                MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+                Set<ObjectName> metricNames = new TreeSet<>(mbsc.queryNames(null, null));
+                for (ObjectName name : metricNames)
+                {
+                    if (!name.getDomain().startsWith("org.apache.cassandra"))
+                        continue;
+                    MBeanInfo info = mbsc.getMBeanInfo(name);
+                    for (MBeanAttributeInfo a : info.getAttributes())
+                    {
+                        String fqn = String.format("%s:%s", name, a.getName());
+                        if (!a.isReadable() || IGNORE_ATTRIBUTES.contains(fqn))
+                            continue;
+                        try
+                        {
+                            mbsc.getAttribute(name, a.getName());
+                        }
+                        catch (JMRuntimeException e)
+                        {
+                            errors.add(new Named(String.format("Attribute %s", fqn), e.getCause()));
+                        }
+                    }
+
+                    for (MBeanOperationInfo o : info.getOperations())
+                    {
+                        String fqn = String.format("%s:%s", name, o.getName());
+                        if (o.getSignature().length != 0 || IGNORE_OPERATIONS.contains(fqn))
+                            continue;
+                        try
+                        {
+                            mbsc.invoke(name, o.getName(), new Object[0], new String[0]);
+                        }
+                        catch (JMRuntimeException e)
+                        {
+                            errors.add(new Named(String.format("Operation %s", fqn), e.getCause()));
+                        }
+                    }
+                }
+            }
+            if (!errors.isEmpty())
+            {
+                AssertionError root = new AssertionError();
+                errors.forEach(root::addSuppressed);
+                throw root;
+            }
+        }
+    }
+
+    /**
+     * This class is meant to make new errors easier to read, by adding the JMX endpoint, and cleaning up the unneeded JMX/Reflection logic cluttering the stacktrace
+     */
+    private static class Named extends RuntimeException
+    {
+        public Named(String msg, Throwable cause)
+        {
+            super(msg + "\nCaused by: " + cause.getClass().getCanonicalName() + ": " + cause.getMessage(), cause.getCause());
+            StackTraceElement[] stack = cause.getStackTrace();
+            List<StackTraceElement> copy = new ArrayList<>();
+            for (StackTraceElement s : stack)
+            {
+                if (!s.getClassName().startsWith("org.apache.cassandra"))
+                    break;
+                copy.add(s);
+            }
+            Collections.reverse(copy);
+            setStackTrace(copy.toArray(new StackTraceElement[0]));
+        }
+    }
+}