AMQNET-622: Invoke IConnection event listeners on connection lost and on reconnect
diff --git a/src/NMS.AMQP/NmsConnection.cs b/src/NMS.AMQP/NmsConnection.cs
index 6ce0903..978c2f4 100644
--- a/src/NMS.AMQP/NmsConnection.cs
+++ b/src/NMS.AMQP/NmsConnection.cs
@@ -326,6 +326,8 @@
{
listener.OnConnectionRestored(remoteUri);
}
+
+ ConnectionResumedListener?.Invoke();
}
public void OnResourceClosed(INmsResource resource, Exception error)
@@ -370,6 +372,8 @@
foreach (INmsConnectionListener listener in connectionListeners)
listener.OnConnectionInterrupted(failedUri);
+
+ ConnectionInterruptedListener?.Invoke();
}
private void CheckClosedOrFailed()
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 2bd93ff..d853df0 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -1060,6 +1060,59 @@
}
}
+ [Test, Timeout(20_000), Category("Windows")]
+ public void TestConnectionInterruptedInvokedWhenConnectionToBrokerLost()
+ {
+ using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+ {
+ ManualResetEvent connectionInterruptedInvoked = new ManualResetEvent(false);
+
+ originalPeer.ExpectSaslAnonymous();
+ originalPeer.ExpectOpen();
+ originalPeer.ExpectBegin();
+ originalPeer.ExpectBegin();
+
+ NmsConnection connection = EstablishAnonymousConnection(originalPeer);
+
+ connection.ConnectionInterruptedListener += () => connectionInterruptedInvoked.Set();
+
+ connection.Start();
+
+ originalPeer.Close();
+
+ Assert.IsTrue(connectionInterruptedInvoked.WaitOne(TimeSpan.FromSeconds(10)));
+ }
+ }
+
+ [Test, Timeout(20_000), Category("Windows")]
+ public void TestConnectionResumedInvokedWhenConnectionToBrokerLost()
+ {
+ using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+ using (TestAmqpPeer finalPeer = new TestAmqpPeer())
+ {
+ ManualResetEvent connectionResumedInvoked = new ManualResetEvent(false);
+
+ originalPeer.ExpectSaslAnonymous();
+ originalPeer.ExpectOpen();
+ originalPeer.ExpectBegin();
+ originalPeer.ExpectBegin();
+
+ finalPeer.ExpectSaslAnonymous();
+ finalPeer.ExpectOpen();
+ finalPeer.ExpectBegin();
+ finalPeer.ExpectBegin();
+
+ NmsConnection connection = EstablishAnonymousConnection(originalPeer, finalPeer);
+
+ connection.ConnectionResumedListener += () => connectionResumedInvoked.Set();
+
+ connection.Start();
+
+ originalPeer.Close();
+ Assert.IsTrue(connectionResumedInvoked.WaitOne(TimeSpan.FromSeconds(10)));
+ }
+ }
+
private NmsConnection EstablishAnonymousConnection(params TestAmqpPeer[] peers)
{
return EstablishAnonymousConnection(null, null, peers);