Merge pull request #25 from Havret/update_docs
NO-JIRA: Update docs
diff --git a/src/NMS.AMQP/NmsConnection.cs b/src/NMS.AMQP/NmsConnection.cs
index 586fa62..f8bf283 100644
--- a/src/NMS.AMQP/NmsConnection.cs
+++ b/src/NMS.AMQP/NmsConnection.cs
@@ -514,7 +514,7 @@
}
}
- public void OnAsyncException(Exception error)
+ internal void OnAsyncException(Exception error)
{
ExceptionListener?.Invoke(error);
}
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index deb3141..13a4cf0 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -213,55 +213,68 @@
{
lock (SyncRoot)
{
- if (started && Listener != null)
+ try
{
- var envelope = messageQueue.DequeueNoWait();
- if (envelope == null)
- return;
-
- if (IsMessageExpired(envelope))
+ if (started && Listener != null)
{
- if (Tracer.IsDebugEnabled)
- Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
+ var envelope = messageQueue.DequeueNoWait();
+ if (envelope == null)
+ return;
- DoAckExpired(envelope);
- }
- else if (IsRedeliveryExceeded(envelope))
- {
- if (Tracer.IsDebugEnabled)
- Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount}");
+ if (IsMessageExpired(envelope))
+ {
+ if (Tracer.IsDebugEnabled)
+ Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
- // TODO: Apply redelivery policy
- DoAckExpired(envelope);
- }
- else
- {
- bool deliveryFailed = false;
- bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;
+ DoAckExpired(envelope);
+ }
+ else if (IsRedeliveryExceeded(envelope))
+ {
+ if (Tracer.IsDebugEnabled)
+ Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount}");
- if (autoAckOrDupsOk)
- DoAckDelivered(envelope);
+ // TODO: Apply redelivery policy
+ DoAckExpired(envelope);
+ }
else
- AckFromReceive(envelope);
+ {
+ bool deliveryFailed = false;
+ bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge || acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;
- try
- {
- Listener.Invoke(envelope.Message.Copy());
- }
- catch (Exception)
- {
- deliveryFailed = true;
- }
-
- if (autoAckOrDupsOk)
- {
- if (!deliveryFailed)
- DoAckConsumed(envelope);
+ if (autoAckOrDupsOk)
+ DoAckDelivered(envelope);
else
- DoAckReleased(envelope);
+ AckFromReceive(envelope);
+
+ try
+ {
+ Listener.Invoke(envelope.Message.Copy());
+ }
+ catch (Exception)
+ {
+ deliveryFailed = true;
+ }
+
+ if (autoAckOrDupsOk)
+ {
+ if (!deliveryFailed)
+ DoAckConsumed(envelope);
+ else
+ DoAckReleased(envelope);
+ }
}
}
}
+ catch (Exception e)
+ {
+ // TODO - There are two cases when we can get an error here:
+ // 1) error returned from the attempted ACK that was sent
+ // 2) error while attempting to copy the incoming message.
+ //
+ // We need to decide how to respond to these, but definitely we cannot
+ // let this error propagate as it could take down the SessionDispatcher
+ Session.Connection.OnAsyncException(e);
+ }
}
}
}
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 897daa0..e4fb515 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -952,6 +952,71 @@
}
}
+ [Test, Timeout(20_000)]
+ public void TestConsumerCanReceivesMessagesWhenConnectionLostDuringAutoAck()
+ {
+ using (TestAmqpPeer originalPeer = new TestAmqpPeer())
+ using (TestAmqpPeer finalPeer = new TestAmqpPeer())
+ {
+ ManualResetEvent originalConnected = new ManualResetEvent(false);
+ ManualResetEvent finalConnected = new ManualResetEvent(false);
+ ManualResetEvent exceptionThrown = new ManualResetEvent(false);
+
+ // Connect to the first peer
+ originalPeer.ExpectSaslAnonymous();
+ originalPeer.ExpectOpen();
+ originalPeer.ExpectBegin();
+ originalPeer.ExpectBegin();
+
+ NmsConnection connection = EstablishAnonymousConnection(originalPeer, finalPeer);
+ connection.ExceptionListener += exception => { exceptionThrown.Set(); };
+
+ Mock<INmsConnectionListener> connectionListener = new Mock<INmsConnectionListener>();
+
+ connectionListener
+ .Setup(listener => listener.OnConnectionEstablished(It.IsAny<Uri>()))
+ .Callback(() => { originalConnected.Set(); });
+
+ connectionListener
+ .Setup(listener => listener.OnConnectionRestored(It.IsAny<Uri>()))
+ .Callback(() => { finalConnected.Set(); });
+
+ connection.AddConnectionListener(connectionListener.Object);
+
+ connection.Start();
+
+ Assert.True(originalConnected.WaitOne(TimeSpan.FromSeconds(5)), "Should connect to original peer");
+
+ originalPeer.ExpectReceiverAttach();
+ originalPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+ originalPeer.DropAfterLastMatcher();
+
+ // Post Failover Expectations of FinalPeer
+ finalPeer.ExpectSaslAnonymous();
+ finalPeer.ExpectOpen();
+ finalPeer.ExpectBegin();
+ finalPeer.ExpectBegin();
+ finalPeer.ExpectReceiverAttach();
+ finalPeer.ExpectLinkFlowRespondWithTransfer(message: CreateMessageWithContent(), 1);
+ finalPeer.ExpectDispositionThatIsAcceptedAndSettled();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+ int msgReceivedCount = 0;
+ messageConsumer.Listener += message =>
+ {
+ finalConnected.WaitOne(TimeSpan.FromSeconds(5));
+ msgReceivedCount++;
+ };
+
+ finalPeer.WaitForAllMatchersToComplete(5000);
+
+ Assert.AreEqual(2, msgReceivedCount);
+ Assert.IsTrue(exceptionThrown.WaitOne(TimeSpan.FromSeconds(1)));
+ }
+ }
+
private NmsConnection EstablishAnonymousConnection(params TestAmqpPeer[] peers)
{
return EstablishAnonymousConnection(null, null, peers);