/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Collections.Generic;
using Apache.NMS.Test;
using Apache.NMS.Policies;
using Apache.NMS.Stomp;
using Apache.NMS.Stomp.Commands;
using NUnit.Framework;
using NUnit.Framework.Extensions;

namespace Apache.NMS.Stomp.Test
{
    [TestFixture]
    public class StompRedeliveryPolicyTest : NMSTestSupport
    {
        private const string DESTINATION_NAME = "RedeliveryPolicyTestDest";

        [Test]
        public void TestExponentialRedeliveryPolicyDelaysDeliveryOnRollback()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 500;
                policy.BackOffMultiplier = 2;
                policy.UseExponentialBackOff = true;
                policy.UseCollisionAvoidance = false;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);

                IMessageConsumer consumer = session.CreateConsumer(destination);

                // Send the messages
                producer.Send(session.CreateTextMessage("1st"));
                producer.Send(session.CreateTextMessage("2nd"));
                session.Commit();

                ITextMessage m;
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                // No delay on first Rollback..
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
                Assert.IsNotNull(m);
                session.Rollback();

                // Show subsequent re-delivery delay is incrementing.
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
                Assert.IsNull(m);

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                // Show re-delivery delay is incrementing exponentially
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
                Assert.IsNull(m);
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(500));
                Assert.IsNull(m);
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
            }
        }

        [Test]
        public void TestNornalRedeliveryPolicyDelaysDeliveryOnRollback()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 500;
                policy.UseExponentialBackOff = false;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();

                IMessageProducer producer = session.CreateProducer(destination);
                IMessageConsumer consumer = session.CreateConsumer(destination);

                // Send the messages
                producer.Send(session.CreateTextMessage("1st"));
                producer.Send(session.CreateTextMessage("2nd"));
                session.Commit();

                ITextMessage m;
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                // No delay on first Rollback..
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
                Assert.IsNotNull(m);
                session.Rollback();

                // Show subsequent re-delivery delay is incrementing.
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
                Assert.IsNull(m);
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                // The message gets redelivered after 500 ms every time since
                // we are not using exponential backoff.
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
                Assert.IsNull(m);
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
            }
        }

        //[Test]
        public void TestDLQHandling()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 100;
                policy.UseExponentialBackOff = false;
                policy.MaximumRedeliveries = 2;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);

                IMessageConsumer consumer = session.CreateConsumer(destination);
                IDestination dlq = session.GetQueue("ActiveMQ.DLQ");
                IMessageConsumer dlqConsumer = session.CreateConsumer(dlq);

                // Purge any messages already in the DLQ.
                while(dlqConsumer.ReceiveNoWait() != null)
                {
                    session.Commit();
                }

                // Send the messages
                producer.Send(session.CreateTextMessage("1st"));
                producer.Send(session.CreateTextMessage("2nd"));
                session.Commit();

                ITextMessage m;
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                // The last Rollback should cause the 1st message to get sent to the DLQ
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("2nd", m.Text);
                session.Commit();

                // We should be able to get the message off the DLQ now.
                m = (ITextMessage)dlqConsumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Commit();
            }
        }

        [Test]
        public void TestInfiniteMaximumNumberOfRedeliveries()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 100;
                policy.UseExponentialBackOff = false;
                // let's set the maximum redeliveries to no maximum (ie. infinite)
                policy.MaximumRedeliveries = -1;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);

                IMessageConsumer consumer = session.CreateConsumer(destination);

                // Send the messages
                producer.Send(session.CreateTextMessage("1st"));
                producer.Send(session.CreateTextMessage("2nd"));
                session.Commit();

                ITextMessage m;

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                //we should be able to get the 1st message redelivered until a session.Commit is called
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Commit();

                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("2nd", m.Text);
                session.Commit();
            }
        }

        [Test]
        public void TestZeroMaximumNumberOfRedeliveries()
        {
            using(Connection connection = (Connection) CreateConnection())
            {
                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
                policy.InitialRedeliveryDelay = 100;
                policy.UseExponentialBackOff = false;
                //let's set the maximum redeliveries to 0
                policy.MaximumRedeliveries = 0;

                connection.Start();
                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
                IDestination destination = session.CreateTemporaryQueue();
                IMessageProducer producer = session.CreateProducer(destination);

                IMessageConsumer consumer = session.CreateConsumer(destination);

                // Send the messages
                producer.Send(session.CreateTextMessage("1st"));
                producer.Send(session.CreateTextMessage("2nd"));
                session.Commit();

                ITextMessage m;
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("1st", m.Text);
                session.Rollback();

                //the 1st  message should not be redelivered since maximumRedeliveries is set to 0
                m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
                Assert.IsNotNull(m);
                Assert.AreEqual("2nd", m.Text);
                session.Commit();
            }
        }

#if !NETCF
        [Test]
#endif
        public void TestURIForRedeliverPolicyHandling()
        {
            string uri1 = "stomp:tcp://${activemqhost}:61613" +
                          "?nms.RedeliveryPolicy.BackOffMultiplier=10" +
                          "&nms.RedeliveryPolicy.InitialRedeliveryDelay=2000" +
                          "&nms.RedeliveryPolicy.UseExponentialBackOff=true" +
                          "&nms.RedeliveryPolicy.UseCollisionAvoidance=true" +
                          "&nms.RedeliveryPolicy.CollisionAvoidancePercent=20";

            string uri2 = "stomp:tcp://${activemqhost}:61613" +
                          "?nms.RedeliveryPolicy.backOffMultiplier=50" +
                          "&nms.RedeliveryPolicy.initialRedeliveryDelay=4000" +
                          "&nms.RedeliveryPolicy.useExponentialBackOff=false" +
                          "&nms.RedeliveryPolicy.useCollisionAvoidance=false" +
                          "&nms.RedeliveryPolicy.collisionAvoidancePercent=10";

            NMSConnectionFactory factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri1));

            Assert.IsNotNull(factory);
            Assert.IsNotNull(factory.ConnectionFactory);
            using(IConnection connection = factory.CreateConnection("", ""))
            {
                Assert.IsNotNull(connection);

                Connection amqConnection = connection as Connection;

                Assert.AreEqual(10, amqConnection.RedeliveryPolicy.BackOffMultiplier);
                Assert.AreEqual(2000, amqConnection.RedeliveryPolicy.InitialRedeliveryDelay);
                Assert.AreEqual(true, amqConnection.RedeliveryPolicy.UseExponentialBackOff);
                Assert.AreEqual(true, amqConnection.RedeliveryPolicy.UseCollisionAvoidance);
                Assert.AreEqual(20, amqConnection.RedeliveryPolicy.CollisionAvoidancePercent);
            }

            factory = new NMSConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri2));

            Assert.IsNotNull(factory);
            Assert.IsNotNull(factory.ConnectionFactory);
            using(IConnection connection = factory.CreateConnection("", ""))
            {
                Assert.IsNotNull(connection);

                Connection amqConnection = connection as Connection;
                Assert.AreEqual(50, amqConnection.RedeliveryPolicy.BackOffMultiplier);
                Assert.AreEqual(4000, amqConnection.RedeliveryPolicy.InitialRedeliveryDelay);
                Assert.AreEqual(false, amqConnection.RedeliveryPolicy.UseExponentialBackOff);
                Assert.AreEqual(false, amqConnection.RedeliveryPolicy.UseCollisionAvoidance);
                Assert.AreEqual(10, amqConnection.RedeliveryPolicy.CollisionAvoidancePercent);
            }
        }

    }
}
