| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.jms.provider.failover; |
| |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.allOf; |
| import static org.hamcrest.Matchers.arrayContaining; |
| import static org.hamcrest.Matchers.both; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
| import static org.hamcrest.Matchers.hasEntry; |
| import static org.hamcrest.Matchers.lessThanOrEqualTo; |
| import static org.hamcrest.Matchers.not; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| |
| import java.net.URI; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import jakarta.jms.CompletionListener; |
| import jakarta.jms.Connection; |
| import jakarta.jms.ConnectionFactory; |
| import jakarta.jms.ExceptionListener; |
| import jakarta.jms.IllegalStateException; |
| import jakarta.jms.InvalidDestinationException; |
| import jakarta.jms.JMSException; |
| import jakarta.jms.JMSSecurityException; |
| import jakarta.jms.Message; |
| import jakarta.jms.MessageConsumer; |
| import jakarta.jms.MessageListener; |
| import jakarta.jms.MessageProducer; |
| import jakarta.jms.Queue; |
| import jakarta.jms.QueueBrowser; |
| import jakarta.jms.ResourceAllocationException; |
| import jakarta.jms.ServerSessionPool; |
| import jakarta.jms.Session; |
| import jakarta.jms.TemporaryTopic; |
| import jakarta.jms.TextMessage; |
| import jakarta.jms.Topic; |
| import jakarta.jms.TransactionRolledBackException; |
| |
| import org.apache.qpid.jms.JmsConnection; |
| import org.apache.qpid.jms.JmsConnectionExtensions; |
| import org.apache.qpid.jms.JmsConnectionFactory; |
| import org.apache.qpid.jms.JmsDefaultConnectionListener; |
| import org.apache.qpid.jms.JmsOperationTimedOutException; |
| import org.apache.qpid.jms.JmsQueue; |
| import org.apache.qpid.jms.JmsResourceNotFoundException; |
| import org.apache.qpid.jms.JmsSendTimedOutException; |
| import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; |
| import org.apache.qpid.jms.provider.amqp.AmqpSupport; |
| import org.apache.qpid.jms.test.QpidJmsTestCase; |
| import org.apache.qpid.jms.test.Wait; |
| import org.apache.qpid.jms.test.testpeer.ListDescribedType; |
| import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; |
| import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; |
| import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; |
| import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Released; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; |
| import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; |
| import org.apache.qpid.jms.util.StopWatch; |
| import org.apache.qpid.proton.amqp.Binary; |
| import org.apache.qpid.proton.amqp.DescribedType; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.UnsignedByte; |
| import org.apache.qpid.proton.amqp.UnsignedInteger; |
| import org.hamcrest.Matcher; |
| import org.hamcrest.Matchers; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.Timeout; |
| import org.mockito.Mockito; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class FailoverIntegrationTest extends QpidJmsTestCase { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(FailoverIntegrationTest.class); |
| |
| private static final Symbol ANONYMOUS = Symbol.valueOf("ANONYMOUS"); |
| private static final Symbol PLAIN = Symbol.valueOf("PLAIN"); |
| private static final UnsignedByte SASL_FAIL_AUTH = UnsignedByte.valueOf((byte) 1); |
| private static final UnsignedByte SASL_SYS = UnsignedByte.valueOf((byte) 2); |
| private static final UnsignedByte SASL_SYS_PERM = UnsignedByte.valueOf((byte) 3); |
| private static final UnsignedByte SASL_SYS_TEMP = UnsignedByte.valueOf((byte) 4); |
| |
| @Test |
| @Timeout(20) |
| public void testConnectThrowsSecurityViolationOnFailureFromOpen() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| testPeer.rejectConnect(AmqpError.UNAUTHORIZED_ACCESS, "Anonymous connections not allowed", null); |
| |
| final JmsConnection connection = establishAnonymousConnecton(testPeer); |
| try { |
| connection.start(); |
| fail("Should have thrown JMSSecurityException"); |
| } catch (JMSSecurityException ex) { |
| } catch (Exception ex) { |
| fail("Should have thrown JMSSecurityException: " + ex); |
| } |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testConnectThrowsSecurityViolationOnFailureFromSaslWithClientID() throws Exception { |
| doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true, SASL_FAIL_AUTH); |
| doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true, SASL_SYS); |
| doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true, SASL_SYS_PERM); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testConnectThrowsSecurityViolationOnFailureFromSaslExplicitlyWithoutClientID() throws Exception { |
| doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false, SASL_FAIL_AUTH); |
| doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false, SASL_SYS); |
| doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false, SASL_SYS_PERM); |
| } |
| |
| private void doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(boolean clientID, UnsignedByte saslFailureCode) throws Exception { |
| String optionString; |
| if (clientID) { |
| optionString = "?jms.clientID=myClientID"; |
| } else { |
| optionString = "?jms.awaitClientID=false"; |
| } |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode); |
| |
| ConnectionFactory factory = new JmsConnectionFactory("failover:(amqp://localhost:" + testPeer.getServerPort() + ")" + optionString); |
| |
| try { |
| factory.createConnection("username", "password"); |
| fail("Excepted exception to be thrown"); |
| }catch (JMSSecurityException jmsse) { |
| LOG.info("Caught expected security exception: {}", jmsse.getMessage()); |
| } |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientID() throws Exception { |
| doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_FAIL_AUTH); |
| doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_SYS); |
| doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_SYS_PERM); |
| } |
| |
| private void doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(UnsignedByte saslFailureCode) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode); |
| |
| ConnectionFactory factory = new JmsConnectionFactory("failover:(amqp://localhost:" + testPeer.getServerPort() + ")"); |
| Connection connection = factory.createConnection("username", "password"); |
| |
| try { |
| connection.start(); |
| fail("Excepted exception to be thrown"); |
| }catch (JMSSecurityException jmsse) { |
| LOG.info("Caught expected security exception: {}", jmsse.getMessage()); |
| } |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testConnectHandlesSaslTempFailure() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| originalPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, SASL_SYS_TEMP); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| String content = "myContent"; |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType(content); |
| |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| Message message = consumer.receive(2000); |
| |
| finalPeer.expectClose(); |
| connection.close(); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| |
| assertNotNull(message); |
| assertTrue(message instanceof TextMessage); |
| assertEquals(content, ((TextMessage) message).getText()); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverStopsOnNonTemporarySaslFailure() throws Exception { |
| doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_FAIL_AUTH); |
| doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_SYS); |
| doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_SYS_PERM); |
| } |
| |
| private void doFailoverStopsOnNonTemporarySaslFailureTestImpl(UnsignedByte saslFailureCode) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer rejectingPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch exceptionListenerFired = new CountDownLatch(1); |
| final AtomicReference<Throwable> failure = new AtomicReference<>(); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String rejectingURI = createPeerURI(rejectingPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Rejecting peer is at: {}", rejectingURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Expect connection to the first peer (and have it drop) |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlow(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of Rejecting Peer--- // |
| rejectingPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, saslFailureCode); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| // This shouldn't get hit, but if it does accept the connect so we don't pass the failed |
| // to connect assertion. |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); |
| connection.setExceptionListener(new ExceptionListener() { |
| @Override |
| public void onException(JMSException exception) { |
| LOG.trace("JMS ExceptionListener: ", exception); |
| failure.compareAndSet(null, exception); |
| exceptionListenerFired.countDown(); |
| } |
| }); |
| |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| final MessageConsumer consumer = session.createConsumer(queue); |
| |
| assertTrue(originalConnected.await(3, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| assertTrue(exceptionListenerFired.await(3, TimeUnit.SECONDS), "The ExceptionListener should have been alerted"); |
| Throwable ex = failure.get(); |
| assertTrue(ex instanceof JMSSecurityException, "Unexpected failure exception: " + ex); |
| |
| // Verify the consumer gets marked closed |
| assertTrue(Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisfied() throws Exception { |
| try { |
| consumer.getMessageSelector(); |
| } catch (IllegalStateException jmsise) { |
| return true; |
| } |
| return false; |
| } |
| }, 5000, 5), "consumer never closed."); |
| |
| // Shut down last peer and verify no connection made to it |
| finalPeer.purgeExpectations(); |
| finalPeer.close(); |
| assertNotNull(originalPeer.getClientSocket(), "First peer should have accepted a TCP connection"); |
| assertNotNull(rejectingPeer.getClientSocket(), "Rejecting peer should have accepted a TCP connection"); |
| assertNull(finalPeer.getClientSocket(), "Final peer should not have accepted any TCP connection"); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesTemporarySaslFailure() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer rejectingPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final AtomicBoolean exceptionListenerFired = new AtomicBoolean(); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String rejectingURI = createPeerURI(rejectingPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Rejecting peer is at: {}", rejectingURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Expect connection to the first peer (and have it drop) |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlow(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of Rejecting --- // |
| rejectingPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, SASL_SYS_TEMP); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| final String expectedMessageContent = "myTextMessage"; |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(expectedMessageContent)); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); |
| connection.setExceptionListener(new ExceptionListener() { |
| @Override |
| public void onException(JMSException exception) { |
| LOG.trace("JMS ExceptionListener: ", exception); |
| exceptionListenerFired.set(true); |
| } |
| }); |
| |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| final MessageConsumer consumer = session.createConsumer(queue); |
| |
| finalPeer.waitForAllHandlersToComplete(2000); |
| |
| assertTrue(originalConnected.await(3, TimeUnit.SECONDS), "Should connect to original peer"); |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Check message arrives |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| Message msg = consumer.receive(5000); |
| assertTrue(msg instanceof TextMessage, "Expected an instance of TextMessage, got: " + msg); |
| assertEquals(expectedMessageContent, ((TextMessage) msg).getText(), "Unexpected msg content"); |
| |
| assertFalse(exceptionListenerFired.get(), "The ExceptionListener should not have been alerted"); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesConnectErrorInvalidField() throws Exception { |
| doFailoverHandlesConnectErrorInvalidFieldTestImpl(false); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesConnectErrorInvalidFieldWithContainerIdHint() throws Exception { |
| // As above but also including hint that the container-id is the invalid field, i.e invalid ClientID |
| doFailoverHandlesConnectErrorInvalidFieldTestImpl(true); |
| } |
| |
| private void doFailoverHandlesConnectErrorInvalidFieldTestImpl(boolean includeContainerIdHint) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final String finalURI = createPeerURI(finalPeer); |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| Map<Symbol, Object> errorInfo = null; |
| if (includeContainerIdHint) { |
| errorInfo = new HashMap<Symbol, Object>(); |
| errorInfo.put(AmqpSupport.INVALID_FIELD, AmqpSupport.CONTAINER_ID); |
| } |
| |
| originalPeer.rejectConnect(AmqpError.INVALID_FIELD, "Client ID already in use", errorInfo); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| Message message = consumer.receive(2000); |
| |
| assertNotNull(message); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesConnectErrorInvalidFieldOnReconnect() throws Exception { |
| doFailoverHandlesConnectErrorInvalidFieldOnReconnectTestImpl(false); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesConnectErrorInvalidFieldOnReconnectWithContainerIdHint() throws Exception { |
| // As above but also including hint that the container-id is the invalid field, i.e invalid ClientID |
| doFailoverHandlesConnectErrorInvalidFieldOnReconnectTestImpl(true); |
| } |
| |
| private void doFailoverHandlesConnectErrorInvalidFieldOnReconnectTestImpl(boolean includeContainerIdHint) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer rejectingPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final String finalURI = createPeerURI(finalPeer); |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.dropAfterLastHandler(10); |
| |
| Map<Symbol, Object> errorInfo = null; |
| if (includeContainerIdHint) { |
| errorInfo = new HashMap<Symbol, Object>(); |
| errorInfo.put(AmqpSupport.INVALID_FIELD, AmqpSupport.CONTAINER_ID); |
| } |
| rejectingPeer.rejectConnect(AmqpError.INVALID_FIELD, "Client ID already in use", errorInfo); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| Message message = consumer.receive(2000); |
| |
| assertNotNull(message); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesConnectErrorNotFound() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final String finalURI = createPeerURI(finalPeer); |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| originalPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| Message message = consumer.receive(2000); |
| |
| assertNotNull(message); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesDropThenRejectionCloseAfterConnect() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer rejectingPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, one to fail to reconnect to, and a final one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String rejectingURI = createPeerURI(rejectingPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Rejecting peer is at: {}", rejectingURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| |
| long ird = 0; |
| long rd = 2000; |
| long start = System.currentTimeMillis(); |
| |
| final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=" + ird + "&failover.reconnectDelay=" + rd + "&failover.maxReconnectAttempts=10", originalPeer, rejectingPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| assertEquals(1L, finalConnected.getCount(), "should not yet have connected to final peer"); |
| |
| // Set expectations on rejecting and final peer |
| rejectingPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| |
| // Close the original peer and wait for things to shake out. |
| originalPeer.close(); |
| |
| rejectingPeer.waitForAllHandlersToComplete(2000); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| long end = System.currentTimeMillis(); |
| |
| long margin = 2000; |
| assertThat("Elapsed time outwith expected range for reconnect", end - start, |
| both(greaterThanOrEqualTo(ird + rd)).and(lessThanOrEqualTo(ird + rd + margin))); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesImmediateTransportDropAfterConnect() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer rejectingPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, one to fail to reconnect to, and a final one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String rejectingURI = createPeerURI(rejectingPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Rejecting peer is at: {}", rejectingURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| assertEquals(1L, finalConnected.getCount(), "should not yet have connected to final peer"); |
| |
| // Set expectations on rejecting and final peer |
| rejectingPeer.expectSaslHeaderThenDrop(); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| |
| // Close the original peer and wait for things to shake out. |
| originalPeer.close(); |
| |
| rejectingPeer.waitForAllHandlersToComplete(2000); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesRemotelyEndConnectionForced() throws Exception { |
| try (TestAmqpPeer forcingPeer = new TestAmqpPeer(); |
| TestAmqpPeer backupPeer = new TestAmqpPeer();) { |
| |
| final String forcingPeerURI = createPeerURI(forcingPeer); |
| final String backupPeerURI = createPeerURI(backupPeer); |
| LOG.info("Primary is at {}: Backup peer is at: {}", forcingPeerURI, backupPeerURI); |
| |
| final CountDownLatch connectedToPrimary = new CountDownLatch(1); |
| final CountDownLatch connectedToBackup = new CountDownLatch(1); |
| |
| forcingPeer.expectSaslAnonymous(); |
| forcingPeer.expectOpen(); |
| forcingPeer.expectBegin(); |
| forcingPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 10); |
| |
| backupPeer.expectSaslAnonymous(); |
| backupPeer.expectOpen(); |
| backupPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(forcingPeer, backupPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (remoteURI.toString().equals(forcingPeerURI)) { |
| connectedToPrimary.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Reestablished: {}", remoteURI); |
| if (remoteURI.toString().equals(backupPeerURI)) { |
| connectedToBackup.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| forcingPeer.waitForAllHandlersToComplete(3000); |
| |
| assertTrue(connectedToPrimary.await(5, TimeUnit.SECONDS), "Should connect to primary peer"); |
| assertTrue(connectedToBackup.await(5, TimeUnit.SECONDS), "Should connect to backup peer"); |
| |
| backupPeer.expectClose(); |
| connection.close(); |
| backupPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesTransportDropBeforeDispositionRecieived() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // Create session+producer, send a persistent message on auto-ack session for synchronous send |
| originalPeer.expectBegin(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| originalPeer.expectSenderAttach(); |
| |
| final MessageProducer producer = session.createProducer(queue); |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| |
| final Message message = session.createTextMessage(); |
| |
| final CountDownLatch senderCompleted = new CountDownLatch(1); |
| final AtomicReference<Throwable> problem = new AtomicReference<Throwable>(); |
| |
| // Have the peer expect the message but NOT send any disposition for it |
| originalPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, true); |
| |
| Thread runner = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| producer.send(message); |
| } catch (Throwable t) { |
| problem.set(t); |
| LOG.error("Problem in sending thread", t); |
| } finally { |
| senderCompleted.countDown(); |
| } |
| } |
| }); |
| runner.start(); |
| |
| // Wait for the message to have been sent and received by peer |
| originalPeer.waitForAllHandlersToComplete(3000); |
| |
| // Set the secondary peer to expect connection restoration, this time send disposition accepting the message |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectTransfer(messageMatcher, nullValue(), false, true, new Accepted(), true); |
| |
| assertEquals(1L, finalConnected.getCount(), "Should not yet have connected to final peer"); |
| assertEquals(1L, senderCompleted.getCount(), "Sender thread should not yet have completed"); |
| |
| // Close the original peer to provoke reconnect, while send() is still outstanding |
| originalPeer.close(); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| boolean await = senderCompleted.await(5, TimeUnit.SECONDS); |
| Throwable t = problem.get(); |
| assertTrue(await, "Sender thread should have completed. Problem: " + t); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesRemoteCloseBeforeDispositionRecieived() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectSenderAttach(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| final MessageProducer producer = session.createProducer(queue); |
| |
| MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); |
| MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(headersMatcher); |
| messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); |
| |
| final Message message = session.createTextMessage(); |
| |
| final CountDownLatch senderCompleted = new CountDownLatch(1); |
| final AtomicReference<Throwable> problem = new AtomicReference<Throwable>(); |
| |
| // Have the peer expect the message but NOT send any disposition for it |
| originalPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, true); |
| originalPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 5); |
| |
| Thread runner = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| producer.send(message); |
| } catch (Throwable t) { |
| problem.set(t); |
| LOG.error("Problem in sending thread", t); |
| } finally { |
| senderCompleted.countDown(); |
| } |
| } |
| }); |
| runner.start(); |
| |
| // Wait for the message to have been sent and received by peer |
| originalPeer.waitForAllHandlersToComplete(3000); |
| |
| // Set the secondary peer to expect connection restoration, this time send disposition accepting the message |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectTransfer(messageMatcher, nullValue(), false, true, new Accepted(), true); |
| finalPeer.expectClose(); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| boolean await = senderCompleted.await(5, TimeUnit.SECONDS); |
| Throwable t = problem.get(); |
| assertTrue(await, "Sender thread should have completed. Problem: " + t); |
| |
| connection.close(); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesDropWithModifiedInitialReconnectDelay() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "failover.initialReconnectDelay=1&failover.reconnectDelay=600&failover.maxReconnectAttempts=10", |
| originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| |
| connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(30) |
| public void testFailoverInitialReconnectDelayDoesNotApplyToInitialConnect() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer();) { |
| // Create a peer to connect to |
| final String originalURI = createPeerURI(originalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| |
| int delay = 20000; |
| StopWatch watch = new StopWatch(); |
| |
| JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=" + delay + "&failover.maxReconnectAttempts=1", originalPeer); |
| connection.start(); |
| |
| long taken = watch.taken(); |
| |
| String message = "Initial connect should not have delayed for the specified initialReconnectDelay." + "Elapsed=" + taken + ", delay=" + delay; |
| assertTrue(taken < delay, message); |
| assertTrue(taken < 5000, "Connection took longer than reasonable: " + taken); |
| |
| // Shut it down |
| originalPeer.expectClose(); |
| connection.close(); |
| |
| originalPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesDropZeroPrefetchPullConsumerReceiveNoWait() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE)); |
| originalPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.ONE)); |
| finalPeer.expectDetach(true, true, true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| assertNull(consumer.receiveNoWait()); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| consumer.close(); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesDropZeroPrefetchPullConsumerReceiveWithTimeout() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlow(); |
| originalPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1))); |
| finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1))); |
| finalPeer.expectDetach(true, true, true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| assertNull(consumer.receive(500)); |
| LOG.info("Receive returned"); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| LOG.info("Closing consumer"); |
| consumer.close(); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesDropZeroPrefetchPullConsumerReceive() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlow(); |
| originalPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, 1, false, false, equalTo(UnsignedInteger.ONE), 1, true); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| finalPeer.expectDetach(true, true, true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| assertNotNull(consumer.receive()); |
| LOG.info("Receive returned"); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| LOG.info("Closing consumer"); |
| consumer.close(); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesDropAfterQueueBrowserDrain() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| originalPeer.expectBegin(); |
| originalPeer.expectQueueBrowserAttach(); |
| originalPeer.expectLinkFlow(); |
| originalPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectQueueBrowserAttach(); |
| finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); |
| finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); |
| finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH))); |
| finalPeer.expectDetach(true, true, true); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| QueueBrowser browser = session.createBrowser(queue); |
| Enumeration<?> queueView = browser.getEnumeration(); |
| |
| assertNotNull(queueView); |
| assertFalse(queueView.hasMoreElements()); |
| |
| browser.close(); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesDropAfterSessionCloseRequested() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer()) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| |
| final String originalURI = createPeerURI(originalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| originalPeer.expectBegin(); |
| originalPeer.expectEnd(false); |
| originalPeer.dropAfterLastHandler(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| final CountDownLatch sessionCloseCompleted = new CountDownLatch(1); |
| final AtomicBoolean sessionClosedThrew = new AtomicBoolean(); |
| Thread sessionCloseThread = new Thread(new Runnable() { |
| |
| @Override |
| public void run() { |
| try { |
| session.close(); |
| LOG.debug("Close of session returned ok"); |
| } catch (JMSException jmsEx) { |
| LOG.warn("Should not throw on session close when connection drops.", jmsEx); |
| sessionClosedThrew.set(true); |
| } finally { |
| sessionCloseCompleted.countDown(); |
| } |
| } |
| }, "Session close thread"); |
| |
| sessionCloseThread.start(); |
| |
| originalPeer.waitForAllHandlersToComplete(2000); |
| |
| assertTrue(sessionCloseCompleted.await(3, TimeUnit.SECONDS), "Session close should have completed by now"); |
| assertFalse(sessionClosedThrew.get(), "Session close should have completed normally"); |
| |
| connection.close(); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testCreateConsumerFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { |
| doCreateConsumerFailsWhenLinkRefusedTestImpl(false); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testCreateConsumerFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception { |
| doCreateConsumerFailsWhenLinkRefusedTestImpl(true); |
| } |
| |
| private void doCreateConsumerFailsWhenLinkRefusedTestImpl(boolean deferAttachResponseWrite) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| |
| Connection connection = establishAnonymousConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String topicName = "myTopic"; |
| Topic dest = session.createTopic(topicName); |
| |
| //Expect a link to a topic node, which we will then refuse |
| SourceMatcher sourceMatcher = new SourceMatcher(); |
| sourceMatcher.withAddress(equalTo(topicName)); |
| sourceMatcher.withDynamic(equalTo(false)); |
| sourceMatcher.withDurable(equalTo(TerminusDurability.NONE)); |
| |
| testPeer.expectReceiverAttach(notNullValue(), sourceMatcher, true, deferAttachResponseWrite); |
| //Expect the detach response to the test peer closing the consumer link after refusal. |
| testPeer.expectDetach(true, false, false); |
| |
| try { |
| //Create a consumer, expect it to throw exception due to the link-refusal |
| session.createConsumer(dest); |
| fail("Consumer creation should have failed when link was refused"); |
| } catch(InvalidDestinationException ide) { |
| LOG.info("Test caught expected error: {}", ide.getMessage()); |
| } |
| |
| // Shut it down |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { |
| doCreateProducerFailsWhenLinkRefusedTestImpl(false); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception { |
| doCreateProducerFailsWhenLinkRefusedTestImpl(true); |
| } |
| |
| private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean deferAttachResponseWrite) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| |
| Connection connection = establishAnonymousConnecton(testPeer); |
| connection.start(); |
| |
| testPeer.expectBegin(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String topicName = "myTopic"; |
| Topic dest = session.createTopic(topicName); |
| |
| // Expect a link to a topic node, which we will then refuse |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(equalTo(topicName)); |
| targetMatcher.withDynamic(equalTo(false)); |
| targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); |
| |
| testPeer.expectSenderAttach(notNullValue(), targetMatcher, true, deferAttachResponseWrite); |
| // Expect the detach response to the test peer closing the producer link after refusal. |
| testPeer.expectDetach(true, false, false); |
| |
| try { |
| // Create a producer, expect it to throw exception due to the link-refusal |
| session.createProducer(dest); |
| fail("Producer creation should have failed when link was refused"); |
| } catch(InvalidDestinationException ide) { |
| LOG.info("Test caught expected error: {}", ide.getMessage()); |
| } |
| |
| // Shut it down |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testTxRecreatedAfterConnectionFailsOverDropsAfterCoordinatorAttach() throws Exception { |
| doTxRecreatedAfterConnectionFailsOver(true); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testTxRecreatedAfterConnectionFailsOverDropsAfterSessionBegin() throws Exception { |
| doTxRecreatedAfterConnectionFailsOver(false); |
| } |
| |
| private void doTxRecreatedAfterConnectionFailsOver(boolean dropAfterCoordinator) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| originalPeer.expectBegin(); |
| |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| |
| if (dropAfterCoordinator) { |
| originalPeer.expectCoordinatorAttach(); |
| |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a Declared disposition state containing the txnId. |
| originalPeer.expectDeclare(txnId1); |
| } |
| |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId2); |
| |
| // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, |
| // and reply with accepted and settled disposition to indicate the rollback succeeded. |
| finalPeer.expectDischarge(txnId2, true); |
| finalPeer.expectEnd(); |
| finalPeer.expectClose(); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| LOG.debug("About to close session following final peer connection."); |
| session.close(); |
| LOG.debug("About to close connection following final peer connection."); |
| connection.close(); |
| |
| originalPeer.waitForAllHandlersToComplete(2000); |
| finalPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testTempDestinationRecreatedAfterConnectionFailsOver() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| String dynamicAddress1 = "myTempTopicAddress"; |
| originalPeer.expectTempTopicCreationAttach(dynamicAddress1); |
| |
| originalPeer.dropAfterLastHandler(); |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| String dynamicAddress2 = "myTempTopicAddress2"; |
| finalPeer.expectTempTopicCreationAttach(dynamicAddress2); |
| |
| // Session is recreated after previous temporary destinations are recreated on failover. |
| finalPeer.expectBegin(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| TemporaryTopic tempTopic = session.createTemporaryTopic(); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Delete the temporary Topic and close the session. |
| finalPeer.expectDetach(true, true, true); |
| finalPeer.expectEnd(); |
| |
| tempTopic.delete(); |
| |
| session.close(); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| originalPeer.waitForAllHandlersToComplete(2000); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverEnforcesRequestTimeoutSession() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer()) { |
| |
| final CountDownLatch connected = new CountDownLatch(1); |
| final CountDownLatch disconnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to so we can get to a state where we |
| // can try to send when offline. |
| final String peerURI = createPeerURI(testPeer); |
| |
| LOG.info("Original peer is at: {}", peerURI); |
| |
| // Connect to the test peer |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", |
| testPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| |
| @Override |
| public void onConnectionInterrupted(URI remoteURI) { |
| LOG.info("Connection Interrupted: {}", remoteURI); |
| disconnected.countDown(); |
| } |
| |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| connected.countDown(); |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(connected.await(5, TimeUnit.SECONDS), "Should connect to peer"); |
| assertTrue(disconnected.await(5, TimeUnit.SECONDS), "Should lose connection to peer"); |
| |
| try { |
| connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| fail("Should have thrown an exception"); |
| } catch (JmsOperationTimedOutException jmsEx) { |
| LOG.info("Caught timed out exception from send:", jmsEx); |
| } catch (Exception ex) { |
| fail("Should have caught a timed out exception"); |
| } |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverEnforcesRequestTimeoutSessionWhenBeginSent() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer()) { |
| |
| // Create a peer to connect to so we can get to a state where we |
| // can try to send when offline. |
| final String peerURI = createPeerURI(testPeer); |
| |
| LOG.info("Original peer is at: {}", peerURI); |
| |
| // Connect to the test peer |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(false); |
| testPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=30", testPeer); |
| connection.start(); |
| |
| try { |
| connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| fail("Should have thrown an exception"); |
| } catch (JmsOperationTimedOutException jmsEx) { |
| LOG.info("Caught timed out exception from send:", jmsEx); |
| } catch (Exception ex) { |
| fail("Should have caught a timed out exception"); |
| } |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverEnforcesSendTimeout() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer()) { |
| |
| final CountDownLatch connected = new CountDownLatch(1); |
| final CountDownLatch disconnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to so we can get to a state where we |
| // can try to send when offline. |
| final String peerURI = createPeerURI(testPeer); |
| |
| LOG.info("Original peer is at: {}", peerURI); |
| |
| // Connect to the test peer |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.expectSenderAttach(); |
| testPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "jms.sendTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", |
| testPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| |
| @Override |
| public void onConnectionInterrupted(URI remoteURI) { |
| LOG.info("Connection Interrupted: {}", remoteURI); |
| disconnected.countDown(); |
| } |
| |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| connected.countDown(); |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(connected.await(5, TimeUnit.SECONDS), "Should connect to peer"); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageProducer producer = session.createProducer(queue); |
| |
| assertTrue(disconnected.await(5, TimeUnit.SECONDS), "Should lose connection to peer"); |
| |
| try { |
| producer.send(session.createMessage()); |
| fail("Should have thrown an exception"); |
| } catch (JmsSendTimedOutException jmsEx) { |
| LOG.info("Caught timed out exception from send:", jmsEx); |
| } catch (Exception ex) { |
| fail("Should have caught a timed out exception"); |
| } |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverEnforcesRequestTimeoutCreateTenpDestination() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer()) { |
| |
| final CountDownLatch connected = new CountDownLatch(1); |
| final CountDownLatch disconnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to so we can get to a state where we |
| // can try to send when offline. |
| final String peerURI = createPeerURI(testPeer); |
| |
| LOG.info("Original peer is at: {}", peerURI); |
| |
| // Connect to the test peer |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", |
| testPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| |
| @Override |
| public void onConnectionInterrupted(URI remoteURI) { |
| LOG.info("Connection Interrupted: {}", remoteURI); |
| disconnected.countDown(); |
| } |
| |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| connected.countDown(); |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(connected.await(5, TimeUnit.SECONDS), "Should connect to peer"); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| assertTrue(disconnected.await(5, TimeUnit.SECONDS), "Should lose connection to peer"); |
| |
| try { |
| session.createTemporaryQueue(); |
| fail("Should have thrown an exception"); |
| } catch (JmsOperationTimedOutException jmsEx) { |
| LOG.info("Caught timed out exception from send:", jmsEx); |
| } catch (Exception ex) { |
| fail("Should have caught a timed out exception"); |
| } |
| |
| try { |
| session.createTemporaryTopic(); |
| fail("Should have thrown an exception"); |
| } catch (JmsOperationTimedOutException jmsEx) { |
| LOG.info("Caught timed out exception from send:", jmsEx); |
| } catch (Exception ex) { |
| fail("Should have caught a timed out exception"); |
| } |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverPassthroughOfCompletedSyncSend() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final Connection connection = establishAnonymousConnecton(testPeer); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| //Do a warmup |
| String messageContent1 = "myMessage1"; |
| TransferPayloadCompositeMatcher messageMatcher1 = new TransferPayloadCompositeMatcher(); |
| messageMatcher1.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher1.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| messageMatcher1.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); |
| messageMatcher1.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent1)); |
| |
| testPeer.expectTransfer(messageMatcher1); |
| |
| TextMessage message1 = session.createTextMessage(messageContent1); |
| producer.send(message1); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| |
| // Create and send a new message, which is accepted |
| String messageContent2 = "myMessage2"; |
| long delay = 15; |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); |
| messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent2)); |
| |
| testPeer.expectTransfer(messageMatcher, nullValue(), false, true, new Accepted(), true, 0, delay); |
| testPeer.expectClose(); |
| |
| TextMessage message2 = session.createTextMessage(messageContent2); |
| |
| long start = System.currentTimeMillis(); |
| producer.send(message2); |
| |
| long elapsed = System.currentTimeMillis() - start; |
| assertThat("Send call should have taken at least the disposition delay", elapsed, Matchers.greaterThanOrEqualTo(delay)); |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverPassthroughOfRejectedSyncSend() throws Exception { |
| Rejected failingState = new Rejected(); |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED); |
| rejectError.setDescription("RLE description"); |
| failingState.setError(rejectError); |
| |
| doFailoverPassthroughOfFailingSyncSendTestImpl(failingState, true); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverPassthroughOfReleasedSyncSend() throws Exception { |
| doFailoverPassthroughOfFailingSyncSendTestImpl(new Released(), false); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverPassthroughOfModifiedFailedSyncSend() throws Exception { |
| Modified failingState = new Modified(); |
| failingState.setDeliveryFailed(true); |
| |
| doFailoverPassthroughOfFailingSyncSendTestImpl(failingState, false); |
| } |
| |
| private void doFailoverPassthroughOfFailingSyncSendTestImpl(ListDescribedType failingState, boolean inspectException) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final Connection connection = establishAnonymousConnecton(testPeer); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| //Do a warmup that succeeds |
| String messageContent1 = "myMessage1"; |
| TransferPayloadCompositeMatcher messageMatcher1 = new TransferPayloadCompositeMatcher(); |
| messageMatcher1.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher1.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| messageMatcher1.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); |
| messageMatcher1.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent1)); |
| |
| testPeer.expectTransfer(messageMatcher1); |
| |
| TextMessage message1 = session.createTextMessage(messageContent1); |
| producer.send(message1); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| |
| // Create and send a new message, which fails as it is not accepted |
| assertFalse(failingState instanceof Accepted); |
| |
| String messageContent2 = "myMessage2"; |
| TransferPayloadCompositeMatcher messageMatcher2 = new TransferPayloadCompositeMatcher(); |
| messageMatcher2.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher2.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| messageMatcher2.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); |
| messageMatcher2.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent2)); |
| |
| long delay = 15; |
| testPeer.expectTransfer(messageMatcher2, nullValue(), false, true, failingState, true, 0, delay); |
| |
| TextMessage message2 = session.createTextMessage(messageContent2); |
| |
| long start = System.currentTimeMillis(); |
| try { |
| producer.send(message2); |
| fail("Expected an exception for this send."); |
| } catch (JMSException jmse) { |
| //Expected |
| long elapsed = System.currentTimeMillis() - start; |
| assertThat("Send call should have taken at least the disposition delay", elapsed, Matchers.greaterThanOrEqualTo(delay)); |
| |
| if (inspectException) { |
| assertTrue(jmse instanceof ResourceAllocationException); |
| assertTrue(jmse.getMessage().contains("RLE description")); |
| } |
| } |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| |
| //Do a final send that succeeds |
| String messageContent3 = "myMessage3"; |
| TransferPayloadCompositeMatcher messageMatcher3 = new TransferPayloadCompositeMatcher(); |
| messageMatcher3.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher3.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| messageMatcher3.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); |
| messageMatcher3.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent3)); |
| |
| testPeer.expectTransfer(messageMatcher3); |
| |
| TextMessage message3 = session.createTextMessage(messageContent3); |
| producer.send(message3); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverPassthroughOfCompletedAsyncSend() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final Connection connection = establishAnonymousConnecton( |
| "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| // Create and transfer a new message |
| String text = "myMessage"; |
| testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); |
| testPeer.expectClose(); |
| |
| TextMessage message = session.createTextMessage(text); |
| TestJmsCompletionListener listener = new TestJmsCompletionListener(); |
| |
| producer.send(message, listener); |
| |
| assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS), "Did not get async callback"); |
| assertNull(listener.exception); |
| assertNotNull(listener.message); |
| assertTrue(listener.message instanceof TextMessage); |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverPassthroughOfRejectedAsyncCompletionSend() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final JmsConnection connection = establishAnonymousConnecton( |
| "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| Message message = session.createTextMessage("content"); |
| testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), new Rejected(), true); |
| |
| assertNull(message.getJMSDestination(), "Should not yet have a JMSDestination"); |
| |
| TestJmsCompletionListener listener = new TestJmsCompletionListener(); |
| try { |
| producer.send(message, listener); |
| } catch (JMSException e) { |
| LOG.warn("Caught unexpected error: {}", e.getMessage()); |
| fail("No expected exception for this send."); |
| } |
| |
| assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS), "Did not get async callback"); |
| assertNotNull(listener.exception); |
| assertNotNull(listener.message); |
| assertTrue(listener.message instanceof TextMessage); |
| |
| testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); |
| testPeer.expectClose(); |
| |
| listener = new TestJmsCompletionListener(); |
| try { |
| producer.send(message, listener); |
| } catch (JMSException e) { |
| LOG.warn("Caught unexpected error: {}", e.getMessage()); |
| fail("No expected exception for this send."); |
| } |
| |
| assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS), "Did not get async callback"); |
| assertNull(listener.exception); |
| assertNotNull(listener.message); |
| assertTrue(listener.message instanceof TextMessage); |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverConnectionLossFailsWaitingAsyncCompletionSends() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final JmsConnection connection = establishAnonymousConnecton( |
| "failover.reconnectDelay=2000&failover.maxReconnectAttempts=60", |
| testPeer); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.expectSenderAttach(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| final int MSG_COUNT = 5; |
| |
| for (int i = 0; i < MSG_COUNT; ++i) { |
| testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher()); |
| } |
| |
| // Accept one which shouldn't complete until after the others have failed. |
| testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), new Accepted(), true); |
| testPeer.dropAfterLastHandler(); |
| |
| TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT + 1); |
| try { |
| for (int i = 0; i < MSG_COUNT; ++i) { |
| Message message = session.createTextMessage("content"); |
| producer.send(message, listener); |
| } |
| |
| Message message = session.createTextMessage("content"); |
| producer.send(message, listener); |
| } catch (JMSException e) { |
| LOG.warn("Caught unexpected error: {}", e.getMessage()); |
| fail("No expected exception for this send."); |
| } |
| |
| assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS), "Did not get async callback"); |
| assertEquals(MSG_COUNT, listener.errorCount); |
| assertEquals(1, listener.successCount); |
| assertNotNull(listener.exception); |
| assertNotNull(listener.message); |
| assertTrue(listener.message instanceof TextMessage); |
| |
| connection.close(); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testCreateSessionAfterConnectionDrops() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(nullValue(), false); |
| originalPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectEnd(); |
| finalPeer.expectClose(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| session.close(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testCreateConsumerAfterConnectionDrops() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1))); |
| finalPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(1))); |
| finalPeer.expectDetach(true, true, true); |
| finalPeer.expectClose(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| assertNull(consumer.receive(500)); |
| LOG.info("Receive returned"); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| LOG.info("Closing consumer"); |
| consumer.close(); |
| |
| // Shut it down |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testCreateProducerAfterConnectionDrops() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Connect to the first peer |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.dropAfterLastHandler(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setQueuePrefetch(0); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectDetach(true, true, true); |
| finalPeer.expectClose(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageProducer producer = session.createProducer(queue); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| LOG.info("Closing consumer"); |
| producer.close(); |
| |
| // Shut it down |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testTxCommitThrowsWhenNoDischargeResponseSentAndConnectionDrops() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer()) { |
| |
| final CountDownLatch testConnected = new CountDownLatch(1); |
| final CountDownLatch failedConnection = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String testPeerURI = createPeerURI(testPeer); |
| |
| LOG.info("test peer is at: {}", testPeerURI); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "failover.maxReconnectAttempts=3&failover.useReconnectBackOff=false", testPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (testPeerURI.equals(remoteURI.toString())) { |
| testConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionFailure(Throwable cause) { |
| LOG.info("Connection Failed: {}", cause); |
| failedConnection.countDown(); |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(testConnected.await(6, TimeUnit.SECONDS), "Should connect to test peer"); |
| |
| testPeer.expectBegin(); |
| testPeer.expectCoordinatorAttach(); |
| |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a Declared disposition state containing the txnId. |
| Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| testPeer.expectDeclare(txnId); |
| |
| // The session should send a commit but we don't respond, then drop the connection |
| // and check that the commit is failed due to dropped connection. |
| testPeer.expectDischargeButDoNotRespond(txnId, false); |
| testPeer.expectDeclareButDoNotRespond(); |
| |
| testPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 100); |
| |
| // --- Failover should handle the connection close ---------------// |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| try { |
| session.commit(); |
| fail("Commit should have thrown an exception"); |
| } catch (JMSException jmsEx) { |
| LOG.debug("Commit threw: ", jmsEx); |
| } |
| |
| assertTrue(failedConnection.await(5, TimeUnit.SECONDS), "Should reported failed"); |
| |
| try { |
| connection.close(); |
| } catch (JMSException jmsEx) {} |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testDropAndRejectAfterwardsHonorsMax() throws Exception { |
| try (TestAmqpPeer firstPeer = new TestAmqpPeer(); |
| TestAmqpPeer secondPeer = new TestAmqpPeer(); |
| TestAmqpPeer thirdPeer = new TestAmqpPeer(); |
| TestAmqpPeer fourthPeer = new TestAmqpPeer()) { |
| |
| final CountDownLatch testConnected = new CountDownLatch(1); |
| final CountDownLatch failedConnection = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String firstPeerURI = createPeerURI(firstPeer); |
| |
| LOG.info("First peer is at: {}", firstPeerURI); |
| LOG.info("Second peer is at: {}", createPeerURI(secondPeer)); |
| LOG.info("Third peer is at: {}", createPeerURI(thirdPeer)); |
| LOG.info("Fourth peer is at: {}", createPeerURI(fourthPeer)); |
| |
| firstPeer.expectSaslAnonymous(); |
| firstPeer.expectOpen(); |
| firstPeer.expectBegin(); |
| firstPeer.remotelyCloseConnection(true, ConnectionError.CONNECTION_FORCED, "Server is going away", 100); |
| |
| secondPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null); |
| thirdPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null); |
| |
| // This shouldn't get hit, but if it does accept the connect so we don't pass the failed |
| // to connect assertion. |
| fourthPeer.expectSaslAnonymous(); |
| fourthPeer.expectOpen(); |
| fourthPeer.expectBegin(); |
| fourthPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "failover.maxReconnectAttempts=2&failover.useReconnectBackOff=false", firstPeer, secondPeer, thirdPeer, fourthPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (firstPeerURI.equals(remoteURI.toString())) { |
| testConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionFailure(Throwable cause) { |
| LOG.info("Connection Failed: {}", cause); |
| failedConnection.countDown(); |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(testConnected.await(5, TimeUnit.SECONDS), "Should connect to first peer"); |
| |
| // --- Failover should handle the connection close ---------------// |
| |
| assertTrue(failedConnection.await(5, TimeUnit.SECONDS), "Should reported failed"); |
| |
| try { |
| connection.close(); |
| } catch (JMSException jmsEx) {} |
| |
| secondPeer.waitForAllHandlersToCompleteNoAssert(2000); |
| thirdPeer.waitForAllHandlersToCompleteNoAssert(2000); |
| |
| // Shut down last peer and verify no connection made to it |
| fourthPeer.purgeExpectations(); |
| fourthPeer.close(); |
| assertNotNull(firstPeer.getClientSocket(), "Peer 1 should have accepted a TCP connection"); |
| assertNotNull(secondPeer.getClientSocket(), "Peer 2 should have accepted a TCP connection"); |
| assertNotNull(thirdPeer.getClientSocket(), "Peer 3 should have accepted a TCP connection"); |
| assertNull(fourthPeer.getClientSocket(), "Peer 4 should not have accepted any TCP connection"); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testStartMaxReconnectAttemptsTriggeredWhenRemotesAreRejecting() throws Exception { |
| try (TestAmqpPeer firstPeer = new TestAmqpPeer(); |
| TestAmqpPeer secondPeer = new TestAmqpPeer(); |
| TestAmqpPeer thirdPeer = new TestAmqpPeer(); |
| TestAmqpPeer fourthPeer = new TestAmqpPeer()) { |
| |
| final CountDownLatch failedConnection = new CountDownLatch(1); |
| |
| LOG.info("First peer is at: {}", createPeerURI(firstPeer)); |
| LOG.info("Second peer is at: {}", createPeerURI(secondPeer)); |
| LOG.info("Third peer is at: {}", createPeerURI(thirdPeer)); |
| LOG.info("Fourth peer is at: {}", createPeerURI(fourthPeer)); |
| |
| firstPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null); |
| secondPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null); |
| thirdPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null); |
| |
| // This shouldn't get hit, but if it does accept the connect so we don't pass the failed |
| // to connect assertion. |
| fourthPeer.expectSaslAnonymous(); |
| fourthPeer.expectOpen(); |
| fourthPeer.expectBegin(); |
| fourthPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "failover.startupMaxReconnectAttempts=3&failover.reconnectDelay=15&failover.useReconnectBackOff=false", |
| firstPeer, secondPeer, thirdPeer, fourthPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| |
| @Override |
| public void onConnectionFailure(Throwable cause) { |
| LOG.info("Connection Failed: {}", cause); |
| failedConnection.countDown(); |
| } |
| }); |
| |
| try { |
| connection.start(); |
| fail("Should not be able to connect"); |
| } catch (JmsResourceNotFoundException jmsrnfe) {} |
| |
| // --- Failover should handle the connection close ---------------// |
| |
| assertTrue(failedConnection.await(5, TimeUnit.SECONDS), "Should reported failed"); |
| |
| try { |
| connection.close(); |
| } catch (JMSException jmsEx) {} |
| |
| firstPeer.waitForAllHandlersToCompleteNoAssert(2000); |
| secondPeer.waitForAllHandlersToCompleteNoAssert(2000); |
| thirdPeer.waitForAllHandlersToCompleteNoAssert(2000); |
| |
| // Shut down last peer and verify no connection made to it |
| fourthPeer.purgeExpectations(); |
| fourthPeer.close(); |
| assertNotNull(firstPeer.getClientSocket(), "Peer 1 should have accepted a TCP connection"); |
| assertNotNull(secondPeer.getClientSocket(), "Peer 2 should have accepted a TCP connection"); |
| assertNotNull(thirdPeer.getClientSocket(), "Peer 3 should have accepted a TCP connection"); |
| assertNull(fourthPeer.getClientSocket(), "Peer 4 should not have accepted any TCP connection"); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testConnectionConsumerRecreatedAfterReconnect() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| ServerSessionPool sessionPool = Mockito.mock(ServerSessionPool.class); |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Expect connection to the first peer (and have it drop) |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlow(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlow(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| Queue queue = new JmsQueue("myQueue"); |
| connection.createConnectionConsumer(queue, null, sessionPool, 100); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListener() throws Exception { |
| Symbol errorCondition = AmqpError.RESOURCE_DELETED; |
| String errorDescription = "testRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListener"; |
| |
| doRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListenerTestImpl(errorCondition, errorDescription); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testRemotelyCloseConsumerWithMessageListenerWithoutErrorFiresJMSExceptionListener() throws Exception { |
| // As above but with the peer not including any error condition in its consumer close |
| doRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListenerTestImpl(null, null); |
| } |
| |
| private void doRemotelyCloseConsumerWithMessageListenerFiresJMSExceptionListenerTestImpl(Symbol errorCondition, String errorDescription) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| CountDownLatch consumerClosed = new CountDownLatch(1); |
| CountDownLatch exceptionListenerFired = new CountDownLatch(1); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| |
| final JmsConnection connection = establishAnonymousConnecton("failover.maxReconnectAttempts=1", testPeer); |
| connection.setExceptionListener(new ExceptionListener() { |
| @Override |
| public void onException(JMSException exception) { |
| LOG.trace("JMS ExceptionListener: ", exception); |
| exceptionListenerFired.countDown(); |
| } |
| }); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConsumerClosed(MessageConsumer consumer, Throwable exception) { |
| consumerClosed.countDown(); |
| } |
| }); |
| |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session2.createQueue("myQueue"); |
| |
| // Create a consumer, then remotely end it afterwards. |
| testPeer.expectReceiverAttach(); |
| testPeer.expectLinkFlow(); |
| testPeer.expectEnd(); |
| testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, errorCondition, errorDescription, 10); |
| |
| final MessageConsumer consumer = session2.createConsumer(queue); |
| consumer.setMessageListener(new MessageListener() { |
| @Override |
| public void onMessage(Message message) { |
| } |
| }); |
| |
| // Close first session to allow the receiver remote close timing to be deterministic |
| session1.close(); |
| |
| // Verify the consumer gets marked closed |
| testPeer.waitForAllHandlersToComplete(1000); |
| assertTrue(Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisfied() throws Exception { |
| try { |
| consumer.getMessageListener(); |
| } catch (IllegalStateException jmsise) { |
| if (jmsise.getCause() != null) { |
| String message = jmsise.getCause().getMessage(); |
| if (errorCondition != null) { |
| return message.contains(errorCondition.toString()) && message.contains(errorDescription); |
| } else { |
| return message.contains("Unknown error from remote peer"); |
| } |
| } else { |
| return false; |
| } |
| } |
| return false; |
| } |
| }, 5000, 10), "consumer never closed."); |
| |
| assertTrue(consumerClosed.await(2000, TimeUnit.MILLISECONDS), "Consumer closed callback didn't trigger"); |
| assertTrue(exceptionListenerFired.await(2000, TimeUnit.MILLISECONDS), "JMS Exception listener should have fired with a MessageListener"); |
| |
| // Try closing it explicitly, should effectively no-op in client. |
| // The test peer will throw during close if it sends anything. |
| consumer.close(); |
| |
| // Shut the connection down |
| testPeer.expectClose(); |
| connection.close(); |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateConsumerFailsConnectionAndRetries() throws Exception { |
| Symbol errorCondition = AmqpError.RESOURCE_DELETED; |
| String errorDescription = "testFailoverCannotRecreateConsumerFailsConnectionAndRetries"; |
| |
| doTestFailoverCannotRecreateConsumerFailsConnectionAndRetries(errorCondition, errorDescription); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateConsumerFailsConnectionAndRetriesNoErrorConditionGiven() throws Exception { |
| doTestFailoverCannotRecreateConsumerFailsConnectionAndRetries(null, null); |
| } |
| |
| private void doTestFailoverCannotRecreateConsumerFailsConnectionAndRetries(Symbol errorCondition, String errorMessage) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer rejectingPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final AtomicBoolean exceptionListenerFired = new AtomicBoolean(); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String rejectingURI = createPeerURI(rejectingPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Rejecting peer is at: {}", rejectingURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Expect connection to the first peer (and have it drop) |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlow(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of Rejecting --- // |
| rejectingPeer.expectSaslAnonymous(); |
| rejectingPeer.expectOpen(); |
| rejectingPeer.expectBegin(); |
| rejectingPeer.expectBegin(); |
| rejectingPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, false, errorCondition, errorMessage); |
| // --- Client will clean up connection and then reconnect to next peer --- // |
| rejectingPeer.expectDetach(true, false, false); |
| rejectingPeer.expectClose(); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| final String expectedMessageContent = "myTextMessage"; |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(expectedMessageContent)); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| |
| AtomicReference<Message> msgRef = new AtomicReference<>(); |
| final CountDownLatch msgReceived = new CountDownLatch(1); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); |
| connection.setExceptionListener(new ExceptionListener() { |
| @Override |
| public void onException(JMSException exception) { |
| LOG.trace("JMS ExceptionListener: ", exception); |
| exceptionListenerFired.set(true); |
| } |
| }); |
| |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| final MessageConsumer consumer = session.createConsumer(queue); |
| consumer.setMessageListener(new MessageListener() { |
| @Override |
| public void onMessage(Message message) { |
| msgRef.set(message); |
| msgReceived.countDown(); |
| } |
| }); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| |
| assertTrue(originalConnected.await(3, TimeUnit.SECONDS), "Should connect to original peer"); |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Check message arrives |
| assertTrue(msgReceived.await(3, TimeUnit.SECONDS), "The onMessage listener should have fired"); |
| Message msg = msgRef.get(); |
| assertTrue(msg instanceof TextMessage, "Expected an instance of TextMessage, got: " + msg); |
| assertEquals(expectedMessageContent, ((TextMessage) msg).getText(), "Unexpected msg content"); |
| |
| // Check that consumer isn't closed |
| try { |
| consumer.getMessageListener(); |
| } catch (JMSException ex) { |
| fail("Consumer should be in open state and not throw here."); |
| } |
| |
| assertFalse(exceptionListenerFired.get(), "The ExceptionListener should not have been alerted"); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateProducerFailsConnectionAndRetries() throws Exception { |
| Symbol errorCondition = AmqpError.RESOURCE_DELETED; |
| String errorDescription = "testFailoverCannotRecreateProducerFailsConnectionAndRetries"; |
| |
| doTestFailoverCannotRecreateProducerFailsConnectionAndRetries(errorCondition, errorDescription); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateProducerFailsConnectionAndRetriesNoErrorConditionGiven() throws Exception { |
| doTestFailoverCannotRecreateProducerFailsConnectionAndRetries(null, null); |
| } |
| |
| private void doTestFailoverCannotRecreateProducerFailsConnectionAndRetries(Symbol errorCondition, String errorMessage) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer rejectingPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final AtomicBoolean exceptionListenerFired = new AtomicBoolean(); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String rejectingURI = createPeerURI(rejectingPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Rejecting peer is at: {}", rejectingURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Expect connection to the first peer (and have it drop) |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectSenderAttach(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of Rejecting --- // |
| rejectingPeer.expectSaslAnonymous(); |
| rejectingPeer.expectOpen(); |
| rejectingPeer.expectBegin(); |
| rejectingPeer.expectBegin(); |
| rejectingPeer.expectSenderAttach(notNullValue(), notNullValue(), true, false, false, -1, errorCondition, errorMessage); |
| // --- Client will clean up connection and then reconnect to next peer --- // |
| rejectingPeer.expectDetach(true, false, false); |
| rejectingPeer.expectClose(); |
| |
| // --- Post Failover Expectations of FinalPeer --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); |
| connection.setExceptionListener(new ExceptionListener() { |
| @Override |
| public void onException(JMSException exception) { |
| LOG.trace("JMS ExceptionListener: ", exception); |
| exceptionListenerFired.set(true); |
| } |
| }); |
| |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| final MessageProducer producer = session.createProducer(queue); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| |
| assertTrue(originalConnected.await(3, TimeUnit.SECONDS), "Should connect to original peer"); |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Check that producer isn't closed |
| try { |
| producer.getDestination(); |
| } catch (JMSException ex) { |
| fail("Producer should be in open state and not throw here."); |
| } |
| |
| // Send a message |
| String messageContent = "myMessage"; |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); |
| messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent)); |
| finalPeer.expectTransfer(messageMatcher); |
| |
| Message message = session.createTextMessage(messageContent); |
| producer.send(message); |
| |
| assertFalse(exceptionListenerFired.get(), "The ExceptionListener should not have been alerted"); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled() throws Exception { |
| Symbol errorCondition = AmqpError.RESOURCE_DELETED; |
| String errorDescription = "testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled"; |
| |
| doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(true, errorCondition, errorDescription); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabledNoMessageListener() throws Exception { |
| Symbol errorCondition = AmqpError.RESOURCE_DELETED; |
| String errorDescription = "testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabledNoMessageListener"; |
| |
| doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(false, errorCondition, errorDescription); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabledNoErrorConditionGiven() throws Exception { |
| doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(true, null, null); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateConsumerWithCloseFailedLinksEnabledNoErrorConditionGivenNoMessageListener() throws Exception { |
| doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(false, null, null); |
| } |
| |
| private void doTestFailoverCannotRecreateConsumerWithCloseFailedLinksEnabled(boolean addListener, Symbol errorCondition, String errorDescription) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final CountDownLatch exceptionListenerFired = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Expect connection to the first peer (and have it drop) |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlow(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of Rejecting --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, false, errorCondition, errorDescription); |
| finalPeer.expectDetach(true, false, false); |
| |
| final int prefetch; |
| if (addListener) { |
| prefetch = 0; |
| } else { |
| prefetch = 1; |
| } |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "jms.prefetchPolicy.all="+ prefetch + "&jms.closeLinksThatFailOnReconnect=true", originalPeer, finalPeer); |
| connection.setExceptionListener(new ExceptionListener() { |
| @Override |
| public void onException(JMSException exception) { |
| LOG.trace("JMS ExceptionListener: ", exception); |
| exceptionListenerFired.countDown(); |
| } |
| }); |
| |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| final MessageConsumer consumer = session.createConsumer(queue); |
| if (addListener) { |
| consumer.setMessageListener(new MessageListener() { |
| @Override |
| public void onMessage(Message message) { |
| } |
| }); |
| } |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| |
| assertTrue(originalConnected.await(3, TimeUnit.SECONDS), "Should connect to original peer"); |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Verify the consumer gets marked closed |
| assertTrue(Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisfied() throws Exception { |
| try { |
| consumer.getMessageListener(); |
| } catch (IllegalStateException jmsise) { |
| if (jmsise.getCause() != null) { |
| String message = jmsise.getCause().getMessage(); |
| if (errorCondition != null) { |
| return message.contains(errorCondition.toString()) && |
| message.contains(errorDescription); |
| } else { |
| return message.contains("Link creation was refused"); |
| } |
| } else { |
| return false; |
| } |
| } |
| return false; |
| } |
| }, 5000, 10), "consumer never closed."); |
| |
| // Verify the exception listener behaviour |
| if (addListener) { |
| assertTrue(exceptionListenerFired.await(2, TimeUnit.SECONDS), "JMS Exception listener should have fired with a MessageListener"); |
| } else { |
| assertFalse(exceptionListenerFired.await(10, TimeUnit.MILLISECONDS), "The ExceptionListener should not have been alerted"); |
| } |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateProducerWithCloseFailedLinksEnabled() throws Exception { |
| Symbol errorCondition = AmqpError.RESOURCE_DELETED; |
| String errorDescription = "testFailoverCannotRecreateProducerWithCloseFailedLinksEnabled"; |
| |
| doTestFailoverCannotRecreateWithCloseFailedLinksEnabled(errorCondition, errorDescription); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverCannotRecreateProducerWithCloseFailedLinksEnabledNoErrorConditionGiven() throws Exception { |
| doTestFailoverCannotRecreateWithCloseFailedLinksEnabled(null, null); |
| } |
| |
| private void doTestFailoverCannotRecreateWithCloseFailedLinksEnabled(Symbol errorCondition, String errorDescription) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final AtomicBoolean exceptionListenerFired = new AtomicBoolean(); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| // Expect connection to the first peer (and have it drop) |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectSenderAttach(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of Rejecting --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(notNullValue(), notNullValue(), true, false, false, -1, errorCondition, errorDescription); |
| finalPeer.expectDetach(true, false, false); |
| |
| final JmsConnection connection = establishAnonymousConnecton("jms.closeLinksThatFailOnReconnect=true", originalPeer, finalPeer); |
| connection.setExceptionListener(new ExceptionListener() { |
| @Override |
| public void onException(JMSException exception) { |
| LOG.trace("JMS ExceptionListener: ", exception); |
| exceptionListenerFired.set(true); |
| } |
| }); |
| |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| final MessageProducer producer = session.createProducer(queue); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| |
| assertTrue(originalConnected.await(3, TimeUnit.SECONDS), "Should connect to original peer"); |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // Verify the producer gets marked closed |
| assertTrue(Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisfied() throws Exception { |
| try { |
| producer.getDestination(); |
| } catch (IllegalStateException jmsise) { |
| if (jmsise.getCause() != null) { |
| String message = jmsise.getCause().getMessage(); |
| if (errorCondition != null) { |
| return message.contains(errorCondition.toString()) && |
| message.contains(errorDescription); |
| } else { |
| return message.contains("Link creation was refused"); |
| } |
| } else { |
| return false; |
| } |
| } |
| return false; |
| } |
| }, 5000, 10), "producer never closed."); |
| |
| assertFalse(exceptionListenerFired.get(), "The ExceptionListener should not have been alerted"); |
| |
| // Shut it down |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testInDoubtTransactionFromFailoverCompletesAsyncCompletions() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| |
| // Expect the message which was sent under the current transaction. Check it carries |
| // TransactionalState with the above txnId but has no outcome. Respond with a |
| // TransactionalState with Accepted outcome. |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| |
| TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); |
| stateMatcher.withTxnId(equalTo(txnId1)); |
| stateMatcher.withOutcome(nullValue()); |
| |
| TransactionalState txState = new TransactionalState(); |
| txState.setTxnId(txnId1); |
| txState.setOutcome(new Accepted()); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectCoordinatorAttach(); |
| // First expect an unsettled 'declare' transfer to the txn coordinator, and |
| // reply with a Declared disposition state containing the txnId. |
| originalPeer.expectDeclare(txnId1); |
| originalPeer.expectSenderAttach(); |
| originalPeer.expectTransfer(messageMatcher, stateMatcher, txState, true); |
| originalPeer.dropAfterLastHandler(10); |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId2); |
| finalPeer.expectSenderAttach(); |
| // Attempt to commit the in-doubt TX will result in rollback and a new TX will be started. |
| finalPeer.expectDischarge(txnId2, true); |
| finalPeer.expectDeclare(txnId1); |
| // this rollback comes from the session being closed on connection close. |
| finalPeer.expectDischarge(txnId1, true); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| // Create and transfer a new message |
| String text = "myMessage"; |
| |
| TextMessage message = session.createTextMessage(text); |
| TestJmsCompletionListener listener1 = new TestJmsCompletionListener(); |
| TestJmsCompletionListener listener2 = new TestJmsCompletionListener(); |
| |
| try { |
| producer.send(message, listener1); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed the async completion send."); |
| } |
| |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| // This should fire after reconnect without an error, if it fires with an error at |
| // any time then something is wrong. |
| assertTrue(listener1.awaitCompletion(5, TimeUnit.SECONDS), "Did not get async callback for send #1"); |
| assertNull(listener1.exception, "Completion of send #1 should not have been on error"); |
| assertNotNull(listener1.message); |
| assertTrue(listener1.message instanceof TextMessage); |
| |
| try { |
| producer.send(message, listener2); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed the async completion send."); |
| } |
| |
| assertTrue(listener2.awaitCompletion(5, TimeUnit.SECONDS), "Did not get async callback for send #2"); |
| assertNull(listener2.exception, "Completion of send #2 should not have been on error"); |
| assertNotNull(listener2.message); |
| assertTrue(listener2.message instanceof TextMessage); |
| |
| try { |
| session.commit(); |
| fail("Transaction should have been rolled back"); |
| } catch (TransactionRolledBackException txrbex) {} |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testSendAndConnectionDropsRecoveredAsInDoubtTransaction() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| |
| // Expect the message which was sent under the current transaction. Check it carries |
| // TransactionalState with the above txnId but has no outcome. Respond with a |
| // TransactionalState with Accepted outcome. |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| |
| // Send should occurs within transaction #1 |
| TransactionalStateMatcher txn1StateMatcher = new TransactionalStateMatcher(); |
| txn1StateMatcher.withTxnId(equalTo(txnId1)); |
| txn1StateMatcher.withOutcome(nullValue()); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectCoordinatorAttach(); |
| originalPeer.expectDeclare(txnId1); |
| originalPeer.expectSenderAttach(); |
| // Send is synchronous so we don't respond in order to stall the MessageProducer |
| // in the send call to block recovery from initiating a new transaction which |
| // will then cause the current transaction to become in-doubt and commit should |
| // throw a transaction rolled back exception. |
| originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, false, false, null, false); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| // Send will be blocked waiting to fire so it will not be filtered by the local |
| // transaction context in-doubt checks since there was no other work pending the |
| // transaction will be fully recovered so the fixed producer must ensure that no |
| // send occurs outside the transaction boundaries. |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId2); |
| finalPeer.expectDischarge(txnId2, true); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.setForceSyncSend(true); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalPeer.getServerPort() == remoteURI.getPort()) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalPeer.getServerPort() == remoteURI.getPort()) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| // Create and transfer a new message |
| String text = "myMessage"; |
| |
| TextMessage message = session.createTextMessage(text); |
| |
| try { |
| producer.send(message); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed to send."); |
| } |
| |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| try { |
| session.commit(); |
| fail("Transaction should throw rolled back error as an operation is pending on recover."); |
| } catch (TransactionRolledBackException txrbex) { |
| } |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testSecondSendAndConnectionDropsResendsButTransactionRollsBackAsInDoubt() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| |
| // Expect the message which was sent under the current transaction. Check it carries |
| // TransactionalState with the above txnId but has no outcome. Respond with a |
| // TransactionalState with Accepted outcome. |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| |
| // Send should occurs within transaction #1 |
| TransactionalStateMatcher txn1StateMatcher = new TransactionalStateMatcher(); |
| txn1StateMatcher.withTxnId(equalTo(txnId1)); |
| txn1StateMatcher.withOutcome(nullValue()); |
| |
| // Disposition should occurs within transaction #1 before failover |
| TransactionalState txn1Disposition = new TransactionalState(); |
| txn1Disposition.setTxnId(txnId2); |
| txn1Disposition.setOutcome(new Accepted()); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectCoordinatorAttach(); |
| originalPeer.expectDeclare(txnId1); |
| originalPeer.expectSenderAttach(); |
| originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, txn1Disposition, true); |
| // Send is synchronous so we don't respond in order to stall the MessageProducer |
| // in the send call to block recovery from initiating a new transaction which |
| // will then cause the current transaction to become in-doubt and commit should |
| // throw a transaction rolled back exception. |
| originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, false, false, null, false); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| // Send will be blocked waiting to fire so it will not be filtered |
| // by the local transaction context in-doubt checks, however since there |
| // was pending transactional work the transaction will be rolled back |
| // on commit and then a new transaction will be activated. The producer |
| // will filter the held send as there is no active transaction. |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId2); |
| finalPeer.expectDischarge(txnId2, true); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.setForceSyncSend(true); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalPeer.getServerPort() == remoteURI.getPort()) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalPeer.getServerPort() == remoteURI.getPort()) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| // Create and transfer a new message |
| String text = "myMessage"; |
| |
| TextMessage message = session.createTextMessage(text); |
| |
| try { |
| producer.send(message); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed to send."); |
| } |
| |
| try { |
| producer.send(message); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed to send."); |
| } |
| |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| try { |
| session.commit(); |
| fail("Transaction should have been been in-doubt and a rolled back error thrown."); |
| } catch (TransactionRolledBackException txrbex) { |
| } |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testTransactionalAcknowledgeAfterRecoveredWhileSendBlocked() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent"); |
| |
| // Expect the message which was sent under the current transaction. Check it carries |
| // TransactionalState with the above txnId but has no outcome. Respond with a |
| // TransactionalState with Accepted outcome. |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| |
| // The initial send should occur in transaction #1 |
| TransactionalStateMatcher transfer1StateMatcher = new TransactionalStateMatcher(); |
| transfer1StateMatcher.withTxnId(equalTo(txnId1)); |
| transfer1StateMatcher.withOutcome(nullValue()); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectCoordinatorAttach(); |
| originalPeer.expectDeclare(txnId1); |
| originalPeer.expectSenderAttach(); |
| // Send is synchronous so we don't respond in order to stall the MessageProducer |
| // in the send call to block recovery from initiating a new transaction which |
| // will then cause the current transaction to become in-doubt and commit should |
| // throw a transaction rolled back exception. |
| originalPeer.expectTransfer(messageMatcher, transfer1StateMatcher, false, false, null, false); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| // Send will be blocked waiting to fire so it will not be filtered |
| // by the local transaction context in-doubt checks, however since there |
| // was pending transactional work the transaction will be rolled back. |
| // The AmqpFixedProducer should filter the send after reconnect as there |
| // won't be an active transaction coordinator until we start a new TXN. |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId2); |
| finalPeer.expectDischarge(txnId2, true); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.setForceSyncSend(true); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalPeer.getServerPort() == remoteURI.getPort()) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalPeer.getServerPort() == remoteURI.getPort()) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| // Create and transfer a new message |
| String text = "myMessage"; |
| |
| TextMessage message = session.createTextMessage(text); |
| |
| try { |
| producer.send(message); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed the send after connection dropped."); |
| } |
| |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| MessageConsumer consumer = session.createConsumer(queue); |
| assertNotNull(consumer.receive(5000)); |
| |
| try { |
| session.commit(); |
| fail("Transaction should have been rolled back"); |
| } catch (TransactionRolledBackException txrbex) { |
| } |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(30) |
| public void testReceiveAndSendInTransactionFailsCommitWhenConnectionDropsDuringSend() throws Exception { |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| |
| final TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| |
| // The initial send before failover should arrive in transaction #1 |
| final TransactionalStateMatcher transfer1StateMatcher = new TransactionalStateMatcher(); |
| transfer1StateMatcher.withTxnId(equalTo(txnId1)); |
| transfer1StateMatcher.withOutcome(nullValue()); |
| |
| // Transactional Acknowledge should happen before the failover then no others should arrive. |
| final TransactionalStateMatcher dispositionStateMatcher = new TransactionalStateMatcher(); |
| dispositionStateMatcher.withTxnId(equalTo(txnId1)); |
| dispositionStateMatcher.withOutcome(new AcceptedMatcher()); |
| |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| final CountDownLatch transactionRollback = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectCoordinatorAttach(); |
| originalPeer.expectDeclare(txnId1); |
| originalPeer.expectReceiverAttach(); |
| originalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1); |
| originalPeer.expectDisposition(true, dispositionStateMatcher); |
| originalPeer.expectSenderAttach(); |
| originalPeer.expectTransfer(messageMatcher, transfer1StateMatcher, false, false, null, false); |
| originalPeer.dropAfterLastHandler(); |
| |
| // Following failover the blocked send will be retried but the transaction will have |
| // been marked as in-dbout by the context and no new transactional work will be done |
| // until the commit is called and it throws a transaction rolled back error. |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlow(); |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId2); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.setForceSyncSend(true); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalPeer.getServerPort() == remoteURI.getPort()) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalPeer.getServerPort() == remoteURI.getPort()) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer messageConsumer = session.createConsumer(queue); |
| messageConsumer.setMessageListener((message) -> { |
| try { |
| session.createProducer(queue).send(session.createTextMessage("sample")); |
| session.commit(); |
| } catch (TransactionRolledBackException txnRbEx) { |
| transactionRollback.countDown(); |
| } catch (JMSException jmsEx) { |
| throw new RuntimeException("Behaving badly since commit already did", jmsEx); |
| } |
| }); |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| finalPeer.expectDischarge(txnId2, true); |
| finalPeer.expectClose(); |
| |
| assertTrue(transactionRollback.await(5, TimeUnit.SECONDS), "Should have encounted a Transaction Rollback Error"); |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testTransactionDeclareWithNoResponseRecoveredAsInDoubtAndCommitFails() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent"); |
| |
| // Expect the message which was sent under the current transaction. Check it carries |
| // TransactionalState with the above txnId but has no outcome. Respond with a |
| // TransactionalState with Accepted outcome. |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| |
| // The initial send should occur in transaction #1 |
| TransactionalStateMatcher transferStateMatcher = new TransactionalStateMatcher(); |
| transferStateMatcher.withTxnId(equalTo(txnId1)); |
| transferStateMatcher.withOutcome(nullValue()); |
| |
| // Accept the initial send after failover in transaction #1 |
| TransactionalState transferTxnOutcome = new TransactionalState(); |
| transferTxnOutcome.setTxnId(txnId1); |
| transferTxnOutcome.setOutcome(new Accepted()); |
| |
| // Receive call after failover should occur in transaction #1 |
| TransactionalStateMatcher txnDispositionStateMatcher = new TransactionalStateMatcher(); |
| txnDispositionStateMatcher.withTxnId(equalTo(txnId1)); |
| txnDispositionStateMatcher.withOutcome(new AcceptedMatcher()); |
| |
| // Drop the connection after the declare giving a chance for recovery to attempt |
| // to rebuild while the context is still reacting to the failed begin. |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectCoordinatorAttach(); |
| originalPeer.expectDeclareButDoNotRespond(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId1); |
| finalPeer.expectSenderAttach(); |
| // Send will be blocked waiting to fire so it will not be filtered |
| // by the local transaction context in-doubt checks, however since there |
| // was pending transactional work the transaction will be rolled back. |
| finalPeer.expectTransfer(messageMatcher, transferStateMatcher, transferTxnOutcome, true); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDisposition(true, txnDispositionStateMatcher); |
| finalPeer.expectDischarge(txnId1, false); |
| finalPeer.expectDeclare(txnId2); |
| finalPeer.expectDischarge(txnId2, true); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.setForceSyncSend(true); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalPeer.getServerPort() == remoteURI.getPort()) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalPeer.getServerPort() == remoteURI.getPort()) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| Queue queue = session.createQueue("myQueue"); |
| MessageProducer producer = session.createProducer(queue); |
| TextMessage message = session.createTextMessage("myMessage"); |
| |
| try { |
| producer.send(message); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed the async completion send."); |
| } |
| |
| MessageConsumer consumer = session.createConsumer(queue); |
| assertNotNull(consumer.receive(5000)); |
| |
| try { |
| session.commit(); |
| } catch (TransactionRolledBackException txrbex) { |
| fail("Transaction should not have been rolled back"); |
| } |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testTransactionCommitWithNoResponseRecoveredAsInDoubtAndPerformsNoWork() throws Exception { |
| doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(true); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testTransactionRollbackWithNoResponseRecoveredAsInDoubtAndPerformsNoWork() throws Exception { |
| doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(false); |
| } |
| |
| private void doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(boolean commit) throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent"); |
| |
| // Expect the message which was sent under the current transaction. Check it carries |
| // TransactionalState with the above txnId but has no outcome. Respond with a |
| // TransactionalState with Accepted outcome. |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); |
| messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); |
| |
| // The replayed send should occur in transaction #2 |
| TransactionalStateMatcher transferStateMatcher = new TransactionalStateMatcher(); |
| transferStateMatcher.withTxnId(equalTo(txnId2)); |
| transferStateMatcher.withOutcome(nullValue()); |
| |
| // Receive call after failover should occur in transaction #2 |
| TransactionalStateMatcher txnDispositionStateMatcher = new TransactionalStateMatcher(); |
| txnDispositionStateMatcher.withTxnId(equalTo(txnId2)); |
| txnDispositionStateMatcher.withOutcome(new AcceptedMatcher()); |
| |
| // Re-send after failover should occur in TXN #2 |
| TransactionalState transferTxnOutcome = new TransactionalState(); |
| transferTxnOutcome.setTxnId(txnId2); |
| transferTxnOutcome.setOutcome(new Accepted()); |
| |
| // Drop the connection after the declare giving a chance for recovery to attempt |
| // to rebuild while the context is still reacting to the failed begin. |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectCoordinatorAttach(); |
| originalPeer.expectDeclare(txnId1); |
| originalPeer.expectDischargeButDoNotRespond(txnId1, !commit); |
| originalPeer.expectDeclareButDoNotRespond(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId2); |
| finalPeer.expectSenderAttach(); |
| // Send will be blocked waiting to fire so it will not be filtered |
| // by the local transaction context in-doubt checks, however since there |
| // was pending transactional work the transaction will be rolled back. |
| finalPeer.expectTransfer(messageMatcher, transferStateMatcher, transferTxnOutcome, true); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDisposition(true, txnDispositionStateMatcher); |
| finalPeer.expectDischarge(txnId2, false); |
| finalPeer.expectDeclare(txnId1); |
| finalPeer.expectDischarge(txnId1, true); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalPeer.getServerPort() == remoteURI.getPort()) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalPeer.getServerPort() == remoteURI.getPort()) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| if (commit) { |
| try { |
| session.commit(); |
| fail("Should have failed on the commit when connection dropped"); |
| } catch (TransactionRolledBackException txnRbEx) { |
| // Expected |
| } |
| } else { |
| try { |
| session.rollback(); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed on the rollback when connection dropped"); |
| } |
| } |
| |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| Queue queue = session.createQueue("myQueue"); |
| MessageProducer producer = session.createProducer(queue); |
| TextMessage message = session.createTextMessage("myMessage"); |
| |
| try { |
| producer.send(message); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed the async completion send."); |
| } |
| |
| MessageConsumer consumer = session.createConsumer(queue); |
| assertNotNull(consumer.receive(5000)); |
| |
| try { |
| session.commit(); |
| } catch (TransactionRolledBackException txrbex) { |
| fail("Transaction should not have been rolled back"); |
| } |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testSendWhileOfflinePreventsRecoveredTransactionFromCommitting() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| |
| // Drop the connection after the declare giving a chance for recovery to attempt |
| // to rebuild while the context is still reacting to the failed begin. |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectCoordinatorAttach(); |
| originalPeer.expectDeclare(txnId1); |
| originalPeer.expectSenderAttach(); |
| originalPeer.expectDischargeButDoNotRespond(txnId1, false); |
| originalPeer.expectDeclareButDoNotRespond(); |
| originalPeer.dropAfterLastHandler(); |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectCoordinatorAttach(); |
| finalPeer.expectDeclare(txnId2); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectDischarge(txnId2, true); |
| finalPeer.expectDeclare(txnId1); |
| finalPeer.expectDischarge(txnId1, true); |
| finalPeer.expectClose(); |
| |
| // Need to allow time for the asynchronous send to fire after connection drop in order to ensure |
| // that the send is no-op'd when the TXN is in-doubt |
| final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=1000", originalPeer, finalPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalPeer.getServerPort() == remoteURI.getPort()) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalPeer.getServerPort() == remoteURI.getPort()) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| connection.start(); |
| |
| assertTrue(originalConnected.await(5, TimeUnit.SECONDS), "Should connect to original peer"); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = session.createQueue("myQueue"); |
| MessageProducer producer = session.createProducer(queue); |
| TextMessage message = session.createTextMessage("myMessage"); |
| |
| try { |
| session.commit(); |
| fail("Should have failed on the commit when connection dropped"); |
| } catch (TransactionRolledBackException txnRbEx) { |
| // Expected |
| } |
| |
| // Following the failed commit the Transaction should be in-doubt and the send |
| // should be skipped since the TXN is in-doubt because we should not have connected |
| // to the final peer yet so a recovery wouldn't have happened and the transaction |
| // state couldn't be marked good since currently there should be no active transaction. |
| |
| try { |
| producer.send(message); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed the async completion send."); |
| } |
| |
| assertTrue(finalConnected.await(5, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| try { |
| session.commit(); |
| fail("Transaction should have been rolled back since a send was skipped."); |
| } catch (TransactionRolledBackException txrbex) { |
| // Expected |
| } |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverDoesNotFailPendingAsyncCompletionSend() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| // Ensure our send blocks in the provider waiting for credit so that on failover |
| // the message will actually get sent from the Failover bits once we grant some |
| // credit for the recovered sender. |
| originalPeer.expectSenderAttachWithoutGrantingCredit(); |
| originalPeer.dropAfterLastHandler(10); // Wait for sender to get into wait state |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectTransfer(new TransferPayloadCompositeMatcher()); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer, finalPeer); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| // Create and transfer a new message |
| String text = "myMessage"; |
| |
| TextMessage message = session.createTextMessage(text); |
| TestJmsCompletionListener listener = new TestJmsCompletionListener(); |
| |
| try { |
| producer.send(message, listener); |
| } catch (JMSException jmsEx) { |
| fail("Should not have failed the async completion send."); |
| } |
| |
| // This should fire after reconnect without an error, if it fires with an error at |
| // any time then something is wrong. |
| assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not get async callback"); |
| assertNull(listener.exception, "Completion should not have been on error"); |
| assertNotNull(listener.message); |
| assertTrue(listener.message instanceof TextMessage); |
| |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testFailoverHandlesAnonymousFallbackWaitingForClose() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| // DO NOT add capability to indicate server support for ANONYMOUS-RELAY |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectSenderAttach(); |
| originalPeer.expectTransfer(new TransferPayloadCompositeMatcher()); |
| // Ensure that sender detach is not answered so that next send must wait for close |
| originalPeer.expectDetach(true, false, false); |
| originalPeer.dropAfterLastHandler(20); // Wait for sender to get into wait state |
| |
| // --- Post Failover Expectations of sender --- // |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectTransfer(new TransferPayloadCompositeMatcher()); |
| finalPeer.expectDetach(true, true, true); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton( |
| "failover.initialReconnectDelay=25" + |
| "&failover.nested.amqp.anonymousFallbackCacheSize=0" + |
| "&failover.nested.amqp.anonymousFallbackCacheTimeout=0", |
| originalPeer, finalPeer); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(null); |
| |
| // Send 2 messages |
| String text = "myMessage"; |
| |
| TextMessage message = session.createTextMessage(text); |
| |
| producer.send(queue, message); |
| producer.send(queue, message); |
| |
| producer.close(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { |
| doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(false, false); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception { |
| doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(false, true); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughCreateTemporaryTopicFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { |
| doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(true, false); |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughCreateTemporaryTopicFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception { |
| doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(true, true); |
| } |
| |
| private void doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(boolean topic, boolean deferAttachResponseWrite) throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| |
| JmsConnection connection = establishAnonymousConnecton(testPeer); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| try { |
| if (topic) { |
| testPeer.expectAndRefuseTempTopicCreationAttach(AmqpError.UNAUTHORIZED_ACCESS, "Not Authorized to create temp topics.", false); |
| //Expect the detach response to the test peer after refusal. |
| testPeer.expectDetach(true, false, false); |
| |
| session.createTemporaryTopic(); |
| } else { |
| testPeer.expectAndRefuseTempQueueCreationAttach(AmqpError.UNAUTHORIZED_ACCESS, "Not Authorized to create temp queues.", false); |
| //Expect the detach response to the test peer after refusal. |
| testPeer.expectDetach(true, false, false); |
| |
| session.createTemporaryQueue(); |
| } |
| fail("Should have thrown security exception"); |
| } catch (JMSSecurityException jmsse) { |
| } |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughRemotelyCloseProducer() throws Exception { |
| final String BREAD_CRUMB = "ErrorMessageBreadCrumb"; |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| final CountDownLatch producerClosed = new CountDownLatch(1); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| |
| JmsConnection connection = establishAnonymousConnecton(testPeer); |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onProducerClosed(MessageProducer producer, Throwable exception) { |
| producerClosed.countDown(); |
| } |
| }); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| // Create a producer, then remotely end it afterwards. |
| testPeer.expectSenderAttach(); |
| testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB); |
| |
| Queue queue = session.createQueue("myQueue"); |
| final MessageProducer producer = session.createProducer(queue); |
| |
| // Verify the producer gets marked closed |
| testPeer.waitForAllHandlersToComplete(1000); |
| assertTrue(Wait.waitFor(new Wait.Condition() { |
| @Override |
| public boolean isSatisfied() throws Exception { |
| try { |
| producer.getDestination(); |
| } catch (Exception ex) { |
| if (ex instanceof IllegalStateException && ex.getCause() != null) { |
| String message = ex.getCause().getMessage(); |
| if (message.contains(AmqpError.RESOURCE_DELETED.toString()) && |
| message.contains(BREAD_CRUMB)) { |
| return true; |
| } |
| } |
| |
| LOG.debug("Caught unexpected exception: {}", ex); |
| } |
| |
| return false; |
| } |
| }, 10000, 10), "producer never closed."); |
| |
| assertTrue(producerClosed.await(10, TimeUnit.SECONDS), "Producer closed callback didn't trigger"); |
| |
| // Try closing it explicitly, should effectively no-op in client. |
| // The test peer will throw during close if it sends anything. |
| producer.close(); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughOfSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer()) { |
| |
| final String testPeerURI = createPeerURI(testPeer); |
| LOG.info("Original peer is at: {}", testPeerURI); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| |
| // DO NOT add capability to indicate server support for DELAYED-DELIVERY so that |
| // send fails and we can see if the error passes through the failover provider |
| JmsConnection connection = establishAnonymousConnecton(testPeer); |
| connection.start(); |
| |
| Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY }); |
| Symbol[] offeredCapabilities = null; |
| testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| String topicName = "myTopic"; |
| Topic dest = session.createTopic(topicName); |
| |
| MessageProducer producer = session.createProducer(dest); |
| producer.setDeliveryDelay(5000); |
| |
| // Producer should fail to send when message has delivery delay since remote |
| // did not report that it supports that option. |
| Message message = session.createMessage(); |
| try { |
| producer.send(message); |
| fail("Send should fail"); |
| } catch (JMSException jmsEx) { |
| LOG.debug("Caught expected error from failed send."); |
| } |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughOfSendTimesOutWhenNoDispostionArrives() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| final String testPeerURI = createPeerURI(testPeer); |
| LOG.info("Original peer is at: {}", testPeerURI); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| |
| JmsConnection connection = establishAnonymousConnecton(testPeer); |
| connection.setSendTimeout(500); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| String queueName = "myQueue"; |
| Queue queue = session.createQueue(queueName); |
| |
| Message message = session.createTextMessage("text"); |
| TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); |
| |
| // Expect the producer to attach and grant it some credit, it should send |
| // a transfer which we will not send any response for which should cause the |
| // send operation to time out. |
| testPeer.expectSenderAttach(); |
| testPeer.expectTransferButDoNotRespond(messageMatcher); |
| testPeer.expectClose(); |
| |
| MessageProducer producer = session.createProducer(queue); |
| |
| try { |
| producer.send(message); |
| fail("Send should time out."); |
| } catch (JmsSendTimedOutException jmsEx) { |
| LOG.info("Caught expected error: {}", jmsEx.getMessage()); |
| } catch (Throwable error) { |
| fail("Send should time out, but got: " + error.getMessage()); |
| } |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughOfRollbackErrorCoordinatorClosedOnCommit() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| final String testPeerURI = createPeerURI(testPeer); |
| LOG.info("Original peer is at: {}", testPeerURI); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.expectCoordinatorAttach(); |
| |
| JmsConnection connection = establishAnonymousConnecton(testPeer); |
| connection.start(); |
| |
| Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); |
| Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); |
| |
| testPeer.expectDeclare(txnId1); |
| testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId1, false, true, txnId2); |
| testPeer.expectCoordinatorAttach(); |
| testPeer.expectDeclare(txnId2); |
| testPeer.expectDischarge(txnId2, true); |
| |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| try { |
| session.commit(); |
| fail("Transaction should have rolled back"); |
| } catch (TransactionRolledBackException ex) { |
| LOG.info("Caught expected TransactionRolledBackException"); |
| } |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testPassthroughOfSessionCreateFailsOnDeclareTimeout() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| |
| final String testPeerURI = createPeerURI(testPeer); |
| LOG.info("Original peer is at: {}", testPeerURI); |
| |
| testPeer.expectSaslAnonymous(); |
| testPeer.expectOpen(); |
| testPeer.expectBegin(); |
| testPeer.expectBegin(); |
| testPeer.expectCoordinatorAttach(); |
| testPeer.expectDeclareButDoNotRespond(); |
| // Expect the AMQP session to be closed due to the JMS session creation failure. |
| testPeer.expectEnd(); |
| |
| JmsConnection connection = establishAnonymousConnecton(testPeer); |
| connection.setRequestTimeout(500); |
| connection.start(); |
| |
| try { |
| connection.createSession(true, Session.SESSION_TRANSACTED); |
| fail("Should have timed out waiting for declare."); |
| } catch (JmsOperationTimedOutException jmsEx) { |
| } catch (Throwable error) { |
| fail("Should have caught an timed out exception:"); |
| LOG.error("Caught -> ", error); |
| } |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testConnectionPropertiesExtensionAppliedOnEachReconnect() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final CountDownLatch originalConnected = new CountDownLatch(1); |
| final CountDownLatch finalConnected = new CountDownLatch(1); |
| |
| // Create a peer to connect to, then one to reconnect to |
| final String originalURI = createPeerURI(originalPeer); |
| final String finalURI = createPeerURI(finalPeer); |
| |
| LOG.info("Original peer is at: {}", originalURI); |
| LOG.info("Final peer is at: {}", finalURI); |
| |
| final String property1 = "property1"; |
| final String property2 = "property2"; |
| |
| final UUID value1 = UUID.randomUUID(); |
| final UUID value2 = UUID.randomUUID(); |
| |
| Matcher<?> connPropsMatcher1 = allOf( |
| hasEntry(Symbol.valueOf(property1), value1), |
| not(hasEntry(Symbol.valueOf(property2), value2))); |
| |
| Matcher<?> connPropsMatcher2 = allOf( |
| not(hasEntry(Symbol.valueOf(property1), value1)), |
| hasEntry(Symbol.valueOf(property2), value2)); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(connPropsMatcher1, null, false); |
| originalPeer.expectBegin(); |
| originalPeer.dropAfterLastHandler(10); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(connPropsMatcher2, null, false); |
| finalPeer.expectBegin(); |
| |
| final URI remoteURI = new URI("failover:(" + originalURI + "," + finalURI + ")"); |
| |
| JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI); |
| |
| factory.setExtension(JmsConnectionExtensions.AMQP_OPEN_PROPERTIES.toString(), (connection, uri) -> { |
| Map<String, Object> properties = new HashMap<>(); |
| |
| if (originalConnected.getCount() == 1) { |
| properties.put(property1, value1); |
| } else { |
| properties.put(property2, value2); |
| } |
| |
| return properties; |
| }); |
| |
| JmsConnection connection = (JmsConnection) factory.createConnection(); |
| |
| connection.addConnectionListener(new JmsDefaultConnectionListener() { |
| @Override |
| public void onConnectionEstablished(URI remoteURI) { |
| LOG.info("Connection Established: {}", remoteURI); |
| if (originalURI.equals(remoteURI.toString())) { |
| originalConnected.countDown(); |
| } |
| } |
| |
| @Override |
| public void onConnectionRestored(URI remoteURI) { |
| LOG.info("Connection Restored: {}", remoteURI); |
| if (finalURI.equals(remoteURI.toString())) { |
| finalConnected.countDown(); |
| } |
| } |
| }); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| finalPeer.waitForAllHandlersToComplete(2000); |
| |
| assertTrue(originalConnected.await(3, TimeUnit.SECONDS), "Should connect to original peer"); |
| assertTrue(finalConnected.await(3, TimeUnit.SECONDS), "Should connect to final peer"); |
| |
| finalPeer.expectClose(); |
| connection.close(); |
| |
| finalPeer.waitForAllHandlersToComplete(1000); } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testSessionCreationRecoversAfterDropWithNoBeginResponse() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final String content = "myContent"; |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType(content); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(false); |
| originalPeer.dropAfterLastHandler(20); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| Message message = consumer.receive(2000); |
| |
| connection.close(); |
| |
| originalPeer.waitForAllHandlersToCompleteNoAssert(1000); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| |
| assertNotNull(message); |
| assertTrue(message instanceof TextMessage); |
| assertEquals(content, ((TextMessage) message).getText()); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testMultipleSessionCreationRecoversAfterDropWithNoBeginResponseAndFailedRecoveryAttempt() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer intermediatePeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| final String content = "myContent"; |
| final DescribedType amqpValueNullContent = new AmqpValueDescribedType(content); |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(false); |
| originalPeer.dropAfterLastHandler(20); |
| |
| intermediatePeer.expectSaslAnonymous(); |
| intermediatePeer.expectOpen(); |
| intermediatePeer.expectBegin(); |
| intermediatePeer.expectBegin(false); |
| intermediatePeer.dropAfterLastHandler(); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); |
| finalPeer.expectDispositionThatIsAcceptedAndSettled(); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| Queue queue1 = session1.createQueue("myQueue"); |
| MessageConsumer consumer1 = session1.createConsumer(queue1); |
| Message message1 = consumer1.receive(2000); |
| |
| Queue queue2 = session2.createQueue("myQueue"); |
| MessageConsumer consumer2 = session2.createConsumer(queue2); |
| Message message2 = consumer2.receive(2000); |
| |
| connection.close(); |
| |
| originalPeer.waitForAllHandlersToComplete(1000); |
| intermediatePeer.waitForAllHandlersToComplete(1000); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| |
| assertNotNull(message1); |
| assertTrue(message1 instanceof TextMessage); |
| assertEquals(content, ((TextMessage) message1).getText()); |
| assertNotNull(message2); |
| assertTrue(message2 instanceof TextMessage); |
| assertEquals(content, ((TextMessage) message2).getText()); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testMultipleSenderCreationRecoversAfterDropWithNoAttachResponseAndFailedRecoveryAttempt() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer intermediatePeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectSenderAttach(); |
| originalPeer.expectSenderAttachButDoNotRespond(); |
| originalPeer.dropAfterLastHandler(20); |
| |
| intermediatePeer.expectSaslAnonymous(); |
| intermediatePeer.expectOpen(); |
| intermediatePeer.expectBegin(); |
| intermediatePeer.expectBegin(); |
| intermediatePeer.expectSenderAttachButDoNotRespond(); |
| intermediatePeer.dropAfterLastHandler(); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer1 = session.createProducer(queue); |
| MessageProducer producer2 = session.createProducer(queue); |
| |
| assertNotNull(producer1); |
| assertNotNull(producer2); |
| |
| assertEquals(queue, producer1.getDestination()); |
| assertEquals(queue, producer2.getDestination()); |
| |
| connection.close(); |
| |
| originalPeer.waitForAllHandlersToComplete(1000); |
| intermediatePeer.waitForAllHandlersToComplete(1000); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| @Test |
| @Timeout(20) |
| public void testSenderAndReceiverCreationRecoversAfterDropWithNoAttachResponseAndFailedRecoveryAttempt() throws Exception { |
| try (TestAmqpPeer originalPeer = new TestAmqpPeer(); |
| TestAmqpPeer intermediatePeer = new TestAmqpPeer(); |
| TestAmqpPeer finalPeer = new TestAmqpPeer();) { |
| |
| originalPeer.expectSaslAnonymous(); |
| originalPeer.expectOpen(); |
| originalPeer.expectBegin(); |
| originalPeer.expectBegin(); |
| originalPeer.expectSenderAttach(); |
| originalPeer.expectReceiverAttachButDoNotRespond(); |
| originalPeer.dropAfterLastHandler(20); |
| |
| intermediatePeer.expectSaslAnonymous(); |
| intermediatePeer.expectOpen(); |
| intermediatePeer.expectBegin(); |
| intermediatePeer.expectBegin(); |
| intermediatePeer.expectSenderAttachButDoNotRespond(); |
| intermediatePeer.dropAfterLastHandler(10); |
| |
| finalPeer.expectSaslAnonymous(); |
| finalPeer.expectOpen(); |
| finalPeer.expectBegin(); |
| finalPeer.expectBegin(); |
| finalPeer.expectSenderAttach(); |
| finalPeer.expectReceiverAttach(); |
| finalPeer.expectLinkFlow(); |
| finalPeer.expectClose(); |
| |
| final JmsConnection connection = establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer); |
| |
| try { |
| connection.start(); |
| } catch (Exception ex) { |
| fail("Should not have thrown an Exception: " + ex); |
| } |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| MessageProducer producer = session.createProducer(queue); |
| MessageConsumer consumer = session.createConsumer(queue); |
| |
| assertNotNull(producer); |
| assertNotNull(consumer); |
| |
| assertEquals(queue, producer.getDestination()); |
| assertNull(consumer.getMessageListener()); |
| |
| connection.close(); |
| |
| originalPeer.waitForAllHandlersToComplete(1000); |
| intermediatePeer.waitForAllHandlersToComplete(1000); |
| finalPeer.waitForAllHandlersToComplete(1000); |
| } |
| } |
| |
| private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException { |
| return establishAnonymousConnecton(null, null, peers); |
| } |
| |
| private JmsConnection establishAnonymousConnecton(String failoverParams, TestAmqpPeer... peers) throws JMSException { |
| return establishAnonymousConnecton(null, failoverParams, peers); |
| } |
| |
| private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, TestAmqpPeer... peers) throws JMSException { |
| if (peers.length == 0) { |
| throw new IllegalArgumentException("No test peers were given, at least 1 required"); |
| } |
| |
| String remoteURI = "failover:("; |
| boolean first = true; |
| for (TestAmqpPeer peer : peers) { |
| if (!first) { |
| remoteURI += ","; |
| } |
| remoteURI += createPeerURI(peer, connectionParams); |
| first = false; |
| } |
| |
| if (failoverParams == null) { |
| remoteURI += ")?failover.maxReconnectAttempts=10"; |
| } else { |
| remoteURI += ")?" + failoverParams; |
| } |
| |
| ConnectionFactory factory = new JmsConnectionFactory(remoteURI); |
| Connection connection = factory.createConnection(); |
| |
| return (JmsConnection) connection; |
| } |
| |
| private String createPeerURI(TestAmqpPeer peer) { |
| return createPeerURI(peer, null); |
| } |
| |
| private String createPeerURI(TestAmqpPeer peer, String params) { |
| return "amqp://localhost:" + peer.getServerPort() + (params != null ? "?" + params : ""); |
| } |
| |
| private class TestJmsCompletionListener implements CompletionListener { |
| |
| private final CountDownLatch completed; |
| |
| public volatile int successCount; |
| public volatile int errorCount; |
| |
| public volatile Message message; |
| public volatile Exception exception; |
| |
| public TestJmsCompletionListener() { |
| this(1); |
| } |
| |
| public TestJmsCompletionListener(int expected) { |
| this.completed = new CountDownLatch(expected); |
| } |
| |
| public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException { |
| return completed.await(timeout, units); |
| } |
| |
| @Override |
| public void onCompletion(Message message) { |
| LOG.info("JmsCompletionListener onCompletion called with message: {}", message); |
| this.message = message; |
| this.successCount++; |
| |
| completed.countDown(); |
| } |
| |
| @Override |
| public void onException(Message message, Exception exception) { |
| LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception); |
| |
| this.message = message; |
| this.exception = exception; |
| this.errorCount++; |
| |
| completed.countDown(); |
| } |
| } |
| } |