/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.queue;

import java.security.AccessController;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.message.ConsumerOption;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;

public class StandardQueueTest extends AbstractQueueTestBase
{

    public void testAutoDeleteQueue() throws Exception
    {
        getQueue().close();
        getQueue().delete();
        Map<String,Object> queueAttributes = new HashMap<>();
        queueAttributes.put(Queue.NAME, getQname());
        queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
        final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, getVirtualHost());
        queue.open();
        setQueue(queue);

        ServerMessage message = createMessage(25l);
        QueueConsumer consumer =
                (QueueConsumer) getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test",
                                                       EnumSet.of(ConsumerOption.ACQUIRES,
                                                                  ConsumerOption.SEES_REQUEUES), 0);

        getQueue().enqueue(message, null, null);
        consumer.close();
        assertTrue("Queue was not deleted when consumer was removed",
                   getQueue().isDeleted());
    }

    public void testActiveConsumerCount() throws Exception
    {

        Map<String,Object> queueAttributes = new HashMap<>();
        queueAttributes.put(Queue.NAME, "testActiveConsumerCount");
        queueAttributes.put(Queue.OWNER, "testOwner");
        final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, getVirtualHost());
        queue.open();
        //verify adding an active consumer increases the count
        final MockConsumer consumer1 = new MockConsumer();
        consumer1.setActive(true);
        consumer1.setState(ConsumerTarget.State.ACTIVE);
        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());
        queue.addConsumer(consumer1,
                          null,
                          createMessage(-1l).getClass(),
                          "test",
                          EnumSet.of(ConsumerOption.ACQUIRES,
                                     ConsumerOption.SEES_REQUEUES), 0);
        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());

        //verify adding an inactive consumer doesn't increase the count
        final MockConsumer consumer2 = new MockConsumer();
        consumer2.setActive(false);
        consumer2.setState(ConsumerTarget.State.SUSPENDED);
        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());
        queue.addConsumer(consumer2,
                          null,
                          createMessage(-1l).getClass(),
                          "test",
                          EnumSet.of(ConsumerOption.ACQUIRES,
                                     ConsumerOption.SEES_REQUEUES), 0);
        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());

        //verify behaviour in face of expected state changes:

        //verify a consumer going suspended->active increases the count
        consumer2.setState(ConsumerTarget.State.ACTIVE);
        assertEquals("Unexpected active consumer count", 2, queue.getConsumerCountWithCredit());

        //verify a consumer going active->suspended decreases the count
        consumer2.setState(ConsumerTarget.State.SUSPENDED);
        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());

        //verify a consumer going suspended->closed doesn't change the count
        consumer2.setState(ConsumerTarget.State.CLOSED);
        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());

        //verify a consumer going active->active doesn't change the count
        consumer1.setState(ConsumerTarget.State.ACTIVE);
        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());

        consumer1.setState(ConsumerTarget.State.SUSPENDED);
        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());

        //verify a consumer going suspended->suspended doesn't change the count
        consumer1.setState(ConsumerTarget.State.SUSPENDED);
        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());

        consumer1.setState(ConsumerTarget.State.ACTIVE);
        assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit());

        //verify a consumer going active->closed  decreases the count
        consumer1.setState(ConsumerTarget.State.CLOSED);
        assertEquals("Unexpected active consumer count", 0, queue.getConsumerCountWithCredit());

    }


    /**
     * Tests that entry in dequeued state are not enqueued and not delivered to consumer
     */
    public void testEnqueueDequeuedEntry() throws Exception
    {
        // create a queue where each even entry is considered a dequeued
        AbstractQueue queue = new DequeuedQueue(getVirtualHost());
        queue.create();
        // create a consumer
        MockConsumer consumer = new MockConsumer();

        // register consumer
        queue.addConsumer(consumer,
                          null,
                          createMessage(-1l).getClass(),
                          "test",
                          EnumSet.of(ConsumerOption.ACQUIRES,
                                     ConsumerOption.SEES_REQUEUES), 0);

        // put test messages into a queue
        putGivenNumberOfMessages(queue, 4);

        // assert received messages
        List<MessageInstance> messages = consumer.getMessages();
        assertEquals("Only 2 messages should be returned", 2, messages.size());
        assertEquals("ID of first message should be 1", 1l,
                     (messages.get(0).getMessage()).getMessageNumber());
        assertEquals("ID of second message should be 3", 3l,
                     (messages.get(1).getMessage()).getMessageNumber());
    }

    /**
     * Tests whether dequeued entry is sent to subscriber in result of
     * invocation of {@link AbstractQueue#processQueue(QueueRunner)}
     */
    public void testProcessQueueWithDequeuedEntry() throws Exception
    {
        // total number of messages to send
        int messageNumber = 4;
        int dequeueMessageIndex = 1;

        Map<String,Object> queueAttributes = new HashMap<>();
        queueAttributes.put(Queue.NAME, "test");
        // create queue with overridden method deliverAsync
        StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes, getVirtualHost());
        testQueue.create();

        // put messages
        List<StandardQueueEntry> entries =
                (List<StandardQueueEntry>) enqueueGivenNumberOfMessages(testQueue, messageNumber);

        // dequeue message
        dequeueMessage(testQueue, dequeueMessageIndex);

        // latch to wait for message receipt
        final CountDownLatch latch = new CountDownLatch(messageNumber -1);

        // create a consumer
        MockConsumer consumer = new MockConsumer()
        {
            /**
             * Send a message and decrement latch
             * @param consumer
             * @param entry
             * @param batch
             */
            public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
            {
                long size = super.send(consumer, entry, batch);
                latch.countDown();
                return size;
            }
        };

        // subscribe
        testQueue.addConsumer(consumer,
                              null,
                              entries.get(0).getMessage().getClass(),
                              "test",
                              EnumSet.of(ConsumerOption.ACQUIRES,
                                         ConsumerOption.SEES_REQUEUES), 0);

        // process queue
        testQueue.processQueue(new QueueRunner(testQueue, AccessController.getContext())
        {
            public void run()
            {
                // do nothing
            }
        });

        // wait up to 1 minute for message receipt
        try
        {
            latch.await(1, TimeUnit.MINUTES);
        }
        catch (InterruptedException e1)
        {
            Thread.currentThread().interrupt();
        }
        List<MessageInstance> expected = Arrays.asList((MessageInstance) entries.get(0), entries.get(2), entries.get(3));
        verifyReceivedMessages(expected, consumer.getMessages());
    }

    public void testNonDurableImpliesMessageDurabilityNever() throws Exception
    {
        getQueue().close();
        getQueue().delete();

        Map<String,Object> attributes = new HashMap<>();
        attributes.put(Queue.NAME, getQname());
        attributes.put(Queue.DURABLE, Boolean.FALSE);
        attributes.put(Queue.MESSAGE_DURABILITY, MessageDurability.ALWAYS);

        Queue queue =  getVirtualHost().createChild(Queue.class, attributes);
        assertNotNull("Queue was not created", queue);
        setQueue(queue);

        assertEquals(MessageDurability.NEVER, queue.getMessageDurability());
    }

    private static class DequeuedQueue extends AbstractQueue
    {

        private QueueEntryList _entries = new DequeuedQueueEntryList(this);

        public DequeuedQueue(VirtualHost<?> virtualHost)
        {
            super(attributes(), virtualHost);
        }

        @Override
        QueueEntryList getEntries()
        {
            return _entries;
        }

        private static Map<String,Object> attributes()
        {
            Map<String,Object> attributes = new HashMap<>();
            attributes.put(Queue.NAME, "test");
            attributes.put(Queue.DURABLE, false);
            attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
            return attributes;
        }
    }

    private static class DequeuedQueueEntryList extends OrderedQueueEntryList
    {
        private static final HeadCreator HEAD_CREATOR =
                new HeadCreator()
                {

                    @Override
                    public DequeuedQueueEntry createHead(final QueueEntryList list)
                    {
                        return new DequeuedQueueEntry((DequeuedQueueEntryList) list);
                    }
                };

        public DequeuedQueueEntryList(final DequeuedQueue queue)
        {
            super(queue, HEAD_CREATOR);
        }

        /**
         * Entries with even message id are considered
         * dequeued!
         */
        protected DequeuedQueueEntry createQueueEntry(final ServerMessage message,
                                                      final MessageEnqueueRecord enqueueRecord)
        {
            return new DequeuedQueueEntry(this, message);
        }


    }

    private static class DequeuedQueueEntry extends OrderedQueueEntry
    {

        private final ServerMessage _message;

        private DequeuedQueueEntry(final DequeuedQueueEntryList queueEntryList)
        {
            super(queueEntryList);
            _message = null;
        }

        public DequeuedQueueEntry(DequeuedQueueEntryList list, final ServerMessage message)
        {
            super(list, message, null);
            _message = message;
        }

        public boolean isDeleted()
        {
            return (_message.getMessageNumber() % 2 == 0);
        }

        public boolean isAvailable()
        {
            return !(_message.getMessageNumber() % 2 == 0);
        }

        @Override
        public boolean acquire(MessageInstanceConsumer sub)
        {
            if(_message.getMessageNumber() % 2 == 0)
            {
                return false;
            }
            else
            {
                return super.acquire(sub);
            }
        }

        @Override
        public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
        {
            return true;
        }

        @Override
        public boolean makeAcquisitionStealable()
        {
            return true;
        }
    }
}
