/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.Test;
using NUnit.Framework;

namespace Apache.NMS.ActiveMQ.Test
{
    [TestFixture]   
    public class ExclusiveConsumerTest : NMSTestSupport
    {
        protected static string DESTINATION_NAME = "ExclusiveConsumerTestDestination";
        protected static string TEST_CLIENT_ID = "ExclusiveConsumerTestClientId";
        
        private IConnection createConnection(bool start) 
        {
            IConnection conn = CreateConnection(TEST_CLIENT_ID);
            if(start) 
            {
                conn.Start();
            }
            
            return conn;
        }
		
		public void purgeQueue(IConnection conn, ActiveMQQueue queue)
		{
			ISession session = conn.CreateSession();
			IMessageConsumer consumer = session.CreateConsumer(queue);
			while(consumer.Receive(TimeSpan.FromMilliseconds(500)) != null)
			{
			}				
			
			consumer.Close();
			session.Close();
		}
    
        [Test]
        public void TestExclusiveConsumerSelectedCreatedFirst()
        {			
            IConnection conn = createConnection(true);
    
            ISession exclusiveSession = null;
            ISession fallbackSession = null;
            ISession senderSession = null;
    
			purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE1"));
			
            try 
            {
                exclusiveSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
    
                ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1?consumer.exclusive=true");
                IMessageConsumer exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue);
    
                ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1");
                IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue);
    
                ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1");
    
                IMessageProducer producer = senderSession.CreateProducer(senderQueue);
    
                IMessage msg = senderSession.CreateTextMessage("test");
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
                producer.Send(msg);
                Thread.Sleep(500);

                // Verify exclusive consumer receives the message.
                Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
            }
            finally 
            {
                fallbackSession.Close();
                senderSession.Close();
                conn.Close();
            }
        }

        [Test]
        public void TestExclusiveConsumerSelectedCreatedAfter()         
        {
            IConnection conn = createConnection(true);
    
            ISession exclusiveSession = null;
            ISession fallbackSession = null;
            ISession senderSession = null;
    
			purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE5"));
			
            try 
            {
                exclusiveSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
    
                ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE5");
                IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue);
    
                ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE5?consumer.exclusive=true");
                IMessageConsumer exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue);
    
                ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE5");
    
                IMessageProducer producer = senderSession.CreateProducer(senderQueue);
    
                IMessage msg = senderSession.CreateTextMessage("test");
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
                producer.Send(msg);
                Thread.Sleep(500);
    
                // Verify exclusive consumer receives the message.
                Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
            } 
            finally 
            {
                fallbackSession.Close();
                senderSession.Close();
                conn.Close();
            }
        }

        [Test]
        public void TestFailoverToAnotherExclusiveConsumerCreatedFirst() 
        {
            IConnection conn = createConnection(true);
    
            ISession exclusiveSession1 = null;
            ISession exclusiveSession2 = null;
            ISession fallbackSession = null;
            ISession senderSession = null;
    
			purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE2"));
			
            try 
            {
                exclusiveSession1 = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                exclusiveSession2 = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
    
                // This creates the exclusive consumer first which avoids AMQ-1024
                // bug.
                ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true");
                IMessageConsumer exclusiveConsumer1 = exclusiveSession1.CreateConsumer(exclusiveQueue);
                IMessageConsumer exclusiveConsumer2 = exclusiveSession2.CreateConsumer(exclusiveQueue);
    
                ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2");
                IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue);
    
                ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2");
    
                IMessageProducer producer = senderSession.CreateProducer(senderQueue);
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
    
                IMessage msg = senderSession.CreateTextMessage("test");
                producer.Send(msg);
                Thread.Sleep(500);
    
                // Verify exclusive consumer receives the message.
                Assert.IsNotNull(exclusiveConsumer1.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(exclusiveConsumer2.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
    
                // Close the exclusive consumer to verify the non-exclusive consumer
                // takes over
                exclusiveConsumer1.Close();
    
                producer.Send(msg);
                producer.Send(msg);
    
                Assert.IsNotNull(exclusiveConsumer2.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
            } 
            finally 
            {
                fallbackSession.Close();
                senderSession.Close();
                conn.Close();
            }
        }

        [Test]
        public void TestFailoverToAnotherExclusiveConsumerCreatedAfter()
        {
            IConnection conn = createConnection(true);
    
            ISession exclusiveSession1 = null;
            ISession exclusiveSession2 = null;
            ISession fallbackSession = null;
            ISession senderSession = null;
    
			purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE6"));
			
            try 
            {
                exclusiveSession1 = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                exclusiveSession2 = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
    
                // This creates the exclusive consumer first which avoids AMQ-1024
                // bug.
                ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true");
                IMessageConsumer exclusiveConsumer1 = exclusiveSession1.CreateConsumer(exclusiveQueue);
    
                ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE6");
                IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue);
    
                IMessageConsumer exclusiveConsumer2 = exclusiveSession2.CreateConsumer(exclusiveQueue);
    
                ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE6");
    
                IMessageProducer producer = senderSession.CreateProducer(senderQueue);
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

                IMessage msg = senderSession.CreateTextMessage("test");
                producer.Send(msg);
                Thread.Sleep(500);
    
                // Verify exclusive consumer receives the message.
                Assert.IsNotNull(exclusiveConsumer1.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(exclusiveConsumer2.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
    
                // Close the exclusive consumer to verify the non-exclusive consumer
                // takes over
                exclusiveConsumer1.Close();
				
				Thread.Sleep(100);
    
                producer.Send(msg);
                producer.Send(msg);
    
                Assert.IsNotNull(exclusiveConsumer2.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
            } 
            finally 
            {
                fallbackSession.Close();
                senderSession.Close();
                conn.Close();
            }
        }
    
        [Test]
        public void TestFailoverToNonExclusiveConsumer() 
        {
            IConnection conn = createConnection(true);
    
            ISession exclusiveSession = null;
            ISession fallbackSession = null;
            ISession senderSession = null;
    
			purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE3"));
			
            try 
            {
                exclusiveSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
    
                // This creates the exclusive consumer first which avoids AMQ-1024
                // bug.
                ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true");
                IMessageConsumer exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue);
    
                ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3");
                IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue);
    
                ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3");
    
                IMessageProducer producer = senderSession.CreateProducer(senderQueue);
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
    
                IMessage msg = senderSession.CreateTextMessage("test");
                producer.Send(msg);
                Thread.Sleep(500);
    
                // Verify exclusive consumer receives the message.
                Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
    
                // Close the exclusive consumer to verify the non-exclusive consumer
                // takes over
                exclusiveConsumer.Close();
    
                producer.Send(msg);
    
                Assert.IsNotNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
    
            } 
            finally 
            {
                fallbackSession.Close();
                senderSession.Close();
                conn.Close();
            }
        }
    
        [Test]
        public void TestFallbackToExclusiveConsumer() 
        {
            IConnection conn = createConnection(true);
    
            ISession exclusiveSession = null;
            ISession fallbackSession = null;
            ISession senderSession = null;
    
			purgeQueue(conn, new ActiveMQQueue("TEST.QUEUE4"));
			
            try 
            {
                exclusiveSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                fallbackSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
                senderSession = conn.CreateSession(AcknowledgementMode.AutoAcknowledge);
    
                // This creates the exclusive consumer first which avoids AMQ-1024
                // bug.
                ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true");
                IMessageConsumer exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue);
    
                ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE4");
                IMessageConsumer fallbackConsumer = fallbackSession.CreateConsumer(fallbackQueue);
    
                ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE4");
    
                IMessageProducer producer = senderSession.CreateProducer(senderQueue);
                producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
    
                IMessage msg = senderSession.CreateTextMessage("test");
                producer.Send(msg);
                Thread.Sleep(500);
    
                // Verify exclusive consumer receives the message.
                Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
    
                // Close the exclusive consumer to verify the non-exclusive consumer
                // takes over
                exclusiveConsumer.Close();
    
                producer.Send(msg);
    
                // Verify other non-exclusive consumer receices the message.
                Assert.IsNotNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
    
                // Create exclusive consumer to determine if it will start receiving
                // the messages.
                exclusiveConsumer = exclusiveSession.CreateConsumer(exclusiveQueue);
    
                producer.Send(msg);
                Assert.IsNotNull(exclusiveConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
                Assert.IsNull(fallbackConsumer.Receive(TimeSpan.FromMilliseconds(1000)));
            } 
            finally
            {
                fallbackSession.Close();
                senderSession.Close();
                conn.Close();
            }
        }
        
    }
}
