HDFS-16659. JournalNode should throw NewerTxnIdException when SinceTxId is bigger than HighestWrittenTxId (#4560)

Co-authored-by: Zander Xu <zengqiang.xu@shopee.com>
Signed-off-by: Erik Krogen <xkrogen@apache.org>
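
In short: Journal#getJournaledEdits now throws NewerTxnIdException instead of
returning an empty response when sinceTxId is ahead of the highest written txn
ID, and QuorumJournalManager catches that exception and treats it as "nothing
new to tail" rather than falling back to streaming. The sketch below is a
minimal, self-contained illustration of that contract; ToyJournal and
NewerTxnIdContractDemo are hypothetical names, not the real Hadoop classes.

    import java.io.IOException;

    /** Simplified stand-in for the exception class added by this patch. */
    class NewerTxnIdException extends IOException {
      NewerTxnIdException(String msgFormat, Object... msgArgs) {
        super(String.format(msgFormat, msgArgs));
      }
    }

    /** Hypothetical toy journal, not the real qjournal.server.Journal. */
    class ToyJournal {
      private final long highestWrittenTxId;

      ToyJournal(long highestWrittenTxId) {
        this.highestWrittenTxId = highestWrittenTxId;
      }

      /** New contract: requests ahead of the log are rejected, not answered with 0 txns. */
      long getJournaledEdits(long sinceTxId) throws IOException {
        if (sinceTxId > highestWrittenTxId) {
          throw new NewerTxnIdException(
              "Highest txn ID available in the journal is %d, "
                  + "but requested txns starting at %d.",
              highestWrittenTxId, sinceTxId);
        }
        // Pretend every txn from sinceTxId through highestWrittenTxId is returned.
        return highestWrittenTxId - sinceTxId + 1;
      }
    }

    public class NewerTxnIdContractDemo {
      public static void main(String[] args) throws IOException {
        ToyJournal journal = new ToyJournal(20);
        // Client side, mirroring the new catch block around
        // selectRpcInputStreams in QuorumJournalManager: the exception
        // simply means there is nothing new to tail yet.
        try {
          long count = journal.getJournaledEdits(21);
          System.out.println("Tailed " + count + " txns via RPC");
        } catch (NewerTxnIdException ntie) {
          System.out.println("No new edits yet: " + ntie.getMessage());
        }
      }
    }
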
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index faf71a7..0e3a8dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.hdfs.qjournal.server.NewerTxnIdException;
 import org.apache.hadoop.util.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -523,6 +524,9 @@
         selectRpcInputStreams(rpcStreams, fromTxnId, onlyDurableTxns);
         streams.addAll(rpcStreams);
         return;
+      } catch (NewerTxnIdException ntie) {
+        // Normal situation: we requested newer txn IDs than any journal has, so there are no new streams.
+        return;
       } catch (IOException ioe) {
         LOG.warn("Encountered exception while tailing edits >= " + fromTxnId +
             " via RPC; falling back to streaming.", ioe);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 6b9b408..7726377 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -750,10 +750,13 @@
           "is a requirement to fetch journaled edits via RPC. Please enable " +
           "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
     }
-    if (sinceTxId > getHighestWrittenTxId()) {
-      // Requested edits that don't exist yet; short-circuit the cache here
+    long highestTxId = getHighestWrittenTxId();
+    if (sinceTxId > highestTxId) {
+      // Requested edits that don't exist yet; sinceTxId is newer than highestTxId.
       metrics.rpcEmptyResponses.incr();
-      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+      throw new NewerTxnIdException(
+          "Highest txn ID available in the journal is %d, but requested txns starting at %d.",
+          highestTxId, sinceTxId);
     }
     try {
       List<ByteBuffer> buffers = new ArrayList<>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/NewerTxnIdException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/NewerTxnIdException.java
new file mode 100644
index 0000000..ec69140
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/NewerTxnIdException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.IOException;
+
+/**
+ * Thrown when the requested txn ID is newer than the highest written txn ID.
+ */
+public class NewerTxnIdException extends IOException {
+  private static final long serialVersionUID = 0L;
+
+  public NewerTxnIdException(String msgFormat, Object... msgArgs) {
+    super(String.format(msgFormat, msgArgs));
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 5666ae5..84ce7c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -28,7 +28,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.eq;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -40,11 +40,15 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.net.MockDomainNameResolver;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.SettableFuture;
 import org.apache.hadoop.util.Lists;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -53,6 +57,7 @@
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalFaultInjector;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@@ -1101,6 +1106,56 @@
     }
   }
 
+  /**
+   * Test selecting EditLogInputStream after some JournalNode jitter.
+   * Suppose there are 3 JournalNodes, JN0 ~ JN2.
+   *  1. JN0 hits a fault while the Active NameNode is syncing 10 edits with first txid 11.
+   *  2. The NameNode ignores the abnormal JN0 and continues to sync edits to JN1 and JN2.
+   *  3. JN0 comes back to health.
+   *  4. The NameNode continues to sync 20 edits with first txid 21.
+   *  5. At this point, edits 11 ~ 40 are missing from the cache of JN0.
+   *  6. The Observer NameNode tries to select an EditLogInputStream through
+   *     getJournaledEdits with sinceTxId 21.
+   *  7. JN2 hits a fault and responds slowly.
+   */
+  @Test
+  public void testSelectViaRPCAfterJNJitter() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(
+        1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    SettableFuture<Void> slowLog = SettableFuture.create();
+    Mockito.doReturn(slowLog).when(spies.get(0))
+        .sendEdits(eq(1L), eq(11L), eq(10), Mockito.any());
+    // Successfully write these edits to JN0 ~ JN2
+    writeTxns(stm, 1, 10);
+    // Fail to write these edits to JN0, but successfully write them to JN1 ~ JN2
+    writeTxns(stm, 11, 10);
+    // Successfully write these edits to JN1 ~ JN2
+    writeTxns(stm, 21, 20);
+
+    Semaphore semaphore = new Semaphore(0);
+    spyGetJournaledEdits(0, 21, () -> semaphore.release(1));
+    spyGetJournaledEdits(1, 21, () -> semaphore.release(1));
+    spyGetJournaledEdits(2, 21, () -> semaphore.acquireUninterruptibly(2));
+
+    List<EditLogInputStream> streams = new ArrayList<>();
+    qjm.selectInputStreams(streams, 21, true, true);
+
+    assertEquals(1, streams.size());
+    assertEquals(21, streams.get(0).getFirstTxId());
+    assertEquals(40, streams.get(0).getLastTxId());
+  }
+
+  private void spyGetJournaledEdits(int jnSpyIdx, long fromTxId, Runnable preHook) {
+    Mockito.doAnswer((Answer<ListenableFuture<GetJournaledEditsResponseProto>>) invocation -> {
+      preHook.run();
+      @SuppressWarnings("unchecked")
+      ListenableFuture<GetJournaledEditsResponseProto> result =
+          (ListenableFuture<GetJournaledEditsResponseProto>) invocation.callRealMethod();
+      return result;
+    }).when(spies.get(jnSpyIdx)).getJournaledEdits(fromTxId,
+        QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
+  }
+
   @Test
   public void testSelectViaRpcAfterJNRestart() throws Exception {
     EditLogOutputStream stm =