| /* | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| using System; | |
| using System.Threading; | |
| using NUnit.Framework; | |
| namespace Apache.NMS.Test | |
| { | |
| //[TestFixture] | |
| public class ConsumerTest : NMSTest | |
| { | |
| protected const int COUNT = 25; | |
| protected const string VALUE_NAME = "value"; | |
| private bool dontAck; | |
| protected ConsumerTest(NMSTestSupport testSupport) | |
| : base(testSupport) | |
| { | |
| } | |
| // The .NET CF does not have the ability to interrupt threads, so this test is impossible. | |
| #if !NETCF | |
| //[Test] | |
| public virtual void TestNoTimeoutConsumer( | |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, | |
| // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] | |
| AcknowledgementMode ackMode) | |
| { | |
| // Launch a thread to perform IMessageConsumer.Receive(). | |
| // If it doesn't fail in less than three seconds, no exception was thrown. | |
| Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc)); | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(ackMode)) | |
| { | |
| ITemporaryQueue queue = session.CreateTemporaryQueue(); | |
| using(this.timeoutConsumer = session.CreateConsumer(queue)) | |
| { | |
| receiveThread.Start(); | |
| if(receiveThread.Join(3000)) | |
| { | |
| Assert.Fail("IMessageConsumer.Receive() returned without blocking. Test failed."); | |
| } | |
| else | |
| { | |
| // Kill the thread - otherwise it'll sit in Receive() until a message arrives. | |
| receiveThread.Interrupt(); | |
| } | |
| } | |
| } | |
| } | |
| } | |
| protected IMessageConsumer timeoutConsumer; | |
| protected void TimeoutConsumerThreadProc() | |
| { | |
| try | |
| { | |
| timeoutConsumer.Receive(); | |
| } | |
| catch(ArgumentOutOfRangeException e) | |
| { | |
| // The test failed. We will know because the timeout will expire inside TestNoTimeoutConsumer(). | |
| Assert.Fail("Test failed with exception: " + e.Message); | |
| } | |
| catch(ThreadInterruptedException) | |
| { | |
| // The test succeeded! We were still blocked when we were interrupted. | |
| } | |
| catch(Exception e) | |
| { | |
| // Some other exception occurred. | |
| Assert.Fail("Test failed with exception: " + e.Message); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestSyncReceiveConsumerClose( | |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, | |
| // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] | |
| AcknowledgementMode ackMode) | |
| { | |
| // Launch a thread to perform IMessageConsumer.Receive(). | |
| // If it doesn't fail in less than three seconds, no exception was thrown. | |
| Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc)); | |
| using (IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| using (ISession session = connection.CreateSession(ackMode)) | |
| { | |
| ITemporaryQueue queue = session.CreateTemporaryQueue(); | |
| using (this.timeoutConsumer = session.CreateConsumer(queue)) | |
| { | |
| receiveThread.Start(); | |
| if (receiveThread.Join(3000)) | |
| { | |
| Assert.Fail("IMessageConsumer.Receive() returned without blocking. Test failed."); | |
| } | |
| else | |
| { | |
| // Kill the thread - otherwise it'll sit in Receive() until a message arrives. | |
| this.timeoutConsumer.Close(); | |
| receiveThread.Join(10000); | |
| if (receiveThread.IsAlive) | |
| { | |
| // Kill the thread - otherwise it'll sit in Receive() until a message arrives. | |
| receiveThread.Interrupt(); | |
| Assert.Fail("IMessageConsumer.Receive() thread is still alive, Close should have killed it."); | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |
| internal class ThreadArg | |
| { | |
| internal IConnection connection = null; | |
| internal ISession session = null; | |
| internal IDestination destination = null; | |
| } | |
| protected void DelayedProducerThreadProc(Object arg) | |
| { | |
| try | |
| { | |
| ThreadArg args = arg as ThreadArg; | |
| using(ISession session = args.connection.CreateSession()) | |
| { | |
| using(IMessageProducer producer = session.CreateProducer(args.destination)) | |
| { | |
| // Give the consumer time to enter the receive. | |
| Thread.Sleep(5000); | |
| producer.Send(args.session.CreateTextMessage("Hello World")); | |
| } | |
| } | |
| } | |
| catch(Exception e) | |
| { | |
| // Some other exception occurred. | |
| Assert.Fail("Test failed with exception: " + e.Message); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestDoChangeSentMessage( | |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, | |
| // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] | |
| AcknowledgementMode ackMode, | |
| //[Values(true, false)] | |
| bool doClear) | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(ackMode)) | |
| { | |
| ITemporaryQueue queue = session.CreateTemporaryQueue(); | |
| using(IMessageConsumer consumer = session.CreateConsumer(queue)) | |
| { | |
| IMessageProducer producer = session.CreateProducer(queue); | |
| ITextMessage message = session.CreateTextMessage(); | |
| string prefix = "ConsumerTest - TestDoChangeSentMessage: "; | |
| for(int i = 0; i < COUNT; i++) | |
| { | |
| message.Properties[VALUE_NAME] = i; | |
| message.Text = prefix + Convert.ToString(i); | |
| producer.Send(message); | |
| if(doClear) | |
| { | |
| message.ClearBody(); | |
| message.ClearProperties(); | |
| } | |
| } | |
| if(ackMode == AcknowledgementMode.Transactional) | |
| { | |
| session.Commit(); | |
| } | |
| for(int i = 0; i < COUNT; i++) | |
| { | |
| ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage; | |
| Assert.AreEqual(msg.Text, prefix + Convert.ToString(i)); | |
| Assert.AreEqual(msg.Properties.GetInt(VALUE_NAME), i); | |
| } | |
| if(ackMode == AcknowledgementMode.Transactional) | |
| { | |
| session.Commit(); | |
| } | |
| } | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestConsumerReceiveBeforeMessageDispatched( | |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, | |
| // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] | |
| AcknowledgementMode ackMode) | |
| { | |
| // Launch a thread to perform a delayed send. | |
| Thread sendThread = new Thread(DelayedProducerThreadProc); | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(ackMode)) | |
| { | |
| ITemporaryQueue queue = session.CreateTemporaryQueue(); | |
| using(IMessageConsumer consumer = session.CreateConsumer(queue)) | |
| { | |
| ThreadArg arg = new ThreadArg(); | |
| arg.connection = connection; | |
| arg.session = session; | |
| arg.destination = queue; | |
| sendThread.Start(arg); | |
| IMessage message = consumer.Receive(TimeSpan.FromMinutes(1)); | |
| Assert.IsNotNull(message); | |
| } | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestDontStart( | |
| //[Values(MsgDeliveryMode.NonPersistent)] | |
| MsgDeliveryMode deliveryMode, | |
| //[Values(DestinationType.Queue, DestinationType.Topic)] | |
| DestinationType destinationType, string testDestinationRef) | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| ISession session = connection.CreateSession(); | |
| IDestination destination = GetClearDestination(session, destinationType, testDestinationRef); | |
| IMessageConsumer consumer = session.CreateConsumer(destination); | |
| // Send the messages | |
| SendMessages(session, destination, deliveryMode, 1); | |
| // Make sure no messages were delivered. | |
| Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(1000))); | |
| } | |
| } | |
| //[Test] | |
| public void TestSendReceiveTransacted( | |
| //[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)] | |
| MsgDeliveryMode deliveryMode, | |
| //[Values(DestinationType.Queue, DestinationType.Topic, DestinationType.TemporaryQueue, DestinationType.TemporaryTopic)] | |
| DestinationType destinationType, string testDestinationRef) | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| // Send a message to the broker. | |
| connection.Start(); | |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); | |
| IDestination destination = GetClearDestination(session, destinationType, testDestinationRef); | |
| IMessageConsumer consumer = session.CreateConsumer(destination); | |
| IMessageProducer producer = session.CreateProducer(destination); | |
| producer.DeliveryMode = deliveryMode; | |
| producer.Send(session.CreateTextMessage("Test")); | |
| // Message should not be delivered until commit. | |
| Thread.Sleep(1000); | |
| Assert.IsNull(consumer.ReceiveNoWait()); | |
| session.Commit(); | |
| // Make sure only 1 message was delivered. | |
| IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNotNull(message); | |
| Assert.IsFalse(message.NMSRedelivered); | |
| Assert.IsNull(consumer.ReceiveNoWait()); | |
| // Message should be redelivered is rollback is used. | |
| session.Rollback(); | |
| // Make sure only 1 message was delivered. | |
| message = consumer.Receive(TimeSpan.FromMilliseconds(2000)); | |
| Assert.IsNotNull(message); | |
| Assert.IsTrue(message.NMSRedelivered); | |
| Assert.IsNull(consumer.ReceiveNoWait()); | |
| // If we commit now, the message should not be redelivered. | |
| session.Commit(); | |
| Thread.Sleep(1000); | |
| Assert.IsNull(consumer.ReceiveNoWait()); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestAckedMessageAreConsumed(string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| IMessageProducer producer = session.CreateProducer(destination); | |
| producer.Send(session.CreateTextMessage("Hello")); | |
| // Consume the message... | |
| IMessageConsumer consumer = session.CreateConsumer(destination); | |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNotNull(msg); | |
| msg.Acknowledge(); | |
| // Reset the session. | |
| session.Close(); | |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| // Attempt to Consume the message... | |
| consumer = session.CreateConsumer(destination); | |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNull(msg); | |
| session.Close(); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestLastMessageAcked(string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| IMessageProducer producer = session.CreateProducer(destination); | |
| producer.Send(session.CreateTextMessage("Hello")); | |
| producer.Send(session.CreateTextMessage("Hello2")); | |
| producer.Send(session.CreateTextMessage("Hello3")); | |
| // Consume the message... | |
| IMessageConsumer consumer = session.CreateConsumer(destination); | |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNotNull(msg); | |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNotNull(msg); | |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNotNull(msg); | |
| msg.Acknowledge(); | |
| // Reset the session. | |
| session.Close(); | |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| // Attempt to Consume the message... | |
| consumer = session.CreateConsumer(destination); | |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNull(msg); | |
| session.Close(); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestUnAckedMessageAreNotConsumedOnSessionClose(string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| IMessageProducer producer = session.CreateProducer(destination); | |
| producer.Send(session.CreateTextMessage("Hello")); | |
| // Consume the message... | |
| IMessageConsumer consumer = session.CreateConsumer(destination); | |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNotNull(msg); | |
| // Don't ack the message. | |
| // Reset the session. This should cause the unacknowledged message to be re-delivered. | |
| session.Close(); | |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| // Attempt to Consume the message... | |
| consumer = session.CreateConsumer(destination); | |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)); | |
| Assert.IsNotNull(msg); | |
| msg.Acknowledge(); | |
| session.Close(); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestAsyncAckedMessageAreConsumed(string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| IMessageProducer producer = session.CreateProducer(destination); | |
| producer.Send(session.CreateTextMessage("Hello")); | |
| // Consume the message... | |
| IMessageConsumer consumer = session.CreateConsumer(destination); | |
| consumer.Listener += new MessageListener(OnMessage); | |
| Thread.Sleep(5000); | |
| // Reset the session. | |
| session.Close(); | |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| // Attempt to Consume the message... | |
| consumer = session.CreateConsumer(destination); | |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); | |
| Assert.IsNull(msg); | |
| session.Close(); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose(string testQueueRef) | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| // don't aknowledge message on onMessage() call | |
| dontAck = true; | |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| IDestination destination = GetClearDestination(session, DestinationType.Queue, testQueueRef); | |
| IMessageProducer producer = session.CreateProducer(destination); | |
| producer.Send(session.CreateTextMessage("Hello")); | |
| // Consume the message... | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| { | |
| consumer.Listener += new MessageListener(OnMessage); | |
| // Don't ack the message. | |
| } | |
| // Reset the session. This should cause the Unacked message to be | |
| // redelivered. | |
| session.Close(); | |
| Thread.Sleep(5000); | |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| // Attempt to Consume the message... | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| { | |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)); | |
| Assert.IsNotNull(msg); | |
| msg.Acknowledge(); | |
| } | |
| session.Close(); | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestAddRemoveAsnycMessageListener() | |
| { | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); | |
| ITemporaryTopic topic = session.CreateTemporaryTopic(); | |
| IMessageConsumer consumer = session.CreateConsumer(topic); | |
| consumer.Listener += OnMessage; | |
| consumer.Listener -= OnMessage; | |
| consumer.Listener += OnMessage; | |
| consumer.Close(); | |
| } | |
| } | |
| public void OnMessage(IMessage message) | |
| { | |
| Assert.IsNotNull(message); | |
| if(!dontAck) | |
| { | |
| try | |
| { | |
| message.Acknowledge(); | |
| } | |
| catch(Exception) | |
| { | |
| } | |
| } | |
| } | |
| //[Test] | |
| public virtual void TestReceiveNoWait( | |
| //[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, | |
| // AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)] | |
| AcknowledgementMode ackMode, | |
| //[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)] | |
| MsgDeliveryMode deliveryMode) | |
| { | |
| const int RETRIES = 20; | |
| using(IConnection connection = CreateConnection()) | |
| { | |
| connection.Start(); | |
| using(ISession session = connection.CreateSession(ackMode)) | |
| { | |
| IDestination destination = session.CreateTemporaryQueue(); | |
| using(IMessageProducer producer = session.CreateProducer(destination)) | |
| { | |
| producer.DeliveryMode = deliveryMode; | |
| ITextMessage message = session.CreateTextMessage("TEST"); | |
| producer.Send(message); | |
| if(AcknowledgementMode.Transactional == ackMode) | |
| { | |
| session.Commit(); | |
| } | |
| } | |
| using(IMessageConsumer consumer = session.CreateConsumer(destination)) | |
| { | |
| IMessage message = null; | |
| for(int i = 0; i < RETRIES && message == null; ++i) | |
| { | |
| message = consumer.ReceiveNoWait(); | |
| Thread.Sleep(100); | |
| } | |
| Assert.IsNotNull(message); | |
| message.Acknowledge(); | |
| if(AcknowledgementMode.Transactional == ackMode) | |
| { | |
| session.Commit(); | |
| } | |
| } | |
| } | |
| } | |
| } | |
| #endif | |
| } | |
| } |