HBASE-25596: Fix NPE and avoid permanent unreplicated data due to EOF (#2987)
Signed-off-by: Xu Cang <xucang@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3272cf1..95bd686 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -30,6 +30,7 @@
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,7 +66,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
@@ -260,6 +260,11 @@
}
}
+ @InterfaceAudience.Private
+ public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+ return logQueue.getQueues();
+ }
+
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 301a9e8..57c0a16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -41,7 +41,6 @@
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@@ -123,44 +122,64 @@
@Override
public void run() {
int sleepMultiplier = 1;
- while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
- try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, conf, currentPosition,
- source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
- source.getSourceMetrics(), walGroupId)) {
- while (isReaderRunning()) { // loop here to keep reusing stream while we can
- if (!source.isPeerEnabled()) {
- Threads.sleep(sleepForRetries);
- continue;
+ WALEntryBatch batch = null;
+ WALEntryStream entryStream = null;
+ try {
+ // we only loop back here if something fatal happened to our stream
+ while (isReaderRunning()) {
+ try {
+ entryStream =
+ new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
+ source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
+ while (isReaderRunning()) { // loop here to keep reusing stream while we can
+ if (!source.isPeerEnabled()) {
+ Threads.sleep(sleepForRetries);
+ continue;
+ }
+ if (!checkQuota()) {
+ continue;
+ }
+
+ batch = createBatch(entryStream);
+ batch = readWALEntries(entryStream, batch);
+ currentPosition = entryStream.getPosition();
+ if (batch == null) {
+ // either the queue have no WAL to read
+ // or got no new entries (didn't advance position in WAL)
+ handleEmptyWALEntryBatch();
+ entryStream.reset(); // reuse stream
+ } else {
+ addBatchToShippingQueue(batch);
+ }
}
- if (!checkQuota()) {
- continue;
- }
- WALEntryBatch batch = readWALEntries(entryStream);
- currentPosition = entryStream.getPosition();
- if (batch != null) {
- // need to propagate the batch even it has no entries since it may carry the last
- // sequence id information for serial replication.
- LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
- entryBatchQueue.put(batch);
+ } catch (IOException e) { // stream related
+ if (handleEofException(e, batch)) {
sleepMultiplier = 1;
- } else { // got no entries and didn't advance position in WAL
- handleEmptyWALEntryBatch();
- entryStream.reset(); // reuse stream
+ } else {
+ LOG.warn("Failed to read stream of replication entries", e);
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ sleepMultiplier++;
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
}
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while sleeping between WAL reads");
+ Thread.currentThread().interrupt();
+ } finally {
+ entryStream.close();
}
- } catch (IOException e) { // stream related
- if (!handleEofException(e)) {
- LOG.warn("Failed to read stream of replication entries", e);
- if (sleepMultiplier < maxRetriesMultiplier) {
- sleepMultiplier ++;
- }
- Threads.sleep(sleepForRetries * sleepMultiplier);
- }
- } catch (InterruptedException e) {
- LOG.trace("Interrupted while sleeping between WAL reads");
- Thread.currentThread().interrupt();
}
+ } catch (IOException e) {
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ LOG.debug("Failed to read stream of replication entries: " + e);
+ sleepMultiplier++;
+ } else {
+ LOG.error("Failed to read stream of replication entries", e);
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.trace("Interrupted while sleeping between WAL reads");
+ Thread.currentThread().interrupt();
}
}
@@ -189,14 +208,19 @@
return newPath == null || !path.getName().equals(newPath.getName());
}
- protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
- throws IOException, InterruptedException {
+ // We need to get the WALEntryBatch from the caller so we can add entries in there
+ // This is required in case there is any exception in while reading entries
+ // we do want to loss the existing entries in the batch
+ protected WALEntryBatch readWALEntries(WALEntryStream entryStream,
+ WALEntryBatch batch) throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
+ // This would mean either no more files in the queue
+ // or there is no new data yet on the current wal
return null;
}
}
@@ -208,7 +232,7 @@
// when reading from the entry stream first time we will enter here
currentPath = entryStream.getCurrentPath();
}
- WALEntryBatch batch = createBatch(entryStream);
+ batch.setLastWalPath(currentPath);
for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
@@ -236,6 +260,7 @@
if (logQueue.getQueue(walGroupId).isEmpty()) {
// we're done with current queue, either this is a recovered queue, or it is the special group
// for a sync replication peer and the peer has been transited to DA or S state.
+ LOG.debug("Stopping the replication source wal reader");
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
@@ -245,22 +270,38 @@
}
/**
- * if we get an EOF due to a zero-length log, and there are other logs in queue
- * (highly likely we've closed the current log), and autorecovery is
- * enabled, then dump the log
+ * This is to handle the EOFException from the WAL entry stream. EOFException should
+ * be handled carefully because there are chances of data loss because of never replicating
+ * the data. Thus we should always try to ship existing batch of entries here.
+ * If there was only one log in the queue before EOF, we ship the empty batch here
+ * and since reader is still active, in the next iteration of reader we will
+ * stop the reader.
+ * If there was more than one log in the queue before EOF, we ship the existing batch
+ * and reset the wal patch and position to the log with EOF, so shipper can remove
+ * logs from replication queue
* @return true only the IOE can be handled
*/
- private boolean handleEofException(IOException e) {
+ private boolean handleEofException(IOException e, WALEntryBatch batch)
+ throws InterruptedException {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
- if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
- (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
+ if ((e instanceof EOFException || e.getCause() instanceof EOFException)
+ && (source.isRecovered() || queue.size() > 1)
+ && this.eofAutoRecovery) {
+ Path head = queue.peek();
try {
- if (fs.getFileStatus(queue.peek()).getLen() == 0) {
- LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
+ if (fs.getFileStatus(head).getLen() == 0) {
+ // head of the queue is an empty log file
+ LOG.warn("Forcing removal of 0 length log in queue: {}", head);
logQueue.remove(walGroupId);
currentPosition = 0;
+ // After we removed the WAL from the queue, we should
+ // try shipping the existing batch of entries and set the wal position
+ // and path to the wal just dequeued to correctly remove logs from the zk
+ batch.setLastWalPath(head);
+ batch.setLastWalPosition(currentPosition);
+ addBatchToShippingQueue(batch);
return true;
}
} catch (IOException ioe) {
@@ -270,6 +311,20 @@
return false;
}
+ /**
+ * Update the batch try to ship and return true if shipped
+ * @param batch Batch of entries to ship
+ * @throws InterruptedException throws interrupted exception
+ * @throws IOException throws io exception from stream
+ */
+ private void addBatchToShippingQueue(WALEntryBatch batch)
+ throws InterruptedException, IOException {
+ // need to propagate the batch even it has no entries since it may carry the last
+ // sequence id information for serial replication.
+ LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+ entryBatchQueue.put(batch);
+ }
+
public Path getCurrentPath() {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index d0e76fb..254dc4a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -50,7 +50,7 @@
}
@Override
- protected WALEntryBatch readWALEntries(WALEntryStream entryStream)
+ protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
@@ -70,7 +70,7 @@
currentPath = entryStream.getCurrentPath();
}
long positionBefore = entryStream.getPosition();
- WALEntryBatch batch = createBatch(entryStream);
+ batch = createBatch(entryStream);
for (;;) {
Entry entry = entryStream.peek();
boolean doFiltering = true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 4f96c96..8301dff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -94,6 +94,10 @@
return lastWalPath;
}
+ public void setLastWalPath(Path lastWalPath) {
+ this.lastWalPath = lastWalPath;
+ }
+
/**
* @return the position in the last WAL that was read.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index ab70501..373f5f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -80,7 +80,7 @@
* @param walFileLengthProvider provides the length of the WAL file
* @param serverName the server name which all WALs belong to
* @param metrics the replication metrics
- * @throws IOException
+ * @throws IOException throw IO exception from stream
*/
public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
@@ -368,7 +368,9 @@
handleFileNotFound(path, fnfe);
} catch (RemoteException re) {
IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
- if (!(ioe instanceof FileNotFoundException)) throw ioe;
+ if (!(ioe instanceof FileNotFoundException)) {
+ throw ioe;
+ }
handleFileNotFound(path, (FileNotFoundException)ioe);
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index eca0d67..b40ee94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -20,10 +20,10 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,8 +44,10 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -53,9 +55,9 @@
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
@@ -87,6 +89,8 @@
NB_ROWS_IN_BATCH * 10;
protected static final long SLEEP_TIME = 500;
protected static final int NB_RETRIES = 50;
+ protected static AtomicInteger replicateCount = new AtomicInteger();
+ protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();
protected static final TableName tableName = TableName.valueOf("test");
protected static final byte[] famName = Bytes.toBytes("f");
@@ -281,7 +285,8 @@
public void setUpBase() throws Exception {
if (!peerExist(PEER_ID2)) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
- .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer());
+ .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).setReplicationEndpointImpl(
+ ReplicationEndpointTest.class.getName());
if (isSyncPeer()) {
FileSystem fs2 = UTIL2.getTestFileSystem();
// The remote wal dir is not important as we do not use it in DA state, here we only need to
@@ -378,4 +383,20 @@
UTIL2.shutdownMiniCluster();
UTIL1.shutdownMiniCluster();
}
+
+ /**
+ * Custom replication endpoint to keep track of replication status for tests.
+ */
+ public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
+ public ReplicationEndpointTest() {
+ replicateCount.set(0);
+ }
+
+ @Override public boolean replicate(ReplicateContext replicateContext) {
+ replicateCount.incrementAndGet();
+ replicatedEntries.addAll(replicateContext.getEntries());
+
+ return super.replicate(replicateContext);
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index c0f22a9..2d72618 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -6,9 +6,7 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,56 +18,99 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category({ ReplicationTests.class, LargeTests.class })
-public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
+@Category
+ ({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery
+ extends TestReplicationBase {
+ MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+ NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
+ @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
@Before
public void setUp() throws IOException, InterruptedException {
cleanUp();
+ scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
+ replicateCount.set(0);
+ replicatedEntries.clear();
}
/**
* Waits until there is only one log(the current writing one) in the replication queue
- * @param numRs number of regionservers
+ *
+ * @param numRs number of region servers
*/
- private void waitForLogAdvance(int numRs) throws Exception {
- Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
+ private void waitForLogAdvance(int numRs) {
+ Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
- UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
- Replication replicationService = (Replication) UTIL1.getHBaseCluster()
- .getRegionServer(i).getReplicationSourceService();
+ Replication replicationService =
+ (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
- .getSources()) {
+ .getSources()) {
ReplicationSource source = (ReplicationSource) rsi;
- if (!currentFile.equals(source.getCurrentPath())) {
+ // We are making sure that there is only one log queue and that is for the
+ // current WAL of region server
+ String logPrefix = source.getQueues().keySet().stream().findFirst().get();
+ if (!currentFile.equals(source.getCurrentPath())
+ || source.getQueues().keySet().size() != 1
+ || source.getQueues().get(logPrefix).size() != 1) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ });
+ }
+
+ private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
+ Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() {
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService =
+ (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+ for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+ .getSources()) {
+ ReplicationSource source = (ReplicationSource) rsi;
+ String logPrefix = source.getQueues().keySet().stream().findFirst().get();
+ if (source.getQueues().get(logPrefix).size() != numQueues) {
return false;
}
}
@@ -82,13 +123,12 @@
@Test
public void testEmptyWALRecovery() throws Exception {
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
-
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
- UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
@@ -97,10 +137,197 @@
emptyWalPaths.add(emptyWalPath);
}
- // inject our empty wal into the replication queue, and then roll the original wal, which
- // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
- // determine if the file being replicated currently is still opened for write, so just inject a
- // new wal to the replication queue does not mean the previous file is closed.
+ injectEmptyWAL(numRs, emptyWalPaths);
+
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs);
+ verifyNumberOfLogsInQueue(1, numRs);
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ runSimplePutDeleteTest();
+ rollWalsAndWaitForDeque(numRs);
+ }
+
+ /**
+ * Test empty WAL along with non empty WALs in the same batch. This test is to make sure
+ * when we see the empty and handle the EOF exception, we are able to existing the previous
+ * batch of entries without loosing it. This test also tests the number of batches shipped
+ *
+ * @throws Exception throws any exception
+ */
+ @Test
+ public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
+ // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+ hbaseAdmin.disableReplicationPeer(PEER_ID2);
+ int numOfEntriesToReplicate = 20;
+
+ final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+ // for each RS, create an empty wal with same walGroupId
+ final List<Path> emptyWalPaths = new ArrayList<>();
+ long ts = System.currentTimeMillis();
+ for (int i = 0; i < numRs; i++) {
+ RegionInfo regionInfo =
+ UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+ WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+
+ appendEntriesToWal(numOfEntriesToReplicate, wal);
+ wal.rollWriter();
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+ Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
+ UTIL1.getTestFileSystem().create(emptyWalPath).close();
+ emptyWalPaths.add(emptyWalPath);
+ }
+
+ injectEmptyWAL(numRs, emptyWalPaths);
+ // There should be three WALs in queue
+ // 1. empty WAL
+ // 2. non empty WAL
+ // 3. live WAL
+ //verifyNumberOfLogsInQueue(3, numRs);
+ hbaseAdmin.enableReplicationPeer(PEER_ID2);
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs);
+
+ // Now we should expect numOfEntriesToReplicate entries
+ // replicated from each region server. This makes sure we didn't loose data
+ // from any previous batch when we encounter EOF exception for empty file.
+ Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
+ replicatedEntries.size());
+
+ // We expect just one batch of replication which will
+ // be from when we handle the EOF exception.
+ Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
+ verifyNumberOfLogsInQueue(1, numRs);
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ runSimplePutDeleteTest();
+ rollWalsAndWaitForDeque(numRs);
+ }
+
+ /**
+ * Test empty WAL along with non empty WALs in the same batch. This test is to make sure
+ * when we see the empty WAL and handle the EOF exception, we are able to proceed
+ * with next batch and replicate it properly without missing data.
+ *
+ * @throws Exception throws any exception
+ */
+ @Test
+ public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
+ // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+ hbaseAdmin.disableReplicationPeer(PEER_ID2);
+ int numOfEntriesToReplicate = 20;
+
+ final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+ // for each RS, create an empty wal with same walGroupId
+ final List<Path> emptyWalPaths = new ArrayList<>();
+
+ long ts = System.currentTimeMillis();
+ WAL wal = null;
+ for (int i = 0; i < numRs; i++) {
+ RegionInfo regionInfo =
+ UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+ wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+ appendEntriesToWal(numOfEntriesToReplicate, wal);
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+ Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
+ UTIL1.getTestFileSystem().create(emptyWalPath).close();
+ emptyWalPaths.add(emptyWalPath);
+
+ }
+ injectEmptyWAL(numRs, emptyWalPaths);
+ // roll the WAL now
+ for (int i = 0; i < numRs; i++) {
+ wal.rollWriter();
+ }
+ hbaseAdmin.enableReplicationPeer(PEER_ID2);
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs);
+
+ // Now we should expect numOfEntriesToReplicate entries
+ // replicated from each region server. This makes sure we didn't loose data
+ // from any previous batch when we encounter EOF exception for empty file.
+ Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
+ replicatedEntries.size());
+
+ // We expect just one batch of replication to be shipped which will
+ // for non empty WAL
+ Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
+ verifyNumberOfLogsInQueue(1, numRs);
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ runSimplePutDeleteTest();
+ rollWalsAndWaitForDeque(numRs);
+ }
+
+ /**
+ * This test make sure we replicate all the enties from the non empty WALs which
+ * are surrounding the empty WALs
+ *
+ * @throws Exception throws exception
+ */
+ @Test
+ public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
+ // Disable the replication peer to accumulate the non empty WAL followed by empty WAL
+ hbaseAdmin.disableReplicationPeer(PEER_ID2);
+ int numOfEntriesToReplicate = 20;
+
+ final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+ // for each RS, create an empty wal with same walGroupId
+ final List<Path> emptyWalPaths = new ArrayList<>();
+
+ long ts = System.currentTimeMillis();
+ WAL wal = null;
+ for (int i = 0; i < numRs; i++) {
+ RegionInfo regionInfo =
+ UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+ wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+ appendEntriesToWal(numOfEntriesToReplicate, wal);
+ wal.rollWriter();
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+ Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
+ UTIL1.getTestFileSystem().create(emptyWalPath).close();
+ emptyWalPaths.add(emptyWalPath);
+ }
+ injectEmptyWAL(numRs, emptyWalPaths);
+
+ // roll the WAL again with some entries
+ for (int i = 0; i < numRs; i++) {
+ appendEntriesToWal(numOfEntriesToReplicate, wal);
+ wal.rollWriter();
+ }
+
+ hbaseAdmin.enableReplicationPeer(PEER_ID2);
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs);
+
+ // Now we should expect numOfEntriesToReplicate entries
+ // replicated from each region server. This makes sure we didn't loose data
+ // from any previous batch when we encounter EOF exception for empty file.
+ Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
+ replicatedEntries.size());
+
+ // We expect two batch of replication to be shipped which will
+ // for non empty WAL
+ Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
+ verifyNumberOfLogsInQueue(1, numRs);
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ runSimplePutDeleteTest();
+ rollWalsAndWaitForDeque(numRs);
+ }
+
+ // inject our empty wal into the replication queue, and then roll the original wal, which
+ // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
+ // determine if the file being replicated currently is still opened for write, so just inject a
+ // new wal to the replication queue does not mean the previous file is closed.
+ private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
Replication replicationService = (Replication) hrs.getReplicationSourceService();
@@ -111,13 +338,32 @@
WAL wal = hrs.getWAL(regionInfo);
wal.rollWriter(true);
}
+ }
- // ReplicationSource should advance past the empty wal, or else the test will fail
+ protected WALKeyImpl getWalKeyImpl() {
+ return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
+ }
+
+ // Roll the WAL and wait for it to get deque from the log queue
+ private void rollWalsAndWaitForDeque(int numRs) throws IOException {
+ RegionInfo regionInfo =
+ UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
+ for (int i = 0; i < numRs; i++) {
+ WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ wal.rollWriter();
+ }
waitForLogAdvance(numRs);
+ }
- // we're now writing to the new wal
- // if everything works, the source should've stopped reading from the empty wal, and start
- // replicating from the new wal
- runSimplePutDeleteTest();
+ private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
+ long txId = -1;
+ for (int i = 0; i < numEntries; i++) {
+ byte[] b = Bytes.toBytes(Integer.toString(i));
+ KeyValue kv = new KeyValue(b, famName, b);
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+ txId = wal.appendData(info, getWalKeyImpl(), edit);
+ }
+ wal.sync(txId);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 9c6fafc..d31b864 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -27,9 +27,9 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
@@ -69,6 +69,7 @@
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -83,7 +84,6 @@
import org.mockito.Mockito;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {
@@ -687,6 +687,7 @@
// Override the max retries multiplier to fail fast.
conf.setInt("replication.source.maxretriesmultiplier", 1);
conf.setBoolean("replication.source.eof.autorecovery", true);
+ conf.setInt("replication.source.nb.batches", 10);
// Create a reader thread with source as recovered source.
ReplicationSource source = mockReplicationSource(true, conf);
when(source.isPeerEnabled()).thenReturn(true);
@@ -705,7 +706,64 @@
assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
}
+ @Test
+ public void testEOFExceptionForRecoveredQueueWithMultipleLogs() throws Exception {
+ Configuration conf = new Configuration(CONF);
+ MetricsSource metrics = mock(MetricsSource.class);
+ ReplicationSource source = mockReplicationSource(true, conf);
+ ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
+ // Create a 0 length log.
+ Path emptyLog = new Path(fs.getHomeDirectory(),"log.2");
+ FSDataOutputStream fsdos = fs.create(emptyLog);
+ fsdos.close();
+ assertEquals(0, fs.getFileStatus(emptyLog).getLen());
+ localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
+
+ final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
+ WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
+ appendEntries(writer1, 3);
+ localLogQueue.enqueueLog(log1, fakeWalGroupId);
+
+ ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+ // Make it look like the source is from recovered source.
+ when(mockSourceManager.getOldSources())
+ .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
+ when(source.isPeerEnabled()).thenReturn(true);
+ when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ // Override the max retries multiplier to fail fast.
+ conf.setInt("replication.source.maxretriesmultiplier", 1);
+ conf.setBoolean("replication.source.eof.autorecovery", true);
+ conf.setInt("replication.source.nb.batches", 10);
+ // Create a reader thread.
+ ReplicationSourceWALReader reader =
+ new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
+ getDummyFilter(), source, fakeWalGroupId);
+ assertEquals("Initial log queue size is not correct",
+ 2, localLogQueue.getQueueSize(fakeWalGroupId));
+ reader.run();
+
+ // remove empty log from logQueue.
+ assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+ assertEquals("Log queue should be empty", 0, localLogQueue.getQueueSize(fakeWalGroupId));
+ }
+
private PriorityBlockingQueue<Path> getQueue() {
return logQueue.getQueue(fakeWalGroupId);
}
+
+ private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
+ for (int i = 0; i < numEntries; i++) {
+ byte[] b = Bytes.toBytes(Integer.toString(i));
+ KeyValue kv = new KeyValue(b,b,b);
+ WALEdit edit = new WALEdit();
+ edit.add(kv);
+ WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
+ HConstants.DEFAULT_CLUSTER_ID);
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
+ writer.append(new WAL.Entry(key, edit));
+ writer.sync(false);
+ }
+ writer.close();
+ }
}