QPIDJMS-492 Add a connection extension for injecting properties for Open
Allows client code to inject connection properties values into the Open
performative.
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
index 5e1b03d..7a4a150 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
@@ -123,7 +123,28 @@
* </li>
* </ul>
*/
- PROXY_HANDLER_SUPPLIER("proxyHandlerSupplier");
+ PROXY_HANDLER_SUPPLIER("proxyHandlerSupplier"),
+
+ /**
+ * Allows a user to inject custom properties into the AMQP Open performative that is sent
+ * after a successful remote connection has been made. The properties are injected by adding
+ * {@link String} keys and {@link Object} values into a {@link Map} instance and returning it.
+ * The value entries in the provided {@link Map} must be valid AMQP primitive types that can
+ * be encoded to form a valid AMQP Open performative or an error will be thrown and the connection
+ * attempt will fail. If a user supplied property collides with an internal client specific
+ * property the client version is always given precedence.
+ * <p>
+ * This method will be invoked on the initial connect and on each successive reconnect if a connection
+ * failures occurs and the client is configured to provide automatic reconnect support.
+ * <p>
+ * The extension function takes the form of a BiFunction defined as the following:
+ * <ul>
+ * <li>
+ * {@link BiFunction}<{@link Connection}, {@link URI}, {@link Map}>
+ * </li>
+ * </ul>
+ */
+ AMQP_OPEN_PROPERTIES("amqpOpenProperties");
private final String extensionKey;
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index fb8d625..188111f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -29,6 +29,7 @@
import javax.jms.Session;
+import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.provider.AsyncResult;
@@ -104,6 +105,7 @@
};
}
+ @SuppressWarnings("unchecked")
@Override
protected Connection createEndpoint(JmsConnectionInfo resourceInfo) {
String hostname = getParent().getVhost();
@@ -113,7 +115,17 @@
hostname = null;
}
- Map<Symbol, Object> props = new LinkedHashMap<Symbol, Object>();
+ final Map<Symbol, Object> props = new LinkedHashMap<Symbol, Object>();
+
+ if (resourceInfo.getExtensionMap().containsKey(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES)) {
+ final Map<String, Object> userConnectionProperties = (Map<String, Object>) resourceInfo.getExtensionMap().get(
+ JmsConnectionExtensions.AMQP_OPEN_PROPERTIES).apply(resourceInfo.getConnection(), parent.getTransport().getRemoteLocation());
+ if (userConnectionProperties != null && !userConnectionProperties.isEmpty()) {
+ userConnectionProperties.forEach((key, value) -> props.put(Symbol.valueOf(key), value));
+ }
+ }
+
+ // Client properties override anything the user added.
props.put(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME);
props.put(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION);
props.put(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 4d1a9b2..de001a0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +63,7 @@
import javax.jms.Session;
import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionRemotelyClosedException;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
@@ -907,6 +909,155 @@
}
}
+ @Test(timeout = 20000)
+ public void testConnectionPropertiesExtensionAddedValues() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final String property1 = "property1";
+ final String property2 = "property2";
+
+ final String value1 = UUID.randomUUID().toString();
+ final String value2 = UUID.randomUUID().toString();
+
+ Matcher<?> connPropsMatcher = allOf(
+ hasEntry(Symbol.valueOf(property1), value1),
+ hasEntry(Symbol.valueOf(property2), value2),
+ hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME),
+ hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION),
+ hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS));
+
+ testPeer.expectSaslAnonymous();
+ testPeer.expectOpen(connPropsMatcher, null, false);
+ testPeer.expectBegin();
+
+ final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+ factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+ Map<String, Object> properties = new HashMap<>();
+
+ properties.put(property1, value1);
+ properties.put(property2, value2);
+
+ return properties;
+ });
+
+ Connection connection = factory.createConnection();
+ connection.start();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ assertNull(testPeer.getThrowable());
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testConnectionPropertiesExtensionAddedValuesOfNonString() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final String property1 = "property1";
+ final String property2 = "property2";
+
+ final UUID value1 = UUID.randomUUID();
+ final UUID value2 = UUID.randomUUID();
+
+ Matcher<?> connPropsMatcher = allOf(
+ hasEntry(Symbol.valueOf(property1), value1),
+ hasEntry(Symbol.valueOf(property2), value2));
+
+ testPeer.expectSaslAnonymous();
+ testPeer.expectOpen(connPropsMatcher, null, false);
+ testPeer.expectBegin();
+
+ final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+ factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+ Map<String, Object> properties = new HashMap<>();
+
+ properties.put(property1, value1);
+ properties.put(property2, value2);
+
+ return properties;
+ });
+
+ Connection connection = factory.createConnection();
+ connection.start();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testConnectionPropertiesExtensionProtectsClientProperties() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ Matcher<?> connPropsMatcher = allOf(
+ hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME),
+ hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION),
+ hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS));
+
+ testPeer.expectSaslAnonymous();
+ testPeer.expectOpen(connPropsMatcher, null, false);
+ testPeer.expectBegin();
+
+ final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+ factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+ Map<String, Object> properties = new HashMap<>();
+
+ properties.put(AmqpSupport.PRODUCT.toString(), "Super-Duper-Qpid-JMS");
+ properties.put(AmqpSupport.VERSION.toString(), "5.0.32.Final");
+ properties.put(AmqpSupport.PLATFORM.toString(), "Commodore 64");
+
+ return properties;
+ });
+
+ Connection connection = factory.createConnection();
+ connection.start();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testConnectionFailsWhenUserSuppliesIllegalProperties() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ testPeer.expectSaslAnonymous();
+
+ final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+ factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+ Map<String, Object> properties = new HashMap<>();
+
+ properties.put("not-amqp-encodable", factory);
+
+ return properties;
+ });
+
+ Connection connection = factory.createConnection();
+
+ try {
+ connection.start();
+ fail("Should not be able to connect when illegal types are in the properties");
+ } catch (JMSException ex) {
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
@Ignore("Disabled due to requirement of hard coded port")
@Test(timeout = 20000)
public void testLocalPortOption() throws Exception {
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 9352fee..899e99e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -18,11 +18,14 @@
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
@@ -36,6 +39,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,6 +68,7 @@
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
@@ -159,7 +164,7 @@
private void doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(boolean clientID, UnsignedByte saslFailureCode) throws Exception {
String optionString;
- if(clientID) {
+ if (clientID) {
optionString = "?jms.clientID=myClientID";
} else {
optionString = "?jms.awaitClientID=false";
@@ -3964,6 +3969,97 @@
}
}
+ @Test(timeout = 20000)
+ public void testConnectionPropertiesExtensionAppliedOnEachReconnect() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ final String property1 = "property1";
+ final String property2 = "property2";
+
+ final UUID value1 = UUID.randomUUID();
+ final UUID value2 = UUID.randomUUID();
+
+ Matcher<?> connPropsMatcher1 = allOf(
+ hasEntry(Symbol.valueOf(property1), value1),
+ not(hasEntry(Symbol.valueOf(property2), value2)));
+
+ Matcher<?> connPropsMatcher2 = allOf(
+ not(hasEntry(Symbol.valueOf(property1), value1)),
+ hasEntry(Symbol.valueOf(property2), value2));
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen(connPropsMatcher1, null, false);
+ originalPeer.expectBegin();
+ originalPeer.dropAfterLastHandler(10);
+
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen(connPropsMatcher2, null, false);
+ finalPeer.expectBegin();
+
+ final URI remoteURI = new URI("failover:(" + originalURI + "," + finalURI + ")");
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+ factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+ Map<String, Object> properties = new HashMap<>();
+
+ if (originalConnected.getCount() == 1) {
+ properties.put(property1, value1);
+ } else {
+ properties.put(property2, value2);
+ }
+
+ return properties;
+ });
+
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalURI.equals(remoteURI.toString())) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalURI.equals(remoteURI.toString())) {
+ finalConnected.countDown();
+ }
+ }
+ });
+
+ try {
+ connection.start();
+ } catch (Exception ex) {
+ fail("Should not have thrown an Exception: " + ex);
+ }
+
+ finalPeer.waitForAllHandlersToComplete(2000);
+
+ assertTrue("Should connect to original peer", originalConnected.await(3, TimeUnit.SECONDS));
+ assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+ finalPeer.expectClose();
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000); }
+ }
+
private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException {
return establishAnonymousConnecton(null, null, peers);
}