HBASE-28509 ScanResumer.resume would perform unnecessary scan when cl… (#5817)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 7e3c434..51a9a07 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -230,7 +230,8 @@
   // Notice that, the public methods of this class is supposed to be called by upper layer only, and
   // package private methods can only be called within the implementation of
   // AsyncScanSingleRegionRpcRetryingCaller.
-  private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
+  @InterfaceAudience.Private
+  final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
 
     // INITIALIZED -> SUSPENDED -> RESUMED
     // INITIALIZED -> RESUMED
@@ -250,6 +251,18 @@
 
     @Override
     public void resume() {
+      doResume(false);
+    }
+
+    /**
+     * This method is used when {@link ScanControllerImpl#suspend} has already been called to get a
+     * {@link ScanResumerImpl}, but the user now stops the scan and does not need any more results.
+     */
+    public void terminate() {
+      doResume(true);
+    }
+
+    private void doResume(boolean stopScan) {
       // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
       // just return at the first if condition without loading the resp and numValidResuls field. If
       // resume is called after suspend, then it is also safe to just reference resp and
@@ -274,7 +287,11 @@
         localResp = this.resp;
         localNumberOfCompleteRows = this.numberOfCompleteRows;
       }
-      completeOrNext(localResp, localNumberOfCompleteRows);
+      if (stopScan) {
+        stopScan(localResp);
+      } else {
+        completeOrNext(localResp, localNumberOfCompleteRows);
+      }
     }
 
     private void scheduleRenewLeaseTask() {
@@ -536,12 +553,7 @@
     }
     ScanControllerState state = scanController.destroy();
     if (state == ScanControllerState.TERMINATED) {
-      if (resp.getMoreResultsInRegion()) {
-        // we have more results in region but user request to stop the scan, so we need to close the
-        // scanner explicitly.
-        closeScanner();
-      }
-      completeNoMoreResults();
+      stopScan(resp);
       return;
     }
     int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
@@ -553,6 +565,15 @@
     completeOrNext(resp, numberOfCompleteRows);
   }
 
+  private void stopScan(ScanResponse resp) {
+    if (resp.getMoreResultsInRegion()) {
+      // we have more results in region but the user requested to stop the scan, so we need to
+      // close the scanner explicitly.
+      closeScanner();
+    }
+    completeNoMoreResults();
+  }
+
   private void call() {
     // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
     // less than the scan timeout. If the server does not respond in time(usually this will not
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 1f9d749..7773042 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -25,6 +25,7 @@
 import java.util.ArrayDeque;
 import java.util.Queue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -143,6 +144,25 @@
     resumer = null;
   }
 
+  private void terminateResumerIfPossible() {
+    if (resumer == null) {
+      return;
+    }
+    // AsyncTableResultScanner.close means we no longer need any scan results. However,
+    // ScanResumerImpl.resume would perform another scan on the RegionServer and call
+    // AsyncTableResultScanner.onNext again once the ScanResponse is received. That call to
+    // AsyncTableResultScanner.onNext would do nothing but discard the ScanResponse, because
+    // AsyncTableResultScanner.closed is true. So here we avoid this unnecessary scan on the
+    // RegionServer by calling ScanResumerImpl.terminate instead, which closes the scanner
+    // directly.
+    if (resumer instanceof ScanResumerImpl) {
+      ((ScanResumerImpl) resumer).terminate();
+    } else {
+      resumePrefetch();
+    }
+    resumer = null;
+  }
+
   @Override
   public synchronized Result next() throws IOException {
     while (queue.isEmpty()) {
@@ -173,9 +193,7 @@
     closed = true;
     queue.clear();
     cacheSize = 0;
-    if (resumer != null) {
-      resumePrefetch();
-    }
+    terminateResumerIfPossible();
     notifyAll();
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
index 4507664..4f52e6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
@@ -19,7 +19,9 @@
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -78,21 +80,41 @@
 
   @Test
   public void testCloseScannerWhileSuspending() throws Exception {
-    try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) {
-      TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
+    final AtomicInteger onNextCounter = new AtomicInteger(0);
+    final CountDownLatch latch = new CountDownLatch(1);
+    final Scan scan = new Scan().setMaxResultSize(1);
+    final AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        onNextCounter.incrementAndGet();
+        super.onNext(results, controller);
+      }
 
-        @Override
-        public boolean evaluate() throws Exception {
-          return ((AsyncTableResultScanner) scanner).isSuspended();
-        }
+      @Override
+      public void onComplete() {
+        super.onComplete();
+        latch.countDown();
+      }
+    };
 
-        @Override
-        public String explainFailure() throws Exception {
-          return "The given scanner has been suspended in time";
-        }
-      });
-      assertEquals(1, getScannersCount());
-    }
+    CONN.getTable(TABLE_NAME).scan(scan, scanner);
+
+    TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return scanner.isSuspended();
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "The given scanner has been suspended in time";
+      }
+    });
+    assertEquals(1, getScannersCount());
+    assertEquals(1, onNextCounter.get());
+
+    scanner.close();
     TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
 
       @Override
@@ -105,5 +127,7 @@
         return "Still have " + getScannersCount() + " scanners opened";
       }
     });
+    latch.await();
+    assertEquals(1, onNextCounter.get());
   }
 }