// Source blob: 01ecbf845680f6702d83bdf0804693b87d79c5e1
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.provider.failover;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.net.ssl.SSLContext;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;
import org.apache.qpid.jms.util.PropertyUtil;
import org.apache.qpid.jms.util.URISupport;
import org.apache.qpid.proton.amqp.Symbol;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FailoverWithAmqpOpenProvidedServerListIntegrationTest extends QpidJmsTestCase {
// Logger used by the tests to report peer addresses and connection events.
private static final Logger LOG = LoggerFactory.getLogger(FailoverWithAmqpOpenProvidedServerListIntegrationTest.class);
// Broker-side PKCS12 key and trust stores used to build the SSLContext handed to the test peers.
private static final String BROKER_PKCS12_KEYSTORE = "src/test/resources/broker-pkcs12.keystore";
private static final String BROKER_PKCS12_TRUSTSTORE = "src/test/resources/broker-pkcs12.truststore";
// Password used for both the broker and the client stores in these tests.
private static final String PASSWORD = "password";
// Client-side JKS key and trust stores.
private static final String CLIENT_JKS_KEYSTORE = "src/test/resources/client-jks.keystore";
private static final String CLIENT_JKS_TRUSTSTORE = "src/test/resources/client-jks.truststore";
// Names of the javax.net.ssl system properties for key/trust store location and password.
// NOTE(review): not referenced in the portion of the file reviewed here; presumably used by the
// SSL system-property helper elsewhere in this class -- confirm.
private static final String JAVAX_NET_SSL_KEY_STORE = "javax.net.ssl.keyStore";
private static final String JAVAX_NET_SSL_KEY_STORE_PASSWORD = "javax.net.ssl.keyStorePassword";
private static final String JAVAX_NET_SSL_TRUST_STORE = "javax.net.ssl.trustStore";
private static final String JAVAX_NET_SSL_TRUST_STORE_PASSWORD = "javax.net.ssl.trustStorePassword";
// Connection property key under which a test peer advertises its failover servers in the Open frame.
private static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
// Keys used inside each advertised server's detail map.
private static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
private static final Symbol HOSTNAME = Symbol.valueOf("hostname");
private static final Symbol PORT = Symbol.valueOf("port");
/*
* Verify that when the Open frame contains a failover server list, and the client is configured to
* replace the servers in its existing URI pool, it does so, leaving only the server it is currently
* connected to plus the announced failover servers.
*/
@Test(timeout = 20000)
public void testFailoverHandlesServerProvidedFailoverListReplace() throws Exception {
    // Run the shared scenario with the REPLACE server-list action selected.
    final boolean replaceExistingServers = true;
    doFailoverHandlesServerProvidedFailoverListTestImpl(replaceExistingServers);
}
/*
* Verify that when the Open frame contains a failover server list, and the client is configured to
* add the servers to its existing URI pool, it does so.
*/
@Test(timeout = 20000)
public void testFailoverHandlesServerProvidedFailoverListAdd() throws Exception {
    // Run the shared scenario with the ADD server-list action selected.
    final boolean replaceExistingServers = false;
    doFailoverHandlesServerProvidedFailoverListTestImpl(replaceExistingServers);
}
/*
 * Shared scenario for the REPLACE and ADD server-list actions: connect to a primary peer that
 * advertises backup1, fail over to backup1 (which advertises backup2), then fail over to backup2,
 * checking the client's failover URI list at each step.
 *
 * replace - true to configure amqpOpenServerListAction=REPLACE, false for ADD.
 */
private void doFailoverHandlesServerProvidedFailoverListTestImpl(boolean replace) throws Exception {
    try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
         TestAmqpPeer backupPeer1 = new TestAmqpPeer();
         TestAmqpPeer backupPeer2 = new TestAmqpPeer()) {

        final URI primaryURI = createPeerURI(primaryPeer);
        final URI firstBackupURI = createPeerURI(backupPeer1);
        final URI secondBackupURI = createPeerURI(backupPeer2);

        LOG.info("Primary is at: {}", primaryURI);
        LOG.info("Backup1 is at: {}", firstBackupURI);
        LOG.info("Backup2 is at: {}", secondBackupURI);

        final CountDownLatch primaryConnected = new CountDownLatch(1);
        final CountDownLatch firstBackupConnected = new CountDownLatch(1);
        final CountDownLatch secondBackupConnected = new CountDownLatch(1);

        // The authentication is expected as soon as the connection object is created
        primaryPeer.expectSaslAnonymous();

        final String failoverParams =
            "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListAction=" + (replace ? "REPLACE" : "ADD");

        // Only the primary (soon to be dropped) peer is handed to the client; the backups can only be
        // discovered through the failover-server-list each peer announces.
        final JmsConnection connection = establishAnonymousConnecton(failoverParams, primaryPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(primaryURI, remoteURI)) {
                    primaryConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
                if (isExpectedHost(firstBackupURI, remoteURI)) {
                    firstBackupConnected.countDown();
                    return;
                }
                if (isExpectedHost(secondBackupURI, remoteURI)) {
                    secondBackupConnected.countDown();
                }
            }
        });

        // Before the Open exchange the pool holds the initial peer only
        final List<URI> expectedBeforeOpen = new ArrayList<>();
        expectedBeforeOpen.add(primaryURI);
        assertFailoverURIList(connection, expectedBeforeOpen);

        // The primary accepts the connection and advertises backup1 in its failover list
        final Map<Symbol, Object> firstBackupDetails = new HashMap<>();
        firstBackupDetails.put(NETWORK_HOST, "localhost");
        firstBackupDetails.put(PORT, backupPeer1.getServerPort());

        final List<Map<Symbol, Object>> primaryAdvertisedServers = new ArrayList<>();
        primaryAdvertisedServers.add(firstBackupDetails);

        final Map<Symbol, Object> primaryOpenProperties = new HashMap<>();
        primaryOpenProperties.put(FAILOVER_SERVER_LIST, primaryAdvertisedServers);

        primaryPeer.expectOpen(primaryOpenProperties);
        primaryPeer.expectBegin();

        // Provoke the actual AMQP connection
        connection.start();
        assertTrue("Should connect to primary peer", primaryConnected.await(5, TimeUnit.SECONDS));

        // The pool now holds the initial peer and backup1
        final List<URI> expectedAfterOpen = new ArrayList<>();
        expectedAfterOpen.add(primaryURI);
        expectedAfterOpen.add(firstBackupURI);
        assertFailoverURIList(connection, expectedAfterOpen);

        // backup1 accepts a connection and advertises backup2 in its failover list
        final Map<Symbol, Object> secondBackupDetails = new HashMap<>();
        secondBackupDetails.put(NETWORK_HOST, "localhost");
        secondBackupDetails.put(PORT, backupPeer2.getServerPort());

        final List<Map<Symbol, Object>> firstBackupAdvertisedServers = new ArrayList<>();
        firstBackupAdvertisedServers.add(secondBackupDetails);

        final Map<Symbol, Object> firstBackupOpenProperties = new HashMap<>();
        firstBackupOpenProperties.put(FAILOVER_SERVER_LIST, firstBackupAdvertisedServers);

        backupPeer1.expectSaslAnonymous();
        backupPeer1.expectOpen(firstBackupOpenProperties);
        backupPeer1.expectBegin();

        // Kill the primary peer
        primaryPeer.close();
        assertTrue("Should connect to backup1 peer", firstBackupConnected.await(5, TimeUnit.SECONDS));
        assertEquals("Should not yet connect to backup2 peer", 1, secondBackupConnected.getCount());

        // REPLACE: backup1 and backup2. ADD: primary, backup1 and backup2.
        final List<URI> expectedAfterFirstReconnect = new ArrayList<>();
        if (!replace) {
            expectedAfterFirstReconnect.add(primaryURI);
        }
        expectedAfterFirstReconnect.add(firstBackupURI);
        expectedAfterFirstReconnect.add(secondBackupURI);
        assertFailoverURIList(connection, expectedAfterFirstReconnect);

        // backup2 accepts a connection (advertising nothing further)
        backupPeer2.expectSaslAnonymous();
        backupPeer2.expectOpen();
        backupPeer2.expectBegin();

        // Kill the backup1 peer
        backupPeer1.close();
        assertTrue("Should connect to backup2 peer", secondBackupConnected.await(5, TimeUnit.SECONDS));

        // The pool content is unchanged by the second reconnect
        final List<URI> expectedAfterSecondReconnect = new ArrayList<>();
        if (!replace) {
            expectedAfterSecondReconnect.add(primaryURI);
        }
        expectedAfterSecondReconnect.add(firstBackupURI);
        expectedAfterSecondReconnect.add(secondBackupURI);
        assertFailoverURIList(connection, expectedAfterSecondReconnect);

        backupPeer2.expectClose();
        connection.close();
        backupPeer2.waitForAllHandlersToComplete(1000);
    }
}
/*
* Verify that when the Open frame contains a failover server list, and the client is configured to ignore it,
* no change occurs in the failover URIs in use by the client after connecting.
*/
@Test(timeout = 20000)
public void testFailoverHandlesServerProvidedFailoverListIgnore() throws Exception {
    try (TestAmqpPeer primaryPeer = new TestAmqpPeer()) {
        final URI peerURI = createPeerURI(primaryPeer);
        LOG.info("Peer is at: {}", peerURI);

        final CountDownLatch peerConnected = new CountDownLatch(1);

        // The authentication is expected as soon as the connection object is created
        primaryPeer.expectSaslAnonymous();

        final String failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListAction=IGNORE";

        // Only the primary peer is handed to the client, which is configured to IGNORE any
        // failover-server-list the peer announces.
        final JmsConnection connection = establishAnonymousConnecton(failoverParams, primaryPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(peerURI, remoteURI)) {
                    peerConnected.countDown();
                }
            }
        });

        // Before the Open exchange the pool holds the initial peer only
        final List<URI> expectedURIs = new ArrayList<>();
        expectedURIs.add(peerURI);
        assertFailoverURIList(connection, expectedURIs);

        // The primary accepts the connection and advertises some other server
        final Map<Symbol, Object> advertisedServer = new HashMap<>();
        advertisedServer.put(NETWORK_HOST, "testhost");
        advertisedServer.put(PORT, "4567");

        final List<Map<Symbol, Object>> advertisedServers = new ArrayList<>();
        advertisedServers.add(advertisedServer);

        final Map<Symbol, Object> openProperties = new HashMap<>();
        openProperties.put(FAILOVER_SERVER_LIST, advertisedServers);

        primaryPeer.expectOpen(openProperties);
        primaryPeer.expectBegin();

        // Provoke the actual AMQP connection
        connection.start();
        assertTrue("Should connect to primary peer", peerConnected.await(5, TimeUnit.SECONDS));

        // The pool must be unchanged: still the initial peer only
        assertFailoverURIList(connection, expectedURIs);

        primaryPeer.expectClose();
        connection.close();
        primaryPeer.waitForAllHandlersToComplete(1000);
    }
}
/*
* Verify that when the Open frame contains a failover server list, and it specifies an AMQP hostname in
* a particular server's details, that hostname is used when failover occurs.
*/
@Test(timeout = 20000)
public void testFailoverHandlesServerProvidedFailoverListWithHostname() throws Exception {
    try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
         TestAmqpPeer backupPeer = new TestAmqpPeer();) {

        final URI primaryPeerURI = createPeerURI(primaryPeer);
        final URI backupPeerURI = createPeerURI(backupPeer);
        LOG.info("Primary is at: {}", primaryPeerURI);
        LOG.info("Backup is at: {}", backupPeerURI);

        final CountDownLatch connectedToPrimary = new CountDownLatch(1);
        final CountDownLatch connectedToBackup = new CountDownLatch(1);

        // Expect the authentication as soon as the connection object is created
        primaryPeer.expectSaslAnonymous();

        // We only give it the primary/dropping peer details. It can only connect to the backup
        // peer by identifying the details in the announced failover-server-list.
        final JmsConnection connection = establishAnonymousConnecton(null, primaryPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(primaryPeerURI, remoteURI)) {
                    connectedToPrimary.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
                if (isExpectedHost(backupPeerURI, remoteURI)) {
                    connectedToBackup.countDown();
                }
            }
        });

        // Verify the existing failover URIs are as expected, the initial peer only
        List<URI> beforeOpenFailoverURIs = new ArrayList<>();
        beforeOpenFailoverURIs.add(primaryPeerURI);
        assertFailoverURIList(connection, beforeOpenFailoverURIs);

        // Set the primary up to expect the connection, have the failover list containing the backup
        // advertised together with an explicit AMQP hostname
        String myAmqpVhost = "myAmqpHostname";
        Map<Symbol, Object> backupPeerDetails = new HashMap<>();
        backupPeerDetails.put(NETWORK_HOST, "localhost");
        backupPeerDetails.put(PORT, backupPeer.getServerPort());
        backupPeerDetails.put(HOSTNAME, myAmqpVhost);

        List<Map<Symbol, Object>> failoverServerList = new ArrayList<>();
        failoverServerList.add(backupPeerDetails);

        Map<Symbol, Object> serverConnectionProperties = new HashMap<>();
        serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);

        primaryPeer.expectOpen(serverConnectionProperties);
        primaryPeer.expectBegin();

        // Provoke the actual AMQP connection
        connection.start();
        assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));

        // Verify the failover URIs are as expected, now containing initial peer and the backup (with vhost details)
        List<URI> afterOpenFailoverURIs = new ArrayList<>();
        afterOpenFailoverURIs.add(primaryPeerURI);
        afterOpenFailoverURIs.add(new URI(backupPeerURI.toString() + "?amqp.vhost=" + myAmqpVhost));
        assertFailoverURIList(connection, afterOpenFailoverURIs);

        // Verify the client fails over to the advertised backup, and uses the correct AMQP hostname when doing so
        backupPeer.expectSaslAnonymous();
        backupPeer.expectOpen(null, Matchers.equalTo(myAmqpVhost), false);
        backupPeer.expectBegin();

        primaryPeer.close();

        // The listener counts this latch down on restore; actually wait on it so the test fails
        // if the client never reports the connection as restored to the backup.
        assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
        backupPeer.waitForAllHandlersToComplete(3000);

        backupPeer.expectClose();
        connection.close();
        backupPeer.waitForAllHandlersToComplete(1000);
    }
}
/*
* Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
* system properties the redirect uses those properties to connect to the new host.
*/
@Test(timeout = 20000)
public void testFailoverUsingSSLConfiguredBySystemProperties() throws Exception {
    // Broker-side SSL context shared by both test peers
    final TransportOptions brokerOptions = new TransportOptions();
    brokerOptions.setKeyStoreLocation(BROKER_PKCS12_KEYSTORE);
    brokerOptions.setTrustStoreLocation(BROKER_PKCS12_TRUSTSTORE);
    brokerOptions.setKeyStorePassword(PASSWORD);
    brokerOptions.setTrustStorePassword(PASSWORD);
    brokerOptions.setVerifyHost(false);
    final SSLContext brokerSslContext = TransportSupport.createJdkSslContext(brokerOptions);

    // Client-side SSL configuration is supplied through system properties only
    setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);

    try (TestAmqpPeer primaryPeer = new TestAmqpPeer(brokerSslContext, false);
         TestAmqpPeer backupPeer = new TestAmqpPeer(brokerSslContext, false)) {

        final URI primaryURI = createPeerURI(primaryPeer);
        final URI backupURI = createPeerURI(backupPeer);
        LOG.info("Primary is at: {}", primaryURI);
        LOG.info("Backup is at: {}", backupURI);

        final CountDownLatch primaryConnected = new CountDownLatch(1);
        final CountDownLatch backupConnected = new CountDownLatch(1);

        // The authentication is expected as soon as the connection object is created
        primaryPeer.expectSaslAnonymous();

        // Only the primary (soon to be dropped) peer is handed to the client; the backup can only be
        // discovered through the announced failover-server-list.
        final JmsConnection connection = establishAnonymousConnecton(null, null, true, primaryPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(primaryURI, remoteURI)) {
                    primaryConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
                if (isExpectedHost(backupURI, remoteURI)) {
                    backupConnected.countDown();
                }
            }
        });

        // Before the Open exchange the pool holds the initial peer only
        final List<URI> expectedBeforeOpen = new ArrayList<>();
        expectedBeforeOpen.add(primaryURI);
        assertFailoverURIList(connection, expectedBeforeOpen);

        // The primary accepts the connection and advertises the backup in its failover list
        final Map<Symbol, Object> backupDetails = new HashMap<>();
        backupDetails.put(NETWORK_HOST, "localhost");
        backupDetails.put(PORT, backupPeer.getServerPort());

        final List<Map<Symbol, Object>> advertisedServers = new ArrayList<>();
        advertisedServers.add(backupDetails);

        final Map<Symbol, Object> openProperties = new HashMap<>();
        openProperties.put(FAILOVER_SERVER_LIST, advertisedServers);

        primaryPeer.expectOpen(openProperties);
        primaryPeer.expectBegin();

        // Provoke the actual AMQP connection
        connection.start();
        assertTrue("Should connect to primary peer", primaryConnected.await(5, TimeUnit.SECONDS));

        // The pool now holds the initial peer and the backup
        final List<URI> expectedAfterOpen = new ArrayList<>();
        expectedAfterOpen.add(primaryURI);
        expectedAfterOpen.add(backupURI);
        assertFailoverURIList(connection, expectedAfterOpen);

        // The backup is scripted to accept the failed-over connection
        backupPeer.expectSaslAnonymous();
        backupPeer.expectOpen();
        backupPeer.expectBegin();
        backupPeer.expectBegin();

        // Create a predictable connection drop condition
        primaryPeer.expectBegin();
        primaryPeer.remotelyCloseConnection(true, AmqpError.INTERNAL_ERROR, "Remote is going down");
        connection.createSession();
        primaryPeer.waitForAllHandlersToComplete(1000);
        primaryPeer.close();

        assertTrue("Should connect to backup peer", backupConnected.await(5, TimeUnit.SECONDS));
        backupPeer.waitForAllHandlersToComplete(3000);

        backupPeer.expectClose();
        connection.close();
        backupPeer.waitForAllHandlersToComplete(1000);
    }
}
/*
* Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
* URI options on the AMQP URI the redirect uses those properties to connect to the new host.
*/
@Test(timeout = 20000)
public void testFailoverUsingSSLConfiguredByTransportOptions() throws Exception {
    // Broker-side SSL context shared by both test peers
    final TransportOptions brokerOptions = new TransportOptions();
    brokerOptions.setKeyStoreLocation(BROKER_PKCS12_KEYSTORE);
    brokerOptions.setKeyStorePassword(PASSWORD);
    brokerOptions.setVerifyHost(false);
    final SSLContext brokerSslContext = TransportSupport.createJdkSslContext(brokerOptions);

    try (TestAmqpPeer primaryPeer = new TestAmqpPeer(brokerSslContext, false);
         TestAmqpPeer backupPeer = new TestAmqpPeer(brokerSslContext, false)) {

        final URI primaryURI = createPeerURI(primaryPeer);
        final URI backupURI = createPeerURI(backupPeer);
        LOG.info("Primary is at: {}", primaryURI);
        LOG.info("Backup is at: {}", backupURI);

        final CountDownLatch primaryConnected = new CountDownLatch(1);
        final CountDownLatch backupConnected = new CountDownLatch(1);

        // The authentication is expected as soon as the connection object is created
        primaryPeer.expectSaslAnonymous();

        // Trust store settings are passed as options on the AMQP URI itself
        final String connectionOptions =
            "transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&transport.trustStorePassword=" + PASSWORD;

        // The same options, as expected to appear on every URI in the failover pool
        final Map<String, String> uriOptions = new LinkedHashMap<>();
        uriOptions.put("transport.trustStoreLocation", CLIENT_JKS_TRUSTSTORE);
        uriOptions.put("transport.trustStorePassword", PASSWORD);

        // Only the primary (soon to be dropped) peer is handed to the client; the backup can only be
        // discovered through the announced failover-server-list.
        final JmsConnection connection = establishAnonymousConnecton(connectionOptions, null, true, primaryPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(primaryURI, remoteURI)) {
                    primaryConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
                if (isExpectedHost(backupURI, remoteURI)) {
                    backupConnected.countDown();
                }
            }
        });

        // Before the Open exchange the pool holds the initial peer only
        final List<URI> expectedBeforeOpen = new ArrayList<>();
        expectedBeforeOpen.add(URISupport.applyParameters(primaryURI, uriOptions));
        assertFailoverURIList(connection, expectedBeforeOpen);

        // The primary accepts the connection and advertises the backup in its failover list
        final Map<Symbol, Object> backupDetails = new HashMap<>();
        backupDetails.put(NETWORK_HOST, "localhost");
        backupDetails.put(PORT, backupPeer.getServerPort());

        final List<Map<Symbol, Object>> advertisedServers = new ArrayList<>();
        advertisedServers.add(backupDetails);

        final Map<Symbol, Object> openProperties = new HashMap<>();
        openProperties.put(FAILOVER_SERVER_LIST, advertisedServers);

        primaryPeer.expectOpen(openProperties);
        primaryPeer.expectBegin();

        // Provoke the actual AMQP connection
        connection.start();
        assertTrue("Should connect to primary peer", primaryConnected.await(5, TimeUnit.SECONDS));

        // The pool now holds the initial peer and the backup, both carrying the transport options
        final List<URI> expectedAfterOpen = new ArrayList<>();
        expectedAfterOpen.add(URISupport.applyParameters(primaryURI, uriOptions));
        expectedAfterOpen.add(URISupport.applyParameters(backupURI, uriOptions));
        assertFailoverURIList(connection, expectedAfterOpen);

        // The backup is scripted to accept the failed-over connection
        backupPeer.expectSaslAnonymous();
        backupPeer.expectOpen();
        backupPeer.expectBegin();
        backupPeer.expectBegin();

        // Create a predictable connection drop condition
        primaryPeer.expectBegin();
        primaryPeer.remotelyCloseConnection(true, AmqpError.INTERNAL_ERROR, "Remote is going down");
        connection.createSession();
        primaryPeer.waitForAllHandlersToComplete(1000);
        primaryPeer.close();

        assertTrue("Should connect to backup peer", backupConnected.await(5, TimeUnit.SECONDS));
        backupPeer.waitForAllHandlersToComplete(3000);

        backupPeer.expectClose();
        connection.close();
        backupPeer.waitForAllHandlersToComplete(1000);
    }
}
/*
* Verify that when the Open frame contains a failover server list and we are connected via SSL
* configured with the Failover URI with nested options the redirect uses those properties to
* connect to the new host.
*/
@Test(timeout = 20000)
public void testFailoverUsingSSLConfiguredByNestedTransportOptions() throws Exception {
    // Broker-side SSL context shared by both test peers
    final TransportOptions brokerOptions = new TransportOptions();
    brokerOptions.setKeyStoreLocation(BROKER_PKCS12_KEYSTORE);
    brokerOptions.setKeyStorePassword(PASSWORD);
    brokerOptions.setVerifyHost(false);
    final SSLContext brokerSslContext = TransportSupport.createJdkSslContext(brokerOptions);

    try (TestAmqpPeer primaryPeer = new TestAmqpPeer(brokerSslContext, false);
         TestAmqpPeer backupPeer = new TestAmqpPeer(brokerSslContext, false)) {

        final URI primaryURI = createPeerURI(primaryPeer);
        final URI backupURI = createPeerURI(backupPeer);
        LOG.info("Primary is at: {}", primaryURI);
        LOG.info("Backup is at: {}", backupURI);

        final CountDownLatch primaryConnected = new CountDownLatch(1);
        final CountDownLatch backupConnected = new CountDownLatch(1);

        // The authentication is expected as soon as the connection object is created
        primaryPeer.expectSaslAnonymous();

        // Trust store settings are passed as nested options on the failover URI
        final String failoverOptions =
            "?failover.nested.transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE
                + "&failover.nested.transport.trustStorePassword=" + PASSWORD;

        // The un-nested form of those options, as expected on every URI in the failover pool
        final Map<String, String> uriOptions = new LinkedHashMap<>();
        uriOptions.put("transport.trustStoreLocation", CLIENT_JKS_TRUSTSTORE);
        uriOptions.put("transport.trustStorePassword", PASSWORD);

        // Only the primary (soon to be dropped) peer is handed to the client; the backup can only be
        // discovered through the announced failover-server-list.
        final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(primaryURI, remoteURI)) {
                    primaryConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
                if (isExpectedHost(backupURI, remoteURI)) {
                    backupConnected.countDown();
                }
            }
        });

        // Before the Open exchange the pool holds the initial peer only
        final List<URI> expectedBeforeOpen = new ArrayList<>();
        expectedBeforeOpen.add(URISupport.applyParameters(primaryURI, uriOptions));
        assertFailoverURIList(connection, expectedBeforeOpen);

        // The primary accepts the connection and advertises the backup in its failover list
        final Map<Symbol, Object> backupDetails = new HashMap<>();
        backupDetails.put(NETWORK_HOST, "localhost");
        backupDetails.put(PORT, backupPeer.getServerPort());

        final List<Map<Symbol, Object>> advertisedServers = new ArrayList<>();
        advertisedServers.add(backupDetails);

        final Map<Symbol, Object> openProperties = new HashMap<>();
        openProperties.put(FAILOVER_SERVER_LIST, advertisedServers);

        primaryPeer.expectOpen(openProperties);
        primaryPeer.expectBegin();

        // Provoke the actual AMQP connection
        connection.start();
        assertTrue("Should connect to primary peer", primaryConnected.await(5, TimeUnit.SECONDS));

        // The pool now holds the initial peer and the backup, both carrying the transport options
        final List<URI> expectedAfterOpen = new ArrayList<>();
        expectedAfterOpen.add(URISupport.applyParameters(primaryURI, uriOptions));
        expectedAfterOpen.add(URISupport.applyParameters(backupURI, uriOptions));
        assertFailoverURIList(connection, expectedAfterOpen);

        // The backup is scripted to accept the failed-over connection
        backupPeer.expectSaslAnonymous();
        backupPeer.expectOpen();
        backupPeer.expectBegin();
        backupPeer.expectBegin();

        // Create a predictable connection drop condition
        primaryPeer.expectBegin();
        primaryPeer.remotelyCloseConnection(true, AmqpError.INTERNAL_ERROR, "Remote is going down");
        connection.createSession();
        primaryPeer.waitForAllHandlersToComplete(1000);
        primaryPeer.close();

        assertTrue("Should connect to backup peer", backupConnected.await(5, TimeUnit.SECONDS));
        backupPeer.waitForAllHandlersToComplete(3000);

        backupPeer.expectClose();
        connection.close();
        backupPeer.waitForAllHandlersToComplete(1000);
    }
}
/*
* Verify that when the Open frame contains a failover server list and we are connected via SSL
* configured with a custom SSLContext the redirect uses those properties to connect to
* the new host.
*/
@Test(timeout = 20000)
public void testFailoverUsingSSLConfiguredByCustomSSLContext() throws Exception {
    // Broker-side SSL context shared by both test peers
    final TransportOptions brokerOptions = new TransportOptions();
    brokerOptions.setKeyStoreLocation(BROKER_PKCS12_KEYSTORE);
    brokerOptions.setTrustStoreLocation(BROKER_PKCS12_TRUSTSTORE);
    brokerOptions.setKeyStorePassword(PASSWORD);
    brokerOptions.setTrustStorePassword(PASSWORD);
    brokerOptions.setVerifyHost(false);
    final SSLContext brokerSslContext = TransportSupport.createJdkSslContext(brokerOptions);

    // Client-side SSL context, later installed directly on the connection factory
    final TransportOptions clientOptions = new TransportOptions();
    clientOptions.setKeyStoreLocation(CLIENT_JKS_KEYSTORE);
    clientOptions.setTrustStoreLocation(CLIENT_JKS_TRUSTSTORE);
    clientOptions.setKeyStorePassword(PASSWORD);
    clientOptions.setTrustStorePassword(PASSWORD);
    final SSLContext customClientContext = TransportSupport.createJdkSslContext(clientOptions);

    try (TestAmqpPeer primaryPeer = new TestAmqpPeer(brokerSslContext, false);
         TestAmqpPeer backupPeer = new TestAmqpPeer(brokerSslContext, false)) {

        final URI primaryURI = createPeerURI(primaryPeer);
        final URI backupURI = createPeerURI(backupPeer);
        LOG.info("Primary is at: {}", primaryURI);
        LOG.info("Backup is at: {}", backupURI);

        final CountDownLatch primaryConnected = new CountDownLatch(1);
        final CountDownLatch backupConnected = new CountDownLatch(1);

        // The authentication is expected as soon as the connection object is created
        primaryPeer.expectSaslAnonymous();

        // Only the primary (soon to be dropped) peer is handed to the client; the backup can only be
        // discovered through the announced failover-server-list.
        final String remoteURI = "failover:(amqps://localhost:" + primaryPeer.getServerPort() + ")";
        final JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
        factory.setSslContext(customClientContext);

        final JmsConnection connection = (JmsConnection) factory.createConnection();
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(primaryURI, remoteURI)) {
                    primaryConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
                if (isExpectedHost(backupURI, remoteURI)) {
                    backupConnected.countDown();
                }
            }
        });

        // Before the Open exchange the pool holds the initial peer only
        final List<URI> expectedBeforeOpen = new ArrayList<>();
        expectedBeforeOpen.add(primaryURI);
        assertFailoverURIList(connection, expectedBeforeOpen);

        // The primary accepts the connection and advertises the backup in its failover list
        final Map<Symbol, Object> backupDetails = new HashMap<>();
        backupDetails.put(NETWORK_HOST, "localhost");
        backupDetails.put(PORT, backupPeer.getServerPort());

        final List<Map<Symbol, Object>> advertisedServers = new ArrayList<>();
        advertisedServers.add(backupDetails);

        final Map<Symbol, Object> openProperties = new HashMap<>();
        openProperties.put(FAILOVER_SERVER_LIST, advertisedServers);

        primaryPeer.expectOpen(openProperties);
        primaryPeer.expectBegin();

        // Provoke the actual AMQP connection
        connection.start();
        assertTrue("Should connect to primary peer", primaryConnected.await(5, TimeUnit.SECONDS));

        // The pool now holds the initial peer and the backup
        final List<URI> expectedAfterOpen = new ArrayList<>();
        expectedAfterOpen.add(primaryURI);
        expectedAfterOpen.add(backupURI);
        assertFailoverURIList(connection, expectedAfterOpen);

        // The backup is scripted to accept the failed-over connection
        backupPeer.expectSaslAnonymous();
        backupPeer.expectOpen();
        backupPeer.expectBegin();
        backupPeer.expectBegin();

        // Create a predictable connection drop condition
        primaryPeer.expectBegin();
        primaryPeer.remotelyCloseConnection(true, AmqpError.INTERNAL_ERROR, "Remote is going down");
        connection.createSession();
        primaryPeer.waitForAllHandlersToComplete(1000);
        primaryPeer.close();

        assertTrue("Should connect to backup peer", backupConnected.await(5, TimeUnit.SECONDS));
        backupPeer.waitForAllHandlersToComplete(3000);

        backupPeer.expectClose();
        connection.close();
        backupPeer.waitForAllHandlersToComplete(1000);
    }
}
/*
 * Runs the insecure redirect advertisement scenario with non-secure redirects
 * disallowed: the 'amqp' server announced in the Open frame failover server list
 * is expected to be left out of the client's failover URIs.
 */
@Test(timeout = 20000)
public void testFailoverIgnoresInsecureServerWhenNotConfiguredToAllow() throws Exception {
    final boolean allowNonSecureRedirects = false;
    doTestFailoverHandlingOfInsecureRedirectAdvertisement(allowNonSecureRedirects);
}
/*
 * Runs the insecure redirect advertisement scenario with non-secure redirects
 * allowed: the 'amqp' server announced in the Open frame failover server list
 * is expected to be added to the client's failover URIs.
 */
@Test(timeout = 20000)
public void testFailoverAcceptsInsecureServerWhenConfiguredToAllow() throws Exception {
    final boolean allowNonSecureRedirects = true;
    doTestFailoverHandlingOfInsecureRedirectAdvertisement(allowNonSecureRedirects);
}
/*
 * Connects to a single test peer created with an SSL context. The peer's Open frame
 * advertises one failover server at localhost:5673 using the plain 'amqp' scheme.
 * After the connection is established the client's failover URI list is checked:
 * the advertised server must be present only when 'allow' is true.
 *
 * @param allow value applied to the allowNonSecureRedirects option of the nested AMQP URIs
 */
private void doTestFailoverHandlingOfInsecureRedirectAdvertisement(boolean allow) throws Exception {
    // Port announced for the (non-existent) insecure backup; nothing listens on it here.
    final int advertisedPort = 5673;

    // SSL configuration handed to the test peer.
    final TransportOptions peerSslOptions = new TransportOptions();
    peerSslOptions.setKeyStoreLocation(BROKER_PKCS12_KEYSTORE);
    peerSslOptions.setKeyStorePassword(PASSWORD);
    peerSslOptions.setTrustStoreLocation(BROKER_PKCS12_TRUSTSTORE);
    peerSslOptions.setTrustStorePassword(PASSWORD);
    peerSslOptions.setVerifyHost(false);
    final SSLContext peerSslContext = TransportSupport.createJdkSslContext(peerSslOptions);

    // The client side picks up its stores from the javax.net.ssl system properties.
    setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);

    try (TestAmqpPeer primaryPeer = new TestAmqpPeer(peerSslContext, false)) {
        final URI primaryURI = createPeerURI(primaryPeer);
        LOG.info("Primary is at: {}", primaryURI);

        final CountDownLatch primaryConnected = new CountDownLatch(1);

        // The SASL exchange is expected as soon as the connection object exists.
        primaryPeer.expectSaslAnonymous();

        final String failoverOptions = "failover.nested.amqp.allowNonSecureRedirects=" + allow;
        final String nestedOptions = "amqp.allowNonSecureRedirects=" + allow;

        // Only the primary peer is configured; any other entry in the failover URI list
        // can only come from the failover-server-list the peer announces.
        final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {

            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(primaryURI, remoteURI)) {
                    primaryConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
            }
        });

        // Before the Open exchange the failover list holds just the primary, carrying the nested option.
        final URI primaryWithOptions = new URI(primaryURI + "?" + nestedOptions);
        final List<URI> expectedBeforeOpen = new ArrayList<>();
        expectedBeforeOpen.add(primaryWithOptions);
        assertFailoverURIList(connection, expectedBeforeOpen);

        // Script the primary: its Open carries a failover list with the insecure backup.
        final Map<Symbol, Object> advertisedServer = new HashMap<>();
        advertisedServer.put(NETWORK_HOST, "localhost");
        advertisedServer.put(PORT, advertisedPort);
        advertisedServer.put(SCHEME, "amqp");

        final List<Map<Symbol, Object>> advertisedServers = new ArrayList<>();
        advertisedServers.add(advertisedServer);

        final Map<Symbol, Object> openProperties = new HashMap<>();
        openProperties.put(FAILOVER_SERVER_LIST, advertisedServers);

        primaryPeer.expectOpen(openProperties);
        primaryPeer.expectBegin();
        primaryPeer.expectClose();

        // Starting the connection triggers the actual AMQP Open.
        connection.start();

        assertTrue("Should connect to primary peer", primaryConnected.await(5, TimeUnit.SECONDS));

        // After the Open exchange the list holds the primary and, only when non-secure
        // redirects were permitted, the advertised 'amqp' backup as well.
        final List<URI> expectedAfterOpen = new ArrayList<>();
        expectedAfterOpen.add(primaryWithOptions);
        if (allow) {
            expectedAfterOpen.add(new URI("amqp://localhost:" + advertisedPort + "?" + nestedOptions));
        }
        assertFailoverURIList(connection, expectedAfterOpen);

        connection.close();

        primaryPeer.waitForAllHandlersToComplete(1000);
    }
}
/*
 * Runs the transport scheme redirect scenario with a 'ws' scheme advertised in the
 * Open frame failover server list; the failover URI list is expected to gain an
 * entry using the 'amqpws' scheme.
 */
@Test(timeout = 20000)
public void testFailoverAcceptsUpdateUsingTransportSchemeWS() throws Exception {
    final String advertisedScheme = "ws";
    final String expectedUriScheme = "amqpws";
    doTestFailoverAcceptsUpdateUsingTransportSchemes(advertisedScheme, expectedUriScheme);
}
/*
 * Runs the transport scheme redirect scenario with a 'wss' scheme advertised in the
 * Open frame failover server list; the failover URI list is expected to gain an
 * entry using the 'amqpwss' scheme.
 */
@Test(timeout = 20000)
public void testFailoverAcceptsUpdateUsingTransportSchemeWSS() throws Exception {
    final String advertisedScheme = "wss";
    final String expectedUriScheme = "amqpwss";
    doTestFailoverAcceptsUpdateUsingTransportSchemes(advertisedScheme, expectedUriScheme);
}
/*
 * Connects to a single test peer created with an SSL context. The peer's Open frame
 * advertises one failover server at localhost:5673 whose scheme is 'transportScheme'.
 * After the connection is established the client's failover URI list must contain
 * the primary followed by a localhost:5673 entry whose URI scheme is 'expected'.
 *
 * @param transportScheme scheme value placed in the advertised failover server entry
 * @param expected        URI scheme the resulting failover list entry must use
 */
private void doTestFailoverAcceptsUpdateUsingTransportSchemes(String transportScheme, String expected) throws Exception {
    // Port announced for the advertised backup; nothing listens on it here.
    final int advertisedPort = 5673;

    // SSL configuration handed to the test peer.
    final TransportOptions peerSslOptions = new TransportOptions();
    peerSslOptions.setKeyStoreLocation(BROKER_PKCS12_KEYSTORE);
    peerSslOptions.setKeyStorePassword(PASSWORD);
    peerSslOptions.setTrustStoreLocation(BROKER_PKCS12_TRUSTSTORE);
    peerSslOptions.setTrustStorePassword(PASSWORD);
    peerSslOptions.setVerifyHost(false);
    final SSLContext peerSslContext = TransportSupport.createJdkSslContext(peerSslOptions);

    // The client side picks up its stores from the javax.net.ssl system properties.
    setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);

    try (TestAmqpPeer primaryPeer = new TestAmqpPeer(peerSslContext, false)) {
        final URI primaryURI = createPeerURI(primaryPeer);
        LOG.info("Primary is at: {}", primaryURI);

        final CountDownLatch primaryConnected = new CountDownLatch(1);

        // The SASL exchange is expected as soon as the connection object exists.
        primaryPeer.expectSaslAnonymous();

        // Non-secure redirects are permitted so the advertised entry is not filtered out.
        final String failoverOptions = "failover.nested.amqp.allowNonSecureRedirects=true";
        final String nestedOptions = "amqp.allowNonSecureRedirects=true";

        // Only the primary peer is configured; any other entry in the failover URI list
        // can only come from the failover-server-list the peer announces.
        final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {

            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (isExpectedHost(primaryURI, remoteURI)) {
                    primaryConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
            }
        });

        // Before the Open exchange the failover list holds just the primary, carrying the nested option.
        final URI primaryWithOptions = new URI(primaryURI + "?" + nestedOptions);
        final List<URI> expectedBeforeOpen = new ArrayList<>();
        expectedBeforeOpen.add(primaryWithOptions);
        assertFailoverURIList(connection, expectedBeforeOpen);

        // Script the primary: its Open carries a failover list naming the backup with the given scheme.
        final Map<Symbol, Object> advertisedServer = new HashMap<>();
        advertisedServer.put(NETWORK_HOST, "localhost");
        advertisedServer.put(PORT, advertisedPort);
        advertisedServer.put(SCHEME, transportScheme);

        final List<Map<Symbol, Object>> advertisedServers = new ArrayList<>();
        advertisedServers.add(advertisedServer);

        final Map<Symbol, Object> openProperties = new HashMap<>();
        openProperties.put(FAILOVER_SERVER_LIST, advertisedServers);

        primaryPeer.expectOpen(openProperties);
        primaryPeer.expectBegin();
        primaryPeer.expectClose();

        // Starting the connection triggers the actual AMQP Open.
        connection.start();

        assertTrue("Should connect to primary peer", primaryConnected.await(5, TimeUnit.SECONDS));

        // After the Open exchange the list holds the primary plus the backup under the expected URI scheme.
        final List<URI> expectedAfterOpen = new ArrayList<>();
        expectedAfterOpen.add(primaryWithOptions);
        expectedAfterOpen.add(new URI(expected + "://localhost:" + advertisedPort + "?" + nestedOptions));
        assertFailoverURIList(connection, expectedAfterOpen);

        connection.close();

        primaryPeer.waitForAllHandlersToComplete(1000);
    }
}
/*
 * Passes the given key store and trust store locations and passwords to
 * setTestSystemProperty under the JAVAX_NET_SSL_* property names, in the order
 * key store, key store password, trust store, trust store password.
 */
private void setSslSystemPropertiesForCurrentTest(String keystore, String keystorePassword, String truststore, String truststorePassword) {
    final String[][] sslProperties = {
        { JAVAX_NET_SSL_KEY_STORE, keystore },
        { JAVAX_NET_SSL_KEY_STORE_PASSWORD, keystorePassword },
        { JAVAX_NET_SSL_TRUST_STORE, truststore },
        { JAVAX_NET_SSL_TRUST_STORE_PASSWORD, truststorePassword }
    };

    for (String[] property : sslProperties) {
        setTestSystemProperty(property[0], property[1]);
    }
}
/*
 * Asserts that the URI pool held by the connection's FailoverProvider currently
 * lists exactly the expected URIs. The pool is read reflectively from the
 * provider's "uris" field.
 *
 * @param connection   connection whose failover provider is inspected
 * @param expectedURIs URIs the pool's list must equal
 */
private void assertFailoverURIList(JmsConnection connection, List<URI> expectedURIs) throws Exception {
    final FailoverProvider failoverProvider = getFailoverProvider(connection);

    final Field poolField = failoverProvider.getClass().getDeclaredField("uris");
    poolField.setAccessible(true);
    final Object pool = poolField.get(failoverProvider);

    assertNotNull("Expected to get a uri pool instance", pool);
    assertTrue("Unexpected uri pool type: " + pool.getClass(), pool instanceof FailoverUriPool);

    final List<URI> actualURIs = ((FailoverUriPool) pool).getList();
    assertEquals(expectedURIs, actualURIs);
}
/*
 * Reads the connection's "provider" field reflectively and returns it as a
 * FailoverProvider, asserting that it is non-null and of that type.
 *
 * @param connection connection to inspect
 * @return the provider held by the connection
 */
private FailoverProvider getFailoverProvider(JmsConnection connection) throws Exception {
    final Field providerField = connection.getClass().getDeclaredField("provider");
    providerField.setAccessible(true);

    final Object candidate = providerField.get(connection);
    assertNotNull("Expected to get a provdier instance", candidate);
    assertTrue("Unexpected provider type: " + candidate.getClass(), candidate instanceof FailoverProvider);

    return (FailoverProvider) candidate;
}
/*
 * Convenience overload: creates a failover connection to the given peers with
 * no per-peer connection parameters.
 */
private JmsConnection establishAnonymousConnecton(String failoverParams, TestAmqpPeer... peers) throws Exception {
    final String noConnectionParams = null;
    return establishAnonymousConnecton(noConnectionParams, failoverParams, peers);
}
/*
 * Convenience overload: creates a failover connection to the given peers,
 * passing false for the ssl argument of the full overload.
 */
private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, TestAmqpPeer... peers) throws Exception {
    final boolean ssl = false;
    return establishAnonymousConnecton(connectionParams, failoverParams, ssl, peers);
}
/*
 * Builds a "failover:(uri1,uri2,...)?options" URI from the given peers and creates a
 * connection from it via a JmsConnectionFactory.
 *
 * @param connectionParams query string applied to each peer URI, may be null
 * @param failoverParams   options appended after the peer list (a leading '?' is optional);
 *                         when null, "failover.maxReconnectAttempts=10" is used
 * @param ssl              not read by this method; the URI scheme is chosen per peer by createPeerURI
 * @param peers            peers to list in the failover URI, at least one
 * @return the created connection
 * @throws IllegalArgumentException if no peers are given
 */
private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, boolean ssl, TestAmqpPeer... peers) throws Exception {
    if (peers.length == 0) {
        throw new IllegalArgumentException("No test peers were given, at least 1 required");
    }

    final StringBuilder failoverURI = new StringBuilder("failover:(");
    for (int i = 0; i < peers.length; i++) {
        if (i > 0) {
            failoverURI.append(",");
        }
        failoverURI.append(createPeerURI(peers[i], connectionParams).toString());
    }
    failoverURI.append(")");

    if (failoverParams == null) {
        failoverURI.append("?failover.maxReconnectAttempts=10");
    } else {
        // Accept the options with or without their leading '?'.
        if (!failoverParams.startsWith("?")) {
            failoverURI.append("?");
        }
        failoverURI.append(failoverParams);
    }

    final ConnectionFactory connectionFactory = new JmsConnectionFactory(failoverURI.toString());
    return (JmsConnection) connectionFactory.createConnection();
}
/*
 * Convenience overload: creates the URI for the given peer with no query parameters.
 */
private URI createPeerURI(TestAmqpPeer peer) throws Exception {
    final String noParams = null;
    return createPeerURI(peer, noParams);
}
/*
 * Creates a URI for the given peer on localhost at the peer's server port, using
 * the "amqps" scheme when the peer reports SSL and "amqp" otherwise, then applies
 * the parameters parsed from the given query string.
 *
 * @param peer   peer to address
 * @param params query string handed to PropertyUtil.parseQuery
 * @return the peer URI with the parsed parameters applied
 */
private URI createPeerURI(TestAmqpPeer peer, String params) throws Exception {
    final String scheme;
    if (peer.isSSL()) {
        scheme = "amqps";
    } else {
        scheme = "amqp";
    }

    final String authority = "localhost:" + peer.getServerPort();
    final URI baseURI = new URI(scheme, authority, null, null, null);

    return URISupport.applyParameters(baseURI, PropertyUtil.parseQuery(params));
}
/*
 * Checks whether two URIs refer to the same host and port, logging the first
 * difference found.
 *
 * @param expected URI providing the wanted host and port
 * @param actual   URI to compare against it
 * @return true when both host and port match, false otherwise
 */
private boolean isExpectedHost(URI expected, URI actual) {
    final String wantedHost = expected.getHost();
    final String foundHost = actual.getHost();
    if (!wantedHost.equals(foundHost)) {
        LOG.info("Expected host {} but got host {}", wantedHost, foundHost);
        return false;
    }

    final int wantedPort = expected.getPort();
    final int foundPort = actual.getPort();
    if (wantedPort != foundPort) {
        LOG.info("Expected host {} on port {} but got host {} on port {}",
                 wantedHost, wantedPort, foundHost, foundPort);
        return false;
    }

    return true;
}
}