CASSANDRA-18727 - JMXUtil.getJmxConnector should retry connection attempts (#37)
JMXUtil.getJmxConnector should retry connection attempts
Patch by Doug Roher for CASSANDRA-18727. Reviewed by Jon Meredith and Francisco Guerrero.
diff --git a/src/main/java/org/apache/cassandra/distributed/api/LogAction.java b/src/main/java/org/apache/cassandra/distributed/api/LogAction.java
index 8f9e693..7b90ffc 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/LogAction.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/LogAction.java
@@ -30,6 +30,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.cassandra.distributed.shared.Uninterruptibles;
+
public interface LogAction
{
/**
@@ -56,7 +58,7 @@
if (previousPosition == mark())
{
// still matching... wait a bit
- Internal.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
continue;
}
// position not matching, try to read
@@ -340,7 +342,7 @@
'}';
}
}
-
+
class Internal
{
private static final int DEFAULT_START_POSITION = -1;
@@ -388,29 +390,5 @@
{
return regexPredicate(Pattern.compile(pattern));
}
-
- private static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
- // copied from guava since dtest can't depend on guava
- boolean interrupted = false;
-
- try {
- long remainingNanos = unit.toNanos(sleepFor);
- long end = System.nanoTime() + remainingNanos;
-
- while(true) {
- try {
- TimeUnit.NANOSECONDS.sleep(remainingNanos);
- return;
- } catch (InterruptedException var12) {
- interrupted = true;
- remainingNanos = end - System.nanoTime();
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
}
}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/JMXUtil.java b/src/main/java/org/apache/cassandra/distributed/shared/JMXUtil.java
index 183397e..c505201 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/JMXUtil.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/JMXUtil.java
@@ -18,11 +18,16 @@
package org.apache.cassandra.distributed.shared;
-import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.distributed.api.IInstanceConfig;
public final class JMXUtil
@@ -31,23 +36,92 @@
{
}
+ private final static Logger LOGGER = LoggerFactory.getLogger(JMXUtil.class);
+
public static final String JMX_SERVICE_URL_FMT = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi";
+ /**
+ * Create an instance of a {@link JMXConnector} to an in-jvm instance based on the input configuration.
+ * This overload uses 5 as the default number of retries which has been shown to be adequate in testing,
+ * and passes a null environment map to the connect call.
+ * @param config The instance configuration to use to get the necessary parameters to connect
+ * @return A JMXConnector instance which can communicate with the specified instance via JMX
+ */
public static JMXConnector getJmxConnector(IInstanceConfig config) {
+ return getJmxConnector(config, 5, null);
+ }
+
+ /**
+ * Create an instance of a {@link JMXConnector} to an in-jvm instance based on the input configuration.
+ * This overload uses 5 as the default number of retries which has been shown to be adequate in testing.
+ * @param config The instance configuration to use to get the necessary parameters to connect
+ * @param jmxEnv an optional map which specifies the JMX environment to use. Can be null.
+ * @return A JMXConnector instance which can communicate with the specified instance via JMX
+ */
+ public static JMXConnector getJmxConnector(IInstanceConfig config, Map<String, ?> jmxEnv) {
+ return getJmxConnector(config, 5, jmxEnv);
+ }
+
+
+ /**
+ * Create an instance of a {@link JMXConnector} to an in-jvm instance based on the input configuration
+ * This overload passes a null environment map to the connect call.
+ * @param config The instance configuration to use to get the necessary parameters to connect
+ * @param numAttempts the number of retries to attempt before failing to connect.
+ * @return A JMXConnector instance which can communicate with the specified instance via JMX
+ */
+ public static JMXConnector getJmxConnector(IInstanceConfig config, int numAttempts)
+ {
+ return getJmxConnector(config, numAttempts, null);
+ }
+
+ /**
+ * Create an instance of a {@link JMXConnector} to an in-jvm instance based on the input configuration
+ * @param config The instance configuration to use to get the necessary parameters to connect
+ * @param numAttempts the number of retries to attempt before failing to connect.
+ * @param jmxEnv an optional map which specifies the JMX environment to use. Can be null.
+ * @return A JMXConnector instance which can communicate with the specified instance via JMX
+ */
+ public static JMXConnector getJmxConnector(IInstanceConfig config, int numAttempts, Map<String, ?> jmxEnv) {
String jmxHost = getJmxHost(config);
String url = String.format(JMX_SERVICE_URL_FMT, jmxHost, config.jmxPort());
- try
+ int attempts = 1;
+ Throwable lastThrown = null;
+ while (attempts <= numAttempts)
{
- return JMXConnectorFactory.connect(new JMXServiceURL(url), null);
+ try
+ {
+ JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(url), jmxEnv);
+
+ LOGGER.info("Connected to JMX server at {} after {} attempt(s)",
+ url, attempts);
+ return connector;
+ }
+
+ catch(MalformedURLException e)
+ {
+ // Because we are building the URL from the config, this should never happen.
+ // This is unrecoverable, so just rethrow wrapped in RuntimeException
+ throw new RuntimeException("Failed to connect to JXM server due to a malformed URL (Instance config must be broken?)", e);
+ }
+
+ catch (Throwable thrown)
+ {
+ lastThrown = thrown;
+ }
+ if (attempts < numAttempts)
+ {
+ LOGGER.info("Could not connect to JMX on {} after {} attempts. Will retry.", url, attempts);
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ attempts++;
}
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ String error = "Failed to connect to JMX, which was unreachable after " + attempts + " attempts.";
+ LOGGER.error(error, lastThrown);
+ throw new RuntimeException(error, lastThrown);
}
public static String getJmxHost(IInstanceConfig config) {
return config.broadcastAddress().getAddress().getHostAddress();
}
-
}
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Uninterruptibles.java b/src/main/java/org/apache/cassandra/distributed/shared/Uninterruptibles.java
new file mode 100644
index 0000000..823a2f2
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Uninterruptibles.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.concurrent.TimeUnit;
+
+public class Uninterruptibles
+{
+ public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
+ // copied from guava since dtest can't depend on guava
+ boolean interrupted = false;
+
+ try {
+ long remainingNanos = unit.toNanos(sleepFor);
+ long end = System.nanoTime() + remainingNanos;
+
+ while(true) {
+ try {
+ TimeUnit.NANOSECONDS.sleep(remainingNanos);
+ return;
+ } catch (InterruptedException iex) {
+ interrupted = true;
+ remainingNanos = end - System.nanoTime();
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}