/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
using System;
using System.Threading;

namespace Qpid.Collections
{
    /// <summary>
    /// A FIFO blocking queue backed by a singly linked list. The queue is bounded
    /// by a fixed capacity supplied at construction time
    /// (<c>Int32.MaxValue</c> when none is given).
    /// </summary>
    public class LinkedBlockingQueue : BlockingQueue
    {
        /*
         * A variant of the "two lock queue" algorithm.  The putLock gates
         * entry to the enqueue operations and doubles as the monitor that
         * blocked producers wait on.  Similarly for the takeLock and
         * consumers.  The "count" field that they both rely on is only ever
         * modified through Interlocked operations, which avoids needing to
         * get both locks in most cases.  Also, to minimize the need for
         * producers to get takeLock and vice-versa, cascading notifies are
         * used.  When a producer notices that it has enabled at least one
         * take, it signals a consumer.  That consumer in turn signals others
         * if more items have been entered since the signal.  And
         * symmetrically for consumers signalling producers.  Operations
         * that touch the whole queue (Clear, ToString) acquire both locks,
         * always putLock first and takeLock second.
         */

        /// <summary>
        /// Linked list node.
        /// </summary>
        internal class Node
        {
            /// <summary>The item; volatile to ensure a barrier separating write and read.</summary>
            internal volatile Object item;

            /// <summary>The next node in the list, or null if this is the last node.</summary>
            internal Node next;

            internal Node(Object x) { item = x; }
        }

        /// <summary>The capacity bound, or <c>Int32.MaxValue</c> if none was given.</summary>
        private readonly int capacity;

        /// <summary>
        /// Current number of elements. Read directly (volatile) in wait guards;
        /// modified only through the Interlocked helpers below.
        /// </summary>
        private volatile int count = 0;

        /// <summary>Head of the linked list: a sentinel node whose <c>next</c> is the first real element.</summary>
        private Node head;

        /// <summary>Tail of the linked list (equal to <c>head</c> when the queue is empty).</summary>
        private Node last;

        /// <summary>Lock held by DequeueBlocking, Poll, Peek, etc.</summary>
        private readonly object takeLock = new Object();

        /// <summary>Lock held by EnqueueBlocking, EnqueueNoThrow, etc.</summary>
        private readonly object putLock = new Object();

        // Passing a volatile field by ref normally triggers warning CS0420; that is
        // safe here because the Interlocked APIs provide their own memory barriers.
#pragma warning disable 420

        /// <summary>
        /// Atomically increments <c>count</c>.
        /// </summary>
        /// <returns>The value of <c>count</c> before the increment.</returns>
        private int GetAndIncrementCount()
        {
            return Interlocked.Increment(ref count) - 1;
        }

        /// <summary>
        /// Atomically decrements <c>count</c>.
        /// </summary>
        /// <returns>The value of <c>count</c> before the decrement.</returns>
        private int GetAndDecrementCount()
        {
            return Interlocked.Decrement(ref count) + 1;
        }

        /// <summary>
        /// Atomically sets <c>count</c> to zero.
        /// </summary>
        /// <returns>The value of <c>count</c> before it was reset.</returns>
        private int GetAndResetCount()
        {
            return Interlocked.Exchange(ref count, 0);
        }

#pragma warning restore 420

        /// <summary>
        /// Signals a waiting take. Called only from the enqueue operations
        /// (which do not otherwise ordinarily lock takeLock).
        /// </summary>
        private void SignalNotEmpty()
        {
            lock (takeLock)
            {
                Monitor.Pulse(takeLock);
            }
        }

        /// <summary>
        /// Signals a waiting put. Called only from the dequeue operations.
        /// </summary>
        private void SignalNotFull()
        {
            lock (putLock)
            {
                Monitor.Pulse(putLock);
            }
        }

        /// <summary>
        /// Creates a node and links it at the end of the queue.
        /// Callers hold putLock.
        /// </summary>
        /// <param name="x">The item to store.</param>
        private void Insert(Object x)
        {
            last = last.next = new Node(x);
        }

        /// <summary>
        /// Removes the first element by advancing the head sentinel onto its node.
        /// Callers hold takeLock and have checked that the queue is not empty.
        /// </summary>
        /// <returns>The item that was stored in the removed node.</returns>
        private Object Extract()
        {
            Node first = head.next;
            head = first;
            Object x = first.item;
            // The node becomes the new sentinel, so it must not keep the item alive.
            first.item = null;
            return x;
        }

        /// <summary>
        /// Creates a <c>LinkedBlockingQueue</c> with a capacity of <c>Int32.MaxValue</c>.
        /// </summary>
        public LinkedBlockingQueue() : this(Int32.MaxValue)
        {
        }

        /// <summary>
        /// Creates a <c>LinkedBlockingQueue</c> with the given (fixed) capacity.
        /// </summary>
        /// <param name="capacity">The capacity of this queue.</param>
        /// <exception cref="ArgumentException">If <c>capacity</c> is not greater than zero.</exception>
        public LinkedBlockingQueue(int capacity)
        {
            if (capacity <= 0)
            {
                throw new ArgumentException("Capacity must be positive, was passed " + capacity);
            }
            this.capacity = capacity;
            last = head = new Node(null);
        }

        /// <summary>
        /// Gets the number of elements in this queue.
        /// </summary>
        public int Size
        {
            get
            {
                return count;
            }
        }

        /// <summary>
        /// Gets the number of additional elements that this queue can accept
        /// without blocking: the capacity of this queue less its current size.
        /// Note that you cannot always tell if an attempt to insert an element
        /// will succeed by inspecting this value, because another thread may be
        /// about to insert or remove an element.
        /// </summary>
        public override int RemainingCapacity
        {
            get
            {
                return capacity - count;
            }
        }

        /// <summary>
        /// Inserts the specified element at the tail of this queue, waiting if
        /// necessary for space to become available.
        /// </summary>
        /// <param name="e">The element to add; must not be null.</param>
        /// <exception cref="ArgumentNullException">If <c>e</c> is null.</exception>
        /// <exception cref="ThreadInterruptedException">If the thread is interrupted while waiting.</exception>
        public override void EnqueueBlocking(Object e)
        {
            if (e == null)
            {
                throw new ArgumentNullException("e", "Object must not be null");
            }
            // Note: convention in all put/take/etc is to preset the
            // local var holding count negative to indicate failure unless set.
            int c = -1;
            lock (putLock)
            {
                /*
                 * Note that count is used in the wait guard even though it is
                 * not protected by a lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by putLock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                try
                {
                    while (count == capacity)
                    {
                        Monitor.Wait(putLock);
                    }
                }
                catch (ThreadInterruptedException)
                {
                    // If this thread was pulsed at about the same time it was
                    // interrupted, the wake-up may otherwise be lost; hand it on
                    // to another waiting producer. A spurious pulse is harmless
                    // because waiters re-check the guard.
                    Monitor.Pulse(putLock);
                    throw;
                }

                Insert(e);
                c = GetAndIncrementCount();
                if (c + 1 < capacity)
                {
                    // Still room: cascade the wake-up to the next waiting producer.
                    Monitor.Pulse(putLock);
                }
            }

            if (c == 0)
            {
                // The queue was empty before this insert, so a consumer may be waiting.
                SignalNotEmpty();
            }
        }

        /// <summary>
        /// Inserts the specified element at the tail of this queue if it is
        /// possible to do so immediately without exceeding the queue's capacity.
        /// </summary>
        /// <param name="e">The element to add; must not be null.</param>
        /// <returns><c>true</c> if the element was added, <c>false</c> if this queue is full.</returns>
        /// <exception cref="ArgumentNullException">If <c>e</c> is null.</exception>
        public override bool EnqueueNoThrow(Object e)
        {
            if (e == null)
            {
                throw new ArgumentNullException("e", "e must not be null");
            }
            // Cheap unlocked check first; it is repeated under putLock below.
            if (count == capacity)
            {
                return false;
            }
            int c = -1;
            lock (putLock)
            {
                if (count < capacity)
                {
                    Insert(e);
                    c = GetAndIncrementCount();
                    if (c + 1 < capacity)
                    {
                        // Still room: cascade the wake-up to the next waiting producer.
                        Monitor.Pulse(putLock);
                    }
                }
            }
            if (c == 0)
            {
                // The queue was empty before this insert, so a consumer may be waiting.
                SignalNotEmpty();
            }
            // c stays negative when the queue turned out to be full under the lock.
            return c >= 0;
        }

        /// <summary>
        /// Retrieves and removes the head of this queue, waiting if necessary
        /// until an element becomes available.
        /// </summary>
        /// <returns>The head of this queue.</returns>
        /// <exception cref="ThreadInterruptedException">If the thread is interrupted while waiting.</exception>
        public override Object DequeueBlocking()
        {
            Object x;
            int c = -1;
            lock (takeLock)
            {
                try
                {
                    while (count == 0)
                    {
                        Monitor.Wait(takeLock);
                    }
                }
                catch (ThreadInterruptedException)
                {
                    // If this thread was pulsed at about the same time it was
                    // interrupted, the wake-up may otherwise be lost; hand it on
                    // to another waiting consumer. A spurious pulse is harmless
                    // because waiters re-check the guard.
                    Monitor.Pulse(takeLock);
                    throw;
                }

                x = Extract();
                c = GetAndDecrementCount();
                if (c > 1)
                {
                    // More elements remain: cascade the wake-up to the next waiting consumer.
                    Monitor.Pulse(takeLock);
                }
            }
            if (c == capacity)
            {
                // The queue was full before this removal, so a producer may be waiting.
                SignalNotFull();
            }
            return x;
        }

        /// <summary>
        /// Retrieves and removes the head of this queue without waiting.
        /// </summary>
        /// <returns>The head of this queue, or null if this queue is empty.</returns>
        public Object Poll()
        {
            // Cheap unlocked check first; it is repeated under takeLock below.
            if (count == 0)
            {
                return null;
            }
            Object x = null;
            int c = -1;
            lock (takeLock)
            {
                if (count > 0)
                {
                    x = Extract();
                    c = GetAndDecrementCount();
                    if (c > 1)
                    {
                        // More elements remain: cascade the wake-up to the next waiting consumer.
                        Monitor.Pulse(takeLock);
                    }
                }
            }
            if (c == capacity)
            {
                // The queue was full before this removal, so a producer may be waiting.
                SignalNotFull();
            }
            return x;
        }

        /// <summary>
        /// Retrieves, but does not remove, the head of this queue.
        /// </summary>
        /// <returns>The head of this queue, or null if this queue is empty.</returns>
        public override Object Peek()
        {
            if (count == 0)
            {
                return null;
            }
            lock (takeLock)
            {
                // The queue may have been drained between the check above and
                // acquiring the lock, so the first node can still be missing.
                Node first = head.next;
                if (first == null)
                {
                    return null;
                }
                return first.item;
            }
        }

        /// <summary>
        /// Returns the base class's string representation, computed while
        /// holding both locks.
        /// </summary>
        public override String ToString()
        {
            lock (putLock)
            {
                lock (takeLock)
                {
                    return base.ToString();
                }
            }
        }

        /// <summary>
        /// Atomically removes all of the elements from this queue.
        /// The queue will be empty after this call returns.
        /// </summary>
        public override void Clear()
        {
            lock (putLock)
            {
                lock (takeLock)
                {
                    head.next = null;
                    last = head;
                    int c = GetAndResetCount();
                    if (c == capacity)
                    {
                        // The queue was full, so producers may be blocked; wake them all.
                        Monitor.PulseAll(putLock);
                    }
                }
            }
        }
    }
}

