| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| using System; |
| using Apache.NMS.ActiveMQ; |
| using Apache.NMS.ActiveMQ.Commands; |
| using Apache.NMS.Test; |
| using Apache.NMS.Util; |
| using NUnit.Framework; |
| |
| namespace Apache.NMS.ActiveMQ.Test |
| { |
| [TestFixture] |
| public class NMSSessionRecoverTest : NMSTestSupport |
| { |
| private IConnection connection; |
| private IDestination destination; |
| private CountDownLatch doneCountDownLatch; |
| private ISession session; |
| private int counter; |
| private String errorMessage; |
| |
| [SetUp] |
| public override void SetUp() |
| { |
| base.SetUp(); |
| |
| counter = 0; |
| errorMessage = null; |
| doneCountDownLatch = new CountDownLatch(1); |
| connection = CreateConnection(); |
| } |
| |
| [TearDown] |
| public override void TearDown() |
| { |
| base.TearDown(); |
| |
| if (connection != null) |
| { |
| connection.Close(); |
| } |
| } |
| |
| [Test] |
| public void TestQueueSynchRecover() |
| { |
| destination = new ActiveMQQueue("TEST.Queue-" + DateTime.Now.Ticks); |
| DoTestSynchRecover(); |
| } |
| |
| [Test] |
| public void TestQueueAsynchRecover() |
| { |
| destination = new ActiveMQQueue("TEST.Queue-" + DateTime.Now.Ticks); |
| DoTestAsynchRecover(); |
| } |
| |
| [Test] |
| public void TestTopicSynchRecover() |
| { |
| destination = new ActiveMQTopic("TEST.Topic-" + DateTime.Now.Ticks); |
| DoTestSynchRecover(); |
| } |
| |
| [Test] |
| public void TestTopicAsynchRecover() |
| { |
| destination = new ActiveMQTopic("TEST.Topic-" + DateTime.Now.Ticks); |
| DoTestAsynchRecover(); |
| } |
| |
| [Test] |
| public void TestQueueAsynchRecoverWithAutoAck() |
| { |
| destination = new ActiveMQQueue("TEST.Queue-" + DateTime.Now.Ticks); |
| DoTestAsynchRecoverWithAutoAck(); |
| } |
| |
| [Test] |
| public void TestTopicAsynchRecoverWithAutoAck() |
| { |
| destination = new ActiveMQTopic("TEST.Topic-" + DateTime.Now.Ticks); |
| DoTestAsynchRecoverWithAutoAck(); |
| } |
| |
| public void DoTestSynchRecover() |
| { |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| connection.Start(); |
| |
| IMessageProducer producer = session.CreateProducer(destination); |
| producer.DeliveryMode = MsgDeliveryMode.NonPersistent; |
| producer.Send(session.CreateTextMessage("First")); |
| producer.Send(session.CreateTextMessage("Second")); |
| |
| ITextMessage message = consumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage; |
| Assert.AreEqual("First", message.Text); |
| Assert.IsFalse(message.NMSRedelivered); |
| message.Acknowledge(); |
| |
| message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(2000)); |
| Assert.AreEqual("Second", message.Text); |
| Assert.IsFalse(message.NMSRedelivered); |
| |
| session.Recover(); |
| |
| message = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(3000)); |
| Assert.AreEqual("Second", message.Text); |
| Assert.IsTrue(message.NMSRedelivered); |
| |
| message.Acknowledge(); |
| } |
| |
| private void OnTestAsynchRecoverMessage(IMessage msg) |
| { |
| counter++; |
| try |
| { |
| ITextMessage message = msg as ITextMessage; |
| switch (counter) |
| { |
| case 1: |
| Tracer.Debug("Got first Message: " + message.Text); |
| Assert.AreEqual("First", message.Text); |
| Assert.IsFalse(message.NMSRedelivered); |
| message.Acknowledge(); |
| break; |
| case 2: |
| Tracer.Debug("Got Second Message: " + message.Text); |
| Assert.AreEqual("Second", message.Text); |
| Assert.IsFalse(message.NMSRedelivered); |
| session.Recover(); |
| break; |
| case 3: |
| Tracer.Debug("Got Third Message: " + message.Text); |
| Assert.AreEqual("Second", message.Text); |
| Assert.IsTrue(message.NMSRedelivered); |
| message.Acknowledge(); |
| doneCountDownLatch.countDown(); |
| break; |
| default: |
| errorMessage = "Got too many messages: " + counter; |
| Tracer.Debug(errorMessage); |
| doneCountDownLatch.countDown(); |
| break; |
| } |
| } |
| catch (Exception e) |
| { |
| errorMessage = "Got exception: " + e.Message; |
| Tracer.Warn("Exception on Message Receive: " + e.Message); |
| doneCountDownLatch.countDown(); |
| } |
| } |
| |
| public void DoTestAsynchRecover() |
| { |
| session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); |
| |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| IMessageProducer producer = session.CreateProducer(destination); |
| producer.DeliveryMode = MsgDeliveryMode.NonPersistent; |
| producer.Send(session.CreateTextMessage("First")); |
| producer.Send(session.CreateTextMessage("Second")); |
| |
| consumer.Listener += OnTestAsynchRecoverMessage; |
| connection.Start(); |
| |
| if (doneCountDownLatch.await(TimeSpan.FromSeconds(10))) |
| { |
| if (!String.IsNullOrEmpty(errorMessage)) |
| { |
| Assert.Fail(errorMessage); |
| } |
| } |
| else |
| { |
| Assert.Fail("Timeout waiting for async message delivery to complete."); |
| } |
| } |
| |
| private void OnTestAsynchRecoverWithAutoAck(IMessage msg) |
| { |
| counter++; |
| try |
| { |
| ITextMessage message = msg as ITextMessage; |
| switch (counter) |
| { |
| case 1: |
| Tracer.Debug("Got first Message: " + message.Text); |
| Assert.AreEqual("First", message.Text); |
| Assert.IsFalse(message.NMSRedelivered); |
| break; |
| case 2: |
| // This should rollback the delivery of this message.. |
| // and re-deliver. |
| Tracer.Debug("Got Second Message: " + message.Text); |
| Assert.AreEqual("Second", message.Text); |
| Assert.IsFalse(message.NMSRedelivered); |
| session.Recover(); |
| break; |
| case 3: |
| Tracer.Debug("Got Third Message: " + message.Text); |
| Assert.AreEqual("Second", message.Text); |
| Assert.IsTrue(message.NMSRedelivered); |
| doneCountDownLatch.countDown(); |
| break; |
| default: |
| errorMessage = "Got too many messages: " + counter; |
| Tracer.Debug(errorMessage); |
| doneCountDownLatch.countDown(); |
| break; |
| } |
| } |
| catch (Exception e) |
| { |
| errorMessage = "Got exception: " + e.Message; |
| Tracer.Warn("Exception on Message Receive: " + e.Message); |
| doneCountDownLatch.countDown(); |
| } |
| } |
| |
| public void DoTestAsynchRecoverWithAutoAck() |
| { |
| session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| IMessageProducer producer = session.CreateProducer(destination); |
| producer.DeliveryMode = MsgDeliveryMode.NonPersistent; |
| producer.Send(session.CreateTextMessage("First")); |
| producer.Send(session.CreateTextMessage("Second")); |
| |
| consumer.Listener += OnTestAsynchRecoverWithAutoAck; |
| connection.Start(); |
| |
| if (doneCountDownLatch.await(TimeSpan.FromSeconds(10))) |
| { |
| Tracer.Info("Finished waiting for async message delivery to complete."); |
| if (!String.IsNullOrEmpty(errorMessage)) |
| { |
| Assert.Fail(errorMessage); |
| } |
| } |
| else |
| { |
| Tracer.Warn("Timeout waiting for async message delivery to complete."); |
| Assert.Fail("Timeout waiting for async message delivery to complete."); |
| } |
| } |
| } |
| } |
| |