ARTEMIS-4744 Fully support multple host broker connections URIs
Create a new NettyConnector for each connection attempt that is configured from
distinct broker connection URIs which allows for differing TLS configuration
per remote connection configuration.
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 0223f9a..5b4c1c9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -49,8 +49,11 @@
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -65,10 +68,13 @@
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionManager.ClientProtocolManagerWithAMQP;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
+import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
@@ -120,10 +126,13 @@
*/
public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;
+ private static final NettyConnectorFactory CONNECTOR_FACTORY = new NettyConnectorFactory().setServerConnector(true);
+
+ private final ProtonProtocolManagerFactory protonProtocolManagerFactory;
+ private final ReferenceIDSupplier referenceIdSupplier;
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
- private final ProtonProtocolManager protonProtocolManager;
private final ActiveMQServer server;
- private final NettyConnector bridgesConnector;
+ private final List<TransportConfiguration> configurations;
private NettyConnection connection;
private Session session;
private AMQPSessionContext sessionContext;
@@ -134,6 +143,7 @@
private AMQPFederationSource brokerFederation;
private int retryCounter = 0;
private int lastRetryCounter;
+ private int connectionTimeout;
private boolean connecting = false;
private volatile ScheduledFuture<?> reconnectFuture;
private final Set<Queue> senders = new HashSet<>();
@@ -153,16 +163,16 @@
public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager,
AMQPBrokerConnectConfiguration brokerConnectConfiguration,
- ProtonProtocolManager protonProtocolManager,
- ActiveMQServer server,
- NettyConnector bridgesConnector) {
+ ProtonProtocolManagerFactory protonProtocolManagerFactory,
+ ActiveMQServer server) throws Exception {
this.bridgeManager = bridgeManager;
this.brokerConnectConfiguration = brokerConnectConfiguration;
- this.protonProtocolManager = protonProtocolManager;
this.server = server;
- this.bridgesConnector = bridgesConnector;
- connectExecutor = server.getExecutorFactory().getExecutor();
- scheduledExecutorService = server.getScheduledPool();
+ this.configurations = brokerConnectConfiguration.getTransportConfigurations();
+ this.connectExecutor = server.getExecutorFactory().getExecutor();
+ this.scheduledExecutorService = server.getScheduledPool();
+ this.protonProtocolManagerFactory = protonProtocolManagerFactory;
+ this.referenceIdSupplier = new ReferenceIDSupplier(server);
}
@Override
@@ -190,7 +200,7 @@
}
public int getConnectionTimeout() {
- return bridgesConnector.getConnectTimeoutMillis();
+ return connectionTimeout;
}
@Override
@@ -340,19 +350,32 @@
try {
connecting = true;
- List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations();
+ TransportConfiguration configuration = configurations.get(retryCounter % configurations.size());
+ host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration.getParams());
+ port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration.getParams());
- TransportConfiguration tpConfig = configurationList.get(retryCounter % configurationList.size());
+ ProtonProtocolManager protonProtocolManager =
+ (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getExtraParams(), null, null);
+ NettyConnector connector = (NettyConnector)CONNECTOR_FACTORY.createConnector(
+ configuration.getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
+ connector.start();
- String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams());
- int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams());
- this.host = hostOnParameter;
- this.port = portOnParameter;
- connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter);
+ logger.debug("Connecting {}", configuration);
- if (connection == null) {
- retryConnection();
- return;
+ connectionTimeout = connector.getConnectTimeoutMillis();
+ try {
+ connection = (NettyConnection) connector.createConnection();
+ if (connection == null) {
+ retryConnection();
+ return;
+ }
+ } finally {
+ if (connection == null) {
+ try {
+ connector.close();
+ } catch (Exception ex) {
+ }
+ }
}
lastRetryCounter = retryCounter;
@@ -368,12 +391,15 @@
ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration);
+ NettyConnectorCloseHandler connectorCloseHandler = new NettyConnectorCloseHandler(connector, connectExecutor);
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(), this::linkClosed);
+ protonRemotingConnection.addCloseListener(connectorCloseHandler);
+ protonRemotingConnection.addFailureListener(connectorCloseHandler);
- connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
+ connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(connector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
@@ -531,7 +557,7 @@
throw new IllegalAccessException("Cannot start replica");
}
- AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(protonProtocolManager, snfQueue, server, replicaConfig, this);
+ AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(referenceIdSupplier, snfQueue, server, replicaConfig, this);
this.mirrorControllerSource = newPartition;
@@ -702,11 +728,11 @@
AtomicBoolean cancelled = new AtomicBoolean(false);
- if (bridgesConnector.getConnectTimeoutMillis() > 0) {
+ if (getConnectionTimeout() > 0) {
futureTimeout = server.getScheduledPool().schedule(() -> {
cancelled.set(true);
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter);
- }, bridgesConnector.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
+ }, getConnectionTimeout(), TimeUnit.MILLISECONDS);
} else {
futureTimeout = null;
}
@@ -1059,4 +1085,39 @@
return DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED;
}
}
+
+ public static class NettyConnectorCloseHandler implements FailureListener, CloseListener {
+
+ private final NettyConnector connector;
+ private final Executor connectionExecutor;
+
+ public NettyConnectorCloseHandler(NettyConnector connector, Executor connectionExecutor) {
+ this.connector = connector;
+ this.connectionExecutor = connectionExecutor;
+ }
+
+ @Override
+ public void connectionClosed() {
+ doCloseConnector();
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver) {
+ doCloseConnector();
+ }
+
+ @Override
+ public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
+ doCloseConnector();
+ }
+
+ private void doCloseConnector() {
+ connectionExecutor.execute(() -> {
+ try {
+ connector.close();
+ } catch (Exception ex) {
+ }
+ });
+ }
+ }
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
index b0fde0c..73f77b9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
@@ -33,8 +33,6 @@
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@@ -60,8 +58,6 @@
private final Map<String, AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
private final Map<String, AMQPBrokerConnection> amqpBrokerConnections = new HashMap<>();
- private ProtonProtocolManager protonProtocolManager;
-
public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer server) {
this.amqpConnectionsConfig =
amqpConnectionsConfig.stream()
@@ -71,10 +67,6 @@
this.protonProtocolManagerFactory = factory;
}
- public ProtonProtocolManagerFactory getProtocolManagerFactory() {
- return protonProtocolManagerFactory;
- }
-
@Override
public void start() throws Exception {
if (!started) {
@@ -94,14 +86,7 @@
}
private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration, boolean start) throws Exception {
- NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true);
- protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getTransportConfigurations().get(0).getExtraParams(), null, null);
- NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(configuration.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
- bridgesConnector.start();
-
- logger.debug("Connecting {}", configuration);
-
- AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManager, server, bridgesConnector);
+ AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
server.registerBrokerConnection(amqpBrokerConnection);
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index b7ec781..1b22567 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -49,7 +49,6 @@
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@@ -164,7 +163,7 @@
return started;
}
- public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
+ public AMQPMirrorControllerSource(ReferenceIDSupplier referenceIdSupplier, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
AMQPBrokerConnection brokerConnection) {
super(server);
assert snfQueue != null;
@@ -175,7 +174,7 @@
snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
}
this.server = server;
- this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
+ this.idSupplier = referenceIdSupplier;
this.addQueues = replicaConfig.isQueueCreation();
this.deleteQueues = replicaConfig.isQueueRemoval();
this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
index f8b14b6..e8314c4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java
@@ -40,6 +40,7 @@
private static final int BROKER_PORT_NUM = AMQP_PORT + 1;
private static final String SERVER_KEYSTORE_NAME = "server-keystore.jks";
+ private static final String UNKNOWN_SERVER_KEYSTORE_NAME = "unknown-server-keystore.jks";
private static final String SERVER_KEYSTORE_PASSWORD = "securepass";
private static final String CLIENT_KEYSTORE_NAME = "client-keystore.jks";
private static final String CLIENT_KEYSTORE_PASSWORD = "securepass";
@@ -194,7 +195,8 @@
logger.debug("Connect test started, peer listening on: {}", remoteURI);
String amqpServerConnectionURI = "tcp://localhost:" + remoteURI.getPort() +
- "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME + ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
+ "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
+ ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
if (requireClientCert) {
amqpServerConnectionURI +=
";keyStorePath=" + CLIENT_KEYSTORE_NAME + ";keyStorePassword=" + CLIENT_KEYSTORE_PASSWORD;
@@ -214,4 +216,189 @@
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test(timeout = 20_000)
+ public void testReconnectConnectsWithVerifyHostOffOnSecondURI() throws Exception {
+ final String keyStorePath = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
+
+ ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
+ server1Options.setSecure(true);
+ server1Options.setKeyStoreLocation(keyStorePath);
+ server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+ server1Options.setVerifyHost(false);
+
+ ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
+ server2Options.setSecure(true);
+ server2Options.setKeyStoreLocation(keyStorePath);
+ server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+ server2Options.setVerifyHost(false);
+
+ try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
+ ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
+
+ firstPeer.expectConnectionToDrop();
+ firstPeer.start();
+
+ secondPeer.expectSASLHeader().respondWithSASLHeader();
+ secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
+ secondPeer.expectSaslInit().withMechanism(PLAIN).withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
+ secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
+ secondPeer.expectAMQPHeader().respondWithAMQPHeader();
+ secondPeer.expectOpen().respond();
+ secondPeer.expectBegin().respond();
+ secondPeer.start();
+
+ final URI firstPeerURI = firstPeer.getServerURI();
+ logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
+
+ final URI secondPeerURI = secondPeer.getServerURI();
+ logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
+
+ // First connection fails because we use a server certificate with whose common name
+ // doesn't match the host, second connection should work as we disable host verification
+ String amqpServerConnectionURI =
+ "tcp://localhost:" + firstPeerURI.getPort() + "?verifyHost=true" +
+ ";sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
+ ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD +
+ "#tcp://localhost:" + secondPeerURI.getPort() + "?verifyHost=false";
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
+ amqpConnection.setReconnectAttempts(20); // Allow reconnects
+ amqpConnection.setRetryInterval(100); // Allow reconnects
+ amqpConnection.setUser(USER);
+ amqpConnection.setPassword(PASSWD);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+
+ server.start();
+
+ firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test(timeout = 20_000)
+ public void testReconnectionUsesConfigurationToReconnectToSecondHostAfterFirstFails() throws Exception {
+ final String keyStore1Path = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
+ final String keyStore2Path = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile();
+
+ ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
+ server1Options.setSecure(true);
+ server1Options.setKeyStoreLocation(keyStore1Path);
+ server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+ server1Options.setVerifyHost(false);
+
+ ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
+ server2Options.setSecure(true);
+ server2Options.setKeyStoreLocation(keyStore2Path);
+ server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+ server2Options.setVerifyHost(false);
+
+ try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
+ ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
+
+ firstPeer.expectConnectionToDrop();
+ firstPeer.start();
+
+ secondPeer.expectSASLHeader().respondWithSASLHeader();
+ secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
+ secondPeer.expectSaslInit().withMechanism(PLAIN)
+ .withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
+ secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
+ secondPeer.expectAMQPHeader().respondWithAMQPHeader();
+ secondPeer.expectOpen().respond();
+ secondPeer.expectBegin().respond();
+ secondPeer.start();
+
+ final URI firstPeerURI = firstPeer.getServerURI();
+ logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
+
+ final URI secondPeerURI = secondPeer.getServerURI();
+ logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
+
+ String amqpServerConnectionURI =
+ "tcp://127.0.0.1:" + firstPeerURI.getPort() + "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
+ ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD +
+ "#tcp://localhost:" + secondPeerURI.getPort();
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
+ amqpConnection.setReconnectAttempts(20); // Allow reconnects
+ amqpConnection.setRetryInterval(100); // Allow reconnects
+ amqpConnection.setUser(USER);
+ amqpConnection.setPassword(PASSWD);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+
+ server.start();
+
+ firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test(timeout = 20_000)
+ public void testReconnectionUsesHostSpecificConfigurationToReconnectToSecondHostAfterFirstFails() throws Exception {
+ final String keyStore1Path = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
+ final String keyStore2Path = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile();
+
+ ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
+ server1Options.setSecure(true);
+ server1Options.setKeyStoreLocation(keyStore1Path);
+ server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+ server1Options.setVerifyHost(false);
+
+ ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
+ server2Options.setSecure(true);
+ server2Options.setKeyStoreLocation(keyStore2Path);
+ server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
+ server2Options.setVerifyHost(false);
+
+ try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
+ ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
+
+ firstPeer.expectConnectionToDrop();
+ firstPeer.start();
+
+ secondPeer.expectSASLHeader().respondWithSASLHeader();
+ secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
+ secondPeer.expectSaslInit().withMechanism(PLAIN)
+ .withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
+ secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
+ secondPeer.expectAMQPHeader().respondWithAMQPHeader();
+ secondPeer.expectOpen().respond();
+ secondPeer.expectBegin().respond();
+ secondPeer.start();
+
+ final URI firstPeerURI = firstPeer.getServerURI();
+ logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
+
+ final URI secondPeerURI = secondPeer.getServerURI();
+ logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
+
+ // First connection fails because we use the wrong trust store for the TLS handshake
+ String amqpServerConnectionURI =
+ "tcp://localhost:" + firstPeerURI.getPort() +
+ "?sslEnabled=true;trustStorePath=" + CLIENT_TRUSTSTORE_NAME +
+ ";trustStorePassword=" + CLIENT_TRUSTSTORE_PASSWORD +
+ "#tcp://localhost:" + secondPeerURI.getPort() +
+ "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
+ ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
+ amqpConnection.setReconnectAttempts(20); // Allow reconnects
+ amqpConnection.setRetryInterval(100); // Allow reconnects
+ amqpConnection.setUser(USER);
+ amqpConnection.setPassword(PASSWD);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+
+ server.start();
+
+ firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}