| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Threading; |
| using Apache.NMS; |
| using Apache.NMS.Util; |
| using NUnit.Framework; |
| using NUnit.Framework.Extensions; |
| |
| namespace Apache.NMS.Test |
| { |
| [TestFixture] |
| public class ConsumerTest : NMSTestSupport |
| { |
| protected static string TEST_CLIENT_ID = "TestConsumerClientId"; |
| protected const int COUNT = 25; |
| protected const string VALUE_NAME = "value"; |
| |
| private bool dontAck; |
| |
| // The .NET CF does not have the ability to interrupt threads, so this test is impossible. |
| #if !NETCF |
| [RowTest] |
| [Row(AcknowledgementMode.AutoAcknowledge)] |
| [Row(AcknowledgementMode.ClientAcknowledge)] |
| [Row(AcknowledgementMode.DupsOkAcknowledge)] |
| [Row(AcknowledgementMode.Transactional)] |
| public void TestNoTimeoutConsumer(AcknowledgementMode ackMode) |
| { |
| // Launch a thread to perform IMessageConsumer.Receive(). |
| // If it doesn't fail in less than three seconds, no exception was thrown. |
| Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc)); |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| using(ISession session = connection.CreateSession(ackMode)) |
| { |
| ITemporaryQueue queue = session.CreateTemporaryQueue(); |
| using(this.timeoutConsumer = session.CreateConsumer(queue)) |
| { |
| receiveThread.Start(); |
| if(receiveThread.Join(3000)) |
| { |
| Assert.Fail("IMessageConsumer.Receive() returned without blocking. Test failed."); |
| } |
| else |
| { |
| // Kill the thread - otherwise it'll sit in Receive() until a message arrives. |
| receiveThread.Interrupt(); |
| } |
| } |
| } |
| } |
| } |
| |
| protected IMessageConsumer timeoutConsumer; |
| |
| protected void TimeoutConsumerThreadProc() |
| { |
| try |
| { |
| timeoutConsumer.Receive(); |
| } |
| catch(ArgumentOutOfRangeException e) |
| { |
| // The test failed. We will know because the timeout will expire inside TestNoTimeoutConsumer(). |
| Assert.Fail("Test failed with exception: " + e.Message); |
| } |
| catch(ThreadInterruptedException) |
| { |
| // The test succeeded! We were still blocked when we were interrupted. |
| } |
| catch(Exception e) |
| { |
| // Some other exception occurred. |
| Assert.Fail("Test failed with exception: " + e.Message); |
| } |
| } |
| |
| [RowTest] |
| [Row(AcknowledgementMode.AutoAcknowledge)] |
| [Row(AcknowledgementMode.ClientAcknowledge)] |
| [Row(AcknowledgementMode.DupsOkAcknowledge)] |
| [Row(AcknowledgementMode.Transactional)] |
| public void TestSyncReceiveConsumerClose(AcknowledgementMode ackMode) |
| { |
| // Launch a thread to perform IMessageConsumer.Receive(). |
| // If it doesn't fail in less than three seconds, no exception was thrown. |
| Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc)); |
| using (IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| using (ISession session = connection.CreateSession(ackMode)) |
| { |
| ITemporaryQueue queue = session.CreateTemporaryQueue(); |
| using (this.timeoutConsumer = session.CreateConsumer(queue)) |
| { |
| receiveThread.Start(); |
| if (receiveThread.Join(3000)) |
| { |
| Assert.Fail("IMessageConsumer.Receive() returned without blocking. Test failed."); |
| } |
| else |
| { |
| // Kill the thread - otherwise it'll sit in Receive() until a message arrives. |
| this.timeoutConsumer.Close(); |
| receiveThread.Join(10000); |
| if (receiveThread.IsAlive) |
| { |
| // Kill the thread - otherwise it'll sit in Receive() until a message arrives. |
| receiveThread.Interrupt(); |
| Assert.Fail("IMessageConsumer.Receive() thread is still alive, Close should have killed it."); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| internal class ThreadArg |
| { |
| internal IConnection connection = null; |
| internal ISession session = null; |
| internal IDestination destination = null; |
| } |
| |
| protected void DelayedProducerThreadProc(Object arg) |
| { |
| try |
| { |
| ThreadArg args = arg as ThreadArg; |
| |
| using(ISession session = args.connection.CreateSession()) |
| { |
| using(IMessageProducer producer = session.CreateProducer(args.destination)) |
| { |
| // Give the consumer time to enter the receive. |
| Thread.Sleep(5000); |
| |
| producer.Send(args.session.CreateTextMessage("Hello World")); |
| } |
| } |
| } |
| catch(Exception e) |
| { |
| // Some other exception occurred. |
| Assert.Fail("Test failed with exception: " + e.Message); |
| } |
| } |
| |
| [RowTest] |
| [Row(AcknowledgementMode.AutoAcknowledge)] |
| [Row(AcknowledgementMode.ClientAcknowledge)] |
| [Row(AcknowledgementMode.DupsOkAcknowledge)] |
| [Row(AcknowledgementMode.Transactional)] |
| public void TestDoChangeSentMessage(AcknowledgementMode ackMode) |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| using(ISession session = connection.CreateSession(ackMode)) |
| { |
| ITemporaryQueue queue = session.CreateTemporaryQueue(); |
| using(IMessageConsumer consumer = session.CreateConsumer(queue)) |
| { |
| IMessageProducer producer = session.CreateProducer(queue); |
| ITextMessage message = session.CreateTextMessage(); |
| |
| string prefix = "ConsumerTest - TestDoChangeSentMessage: "; |
| |
| for(int i = 0; i < COUNT; i++) |
| { |
| message.Properties[VALUE_NAME] = i; |
| message.Text = prefix + Convert.ToString(i); |
| |
| producer.Send(message); |
| |
| message.ClearBody(); |
| message.ClearProperties(); |
| } |
| |
| if(ackMode == AcknowledgementMode.Transactional) |
| { |
| session.Commit(); |
| } |
| |
| for(int i = 0; i < COUNT; i++) |
| { |
| ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage; |
| Assert.AreEqual(msg.Text, prefix + Convert.ToString(i)); |
| Assert.AreEqual(msg.Properties[VALUE_NAME], i); |
| } |
| |
| if(ackMode == AcknowledgementMode.Transactional) |
| { |
| session.Commit(); |
| } |
| |
| } |
| } |
| } |
| } |
| |
| [RowTest] |
| [Row(AcknowledgementMode.AutoAcknowledge)] |
| [Row(AcknowledgementMode.ClientAcknowledge)] |
| [Row(AcknowledgementMode.DupsOkAcknowledge)] |
| [Row(AcknowledgementMode.Transactional)] |
| public void TestConsumerReceiveBeforeMessageDispatched(AcknowledgementMode ackMode) |
| { |
| // Launch a thread to perform a delayed send. |
| Thread sendThread = new Thread(DelayedProducerThreadProc); |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| using(ISession session = connection.CreateSession(ackMode)) |
| { |
| ITemporaryQueue queue = session.CreateTemporaryQueue(); |
| using(IMessageConsumer consumer = session.CreateConsumer(queue)) |
| { |
| ThreadArg arg = new ThreadArg(); |
| |
| arg.connection = connection; |
| arg.session = session; |
| arg.destination = queue; |
| |
| sendThread.Start(arg); |
| IMessage message = consumer.Receive(TimeSpan.FromMinutes(1)); |
| Assert.IsNotNull(message); |
| } |
| } |
| } |
| } |
| |
| [RowTest] |
| [Row(MsgDeliveryMode.NonPersistent, DestinationType.Queue)] |
| [Row(MsgDeliveryMode.NonPersistent, DestinationType.Topic)] |
| public void TestDontStart(MsgDeliveryMode deliveryMode, DestinationType destinationType ) |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| ISession session = connection.CreateSession(); |
| IDestination destination = CreateDestination(session, destinationType); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| // Send the messages |
| SendMessages(session, destination, deliveryMode, 1); |
| |
| // Make sure no messages were delivered. |
| Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(1000))); |
| } |
| } |
| |
| [RowTest] |
| [Row(MsgDeliveryMode.NonPersistent, DestinationType.Queue)] |
| [Row(MsgDeliveryMode.Persistent, DestinationType.Queue)] |
| [Row(MsgDeliveryMode.NonPersistent, DestinationType.Topic)] |
| [Row(MsgDeliveryMode.Persistent, DestinationType.Topic)] |
| [Row(MsgDeliveryMode.NonPersistent, DestinationType.TemporaryQueue)] |
| [Row(MsgDeliveryMode.Persistent, DestinationType.TemporaryQueue)] |
| [Row(MsgDeliveryMode.NonPersistent, DestinationType.TemporaryTopic)] |
| [Row(MsgDeliveryMode.Persistent, DestinationType.TemporaryTopic)] |
| public void TestSendReceiveTransacted(MsgDeliveryMode deliveryMode, DestinationType destinationType) |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| // Send a message to the broker. |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); |
| IDestination destination = CreateDestination(session, destinationType); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| IMessageProducer producer = session.CreateProducer(destination); |
| |
| producer.DeliveryMode = deliveryMode; |
| producer.Send(session.CreateTextMessage("Test")); |
| |
| // Message should not be delivered until commit. |
| Thread.Sleep(1000); |
| Assert.IsNull(consumer.ReceiveNoWait()); |
| session.Commit(); |
| |
| // Make sure only 1 message was delivered. |
| IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(message); |
| Assert.IsFalse(message.NMSRedelivered); |
| Assert.IsNull(consumer.ReceiveNoWait()); |
| |
| // Message should be redelivered is rollback is used. |
| session.Rollback(); |
| |
| // Make sure only 1 message was delivered. |
| message = consumer.Receive(TimeSpan.FromMilliseconds(2000)); |
| Assert.IsNotNull(message); |
| Assert.IsTrue(message.NMSRedelivered); |
| Assert.IsNull(consumer.ReceiveNoWait()); |
| |
| // If we commit now, the message should not be redelivered. |
| session.Commit(); |
| Thread.Sleep(1000); |
| Assert.IsNull(consumer.ReceiveNoWait()); |
| } |
| } |
| |
| [Test] |
| public void TestAckedMessageAreConsumed() |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| IQueue queue = session.GetQueue(Guid.NewGuid().ToString()); |
| IMessageProducer producer = session.CreateProducer(queue); |
| producer.Send(session.CreateTextMessage("Hello")); |
| |
| // Consume the message... |
| IMessageConsumer consumer = session.CreateConsumer(queue); |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(msg); |
| msg.Acknowledge(); |
| |
| // Reset the session. |
| session.Close(); |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| |
| // Attempt to Consume the message... |
| consumer = session.CreateConsumer(queue); |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNull(msg); |
| |
| session.Close(); |
| } |
| } |
| |
| [Test] |
| public void TestLastMessageAcked() |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| IQueue queue = session.GetQueue(Guid.NewGuid().ToString()); |
| IMessageProducer producer = session.CreateProducer(queue); |
| producer.Send(session.CreateTextMessage("Hello")); |
| producer.Send(session.CreateTextMessage("Hello2")); |
| producer.Send(session.CreateTextMessage("Hello3")); |
| |
| // Consume the message... |
| IMessageConsumer consumer = session.CreateConsumer(queue); |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(msg); |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(msg); |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(msg); |
| msg.Acknowledge(); |
| |
| // Reset the session. |
| session.Close(); |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| |
| // Attempt to Consume the message... |
| consumer = session.CreateConsumer(queue); |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNull(msg); |
| |
| session.Close(); |
| } |
| } |
| |
| [Test] |
| public void TestUnAckedMessageAreNotConsumedOnSessionClose() |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| IQueue queue = session.GetQueue(Guid.NewGuid().ToString()); |
| IMessageProducer producer = session.CreateProducer(queue); |
| producer.Send(session.CreateTextMessage("Hello")); |
| |
| // Consume the message... |
| IMessageConsumer consumer = session.CreateConsumer(queue); |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(msg); |
| // Don't ack the message. |
| |
| // Reset the session. This should cause the unacknowledged message to be re-delivered. |
| session.Close(); |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| |
| // Attempt to Consume the message... |
| consumer = session.CreateConsumer(queue); |
| msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)); |
| Assert.IsNotNull(msg); |
| msg.Acknowledge(); |
| |
| session.Close(); |
| } |
| } |
| |
| [Test] |
| public void TestAsyncAckedMessageAreConsumed() |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| IQueue queue = session.GetQueue(Guid.NewGuid().ToString()); |
| IMessageProducer producer = session.CreateProducer(queue); |
| producer.Send(session.CreateTextMessage("Hello")); |
| |
| // Consume the message... |
| IMessageConsumer consumer = session.CreateConsumer(queue); |
| consumer.Listener += new MessageListener(OnMessage); |
| |
| Thread.Sleep(5000); |
| |
| // Reset the session. |
| session.Close(); |
| |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| |
| // Attempt to Consume the message... |
| consumer = session.CreateConsumer(queue); |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNull(msg); |
| |
| session.Close(); |
| } |
| } |
| |
| [Test] |
| public void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose() |
| { |
| using(IConnection connection = CreateConnection(TEST_CLIENT_ID)) |
| { |
| connection.Start(); |
| // don't aknowledge message on onMessage() call |
| dontAck = true; |
| ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| IQueue queue = session.GetQueue("Guid.NewGuid().ToString()"); |
| IMessageProducer producer = session.CreateProducer(queue); |
| producer.Send(session.CreateTextMessage("Hello")); |
| |
| // Consume the message... |
| IMessageConsumer consumer = session.CreateConsumer(queue); |
| consumer.Listener += new MessageListener(OnMessage); |
| // Don't ack the message. |
| |
| // Reset the session. This should cause the Unacked message to be |
| // redelivered. |
| session.Close(); |
| |
| Thread.Sleep(5000); |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| // Attempt to Consume the message... |
| consumer = session.CreateConsumer(queue); |
| IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000)); |
| Assert.IsNotNull(msg); |
| msg.Acknowledge(); |
| |
| session.Close(); |
| } |
| } |
| |
| public void OnMessage(IMessage message) |
| { |
| Assert.IsNotNull(message); |
| |
| if(!dontAck) |
| { |
| try |
| { |
| message.Acknowledge(); |
| } |
| catch(Exception) |
| { |
| } |
| } |
| } |
| |
| #endif |
| |
| } |
| } |