| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Threading; |
| using System.Collections.Generic; |
| using Apache.NMS.Test; |
| using Apache.NMS.Policies; |
| using Apache.NMS.Stomp; |
| using Apache.NMS.Stomp.Commands; |
| using NUnit.Framework; |
| using NUnit.Framework.Extensions; |
| |
namespace Apache.NMS.Stomp.Test
{
    /// <summary>
    /// Exercises the <see cref="IRedeliveryPolicy"/> attached to a Stomp
    /// <see cref="Connection"/>: redelivery delays after a transactional
    /// rollback, the maximum-redeliveries limit, and configuration of the
    /// policy through connection URI options.
    /// </summary>
    /// <remarks>
    /// Every test opens a real connection (via <c>CreateConnection()</c> or an
    /// <see cref="NMSConnectionFactory"/>), so these presumably need a reachable
    /// broker to pass -- confirm against the test configuration.
    /// </remarks>
    [TestFixture]
    public class StompRedeliveryPolicyTest : NMSTestSupport
    {
        // NOTE(review): not referenced by any test in this fixture; the tests
        // create temporary queues instead. Kept to avoid an unrelated removal.
        private const string DESTINATION_NAME = "RedeliveryPolicyTestDest";

        /// <summary>
        /// With a 500 ms initial delay, a back-off multiplier of 2 and
        /// exponential back-off enabled, verifies that the first rollback
        /// redelivers within 100 ms, the second only after more than 100 ms,
        /// and the third only after more than 600 ms.
        /// </summary>
        [Test]
        public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 500;
                policy.BackOffMultiplier = 2;
                policy.UseExponentialBackOff = true;
                policy.UseCollisionAvoidance = false;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);
                IMessageConsumer consumer = session.CreateConsumer(destination);

                SendFirstAndSecondMessages(session, producer);

                ExpectTextMessage(consumer, 1000, "1st");
                session.Rollback();

                // No delay on the first rollback.
                ExpectAnyMessage(consumer, 100);
                session.Rollback();

                // The second rollback is delayed: nothing arrives in the first
                // 100 ms, but the message shows up within the following 700 ms.
                ExpectNoMessage(consumer, 100);
                ExpectTextMessage(consumer, 700, "1st");
                session.Rollback();

                // The third rollback is delayed for longer: nothing arrives in
                // the first 600 ms (100 + 500), then the message is redelivered.
                ExpectNoMessage(consumer, 100);
                ExpectNoMessage(consumer, 500);
                ExpectTextMessage(consumer, 700, "1st");
            }
        }

        /// <summary>
        /// With a 500 ms initial delay and exponential back-off disabled,
        /// verifies that the first rollback redelivers within 100 ms and each
        /// later rollback redelivers after more than 100 ms but within 800 ms.
        /// </summary>
        /// <remarks>
        /// "Nornal" in the method name looks like a typo for "Normal"; the name
        /// is left untouched so existing test filters keep matching.
        /// </remarks>
        [Test]
        public void TestNornalRedeliveryPolicyDelaysDeliveryOnRollback()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 500;
                policy.UseExponentialBackOff = false;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);
                IMessageConsumer consumer = session.CreateConsumer(destination);

                SendFirstAndSecondMessages(session, producer);

                ExpectTextMessage(consumer, 1000, "1st");
                session.Rollback();

                // No delay on the first rollback.
                ExpectAnyMessage(consumer, 100);
                session.Rollback();

                // Later rollbacks are delayed.
                ExpectNoMessage(consumer, 100);
                ExpectTextMessage(consumer, 700, "1st");
                session.Rollback();

                // The delay stays the same on the next rollback because
                // exponential back-off is not in use.
                ExpectNoMessage(consumer, 100);
                ExpectTextMessage(consumer, 700, "1st");
            }
        }

        /// <summary>
        /// With MaximumRedeliveries set to 2, rolls the first message back three
        /// times and expects the consumer to then see the second message while
        /// the first one becomes available on the "ActiveMQ.DLQ" queue.
        /// </summary>
        /// <remarks>
        /// The Test attribute below is commented out, so NUnit does not run this
        /// method. The reason is not recorded in this file.
        /// </remarks>
        //[Test]
        public void TestDLQHandling()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 100;
                policy.UseExponentialBackOff = false;
                policy.MaximumRedeliveries = 2;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);

                IMessageConsumer consumer = session.CreateConsumer(destination);
                IDestination dlq = session.GetQueue("ActiveMQ.DLQ");
                IMessageConsumer dlqConsumer = session.CreateConsumer(dlq);

                // Purge any messages already in the DLQ.
                while(dlqConsumer.ReceiveNoWait() != null)
                {
                    session.Commit();
                }

                SendFirstAndSecondMessages(session, producer);

                ExpectTextMessage(consumer, 1000, "1st");
                session.Rollback();

                ExpectTextMessage(consumer, 1000, "1st");
                session.Rollback();

                ExpectTextMessage(consumer, 1000, "1st");
                session.Rollback();

                // The last rollback should cause the 1st message to get sent to the DLQ.
                ExpectTextMessage(consumer, 1000, "2nd");
                session.Commit();

                // We should be able to get the message off the DLQ now.
                ExpectTextMessage(dlqConsumer, 1000, "1st");
                session.Commit();
            }
        }

        /// <summary>
        /// With MaximumRedeliveries set to -1, verifies that the first message
        /// keeps being redelivered across five rollbacks and that the second
        /// message is delivered only once the first has been committed.
        /// </summary>
        [Test]
        public void TestInfiniteMaximumNumberOfRedeliveries()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 100;
                policy.UseExponentialBackOff = false;
                // Set the maximum redeliveries to no maximum (i.e. infinite).
                policy.MaximumRedeliveries = -1;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);
                IMessageConsumer consumer = session.CreateConsumer(destination);

                SendFirstAndSecondMessages(session, producer);

                // We should be able to get the 1st message redelivered until
                // session.Commit is called: one initial delivery plus four
                // redeliveries are rolled back here.
                for(int rollbackCount = 0; rollbackCount < 5; rollbackCount++)
                {
                    ExpectTextMessage(consumer, 1000, "1st");
                    session.Rollback();
                }

                ExpectTextMessage(consumer, 1000, "1st");
                session.Commit();

                ExpectTextMessage(consumer, 1000, "2nd");
                session.Commit();
            }
        }

        /// <summary>
        /// With MaximumRedeliveries set to 0, verifies that after rolling back
        /// the first message the next message received is the second one.
        /// </summary>
        [Test]
        public void TestZeroMaximumNumberOfRedeliveries()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 100;
                policy.UseExponentialBackOff = false;
                // Set the maximum redeliveries to 0.
                policy.MaximumRedeliveries = 0;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);
                IMessageConsumer consumer = session.CreateConsumer(destination);

                SendFirstAndSecondMessages(session, producer);

                ExpectTextMessage(consumer, 1000, "1st");
                session.Rollback();

                // The 1st message should not be redelivered since
                // MaximumRedeliveries is set to 0.
                ExpectTextMessage(consumer, 1000, "2nd");
                session.Commit();
            }
        }

        /// <summary>
        /// Verifies that <c>nms.RedeliveryPolicy.*</c> options on the connection
        /// URI end up on the connection's redelivery policy, for both
        /// PascalCase and camelCase option names.
        /// </summary>
#if !NETCF
        [Test]
#endif
        public void TestURIForRedeliverPolicyHandling()
        {
            string uri1 = "stomp:tcp://${activemqhost}:61613" +
                          "?nms.RedeliveryPolicy.BackOffMultiplier=10" +
                          "&nms.RedeliveryPolicy.InitialRedeliveryDelay=2000" +
                          "&nms.RedeliveryPolicy.UseExponentialBackOff=true" +
                          "&nms.RedeliveryPolicy.UseCollisionAvoidance=true" +
                          "&nms.RedeliveryPolicy.CollisionAvoidancePercent=20";

            string uri2 = "stomp:tcp://${activemqhost}:61613" +
                          "?nms.RedeliveryPolicy.backOffMultiplier=50" +
                          "&nms.RedeliveryPolicy.initialRedeliveryDelay=4000" +
                          "&nms.RedeliveryPolicy.useExponentialBackOff=false" +
                          "&nms.RedeliveryPolicy.useCollisionAvoidance=false" +
                          "&nms.RedeliveryPolicy.collisionAvoidancePercent=10";

            ExpectRedeliveryPolicyFromUri(uri1, 10, 2000, true, true, 20);
            ExpectRedeliveryPolicyFromUri(uri2, 50, 4000, false, false, 10);
        }

        /// <summary>
        /// Sends the text messages "1st" and "2nd" through
        /// <paramref name="producer"/> and commits the session.
        /// </summary>
        private static void SendFirstAndSecondMessages(ISession session, IMessageProducer producer)
        {
            producer.Send(session.CreateTextMessage("1st"));
            producer.Send(session.CreateTextMessage("2nd"));
            session.Commit();
        }

        /// <summary>
        /// Waits up to <paramref name="timeoutMilliseconds"/> for a message and
        /// returns it as a text message, or null if none arrived in time.
        /// </summary>
        private static ITextMessage ReceiveText(IMessageConsumer consumer, int timeoutMilliseconds)
        {
            return (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(timeoutMilliseconds));
        }

        /// <summary>
        /// Asserts that some message arrives within the timeout, without
        /// checking its text.
        /// </summary>
        private static void ExpectAnyMessage(IMessageConsumer consumer, int timeoutMilliseconds)
        {
            ITextMessage message = ReceiveText(consumer, timeoutMilliseconds);
            Assert.IsNotNull(message, "Expected a message within " + timeoutMilliseconds + " ms but none arrived.");
        }

        /// <summary>
        /// Asserts that a message arrives within the timeout and that its text
        /// equals <paramref name="expectedText"/>.
        /// </summary>
        private static void ExpectTextMessage(IMessageConsumer consumer, int timeoutMilliseconds, string expectedText)
        {
            ITextMessage message = ReceiveText(consumer, timeoutMilliseconds);
            Assert.IsNotNull(message, "Expected message \"" + expectedText + "\" within " + timeoutMilliseconds + " ms but none arrived.");
            Assert.AreEqual(expectedText, message.Text);
        }

        /// <summary>
        /// Asserts that no message arrives within the timeout.
        /// </summary>
        private static void ExpectNoMessage(IMessageConsumer consumer, int timeoutMilliseconds)
        {
            ITextMessage message = ReceiveText(consumer, timeoutMilliseconds);
            Assert.IsNull(message, "Expected no message within " + timeoutMilliseconds + " ms but one arrived.");
        }

        /// <summary>
        /// Creates a connection from <paramref name="uri"/> (after environment
        /// variable substitution) and asserts that its redelivery policy
        /// carries the given values.
        /// </summary>
        private static void ExpectRedeliveryPolicyFromUri(
            string uri,
            int expectedBackOffMultiplier,
            int expectedInitialRedeliveryDelay,
            bool expectedUseExponentialBackOff,
            bool expectedUseCollisionAvoidance,
            int expectedCollisionAvoidancePercent)
        {
            NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));

            Assert.IsNotNull(factory);
            Assert.IsNotNull(factory.ConnectionFactory);
            using(IConnection connection = factory.CreateConnection("", ""))
            {
                Assert.IsNotNull(connection);

                // Fail with a clear assertion rather than a NullReferenceException
                // if the factory produced a connection from another provider.
                Connection stompConnection = connection as Connection;
                Assert.IsNotNull(stompConnection, "The factory did not create an Apache.NMS.Stomp.Connection.");

                IRedeliveryPolicy policy = stompConnection.RedeliveryPolicy;
                Assert.AreEqual(expectedBackOffMultiplier, policy.BackOffMultiplier);
                Assert.AreEqual(expectedInitialRedeliveryDelay, policy.InitialRedeliveryDelay);
                Assert.AreEqual(expectedUseExponentialBackOff, policy.UseExponentialBackOff);
                Assert.AreEqual(expectedUseCollisionAvoidance, policy.UseCollisionAvoidance);
                Assert.AreEqual(expectedCollisionAvoidancePercent, policy.CollisionAvoidancePercent);
            }
        }
    }
}