Merge pull request #24 from Havret/connection_close_should_not_throw_exception
AMQNET-603: AmqpProvider shouldn't signal an exception when the connection is explicitly closed
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index 38f16d4..03ca15e 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -71,7 +71,7 @@
Address address = UriUtil.ToAddress(remoteUri, Info.username, Info.password);
this.tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
underlyingConnection = await transport.CreateAsync(address, new AmqpHandler(this)).ConfigureAwait(false);
- underlyingConnection.AddClosedCallback(Provider.OnInternalClosed);
+ underlyingConnection.AddClosedCallback((sender, error) => Provider.OnConnectionClosed(error));
// Wait for connection to be opened
await tsc.Task;
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
index 64397e7..24022a8 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
@@ -56,9 +56,13 @@
return connection.Start();
}
- internal void OnInternalClosed(IAmqpObject sender, Error error)
+ internal void OnConnectionClosed(Error error)
{
- Listener?.OnConnectionFailure(ExceptionSupport.GetException(error));
+ bool connectionExplicitlyClosed = error == null;
+ if (!connectionExplicitlyClosed)
+ {
+ Listener?.OnConnectionFailure(ExceptionSupport.GetException(error));
+ }
}
internal void FireConnectionEstablished()
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConnectionIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ConnectionIntegrationTest.cs
index 0dac25a..e9849a5 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ConnectionIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ConnectionIntegrationTest.cs
@@ -40,6 +40,22 @@
}
[Test, Timeout(20_000)]
+ public void TestExplicitConnectionCloseListenerIsNotInvoked()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ ManualResetEvent exceptionFired = new ManualResetEvent(false);
+ IConnection connection = EstablishConnection(testPeer);
+ connection.ExceptionListener += exception => { exceptionFired.Set(); };
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ Assert.IsFalse(exceptionFired.WaitOne(TimeSpan.FromMilliseconds(100)));
+ }
+ }
+
+ [Test, Timeout(20_000)]
public void TestCreateAutoAckSession()
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())