PHOENIX-7707 Phoenix server paging on valid rows (#2294)


diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
new file mode 100644
index 0000000..49d1068
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ScannerContext has all methods package visible. To properly update the context progress for our
+ * scanners we need this helper
+ */
+public class PhoenixScannerContext extends ScannerContext {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixScannerContext.class);
+
+  // tracks the start time of the rpc on the server for server paging
+  private final long startTime;
+
+  /**
+   * The scanner remains open on the server during the course of multiple scan rpc requests. We need
+   * a way to determine during the next() call if it is a new scan rpc request on the same scanner.
+   * This is needed so that we can reset the start time for server paging. Every scan rpc request
+   * creates a new ScannerContext which has the lastPeekedCell set to null in the beginning.
+   * Subsequent next() calls will set this field in the ScannerContext.
+   * @return true if scannerContext is non-null and its lastPeekedCell has not been set yet
+   */
+  public static boolean isNewScanRpcRequest(ScannerContext scannerContext) {
+    return scannerContext != null && scannerContext.getLastPeekedCell() == null;
+  }
+
+  /** Creates a no limit context that copies keepProgress and metrics tracking of hbaseContext. */
+  public PhoenixScannerContext(ScannerContext hbaseContext) {
+    // set limits to null to create no limit context
+    super(Objects.requireNonNull(hbaseContext, "hbaseContext").keepProgress, null,
+      hbaseContext.isTrackingMetrics());
+    startTime = EnvironmentEdgeManager.currentTimeMillis();
+  }
+
+  /** Creates a no limit context with keepProgress disabled. */
+  public PhoenixScannerContext(boolean trackMetrics) {
+    super(false, null, trackMetrics);
+    startTime = EnvironmentEdgeManager.currentTimeMillis();
+  }
+
+  /** Returns the time in milliseconds at which this context was created. */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /** Adds the serialized size and heap size of each cell to the size progress. */
+  public void incrementSizeProgress(List<Cell> cells) {
+    for (Cell cell : cells) {
+      super.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize());
+    }
+  }
+
+  /**
+   * returnImmediately is a private field in ScannerContext and there is no getter API on it. But
+   * the checkTimeLimit API on the ScannerContext will return true if returnImmediately is set.
+   */
+  public boolean isReturnImmediately() {
+    return checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
+  }
+
+  /**
+   * Update the scanner context created by RSRpcServices so that it can act accordingly
+   * @param dst    hbase scanner context created on every new scan rpc request
+   * @param result list of cells to be returned to the client as scan rpc response
+   */
+  public void updateHBaseScannerContext(ScannerContext dst, List<Cell> result) {
+    if (dst == null) {
+      return;
+    }
+    // update last peeked cell
+    dst.setLastPeekedCell(getLastPeekedCell());
+    // update return immediately
+    if (isDummy(result) || isReturnImmediately()) {
+      // when a dummy row is returned by a lower layer, set returnImmediately
+      // on the ScannerContext to force HBase to return a response to the client
+      dst.returnImmediately();
+    }
+    // update metrics
+    if (isTrackingMetrics() && dst.isTrackingMetrics()) {
+      // getMetricsMap call resets the metrics internally
+      for (Map.Entry<String, Long> entry : getMetrics().getMetricsMap().entrySet()) {
+        dst.metrics.addToCounter(entry.getKey(), entry.getValue());
+      }
+    }
+    // update progress
+    dst.setProgress(getBatchProgress(), getDataSizeProgress(), getHeapSizeProgress());
+  }
+
+  /** Returns true if context is a PhoenixScannerContext created more than pageSizeMs ago. */
+  public static boolean isTimedOut(ScannerContext context, long pageSizeMs) {
+    // instanceof is false for null so no separate null check is needed
+    return context instanceof PhoenixScannerContext
+      && EnvironmentEdgeManager.currentTimeMillis()
+        - ((PhoenixScannerContext) context).startTime > pageSizeMs;
+  }
+
+  /**
+   * Set returnImmediately on the ScannerContext to true, it will have the same behavior as reaching
+   * the time limit. Use this to make RSRpcService.scan return immediately.
+   */
+  public static void setReturnImmediately(ScannerContext context) {
+    if (context instanceof PhoenixScannerContext) {
+      ((PhoenixScannerContext) context).returnImmediately();
+    }
+  }
+
+  /** Returns true if context is a PhoenixScannerContext with returnImmediately set. */
+  public static boolean isReturnImmediately(ScannerContext context) {
+    return context instanceof PhoenixScannerContext
+      && ((PhoenixScannerContext) context).isReturnImmediately();
+  }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
deleted file mode 100644
index 23bf60c..0000000
--- a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.PrivateCellUtil;
-
-/**
- * ScannerContext has all methods package visible. To properly update the context progress for our
- * scanners we need this helper
- */
-public class ScannerContextUtil {
-  public static void incrementSizeProgress(ScannerContext sc, List<Cell> cells) {
-    for (Cell cell : cells) {
-      sc.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize());
-    }
-  }
-
-  public static void updateMetrics(ScannerContext src, ScannerContext dst) {
-    if (src != null && dst != null && src.isTrackingMetrics() && dst.isTrackingMetrics()) {
-      for (Map.Entry<String, Long> entry : src.getMetrics().getMetricsMap().entrySet()) {
-        dst.metrics.addToCounter(entry.getKey(), entry.getValue());
-      }
-    }
-  }
-
-  public static ScannerContext copyNoLimitScanner(ScannerContext sc) {
-    return new ScannerContext(sc.keepProgress, null, sc.isTrackingMetrics());
-  }
-
-  public static void updateTimeProgress(ScannerContext sc) {
-    sc.updateTimeProgress();
-  }
-
-  /**
-   * Set returnImmediately on the ScannerContext to true, it will have the same behavior as reaching
-   * the time limit. Use this to make RSRpcService.scan return immediately.
-   */
-  public static void setReturnImmediately(ScannerContext sc) {
-    sc.returnImmediately();
-  }
-
-  /**
-   * returnImmediately is a private field in ScannerContext and there is no getter API on it But the
-   * checkTimeLimit API on the ScannerContext will return true if returnImmediately is set
-   */
-  public static boolean checkTimeLimit(ScannerContext sc) {
-    return sc.checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
-  }
-}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index fa8689f..31bddd8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -36,12 +36,12 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanOptions;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -182,6 +182,8 @@
     private final Scan scan;
     private final ObserverContext<RegionCoprocessorEnvironment> c;
     private boolean wasOverriden;
+    // tracks the current phoenix scanner context corresponding to the hbase scanner context
+    private PhoenixScannerContext phoenixScannerContext;
 
     public RegionScannerHolder(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
       final RegionScanner scanner) {
@@ -239,10 +241,7 @@
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-      overrideDelegate();
-      boolean res = super.next(result, scannerContext);
-      ScannerContextUtil.incrementSizeProgress(scannerContext, result);
-      return res;
+      return nextInternal(result, scannerContext, false);
     }
 
     @Override
@@ -253,9 +252,30 @@
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+      return nextInternal(result, scannerContext, true);
+    }
+
+    private boolean nextInternal(List<Cell> result, ScannerContext scannerContext, boolean isRaw)
+      throws IOException {
       overrideDelegate();
-      boolean res = super.nextRaw(result, scannerContext);
-      ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+      if (scannerContext instanceof PhoenixScannerContext) {
+        // Reuse the context when RegionScannerHolder objects are stacked, i.e. when multiple
+        // coprocs like UngroupedAggregateRegionObserver and GlobalIndexChecker process the scan
+        phoenixScannerContext = (PhoenixScannerContext) scannerContext;
+      } else if (
+        scannerContext != null && (phoenixScannerContext == null
+          || PhoenixScannerContext.isNewScanRpcRequest(scannerContext))
+      ) {
+        // create a phoenix scanner context for every new scan rpc request or if none exists yet
+        phoenixScannerContext = new PhoenixScannerContext(scannerContext);
+      }
+      boolean res = isRaw
+        ? super.nextRaw(result, phoenixScannerContext)
+        : super.next(result, phoenixScannerContext);
+      if (phoenixScannerContext != null && !(scannerContext instanceof PhoenixScannerContext)) {
+        // only update the top level hbase scanner context
+        phoenixScannerContext.updateHBaseScannerContext(scannerContext, result);
+      }
       return res;
     }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index ad64e3a..4563f10 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import static org.apache.phoenix.util.ScanUtil.isDummy;
-
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.hbase.Cell;
@@ -27,7 +25,6 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 
 public class DelegateRegionScanner implements RegionScanner {
 
@@ -103,17 +100,7 @@
   private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
     throws IOException {
     if (scannerContext != null) {
-      ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(scannerContext);
-      boolean hasMore =
-        raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result, noLimitContext);
-      if (isDummy(result) || ScannerContextUtil.checkTimeLimit(noLimitContext)) {
-        // when a dummy row is returned by a lower layer or if the result is valid but the lower
-        // layer signals us to return immediately, we need to set returnImmediately
-        // on the ScannerContext to force HBase to return a response to the client
-        ScannerContextUtil.setReturnImmediately(scannerContext);
-      }
-      ScannerContextUtil.updateMetrics(noLimitContext, scannerContext);
-      return hasMore;
+      return raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
     }
     return raw ? delegate.nextRaw(result) : delegate.next(result);
   }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 39d9527..b7163c0 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -81,7 +82,6 @@
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
@@ -605,8 +605,6 @@
     private boolean nextInternal(List<Cell> resultsToReturn, ScannerContext scannerContext)
       throws IOException {
       boolean hasMore;
-      long startTime = EnvironmentEdgeManager.currentTimeMillis();
-      long now;
       Tuple result =
         useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
       boolean acquiredLock = false;
@@ -643,8 +641,11 @@
               // Aggregate values here
               aggregators.aggregate(rowAggregators, result);
             }
-            now = EnvironmentEdgeManager.currentTimeMillis();
-            if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) {
+            if (
+              hasMore && groupByCache.size() < limit
+                && (PhoenixScannerContext.isReturnImmediately(scannerContext)
+                  || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs))
+            ) {
               return getDummyResult(resultsToReturn);
             }
           } while (hasMore && groupByCache.size() < limit);
@@ -784,8 +785,7 @@
       boolean hasMore;
       boolean atLimit;
       boolean aggBoundary = false;
-      long startTime = EnvironmentEdgeManager.currentTimeMillis();
-      long now;
+      boolean pageTimeout = false;
       Tuple result =
         useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
       ImmutableBytesPtr key = null;
@@ -835,8 +835,14 @@
             atLimit = rowCount + countOffset >= limit;
             // Do rowCount + 1 b/c we don't have to wait for a complete
             // row in the case of a DISTINCT with a LIMIT
-            now = EnvironmentEdgeManager.currentTimeMillis();
-          } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeMs);
+            if (
+              PhoenixScannerContext.isReturnImmediately(scannerContext)
+                || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+            ) {
+              pageTimeout = true;
+              break;
+            }
+          } while (hasMore && !aggBoundary && !atLimit && !pageTimeout);
         }
       } catch (Exception e) {
         LOGGER.error("Ordered group-by scanner next encountered error for region {}",
@@ -850,7 +856,7 @@
         if (acquiredLock) region.closeRegionOperation();
       }
       try {
-        if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeMs) {
+        if (hasMore && !aggBoundary && !atLimit && pageTimeout) {
           updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan);
           return true;
         }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index e56d902..ab62a7d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -59,7 +60,6 @@
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ClientUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.TupleUtil;
 
 public class HashJoinRegionScanner implements RegionScanner {
@@ -306,7 +306,6 @@
   private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
     throws IOException {
     try {
-      long startTime = EnvironmentEdgeManager.currentTimeMillis();
       while (shouldAdvance()) {
         if (scannerContext != null) {
           hasMore =
@@ -322,7 +321,10 @@
         }
         Cell cell = result.get(0);
         processResults(result, false);
-        if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+        if (
+          PhoenixScannerContext.isReturnImmediately(scannerContext)
+            || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+        ) {
           byte[] rowKey = CellUtil.cloneRow(cell);
           result.clear();
           getDummyResult(rowKey, result);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index e3f1d6a..7cde2a3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -34,7 +35,6 @@
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,17 +60,16 @@
   private PagingFilter pagingFilter;
   private MultiKeyPointLookup multiKeyPointLookup = null;
   private boolean initialized = false;
+  private long pageSizeMs;
 
   private class MultiKeyPointLookup {
     private SkipScanFilter skipScanFilter;
     private List<KeyRange> pointLookupRanges = null;
     private int lookupPosition = 0;
     private byte[] lookupKeyPrefix = null;
-    private long pageSizeMs;
 
     private MultiKeyPointLookup(SkipScanFilter skipScanFilter) throws IOException {
       this.skipScanFilter = skipScanFilter;
-      pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
       pointLookupRanges = skipScanFilter.getPointLookupKeyRanges();
       lookupPosition = findLookupPosition(scan.getStartRow());
       if (skipScanFilter.getOffset() > 0) {
@@ -133,7 +132,6 @@
     private boolean next(List<Cell> results, boolean raw, RegionScanner scanner,
       ScannerContext scannerContext) throws IOException {
       try {
-        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         while (true) {
           boolean hasMore;
           if (scannerContext != null) {
@@ -152,15 +150,19 @@
               "Each scan is supposed to return only one row, scan " + scan + ", region " + region);
           }
           if (!results.isEmpty()) {
+            if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
+              // we got a valid result but scanner timed out so return immediately
+              PhoenixScannerContext.setReturnImmediately(scannerContext);
+            }
             return hasMore();
           }
           // The scanner returned an empty result. This means that one of the rows
-          // has been deleted.
+          // has been deleted or the row key is not present in the table.
           if (!hasMore()) {
             return false;
           }
 
-          if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) {
+          if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
             byte[] rowKey = pointLookupRanges.get(lookupPosition - 1).getLowerRange();
             ScanUtil.getDummyResult(rowKey, results);
             return true;
@@ -187,6 +189,7 @@
     this.region = region;
     this.scan = scan;
     pagingFilter = ScanUtil.getPhoenixPagingFilter(scan);
+    pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
   }
 
   @VisibleForTesting
@@ -258,10 +261,11 @@
       }
     }
 
+    boolean hasMore;
     if (multiKeyPointLookup != null) {
       return multiKeyPointLookup.next(results, raw, delegate, scannerContext);
     }
-    boolean hasMore;
+
     if (scannerContext != null) {
       hasMore =
         raw ? delegate.nextRaw(results, scannerContext) : delegate.next(results, scannerContext);
@@ -277,16 +281,23 @@
       if (pagingFilter.isStopped()) {
         if (results.isEmpty()) {
           byte[] rowKey = pagingFilter.getCurrentRowKeyToBeExcluded();
-          LOGGER.info("Page filter stopped, generating dummy key {} ",
-            Bytes.toStringBinary(rowKey));
+          LOGGER.info("{} Paging filter stopped, generating dummy key {} ",
+            getRegionInfo().getRegionNameAsString(), Bytes.toStringBinary(rowKey));
           ScanUtil.getDummyResult(rowKey, results);
+        } else {
+          // we got a valid result but the paging filter stopped, so set returnImmediately
+          PhoenixScannerContext.setReturnImmediately(scannerContext);
         }
         return true;
       }
       return false;
     } else {
       // We got a row from the HBase scanner within the configured time (i.e.,
-      // the page size). We need to start a new page on the next next() call.
+      // the page size).
+      if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
+        // we got a valid result but scanner timed out so return immediately
+        PhoenixScannerContext.setReturnImmediately(scannerContext);
+      }
       return true;
     }
   }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index aa412d6..2a6e3c3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.query.QueryServices;
@@ -181,7 +182,8 @@
     return false;
   }
 
-  private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore) throws IOException {
+  private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore,
+    ScannerContext scannerContext) throws IOException {
     boolean expired = isExpired(result);
     if (!expired) {
       return hasMore;
@@ -190,23 +192,28 @@
     if (!hasMore) {
       return false;
     }
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
     do {
-      hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+      hasMore =
+        raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
       if (result.isEmpty() || ScanUtil.isDummy(result)) {
-        return hasMore;
+        break;
       }
+      // non dummy result check if it is expired
       if (!isExpired(result)) {
-        return hasMore;
+        break;
       }
+      // result is expired
       Cell cell = result.get(0);
       result.clear();
-      if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) {
+      if (
+        PhoenixScannerContext.isReturnImmediately(scannerContext)
+          || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+      ) {
         ScanUtil.getDummyResult(CellUtil.cloneRow(cell), result);
-        return hasMore;
+        break;
       }
     } while (hasMore);
-    return false;
+    return hasMore;
   }
 
   private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
@@ -236,7 +243,7 @@
     if (result.isEmpty() || ScanUtil.isDummy(result)) {
       return hasMore;
     }
-    hasMore = skipExpired(result, raw, hasMore);
+    hasMore = skipExpired(result, raw, hasMore, scannerContext);
     if (result.isEmpty() || ScanUtil.isDummy(result)) {
       return hasMore;
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 25062a6..8d98507 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -205,9 +206,8 @@
     }
   }
 
-  protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
-    final byte[] actualStartKey, final int offset, ScannerContext scannerContext)
-    throws IOException {
+  protected boolean scanIndexTableRows(List<Cell> result, final byte[] actualStartKey,
+    final int offset, ScannerContext scannerContext) throws IOException {
     boolean hasMore = false;
     if (actualStartKey != null) {
       do {
@@ -226,7 +226,10 @@
             firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0
         ) {
           result.clear();
-          if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+          if (
+            PhoenixScannerContext.isReturnImmediately(scannerContext)
+              || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+          ) {
             byte[] rowKey = CellUtil.cloneRow(firstCell);
             ScanUtil.getDummyResult(rowKey, result);
             return true;
@@ -266,7 +269,10 @@
             viewConstants));
         indexRows.add(row);
         indexRowCount++;
-        if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+        if (
+          hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+            || PhoenixScannerContext.isReturnImmediately(scannerContext))
+        ) {
           getDummyResult(lastIndexRowKey, result);
           // We do not need to change the state, State.SCANNING_INDEX
           // since we will continue scanning the index table after
@@ -279,9 +285,9 @@
     return hasMore;
   }
 
-  protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
-    ScannerContext scannerContext) throws IOException {
-    return scanIndexTableRows(result, startTime, null, 0, scannerContext);
+  protected boolean scanIndexTableRows(List<Cell> result, ScannerContext scannerContext)
+    throws IOException {
+    return scanIndexTableRows(result, null, 0, scannerContext);
   }
 
   private boolean verifyIndexRowAndRepairIfNecessary(Result dataRow, byte[] indexRowKey,
@@ -374,7 +380,9 @@
    */
   @Override
   public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    long startTime = (scannerContext != null)
+      ? ((PhoenixScannerContext) scannerContext).getStartTime()
+      : EnvironmentEdgeManager.currentTimeMillis();
     boolean hasMore;
     region.startRegionOperation();
     try {
@@ -390,7 +398,7 @@
           state = State.SCANNING_INDEX;
         }
         if (state == State.SCANNING_INDEX) {
-          hasMore = scanIndexTableRows(result, startTime, scannerContext);
+          hasMore = scanIndexTableRows(result, scannerContext);
           if (isDummy(result)) {
             updateDummyWithPrevRowKey(result, initStartRowKey, includeInitStartRowKey, scan);
             return hasMore;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 34d9fb6..6b5d124 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -105,9 +105,9 @@
   }
 
   @Override
-  protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
-    ScannerContext scannerContext) throws IOException {
-    return scanIndexTableRows(result, startTime, actualStartKey, offset, scannerContext);
+  protected boolean scanIndexTableRows(List<Cell> result, ScannerContext scannerContext)
+    throws IOException {
+    return scanIndexTableRows(result, actualStartKey, offset, scannerContext);
   }
 
   @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 821bbd1..2574fd6 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -62,7 +62,6 @@
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -743,13 +742,6 @@
 
   private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
     final Region region, final Scan scan, Configuration config) throws IOException {
-    ScannerContext groupScannerContext;
-    if (scan.isScanMetricsEnabled()) {
-      groupScannerContext =
-        ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build();
-    } else {
-      groupScannerContext = null;
-    }
     StatsCollectionCallable callable =
       new StatsCollectionCallable(stats, region, innerScanner, config, scan);
     byte[] asyncBytes =
@@ -803,9 +795,6 @@
 
       @Override
       public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
-        if (groupScannerContext != null && scannerContext != null) {
-          ScannerContextUtil.updateMetrics(groupScannerContext, scannerContext);
-        }
         return next(results);
       }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index fd8951f..6673aab 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -60,10 +60,10 @@
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
@@ -108,7 +108,6 @@
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -583,8 +582,6 @@
   public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
     throws IOException {
     boolean hasMore;
-    boolean returnImmediately = false;
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
     Configuration conf = env.getConfiguration();
     final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
     try (MemoryManager.MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
@@ -626,10 +623,10 @@
                 resultsToReturn.addAll(results);
                 return true;
               }
-              // we got a dummy result from the lower scanner but hasAny is true which means that
-              // we have a valid result which can be returned to the client instead of a dummy.
-              // We need to signal the RPC handler to return.
-              returnImmediately = true;
+              // we got a page timeout from the lower scanner but hasAny is true, which means we
+              // have a valid result that we can return to the client instead of a dummy. We still
+              // need to finish the rpc and release the handler
+              PhoenixScannerContext.setReturnImmediately(scannerContext);
               break;
             }
             if (!results.isEmpty()) {
@@ -679,13 +676,16 @@
               aggregators.aggregate(rowAggregators, result);
               hasAny = true;
             }
-          } while (
-            hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs
-          );
-          if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
-            // we hit a page scanner timeout, signal the RPC handler to return.
-            returnImmediately = true;
-          }
+            if (
+              PhoenixScannerContext.isReturnImmediately(scannerContext)
+                || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+            ) {
+              // we could have a valid result which we can return to the client instead of a dummy,
+              // but we still need to finish the rpc and release the handler
+              PhoenixScannerContext.setReturnImmediately(scannerContext);
+              break;
+            }
+          } while (hasMore);
           if (!mutations.isEmpty()) {
             annotateAndCommit(mutations);
           }
@@ -727,10 +727,6 @@
         }
         resultsToReturn.add(keyValue);
       }
-      if (returnImmediately && scannerContext != null) {
-        // signal the RPC handler to return
-        ScannerContextUtil.setReturnImmediately(scannerContext);
-      }
       return hasMore;
     }
   }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 8e82f17..97f23c4 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -256,7 +257,6 @@
           init();
           initialized = true;
         }
-        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         do {
           if (raw) {
             hasMore = (scannerContext == null)
@@ -277,7 +277,10 @@
           if (verifyRowAndRepairIfNecessary(result)) {
             break;
           }
-          if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+          if (
+            hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+              || PhoenixScannerContext.isReturnImmediately(scannerContext))
+          ) {
             byte[] rowKey = CellUtil.cloneRow(cell);
             result.clear();
             getDummyResult(rowKey, result);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 33de7aa..b398188 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -37,10 +37,10 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
@@ -177,7 +177,11 @@
         ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan);
       RegionScannerResultIterator iterator = new RegionScannerResultIterator(scan, innerScanner,
         getMinMaxQualifiersFromScan(scan), encodingScheme);
-      ScannerContext sc = iterator.getRegionScannerContext();
+      // we need to create our own scanner context because we are still opening the scanner and
+      // don't have an rpc scanner context, which is created in the next() call. This scanner
+      // context is used when we are skipping the rows until we hit the offset
+      PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled());
+      iterator.setRegionScannerContext(sc);
       innerScanner = getOffsetScanner(innerScanner,
         new OffsetResultIterator(iterator, scanOffset, getPageSizeMsForRegionScanner(scan),
           isIncompatibleClient),
@@ -237,10 +241,15 @@
         EncodedColumnsUtil.getQualifierEncodingScheme(scan);
       RegionScannerResultIterator inner = new RegionScannerResultIterator(scan, s,
         EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
+      // we need to create our own scanner context because we are still opening the scanner and
+      // don't have an rpc scanner context, which is created in the next() call. This scanner
+      // context is used when we are iterating over the top n rows before the first next() call
+      PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled());
+      inner.setRegionScannerContext(sc);
       OrderedResultIterator iterator = new OrderedResultIterator(inner, orderByExpressions,
         spoolingEnabled, thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize,
         getPageSizeMsForRegionScanner(scan), scan, s.getRegionInfo());
-      return new OrderedResultIteratorWithScannerContext(inner.getRegionScannerContext(), iterator);
+      return new OrderedResultIteratorWithScannerContext(sc, iterator);
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
@@ -253,15 +262,15 @@
   }
 
   private static class OrderedResultIteratorWithScannerContext {
-    private ScannerContext scannerContext;
+    private PhoenixScannerContext scannerContext;
     private OrderedResultIterator iterator;
 
-    OrderedResultIteratorWithScannerContext(ScannerContext sc, OrderedResultIterator ori) {
+    OrderedResultIteratorWithScannerContext(PhoenixScannerContext sc, OrderedResultIterator ori) {
       this.scannerContext = sc;
       this.iterator = ori;
     }
 
-    public ScannerContext getScannerContext() {
+    public PhoenixScannerContext getScannerContext() {
       return scannerContext;
     }
 
@@ -310,7 +319,7 @@
 
   private RegionScanner getOffsetScanner(final RegionScanner s, final OffsetResultIterator iterator,
     final boolean isLastScan, final boolean incompatibleClient, final Scan scan,
-    final ScannerContext sc) throws IOException {
+    final PhoenixScannerContext sc) throws IOException {
     final Tuple firstTuple;
     final Region region = getRegion();
     region.startRegionOperation();
@@ -383,7 +392,9 @@
     return new BaseRegionScanner(s) {
       private Tuple tuple = firstTuple;
       private byte[] previousResultRowKey;
-      private ScannerContext regionScannerContext = sc;
+      // scanner context used when we are opening the scanner and skipping up to offset rows
+      // We copy this context to the hbase rpc context on the first next call
+      private PhoenixScannerContext regionScannerContext = sc;
 
       @Override
       public boolean isFilterDone() {
@@ -401,6 +412,18 @@
           if (isFilterDone()) {
             return false;
           }
+          if (regionScannerContext != null) {
+            regionScannerContext.updateHBaseScannerContext(scannerContext, results);
+            // we no longer need this context
+            regionScannerContext = null;
+            if (PhoenixScannerContext.isReturnImmediately(scannerContext)) {
+              return true;
+            }
+          }
+          RegionScannerResultIterator delegate =
+            (RegionScannerResultIterator) (iterator.getDelegate());
+          // just use the scanner context passed to us from now on
+          delegate.setRegionScannerContext(scannerContext);
           Tuple nextTuple = iterator.next();
           if (tuple.size() > 0 && !isDummy(tuple)) {
             for (int i = 0; i < tuple.size(); i++) {
@@ -425,10 +448,6 @@
             }
           }
           tuple = nextTuple;
-          if (regionScannerContext != null) {
-            ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext);
-            regionScannerContext = null;
-          }
           return !isFilterDone();
         } catch (Throwable t) {
           LOGGER.error("Error while iterating Offset scanner.", t);
@@ -488,7 +507,7 @@
    * region) since after this everything is held in memory
    */
   private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
-    final OrderedResultIterator iterator, ImmutableBytesPtr tenantId, ScannerContext sc)
+    final OrderedResultIterator iterator, ImmutableBytesPtr tenantId, PhoenixScannerContext sc)
     throws Throwable {
 
     final Tuple firstTuple;
@@ -512,7 +531,9 @@
     }
     return new BaseRegionScanner(s) {
       private Tuple tuple = firstTuple;
-      private ScannerContext regionScannerContext = sc;
+      // scanner context used when we are opening the scanner and reading the topN rows
+      // We copy this context to the hbase rpc context on the first next call
+      private PhoenixScannerContext regionScannerContext = sc;
 
       @Override
       public boolean isFilterDone() {
@@ -530,6 +551,14 @@
           if (isFilterDone()) {
             return false;
           }
+          if (regionScannerContext != null) {
+            regionScannerContext.updateHBaseScannerContext(scannerContext, results);
+            // we no longer need this context
+            regionScannerContext = null;
+            if (PhoenixScannerContext.isReturnImmediately(scannerContext)) {
+              return true;
+            }
+          }
           if (isDummy(tuple)) {
             ScanUtil.getDummyResult(CellUtil.cloneRow(tuple.getValue(0)), results);
           } else {
@@ -537,11 +566,11 @@
               results.add(tuple.getValue(i));
             }
           }
+          RegionScannerResultIterator delegate =
+            (RegionScannerResultIterator) (iterator.getDelegate());
+          // just use the scanner context passed to us from now on
+          delegate.setRegionScannerContext(scannerContext);
           tuple = iterator.next();
-          if (regionScannerContext != null) {
-            ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext);
-            regionScannerContext = null;
-          }
           return !isFilterDone();
         } catch (Throwable t) {
           ClientUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index bcc9539..04a9dfc 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -39,7 +39,6 @@
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
@@ -297,11 +296,6 @@
           if (extraLimit >= 0 && --extraLimit == 0) {
             return false;
           }
-          // There is a scanattribute set to retrieve the specific array element
-          if (scannerContext != null) {
-            ScannerContextUtil.incrementSizeProgress(scannerContext, result);
-            ScannerContextUtil.updateTimeProgress(scannerContext);
-          }
           return next;
         } catch (Throwable t) {
           ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index c0cc6ec..adbca1f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -43,7 +43,7 @@
   private final Pair<Integer, Integer> minMaxQualifiers;
   private final boolean useQualifierAsIndex;
   private final QualifierEncodingScheme encodingScheme;
-  private final ScannerContext regionScannerContext;
+  private ScannerContext scannerContext;
 
   public RegionScannerResultIterator(Scan scan, RegionScanner scanner,
     Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
@@ -51,12 +51,6 @@
     this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
     this.minMaxQualifiers = minMaxQualifiers;
     this.encodingScheme = encodingScheme;
-    if (scan.isScanMetricsEnabled()) {
-      regionScannerContext =
-        ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build();
-    } else {
-      regionScannerContext = null;
-    }
   }
 
   @Override
@@ -74,10 +68,10 @@
         // since this is an indication of whether or not there are more values after the
         // ones returned
         boolean hasMore;
-        if (regionScannerContext == null) {
+        if (scannerContext == null) {
           hasMore = scanner.nextRaw(results);
         } else {
-          hasMore = scanner.nextRaw(results, regionScannerContext);
+          hasMore = scanner.nextRaw(results, scannerContext);
         }
 
         if (!hasMore && results.isEmpty()) {
@@ -98,8 +92,8 @@
     }
   }
 
-  public ScannerContext getRegionScannerContext() {
-    return regionScannerContext;
+  public void setRegionScannerContext(ScannerContext scannerContext) {
+    this.scannerContext = scannerContext;
   }
 
   @Override
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index 1a43161..b3565f3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -19,6 +19,7 @@
 
 import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
 import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.commitWithException;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
 import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -33,12 +34,17 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.PagingRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -60,6 +66,7 @@
 
 @Category(NeedsOwnMiniClusterTest.class)
 public class ServerPagingIT extends ParallelStatsDisabledIT {
+  private static final Random RAND = new Random(11);
 
   @BeforeClass
   public static synchronized void doSetup() throws Exception {
@@ -262,6 +269,210 @@
         assertEquals(D2.getTime(), rs.getDate(1).getTime());
         assertFalse(rs.next());
         assertServerPagingMetric(tablename, rs, true);
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED);
+        assertEquals(6, numRows);
+        long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // 3 regions * (2 rows per region + 1 scanner open with page timeout set to 0 ms)
+        assertEquals(9, numRpcs);
+      }
+    }
+  }
+
+  @Test
+  public void testMultiKeyPointLookup() throws Exception {
+    final String tablename = generateUniqueName();
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    // use a higher timeout value so that we can trigger a page timeout from the scanner
+    // rather than the page filter
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5));
+    String ddl = String.format("CREATE TABLE %s (id VARCHAR NOT NULL, k1 INTEGER NOT NULL, "
+      + "k2 INTEGER NOT NULL, k3 INTEGER, v1 VARCHAR CONSTRAINT pk PRIMARY KEY (id, k1, k2)) "
+      + "\"%s\" = true", tablename, USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      conn.createStatement().execute(ddl);
+      String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+      PreparedStatement ps = conn.prepareStatement(dml);
+      int totalRows = 10000;
+      for (int i = 0; i < totalRows; ++i) {
+        ps.setString(1, "id_" + i % 3);
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+        if (i != 0 && i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      int rowKeyCount = 1000;
+      List<String> inList =
+        Stream.generate(() -> "(?, ?, ?)").limit(rowKeyCount).collect(Collectors.toList());
+      String dql = String.format("select id, k1, k2 from %s where (id, k1, k2) IN (%s)", tablename,
+        String.join(",", inList));
+      ps = conn.prepareStatement(dql);
+      int expectedValidRows = 0;
+      for (int i = 0; i < rowKeyCount; i++) {
+        ps.setString(3 * i + 1, "id_" + i % 3);
+        if (RAND.nextBoolean()) {
+          ++expectedValidRows;
+          ps.setInt(3 * i + 2, i % 20);
+        } else {
+          // generate a non-existing row key
+          ps.setInt(3 * i + 2, 78123);
+        }
+        ps.setInt(3 * i + 3, i);
+      }
+      int actualValidRows = 0;
+      try (ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+          ++actualValidRows;
+        }
+        assertEquals(expectedValidRows, actualValidRows);
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED);
+        assertEquals(expectedValidRows, numRows);
+        long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        assertTrue(numRpcs > 1);
+      }
+    }
+  }
+
+  @Test
+  public void testPagingWithTTLMasking() throws Exception {
+    final String tablename = generateUniqueName();
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    // use a higher timeout value so that we can trigger a page timeout from the scanner
+    // rather than the page filter
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(10));
+    int ttl = 2; // 2 seconds
+    String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+      + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+      + "CONSTRAINT pk PRIMARY KEY (id, k1, k2)) TTL=" + ttl;
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      conn.createStatement().execute(ddl);
+      String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+      PreparedStatement ps = conn.prepareStatement(dml);
+      int totalRows = 10000;
+      for (int i = 0; i < totalRows; ++i) {
+        ps.setString(1, "id_" + i % 3);
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+        if (i != 0 && i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      // Sleep so that the rows expire
+      // Can't use EnvironmentEdgeManager because that messes up page timeout calculations
+      Thread.sleep(ttl * 1000 + 50);
+      String dql = String.format("SELECT count(*) from %s where id = '%s'", tablename, "id_2");
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        assertTrue(rs.next());
+        assertEquals(0, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+      }
+      // Insert a few more rows
+      int additionalRows = 5;
+      for (int i = 0; i < additionalRows; ++i) {
+        ps.setString(1, "id_2");
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i + totalRows);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+      }
+      conn.commit();
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        assertTrue(rs.next());
+        assertEquals(additionalRows, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+      }
+    }
+  }
+
+  @Test
+  public void testPagingWithUnverifiedIndexRows() throws Exception {
+    final String tablename = generateUniqueName();
+    final String indexname = generateUniqueName();
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    // use a higher timeout value so that we can trigger a page timeout from the scanner
+    // rather than the page filter
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5));
+    String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+      + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+      + "CONSTRAINT pk PRIMARY KEY (id, k1, k2))";
+    String indexddl = "CREATE INDEX " + indexname + " ON " + tablename + "(k3) include(v1)";
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      conn.createStatement().execute(ddl);
+      conn.createStatement().execute(indexddl);
+      String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+      PreparedStatement ps = conn.prepareStatement(dml);
+      int totalRows = 10000;
+      for (int i = 0; i < totalRows; ++i) {
+        ps.setString(1, "id_" + i % 3);
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+        if (i != 0 && i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      String dql = String.format("SELECT count(*) from %s where k3 = 5", tablename);
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+        String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+        assertTrue(explainPlan.contains(indexname));
+        assertTrue(rs.next());
+        assertEquals(totalRows / 10, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+      }
+      // Insert a few unverified index rows by failing phase 2 (the data table update)
+      int additionalRows = 10;
+      for (int i = 0; i < additionalRows; ++i) {
+        ps.setString(1, "id_2");
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i + totalRows);
+        ps.setInt(4, 5); // set k3=5
+        ps.setString(5, "val");
+        ps.executeUpdate();
+      }
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+      try {
+        commitWithException(conn);
+      } finally {
+        IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+      }
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+        String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+        assertTrue(explainPlan.contains(indexname));
+        assertTrue(rs.next());
+        assertEquals(totalRows / 10, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
       }
     }
   }
@@ -481,24 +692,46 @@
     PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
     stmt.execute(
       "CREATE TABLE " + tableName + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
-    for (int i = 1; i <= 200; i++) {
+    final int rowCount = 200;
+    for (int i = 1; i <= rowCount; i++) {
       String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
       stmt.execute(sql);
     }
     conn.commit();
 
     // delete every alternate row
-    for (int i = 1; i <= 200; i = i + 2) {
+    for (int i = 1; i <= rowCount; i = i + 2) {
       stmt.execute("DELETE FROM " + tableName + " WHERE A = " + i);
       conn.commit();
     }
 
+    // full table scan
     ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
     while (rs.next()) {
     }
     Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
     long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
-    Assert.assertEquals(101, numRpc);
+    // With a 0ms page timeout every row, whether it is valid or a delete marker, generates a
+    // page timeout, so the number of rpcs will be row count + 1.
+    assertEquals(rowCount + 1, numRpc);
+
+    // aggregate query
+    rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
+    assertTrue(rs.next());
+    assertEquals(rowCount / 2, rs.getInt(1));
+    assertFalse(rs.next());
+    metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+    numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+    assertEquals(rowCount + 1, numRpc);
+
+    // aggregate query with a filter
+    rs = stmt.executeQuery("SELECT count(*) FROM " + tableName + " where Z % 4 = 0");
+    assertTrue(rs.next());
+    assertEquals(rowCount / 4, rs.getInt(1));
+    assertFalse(rs.next());
+    metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+    numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+    assertEquals(rowCount + 1, numRpc);
   }
 
   @Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index c13e9c4..ee9acb4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -1661,7 +1661,7 @@
     }
   }
 
-  static private void commitWithException(Connection conn) {
+  public static void commitWithException(Connection conn) {
     try {
       conn.commit();
       IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
index d8ca7e0..9758dd5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
@@ -19,6 +19,7 @@
 
 import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -323,6 +324,37 @@
     assertEquals(142, count4);
   }
 
+  /** LIMIT/OFFSET must return rows offset..offset+limit-1 and scan limit+offset rows. */
+  @Test
+  public void testLimitOffsetWithoutSplit() throws Exception {
+    final String tablename = generateUniqueName();
+    final String[] letters = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
+      "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+    String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+      + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n"
+      + "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))";
+    final int limit = 12;
+    final int offset = 5;
+    try (Connection conn = DriverManager.getConnection(getUrl());
+      Statement stmt = conn.createStatement()) { // one statement reused, closed with the block
+      createTestTable(getUrl(), ddl);
+      for (int i = 0; i < letters.length; i++) { // v1 holds the alphabet in reverse order
+        stmt.execute("UPSERT INTO " + tablename + " values('" + letters[i] + "'," + i + ","
+          + (i + 1) + "," + (i + 2) + ",'" + letters[letters.length - 1 - i] + "')");
+      }
+      conn.commit();
+      try (ResultSet rs = stmt.executeQuery(
+        "SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset)) {
+        for (int i = 0; i < limit; i++) {
+          assertTrue(rs.next());
+          assertEquals("Expected string didn't match for i = " + i, letters[offset + i],
+            rs.getString(1));
+        }
+        assertEquals(limit + offset, getRowsScanned(rs)); // read while rs is still open
+      }
+    }
+  }
+
   private long countRowsScannedFromSql(Statement stmt, String sql) throws SQLException {
     ResultSet rs = stmt.executeQuery(sql);
     while (rs.next()) {