blob: 7239aaffb98ddffa0ccfa37c563444d8188ebfa5 [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.etch.util;
/**
* A circular queue of a fixed size. Elements are added to one
* end and removed from the other, with the queue size ranging
* from empty to full. Operations can optionally wait until
* finished or return status indicating success or failure.
* For instance, adding to a full queue can wait until an item
* is removed before adding the new item or it can wait only
* a specified amount of time before completing successfully
* or giving up.
* @param <T> the type of items in the queue.
*/
public final class CircularQueue<T>
{
/**
* Constructs the CircularQueue.
*
* @param size the maximum number of items allowed in the queue.
*/
public CircularQueue( int size )
{
if (size < 1)
throw new IllegalArgumentException( "size < 1" );
this.size = size;
items = new Object[size];
}
/**
* Constructs the CircularQueue with the maximum number of items
* defaulted to 10.
*/
public CircularQueue()
{
this( 10 );
}
private final int size;
private final Object[] items;
/**
* @return the maximum number of items that may be put in the queue.
*/
public int size()
{
return size;
}
/////////////////////
// BASIC QUEUE OPS //
/////////////////////
/**
* @return the current number of items in the queue.
*/
public int count()
{
return count;
}
/**
* @return true if the queue is empty.
*/
public boolean isEmpty()
{
return count == 0;
}
/**
* @return true if the queue is full.
*/
public boolean isFull()
{
return count == size;
}
/**
* Gets the item at the head of the queue.
* @return the item at the head of the queue.
*/
private Object get0()
{
Assertion.check( count > 0, "count > 0" );
Object obj = items[head];
Assertion.check( obj != null, "obj != null" );
items[head] = null;
count--;
head++;
if (head == size)
head = 0;
return obj;
}
/**
* Puts the item at the tail of the queue.
* @param obj non-null item to put.
*/
private void put0( Object obj )
{
Assertion.check( obj != null, "obj != null" );
Assertion.check( count < size, "count < size" );
Assertion.check( items[tail] == null, "items[tail] == null" );
items[tail] = obj;
count++;
tail++;
if (tail == size)
tail = 0;
}
private int count;
private int head;
private int tail;
//////////////////////
// PUBLIC QUEUE OPS //
//////////////////////
/**
* Gets the next available item from the queue, waiting
* until an item is available or the queue is closed.
*
* @return an item from the queue or null if the queue
* is closed.
*
* @throws InterruptedException if the thread is interrupted.
*/
public T get() throws InterruptedException
{
return get( 0 );
}
/**
* Gets the next available item from the queue, waiting
* until an item is available or the queue is closed.
*
* @param maxDelay the maximum time in ms to wait for
* something to be put in the queue; 0 means wait forever,
* less than 0 means don't wait at all.
*
* @return an item from the queue or null if maxDelay
* has been exceeded or the queue is closed.
*
* @throws InterruptedException if the thread is interrupted.
*/
public synchronized T get( int maxDelay ) throws InterruptedException
{
if (!isEmpty())
return getAndNotify();
// the queue is empty.
if (isClosed() || maxDelay < 0)
return null;
// the queue is empty, not closed, and caller has requested a delay...
long now = Timer.getNanos();
long end = getEndTime( now, maxDelay );
Assertion.check( end > now, "end > now" );
long d;
while ((d = (end - now) / Timer.NANOS_PER_MILLI) > 0)
{
// the queue is empty, not closed, and delay has not run out...
Assertion.check( isEmpty(), "isEmpty()" );
Assertion.check( !isClosed(), "!isClosed()" );
Assertion.check( d > 0, "d > 0" );
wait( d );
if (!isEmpty())
return getAndNotify();
if (isClosed())
return null;
now = Timer.getNanos();
}
return null;
}
/**
* Puts an item in the queue, waiting until space is available
* or the queue is closed.
*
* @param obj a non-null item to put in the queue.
*
* @return true if the item was placed in the queue
* or false if the queue is closed.
*
* @throws InterruptedException if the thread is interrupted.
*/
public boolean put( T obj ) throws InterruptedException
{
return put( obj, 0 );
}
/**
* Puts an item in the queue, waiting until space is available
* or the queue is closed.
*
* @param obj a non-null item to put in the queue.
*
* @param maxDelay the maximum time in ms to wait for
* available space the queue; 0 means wait forever,
* less than 0 means don't wait at all.
*
* @return true if the item was placed in the queue
* or false if maxDelay has been exceeded or the queue
* is closed.
*
* @throws InterruptedException if the thread is interrupted.
*/
public synchronized boolean put( T obj, int maxDelay ) throws InterruptedException
{
if (obj == null)
throw new NullPointerException( "obj == null" );
if (isClosed())
return false;
// the queue is not closed.
if (!isFull())
{
putAndNotify( obj );
return true;
}
// the queue is full and not closed.
if (maxDelay < 0)
return false;
// the queue is full, not closed, and the caller has requested a delay...
long now = Timer.getNanos();
long end = getEndTime( now, maxDelay );
Assertion.check( end > now, "end > now" );
long d;
while ((d = (end - now) / Timer.NANOS_PER_MILLI) > 0)
{
// the queue is full, not closed, and delay has not run out...
Assertion.check( isFull(), "isFull()" );
Assertion.check( !isClosed(), "!isClosed()" );
Assertion.check( d > 0, "d > 0" );
wait( d );
if (isClosed())
return false;
// the queue is not closed.
if (!isFull())
{
putAndNotify( obj );
return true;
}
now = Timer.getNanos();
}
return false;
}
/**
* Closes the queue so that no more items may be put into it.
* Get will return null when there are no more items to return.
*/
public synchronized void close()
{
if (!closed)
{
closed = true;
notifyAll();
}
}
/**
* @return true if the queue is closed.
*/
public boolean isClosed()
{
return closed;
}
private boolean closed;
//////////////////
// PRIVATE OPS //
//////////////////
/**
* Gets the item at the head of the queue. Additionally, wakes
* up the next one waiting for the queue, either to get or put.
* @return the item at the head of the queue.
*/
@SuppressWarnings("unchecked")
private T getAndNotify()
{
// the queue is now not empty.
// putters waiting should be notified if the queue is now full.
boolean notify = isFull();
Object obj = get0();
// getters waiting should be notified if the queue is now not empty.
notify = notify || !isEmpty();
if (notify)
notify();
return (T) obj;
}
/**
* Puts the item at the tail of the queue. Additionally, wakes
* up the next one waiting for the queue, either to get or put.
* @param obj non-null item to put.
*/
private void putAndNotify( T obj )
{
// the queue is now not full.
// getters waiting should be notified if the queue is now empty.
boolean notify = isEmpty();
put0( obj );
// putters waiting should be notified if the queue is now not full.
notify = notify || !isFull();
if (notify)
notify();
}
private long getEndTime( long now, int maxDelay )
{
return maxDelay > 0 ? now + maxDelay * Timer.NANOS_PER_MILLI : Long.MAX_VALUE;
}
}