﻿/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Threading;
using Apache.NMS.Test;
using NUnit.Framework;
using Apache.NMS.Stomp.Commands;
using System;
using Apache.NMS.Util;

namespace Apache.NMS.Stomp.Test
{
    /// <summary>
    /// Selects how <c>TestReceiveIgnoreExpirationMessage</c> decorates the test
    /// destination name with the <c>consumer.nms.ignoreExpiration</c> option.
    /// </summary>
    public enum ExpirationOptions
    {
        /// <summary>Use the destination name unchanged (no option appended).</summary>
        DEFAULT,
        /// <summary>Append <c>?consumer.nms.ignoreExpiration=true</c> to the destination name.</summary>
        IGNORE,
        /// <summary>Append <c>?consumer.nms.ignoreExpiration=false</c> to the destination name.</summary>
        DO_NOT_IGNORE
    }

    [TestFixture]
    public class MessageConsumerTest : NMSTestSupport
    {
        // Named queue used by the tests that do not create a temporary destination.
        protected static string DESTINATION_NAME = "queue://TEST.MessageConsumerTestDestination";
        // Client id handed to CreateConnection by every test in this fixture.
        protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";

        // State shared between TestAsyncDispatchExceptionRedelivers and its listener
        // callback; all three fields are re-initialised in SetUp.
        // Counted down by the listener once it has finished (successfully or not).
        private CountDownLatch doneLatch;
        // Number of messages the listener has been handed so far.
        private int counter;
        // Failure description recorded by the listener; null when nothing went wrong.
        private String errorMessage;

        /// <summary>
        /// Runs the base fixture set-up and then resets the state that the
        /// asynchronous listener shares with the test thread.
        /// </summary>
        [SetUp]
        public override void SetUp()
        {
            base.SetUp();

            // Start with no recorded error, no messages seen and a fresh
            // single-count latch.
            errorMessage = null;
            counter = 0;
            doneLatch = new CountDownLatch(1);
        }

        /// <summary>
        /// Verifies that creating a consumer with a selector the provider is
        /// expected to reject raises an exception, and that the connection and
        /// the already-created consumer keep working afterwards.
        /// </summary>
        [Test]
        public void TestBadSelectorDoesNotCloseConnection()
        {
            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
            {
                using(ISession sender = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    IDestination destination = sender.CreateTemporaryQueue();

                    IMessageProducer producer = sender.CreateProducer(destination);
                    ITextMessage goodMsg = sender.CreateTextMessage("testGood");
                    producer.Send(goodMsg);

                    IMessageConsumer consumer = session.CreateConsumer(destination);
                    connection.Start();
                    Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));

                    // Record the rejection in a flag and assert on it *after* the
                    // try block.  Calling Assert.Fail inside the try would throw an
                    // exception that the catch-all handler below swallows, so the
                    // test could never fail.
                    bool selectorRejected = false;

                    // Dispose the extra session so it is not left open on the connection.
                    using(ISession badListenerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        try
                        {
                            badListenerSession.CreateConsumer(destination, "badSelector;too");
                        }
                        catch(Exception e)
                        {
                            selectorRejected = true;
                            Tracer.DebugFormat("Caught Ex: {0}", e);
                        }
                    }

                    Assert.IsTrue(selectorRejected, "Exception expected.");

                    // The original connection and consumer must still deliver messages.
                    ITextMessage failMsg = sender.CreateTextMessage("testFail");
                    producer.Send(failMsg);
                    Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
                }
            }
        }

        /// <summary>
        /// Sends two non-persistent text messages and consumes them through an
        /// asynchronous listener that throws on the second one.  The test passes
        /// when the listener signals completion within ten seconds without having
        /// recorded an error message.
        /// </summary>
        [Test]
        public void TestAsyncDispatchExceptionRedelivers()
        {
            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
            using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
            {
                IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME) as IQueue;

                // Queue up the two messages the listener expects, in order.
                using(IMessageProducer producer = session.CreateProducer(queue))
                {
                    producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

                    foreach(string body in new string[] { "First", "Second" })
                    {
                        producer.Send(producer.CreateTextMessage(body));
                    }
                }

                using(IMessageConsumer consumer = session.CreateConsumer(queue))
                {
                    consumer.Listener += OnTestAsynchRedliversMessage;

                    connection.Start();

                    // The listener counts the latch down when it has seen the
                    // final delivery or when it has recorded a failure.
                    bool finished = doneLatch.await(TimeSpan.FromSeconds(10));

                    if(!finished)
                    {
                        Assert.Fail("Timeout waiting for async message delivery to complete.");
                    }

                    if(!String.IsNullOrEmpty(errorMessage))
                    {
                        Assert.Fail(errorMessage);
                    }
                }
            }
        }

        /// <summary>
        /// Listener used by TestAsyncDispatchExceptionRedelivers.  It expects
        /// "First", then "Second" (on which it deliberately throws), then
        /// "Second" again flagged as redelivered.  Any other outcome is stored in
        /// <c>errorMessage</c>; the latch is released once the sequence is
        /// complete or has failed.
        /// </summary>
        /// <param name="msg">The message handed to the listener.</param>
        private void OnTestAsynchRedliversMessage(IMessage msg)
        {
            int delivery = ++counter;

            try
            {
                ITextMessage text = msg as ITextMessage;

                if(delivery == 1)
                {
                    Tracer.Debug("Got first Message: " + text.Text);
                    Assert.AreEqual("First", text.Text);
                    Assert.IsFalse(text.NMSRedelivered);
                }
                else if(delivery == 2)
                {
                    Tracer.Debug("Got Second Message: " + text.Text);
                    Assert.AreEqual("Second", text.Text);
                    Assert.IsFalse(text.NMSRedelivered);

                    // Deliberate failure: it is rethrown untouched by the handler below.
                    throw new Exception("Ignore Me");
                }
                else if(delivery == 3)
                {
                    Tracer.Debug("Got Third Message: " + text.Text);
                    Assert.AreEqual("Second", text.Text);
                    Assert.IsTrue(text.NMSRedelivered);
                    doneLatch.countDown();
                }
                else
                {
                    errorMessage = "Got too many messages: " + delivery;
                    Tracer.Debug(errorMessage);
                    doneLatch.countDown();
                }
            }
            catch(Exception e)
            {
                if(!e.Message.Equals("Ignore Me"))
                {
                    // A real failure (including a failed assertion above): record
                    // it for the test thread and release the latch.
                    errorMessage = "Got exception: " + e.Message;
                    Tracer.Warn("Exception on Message Receive: " + e.Message);
                    doneLatch.countDown();
                    return;
                }

                // The deliberate exception must escape the listener.
                throw;
            }
        }

        /// <summary>
        /// Receives two messages from the same transacted consumer, the first on
        /// a dedicated thread and the second on the test thread, committing the
        /// session after each receive.
        /// </summary>
        [Test]
        public void ConsumeInTwoThreads()
        {
            // Outcome of the background receive.  The worker only records what
            // happened; the assertions are made on the test thread after Join,
            // because an assertion (or any other exception) thrown on a worker
            // thread is an unhandled exception rather than a reported test failure.
            IMessage threadMessage = null;
            Exception threadError = null;

            ParameterizedThreadStart threadStart =
                delegate(object o)
                {
                    try
                    {
                        IMessageConsumer consumer = (IMessageConsumer) o;
                        threadMessage = consumer.Receive(TimeSpan.FromSeconds(2));
                    }
                    catch(Exception e)
                    {
                        threadError = e;
                    }
                };

            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
            {
                connection.Start();
                using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
                {
                    IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME) as IQueue;

                    // enqueue 2 messages
                    using(IMessageConsumer consumer = session.CreateConsumer(queue))
                    using(IMessageProducer producer = session.CreateProducer(queue))
                    {
                        producer.DeliveryMode = MsgDeliveryMode.Persistent;
                        producer.Send(producer.CreateMessage());
                        producer.Send(producer.CreateMessage());
                        session.Commit();

                        // receive first using a dedicated thread
                        Thread thread = new Thread(threadStart);
                        thread.Start(consumer);
                        thread.Join();

                        if(threadError != null)
                        {
                            Assert.Fail("Receive on the dedicated thread threw: " + threadError);
                        }

                        Assert.IsNotNull(threadMessage, "No message received on the dedicated thread.");
                        session.Commit();

                        // receive second using main thread.  The original author noted
                        // that this call threw System.Threading.AbandonedMutexException.
                        IMessage message = consumer.Receive(TimeSpan.FromSeconds(2));
                        Assert.IsNotNull(message);
                        session.Commit();
                    }
                }
            }
        }

        /// <summary>
        /// Sends a message with a two second time-to-live, waits four seconds and
        /// then polls the consumer.  With <see cref="ExpirationOptions.IGNORE"/>
        /// the expired message must still be delivered; with the other options
        /// nothing may be delivered.
        /// </summary>
        /// <param name="ackMode">Acknowledgement mode of the session under test.</param>
        /// <param name="deliveryMode">Delivery mode set on the producer.</param>
        /// <param name="expirationOption">
        /// Controls which <c>consumer.nms.ignoreExpiration</c> option, if any, is
        /// appended to the destination name.
        /// </param>
        [Test]
        public void TestReceiveIgnoreExpirationMessage(
            [Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
                AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
            AcknowledgementMode ackMode,
            [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
            MsgDeliveryMode deliveryMode,
            [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
            ExpirationOptions expirationOption)
        {
            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
            {
                connection.Start();
                using(Session session = connection.CreateSession(ackMode) as Session)
                {
                    // Fail with a clear message instead of a NullReferenceException
                    // if the session is not of the expected concrete type.
                    Assert.IsNotNull(session, "Session was not created or is not of the expected Session type.");

                    string destinationName = DESTINATION_NAME;

                    if(ExpirationOptions.IGNORE == expirationOption)
                    {
                        destinationName += "?consumer.nms.ignoreExpiration=true";
                    }
                    else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption)
                    {
                        destinationName += "?consumer.nms.ignoreExpiration=false";
                    }

                    try
                    {
                        IDestination destination = SessionUtil.GetDestination(session, destinationName);

                        using(IMessageConsumer consumer = session.CreateConsumer(destination))
                        using(IMessageProducer producer = session.CreateProducer(destination))
                        {
                            producer.DeliveryMode = deliveryMode;

                            string msgText = string.Format("ExpiredMessage: {0}", Guid.NewGuid().ToString());

                            TextMessage msg = session.CreateTextMessage(msgText) as TextMessage;
                            Assert.IsNotNull(msg, "Session did not create a TextMessage.");

                            // Give it two seconds to live.
                            msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000);

                            producer.Send(msg);

                            if(AcknowledgementMode.Transactional == ackMode)
                            {
                                session.Commit();
                            }

                            // Wait for four seconds before processing it.  The broker will have sent it to our local
                            // client dispatch queue, but we won't attempt to process the message until it has had
                            // a chance to expire within our internal queue system.
                            Thread.Sleep(4000);

                            // Keep the raw message: checking only the result of an 'as' cast
                            // would make a delivered message of another type look like
                            // "nothing received".
                            IMessage received = consumer.ReceiveNoWait();

                            if(ExpirationOptions.IGNORE == expirationOption)
                            {
                                Assert.IsNotNull(received, "Did not receive expired message.");

                                TextMessage rcvMsg = received as TextMessage;
                                Assert.IsNotNull(rcvMsg, "Received message is not a TextMessage.");
                                rcvMsg.Acknowledge();

                                Assert.AreEqual(msgText, rcvMsg.Text, "Message text does not match.");
                                Assert.IsTrue(rcvMsg.IsExpired());

                                if(AcknowledgementMode.Transactional == ackMode)
                                {
                                    session.Commit();
                                }
                            }
                            else
                            {
                                // Should not receive a message.
                                Assert.IsNull(received, "Received an expired message!");
                            }

                            consumer.Close();
                            producer.Close();
                        }
                    }
                    finally
                    {
                        try
                        {
                            // Ensure that Session resources on the Broker release transacted Consumers.
                            session.Close();
                            // Give the Broker some time to remove the subscriptions.
                            Thread.Sleep(2000);
                            SessionUtil.DeleteDestination(session, destinationName);
                        }
                        catch(Exception e)
                        {
                            // Cleanup is best-effort and must not mask the test result,
                            // but leave a trace so failures here are diagnosable.
                            Tracer.DebugFormat("Ignoring cleanup failure for {0}: {1}", destinationName, e);
                        }
                    }
                }
            }
        }
    }
}
