Revert "HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplication… (#4463)"

This reverts commit 34ba2c51cf6c201ad65d3ddaf4377f81ed354965.
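
This restores the pre-HBASE-27062 shipping model: HBaseInterClusterReplicationEndpoint
again owns a bounded, per-endpoint ThreadPoolExecutor; createReplicator() returns
Callable<Integer> tasks that are fanned out through an ExecutorCompletionService;
ReplicationProtobufUtil.replicateWALEntry() goes back to being a blocking call; and
doStop() waits up to replication.source.maxterminationmultiplier times the RPC timeout
for in-flight tasks to finish before aborting the server (see HBASE-16081).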
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index cfdf0e1..c2e96ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -29,6 +28,7 @@
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,7 +37,6 @@
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
@@ -53,12 +52,12 @@
    * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
    * @param sourceHFileArchiveDir  Path to the source cluster hfile archive directory
    */
-  public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
-    AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
-    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
+  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
+    String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+    int timeout) throws IOException {
     Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
       replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
   }
 
   /**
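
For reference, the change above trades the async, CompletableFuture-returning API for
a blocking wrapper that drains the future with FutureUtils.get. The contract the
restored call sites rely on is: block until the RPC completes, restore the interrupt
flag if interrupted, and rethrow the failure cause as an IOException. A self-contained,
JDK-only sketch of that contract follows; the helper name echoes
org.apache.hadoop.hbase.util.FutureUtils, but this body is an illustrative stand-in,
not the HBase implementation:

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    public final class BlockingGetSketch {

      // Mirrors the contract the restored call sites assume of FutureUtils.get():
      // block until the future completes, restore the interrupt flag on interrupt,
      // and surface the failure cause to the caller as an IOException.
      static <T> T get(Future<T> future) throws IOException {
        try {
          return future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw (IOException) new InterruptedIOException("interrupted while waiting").initCause(e);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
        }
      }

      public static void main(String[] args) throws IOException {
        System.out.println(get(CompletableFuture.completedFuture(42))); // prints 42

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("replication sink unreachable"));
        try {
          get(failed);
        } catch (IOException expected) {
          System.out.println("unwrapped: " + expected.getMessage());
        }
      }
    }
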
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 39e68bf..cec360a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -29,11 +29,18 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -50,7 +57,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.ipc.RemoteException;
@@ -58,8 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
-import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
@@ -76,6 +82,8 @@
   private static final Logger LOG =
     LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
 
+  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
+
   /** Drop edits for tables that have been deleted from the replication source and target */
   public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
     "hbase.replication.drop.on.deleted.table";
@@ -89,22 +97,25 @@
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
+  // Amount of time for shutdown to wait for all tasks to complete
+  private long maxTerminationWait;
   // Size limit for replication RPCs, in bytes
   private int replicationRpcLimit;
   // Metrics for this source
   private MetricsSource metrics;
   private boolean peersSelected = false;
   private String replicationClusterId = "";
+  private ThreadPoolExecutor exec;
   private int maxThreads;
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
+  private Abortable abortable;
   private boolean dropOnDeletedTables;
   private boolean dropOnDeletedColumnFamilies;
   private boolean isSerial = false;
   // Initialising as 0 to guarantee at least one logging message
   private long lastSinkFetchTime = 0;
-  private volatile boolean stopping = false;
 
   @Override
   public void init(Context context) throws IOException {
@@ -113,11 +124,20 @@
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier =
       this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
+    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
+    // tasks to terminate when doStop() is called.
+    long maxTerminationWaitMultiplier = this.conf.getLong(
+      "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
+    this.maxTerminationWait = maxTerminationWaitMultiplier
+      * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
+    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
+    this.abortable = ctx.getAbortable();
     // Set the size limit for replication RPCs to 95% of the max request size.
     // We could do with less slop if we have an accurate estimate of encoded size. Being
     // conservative for now.
@@ -374,31 +394,30 @@
     return entryList;
   }
 
-  private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
-    throws IOException {
-    List<CompletableFuture<Integer>> futures =
-      new ArrayList<CompletableFuture<Integer>>(batches.size());
+  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
+    List<List<Entry>> batches) throws IOException {
+    int futures = 0;
     for (int i = 0; i < batches.size(); i++) {
       List<Entry> entries = batches.get(i);
-      if (entries.isEmpty()) {
-        continue;
+      if (!entries.isEmpty()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+            replicateContext.getSize());
+        }
+        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
+        futures++;
       }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
-          replicateContext.getSize());
-      }
-      // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-      futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
     }
 
     IOException iox = null;
     long lastWriteTime = 0;
-
-    for (CompletableFuture<Integer> f : futures) {
+    for (int i = 0; i < futures; i++) {
       try {
         // wait for all futures, remove successful parts
         // (only the remaining parts will be retried)
-        int index = FutureUtils.get(f);
+        Future<Integer> f = pool.take();
+        int index = f.get();
         List<Entry> batch = batches.get(index);
         batches.set(index, Collections.emptyList()); // remove successful batch
         // Find the most recent write time in the batch
@@ -406,10 +425,12 @@
         if (writeTime > lastWriteTime) {
           lastWriteTime = writeTime;
         }
-      } catch (IOException e) {
-        iox = e;
-      } catch (RuntimeException e) {
-        iox = new IOException(e);
+      } catch (InterruptedException ie) {
+        iox = new IOException(ie);
+      } catch (ExecutionException ee) {
+        iox = ee.getCause() instanceof IOException
+          ? (IOException) ee.getCause()
+          : new IOException(ee.getCause());
       }
     }
     if (iox != null) {
@@ -424,6 +445,7 @@
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
+    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
     int sleepMultiplier = 1;
 
     if (!peersSelected && this.isRunning()) {
@@ -446,7 +468,7 @@
     }
 
     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
-    while (this.isRunning() && !this.stopping) {
+    while (this.isRunning() && !exec.isShutdown()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -455,7 +477,7 @@
       }
       try {
         // replicate the batches to sink side.
-        parallelReplicate(replicateContext, batches);
+        parallelReplicate(pool, replicateContext, batches);
         return true;
       } catch (IOException ioe) {
         if (ioe instanceof RemoteException) {
@@ -510,117 +532,82 @@
 
   @Override
   protected void doStop() {
-    // Allow currently running replication tasks to finish
-    this.stopping = true;
     disconnect(); // don't call super.doStop()
+    // Allow currently running replication tasks to finish
+    exec.shutdown();
+    try {
+      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
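+      // ignore: fall through to the termination check below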
+    }
+    // Abort if the tasks did not terminate in time
+    if (!exec.isTerminated()) {
+      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
+        + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
+        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
+      abortable.abort(errMsg, new IOException(errMsg));
+    }
     notifyStopped();
   }
 
-  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
-    int timeout) {
-    int entriesHashCode = System.identityHashCode(entries);
-    if (LOG.isTraceEnabled()) {
-      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
-        entriesHashCode, entries.size(), size, replicationClusterId);
-    }
+  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
+    throws IOException {
     SinkPeer sinkPeer = null;
-    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
     try {
+      int entriesHashCode = System.identityHashCode(entries);
+      if (LOG.isTraceEnabled()) {
+        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
+          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
+      }
       sinkPeer = getReplicationSink();
-    } catch (IOException e) {
-      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
-      resultCompletableFuture.completeExceptionally(e);
-      return resultCompletableFuture;
-    }
-    assert sinkPeer != null;
-    AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-    final SinkPeer sinkPeerToUse = sinkPeer;
-    FutureUtils.addListener(
-      ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
-        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
-      (response, exception) -> {
-        if (exception != null) {
-          onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
-          resultCompletableFuture.completeExceptionally(exception);
-          return;
+      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+      try {
+        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+          hfileArchiveDir, timeout);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
         }
-        reportSinkSuccess(sinkPeerToUse);
-        resultCompletableFuture.complete(batchIndex);
-      });
-    return resultCompletableFuture;
-  }
-
-  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
-    final SinkPeer sinkPeer) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
-    }
-    if (exception instanceof IOException) {
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+        }
+        throw e;
+      }
+      reportSinkSuccess(sinkPeer);
+    } catch (IOException ioe) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
+      throw ioe;
     }
+    return batchIndex;
   }
 
-  /**
-   * Here when {@link HBaseInterClusterReplicationEndpoint#isSerial} is true, we iterate over the
-   * WAL {@link Entry} list; once we reach a batch limit, we send it out, and in the callback we
-   * send the next batch, until all entries are sent.
-   */
-  private CompletableFuture<Integer> serialReplicateRegionEntries(
-    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
-    if (!walEntryPeekingIterator.hasNext()) {
-      return CompletableFuture.completedFuture(batchIndex);
-    }
-    int batchSize = 0;
+  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
+    throws IOException {
+    int batchSize = 0, index = 0;
     List<Entry> batch = new ArrayList<>();
-    while (walEntryPeekingIterator.hasNext()) {
-      Entry entry = walEntryPeekingIterator.peek();
+    for (Entry entry : entries) {
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        break;
+        replicateEntries(batch, index++, timeout);
+        batch.clear();
+        batchSize = 0;
       }
-      walEntryPeekingIterator.next();
       batch.add(entry);
       batchSize += entrySize;
     }
-
-    if (batchSize <= 0) {
-      return CompletableFuture.completedFuture(batchIndex);
+    if (batchSize > 0) {
+      replicateEntries(batch, index, timeout);
     }
-    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
-    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
-      if (exception != null) {
-        resultCompletableFuture.completeExceptionally(exception);
-        return;
-      }
-      if (!walEntryPeekingIterator.hasNext()) {
-        resultCompletableFuture.complete(batchIndex);
-        return;
-      }
-      FutureUtils.addListener(
-        serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
-        (currentResponse, currentException) -> {
-          if (currentException != null) {
-            resultCompletableFuture.completeExceptionally(currentException);
-            return;
-          }
-          resultCompletableFuture.complete(batchIndex);
-        });
-    });
-    return resultCompletableFuture;
+    return batchIndex;
   }
 
-  /**
-   * Replicate entries to the peer cluster using the async API.
-   */
-  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
-    int timeout) {
+  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
     return isSerial
-      ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
-        timeout)
-      : replicateEntries(entries, batchIndex, timeout);
+      ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
+      : () -> replicateEntries(entries, batchIndex, timeout);
   }
 
   private String logPeerId() {
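
Taken together, the endpoint-side hunks restore a submit/drain pattern: replicate()
hands each non-empty batch to the pool as a Callable, parallelReplicate() collects
results in completion order and unwraps ExecutionException into IOException, and
doStop() gives in-flight tasks a bounded grace period before checking isTerminated().
A self-contained, JDK-only sketch of that pattern (pool size, batch contents, and
class names here are illustrative, not the endpoint's):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public final class ParallelReplicateSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(4); // stands in for the bounded sink pool
        CompletionService<Integer> pool = new ExecutorCompletionService<>(exec);

        // Stand-ins for the per-sink batches; each replicator returns its batch
        // index so the caller can clear the batches that shipped successfully.
        List<Callable<Integer>> replicators = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
          final int batchIndex = i;
          if (batchIndex == 2) {
            replicators.add(() -> { throw new IOException("sink unreachable"); }); // injected failure
          } else {
            replicators.add(() -> batchIndex); // a real replicator ships WAL entries here
          }
        }

        int futures = 0;
        for (Callable<Integer> r : replicators) {
          pool.submit(r);
          futures++;
        }

        // Drain in completion order; unwrap ExecutionException the way the restored
        // parallelReplicate() does, deferring the IOException until every submitted
        // batch has been accounted for.
        IOException iox = null;
        for (int i = 0; i < futures; i++) {
          try {
            int index = pool.take().get();
            System.out.println("batch " + index + " shipped");
          } catch (InterruptedException ie) {
            iox = new IOException(ie);
          } catch (ExecutionException ee) {
            iox = ee.getCause() instanceof IOException
              ? (IOException) ee.getCause()
              : new IOException(ee.getCause());
          }
        }

        // Bounded shutdown, as in the restored doStop(): give in-flight tasks a
        // grace period, then check for termination (the endpoint aborts if false).
        exec.shutdown();
        if (!exec.awaitTermination(10, TimeUnit.SECONDS)) {
          System.err.println("pool failed to quiesce in time"); // endpoint would abort (HBASE-16081)
        }

        if (iox != null) {
          // The endpoint rethrows so the caller retries only the failed batches.
          System.out.println("deferred failure: " + iox.getMessage());
        }
      }
    }
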
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index e82d698..011f0a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -50,7 +50,6 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -268,14 +267,14 @@
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
+      ReplicationProtobufUtil.replicateWALEntry(
         connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
     } else {
       try {
-        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
+        ReplicationProtobufUtil.replicateWALEntry(
           connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
+          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (RemoteException e) {
         assertRejection(e.unwrapRemoteException());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 9bc632e..53512ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,7 +30,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -556,16 +556,15 @@
     }
 
     @Override
-    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
-      int timeout) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
-        return CompletableFuture.completedFuture(ordinal);
+        return () -> ordinal;
       } else {
         failedOnce = true;
-        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
-        future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
-        return future;
+        return () -> {
+          throw new IOException("Sample Exception: Failed to replicate.");
+        };
       }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index c48755f..803c427 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,7 +21,7 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@
     }
 
     @Override
-    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
-      int timeout) {
-      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+      return () -> {
+        int batchIndex = replicateEntries(entries, ordinal, timeout);
         entriesCount += entries.size();
         int count = batchCount.incrementAndGet();
         LOG.info(
           "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-      });
-
+        return batchIndex;
+      };
     }
   }
 
@@ -245,23 +245,20 @@
     private final AtomicBoolean failNext = new AtomicBoolean(false);
 
     @Override
-    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
-      int timeout) {
-
-      if (failNext.compareAndSet(false, true)) {
-        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+      return () -> {
+        if (failNext.compareAndSet(false, true)) {
+          int batchIndex = replicateEntries(entries, ordinal, timeout);
           entriesCount += entries.size();
           int count = batchCount.incrementAndGet();
           LOG.info(
             "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-        });
-      } else if (failNext.compareAndSet(true, false)) {
-        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
-        future.completeExceptionally(new ServiceException("Injected failure"));
-        return future;
-      }
-      return CompletableFuture.completedFuture(ordinal);
-
+          return batchIndex;
+        } else if (failNext.compareAndSet(true, false)) {
+          throw new ServiceException("Injected failure");
+        }
+        return ordinal;
+      };
     }
   }
 }
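
The test overrides above all reshape the same hook: createReplicator() returns a
Callable that the test decorates to count completed batches or inject a one-shot
failure, which then surfaces through the CompletionService as an ExecutionException.
A minimal, JDK-only sketch of that decoration pattern (the names and the injected
exception type here are illustrative, not the test's):

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    public final class ReplicatorDecorationSketch {
      static final AtomicInteger batchCount = new AtomicInteger();
      static final AtomicBoolean failNext = new AtomicBoolean(false);

      // Wraps a "real" replicator the way the test overrides do: optionally fail
      // once before delegating; otherwise run the delegate and record completion.
      static Callable<Integer> decorate(Callable<Integer> delegate) {
        return () -> {
          if (failNext.compareAndSet(true, false)) {
            throw new IOException("Injected failure"); // surfaces via ExecutionException
          }
          int batchIndex = delegate.call();
          System.out.println("Completed batch " + batchIndex + " count=" + batchCount.incrementAndGet());
          return batchIndex;
        };
      }

      public static void main(String[] args) throws Exception {
        failNext.set(true);
        try {
          decorate(() -> 0).call();
        } catch (IOException expected) {
          System.out.println("first attempt failed: " + expected.getMessage());
        }
        decorate(() -> 0).call(); // retry succeeds and is counted
      }
    }
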
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index 5f99b88..c0eace0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -22,7 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -165,10 +165,11 @@
     }
 
     @Override
-    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
-      int timeout) {
-      entryQueue.addAll(entries);
-      return CompletableFuture.completedFuture(ordinal);
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+      return () -> {
+        entryQueue.addAll(entries);
+        return ordinal;
+      };
     }
 
     @Override