Merge branch 'cassandra-4.1' into trunk
diff --git a/.build/parent-pom-template.xml b/.build/parent-pom-template.xml
index 7200bb4..e4b63bf 100644
--- a/.build/parent-pom-template.xml
+++ b/.build/parent-pom-template.xml
@@ -489,7 +489,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>dtest-api</artifactId>
- <version>0.0.13</version>
+ <version>0.0.15</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/build.xml b/build.xml
index a3ef1fc..3e0a8e9 100644
--- a/build.xml
+++ b/build.xml
@@ -207,6 +207,7 @@
<string>--add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED</string>
<string>--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED</string>
<string>--add-exports java.rmi/sun.rmi.server=ALL-UNNAMED</string>
+ <string>--add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED</string>
<string>--add-exports java.sql/java.sql=ALL-UNNAMED</string>
<string>--add-opens java.base/java.lang.module=ALL-UNNAMED</string>
@@ -237,6 +238,7 @@
<string>--add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED</string>
<string>--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED</string>
<string>--add-exports java.rmi/sun.rmi.server=ALL-UNNAMED</string>
+ <string>--add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED</string>
<string>--add-exports java.sql/java.sql=ALL-UNNAMED</string>
<string>--add-exports java.base/java.lang.ref=ALL-UNNAMED</string>
<string>--add-exports jdk.unsupported/sun.misc=ALL-UNNAMED</string>
@@ -263,6 +265,8 @@
<string>--add-opens java.base/java.math=ALL-UNNAMED</string>
<string>--add-opens java.base/java.lang.reflect=ALL-UNNAMED</string>
<string>--add-opens java.base/java.net=ALL-UNNAMED</string>
+
+ <string>--add-opens java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED</string>
</resources>
<pathconvert property="_jvm_args_concat2" refid="_jvm17_arg_items" pathsep=" "/>
<condition property="java17-jvmargs" value="${_jvm_args_concat2}" else="">
@@ -307,7 +311,8 @@
</condition>
<!-- needed to compile org.apache.cassandra.utils.JMXServerUtils -->
- <condition property="jdk11plus-javac-exports" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED" else="">
+ <!-- needed to compile org.apache.cassandra.distributed.impl.Instance-->
+ <condition property="jdk11plus-javac-exports" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED" else="">
<not>
<equals arg1="${ant.java.version}" arg2="1.8"/>
</not>
@@ -1035,6 +1040,7 @@
<src path="${test.simulator-asm.src}"/>
<src path="${test.simulator-bootstrap.src}"/>
<src path="${test.simulator-test.src}"/>
+ <compilerarg line="${jdk11plus-javac-exports}"/>
</javac>
<!-- Non-java resources needed by the test suite -->
@@ -1811,6 +1817,24 @@
<echo>"IDE configuration updated for use with JDK11"</echo>
</target>
+ <target name="_maybe_update_idea_to_java17" depends="init" if="java.version.17">
+ <replace file="${eclipse.project.name}.iml" token="JDK_1_8" value="JDK_17"/>
+ <replace file=".idea/misc.xml" token="JDK_1_8" value="JDK_17"/>
+ <replace file=".idea/misc.xml" token="1.8" value="17"/>
+ <replaceregexp file=".idea/workspace.xml"
+ match="name="VM_PARAMETERS" value="(.*)"
+ replace="name="VM_PARAMETERS" value="\1 ${java17-jvmargs}"
+ byline="true"/>
+
+ <echo file=".idea/compiler.xml"><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="JavacSettings">
+ <option name="ADDITIONAL_OPTIONS_STRING" value="--add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED" />
+ </component>
+</project>]]></echo>
+ <echo>"IDE configuration updated for use with JDK17"</echo>
+ </target>
+
<!-- Generate IDEA project description files -->
<target name="generate-idea-files" depends="init,resolver-dist-lib,gen-cql3-grammar,generate-jflex-java,createVersionPropFile" description="Generate IDEA files">
<mkdir dir=".idea"/>
@@ -1831,6 +1855,7 @@
</component>
</project>]]></echo>
<antcall target="_maybe_update_idea_to_java11"/>
+ <antcall target="_maybe_update_idea_to_java17"/>
</target>
<!-- Generate Eclipse project description files -->
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 78e1992..4daa825 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -200,6 +200,8 @@
DRAIN_EXECUTOR_TIMEOUT_MS("cassandra.drain_executor_timeout_ms", convertToString(TimeUnit.MINUTES.toMillis(5))),
DROP_OVERSIZED_READ_REPAIR_MUTATIONS("cassandra.drop_oversized_readrepair_mutations"),
DTEST_API_LOG_TOPOLOGY("cassandra.dtest.api.log.topology"),
+ /** This property indicates if the code is running under the in-jvm dtest framework */
+ DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"),
ENABLE_DC_LOCAL_COMMIT("cassandra.enable_dc_local_commit", "true"),
/**
* Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed.
@@ -263,6 +265,11 @@
*/
JAVA_LIBRARY_PATH("java.library.path"),
/**
+ * Controls the distributed garbage collector lease time for JMX objects.
+ * Should only be set by in-jvm dtests.
+ */
+ JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST("java.rmi.dgc.leaseValue"),
+ /**
* The value of this property represents the host name string
* that should be associated with remote stubs for locally created remote objects,
* in order to allow clients to invoke methods on the remote object.
@@ -421,6 +428,11 @@
/** Platform word size sun.arch.data.model. Examples: "32", "64", "unknown"*/
SUN_ARCH_DATA_MODEL("sun.arch.data.model"),
SUN_JAVA_COMMAND("sun.java.command", ""),
+ /**
+ * Controls the JMX server threadpool keap-alive time.
+ * Should only be set by in-jvm dtests.
+ */
+ SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME("sun.rmi.transport.tcp.threadKeepAliveTime"),
SUN_STDERR_ENCODING("sun.stderr.encoding"),
SUN_STDOUT_ENCODING("sun.stdout.encoding"),
SUPERUSER_SETUP_DELAY_MS("cassandra.superuser_setup_delay_ms", "10000"),
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index c290d9e..8f92215 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -148,18 +148,16 @@
public GCInspector()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
try
{
ObjectName gcName = new ObjectName(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*");
- for (ObjectName name : mbs.queryNames(gcName, null))
+ for (ObjectName name : MBeanWrapper.instance.queryNames(gcName, null))
{
- GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(mbs, name.getCanonicalName(), GarbageCollectorMXBean.class);
+ GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(MBeanWrapper.instance.getMBeanServer(), name.getCanonicalName(), GarbageCollectorMXBean.class);
gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc)));
}
ObjectName me = new ObjectName(MBEAN_NAME);
- if (!mbs.isRegistered(me))
+ if (!MBeanWrapper.instance.isRegistered(me))
MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME));
}
catch (MalformedObjectNameException | IOException e)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1d4a0b1..5e66718 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -987,7 +987,8 @@
completeInitialization();
}
- private void completeInitialization()
+ @VisibleForTesting
+ public void completeInitialization()
{
if (!initialized)
registerMBeans();
diff --git a/src/java/org/apache/cassandra/utils/JMXServerUtils.java b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
index ba9c704..45caf6e 100644
--- a/src/java/org/apache/cassandra/utils/JMXServerUtils.java
+++ b/src/java/org/apache/cassandra/utils/JMXServerUtils.java
@@ -29,12 +29,14 @@
import java.net.InetAddress;
import java.rmi.AccessException;
import java.rmi.AlreadyBoundException;
+import java.rmi.NoSuchObjectException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.Registry;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMIServerSocketFactory;
+import java.rmi.server.UnicastRemoteObject;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -257,7 +259,8 @@
return env;
}
- private static void logJmxServiceUrl(InetAddress serverAddress, int port)
+ @VisibleForTesting
+ public static void logJmxServiceUrl(InetAddress serverAddress, int port)
{
String urlTemplate = "service:jmx:rmi://%1$s/jndi/rmi://%1$s:%2$d/jmxrmi";
String hostName;
@@ -330,11 +333,11 @@
* Better to use the internal API than re-invent the wheel.
*/
@SuppressWarnings("restriction")
- private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
+ public static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
private final String lookupName;
private Remote remoteServerStub;
- JmxRegistry(final int port,
+ public JmxRegistry(final int port,
final RMIClientSocketFactory csf,
RMIServerSocketFactory ssf,
final String lookupName) throws RemoteException {
@@ -367,5 +370,24 @@
public void setRemoteServerStub(Remote remoteServerStub) {
this.remoteServerStub = remoteServerStub;
}
+
+ /**
+ * Closes the underlying JMX registry by unexporting this instance.
+ * There is no reason to do this except for in-jvm dtests where we need
+ * to stop the registry, so we can start with a clean slate for future cluster
+ * builds, and the superclass never expects to be shut down and therefore doesn't
+ * handle this edge case at all.
+ */
+ @VisibleForTesting
+ public void close() {
+ try
+ {
+ UnicastRemoteObject.unexportObject(this, true);
+ }
+ catch (NoSuchObjectException ignored)
+ {
+ // Ignore if it's already unexported
+ }
+ }
}
}
diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
index f003e1a..0ef342d 100644
--- a/src/java/org/apache/cassandra/utils/MBeanWrapper.java
+++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
@@ -19,15 +19,21 @@
package org.apache.cassandra.utils;
import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
import java.util.function.Consumer;
import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.management.QueryExp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION;
+import static org.apache.cassandra.config.CassandraRelevantProperties.DTEST_IS_IN_JVM_DTEST;
import static org.apache.cassandra.config.CassandraRelevantProperties.MBEAN_REGISTRATION_CLASS;
/**
@@ -36,23 +42,46 @@
*/
public interface MBeanWrapper
{
- static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
+ Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
- static final MBeanWrapper instance = create();
+ MBeanWrapper instance = create();
static MBeanWrapper create()
{
+ // If we're running in the in-jvm dtest environment, always use the delegating
+ // mbean wrapper even if we start off with no-op, so it can be switched later
+ if (DTEST_IS_IN_JVM_DTEST.getBoolean())
+ {
+ return new DelegatingMbeanWrapper(getMBeanWrapper());
+ }
+
+ return getMBeanWrapper();
+ }
+
+ static MBeanWrapper getMBeanWrapper()
+ {
if (ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.getBoolean())
+ {
return new NoOpMBeanWrapper();
+ }
String klass = MBEAN_REGISTRATION_CLASS.getString();
if (klass == null)
- return new PlatformMBeanWrapper();
+ {
+ if (DTEST_IS_IN_JVM_DTEST.getBoolean())
+ {
+ return new NoOpMBeanWrapper();
+ }
+ else
+ {
+ return new PlatformMBeanWrapper();
+ }
+ }
return FBUtilities.construct(klass, "mbean");
}
// Passing true for graceful will log exceptions instead of rethrowing them
- public void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+ void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
default void registerMBean(Object obj, ObjectName mbeanName)
{
registerMBean(obj, mbeanName, OnException.THROW);
@@ -62,7 +91,9 @@
{
ObjectName name = create(mbeanName, onException);
if (name == null)
+ {
return;
+ }
registerMBean(obj, name, onException);
}
default void registerMBean(Object obj, String mbeanName)
@@ -70,7 +101,7 @@
registerMBean(obj, mbeanName, OnException.THROW);
}
- public boolean isRegistered(ObjectName mbeanName, OnException onException);
+ boolean isRegistered(ObjectName mbeanName, OnException onException);
default boolean isRegistered(ObjectName mbeanName)
{
return isRegistered(mbeanName, OnException.THROW);
@@ -80,7 +111,9 @@
{
ObjectName name = create(mbeanName, onException);
if (name == null)
+ {
return false;
+ }
return isRegistered(name, onException);
}
default boolean isRegistered(String mbeanName)
@@ -88,7 +121,7 @@
return isRegistered(mbeanName, OnException.THROW);
}
- public void unregisterMBean(ObjectName mbeanName, OnException onException);
+ void unregisterMBean(ObjectName mbeanName, OnException onException);
default void unregisterMBean(ObjectName mbeanName)
{
unregisterMBean(mbeanName, OnException.THROW);
@@ -98,7 +131,9 @@
{
ObjectName name = create(mbeanName, onException);
if (name == null)
+ {
return;
+ }
unregisterMBean(name, onException);
}
default void unregisterMBean(String mbeanName)
@@ -119,7 +154,11 @@
}
}
- static class NoOpMBeanWrapper implements MBeanWrapper
+ Set<ObjectName> queryNames(ObjectName name, QueryExp query);
+
+ MBeanServer getMBeanServer();
+
+ class NoOpMBeanWrapper implements MBeanWrapper
{
public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {}
public void registerMBean(Object obj, String mbeanName, OnException onException) {}
@@ -127,9 +166,11 @@
public boolean isRegistered(String mbeanName, OnException onException) { return false; }
public void unregisterMBean(ObjectName mbeanName, OnException onException) {}
public void unregisterMBean(String mbeanName, OnException onException) {}
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {return Collections.emptySet(); }
+ public MBeanServer getMBeanServer() { return null; }
}
- static class PlatformMBeanWrapper implements MBeanWrapper
+ class PlatformMBeanWrapper implements MBeanWrapper
{
private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
@@ -168,9 +209,158 @@
onException.handler.accept(e);
}
}
+
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+ {
+ return mbs.queryNames(name, query);
+ }
+
+ public MBeanServer getMBeanServer()
+ {
+ return mbs;
+ }
}
- public enum OnException
+ class InstanceMBeanWrapper implements MBeanWrapper
+ {
+ private MBeanServer mbs;
+ public final UUID id = UUID.randomUUID();
+
+ public InstanceMBeanWrapper(String hostname)
+ {
+ mbs = MBeanServerFactory.createMBeanServer(hostname + "-" + id);
+ }
+
+ public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ mbs.registerMBean(obj, mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ }
+
+ public boolean isRegistered(ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ return mbs.isRegistered(mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ return false;
+ }
+
+ public void unregisterMBean(ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ mbs.unregisterMBean(mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ }
+
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+ {
+ return mbs.queryNames(name, query);
+ }
+
+ public MBeanServer getMBeanServer()
+ {
+ return mbs;
+ }
+
+ public void close() {
+ mbs.queryNames(null, null).forEach(name -> {
+ try {
+ if (!name.getCanonicalName().contains("MBeanServerDelegate"))
+ {
+ mbs.unregisterMBean(name);
+ }
+ } catch (Throwable e) {
+ logger.debug("Could not unregister mbean {}", name.getCanonicalName());
+ }
+ });
+ MBeanServerFactory.releaseMBeanServer(mbs);
+ mbs = null;
+ }
+ }
+
+ class DelegatingMbeanWrapper implements MBeanWrapper
+ {
+ MBeanWrapper delegate;
+
+ public DelegatingMbeanWrapper(MBeanWrapper mBeanWrapper)
+ {
+ delegate = mBeanWrapper;
+ }
+
+ public void setDelegate(MBeanWrapper wrapper) {
+ delegate = wrapper;
+ }
+
+ public MBeanWrapper getDelegate()
+ {
+ return delegate;
+ }
+
+ public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ delegate.registerMBean(obj, mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ }
+
+ public boolean isRegistered(ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ return delegate.isRegistered(mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ return false;
+ }
+
+ public void unregisterMBean(ObjectName mbeanName, OnException onException)
+ {
+ try
+ {
+ delegate.unregisterMBean(mbeanName);
+ }
+ catch (Exception e)
+ {
+ onException.handler.accept(e);
+ }
+ }
+
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+ {
+ return delegate.queryNames(name, query);
+ }
+
+ public MBeanServer getMBeanServer()
+ {
+ return delegate.getMBeanServer();
+ }
+ }
+
+ enum OnException
{
THROW(e -> { throw new RuntimeException(e); }),
LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
diff --git a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
new file mode 100644
index 0000000..62ab88f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.rmi.server.RMIClientSocketFactory;
+import java.util.Objects;
+
+/**
+ * This class is used to override the local address the JMX client calculates when trying to connect,
+ * which can otherwise be influenced by the system property "java.rmi.server.hostname" in strange and
+ * unpredictable ways.
+ */
+public class RMIClientSocketFactoryImpl implements RMIClientSocketFactory, Serializable
+{
+ private final InetAddress localAddress;
+
+ public RMIClientSocketFactoryImpl(InetAddress localAddress)
+ {
+ this.localAddress = localAddress;
+ }
+
+ @Override
+ public Socket createSocket(String host, int port) throws IOException
+ {
+ return new Socket(localAddress, port);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RMIClientSocketFactoryImpl that = (RMIClientSocketFactoryImpl) o;
+ return Objects.equals(localAddress, that.localAddress);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(localAddress);
+ }
+}
diff --git a/src/java/org/apache/cassandra/utils/ReflectionUtils.java b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
index 801256d..bf6ad2c 100644
--- a/src/java/org/apache/cassandra/utils/ReflectionUtils.java
+++ b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
@@ -30,10 +30,15 @@
public static Field getModifiersField() throws NoSuchFieldException
{
+ return getField(Field.class, "modifiers");
+ }
+
+ public static Field getField(Class<?> clazz, String fieldName) throws NoSuchFieldException
+ {
// below code works before Java 12
try
{
- return Field.class.getDeclaredField("modifiers");
+ return clazz.getDeclaredField(fieldName);
}
catch (NoSuchFieldException e)
{
@@ -42,10 +47,10 @@
{
Method getDeclaredFields0 = Class.class.getDeclaredMethod("getDeclaredFields0", boolean.class);
getDeclaredFields0.setAccessible(true);
- Field[] fields = (Field[]) getDeclaredFields0.invoke(Field.class, false);
+ Field[] fields = (Field[]) getDeclaredFields0.invoke(clazz, false);
for (Field field : fields)
{
- if ("modifiers".equals(field.getName()))
+ if (fieldName.equals(field.getName()))
{
return field;
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 78b14d2..79120a2 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -186,6 +186,8 @@
private ShutdownExecutor shutdownExecutor = DEFAULT_SHUTDOWN_EXECUTOR;
{
+ // Indicate that we are running in the in-jvm dtest environment
+ CassandraRelevantProperties.DTEST_IS_IN_JVM_DTEST.setBoolean(true);
// those properties may be set for unit-test optimizations; those should not be used when running dtests
CassandraRelevantProperties.TEST_FLUSH_LOCAL_SCHEMA_CHANGES.reset();
CassandraRelevantProperties.NON_GRACEFUL_SHUTDOWN.reset();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java
new file mode 100644
index 0000000..0fc7425
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.rmi.server.RMIServerSocketFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import javax.net.ServerSocketFactory;
+
+
+/**
+ * This class is used to keep track of RMI servers created during a cluster creation so we can
+ * later close the sockets, which would otherwise be left with a thread running waiting for
+ * connections that would never show up as the server was otherwise closed.
+ */
+class CollectingRMIServerSocketFactoryImpl implements RMIServerSocketFactory
+{
+ private final InetAddress bindAddress;
+ List<ServerSocket> sockets = new ArrayList<>();
+
+ public CollectingRMIServerSocketFactoryImpl(InetAddress bindAddress)
+ {
+ this.bindAddress = bindAddress;
+ }
+
+ @Override
+ public ServerSocket createServerSocket(int pPort) throws IOException
+ {
+ ServerSocket result = ServerSocketFactory.getDefault().createServerSocket(pPort, 0, bindAddress);
+ try
+ {
+ result.setReuseAddress(true);
+ }
+ catch (SocketException e)
+ {
+ result.close();
+ throw e;
+ }
+ sockets.add(result);
+ return result;
+ }
+
+
+ public void close() throws IOException
+ {
+ for (ServerSocket socket : sockets)
+ {
+ socket.close();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CollectingRMIServerSocketFactoryImpl that = (CollectingRMIServerSocketFactoryImpl) o;
+ return Objects.equals(bindAddress, that.bindAddress);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bindAddress);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
index 1ec844e..99ef272 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
@@ -56,6 +56,11 @@
{
return 9041 + nodeNum;
}
+
+ public int jmxPort(int nodeNum)
+ {
+ return 7199 + nodeNum;
+ }
};
}
},
@@ -89,6 +94,11 @@
{
return 9042;
}
+
+ public int jmxPort(int nodeNum)
+ {
+ return 7199;
+ }
};
}
};
@@ -100,4 +110,5 @@
abstract String ipAddress(int nodeNum);
abstract int storagePort(int nodeNum);
abstract int nativeTransportPort(int nodeNum);
+ abstract int jmxPort(int nodeNum);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 435de65..707da49 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -160,6 +160,7 @@
import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_CASSANDRA_TESTTAG;
import static org.apache.cassandra.distributed.api.Feature.BLANK_GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.fromCassandraInetAddressAndPort;
@@ -177,6 +178,7 @@
private volatile boolean initialized = false;
private volatile boolean internodeMessagingStarted = false;
private final AtomicLong startedAt = new AtomicLong();
+ private IsolatedJmx isolatedJmx;
@Deprecated
Instance(IInstanceConfig config, ClassLoader classLoader)
@@ -603,6 +605,9 @@
config.networkTopology(), config.broadcastAddress());
DistributedTestSnitch.assign(config.networkTopology());
+ if (config.has(JMX))
+ startJmx();
+
DatabaseDescriptor.daemonInitialization();
LoggingSupportFactory.getLoggingSupport().onStartup();
@@ -723,6 +728,7 @@
StorageService.instance.setNormalModeUnsafe();
Gossiper.instance.register(StorageService.instance);
StorageService.instance.startSnapshotManager();
+ StorageService.instance.completeInitialization();
}
// Populate tokenMetadata for the second time,
@@ -759,6 +765,20 @@
initialized = true;
}
+ private void startJmx()
+ {
+ this.isolatedJmx = new IsolatedJmx(this, inInstancelogger);
+ isolatedJmx.startJmx();
+ }
+
+ private void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+ {
+ if (config.has(JMX))
+ {
+ isolatedJmx.stopJmx();
+ }
+ }
+
// Update the messaging versions for all instances
// that have initialized their configurations.
private static void propagateMessagingVersions(ICluster cluster)
@@ -900,6 +920,8 @@
// ScheduledExecutors shuts down after MessagingService, as MessagingService may issue tasks to it.
error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownNowAndWait(1L, MINUTES));
+
+ error = parallelRun(error, executor, this::stopJmx);
// Make sure any shutdown hooks registered for DeleteOnExit are released to prevent
// references to the instance class loaders from being held
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 92c56d6..3c515d5 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -41,6 +41,8 @@
public class InstanceConfig implements IInstanceConfig
{
public final int num;
+ private final int jmxPort;
+
public int num() { return num; }
private final NetworkTopology networkTopology;
@@ -71,7 +73,8 @@
String cdc_raw_directory,
Collection<String> initial_token,
int storage_port,
- int native_transport_port)
+ int native_transport_port,
+ int jmx_port)
{
this.num = num;
this.networkTopology = networkTopology;
@@ -113,6 +116,7 @@
// legacy parameters
.forceSet("commitlog_sync_batch_window_in_ms", "1");
this.featureFlags = EnumSet.noneOf(Feature.class);
+ this.jmxPort = jmx_port;
}
private InstanceConfig(InstanceConfig copy)
@@ -124,6 +128,7 @@
this.hostId = copy.hostId;
this.featureFlags = copy.featureFlags;
this.broadcastAddressAndPort = copy.broadcastAddressAndPort;
+ this.jmxPort = copy.jmxPort;
}
@Override
@@ -168,6 +173,12 @@
return networkTopology().localDC(broadcastAddress());
}
+ @Override
+ public int jmxPort()
+ {
+ return this.jmxPort;
+ }
+
public InstanceConfig with(Feature featureFlag)
{
featureFlags.add(featureFlag);
@@ -267,7 +278,8 @@
String.format("%s/node%d/cdc", root, nodeNum),
tokens,
provisionStrategy.storagePort(nodeNum),
- provisionStrategy.nativeTransportPort(nodeNum));
+ provisionStrategy.nativeTransportPort(nodeNum),
+ provisionStrategy.jmxPort(nodeNum));
}
private static String[] datadirs(int datadirCount, Path root, int nodeNum)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
new file mode 100644
index 0000000..e19e29f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+
+import org.slf4j.Logger;
+
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.utils.JMXServerUtils;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.RMIClientSocketFactoryImpl;
+import org.apache.cassandra.utils.ReflectionUtils;
+import sun.rmi.transport.tcp.TCPEndpoint;
+
+import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST;
+import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION;
+import static org.apache.cassandra.config.CassandraRelevantProperties.SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
+
+public class IsolatedJmx
+{
+ private static final int RMI_KEEPALIVE_TIME = 1000;
+
+ private JMXConnectorServer jmxConnectorServer;
+ private JMXServerUtils.JmxRegistry registry;
+ private RMIJRMPServerImpl jmxRmiServer;
+ private MBeanWrapper.InstanceMBeanWrapper wrapper;
+ private RMIClientSocketFactoryImpl clientSocketFactory;
+ private CollectingRMIServerSocketFactoryImpl serverSocketFactory;
+ private Logger inInstancelogger;
+ private IInstanceConfig config;
+
+ public IsolatedJmx(IInstance instance, Logger inInstanceLogger) {
+ this.config = instance.config();
+ this.inInstancelogger = inInstanceLogger;
+ }
+
+ public void startJmx() {
+ try
+ {
+ // Several RMI threads hold references to in-jvm dtest objects, and are, by default, kept
+ // alive for long enough (minutes) to keep classloaders from being collected.
+ // Set these two system properties to a low value to allow cleanup to occur fast enough
+ // for GC to collect our classloaders.
+ JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST.setInt(RMI_KEEPALIVE_TIME);
+ SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME.setInt(RMI_KEEPALIVE_TIME);
+ ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.setBoolean(false);
+ InetAddress addr = config.broadcastAddress().getAddress();
+
+ int jmxPort = config.jmxPort();
+
+ String hostname = addr.getHostAddress();
+ wrapper = new MBeanWrapper.InstanceMBeanWrapper(hostname + ":" + jmxPort);
+ ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(wrapper);
+ Map<String, Object> env = new HashMap<>();
+
+ serverSocketFactory = new CollectingRMIServerSocketFactoryImpl(addr);
+ env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE,
+ serverSocketFactory);
+ clientSocketFactory = new RMIClientSocketFactoryImpl(addr);
+ env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE,
+ clientSocketFactory);
+
+ // configure the RMI registry
+ registry = new JMXServerUtils.JmxRegistry(jmxPort,
+ clientSocketFactory,
+ serverSocketFactory,
+ "jmxrmi");
+
+ // Mark the JMX server as a permanently exported object. This allows the JVM to exit with the
+ // server running and also exempts it from the distributed GC scheduler which otherwise would
+ // potentially attempt a full GC every `sun.rmi.dgc.server.gcInterval` millis (default is 3600000ms)
+ // For more background see:
+ // - CASSANDRA-2967
+ // - https://www.jclarity.com/2015/01/27/rmi-system-gc-unplugged/
+ // - https://bugs.openjdk.java.net/browse/JDK-6760712
+ env.put("jmx.remote.x.daemon", "true");
+
+ // Set the port used to create subsequent connections to exported objects over RMI. This simplifies
+ // configuration in firewalled environments, but it can't be used in conjuction with SSL sockets.
+ // See: CASSANDRA-7087
+ int rmiPort = config.jmxPort();
+
+ // We create the underlying RMIJRMPServerImpl so that we can manually bind it to the registry,
+ // rather then specifying a binding address in the JMXServiceURL and letting it be done automatically
+ // when the server is started. The reason for this is that if the registry is configured with SSL
+ // sockets, the JMXConnectorServer acts as its client during the binding which means it needs to
+ // have a truststore configured which contains the registry's certificate. Manually binding removes
+ // this problem.
+ // See CASSANDRA-12109.
+ jmxRmiServer = new RMIJRMPServerImpl(rmiPort, clientSocketFactory, serverSocketFactory,
+ env);
+ JMXServiceURL serviceURL = new JMXServiceURL("rmi", hostname, rmiPort);
+ jmxConnectorServer = new RMIConnectorServer(serviceURL, env, jmxRmiServer, wrapper.getMBeanServer());
+
+ jmxConnectorServer.start();
+
+ registry.setRemoteServerStub(jmxRmiServer.toStub());
+ JMXServerUtils.logJmxServiceUrl(addr, jmxPort);
+ waitForJmxAvailability(hostname, jmxPort, env);
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException("Feature.JMX was enabled but could not be started.", t);
+ }
+ }
+
+ private void waitForJmxAvailability(String hostname, int rmiPort, Map<String, Object> env) throws InterruptedException, MalformedURLException
+ {
+ String url = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname, rmiPort);
+ JMXServiceURL serviceURL = new JMXServiceURL(url);
+ int attempts = 0;
+ Throwable lastThrown = null;
+ while (attempts < 20)
+ {
+ attempts++;
+ try (JMXConnector ignored = JMXConnectorFactory.connect(serviceURL, env))
+ {
+ inInstancelogger.info("Connected to JMX server at {} after {} attempt(s)",
+ url, attempts);
+ return;
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (Throwable thrown)
+ {
+ lastThrown = thrown;
+ }
+ inInstancelogger.info("Could not connect to JMX on {} after {} attempts. Will retry.", url, attempts);
+ Thread.sleep(1000);
+ }
+ throw new RuntimeException("Could not start JMX - unreachable after 20 attempts", lastThrown);
+ }
+
+ public void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException
+ {
+ if (!config.has(JMX))
+ return;
+ // First, swap the mbean wrapper back to a NoOp wrapper
+ // This prevents later attempts to unregister mbeans from failing in Cassandra code, as we're going to
+ // unregister all of them here
+ ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(new MBeanWrapper.NoOpMBeanWrapper());
+ try
+ {
+ wrapper.close();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close wrapper.", e);
+ }
+ try
+ {
+ jmxConnectorServer.stop();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close jmxConnectorServer.", e);
+ }
+ try
+ {
+ registry.close();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close registry.", e);
+ }
+ try
+ {
+ serverSocketFactory.close();
+ }
+ catch (Throwable e)
+ {
+ inInstancelogger.warn("failed to close serverSocketFactory.", e);
+ }
+ // The TCPEndpoint class holds references to a class in the in-jvm dtest framework
+ // which transitively has a reference to the InstanceClassLoader, so we need to
+ // make sure to remove the reference to them when the instance is shutting down
+ clearMapField(TCPEndpoint.class, null, "localEndpoints");
+ Thread.sleep(2 * RMI_KEEPALIVE_TIME); // Double the keep-alive time to give Distributed GC some time to clean up
+ }
+
+ private <K, V> void clearMapField(Class<?> clazz, Object instance, String mapName)
+ throws IllegalAccessException, NoSuchFieldException {
+ Field mapField = ReflectionUtils.getField(clazz, mapName);
+ mapField.setAccessible(true);
+ Map<K, V> map = (Map<K, V>) mapField.get(instance);
+ // Because multiple instances can be shutting down at once,
+ // synchronize on the map to avoid ConcurrentModificationException
+ synchronized (map)
+ {
+ for (Iterator<Map.Entry<K, V>> it = map.entrySet().iterator(); it.hasNext(); )
+ {
+ it.next();
+ it.remove();
+ }
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
index a5a2dce..794873f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java
@@ -26,21 +26,29 @@
import java.text.SimpleDateFormat;
import java.util.function.Consumer;
import javax.management.MBeanServer;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
-import org.apache.cassandra.io.util.File;
+import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import com.sun.management.HotSpotDiagnosticMXBean;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.io.util.File;
import org.apache.cassandra.utils.SigarLibrary;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.JMX;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.utils.FBUtilities.now;
+import static org.hamcrest.Matchers.startsWith;
/* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup.
* All objects referencing the InstanceClassLoader need to be garbage collected or
@@ -139,6 +147,11 @@
void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws Throwable
{
+ doTest(numClusterNodes, updater, ignored -> {});
+ }
+
+ void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater, Consumer<Cluster> actionToPerform) throws Throwable
+ {
for (int loop = 0; loop < numTestLoops; loop++)
{
System.out.println(String.format("========== Starting loop %03d ========", loop));
@@ -149,6 +162,7 @@
cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." + tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL);
cluster.get(1).flush(KEYSPACE);
+ actionToPerform.accept(cluster);
if (dumpEveryLoop)
{
dumpResources(String.format("loop%03d", loop));
@@ -207,4 +221,50 @@
}
dumpResources("final-native");
}
+
+ @Test
+ public void looperJmxTest() throws Throwable
+ {
+ doTest(1, config -> config.with(JMX), cluster -> {
+ // NOTE: At some point, the hostname of the broadcastAddress can be resolved
+ // and then the `getHostString`, which would otherwise return the IP address,
+ // starts returning `localhost` - use `.getAddress().getHostAddress()` to work around this.
+ for (IInvokableInstance instance:cluster.get(1, cluster.size()))
+ {
+ IInstanceConfig config = instance.config();
+ try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
+ {
+ MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+ // instances get their default domain set to their IP address, so us it
+ // to check that we are actually connecting to the correct instance
+ String defaultDomain = mbsc.getDefaultDomain();
+ Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ if (forceCollection)
+ {
+ System.runFinalization();
+ System.gc();
+ Thread.sleep(finalWaitMillis);
+ }
+ dumpResources("final-jmx");
+ }
+
+ @Test
+ public void looperEverythingTest() throws Throwable
+ {
+ doTest(1, config -> config.with(Feature.values()));
+ if (forceCollection)
+ {
+ System.runFinalization();
+ System.gc();
+ Thread.sleep(finalWaitMillis);
+ }
+ dumpResources("final-everything");
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
index 1fc2958..71ef4db 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
@@ -27,6 +27,7 @@
import org.apache.cassandra.db.virtual.LogMessagesTable;
import org.apache.cassandra.db.virtual.LogMessagesTable.LogMessage;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.shared.WithProperties;
@@ -68,12 +69,17 @@
public void testMultipleAppendersFailToStartNode() throws Throwable
{
LOGBACK_CONFIGURATION_FILE.setString("test/conf/logback-dtest_with_vtable_appender_invalid.xml");
+
+ // NOTE: Because cluster startup is expected to fail in this case, and can leave things in a weird state
+ // for the next state, create without starting, and set failure as shutdown to false,
+ // so the try-with-resources can close instances properly.
try (WithProperties properties = new WithProperties().set(LOGBACK_CONFIGURATION_FILE, "test/conf/logback-dtest_with_vtable_appender_invalid.xml");
- Cluster ignored = Cluster.build(1)
- .withConfig(c -> c.with(Feature.values()))
- .start();
- )
+ Cluster cluster = Cluster.build(1)
+ .withConfig(c -> c.with(Feature.values())
+ .set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+ .createWithoutStarting())
{
+ cluster.startup();
fail("Node should not start as there is supposed to be invalid logback configuration file.");
}
catch (IllegalStateException ex)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
new file mode 100644
index 0000000..83a35e5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.jmx;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.management.MBeanServerConnection;
+import javax.management.remote.JMXConnector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.hamcrest.Matchers.startsWith;
+
+public class JMXFeatureTest extends TestBaseImpl
+{
+
+ /**
+ * Test the in-jvm dtest JMX feature.
+ * - Create a cluster with multiple JMX servers, one per instance
+ * - Test that when connecting, we get the correct MBeanServer by checking the default domain, which is set to the IP of the instance
+ * - Run the test multiple times to ensure cleanup of the JMX servers is complete so the next test can run successfully using the same host/port.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMultipleNetworkInterfacesProvisioning() throws Exception
+ {
+ int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times.
+ Set<String> allInstances = new HashSet<>();
+ for (int i = 0; i < iterations; i++)
+ {
+ try (Cluster cluster = Cluster.build(2)
+ .withNodeProvisionStrategy(INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces)
+ .withConfig(c -> c.with(Feature.values())).start())
+ {
+ Set<String> instancesContacted = new HashSet<>();
+ for (IInvokableInstance instance : cluster.get(1, 2))
+ {
+ testInstance(instancesContacted, instance);
+ }
+ Assert.assertEquals("Should have connected with both JMX instances.", 2, instancesContacted.size());
+ allInstances.addAll(instancesContacted);
+ }
+ }
+ Assert.assertEquals("Each instance from each cluster should have been unique", iterations * 2, allInstances.size());
+ }
+
+ @Test
+ public void testOneNetworkInterfaceProvisioning() throws Exception
+ {
+ int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times.
+ Set<String> allInstances = new HashSet<>();
+ for (int i = 0; i < iterations; i++)
+ {
+ try (Cluster cluster = Cluster.build(2)
+ .withNodeProvisionStrategy(INodeProvisionStrategy.Strategy.OneNetworkInterface)
+ .withConfig(c -> c.with(Feature.values())).start())
+ {
+ Set<String> instancesContacted = new HashSet<>();
+ for (IInvokableInstance instance : cluster.get(1, 2))
+ {
+ testInstance(instancesContacted, instance);
+ }
+ Assert.assertEquals("Should have connected with both JMX instances.", 2, instancesContacted.size());
+ allInstances.addAll(instancesContacted);
+ }
+ }
+ Assert.assertEquals("Each instance from each cluster should have been unique", iterations * 2, allInstances.size());
+ }
+
+ private void testInstance(Set<String> instancesContacted, IInvokableInstance instance) throws IOException
+ {
+ // NOTE: At some point, the hostname of the broadcastAddress can be resolved
+ // and then the `getHostString`, which would otherwise return the IP address,
+ // starts returning `localhost` - use `.getAddress().getHostAddress()` to work around this.
+ IInstanceConfig config = instance.config();
+ try (JMXConnector jmxc = JMXUtil.getJmxConnector(config))
+ {
+ MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+ // instances get their default domain set to their IP address, so us it
+ // to check that we are actually connecting to the correct instance
+ String defaultDomain = mbsc.getDefaultDomain();
+ instancesContacted.add(defaultDomain);
+ Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort()));
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
index b935e24..ece6052 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.distributed.test.jmx;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -31,7 +30,6 @@
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXServiceURL;
import com.google.common.collect.ImmutableSet;
@@ -39,11 +37,8 @@
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.test.TestBaseImpl;
-import org.apache.cassandra.utils.JMXServerUtils;
-
-import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION;
-import static org.apache.cassandra.cql3.CQLTester.getAutomaticallyAllocatedPort;
public class JMXGetterCheckTest extends TestBaseImpl
{
@@ -57,20 +52,17 @@
"org.apache.cassandra.db:type=StorageService:resetLocalSchema" // this will fail when there are no other nodes which can serve schema
);
- @Test
- public void test() throws Exception
- {
- // start JMX server, which the instance will register with
- InetAddress loopback = InetAddress.getLoopbackAddress();
- String jmxHost = loopback.getHostAddress();
- int jmxPort = getAutomaticallyAllocatedPort(loopback);
- JMXConnectorServer jmxServer = JMXServerUtils.createJMXServer(jmxPort, true);
- jmxServer.start();
- String url = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi";
+ public static final String JMX_SERVICE_URL_FMT = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
- ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.setBoolean(false);
+ @Test
+ public void testGetters() throws Exception
+ {
try (Cluster cluster = Cluster.build(1).withConfig(c -> c.with(Feature.values())).start())
{
+ IInvokableInstance instance = cluster.get(1);
+
+ String jmxHost = instance.config().broadcastAddress().getAddress().getHostAddress();
+ String url = String.format(JMX_SERVICE_URL_FMT, jmxHost, instance.config().jmxPort());
List<Named> errors = new ArrayList<>();
try (JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(url), null))
{
@@ -122,7 +114,7 @@
}
/**
- * This class is meant to make new errors easier to read, by adding the JMX endpoint, and cleaning up the unneded JMX/Reflection logic cluttering the stacktrace
+ * This class is meant to make new errors easier to read, by adding the JMX endpoint, and cleaning up the unneeded JMX/Reflection logic cluttering the stacktrace
*/
private static class Named extends RuntimeException
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java b/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
index a8ddfe0..58a3fee 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/metric/TableMetricTest.java
@@ -29,7 +29,9 @@
import java.util.stream.Collectors;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServer;
import javax.management.ObjectName;
+import javax.management.QueryExp;
import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
@@ -57,7 +59,6 @@
MBEAN_REGISTRATION_CLASS.setString(MapMBeanWrapper.class.getName());
ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION.setBoolean(false);
}
-
private static volatile Map<String, Collection<String>> SYSTEM_TABLES = null;
private static Set<String> TABLE_METRIC_NAMES = ImmutableSet.of("WriteLatency");
@@ -165,7 +166,7 @@
{
inst.runOnInstance(() -> {
// cast only to make sure it linked properly
- MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+ MapMBeanWrapper mbeans = getMapMBeanWrapper();
Assert.assertTrue("Unable to find table mbean for " + keyspace + "." + table,
mbeans.isRegistered(ColumnFamilyStore.getTableMBeanName(keyspace, table, false)));
Assert.assertTrue("Unable to find column family mbean for " + keyspace + "." + table,
@@ -177,7 +178,7 @@
{
inst.runOnInstance(() -> {
// cast only to make sure it linked properly
- MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+ MapMBeanWrapper mbeans = getMapMBeanWrapper();
Assert.assertFalse("Found table mbean for " + keyspace + "." + table,
mbeans.isRegistered(ColumnFamilyStore.getTableMBeanName(keyspace, table, false)));
Assert.assertFalse("Found column family mbean for " + keyspace + "." + table,
@@ -189,7 +190,7 @@
{
inst.runOnInstance(() -> {
// cast only to make sure it linked properly
- MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+ MapMBeanWrapper mbeans = getMapMBeanWrapper();
String mbean = getTableMetricName(keyspace, table, name);
Assert.assertTrue("Unable to find metric " + name + " for " + keyspace + "." + table, mbeans.isRegistered(mbean));
@@ -203,7 +204,7 @@
{
inst.runOnInstance(() -> {
// cast only to make sure it linked properly
- MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+ MapMBeanWrapper mbeans = getMapMBeanWrapper();
String mbean = getTableMetricName(keyspace, table, name);
Assert.assertFalse("Found metric " + name + " for " + keyspace + "." + table, mbeans.isRegistered(mbean));
@@ -226,7 +227,7 @@
{
inst.runOnInstance(() -> {
// cast only to make sure it linked properly
- MapMBeanWrapper mbeans = (MapMBeanWrapper) MBeanWrapper.instance;
+ MapMBeanWrapper mbeans = getMapMBeanWrapper();
String keyspaceMBean = getKeyspaceMetricName(keyspace, name);
Assert.assertFalse("Found keyspace metric " + keyspaceMBean + " for " + keyspace, mbeans.isRegistered(keyspaceMBean));
@@ -243,6 +244,10 @@
return String.format("org.apache.cassandra.metrics:type=Table,keyspace=%s,scope=%s,name=%s", keyspace, table, name);
}
+ private static MapMBeanWrapper getMapMBeanWrapper()
+ {
+ return (MapMBeanWrapper) ((MBeanWrapper.DelegatingMbeanWrapper)MBeanWrapper.instance).getDelegate();
+ }
public static final class MapMBeanWrapper implements MBeanWrapper
{
private final ConcurrentMap<ObjectName, Object> map = new ConcurrentHashMap<>();
@@ -268,5 +273,17 @@
if (previous == null)
onException.handler.accept(new InstanceNotFoundException("MBean " + mbeanName + " was not found"));
}
+
+ @Override
+ public Set<ObjectName> queryNames(ObjectName name, QueryExp query)
+ {
+ return null;
+ }
+
+ @Override
+ public MBeanServer getMBeanServer()
+ {
+ return null;
+ }
}
}