// blob: b2504c3766aa764968d023acd77d38e7c159d0ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.job;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * A bounded blocking queue implementation that keeps a virtual queue of elements on a
 * per-producer basis and iterates through each producer queue in round-robin fashion.
 *
 * <p>Every element is assigned to a producer via {@link #extractProducer(Object)}. Elements of
 * the same producer are dequeued in FIFO order; consecutive dequeues rotate over the producers
 * that currently have queued elements.
 *
 * <p>All mutable state is guarded by a single private monitor, which is also used to block and
 * wake up waiting producers and consumers. {@code null} elements are not permitted.
 *
 * @param <E> the type of elements held in this queue
 */
public abstract class AbstractRoundRobinQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E> {

    /** Producer key to that producer's pending elements; only producers with elements are kept. */
    private final Map<Object, ProducerList<E>> producerMap;
    /** Producers in round-robin order; {@link #currentProducer} indexes into this list. */
    private final LinkedList<ProducerList<E>> producerLists;
    /** Monitor guarding all mutable state; also used for wait/notify. */
    private final Object lock;
    private final boolean newProducerToFront;
    /** Index in {@link #producerLists} of the producer that is served next. */
    private int currentProducer;
    /** Total number of queued elements across all producers. */
    private int size;
    private final int maxSize;

    /**
     * Constructs an AbstractRoundRobinQueue that limits the number of queued elements to at most
     * maxSize. Once that limit is reached, {@link #offer(Object)} returns {@code false} and
     * {@link #put(Object)} blocks the caller until space becomes available. New producers are
     * appended to the end of the round-robin list.
     *
     * @param maxSize maximum number of elements held by the queue, summed over all producers
     */
    public AbstractRoundRobinQueue(int maxSize) {
        this(maxSize, false);
    }

    /**
     * @param maxSize maximum number of elements held by the queue, summed over all producers
     * @param newProducerToFront If true, new producers go to the front of the round-robin list,
     *        if false, they go to the end.
     */
    public AbstractRoundRobinQueue(int maxSize, boolean newProducerToFront) {
        this.producerMap = new HashMap<Object, ProducerList<E>>();
        this.producerLists = new LinkedList<ProducerList<E>>();
        this.lock = new Object();
        this.newProducerToFront = newProducerToFront;
        this.maxSize = maxSize;
    }

    /**
     * Returns an iterator over a snapshot of all queued elements. Elements are grouped by
     * producer, starting with the producer that is served next and wrapping around the
     * round-robin list, so every queued element is visited exactly once.
     *
     * <p>The iterator works on a copy: later changes to the queue are not reflected, and
     * {@link Iterator#remove()} only removes from the copy. Use {@link #remove(Object)} to remove
     * an element from the queue itself.
     */
    @Override
    public Iterator<E> iterator() {
        synchronized (lock) {
            List<E> snapshot = new ArrayList<E>(size);
            // Producers from the round-robin pointer up to the end of the list ...
            ListIterator<ProducerList<E>> tail = producerLists.listIterator(currentProducer);
            while (tail.hasNext()) {
                snapshot.addAll(tail.next().list);
            }
            // ... followed by the producers positioned before the pointer (wrap-around).
            Iterator<ProducerList<E>> head = producerLists.iterator();
            for (int i = 0; i < currentProducer && head.hasNext(); i++) {
                snapshot.addAll(head.next().list);
            }
            return snapshot.iterator();
        }
    }

    /**
     * Inserts the element, waiting up to the given timeout for space to become available.
     *
     * @return {@code true} if the element was added, {@code false} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if {@code o} is null
     */
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        long remainingNanos = unit.toNanos(timeout);
        // nanoTime is monotonic, and "deadline - now" stays correct even if the addition wraps,
        // so very large timeouts (e.g. Long.MAX_VALUE) do not turn into an immediate timeout.
        final long deadline = System.nanoTime() + remainingNanos;
        synchronized (lock) {
            boolean accepted;
            while (!(accepted = offer(o)) && remainingNanos > 0) {
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                remainingNanos = deadline - System.nanoTime();
            }
            return accepted;
        }
    }

    /**
     * Inserts the element into its producer's queue if the size limit has not been reached.
     * A producer that is not yet known is inserted into the round-robin list at the front or the
     * end, depending on the {@code newProducerToFront} constructor argument.
     *
     * @return {@code true} if the element was added, {@code false} if the queue is full
     * @throws NullPointerException if {@code o} is null
     */
    @Override
    public boolean offer(E o) {
        if (o == null) {
            throw new NullPointerException();
        }
        final Object producerKey = extractProducer(o);
        synchronized (lock) {
            if (size == maxSize) {
                return false;
            }
            ProducerList<E> queue = producerMap.get(producerKey);
            if (queue == null) {
                queue = new ProducerList<E>(producerKey);
                producerMap.put(producerKey, queue);
                // Inserting at the pointer puts the new producer in front of the current one;
                // moving the pointer past it afterwards makes it the last one to be served.
                producerLists.add(currentProducer, queue);
                if (!newProducerToFront) {
                    incrementCurrentProducerPointer();
                }
            }
            queue.list.add(o);
            size++;
            lock.notifyAll();
            return true;
        }
    }

    /**
     * Implementations must extract the producer object which is used as the key to identify a
     * unique producer.
     */
    protected abstract Object extractProducer(E o);

    /**
     * Inserts the element, blocking until space is available.
     *
     * <p>This method does not declare {@link InterruptedException}. If the calling thread is
     * interrupted while waiting, it keeps waiting until the element has been inserted and then
     * restores the thread's interrupt status before returning.
     *
     * @throws NullPointerException if {@code o} is null
     */
    @Override
    public void put(E o) {
        boolean interrupted = false;
        synchronized (lock) {
            while (!offer(o)) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting so the element is never dropped.
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Retrieves and removes the next element in round-robin order, waiting until one is
     * available.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        synchronized (lock) {
            while (size == 0) {
                lock.wait();
            }
            E element = poll();
            assert element != null;
            return element;
        }
    }

    /**
     * Retrieves and removes the next element in round-robin order, waiting up to the given
     * timeout for one to become available.
     *
     * @return the element, or {@code null} if the timeout elapsed while the queue was empty
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long remainingNanos = unit.toNanos(timeout);
        // See offer(E, long, TimeUnit) for why the deadline is kept in nanoTime units.
        final long deadline = System.nanoTime() + remainingNanos;
        synchronized (lock) {
            while (size == 0 && remainingNanos > 0) {
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                remainingNanos = deadline - System.nanoTime();
            }
            return poll();
        }
    }

    /**
     * Retrieves and removes the oldest element of the producer that is currently due, then moves
     * the round-robin pointer on to the next producer.
     *
     * @return the element, or {@code null} if the queue is empty
     */
    @Override
    public E poll() {
        synchronized (lock) {
            while (!producerLists.isEmpty()) {
                ProducerList<E> queue = producerLists.get(currentProducer);
                if (queue.list.isEmpty()) {
                    // Defensive clean-up of a drained producer; the pointer then designates
                    // the producer that followed it.
                    removeProducerAt(currentProducer, queue);
                    continue;
                }
                E element = queue.list.removeFirst();
                size--;
                assert element != null;
                // This is the round-robin part: after taking an element from the current
                // producer we move on to the next one.
                if (queue.list.isEmpty()) {
                    // Removing the list shifts the next producer into the current slot.
                    removeProducerAt(currentProducer, queue);
                } else {
                    incrementCurrentProducerPointer();
                }
                lock.notifyAll();
                return element;
            }
            assert size == 0;
            return null;
        }
    }

    /**
     * Polls using the given producer key. Only the given producer's queue is consulted and the
     * round-robin order of the remaining producers is left untouched.
     *
     * @return that producer's oldest element, or {@code null} if it has none queued
     */
    protected E pollProducer(Object producer) {
        synchronized (lock) {
            ProducerList<E> queue = producerMap.get(producer);
            if (queue == null || queue.list.isEmpty()) {
                return null;
            }
            E element = queue.list.removeFirst();
            size--;
            assert element != null;
            if (queue.list.isEmpty()) {
                int index = producerLists.indexOf(queue);
                if (index >= 0) {
                    // Keeps the pointer on the same producer even when the removed list was
                    // positioned before it.
                    removeProducerAt(index, queue);
                } else {
                    producerMap.remove(queue.producer);
                }
            }
            lock.notifyAll();
            return element;
        }
    }

    /**
     * Retrieves, but does not remove, the element that {@link #poll()} would return next.
     *
     * @return the element, or {@code null} if the queue is empty
     */
    @Override
    public E peek() {
        synchronized (lock) {
            while (!producerLists.isEmpty()) {
                ProducerList<E> queue = producerLists.get(currentProducer);
                if (!queue.list.isEmpty()) {
                    E element = queue.list.getFirst();
                    assert element != null;
                    return element;
                }
                // Defensive clean-up of a drained producer.
                removeProducerAt(currentProducer, queue);
            }
            assert size == 0;
            return null;
        }
    }

    /**
     * Removes a single instance of the given element from the queue, if present. The producer
     * queues are searched in list order and the first equal element found is removed.
     *
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        synchronized (lock) {
            int index = 0;
            for (ProducerList<E> queue : producerLists) {
                if (queue.list.remove(o)) {
                    size--;
                    if (queue.list.isEmpty()) {
                        // Safe although we are inside a for-each: we return right away and
                        // never touch the loop's iterator again.
                        removeProducerAt(index, queue);
                    }
                    lock.notifyAll();
                    return true;
                }
                index++;
            }
            return false;
        }
    }

    /**
     * Removes all elements from this queue, in round-robin order, and adds them to the given
     * collection.
     *
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        synchronized (lock) {
            int originalSize = size;
            int drained = drainTo(c, originalSize);
            assert drained == originalSize;
            assert size == 0;
            assert producerLists.isEmpty();
            assert producerMap.isEmpty();
            return drained;
        }
    }

    /**
     * Removes at most {@code maxElements} elements from this queue, in round-robin order, and
     * adds them to the given collection.
     *
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is null
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        synchronized (lock) {
            int drained = 0;
            while (drained < maxElements) {
                E element = poll();
                if (element == null) {
                    break;
                }
                c.add(element);
                drained++;
            }
            return drained;
        }
    }

    /**
     * Returns the number of additional elements this queue can accept without blocking.
     * A negative {@code maxSize} can never equal the element count, so no bound is enforced in
     * that case and {@link Integer#MAX_VALUE} is returned.
     */
    @Override
    public int remainingCapacity() {
        synchronized (lock) {
            return maxSize < 0 ? Integer.MAX_VALUE : maxSize - size;
        }
    }

    @Override
    public int size() {
        synchronized (lock) {
            return size;
        }
    }

    /**
     * Removes the producer list at the given index of the round-robin list together with its map
     * entry, keeping the round-robin pointer on the producer it designated before (or on the
     * following one if the designated producer itself was removed). Caller must hold the lock.
     */
    private void removeProducerAt(int index, ProducerList<E> queue) {
        producerLists.remove(index);
        producerMap.remove(queue.producer);
        if (index < currentProducer) {
            // Everything behind the removed entry shifted one slot towards the front.
            currentProducer--;
        }
        adjustCurrentProducerPointer();
    }

    /** Advances the round-robin pointer by one producer, wrapping around at the end. */
    private void incrementCurrentProducerPointer() {
        synchronized (lock) {
            if (producerLists.isEmpty()) {
                currentProducer = 0;
            } else {
                currentProducer = (currentProducer + 1) % producerLists.size();
            }
        }
    }

    /**
     * Adjusts the current pointer to a decrease in size.
     */
    private void adjustCurrentProducerPointer() {
        synchronized (lock) {
            if (producerLists.isEmpty()) {
                currentProducer = 0;
            } else {
                currentProducer = currentProducer % producerLists.size();
            }
        }
    }

    /** FIFO list of the pending elements of a single producer. */
    private static class ProducerList<E> {
        private final Object producer;
        private final LinkedList<E> list;

        public ProducerList(Object producer) {
            this.producer = producer;
            this.list = new LinkedList<E>();
        }
    }
}