Ensure that a QueueBrowser does not filter expired messages, instead deliver the complete Queue snapshot.
Fixes [AMQNET-AMQNET-505]. (See https://issues.apache.org/jira/browse/AMQNET-AMQNET-505)
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index a500b4f..62e64d0 100755
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -1035,7 +1035,7 @@
{
return null;
}
- else if(!IgnoreExpiration && dispatch.Message.IsExpired())
+ else if(ConsumeExpiredMessage(dispatch))
{
Tracer.DebugFormat("Consumer[{0}] received expired message: {1}",
ConsumerId, dispatch.Message.MessageId);
@@ -1073,6 +1073,16 @@
}
}
+ private bool ConsumeExpiredMessage(MessageDispatch dispatch)
+ {
+ if (dispatch.Message.IsExpired())
+ {
+ return !info.Browser && !IgnoreExpiration;
+ }
+
+ return false;
+ }
+
public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
{
dispatch.DeliverySequenceId = session.NextDeliveryId;
diff --git a/src/test/csharp/QueueBrowserTests.cs b/src/test/csharp/QueueBrowserTests.cs
index 3faaa12..2d9919b 100644
--- a/src/test/csharp/QueueBrowserTests.cs
+++ b/src/test/csharp/QueueBrowserTests.cs
@@ -205,6 +205,76 @@
IQueueBrowser browser = session.CreateBrowser(queue);
browser.Close();
}
- }
+ }
+
+ [Test]
+ public void TestBrowsingExpiration()
+ {
+ const int MESSAGES_TO_SEND = 50;
+ const string QUEUE_NAME = "TEST.TestBrowsingExpiration";
+
+ SendTestMessages(MESSAGES_TO_SEND, QUEUE_NAME);
+
+ // Browse the queue.
+ using (Connection connection = CreateConnection() as Connection)
+ using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ connection.Start();
+ int browsed = Browse(QUEUE_NAME, connection);
+
+ // The number of messages browsed should be equal to the number of
+ // messages sent.
+ Assert.AreEqual(MESSAGES_TO_SEND, browsed);
+
+ // Broker expired message period is 30 seconds by default
+ for (int i = 0; i < 12; ++i)
+ {
+ Thread.Sleep(5000);
+ browsed = Browse(QUEUE_NAME, connection);
+ }
+
+ session.DeleteDestination(session.GetQueue(QUEUE_NAME));
+
+ Assert.AreEqual(0, browsed);
+ }
+ }
+
+ private int Browse(String queueName, Connection connection)
+ {
+ int browsed = 0;
+
+ using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using (IQueue queue = session.GetQueue(queueName))
+ using (IQueueBrowser browser = session.CreateBrowser(queue))
+ {
+ IEnumerator enumeration = browser.GetEnumerator();
+ while (enumeration.MoveNext())
+ {
+ ITextMessage message = enumeration.Current as ITextMessage;
+ browsed++;
+ }
+ }
+
+ return browsed;
+ }
+
+ protected void SendTestMessages(int count, String queueName)
+ {
+ // Send the messages to the Queue.
+ using (Connection connection = CreateConnection() as Connection)
+ using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using (IQueue queue = session.GetQueue(queueName))
+ using (IMessageProducer producer = session.CreateProducer(queue))
+ {
+ for (int i = 1; i <= count; i++)
+ {
+ String msgStr = "Message: " + i;
+ producer.Send(session.CreateTextMessage(msgStr),
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(1500));
+ }
+ }
+ }
}
}