fixes #722 Replace ListenableFuture with CompletableFuture (#975)
diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
index 75a9fa1..6f35fbc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/AsyncConditionalWriter.java
@@ -19,15 +19,12 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.ConditionalWriter.Result;
 import org.apache.accumulo.core.data.ConditionalMutation;
@@ -36,11 +33,10 @@
 import org.apache.fluo.core.util.FluoExecutors;
 import org.apache.fluo.core.util.Limit;
 
-public class AsyncConditionalWriter
-    implements AsyncFunction<Collection<ConditionalMutation>, Iterator<Result>> {
+public class AsyncConditionalWriter {
 
   private final ConditionalWriter cw;
-  private final ListeningExecutorService les;
+  private final ExecutorService es;
   private final Limit semaphore;
 
 
@@ -50,55 +46,38 @@
         FluoConfigurationImpl.ASYNC_CW_THREADS_DEFAULT);
     int permits = env.getConfiguration().getInt(FluoConfigurationImpl.ASYNC_CW_LIMIT,
         FluoConfigurationImpl.ASYNC_CW_LIMIT_DEFAULT);
-    this.les =
-        MoreExecutors.listeningDecorator(FluoExecutors.newFixedThreadPool(numThreads, "asyncCW"));
+    this.es = FluoExecutors.newFixedThreadPool(numThreads, "asyncCw");
     // the conditional writer currently has not memory limits... give it too much and it blows out
     // memory.. need to fix this in conditional writer
     // for now this needs to be memory based
     this.semaphore = new Limit(permits);
   }
 
-  private class IterTask implements Callable<Iterator<Result>> {
-
-    private Iterator<Result> input;
-    private int permitsAcquired;
-
-    public IterTask(Iterator<Result> iter, int permitsAcquired) {
-      this.input = iter;
-      this.permitsAcquired = permitsAcquired;
-    }
-
-    @Override
-    public Iterator<Result> call() throws Exception {
-      try {
-        Builder<Result> imlb = ImmutableList.builder();
-        while (input.hasNext()) {
-          Result result = input.next();
-          imlb.add(result);
-        }
-        return imlb.build().iterator();
-      } finally {
-        semaphore.release(permitsAcquired);
-      }
-    }
-
-  }
-
-  @Override
-  public ListenableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) {
+  public CompletableFuture<Iterator<Result>> apply(Collection<ConditionalMutation> input) {
     if (input.size() == 0) {
-      return Futures.immediateFuture(Collections.<Result>emptyList().iterator());
+      return CompletableFuture.completedFuture(Collections.<Result>emptyList().iterator());
     }
 
     semaphore.acquire(input.size());
     Iterator<Result> iter = cw.write(input.iterator());
-    return les.submit(new IterTask(iter, input.size()));
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        Builder<Result> imlb = ImmutableList.builder();
+        while (iter.hasNext()) {
+          Result result = iter.next();
+          imlb.add(result);
+        }
+        return imlb.build().iterator();
+      } finally {
+        semaphore.release(input.size());
+      }
+    }, es);
   }
 
   public void close() {
-    les.shutdownNow();
+    es.shutdownNow();
     try {
-      les.awaitTermination(5, TimeUnit.SECONDS);
+      es.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
index 2b53cbf..d87e9a7 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/SharedBatchWriter.java
@@ -21,12 +21,11 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.data.Mutation;
@@ -42,13 +41,15 @@
 
   private AtomicLong asyncBatchesAdded = new AtomicLong(0);
   private long asyncBatchesProcessed = 0;
+  // added to avoid findbugs false positive
+  private static final Supplier<Void> NULLS = () -> null;
 
   private static class MutationBatch {
 
     private Collection<Mutation> mutations;
     private CountDownLatch cdl;
     private boolean isAsync = false;
-    private ListenableFutureTask<Void> lf;
+    private CompletableFuture<Void> cf;
 
     public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
       this.mutations = mutations;
@@ -58,9 +59,9 @@
       }
     }
 
-    public MutationBatch(Collection<Mutation> mutations, ListenableFutureTask<Void> lf) {
+    public MutationBatch(Collection<Mutation> mutations, CompletableFuture<Void> cf) {
       this.mutations = mutations;
-      this.lf = lf;
+      this.cf = cf;
       this.cdl = null;
       this.isAsync = false;
     }
@@ -70,8 +71,8 @@
         cdl.countDown();
       }
 
-      if (lf != null) {
-        lf.run();
+      if (cf != null) {
+        cf.complete(NULLS.get());
       }
     }
   }
@@ -170,27 +171,22 @@
     }
   }
 
-  private static final Runnable DO_NOTHING = new Runnable() {
-    @Override
-    public void run() {}
-  };
-
-  ListenableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
+  CompletableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
     if (ml.size() == 0) {
-      return Futures.immediateFuture(null);
+      return CompletableFuture.completedFuture(NULLS.get());
     }
 
-    ListenableFutureTask<Void> lf = ListenableFutureTask.create(DO_NOTHING, null);
+    CompletableFuture<Void> cf = new CompletableFuture<>();
     try {
-      MutationBatch mb = new MutationBatch(ml, lf);
+      MutationBatch mb = new MutationBatch(ml, cf);
       mutQueue.put(mb);
-      return lf;
+      return cf;
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  ListenableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
+  CompletableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
     return writeMutationsAsyncFuture(Collections.singleton(m));
   }
 
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 0abdafe..94e4107 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -26,15 +26,13 @@
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.ConditionalWriter;
@@ -818,33 +816,12 @@
     return startTs;
   }
 
-  // async experiment
-
-  private abstract static class CommitCallback<V> implements FutureCallback<V> {
-
-    private CommitData cd;
-
-    CommitCallback(CommitData cd) {
-      this.cd = cd;
-    }
-
-    @Override
-    public void onSuccess(V result) {
-      try {
-        onSuccess(cd, result);
-      } catch (Exception e) {
-        cd.commitObserver.failed(e);
-      }
-    }
-
-    protected abstract void onSuccess(CommitData cd, V result) throws Exception;
-
-
-    @Override
-    public void onFailure(Throwable t) {
-      cd.commitObserver.failed(t);
-    }
-
+  /**
+   * Funcitonal interface to provide next step of asynchronous commit on successful completion of
+   * the previous one
+   */
+  private static interface OnSuccessInterface<V> {
+    public void onSuccess(V result) throws Exception;
   }
 
   private abstract static class SynchronousCommitTask implements Runnable {
@@ -895,6 +872,24 @@
     return size;
   }
 
+  private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
+      OnSuccessInterface<V> onSuccessInterface) {
+    cfuture.handleAsync((result, exception) -> {
+      if (exception != null) {
+        cd.commitObserver.failed(exception);
+        return null;
+      } else {
+        try {
+          onSuccessInterface.onSuccess(result);
+          return null;
+        } catch (Exception e) {
+          cd.commitObserver.failed(e);
+          return null;
+        }
+      }
+    }, env.getSharedResources().getAsyncCommitExecutor());
+  }
+
   @Override
   public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
 
@@ -972,13 +967,8 @@
     final ConditionalMutation pcm =
         prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow));
 
-    ListenableFuture<Iterator<Result>> future = cd.acw.apply(Collections.singletonList(pcm));
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> result) throws Exception {
-        postLockPrimary(cd, pcm, Iterators.getOnlyElement(result));
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    CompletableFuture<Iterator<Result>> cfuture = cd.acw.apply(Collections.singletonList(pcm));
+    addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm, Iterators.getOnlyElement(result)));
   }
 
   private void postLockPrimary(final CommitData cd, final ConditionalMutation pcm, Result result)
@@ -1059,13 +1049,8 @@
 
     cd.acceptedRows = new HashSet<>();
 
-    ListenableFuture<Iterator<Result>> future = cd.bacw.apply(mutations);
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> results) throws Exception {
-        postLockOther(cd, results);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations);
+    addCallback(cfuture, cd, results -> postLockOther(cd, results));
   }
 
   private void postLockOther(final CommitData cd, Iterator<Result> results) throws Exception {
@@ -1092,13 +1077,8 @@
     } else if (stopAfterPreCommit) {
       cd.commitObserver.committed();
     } else {
-      ListenableFuture<Stamp> future = env.getSharedResources().getOracleClient().getStampAsync();
-      Futures.addCallback(future, new CommitCallback<Stamp>(cd) {
-        @Override
-        protected void onSuccess(CommitData cd, Stamp stamp) throws Exception {
-          beginSecondCommitPhase(cd, stamp);
-        }
-      }, env.getSharedResources().getAsyncCommitExecutor());
+      CompletableFuture<Stamp> cfuture = env.getSharedResources().getOracleClient().getStampAsync();
+      addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp));
     }
   }
 
@@ -1124,14 +1104,9 @@
       mutations.add(m);
     }
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        rollbackPrimaryLock(cd);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd));
   }
 
   private void rollbackPrimaryLock(CommitData cd) throws Exception {
@@ -1143,14 +1118,10 @@
         DelLockValue.encodeRollback(startTs, true, true));
     m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd,
+        result -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
   }
 
   private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception {
@@ -1213,14 +1184,9 @@
       }
     }
 
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        commmitPrimary(cd, commitTs);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs));
   }
 
   private void commmitPrimary(CommitData cd, final long commitTs) {
@@ -1237,15 +1203,10 @@
     ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
         isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
 
-    ListenableFuture<Iterator<Result>> future =
+    CompletableFuture<Iterator<Result>> cfuture =
         cd.acw.apply(Collections.singletonList(delLockMutation));
-    Futures.addCallback(future, new CommitCallback<Iterator<Result>>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Iterator<Result> result) throws Exception {
-        handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
-            Iterators.getOnlyElement(result));
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+    addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
+        Iterators.getOnlyElement(result)));
   }
 
   private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs,
@@ -1321,16 +1282,9 @@
       mutations.add(m);
     }
 
-
-    ListenableFuture<Void> future =
+    CompletableFuture<Void> cfuture =
         env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    Futures.addCallback(future, new CommitCallback<Void>(cd) {
-      @Override
-      protected void onSuccess(CommitData cd, Void v) throws Exception {
-        finishCommit(cd, commitTs);
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
-
+    addCallback(cfuture, cd, result -> finishCommit(cd, commitTs));
   }
 
   @VisibleForTesting
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index 012c4a8..9c11239 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -19,6 +19,7 @@
 import java.util.ArrayList;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,8 +28,6 @@
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Timer.Context;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -70,15 +69,10 @@
 
   private Participant currentLeader;
 
-  private static final class TimeRequest implements Callable<Stamp> {
+  private static final class TimeRequest {
     CountDownLatch cdl = new CountDownLatch(1);
     AtomicReference<Stamp> stampRef = new AtomicReference<>();
-    ListenableFutureTask<Stamp> lf = null;
-
-    @Override
-    public Stamp call() throws Exception {
-      return stampRef.get();
-    }
+    CompletableFuture<Stamp> cf = null;
   }
 
   private class TimestampRetriever extends LeaderSelectorListenerAdapter
@@ -211,11 +205,12 @@
 
           for (int i = 0; i < request.size(); i++) {
             TimeRequest tr = request.get(i);
-            tr.stampRef.set(new Stamp(txStampsStart + i, gcStamp));
-            if (tr.lf == null) {
+            Stamp stampRes = new Stamp(txStampsStart + i, gcStamp);
+            tr.stampRef.set(stampRes);
+            if (tr.cf == null) {
               tr.cdl.countDown();
             } else {
-              tr.lf.run();
+              tr.cf.complete(stampRes);
             }
           }
         } catch (InterruptedException e) {
@@ -386,18 +381,18 @@
     return tr.stampRef.get();
   }
 
-  public ListenableFuture<Stamp> getStampAsync() {
+  public CompletableFuture<Stamp> getStampAsync() {
     checkClosed();
 
     TimeRequest tr = new TimeRequest();
-    ListenableFutureTask<Stamp> lf = ListenableFutureTask.create(tr);
-    tr.lf = lf;
+    CompletableFuture<Stamp> cf = new CompletableFuture<>();
+    tr.cf = cf;
     try {
       queue.put(tr);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
-    return lf;
+    return cf;
   }
 
   /**