QPID-8446: [JMS AMQP 0-x] Add ability to notify mesaging application about the result of every connectivity attempt
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 7514ff8..4794a10 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -129,6 +129,7 @@
private final Object _failoverMutex = new Object();
private final Object _sessionCreationLock = new Object();
+ private final ConnectAttemptListener _connectAttemptListener;
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
@@ -306,6 +307,13 @@
public AMQConnection(ConnectionURL connectionURL) throws QpidException
{
+ this(connectionURL, null);
+ }
+
+ AMQConnection(final ConnectionURL connectionURL, final ConnectAttemptListener connectAttemptListener)
+ throws QpidException
+ {
+ _connectAttemptListener = connectAttemptListener;
boolean success = false;
try
{
@@ -565,10 +573,12 @@
}
}
BrokerDetails brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
+ BrokerDetails lastBrokerDetails = null;
boolean retryAllowed = true;
Exception connectionException = null;
while (!isConnected() && retryAllowed && brokerDetails != null)
{
+ lastBrokerDetails = brokerDetails;
ProtocolVersion pe = null;
try
{
@@ -605,11 +615,17 @@
}
else
{
- retryAllowed = _failoverPolicy.failoverAllowed();
- brokerDetails = _failoverPolicy.getNextBrokerDetails();
+ if (repeatLastConnectAttempt(connectionException, brokerDetails))
+ {
+ brokerDetails = _failoverPolicy.getCurrentBrokerDetails();
+ }
+ else
+ {
+ retryAllowed = _failoverPolicy.failoverAllowed();
+ brokerDetails = _failoverPolicy.getNextBrokerDetails();
+ }
_protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
}
-
}
}
verifyClientID();
@@ -668,11 +684,53 @@
{
_logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
}
-
+ notifySuccessfulConnectAttempt(lastBrokerDetails);
_sessions.setMaxChannelID(_delegate.getMaxChannelID());
_sessions.setMinChannelID(_delegate.getMinChannelID());
}
+ private boolean repeatLastConnectAttempt(final Exception e, final BrokerDetails brokerDetails)
+ {
+ boolean repeatLastReconnectAttempt = false;
+ if (_connectAttemptListener != null)
+ {
+ final AMQException amqException;
+ if (e instanceof AMQException)
+ {
+ amqException = (AMQException) e;
+ }
+ else
+ {
+ amqException = new AMQException(ErrorCodes.CONNECTION_FORCED, e.getMessage(), e);
+ }
+
+ try
+ {
+ repeatLastReconnectAttempt = _connectAttemptListener.connectAttemptFailed(brokerDetails.getURI(), convertToJMSException(amqException));
+ }
+ catch (RuntimeException unexpected)
+ {
+ _logger.warn("Unexpected exception occurred on notifying about connect attempt failure", unexpected);
+ }
+ }
+ return repeatLastReconnectAttempt;
+ }
+
+ private void notifySuccessfulConnectAttempt(final BrokerDetails brokerDetails)
+ {
+ if (_connectAttemptListener != null)
+ {
+ try
+ {
+ _connectAttemptListener.connectAttemptSucceeded(brokerDetails.getURI());
+ }
+ catch (RuntimeException unexpected)
+ {
+ _logger.warn("Unexpected exception occurred on notifying about successful connect attempt", unexpected);
+ }
+ }
+ }
+
private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
{
try
@@ -745,7 +803,7 @@
try
{
makeBrokerConnection(bd);
-
+ notifySuccessfulConnectAttempt(bd);
return true;
}
catch (Exception e)
@@ -754,8 +812,14 @@
{
_logger.info("Unable to connect to broker at " + bd);
}
-
- return useFailoverConfigOnFailure && attemptReconnection();
+ if (repeatLastConnectAttempt(e, bd))
+ {
+ return attemptReconnection(host, port, useFailoverConfigOnFailure);
+ }
+ else
+ {
+ return useFailoverConfigOnFailure && attemptReconnection();
+ }
}
}
@@ -780,6 +844,7 @@
try
{
makeBrokerConnection(broker);
+ notifySuccessfulConnectAttempt(broker);
return true;
}
catch (Exception e)
@@ -798,6 +863,10 @@
_logger.info(e.getMessage() + ":Unable to connect to broker at "
+ _failoverPolicy.getCurrentBrokerDetails());
}
+ if (repeatLastConnectAttempt(e, broker))
+ {
+ return attemptConnection(broker);
+ }
}
}
return false;
diff --git a/client/src/main/java/org/apache/qpid/client/AbstractConnectionFactory.java b/client/src/main/java/org/apache/qpid/client/AbstractConnectionFactory.java
index ea7549b..b559212 100644
--- a/client/src/main/java/org/apache/qpid/client/AbstractConnectionFactory.java
+++ b/client/src/main/java/org/apache/qpid/client/AbstractConnectionFactory.java
@@ -34,6 +34,7 @@
{
private final Map<ConnectionExtension, BiFunction<Connection, URI, Object>>
_extensions = new EnumMap<>(ConnectionExtension.class);
+ private volatile ConnectAttemptListener _connectAttemptListener;
public void setExtension(String extensionName, BiFunction<Connection, URI, Object> extension)
@@ -49,6 +50,11 @@
}
}
+ public void setConnectAttemptListener(final ConnectAttemptListener connectAttemptListener)
+ {
+ _connectAttemptListener = connectAttemptListener;
+ }
+
protected CommonConnection newConnectionInstance(final ConnectionURL connectionDetails) throws QpidException
{
return newAMQConnectionInstance(connectionDetails);
@@ -64,7 +70,7 @@
final Map<ConnectionExtension, BiFunction<Connection, URI, Object>> extensions = getExtensions();
final ConnectionURL connectionURL =
extensions.isEmpty() ? connectionDetails : new ExtensibleConnectionURL(connectionDetails, extensions);
- final AMQConnection connection = new AMQConnection(connectionURL);
+ final AMQConnection connection = new AMQConnection(connectionURL, _connectAttemptListener);
if (connectionURL instanceof ExtensibleConnectionURL)
{
((ExtensibleConnectionURL) connectionURL).setConnectionSupplier(() -> connection);
diff --git a/client/src/main/java/org/apache/qpid/client/BrokerDetails.java b/client/src/main/java/org/apache/qpid/client/BrokerDetails.java
index c19f023..65208f1 100644
--- a/client/src/main/java/org/apache/qpid/client/BrokerDetails.java
+++ b/client/src/main/java/org/apache/qpid/client/BrokerDetails.java
@@ -30,6 +30,9 @@
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.url.URLHelper;
@@ -37,6 +40,7 @@
public class BrokerDetails implements Serializable
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(BrokerDetails.class);
/*
* Known URL Options
* @see ConnectionURL
@@ -384,7 +388,12 @@
public String toString()
{
- StringBuffer sb = new StringBuffer();
+ return toURL(true);
+ }
+
+ private String toURL(boolean maskPasswords)
+ {
+ StringBuilder sb = new StringBuilder();
sb.append(_transport);
sb.append("://");
@@ -392,7 +401,7 @@
sb.append(':');
sb.append(_port);
- sb.append(printOptionsURL());
+ sb.append(printOptionsURL(maskPasswords));
return sb.toString();
}
@@ -426,9 +435,9 @@
return result;
}
- private String printOptionsURL()
+ private String printOptionsURL(boolean maskPassword)
{
- return URLHelper.printOptions(_options, PASSWORD_YIELDING_OPTIONS);
+ return URLHelper.printOptions(_options, maskPassword? PASSWORD_YIELDING_OPTIONS : Collections.emptySet());
}
public static String checkTransport(String broker)
@@ -624,4 +633,18 @@
{
_connectionUrl = connectionUrl;
}
+
+ public URI getURI()
+ {
+ URI uri = null;
+ try
+ {
+ uri = URI.create(toURL(false));
+ }
+ catch (RuntimeException unexpected)
+ {
+ LOGGER.warn("Unexpected exception occurred on evaluation of broker URI", unexpected);
+ }
+ return uri;
+ }
}
diff --git a/client/src/main/java/org/apache/qpid/client/ConnectAttemptListener.java b/client/src/main/java/org/apache/qpid/client/ConnectAttemptListener.java
new file mode 100644
index 0000000..27c731d
--- /dev/null
+++ b/client/src/main/java/org/apache/qpid/client/ConnectAttemptListener.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.client;
+
+import java.net.URI;
+import java.util.function.BiFunction;
+
+import javax.jms.JMSException;
+
+/**
+ * An implementation of ConnectAttemptListener can be set on a concrete implementation of {@link AbstractConnectionFactory}
+ * in order to notify messaging application about every successful and unsuccessful connectivity attempt.
+ *
+ * The {@link #connectAttemptFailed(URI, JMSException)} can be used as a trigger to rotate expired credentials, if those
+ * are set via extension mechanism {@link AbstractConnectionFactory#setExtension(String, BiFunction)}.
+ */
+public interface ConnectAttemptListener
+{
+ /**
+ * Invoked when connect attempt to the given broker URI failed with a given exception.
+ * This method can be used to rotate the credentials and re-attempt the connection to the broker with new
+ * credentials which can be set using extension mechanism {@link AbstractConnectionFactory#setExtension(String, BiFunction)}.
+ *
+ * The method can return true, if connection attempt needs to be repeated to the same broker immediately
+ * and without incrementing a failover re-try counter.
+ * Otherwise, the connection would be attempted as per failover settings.
+ *
+ * @param brokerURI target broker URI
+ * @param e exception thrown on connect attempt
+ * @return true if connect attempt to the given broker URI needs to be repeated again
+ */
+ boolean connectAttemptFailed(URI brokerURI, JMSException e);
+
+ /**
+ * Invoked when connection is established successfully to the broker with a given URI
+ *
+ * @param brokerURI target broker URI
+ */
+ void connectAttemptSucceeded(URI brokerURI);
+}
diff --git a/doc/jms-client-0-8/src/docbkx/JMS-Client-JMS-Extensions.xml b/doc/jms-client-0-8/src/docbkx/JMS-Client-JMS-Extensions.xml
index 431eb8a..b71fa26 100644
--- a/doc/jms-client-0-8/src/docbkx/JMS-Client-JMS-Extensions.xml
+++ b/doc/jms-client-0-8/src/docbkx/JMS-Client-JMS-Extensions.xml
@@ -128,6 +128,62 @@
A password extension is registered at (8). JMS connection is open at (9). The example uses a hypothetical
class TokenGenerator invoking underlying OAUTH2 API to generate/renew access token and get token expiration time.</para>
</section>
+ <section xml:id="JMS-Client-0-8-Appendix-JMS-Extensions-ConnectAttemptListener">
+ <title>ConnectAttemptListener</title>
+ <para>An implementation of <emphasis>ConnectAttemptListener</emphasis> can be set on <emphasis>AMQConnectionFactory</emphasis>
+ or <emphasis>PooledConnectionFactory</emphasis> in order to notify messaging application about every successful
+ and unsuccessful connectivity attempt.
+ </para>
+ <para>
+ The failed attempt notification can be used as a mechanism to rotate expired credentials,
+ if those are set as connection extensions.
+ The implementation can examine the error code reported as part of JMSException, and, if the error code corresponds
+ to authentication failure codes ("530" is reported by AMQP 0-8..0-91, "320" is reported by AMQP 0-10), the
+ credentials could be swapped with new ones using the connection extension mechanism.
+ See <xref linkend="JMS-Client-0-8-Appendix-JMS-Extensions-Connection"/> for details.
+ </para>
+ <para>The following <emphasis>ConnectAttemptListener</emphasis> illustrate the idea</para>
+ <example>
+ <title>Inject password extension</title>
+ <programlisting>
+class CredentialsRotatingListener implements ConnectAttemptListener
+{
+
+ @Override
+ public boolean connectAttemptFailed(final URI brokerURI, final JMSException e)
+ {
+ boolean reattempt = "530".equals(e.getErrorCode()) || "320".equals(e.getErrorCode());
+ if (reattempt)
+ {
+ rotateCredentials(brokerURI);
+ }
+ return reattempt;
+ }
+
+ @Override
+ public void connectAttemptSucceeded(final URI brokerURI)
+ {
+ credentialsRotatedSuccessfully(brokerURI);
+ }
+
+ private void rotateCredentials(inal URI brokerURI)
+ {
+ // credential rotating logic
+ }
+
+ private void credentialsRotatedSuccessfully(final URI brokerURI)
+ {
+ // notify that credentials have been rotated successfully
+ }
+}
+ </programlisting>
+ </example>
+ <para>
+ The method <emphasis>connectAttemptFailed</emphasis> can return true, if connection attempt needs to be repeated
+ to the same broker immediately and without incrementing a failover re-try counter.
+ Otherwise, the connection would be attempted as per failover settings.
+ </para>
+ </section>
<section xml:id="JMS-Client-0-8-Appendix-JMS-Extensions-Queue">
<title>Queue Management</title>
<para>These extensions allow queues to be created or removed.</para>
diff --git a/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
index e5db8ee..b3c30cc 100644
--- a/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/connection/ConnectionFactoryTest.java
@@ -20,12 +20,23 @@
*/
package org.apache.qpid.systest.connection;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import javax.jms.Connection;
import javax.jms.JMSException;
import org.junit.Before;
@@ -33,13 +44,17 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.ConnectAttemptListener;
import org.apache.qpid.client.ConnectionExtension;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.systest.core.BrokerAdmin;
import org.apache.qpid.systest.core.JmsTestBase;
public class ConnectionFactoryTest extends JmsTestBase
{
- private static final String CONNECTION_URL = "amqp://%s:%s@clientID/?brokerlist='tcp://%s:%d'";
+ private static final String BROKER_URL = "tcp://%s:%d%s";
+ private static final String CONNECTION_URL = "amqp://%s:%s@clientID/?brokerlist='" + BROKER_URL + "'";
private String _urlWithCredentials;
private String _urlWithoutCredentials;
private String _userName;
@@ -54,8 +69,8 @@
_password = brokerAdmin.getValidPassword();
final String host = brokerAddress.getHostString();
final int port = brokerAddress.getPort();
- _urlWithCredentials = String.format(CONNECTION_URL, _userName, _password, host, port);
- _urlWithoutCredentials = String.format(CONNECTION_URL, "", "", host, port);
+ _urlWithCredentials = String.format(CONNECTION_URL, _userName, _password, host, port, "");
+ _urlWithoutCredentials = String.format(CONNECTION_URL, "", "", host, port, "");
}
@Test
@@ -197,4 +212,213 @@
}
}
}
+
+ @Test
+ public void testAccountRotationViaConnectAttemptListenerOnConnect() throws Exception
+ {
+ final BrokerAdmin brokerAdmin = getBrokerAdmin();
+ assumeThat("Requires authentication failure for invalid credentials",
+ brokerAdmin.getBrokerType(),
+ is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
+ final AMQConnectionFactory factory = new AMQConnectionFactory(_urlWithoutCredentials);
+ final AccountRotatingHelper accountRotatingHelper = new AccountRotatingHelper();
+ factory.setConnectAttemptListener(accountRotatingHelper);
+ factory.setExtension(ConnectionExtension.PASSWORD_OVERRIDE.getExtensionName(),
+ accountRotatingHelper.getPasswordExtension());
+ factory.setExtension(ConnectionExtension.USERNAME_OVERRIDE.getExtensionName(),
+ accountRotatingHelper.getUsernameExtension());
+ AMQConnection con = null;
+ try
+ {
+ con = factory.createConnection();
+ assertNotNull(con);
+ assertEquals(_userName, con.getUsername());
+ assertEquals(_password, con.getPassword());
+ assertTrue(accountRotatingHelper.getUseValidCredentials());
+
+ final InetSocketAddress brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ final URI expectedBrokerURI =
+ URI.create(String.format(BROKER_URL, brokerAddress.getHostName(), brokerAddress.getPort(), ""));
+ assertEquals(expectedBrokerURI, accountRotatingHelper.getLastUriForFailedlAttempt());
+ assertEquals(expectedBrokerURI, accountRotatingHelper.getLastUriForSuccessfulAttempt());
+ }
+ finally
+ {
+ if (con != null)
+ {
+ con.close();
+ }
+ }
+ }
+
+ @Test
+ public void testAccountRotationViaConnectAttemptListenerOnFailover() throws Exception
+ {
+ final BrokerAdmin brokerAdmin = getBrokerAdmin();
+ assumeThat("Requires authentication failure for invalid credentials",
+ brokerAdmin.getBrokerType(),
+ is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+
+ final InetSocketAddress brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ final String retriesOption = "?retries='1'";
+ final String url = String.format(CONNECTION_URL,
+ "",
+ "",
+ brokerAddress.getHostName(),
+ brokerAddress.getPort(),
+ retriesOption) + "&failover='singlebroker?cyclecount='1''";
+
+ final AMQConnectionFactory factory = new AMQConnectionFactory(url);
+
+ final AccountRotatingHelper accountRotatingHelper = new AccountRotatingHelper();
+ accountRotatingHelper.setUseValidCredentials(true);
+
+ factory.setConnectAttemptListener(accountRotatingHelper);
+ factory.setExtension(ConnectionExtension.PASSWORD_OVERRIDE.getExtensionName(),
+ accountRotatingHelper.getPasswordExtension());
+ factory.setExtension(ConnectionExtension.USERNAME_OVERRIDE.getExtensionName(),
+ accountRotatingHelper.getUsernameExtension());
+
+ AMQConnection con = null;
+ try
+ {
+ con = factory.createConnection();
+ assertNotNull(con);
+ assertEquals(_userName, con.getUsername());
+ assertEquals(_password, con.getPassword());
+ assertEquals(1, accountRotatingHelper.getSuccessfulConnectCount());
+
+ accountRotatingHelper.setUseValidCredentials(false);
+
+ restartBrokerAndWaitForFailover(brokerAdmin, con);
+ assertEquals(_userName, con.getUsername());
+ assertEquals(_password, con.getPassword());
+ assertTrue(accountRotatingHelper.getUseValidCredentials());
+ assertEquals(2, accountRotatingHelper.getSuccessfulConnectCount());
+
+ final URI expectedBrokerURI =
+ URI.create(String.format(BROKER_URL,
+ brokerAddress.getHostName(),
+ brokerAddress.getPort(),
+ retriesOption));
+ assertEquals(expectedBrokerURI, accountRotatingHelper.getLastUriForFailedlAttempt());
+ assertEquals(expectedBrokerURI, accountRotatingHelper.getLastUriForSuccessfulAttempt());
+ }
+ finally
+ {
+ if (con != null)
+ {
+ con.close();
+ }
+ }
+ }
+
+ private void restartBrokerAndWaitForFailover(final BrokerAdmin brokerAdmin, final AMQConnection con)
+ throws InterruptedException
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final ConnectionListener connectionListener = new ConnectionListener()
+ {
+ @Override
+ public void bytesSent(final long count)
+ {
+
+ }
+
+ @Override
+ public void bytesReceived(final long count)
+ {
+
+ }
+
+ @Override
+ public boolean preFailover(final boolean redirect)
+ {
+ return true;
+ }
+
+ @Override
+ public boolean preResubscribe()
+ {
+ return false;
+ }
+
+ @Override
+ public void failoverComplete()
+ {
+ latch.countDown();
+ }
+ };
+ con.setConnectionListener(connectionListener);
+
+ brokerAdmin.restart();
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ }
+
+ private class AccountRotatingHelper implements ConnectAttemptListener
+ {
+
+ private final AtomicBoolean _useValidCredentials = new AtomicBoolean(false);
+ private final AtomicInteger _successCounter = new AtomicInteger();
+ private volatile URI _lastUriForSuccessfulAttempt;
+ private volatile URI _lastUriForFailedlAttempt;
+
+ @Override
+ public boolean connectAttemptFailed(final URI brokerURI, final JMSException e)
+ {
+ boolean reattempt = String.valueOf(ErrorCodes.CONNECTION_FORCED).equals(e.getErrorCode())
+ || String.valueOf(ErrorCodes.NOT_ALLOWED).equals(e.getErrorCode());
+ if (reattempt)
+ {
+ _useValidCredentials.set(true);
+ }
+ _lastUriForFailedlAttempt = brokerURI;
+ return reattempt;
+ }
+
+ @Override
+ public void connectAttemptSucceeded(final URI brokerURI)
+ {
+ _successCounter.incrementAndGet();
+ _lastUriForSuccessfulAttempt = brokerURI;
+ }
+
+ BiFunction<Connection, URI, Object> getPasswordExtension()
+ {
+ return (connection, uri) -> _useValidCredentials.get() ? _password : "invalid";
+ }
+
+ BiFunction<Connection, URI, Object> getUsernameExtension()
+ {
+ return (connection, uri) -> _useValidCredentials.get() ? _userName : "invalid";
+ }
+
+ void setUseValidCredentials(boolean value)
+ {
+ _useValidCredentials.set(value);
+ }
+
+ boolean getUseValidCredentials()
+ {
+ return _useValidCredentials.get();
+ }
+
+ int getSuccessfulConnectCount()
+ {
+ return _successCounter.get();
+ }
+
+ URI getLastUriForSuccessfulAttempt()
+ {
+ return _lastUriForSuccessfulAttempt;
+ }
+
+ URI getLastUriForFailedlAttempt()
+ {
+ return _lastUriForFailedlAttempt;
+ }
+ }
+
}