| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Threading; |
| using System.Collections; |
| using System.Collections.Generic; |
| using System.Diagnostics; |
| using Apache.NMS.ActiveMQ.Commands; |
| using Apache.NMS.Test; |
| using NUnit.Framework; |
| |
| namespace Apache.NMS.ActiveMQ.Test |
| { |
| [TestFixture] |
| public class QueueBrowserTests : NMSTestSupport |
| { |
| [Test] |
| public void TestReceiveBrowseReceive() |
| { |
| using (IConnection connection = CreateConnection()) |
| { |
| using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) |
| { |
| IDestination destination = session.GetQueue("TEST.ReceiveBrowseReceive"); |
| IMessageProducer producer = session.CreateProducer(destination); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| connection.Start(); |
| |
| IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"), |
| session.CreateTextMessage("Second Message"), |
| session.CreateTextMessage("Third Message")}; |
| |
| // lets consume any outstanding messages from previous test runs |
| while (consumer.Receive(TimeSpan.FromMilliseconds(1000)) != null) |
| { |
| } |
| |
| producer.Send(outbound[0]); |
| producer.Send(outbound[1]); |
| producer.Send(outbound[2]); |
| |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| |
| // Get the first. |
| Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)msg).Text); |
| consumer.Close(); |
| |
| IQueueBrowser browser = session.CreateBrowser((IQueue)destination); |
| IEnumerator enumeration = browser.GetEnumerator(); |
| |
| // browse the second |
| Assert.IsTrue(enumeration.MoveNext(), "should have received the second message"); |
| Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text); |
| |
| // browse the third. |
| Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message"); |
| Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text); |
| |
| // There should be no more. |
| bool tooMany = false; |
| while (enumeration.MoveNext()) |
| { |
| Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text); |
| tooMany = true; |
| } |
| Assert.IsFalse(tooMany); |
| |
| //Reset should take us back to the start. |
| enumeration.Reset(); |
| |
| // browse the second |
| Assert.IsTrue(enumeration.MoveNext(), "should have received the second message"); |
| Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text); |
| |
| // browse the third. |
| Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message"); |
| Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text); |
| |
| // There should be no more. |
| tooMany = false; |
| while (enumeration.MoveNext()) |
| { |
| Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text); |
| tooMany = true; |
| } |
| Assert.IsFalse(tooMany); |
| |
| browser.Close(); |
| |
| // Re-open the consumer. |
| consumer = session.CreateConsumer(destination); |
| |
| // Receive the second. |
| Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text); |
| // Receive the third. |
| Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text); |
| consumer.Close(); |
| } |
| } |
| } |
| |
| [Test] |
| public void TestBroserIteratively() |
| { |
| using (IConnection connection = CreateConnection()) |
| using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) |
| { |
| connection.Start(); |
| |
| IQueue queue = session.CreateTemporaryQueue(); |
| // enqueue a message |
| using (IMessageProducer producer = session.CreateProducer(queue)) |
| { |
| IMessage message = producer.CreateMessage(); |
| producer.Send(message); |
| } |
| |
| Thread.Sleep(2000); |
| |
| // browse queue several times |
| for (int j = 0; j < 1000; j++) |
| { |
| using(QueueBrowser browser = session.CreateBrowser(queue) as QueueBrowser) |
| { |
| Tracer.DebugFormat("Running Iterative QueueBrowser sample #{0}", j); |
| IEnumerator enumeration = browser.GetEnumerator(); |
| Assert.IsTrue(enumeration.MoveNext(), "should have received the second message"); |
| } |
| } |
| } |
| } |
| |
| [Test] |
| public void TestBrowseReceive() |
| { |
| using (IConnection connection = CreateConnection()) |
| { |
| using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) |
| { |
| IDestination destination = session.GetQueue("TEST.BrowseReceive"); |
| |
| connection.Start(); |
| |
| using(IMessageConsumer purger = session.CreateConsumer(destination)) |
| { |
| // lets consume any outstanding messages from previous test runs |
| while(purger.Receive(TimeSpan.FromMilliseconds(1000)) != null) |
| { |
| } |
| |
| purger.Close(); |
| } |
| |
| IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"), |
| session.CreateTextMessage("Second Message"), |
| session.CreateTextMessage("Third Message")}; |
| |
| IMessageProducer producer = session.CreateProducer(destination); |
| producer.Send(outbound[0]); |
| |
| // create browser first |
| IQueueBrowser browser = session.CreateBrowser((IQueue)destination); |
| IEnumerator enumeration = browser.GetEnumerator(); |
| |
| // create consumer |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| // browse the first message |
| Assert.IsTrue(enumeration.MoveNext(), "should have received the first message"); |
| Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)enumeration.Current).Text); |
| |
| // Receive the first message. |
| Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text); |
| consumer.Close(); |
| browser.Close(); |
| producer.Close(); |
| } |
| } |
| } |
| |
| [Test] |
| [ExpectedException("Apache.NMS.NMSException")] |
| public void TestCreateBrowserFailsWithZeroPrefetch() |
| { |
| using (Connection connection = CreateConnection() as Connection) |
| using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) |
| { |
| connection.PrefetchPolicy.QueueBrowserPrefetch = 0; |
| IQueue queue = session.CreateTemporaryQueue(); |
| IQueueBrowser browser = session.CreateBrowser(queue); |
| browser.Close(); |
| } |
| } |
| |
| [Test] |
| public void TestBrowsingExpiration() |
| { |
| const int MESSAGES_TO_SEND = 50; |
| const string QUEUE_NAME = "TEST.TestBrowsingExpiration"; |
| |
| // Browse the queue. |
| using (Connection connection = CreateConnection() as Connection) |
| using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) |
| using (IQueue queue = session.GetQueue(QUEUE_NAME)) |
| { |
| session.DeleteDestination(queue); |
| |
| SendTestMessages(MESSAGES_TO_SEND, QUEUE_NAME); |
| |
| connection.Start(); |
| int browsed = Browse(QUEUE_NAME, connection); |
| |
| // The number of messages browsed should be equal to the number of |
| // messages sent. |
| Assert.AreEqual(MESSAGES_TO_SEND, browsed); |
| |
| // Broker expired message period is 30 seconds by default |
| for (int i = 0; i < 12; ++i) |
| { |
| Thread.Sleep(5000); |
| browsed = Browse(QUEUE_NAME, connection); |
| } |
| |
| session.DeleteDestination(session.GetQueue(QUEUE_NAME)); |
| |
| Assert.AreEqual(0, browsed); |
| } |
| } |
| |
| private int Browse(String queueName, Connection connection) |
| { |
| int browsed = 0; |
| |
| using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) |
| using (IQueue queue = session.GetQueue(queueName)) |
| using (IQueueBrowser browser = session.CreateBrowser(queue)) |
| { |
| IEnumerator enumeration = browser.GetEnumerator(); |
| while (enumeration.MoveNext()) |
| { |
| ITextMessage message = enumeration.Current as ITextMessage; |
| Tracer.DebugFormat("Browsed message: {0}", message.NMSMessageId); |
| browsed++; |
| } |
| } |
| |
| return browsed; |
| } |
| |
| protected void SendTestMessages(int count, String queueName) |
| { |
| // Send the messages to the Queue. |
| using (Connection connection = CreateConnection() as Connection) |
| using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) |
| using (IQueue queue = session.GetQueue(queueName)) |
| using (IMessageProducer producer = session.CreateProducer(queue)) |
| { |
| for (int i = 1; i <= count; i++) |
| { |
| String msgStr = "Message: " + i; |
| producer.Send(session.CreateTextMessage(msgStr), |
| MsgDeliveryMode.NonPersistent, |
| MsgPriority.Normal, |
| TimeSpan.FromMilliseconds(1500)); |
| } |
| } |
| } |
| } |
| } |