AMQNET-637 Allow closing on error handler, Await()
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index d64632d..63f6896 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -327,7 +327,7 @@
             
             if (Session.IsStarted && started && Listener != null)
             {
-                using(await syncRoot.LockAsync())
+                using(await syncRoot.LockAsync().Await())
                 {
                     try
                     {
@@ -400,7 +400,12 @@
                         //
                         // We need to decide how to respond to these, but definitely we cannot
                         // let this error propagate as it could take down the SessionDispatcher
-                        Session.Connection.OnAsyncException(e);
+
+                        // To let close the existing session/connection in error handler
+                        using (Session.ExcludeCheckIsOnDeliveryExecutionFlow())
+                        {
+                            Session.Connection.OnAsyncException(e);
+                        }
                     }
                 }
             }
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 0eb697c..15ef81e 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -841,6 +841,11 @@
             }
         }
 
+        internal IDisposable ExcludeCheckIsOnDeliveryExecutionFlow()
+        {
+            return dispatcher?.ExcludeCheckIsOnDeliveryExecutionFlow();
+        }
+
         public async Task OnConnectionRecovery(IProvider provider)
         {
             await provider.CreateResource(SessionInfo).Await();
diff --git a/src/NMS.AMQP/SessionDispatcher.cs b/src/NMS.AMQP/SessionDispatcher.cs
index 70d26e4..53a5da9 100644
--- a/src/NMS.AMQP/SessionDispatcher.cs
+++ b/src/NMS.AMQP/SessionDispatcher.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
@@ -62,5 +63,28 @@
             cts.Cancel();
             cts.Dispose();
         }
+
+        public IDisposable ExcludeCheckIsOnDeliveryExecutionFlow()
+        {
+            return new ExcludeCheckIsOnDeliveryExecutionFlowBlock(this);
+        }
+
+        private class ExcludeCheckIsOnDeliveryExecutionFlowBlock : IDisposable
+        {
+            private readonly bool previousValue = false;
+            private readonly SessionDispatcher sessionDispatcher;
+
+            public ExcludeCheckIsOnDeliveryExecutionFlowBlock(SessionDispatcher sessionDispatcher)
+            {
+                this.sessionDispatcher = sessionDispatcher;
+                this.previousValue = sessionDispatcher.isOnDispatcherFlow.Value;
+                sessionDispatcher.isOnDispatcherFlow.Value = false;
+            }
+
+            public void Dispose()
+            {
+                sessionDispatcher.isOnDispatcherFlow.Value = previousValue;
+            }
+        }
     }
 }
\ No newline at end of file