QPIDJMS-600 Ensure session and connection close await async sends

Session and Connection close should be awaiting the outcome of async send
completions before returning. This change allows them to await up to the
close timeout value before moving on and failing any completions that are
not completed after that point. Several tests added to cover this behavior.
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 2f4e53f..ce83ed4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -29,27 +29,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import jakarta.jms.Connection;
-import jakarta.jms.ConnectionConsumer;
-import jakarta.jms.ConnectionMetaData;
-import jakarta.jms.Destination;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidClientIDException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSRuntimeException;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueConnection;
-import jakarta.jms.QueueSession;
-import jakarta.jms.ServerSessionPool;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.Topic;
-import jakarta.jms.TopicConnection;
-import jakarta.jms.TopicSession;
-
 import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
@@ -91,6 +70,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionConsumer;
+import jakarta.jms.ConnectionMetaData;
+import jakarta.jms.Destination;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidClientIDException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSRuntimeException;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueConnection;
+import jakarta.jms.QueueSession;
+import jakarta.jms.ServerSessionPool;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.Topic;
+import jakarta.jms.TopicConnection;
+import jakarta.jms.TopicSession;
+
 /**
  * Implementation of a JMS Connection
  */
@@ -916,6 +916,14 @@
         }
     }
 
+    ProviderFuture newProviderFuture() {
+    	return newProviderFuture(null);
+    }
+
+    ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+        return provider.newProviderFuture(synchronization);
+    }
+
     //----- Property setters and getters -------------------------------------//
 
     @Override
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 19c5fa8..6503682 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -39,6 +39,36 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
+import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderException;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
+import org.apache.qpid.jms.selector.SelectorParser;
+import org.apache.qpid.jms.selector.filter.FilterException;
+import org.apache.qpid.jms.util.NoOpExecutor;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import jakarta.jms.BytesMessage;
 import jakarta.jms.CompletionListener;
 import jakarta.jms.DeliveryMode;
@@ -71,36 +101,6 @@
 import jakarta.jms.TopicSession;
 import jakarta.jms.TopicSubscriber;
 
-import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
-import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
-import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
-import org.apache.qpid.jms.message.JmsMessage;
-import org.apache.qpid.jms.message.JmsMessageTransformation;
-import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
-import org.apache.qpid.jms.meta.JmsConsumerId;
-import org.apache.qpid.jms.meta.JmsConsumerInfo;
-import org.apache.qpid.jms.meta.JmsProducerId;
-import org.apache.qpid.jms.meta.JmsProducerInfo;
-import org.apache.qpid.jms.meta.JmsResource.ResourceState;
-import org.apache.qpid.jms.meta.JmsSessionId;
-import org.apache.qpid.jms.meta.JmsSessionInfo;
-import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
-import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
-import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
-import org.apache.qpid.jms.policy.JmsPresettlePolicy;
-import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
-import org.apache.qpid.jms.provider.Provider;
-import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
-import org.apache.qpid.jms.provider.ProviderException;
-import org.apache.qpid.jms.provider.ProviderFuture;
-import org.apache.qpid.jms.provider.ProviderSynchronization;
-import org.apache.qpid.jms.selector.SelectorParser;
-import org.apache.qpid.jms.selector.filter.FilterException;
-import org.apache.qpid.jms.util.NoOpExecutor;
-import org.apache.qpid.jms.util.QpidJMSThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * JMS Session implementation
  */
@@ -126,6 +126,7 @@
     private final ReentrantLock sendLock = new ReentrantLock();
     private volatile ThreadPoolExecutor deliveryExecutor;
     private volatile ThreadPoolExecutor completionExcecutor;
+    private volatile ProviderFuture asyncSendsCompletion;
     private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
     private boolean deliveryThreadCheckEnabled = true;
     private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
@@ -351,6 +352,7 @@
                     for (JmsMessageProducer producer : new ArrayList<JmsMessageProducer>(this.producers.values())) {
                         producer.shutdown(cause);
                     }
+
                 } catch (JMSException jmsEx) {
                     shutdownError = jmsEx;
                 }
@@ -367,22 +369,6 @@
                     }
                 }
 
-                // Ensure that no asynchronous completion sends remain blocked after close.
-                synchronized (sessionInfo) {
-                    if (cause == null) {
-                        cause = new JMSException("Session closed remotely before message transfer result was notified");
-                    }
-
-                    getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
-                    getCompletionExecutor().shutdown();
-                }
-
-                try {
-                    getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    LOG.trace("Session close awaiting send completions was interrupted");
-                }
-
                 try {
                     if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
                         acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
@@ -391,6 +377,44 @@
                     LOG.trace("Exception during session shutdown cleanup acknowledgement", e);
                 }
 
+                // Ensure that no asynchronous completion sends remain blocked after close but wait
+                // using the close timeout for the asynchronous sends to complete normally.
+                final ExecutorService completionExecutor = getCompletionExecutor();
+
+                synchronized (sessionInfo) {
+                    // Producers are now quiesced and we can await completion of asynchronous sends
+                    // that are still pending a result or timeout once we've done a quick check to
+                    // see if any are actually pending or have completed already.
+                    asyncSendsCompletion = connection.newProviderFuture();
+
+                    completionExecutor.execute(() -> {
+                        if (asyncSendQueue.isEmpty()) {
+                            asyncSendsCompletion.onSuccess();
+                        }
+                    });
+                }
+
+                try {
+                    asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
+                } catch (Exception ex) {
+                    LOG.trace("Exception during wait for asynchronous sends to complete", ex);
+                } finally {
+                    if (cause == null) {
+                        cause = new JMSException("Session closed remotely before message transfer result was notified");
+                    }
+
+                    // as a last task we want to fail any stragglers in the asynchronous send queue and then
+                    // shutdown the queue to prevent any more submissions while the cleanup goes on.
+                    completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
+                    completionExecutor.shutdown();
+                }
+
+                try {
+                    completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    LOG.trace("Session close awaiting send completions was interrupted");
+                }
+
                 if (shutdownError != null) {
                     throw shutdownError;
                 }
@@ -856,11 +880,12 @@
     }
 
     private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
+        JmsMessage outbound = null;
         sendLock.lock();
 
-        JmsMessage outbound = null;
-
         try {
+            checkClosed();
+
             original.setJMSDeliveryMode(deliveryMode);
             original.setJMSPriority(priority);
             original.setJMSRedelivered(false);
@@ -909,7 +934,7 @@
             }
 
             outbound.getFacade().setDeliveryTime(deliveryTime, hasDelay);
-            if(!isJmsMessage) {
+            if (!isJmsMessage) {
                 // If the original was a foreign message, we still need to update it too.
                 setForeignMessageDeliveryTime(original, deliveryTime);
             }
@@ -977,7 +1002,7 @@
             }
         } catch (JMSException jmsEx) {
             // Ensure that on failure case the message is returned to usable state for another send attempt.
-            if(outbound != null) {
+            if (outbound != null) {
                 outbound.onSendComplete();
             }
             throw jmsEx;
@@ -1511,6 +1536,10 @@
             if (producerId == null) {
                 asyncSendQueue.clear();
             }
+
+            if (closed.get() && asyncSendsCompletion != null && asyncSendQueue.isEmpty()) {
+                asyncSendsCompletion.onSuccess();
+            }
         }
     }
 
@@ -1577,6 +1606,10 @@
                         }
                     }
                 }
+
+                if (closed.get() && asyncSendsCompletion != null && asyncSendQueue.isEmpty()) {
+                    asyncSendsCompletion.onSuccess();
+                }
             } catch (Exception ex) {
                 LOG.error("Async completion task encountered unexpected failure", ex);
             }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index 9fabb11..d30955b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -30,19 +30,6 @@
 import java.util.Arrays;
 import java.util.Map;
 
-import jakarta.jms.BytesMessage;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -62,6 +49,19 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.BytesMessage;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
 public class BytesMessageIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
@@ -581,7 +581,7 @@
     @Timeout(20)
     public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index e67c9e9..039ac87 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -30,18 +30,6 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.MapMessage;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -58,6 +46,18 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.MapMessage;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
 public class MapMessageIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
@@ -440,7 +440,7 @@
     @Timeout(20)
     public void testAsyncCompletionSendMarksMapMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 5698cf5..5dd03d4 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -38,21 +38,6 @@
 import java.util.Map;
 import java.util.UUID;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.Destination;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.Topic;
-
 import org.apache.qpid.jms.JmsClientProperties;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
@@ -80,6 +65,21 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.Destination;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.Topic;
+
 public class MessageIntegrationTest extends QpidJmsTestCase
 {
     private static final String NULL_STRING_PROP = "nullStringProperty";
@@ -2231,7 +2231,7 @@
     @Timeout(20)
     public void testAsyncCompletionSendMarksMessageReadOnly() throws Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
index 0585b87..b5036dc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
@@ -35,19 +35,6 @@
 import java.util.Map;
 import java.util.UUID;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.ObjectMessage;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -70,6 +57,19 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.ObjectMessage;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
 public class ObjectMessageIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(ObjectMessageIntegrationTest.class);
@@ -660,6 +660,7 @@
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setCloseTimeout(10);
 
             testPeer.expectBegin();
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index df9087c..6a5e420 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -51,21 +51,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 
-import jakarta.jms.BytesMessage;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.ResourceAllocationException;
-import jakarta.jms.Session;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
@@ -103,6 +88,21 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.BytesMessage;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+
 public class ProducerIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(ProducerIntegrationTest.class);
@@ -2322,7 +2322,7 @@
     @Timeout(20)
     public void testAsyncCompletionGetsNotifiedWhenSessionClosed() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=100");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2343,6 +2343,8 @@
 
             producer.send(message, listener);
 
+            assertFalse(listener.hasCompleted()); // Close should complete it as failed on timeout
+
             session.close();
 
             assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not get async callback");
@@ -2358,9 +2360,49 @@
 
     @Test
     @Timeout(20)
+    public void testAsyncCompletionGetsNotifiedWhenSessionClosedAndWaitForCompletion() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, true, new Accepted(), true, 0, 100);
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertFalse(listener.hasCompleted()); // Close should complete it as accepted after the delay
+
+            session.close();
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not get async callback");
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test
+    @Timeout(20)
     public void testAsyncCompletionGetsNotifiedWhenConnectionClosed() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2380,6 +2422,8 @@
 
             producer.send(message, listener);
 
+            assertFalse(listener.hasCompleted());
+
             connection.close();
 
             assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not get async callback");
@@ -2393,6 +2437,43 @@
 
     @Test
     @Timeout(20)
+    public void testAsyncCompletionAllowedToCompleteNormallyWhenConnectionClosed() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, true, new Accepted(), true, 0, 100);
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertFalse(listener.hasCompleted());
+
+            connection.close();
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not get async callback");
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test
+    @Timeout(20)
     public void testAsyncCompletionResetsBytesMessage() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);
@@ -2856,7 +2937,7 @@
     @Timeout(20)
     public void testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2895,7 +2976,7 @@
 
             assertFalse(listener.awaitCompletion(10, TimeUnit.MILLISECONDS), "Should not get async callback");
 
-            // Closing the session should complete the send with an exception
+            // Closing the session should complete the send with an exception after timeout
             testPeer.expectEnd();
             session.close();
 
@@ -3044,6 +3125,10 @@
             this.completed = new CountDownLatch(expected);
         }
 
+        public boolean hasCompleted() {
+        	return completed.getCount() == 0;
+        }
+
         public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
             return completed.await(timeout, units);
         }
@@ -3239,4 +3324,74 @@
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test
+    @Timeout(20)
+    public void testRemotelyEndConnectionCompletesAsyncSends() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch connectionClosed = new CountDownLatch(1);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionFailure(Throwable exception) {
+                	connectionClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely end the session afterwards.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(queue);
+
+            final int MSG_COUNT = 3;
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            }
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    Message message = session.createTextMessage("content");
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(2000);
+            testPeer.expectSenderAttach();
+            testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB, 50);
+
+            session.createProducer(queue);
+
+            // Verify the session gets marked closed
+            assertTrue(connectionClosed.await(5, TimeUnit.SECONDS), "Session closed callback didn't trigger");
+
+            try {
+                producer.getDeliveryMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                String errorMessage = jmsise.getCause().getMessage();
+                assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount); // All sends should have been failed
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index c8c13df..472bcd3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -41,27 +41,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.Destination;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.InvalidSelectorException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSSecurityException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueBrowser;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-import jakarta.jms.TopicSubscriber;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
@@ -101,6 +80,27 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.Destination;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.InvalidSelectorException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSSecurityException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+import jakarta.jms.TopicSubscriber;
+
 public class SessionIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(SessionIntegrationTest.class);
@@ -2333,6 +2333,10 @@
             completed = new CountDownLatch(expected);
         }
 
+        public boolean hasCompleted() {
+        	return completed.getCount() == 0;
+        }
+
         public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
             return completed.await(timeout, units);
         }
@@ -2836,6 +2840,82 @@
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void testRemotelyEndSessionCompletesAsyncSends() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch sessionClosed = new CountDownLatch(1);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session session, Throwable exception) {
+                	sessionClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely end the session afterwards.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(queue);
+
+            final int MSG_COUNT = 3;
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            }
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    Message message = session.createTextMessage("content");
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            assertFalse(listener.hasCompleted());
+
+            testPeer.expectSenderAttach();
+            testPeer.remotelyEndLastOpenedSession(true, 50, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+            session.createProducer(queue);
+
+            // Verify the session gets marked closed
+            assertTrue(sessionClosed.await(5, TimeUnit.SECONDS), "Session closed callback didn't trigger");
+
+            try {
+                producer.getDeliveryMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                String errorMessage = jmsise.getCause().getMessage();
+                assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount); // All sends should have been failed
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            session.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private boolean verifyConsumerClosure(final String BREAD_CRUMB, final MessageConsumer consumer) throws Exception {
         return Wait.waitFor(new Wait.Condition() {
             @Override
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
index 3fb47df..0589a26 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
@@ -29,18 +29,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.StreamMessage;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -57,6 +45,18 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.StreamMessage;
+
 public class StreamMessageIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
@@ -439,6 +439,7 @@
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setSendTimeout(15);
 
             testPeer.expectBegin();
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
index e4a55db..52f861f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
@@ -28,19 +28,6 @@
 
 import java.io.IOException;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.TextMessage;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -58,6 +45,19 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
 public class TextMessageIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
 
@@ -447,6 +447,7 @@
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setCloseTimeout(15);
 
             testPeer.expectBegin();
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 53b58bb..b7393b7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -45,28 +45,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.ConnectionFactory;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSSecurityException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueBrowser;
-import jakarta.jms.ResourceAllocationException;
-import jakarta.jms.ServerSessionPool;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-import jakarta.jms.TransactionRolledBackException;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionExtensions;
 import org.apache.qpid.jms.JmsConnectionFactory;
@@ -113,6 +91,28 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSSecurityException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.ServerSessionPool;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+import jakarta.jms.TransactionRolledBackException;
+
 public class FailoverIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(FailoverIntegrationTest.class);
@@ -4560,6 +4560,69 @@
 
     @Test
     @Timeout(20)
+    public void testFailoverDoesFailPendingAsyncCompletionSend() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectSenderAttach();
+            originalPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            originalPeer.dropAfterLastHandler(15);  // Wait for sender to get into wait state
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+
+            final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer, finalPeer);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+            try {
+                producer.send(message, listener);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed the async completion send.");
+            }
+
+            // This should fire after reconnect without an error, if it fires with an error at
+            // any time then something is wrong.
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not get async callback");
+            assertNotNull(listener.exception, "Completion should have been due to error");
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            finalPeer.waitForAllHandlersToComplete(5000);
+            finalPeer.expectClose();
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(5000);
+        }
+    }
+
+    @Test
+    @Timeout(20)
     public void testFailoverHandlesAnonymousFallbackWaitingForClose() throws Exception {
         try (TestAmqpPeer originalPeer = new TestAmqpPeer();
              TestAmqpPeer finalPeer = new TestAmqpPeer();) {