Merge pull request #65 from lukeabsent/AMQNET-637
AMQNET-637 Allow closing on error handler
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index d64632d..63f6896 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -327,7 +327,7 @@
if (Session.IsStarted && started && Listener != null)
{
- using(await syncRoot.LockAsync())
+ using(await syncRoot.LockAsync().Await())
{
try
{
@@ -400,7 +400,12 @@
//
// We need to decide how to respond to these, but definitely we cannot
// let this error propagate as it could take down the SessionDispatcher
- Session.Connection.OnAsyncException(e);
+
+ // Suspend the delivery-execution-flow check so the error handler can close the existing session/connection
+ using (Session.ExcludeCheckIsOnDeliveryExecutionFlow())
+ {
+ Session.Connection.OnAsyncException(e);
+ }
}
}
}
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 0eb697c..15ef81e 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -841,6 +841,11 @@
}
}
+ internal IDisposable ExcludeCheckIsOnDeliveryExecutionFlow() // Forwards to the session's dispatcher and hands back the IDisposable scope it creates.
+ {
+ return dispatcher?.ExcludeCheckIsOnDeliveryExecutionFlow(); // null-conditional: yields null when dispatcher is null (a using statement accepts a null resource)
+ }
+
public async Task OnConnectionRecovery(IProvider provider)
{
await provider.CreateResource(SessionInfo).Await();
diff --git a/src/NMS.AMQP/SessionDispatcher.cs b/src/NMS.AMQP/SessionDispatcher.cs
index 70d26e4..53a5da9 100644
--- a/src/NMS.AMQP/SessionDispatcher.cs
+++ b/src/NMS.AMQP/SessionDispatcher.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
@@ -62,5 +63,28 @@
cts.Cancel();
cts.Dispose();
}
+
+ public IDisposable ExcludeCheckIsOnDeliveryExecutionFlow() // Returns a scope object that sets isOnDispatcherFlow.Value to false until it is disposed.
+ {
+ return new ExcludeCheckIsOnDeliveryExecutionFlowBlock(this); // the block's constructor applies the change immediately
+ }
+
+ private class ExcludeCheckIsOnDeliveryExecutionFlowBlock : IDisposable // Scope guard: forces isOnDispatcherFlow.Value to false, restores the saved value on Dispose.
+ {
+ private readonly bool previousValue = false; // overwritten in the constructor with the value captured at scope entry
+ private readonly SessionDispatcher sessionDispatcher; // dispatcher whose isOnDispatcherFlow value is toggled
+
+ public ExcludeCheckIsOnDeliveryExecutionFlowBlock(SessionDispatcher sessionDispatcher) // Captures the current flag value, then clears it.
+ {
+ this.sessionDispatcher = sessionDispatcher;
+ this.previousValue = sessionDispatcher.isOnDispatcherFlow.Value; // remember the current value so Dispose can put it back
+ sessionDispatcher.isOnDispatcherFlow.Value = false; // flag reads false for the lifetime of this scope
+ }
+
+ public void Dispose() // Ends the scope by writing the captured value back.
+ {
+ sessionDispatcher.isOnDispatcherFlow.Value = previousValue; // restores the captured value, so nested scopes unwind in order
+ }
+ }
}
}
\ No newline at end of file