HBASE-28509 ScanResumer.resume would perform unnecessary scan when cl… (#5817)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 7e3c434..51a9a07 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -230,7 +230,8 @@
// Notice that, the public methods of this class is supposed to be called by upper layer only, and
// package private methods can only be called within the implementation of
// AsyncScanSingleRegionRpcRetryingCaller.
- private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
+ @InterfaceAudience.Private
+ final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
// INITIALIZED -> SUSPENDED -> RESUMED
// INITIALIZED -> RESUMED
@@ -250,6 +251,18 @@
@Override
public void resume() {
+ doResume(false);
+ }
+
+ /**
+ * Used when {@link ScanControllerImpl#suspend} was called earlier to obtain a
+ * {@link ScanResumerImpl}, but the user has since stopped the scan and needs no more results.
+ */
+ public void terminate() {
+ doResume(true);
+ }
+
+ private void doResume(boolean stopScan) {
// just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
// just return at the first if condition without loading the resp and numValidResuls field. If
// resume is called after suspend, then it is also safe to just reference resp and
@@ -274,7 +287,11 @@
localResp = this.resp;
localNumberOfCompleteRows = this.numberOfCompleteRows;
}
- completeOrNext(localResp, localNumberOfCompleteRows);
+ if (stopScan) {
+ stopScan(localResp);
+ } else {
+ completeOrNext(localResp, localNumberOfCompleteRows);
+ }
}
private void scheduleRenewLeaseTask() {
@@ -536,12 +553,7 @@
}
ScanControllerState state = scanController.destroy();
if (state == ScanControllerState.TERMINATED) {
- if (resp.getMoreResultsInRegion()) {
- // we have more results in region but user request to stop the scan, so we need to close the
- // scanner explicitly.
- closeScanner();
- }
- completeNoMoreResults();
+ stopScan(resp);
return;
}
int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
@@ -553,6 +565,15 @@
completeOrNext(resp, numberOfCompleteRows);
}
+ private void stopScan(ScanResponse resp) {
+ if (resp.getMoreResultsInRegion()) {
+ // There are more results in the region but the user requested to stop the scan, so we need
+ // to close the scanner explicitly.
+ closeScanner();
+ }
+ completeNoMoreResults();
+ }
+
private void call() {
// As we have a call sequence for scan, it is useless to have a different rpc timeout which is
// less than the scan timeout. If the server does not respond in time(usually this will not
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 1f9d749..7773042 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -25,6 +25,7 @@
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -143,6 +144,25 @@
resumer = null;
}
+ private void terminateResumerIfPossible() {
+ if (resumer == null) {
+ return;
+ }
+ // AsyncTableResultScanner.close means we do not need any more scan results, but
+ // ScanResumerImpl.resume would perform another scan on the RegionServer and call
+ // AsyncTableResultScanner.onNext again once the ScanResponse is received. That
+ // AsyncTableResultScanner.onNext call would do nothing but discard the ScanResponse,
+ // because AsyncTableResultScanner.closed is true. So we avoid this unnecessary scan
+ // on the RegionServer by calling ScanResumerImpl.terminate, which closes the scanner
+ // directly.
+ if (resumer instanceof ScanResumerImpl) {
+ ((ScanResumerImpl) resumer).terminate();
+ } else {
+ resumePrefetch();
+ }
+ resumer = null;
+ }
+
@Override
public synchronized Result next() throws IOException {
while (queue.isEmpty()) {
@@ -173,9 +193,7 @@
closed = true;
queue.clear();
cacheSize = 0;
- if (resumer != null) {
- resumePrefetch();
- }
+ terminateResumerIfPossible();
notifyAll();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
index 4507664..4f52e6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java
@@ -19,7 +19,9 @@
import static org.junit.Assert.assertEquals;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -78,21 +80,41 @@
@Test
public void testCloseScannerWhileSuspending() throws Exception {
- try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) {
- TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
+ final AtomicInteger onNextCounter = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Scan scan = new Scan().setMaxResultSize(1);
+ final AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
+ @Override
+ public void onNext(Result[] results, ScanController controller) {
+ onNextCounter.incrementAndGet();
+ super.onNext(results, controller);
+ }
- @Override
- public boolean evaluate() throws Exception {
- return ((AsyncTableResultScanner) scanner).isSuspended();
- }
+ @Override
+ public void onComplete() {
+ super.onComplete();
+ latch.countDown();
+ }
+ };
- @Override
- public String explainFailure() throws Exception {
- return "The given scanner has been suspended in time";
- }
- });
- assertEquals(1, getScannersCount());
- }
+ CONN.getTable(TABLE_NAME).scan(scan, scanner);
+
+ TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return scanner.isSuspended();
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "The given scanner has been suspended in time";
+ }
+ });
+ assertEquals(1, getScannersCount());
+ assertEquals(1, onNextCounter.get());
+
+ scanner.close();
TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {
@Override
@@ -105,5 +127,7 @@
return "Still have " + getScannersCount() + " scanners opened";
}
});
+ latch.await();
+ assertEquals(1, onNextCounter.get());
}
}