blob: 9ac065aed1208e708dddba15f9ce61049739a100 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using System.Collections.Generic;
using Apache.NMS.Test;
using Apache.NMS.Policies;
using Apache.NMS.Stomp;
using Apache.NMS.Stomp.Commands;
using NUnit.Framework;
namespace Apache.NMS.Stomp.Test
{
[TestFixture]
public class StompRedeliveryPolicyTest : NMSTestSupport
{
private const string DESTINATION_NAME = "TEST.RedeliveryPolicyTestDest";
/// <summary>
/// Verifies that, with exponential back-off enabled (initial delay 500 ms,
/// multiplier 2), the first redelivery after a rollback is immediate and each
/// later rollback makes the consumer wait longer before the same message is
/// handed out again.
/// </summary>
[Test]
public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
{
    using(Connection connection = (Connection) CreateConnection())
    {
        // The policy is configured before the connection is started.
        IRedeliveryPolicy policy = connection.RedeliveryPolicy;
        policy.InitialRedeliveryDelay = 500;
        policy.BackOffMultiplier = 2;
        policy.UseExponentialBackOff = true;
        policy.UseCollisionAvoidance = false;

        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
        IDestination destination = session.CreateTemporaryQueue();
        IMessageProducer producer = session.CreateProducer(destination);
        IMessageConsumer consumer = session.CreateConsumer(destination);

        // Send the messages
        producer.Send(session.CreateTextMessage("1st"));
        producer.Send(session.CreateTextMessage("2nd"));
        session.Commit();

        ITextMessage m;

        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNotNull(m);
        Assert.AreEqual("1st", m.Text);
        session.Rollback();

        // No delay on first Rollback..
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
        Assert.IsNotNull(m);
        // Check the payload as well, like every other redelivery in this test,
        // so that receiving the wrong message cannot pass unnoticed.
        Assert.AreEqual("1st", m.Text);
        session.Rollback();

        // Show subsequent re-delivery delay is incrementing: nothing arrives
        // within 100 ms, but the message shows up within the next 700 ms.
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
        Assert.IsNull(m);
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
        Assert.IsNotNull(m);
        Assert.AreEqual("1st", m.Text);
        session.Rollback();

        // Show re-delivery delay is incrementing exponentially: nothing
        // arrives during the first 600 ms (100 + 500), and the message shows
        // up within the following 700 ms.
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
        Assert.IsNull(m);
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(500));
        Assert.IsNull(m);
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
        Assert.IsNotNull(m);
        Assert.AreEqual("1st", m.Text);
    }
}
/// <summary>
/// Verifies that, with exponential back-off disabled and an initial delay of
/// 500 ms, the first redelivery after a rollback is immediate and every later
/// redelivery is held back by the same fixed delay.
/// </summary>
[Test]
public void TestNornalRedeliveryPolicyDelaysDeliveryOnRollback()
{
    using(Connection connection = (Connection) CreateConnection())
    {
        // The policy is configured before the connection is started.
        IRedeliveryPolicy policy = connection.RedeliveryPolicy;
        policy.InitialRedeliveryDelay = 500;
        policy.UseExponentialBackOff = false;

        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
        IDestination destination = session.CreateTemporaryQueue();
        IMessageProducer producer = session.CreateProducer(destination);
        IMessageConsumer consumer = session.CreateConsumer(destination);

        // Send the messages
        producer.Send(session.CreateTextMessage("1st"));
        producer.Send(session.CreateTextMessage("2nd"));
        session.Commit();

        ITextMessage m;

        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
        Assert.IsNotNull(m);
        Assert.AreEqual("1st", m.Text);
        session.Rollback();

        // No delay on first Rollback..
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
        Assert.IsNotNull(m);
        // Check the payload as well, like every other redelivery in this test,
        // so that receiving the wrong message cannot pass unnoticed.
        Assert.AreEqual("1st", m.Text);
        session.Rollback();

        // Show that the next re-delivery is delayed: nothing arrives within
        // 100 ms, but the message shows up within the next 700 ms.
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
        Assert.IsNull(m);
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
        Assert.IsNotNull(m);
        Assert.AreEqual("1st", m.Text);
        session.Rollback();

        // The message gets redelivered after 500 ms every time since
        // we are not using exponential backoff.
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
        Assert.IsNull(m);
        m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
        Assert.IsNotNull(m);
        Assert.AreEqual("1st", m.Text);
    }
}
//[Test]
// NOTE(review): this test is disabled (attribute commented out); the reason
// is not recorded in this file - confirm before re-enabling.
/// <summary>
/// Rolls the first message back until the redelivery limit
/// (MaximumRedeliveries = 2) is used up, then expects the second message on
/// the test queue and the first message on the "ActiveMQ.DLQ" queue.
/// </summary>
public void TestDLQHandling()
{
    using(Connection connection = (Connection) CreateConnection())
    {
        IRedeliveryPolicy redeliveryPolicy = connection.RedeliveryPolicy;
        redeliveryPolicy.InitialRedeliveryDelay = 100;
        redeliveryPolicy.UseExponentialBackOff = false;
        redeliveryPolicy.MaximumRedeliveries = 2;

        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
        IDestination testQueue = session.CreateTemporaryQueue();
        IMessageProducer producer = session.CreateProducer(testQueue);
        IMessageConsumer consumer = session.CreateConsumer(testQueue);
        IDestination deadLetterQueue = session.GetQueue("ActiveMQ.DLQ");
        IMessageConsumer dlqConsumer = session.CreateConsumer(deadLetterQueue);

        // Drain whatever is already sitting in the DLQ, committing after
        // each message that was taken off.
        while(dlqConsumer.ReceiveNoWait() != null)
        {
            session.Commit();
        }

        // Publish both test messages in a single transaction.
        producer.Send(session.CreateTextMessage("1st"));
        producer.Send(session.CreateTextMessage("2nd"));
        session.Commit();

        TimeSpan receiveTimeout = TimeSpan.FromMilliseconds(1000);
        ITextMessage received;

        // Three deliveries of the first message (the initial one plus two
        // redeliveries), each of them rolled back.
        for(int delivery = 0; delivery < 3; delivery++)
        {
            received = (ITextMessage)consumer.Receive(receiveTimeout);
            Assert.IsNotNull(received);
            Assert.AreEqual("1st", received.Text);
            session.Rollback();
        }

        // The last Rollback should cause the 1st message to get sent to the DLQ,
        // so the next message on the test queue is the second one.
        received = (ITextMessage)consumer.Receive(receiveTimeout);
        Assert.IsNotNull(received);
        Assert.AreEqual("2nd", received.Text);
        session.Commit();

        // We should be able to get the message off the DLQ now.
        received = (ITextMessage)dlqConsumer.Receive(receiveTimeout);
        Assert.IsNotNull(received);
        Assert.AreEqual("1st", received.Text);
        session.Commit();
    }
}
/// <summary>
/// With MaximumRedeliveries set to -1 (no maximum), the first message must
/// keep coming back after every rollback until the session is committed;
/// only then is the second message delivered.
/// </summary>
[Test]
public void TestInfiniteMaximumNumberOfRedeliveries()
{
    using(Connection connection = (Connection) CreateConnection())
    {
        IRedeliveryPolicy redeliveryPolicy = connection.RedeliveryPolicy;
        redeliveryPolicy.InitialRedeliveryDelay = 100;
        redeliveryPolicy.UseExponentialBackOff = false;
        // -1 means no maximum (ie. infinite redeliveries)
        redeliveryPolicy.MaximumRedeliveries = -1;

        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
        IDestination testQueue = session.CreateTemporaryQueue();
        IMessageProducer producer = session.CreateProducer(testQueue);
        IMessageConsumer consumer = session.CreateConsumer(testQueue);

        // Publish both test messages in a single transaction.
        producer.Send(session.CreateTextMessage("1st"));
        producer.Send(session.CreateTextMessage("2nd"));
        session.Commit();

        TimeSpan receiveTimeout = TimeSpan.FromMilliseconds(1000);
        ITextMessage received;

        // The 1st message must be delivered again after each of five
        // consecutive rollbacks.
        for(int rollbackCount = 0; rollbackCount < 5; rollbackCount++)
        {
            received = (ITextMessage)consumer.Receive(receiveTimeout);
            Assert.IsNotNull(received);
            Assert.AreEqual("1st", received.Text);
            session.Rollback();
        }

        // One more delivery of the 1st message, this time committed.
        received = (ITextMessage)consumer.Receive(receiveTimeout);
        Assert.IsNotNull(received);
        Assert.AreEqual("1st", received.Text);
        session.Commit();

        // After the commit the 2nd message is next.
        received = (ITextMessage)consumer.Receive(receiveTimeout);
        Assert.IsNotNull(received);
        Assert.AreEqual("2nd", received.Text);
        session.Commit();
    }
}
/// <summary>
/// With MaximumRedeliveries set to 0, a rolled back message must not be
/// delivered again: after rolling back the first message the consumer is
/// expected to receive the second one.
/// </summary>
[Test]
public void TestZeroMaximumNumberOfRedeliveries()
{
    using(Connection connection = (Connection) CreateConnection())
    {
        IRedeliveryPolicy redeliveryPolicy = connection.RedeliveryPolicy;
        redeliveryPolicy.InitialRedeliveryDelay = 100;
        redeliveryPolicy.UseExponentialBackOff = false;
        // No redeliveries at all.
        redeliveryPolicy.MaximumRedeliveries = 0;

        connection.Start();
        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
        IDestination testQueue = session.CreateTemporaryQueue();
        IMessageProducer producer = session.CreateProducer(testQueue);
        IMessageConsumer consumer = session.CreateConsumer(testQueue);

        // Publish both test messages in a single transaction.
        producer.Send(session.CreateTextMessage("1st"));
        producer.Send(session.CreateTextMessage("2nd"));
        session.Commit();

        TimeSpan receiveTimeout = TimeSpan.FromMilliseconds(1000);

        // Initial delivery of the 1st message, rolled back.
        ITextMessage received = (ITextMessage)consumer.Receive(receiveTimeout);
        Assert.IsNotNull(received);
        Assert.AreEqual("1st", received.Text);
        session.Rollback();

        // The 1st message should not be redelivered since MaximumRedeliveries
        // is set to 0, so the 2nd message is next.
        received = (ITextMessage)consumer.Receive(receiveTimeout);
        Assert.IsNotNull(received);
        Assert.AreEqual("2nd", received.Text);
        session.Commit();
    }
}
/// <summary>
/// Checks that redelivery policy options passed as "nms.RedeliveryPolicy.*"
/// query parameters of the connection URI are reflected on the redelivery
/// policy of the created connection. Two URIs are used: one spelling the
/// option names in PascalCase and one in camelCase.
/// </summary>
#if !NETCF
[Test]
#endif
public void TestURIForRedeliverPolicyHandling()
{
    string uri1 = "stomp:tcp://${activemqhost}:61613" +
                  "?nms.RedeliveryPolicy.BackOffMultiplier=10" +
                  "&nms.RedeliveryPolicy.InitialRedeliveryDelay=2000" +
                  "&nms.RedeliveryPolicy.UseExponentialBackOff=true" +
                  "&nms.RedeliveryPolicy.UseCollisionAvoidance=true" +
                  "&nms.RedeliveryPolicy.CollisionAvoidancePercent=20";

    string uri2 = "stomp:tcp://${activemqhost}:61613" +
                  "?nms.RedeliveryPolicy.backOffMultiplier=50" +
                  "&nms.RedeliveryPolicy.initialRedeliveryDelay=4000" +
                  "&nms.RedeliveryPolicy.useExponentialBackOff=false" +
                  "&nms.RedeliveryPolicy.useCollisionAvoidance=false" +
                  "&nms.RedeliveryPolicy.collisionAvoidancePercent=10";

    NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri1));

    Assert.IsNotNull(factory);
    Assert.IsNotNull(factory.ConnectionFactory);
    using(IConnection connection = factory.CreateConnection("", ""))
    {
        Assert.IsNotNull(connection);

        Connection amqConnection = connection as Connection;
        // Fail with a clear assertion instead of a NullReferenceException
        // when the factory hands back a different connection type.
        Assert.IsNotNull(amqConnection, "Expected the factory to create an Apache.NMS.Stomp.Connection");

        Assert.AreEqual(10, amqConnection.RedeliveryPolicy.BackOffMultiplier);
        Assert.AreEqual(2000, amqConnection.RedeliveryPolicy.InitialRedeliveryDelay);
        Assert.AreEqual(true, amqConnection.RedeliveryPolicy.UseExponentialBackOff);
        Assert.AreEqual(true, amqConnection.RedeliveryPolicy.UseCollisionAvoidance);
        Assert.AreEqual(20, amqConnection.RedeliveryPolicy.CollisionAvoidancePercent);
    }

    factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri2));

    Assert.IsNotNull(factory);
    Assert.IsNotNull(factory.ConnectionFactory);
    using(IConnection connection = factory.CreateConnection("", ""))
    {
        Assert.IsNotNull(connection);

        Connection amqConnection = connection as Connection;
        // Same guard as above for the second factory.
        Assert.IsNotNull(amqConnection, "Expected the factory to create an Apache.NMS.Stomp.Connection");

        Assert.AreEqual(50, amqConnection.RedeliveryPolicy.BackOffMultiplier);
        Assert.AreEqual(4000, amqConnection.RedeliveryPolicy.InitialRedeliveryDelay);
        Assert.AreEqual(false, amqConnection.RedeliveryPolicy.UseExponentialBackOff);
        Assert.AreEqual(false, amqConnection.RedeliveryPolicy.UseCollisionAvoidance);
        Assert.AreEqual(10, amqConnection.RedeliveryPolicy.CollisionAvoidancePercent);
    }
}
}
}