blob: b9b19480f658bfafd2f62635438ca0f6a1069737 [file] [log] [blame]
package com.gemstone.gemfire.cache.hdfs.internal;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Tracks flushes using a queue of latches.
*
* @author bakera
*/
public class SignalledFlushObserver implements FlushObserver {
private static class FlushLatch extends CountDownLatch {
private final long seqnum;
public FlushLatch(long seqnum) {
super(1);
this.seqnum = seqnum;
}
public long getSequence() {
return seqnum;
}
}
// assume the number of outstanding flush requests is small so we don't
// need to organize by seqnum
private final List<FlushLatch> signals;
private final AtomicLong eventsReceived;
private final AtomicLong eventsDelivered;
public SignalledFlushObserver() {
signals = new ArrayList<FlushLatch>();
eventsReceived = new AtomicLong(0);
eventsDelivered = new AtomicLong(0);
}
@Override
public boolean shouldDrainImmediately() {
synchronized (signals) {
return !signals.isEmpty();
}
}
@Override
public AsyncFlushResult flush() {
final long seqnum = eventsReceived.get();
synchronized (signals) {
final FlushLatch flush;
if (seqnum <= eventsDelivered.get()) {
flush = null;
} else {
flush = new FlushLatch(seqnum);
signals.add(flush);
}
return new AsyncFlushResult() {
@Override
public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
return flush == null ? true : flush.await(timeout, unit);
}
};
}
}
/**
* Invoked when an event is received.
*/
public void push() {
eventsReceived.incrementAndGet();
}
/**
* Invoked when a batch has been dispatched.
*/
public void pop(int count) {
long highmark = eventsDelivered.addAndGet(count);
synchronized (signals) {
for (ListIterator<FlushLatch> iter = signals.listIterator(); iter.hasNext(); ) {
FlushLatch flush = iter.next();
if (flush.getSequence() <= highmark) {
flush.countDown();
iter.remove();
}
}
}
}
/**
* Invoked when the queue is cleared.
*/
public void clear() {
synchronized (signals) {
for (FlushLatch flush : signals) {
flush.countDown();
}
signals.clear();
eventsReceived.set(0);
eventsDelivered.set(0);
}
}
}