blob: 3dd9592b88239d8fa1cfe5e0dc54471b1172d6c0 [file] [log] [blame]
package org.apache.yoko.util.concurrent;
import org.apache.yoko.util.KeyedFactory;
import org.apache.yoko.util.Sequential;
import java.lang.ref.ReferenceQueue;
/**
 * A {@link ConcurrentFifo} whose nodes are {@code WeakNode}s registered with a
 * private {@link ReferenceQueue}.
 * <p>
 * Every public operation first drains that queue and unlinks each node found
 * on it, so nodes reported by the queue (presumably because the garbage
 * collector cleared their element) are removed before the operation itself
 * is delegated to the superclass.
 *
 * @param <T> the element type
 */
public class WeakConcurrentFifo<T> extends ConcurrentFifo<T> {
    /** Queue handed to every node this fifo creates; drained by {@link #cleanup()}. */
    private final ReferenceQueue<T> refQueue = new ReferenceQueue<>();

    /** Produces, per element, the {@link Runnable} passed to that element's node. */
    private final KeyedFactory<? super T, Runnable> cleanupFactory;

    /**
     * Creates a fifo that uses {@code NoOpRunnableFactory.INSTANCE} as its
     * cleanup factory.
     */
    WeakConcurrentFifo() {
        this(NoOpRunnableFactory.INSTANCE);
    }

    /**
     * Creates a fifo that uses the supplied cleanup factory.
     *
     * @param cleanupFactory called once per element in {@link #createNode(Object)};
     *                       its result is passed to the new node
     */
    WeakConcurrentFifo(KeyedFactory<? super T, Runnable> cleanupFactory) {
        this.cleanupFactory = cleanupFactory;
    }

    /**
     * Drains the reference queue, then reports the superclass's size.
     *
     * @return the value of {@code super.size()} after cleanup
     */
    @Override
    public int size() {
        cleanup();
        return super.size();
    }

    /**
     * Drains the reference queue, then delegates to the superclass.
     *
     * @return the value of {@code super.peek()} after cleanup
     */
    @Override
    public T peek() {
        cleanup();
        return super.peek();
    }

    /**
     * Drains the reference queue, then delegates the insertion to the superclass.
     *
     * @param elem the element to add
     * @return the value of {@code super.put(elem)}
     */
    @Override
    public Place<T> put(T elem) {
        cleanup();
        return super.put(elem);
    }

    /**
     * Drains the reference queue, then delegates the removal to the superclass.
     *
     * @return the value of {@code super.remove()} after cleanup
     */
    @Override
    public T remove() {
        cleanup();
        return super.remove();
    }

    /**
     * Builds the node for a new element: a {@code WeakNode} tied to this fifo's
     * reference queue and carrying the runnable produced by the cleanup factory.
     *
     * @param elem the element the node is created for
     * @return the newly created node
     */
    @Override
    protected VNode<T> createNode(T elem) {
        return new WeakNode<>(elem, refQueue, cleanupFactory.create(elem));
    }

    /**
     * Polls the reference queue until it is empty, unlinking each node found.
     */
    private void cleanup() {
        while (true) {
            // Everything on refQueue was registered by createNode, so it is a WeakNode<T>.
            @SuppressWarnings("unchecked")
            WeakNode<T> wn = (WeakNode<T>) refQueue.poll();
            if (wn == null) {
                return;
            }
            cleanup(wn);
        }
    }

    /**
     * Unlinks a single node, decrements the size counter and runs the node's
     * cleanup action.
     * <p>
     * The predecessor's monitor is taken first, then the node's own. If the
     * predecessor changed between reading it and locking it, the attempt is
     * abandoned and retried with the new predecessor.
     *
     * @param wn the node to unlink
     */
    private void cleanup(WeakNode<T> wn) {
        while (true) {
            final PNode<T> predecessor = wn.prev();
            if (predecessor == null) {
                return; // this node is already removed
            }
            synchronized (predecessor) {
                // Only proceed if the link we locked on is still the current one.
                if (wn.prev() == predecessor) {
                    synchronized (wn) {
                        wn.delete();
                        size.decrementAndGet();
                        wn.cleanup.run();
                        return;
                    }
                }
            }
            // something changed while we were acquiring the lock: go round again
        }
    }
}