ARTEMIS-4744 Fully support multple host broker connections URIs

Create a new NettyConnector for each connection attempt that is configured from
distinct broker connection URIs which allows for differing TLS configuration
per remote connection configuration.
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 0223f9a..5b4c1c9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -49,8 +49,11 @@
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.remoting.CertificateUtil;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -65,10 +68,13 @@
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionManager.ClientProtocolManagerWithAMQP;
 import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport;
 import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationSource;
 import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
 import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
+import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
@@ -120,10 +126,13 @@
     */
    public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;
 
+   private static final NettyConnectorFactory CONNECTOR_FACTORY = new NettyConnectorFactory().setServerConnector(true);
+
+   private final ProtonProtocolManagerFactory protonProtocolManagerFactory;
+   private final ReferenceIDSupplier referenceIdSupplier;
    private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
-   private final ProtonProtocolManager protonProtocolManager;
    private final ActiveMQServer server;
-   private final NettyConnector bridgesConnector;
+   private final List<TransportConfiguration> configurations;
    private NettyConnection connection;
    private Session session;
    private AMQPSessionContext sessionContext;
@@ -134,6 +143,7 @@
    private AMQPFederationSource brokerFederation;
    private int retryCounter = 0;
    private int lastRetryCounter;
+   private int connectionTimeout;
    private boolean connecting = false;
    private volatile ScheduledFuture<?> reconnectFuture;
    private final Set<Queue> senders = new HashSet<>();
@@ -153,16 +163,16 @@
 
    public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager,
                                AMQPBrokerConnectConfiguration brokerConnectConfiguration,
-                               ProtonProtocolManager protonProtocolManager,
-                               ActiveMQServer server,
-                               NettyConnector bridgesConnector) {
+                               ProtonProtocolManagerFactory protonProtocolManagerFactory,
+                               ActiveMQServer server) throws Exception {
       this.bridgeManager = bridgeManager;
       this.brokerConnectConfiguration = brokerConnectConfiguration;
-      this.protonProtocolManager = protonProtocolManager;
       this.server = server;
-      this.bridgesConnector = bridgesConnector;
-      connectExecutor = server.getExecutorFactory().getExecutor();
-      scheduledExecutorService = server.getScheduledPool();
+      this.configurations = brokerConnectConfiguration.getTransportConfigurations();
+      this.connectExecutor = server.getExecutorFactory().getExecutor();
+      this.scheduledExecutorService = server.getScheduledPool();
+      this.protonProtocolManagerFactory = protonProtocolManagerFactory;
+      this.referenceIdSupplier = new ReferenceIDSupplier(server);
    }
 
    @Override
@@ -190,7 +200,7 @@
    }
 
    public int getConnectionTimeout() {
-      return bridgesConnector.getConnectTimeoutMillis();
+      return connectionTimeout;
    }
 
    @Override
@@ -340,19 +350,32 @@
       try {
          connecting = true;
 
-         List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations();
+         TransportConfiguration configuration = configurations.get(retryCounter % configurations.size());
+         host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration.getParams());
+         port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration.getParams());
 
-         TransportConfiguration tpConfig = configurationList.get(retryCounter % configurationList.size());
+         ProtonProtocolManager protonProtocolManager =
+            (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getExtraParams(), null, null);
+         NettyConnector connector = (NettyConnector)CONNECTOR_FACTORY.createConnector(
+            configuration.getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
+         connector.start();
 
-         String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams());
-         int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams());
-         this.host = hostOnParameter;
-         this.port = portOnParameter;
-         connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter);
+         logger.debug("Connecting {}", configuration);
 
-         if (connection == null) {
-            retryConnection();
-            return;
+         connectionTimeout = connector.getConnectTimeoutMillis();
+         try {
+            connection = (NettyConnection) connector.createConnection();
+            if (connection == null) {
+               retryConnection();
+               return;
+            }
+         } finally {
+            if (connection == null) {
+               try {
+                  connector.close();
+               } catch (Exception ex) {
+               }
+            }
          }
 
          lastRetryCounter = retryCounter;
@@ -368,12 +391,15 @@
 
          ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration);
 
+         NettyConnectorCloseHandler connectorCloseHandler = new NettyConnectorCloseHandler(connector, connectExecutor);
          ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
          server.getRemotingService().addConnectionEntry(connection, entry);
          protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
          protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(), this::linkClosed);
+         protonRemotingConnection.addCloseListener(connectorCloseHandler);
+         protonRemotingConnection.addFailureListener(connectorCloseHandler);
 
-         connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
+         connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(connector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
 
          session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
          sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
@@ -531,7 +557,7 @@
          throw new IllegalAccessException("Cannot start replica");
       }
 
-      AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(protonProtocolManager, snfQueue, server, replicaConfig, this);
+      AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(referenceIdSupplier, snfQueue, server, replicaConfig, this);
 
       this.mirrorControllerSource = newPartition;
 
@@ -702,11 +728,11 @@
 
             AtomicBoolean cancelled = new AtomicBoolean(false);
 
-            if (bridgesConnector.getConnectTimeoutMillis() > 0) {
+            if (getConnectionTimeout() > 0) {
                futureTimeout = server.getScheduledPool().schedule(() -> {
                   cancelled.set(true);
                   error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter);
-               }, bridgesConnector.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
+               }, getConnectionTimeout(), TimeUnit.MILLISECONDS);
             } else {
                futureTimeout = null;
             }
@@ -1059,4 +1085,39 @@
          return DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED;
       }
    }
+
+   public static class NettyConnectorCloseHandler implements FailureListener, CloseListener {
+
+      private final NettyConnector connector;
+      private final Executor connectionExecutor;
+
+      public NettyConnectorCloseHandler(NettyConnector connector, Executor connectionExecutor) {
+         this.connector = connector;
+         this.connectionExecutor = connectionExecutor;
+      }
+
+      @Override
+      public void connectionClosed() {
+         doCloseConnector();
+      }
+
+      @Override
+      public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+         doCloseConnector();
+      }
+
+      @Override
+      public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+         doCloseConnector();
+      }
+
+      private void doCloseConnector() {
+         connectionExecutor.execute(() -> {
+            try {
+               connector.close();
+            } catch (Exception ex) {
+            }
+         });
+      }
+   }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
index b0fde0c..73f77b9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
@@ -33,8 +33,6 @@
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
 import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@@ -60,8 +58,6 @@
    private final Map<String, AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
    private final Map<String, AMQPBrokerConnection> amqpBrokerConnections = new HashMap<>();
 
-   private ProtonProtocolManager protonProtocolManager;
-
    public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer server) {
       this.amqpConnectionsConfig =
          amqpConnectionsConfig.stream()
@@ -71,10 +67,6 @@
       this.protonProtocolManagerFactory = factory;
    }
 
-   public ProtonProtocolManagerFactory getProtocolManagerFactory() {
-      return protonProtocolManagerFactory;
-   }
-
    @Override
    public void start() throws Exception {
       if (!started) {
@@ -94,14 +86,7 @@
    }
 
    private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration, boolean start) throws Exception {
-      NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true);
-      protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getTransportConfigurations().get(0).getExtraParams(), null, null);
-      NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(configuration.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
-      bridgesConnector.start();
-
-      logger.debug("Connecting {}", configuration);
-
-      AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManager, server, bridgesConnector);
+      AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
       amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
       server.registerBrokerConnection(amqpBrokerConnection);
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index b7ec781..1b22567 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -49,7 +49,6 @@
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -164,7 +163,7 @@
       return started;
    }
 
-   public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
+   public AMQPMirrorControllerSource(ReferenceIDSupplier referenceIdSupplier, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
                                      AMQPBrokerConnection brokerConnection) {
       super(server);
       assert snfQueue != null;
@@ -175,7 +174,7 @@
          snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
       }
       this.server = server;
-      this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
+      this.idSupplier = referenceIdSupplier;
       this.addQueues = replicaConfig.isQueueCreation();
       this.deleteQueues = replicaConfig.isQueueRemoval();
       this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
index f8b14b6..e8314c4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
@@ -40,6 +40,7 @@
    private static final int BROKER_PORT_NUM = AMQP_PORT + 1;
 
    private static final String SERVER_KEYSTORE_NAME = "server-keystore.jks";
+   private static final String UNKNOWN_SERVER_KEYSTORE_NAME = "unknown-server-keystore.jks";
    private static final String SERVER_KEYSTORE_PASSWORD = "securepass";
    private static final String CLIENT_KEYSTORE_NAME = "client-keystore.jks";
    private static final String CLIENT_KEYSTORE_PASSWORD = "securepass";
@@ -194,7 +195,8 @@
          logger.debug("Connect test started, peer listening on: {}", remoteURI);
 
          String amqpServerConnectionURI = "tcp://localhost:" + remoteURI.getPort() +
-                  "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME + ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
+                  "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
+                  ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
          if (requireClientCert) {
             amqpServerConnectionURI +=
                      ";keyStorePath=" + CLIENT_KEYSTORE_NAME + ";keyStorePassword=" + CLIENT_KEYSTORE_PASSWORD;
@@ -214,4 +216,189 @@
          peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
       }
    }
+
+   @Test(timeout = 20_000)
+   public void testReconnectConnectsWithVerifyHostOffOnSecondURI() throws Exception {
+      final String keyStorePath = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
+
+      ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
+      server1Options.setSecure(true);
+      server1Options.setKeyStoreLocation(keyStorePath);
+      server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+      server1Options.setVerifyHost(false);
+
+      ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
+      server2Options.setSecure(true);
+      server2Options.setKeyStoreLocation(keyStorePath);
+      server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+      server2Options.setVerifyHost(false);
+
+      try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
+           ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
+
+         firstPeer.expectConnectionToDrop();
+         firstPeer.start();
+
+         secondPeer.expectSASLHeader().respondWithSASLHeader();
+         secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
+         secondPeer.expectSaslInit().withMechanism(PLAIN).withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
+         secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
+         secondPeer.expectAMQPHeader().respondWithAMQPHeader();
+         secondPeer.expectOpen().respond();
+         secondPeer.expectBegin().respond();
+         secondPeer.start();
+
+         final URI firstPeerURI = firstPeer.getServerURI();
+         logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
+
+         final URI secondPeerURI = secondPeer.getServerURI();
+         logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
+
+         // First connection fails because we use a server certificate with whose common name
+         // doesn't match the host, second connection should work as we disable host verification
+         String amqpServerConnectionURI =
+            "tcp://localhost:" + firstPeerURI.getPort() + "?verifyHost=true" +
+               ";sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
+               ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD +
+            "#tcp://localhost:" + secondPeerURI.getPort() + "?verifyHost=false";
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
+         amqpConnection.setReconnectAttempts(20); // Allow reconnects
+         amqpConnection.setRetryInterval(100); // Allow reconnects
+         amqpConnection.setUser(USER);
+         amqpConnection.setPassword(PASSWD);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+
+         server.start();
+
+         firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test(timeout = 20_000)
+   public void testReconnectionUsesConfigurationToReconnectToSecondHostAfterFirstFails() throws Exception {
+      final String keyStore1Path = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
+      final String keyStore2Path = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile();
+
+      ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
+      server1Options.setSecure(true);
+      server1Options.setKeyStoreLocation(keyStore1Path);
+      server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+      server1Options.setVerifyHost(false);
+
+      ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
+      server2Options.setSecure(true);
+      server2Options.setKeyStoreLocation(keyStore2Path);
+      server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+      server2Options.setVerifyHost(false);
+
+      try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
+           ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
+
+         firstPeer.expectConnectionToDrop();
+         firstPeer.start();
+
+         secondPeer.expectSASLHeader().respondWithSASLHeader();
+         secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
+         secondPeer.expectSaslInit().withMechanism(PLAIN)
+                                    .withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
+         secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
+         secondPeer.expectAMQPHeader().respondWithAMQPHeader();
+         secondPeer.expectOpen().respond();
+         secondPeer.expectBegin().respond();
+         secondPeer.start();
+
+         final URI firstPeerURI = firstPeer.getServerURI();
+         logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
+
+         final URI secondPeerURI = secondPeer.getServerURI();
+         logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
+
+         String amqpServerConnectionURI =
+            "tcp://127.0.0.1:" + firstPeerURI.getPort() + "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
+                                                          ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD +
+            "#tcp://localhost:" + secondPeerURI.getPort();
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
+         amqpConnection.setReconnectAttempts(20); // Allow reconnects
+         amqpConnection.setRetryInterval(100); // Allow reconnects
+         amqpConnection.setUser(USER);
+         amqpConnection.setPassword(PASSWD);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+
+         server.start();
+
+         firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test(timeout = 20_000)
+   public void testReconnectionUsesHostSpecificConfigurationToReconnectToSecondHostAfterFirstFails() throws Exception {
+      final String keyStore1Path = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
+      final String keyStore2Path = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile();
+
+      ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
+      server1Options.setSecure(true);
+      server1Options.setKeyStoreLocation(keyStore1Path);
+      server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+      server1Options.setVerifyHost(false);
+
+      ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
+      server2Options.setSecure(true);
+      server2Options.setKeyStoreLocation(keyStore2Path);
+      server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+      server2Options.setVerifyHost(false);
+
+      try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
+           ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
+
+         firstPeer.expectConnectionToDrop();
+         firstPeer.start();
+
+         secondPeer.expectSASLHeader().respondWithSASLHeader();
+         secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
+         secondPeer.expectSaslInit().withMechanism(PLAIN)
+                                    .withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
+         secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
+         secondPeer.expectAMQPHeader().respondWithAMQPHeader();
+         secondPeer.expectOpen().respond();
+         secondPeer.expectBegin().respond();
+         secondPeer.start();
+
+         final URI firstPeerURI = firstPeer.getServerURI();
+         logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
+
+         final URI secondPeerURI = secondPeer.getServerURI();
+         logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
+
+         // First connection fails because we use the wrong trust store for the TLS handshake
+         String amqpServerConnectionURI =
+            "tcp://localhost:" + firstPeerURI.getPort() +
+               "?sslEnabled=true;trustStorePath=" + CLIENT_TRUSTSTORE_NAME +
+               ";trustStorePassword=" + CLIENT_TRUSTSTORE_PASSWORD +
+            "#tcp://localhost:" + secondPeerURI.getPort() +
+               "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
+               ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
+
+         AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
+         amqpConnection.setReconnectAttempts(20); // Allow reconnects
+         amqpConnection.setRetryInterval(100); // Allow reconnects
+         amqpConnection.setUser(USER);
+         amqpConnection.setPassword(PASSWD);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+
+         server.start();
+
+         firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
 }