Revert "HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplication… (#4463)"
This reverts commit 34ba2c51cf6c201ad65d3ddaf4377f81ed354965.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index cfdf0e1..c2e96ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -29,6 +28,7 @@
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,7 +37,6 @@
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -53,12 +52,12 @@
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
*/
- public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
- AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
- Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
+ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
+ String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+ int timeout) throws IOException {
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
- return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
+ FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 39e68bf..cec360a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -29,11 +29,18 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -50,7 +57,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
@@ -58,8 +65,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
-import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
@@ -76,6 +82,8 @@
private static final Logger LOG =
LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
+ private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
+
/** Drop edits for tables that been deleted from the replication source and target */
public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
"hbase.replication.drop.on.deleted.table";
@@ -89,22 +97,25 @@
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
+ // Amount of time for shutdown to wait for all tasks to complete
+ private long maxTerminationWait;
// Size limit for replication RPCs, in bytes
private int replicationRpcLimit;
// Metrics for this source
private MetricsSource metrics;
private boolean peersSelected = false;
private String replicationClusterId = "";
+ private ThreadPoolExecutor exec;
private int maxThreads;
private Path baseNamespaceDir;
private Path hfileArchiveDir;
private boolean replicationBulkLoadDataEnabled;
+ private Abortable abortable;
private boolean dropOnDeletedTables;
private boolean dropOnDeletedColumnFamilies;
private boolean isSerial = false;
// Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;
- private volatile boolean stopping = false;
@Override
public void init(Context context) throws IOException {
@@ -113,11 +124,20 @@
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier =
this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
+ // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
+ // tasks to terminate when doStop() is called.
+ long maxTerminationWaitMultiplier = this.conf.getLong(
+ "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
+ this.maxTerminationWait = maxTerminationWaitMultiplier
+ * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
+ this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
+ this.abortable = ctx.getAbortable();
// Set the size limit for replication RPCs to 95% of the max request size.
// We could do with less slop if we have an accurate estimate of encoded size. Being
// conservative for now.
@@ -374,31 +394,30 @@
return entryList;
}
- private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
- throws IOException {
- List<CompletableFuture<Integer>> futures =
- new ArrayList<CompletableFuture<Integer>>(batches.size());
+ private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
+ List<List<Entry>> batches) throws IOException {
+ int futures = 0;
for (int i = 0; i < batches.size(); i++) {
List<Entry> entries = batches.get(i);
- if (entries.isEmpty()) {
- continue;
+ if (!entries.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+ replicateContext.getSize());
+ }
+ // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+ pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
+ futures++;
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
- replicateContext.getSize());
- }
- // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
- futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
}
IOException iox = null;
long lastWriteTime = 0;
-
- for (CompletableFuture<Integer> f : futures) {
+ for (int i = 0; i < futures; i++) {
try {
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
- int index = FutureUtils.get(f);
+ Future<Integer> f = pool.take();
+ int index = f.get();
List<Entry> batch = batches.get(index);
batches.set(index, Collections.emptyList()); // remove successful batch
// Find the most recent write time in the batch
@@ -406,10 +425,12 @@
if (writeTime > lastWriteTime) {
lastWriteTime = writeTime;
}
- } catch (IOException e) {
- iox = e;
- } catch (RuntimeException e) {
- iox = new IOException(e);
+ } catch (InterruptedException ie) {
+ iox = new IOException(ie);
+ } catch (ExecutionException ee) {
+ iox = ee.getCause() instanceof IOException
+ ? (IOException) ee.getCause()
+ : new IOException(ee.getCause());
}
}
if (iox != null) {
@@ -424,6 +445,7 @@
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
+ CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
int sleepMultiplier = 1;
if (!peersSelected && this.isRunning()) {
@@ -446,7 +468,7 @@
}
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
- while (this.isRunning() && !this.stopping) {
+ while (this.isRunning() && !exec.isShutdown()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
@@ -455,7 +477,7 @@
}
try {
// replicate the batches to sink side.
- parallelReplicate(replicateContext, batches);
+ parallelReplicate(pool, replicateContext, batches);
return true;
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
@@ -510,117 +532,82 @@
@Override
protected void doStop() {
- // Allow currently running replication tasks to finish
- this.stopping = true;
disconnect(); // don't call super.doStop()
+ // Allow currently running replication tasks to finish
+ exec.shutdown();
+ try {
+ exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ }
+ // Abort if the tasks did not terminate in time
+ if (!exec.isTerminated()) {
+ String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
+ + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
+ + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
+ abortable.abort(errMsg, new IOException(errMsg));
+ }
notifyStopped();
}
- protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
- int timeout) {
- int entriesHashCode = System.identityHashCode(entries);
- if (LOG.isTraceEnabled()) {
- long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
- LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
- entriesHashCode, entries.size(), size, replicationClusterId);
- }
+ protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
+ throws IOException {
SinkPeer sinkPeer = null;
- final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
try {
+ int entriesHashCode = System.identityHashCode(entries);
+ if (LOG.isTraceEnabled()) {
+ long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+ LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
+ logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
+ }
sinkPeer = getReplicationSink();
- } catch (IOException e) {
- this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
- resultCompletableFuture.completeExceptionally(e);
- return resultCompletableFuture;
- }
- assert sinkPeer != null;
- AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
- final SinkPeer sinkPeerToUse = sinkPeer;
- FutureUtils.addListener(
- ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
- replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
- (response, exception) -> {
- if (exception != null) {
- onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
- resultCompletableFuture.completeExceptionally(exception);
- return;
+ AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+ try {
+ ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+ entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+ hfileArchiveDir, timeout);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
}
- reportSinkSuccess(sinkPeerToUse);
- resultCompletableFuture.complete(batchIndex);
- });
- return resultCompletableFuture;
- }
-
- private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
- final SinkPeer sinkPeer) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
- }
- if (exception instanceof IOException) {
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+ }
+ throw e;
+ }
+ reportSinkSuccess(sinkPeer);
+ } catch (IOException ioe) {
if (sinkPeer != null) {
reportBadSink(sinkPeer);
}
+ throw ioe;
}
+ return batchIndex;
}
- /**
- * Here for {@link HBaseInterClusterReplicationEndpoint#isSerialis} is true, we iterator over the
- * WAL {@link Entry} list, once we reached a batch limit, we send it out, and in the callback, we
- * send the next batch, until we send all entries out.
- */
- private CompletableFuture<Integer> serialReplicateRegionEntries(
- PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
- if (!walEntryPeekingIterator.hasNext()) {
- return CompletableFuture.completedFuture(batchIndex);
- }
- int batchSize = 0;
+ private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
+ throws IOException {
+ int batchSize = 0, index = 0;
List<Entry> batch = new ArrayList<>();
- while (walEntryPeekingIterator.hasNext()) {
- Entry entry = walEntryPeekingIterator.peek();
+ for (Entry entry : entries) {
int entrySize = getEstimatedEntrySize(entry);
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
- break;
+ replicateEntries(batch, index++, timeout);
+ batch.clear();
+ batchSize = 0;
}
- walEntryPeekingIterator.next();
batch.add(entry);
batchSize += entrySize;
}
-
- if (batchSize <= 0) {
- return CompletableFuture.completedFuture(batchIndex);
+ if (batchSize > 0) {
+ replicateEntries(batch, index, timeout);
}
- final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
- FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
- if (exception != null) {
- resultCompletableFuture.completeExceptionally(exception);
- return;
- }
- if (!walEntryPeekingIterator.hasNext()) {
- resultCompletableFuture.complete(batchIndex);
- return;
- }
- FutureUtils.addListener(
- serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
- (currentResponse, currentException) -> {
- if (currentException != null) {
- resultCompletableFuture.completeExceptionally(currentException);
- return;
- }
- resultCompletableFuture.complete(batchIndex);
- });
- });
- return resultCompletableFuture;
+ return batchIndex;
}
- /**
- * Replicate entries to peer cluster by async API.
- */
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
- int timeout) {
+ protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
return isSerial
- ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
- timeout)
- : replicateEntries(entries, batchIndex, timeout);
+ ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
+ : () -> replicateEntries(entries, batchIndex, timeout);
}
private String logPeerId() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index e82d698..011f0a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -50,7 +50,6 @@
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -268,14 +267,14 @@
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
}
if (!expectedRejection) {
- FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
+ ReplicationProtobufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
- HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
+ HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
} else {
try {
- FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
+ ReplicationProtobufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
- HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
+ HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
fail("Should throw IOException when sync-replication state is in A or DA");
} catch (RemoteException e) {
assertRejection(e.unwrapRemoteException());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 9bc632e..53512ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,7 +30,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -556,16 +556,15 @@
}
@Override
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
- int timeout) {
+ protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
// Fail only once, we don't want to slow down the test.
if (failedOnce) {
- return CompletableFuture.completedFuture(ordinal);
+ return () -> ordinal;
} else {
failedOnce = true;
- CompletableFuture<Integer> future = new CompletableFuture<Integer>();
- future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
- return future;
+ return () -> {
+ throw new IOException("Sample Exception: Failed to replicate.");
+ };
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index c48755f..803c427 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@
}
@Override
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
- int timeout) {
- return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
+ protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+ return () -> {
+ int batchIndex = replicateEntries(entries, ordinal, timeout);
entriesCount += entries.size();
int count = batchCount.incrementAndGet();
LOG.info(
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
- });
-
+ return batchIndex;
+ };
}
}
@@ -245,23 +245,20 @@
private final AtomicBoolean failNext = new AtomicBoolean(false);
@Override
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
- int timeout) {
-
- if (failNext.compareAndSet(false, true)) {
- return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
+ protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+ return () -> {
+ if (failNext.compareAndSet(false, true)) {
+ int batchIndex = replicateEntries(entries, ordinal, timeout);
entriesCount += entries.size();
int count = batchCount.incrementAndGet();
LOG.info(
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
- });
- } else if (failNext.compareAndSet(true, false)) {
- CompletableFuture<Integer> future = new CompletableFuture<Integer>();
- future.completeExceptionally(new ServiceException("Injected failure"));
- return future;
- }
- return CompletableFuture.completedFuture(ordinal);
-
+ return batchIndex;
+ } else if (failNext.compareAndSet(true, false)) {
+ throw new ServiceException("Injected failure");
+ }
+ return ordinal;
+ };
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index 5f99b88..c0eace0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -22,7 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -165,10 +165,11 @@
}
@Override
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
- int timeout) {
- entryQueue.addAll(entries);
- return CompletableFuture.completedFuture(ordinal);
+ protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+ return () -> {
+ entryQueue.addAll(entries);
+ return ordinal;
+ };
}
@Override