HBASE-26093 Replication is stuck due to zero length wal file in oldWALs directory (#3504)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index ca41184..9af91e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -274,11 +275,15 @@
     // since we don't add current log to recovered source queue so it is safe to remove.
     if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
       (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
-      Path head = queue.peek();
+      Path path = queue.peek();
       try {
-        if (fs.getFileStatus(head).getLen() == 0) {
-          // head of the queue is an empty log file
-          LOG.warn("Forcing removal of 0 length log in queue: {}", head);
+        if (!fs.exists(path)) {
+          // There is a chance that wal has moved to oldWALs directory, so look there also.
+          path = AbstractFSWALProvider.findArchivedLog(path, conf);
+          // path is null if it couldn't find archive path.
+        }
+        if (path != null && fs.getFileStatus(path).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: {}", path);
           logQueue.remove(walGroupId);
           currentPosition = 0;
           if (batch != null) {
@@ -289,7 +294,7 @@
           return true;
         }
       } catch (IOException ioe) {
-        LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe);
+        LOG.warn("Couldn't get file length information about log " + path, ioe);
       } catch (InterruptedException ie) {
         LOG.trace("Interrupted while adding WAL batch to ship queue");
         Thread.currentThread().interrupt();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 53cd084..f04819d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -27,13 +27,13 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -316,35 +316,10 @@
     return false;
   }
 
-  private Path getArchivedLog(Path path) throws IOException {
-    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
-
-    // Try found the log in old dir
-    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    Path archivedLogLocation = new Path(oldLogDir, path.getName());
-    if (fs.exists(archivedLogLocation)) {
-      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-      return archivedLogLocation;
-    }
-
-    // Try found the log in the seperate old log dir
-    oldLogDir =
-        new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
-            .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
-    archivedLogLocation = new Path(oldLogDir, path.getName());
-    if (fs.exists(archivedLogLocation)) {
-      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
-      return archivedLogLocation;
-    }
-
-    LOG.error("Couldn't locate log: " + path);
-    return path;
-  }
-
   private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
     // If the log was archived, continue reading from there
-    Path archivedLog = getArchivedLog(path);
-    if (!path.equals(archivedLog)) {
+    Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
+    if (archivedLog != null) {
       openReader(archivedLog);
     } else {
       throw fnfe;
@@ -408,8 +383,8 @@
       seek();
     } catch (FileNotFoundException fnfe) {
       // If the log was archived, continue reading from there
-      Path archivedLog = getArchivedLog(currentPath);
-      if (!currentPath.equals(archivedLog)) {
+      Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+      if (archivedLog != null) {
         openReader(archivedLog);
       } else {
         throw fnfe;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 4b74e10..ffe5c42 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -501,6 +500,43 @@
   }
 
   /**
+   * Find the archived WAL file path if it is not able to locate in WALs dir.
+   * @param path - active WAL file path
+   * @param conf - configuration
+   * @return archived path if exists, null - otherwise
+   * @throws IOException exception
+   */
+  public static Path findArchivedLog(Path path, Configuration conf) throws IOException {
+    // If the path contains oldWALs keyword then exit early.
+    if (path.toString().contains(HConstants.HREGION_OLDLOGDIR_NAME)) {
+      return null;
+    }
+    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
+    FileSystem fs = path.getFileSystem(conf);
+    // Try finding the log in old dir
+    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    }
+
+    ServerName serverName = getServerNameFromWALDirectoryName(path);
+    // Try finding the log in separate old log dir
+    oldLogDir =
+      new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
+        .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
+    archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    }
+
+    LOG.error("Couldn't locate log: " + path);
+    return null;
+  }
+
+  /**
    * Opens WAL reader with retries and additional exception handling
    * @param path path to WAL file
    * @param conf configuration
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index ad77c9d..b07b5b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -52,12 +52,14 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -716,4 +718,48 @@
       assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
     }
   }
+
+  /**
+   * Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
+   * @throws Exception exception
+   */
+  @Test
+  public void testEOFExceptionInOldWALsDirectory() throws Exception {
+    assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
+    AbstractFSWAL  abstractWAL = (AbstractFSWAL)log;
+    Path emptyLogFile = abstractWAL.getCurrentFileName();
+    log.rollWriter(true);
+
+    // AsyncFSWAl and FSHLog both moves the log from WALs to oldWALs directory asynchronously.
+    // Wait for in flight wal close count to become 0. This makes sure that empty wal is moved to
+    // oldWALs directory.
+    Waiter.waitFor(CONF, 5000,
+      (Waiter.Predicate<Exception>) () -> abstractWAL.getInflightWALCloseCount() == 0);
+    // There will 2 logs in the queue.
+    assertEquals(2, logQueue.getQueueSize(fakeWalGroupId));
+
+    // Get the archived dir path for the first wal.
+    Path archivePath = AbstractFSWALProvider.findArchivedLog(emptyLogFile, CONF);
+    // Make sure that the wal path is not the same as archived Dir path.
+    assertNotNull(archivePath);
+    assertTrue(fs.exists(archivePath));
+    fs.truncate(archivePath, 0);
+    // make sure the size of the wal file is 0.
+    assertEquals(0, fs.getFileStatus(archivePath).getLen());
+
+    ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
+    ReplicationSource source = Mockito.mock(ReplicationSource.class);
+    when(source.isPeerEnabled()).thenReturn(true);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+
+    Configuration localConf = new Configuration(CONF);
+    localConf.setInt("replication.source.maxretriesmultiplier", 1);
+    localConf.setBoolean("replication.source.eof.autorecovery", true);
+    // Start the reader thread.
+    createReader(false, localConf);
+    // Wait for the replication queue size to be 1. This means that we have handled
+    // 0 length wal from oldWALs directory.
+    Waiter.waitFor(localConf, 10000,
+      (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
+  }
 }