QPIDJMS-519 Correctly recover transaction after failover

Adds additional guards against allowing a send or acknowledge
to occur outside the bounds of a transaction when recovering a
connection following failover.  Adds tests to ensure that the
transaction becomes in-doubt whenever the state of recovery
cannot ensure that there will be no lost work or untransacted
work.  Test added to cover cases where this was previously seen.
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 0b0b370..e659834 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -16,8 +16,10 @@
  */
 package org.apache.qpid.jms;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.jms.JMSException;
@@ -43,7 +45,7 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class);
 
-    private final Map<JmsResourceId, JmsResourceId> participants = new HashMap<JmsResourceId, JmsResourceId>();
+    private final Map<JmsResourceId, JmsResourceId> participants = Collections.synchronizedMap(new HashMap<JmsResourceId, JmsResourceId>());
     private final JmsSession session;
     private final JmsConnection connection;
     private JmsTransactionInfo transactionInfo;
@@ -61,6 +63,9 @@
         lock.readLock().lock();
         try {
             if (isInDoubt()) {
+                // Prevent recovery from reseting transaction to normal operating state.
+                participants.put(envelope.getProducerId(), envelope.getProducerId());
+
                 // Need to signal that the request is going to pass before completing
                 if (outcome != null) {
                     outcome.onPendingSuccess();
@@ -69,6 +74,7 @@
                 if (envelope.isCompletionRequired()) {
                     connection.onCompletedMessageSend(envelope);
                 }
+
                 return;
             }
 
@@ -137,6 +143,8 @@
         try {
             reset();
             final JmsTransactionInfo transactionInfo = getNextTransactionInfo();
+
+            LOG.debug("Initiating Begin of txn: {}", transactionInfo.getId());
             connection.createResource(transactionInfo, new ProviderSynchronization() {
 
                 @Override
@@ -159,7 +167,7 @@
                 }
             }
 
-            LOG.debug("Begin: {}", transactionInfo.getId());
+            LOG.trace("Completed Begin of txn: {}", transactionInfo.getId());
         } finally {
             lock.writeLock().unlock();
         }
@@ -179,7 +187,7 @@
             } else {
                 LOG.debug("Commit: {}", transactionInfo.getId());
 
-                JmsTransactionId oldTransactionId = transactionInfo.getId();
+                final JmsTransactionId oldTransactionId = transactionInfo.getId();
                 final JmsTransactionInfo nextTx = getNextTransactionInfo();
 
                 try {
@@ -251,13 +259,14 @@
     private void doRollback(boolean startNewTx) throws JMSException {
         lock.writeLock().lock();
         try {
-            if(transactionInfo == null) {
+            if (transactionInfo == null) {
                 return;
             }
 
             LOG.debug("Rollback: {}", transactionInfo.getId());
-            JmsTransactionId oldTransactionId = transactionInfo.getId();
+            final JmsTransactionId oldTransactionId = transactionInfo.getId();
             final JmsTransactionInfo nextTx;
+
             if (startNewTx) {
                 nextTx = getNextTransactionInfo();
             } else {
@@ -335,7 +344,7 @@
     public void onConnectionInterrupted() {
         lock.writeLock().tryLock();
         try {
-            if(transactionInfo != null) {
+            if (transactionInfo != null) {
                 transactionInfo.setInDoubt(true);
             }
         } finally {
@@ -347,25 +356,34 @@
 
     @Override
     public void onConnectionRecovery(Provider provider) throws Exception {
-        // If we get the lock then no TX commit / rollback / begin is in progress
-        // otherwise one is and we can only assume that it should fail given the
-        // connection was dropped.
-        if (lock.writeLock().tryLock()) {
+        if (lock.writeLock().tryLock(5, TimeUnit.MILLISECONDS)) {
+            // If we got the lock then there is no pending commit / rollback / begin / send or
+            // acknowledgement so we can safely create a new transaction, if there is work pending
+            // on the current transaction we must mark it as in-doubt so that a commit attempt
+            // will then roll it back.  In all other cases the transaction should be marked as
+            // in-doubt if not already done so as this currently runs outside the IO thread and
+            // cannot guard against sudden appearance of sends or acks within the transaction.
             try {
-                // If we got the lock then there is no pending commit / rollback / begin so
-                // we can safely create a new transaction, if there is work pending on the
-                // current transaction we must mark it as in-doubt so that a commit attempt
-                // will then roll it back.
+                // Session must create a new transaction on start and there could be a connection
+                // drop before that happens which means we don't need to create one yet as there
+                // wasn't one before that needs replacing.
+                if (transactionInfo == null) {
+                    LOG.trace("Transaction context skipping recovery because no transaction previously existed.");
+                    return;
+                }
+
                 transactionInfo = getNextTransactionInfo();
                 ProviderFuture request = provider.newProviderFuture(new ProviderSynchronization() {
 
                     @Override
                     public void onPendingSuccess() {
+                        LOG.trace("TX:{} Recovery of Transaction succeeded: in-doubt state: {}.", transactionInfo.getId(), !participants.isEmpty());
                         transactionInfo.setInDoubt(!participants.isEmpty());
                     }
 
                     @Override
                     public void onPendingFailure(ProviderException cause) {
+                        LOG.trace("TX:{} Recovery of Transaction failed and current state set to in-doubt: {}.", transactionInfo.getId(), cause);
                         transactionInfo.setInDoubt(true);
                     }
                 });
@@ -377,11 +395,11 @@
             } finally {
                 lock.writeLock().unlock();
             }
-        } else {
+        } else if (transactionInfo != null) {
+            // A previous transaction exists and a pending transaction write locked scoped operation is awaiting
+            // its chance to run within the IO thread, we don't know what work it performed in that TX so our only
+            // option is to mark it as in doubt and rolled it back on next commit.
             LOG.trace("Transaction recovery marking current TX:{} as in-doubt.", transactionInfo.getId());
-
-            // We did not get the lock so there is an operation in progress and our only
-            // option is to mark the state as failed so a commit will roll back.
             transactionInfo.setInDoubt(true);
         }
     }
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index c5990e5..39b597c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -88,7 +88,7 @@
     }
 
     private void acknowledgeUndeliveredRecoveredMessages() {
-        if(acknowledgementMode == Session.CLIENT_ACKNOWLEDGE
+        if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE
                 || acknowledgementMode == Session.AUTO_ACKNOWLEDGE
                     || acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE
                         || acknowledgementMode == INDIVIDUAL_ACKNOWLEDGE) {
@@ -257,7 +257,7 @@
             }
 
             JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext();
-            if(ackType == ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) {
+            if (ackType == ACK_TYPE.SESSION_SHUTDOWN && (envelope.isDelivered() || envelope.isRecovered())) {
                 handleDisposition(envelope, current, MODIFIED_FAILED);
             } else if (envelope.isDelivered()) {
                 final DeliveryState disposition;
@@ -347,8 +347,7 @@
         LOG.debug("Accepted Ack of message: {}", envelope);
         if (!delivery.remotelySettled()) {
             if (session.isTransacted() && !getResourceInfo().isBrowser()) {
-
-                if (session.isTransactionFailed()) {
+                if (session.isTransactionInDoubt()) {
                     LOG.trace("Skipping ack of message {} in failed transaction.", envelope);
                     return;
                 }
@@ -462,7 +461,7 @@
             deliver(reverseIterator.previous());
         }
 
-        if(deferredClose) {
+        if (deferredClose) {
             acknowledgeUndeliveredRecoveredMessages();
             tryCompleteDeferredClose();
         }
@@ -523,7 +522,7 @@
 
     @Override
     public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws ProviderException {
-        if(delivery.getDefaultDeliveryState() == null){
+        if (delivery.getDefaultDeliveryState() == null){
             delivery.setDefaultDeliveryState(Released.getInstance());
         }
 
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index ad993bd..d222dc0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -93,14 +93,18 @@
             request.onFailure(new ProviderIllegalStateException("The MessageProducer is closed"));
         }
 
+        final InFlightSend send = new InFlightSend(envelope, request);
+
         if (!delayedDeliverySupported && envelope.getMessage().getFacade().isDeliveryTimeTransmitted()) {
             // Don't allow sends with delay if the remote has not said it can handle them
-            request.onFailure(new ProviderUnsupportedOperationException("Remote does not support delayed message delivery"));
+            send.onFailure(new ProviderUnsupportedOperationException("Remote does not support delayed message delivery"));
+        } else if (session.isTransactionInDoubt()) {
+            // If the transaction has failed due to remote termination etc then we just indicate
+            // the send has succeeded until the a new transaction is started.
+            send.onSuccess();
         } else if (getEndpoint().getCredit() <= 0) {
             LOG.trace("Holding Message send until credit is available.");
 
-            InFlightSend send = new InFlightSend(envelope, request);
-
             if (getSendTimeout() > JmsConnectionInfo.INFINITE) {
                 send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send);
             }
@@ -108,14 +112,7 @@
             blocked.put(envelope.getMessageId(), send);
             getParent().getProvider().pumpToProtonTransport(request);
         } else {
-            // If the transaction has failed due to remote termination etc then we just indicate
-            // the send has succeeded until the a new transaction is started.
-            if (session.isTransacted() && session.isTransactionFailed()) {
-                request.onSuccess();
-                return;
-            }
-
-            doSend(envelope, new InFlightSend(envelope, request));
+            doSend(envelope, send);
         }
     }
 
@@ -190,7 +187,7 @@
                 try {
                     // If the transaction has failed due to remote termination etc then we just indicate
                     // the send has succeeded until the a new transaction is started.
-                    if (session.isTransacted() && session.isTransactionFailed()) {
+                    if (session.isTransacted() && session.isTransactionInDoubt()) {
                         held.onSuccess();
                         return;
                     }
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 62d3071..111e774 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -273,8 +273,8 @@
         return getResourceInfo().isTransacted();
     }
 
-    public boolean isTransactionFailed() {
-        return txContext == null ? false : txContext.isTransactionFailed();
+    public boolean isTransactionInDoubt() {
+        return txContext == null ? false : txContext.isTransactionInDoubt();
     }
 
     boolean isAsyncAck() {
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index b81cee1..2415967 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -224,8 +224,10 @@
         return current;
     }
 
-    public boolean isTransactionFailed() {
-        return coordinator == null ? false : coordinator.isClosed();
+    public boolean isTransactionInDoubt() {
+        // A context either has an active transaction or the transaction state of all
+        // operations is in-doubt and cannot proceed as normal.
+        return coordinator == null ? true : coordinator.isClosed();
     }
 
     public Binary getAmqpTransactionId() {
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
index abfd4b5..f22b018 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java
@@ -1733,4 +1733,63 @@
             connection.close(); // Already nuked under the covers due to txn-id being missing
         }
     }
+
+    @Test(timeout = 30_000)
+    public void testAsyncConsumerAcksAfterCommitAndBeginWhenCommitCalledInOnMessage() throws Exception {
+        final Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+        final TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+        messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+        messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+        final TransactionalStateMatcher transferStateMatcher = new TransactionalStateMatcher();
+        transferStateMatcher.withTxnId(equalTo(txnId));
+        transferStateMatcher.withOutcome(nullValue());
+
+        final TransactionalStateMatcher dispositionStateMatcher = new TransactionalStateMatcher();
+        dispositionStateMatcher.withTxnId(equalTo(txnId));
+        dispositionStateMatcher.withOutcome(new AcceptedMatcher());
+
+        final TransactionalState transferTxnOutcome = new TransactionalState();
+        transferTxnOutcome.setTxnId(txnId);
+        transferTxnOutcome.setOutcome(new Accepted());
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?amqp.drainTimeout=1000");
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+            testPeer.expectDisposition(true, dispositionStateMatcher);
+            testPeer.expectSenderAttach();
+            testPeer.expectTransfer(messageMatcher, transferStateMatcher, transferTxnOutcome, true);
+            testPeer.expectDischarge(txnId, false);
+            testPeer.expectDeclare(txnId);
+
+            // Test that consumer onMessage delivery and a send within the listener are
+            // both included into the same transaction prior to the commit in the listener.
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            messageConsumer.setMessageListener((message) -> {
+                try {
+                    session.createProducer(queue).send(session.createTextMessage("sample"));
+                    session.commit();
+                } catch (JMSException jmsEx) {
+                    throw new RuntimeException("Behaving badly since commit already did", jmsEx);
+                }
+            });
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 899e99e..805ae70 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -90,6 +90,7 @@
 import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
 import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
@@ -1645,13 +1646,15 @@
         }
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout=20000)
-    public void testTxRecreatedAfterConnectionFailsOver() throws Exception {
+    public void testTxRecreatedAfterConnectionFailsOverDropsAfterCoordinatorAttach() throws Exception {
         doTxRecreatedAfterConnectionFailsOver(true);
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout=20000)
-    public void testTxRecreatedAfterConnectionFailsOver2() throws Exception {
+    public void testTxRecreatedAfterConnectionFailsOverDropsAfterSessionBegin() throws Exception {
         doTxRecreatedAfterConnectionFailsOver(false);
     }
 
@@ -1697,13 +1700,15 @@
 
             originalPeer.expectBegin();
 
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
-            if(dropAfterCoordinator) {
+            final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+            if (dropAfterCoordinator) {
                 originalPeer.expectCoordinatorAttach();
 
                 // First expect an unsettled 'declare' transfer to the txn coordinator, and
                 // reply with a Declared disposition state containing the txnId.
-                originalPeer.expectDeclare(txnId);
+                originalPeer.expectDeclare(txnId1);
             }
 
             originalPeer.dropAfterLastHandler();
@@ -1715,25 +1720,25 @@
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectCoordinatorAttach();
-            finalPeer.expectDeclare(txnId);
+            finalPeer.expectDeclare(txnId2);
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the rollback succeeded.
-            finalPeer.expectDischarge(txnId, true);
+            finalPeer.expectDischarge(txnId2, true);
             finalPeer.expectEnd();
+            finalPeer.expectClose();
 
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
 
             assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
 
+            LOG.debug("About to close session following final peer connection.");
             session.close();
-
-            // Shut it down
-            finalPeer.expectClose();
+            LOG.debug("About to close connection following final peer connection.");
             connection.close();
 
             originalPeer.waitForAllHandlersToComplete(2000);
-            finalPeer.waitForAllHandlersToComplete(1000);
+            finalPeer.waitForAllHandlersToComplete(2000);
         }
     }
 
@@ -2527,6 +2532,7 @@
         }
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout=20000)
     public void testTxCommitThrowsWhenNoDischargeResponseSentAndConnectionDrops() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
@@ -2878,9 +2884,8 @@
                     } catch (IllegalStateException jmsise) {
                         if (jmsise.getCause() != null) {
                             String message = jmsise.getCause().getMessage();
-                            if(errorCondition != null) {
-                                return message.contains(errorCondition.toString()) &&
-                                        message.contains(errorDescription);
+                            if (errorCondition != null) {
+                                return message.contains(errorCondition.toString()) && message.contains(errorDescription);
                             } else {
                                 return message.contains("Unknown error from remote peer");
                             }
@@ -3435,7 +3440,8 @@
             LOG.info("Original peer is at: {}", originalURI);
             LOG.info("Final peer is at: {}", finalURI);
 
-            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
 
             // Expect the message which was sent under the current transaction. Check it carries
             // TransactionalState with the above txnId but has no outcome. Respond with a
@@ -3445,11 +3451,11 @@
             messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
 
             TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
-            stateMatcher.withTxnId(equalTo(txnId));
+            stateMatcher.withTxnId(equalTo(txnId1));
             stateMatcher.withOutcome(nullValue());
 
             TransactionalState txState = new TransactionalState();
-            txState.setTxnId(txnId);
+            txState.setTxnId(txnId1);
             txState.setOutcome(new Accepted());
 
             originalPeer.expectSaslAnonymous();
@@ -3459,7 +3465,7 @@
             originalPeer.expectCoordinatorAttach();
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a Declared disposition state containing the txnId.
-            originalPeer.expectDeclare(txnId);
+            originalPeer.expectDeclare(txnId1);
             originalPeer.expectSenderAttach();
             originalPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);
             originalPeer.dropAfterLastHandler(10);
@@ -3470,13 +3476,13 @@
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectCoordinatorAttach();
-            finalPeer.expectDeclare(txnId);
+            finalPeer.expectDeclare(txnId2);
             finalPeer.expectSenderAttach();
             // Attempt to commit the in-doubt TX will result in rollback and a new TX will be started.
-            finalPeer.expectDischarge(txnId, true);
-            finalPeer.expectDeclare(txnId);
+            finalPeer.expectDischarge(txnId2, true);
+            finalPeer.expectDeclare(txnId1);
             // this rollback comes from the session being closed on connection close.
-            finalPeer.expectDischarge(txnId, true);
+            finalPeer.expectDischarge(txnId1, true);
             finalPeer.expectClose();
 
             final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
@@ -3552,6 +3558,854 @@
 
     @Repeat(repetitions = 1)
     @Test(timeout = 20000)
+    public void testSendAndConnectionDropsRecoveredAsInDoubtTransaction() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+            // Expect the message which was sent under the current transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+            // Send should occurs within transaction #1
+            TransactionalStateMatcher txn1StateMatcher = new TransactionalStateMatcher();
+            txn1StateMatcher.withTxnId(equalTo(txnId1));
+            txn1StateMatcher.withOutcome(nullValue());
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectCoordinatorAttach();
+            originalPeer.expectDeclare(txnId1);
+            originalPeer.expectSenderAttach();
+            // Send is synchronous so we don't respond in order to stall the MessageProducer
+            // in the send call to block recovery from initiating a new transaction which
+            // will then cause the current transaction to become in-doubt and commit should
+            // throw a transaction rolled back exception.
+            originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, false, false, null, false);
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            // Send will be blocked waiting to fire so it will not be filtered by the local
+            // transaction context in-doubt checks since there was no other work pending the
+            // transaction will be fully recovered so the fixed producer must ensure that no
+            // send occurs outside the transaction boundaries.
+            finalPeer.expectCoordinatorAttach();
+            finalPeer.expectDeclare(txnId2);
+            finalPeer.expectDischarge(txnId2, true);
+            finalPeer.expectClose();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.setForceSyncSend(true);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalPeer.getServerPort() == remoteURI.getPort()) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalPeer.getServerPort() == remoteURI.getPort()) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+
+            TextMessage message = session.createTextMessage(text);
+
+            try {
+                producer.send(message);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed to send.");
+            }
+
+            assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+            try {
+                session.commit();
+                fail("Transaction should throw rolled back error as an operation is pending on recover.");
+            } catch (TransactionRolledBackException txrbex) {
+            }
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testSecondSendAndConnectionDropsResendsButTransactionRollsBackAsInDoubt() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+            // Expect the message which was sent under the current transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+            // Send should occurs within transaction #1
+            TransactionalStateMatcher txn1StateMatcher = new TransactionalStateMatcher();
+            txn1StateMatcher.withTxnId(equalTo(txnId1));
+            txn1StateMatcher.withOutcome(nullValue());
+
+            // Disposition should occurs within transaction #1 before failover
+            TransactionalState txn1Disposition = new TransactionalState();
+            txn1Disposition.setTxnId(txnId2);
+            txn1Disposition.setOutcome(new Accepted());
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectCoordinatorAttach();
+            originalPeer.expectDeclare(txnId1);
+            originalPeer.expectSenderAttach();
+            originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, txn1Disposition, true);
+            // Send is synchronous so we don't respond in order to stall the MessageProducer
+            // in the send call to block recovery from initiating a new transaction which
+            // will then cause the current transaction to become in-doubt and commit should
+            // throw a transaction rolled back exception.
+            originalPeer.expectTransfer(messageMatcher, txn1StateMatcher, false, false, null, false);
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            // Send will be blocked waiting to fire so it will not be filtered
+            // by the local transaction context in-doubt checks, however since there
+            // was pending transactional work the transaction will be rolled back
+            // on commit and then a new transaction will be activated.  The producer
+            // will filter the held send as there is no active transaction.
+            finalPeer.expectCoordinatorAttach();
+            finalPeer.expectDeclare(txnId2);
+            finalPeer.expectDischarge(txnId2, true);
+            finalPeer.expectClose();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.setForceSyncSend(true);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalPeer.getServerPort() == remoteURI.getPort()) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalPeer.getServerPort() == remoteURI.getPort()) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+
+            TextMessage message = session.createTextMessage(text);
+
+            try {
+                producer.send(message);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed to send.");
+            }
+
+            try {
+                producer.send(message);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed to send.");
+            }
+
+            assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+            try {
+                session.commit();
+                fail("Transaction should have been been in-doubt and a rolled back error thrown.");
+            } catch (TransactionRolledBackException txrbex) {
+            }
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testTransactionalAcknowledgeAfterRecoveredWhileSendBlocked() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent");
+
+            // Expect the message which was sent under the current transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+            // The initial send should occur in transaction #1
+            TransactionalStateMatcher transfer1StateMatcher = new TransactionalStateMatcher();
+            transfer1StateMatcher.withTxnId(equalTo(txnId1));
+            transfer1StateMatcher.withOutcome(nullValue());
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectCoordinatorAttach();
+            originalPeer.expectDeclare(txnId1);
+            originalPeer.expectSenderAttach();
+            // Send is synchronous so we don't respond in order to stall the MessageProducer
+            // in the send call to block recovery from initiating a new transaction which
+            // will then cause the current transaction to become in-doubt and commit should
+            // throw a transaction rolled back exception.
+            originalPeer.expectTransfer(messageMatcher, transfer1StateMatcher, false, false, null, false);
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            // Send will be blocked waiting to fire so it will not be filtered
+            // by the local transaction context in-doubt checks, however since there
+            // was pending transactional work the transaction will be rolled back.
+            // The AmqpFixedProducer should filter the send after reconnect as there
+            // won't be an active transaction coordinator until we start a new TXN.
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            finalPeer.expectCoordinatorAttach();
+            finalPeer.expectDeclare(txnId2);
+            finalPeer.expectDischarge(txnId2, true);
+            finalPeer.expectClose();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.setForceSyncSend(true);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalPeer.getServerPort() == remoteURI.getPort()) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalPeer.getServerPort() == remoteURI.getPort()) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+
+            TextMessage message = session.createTextMessage(text);
+
+            try {
+                producer.send(message);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed the send after connection dropped.");
+            }
+
+            assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            assertNotNull(consumer.receive(5000));
+
+            try {
+                session.commit();
+                fail("Transaction should have been rolled back");
+            } catch (TransactionRolledBackException txrbex) {
+            }
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 30_000)
+    public void testReceiveAndSendInTransactionFailsCommitWhenConnectionDropsDuringSend() throws Exception {
+        final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+        final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+        final TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+        messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+        messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+        // The initial send before failover should arrive in transaction #1
+        final TransactionalStateMatcher transfer1StateMatcher = new TransactionalStateMatcher();
+        transfer1StateMatcher.withTxnId(equalTo(txnId1));
+        transfer1StateMatcher.withOutcome(nullValue());
+
+        // Transactional Acknowledge should happen before the failover then no others should arrive.
+        final TransactionalStateMatcher dispositionStateMatcher = new TransactionalStateMatcher();
+        dispositionStateMatcher.withTxnId(equalTo(txnId1));
+        dispositionStateMatcher.withOutcome(new AcceptedMatcher());
+
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+            TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+            final CountDownLatch transactionRollback = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectCoordinatorAttach();
+            originalPeer.expectDeclare(txnId1);
+            originalPeer.expectReceiverAttach();
+            originalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+            originalPeer.expectDisposition(true, dispositionStateMatcher);
+            originalPeer.expectSenderAttach();
+            originalPeer.expectTransfer(messageMatcher, transfer1StateMatcher, false, false, null, false);
+            originalPeer.dropAfterLastHandler();
+
+            // Following failover the blocked send will be retried but the transaction will have
+            // been marked as in-dbout by the context and no new transactional work will be done
+            // until the commit is called and it throws a transaction rolled back error.
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlow();
+            finalPeer.expectCoordinatorAttach();
+            finalPeer.expectDeclare(txnId2);
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.setForceSyncSend(true);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalPeer.getServerPort() == remoteURI.getPort()) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalPeer.getServerPort() == remoteURI.getPort()) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            messageConsumer.setMessageListener((message) -> {
+                try {
+                    session.createProducer(queue).send(session.createTextMessage("sample"));
+                    session.commit();
+                } catch (TransactionRolledBackException txnRbEx) {
+                    transactionRollback.countDown();
+                } catch (JMSException jmsEx) {
+                    throw new RuntimeException("Behaving badly since commit already did", jmsEx);
+                }
+            });
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+            finalPeer.expectDischarge(txnId2, true);
+            finalPeer.expectClose();
+
+            assertTrue("Should have encounted a Transaction Rollback Error", transactionRollback.await(5, TimeUnit.SECONDS));
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testTransactionDeclareWithNoResponseRecoveredAsInDoubtAndCommitFails() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent");
+
+            // Expect the message which was sent under the current transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+            // The initial send should occur in transaction #1
+            TransactionalStateMatcher transferStateMatcher = new TransactionalStateMatcher();
+            transferStateMatcher.withTxnId(equalTo(txnId1));
+            transferStateMatcher.withOutcome(nullValue());
+
+            // Accept the initial send after failover in transaction #1
+            TransactionalState transferTxnOutcome = new TransactionalState();
+            transferTxnOutcome.setTxnId(txnId1);
+            transferTxnOutcome.setOutcome(new Accepted());
+
+            // Receive call after failover should occur in transaction #1
+            TransactionalStateMatcher txnDispositionStateMatcher = new TransactionalStateMatcher();
+            txnDispositionStateMatcher.withTxnId(equalTo(txnId1));
+            txnDispositionStateMatcher.withOutcome(new AcceptedMatcher());
+
+            // Drop the connection after the declare giving a chance for recovery to attempt
+            // to rebuild while the context is still reacting to the failed begin.
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectCoordinatorAttach();
+            originalPeer.expectDeclareButDoNotRespond();
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectCoordinatorAttach();
+            finalPeer.expectDeclare(txnId1);
+            finalPeer.expectSenderAttach();
+            // Send will be blocked waiting to fire so it will not be filtered
+            // by the local transaction context in-doubt checks, however since there
+            // was pending transactional work the transaction will be rolled back.
+            finalPeer.expectTransfer(messageMatcher, transferStateMatcher, transferTxnOutcome, true);
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            finalPeer.expectDisposition(true, txnDispositionStateMatcher);
+            finalPeer.expectDischarge(txnId1, false);
+            finalPeer.expectDeclare(txnId2);
+            finalPeer.expectDischarge(txnId2, true);
+            finalPeer.expectClose();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.setForceSyncSend(true);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalPeer.getServerPort() == remoteURI.getPort()) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalPeer.getServerPort() == remoteURI.getPort()) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+            TextMessage message = session.createTextMessage("myMessage");
+
+            try {
+                producer.send(message);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed the async completion send.");
+            }
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            assertNotNull(consumer.receive(5000));
+
+            try {
+                session.commit();
+            } catch (TransactionRolledBackException txrbex) {
+                fail("Transaction should not have been rolled back");
+            }
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testTransactionCommitWithNoResponseRecoveredAsInDoubtAndPerformsNoWork() throws Exception {
+        doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(true);
+    }
+
+    @Test(timeout = 20000)
+    public void testTransactionRollbackWithNoResponseRecoveredAsInDoubtAndPerformsNoWork() throws Exception {
+        doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(false);
+    }
+
+    private void doTestTransactionRetirementWithNoResponseRecoveredAsInDoubtAndCommitRollsBack(boolean commit) throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            final DescribedType amqpValueNullContent = new AmqpValueDescribedType("myContent");
+
+            // Expect the message which was sent under the current transaction. Check it carries
+            // TransactionalState with the above txnId but has no outcome. Respond with a
+            // TransactionalState with Accepted outcome.
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+            // The replayed send should occur in transaction #2
+            TransactionalStateMatcher transferStateMatcher = new TransactionalStateMatcher();
+            transferStateMatcher.withTxnId(equalTo(txnId2));
+            transferStateMatcher.withOutcome(nullValue());
+
+            // Receive call after failover should occur in transaction #2
+            TransactionalStateMatcher txnDispositionStateMatcher = new TransactionalStateMatcher();
+            txnDispositionStateMatcher.withTxnId(equalTo(txnId2));
+            txnDispositionStateMatcher.withOutcome(new AcceptedMatcher());
+
+            // Re-send after failover should occur in TXN #2
+            TransactionalState transferTxnOutcome = new TransactionalState();
+            transferTxnOutcome.setTxnId(txnId2);
+            transferTxnOutcome.setOutcome(new Accepted());
+
+            // Drop the connection after the declare giving a chance for recovery to attempt
+            // to rebuild while the context is still reacting to the failed begin.
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectCoordinatorAttach();
+            originalPeer.expectDeclare(txnId1);
+            originalPeer.expectDischargeButDoNotRespond(txnId1, !commit);
+            originalPeer.expectDeclareButDoNotRespond();
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectCoordinatorAttach();
+            finalPeer.expectDeclare(txnId2);
+            finalPeer.expectSenderAttach();
+            // Send will be blocked waiting to fire so it will not be filtered
+            // by the local transaction context in-doubt checks, however since there
+            // was pending transactional work the transaction will be rolled back.
+            finalPeer.expectTransfer(messageMatcher, transferStateMatcher, transferTxnOutcome, true);
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            finalPeer.expectDisposition(true, txnDispositionStateMatcher);
+            finalPeer.expectDischarge(txnId2, false);
+            finalPeer.expectDeclare(txnId1);
+            finalPeer.expectDischarge(txnId1, true);
+            finalPeer.expectClose();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalPeer.getServerPort() == remoteURI.getPort()) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalPeer.getServerPort() == remoteURI.getPort()) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            if (commit) {
+                try {
+                    session.commit();
+                    fail("Should have failed on the commit when connection dropped");
+                } catch (TransactionRolledBackException txnRbEx) {
+                    // Expected
+                }
+            } else {
+                try {
+                    session.rollback();
+                } catch (JMSException jmsEx) {
+                    fail("Should not have failed on the rollback when connection dropped");
+                }
+            }
+
+            assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS));
+
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+            TextMessage message = session.createTextMessage("myMessage");
+
+            try {
+                producer.send(message);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed the async completion send.");
+            }
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            assertNotNull(consumer.receive(5000));
+
+            try {
+                session.commit();
+            } catch (TransactionRolledBackException txrbex) {
+                fail("Transaction should not have been rolled back");
+            }
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20_000)
+    public void testSendWhileOfflinePreventsRecoveredTransactionFromCommitting() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            final Binary txnId1 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            final Binary txnId2 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+            // Drop the connection after the declare giving a chance for recovery to attempt
+            // to rebuild while the context is still reacting to the failed begin.
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectCoordinatorAttach();
+            originalPeer.expectDeclare(txnId1);
+            originalPeer.expectSenderAttach();
+            originalPeer.expectDischargeButDoNotRespond(txnId1, false);
+            originalPeer.expectDeclareButDoNotRespond();
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectCoordinatorAttach();
+            finalPeer.expectDeclare(txnId2);
+            finalPeer.expectSenderAttach();
+            finalPeer.expectDischarge(txnId2, true);
+            finalPeer.expectDeclare(txnId1);
+            finalPeer.expectDischarge(txnId1, true);
+            finalPeer.expectClose();
+
+            // Need to allow time for the asynchronous send to fire after connection drop in order to ensure
+            // that the send is no-op'd when the TXN is in-doubt
+            final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=1000", originalPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalPeer.getServerPort() == remoteURI.getPort()) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalPeer.getServerPort() == remoteURI.getPort()) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+            TextMessage message = session.createTextMessage("myMessage");
+
+            try {
+                session.commit();
+                fail("Should have failed on the commit when connection dropped");
+            } catch (TransactionRolledBackException txnRbEx) {
+                // Expected
+            }
+
+            // Following the failed commit the Transaction should be in-doubt and the send
+            // should be skipped since the TXN is in-doubt because we should not have connected
+            // to the final peer yet so a recovery wouldn't have happened and the transaction
+            // state couldn't be marked good since currently there should be no active transaction.
+
+            try {
+                producer.send(message);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed the async completion send.");
+            }
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            try {
+                session.commit();
+                fail("Transaction should have been rolled back since a send was skipped.");
+            } catch (TransactionRolledBackException txrbex) {
+                // Expected
+            }
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
     public void testFailoverDoesNotFailPendingAsyncCompletionSend() throws Exception {
         try (TestAmqpPeer originalPeer = new TestAmqpPeer();
              TestAmqpPeer finalPeer = new TestAmqpPeer();) {