blob: 67d2eb3d0043759ceca617b643c1156a0f8e345a [file] [log] [blame]
package org.apache.yoko.util.concurrent;
import org.apache.yoko.util.Fifa;
import org.apache.yoko.util.Fifo;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Objects.requireNonNull;
/**
* A thread-safe queue that allows concurrent modification of non-adjacent elements.
*/
class ConcurrentFifo<T> implements Fifo<T>, Fifa<T> {
/*
* This class relies on consistent lock ordering. Locks are ALWAYS obtained
* in the order of elements in the queue. The locks used are the monitors
* of the node elements, and each node's monitor guards the relationship
* between that node and its successor. By implication, it guards the other
* node's back reference as well.
*
* So, for a delete operation, two nodes must be locked: the node to be
* deleted, and the previous node, but NOT IN THAT ORDER! Moreover, after
* the previous node is locked, the relationship must be checked to ensure
* it is still current. The convention observed is that next() is only
* accessed while the lock is held, and it is cross-checked against any
* unguarded calls to prev().
*
* NOTE: this is not double-check locking (DCL) because the early access
* never obviates a synchronized block, and results are always checked
* within the guarded section. Therefore, it is not necessary for any of
* the non-final fields to be volatile.
*
* DCL: https://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#dcl
*
* If inconsistency is detected, all locks are to be released and the
* operation restarted from scratch, unless it can be determined the
* operation has definitively failed due to concurrent modification.
*
* Operations on non-adjacent nodes are concurrent. Concurrent operations
* on a node or on adjacent nodes will contend for monitors, but never
* deadlock.
*/
private final Head<T> head = new Head<>();
private final Foot<T> foot = new Foot<>(head);
protected final AtomicInteger size = new AtomicInteger(0);
@Override
public int size() { return size.get(); }
/**
* Get, without removing it, the oldest remaining element from this FIFO.
* This method does not block and returns an answer consistent with the state of the FIFO at some point.
*
* @return the oldest remaining element or <code>null</code> if the FIFO was empty
*/
@Override
public T peek() {
return recursivePeek(head);
}
/**
* Find the first non-null value.
*/
private T recursivePeek(PNode<T> start) {
synchronized (start) {
NNode<T> nn = start.next();
if (nn == foot) return null;
VNode<T> node = (VNode<T>) nn;
T result = node.get();
return (result == null) ? recursivePeek(node) : result;
}
}
/**
* Add an element to the end of this FIFO.
*
* @param elem must not be <code>null</code>
* @return an object representing the place in the queue
*/
@Override
public Place<T> put(T elem) {
do {
final PNode<T> pnode = foot.prev();
// lock penultimate node
synchronized (pnode) {
// RETRY if structure changed
if (pnode.next() != foot) continue;
// create a new node
final VNode<T> node = createNode(elem);
// insert new node
synchronized (node) {
node.insertAfter(pnode);
size.incrementAndGet();
}
// return place in queue
return new Place<T>() {
@Override
public T relinquish() {
return remove(node);
}
};
}
} while (true);
}
protected VNode<T> createNode(T elem) {
return new StrongNode<>(requireNonNull(elem));
}
/**
* Remove the least recently added element.
*
* @return the removed element, or <code>null</code> if the FIFO was empty.
*/
@Override
public T remove() {
return recursiveRemove(head);
}
/**
* Find and remove the first non-null value
*/
private T recursiveRemove(PNode<T> start) {
synchronized (start) {
NNode<T> nn = start.next();
if (nn == foot) return null;
VNode<T> node = (VNode<T>) nn;
T result = node.get();
if (result == null)
return recursiveRemove(node);
synchronized (node) {
node.delete();
size.decrementAndGet();
return result;
}
}
}
/**
* Remove the specified node from the FIFO, if present.
* @return the element if it was successfully removed,
* otherwise <code>null</code>
*/
protected T remove(VNode<T> node) {
do {
// retrieve previous node
final PNode<T> pNode = node.prev();
// FAIL if node already deleted
if (pNode == null) return null;
// lock previous node
synchronized (pNode) {
// RETRY if structure has changed
if (pNode.next() != node) continue;
// Remove node from chain and report success
synchronized (node) {
node.delete();
size.decrementAndGet();
}
return node.get();
}
} while (true);
}
}