blob: 95c6d9487fcb249c30364551bd35f10341595718 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
#define _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_
#include <decaf/util/Config.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <decaf/util/concurrent/BlockingQueue.h>
#include <decaf/util/concurrent/locks/ReentrantLock.h>
#include <decaf/util/AbstractQueue.h>
#include <decaf/util/Iterator.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Math.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/NoSuchElementException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
namespace decaf {
namespace util {
namespace concurrent {
using decaf::lang::Pointer;
/**
* A BlockingQueue derivative that allows for a bound to be placed on the number of elements
* that can be enqueued at any one time. Elements are inserted and removed in FIFO order.
* The internal structure of the queue is based on a linked nodes which provides for better
* performance over their array based versions but the performance is less predictable.
*
* The capacity bound of this class default to Integer::MAX_VALUE.
*
* @since 1.0
*/
template<typename E>
class LinkedBlockingQueue : public BlockingQueue<E> {
private:
template< typename U >
class QueueNode {
private:
U value;
bool unlinked;
bool dequeued;
public:
Pointer< QueueNode<E> > next;
private:
QueueNode(const QueueNode&);
QueueNode& operator=(const QueueNode&);
public:
QueueNode() : value(), unlinked(false), dequeued(false), next() {}
QueueNode(const U& value) : value(value), unlinked(false), dequeued(false), next() {}
void set(Pointer< QueueNode<U> > next, const U& value) {
this->next = next;
this->value = value;
this->unlinked = false;
this->dequeued = false;
}
E get() const {
return this->value;
}
E getAndDequeue() {
E result = this->value;
this->value = E();
this->dequeued = true;
return result;
}
void unlink() {
this->value = E();
this->unlinked = true;
}
bool isUnlinked() const {
return this->unlinked;
}
bool isDequeued() const {
return this->dequeued;
}
};
class TotalLock {
private:
TotalLock(const TotalLock& src);
TotalLock& operator=(const TotalLock& src);
private:
const LinkedBlockingQueue<E>* parent;
public:
TotalLock(const LinkedBlockingQueue<E>* parent) : parent(parent) {
parent->putLock.lock();
parent->takeLock.lock();
}
~TotalLock() {
parent->putLock.unlock();
parent->takeLock.unlock();
}
};
private:
int capacity;
decaf::util::concurrent::atomic::AtomicInteger count;
/** Lock held by take, poll, etc */
mutable locks::ReentrantLock takeLock;
/** Wait queue for waiting takes */
Pointer<locks::Condition> notEmpty; // takeLock.newCondition();
/** Lock held by put, offer, etc */
mutable locks::ReentrantLock putLock;
/** Wait queue for waiting puts */
Pointer<locks::Condition> notFull; // putLock.newCondition();
Pointer< QueueNode<E> > head;
Pointer< QueueNode<E> > tail;
public:
/**
* Create a new instance with a Capacity of Integer::MAX_VALUE
*/
LinkedBlockingQueue() : BlockingQueue<E>(), capacity(lang::Integer::MAX_VALUE), count(),
takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
this->tail = this->head;
this->notEmpty.reset(this->takeLock.newCondition());
this->notFull.reset(this->putLock.newCondition());
}
/**
* Create a new instance with the given initial capacity value.
*
* @param capacity
* The initial capacity value to assign to this Queue.
*
* @throws IllegalArgumentException if the specified capacity is not greater than zero.
*/
LinkedBlockingQueue(int capacity) : BlockingQueue<E>(), capacity(capacity), count(),
takeLock(), notEmpty(), putLock(), notFull(), head(new QueueNode<E>()), tail() {
if(capacity <= 0) {
throw decaf::lang::exceptions::IllegalArgumentException(
__FILE__, __LINE__, "Capacity value must be greater than zero.");
}
this->tail = this->head;
this->notEmpty.reset(this->takeLock.newCondition());
this->notFull.reset(this->putLock.newCondition());
}
/**
* Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the
* values contained in the specified collection to this Queue.
*
* @param collection
* The Collection whose elements are to be copied to this Queue.
*
* @throws IllegalStateException if the number of elements in the collection exceeds
* this Queue's capacity.
*/
LinkedBlockingQueue(const Collection<E>& collection) : BlockingQueue<E>(),
capacity(lang::Integer::MAX_VALUE), count(),
takeLock(), notEmpty(), putLock(), notFull(),
head(new QueueNode<E>()), tail() {
this->tail = this->head;
this->notEmpty.reset(this->takeLock.newCondition());
this->notFull.reset(this->putLock.newCondition());
Pointer< Iterator<E> > iter(collection.iterator());
try {
int count = 0;
while(iter->hasNext()) {
if(count == this->capacity) {
throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
"Number of elements in the Collection exceeds this Queue's Capacity.");
}
this->enqueue(iter->next());
++count;
}
this->count.set(count);
}
DECAF_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
DECAF_CATCH_RETHROW(decaf::lang::Exception)
DECAF_CATCHALL_THROW(decaf::lang::Exception)
}
/**
* Create a new instance with a Capacity of Integer::MAX_VALUE and adds all the
* values contained in the specified LinkedBlockingQueue to this Queue.
*
* @param queue
* The LinkedBlockingQueue whose elements are to be copied to this Queue.
*
* @throws IllegalStateException if the number of elements in the collection exceeds
* this Queue's capacity.
*/
LinkedBlockingQueue(const LinkedBlockingQueue& queue) : BlockingQueue<E>(),
capacity(lang::Integer::MAX_VALUE), count(),
takeLock(), notEmpty(), putLock(), notFull(),
head(new QueueNode<E>()), tail() {
this->tail = this->head;
this->notEmpty.reset(this->takeLock.newCondition());
this->notFull.reset(this->putLock.newCondition());
Pointer< Iterator<E> > iter(queue.iterator());
try {
int count = 0;
while(iter->hasNext()) {
if(count == this->capacity) {
throw decaf::lang::exceptions::IllegalStateException( __FILE__, __LINE__,
"Number of elements in the Collection exceeds this Queue's Capacity.");
}
this->enqueue(iter->next());
++count;
}
this->count.set(count);
}
DECAF_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
DECAF_CATCH_RETHROW(decaf::lang::Exception)
DECAF_CATCHALL_THROW(decaf::lang::Exception)
}
virtual ~LinkedBlockingQueue() {
try{
this->purgeList();
} catch(...) {}
}
public:
LinkedBlockingQueue<E>& operator= ( const LinkedBlockingQueue<E>& queue ) {
this->clear();
this->addAll(queue);
return *this;
}
LinkedBlockingQueue<E>& operator= ( const Collection<E>& collection ) {
this->clear();
this->addAll(collection);
return *this;
}
public:
virtual int size() const {
return this->count.get();
}
virtual void clear() {
TotalLock lock(this);
this->purgeList();
this->tail = this->head;
this->count.set(0);
if(this->count.getAndSet(0) == this->capacity) {
this->notFull->signal();
}
}
virtual int remainingCapacity() const {
return this->capacity - this->count.get();
}
virtual void put( const E& value ) {
int c = -1;
this->putLock.lockInterruptibly();
try {
// Note that count is used in wait guard even though it is not
// protected by lock. This works because count can only decrease at
// this point (all other puts are shut out by lock), and we (or some
// other waiting put) are signaled if it ever changes from capacity.
// Similarly for all other uses of count in other wait guards.
while (this->count.get() == this->capacity) {
this->notFull->await();
}
// This method now owns the putLock so we know we have at least
// enough capacity for one put, if we enqueue an item and there's
// still more room we should signal a waiting put to ensure that
// threads don't wait forever.
enqueue(value);
c = this->count.getAndIncrement();
if(c + 1 < this->capacity) {
this->notFull->signal();
}
} catch(decaf::lang::Exception& ex) {
this->putLock.unlock();
throw;
}
this->putLock.unlock();
// When c is zero it means we at least incremented once so there was
// something in the Queue, another take could have already happened but
// we don't know so wake up a waiting taker.
if (c == 0) {
this->signalNotEmpty();
}
}
virtual bool offer( const E& value, long long timeout, const TimeUnit& unit ) {
int c = -1;
long long nanos = unit.toNanos(timeout);
this->putLock.lockInterruptibly();
try {
while(this->count.get() == this->capacity) {
if (nanos <= 0) {
return false;
}
nanos = this->notFull->awaitNanos(nanos);
}
enqueue(value);
c = this->count.getAndIncrement();
if(c + 1 < this->capacity) {
this->notFull->signal();
}
} catch(decaf::lang::Exception& ex) {
this->putLock.unlock();
throw;
}
this->putLock.unlock();
if(c == 0) {
this->signalNotEmpty();
}
return true;
}
virtual bool offer(const E& value) {
if (this->count.get() == this->capacity) {
return false;
}
int c = -1;
this->putLock.lockInterruptibly();
try {
if (this->count.get() < this->capacity) {
enqueue(value);
c = this->count.getAndIncrement();
if (c + 1 < this->capacity) {
this->notFull->signal();
}
}
} catch (decaf::lang::Exception& ex) {
this->putLock.unlock();
throw;
}
this->putLock.unlock();
if (c == 0) {
this->signalNotEmpty();
}
return c >= 0;
}
virtual E take() {
E value = E();
int c = -1;
this->takeLock.lockInterruptibly();
try {
while (this->count.get() == 0) {
this->notEmpty->await();
}
// Since this methods owns the takeLock and count != 0 we know that
// its safe to take one element. if c is greater than one then there
// is at least one more so we try to wake up another taker if any.
value = dequeue();
c = this->count.getAndDecrement();
if (c > 1) {
this->notEmpty->signal();
}
} catch (decaf::lang::Exception& ex) {
this->takeLock.unlock();
throw;
}
this->takeLock.unlock();
// When c equals capacity we have removed at least one element
// from the Queue so we wake a blocked put operation if there is
// one to prevent a deadlock.
if (c == this->capacity) {
this->signalNotFull();
}
return value;
}
virtual bool poll(E& result, long long timeout, const TimeUnit& unit) {
int c = -1;
long long nanos = unit.toNanos(timeout);
this->takeLock.lockInterruptibly();
try {
while (this->count.get() == 0) {
if (nanos <= 0) {
return false;
}
nanos = this->notEmpty->awaitNanos(nanos);
}
result = dequeue();
c = this->count.getAndDecrement();
if (c > 1) {
this->notEmpty->signal();
}
} catch (decaf::lang::Exception& ex) {
this->takeLock.unlock();
throw;
}
this->takeLock.unlock();
if(c == this->capacity) {
this->signalNotFull();
}
return true;
}
virtual bool poll(E& result) {
if (this->count.get() == 0) {
return false;
}
int c = -1;
this->takeLock.lock();
try {
if (this->count.get() > 0) {
result = dequeue();
c = this->count.getAndDecrement();
if (c > 1) {
this->notEmpty->signal();
}
}
} catch (decaf::lang::Exception& ex) {
this->takeLock.unlock();
throw;
}
this->takeLock.unlock();
if (c == this->capacity) {
this->signalNotFull();
}
return true;
}
virtual bool peek(E& result) const {
if(this->count.get() == 0) {
return false;
}
this->takeLock.lock();
try {
Pointer< QueueNode<E> > front = this->head->next;
if(front == NULL) {
return false;
} else {
result = front->get();
}
} catch (decaf::lang::Exception& ex) {
this->takeLock.unlock();
throw;
}
this->takeLock.unlock();
return true;
}
using AbstractQueue<E>::remove;
virtual bool remove(const E& value) {
TotalLock lock(this);
for(Pointer< QueueNode<E> > predicessor = this->head, p = predicessor->next; p != NULL;
predicessor = p, p = p->next) {
if(value == p->get()) {
unlink(p, predicessor);
return true;
}
}
return false;
}
virtual std::vector<E> toArray() const {
TotalLock lock(this);
int size = this->count.get();
std::vector<E> array;
array.reserve(size);
for(Pointer< QueueNode<E> > p = this->head->next; p != NULL; p = p->next) {
array.push_back(p->get());
}
return array;
}
virtual std::string toString() const {
return std::string("LinkedBlockingQueue [ current size = ") +
decaf::lang::Integer::toString(this->count.get()) + "]";
}
virtual int drainTo( Collection<E>& c ) {
return this->drainTo(c, decaf::lang::Integer::MAX_VALUE);
}
virtual int drainTo( Collection<E>& sink, int maxElements ) {
if(&sink == this) {
throw decaf::lang::exceptions::IllegalArgumentException(__FILE__, __LINE__,
"Cannot drain this Collection to itself.");
}
bool signalNotFull = false;
bool shouldThrow = false;
decaf::lang::Exception delayed;
int result = 0;
this->takeLock.lock();
try {
// We get the count of Nodes that exist now, any puts that are done
// after this are not drained and since we hold the lock nothing can
// get taken so state should remain consistent.
result = decaf::lang::Math::min(maxElements, this->count.get());
Pointer< QueueNode<E> > node = this->head;
int i = 0;
try {
while(i < result) {
Pointer< QueueNode<E> > p = node->next;
sink.add( p->getAndDequeue() );
node = p;
++i;
}
} catch(decaf::lang::Exception& e) {
delayed = e;
shouldThrow = true;
}
if (i > 0) {
this->head = node;
signalNotFull = (this->count.getAndAdd(-i) == this->capacity);
}
} catch(decaf::lang::Exception& ex) {
this->takeLock.unlock();
throw;
}
this->takeLock.unlock();
if (signalNotFull) {
this->signalNotFull();
}
if (shouldThrow) {
throw delayed;
}
return result;
}
private:
class LinkedIterator : public Iterator<E> {
private:
Pointer< QueueNode<E> > current;
Pointer< QueueNode<E> > last;
E currentElement;
LinkedBlockingQueue<E>* parent;
private:
LinkedIterator(const LinkedIterator&);
LinkedIterator& operator= (const LinkedIterator&);
public:
LinkedIterator(LinkedBlockingQueue<E>* parent) : current(), last(),
currentElement(), parent(parent) {
TotalLock lock(parent);
this->current = parent->head->next;
if(this->current != NULL) {
this->currentElement = current->get();
}
}
virtual bool hasNext() const {
return this->current != NULL;
}
virtual E next() {
TotalLock lock(this->parent);
if(this->current == NULL) {
throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
"Iterator next called with no matching next element.");
}
E result = this->currentElement;
this->last = this->current;
this->current = this->nextNode(this->current);
this->currentElement = (this->current == NULL) ? E() : this->current->get();
return result;
}
virtual void remove() {
if(this->last == NULL) {
throw decaf::lang::exceptions::IllegalStateException(__FILE__, __LINE__,
"Iterator remove called without having called next().");
}
TotalLock lock(this->parent);
Pointer< QueueNode<E> > node;
node.swap(this->last);
for(Pointer< QueueNode<E> > trail = this->parent->head, p = trail->next; p != NULL;
trail = p, p = p->next) {
if(p == node) {
this->parent->unlink(p, trail);
break;
}
}
}
private:
Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
// Handle the case of a dequeued Node, the new head of Queue
// will be parent->head->next() even if the Queue is empty.
if(p->isDequeued()) {
return this->parent->head->next;
}
Pointer< QueueNode<E> > s = p->next;
// Handle Nodes that have been removed from the interior of the
// Queue, these are tagged but still retain their next() value
// in order to account for multiple removes. If all nodes were
// removed from the last call then eventually we reach next() == NULL
// which is the old tail.
while(s != NULL && s->isUnlinked()) {
s = s->next;
}
return s;
}
};
class ConstLinkedIterator : public Iterator<E> {
private:
Pointer< QueueNode<E> > current;
Pointer< QueueNode<E> > last;
E currentElement;
const LinkedBlockingQueue<E>* parent;
private:
ConstLinkedIterator(const ConstLinkedIterator&);
ConstLinkedIterator& operator= (const ConstLinkedIterator&);
public:
ConstLinkedIterator(const LinkedBlockingQueue<E>* parent) : current(), last(),
currentElement(),
parent(parent) {
TotalLock lock(parent);
this->current = parent->head->next;
if(this->current != NULL) {
this->currentElement = current->get();
}
}
virtual bool hasNext() const {
return this->current != NULL;
}
virtual E next() {
TotalLock lock(this->parent);
if(this->current == NULL) {
throw decaf::util::NoSuchElementException(__FILE__, __LINE__,
"Iterator next called with no matching next element.");
}
E result = this->currentElement;
this->last = this->current;
this->current = this->nextNode(this->current);
this->currentElement = (this->current == NULL) ? E() : this->current->get();
return result;
}
virtual void remove() {
throw lang::exceptions::UnsupportedOperationException(
__FILE__, __LINE__, "Cannot write to a const ListIterator." );
}
private:
Pointer< QueueNode<E> > nextNode(Pointer< QueueNode<E> >& p) {
// Handle the case of a dequeued Node, the new head of Queue
// will be parent->head->next() even if the Queue is empty.
if(p->isDequeued()) {
return this->parent->head->next;
}
Pointer< QueueNode<E> > s = p->next;
// Handle Nodes that have been removed from the interior of the
// Queue, these are tagged but still retain their next() value
// in order to account for multiple removes. If all nodes were
// removed from the last call then eventually we reach next() == NULL
// which is the old tail.
while(s != NULL && s->isUnlinked()) {
s = s->next;
}
return s;
}
};
public:
virtual decaf::util::Iterator<E>* iterator() {
return new LinkedIterator(this);
}
virtual decaf::util::Iterator<E>* iterator() const {
return new ConstLinkedIterator(this);
}
private:
void unlink(Pointer< QueueNode<E> >& p, Pointer< QueueNode<E> >& predicessor) {
// In order to prevent Iterators from losing their ability to provide
// weakly consistent iteration the next value of p is left intact but
// the node is marked as unlinked and it value is reset to default.
p->unlink();
predicessor->next = p->next;
if(this->tail == p) {
this->tail = predicessor;
}
if(this->count.getAndDecrement() == capacity) {
this->signalNotFull();
}
}
void signalNotEmpty() {
this->takeLock.lock();
try {
this->notEmpty->signal();
} catch(decaf::lang::Exception& ex) {
this->takeLock.unlock();
throw;
}
this->takeLock.unlock();
}
void signalNotFull() {
this->putLock.lock();
try {
this->notFull->signal();
} catch(decaf::lang::Exception& ex) {
this->putLock.unlock();
throw;
}
this->putLock.unlock();
}
// Must be called with the putLock locked.
void enqueue(E value) {
Pointer< QueueNode<E> > newTail( new QueueNode<E>(value) );
this->tail->next = newTail;
this->tail = newTail;
}
// Must be called with the takeLock locked.
E dequeue() {
Pointer< QueueNode<E> > temp = this->head;
Pointer< QueueNode<E> > newHead = temp->next;
this->head = newHead;
return newHead->getAndDequeue();
}
void purgeList() {
Pointer< QueueNode<E> > current = this->head->next;
Pointer< QueueNode<E> > temp;
while(current != NULL) {
temp = current;
current = current->next;
temp->next.reset(NULL);
temp.reset(NULL);
}
}
};
}}}
#endif /* _DECAF_UTIL_CONCURRENT_LINKEDBLOCKINGQUEUE_H_ */