blob: a041b823c6358a4f17a50f9e2de5346c23a001d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.collections;
import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* A priority linked list implementation
* <p>
* It implements this by maintaining an individual LinkedBlockingDeque for each priority level.
*/
public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size");
protected LinkedListImpl<E>[] levels;
private volatile int size;
private int lastReset;
private int highestPriority = -1;
private int lastPriority = -1;
public PriorityLinkedListImpl(final int priorities) {
this(priorities, null);
}
public PriorityLinkedListImpl(final int priorities, Comparator<E> comparator) {
levels = (LinkedListImpl<E>[]) Array.newInstance(LinkedListImpl.class, priorities);
for (int i = 0; i < priorities; i++) {
levels[i] = new LinkedListImpl<>(comparator);
}
}
private void checkHighest(final int priority) {
if (lastPriority != priority || priority > highestPriority) {
lastPriority = priority;
if (lastReset == Integer.MAX_VALUE) {
lastReset = 0;
} else {
lastReset++;
}
}
if (priority > highestPriority) {
highestPriority = priority;
}
}
@Override
public void addHead(final E e, final int priority) {
checkHighest(priority);
levels[priority].addHead(e);
exclusiveIncrementSize(1);
}
@Override
public void addTail(final E e, final int priority) {
checkHighest(priority);
levels[priority].addTail(e);
exclusiveIncrementSize(1);
}
@Override
public void addSorted(E e, int priority) {
checkHighest(priority);
levels[priority].addSorted(e);
exclusiveIncrementSize(1);
}
@Override
public void setNodeStore(NodeStore<E> supplier) {
for (LinkedList<E> list : levels) {
list.setNodeStore(supplier);
}
}
@Override
public E removeWithID(String listID, long id) {
// we start at 4 just as an optimization, since most times we only use level 4 as the level on messages
if (levels.length > 4) {
for (int l = 4; l < levels.length; l++) {
E removed = levels[l].removeWithID(listID, id);
if (removed != null) {
exclusiveIncrementSize(-1);
return removed;
}
}
}
for (int l = Math.min(3, levels.length); l >= 0; l--) {
E removed = levels[l].removeWithID(listID, id);
if (removed != null) {
exclusiveIncrementSize(-1);
return removed;
}
}
return null;
}
@Override
public E poll() {
E e = null;
// We are just using a simple prioritization algorithm:
// Highest priority refs always get returned first.
// This could cause starvation of lower priority refs.
// TODO - A better prioritization algorithm
for (int i = highestPriority; i >= 0; i--) {
LinkedListImpl<E> ll = levels[i];
if (ll.size() != 0) {
e = ll.poll();
if (e != null) {
exclusiveIncrementSize(-1);
if (ll.size() == 0) {
if (highestPriority == i) {
highestPriority--;
}
}
}
break;
}
}
return e;
}
@Override
public void clear() {
for (LinkedListImpl<E> list : levels) {
list.clear();
}
exclusiveSetSize(0);
}
private void exclusiveIncrementSize(int amount) {
SIZE_UPDATER.lazySet(this, this.size + amount);
}
private void exclusiveSetSize(int value) {
SIZE_UPDATER.lazySet(this, value);
}
@Override
public int size() {
return size;
}
@Override
public boolean isEmpty() {
return size == 0;
}
@Override
public LinkedListIterator<E> iterator() {
return new PriorityLinkedListIterator();
}
private class PriorityLinkedListIterator implements LinkedListIterator<E> {
private int index;
private final LinkedListIterator<E>[] cachedIters = new LinkedListIterator[levels.length];
private LinkedListIterator<E> lastIter;
private int resetCount = lastReset;
volatile boolean closed = false;
PriorityLinkedListIterator() {
index = levels.length - 1;
}
@Override
protected void finalize() {
close();
}
@Override
public void repeat() {
if (lastIter == null) {
throw new NoSuchElementException();
}
lastIter.repeat();
}
@Override
public void close() {
if (!closed) {
closed = true;
lastIter = null;
for (LinkedListIterator<E> iter : cachedIters) {
if (iter != null) {
iter.close();
}
}
}
}
private void checkReset() {
if (lastReset != resetCount) {
index = highestPriority;
resetCount = lastReset;
}
}
@Override
public boolean hasNext() {
checkReset();
while (index >= 0) {
lastIter = cachedIters[index];
if (lastIter == null) {
lastIter = cachedIters[index] = levels[index].iterator();
}
boolean b = lastIter.hasNext();
if (b) {
return true;
}
index--;
if (index < 0) {
index = levels.length - 1;
break;
}
}
return false;
}
@Override
public E next() {
if (lastIter == null) {
throw new NoSuchElementException();
}
return lastIter.next();
}
@Override
public void remove() {
if (lastIter == null) {
throw new NoSuchElementException();
}
lastIter.remove();
// This next statement would be the equivalent of:
// if (index == highestPriority && levels[index].size() == 0)
// However we have to keep checking all the previous levels
// otherwise we would cache a max that will not exist
// what would make us eventually having hasNext() returning false
// as a bug
// Part of the fix for HORNETQ-705
for (int i = index; i >= 0 && levels[index].size() == 0; i--) {
highestPriority = i;
}
exclusiveIncrementSize(-1);
}
}
}