AMQNET-609: Error during message delivery may block consumer
diff --git a/src/NMS.AMQP/NmsConnection.cs b/src/NMS.AMQP/NmsConnection.cs
index 586fa62..f8bf283 100644
--- a/src/NMS.AMQP/NmsConnection.cs
+++ b/src/NMS.AMQP/NmsConnection.cs
@@ -514,7 +514,7 @@
             }
         }
 
-        public void OnAsyncException(Exception error)
+        internal void OnAsyncException(Exception error)
         {
             ExceptionListener?.Invoke(error);
         }
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index deb3141..13a4cf0 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -213,55 +213,68 @@
             {
                 lock (SyncRoot)
                 {
-                    if (started && Listener != null)
+                    try
                     {
-                        var envelope = messageQueue.DequeueNoWait();
-                        if (envelope == null)
-                            return;
-
-                        if (IsMessageExpired(envelope))
+                        if (started && Listener != null)
                         {
-                            if (Tracer.IsDebugEnabled)
-                                Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
+                            var envelope = messageQueue.DequeueNoWait();
+                            if (envelope == null)
+                                return;
 
-                            DoAckExpired(envelope);
-                        }
-                        else if (IsRedeliveryExceeded(envelope))
-                        {
-                            if (Tracer.IsDebugEnabled)
-                                Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount}");
+                            if (IsMessageExpired(envelope))
+                            {
+                                if (Tracer.IsDebugEnabled)
+                                    Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
 
-                            // TODO: Apply redelivery policy
-                            DoAckExpired(envelope);
-                        }
-                        else
-                        {
-                            bool deliveryFailed = false;
-                            bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;
+                                DoAckExpired(envelope);
+                            }
+                            else if (IsRedeliveryExceeded(envelope))
+                            {
+                                if (Tracer.IsDebugEnabled)
+                                    Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount}");
 
-                            if (autoAckOrDupsOk)
-                                DoAckDelivered(envelope);
+                                // TODO: Apply redelivery policy
+                                DoAckExpired(envelope);
+                            }
                             else
-                                AckFromReceive(envelope);
+                            {
+                                bool deliveryFailed = false;
+                                bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;
 
-                            try
-                            {
-                                Listener.Invoke(envelope.Message.Copy());
-                            }
-                            catch (Exception)
-                            {
-                                deliveryFailed = true;
-                            }
-
-                            if (autoAckOrDupsOk)
-                            {
-                                if (!deliveryFailed)
-                                    DoAckConsumed(envelope);
+                                if (autoAckOrDupsOk)
+                                    DoAckDelivered(envelope);
                                 else
-                                    DoAckReleased(envelope);
+                                    AckFromReceive(envelope);
+
+                                try
+                                {
+                                    Listener.Invoke(envelope.Message.Copy());
+                                }
+                                catch (Exception)
+                                {
+                                    deliveryFailed = true;
+                                }
+
+                                if (autoAckOrDupsOk)
+                                {
+                                    if (!deliveryFailed)
+                                        DoAckConsumed(envelope);
+                                    else
+                                        DoAckReleased(envelope);
+                                }
                             }
                         }
                     }
+                    catch (Exception e)
+                    {
+                        // TODO - There are two cases when we can get an error here:
+                        // 1) error returned from the attempted ACK that was sent
+                        // 2) error while attempting to copy the incoming message.
+                        //
+                        // We need to decide how to respond to these, but definitely we cannot
+                        // let this error propagate as it could take down the SessionDispatcher
+                        Session.Connection.OnAsyncException(e);
+                    }
                 }
             }
         }
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 897daa0..e4fb515 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -952,6 +952,71 @@
             }
         }
 
+        [Test, Timeout(20_000)]
+        public void TestConsumerCanReceivesMessagesWhenConnectionLostDuringAutoAck()
+        {
+            using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+            using (TestAmqpPeer finalPeer = new TestAmqpPeer())
+            {
+                ManualResetEvent originalConnected = new ManualResetEvent(false);
+                ManualResetEvent finalConnected = new ManualResetEvent(false);
+                ManualResetEvent exceptionThrown = new ManualResetEvent(false);
+
+                // Connect to the first peer
+                originalPeer.ExpectSaslAnonymous();
+                originalPeer.ExpectOpen();
+                originalPeer.ExpectBegin();
+                originalPeer.ExpectBegin();
+
+                NmsConnection connection = EstablishAnonymousConnection(originalPeer, finalPeer);
+                connection.ExceptionListener += exception => { exceptionThrown.Set(); };
+
+                Mock<INmsConnectionListener> connectionListener = new Mock<INmsConnectionListener>();
+
+                connectionListener
+                    .Setup(listener => listener.OnConnectionEstablished(It.IsAny<Uri>()))
+                    .Callback(() => { originalConnected.Set(); });
+
+                connectionListener
+                    .Setup(listener => listener.OnConnectionRestored(It.IsAny<Uri>()))
+                    .Callback(() => { finalConnected.Set(); });
+
+                connection.AddConnectionListener(connectionListener.Object);
+
+                connection.Start();
+
+                Assert.True(originalConnected.WaitOne(TimeSpan.FromSeconds(5)), "Should connect to original peer");
+
+                originalPeer.ExpectReceiverAttach();
+                originalPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+                originalPeer.DropAfterLastMatcher();
+
+                // Post Failover Expectations of FinalPeer
+                finalPeer.ExpectSaslAnonymous();
+                finalPeer.ExpectOpen();
+                finalPeer.ExpectBegin();
+                finalPeer.ExpectBegin();
+                finalPeer.ExpectReceiverAttach();
+                finalPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+                finalPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+                int msgReceivedCount = 0;
+                messageConsumer.Listener += message =>
+                {
+                    finalConnected.WaitOne(TimeSpan.FromSeconds(5));
+                    msgReceivedCount++;
+                };
+
+                finalPeer.WaitForAllMatchersToComplete(5000);
+
+                Assert.AreEqual(2, msgReceivedCount);
+                Assert.IsTrue(exceptionThrown.WaitOne(TimeSpan.FromSeconds(1)));
+            }
+        }
+
         private NmsConnection EstablishAnonymousConnection(params TestAmqpPeer[] peers)
         {
             return EstablishAnonymousConnection(null, null, peers);