blob: d87e9a71cc43b4d28c7de5d1d2b80ef781581420 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.core.impl;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.fluo.core.util.FluoThreadFactory;
/**
 * Funnels mutations from many threads through one shared {@link BatchWriter}.
 *
 * <p>
 * This class was created because a batch writer blocks adding mutations while it is flushing.
 * Callers therefore never touch the batch writer directly: they place a batch of mutations on a
 * bounded queue and a single background thread drains the queue, adds everything it drained to the
 * batch writer and issues one flush for the whole group.
 *
 * <p>
 * Three ways of submitting are offered: blocking ({@link #writeMutations(Collection)}), future
 * based ({@link #writeMutationsAsyncFuture(Collection)}) and fire-and-forget
 * ({@link #writeMutationsAsync(List)} paired with {@link #waitForAsyncFlush()}).
 *
 * <p>
 * When adding or flushing a group of batches fails, every batch in that group is failed instead of
 * being retried: blocked callers get a {@link RuntimeException}, futures complete exceptionally and
 * {@link #waitForAsyncFlush()} reports the failure. Nobody is left waiting on a flush that did not
 * succeed.
 */
public class SharedBatchWriter {

  /** Maximum number of batches that may be queued before submitters block. */
  private static final int QUEUE_CAPACITY = 100000;

  // added to avoid findbugs false positive
  private static final Supplier<Void> NULLS = () -> null;

  private final BatchWriter bw;
  private final ArrayBlockingQueue<MutationBatch> mutQueue;

  /** Sentinel batch; once the flush thread has handled it, the thread exits. */
  private final MutationBatch end = new MutationBatch(new ArrayList<Mutation>(), false);

  /** Number of fire-and-forget batches submitted so far. */
  private final AtomicLong asyncBatchesAdded = new AtomicLong(0);

  /** Number of fire-and-forget batches the flush thread has finished; guarded by {@code this}. */
  private long asyncBatchesProcessed = 0;

  /** First failure that hit a fire-and-forget batch, or null; guarded by {@code this}. */
  private Throwable asyncFailure = null;

  /**
   * A group of mutations submitted together, plus whatever is needed to tell the submitter that
   * the group was handled: a latch for blocking submitters, a future for future based submitters,
   * or nothing for fire-and-forget submitters.
   */
  private static class MutationBatch {
    private final Collection<Mutation> mutations;
    private final CountDownLatch cdl;
    private final boolean isAsync;
    private final CompletableFuture<Void> cf;

    // Written by the flush thread before cdl.countDown() and read by the submitter after
    // cdl.await() returns, so the latch provides the required visibility.
    private Throwable failure;

    /**
     * @param mutations mutations to write
     * @param isAsync true for a fire-and-forget batch (no latch is created), false for a batch
     *        whose submitter blocks until the flush
     */
    public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
      this.mutations = mutations;
      this.isAsync = isAsync;
      this.cdl = isAsync ? null : new CountDownLatch(1);
      this.cf = null;
    }

    /**
     * @param mutations mutations to write
     * @param cf future to complete once the batch has been handled
     */
    public MutationBatch(Collection<Mutation> mutations, CompletableFuture<Void> cf) {
      this.mutations = mutations;
      this.cf = cf;
      this.cdl = null;
      // tracked through the future, not through the async batch counters
      this.isAsync = false;
    }

    /** Signals that the batch was flushed successfully. */
    public void countDown() {
      if (cdl != null) {
        cdl.countDown();
      }
      if (cf != null) {
        cf.complete(NULLS.get());
      }
    }

    /**
     * Signals that the batch could not be flushed.
     *
     * @param cause the reason the flush failed
     */
    void fail(Throwable cause) {
      // must be assigned before the latch is released, see the field comment
      failure = cause;
      if (cdl != null) {
        cdl.countDown();
      }
      if (cf != null) {
        cf.completeExceptionally(cause);
      }
    }

    /**
     * Blocks until the flush thread has handled this batch. Only valid for batches that own a
     * latch.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting (the interrupt
     *         status is restored first) or if the batch could not be flushed
     */
    void awaitFlush() {
      try {
        cdl.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
      if (failure != null) {
        throw new RuntimeException(failure);
      }
    }
  }

  /** Body of the background thread that moves queued batches into the batch writer. */
  private class FlushTask implements Runnable {

    @Override
    public void run() {
      boolean keepRunning = true;
      List<MutationBatch> batches = new ArrayList<>();

      while (keepRunning) {
        Throwable failure = null;
        try {
          // block for the first batch, then grab everything else that is already waiting so the
          // whole group shares a single flush
          batches.add(mutQueue.take());
          mutQueue.drainTo(batches);
          processBatches(batches);
        } catch (Exception e) {
          // An interrupt of take() also lands here with an empty group; it is reported and the
          // loop carries on, because close() is the only shutdown signal for this thread.
          failure = e;
          System.err.println("Failed to flush mutations in shared batch writer");
          e.printStackTrace();
        }

        if (finishBatches(batches, failure)) {
          keepRunning = false;
        }
        batches.clear();
      }
    }

    /**
     * Adds the mutations of every batch (the end marker carries none and is skipped) to the batch
     * writer and flushes once.
     */
    private void processBatches(List<MutationBatch> batches) throws MutationsRejectedException {
      for (MutationBatch mutationBatch : batches) {
        if (mutationBatch != end) {
          bw.addMutations(mutationBatch.mutations);
        }
      }
      bw.flush();
    }

    /**
     * Notifies the submitter of every batch in the group and updates the fire-and-forget
     * bookkeeping.
     *
     * @param batches the group that was just handled
     * @param failure null when the group was flushed, otherwise the reason it was not
     * @return true when the group contained the end marker
     */
    private boolean finishBatches(List<MutationBatch> batches, Throwable failure) {
      boolean sawEnd = false;
      int numAsync = 0;

      for (MutationBatch mutationBatch : batches) {
        if (failure == null) {
          mutationBatch.countDown();
        } else {
          mutationBatch.fail(failure);
        }
        if (mutationBatch.isAsync) {
          numAsync++;
        }
        if (mutationBatch == end) {
          sawEnd = true;
        }
      }

      if (numAsync > 0) {
        synchronized (SharedBatchWriter.this) {
          // failed batches are counted too, otherwise waitForAsyncFlush() would never return
          asyncBatchesProcessed += numAsync;
          if (failure != null && asyncFailure == null) {
            asyncFailure = failure;
          }
          SharedBatchWriter.this.notifyAll();
        }
      }

      return sawEnd;
    }
  }

  /**
   * Creates the writer and starts its daemon flush thread.
   *
   * @param bw batch writer that receives all mutations; it is only used from the flush thread
   */
  SharedBatchWriter(BatchWriter bw) {
    this.bw = bw;
    this.mutQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    Thread thread = new FluoThreadFactory("sharedBW").newThread(new FlushTask());
    thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        System.err.println("Uncaught exception in shared batch writer");
        e.printStackTrace();
      }
    });
    thread.setDaemon(true);
    thread.start();
  }

  /**
   * Places a batch on the queue, blocking while the queue is full.
   *
   * @throws RuntimeException if the calling thread is interrupted while waiting for queue space;
   *         the interrupt status is restored first
   */
  private void enqueue(MutationBatch mb) {
    try {
      mutQueue.put(mb);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  /**
   * Writes a single mutation and blocks until it has been flushed.
   *
   * @throws RuntimeException if interrupted or if the flush fails
   */
  void writeMutation(Mutation m) {
    writeMutations(Collections.singletonList(m));
  }

  /**
   * Writes the given mutations and blocks until they have been flushed. An empty collection is a
   * no-op.
   *
   * @throws RuntimeException if interrupted or if the flush fails
   */
  void writeMutations(Collection<Mutation> ml) {
    if (ml.isEmpty()) {
      return;
    }

    MutationBatch mb = new MutationBatch(ml, false);
    enqueue(mb);
    mb.awaitFlush();
  }

  /**
   * Queues the given mutations and returns immediately.
   *
   * @return a future that completes normally once the mutations were flushed and exceptionally if
   *         the flush failed; already completed when {@code ml} is empty
   * @throws RuntimeException if interrupted while waiting for queue space
   */
  CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
    if (ml.isEmpty()) {
      return CompletableFuture.completedFuture(NULLS.get());
    }

    CompletableFuture<Void> cf = new CompletableFuture<>();
    enqueue(new MutationBatch(ml, cf));
    return cf;
  }

  /**
   * Queues a single mutation and returns immediately.
   *
   * @see #writeMutationsAsyncFuture(Collection)
   */
  CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
    return writeMutationsAsyncFuture(Collections.singleton(m));
  }

  /**
   * Asks the flush thread to stop and blocks until it has handled everything queued ahead of the
   * request. The underlying batch writer is not closed here.
   *
   * @throws RuntimeException if interrupted or if the final flush fails
   */
  void close() {
    enqueue(end);
    end.awaitFlush();
  }

  /**
   * Queues the given mutations without waiting for the flush. Use {@link #waitForAsyncFlush()} to
   * wait for them later. An empty list is a no-op.
   *
   * @throws RuntimeException if interrupted while waiting for queue space
   */
  void writeMutationsAsync(List<Mutation> ml) {
    if (ml.isEmpty()) {
      return;
    }

    MutationBatch mb = new MutationBatch(ml, true);
    // The counter has to be bumped BEFORE the batch becomes visible to the flush thread. If it
    // were bumped afterwards, the processed count could include a batch that the added count does
    // not yet include, and waitForAsyncFlush() could return before a later batch was flushed.
    // NOTE(review): if the put below is interrupted the counter stays one too high.
    asyncBatchesAdded.incrementAndGet();
    enqueue(mb);
  }

  /**
   * Queues a single mutation without waiting for the flush.
   *
   * @see #writeMutationsAsync(List)
   */
  void writeMutationAsync(Mutation m) {
    writeMutationsAsync(Collections.singletonList(m));
  }

  /**
   * waits for all async mutations that were added before this was called to be flushed. Does not
   * wait for async mutations added after call.
   *
   * @throws RuntimeException if interrupted while waiting (the interrupt status is restored
   *         first), or if any async batch handled so far could not be flushed; once recorded, such
   *         a failure is reported by every later call as well
   */
  public void waitForAsyncFlush() {
    long numAdded = asyncBatchesAdded.get();
    synchronized (this) {
      while (numAdded > asyncBatchesProcessed) {
        try {
          wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
      if (asyncFailure != null) {
        throw new RuntimeException(asyncFailure);
      }
    }
  }
}