blob: 3a3d5ed1fe3d67c10bed875f4472f6c5ab7e5fa2 [file] [log] [blame]
package org.apache.accumulo.testing.performance.tests;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;
public class SharedBatchWriter implements BatchWriter {
private BatchWriter bw;
private ArrayBlockingQueue<Mutation> mQueue = new ArrayBlockingQueue<Mutation>(1000);
private ArrayBlockingQueue<CountDownLatch> fQueue = new ArrayBlockingQueue<CountDownLatch>(1000);
private class FlushTask implements Runnable {
@Override
public void run() {
while (true) {
try {
ArrayList<CountDownLatch> latches = new ArrayList<CountDownLatch>();
latches.add(fQueue.take());
/*
* CountDownLatch cdl = fQueue.poll(1, TimeUnit.MILLISECONDS); if (cdl != null)
* latches.add(cdl);
*/
fQueue.drainTo(latches);
ArrayList<Mutation> buffer = new ArrayList<Mutation>();
mQueue.drainTo(buffer);
bw.addMutations(buffer);
bw.flush();
for (CountDownLatch latch : latches) {
latch.countDown();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public SharedBatchWriter(BatchWriter bw) {
this.bw = bw;
Thread thread = new Thread(new FlushTask());
thread.setDaemon(true);
thread.start();
}
@Override
public void addMutation(Mutation m) throws MutationsRejectedException {
mQueue.add(m);
}
@Override
public void addMutations(Iterable<Mutation> iterable) throws MutationsRejectedException {
throw new UnsupportedOperationException();
}
@Override
public void flush() throws MutationsRejectedException {
try {
CountDownLatch cdl = new CountDownLatch(1);
fQueue.put(cdl);
cdl.await();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws MutationsRejectedException {
flush();
}
}