blob: 39a9a94b798a12609531c8c48d45b0fab183c73b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.provider.failover;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.ResourceAllocationException;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsResourceNotFoundException;
import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.ListDescribedType;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.jms.util.QpidJMSTestRunner;
import org.apache.qpid.jms.util.Repeat;
import org.apache.qpid.jms.util.StopWatch;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(QpidJMSTestRunner.class)
public class FailoverIntegrationTest extends QpidJmsTestCase {
// Logger used by the tests in this class for progress / diagnostic output.
private static final Logger LOG = LoggerFactory.getLogger(FailoverIntegrationTest.class);
// SASL mechanism names passed to the test peer's SASL expectations.
private static final Symbol ANONYMOUS = Symbol.valueOf("ANONYMOUS");
private static final Symbol PLAIN = Symbol.valueOf("PLAIN");
// SASL outcome codes handed to TestAmqpPeer#expectSaslFailingExchange.
// NOTE(review): the values look like the AMQP 1.0 sasl-code enumeration
// (1 = auth, 2 = sys, 3 = sys-perm, 4 = sys-temp) - confirm against the spec.
private static final UnsignedByte SASL_FAIL_AUTH = UnsignedByte.valueOf((byte) 1);
private static final UnsignedByte SASL_SYS = UnsignedByte.valueOf((byte) 2);
private static final UnsignedByte SASL_SYS_PERM = UnsignedByte.valueOf((byte) 3);
// The tests below use this code where failover is expected to carry on to the next peer.
private static final UnsignedByte SASL_SYS_TEMP = UnsignedByte.valueOf((byte) 4);
/**
 * The peer is scripted (via {@code rejectConnect}) to refuse the connection with
 * {@code UNAUTHORIZED_ACCESS}; {@code connection.start()} must then fail with a
 * {@link JMSSecurityException} and nothing else.
 */
@Test(timeout = 20000)
public void testConnectThrowsSecurityViolationOnFailureFromOpen() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        peer.rejectConnect(AmqpError.UNAUTHORIZED_ACCESS, "Anonymous connections not allowed", null);

        final JmsConnection jmsConnection = establishAnonymousConnecton(peer);

        try {
            jmsConnection.start();
            fail("Should have thrown JMSSecurityException");
        } catch (JMSSecurityException expected) {
            // This is the outcome under test; nothing further to verify here.
        } catch (Exception unexpected) {
            // Any other exception type is reported as a test failure.
            fail("Should have thrown JMSSecurityException: " + unexpected);
        }

        jmsConnection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the SASL-failure scenario with an explicit ClientID configured, once for each of the
 * {@code SASL_FAIL_AUTH}, {@code SASL_SYS} and {@code SASL_SYS_PERM} outcome codes, in that order.
 */
@Test(timeout = 20000)
public void testConnectThrowsSecurityViolationOnFailureFromSaslWithClientID() throws Exception {
    final UnsignedByte[] failureCodes = { SASL_FAIL_AUTH, SASL_SYS, SASL_SYS_PERM };

    for (UnsignedByte failureCode : failureCodes) {
        doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true, failureCode);
    }
}
/**
 * Runs the SASL-failure scenario with the ClientID explicitly not awaited, once for each of the
 * {@code SASL_FAIL_AUTH}, {@code SASL_SYS} and {@code SASL_SYS_PERM} outcome codes, in that order.
 */
@Test(timeout = 20000)
public void testConnectThrowsSecurityViolationOnFailureFromSaslExplicitlyWithoutClientID() throws Exception {
    final UnsignedByte[] failureCodes = { SASL_FAIL_AUTH, SASL_SYS, SASL_SYS_PERM };

    for (UnsignedByte failureCode : failureCodes) {
        doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false, failureCode);
    }
}
/**
 * Shared body for the SASL-failure tests in which the connection attempt happens inside
 * {@code createConnection(...)} itself: the failover URI either carries a ClientID or sets
 * {@code jms.awaitClientID=false}.
 *
 * @param clientID
 *        {@code true} to append {@code ?jms.clientID=myClientID} to the URI,
 *        {@code false} to append {@code ?jms.awaitClientID=false}
 * @param saslFailureCode
 *        the SASL outcome code the test peer is told to use for the failing exchange
 *
 * @throws Exception if the test peer cannot be set up or its handlers do not complete in time
 */
private void doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(boolean clientID, UnsignedByte saslFailureCode) throws Exception {
    final String optionString = clientID ? "?jms.clientID=myClientID" : "?jms.awaitClientID=false";

    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode);

        ConnectionFactory factory = new JmsConnectionFactory("failover:(amqp://localhost:" + testPeer.getServerPort() + ")" + optionString);

        try {
            factory.createConnection("username", "password");
            // Message previously read "Excepted exception ..." (typo).
            fail("Expected exception to be thrown");
        } catch (JMSSecurityException jmsse) {
            LOG.info("Caught expected security exception: {}", jmsse.getMessage());
        }

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the SASL-failure scenario with no ClientID related URI options at all, once for each of the
 * {@code SASL_FAIL_AUTH}, {@code SASL_SYS} and {@code SASL_SYS_PERM} outcome codes, in that order.
 */
@Test(timeout = 20000)
public void testConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientID() throws Exception {
    final UnsignedByte[] failureCodes = { SASL_FAIL_AUTH, SASL_SYS, SASL_SYS_PERM };

    for (UnsignedByte failureCode : failureCodes) {
        doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(failureCode);
    }
}
/**
 * Shared body for the SASL-failure test in which the URI carries no ClientID options, so the
 * {@link JMSSecurityException} is expected from {@code connection.start()} rather than from
 * {@code createConnection(...)}.
 *
 * @param saslFailureCode
 *        the SASL outcome code the test peer is told to use for the failing exchange
 *
 * @throws Exception if the test peer cannot be set up or its handlers do not complete in time
 */
private void doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(UnsignedByte saslFailureCode) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode);

        ConnectionFactory factory = new JmsConnectionFactory("failover:(amqp://localhost:" + testPeer.getServerPort() + ")");
        Connection connection = factory.createConnection("username", "password");

        try {
            connection.start();
            // Message previously read "Excepted exception ..." (typo).
            fail("Expected exception to be thrown");
        } catch (JMSSecurityException jmsse) {
            LOG.info("Caught expected security exception: {}", jmsse.getMessage());
        }

        // Release the client-side resources of the failed connection, mirroring what
        // testConnectThrowsSecurityViolationOnFailureFromOpen does after its failed start().
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * The first peer fails the SASL exchange with {@code SASL_SYS_TEMP}; the client is expected to
 * end up connected to the second peer and to be able to consume a text message from it.
 */
@Test(timeout = 20000)
public void testConnectHandlesSaslTempFailure() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch connectedToFinal = new CountDownLatch(1);
        final String finalPeerURI = createPeerURI(finalPeer);

        originalPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, SASL_SYS_TEMP);

        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isFinalPeer = finalPeerURI.equals(remoteURI.toString());
                if (isFinalPeer) {
                    connectedToFinal.countDown();
                }
            }
        });

        try {
            connection.start();
        } catch (Exception unexpected) {
            fail("Should not have thrown an Exception: " + unexpected);
        }

        assertTrue("Should connect to final peer", connectedToFinal.await(5, TimeUnit.SECONDS));

        // Body of the single message the final peer hands to the consumer.
        final String expectedText = "myContent";
        final DescribedType messageBody = new AmqpValueDescribedType(expectedText);

        finalPeer.expectBegin();
        finalPeer.expectReceiverAttach();
        finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, messageBody);
        finalPeer.expectDispositionThatIsAcceptedAndSettled();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageConsumer consumer = session.createConsumer(queue);
        final Message received = consumer.receive(2000);

        // Shut down first, then inspect what was received.
        finalPeer.expectClose();
        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);

        assertNotNull(received);
        assertTrue(received instanceof TextMessage);
        final TextMessage textMessage = (TextMessage) received;
        assertEquals(expectedText, textMessage.getText());
    }
}
/**
 * Runs the "failover stops" scenario once for each of the {@code SASL_FAIL_AUTH},
 * {@code SASL_SYS} and {@code SASL_SYS_PERM} outcome codes, in that order.
 */
@Test(timeout = 20000)
public void testFailoverStopsOnNonTemporarySaslFailure() throws Exception {
    final UnsignedByte[] failureCodes = { SASL_FAIL_AUTH, SASL_SYS, SASL_SYS_PERM };

    for (UnsignedByte failureCode : failureCodes) {
        doFailoverStopsOnNonTemporarySaslFailureTestImpl(failureCode);
    }
}
/**
 * Connects to a first peer that drops, lets the second peer fail the SASL exchange with the
 * given code, and then checks that the ExceptionListener received a {@link JMSSecurityException},
 * that the consumer ends up closed, and that the third peer never saw a TCP connection.
 *
 * @param saslFailureCode
 *        the SASL outcome code the rejecting peer is told to use
 *
 * @throws Exception if any peer interaction or wait fails
 */
private void doFailoverStopsOnNonTemporarySaslFailureTestImpl(UnsignedByte saslFailureCode) throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer rejectingPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch connectedToOriginal = new CountDownLatch(1);
        final CountDownLatch listenerNotified = new CountDownLatch(1);
        final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

        // Create a peer to connect to, then one to reconnect to
        final String originalPeerURI = createPeerURI(originalPeer);
        final String rejectingPeerURI = createPeerURI(rejectingPeer);
        final String finalPeerURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalPeerURI);
        LOG.info("Rejecting peer is at: {}", rejectingPeerURI);
        LOG.info("Final peer is at: {}", finalPeerURI);

        // Expect connection to the first peer (and have it drop)
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.expectReceiverAttach();
        originalPeer.expectLinkFlow();
        originalPeer.dropAfterLastHandler();

        // --- Post Failover Expectations of Rejecting Peer--- //
        rejectingPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, saslFailureCode);

        // --- Post Failover Expectations of FinalPeer --- //
        // This shouldn't get hit, but if it does accept the connect so we don't pass the failed
        // to connect assertion.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);

        connection.setExceptionListener(new ExceptionListener() {
            @Override
            public void onException(JMSException exception) {
                LOG.trace("JMS ExceptionListener: ", exception);
                // Only the first reported exception is retained for the later assertion.
                firstFailure.compareAndSet(null, exception);
                listenerNotified.countDown();
            }
        });

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isOriginalPeer = originalPeerURI.equals(remoteURI.toString());
                if (isOriginalPeer) {
                    connectedToOriginal.countDown();
                }
            }
        });

        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageConsumer consumer = session.createConsumer(queue);

        assertTrue("Should connect to original peer", connectedToOriginal.await(3, TimeUnit.SECONDS));
        assertTrue("The ExceptionListener should have been alerted", listenerNotified.await(3, TimeUnit.SECONDS));

        final Throwable ex = firstFailure.get();
        assertTrue("Unexpected failure exception: " + ex, ex instanceof JMSSecurityException);

        // Verify the consumer gets marked closed
        final Wait.Condition consumerClosed = new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                boolean closed = false;
                try {
                    consumer.getMessageSelector();
                } catch (IllegalStateException jmsise) {
                    // The closed consumer rejects the call with the JMS IllegalStateException.
                    closed = true;
                }
                return closed;
            }
        };
        assertTrue("consumer never closed.", Wait.waitFor(consumerClosed, 5000, 5));

        // Shut down last peer and verify no connection made to it
        finalPeer.purgeExpectations();
        finalPeer.close();

        assertNotNull("First peer should have accepted a TCP connection", originalPeer.getClientSocket());
        assertNotNull("Rejecting peer should have accepted a TCP connection", rejectingPeer.getClientSocket());
        assertNull("Final peer should not have accepted any TCP connection", finalPeer.getClientSocket());
    }
}
/**
 * Connects to a first peer that drops, has the second peer fail the SASL exchange with
 * {@code SASL_SYS_TEMP}, and expects the connection to be restored against the third peer,
 * where the consumer then receives a text message without the ExceptionListener firing.
 */
@Test(timeout = 20000)
public void testFailoverHandlesTemporarySaslFailure() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer rejectingPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch connectedToOriginal = new CountDownLatch(1);
        final CountDownLatch restoredToFinal = new CountDownLatch(1);
        final AtomicBoolean listenerNotified = new AtomicBoolean();

        // Create a peer to connect to, then one to reconnect to
        final String originalPeerURI = createPeerURI(originalPeer);
        final String rejectingPeerURI = createPeerURI(rejectingPeer);
        final String finalPeerURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalPeerURI);
        LOG.info("Rejecting peer is at: {}", rejectingPeerURI);
        LOG.info("Final peer is at: {}", finalPeerURI);

        // Expect connection to the first peer (and have it drop)
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.expectReceiverAttach();
        originalPeer.expectLinkFlow();
        originalPeer.dropAfterLastHandler();

        // --- Post Failover Expectations of Rejecting --- //
        rejectingPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, SASL_SYS_TEMP);

        // --- Post Failover Expectations of FinalPeer --- //
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectReceiverAttach();
        final String expectedText = "myTextMessage";
        final DescribedType messageBody = new AmqpValueDescribedType(expectedText);
        finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, messageBody);

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);

        connection.setExceptionListener(new ExceptionListener() {
            @Override
            public void onException(JMSException exception) {
                LOG.trace("JMS ExceptionListener: ", exception);
                listenerNotified.set(true);
            }
        });

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isOriginalPeer = originalPeerURI.equals(remoteURI.toString());
                if (isOriginalPeer) {
                    connectedToOriginal.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final boolean isFinalPeer = finalPeerURI.equals(remoteURI.toString());
                if (isFinalPeer) {
                    restoredToFinal.countDown();
                }
            }
        });

        try {
            connection.start();
        } catch (Exception unexpected) {
            fail("Should not have thrown an Exception: " + unexpected);
        }

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageConsumer consumer = session.createConsumer(queue);

        finalPeer.waitForAllHandlersToComplete(2000);

        assertTrue("Should connect to original peer", connectedToOriginal.await(3, TimeUnit.SECONDS));
        assertTrue("Should connect to final peer", restoredToFinal.await(3, TimeUnit.SECONDS));

        // Check message arrives
        finalPeer.expectDispositionThatIsAcceptedAndSettled();

        final Message received = consumer.receive(5000);
        assertTrue("Expected an instance of TextMessage, got: " + received, received instanceof TextMessage);
        final TextMessage textMessage = (TextMessage) received;
        assertEquals("Unexpected msg content", expectedText, textMessage.getText());

        assertFalse("The ExceptionListener should not have been alerted", listenerNotified.get());

        // Shut it down
        finalPeer.expectClose();
        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the invalid-field connect-error scenario without the container-id hint in the error info.
 */
@Test(timeout = 20000)
public void testFailoverHandlesConnectErrorInvalidField() throws Exception {
    final boolean includeContainerIdHint = false;
    doFailoverHandlesConnectErrorInvalidFieldTestImpl(includeContainerIdHint);
}
/**
 * Same scenario as {@code testFailoverHandlesConnectErrorInvalidField}, but also including the
 * hint that the container-id is the invalid field, i.e invalid ClientID.
 */
@Test(timeout = 20000)
public void testFailoverHandlesConnectErrorInvalidFieldWithContainerIdHint() throws Exception {
    final boolean includeContainerIdHint = true;
    doFailoverHandlesConnectErrorInvalidFieldTestImpl(includeContainerIdHint);
}
/**
 * The first peer rejects the connect with {@code INVALID_FIELD}; the client is expected to end up
 * connected to the second peer and to be able to consume a message from it.
 *
 * @param includeContainerIdHint
 *        whether the rejection's error info map names the container-id as the invalid field
 *        (when {@code false} a {@code null} info map is passed)
 *
 * @throws Exception if any peer interaction or wait fails
 */
private void doFailoverHandlesConnectErrorInvalidFieldTestImpl(boolean includeContainerIdHint) throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch connectedToFinal = new CountDownLatch(1);
        final String finalPeerURI = createPeerURI(finalPeer);
        final DescribedType nullBody = new AmqpValueDescribedType(null);

        Map<Symbol, Object> rejectionInfo = null;
        if (includeContainerIdHint) {
            rejectionInfo = new HashMap<>();
            rejectionInfo.put(AmqpSupport.INVALID_FIELD, AmqpSupport.CONTAINER_ID);
        }

        originalPeer.rejectConnect(AmqpError.INVALID_FIELD, "Client ID already in use", rejectionInfo);

        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isFinalPeer = finalPeerURI.equals(remoteURI.toString());
                if (isFinalPeer) {
                    connectedToFinal.countDown();
                }
            }
        });

        try {
            connection.start();
        } catch (Exception unexpected) {
            fail("Should not have thrown an Exception: " + unexpected);
        }

        assertTrue("Should connect to final peer", connectedToFinal.await(5, TimeUnit.SECONDS));

        finalPeer.expectBegin();
        finalPeer.expectReceiverAttach();
        finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);
        finalPeer.expectDispositionThatIsAcceptedAndSettled();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageConsumer consumer = session.createConsumer(queue);
        final Message received = consumer.receive(2000);
        assertNotNull(received);

        // Shut it down
        finalPeer.expectClose();
        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the invalid-field-on-reconnect scenario without the container-id hint in the error info.
 */
@Test(timeout = 20000)
public void testFailoverHandlesConnectErrorInvalidFieldOnReconnect() throws Exception {
    final boolean includeContainerIdHint = false;
    doFailoverHandlesConnectErrorInvalidFieldOnReconnectTestImpl(includeContainerIdHint);
}
/**
 * Same scenario as {@code testFailoverHandlesConnectErrorInvalidFieldOnReconnect}, but also
 * including the hint that the container-id is the invalid field, i.e invalid ClientID.
 */
@Test(timeout = 20000)
public void testFailoverHandlesConnectErrorInvalidFieldOnReconnectWithContainerIdHint() throws Exception {
    final boolean includeContainerIdHint = true;
    doFailoverHandlesConnectErrorInvalidFieldOnReconnectTestImpl(includeContainerIdHint);
}
/**
 * Connects to a first peer that drops, has the second peer reject the reconnect attempt with
 * {@code INVALID_FIELD}, and expects the connection to be restored against the third peer, where
 * a consumer then receives a message.
 *
 * @param includeContainerIdHint
 *        whether the rejection's error info map names the container-id as the invalid field
 *        (when {@code false} a {@code null} info map is passed)
 *
 * @throws Exception if any peer interaction or wait fails
 */
private void doFailoverHandlesConnectErrorInvalidFieldOnReconnectTestImpl(boolean includeContainerIdHint) throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer rejectingPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch finalConnected = new CountDownLatch(1);
        final String finalURI = createPeerURI(finalPeer);
        final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.dropAfterLastHandler(10);

        Map<Symbol, Object> errorInfo = null;
        if (includeContainerIdHint) {
            errorInfo = new HashMap<>();
            errorInfo.put(AmqpSupport.INVALID_FIELD, AmqpSupport.CONTAINER_ID);
        }

        rejectingPeer.rejectConnect(AmqpError.INVALID_FIELD, "Client ID already in use", errorInfo);

        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionRestored(URI remoteURI) {
                // This is the "restored" callback, so log it as such (the message used to
                // say "Established", which made the test logs misleading).
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });

        try {
            connection.start();
        } catch (Exception ex) {
            fail("Should not have thrown an Exception: " + ex);
        }

        assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));

        finalPeer.expectBegin();
        finalPeer.expectReceiverAttach();
        finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
        finalPeer.expectDispositionThatIsAcceptedAndSettled();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");
        MessageConsumer consumer = session.createConsumer(queue);
        Message message = consumer.receive(2000);
        assertNotNull(message);

        // Shut it down
        finalPeer.expectClose();
        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * The first peer rejects the connect with {@code NOT_FOUND}; the client is expected to end up
 * connected to the second peer and to be able to consume a message from it.
 */
@Test(timeout = 20000)
public void testFailoverHandlesConnectErrorNotFound() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch connectedToFinal = new CountDownLatch(1);
        final String finalPeerURI = createPeerURI(finalPeer);
        final DescribedType nullBody = new AmqpValueDescribedType(null);

        originalPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);

        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isFinalPeer = finalPeerURI.equals(remoteURI.toString());
                if (isFinalPeer) {
                    connectedToFinal.countDown();
                }
            }
        });

        try {
            connection.start();
        } catch (Exception unexpected) {
            fail("Should not have thrown an Exception: " + unexpected);
        }

        assertTrue("Should connect to final peer", connectedToFinal.await(5, TimeUnit.SECONDS));

        finalPeer.expectBegin();
        finalPeer.expectReceiverAttach();
        finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);
        finalPeer.expectDispositionThatIsAcceptedAndSettled();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageConsumer consumer = session.createConsumer(queue);
        final Message received = consumer.receive(2000);
        assertNotNull(received);

        // Shut it down
        finalPeer.expectClose();
        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Connects to a first peer, closes it, has the second peer reject the connect with
 * {@code NOT_FOUND}, and expects the connection to be restored against the third peer within
 * a time window derived from the configured reconnect delays.
 */
@Test(timeout = 20000)
public void testFailoverHandlesDropThenRejectionCloseAfterConnect() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer rejectingPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch connectedToOriginal = new CountDownLatch(1);
        final CountDownLatch restoredToFinal = new CountDownLatch(1);

        // Create a peer to connect to, one to fail to reconnect to, and a final one to reconnect to
        final String originalPeerURI = createPeerURI(originalPeer);
        final String rejectingPeerURI = createPeerURI(rejectingPeer);
        final String finalPeerURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalPeerURI);
        LOG.info("Rejecting peer is at: {}", rejectingPeerURI);
        LOG.info("Final peer is at: {}", finalPeerURI);

        // Connect to the first
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();

        final long initialReconnectDelay = 0;
        final long reconnectDelay = 2000;
        // The measured window opens here, i.e. before the initial connection is even created.
        final long startMillis = System.currentTimeMillis();

        final String failoverOptions = "failover.initialReconnectDelay=" + initialReconnectDelay
                + "&failover.reconnectDelay=" + reconnectDelay
                + "&failover.maxReconnectAttempts=10";
        final JmsConnection connection = establishAnonymousConnecton(failoverOptions, originalPeer, rejectingPeer, finalPeer);

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isOriginalPeer = originalPeerURI.equals(remoteURI.toString());
                if (isOriginalPeer) {
                    connectedToOriginal.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final boolean isFinalPeer = finalPeerURI.equals(remoteURI.toString());
                if (isFinalPeer) {
                    restoredToFinal.countDown();
                }
            }
        });

        connection.start();

        assertTrue("Should connect to original peer", connectedToOriginal.await(5, TimeUnit.SECONDS));
        assertEquals("should not yet have connected to final peer", 1L, restoredToFinal.getCount());

        // Set expectations on rejecting and final peer
        rejectingPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);

        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();

        // Close the original peer and wait for things to shake out.
        originalPeer.close();

        rejectingPeer.waitForAllHandlersToComplete(2000);

        assertTrue("Should connect to final peer", restoredToFinal.await(5, TimeUnit.SECONDS));
        final long endMillis = System.currentTimeMillis();

        // Elapsed time must be at least the sum of both delays, and at most that plus a margin.
        final long margin = 2000;
        final long elapsedMillis = endMillis - startMillis;
        final long lowerBound = initialReconnectDelay + reconnectDelay;
        final long upperBound = lowerBound + margin;
        assertThat("Elapsed time outwith expected range for reconnect", elapsedMillis,
                both(greaterThanOrEqualTo(lowerBound)).and(lessThanOrEqualTo(upperBound)));

        // Shut it down
        finalPeer.expectClose();
        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Connects to a first peer, closes it, has the second peer drop the transport after the SASL
 * header (via {@code expectSaslHeaderThenDrop}), and expects the connection to be restored
 * against the third peer.
 */
@Test(timeout = 20000)
public void testFailoverHandlesImmediateTransportDropAfterConnect() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer rejectingPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch connectedToOriginal = new CountDownLatch(1);
        final CountDownLatch restoredToFinal = new CountDownLatch(1);

        // Create a peer to connect to, one to fail to reconnect to, and a final one to reconnect to
        final String originalPeerURI = createPeerURI(originalPeer);
        final String rejectingPeerURI = createPeerURI(rejectingPeer);
        final String finalPeerURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalPeerURI);
        LOG.info("Rejecting peer is at: {}", rejectingPeerURI);
        LOG.info("Final peer is at: {}", finalPeerURI);

        // Connect to the first
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isOriginalPeer = originalPeerURI.equals(remoteURI.toString());
                if (isOriginalPeer) {
                    connectedToOriginal.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final boolean isFinalPeer = finalPeerURI.equals(remoteURI.toString());
                if (isFinalPeer) {
                    restoredToFinal.countDown();
                }
            }
        });

        connection.start();

        assertTrue("Should connect to original peer", connectedToOriginal.await(5, TimeUnit.SECONDS));
        assertEquals("should not yet have connected to final peer", 1L, restoredToFinal.getCount());

        // Set expectations on rejecting and final peer
        rejectingPeer.expectSaslHeaderThenDrop();

        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();

        // Close the original peer and wait for things to shake out.
        originalPeer.close();

        rejectingPeer.waitForAllHandlersToComplete(2000);

        assertTrue("Should connect to final peer", restoredToFinal.await(5, TimeUnit.SECONDS));

        // Shut it down
        finalPeer.expectClose();
        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * The first peer accepts the connection and then remotely closes it with
 * {@code CONNECTION_FORCED}; the client is expected to restore the connection against the
 * backup peer.
 */
@Test(timeout = 20000)
public void testFailoverHandlesRemotelyEndConnectionForced() throws Exception {
    try (TestAmqpPeer forcingPeer = new TestAmqpPeer();
         TestAmqpPeer backupPeer = new TestAmqpPeer()) {

        final String primaryURI = createPeerURI(forcingPeer);
        final String backupURI = createPeerURI(backupPeer);
        LOG.info("Primary is at {}: Backup peer is at: {}", primaryURI, backupURI);

        final CountDownLatch primaryEstablished = new CountDownLatch(1);
        final CountDownLatch backupRestored = new CountDownLatch(1);

        // Primary: accept the connection, then force it closed from the remote side.
        forcingPeer.expectSaslAnonymous();
        forcingPeer.expectOpen();
        forcingPeer.expectBegin();
        forcingPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 10);

        // Backup: accept the connection.
        backupPeer.expectSaslAnonymous();
        backupPeer.expectOpen();
        backupPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(forcingPeer, backupPeer);

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isPrimary = remoteURI.toString().equals(primaryURI);
                if (isPrimary) {
                    primaryEstablished.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Reestablished: {}", remoteURI);
                final boolean isBackup = remoteURI.toString().equals(backupURI);
                if (isBackup) {
                    backupRestored.countDown();
                }
            }
        });

        connection.start();

        forcingPeer.waitForAllHandlersToComplete(3000);

        assertTrue("Should connect to primary peer", primaryEstablished.await(5, TimeUnit.SECONDS));
        assertTrue("Should connect to backup peer", backupRestored.await(5, TimeUnit.SECONDS));

        backupPeer.expectClose();
        connection.close();
        backupPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends a persistent message on an auto-ack session while the original peer is scripted
 * to receive the transfer without answering it, then closes that peer so the client has
 * to reconnect. The final peer is scripted to accept the transfer, and the test requires
 * that the blocked {@code send()} returns without having thrown.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverHandlesTransportDropBeforeDispositionRecieived() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer();) {

        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // Create a peer to connect to, then one to reconnect to
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        // Connect to the first peer
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });
        connection.start();

        assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));

        // Create session+producer, send a persistent message on auto-ack session for synchronous send
        originalPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        originalPeer.expectSenderAttach();
        final MessageProducer producer = session.createProducer(queue);

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);

        final Message message = session.createTextMessage();
        final CountDownLatch senderCompleted = new CountDownLatch(1);
        final AtomicReference<Throwable> problem = new AtomicReference<>();

        // Have the peer expect the message but NOT send any disposition for it
        originalPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, true);

        // The send runs on its own thread so this thread can script the failover meanwhile.
        Thread runner = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    producer.send(message);
                } catch (Throwable t) {
                    problem.set(t);
                    LOG.error("Problem in sending thread", t);
                } finally {
                    senderCompleted.countDown();
                }
            }
        });
        runner.start();

        // Wait for the message to have been sent and received by peer
        originalPeer.waitForAllHandlersToComplete(3000);

        // Set the secondary peer to expect connection restoration, this time send disposition accepting the message
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectSenderAttach();
        finalPeer.expectTransfer(messageMatcher, nullValue(), false, true, new Accepted(), true);

        assertEquals("Should not yet have connected to final peer", 1L, finalConnected.getCount());
        assertEquals("Sender thread should not yet have completed", 1L, senderCompleted.getCount());

        // Close the original peer to provoke reconnect, while send() is still outstanding
        originalPeer.close();

        assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));

        boolean await = senderCompleted.await(5, TimeUnit.SECONDS);
        Throwable t = problem.get();
        assertTrue("Sender thread should have completed. Problem: " + t, await);
        // The latch is released in a finally block, so it also opens when send() threw;
        // the captured Throwable has to be checked separately.
        assertNull("Sender thread should not have failed: " + t, t);

        // Shut it down
        finalPeer.expectClose();
        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends a persistent message on an auto-ack session while the original peer is scripted
 * to receive the transfer without answering it and then remotely close the connection
 * with {@code CONNECTION_FORCED}. The final peer is scripted to accept the transfer, and
 * the test requires that the blocked {@code send()} returns without having thrown.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverHandlesRemoteCloseBeforeDispositionRecieived() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer();) {

        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // Create a peer to connect to, then one to reconnect to
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        // Connect to the first peer
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.expectSenderAttach();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });
        connection.start();

        assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(queue);

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);

        final Message message = session.createTextMessage();
        final CountDownLatch senderCompleted = new CountDownLatch(1);
        final AtomicReference<Throwable> problem = new AtomicReference<>();

        // Have the peer expect the message but NOT send any disposition for it
        originalPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, true);
        originalPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 5);

        // The send runs on its own thread so this thread can script the failover meanwhile.
        Thread runner = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    producer.send(message);
                } catch (Throwable t) {
                    problem.set(t);
                    LOG.error("Problem in sending thread", t);
                } finally {
                    senderCompleted.countDown();
                }
            }
        });
        runner.start();

        // Wait for the message to have been sent and received by peer
        originalPeer.waitForAllHandlersToComplete(3000);

        // Set the secondary peer to expect connection restoration, this time send disposition accepting the message
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectSenderAttach();
        finalPeer.expectTransfer(messageMatcher, nullValue(), false, true, new Accepted(), true);
        finalPeer.expectClose();

        assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));

        boolean await = senderCompleted.await(5, TimeUnit.SECONDS);
        Throwable t = problem.get();
        assertTrue("Sender thread should have completed. Problem: " + t, await);
        // The latch is released in a finally block, so it also opens when send() threw;
        // the captured Throwable has to be checked separately.
        assertNull("Sender thread should not have failed: " + t, t);

        connection.close();
        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Connects with {@code failover.initialReconnectDelay=1}, lets the first peer drop the
 * connection after its scripted handlers, and requires the client to report a restored
 * connection to the second peer.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testFailoverHandlesDropWithModifiedInitialReconnectDelay() throws Exception {
    try (TestAmqpPeer primary = new TestAmqpPeer();
         TestAmqpPeer backup = new TestAmqpPeer()) {

        final CountDownLatch primaryUp = new CountDownLatch(1);
        final CountDownLatch backupUp = new CountDownLatch(1);

        // One peer for the initial connect, one as the reconnect target.
        final String primaryAddress = createPeerURI(primary);
        final String backupAddress = createPeerURI(backup);

        LOG.info("Original peer is at: {}", primaryAddress);
        LOG.info("Final peer is at: {}", backupAddress);

        // Script the first peer: handshake, two Begin frames, then drop the connection.
        primary.expectSaslAnonymous();
        primary.expectOpen();
        primary.expectBegin();
        primary.expectBegin();
        primary.dropAfterLastHandler();

        final String failoverOptions =
            "failover.initialReconnectDelay=1&failover.reconnectDelay=600&failover.maxReconnectAttempts=10";
        final JmsConnection connection = establishAnonymousConnecton(failoverOptions, primary, backup);

        final JmsDefaultConnectionListener tracker = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (primaryAddress.equals(remote)) {
                    primaryUp.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (backupAddress.equals(remote)) {
                    backupUp.countDown();
                }
            }
        };
        connection.addConnectionListener(tracker);
        connection.start();

        final boolean reachedPrimary = primaryUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to original peer", reachedPrimary);

        // What the second peer should see once the client has failed over.
        backup.expectSaslAnonymous();
        backup.expectOpen();
        backup.expectBegin();
        backup.expectBegin();

        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final boolean reachedBackup = backupUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to final peer", reachedBackup);

        // Tear down.
        backup.expectClose();
        connection.close();
        backup.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Configures a 20000 ms {@code failover.initialReconnectDelay} and measures how long
 * establishing and starting the first connection takes; the elapsed time must stay
 * below both the configured delay and 5000 ms.
 *
 * @throws Exception if the test peer or the client fail unexpectedly
 */
@Test(timeout = 30000)
public void testFailoverInitialReconnectDelayDoesNotApplyToInitialConnect() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {

        // The only peer the client will connect to.
        final String peerAddress = createPeerURI(peer);
        LOG.info("Original peer is at: {}", peerAddress);

        // Script the handshake for the initial connect.
        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();

        final int configuredDelay = 20000;
        final StopWatch timer = new StopWatch();

        final String failoverOptions =
            "failover.initialReconnectDelay=" + configuredDelay + "&failover.maxReconnectAttempts=1";
        final JmsConnection connection = establishAnonymousConnecton(failoverOptions, peer);
        connection.start();

        final long elapsed = timer.taken();

        final String tooSlow =
            "Initial connect should not have delayed for the specified initialReconnectDelay.Elapsed="
                + elapsed + ", delay=" + configuredDelay;
        assertTrue(tooSlow, elapsed < configuredDelay);
        assertTrue("Connection took longer than reasonable: " + elapsed, elapsed < 5000);

        // Tear down.
        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * With queue prefetch set to 0, the first peer is scripted to drop the connection after
 * the consumer's link flow. {@code receiveNoWait()} must return {@code null}, and the
 * client must report a restored connection to the second peer.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverHandlesDropZeroPrefetchPullConsumerReceiveNoWait() throws Exception {
    try (TestAmqpPeer primary = new TestAmqpPeer();
         TestAmqpPeer backup = new TestAmqpPeer()) {

        final CountDownLatch primaryUp = new CountDownLatch(1);
        final CountDownLatch backupUp = new CountDownLatch(1);

        // One peer for the initial connect, one as the reconnect target.
        final String primaryAddress = createPeerURI(primary);
        final String backupAddress = createPeerURI(backup);

        LOG.info("Original peer is at: {}", primaryAddress);
        LOG.info("Final peer is at: {}", backupAddress);

        // Script the first peer up to the consumer's flow, then drop.
        primary.expectSaslAnonymous();
        primary.expectOpen();
        primary.expectBegin();
        primary.expectBegin();
        primary.expectReceiverAttach();
        primary.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE));
        primary.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(primary, backup);

        // Zero prefetch is set on the connection's queue prefetch policy.
        final JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(0);

        final JmsDefaultConnectionListener tracker = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (primaryAddress.equals(remote)) {
                    primaryUp.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (backupAddress.equals(remote)) {
                    backupUp.countDown();
                }
            }
        };
        connection.addConnectionListener(tracker);
        connection.start();

        final boolean reachedPrimary = primaryUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to original peer", reachedPrimary);

        // What the second peer should see once the client has failed over.
        backup.expectSaslAnonymous();
        backup.expectOpen();
        backup.expectBegin();
        backup.expectBegin();
        backup.expectReceiverAttach();
        backup.expectLinkFlow(true, true, equalTo(UnsignedInteger.ONE));
        backup.expectDetach(true, true, true);

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue destination = session.createQueue("myQueue");
        final MessageConsumer puller = session.createConsumer(destination);

        final Message pulled = puller.receiveNoWait();
        assertNull(pulled);

        final boolean reachedBackup = backupUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to final peer", reachedBackup);

        puller.close();

        // Tear down.
        backup.expectClose();
        connection.close();
        backup.waitForAllHandlersToComplete(1000);
    }
}
/**
 * With queue prefetch set to 0, the first peer is scripted to drop the connection after
 * the consumer's link flow. {@code receive(500)} must return {@code null}, and the
 * client must report a restored connection to the second peer.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverHandlesDropZeroPrefetchPullConsumerReceiveWithTimeout() throws Exception {
    try (TestAmqpPeer primary = new TestAmqpPeer();
         TestAmqpPeer backup = new TestAmqpPeer()) {

        final CountDownLatch primaryUp = new CountDownLatch(1);
        final CountDownLatch backupUp = new CountDownLatch(1);

        // One peer for the initial connect, one as the reconnect target.
        final String primaryAddress = createPeerURI(primary);
        final String backupAddress = createPeerURI(backup);

        LOG.info("Original peer is at: {}", primaryAddress);
        LOG.info("Final peer is at: {}", backupAddress);

        // Script the first peer up to the consumer's flow, then drop.
        primary.expectSaslAnonymous();
        primary.expectOpen();
        primary.expectBegin();
        primary.expectBegin();
        primary.expectReceiverAttach();
        primary.expectLinkFlow();
        primary.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(primary, backup);

        // Zero prefetch is set on the connection's queue prefetch policy.
        final JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(0);

        final JmsDefaultConnectionListener tracker = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (primaryAddress.equals(remote)) {
                    primaryUp.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (backupAddress.equals(remote)) {
                    backupUp.countDown();
                }
            }
        };
        connection.addConnectionListener(tracker);
        connection.start();

        final boolean reachedPrimary = primaryUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to original peer", reachedPrimary);

        // What the second peer should see once the client has failed over.
        backup.expectSaslAnonymous();
        backup.expectOpen();
        backup.expectBegin();
        backup.expectBegin();
        backup.expectReceiverAttach();
        backup.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
        backup.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1)));
        backup.expectDetach(true, true, true);

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue destination = session.createQueue("myQueue");
        final MessageConsumer puller = session.createConsumer(destination);

        final Message pulled = puller.receive(500);
        assertNull(pulled);
        LOG.info("Receive returned");

        final boolean reachedBackup = backupUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to final peer", reachedBackup);

        LOG.info("Closing consumer");
        puller.close();

        // Tear down.
        backup.expectClose();
        connection.close();
        backup.waitForAllHandlersToComplete(1000);
    }
}
/**
 * With queue prefetch set to 0, the first peer is scripted to drop the connection after
 * the consumer's link flow. The second peer is scripted to answer the flow with a
 * transfer, so the untimed {@code receive()} must return a message, and the client must
 * report a restored connection to the second peer.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverHandlesDropZeroPrefetchPullConsumerReceive() throws Exception {
    try (TestAmqpPeer primary = new TestAmqpPeer();
         TestAmqpPeer backup = new TestAmqpPeer()) {

        final CountDownLatch primaryUp = new CountDownLatch(1);
        final CountDownLatch backupUp = new CountDownLatch(1);

        // One peer for the initial connect, one as the reconnect target.
        final String primaryAddress = createPeerURI(primary);
        final String backupAddress = createPeerURI(backup);

        LOG.info("Original peer is at: {}", primaryAddress);
        LOG.info("Final peer is at: {}", backupAddress);

        // Script the first peer up to the consumer's flow, then drop.
        primary.expectSaslAnonymous();
        primary.expectOpen();
        primary.expectBegin();
        primary.expectBegin();
        primary.expectReceiverAttach();
        primary.expectLinkFlow();
        primary.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(primary, backup);

        // Zero prefetch is set on the connection's queue prefetch policy.
        final JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(0);

        final JmsDefaultConnectionListener tracker = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (primaryAddress.equals(remote)) {
                    primaryUp.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (backupAddress.equals(remote)) {
                    backupUp.countDown();
                }
            }
        };
        connection.addConnectionListener(tracker);
        connection.start();

        final boolean reachedPrimary = primaryUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to original peer", reachedPrimary);

        // What the second peer should see once the client has failed over.
        final DescribedType nullBody = new AmqpValueDescribedType(null);

        backup.expectSaslAnonymous();
        backup.expectOpen();
        backup.expectBegin();
        backup.expectBegin();
        backup.expectReceiverAttach();
        backup.expectLinkFlowRespondWithTransfer(
            null, null, null, null, nullBody, 1, false, false, equalTo(UnsignedInteger.ONE), 1, true);
        backup.expectDispositionThatIsAcceptedAndSettled();
        backup.expectDetach(true, true, true);

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue destination = session.createQueue("myQueue");
        final MessageConsumer puller = session.createConsumer(destination);

        final Message pulled = puller.receive();
        assertNotNull(pulled);
        LOG.info("Receive returned");

        final boolean reachedBackup = backupUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to final peer", reachedBackup);

        LOG.info("Closing consumer");
        puller.close();

        // Tear down.
        backup.expectClose();
        connection.close();
        backup.waitForAllHandlersToComplete(1000);
    }
}
/**
 * The first peer is scripted to drop the connection after the queue browser's second
 * link flow. The second peer is scripted for the re-attached browser link, and the
 * browser's enumeration must report no elements.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverHandlesDropAfterQueueBrowserDrain() throws Exception {
    try (TestAmqpPeer primary = new TestAmqpPeer();
         TestAmqpPeer backup = new TestAmqpPeer()) {

        final CountDownLatch primaryUp = new CountDownLatch(1);
        final CountDownLatch backupUp = new CountDownLatch(1);

        // One peer for the initial connect, one as the reconnect target.
        final String primaryAddress = createPeerURI(primary);
        final String backupAddress = createPeerURI(backup);

        LOG.info("Original peer is at: {}", primaryAddress);
        LOG.info("Final peer is at: {}", backupAddress);

        // Script the handshake for the initial connect.
        primary.expectSaslAnonymous();
        primary.expectOpen();
        primary.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(primary, backup);

        final JmsDefaultConnectionListener tracker = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (primaryAddress.equals(remote)) {
                    primaryUp.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (backupAddress.equals(remote)) {
                    backupUp.countDown();
                }
            }
        };
        connection.addConnectionListener(tracker);
        connection.start();

        final boolean reachedPrimary = primaryUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to original peer", reachedPrimary);

        // First peer: browser attach and its flows, then drop the connection.
        primary.expectBegin();
        primary.expectQueueBrowserAttach();
        primary.expectLinkFlow();
        primary.expectLinkFlow(true, false,
            equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
        primary.dropAfterLastHandler();

        // What the second peer should see once the client has failed over.
        backup.expectSaslAnonymous();
        backup.expectOpen();
        backup.expectBegin();
        backup.expectBegin();
        backup.expectQueueBrowserAttach();
        backup.expectLinkFlow(false, false,
            equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
        backup.expectLinkFlow(true, true,
            equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
        backup.expectLinkFlow(false, false,
            equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
        backup.expectDetach(true, true, true);

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue destination = session.createQueue("myQueue");
        final QueueBrowser queueBrowser = session.createBrowser(destination);

        final Enumeration<?> contents = queueBrowser.getEnumeration();
        assertNotNull(contents);

        final boolean anyLeft = contents.hasMoreElements();
        assertFalse(anyLeft);

        queueBrowser.close();

        // Tear down.
        backup.expectClose();
        connection.close();
        backup.waitForAllHandlersToComplete(1000);
    }
}
/**
 * The peer is scripted to expect the session End without replying ({@code expectEnd(false)})
 * and then drop the connection. {@code session.close()}, run on a separate thread, must
 * complete within the wait period and must not throw.
 *
 * @throws Exception if the test peer or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverHandlesDropAfterSessionCloseRequested() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer()) {

        final CountDownLatch originalConnected = new CountDownLatch(1);

        final String originalURI = createPeerURI(originalPeer);

        LOG.info("Original peer is at: {}", originalURI);

        // Connect to the first peer
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }
        });
        connection.start();

        assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));

        // Expect the session to be opened and then ended, and drop once the End arrives.
        originalPeer.expectBegin();
        originalPeer.expectEnd(false);
        originalPeer.dropAfterLastHandler();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final CountDownLatch sessionCloseCompleted = new CountDownLatch(1);
        final AtomicBoolean sessionClosedThrew = new AtomicBoolean();
        Thread sessionCloseThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    session.close();
                    LOG.debug("Close of session returned ok");
                } catch (Throwable error) {
                    // Catch everything, not just JMSException: an unchecked exception would
                    // otherwise die with this thread and leave the flag false.
                    LOG.warn("Should not throw on session close when connection drops.", error);
                    sessionClosedThrew.set(true);
                } finally {
                    sessionCloseCompleted.countDown();
                }
            }
        }, "Session close thread");
        sessionCloseThread.start();

        originalPeer.waitForAllHandlersToComplete(2000);

        assertTrue("Session close should have completed by now", sessionCloseCompleted.await(3, TimeUnit.SECONDS));
        assertFalse("Session close should have completed normally", sessionClosedThrew.get());

        connection.close();
    }
}
/**
 * Runs the consumer link-refusal scenario with the deferred-write flag set to {@code false}.
 *
 * @throws Exception if the shared scenario fails
 */
@Test(timeout = 20000)
public void testCreateConsumerFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception {
    final boolean deferAttachResponseWrite = false;
    doCreateConsumerFailsWhenLinkRefusedTestImpl(deferAttachResponseWrite);
}
/**
 * Runs the consumer link-refusal scenario with the deferred-write flag set to {@code true}.
 *
 * @throws Exception if the shared scenario fails
 */
@Test(timeout = 20000)
public void testCreateConsumerFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception {
    final boolean deferAttachResponseWrite = true;
    doCreateConsumerFailsWhenLinkRefusedTestImpl(deferAttachResponseWrite);
}
/**
 * Shared scenario: the peer is scripted to refuse the receiver link for a topic, and
 * {@code createConsumer} must throw {@link InvalidDestinationException}.
 *
 * @param deferAttachResponseWrite passed through as the last argument of the peer's
 *        {@code expectReceiverAttach}
 * @throws Exception if the test peer or the client fail unexpectedly
 */
private void doCreateConsumerFailsWhenLinkRefusedTestImpl(boolean deferAttachResponseWrite) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();

        final Connection connection = establishAnonymousConnecton(peer);
        connection.start();

        peer.expectBegin();
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String address = "myTopic";
        final Topic refusedTopic = session.createTopic(address);

        // The attach for the topic node is expected and then refused.
        final SourceMatcher expectedSource = new SourceMatcher();
        expectedSource.withAddress(equalTo(address));
        expectedSource.withDynamic(equalTo(false));
        expectedSource.withDurable(equalTo(TerminusDurability.NONE));

        peer.expectReceiverAttach(notNullValue(), expectedSource, true, deferAttachResponseWrite);

        // After the refusal the peer closes the consumer link; expect the detach response.
        peer.expectDetach(true, false, false);

        try {
            // Creating the consumer has to fail because of the link refusal.
            session.createConsumer(refusedTopic);
            fail("Consumer creation should have failed when link was refused");
        } catch (InvalidDestinationException expected) {
            LOG.info("Test caught expected error: {}", expected.getMessage());
        }

        // Tear down.
        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the producer link-refusal scenario with the deferred-write flag set to {@code false}.
 *
 * @throws Exception if the shared scenario fails
 */
@Test(timeout = 20000)
public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception {
    final boolean deferAttachResponseWrite = false;
    doCreateProducerFailsWhenLinkRefusedTestImpl(deferAttachResponseWrite);
}
/**
 * Runs the producer link-refusal scenario with the deferred-write flag set to {@code true}.
 *
 * @throws Exception if the shared scenario fails
 */
@Test(timeout = 20000)
public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception {
    final boolean deferAttachResponseWrite = true;
    doCreateProducerFailsWhenLinkRefusedTestImpl(deferAttachResponseWrite);
}
/**
 * Shared scenario: the peer is scripted to refuse the sender link for a topic, and
 * {@code createProducer} must throw {@link InvalidDestinationException}.
 *
 * @param deferAttachResponseWrite passed through as the last argument of the peer's
 *        {@code expectSenderAttach}
 * @throws Exception if the test peer or the client fail unexpectedly
 */
private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean deferAttachResponseWrite) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();

        final Connection connection = establishAnonymousConnecton(peer);
        connection.start();

        peer.expectBegin();
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String address = "myTopic";
        final Topic refusedTopic = session.createTopic(address);

        // The attach for the topic node is expected and then refused.
        final TargetMatcher expectedTarget = new TargetMatcher();
        expectedTarget.withAddress(equalTo(address));
        expectedTarget.withDynamic(equalTo(false));
        expectedTarget.withDurable(equalTo(TerminusDurability.NONE));

        peer.expectSenderAttach(notNullValue(), expectedTarget, true, deferAttachResponseWrite);

        // After the refusal the peer closes the producer link; expect the detach response.
        peer.expectDetach(true, false, false);

        try {
            // Creating the producer has to fail because of the link refusal.
            session.createProducer(refusedTopic);
            fail("Producer creation should have failed when link was refused");
        } catch (InvalidDestinationException expected) {
            LOG.info("Test caught expected error: {}", expected.getMessage());
        }

        // Tear down.
        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Creates a transacted session while the first peer is scripted to drop the connection
 * after the transaction declare. The second peer is scripted to see the coordinator
 * attach and declare again, followed by a discharge and session end when the session
 * is closed.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Test(timeout=20000)
public void testTxRecreatedAfterConnectionFailsOver() throws Exception {
    try (TestAmqpPeer primary = new TestAmqpPeer();
         TestAmqpPeer backup = new TestAmqpPeer()) {

        final CountDownLatch primaryUp = new CountDownLatch(1);
        final CountDownLatch backupUp = new CountDownLatch(1);

        // One peer for the initial connect, one as the reconnect target.
        final String primaryAddress = createPeerURI(primary);
        final String backupAddress = createPeerURI(backup);

        LOG.info("Original peer is at: {}", primaryAddress);
        LOG.info("Final peer is at: {}", backupAddress);

        primary.expectSaslAnonymous();
        primary.expectOpen();
        primary.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(primary, backup);

        final JmsDefaultConnectionListener tracker = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (primaryAddress.equals(remote)) {
                    primaryUp.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (backupAddress.equals(remote)) {
                    backupUp.countDown();
                }
            }
        };
        connection.addConnectionListener(tracker);
        connection.start();

        final boolean reachedPrimary = primaryUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to original peer", reachedPrimary);

        primary.expectBegin();
        primary.expectCoordinatorAttach();

        // The unsettled 'declare' transfer to the txn coordinator is answered with a
        // Declared disposition state carrying this txnId; then the peer drops.
        final byte[] txnIdBytes = new byte[] { 5, 6, 7, 8 };
        final Binary txnId = new Binary(txnIdBytes);
        primary.expectDeclare(txnId);
        primary.dropAfterLastHandler();

        // What the second peer should see once the client has failed over.
        backup.expectSaslAnonymous();
        backup.expectOpen();
        backup.expectBegin();
        backup.expectBegin();
        backup.expectCoordinatorAttach();
        backup.expectDeclare(txnId);

        // The unsettled 'discharge' transfer carrying the txnId is answered with an accepted
        // and settled disposition, indicating the rollback succeeded.
        backup.expectDischarge(txnId, true);
        backup.expectEnd();

        final Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);

        final boolean reachedBackup = backupUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to final peer", reachedBackup);

        txSession.close();

        // Tear down.
        backup.expectClose();
        connection.close();

        primary.waitForAllHandlersToComplete(2000);
        backup.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Creates a temporary topic while the first peer is scripted to drop the connection
 * after the temp-topic creation attach. The second peer is scripted to see the temporary
 * topic attach again after failover, then its detach and the session end on cleanup.
 *
 * @throws Exception if the test peers or the client fail unexpectedly
 */
@Test(timeout=20000)
public void testTempDestinationRecreatedAfterConnectionFailsOver() throws Exception {
    try (TestAmqpPeer primary = new TestAmqpPeer();
         TestAmqpPeer backup = new TestAmqpPeer()) {

        final CountDownLatch primaryUp = new CountDownLatch(1);
        final CountDownLatch backupUp = new CountDownLatch(1);

        // One peer for the initial connect, one as the reconnect target.
        final String primaryAddress = createPeerURI(primary);
        final String backupAddress = createPeerURI(backup);

        LOG.info("Original peer is at: {}", primaryAddress);
        LOG.info("Final peer is at: {}", backupAddress);

        primary.expectSaslAnonymous();
        primary.expectOpen();
        primary.expectBegin();
        primary.expectBegin();
        final String firstDynamicAddress = "myTempTopicAddress";
        primary.expectTempTopicCreationAttach(firstDynamicAddress);
        primary.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(primary, backup);

        final JmsDefaultConnectionListener tracker = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (primaryAddress.equals(remote)) {
                    primaryUp.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final String remote = remoteURI.toString();
                if (backupAddress.equals(remote)) {
                    backupUp.countDown();
                }
            }
        };
        connection.addConnectionListener(tracker);
        connection.start();

        final boolean reachedPrimary = primaryUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to original peer", reachedPrimary);

        // What the second peer should see once the client has failed over.
        backup.expectSaslAnonymous();
        backup.expectOpen();
        backup.expectBegin();
        final String secondDynamicAddress = "myTempTopicAddress2";
        backup.expectTempTopicCreationAttach(secondDynamicAddress);

        // On failover the temporary destinations are recreated first, the session afterwards.
        backup.expectBegin();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryTopic temporaryTopic = session.createTemporaryTopic();

        final boolean reachedBackup = backupUp.await(5, TimeUnit.SECONDS);
        assertTrue("Should connect to final peer", reachedBackup);

        // Deleting the temporary topic and closing the session produce a detach and an end.
        backup.expectDetach(true, true, true);
        backup.expectEnd();

        temporaryTopic.delete();
        session.close();

        // Tear down.
        backup.expectClose();
        connection.close();

        primary.waitForAllHandlersToComplete(2000);
        backup.waitForAllHandlersToComplete(1000);
    }
}
/**
 * With {@code jms.requestTimeout=1000} configured and the only peer scripted to drop the
 * connection, {@code createSession} must fail with {@link JmsOperationTimedOutException}
 * once the listener has reported the interruption.
 *
 * @throws Exception if the test peer or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverEnforcesRequestTimeoutSession() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {

        final CountDownLatch connected = new CountDownLatch(1);
        final CountDownLatch disconnected = new CountDownLatch(1);

        // Create a peer to connect to so we can get to a state where we
        // can try to create a session when offline.
        final String peerURI = createPeerURI(testPeer);

        LOG.info("Original peer is at: {}", peerURI);

        // Connect to the test peer
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(
            "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
            testPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionInterrupted(URI remoteURI) {
                LOG.info("Connection Interrupted: {}", remoteURI);
                disconnected.countDown();
            }

            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                connected.countDown();
            }
        });
        connection.start();

        assertTrue("Should connect to peer", connected.await(5, TimeUnit.SECONDS));
        assertTrue("Should lose connection to peer", disconnected.await(5, TimeUnit.SECONDS));

        try {
            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            fail("Should have thrown an exception");
        } catch (JmsOperationTimedOutException jmsEx) {
            LOG.info("Caught timed out exception from create session:", jmsEx);
        } catch (Exception ex) {
            // Keep the unexpected exception visible instead of discarding it.
            LOG.error("Caught unexpected exception from create session", ex);
            fail("Should have caught a timed out exception but caught: " + ex);
        }

        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * With {@code jms.sendTimeout=1000} configured and the only peer scripted to drop the
 * connection after the sender attach, {@code producer.send} must fail with
 * {@link JmsSendTimedOutException} once the listener has reported the interruption.
 *
 * @throws Exception if the test peer or the client fail unexpectedly
 */
@Test(timeout = 20000)
public void testFailoverEnforcesSendTimeout() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {

        final CountDownLatch connected = new CountDownLatch(1);
        final CountDownLatch disconnected = new CountDownLatch(1);

        // Create a peer to connect to so we can get to a state where we
        // can try to send when offline.
        final String peerURI = createPeerURI(testPeer);

        LOG.info("Original peer is at: {}", peerURI);

        // Connect to the test peer
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();
        testPeer.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(
            "jms.sendTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
            testPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionInterrupted(URI remoteURI) {
                LOG.info("Connection Interrupted: {}", remoteURI);
                disconnected.countDown();
            }

            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                connected.countDown();
            }
        });
        connection.start();

        assertTrue("Should connect to peer", connected.await(5, TimeUnit.SECONDS));

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");
        MessageProducer producer = session.createProducer(queue);

        assertTrue("Should lose connection to peer", disconnected.await(5, TimeUnit.SECONDS));

        try {
            producer.send(session.createMessage());
            fail("Should have thrown an exception");
        } catch (JmsSendTimedOutException jmsEx) {
            LOG.info("Caught timed out exception from send:", jmsEx);
        } catch (Exception ex) {
            // Keep the unexpected exception visible instead of discarding it.
            LOG.error("Caught unexpected exception from send", ex);
            fail("Should have caught a timed out exception but caught: " + ex);
        }

        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Verifies that creating a temporary queue and a temporary topic after the
 * connection to the peer has been lost each fail with a
 * {@code JmsOperationTimedOutException}.
 * <p>
 * The connection URI sets {@code jms.requestTimeout=1000} together with
 * {@code failover.reconnectDelay=2000}, and the peer is scripted to drop the
 * connection after the second begin.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testFailoverEnforcesRequestTimeoutCreateTenpDestination() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final CountDownLatch connected = new CountDownLatch(1);
        final CountDownLatch disconnected = new CountDownLatch(1);

        // Create a peer to connect to so we can get to a state where we
        // can try to create temporary destinations when offline.
        final String peerURI = createPeerURI(testPeer);
        LOG.info("Original peer is at: {}", peerURI);

        // Script the peer: open and two begins, after which the peer drops
        // the connection.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(
            "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
            testPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionInterrupted(URI remoteURI) {
                LOG.info("Connection Interrupted: {}", remoteURI);
                disconnected.countDown();
            }

            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                connected.countDown();
            }
        });
        connection.start();

        assertTrue("Should connect to peer", connected.await(5, TimeUnit.SECONDS));

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        assertTrue("Should lose connection to peer", disconnected.await(5, TimeUnit.SECONDS));

        try {
            session.createTemporaryQueue();
            fail("Should have thrown an exception");
        } catch (JmsOperationTimedOutException jmsEx) {
            LOG.info("Caught timed out exception from createTemporaryQueue:", jmsEx);
        } catch (Exception ex) {
            // Keep the unexpected exception as the cause so a wrong exception
            // type can be diagnosed from the test report.
            throw new AssertionError("Should have caught a timed out exception but caught: " + ex, ex);
        }

        try {
            session.createTemporaryTopic();
            fail("Should have thrown an exception");
        } catch (JmsOperationTimedOutException jmsEx) {
            LOG.info("Caught timed out exception from createTemporaryTopic:", jmsEx);
        } catch (Exception ex) {
            throw new AssertionError("Should have caught a timed out exception but caught: " + ex, ex);
        }

        connection.close();
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends two text messages synchronously through a failover connection. The
 * peer answers the second transfer with an {@code Accepted} disposition that
 * is scripted with a 15 ms delay, and the test checks that the blocking send
 * took at least that long.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testFailoverPassthroughOfCompletedSyncSend() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final Connection connection = establishAnonymousConnecton(testPeer);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue destination = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(destination);

        // Warm-up send.
        final String warmupBody = "myMessage1";
        final TransferPayloadCompositeMatcher warmupMatcher = new TransferPayloadCompositeMatcher();
        warmupMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        warmupMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        warmupMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        warmupMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(warmupBody));
        testPeer.expectTransfer(warmupMatcher);

        final TextMessage warmupMessage = session.createTextMessage(warmupBody);
        producer.send(warmupMessage);
        testPeer.waitForAllHandlersToComplete(1000);

        // The send under test: accepted by the peer after the scripted delay.
        final String acceptedBody = "myMessage2";
        final long dispositionDelay = 15;
        final TransferPayloadCompositeMatcher acceptedMatcher = new TransferPayloadCompositeMatcher();
        acceptedMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        acceptedMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        acceptedMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        acceptedMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(acceptedBody));
        testPeer.expectTransfer(acceptedMatcher, nullValue(), false, true, new Accepted(), true, 0, dispositionDelay);
        testPeer.expectClose();

        final TextMessage acceptedMessage = session.createTextMessage(acceptedBody);
        final long sendStarted = System.currentTimeMillis();
        producer.send(acceptedMessage);
        final long tookMillis = System.currentTimeMillis() - sendStarted;

        MatcherAssert.assertThat(
            "Send call should have taken at least the disposition delay",
            tookMillis,
            Matchers.greaterThanOrEqualTo(dispositionDelay));

        connection.close();
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the failing sync-send scenario with a {@code Rejected} outcome carrying
 * a resource-limit-exceeded error, and asks the shared implementation to
 * inspect the resulting exception.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testFailoverPassthroughOfRejectedSyncSend() throws Exception {
    // Fully qualified because the simple name would otherwise resolve to java.lang.Error.
    final org.apache.qpid.jms.test.testpeer.describedtypes.Error resourceLimitError =
        new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
    resourceLimitError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
    resourceLimitError.setDescription("RLE description");

    final Rejected rejectedOutcome = new Rejected();
    rejectedOutcome.setError(resourceLimitError);

    doFailoverPassthroughOfFailingSyncSendTestImpl(rejectedOutcome, true);
}
/**
 * Runs the failing sync-send scenario with a {@code Released} outcome, without
 * inspecting the details of the thrown exception.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testFailoverPassthroughOfReleasedSyncSend() throws Exception {
    final Released releasedOutcome = new Released();
    final boolean inspectException = false;

    doFailoverPassthroughOfFailingSyncSendTestImpl(releasedOutcome, inspectException);
}
/**
 * Runs the failing sync-send scenario with a {@code Modified} outcome whose
 * delivery-failed flag is set, without inspecting the details of the thrown
 * exception.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testFailoverPassthroughOfModifiedFailedSyncSend() throws Exception {
    final Modified modifiedOutcome = new Modified();
    modifiedOutcome.setDeliveryFailed(true);
    final boolean inspectException = false;

    doFailoverPassthroughOfFailingSyncSendTestImpl(modifiedOutcome, inspectException);
}
/**
 * Shared body for the sync-send tests in which the peer settles the second of
 * three transfers with a non-accepted outcome. The first and third sends are
 * expected to succeed; the second is expected to throw a {@code JMSException}
 * no sooner than the scripted 15 ms disposition delay.
 *
 * @param failingState the delivery state the peer applies to the second
 *        transfer; asserted not to be an {@code Accepted}
 * @param inspectException when {@code true}, additionally checks that the
 *        exception is a {@code ResourceAllocationException} whose message
 *        contains "RLE description"
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
private void doFailoverPassthroughOfFailingSyncSendTestImpl(ListDescribedType failingState, boolean inspectException) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final Connection connection = establishAnonymousConnecton(testPeer);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue destination = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(destination);

        // First send: a warm-up that succeeds.
        final String warmupBody = "myMessage1";
        final TransferPayloadCompositeMatcher warmupMatcher = new TransferPayloadCompositeMatcher();
        warmupMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        warmupMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        warmupMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        warmupMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(warmupBody));
        testPeer.expectTransfer(warmupMatcher);

        final TextMessage warmupMessage = session.createTextMessage(warmupBody);
        producer.send(warmupMessage);
        testPeer.waitForAllHandlersToComplete(1000);

        // Second send: fails because the peer does not accept it.
        assertFalse(failingState instanceof Accepted);

        final String failingBody = "myMessage2";
        final TransferPayloadCompositeMatcher failingMatcher = new TransferPayloadCompositeMatcher();
        failingMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        failingMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        failingMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        failingMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(failingBody));

        final long dispositionDelay = 15;
        testPeer.expectTransfer(failingMatcher, nullValue(), false, true, failingState, true, 0, dispositionDelay);

        final TextMessage failingMessage = session.createTextMessage(failingBody);
        final long sendStarted = System.currentTimeMillis();
        try {
            producer.send(failingMessage);
            fail("Expected an exception for this send.");
        } catch (JMSException jmse) {
            // Expected: the send must not return before the delayed disposition arrives.
            final long tookMillis = System.currentTimeMillis() - sendStarted;
            MatcherAssert.assertThat(
                "Send call should have taken at least the disposition delay",
                tookMillis,
                Matchers.greaterThanOrEqualTo(dispositionDelay));

            if (inspectException) {
                assertTrue(jmse instanceof ResourceAllocationException);
                assertTrue(jmse.getMessage().contains("RLE description"));
            }
        }
        testPeer.waitForAllHandlersToComplete(1000);

        // Third send: succeeds again after the failure.
        final String finalBody = "myMessage3";
        final TransferPayloadCompositeMatcher finalMatcher = new TransferPayloadCompositeMatcher();
        finalMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        finalMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        finalMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        finalMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(finalBody));
        testPeer.expectTransfer(finalMatcher);

        final TextMessage finalMessage = session.createTextMessage(finalBody);
        producer.send(finalMessage);

        testPeer.expectClose();
        connection.close();
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends one text message with a completion listener through a failover
 * connection and checks that the listener is completed without an exception
 * and with the sent {@code TextMessage}.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testFailoverPassthroughOfCompletedAsyncSend() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        final Connection connection = establishAnonymousConnecton(
            "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");
        MessageProducer producer = session.createProducer(queue);

        // Create and transfer a new message
        String text = "myMessage";
        testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
        testPeer.expectClose();

        TextMessage message = session.createTextMessage(text);
        TestJmsCompletionListener listener = new TestJmsCompletionListener();
        producer.send(message, listener);

        // The wait must stay below the 20 s test timeout, otherwise the
        // assertion message can never be reported.
        assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
        assertNull(listener.exception);
        assertNotNull(listener.message);
        assertTrue(listener.message instanceof TextMessage);

        connection.close();
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends the same message twice with a completion listener. The peer is
 * scripted to reject the first transfer and to handle the second with the
 * default {@code expectTransfer} behaviour; the test checks that the first
 * completion reports an exception and the second does not.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testFailoverPassthroughOfRejectedAsyncCompletionSend() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        final JmsConnection connection = establishAnonymousConnecton(
            "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");
        MessageProducer producer = session.createProducer(queue);
        Message message = session.createTextMessage("content");

        // First transfer is rejected by the peer.
        testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), new Rejected(), true);

        assertNull("Should not yet have a JMSDestination", message.getJMSDestination());

        TestJmsCompletionListener listener = new TestJmsCompletionListener();
        try {
            producer.send(message, listener);
        } catch (JMSException e) {
            LOG.warn("Caught unexpected error: {}", e.getMessage());
            // Keep the cause so the unexpected failure is diagnosable.
            throw new AssertionError("No expected exception for this send.", e);
        }

        // The wait must stay below the 20 s test timeout, otherwise the
        // assertion message can never be reported.
        assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
        assertNotNull(listener.exception);
        assertNotNull(listener.message);
        assertTrue(listener.message instanceof TextMessage);

        // Second transfer of the same message is not rejected.
        testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
        testPeer.expectClose();

        listener = new TestJmsCompletionListener();
        try {
            producer.send(message, listener);
        } catch (JMSException e) {
            LOG.warn("Caught unexpected error: {}", e.getMessage());
            throw new AssertionError("No expected exception for this send.", e);
        }

        assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
        assertNull(listener.exception);
        assertNotNull(listener.message);
        assertTrue(listener.message instanceof TextMessage);

        connection.close();
        testPeer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Sends six messages with a shared completion listener. The peer is scripted
 * to leave the first five transfers unanswered, accept the sixth, and then
 * drop the connection. The test expects five error completions and one
 * successful completion.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testFailoverConnectionLossFailsWaitingAsyncCompletionSends() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        final JmsConnection connection = establishAnonymousConnecton(
            "failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
            testPeer);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");
        MessageProducer producer = session.createProducer(queue);

        final int MSG_COUNT = 5;

        for (int i = 0; i < MSG_COUNT; ++i) {
            testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
        }

        // Accept one which shouldn't complete until after the others have failed.
        testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), new Accepted(), true);
        testPeer.dropAfterLastHandler();

        TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT + 1);
        try {
            for (int i = 0; i < MSG_COUNT; ++i) {
                Message message = session.createTextMessage("content");
                producer.send(message, listener);
            }

            Message message = session.createTextMessage("content");
            producer.send(message, listener);
        } catch (JMSException e) {
            LOG.warn("Caught unexpected error: {}", e.getMessage());
            // Keep the cause so the unexpected failure is diagnosable.
            throw new AssertionError("No expected exception for this send.", e);
        }

        // The wait must stay below the 20 s test timeout, otherwise the
        // assertion message can never be reported.
        assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
        assertEquals(MSG_COUNT, listener.errorCount);
        assertEquals(1, listener.successCount);
        assertNotNull(listener.exception);
        assertNotNull(listener.message);
        assertTrue(listener.message instanceof TextMessage);

        connection.close();
    }
}
/**
 * Creates a session against a peer that is scripted to drop the connection
 * after the second begin, then checks that the connection is restored on the
 * final peer and that the session and connection can be closed there.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testCreateSessionAfterConnectionDrops() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // One peer for the initial connect and one for the reconnect.
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        // Script for the first peer, ending with a dropped connection.
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin(nullValue(), false);
        originalPeer.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
        final JmsDefaultPrefetchPolicy prefetchPolicy = (JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(0);

        final JmsDefaultConnectionListener peerWatcher = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isOriginal = originalURI.equals(remoteURI.toString());
                if (isOriginal) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final boolean isFinal = finalURI.equals(remoteURI.toString());
                if (isFinal) {
                    finalConnected.countDown();
                }
            }
        };
        connection.addConnectionListener(peerWatcher);
        connection.start();

        assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));

        // Script for the second peer, covering everything after failover.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectEnd();
        finalPeer.expectClose();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));

        session.close();
        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Creates a session and a queue consumer against a peer that is scripted to
 * drop the connection after the second begin, then checks that the connection
 * is restored on the final peer, where the receiver attach, two link flows,
 * a detach and the close are expected.
 * <p>
 * Queue prefetch is set to 0 before the connection is started.
 */
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testCreateConsumerAfterConnectionDrops() throws Exception {
try (TestAmqpPeer originalPeer = new TestAmqpPeer();
TestAmqpPeer finalPeer = new TestAmqpPeer();) {
// Latches released by the connection listener below.
final CountDownLatch originalConnected = new CountDownLatch(1);
final CountDownLatch finalConnected = new CountDownLatch(1);
// Create a peer to connect to, then one to reconnect to
final String originalURI = createPeerURI(originalPeer);
final String finalURI = createPeerURI(finalPeer);
LOG.info("Original peer is at: {}", originalURI);
LOG.info("Final peer is at: {}", finalURI);
// Connect to the first peer
originalPeer.expectSaslAnonymous();
originalPeer.expectOpen();
originalPeer.expectBegin();
originalPeer.expectBegin();
// The first peer drops the connection once the second begin has been handled.
originalPeer.dropAfterLastHandler();
final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onConnectionEstablished(URI remoteURI) {
LOG.info("Connection Established: {}", remoteURI);
// Only count the event when it refers to the original peer.
if (originalURI.equals(remoteURI.toString())) {
originalConnected.countDown();
}
}
@Override
public void onConnectionRestored(URI remoteURI) {
LOG.info("Connection Restored: {}", remoteURI);
// Only count the event when it refers to the final peer.
if (finalURI.equals(remoteURI.toString())) {
finalConnected.countDown();
}
}
});
connection.start();
assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
// --- Post Failover Expectations of FinalPeer --- //
finalPeer.expectSaslAnonymous();
finalPeer.expectOpen();
finalPeer.expectBegin();
finalPeer.expectBegin();
finalPeer.expectReceiverAttach();
// NOTE(review): presumably a credit-1 flow followed by a drain flow, produced by the
// timed receive below on a zero-prefetch consumer - confirm against TestAmqpPeer.expectLinkFlow.
finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1)));
finalPeer.expectDetach(true, true, true);
finalPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
// No message is scripted on either peer, so the timed receive is expected to return null.
assertNull(consumer.receive(500));
LOG.info("Receive returned");
assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
LOG.info("Closing consumer");
consumer.close();
// Shut it down
connection.close();
finalPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Creates a session and a queue producer against a peer that is scripted to
 * drop the connection after the second begin, then checks that the connection
 * is restored on the final peer, where the sender attach, a detach and the
 * close are expected.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testCreateProducerAfterConnectionDrops() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer();) {

        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // Create a peer to connect to, then one to reconnect to
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        // Connect to the first peer, which drops the connection after the second begin
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.dropAfterLastHandler();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
        ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });
        connection.start();

        assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));

        // --- Post Failover Expectations of FinalPeer --- //
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectSenderAttach();
        finalPeer.expectDetach(true, true, true);
        finalPeer.expectClose();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");
        MessageProducer producer = session.createProducer(queue);

        assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));

        // This test closes a producer, so the log line says so.
        LOG.info("Closing producer");
        producer.close();

        // Shut it down
        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Verifies that {@code Session.commit()} on a transacted session throws a
 * {@code JMSException} when the peer leaves the discharge unanswered and then
 * remotely closes the connection, and that the connection listener is told
 * about the connection failure.
 * <p>
 * The connection URI sets {@code failover.maxReconnectAttempts=3} and disables
 * reconnect back-off; only a single peer is configured.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testTxCommitThrowsWhenNoDischargeResponseSentAndConnectionDrops() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {

        final CountDownLatch testConnected = new CountDownLatch(1);
        final CountDownLatch failedConnection = new CountDownLatch(1);

        // Create the single peer to connect to; there is no peer to fail over to.
        final String testPeerURI = createPeerURI(testPeer);

        LOG.info("test peer is at: {}", testPeerURI);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(
            "failover.maxReconnectAttempts=3&failover.useReconnectBackOff=false", testPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (testPeerURI.equals(remoteURI.toString())) {
                    testConnected.countDown();
                }
            }

            @Override
            public void onConnectionFailure(Throwable cause) {
                // Use the (String, Throwable) overload so the cause is logged
                // unambiguously instead of competing with a '{}' placeholder.
                LOG.info("Connection Failed: ", cause);
                failedConnection.countDown();
            }
        });
        connection.start();

        assertTrue("Should connect to test peer", testConnected.await(6, TimeUnit.SECONDS));

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        // First expect an unsettled 'declare' transfer to the txn coordinator, and
        // reply with a Declared disposition state containing the txnId.
        Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        testPeer.expectDeclare(txnId);

        // The session should send a commit but we don't respond, then drop the connection
        // and check that the commit is failed due to dropped connection.
        testPeer.expectDischargeButDoNotRespond(txnId, false);
        testPeer.expectDeclareButDoNotRespond();

        testPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 100);

        // --- Failover should handle the connection close ---------------//

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        try {
            session.commit();
            fail("Commit should have thrown an exception");
        } catch (JMSException jmsEx) {
            LOG.debug("Commit threw: ", jmsEx);
        }

        assertTrue("Should have reported failure", failedConnection.await(5, TimeUnit.SECONDS));

        try {
            connection.close();
        } catch (JMSException ignored) {
            // Best effort: the connection has already failed, so an error on close is tolerated.
            LOG.debug("Ignoring error from close of failed connection: ", ignored);
        }

        testPeer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Connects to the first of four peers, which then remotely closes the
 * connection. The second and third peers are scripted to reject the connect.
 * With {@code failover.maxReconnectAttempts=2} the test expects the connection
 * to be reported as failed and the fourth peer never to receive a TCP
 * connection.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testDropAndRejectAfterwardsHonorsMax() throws Exception {
    try (TestAmqpPeer firstPeer = new TestAmqpPeer();
         TestAmqpPeer secondPeer = new TestAmqpPeer();
         TestAmqpPeer thirdPeer = new TestAmqpPeer();
         TestAmqpPeer fourthPeer = new TestAmqpPeer()) {

        final CountDownLatch testConnected = new CountDownLatch(1);
        final CountDownLatch failedConnection = new CountDownLatch(1);

        // Remember the first peer's URI so the listener can recognise the initial connect.
        final String firstPeerURI = createPeerURI(firstPeer);

        LOG.info("First peer is at: {}", firstPeerURI);
        LOG.info("Second peer is at: {}", createPeerURI(secondPeer));
        LOG.info("Third peer is at: {}", createPeerURI(thirdPeer));
        LOG.info("Fourth peer is at: {}", createPeerURI(fourthPeer));

        firstPeer.expectSaslAnonymous();
        firstPeer.expectOpen();
        firstPeer.expectBegin();
        firstPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 100);

        secondPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
        thirdPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);

        // This shouldn't get hit, but if it does accept the connect so we don't pass the failed
        // to connect assertion.
        fourthPeer.expectSaslAnonymous();
        fourthPeer.expectOpen();
        fourthPeer.expectBegin();
        fourthPeer.expectClose();

        final JmsConnection connection = establishAnonymousConnecton(
            "failover.maxReconnectAttempts=2&failover.useReconnectBackOff=false", firstPeer, secondPeer, thirdPeer, fourthPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (firstPeerURI.equals(remoteURI.toString())) {
                    testConnected.countDown();
                }
            }

            @Override
            public void onConnectionFailure(Throwable cause) {
                // Use the (String, Throwable) overload so the cause is logged
                // unambiguously instead of competing with a '{}' placeholder.
                LOG.info("Connection Failed: ", cause);
                failedConnection.countDown();
            }
        });
        connection.start();

        assertTrue("Should connect to first peer", testConnected.await(5, TimeUnit.SECONDS));

        // --- Failover should handle the connection close ---------------//

        assertTrue("Should have reported failure", failedConnection.await(5, TimeUnit.SECONDS));

        try {
            connection.close();
        } catch (JMSException ignored) {
            // Best effort: the connection has already failed, so an error on close is tolerated.
            LOG.debug("Ignoring error from close of failed connection: ", ignored);
        }

        secondPeer.waitForAllHandlersToCompleteNoAssert(2000);
        thirdPeer.waitForAllHandlersToCompleteNoAssert(2000);

        // Shut down last peer and verify no connection made to it
        fourthPeer.purgeExpectations();
        fourthPeer.close();

        assertNotNull("Peer 1 should have accepted a TCP connection", firstPeer.getClientSocket());
        assertNotNull("Peer 2 should have accepted a TCP connection", secondPeer.getClientSocket());
        assertNotNull("Peer 3 should have accepted a TCP connection", thirdPeer.getClientSocket());
        assertNull("Peer 4 should not have accepted any TCP connection", fourthPeer.getClientSocket());
    }
}
/**
 * Starts a connection against four peers of which the first three are scripted
 * to reject the connect. With {@code failover.startupMaxReconnectAttempts=3}
 * the test expects {@code connection.start()} to throw a
 * {@code JmsResourceNotFoundException}, the connection to be reported as
 * failed, and the fourth peer never to receive a TCP connection.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testStartMaxReconnectAttemptsTriggeredWhenRemotesAreRejecting() throws Exception {
    try (TestAmqpPeer firstPeer = new TestAmqpPeer();
         TestAmqpPeer secondPeer = new TestAmqpPeer();
         TestAmqpPeer thirdPeer = new TestAmqpPeer();
         TestAmqpPeer fourthPeer = new TestAmqpPeer()) {

        final CountDownLatch failedConnection = new CountDownLatch(1);

        LOG.info("First peer is at: {}", createPeerURI(firstPeer));
        LOG.info("Second peer is at: {}", createPeerURI(secondPeer));
        LOG.info("Third peer is at: {}", createPeerURI(thirdPeer));
        LOG.info("Fourth peer is at: {}", createPeerURI(fourthPeer));

        firstPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
        secondPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
        thirdPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);

        // This shouldn't get hit, but if it does accept the connect so we don't pass the failed
        // to connect assertion.
        fourthPeer.expectSaslAnonymous();
        fourthPeer.expectOpen();
        fourthPeer.expectBegin();
        fourthPeer.expectClose();

        final JmsConnection connection = establishAnonymousConnecton(
            "failover.startupMaxReconnectAttempts=3&failover.reconnectDelay=15&failover.useReconnectBackOff=false",
            firstPeer, secondPeer, thirdPeer, fourthPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionFailure(Throwable cause) {
                // Use the (String, Throwable) overload so the cause is logged
                // unambiguously instead of competing with a '{}' placeholder.
                LOG.info("Connection Failed: ", cause);
                failedConnection.countDown();
            }
        });

        try {
            connection.start();
            fail("Should not be able to connect");
        } catch (JmsResourceNotFoundException expected) {
            // Expected: every peer that was tried rejected the connect with NOT_FOUND.
            LOG.debug("Start failed as expected: ", expected);
        }

        // --- Failover should report the startup failure ---------------//

        assertTrue("Should have reported failure", failedConnection.await(5, TimeUnit.SECONDS));

        try {
            connection.close();
        } catch (JMSException ignored) {
            // Best effort: the connection never came up, so an error on close is tolerated.
            LOG.debug("Ignoring error from close of failed connection: ", ignored);
        }

        firstPeer.waitForAllHandlersToCompleteNoAssert(2000);
        secondPeer.waitForAllHandlersToCompleteNoAssert(2000);
        thirdPeer.waitForAllHandlersToCompleteNoAssert(2000);

        // Shut down last peer and verify no connection made to it
        fourthPeer.purgeExpectations();
        fourthPeer.close();

        assertNotNull("Peer 1 should have accepted a TCP connection", firstPeer.getClientSocket());
        assertNotNull("Peer 2 should have accepted a TCP connection", secondPeer.getClientSocket());
        assertNotNull("Peer 3 should have accepted a TCP connection", thirdPeer.getClientSocket());
        assertNull("Peer 4 should not have accepted any TCP connection", fourthPeer.getClientSocket());
    }
}
/**
 * Creates a connection consumer against a peer that is scripted to drop the
 * connection after the receiver attach and link flow, and expects the same
 * begin / receiver attach / link flow sequence to be replayed on the final
 * peer after the connection is restored.
 *
 * @throws Exception if the scripted exchange fails in an unexpected way
 */
@Test(timeout = 20000)
public void testConnectionConsumerRecreatedAfterReconnect() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final ServerSessionPool sessionPool = Mockito.mock(ServerSessionPool.class);

        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // One peer for the initial connect and one for the reconnect.
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        // Script for the first peer, ending with a dropped connection.
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectReceiverAttach();
        originalPeer.expectLinkFlow();
        originalPeer.dropAfterLastHandler();

        // Script for the second peer, covering the state rebuilt after failover.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectReceiverAttach();
        finalPeer.expectLinkFlow();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);

        final JmsDefaultConnectionListener peerWatcher = new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                final boolean isOriginal = originalURI.equals(remoteURI.toString());
                if (isOriginal) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                final boolean isFinal = finalURI.equals(remoteURI.toString());
                if (isFinal) {
                    finalConnected.countDown();
                }
            }
        };
        connection.addConnectionListener(peerWatcher);
        connection.start();

        final Queue consumedQueue = new JmsQueue("myQueue");
        connection.createConnectionConsumer(consumedQueue, null, sessionPool, 100);

        assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
        assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));

        // Tear down against the final peer.
        finalPeer.expectClose();
        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the remote-consumer-close scenario with a {@code RESOURCE_DELETED}
 * error condition and this test's own name as the error description.
 *
 * @throws Exception if the shared scenario fails in an unexpected way
 */
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListener() throws Exception {
    final Symbol condition = AmqpError.RESOURCE_DELETED;
    final String description = "testRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListener";

    doRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListenerTestImpl(condition, description);
}
/**
 * Runs the remote-consumer-close scenario with neither an error condition nor
 * an error description, i.e. the peer closes the consumer without any error.
 *
 * @throws Exception if the shared scenario fails in an unexpected way
 */
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseConsumerWithMessageListenerWithoutErrorFiresJMSExceptionListener() throws Exception {
    // Same scenario as the error-carrying variant, but the peer's consumer close has no error.
    final Symbol noCondition = null;
    final String noDescription = null;

    doRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListenerTestImpl(noCondition, noDescription);
}
/**
 * Shared body of the remote consumer close tests. A consumer with a MessageListener is
 * attached, the test peer detaches its link, and the test then checks that the consumer
 * reports itself closed with the expected cause text, that the consumer-closed connection
 * callback ran, and that the JMS ExceptionListener was invoked.
 *
 * @param errorCondition the condition the peer places on its detach, or {@code null} for none
 * @param errorDescription the description the peer places on its detach, or {@code null} for none
 * @throws Exception if the scenario fails in an unexpected way
 */
private void doRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListenerTestImpl(Symbol errorCondition, String errorDescription) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final CountDownLatch closeCallbackSeen = new CountDownLatch(1);
        final CountDownLatch listenerNotified = new CountDownLatch(1);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton("failover.maxReconnectAttempts=1", testPeer);

        connection.setExceptionListener(error -> {
            LOG.trace("JMS ExceptionListener: ", error);
            listenerNotified.countDown();
        });

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConsumerClosed(MessageConsumer closedConsumer, Throwable cause) {
                closeCallbackSeen.countDown();
            }
        });

        // Two sessions: the consumer lives on the second, the first is closed later on.
        testPeer.expectBegin();
        testPeer.expectBegin();
        final Session firstSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Session secondSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = secondSession.createQueue("myQueue");

        // Attach a consumer and arrange for the peer to detach it afterwards.
        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlow();
        testPeer.expectEnd();
        testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, errorCondition, errorDescription, 10);

        final MessageConsumer consumer = secondSession.createConsumer(queue);
        consumer.setMessageListener(ignored -> { });

        // Closing the first session makes the timing of the receiver's remote close deterministic.
        firstSession.close();

        testPeer.waitForAllHandlersToComplete(1000);

        // The consumer counts as closed once its accessor throws with the expected cause text.
        final Wait.Condition consumerMarkedClosed = new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                try {
                    consumer.getMessageListener();
                    return false;
                } catch (IllegalStateException closedEx) {
                    final Throwable cause = closedEx.getCause();
                    if (cause == null) {
                        return false;
                    }

                    final String text = cause.getMessage();
                    if (errorCondition == null) {
                        return text.contains("Unknown error from remote peer");
                    }

                    return text.contains(errorCondition.toString()) && text.contains(errorDescription);
                }
            }
        };
        assertTrue("consumer never closed.", Wait.waitFor(consumerMarkedClosed, 5000, 10));

        assertTrue("Consumer closed callback didn't trigger", closeCallbackSeen.await(2000, TimeUnit.MILLISECONDS));
        assertTrue("JMS Exception listener should have fired with a MessageListener", listenerNotified.await(2000, TimeUnit.MILLISECONDS));

        // An explicit close should now be a client-side no-op; the test peer
        // will throw during close if the client sends anything.
        consumer.close();

        // Tear the connection down.
        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the consumer recreation scenario with the rejecting peer refusing the receiver
 * attach using a {@code RESOURCE_DELETED} condition and a description.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateConsumerFailsConnectionAndRetries() throws Exception {
    doTestFailoverCannotRecreateConsumerFailsConnectionAndRetries(
        AmqpError.RESOURCE_DELETED,
        "testFailoverCannotRecreateConsumerFailsConnectionAndRetries");
}
/**
 * Runs the consumer recreation scenario with no error condition or description
 * passed to the rejecting peer's receiver attach expectation.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateConsumerFailsConnectionAndRetriesNoErrorConditionGiven() throws Exception {
    final Symbol absentCondition = null;
    final String absentDescription = null;
    doTestFailoverCannotRecreateConsumerFailsConnectionAndRetries(absentCondition, absentDescription);
}
/**
 * Shared body of the consumer recreation tests using three peers. The original peer accepts
 * the consumer and then drops; the rejecting peer is given the supplied error details for the
 * receiver attach and then expects a detach and close; the final peer accepts the consumer
 * and delivers one message. The test checks the message arrives, the consumer still answers
 * its accessor, and the JMS ExceptionListener was never invoked.
 *
 * @param errorCondition the condition passed to the rejecting peer's attach expectation, may be {@code null}
 * @param errorMessage the description passed to the rejecting peer's attach expectation, may be {@code null}
 * @throws Exception if the scenario fails in an unexpected way
 */
private void doTestFailoverCannotRecreateConsumerFailsConnectionAndRetries(Symbol errorCondition, String errorMessage) throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer rejectingPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final String expectedMessageContent = "myTextMessage";

        final AtomicBoolean exceptionListenerFired = new AtomicBoolean();
        final AtomicReference<Message> msgRef = new AtomicReference<>();
        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);
        final CountDownLatch msgReceived = new CountDownLatch(1);

        // One peer for the initial connection, then two reconnect candidates.
        final String originalURI = createPeerURI(originalPeer);
        final String rejectingURI = createPeerURI(rejectingPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Rejecting peer is at: {}", rejectingURI);
        LOG.info("Final peer is at: {}", finalURI);

        // First peer: accept the connection and consumer, then drop.
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.expectReceiverAttach();
        originalPeer.expectLinkFlow();
        originalPeer.dropAfterLastHandler();

        // Second peer: the receiver attach is given the supplied error details...
        rejectingPeer.expectSaslAnonymous();
        rejectingPeer.expectOpen();
        rejectingPeer.expectBegin();
        rejectingPeer.expectBegin();
        rejectingPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, false, errorCondition, errorMessage);

        // ...after which the client cleans up this connection and moves on to the next peer.
        rejectingPeer.expectDetach(true, false, false);
        rejectingPeer.expectClose();

        // Third peer: accept everything and hand over a single message.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectReceiverAttach();
        finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(expectedMessageContent));
        finalPeer.expectDispositionThatIsAcceptedAndSettled();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);

        connection.setExceptionListener(error -> {
            LOG.trace("JMS ExceptionListener: ", error);
            exceptionListenerFired.set(true);
        });

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });
        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(delivered -> {
            msgRef.set(delivered);
            msgReceived.countDown();
        });

        finalPeer.waitForAllHandlersToComplete(1000);

        assertTrue("Should connect to original peer", originalConnected.await(3, TimeUnit.SECONDS));
        assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));

        // The message sent by the final peer must reach the listener intact.
        assertTrue("The onMessage listener should have fired", msgReceived.await(3, TimeUnit.SECONDS));
        final Message msg = msgRef.get();
        assertTrue("Expected an instance of TextMessage, got: " + msg, msg instanceof TextMessage);
        assertEquals("Unexpected msg content", expectedMessageContent, ((TextMessage) msg).getText());

        // The consumer must still be open, so its accessor may not throw.
        try {
            consumer.getMessageListener();
        } catch (JMSException ex) {
            fail("Consumer should be in open state and not throw here.");
        }

        assertFalse("The ExceptionListener should not have been alerted", exceptionListenerFired.get());

        // Tear the connection down.
        finalPeer.expectClose();
        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Runs the producer recreation scenario with the rejecting peer refusing the sender
 * attach using a {@code RESOURCE_DELETED} condition and a description.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateProducerFailsConnectionAndRetries() throws Exception {
    doTestFailoverCannotRecreateProducerFailsConnectionAndRetries(
        AmqpError.RESOURCE_DELETED,
        "testFailoverCannotRecreateProducerFailsConnectionAndRetries");
}
/**
 * Runs the producer recreation scenario with no error condition or description
 * passed to the rejecting peer's sender attach expectation.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateProducerFailsConnectionAndRetriesNoErrorConditionGiven() throws Exception {
    final Symbol absentCondition = null;
    final String absentDescription = null;
    doTestFailoverCannotRecreateProducerFailsConnectionAndRetries(absentCondition, absentDescription);
}
/**
 * Shared body of the producer recreation tests using three peers. The original peer accepts
 * the producer and then drops; the rejecting peer is given the supplied error details for the
 * sender attach and then expects a detach and close; the final peer accepts the producer.
 * The test checks the producer still answers its accessor, can send a message to the final
 * peer, and that the JMS ExceptionListener was never invoked.
 *
 * @param errorCondition the condition passed to the rejecting peer's attach expectation, may be {@code null}
 * @param errorMessage the description passed to the rejecting peer's attach expectation, may be {@code null}
 * @throws Exception if the scenario fails in an unexpected way
 */
private void doTestFailoverCannotRecreateProducerFailsConnectionAndRetries(Symbol errorCondition, String errorMessage) throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer rejectingPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final AtomicBoolean exceptionListenerFired = new AtomicBoolean();
        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // One peer for the initial connection, then two reconnect candidates.
        final String originalURI = createPeerURI(originalPeer);
        final String rejectingURI = createPeerURI(rejectingPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Rejecting peer is at: {}", rejectingURI);
        LOG.info("Final peer is at: {}", finalURI);

        // First peer: accept the connection and producer, then drop.
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.expectSenderAttach();
        originalPeer.dropAfterLastHandler();

        // Second peer: the sender attach is given the supplied error details...
        rejectingPeer.expectSaslAnonymous();
        rejectingPeer.expectOpen();
        rejectingPeer.expectBegin();
        rejectingPeer.expectBegin();
        rejectingPeer.expectSenderAttach(notNullValue(), notNullValue(), true, false, false, -1, errorCondition, errorMessage);

        // ...after which the client cleans up this connection and moves on to the next peer.
        rejectingPeer.expectDetach(true, false, false);
        rejectingPeer.expectClose();

        // Third peer: accept the connection and the recreated producer.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectSenderAttach();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);

        connection.setExceptionListener(error -> {
            LOG.trace("JMS ExceptionListener: ", error);
            exceptionListenerFired.set(true);
        });

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });
        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(queue);

        finalPeer.waitForAllHandlersToComplete(1000);

        assertTrue("Should connect to original peer", originalConnected.await(3, TimeUnit.SECONDS));
        assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));

        // The producer must still be open, so its accessor may not throw.
        try {
            producer.getDestination();
        } catch (JMSException ex) {
            fail("Producer should be in open state and not throw here.");
        }

        // Send one message through the recreated producer and have the final peer match it.
        final String messageContent = "myMessage";
        final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        payloadMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        payloadMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent));
        finalPeer.expectTransfer(payloadMatcher);

        final Message outbound = session.createTextMessage(messageContent);
        producer.send(outbound);

        assertFalse("The ExceptionListener should not have been alerted", exceptionListenerFired.get());

        // Tear the connection down.
        finalPeer.expectClose();
        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Close-failed-links consumer scenario: MessageListener installed, attach refused with a
 * {@code RESOURCE_DELETED} condition and a description.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled() throws Exception {
    final boolean withListener = true;
    doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(
        withListener,
        AmqpError.RESOURCE_DELETED,
        "testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled");
}
/**
 * Close-failed-links consumer scenario: no MessageListener installed, attach refused with a
 * {@code RESOURCE_DELETED} condition and a description.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabledNoMessageListener() throws Exception {
    final boolean withListener = false;
    doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(
        withListener,
        AmqpError.RESOURCE_DELETED,
        "testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabledNoMessageListener");
}
/**
 * Close-failed-links consumer scenario: MessageListener installed, no error condition or
 * description supplied for the refused attach.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabledNoErrorConditionGiven() throws Exception {
    final boolean withListener = true;
    final Symbol absentCondition = null;
    final String absentDescription = null;
    doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(withListener, absentCondition, absentDescription);
}
/**
 * Close-failed-links consumer scenario: no MessageListener installed, no error condition or
 * description supplied for the refused attach.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabledNoErrorConditionGivenNoMessageListener() throws Exception {
    final boolean withListener = false;
    final Symbol absentCondition = null;
    final String absentDescription = null;
    doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(withListener, absentCondition, absentDescription);
}
/**
 * Shared body of the consumer tests that run with {@code jms.closeLinksThatFailOnReconnect=true}.
 * The original peer accepts the consumer and drops; the final peer is given the supplied error
 * details for the receiver attach and expects a detach. The test then waits for the consumer
 * to report itself closed with the expected cause text and checks the JMS ExceptionListener:
 * it must fire when a MessageListener was installed and must not fire otherwise.
 *
 * @param addListener whether a MessageListener is installed on the consumer; also selects the
 *        prefetch value placed in the connection options (0 with a listener, 1 without)
 * @param errorCondition the condition passed to the final peer's attach expectation, may be {@code null}
 * @param errorDescription the description passed to the final peer's attach expectation, may be {@code null}
 * @throws Exception if the scenario fails in an unexpected way
 */
private void doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(boolean addListener, Symbol errorCondition, String errorDescription) throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch exceptionListenerFired = new CountDownLatch(1);
        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // One peer for the initial connection, one to reconnect to.
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        // First peer: accept the connection and consumer, then drop.
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.expectReceiverAttach();
        originalPeer.expectLinkFlow();
        originalPeer.dropAfterLastHandler();

        // Final peer: the receiver attach is given the supplied error details, then a detach is expected.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, false, errorCondition, errorDescription);
        finalPeer.expectDetach(true, false, false);

        final int prefetch = addListener ? 0 : 1;

        final JmsConnection connection = establishAnonymousConnecton(
            "jms.prefetchPolicy.all=" + prefetch + "&jms.closeLinksThatFailOnReconnect=true", originalPeer, finalPeer);

        connection.setExceptionListener(error -> {
            LOG.trace("JMS ExceptionListener: ", error);
            exceptionListenerFired.countDown();
        });

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });
        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageConsumer consumer = session.createConsumer(queue);
        if (addListener) {
            consumer.setMessageListener(ignored -> { });
        }

        finalPeer.waitForAllHandlersToComplete(1000);

        assertTrue("Should connect to original peer", originalConnected.await(3, TimeUnit.SECONDS));
        assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));

        // The consumer counts as closed once its accessor throws with the expected cause text.
        final Wait.Condition consumerMarkedClosed = new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                try {
                    consumer.getMessageListener();
                    return false;
                } catch (IllegalStateException closedEx) {
                    final Throwable cause = closedEx.getCause();
                    if (cause == null) {
                        return false;
                    }

                    final String text = cause.getMessage();
                    if (errorCondition == null) {
                        return text.contains("Link creation was refused");
                    }

                    return text.contains(errorCondition.toString()) && text.contains(errorDescription);
                }
            }
        };
        assertTrue("consumer never closed.", Wait.waitFor(consumerMarkedClosed, 5000, 10));

        // The ExceptionListener is only expected to hear about it when a MessageListener was set.
        if (addListener) {
            assertTrue("JMS Exception listener should have fired with a MessageListener", exceptionListenerFired.await(2, TimeUnit.SECONDS));
        } else {
            assertFalse("The ExceptionListener should not have been alerted", exceptionListenerFired.await(10, TimeUnit.MILLISECONDS));
        }

        // Tear the connection down.
        finalPeer.expectClose();
        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Close-failed-links producer scenario with the sender attach refused using a
 * {@code RESOURCE_DELETED} condition and a description.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateProducerWithCloseFailedLinksEnabled() throws Exception {
    doTestFailoverCannotRecreateWithCloseFailedLinksEnabled(
        AmqpError.RESOURCE_DELETED,
        "testFailoverCannotRecreateProducerWithCloseFailedLinksEnabled");
}
/**
 * Close-failed-links producer scenario with no error condition or description supplied
 * for the refused sender attach.
 */
@Test(timeout = 20000)
public void testFailoverCannotRecreateProducerWithCloseFailedLinksEnabledNoErrorConditionGiven() throws Exception {
    final Symbol absentCondition = null;
    final String absentDescription = null;
    doTestFailoverCannotRecreateWithCloseFailedLinksEnabled(absentCondition, absentDescription);
}
/**
 * Shared body of the producer tests that run with {@code jms.closeLinksThatFailOnReconnect=true}.
 * The original peer accepts the producer and drops; the final peer is given the supplied error
 * details for the sender attach and expects a detach. The test then waits for the producer to
 * report itself closed with the expected cause text and checks that the JMS ExceptionListener
 * was never invoked.
 *
 * @param errorCondition the condition passed to the final peer's attach expectation, may be {@code null}
 * @param errorDescription the description passed to the final peer's attach expectation, may be {@code null}
 * @throws Exception if the scenario fails in an unexpected way
 */
private void doTestFailoverCannotRecreateWithCloseFailedLinksEnabled(Symbol errorCondition, String errorDescription) throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final AtomicBoolean exceptionListenerFired = new AtomicBoolean();
        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // One peer for the initial connection, one to reconnect to.
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        // First peer: accept the connection and producer, then drop.
        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.expectSenderAttach();
        originalPeer.dropAfterLastHandler();

        // Final peer: the sender attach is given the supplied error details, then a detach is expected.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectSenderAttach(notNullValue(), notNullValue(), true, false, false, -1, errorCondition, errorDescription);
        finalPeer.expectDetach(true, false, false);

        final JmsConnection connection = establishAnonymousConnecton("jms.closeLinksThatFailOnReconnect=true", originalPeer, finalPeer);

        connection.setExceptionListener(error -> {
            LOG.trace("JMS ExceptionListener: ", error);
            exceptionListenerFired.set(true);
        });

        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });
        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(queue);

        finalPeer.waitForAllHandlersToComplete(1000);

        assertTrue("Should connect to original peer", originalConnected.await(3, TimeUnit.SECONDS));
        assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));

        // The producer counts as closed once its accessor throws with the expected cause text.
        final Wait.Condition producerMarkedClosed = new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                try {
                    producer.getDestination();
                    return false;
                } catch (IllegalStateException closedEx) {
                    final Throwable cause = closedEx.getCause();
                    if (cause == null) {
                        return false;
                    }

                    final String text = cause.getMessage();
                    if (errorCondition == null) {
                        return text.contains("Link creation was refused");
                    }

                    return text.contains(errorCondition.toString()) && text.contains(errorDescription);
                }
            }
        };
        assertTrue("producer never closed.", Wait.waitFor(producerMarkedClosed, 5000, 10));

        assertFalse("The ExceptionListener should not have been alerted", exceptionListenerFired.get());

        // Tear the connection down.
        finalPeer.expectClose();
        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends a message with a CompletionListener inside a transacted session, lets the original
 * peer drop after the transfer, and checks that both that send and a second send issued after
 * reconnecting complete without an error, and that the subsequent commit is reported as
 * rolled back.
 */
@Test(timeout = 20000)
public void testInDoubtTransactionFromFailoverCompletesAsyncCompletions() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        final CountDownLatch originalConnected = new CountDownLatch(1);
        final CountDownLatch finalConnected = new CountDownLatch(1);

        // One peer for the initial connection, one to reconnect to.
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        final Binary txnId = new Binary(new byte[] { 5, 6, 7, 8 });

        // Matcher for the message sent under the current transaction: it must carry a
        // TransactionalState with the txnId above and no outcome. The reply is a
        // TransactionalState with an Accepted outcome.
        final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        final TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
        stateMatcher.withTxnId(equalTo(txnId));
        stateMatcher.withOutcome(nullValue());

        final TransactionalState txState = new TransactionalState();
        txState.setTxnId(txnId);
        txState.setOutcome(new Accepted());

        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();
        originalPeer.expectCoordinatorAttach();

        // An unsettled 'declare' transfer goes to the txn coordinator first; the reply is a
        // Declared disposition state containing the txnId.
        originalPeer.expectDeclare(txnId);

        originalPeer.expectSenderAttach();
        originalPeer.expectTransfer(payloadMatcher, stateMatcher, txState, true);
        originalPeer.dropAfterLastHandler(10);

        // Expectations on the peer the client fails over to.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectCoordinatorAttach();
        finalPeer.expectDeclare(txnId);
        finalPeer.expectSenderAttach();
        // Committing the in-doubt TX results in a rollback, after which a new TX is started.
        finalPeer.expectDischarge(txnId, true);
        finalPeer.expectDeclare(txnId);
        // This rollback comes from the session being closed on connection close.
        finalPeer.expectDischarge(txnId, true);
        finalPeer.expectClose();

        final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                LOG.info("Connection Established: {}", remoteURI);
                if (originalURI.equals(remoteURI.toString())) {
                    originalConnected.countDown();
                }
            }

            @Override
            public void onConnectionRestored(URI remoteURI) {
                LOG.info("Connection Restored: {}", remoteURI);
                if (finalURI.equals(remoteURI.toString())) {
                    finalConnected.countDown();
                }
            }
        });
        connection.start();

        assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));

        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        final Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(queue);

        // Build the message that both sends will use.
        final String text = "myMessage";
        final TextMessage message = session.createTextMessage(text);

        final TestJmsCompletionListener firstCompletion = new TestJmsCompletionListener();
        final TestJmsCompletionListener secondCompletion = new TestJmsCompletionListener();

        try {
            producer.send(message, firstCompletion);
        } catch (JMSException jmsEx) {
            fail("Should not have failed the async completion send.");
        }

        assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));

        // The completion should arrive after the reconnect and without an error; an error
        // at any point means something is wrong.
        assertTrue("Did not get async callback for send #1", firstCompletion.awaitCompletion(5, TimeUnit.SECONDS));
        assertNull("Completion of send #1 should not have been on error", firstCompletion.exception);
        assertNotNull(firstCompletion.message);
        assertTrue(firstCompletion.message instanceof TextMessage);

        try {
            producer.send(message, secondCompletion);
        } catch (JMSException jmsEx) {
            fail("Should not have failed the async completion send.");
        }

        assertTrue("Did not get async callback for send #2", secondCompletion.awaitCompletion(5, TimeUnit.SECONDS));
        assertNull("Completion of send #2 should not have been on error", secondCompletion.exception);
        assertNotNull(secondCompletion.message);
        assertTrue(secondCompletion.message instanceof TextMessage);

        try {
            session.commit();
            fail("Transaction should have been rolled back");
        } catch (TransactionRolledBackException expected) {
            // Expected: the commit must be reported as rolled back.
        }

        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Issues a send with a CompletionListener against a peer that attaches the sender without
 * granting credit and then drops, and checks that the completion arrives without an error
 * once the final peer has accepted the sender and the transfer.
 */
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testFailoverDoesNotFailPendingAsyncCompletionSend() throws Exception {
    try (TestAmqpPeer originalPeer = new TestAmqpPeer();
         TestAmqpPeer finalPeer = new TestAmqpPeer()) {

        // One peer for the initial connection, one to reconnect to.
        final String originalURI = createPeerURI(originalPeer);
        final String finalURI = createPeerURI(finalPeer);

        LOG.info("Original peer is at: {}", originalURI);
        LOG.info("Final peer is at: {}", finalURI);

        originalPeer.expectSaslAnonymous();
        originalPeer.expectOpen();
        originalPeer.expectBegin();
        originalPeer.expectBegin();

        // No credit is granted, so the send blocks in the provider waiting for it. After
        // failover the message is then actually sent from the Failover bits, once credit
        // has been granted for the recovered sender.
        originalPeer.expectSenderAttachWithoutGrantingCredit();
        originalPeer.dropAfterLastHandler(10); // Gives the sender time to enter its wait state

        // Expectations on the peer the client fails over to.
        finalPeer.expectSaslAnonymous();
        finalPeer.expectOpen();
        finalPeer.expectBegin();
        finalPeer.expectBegin();
        finalPeer.expectSenderAttach();
        finalPeer.expectTransfer(new TransferPayloadCompositeMatcher());
        finalPeer.expectClose();

        final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer, finalPeer);

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(queue);

        // Build the message and hand it off with a completion listener.
        final String text = "myMessage";
        final TextMessage message = session.createTextMessage(text);
        final TestJmsCompletionListener completion = new TestJmsCompletionListener();

        try {
            producer.send(message, completion);
        } catch (JMSException jmsEx) {
            fail("Should not have failed the async completion send.");
        }

        // The completion should arrive after the reconnect and without an error; an error
        // at any point means something is wrong.
        assertTrue("Did not get async callback", completion.awaitCompletion(5, TimeUnit.SECONDS));
        assertNull("Completion should not have been on error", completion.exception);
        assertNotNull(completion.message);
        assertTrue(completion.message instanceof TextMessage);

        connection.close();

        finalPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Refused temporary destination scenario: queue variant, deferral flag {@code false}.
 */
@Test(timeout = 20000)
public void testPassthroughCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception {
    final boolean useTopic = false;
    final boolean deferWrite = false;
    doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(useTopic, deferWrite);
}
/**
 * Refused temporary destination scenario: queue variant, deferral flag {@code true}.
 */
@Test(timeout = 20000)
public void testPassthroughCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception {
    final boolean useTopic = false;
    final boolean deferWrite = true;
    doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(useTopic, deferWrite);
}
/**
 * Refused temporary destination scenario: topic variant, deferral flag {@code false}.
 */
@Test(timeout = 20000)
public void testPassthroughCreateTemporaryTopicFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception {
    final boolean useTopic = true;
    final boolean deferWrite = false;
    doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(useTopic, deferWrite);
}
/**
 * Refused temporary destination scenario: topic variant, deferral flag {@code true}.
 */
@Test(timeout = 20000)
public void testPassthroughCreateTemporaryTopicFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception {
    final boolean useTopic = true;
    final boolean deferWrite = true;
    doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(useTopic, deferWrite);
}
/**
 * Shared body of the refused temporary destination tests: the test peer refuses the creation
 * attach with {@code UNAUTHORIZED_ACCESS} and the client call is expected to throw a
 * {@link JMSSecurityException}.
 *
 * NOTE(review): {@code deferAttachResponseWrite} is never read in this method; both refusal
 * expectations pass a literal {@code false} as their last argument. Confirm whether the flag
 * was meant to be forwarded there.
 *
 * @param topic {@code true} to create a temporary topic, {@code false} for a temporary queue
 * @param deferAttachResponseWrite currently unused (see note above)
 * @throws Exception if the scenario fails in an unexpected way
 */
private void doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(boolean topic, boolean deferAttachResponseWrite) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();

        final JmsConnection connection = establishAnonymousConnecton(testPeer);
        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        try {
            if (!topic) {
                testPeer.expectAndRefuseTempQueueCreationAttach(AmqpError.UNAUTHORIZED_ACCESS, "Not Authorized to create temp queues.", false);
                // The client answers the refusal with a detach of its own.
                testPeer.expectDetach(true, false, false);

                session.createTemporaryQueue();
            } else {
                testPeer.expectAndRefuseTempTopicCreationAttach(AmqpError.UNAUTHORIZED_ACCESS, "Not Authorized to create temp topics.", false);
                // The client answers the refusal with a detach of its own.
                testPeer.expectDetach(true, false, false);

                session.createTemporaryTopic();
            }

            fail("Should have thrown security exception");
        } catch (JMSSecurityException expected) {
            // Expected: the refused attach must surface as a security exception.
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Attaches a producer, has the test peer detach its link with a {@code RESOURCE_DELETED}
 * error carrying a recognisable description, and checks that the producer reports itself
 * closed with that error text and that the producer-closed connection callback runs.
 */
@Test(timeout = 20000)
public void testPassthroughRemotelyCloseProducer() throws Exception {
    final String BREAD_CRUMB = "ErrorMessageBreadCrumb";

    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        final CountDownLatch producerClosed = new CountDownLatch(1);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();

        JmsConnection connection = establishAnonymousConnecton(testPeer);
        connection.addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onProducerClosed(MessageProducer producer, Throwable exception) {
                producerClosed.countDown();
            }
        });
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a producer, then remotely end it afterwards.
        testPeer.expectSenderAttach();
        testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);

        Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(queue);

        // Verify the producer gets marked closed
        testPeer.waitForAllHandlersToComplete(1000);
        assertTrue("producer never closed.", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
                try {
                    producer.getDestination();
                } catch (Exception ex) {
                    if (ex instanceof IllegalStateException && ex.getCause() != null) {
                        String message = ex.getCause().getMessage();
                        // A cause without a message cannot match; report "not yet" instead of
                        // letting a NullPointerException escape from the condition.
                        if (message != null &&
                            message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
                            message.contains(BREAD_CRUMB)) {
                            return true;
                        }
                    }

                    // Hand the exception over as the trailing Throwable argument, the same
                    // form the other logging calls in this class use, rather than pairing it
                    // with a "{}" placeholder.
                    LOG.debug("Caught unexpected exception: ", ex);
                }

                return false;
            }
        }, 10000, 10));

        assertTrue("Producer closed callback didn't trigger", producerClosed.await(10, TimeUnit.SECONDS));

        // Try closing it explicitly, should effectively no-op in client.
        // The test peer will throw during close if it sends anything.
        producer.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sets a delivery delay on a producer whose sender attach was answered without any offered
 * capabilities, and checks that the send fails with a {@link JMSException}.
 */
@Test(timeout = 20000)
public void testPassthroughOfSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final String testPeerURI = createPeerURI(testPeer);

        LOG.info("Original peer is at: {}", testPeerURI);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();

        // The peer deliberately does NOT advertise DELAYED-DELIVERY support, so the send
        // fails and the test can observe the error passing through the failover provider.
        final JmsConnection connection = establishAnonymousConnecton(testPeer);
        connection.start();

        final Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
        final Symbol[] offeredCapabilities = null;
        testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic dest = session.createTopic("myTopic");

        final MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryDelay(5000);

        // With a delivery delay set and no support reported by the remote,
        // the producer is expected to refuse the send.
        final Message message = session.createMessage();
        try {
            producer.send(message);
            fail("Send should fail");
        } catch (JMSException expected) {
            LOG.debug("Caught expected error from failed send.");
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Checks that a send whose transfer is never answered by the peer fails with a
 * {@link JmsSendTimedOutException} once the configured send timeout (500 here)
 * elapses, when the connection was created through a failover URI.
 *
 * Any other outcome is allowed to propagate out of the test unchanged so the
 * test report shows the real exception type and stack trace.
 */
@Test(timeout = 20000)
public void testPassthroughOfSendTimesOutWhenNoDispostionArrives() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final String testPeerURI = createPeerURI(testPeer);
        LOG.info("Original peer is at: {}", testPeerURI);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();

        JmsConnection connection = establishAnonymousConnecton(testPeer);
        connection.setSendTimeout(500);
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);
        Message message = session.createTextMessage("text");
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();

        // Expect the producer to attach and grant it some credit, it should send
        // a transfer which we will not send any response for which should cause the
        // send operation to time out.
        testPeer.expectSenderAttach();
        testPeer.expectTransferButDoNotRespond(messageMatcher);
        testPeer.expectClose();

        MessageProducer producer = session.createProducer(queue);

        try {
            producer.send(message);
            fail("Send should time out.");
        } catch (JmsSendTimedOutException jmsEx) {
            LOG.info("Caught expected error: {}", jmsEx.getMessage());
        }
        // Deliberately no catch-all here: a catch (Throwable) would also intercept the
        // AssertionError raised by fail(...) above and would reduce every unexpected
        // error to its message text, discarding the stack trace.

        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Checks that committing a transacted session surfaces a
 * TransactionRolledBackException to the caller when the peer is scripted to
 * close the coordinator link on discharge, with the connection created through
 * a failover URI.
 */
@Test(timeout=20000)
public void testPassthroughOfRollbackErrorCoordinatorClosedOnCommit() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        LOG.info("Original peer is at: {}", createPeerURI(testPeer));

        // Connection level exchange plus the first coordinator link.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        JmsConnection connection = establishAnonymousConnecton(testPeer);
        connection.start();

        Binary firstTxnId = new Binary(new byte[] { 5, 6, 7, 8 });
        Binary secondTxnId = new Binary(new byte[] { 1, 2, 3, 4 });

        // First transaction: declared, then the coordinator link is remotely closed
        // when its discharge arrives.
        testPeer.expectDeclare(firstTxnId);
        testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(firstTxnId, false, true, secondTxnId);

        // Afterwards a fresh coordinator attach, a declare of the second transaction
        // and a discharge of it are expected.
        testPeer.expectCoordinatorAttach();
        testPeer.expectDeclare(secondTxnId);
        testPeer.expectDischarge(secondTxnId, true);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);

        try {
            session.commit();
            fail("Transaction should have rolled back");
        } catch (TransactionRolledBackException expected) {
            LOG.info("Caught expected TransactionRolledBackException");
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Checks that creating a transacted session fails with a
 * {@link JmsOperationTimedOutException} when the peer never answers the
 * transaction declare and the request timeout (500 here) elapses, with the
 * connection created through a failover URI.
 *
 * Any other outcome is allowed to propagate out of the test unchanged so the
 * test report shows the real exception type and stack trace.
 */
@Test(timeout=20000)
public void testPassthroughOfSessionCreateFailsOnDeclareTimeout() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final String testPeerURI = createPeerURI(testPeer);
        LOG.info("Original peer is at: {}", testPeerURI);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();
        testPeer.expectDeclareButDoNotRespond();
        // Expect the AMQP session to be closed due to the JMS session creation failure.
        testPeer.expectEnd();

        JmsConnection connection = establishAnonymousConnecton(testPeer);
        connection.setRequestTimeout(500);
        connection.start();

        try {
            connection.createSession(true, Session.SESSION_TRANSACTED);
            fail("Should have timed out waiting for declare.");
        } catch (JmsOperationTimedOutException jmsEx) {
            LOG.info("Caught expected error: {}", jmsEx.getMessage());
        }
        // Deliberately no catch-all here: the previous catch (Throwable) called fail(...)
        // before logging, so the log statement could never run, and it also intercepted
        // the AssertionError raised by fail(...) inside the try block.

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Convenience overload: passes {@code null} for both the per-peer connection
 * parameters and the failover parameters to the three-argument overload.
 *
 * @param peers the test peers to hand on to the three-argument overload
 * @return whatever the three-argument overload returns
 * @throws JMSException if the three-argument overload throws it
 */
private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException {
return establishAnonymousConnecton(null, null, peers);
}
/**
 * Convenience overload: passes {@code null} for the per-peer connection
 * parameters and forwards the given failover parameters to the three-argument
 * overload.
 *
 * @param failoverParams query string to place after the failover URI's peer list;
 *                       forwarded unchanged, may be {@code null}
 * @param peers the test peers to hand on to the three-argument overload
 * @return whatever the three-argument overload returns
 * @throws JMSException if the three-argument overload throws it
 */
private JmsConnection establishAnonymousConnecton(String failoverParams, TestAmqpPeer... peers) throws JMSException {
return establishAnonymousConnecton(null, failoverParams, peers);
}
/**
 * Builds a {@code failover:(...)} URI listing every given peer and creates a
 * connection from it via a new JmsConnectionFactory.
 *
 * @param connectionParams query string appended to each individual peer URI, or
 *                         {@code null} for none
 * @param failoverParams query string appended after the peer list, or {@code null}
 *                       to use {@code failover.maxReconnectAttempts=10}
 * @param peers the test peers to list, in order; at least one is required
 * @return the created connection, cast to JmsConnection
 * @throws IllegalArgumentException if no peers are given
 * @throws JMSException if the factory fails to create the connection
 */
private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, TestAmqpPeer... peers) throws JMSException {
    if (peers.length == 0) {
        throw new IllegalArgumentException("No test peers were given, at least 1 required");
    }

    StringBuilder remoteURI = new StringBuilder("failover:(");
    for (int i = 0; i < peers.length; i++) {
        // Comma separate every peer after the first one.
        if (i > 0) {
            remoteURI.append(",");
        }
        remoteURI.append(createPeerURI(peers[i], connectionParams));
    }

    remoteURI.append(")?");
    if (failoverParams != null) {
        remoteURI.append(failoverParams);
    } else {
        remoteURI.append("failover.maxReconnectAttempts=10");
    }

    ConnectionFactory factory = new JmsConnectionFactory(remoteURI.toString());
    return (JmsConnection) factory.createConnection();
}
/**
 * Convenience overload: builds the URI for the given peer with no query
 * parameters by passing {@code null} to the two-argument overload.
 *
 * @param peer the test peer whose server port is used
 * @return whatever the two-argument overload returns
 */
private String createPeerURI(TestAmqpPeer peer) {
return createPeerURI(peer, null);
}
/**
 * Builds an {@code amqp://localhost:<port>} URI for the given peer, followed by
 * {@code ?<params>} when params is not {@code null}.
 *
 * @param peer the test peer whose server port is used
 * @param params query string to append after a '?', or {@code null} for none
 * @return the resulting URI string
 */
private String createPeerURI(TestAmqpPeer peer, String params) {
    String baseURI = "amqp://localhost:" + peer.getServerPort();
    if (params == null) {
        return baseURI;
    }
    return baseURI + "?" + params;
}
/**
 * CompletionListener used by the tests: records the most recent message and
 * exception passed to it, counts successful and failed completions, and counts
 * down a latch once per callback so a test can wait for an expected number of
 * completions.
 */
private class TestJmsCompletionListener implements CompletionListener {

    // Counted down once for every onCompletion / onException callback.
    private final CountDownLatch completed;

    // NOTE(review): these counters are volatile ints bumped with a plain
    // read-modify-write, which is only safe if callbacks never run
    // concurrently - confirm against the provider's callback threading.
    public volatile int successCount;
    public volatile int errorCount;

    // Last message handed to either callback.
    public volatile Message message;
    // Last exception handed to onException.
    public volatile Exception exception;

    /** Creates a listener that waits for a single completion. */
    public TestJmsCompletionListener() {
        this(1);
    }

    /**
     * Creates a listener that waits for the given number of completions.
     *
     * @param expected number of callbacks awaitCompletion should wait for
     */
    public TestJmsCompletionListener(int expected) {
        completed = new CountDownLatch(expected);
    }

    /**
     * Waits until the expected number of callbacks has arrived.
     *
     * @param timeout maximum time to wait
     * @param units unit of the timeout value
     * @return true if the latch reached zero in time, false on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
        return completed.await(timeout, units);
    }

    @Override
    public void onCompletion(Message message) {
        LOG.info("JmsCompletionListener onCompletion called with message: {}", message);

        this.message = message;
        successCount = successCount + 1;

        completed.countDown();
    }

    @Override
    public void onException(Message message, Exception exception) {
        LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);

        this.message = message;
        this.exception = exception;
        errorCount = errorCount + 1;

        completed.countDown();
    }
}
}