QPIDJMS-492 Add a connection extension for injecting properties for Open

Allows client code to inject connection properties values into the Open
performative.
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
index 5e1b03d..7a4a150 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionExtensions.java
@@ -123,7 +123,28 @@
      *   </li>
      * </ul>
      */
-    PROXY_HANDLER_SUPPLIER("proxyHandlerSupplier");
+    PROXY_HANDLER_SUPPLIER("proxyHandlerSupplier"),
+
+    /**
+     * Allows a user to inject custom properties into the AMQP Open performative that is sent
+     * after a successful remote connection has been made.  The properties are injected by adding
+     * {@link String} keys and {@link Object} values into a {@link Map} instance and returning it.
+     * The value entries in the provided {@link Map} must be valid AMQP primitive types that can
+     * be encoded to form a valid AMQP Open performative or an error will be thrown and the connection
+     * attempt will fail.  If a user supplied property collides with an internal client specific
+     * property the client  version is always given precedence.
+     * <p>
+     * This method will be invoked on the initial connect and on each successive reconnect if a connection
+     * failures occurs and the client is configured to provide automatic reconnect support.
+     * <p>
+     * The extension function takes the form of a BiFunction defined as the following:
+     * <ul>
+     *   <li>
+     *     {@link BiFunction}&lt;{@link Connection}, {@link URI}, {@link Map}&gt;
+     *   </li>
+     * </ul>
+     */
+    AMQP_OPEN_PROPERTIES("amqpOpenProperties");
 
     private final String extensionKey;
 
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index fb8d625..188111f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -29,6 +29,7 @@
 
 import javax.jms.Session;
 
+import org.apache.qpid.jms.JmsConnectionExtensions;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
@@ -104,6 +105,7 @@
         };
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     protected Connection createEndpoint(JmsConnectionInfo resourceInfo) {
         String hostname = getParent().getVhost();
@@ -113,7 +115,17 @@
             hostname = null;
         }
 
-        Map<Symbol, Object> props = new LinkedHashMap<Symbol, Object>();
+        final Map<Symbol, Object> props = new LinkedHashMap<Symbol, Object>();
+
+        if (resourceInfo.getExtensionMap().containsKey(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES)) {
+            final Map<String, Object> userConnectionProperties = (Map<String, Object>) resourceInfo.getExtensionMap().get(
+                JmsConnectionExtensions.AMQP_OPEN_PROPERTIES).apply(resourceInfo.getConnection(), parent.getTransport().getRemoteLocation());
+            if (userConnectionProperties != null && !userConnectionProperties.isEmpty()) {
+                userConnectionProperties.forEach((key, value) -> props.put(Symbol.valueOf(key), value));
+            }
+        }
+
+        // Client properties override anything the user added.
         props.put(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME);
         props.put(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION);
         props.put(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 4d1a9b2..de001a0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -43,6 +43,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +63,7 @@
 import javax.jms.Session;
 
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionExtensions;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsConnectionRemotelyClosedException;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
@@ -907,6 +909,155 @@
         }
     }
 
+    @Test(timeout = 20000)
+    public void testConnectionPropertiesExtensionAddedValues() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final String property1 = "property1";
+            final String property2 = "property2";
+
+            final String value1 = UUID.randomUUID().toString();
+            final String value2 = UUID.randomUUID().toString();
+
+            Matcher<?> connPropsMatcher = allOf(
+                    hasEntry(Symbol.valueOf(property1), value1),
+                    hasEntry(Symbol.valueOf(property2), value2),
+                    hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME),
+                    hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION),
+                    hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS));
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen(connPropsMatcher, null, false);
+            testPeer.expectBegin();
+
+            final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+
+            JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+                Map<String, Object> properties = new HashMap<>();
+
+                properties.put(property1, value1);
+                properties.put(property2, value2);
+
+                return properties;
+            });
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+            assertNull(testPeer.getThrowable());
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testConnectionPropertiesExtensionAddedValuesOfNonString() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final String property1 = "property1";
+            final String property2 = "property2";
+
+            final UUID value1 = UUID.randomUUID();
+            final UUID value2 = UUID.randomUUID();
+
+            Matcher<?> connPropsMatcher = allOf(
+                    hasEntry(Symbol.valueOf(property1), value1),
+                    hasEntry(Symbol.valueOf(property2), value2));
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen(connPropsMatcher, null, false);
+            testPeer.expectBegin();
+
+            final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+
+            JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+                Map<String, Object> properties = new HashMap<>();
+
+                properties.put(property1, value1);
+                properties.put(property2, value2);
+
+                return properties;
+            });
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testConnectionPropertiesExtensionProtectsClientProperties() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            Matcher<?> connPropsMatcher = allOf(
+                    hasEntry(AmqpSupport.PRODUCT, MetaDataSupport.PROVIDER_NAME),
+                    hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION),
+                    hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS));
+
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen(connPropsMatcher, null, false);
+            testPeer.expectBegin();
+
+            final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+
+            JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+                Map<String, Object> properties = new HashMap<>();
+
+                properties.put(AmqpSupport.PRODUCT.toString(), "Super-Duper-Qpid-JMS");
+                properties.put(AmqpSupport.VERSION.toString(), "5.0.32.Final");
+                properties.put(AmqpSupport.PLATFORM.toString(), "Commodore 64");
+
+                return properties;
+            });
+
+            Connection connection = factory.createConnection();
+            connection.start();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testConnectionFailsWhenUserSuppliesIllegalProperties() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            testPeer.expectSaslAnonymous();
+
+            final URI remoteURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+
+            JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+                Map<String, Object> properties = new HashMap<>();
+
+                properties.put("not-amqp-encodable", factory);
+
+                return properties;
+            });
+
+            Connection connection = factory.createConnection();
+
+            try {
+                connection.start();
+                fail("Should not be able to connect when illegal types are in the properties");
+            } catch (JMSException ex) {
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     @Ignore("Disabled due to requirement of hard coded port")
     @Test(timeout = 20000)
     public void testLocalPortOption() throws Exception {
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 9352fee..899e99e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -18,11 +18,14 @@
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
@@ -36,6 +39,7 @@
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,6 +68,7 @@
 import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionExtensions;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
@@ -159,7 +164,7 @@
 
     private void doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(boolean clientID, UnsignedByte saslFailureCode) throws Exception {
         String optionString;
-        if(clientID) {
+        if (clientID) {
             optionString = "?jms.clientID=myClientID";
         } else {
             optionString = "?jms.awaitClientID=false";
@@ -3964,6 +3969,97 @@
         }
     }
 
+    @Test(timeout = 20000)
+    public void testConnectionPropertiesExtensionAppliedOnEachReconnect() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            final String property1 = "property1";
+            final String property2 = "property2";
+
+            final UUID value1 = UUID.randomUUID();
+            final UUID value2 = UUID.randomUUID();
+
+            Matcher<?> connPropsMatcher1 = allOf(
+                    hasEntry(Symbol.valueOf(property1), value1),
+                    not(hasEntry(Symbol.valueOf(property2), value2)));
+
+            Matcher<?> connPropsMatcher2 = allOf(
+                    not(hasEntry(Symbol.valueOf(property1), value1)),
+                    hasEntry(Symbol.valueOf(property2), value2));
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen(connPropsMatcher1, null, false);
+            originalPeer.expectBegin();
+            originalPeer.dropAfterLastHandler(10);
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen(connPropsMatcher2, null, false);
+            finalPeer.expectBegin();
+
+            final URI remoteURI = new URI("failover:(" + originalURI + "," + finalURI + ")");
+
+            JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+
+            factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> {
+                Map<String, Object> properties = new HashMap<>();
+
+                if (originalConnected.getCount() == 1) {
+                    properties.put(property1, value1);
+                } else {
+                    properties.put(property2, value2);
+                }
+
+                return properties;
+            });
+
+            JmsConnection connection = (JmsConnection) factory.createConnection();
+
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+
+            try {
+                connection.start();
+            } catch (Exception ex) {
+                fail("Should not have thrown an Exception: " + ex);
+            }
+
+            finalPeer.waitForAllHandlersToComplete(2000);
+
+            assertTrue("Should connect to original peer", originalConnected.await(3, TimeUnit.SECONDS));
+            assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+            finalPeer.expectClose();
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException {
         return establishAnonymousConnecton(null, null, peers);
     }