fixes #722 Replace ListenableFuture with CompletableFuture (#975)
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
index 75a9fa1..6f35fbc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
@@ -19,15 +19,12 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.client.ConditionalWriter.Result;
import org.apache.accumulo.core.data.ConditionalMutation;
@@ -36,11 +33,10 @@
import org.apache.fluo.core.util.FluoExecutors;
import org.apache.fluo.core.util.Limit;
-public class AsyncConditionalWriter
- implements AsyncFunction<Collection<ConditionalMutation>, Iterator<Result>> {
+public class AsyncConditionalWriter {
private final ConditionalWriter cw;
- private final ListeningExecutorService les;
+ private final ExecutorService es;
private final Limit semaphore;
@@ -50,55 +46,38 @@
FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT);
int permits = env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT,
FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT);
- this.les =
- MoreExecutors.listeningDecorator(FluoExecutors.newFixedThreadPool(numThreads, "asyncCW"));
+ this.es = FluoExecutors.newFixedThreadPool(numThreads, "asyncCw");
// the conditional writer currently has not memory limits... give it too much and it blows out
// memory.. need to fix this in conditional writer
// for now this needs to be memory based
this.semaphore = new Limit(permits);
}
- private class IterTask implements Callable<Iterator<Result>> {
-
- private Iterator<Result> input;
- private int permitsAcquired;
-
- public IterTask(Iterator<Result> iter, int permitsAcquired) {
- this.input = iter;
- this.permitsAcquired = permitsAcquired;
- }
-
- @Override
- public Iterator<Result> call() throws Exception {
- try {
- Builder<Result> imlb = ImmutableList.builder();
- while (input.hasNext()) {
- Result result = input.next();
- imlb.add(result);
- }
- return imlb.build().iterator();
- } finally {
- semaphore.release(permitsAcquired);
- }
- }
-
- }
-
- @Override
- public ListenableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) {
+ public CompletableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) {
if (input.size() == 0) {
- return Futures.immediateFuture(Collections.<Result>emptyList().iterator());
+ return CompletableFuture.completedFuture(Collections.<Result>emptyList().iterator());
}
semaphore.acquire(input.size());
Iterator<Result> iter = cw.write(input.iterator());
- return les.submit(new IterTask(iter, input.size()));
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Builder<Result> imlb = ImmutableList.builder();
+ while (iter.hasNext()) {
+ Result result = iter.next();
+ imlb.add(result);
+ }
+ return imlb.build().iterator();
+ } finally {
+ semaphore.release(input.size());
+ }
+ }, es);
}
public void close() {
- les.shutdownNow();
+ es.shutdownNow();
try {
- les.awaitTermination(5, TimeUnit.SECONDS);
+ es.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
index 2b53cbf..d87e9a7 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
@@ -21,12 +21,11 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;
@@ -42,13 +41,15 @@
private AtomicLong asyncBatchesAdded = new AtomicLong(0);
private long asyncBatchesProcessed = 0;
+ // added to avoid findbugs false positive
+ private static final Supplier<Void> NULLS = () -> null;
private static class MutationBatch {
private Collection<Mutation> mutations;
private CountDownLatch cdl;
private boolean isAsync = false;
- private ListenableFutureTask<Void> lf;
+ private CompletableFuture<Void> cf;
public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
this.mutations = mutations;
@@ -58,9 +59,9 @@
}
}
- public MutationBatch(Collection<Mutation> mutations, ListenableFutureTask<Void> lf) {
+ public MutationBatch(Collection<Mutation> mutations, CompletableFuture<Void> cf) {
this.mutations = mutations;
- this.lf = lf;
+ this.cf = cf;
this.cdl = null;
this.isAsync = false;
}
@@ -70,8 +71,8 @@
cdl.countDown();
}
- if (lf != null) {
- lf.run();
+ if (cf != null) {
+ cf.complete(NULLS.get());
}
}
}
@@ -170,27 +171,22 @@
}
}
- private static final Runnable DO_NOTHING = new Runnable() {
- @Override
- public void run() {}
- };
-
- ListenableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
+ CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
if (ml.size() == 0) {
- return Futures.immediateFuture(null);
+ return CompletableFuture.completedFuture(NULLS.get());
}
- ListenableFutureTask<Void> lf = ListenableFutureTask.create(DO_NOTHING, null);
+ CompletableFuture<Void> cf = new CompletableFuture<>();
try {
- MutationBatch mb = new MutationBatch(ml, lf);
+ MutationBatch mb = new MutationBatch(ml, cf);
mutQueue.put(mb);
- return lf;
+ return cf;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- ListenableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
+ CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
return writeMutationsAsyncFuture(Collections.singleton(m));
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 0abdafe..94e4107 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -26,15 +26,13 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
@@ -818,33 +816,12 @@
return startTs;
}
- // async experiment
-
- private abstract static class CommitCallback<V> implements FutureCallback<V> {
-
- private CommitData cd;
-
- CommitCallback(CommitData cd) {
- this.cd = cd;
- }
-
- @Override
- public void onSuccess(V result) {
- try {
- onSuccess(cd, result);
- } catch (Exception e) {
- cd.commitObserver.failed(e);
- }
- }
-
- protected abstract void onSuccess(CommitData cd, V result) throws Exception;
-
-
- @Override
- public void onFailure(Throwable t) {
- cd.commitObserver.failed(t);
- }
-
+ /**
+ * Funcitonal interface to provide next step of asynchronous commit on successful completion of
+ * the previous one
+ */
+ private static interface OnSuccessInterface<V> {
+ public void onSuccess(V result) throws Exception;
}
private abstract static class SynchronousCommitTask implements Runnable {
@@ -895,6 +872,24 @@
return size;
}
+ private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
+ OnSuccessInterface<V> onSuccessInterface) {
+ cfuture.handleAsync((result, exception) -> {
+ if (exception != null) {
+ cd.commitObserver.failed(exception);
+ return null;
+ } else {
+ try {
+ onSuccessInterface.onSuccess(result);
+ return null;
+ } catch (Exception e) {
+ cd.commitObserver.failed(e);
+ return null;
+ }
+ }
+ }, env.getSharedResources().getAsyncCommitExecutor());
+ }
+
@Override
public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
@@ -972,13 +967,8 @@
final ConditionalMutation pcm =
prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow));
- ListenableFuture<Iterator<Result>> future = cd.acw.apply(Collections.singletonList(pcm));
- Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
- @Override
- protected void onSuccess(CommitData cd, Iterator<Result> result) throws Exception {
- postLockPrimary(cd, pcm, Iterators.getOnlyElement(result));
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
+ CompletableFuture<Iterator<Result>> cfuture = cd.acw.apply(Collections.singletonList(pcm));
+ addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm, Iterators.getOnlyElement(result)));
}
private void postLockPrimary(final CommitData cd, final ConditionalMutation pcm, Result result)
@@ -1059,13 +1049,8 @@
cd.acceptedRows = new HashSet<>();
- ListenableFuture<Iterator<Result>> future = cd.bacw.apply(mutations);
- Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
- @Override
- protected void onSuccess(CommitData cd, Iterator<Result> results) throws Exception {
- postLockOther(cd, results);
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
+ CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations);
+ addCallback(cfuture, cd, results -> postLockOther(cd, results));
}
private void postLockOther(final CommitData cd, Iterator<Result> results) throws Exception {
@@ -1092,13 +1077,8 @@
} else if (stopAfterPreCommit) {
cd.commitObserver.committed();
} else {
- ListenableFuture<Stamp> future = env.getSharedResources().getOracleClient().getStampAsync();
- Futures.addCallback(future, new CommitCallback<Stamp>(cd) {
- @Override
- protected void onSuccess(CommitData cd, Stamp stamp) throws Exception {
- beginSecondCommitPhase(cd, stamp);
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
+ CompletableFuture<Stamp> cfuture = env.getSharedResources().getOracleClient().getStampAsync();
+ addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp));
}
}
@@ -1124,14 +1104,9 @@
mutations.add(m);
}
- ListenableFuture<Void> future =
+ CompletableFuture<Void> cfuture =
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
- Futures.addCallback(future, new CommitCallback<Void>(cd) {
- @Override
- protected void onSuccess(CommitData cd, Void v) throws Exception {
- rollbackPrimaryLock(cd);
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
+ addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd));
}
private void rollbackPrimaryLock(CommitData cd) throws Exception {
@@ -1143,14 +1118,10 @@
DelLockValue.encodeRollback(startTs, true, true));
m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
- ListenableFuture<Void> future =
+ CompletableFuture<Void> cfuture =
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m);
- Futures.addCallback(future, new CommitCallback<Void>(cd) {
- @Override
- protected void onSuccess(CommitData cd, Void v) throws Exception {
- cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
+ addCallback(cfuture, cd,
+ result -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
}
private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception {
@@ -1213,14 +1184,9 @@
}
}
- ListenableFuture<Void> future =
+ CompletableFuture<Void> cfuture =
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
- Futures.addCallback(future, new CommitCallback<Void>(cd) {
- @Override
- protected void onSuccess(CommitData cd, Void v) throws Exception {
- commmitPrimary(cd, commitTs);
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
+ addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs));
}
private void commmitPrimary(CommitData cd, final long commitTs) {
@@ -1237,15 +1203,10 @@
ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
- ListenableFuture<Iterator<Result>> future =
+ CompletableFuture<Iterator<Result>> cfuture =
cd.acw.apply(Collections.singletonList(delLockMutation));
- Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
- @Override
- protected void onSuccess(CommitData cd, Iterator<Result> result) throws Exception {
- handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
- Iterators.getOnlyElement(result));
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
+ addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
+ Iterators.getOnlyElement(result)));
}
private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs,
@@ -1321,16 +1282,9 @@
mutations.add(m);
}
-
- ListenableFuture<Void> future =
+ CompletableFuture<Void> cfuture =
env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
- Futures.addCallback(future, new CommitCallback<Void>(cd) {
- @Override
- protected void onSuccess(CommitData cd, Void v) throws Exception {
- finishCommit(cd, commitTs);
- }
- }, env.getSharedResources().getAsyncCommitExecutor());
-
+ addCallback(cfuture, cd, result -> finishCommit(cd, commitTs));
}
@VisibleForTesting
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index 012c4a8..9c11239 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,8 +28,6 @@
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -70,15 +69,10 @@
private Participant currentLeader;
- private static final class TimeRequest implements Callable<Stamp> {
+ private static final class TimeRequest {
CountDownLatch cdl = new CountDownLatch(1);
AtomicReference<Stamp> stampRef = new AtomicReference<>();
- ListenableFutureTask<Stamp> lf = null;
-
- @Override
- public Stamp call() throws Exception {
- return stampRef.get();
- }
+ CompletableFuture<Stamp> cf = null;
}
private class TimestampRetriever extends LeaderSelectorListenerAdapter
@@ -211,11 +205,12 @@
for (int i = 0; i < request.size(); i++) {
TimeRequest tr = request.get(i);
- tr.stampRef.set(new Stamp(txStampsStart + i, gcStamp));
- if (tr.lf == null) {
+ Stamp stampRes = new Stamp(txStampsStart + i, gcStamp);
+ tr.stampRef.set(stampRes);
+ if (tr.cf == null) {
tr.cdl.countDown();
} else {
- tr.lf.run();
+ tr.cf.complete(stampRes);
}
}
} catch (InterruptedException e) {
@@ -386,18 +381,18 @@
return tr.stampRef.get();
}
- public ListenableFuture<Stamp> getStampAsync() {
+ public CompletableFuture<Stamp> getStampAsync() {
checkClosed();
TimeRequest tr = new TimeRequest();
- ListenableFutureTask<Stamp> lf = ListenableFutureTask.create(tr);
- tr.lf = lf;
+ CompletableFuture<Stamp> cf = new CompletableFuture<>();
+ tr.cf = cf;
try {
queue.put(tr);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
- return lf;
+ return cf;
}
/**