// blob: d0c20523adeade7b2d65472b832f84048d20dc0e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using Apache.NMS.Util;
using NUnit.Framework;
using NUnit.Framework.Extensions;
namespace Apache.NMS.Test
{
/// <summary>
/// Consumer tests: changing the selector of a durable topic consumer, and
/// verifying that <c>IMessageConsumer.Receive()</c> with no timeout blocks.
/// </summary>
[TestFixture]
public class ConsumerTest : NMSTestSupport
{
    // Client id passed to CreateConnection() and to UnregisterDurableConsumer().
    protected static string TEST_CLIENT_ID = "ConsumerTestClientId";
    // Name of the topic the durable consumer test publishes to and reads from.
    protected static string TOPIC = "TestTopicConsumerTest";
    // Name under which the durable consumer is created and later unregistered.
    protected static string CONSUMER_ID = "ConsumerTestConsumerId";

    /// <summary>
    /// Creates a durable consumer with selector <c>color='red'</c>, checks that a
    /// "red" message is delivered, then re-creates the durable consumer under the
    /// same name with selector <c>color='blue'</c> and checks that only the "blue"
    /// message of the next two sent messages is delivered.
    /// </summary>
    /// <param name="persistent">
    /// Value assigned to the producer's <c>Persistent</c> property; each received
    /// message's <c>NMSPersistent</c> is expected to equal it.
    /// </param>
#if !NET_1_1
    [RowTest]
    [Row(true)]
    [Row(false)]
#endif
    public void TestDurableConsumerSelectorChange(bool persistent)
    {
        // Deliberately no catch block: exceptions (including NUnit assertion
        // failures) propagate to the test runner with their original type and
        // stack trace instead of being flattened to a bare message.
        try
        {
            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
            {
                connection.Start();
                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    ITopic topic = SessionUtil.GetTopic(session, TOPIC);
                    IMessageProducer producer = session.CreateProducer(topic);
                    IMessageConsumer consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='red'", false);

                    producer.Persistent = persistent;

                    // Send a message that matches the initial selector.
                    ITextMessage sendMessage = session.CreateTextMessage("1st");
                    sendMessage.Properties["color"] = "red";
                    producer.Send(sendMessage);

                    // receiveTimeout is not declared in this class; presumably inherited
                    // from NMSTestSupport.
                    ITextMessage receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
                    Assert.IsNotNull(receiveMsg, "Failed to retrieve 1st durable message.");
                    Assert.AreEqual("1st", receiveMsg.Text);
                    Assert.AreEqual(persistent, receiveMsg.NMSPersistent, "NMSPersistent does not match");

                    // Change the subscription: same consumer name, different selector.
                    consumer.Dispose();
                    consumer = session.CreateDurableConsumer(topic, CONSUMER_ID, "color='blue'", false);

                    sendMessage = session.CreateTextMessage("2nd");
                    sendMessage.Properties["color"] = "red";
                    producer.Send(sendMessage);

                    sendMessage = session.CreateTextMessage("3rd");
                    sendMessage.Properties["color"] = "blue";
                    producer.Send(sendMessage);

                    // Selector should skip the 2nd message.
                    receiveMsg = consumer.Receive(receiveTimeout) as ITextMessage;
                    Assert.IsNotNull(receiveMsg, "Failed to retrieve durable message.");
                    Assert.AreEqual("3rd", receiveMsg.Text, "Retrieved the wrong durable message.");
                    Assert.AreEqual(persistent, receiveMsg.NMSPersistent, "NMSPersistent does not match");

                    // Make sure there are no pending messages.
                    Assert.IsNull(consumer.ReceiveNoWait(), "Wrong number of messages in durable subscription.");
                }
            }
        }
        finally
        {
            // Runs whether the test passed or failed, after the connection has been disposed.
            UnregisterDurableConsumer(TEST_CLIENT_ID, CONSUMER_ID);
        }
    }

    // The .NET CF does not have the ability to interrupt threads, so this test is impossible.
#if !NETCF
    /// <summary>
    /// Verifies that <c>IMessageConsumer.Receive()</c> with no timeout blocks on an
    /// empty temporary queue. Receive() is run on a separate thread; the test fails
    /// if that thread finishes within three seconds, whether by returning or by
    /// throwing.
    /// </summary>
    [Test]
    public void TestNoTimeoutConsumer()
    {
        timeoutConsumerError = null;

        // Launch a thread to perform IMessageConsumer.Receive().
        // If it is still running after three seconds, Receive() is blocking as expected.
        Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
        // Background thread: if Receive() never unblocks, it must not keep the process alive.
        receiveThread.IsBackground = true;

        using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
        {
            connection.Start();
            using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
            {
                ITemporaryQueue queue = session.CreateTemporaryQueue();
                using(this.timeoutConsumer = session.CreateConsumer(queue))
                {
                    receiveThread.Start();
                    if(receiveThread.Join(3000))
                    {
                        // The worker has terminated, so reading the field it wrote is safe here.
                        if(null != timeoutConsumerError)
                        {
                            // Report on the test thread so the runner sees the failure;
                            // ToString() keeps the worker's exception type and stack trace.
                            Assert.Fail("Test failed with exception: " + timeoutConsumerError.ToString());
                        }

                        Assert.Fail("IMessageConsumer.Receive() returned without blocking. Test failed.");
                    }
                    else
                    {
                        // Kill the thread - otherwise it'll sit in Receive() until a message arrives.
                        receiveThread.Interrupt();
                        // Give the worker a bounded amount of time to exit before the
                        // consumer, session and connection are disposed underneath it.
                        receiveThread.Join(3000);
                    }
                }
            }
        }
    }

    // Consumer shared between TestNoTimeoutConsumer() and TimeoutConsumerThreadProc().
    protected IMessageConsumer timeoutConsumer;

    // Exception recorded by TimeoutConsumerThreadProc(); null when none was recorded.
    private Exception timeoutConsumerError;

    /// <summary>
    /// Thread body for <see cref="TestNoTimeoutConsumer"/>: calls Receive() with no
    /// timeout on <see cref="timeoutConsumer"/>. Any exception other than
    /// <see cref="ThreadInterruptedException"/> is recorded for the test thread to
    /// report; nothing is asserted here because an assertion thrown on this thread
    /// would not be observed by the test runner and would go unhandled.
    /// </summary>
    protected void TimeoutConsumerThreadProc()
    {
        try
        {
            timeoutConsumer.Receive();
        }
        catch(ThreadInterruptedException)
        {
            // The test succeeded! We were still blocked when we were interrupted.
        }
        catch(Exception e)
        {
            // Covers ArgumentOutOfRangeException and any other failure inside Receive().
            timeoutConsumerError = e;
        }
    }
#endif
}
}