blob: 5e9bf6c50f9d1573b8e349ec4c85035537a1d6e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.Test;
using NUnit.Framework;
namespace Apache.NMS.ActiveMQ.Test
{
[TestFixture]
public class QueueBrowserTests : NMSTestSupport
{
[Test]
public void TestReceiveBrowseReceive()
{
using (IConnection connection = CreateConnection())
{
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination destination = session.GetQueue("TEST.ReceiveBrowseReceive");
IMessageProducer producer = session.CreateProducer(destination);
IMessageConsumer consumer = session.CreateConsumer(destination);
connection.Start();
IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message"),
session.CreateTextMessage("Third Message")};
// lets consume any outstanding messages from previous test runs
while (consumer.Receive(TimeSpan.FromMilliseconds(1000)) != null)
{
}
producer.Send(outbound[0]);
producer.Send(outbound[1]);
producer.Send(outbound[2]);
IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
// Get the first.
Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)msg).Text);
consumer.Close();
IQueueBrowser browser = session.CreateBrowser((IQueue)destination);
IEnumerator enumeration = browser.GetEnumerator();
// browse the second
Assert.IsTrue(enumeration.MoveNext(), "should have received the second message");
Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text);
// browse the third.
Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message");
Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text);
// There should be no more.
bool tooMany = false;
while (enumeration.MoveNext())
{
Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text);
tooMany = true;
}
Assert.IsFalse(tooMany);
//Reset should take us back to the start.
enumeration.Reset();
// browse the second
Assert.IsTrue(enumeration.MoveNext(), "should have received the second message");
Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text);
// browse the third.
Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message");
Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text);
// There should be no more.
tooMany = false;
while (enumeration.MoveNext())
{
Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text);
tooMany = true;
}
Assert.IsFalse(tooMany);
browser.Close();
// Re-open the consumer.
consumer = session.CreateConsumer(destination);
// Receive the second.
Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
// Receive the third.
Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
consumer.Close();
}
}
}
[Test]
public void TestBroserIteratively()
{
using (IConnection connection = CreateConnection())
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
connection.Start();
IQueue queue = session.CreateTemporaryQueue();
// enqueue a message
using (IMessageProducer producer = session.CreateProducer(queue))
{
IMessage message = producer.CreateMessage();
producer.Send(message);
}
Thread.Sleep(2000);
// browse queue several times
for (int j = 0; j < 1000; j++)
{
using(QueueBrowser browser = session.CreateBrowser(queue) as QueueBrowser)
{
Tracer.DebugFormat("Running Iterative QueueBrowser sample #{0}", j);
IEnumerator enumeration = browser.GetEnumerator();
Assert.IsTrue(enumeration.MoveNext(), "should have received the second message");
}
}
}
}
[Test]
public void TestBrowseReceive()
{
using (IConnection connection = CreateConnection())
{
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination destination = session.GetQueue("TEST.BrowseReceive");
connection.Start();
using(IMessageConsumer purger = session.CreateConsumer(destination))
{
// lets consume any outstanding messages from previous test runs
while(purger.Receive(TimeSpan.FromMilliseconds(1000)) != null)
{
}
purger.Close();
}
IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"),
session.CreateTextMessage("Second Message"),
session.CreateTextMessage("Third Message")};
IMessageProducer producer = session.CreateProducer(destination);
producer.Send(outbound[0]);
// create browser first
IQueueBrowser browser = session.CreateBrowser((IQueue)destination);
IEnumerator enumeration = browser.GetEnumerator();
// create consumer
IMessageConsumer consumer = session.CreateConsumer(destination);
// browse the first message
Assert.IsTrue(enumeration.MoveNext(), "should have received the first message");
Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)enumeration.Current).Text);
// Receive the first message.
Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
consumer.Close();
browser.Close();
producer.Close();
}
}
}
[Test]
[ExpectedException("Apache.NMS.NMSException")]
public void TestCreateBrowserFailsWithZeroPrefetch()
{
using (Connection connection = CreateConnection() as Connection)
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
connection.PrefetchPolicy.QueueBrowserPrefetch = 0;
IQueue queue = session.CreateTemporaryQueue();
IQueueBrowser browser = session.CreateBrowser(queue);
browser.Close();
}
}
[Test]
public void TestBrowsingExpiration()
{
const int MESSAGES_TO_SEND = 50;
const string QUEUE_NAME = "TEST.TestBrowsingExpiration";
SendTestMessages(MESSAGES_TO_SEND, QUEUE_NAME);
// Browse the queue.
using (Connection connection = CreateConnection() as Connection)
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
connection.Start();
int browsed = Browse(QUEUE_NAME, connection);
// The number of messages browsed should be equal to the number of
// messages sent.
Assert.AreEqual(MESSAGES_TO_SEND, browsed);
// Broker expired message period is 30 seconds by default
for (int i = 0; i < 12; ++i)
{
Thread.Sleep(5000);
browsed = Browse(QUEUE_NAME, connection);
}
session.DeleteDestination(session.GetQueue(QUEUE_NAME));
Assert.AreEqual(0, browsed);
}
}
private int Browse(String queueName, Connection connection)
{
int browsed = 0;
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using (IQueue queue = session.GetQueue(queueName))
using (IQueueBrowser browser = session.CreateBrowser(queue))
{
IEnumerator enumeration = browser.GetEnumerator();
while (enumeration.MoveNext())
{
browsed++;
}
}
return browsed;
}
protected void SendTestMessages(int count, String queueName)
{
// Send the messages to the Queue.
using (Connection connection = CreateConnection() as Connection)
using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
using (IQueue queue = session.GetQueue(queueName))
using (IMessageProducer producer = session.CreateProducer(queue))
{
for (int i = 1; i <= count; i++)
{
String msgStr = "Message: " + i;
producer.Send(session.CreateTextMessage(msgStr),
MsgDeliveryMode.NonPersistent,
MsgPriority.Normal,
TimeSpan.FromMilliseconds(1500));
}
}
}
}
}