Add support for JMX in the in-jvm dtest framework
patch by Doug Rohrer; reviewed by Alex Petrov, Jon Meredith, Francisco Guerrero Hernandez for CASSANDRA-18511
diff --git a/.build/build-resolver.xml b/.build/build-resolver.xml
index 698fb57..99bfe4b 100644
--- a/.build/build-resolver.xml
+++ b/.build/build-resolver.xml
@@ -51,6 +51,8 @@
<resolver:remoterepos id="all">
<remoterepo id="resolver-central" url="${artifact.remoteRepository.central}"/>
<remoterepo id="resolver-apache" url="${artifact.remoteRepository.apache}"/>
+ <!-- Only needed for PR builds - remove before commit -->
+ <!-- <remoterepo id="resolver-apache-snapshot" url="https://repository.apache.org/content/repositories/snapshots" releases="false" snapshots="true" updates="always" checksums="fail" />-->
</resolver:remoterepos>
<resolver:resolve>
diff --git a/build.xml b/build.xml
index 6985c8c..8f26c5e 100644
--- a/build.xml
+++ b/build.xml
@@ -383,7 +383,7 @@
<exclusion groupId="org.hamcrest" artifactId="hamcrest-core"/>
</dependency>
<dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" scope="test"/>
- <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.13" scope="test"/>
+ <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.15" scope="test"/>
<dependency groupId="org.reflections" artifactId="reflections" version="0.10.2" scope="test"/>
<dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" scope="test"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.3" scope="provided">
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index 787d79a..0c0f9e4 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -137,18 +137,17 @@
public GCInspector()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
try
{
ObjectName gcName = new ObjectName(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*");
- for (ObjectName name : mbs.queryNames(gcName, null))
+ for (ObjectName name : MBeanWrapper.instance.queryNames(gcName, null))
{
- GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(mbs, name.getCanonicalName(), GarbageCollectorMXBean.class);
+ GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(MBeanWrapper.instance.getMBeanServer(), name.getCanonicalName(), GarbageCollectorMXBean.class);
gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc)));
}
-
- MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME));
+ ObjectName me = new ObjectName(MBEAN_NAME);
+ if (!MBeanWrapper.instance.isRegistered(me))
+ MBeanWrapper.instance.registerMBean(this, me);
}
catch (Exception e)
{
diff --git a/src/java/org/apache/cassandra/utils/JMXServerUtils.java b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
index fc36c6c..aad4f05 100644
--- a/src/java/org/apache/cassandra/utils/JMXServerUtils.java
+++ b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
@@ -33,6 +33,7 @@
import java.rmi.registry.Registry;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMIServerSocketFactory;
+import java.rmi.server.UnicastRemoteObject;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -44,6 +45,7 @@
import javax.rmi.ssl.SslRMIServerSocketFactory;
import javax.security.auth.Subject;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -234,7 +236,8 @@
return env;
}
- private static void logJmxServiceUrl(InetAddress serverAddress, int port)
+ @VisibleForTesting
+ public static void logJmxServiceUrl(InetAddress serverAddress, int port)
{
String urlTemplate = "service:jmx:rmi://%1$s/jndi/rmi://%1$s:%2$d/jmxrmi";
String hostName;
@@ -284,11 +287,12 @@
* Better to use the internal API than re-invent the wheel.
*/
@SuppressWarnings("restriction")
- private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+ public static class JmxRegistry extends sun.rmi.registry.RegistryImpl
+ {
private final String lookupName;
private Remote remoteServerStub;
- JmxRegistry(final int port,
+ public JmxRegistry(final int port,
final RMIClientSocketFactory csf,
RMIServerSocketFactory ssf,
final String lookupName) throws RemoteException {
@@ -321,5 +325,24 @@
public void setRemoteServerStub(Remote remoteServerStub) {
this.remoteServerStub = remoteServerStub;
}
+
+ /**
+ * Closes the underlying JMX registry by unexporting this instance.
+ * There is no reason to do this except for in-jvm dtests where we need
+ * to stop the registry, so we can start with a clean slate for future cluster
+ * builds, and the superclass never expects to be shut down and therefore doesn't
+ * handle this edge case at all.
+ */
+ @VisibleForTesting
+ public void close() {
+ try
+ {
+ UnicastRemoteObject.unexportObject(this, true);
+ }
+ catch (NoSuchObjectException ignored)
+ {
+ // Ignore if it's already unexported
+ }
+ }
}
}
diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
index edee6af..a76a2e7 100644
--- a/src/java/org/apache/cassandra/utils/MBeanWrapper.java
+++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
@@ -19,10 +19,15 @@
package org.apache.cassandra.utils;
import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
import java.util.function.Consumer;
import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.management.QueryExp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,62 +38,160 @@
*/
public interface MBeanWrapper
{
- static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
+ Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
- static final MBeanWrapper instance = Boolean.getBoolean("org.apache.cassandra.disable_mbean_registration") ?
- new NoOpMBeanWrapper() :
- new PlatformMBeanWrapper();
+ MBeanWrapper instance = create();
+ String IS_DISABLED_MBEAN_REGISTRATION = "org.apache.cassandra.disable_mbean_registration";
+ String DTEST_IS_IN_JVM_DTEST = "org.apache.cassandra.dtest.is_in_jvm_dtest";
+ String MBEAN_REGISTRATION_CLASS = "org.apache.cassandra.mbean_registration_class";
+
+ static MBeanWrapper create()
+ {
+ // If we're running in the in-jvm dtest environment, always use the delegating
+ // mbean wrapper even if we start off with no-op, so it can be switched later
+ if (Boolean.getBoolean(DTEST_IS_IN_JVM_DTEST))
+ {
+ return new DelegatingMbeanWrapper(getMBeanWrapper());
+ }
+
+ return getMBeanWrapper();
+ }
+
+ static MBeanWrapper getMBeanWrapper()
+ {
+ if (Boolean.getBoolean(IS_DISABLED_MBEAN_REGISTRATION))
+ {
+ return new NoOpMBeanWrapper();
+ }
+
+ String klass = System.getProperty(MBEAN_REGISTRATION_CLASS);
+ if (klass == null)
+ {
+ if (Boolean.getBoolean(DTEST_IS_IN_JVM_DTEST))
+ {
+ return new NoOpMBeanWrapper();
+ }
+ else
+ {
+ return new PlatformMBeanWrapper();
+ }
+ }
+ return FBUtilities.construct(klass, "mbean");
+ }
+
+ static ObjectName create(String mbeanName, OnException onException)
+ {
+ try
+ {
+ return new ObjectName(mbeanName);
+ }
+ catch (MalformedObjectNameException e)
+ {
+ onException.handler.accept(e);
+ return null;
+ }
+ }
// Passing true for graceful will log exceptions instead of rethrowing them
- public void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+ void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+
default void registerMBean(Object obj, ObjectName mbeanName)
{
registerMBean(obj, mbeanName, OnException.THROW);
}
- public void registerMBean(Object obj, String mbeanName, OnException onException);
+ default void registerMBean(Object obj, String mbeanName, OnException onException)
+ {
+ ObjectName name = create(mbeanName, onException);
+ if (name == null)
+ {
+ return;
+ }
+ registerMBean(obj, name, onException);
+ }
+
default void registerMBean(Object obj, String mbeanName)
{
registerMBean(obj, mbeanName, OnException.THROW);
}
- public boolean isRegistered(ObjectName mbeanName, OnException onException);
+ boolean isRegistered(ObjectName mbeanName, OnException onException);
+
default boolean isRegistered(ObjectName mbeanName)
{
return isRegistered(mbeanName, OnException.THROW);
}
- public boolean isRegistered(String mbeanName, OnException onException);
+ default boolean isRegistered(String mbeanName, OnException onException)
+ {
+ ObjectName name = create(mbeanName, onException);
+ if (name == null)
+ {
+ return false;
+ }
+ return isRegistered(name, onException);
+ }
+
default boolean isRegistered(String mbeanName)
{
return isRegistered(mbeanName, OnException.THROW);
}
- public void unregisterMBean(ObjectName mbeanName, OnException onException);
+ void unregisterMBean(ObjectName mbeanName, OnException onException);
+
default void unregisterMBean(ObjectName mbeanName)
{
unregisterMBean(mbeanName, OnException.THROW);
}
- public void unregisterMBean(String mbeanName, OnException onException);
+ default void unregisterMBean(String mbeanName, OnException onException)
+ {
+ ObjectName name = create(mbeanName, onException);
+ if (name == null)
+ {
+ return;
+ }
+ unregisterMBean(name, onException);
+ }
+
default void unregisterMBean(String mbeanName)
{
unregisterMBean(mbeanName, OnException.THROW);
}
- static class NoOpMBeanWrapper implements MBeanWrapper
+ enum OnException
+ {
+ THROW(e -> { throw new RuntimeException(e); }),
+ LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
+ IGNORE(e -> { });
+
+ private Consumer<Exception> handler;
+ OnException(Consumer<Exception> handler)
+ {
+ this.handler = handler;
+ }
+ }
+
+ Set<ObjectName> queryNames(ObjectName name, QueryExp query);
+
+ MBeanServer getMBeanServer();
+
+ class NoOpMBeanWrapper implements MBeanWrapper
{
public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {}
public void registerMBean(Object obj, String mbeanName, OnException onException) {}
- public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false; }
- public boolean isRegistered(String mbeanName, OnException onException) { return false; }
+ public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false;}
+ public boolean isRegistered(String mbeanName, OnException onException) { return false;}
public void unregisterMBean(ObjectName mbeanName, OnException onException) {}
public void unregisterMBean(String mbeanName, OnException onException) {}
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {return Collections.emptySet(); }
+ public MBeanServer getMBeanServer() { return null; }
}
- static class PlatformMBeanWrapper implements MBeanWrapper
+ class PlatformMBeanWrapper implements MBeanWrapper
{
private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+
public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
{
try
@@ -101,18 +204,6 @@
}
}
- public void registerMBean(Object obj, String mbeanName, OnException onException)
- {
- try
- {
- mbs.registerMBean(obj, new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- onException.handler.accept(e);
- }
- }
-
public boolean isRegistered(ObjectName mbeanName, OnException onException)
{
try
@@ -126,11 +217,57 @@
return false;
}
- public boolean isRegistered(String mbeanName, OnException onException)
+ public void unregisterMBean(ObjectName mbeanName, OnException onException)
{
try
{
- return mbs.isRegistered(new ObjectName(mbeanName));
+ mbs.unregisterMBean(mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ }
+
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+ {
+ return mbs.queryNames(name, query);
+ }
+
+ public MBeanServer getMBeanServer()
+ {
+ return mbs;
+ }
+
+ }
+
+ class InstanceMBeanWrapper implements MBeanWrapper
+ {
+ private MBeanServer mbs;
+ public final UUID id = UUID.randomUUID();
+
+ public InstanceMBeanWrapper(String hostname)
+ {
+ mbs = MBeanServerFactory.createMBeanServer(hostname + "-" + id);
+ }
+
+ public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ mbs.registerMBean(obj, mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ }
+
+ public boolean isRegistered(ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ return mbs.isRegistered(mbeanName);
}
catch (Exception e)
{
@@ -151,29 +288,100 @@
}
}
- public void unregisterMBean(String mbeanName, OnException onException)
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+ {
+ return mbs.queryNames(name, query);
+ }
+
+ public MBeanServer getMBeanServer()
+ {
+ return mbs;
+ }
+
+ public void close()
+ {
+ mbs.queryNames(null, null).forEach(name -> {
+ try
+ {
+ if (!name.getCanonicalName().contains("MBeanServerDelegate"))
+ {
+ mbs.unregisterMBean(name);
+ }
+ }
+ catch (Throwable e)
+ {
+ logger.debug("Could not unregister mbean {}", name.getCanonicalName());
+ }
+ });
+ MBeanServerFactory.releaseMBeanServer(mbs);
+ mbs = null;
+ }
+ }
+
+ class DelegatingMbeanWrapper implements MBeanWrapper
+ {
+ MBeanWrapper delegate;
+
+ public DelegatingMbeanWrapper(MBeanWrapper mBeanWrapper)
+ {
+ delegate = mBeanWrapper;
+ }
+
+ public MBeanWrapper getDelegate()
+ {
+ return delegate;
+ }
+
+ public void setDelegate(MBeanWrapper wrapper)
+ {
+ delegate = wrapper;
+ }
+
+ public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
{
try
{
- mbs.unregisterMBean(new ObjectName(mbeanName));
+ delegate.registerMBean(obj, mbeanName);
}
catch (Exception e)
{
onException.handler.accept(e);
}
}
- }
- public enum OnException
- {
- THROW(e -> { throw new RuntimeException(e); }),
- LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
- IGNORE(e -> {});
-
- private Consumer<Exception> handler;
- OnException(Consumer<Exception> handler)
+ public boolean isRegistered(ObjectName mbeanName, OnException onException)
{
- this.handler = handler;
+ try
+ {
+ return delegate.isRegistered(mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ return false;
+ }
+
+ public void unregisterMBean(ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ delegate.unregisterMBean(mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ }
+
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+ {
+ return delegate.queryNames(name, query);
+ }
+
+ public MBeanServer getMBeanServer()
+ {
+ return delegate.getMBeanServer();
}
}
}
diff --git a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
new file mode 100644
index 0000000..62ab88f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.rmi.server.RMIClientSocketFactory;
+import java.util.Objects;
+
+/**
+ * This class is used to override the local address the JMX client calculates when trying to connect,
+ * which can otherwise be influenced by the system property "java.rmi.server.hostname" in strange and
+ * unpredictable ways.
+ */
+public class RMIClientSocketFactoryImpl implements RMIClientSocketFactory, Serializable
+{
+ private final InetAddress localAddress;
+
+ public RMIClientSocketFactoryImpl(InetAddress localAddress)
+ {
+ this.localAddress = localAddress;
+ }
+
+ @Override
+ public Socket createSocket(String host, int port) throws IOException
+ {
+ return new Socket(localAddress, port);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RMIClientSocketFactoryImpl that = (RMIClientSocketFactoryImpl) o;
+ return Objects.equals(localAddress, that.localAddress);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(localAddress);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 6f861d6..cffbbc4 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -77,6 +77,8 @@
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.reflections.Reflections;
+import static org.apache.cassandra.utils.MBeanWrapper.DTEST_IS_IN_JVM_DTEST;
+
/**
* AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}.
*
@@ -142,6 +144,9 @@
private volatile Thread.UncaughtExceptionHandler previousHandler = null;
+ {
+ System.setProperty(DTEST_IS_IN_JVM_DTEST, "true");
+ }
protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
{
private final int generation;
@@ -390,6 +395,16 @@
return instances.get(node - 1).coordinator();
}
+ public List<I> get(int... nodes)
+ {
+ if (nodes == null || nodes.length == 0)
+ throw new IllegalArgumentException("No nodes provided");
+ List<I> list = new ArrayList<>(nodes.length);
+ for (int i : nodes)
+ list.add(get(i));
+ return list;
+ }
+
/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java
new file mode 100644
index 0000000..5e67eaf
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.rmi.server.RMIServerSocketFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import javax.net.ServerSocketFactory;
+
+
+/**
+ * This class is used to keep track of RMI servers created during a cluster creation so we can
+ * later close the sockets, which would otherwise be left with a thread running waiting for
+ * connections that would never show up as the server was otherwise closed.
+ */
+class CollectingRMIServerSocketFactoryImpl implements RMIServerSocketFactory
+{
+ private final InetAddress bindAddress;
+ List<ServerSocket> sockets = new ArrayList<>();
+
+ public CollectingRMIServerSocketFactoryImpl(InetAddress bindAddress)
+ {
+ this.bindAddress = bindAddress;
+ }
+
+ @Override
+ public ServerSocket createServerSocket(int pPort) throws IOException
+ {
+ ServerSocket result = ServerSocketFactory.getDefault().createServerSocket(pPort, 0, bindAddress);
+ try
+ {
+ result.setReuseAddress(true);
+ }
+ catch (SocketException e)
+ {
+ result.close();
+ throw e;
+ }
+ sockets.add(result);
+ return result;
+ }
+
+
+ public void close() throws IOException
+ {
+ for (ServerSocket socket : sockets)
+ {
+ socket.close();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CollectingRMIServerSocketFactoryImpl that = (CollectingRMIServerSocketFactoryImpl) o;
+ return Objects.equals(bindAddress, that.bindAddress);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bindAddress);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index fbef883..cca6296 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -44,6 +44,9 @@
import javax.management.Notification;
import javax.management.NotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.SharedExecutorPool;
@@ -123,13 +126,16 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
public class Instance extends IsolatedExecutor implements IInvokableInstance
{
+ private Logger inInstancelogger; // Defer creation until running in the instance context
public final IInstanceConfig config;
private volatile boolean initialized = false;
+ private IsolatedJmx isolatedJmx;
// should never be invoked directly, so that it is instantiated on other class loader;
// only visible for inheritance
@@ -503,6 +509,7 @@
public void startup(ICluster cluster)
{
sync(() -> {
+ inInstancelogger = LoggerFactory.getLogger(Instance.class);
try
{
if (config.has(GOSSIP))
@@ -518,6 +525,9 @@
assert config.networkTopology().contains(config.broadcastAddress());
DistributedTestSnitch.assign(config.networkTopology());
+ if (config.has(JMX))
+ startJmx();
+
DatabaseDescriptor.daemonInitialization();
FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
DatabaseDescriptor.createAllDirectories();
@@ -604,6 +614,20 @@
initialized = true;
}
+ private void startJmx()
+ {
+ isolatedJmx = new IsolatedJmx(this, inInstancelogger);
+ isolatedJmx.startJmx();
+ }
+
+ private void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+ {
+ if (config.has(JMX))
+ {
+ isolatedJmx.stopJmx();
+ }
+ }
+
private void mkdirs()
{
new File(config.getString("saved_caches_directory")).mkdirs();
@@ -758,6 +782,8 @@
CommitLog.instance::shutdownBlocking
);
+ error = parallelRun(error, executor, this::stopJmx);
+
Throwables.maybeFail(error);
}).apply(isolatedExecutor);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index e546962..7f6af4b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -46,6 +46,8 @@
private static final Logger logger = LoggerFactory.getLogger(InstanceConfig.class);
public final int num;
+ private final int jmxPort;
+
public int num() { return num; }
private final NetworkTopology networkTopology;
@@ -72,7 +74,8 @@
String commitlog_directory,
String hints_directory,
String cdc_raw_directory,
- String initial_token)
+ String initial_token,
+ int jmx_port)
{
this.num = num;
this.networkTopology = networkTopology;
@@ -110,6 +113,7 @@
// legacy parameters
.forceSet("commitlog_sync_batch_window_in_ms", 1.0);
this.featureFlags = EnumSet.noneOf(Feature.class);
+ this.jmxPort = jmx_port;
}
private InstanceConfig(InstanceConfig copy)
@@ -121,6 +125,7 @@
this.hostId = copy.hostId;
this.featureFlags = copy.featureFlags;
this.broadcastAddressAndPort = copy.broadcastAddressAndPort;
+ this.jmxPort = copy.jmxPort;
}
@@ -161,6 +166,12 @@
return networkTopology().localDC(broadcastAddress());
}
+ @Override
+ public int jmxPort()
+ {
+ return this.jmxPort;
+ }
+
public InstanceConfig with(Feature featureFlag)
{
featureFlags.add(featureFlag);
@@ -251,7 +262,8 @@
String.format("%s/node%d/commitlog", root, nodeNum),
String.format("%s/node%d/hints", root, nodeNum),
String.format("%s/node%d/cdc", root, nodeNum),
- token);
+ token,
+ 7199);
}
private static String[] datadirs(int datadirCount, File root, int nodeNum)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index 53c1ad5..2b4fc87 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -52,7 +52,7 @@
{
final ExecutorService isolatedExecutor;
private final String name;
- private final ClassLoader classLoader;
+ final ClassLoader classLoader;
private final Method deserializeOnInstance;
IsolatedExecutor(String name, ClassLoader classLoader)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
new file mode 100644
index 0000000..b3d0659
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+
+import org.slf4j.Logger;
+
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.utils.JMXServerUtils;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.RMIClientSocketFactoryImpl;
+import sun.rmi.transport.tcp.TCPEndpoint;
+
+import static org.apache.cassandra.distributed.api.Feature.JMX;
+import static org.apache.cassandra.utils.MBeanWrapper.IS_DISABLED_MBEAN_REGISTRATION;
+
+public class IsolatedJmx
+{
+ /** Controls the JMX server threadpool keap-alive time. */
+ private static final String SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME = "sun.rmi.transport.tcp.threadKeepAliveTime";
+ /** Controls the distributed garbage collector lease time for JMX objects. */
+ private static final String JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST ="java.rmi.dgc.leaseValue";
+ private static final int RMI_KEEPALIVE_TIME = 1000;
+
+ private JMXConnectorServer jmxConnectorServer;
+ private JMXServerUtils.JmxRegistry registry;
+ private RMIJRMPServerImpl jmxRmiServer;
+ private MBeanWrapper.InstanceMBeanWrapper wrapper;
+ private RMIClientSocketFactoryImpl clientSocketFactory;
+ private CollectingRMIServerSocketFactoryImpl serverSocketFactory;
+ private Logger inInstancelogger;
+ private IInstanceConfig config;
+
+ public IsolatedJmx(Instance instance, Logger inInstancelogger) {
+ this.inInstancelogger = inInstancelogger;
+ config = instance.config();
+ }
+
+ public void startJmx()
+ {
+ try
+ {
+ // Several RMI threads hold references to in-jvm dtest objects, and are, by default, kept
+ // alive for long enough (minutes) to keep classloaders from being collected.
+ // Set these two system properties to a low value to allow cleanup to occur fast enough
+ // for GC to collect our classloaders.
+ System.setProperty(JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST, String.valueOf(RMI_KEEPALIVE_TIME));
+ System.setProperty(SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME, String.valueOf(RMI_KEEPALIVE_TIME));
+ System.setProperty(IS_DISABLED_MBEAN_REGISTRATION, "false");
+ InetAddress addr = config.broadcastAddress().getAddress();
+
+ int jmxPort = config.jmxPort();
+
+ String hostname = addr.getHostAddress();
+ wrapper = new MBeanWrapper.InstanceMBeanWrapper(hostname + ":" + jmxPort);
+ ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(wrapper);
+ Map<String, Object> env = new HashMap<>();
+
+ serverSocketFactory = new CollectingRMIServerSocketFactoryImpl(addr);
+ env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE,
+ serverSocketFactory);
+ clientSocketFactory = new RMIClientSocketFactoryImpl(addr);
+ env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE,
+ clientSocketFactory);
+
+ // configure the RMI registry
+ registry = new JMXServerUtils.JmxRegistry(jmxPort,
+ clientSocketFactory,
+ serverSocketFactory,
+ "jmxrmi");
+
+ // Mark the JMX server as a permanently exported object. This allows the JVM to exit with the
+ // server running and also exempts it from the distributed GC scheduler which otherwise would
+ // potentially attempt a full GC every `sun.rmi.dgc.server.gcInterval` millis (default is 3600000ms)
+ // For more background see:
+ // - CASSANDRA-2967
+ // - https://www.jclarity.com/2015/01/27/rmi-system-gc-unplugged/
+ // - https://bugs.openjdk.java.net/browse/JDK-6760712
+ env.put("jmx.remote.x.daemon", "true");
+
+ // Set the port used to create subsequent connections to exported objects over RMI. This simplifies
+ // configuration in firewalled environments, but it can't be used in conjuction with SSL sockets.
+ // See: CASSANDRA-7087
+ int rmiPort = config.jmxPort();
+
+ // We create the underlying RMIJRMPServerImpl so that we can manually bind it to the registry,
+ // rather then specifying a binding address in the JMXServiceURL and letting it be done automatically
+ // when the server is started. The reason for this is that if the registry is configured with SSL
+ // sockets, the JMXConnectorServer acts as its client during the binding which means it needs to
+ // have a truststore configured which contains the registry's certificate. Manually binding removes
+ // this problem.
+ // See CASSANDRA-12109.
+ jmxRmiServer = new RMIJRMPServerImpl(rmiPort, clientSocketFactory, serverSocketFactory,
+ env);
+ JMXServiceURL serviceURL = new JMXServiceURL("rmi", hostname, rmiPort);
+ jmxConnectorServer = new RMIConnectorServer(serviceURL, env, jmxRmiServer, wrapper.getMBeanServer());
+
+ jmxConnectorServer.start();
+
+ registry.setRemoteServerStub(jmxRmiServer.toStub());
+ JMXServerUtils.logJmxServiceUrl(addr, jmxPort);
+ waitForJmxAvailability(hostname, jmxPort, env);
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException("Feature.JMX was enabled but could not be started.", e);
+ }
+ }
+
+
+ private void waitForJmxAvailability(String hostname, int rmiPort, Map<String, Object> env) throws InterruptedException, MalformedURLException
+ {
+ String url = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname, rmiPort);
+ JMXServiceURL serviceURL = new JMXServiceURL(url);
+ int attempts = 0;
+ Throwable lastThrown = null;
+ while (attempts < 20)
+ {
+ attempts++;
+ try (JMXConnector ignored = JMXConnectorFactory.connect(serviceURL, env))
+ {
+ inInstancelogger.info("Connected to JMX server at {} after {} attempt(s)",
+ url, attempts);
+ return;
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (Throwable thrown)
+ {
+ lastThrown = thrown;
+ }
+ inInstancelogger.info("Could not connect to JMX on {} after {} attempts. Will retry.", url, attempts);
+ Thread.sleep(1000);
+ }
+ throw new RuntimeException("Could not start JMX - unreachable after 20 attempts", lastThrown);
+ }
+
+ public void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+ {
+ if (!config.has(JMX))
+ return;
+ // First, swap the mbean wrapper back to a NoOp wrapper
+ // This prevents later attempts to unregister mbeans from failing in Cassandra code, as we're going to
+ // unregister all of them here
+ ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(new MBeanWrapper.NoOpMBeanWrapper());
+ try
+ {
+ wrapper.close();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close wrapper.", e);
+ }
+ try
+ {
+ jmxConnectorServer.stop();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close jmxConnectorServer.", e);
+ }
+ try
+ {
+ registry.close();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close registry.", e);
+ }
+ try
+ {
+ serverSocketFactory.close();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close serverSocketFactory.", e);
+ }
+ // The TCPEndpoint class holds references to a class in the in-jvm dtest framework
+ // which transitively has a reference to the InstanceClassLoader, so we need to
+ // make sure to remove the reference to them when the instance is shutting down
+ clearMapField(TCPEndpoint.class, null, "localEndpoints");
+ Thread.sleep(2 * RMI_KEEPALIVE_TIME); // Double the keep-alive time to give Distributed GC some time to clean up
+ }
+
+ private <K, V> void clearMapField(Class<?> clazz, Object instance, String mapName)
+ throws IllegalAccessException, NoSuchFieldException {
+ Field mapField = clazz.getDeclaredField(mapName);
+ mapField.setAccessible(true);
+ Map<K, V> map = (Map<K, V>) mapField.get(instance);
+ // Because multiple instances can be shutting down at once,
+ // synchronize on the map to avoid ConcurrentModificationException
+ synchronized (map)
+ {
+ for (Iterator<Map.Entry<K, V>> it = map.entrySet().iterator(); it.hasNext(); )
+ {
+ it.next();
+ it.remove();
+ }
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index 1c4850a..177cbb9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -28,7 +28,10 @@
import java.time.Instant;
import java.util.function.Consumer;
import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
+import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -36,14 +39,19 @@
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SigarLibrary;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.hamcrest.Matchers.startsWith;
/* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
* All objects referencing the InstanceClassLoader need to be garbage collected or
@@ -142,6 +150,11 @@
void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws Throwable
{
+ doTest(numClusterNodes, updater, ignored -> {});
+ }
+
+ void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater, Consumer<Cluster> actionToPerform) throws Throwable
+ {
for (int loop = 0; loop < numTestLoops; loop++)
{
System.out.println(String.format("========== Starting loop %03d ========", loop));
@@ -155,6 +168,7 @@
cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." + tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL);
cluster.get(1).callOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(KEYSPACE).flush()));
+ actionToPerform.accept(cluster);
if (dumpEveryLoop)
{
dumpResources(String.format("loop%03d", loop));
@@ -213,4 +227,50 @@
}
dumpResources("final-native");
}
+
+ @Test
+ public void looperJmxTest() throws Throwable
+ {
+ doTest(1, config -> config.with(JMX), cluster -> {
+ // NOTE: At some point, the hostname of the broadcastAddress can be resolved
+ // and then the `getHostString`, which would otherwise return the IP address,
+ // starts returning `localhost` - use `.getAddress().getHostAddress()` to work around this.
+ for (IInvokableInstance instance:cluster.get(1, cluster.size()))
+ {
+ IInstanceConfig config = instance.config();
+ try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
+ {
+ MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+ // instances get their default domain set to their IP address, so us it
+ // to check that we are actually connecting to the correct instance
+ String defaultDomain = mbsc.getDefaultDomain();
+ Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ if (forceCollection)
+ {
+ System.runFinalization();
+ System.gc();
+ Thread.sleep(finalWaitMillis);
+ }
+ dumpResources("final-jmx");
+ }
+
+ @Test
+ public void looperEverythingTest() throws Throwable
+ {
+ doTest(1, config -> config.with(Feature.values()));
+ if (forceCollection)
+ {
+ System.runFinalization();
+ System.gc();
+ Thread.sleep(finalWaitMillis);
+ }
+ dumpResources("final-everything");
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
new file mode 100644
index 0000000..1c38bd1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.jmx;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.hamcrest.Matchers.startsWith;
+
+public class JMXFeatureTest extends TestBaseImpl
+{
+ /**
+ * Test the in-jvm dtest JMX feature.
+ * - Create a cluster with multiple JMX servers, one per instance
+ * - Test that when connecting, we get the correct MBeanServer by checking the default domain, which is set to the IP of the instance
+ * - Run the test multiple times to ensure cleanup of the JMX servers is complete so the next test can run successfully using the same host/port.
+ * NOTE: In later versions of Cassandra, there is also a `testOneNetworkInterfaceProvisioning` that leverages the ability to specify
+ * ports in addition to IP/Host for binding, but this version does not support that feature. Keeping the test name the same
+ * so that it's consistent across versions.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMultipleNetworkInterfacesProvisioning() throws Exception
+ {
+ int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times.
+ Set<String> allInstances = new HashSet<>();
+ for (int i = 0; i < iterations; i++)
+ {
+ try (Cluster cluster = Cluster.build(2).withConfig(c -> c.with(Feature.values())).start())
+ {
+ Set<String> instancesContacted = new HashSet<>();
+ for (IInvokableInstance instance : cluster.get(1, 2))
+ {
+ testInstance(instancesContacted, instance);
+ }
+ Assert.assertEquals("Should have connected with both JMX instances.", 2, instancesContacted.size());
+ allInstances.addAll(instancesContacted);
+ }
+ }
+ Assert.assertEquals("Each instance from each cluster should have been unique", iterations * 2, allInstances.size());
+ }
+
+ private void testInstance(Set<String> instancesContacted, IInvokableInstance instance) throws IOException
+ {
+ IInstanceConfig config = instance.config();
+ try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
+ {
+ MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+ // instances get their default domain set to their IP address, so us it
+ // to check that we are actually connecting to the correct instance
+ String defaultDomain = mbsc.getDefaultDomain();
+ instancesContacted.add(defaultDomain);
+ Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
new file mode 100644
index 0000000..a3f74df
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.jmx;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import javax.management.JMRuntimeException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+public class JMXGetterCheckTest extends TestBaseImpl
+{
+ private static final Set<String> IGNORE_ATTRIBUTES = ImmutableSet.of(
+ "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost" // throws unsupported saying the feature was removed... dropped in CASSANDRA-15375
+ );
+ private static final Set<String> IGNORE_OPERATIONS = ImmutableSet.of(
+ "org.apache.cassandra.db:type=StorageService:stopDaemon", // halts the instance, which then causes the JVM to exit
+ "org.apache.cassandra.db:type=StorageService:drain", // don't drain, it stops things which can cause other APIs to be unstable as we are in a stopped state
+ "org.apache.cassandra.db:type=StorageService:stopGossiping", // if we stop gossip this can cause other issues, so avoid
+ "org.apache.cassandra.db:type=StorageService:resetLocalSchema", // this will fail when there are no other nodes which can serve schema
+ "org.apache.cassandra.db:type=HintedHandoffManager:listEndpointsPendingHints", // this will fail because it only exists to match an old, deprecated mbean and just throws an UnsportedOperationException
+ "org.apache.cassandra.db:type=StorageService:decommission" // Don't decommission nodes! Note that in future versions of C* this is unnecessary because decommission takes an argument.
+ );
+
+ public static final String JMX_SERVICE_URL_FMT = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+
+ @Test
+ public void testGetters() throws Exception
+ {
+ try (Cluster cluster = Cluster.build(1).withConfig(c -> c.with(Feature.values())).start())
+ {
+ IInvokableInstance instance = cluster.get(1);
+
+ String jmxHost = instance.config().broadcastAddress().getAddress().getHostAddress();
+ String url = String.format(JMX_SERVICE_URL_FMT, jmxHost, instance.config().jmxPort());
+ List<Named> errors = new ArrayList<>();
+ try (JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(url), null))
+ {
+ MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+ Set<ObjectName> metricNames = new TreeSet<>(mbsc.queryNames(null, null));
+ for (ObjectName name : metricNames)
+ {
+ if (!name.getDomain().startsWith("org.apache.cassandra"))
+ continue;
+ MBeanInfo info = mbsc.getMBeanInfo(name);
+ for (MBeanAttributeInfo a : info.getAttributes())
+ {
+ String fqn = String.format("%s:%s", name, a.getName());
+ if (!a.isReadable() || IGNORE_ATTRIBUTES.contains(fqn))
+ continue;
+ try
+ {
+ mbsc.getAttribute(name, a.getName());
+ }
+ catch (JMRuntimeException e)
+ {
+ errors.add(new Named(String.format("Attribute %s", fqn), e.getCause()));
+ }
+ }
+
+ for (MBeanOperationInfo o : info.getOperations())
+ {
+ String fqn = String.format("%s:%s", name, o.getName());
+ if (o.getSignature().length != 0 || IGNORE_OPERATIONS.contains(fqn))
+ continue;
+ try
+ {
+ mbsc.invoke(name, o.getName(), new Object[0], new String[0]);
+ }
+ catch (JMRuntimeException e)
+ {
+ errors.add(new Named(String.format("Operation %s", fqn), e.getCause()));
+ }
+ }
+ }
+ }
+ if (!errors.isEmpty())
+ {
+ AssertionError root = new AssertionError();
+ errors.forEach(root::addSuppressed);
+ throw root;
+ }
+ }
+ }
+
+ /**
+ * This class is meant to make new errors easier to read, by adding the JMX endpoint, and cleaning up the unneeded JMX/Reflection logic cluttering the stacktrace
+ */
+ private static class Named extends RuntimeException
+ {
+ public Named(String msg, Throwable cause)
+ {
+ super(msg + "\nCaused by: " + cause.getClass().getCanonicalName() + ": " + cause.getMessage(), cause.getCause());
+ StackTraceElement[] stack = cause.getStackTrace();
+ List<StackTraceElement> copy = new ArrayList<>();
+ for (StackTraceElement s : stack)
+ {
+ if (!s.getClassName().startsWith("org.apache.cassandra"))
+ break;
+ copy.add(s);
+ }
+ Collections.reverse(copy);
+ setStackTrace(copy.toArray(new StackTraceElement[0]));
+ }
+ }
+}