| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Threading; |
| using Apache.NMS.Test; |
| using Apache.NMS.ActiveMQ.Commands; |
| using Apache.NMS.Util; |
| using NUnit.Framework; |
| |
| namespace Apache.NMS.ActiveMQ.Test |
| { |
| [TestFixture] |
| public class AMQRedeliveryPolicyTest : NMSTestSupport |
| { |
| private const string DESTINATION_NAME = "TEST.RedeliveryPolicyTestDest"; |
| private const string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause"; |
| |
| [Test] |
| public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| IRedeliveryPolicy policy = connection.RedeliveryPolicy; |
| policy.InitialRedeliveryDelay = 500; |
| policy.BackOffMultiplier = 2; |
| policy.UseExponentialBackOff = true; |
| policy.UseCollisionAvoidance = false; |
| |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); |
| IDestination destination = session.CreateTemporaryQueue(); |
| IMessageProducer producer = session.CreateProducer(destination); |
| |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| // Send the messages |
| producer.Send(session.CreateTextMessage("1st")); |
| producer.Send(session.CreateTextMessage("2nd")); |
| session.Commit(); |
| |
| ITextMessage m; |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| // No delay on first Rollback.. |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); |
| Assert.IsNotNull(m); |
| session.Rollback(); |
| |
| // Show subsequent re-delivery delay is incrementing. |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); |
| Assert.IsNull(m); |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| // Show re-delivery delay is incrementing exponentially |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); |
| Assert.IsNull(m); |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(500)); |
| Assert.IsNull(m); |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| } |
| } |
| |
| [Test] |
| public void TestNornalRedeliveryPolicyDelaysDeliveryOnRollback() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| IRedeliveryPolicy policy = connection.RedeliveryPolicy; |
| policy.InitialRedeliveryDelay = 500; |
| policy.UseExponentialBackOff = false; |
| |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); |
| IDestination destination = session.CreateTemporaryQueue(); |
| |
| IMessageProducer producer = session.CreateProducer(destination); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| // Send the messages |
| producer.Send(session.CreateTextMessage("1st")); |
| producer.Send(session.CreateTextMessage("2nd")); |
| session.Commit(); |
| |
| ITextMessage m; |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| // No delay on first Rollback.. |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); |
| Assert.IsNotNull(m); |
| session.Rollback(); |
| |
| // Show subsequent re-delivery delay is incrementing. |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); |
| Assert.IsNull(m); |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| // The message gets redelivered after 500 ms every time since |
| // we are not using exponential backoff. |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); |
| Assert.IsNull(m); |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| } |
| } |
| |
| [Test] |
| public void TestDLQHandling() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| IRedeliveryPolicy policy = connection.RedeliveryPolicy; |
| policy.InitialRedeliveryDelay = 100; |
| policy.UseExponentialBackOff = false; |
| policy.MaximumRedeliveries = 2; |
| |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); |
| IDestination destination = session.CreateTemporaryQueue(); |
| IMessageProducer producer = session.CreateProducer(destination); |
| |
| session.DeleteDestination(new ActiveMQQueue("ActiveMQ.DLQ")); |
| |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| IMessageConsumer dlqConsumer = session.CreateConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); |
| |
| // Purge any messages already in the DLQ. |
| while(dlqConsumer.ReceiveNoWait() != null) |
| { |
| session.Commit(); |
| } |
| |
| // Send the messages |
| producer.Send(session.CreateTextMessage("1st")); |
| producer.Send(session.CreateTextMessage("2nd")); |
| session.Commit(); |
| |
| ITextMessage m; |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| // The last Rollback should cause the 1st message to get sent to the DLQ |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("2nd", m.Text); |
| session.Commit(); |
| |
| // We should be able to get the message off the DLQ now. |
| m = (ITextMessage)dlqConsumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Commit(); |
| } |
| } |
| |
| [Test] |
| public void TestInfiniteMaximumNumberOfRedeliveries() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| IRedeliveryPolicy policy = connection.RedeliveryPolicy; |
| policy.InitialRedeliveryDelay = 100; |
| policy.UseExponentialBackOff = false; |
| // let's set the maximum redeliveries to no maximum (ie. infinite) |
| policy.MaximumRedeliveries = -1; |
| |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); |
| IDestination destination = session.CreateTemporaryQueue(); |
| IMessageProducer producer = session.CreateProducer(destination); |
| |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| // Send the messages |
| producer.Send(session.CreateTextMessage("1st")); |
| producer.Send(session.CreateTextMessage("2nd")); |
| session.Commit(); |
| |
| ITextMessage m; |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| //we should be able to get the 1st message redelivered until a session.Commit is called |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Commit(); |
| |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("2nd", m.Text); |
| session.Commit(); |
| } |
| } |
| |
| [Test] |
| public void TestZeroMaximumNumberOfRedeliveries() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| IRedeliveryPolicy policy = connection.RedeliveryPolicy; |
| policy.InitialRedeliveryDelay = 100; |
| policy.UseExponentialBackOff = false; |
| //let's set the maximum redeliveries to 0 |
| policy.MaximumRedeliveries = 0; |
| |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); |
| IDestination destination = session.CreateTemporaryQueue(); |
| IMessageProducer producer = session.CreateProducer(destination); |
| |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| // Send the messages |
| producer.Send(session.CreateTextMessage("1st")); |
| producer.Send(session.CreateTextMessage("2nd")); |
| session.Commit(); |
| |
| ITextMessage m; |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| //the 1st message should not be redelivered since maximumRedeliveries is set to 0 |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("2nd", m.Text); |
| session.Commit(); |
| } |
| } |
| |
| [Test] |
| public void TestURIForRedeliverPolicyHandling() |
| { |
| string uri1 = "activemq:tcp://${activemqhost}:61616" + |
| "?nms.RedeliveryPolicy.BackOffMultiplier=10" + |
| "&nms.RedeliveryPolicy.InitialRedeliveryDelay=2000" + |
| "&nms.RedeliveryPolicy.UseExponentialBackOff=true" + |
| "&nms.RedeliveryPolicy.UseCollisionAvoidance=true" + |
| "&nms.RedeliveryPolicy.CollisionAvoidancePercent=20"; |
| |
| string uri2 = "activemq:tcp://${activemqhost}:61616" + |
| "?nms.RedeliveryPolicy.backOffMultiplier=50" + |
| "&nms.RedeliveryPolicy.initialRedeliveryDelay=4000" + |
| "&nms.RedeliveryPolicy.useExponentialBackOff=false" + |
| "&nms.RedeliveryPolicy.useCollisionAvoidance=false" + |
| "&nms.RedeliveryPolicy.collisionAvoidancePercent=10"; |
| |
| NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri1)); |
| |
| Assert.IsNotNull(factory); |
| Assert.IsNotNull(factory.ConnectionFactory); |
| using(IConnection connection = factory.CreateConnection("", "")) |
| { |
| Assert.IsNotNull(connection); |
| |
| Connection amqConnection = connection as Connection; |
| |
| Assert.AreEqual(10, amqConnection.RedeliveryPolicy.BackOffMultiplier); |
| Assert.AreEqual(2000, amqConnection.RedeliveryPolicy.InitialRedeliveryDelay); |
| Assert.AreEqual(true, amqConnection.RedeliveryPolicy.UseExponentialBackOff); |
| Assert.AreEqual(true, amqConnection.RedeliveryPolicy.UseCollisionAvoidance); |
| Assert.AreEqual(20, amqConnection.RedeliveryPolicy.CollisionAvoidancePercent); |
| } |
| |
| factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri2)); |
| |
| Assert.IsNotNull(factory); |
| Assert.IsNotNull(factory.ConnectionFactory); |
| using(IConnection connection = factory.CreateConnection("", "")) |
| { |
| Assert.IsNotNull(connection); |
| |
| Connection amqConnection = connection as Connection; |
| Assert.AreEqual(50, amqConnection.RedeliveryPolicy.BackOffMultiplier); |
| Assert.AreEqual(4000, amqConnection.RedeliveryPolicy.InitialRedeliveryDelay); |
| Assert.AreEqual(false, amqConnection.RedeliveryPolicy.UseExponentialBackOff); |
| Assert.AreEqual(false, amqConnection.RedeliveryPolicy.UseCollisionAvoidance); |
| Assert.AreEqual(10, amqConnection.RedeliveryPolicy.CollisionAvoidancePercent); |
| } |
| } |
| |
| [Test] |
| public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLive() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| IRedeliveryPolicy policy = connection.RedeliveryPolicy; |
| policy.MaximumRedeliveries = -1; |
| policy.InitialRedeliveryDelay = 500; |
| policy.UseExponentialBackOff = false; |
| |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); |
| IDestination destination = session.CreateTemporaryQueue(); |
| |
| IMessageProducer producer = session.CreateProducer(destination); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| // Send the messages |
| ITextMessage textMessage = session.CreateTextMessage("1st"); |
| textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0); |
| producer.Send(textMessage); |
| session.Commit(); |
| |
| ITextMessage m; |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| // No delay on first Rollback.. |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); |
| Assert.IsNotNull(m); |
| session.Rollback(); |
| |
| // Show subsequent re-delivery delay is incrementing. |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); |
| Assert.IsNull(m); |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700)); |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| |
| // The message gets redelivered after 500 ms every time since |
| // we are not using exponential backoff. |
| m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700)); |
| Assert.IsNull(m); |
| } |
| } |
| |
| [Test] |
| public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLiveCallback() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| IRedeliveryPolicy policy = connection.RedeliveryPolicy; |
| policy.MaximumRedeliveries = -1; |
| policy.InitialRedeliveryDelay = 500; |
| policy.UseExponentialBackOff = false; |
| |
| connection.Start(); |
| ISession session = connection.CreateSession(AcknowledgementMode.Transactional); |
| IDestination destination = session.CreateTemporaryQueue(); |
| |
| IMessageProducer producer = session.CreateProducer(destination); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| CallbackClass cc = new CallbackClass(session); |
| consumer.Listener += new MessageListener(cc.consumer_Listener); |
| |
| // Send the messages |
| ITextMessage textMessage = session.CreateTextMessage("1st"); |
| textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0); |
| producer.Send(textMessage, MsgDeliveryMode.Persistent,MsgPriority.Normal,TimeSpan.FromMilliseconds(800.0)); |
| session.Commit(); |
| |
| // sends normal message, then immediate retry, then retry after 500 ms, then expire. |
| Thread.Sleep(2000); |
| Assert.AreEqual(3, cc.numReceived); |
| } |
| } |
| |
| class CallbackClass |
| { |
| private ISession session; |
| public int numReceived = 0; |
| |
| public CallbackClass(ISession session) |
| { |
| this.session = session; |
| } |
| |
| public void consumer_Listener(IMessage message) |
| { |
| numReceived++; |
| ITextMessage m = message as ITextMessage; |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| session.Rollback(); |
| } |
| } |
| |
| [Test] |
| public void TestRepeatedRedeliveryReceiveNoCommit() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| connection.Start(); |
| |
| ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); |
| IDestination destination = dlqSession.GetQueue("TestRepeatedRedeliveryReceiveNoCommit"); |
| IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ"); |
| connection.DeleteDestination(destination); |
| connection.DeleteDestination(dlq); |
| IMessageProducer producer = dlqSession.CreateProducer(destination); |
| producer.Send(dlqSession.CreateTextMessage("1st")); |
| IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq); |
| |
| const int maxRedeliveries = 4; |
| for (int i = 0; i <= maxRedeliveries + 1; i++) |
| { |
| using(Connection loopConnection = (Connection) CreateConnection()) |
| { |
| // Receive a message with the JMS API |
| IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy; |
| policy.InitialRedeliveryDelay = 0; |
| policy.UseExponentialBackOff = false; |
| policy.MaximumRedeliveries = maxRedeliveries; |
| |
| loopConnection.Start(); |
| ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| |
| ActiveMQTextMessage m = consumer.Receive(TimeSpan.FromMilliseconds(4000)) as ActiveMQTextMessage; |
| if (m != null) |
| { |
| Tracer.DebugFormat("Received Message: {0} delivery count = {1}", m.Text, m.RedeliveryCounter); |
| } |
| |
| if (i <= maxRedeliveries) |
| { |
| Assert.IsNotNull(m); |
| Assert.AreEqual("1st", m.Text); |
| Assert.AreEqual(i, m.RedeliveryCounter); |
| } |
| else |
| { |
| Assert.IsNull(m, "null on exceeding redelivery count"); |
| } |
| } |
| } |
| |
| // We should be able to get the message off the DLQ now. |
| ITextMessage msg = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage; |
| Assert.IsNotNull(msg, "Got message from DLQ"); |
| Assert.AreEqual("1st", msg.Text); |
| String cause = msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); |
| if (cause != null) |
| { |
| Tracer.DebugFormat("Rollback Cause = {0}", cause); |
| Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref"); |
| } |
| else |
| { |
| Tracer.Debug("DLQ'd message has no cause tag."); |
| } |
| } |
| } |
| |
| [Test] |
| public void TestRepeatedRedeliveryOnMessageNoCommit() |
| { |
| using(Connection connection = (Connection) CreateConnection()) |
| { |
| connection.Start(); |
| ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); |
| IDestination destination = dlqSession.GetQueue("TestRepeatedRedeliveryOnMessageNoCommit"); |
| IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ"); |
| connection.DeleteDestination(destination); |
| connection.DeleteDestination(dlq); |
| IMessageProducer producer = dlqSession.CreateProducer(destination); |
| IMessageConsumer dlqConsumer = dlqSession.CreateConsumer(dlq); |
| |
| producer.Send(dlqSession.CreateTextMessage("1st")); |
| |
| const int maxRedeliveries = 4; |
| Atomic<int> receivedCount = new Atomic<int>(0); |
| |
| for (int i = 0; i <= maxRedeliveries + 1; i++) |
| { |
| using(Connection loopConnection = (Connection) CreateConnection()) |
| { |
| IRedeliveryPolicy policy = loopConnection.RedeliveryPolicy; |
| policy.InitialRedeliveryDelay = 0; |
| policy.UseExponentialBackOff = false; |
| policy.MaximumRedeliveries = maxRedeliveries; |
| |
| loopConnection.Start(); |
| ISession session = loopConnection.CreateSession(AcknowledgementMode.Transactional); |
| IMessageConsumer consumer = session.CreateConsumer(destination); |
| OnMessageNoCommitCallback callback = new OnMessageNoCommitCallback(receivedCount); |
| consumer.Listener += new MessageListener(callback.consumer_Listener); |
| |
| if (i <= maxRedeliveries) |
| { |
| Assert.IsTrue(callback.Await(), "listener should have dispatched a message"); |
| } |
| else |
| { |
| // final redlivery gets poisoned before dispatch |
| Assert.IsFalse(callback.Await(), "listener should not have dispatched after max redliveries"); |
| } |
| } |
| } |
| |
| // We should be able to get the message off the DLQ now. |
| ITextMessage msg = dlqConsumer.Receive(TimeSpan.FromMilliseconds(2000)) as ITextMessage; |
| Assert.IsNotNull(msg, "Got message from DLQ"); |
| Assert.AreEqual("1st", msg.Text); |
| String cause = msg.Properties.GetString(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); |
| if (cause != null) |
| { |
| Tracer.DebugFormat("Rollback Cause = {0}", cause); |
| Assert.IsTrue(cause.Contains("RedeliveryPolicy"), "cause exception has no policy ref"); |
| } |
| else |
| { |
| Tracer.Debug("DLQ'd message has no cause tag."); |
| } |
| } |
| } |
| |
| class OnMessageNoCommitCallback |
| { |
| private Atomic<int> receivedCount; |
| private CountDownLatch done = new CountDownLatch(1); |
| |
| public OnMessageNoCommitCallback(Atomic<int> receivedCount) |
| { |
| this.receivedCount = receivedCount; |
| } |
| |
| public bool Await() |
| { |
| return done.await(TimeSpan.FromMilliseconds(5000)); |
| } |
| |
| public void consumer_Listener(IMessage message) |
| { |
| ActiveMQTextMessage m = message as ActiveMQTextMessage; |
| Tracer.DebugFormat("Received Message: {0} delivery count = {1}", m.Text, m.RedeliveryCounter); |
| Assert.AreEqual("1st", m.Text); |
| Assert.AreEqual(receivedCount.Value, m.RedeliveryCounter); |
| receivedCount.GetAndSet(receivedCount.Value + 1); |
| done.countDown(); |
| } |
| } |
| } |
| } |