/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
using System; | |
using System.Threading; | |
using NUnit.Framework; | |
namespace Apache.NMS.Test | |
{ | |
//[TestFixture] | |
public class ConsumerTest : NMSTest | |
{ | |
/// <summary>Number of messages sent and then received by TestDoChangeSentMessage.</summary>
protected const int COUNT = 25;

/// <summary>Name of the message property that carries the loop index in TestDoChangeSentMessage.</summary>
protected const string VALUE_NAME = "value";

/// <summary>
/// When true, OnMessage leaves the delivered message unacknowledged.
/// </summary>
private bool dontAck;

/// <summary>
/// Creates the test class and hands the supplied support object to the base class.
/// </summary>
/// <param name="testSupport">Support object forwarded unchanged to the NMSTest constructor.</param>
protected ConsumerTest(NMSTestSupport testSupport) : base(testSupport)
{
}
// The .NET CF does not have the ability to interrupt threads, so this test is impossible. | |
#if !NETCF | |
/// <summary>
/// Checks that the parameterless IMessageConsumer.Receive() blocks while nothing has
/// been sent: a worker thread calls Receive() on a consumer of a freshly created
/// temporary queue and must still be running three seconds later.
/// </summary>
/// <param name="ackMode">Acknowledgement mode used to create the session.</param>
//[Test]
public virtual void TestNoTimeoutConsumer(
    //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
    //    AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
    AcknowledgementMode ackMode)
{
    // The worker runs TimeoutConsumerThreadProc, which calls Receive() on this.timeoutConsumer.
    Thread blockedReceiver = new Thread(new ThreadStart(TimeoutConsumerThreadProc));

    using(IConnection connection = CreateConnection())
    {
        connection.Start();

        using(ISession session = connection.CreateSession(ackMode))
        {
            ITemporaryQueue emptyQueue = session.CreateTemporaryQueue();

            using(this.timeoutConsumer = session.CreateConsumer(emptyQueue))
            {
                blockedReceiver.Start();

                // Join() returns true only if the worker finished within the three seconds.
                bool finishedEarly = blockedReceiver.Join(3000);

                if(!finishedEarly)
                {
                    // Still blocked, as expected. Interrupt the worker so it does not
                    // sit in Receive() waiting for a message that will never arrive.
                    blockedReceiver.Interrupt();
                }
                else
                {
                    Assert.Fail("IMessageConsumer.Receive() returned without blocking.  Test failed.");
                }
            }
        }
    }
}
/// <summary>
/// Consumer that TimeoutConsumerThreadProc calls Receive() on; the tests assign it
/// before starting the worker thread.
/// </summary>
protected IMessageConsumer timeoutConsumer;

/// <summary>
/// Thread body: performs a blocking Receive() on <c>timeoutConsumer</c>.
/// Being interrupted while blocked is the expected outcome and is ignored;
/// any other exception is reported through Assert.Fail.
/// </summary>
protected void TimeoutConsumerThreadProc()
{
    try
    {
        timeoutConsumer.Receive();
    }
    catch(ThreadInterruptedException)
    {
        // Expected path: the worker was still blocked in Receive() when the test interrupted it.
    }
    catch(Exception error)
    {
        // Everything else (including ArgumentOutOfRangeException) is a failure.
        Assert.Fail("Test failed with exception: " + error.Message);
    }
}
/// <summary>
/// Checks that closing a consumer releases a thread blocked in its parameterless
/// Receive(): the worker must still be blocked after three seconds, and must have
/// exited within ten seconds of the consumer being closed.
/// </summary>
/// <param name="ackMode">Acknowledgement mode used to create the session.</param>
//[Test]
public virtual void TestSyncReceiveConsumerClose(
    //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
    //    AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
    AcknowledgementMode ackMode)
{
    // The worker runs TimeoutConsumerThreadProc, which calls Receive() on this.timeoutConsumer.
    Thread blockedReceiver = new Thread(new ThreadStart(TimeoutConsumerThreadProc));

    using(IConnection connection = CreateConnection())
    {
        connection.Start();

        using(ISession session = connection.CreateSession(ackMode))
        {
            ITemporaryQueue emptyQueue = session.CreateTemporaryQueue();

            using(this.timeoutConsumer = session.CreateConsumer(emptyQueue))
            {
                blockedReceiver.Start();

                if(!blockedReceiver.Join(3000))
                {
                    // The worker is blocked as expected; closing the consumer should release it.
                    this.timeoutConsumer.Close();
                    blockedReceiver.Join(10000);

                    if(blockedReceiver.IsAlive)
                    {
                        // Close() did not release the worker. Interrupt it so it does not
                        // sit in Receive() until a message arrives, then fail the test.
                        blockedReceiver.Interrupt();
                        Assert.Fail("IMessageConsumer.Receive() thread is still alive, Close should have killed it.");
                    }
                }
                else
                {
                    Assert.Fail("IMessageConsumer.Receive() returned without blocking.  Test failed.");
                }
            }
        }
    }
}
/// <summary>
/// Bundle of NMS objects passed as the single argument to DelayedProducerThreadProc.
/// All fields start out as null and are filled in by the test before the thread starts.
/// </summary>
internal class ThreadArg
{
    /// <summary>Connection on which the producer thread creates its own session.</summary>
    internal IConnection connection;

    /// <summary>Session belonging to the test that starts the thread.</summary>
    internal ISession session;

    /// <summary>Destination the producer thread sends to.</summary>
    internal IDestination destination;
}
/// <summary>
/// Thread body: waits five seconds, then sends a single "Hello World" text message
/// to the destination named in the supplied <c>ThreadArg</c>.
/// </summary>
/// <remarks>
/// The producer and the message are both created from a session opened by this
/// thread on <c>args.connection</c>. The session stored in the argument belongs to
/// the test thread, which is blocked in Receive() on it at this point, so it is
/// deliberately not touched from here.
/// Any exception is reported through Assert.Fail.
/// </remarks>
/// <param name="arg">A <c>ThreadArg</c> whose connection and destination are set.</param>
protected void DelayedProducerThreadProc(Object arg)
{
    try
    {
        ThreadArg args = arg as ThreadArg;
        if(args == null)
        {
            // Fail with a descriptive message instead of a NullReferenceException further down.
            throw new ArgumentException("DelayedProducerThreadProc requires a non-null ThreadArg argument.", "arg");
        }

        using(ISession session = args.connection.CreateSession())
        {
            using(IMessageProducer producer = session.CreateProducer(args.destination))
            {
                // Give the consumer time to enter the receive.
                Thread.Sleep(5000);

                // Create the message from this thread's own session rather than
                // from the session the consumer thread is currently using.
                producer.Send(session.CreateTextMessage("Hello World"));
            }
        }
    }
    catch(Exception e)
    {
        // Some other exception occurred.
        Assert.Fail("Test failed with exception: " + e.Message);
    }
}
/// <summary>
/// Checks that changing a message object after it has been sent does not affect
/// the copies already sent. One ITextMessage instance is reused for COUNT sends,
/// with its text and VALUE_NAME property rewritten before each send; the received
/// messages must carry the values that were current at each send.
/// </summary>
/// <param name="ackMode">Acknowledgement mode used to create the session; in Transactional
/// mode the session is committed after sending and again after receiving.</param>
/// <param name="doClear">When true, the message body and properties are cleared after every send.</param>
//[Test]
public virtual void TestDoChangeSentMessage(
    //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
    //    AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
    AcknowledgementMode ackMode,
    //[Values(true, false)]
    bool doClear)
{
    using(IConnection connection = CreateConnection())
    {
        connection.Start();
        using(ISession session = connection.CreateSession(ackMode))
        {
            ITemporaryQueue queue = session.CreateTemporaryQueue();
            using(IMessageConsumer consumer = session.CreateConsumer(queue))
            // Dispose the producer deterministically, like the consumer.
            using(IMessageProducer producer = session.CreateProducer(queue))
            {
                ITextMessage message = session.CreateTextMessage();
                string prefix = "ConsumerTest - TestDoChangeSentMessage: ";

                for(int i = 0; i < COUNT; i++)
                {
                    message.Properties[VALUE_NAME] = i;
                    message.Text = prefix + Convert.ToString(i);

                    producer.Send(message);

                    if(doClear)
                    {
                        message.ClearBody();
                        message.ClearProperties();
                    }
                }

                if(ackMode == AcknowledgementMode.Transactional)
                {
                    session.Commit();
                }

                for(int i = 0; i < COUNT; i++)
                {
                    ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage;

                    // A timeout or a non-text message shows up as an assertion failure
                    // rather than as a NullReferenceException on the next line.
                    Assert.IsNotNull(msg, "Message " + i + " was not received as an ITextMessage.");

                    // NUnit expects (expected, actual); this order keeps failure messages correct.
                    Assert.AreEqual(prefix + Convert.ToString(i), msg.Text);
                    Assert.AreEqual(i, msg.Properties.GetInt(VALUE_NAME));
                }

                if(ackMode == AcknowledgementMode.Transactional)
                {
                    session.Commit();
                }
            }
        }
    }
}
/// <summary>
/// Checks that a consumer already blocked in Receive() gets a message that is sent
/// afterwards. A helper thread (DelayedProducerThreadProc) sends the message after a
/// delay while this thread waits in Receive() for up to one minute.
/// </summary>
/// <param name="ackMode">Acknowledgement mode used to create the consuming session.</param>
//[Test]
public virtual void TestConsumerReceiveBeforeMessageDispatched(
    //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
    //    AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
    AcknowledgementMode ackMode)
{
    // Launch a thread to perform a delayed send.
    Thread sendThread = new Thread(DelayedProducerThreadProc);

    using(IConnection connection = CreateConnection())
    {
        connection.Start();
        using(ISession session = connection.CreateSession(ackMode))
        {
            ITemporaryQueue queue = session.CreateTemporaryQueue();
            using(IMessageConsumer consumer = session.CreateConsumer(queue))
            {
                ThreadArg arg = new ThreadArg();

                arg.connection = connection;
                arg.session = session;
                arg.destination = queue;

                sendThread.Start(arg);
                try
                {
                    IMessage message = consumer.Receive(TimeSpan.FromMinutes(1));
                    Assert.IsNotNull(message);
                }
                finally
                {
                    // The sender still has to close its own producer and session on this
                    // connection. Wait for it, with a bound so a stuck sender cannot hang
                    // the test, before the using blocks dispose the connection.
                    sendThread.Join(TimeSpan.FromSeconds(30));
                }
            }
        }
    }
}
/// <summary>
/// Checks that a consumer on a connection that was never started receives nothing:
/// after sending through SendMessages, a one-second Receive() must return null.
/// </summary>
/// <param name="deliveryMode">Delivery mode passed to SendMessages.</param>
/// <param name="destinationType">Kind of destination to obtain from GetClearDestination.</param>
/// <param name="testDestinationRef">Destination reference passed to GetClearDestination.</param>
//[Test]
public virtual void TestDontStart(
    //[Values(MsgDeliveryMode.NonPersistent)]
    MsgDeliveryMode deliveryMode,
    //[Values(DestinationType.Queue, DestinationType.Topic)]
    DestinationType destinationType, string testDestinationRef)
{
    // Note: connection.Start() is intentionally never called in this test.
    using(IConnection stoppedConnection = CreateConnection())
    {
        ISession session = stoppedConnection.CreateSession();
        IDestination target = GetClearDestination(session, destinationType, testDestinationRef);
        IMessageConsumer idleConsumer = session.CreateConsumer(target);

        // Send the messages
        SendMessages(session, target, deliveryMode, 1);

        // Make sure no messages were delivered.
        IMessage delivered = idleConsumer.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNull(delivered);
    }
}
/// <summary>
/// Walks one message through a transacted session: it must not be delivered before
/// the send is committed, must arrive exactly once afterwards, must be redelivered
/// (flagged NMSRedelivered) after a rollback, and must not reappear once the
/// receive has been committed.
/// </summary>
/// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
/// <param name="destinationType">Kind of destination to obtain from GetClearDestination.</param>
/// <param name="testDestinationRef">Destination reference passed to GetClearDestination.</param>
//[Test]
public void TestSendReceiveTransacted(
    //[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
    MsgDeliveryMode deliveryMode,
    //[Values(DestinationType.Queue, DestinationType.Topic, DestinationType.TemporaryQueue, DestinationType.TemporaryTopic)]
    DestinationType destinationType, string testDestinationRef)
{
    using(IConnection connection = CreateConnection())
    {
        connection.Start();

        ISession txSession = connection.CreateSession(AcknowledgementMode.Transactional);
        IDestination target = GetClearDestination(txSession, destinationType, testDestinationRef);
        IMessageConsumer receiver = txSession.CreateConsumer(target);
        IMessageProducer sender = txSession.CreateProducer(target);
        sender.DeliveryMode = deliveryMode;

        // Step 1: send inside the transaction; nothing may be delivered before the commit.
        ITextMessage outgoing = txSession.CreateTextMessage("Test");
        sender.Send(outgoing);
        Thread.Sleep(1000);
        Assert.IsNull(receiver.ReceiveNoWait());

        // Step 2: commit the send; exactly one first-time delivery is expected.
        txSession.Commit();
        IMessage delivered = receiver.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNotNull(delivered);
        Assert.IsFalse(delivered.NMSRedelivered);
        Assert.IsNull(receiver.ReceiveNoWait());

        // Step 3: roll back the receive; the message should be redelivered if rollback is used.
        txSession.Rollback();
        delivered = receiver.Receive(TimeSpan.FromMilliseconds(2000));
        Assert.IsNotNull(delivered);
        Assert.IsTrue(delivered.NMSRedelivered);
        Assert.IsNull(receiver.ReceiveNoWait());

        // Step 4: commit the receive; the message should not be redelivered again.
        txSession.Commit();
        Thread.Sleep(1000);
        Assert.IsNull(receiver.ReceiveNoWait());
    }
}
/// <summary>
/// Checks that a message acknowledged in a ClientAcknowledge session is gone for good:
/// after the session is closed, a consumer on a new session must receive nothing.
/// </summary>
/// <param name="testQueueRef">Queue reference passed to GetClearDestination.</param>
//[Test]
public virtual void TestAckedMessageAreConsumed(string testQueueRef)
{
    using(IConnection connection = CreateConnection())
    {
        connection.Start();

        // First session: send one message, receive it and acknowledge it.
        ISession firstSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        IDestination queue = GetClearDestination(firstSession, DestinationType.Queue, testQueueRef);
        IMessageProducer sender = firstSession.CreateProducer(queue);
        sender.Send(firstSession.CreateTextMessage("Hello"));

        IMessageConsumer firstConsumer = firstSession.CreateConsumer(queue);
        IMessage received = firstConsumer.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNotNull(received);
        received.Acknowledge();

        firstSession.Close();

        // Second session: the acknowledged message must not be delivered again.
        ISession secondSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        IMessageConsumer secondConsumer = secondSession.CreateConsumer(queue);
        IMessage leftover = secondConsumer.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNull(leftover);

        secondSession.Close();
    }
}
/// <summary>
/// Sends three messages, receives all three in a ClientAcknowledge session and
/// acknowledges only the last one. After the session is closed, a consumer on a
/// new session must receive nothing.
/// </summary>
/// <param name="testQueueRef">Queue reference passed to GetClearDestination.</param>
//[Test]
public virtual void TestLastMessageAcked(string testQueueRef)
{
    using(IConnection connection = CreateConnection())
    {
        connection.Start();

        ISession firstSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        IDestination queue = GetClearDestination(firstSession, DestinationType.Queue, testQueueRef);
        IMessageProducer sender = firstSession.CreateProducer(queue);

        string[] bodies = new string[] { "Hello", "Hello2", "Hello3" };
        foreach(string body in bodies)
        {
            sender.Send(firstSession.CreateTextMessage(body));
        }

        // Receive every message that was sent, remembering only the most recent one.
        IMessageConsumer firstConsumer = firstSession.CreateConsumer(queue);
        IMessage lastReceived = null;
        for(int received = 0; received < bodies.Length; received++)
        {
            lastReceived = firstConsumer.Receive(TimeSpan.FromMilliseconds(1000));
            Assert.IsNotNull(lastReceived);
        }

        // Acknowledge only the last message received.
        lastReceived.Acknowledge();

        firstSession.Close();

        // A new session must find nothing left to consume.
        ISession secondSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        IMessageConsumer secondConsumer = secondSession.CreateConsumer(queue);
        IMessage leftover = secondConsumer.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNull(leftover);

        secondSession.Close();
    }
}
/// <summary>
/// Checks that a message received but not acknowledged in a ClientAcknowledge
/// session is delivered again: after the session is closed, a consumer on a new
/// session must receive a message, which is then acknowledged.
/// </summary>
/// <param name="testQueueRef">Queue reference passed to GetClearDestination.</param>
//[Test]
public virtual void TestUnAckedMessageAreNotConsumedOnSessionClose(string testQueueRef)
{
    using(IConnection connection = CreateConnection())
    {
        connection.Start();

        // First session: send one message and receive it without acknowledging.
        ISession firstSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        IDestination queue = GetClearDestination(firstSession, DestinationType.Queue, testQueueRef);
        IMessageProducer sender = firstSession.CreateProducer(queue);
        sender.Send(firstSession.CreateTextMessage("Hello"));

        IMessageConsumer firstConsumer = firstSession.CreateConsumer(queue);
        IMessage unacked = firstConsumer.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNotNull(unacked);

        // Closing the session should cause the unacknowledged message to be re-delivered.
        firstSession.Close();

        // Second session: the message must arrive again; acknowledge it this time.
        ISession secondSession = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        IMessageConsumer secondConsumer = secondSession.CreateConsumer(queue);
        IMessage redelivered = secondConsumer.Receive(TimeSpan.FromMilliseconds(2000));
        Assert.IsNotNull(redelivered);
        redelivered.Acknowledge();

        secondSession.Close();
    }
}
/// <summary>
/// Checks that a message acknowledged by an asynchronous listener (OnMessage) in a
/// ClientAcknowledge session is consumed: after the session is closed, a consumer
/// on a new session must receive nothing.
/// </summary>
/// <param name="testQueueRef">Queue reference passed to GetClearDestination.</param>
//[Test]
public virtual void TestAsyncAckedMessageAreConsumed(string testQueueRef)
{
    // OnMessage acknowledges only while dontAck is false. Another test on this
    // instance may have left it set to true, so set it explicitly rather than
    // rely on test order.
    dontAck = false;

    using(IConnection connection = CreateConnection())
    {
        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef);
        IMessageProducer producer = session.CreateProducer(destination);
        producer.Send(session.CreateTextMessage("Hello"));

        // Consume the message asynchronously; OnMessage acknowledges it.
        IMessageConsumer consumer = session.CreateConsumer(destination);
        consumer.Listener += new MessageListener(OnMessage);

        // Give the listener time to be called.
        Thread.Sleep(5000);

        // Reset the session.
        session.Close();

        session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);

        // Attempt to consume the message again; nothing should be left.
        consumer = session.CreateConsumer(destination);
        IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNull(msg);

        session.Close();
    }
}
/// <summary>
/// Checks that a message left unacknowledged by an asynchronous listener in a
/// ClientAcknowledge session is still available after the session is closed: a
/// consumer on a new session must receive it, and it is then acknowledged.
/// </summary>
/// <remarks>
/// The test sets <c>dontAck</c> so that OnMessage does not acknowledge, and restores
/// it in a finally block so the flag does not leak into other tests that run on
/// the same instance.
/// </remarks>
/// <param name="testQueueRef">Queue reference passed to GetClearDestination.</param>
//[Test]
public virtual void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose(string testQueueRef)
{
    using(IConnection connection = CreateConnection())
    {
        connection.Start();

        // Don't acknowledge the message in the OnMessage() call.
        dontAck = true;
        try
        {
            ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
            IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef);
            IMessageProducer producer = session.CreateProducer(destination);
            producer.Send(session.CreateTextMessage("Hello"));

            // Consume the message...
            using(IMessageConsumer consumer = session.CreateConsumer(destination))
            {
                consumer.Listener += new MessageListener(OnMessage);
                // Don't ack the message.
            }

            // Reset the session. This should cause the unacked message to be
            // redelivered.
            session.Close();

            Thread.Sleep(5000);
            session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);

            // Attempt to consume the message...
            using(IMessageConsumer consumer = session.CreateConsumer(destination))
            {
                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
                Assert.IsNotNull(msg);
                msg.Acknowledge();
            }

            session.Close();
        }
        finally
        {
            // Restore the default so later tests using OnMessage acknowledge again.
            dontAck = false;
        }
    }
}
/// <summary>
/// Exercises the consumer's Listener event by adding, removing and re-adding the
/// OnMessage handler on a temporary-topic consumer, then closing the consumer.
/// The test makes no assertions; it passes if none of those calls throws.
/// </summary>
//[Test]
public virtual void TestAddRemoveAsnycMessageListener()
{
    using(IConnection connection = CreateConnection())
    {
        connection.Start();

        ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
        ITemporaryTopic tempTopic = session.CreateTemporaryTopic();
        IMessageConsumer topicConsumer = session.CreateConsumer(tempTopic);

        MessageListener handler = new MessageListener(OnMessage);

        // Subscribe, unsubscribe, then subscribe again.
        topicConsumer.Listener += handler;
        topicConsumer.Listener -= handler;
        topicConsumer.Listener += handler;

        topicConsumer.Close();
    }
}
/// <summary>
/// Message listener shared by the asynchronous tests. Asserts that the delivered
/// message is not null and, unless <c>dontAck</c> is set, acknowledges it.
/// </summary>
/// <remarks>
/// A failure in Acknowledge() is still not rethrown, as before, but it is now
/// written to standard error so the reason a test later finds the message
/// unconsumed can be diagnosed.
/// </remarks>
/// <param name="message">The message delivered to the listener.</param>
public void OnMessage(IMessage message)
{
    Assert.IsNotNull(message);

    if(dontAck)
    {
        return;
    }

    try
    {
        message.Acknowledge();
    }
    catch(Exception e)
    {
        // Best effort: report the failure instead of swallowing it silently.
        Console.Error.WriteLine("ConsumerTest.OnMessage: Acknowledge() failed: " + e);
    }
}
/// <summary>
/// Checks that ReceiveNoWait() eventually returns a message that was sent (and, in
/// Transactional mode, committed) before the consumer was created. The call is
/// retried up to 20 times with a 100 ms pause after each empty result.
/// </summary>
/// <param name="ackMode">Acknowledgement mode used to create the session; in Transactional
/// mode the session is committed after the send and again after the receive.</param>
/// <param name="deliveryMode">Delivery mode assigned to the producer.</param>
//[Test]
public virtual void TestReceiveNoWait(
    //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
    //    AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
    AcknowledgementMode ackMode,
    //[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
    MsgDeliveryMode deliveryMode)
{
    const int RETRIES = 20;

    using(IConnection connection = CreateConnection())
    {
        connection.Start();
        using(ISession session = connection.CreateSession(ackMode))
        {
            IDestination destination = session.CreateTemporaryQueue();

            using(IMessageProducer producer = session.CreateProducer(destination))
            {
                producer.DeliveryMode = deliveryMode;

                ITextMessage message = session.CreateTextMessage("TEST");
                producer.Send(message);

                if(AcknowledgementMode.Transactional == ackMode)
                {
                    session.Commit();
                }
            }

            using(IMessageConsumer consumer = session.CreateConsumer(destination))
            {
                IMessage message = null;

                for(int i = 0; i < RETRIES; ++i)
                {
                    message = consumer.ReceiveNoWait();
                    if(message != null)
                    {
                        // Got it; no need to pause before leaving the loop.
                        break;
                    }

                    // Nothing available yet; pause briefly before polling again.
                    Thread.Sleep(100);
                }

                Assert.IsNotNull(message);
                message.Acknowledge();

                if(AcknowledgementMode.Transactional == ackMode)
                {
                    session.Commit();
                }
            }
        }
    }
}
#endif | |
} | |
} |