PHOENIX-7707 Phoenix server paging on valid rows (#2294)
diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
new file mode 100644
index 0000000..49d1068
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ScannerContext keeps all of its methods package-private. To properly update the context progress
+ * from Phoenix scanners, we need this helper class in the same package.
+ */
+public class PhoenixScannerContext extends ScannerContext {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixScannerContext.class);
+
+ // tracks the start time of the rpc on the server for server paging
+ private final long startTime;
+
+ /**
+ * The scanner remains open on the server during the course of multiple scan rpc requests. We need
+ * a way to determine during the next() call if it is a new scan rpc request on the same scanner.
+ * This is needed so that we can reset the start time for server paging. Every scan rpc request
+ * creates a new ScannerContext which has the lastPeekedCell set to null in the beginning.
+ * Subsequent next() calls will set this field in the ScannerContext.
+ */
+ public static boolean isNewScanRpcRequest(ScannerContext scannerContext) {
+ return scannerContext != null && scannerContext.getLastPeekedCell() == null;
+ }
+
+ public PhoenixScannerContext(ScannerContext hbaseContext) {
+ // pass null limits to create a no-limit context
+ super(Objects.requireNonNull(hbaseContext).keepProgress, null,
+ Objects.requireNonNull(hbaseContext).isTrackingMetrics());
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+ }
+
+ public PhoenixScannerContext(boolean trackMetrics) {
+ super(false, null, trackMetrics);
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void incrementSizeProgress(List<Cell> cells) {
+ for (Cell cell : cells) {
+ super.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize());
+ }
+ }
+
+ /**
+ * returnImmediately is a private field in ScannerContext and there is no getter for it. However,
+ * the checkTimeLimit API on ScannerContext returns true when returnImmediately is set.
+ */
+ public boolean isReturnImmediately() {
+ return checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
+ }
+
+ /**
+ * Update the scanner context created by RSRpcServices so that the RPC layer can act accordingly
+ * @param dst hbase scanner context created on every new scan rpc request
+ * @param result list of cells to be returned to the client as scan rpc response
+ */
+ public void updateHBaseScannerContext(ScannerContext dst, List<Cell> result) {
+ if (dst == null) {
+ return;
+ }
+ // update last peeked cell
+ dst.setLastPeekedCell(getLastPeekedCell());
+ // update return immediately
+ if (isDummy(result) || isReturnImmediately()) {
+ // when a dummy row is returned by a lower layer or a lower layer asked to return immediately,
+ // set returnImmediately on the dst context to force HBase to return a response to the client
+ dst.returnImmediately();
+ }
+ // update metrics
+ if (isTrackingMetrics() && dst.isTrackingMetrics()) {
+ // getMetricsMap call resets the metrics internally
+ for (Map.Entry<String, Long> entry : getMetrics().getMetricsMap().entrySet()) {
+ dst.metrics.addToCounter(entry.getKey(), entry.getValue());
+ }
+ }
+ // update progress
+ dst.setProgress(getBatchProgress(), getDataSizeProgress(), getHeapSizeProgress());
+ }
+
+ public static boolean isTimedOut(ScannerContext context, long pageSizeMs) {
+ if (context == null || !(context instanceof PhoenixScannerContext)) {
+ return false;
+ }
+ PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+ return EnvironmentEdgeManager.currentTimeMillis() - phoenixScannerContext.startTime
+ > pageSizeMs;
+ }
+
+ /**
+ * Set returnImmediately on the ScannerContext to true; it has the same effect as reaching
+ * the time limit. Use this to make RSRpcServices.scan return immediately.
+ */
+ public static void setReturnImmediately(ScannerContext context) {
+ if (context == null || !(context instanceof PhoenixScannerContext)) {
+ return;
+ }
+ PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+ phoenixScannerContext.returnImmediately();
+ }
+
+ public static boolean isReturnImmediately(ScannerContext context) {
+ if (context == null || !(context instanceof PhoenixScannerContext)) {
+ return false;
+ }
+ PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+ return phoenixScannerContext.isReturnImmediately();
+ }
+}
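
A minimal sketch of how the scanners changed below are expected to consume these helpers (not part of this patch; the delegate, pageSizeMs and lastRowKey fields are illustrative placeholders):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.util.ScanUtil;

class PagingUsageSketch {
  private RegionScanner delegate; // wrapped lower-level scanner (placeholder)
  private long pageSizeMs;        // from ScanUtil.getPageSizeMsForRegionScanner(scan)
  private byte[] lastRowKey;      // last row key seen by this scanner (placeholder)

  boolean nextWithPaging(List<Cell> results, ScannerContext scannerContext) throws IOException {
    boolean hasMore = delegate.nextRaw(results, scannerContext);
    if (!results.isEmpty() && !ScanUtil.isDummy(results)) {
      // A valid row was produced. If the page has expired, keep the row but ask HBase to
      // finish this scan rpc instead of discarding the work behind a dummy row.
      if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
        PhoenixScannerContext.setReturnImmediately(scannerContext);
      }
      return hasMore;
    }
    if (hasMore && PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
      // Nothing valid yet: return a dummy row so the client resumes the scan
      // from the last row key on the next scan rpc.
      ScanUtil.getDummyResult(lastRowKey, results);
      return true;
    }
    return hasMore;
  }
}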
diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
deleted file mode 100644
index 23bf60c..0000000
--- a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.PrivateCellUtil;
-
-/**
- * ScannerContext has all methods package visible. To properly update the context progress for our
- * scanners we need this helper
- */
-public class ScannerContextUtil {
- public static void incrementSizeProgress(ScannerContext sc, List<Cell> cells) {
- for (Cell cell : cells) {
- sc.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize());
- }
- }
-
- public static void updateMetrics(ScannerContext src, ScannerContext dst) {
- if (src != null && dst != null && src.isTrackingMetrics() && dst.isTrackingMetrics()) {
- for (Map.Entry<String, Long> entry : src.getMetrics().getMetricsMap().entrySet()) {
- dst.metrics.addToCounter(entry.getKey(), entry.getValue());
- }
- }
- }
-
- public static ScannerContext copyNoLimitScanner(ScannerContext sc) {
- return new ScannerContext(sc.keepProgress, null, sc.isTrackingMetrics());
- }
-
- public static void updateTimeProgress(ScannerContext sc) {
- sc.updateTimeProgress();
- }
-
- /**
- * Set returnImmediately on the ScannerContext to true, it will have the same behavior as reaching
- * the time limit. Use this to make RSRpcService.scan return immediately.
- */
- public static void setReturnImmediately(ScannerContext sc) {
- sc.returnImmediately();
- }
-
- /**
- * returnImmediately is a private field in ScannerContext and there is no getter API on it But the
- * checkTimeLimit API on the ScannerContext will return true if returnImmediately is set
- */
- public static boolean checkTimeLimit(ScannerContext sc) {
- return sc.checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
- }
-}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index fa8689f..31bddd8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -36,12 +36,12 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -182,6 +182,8 @@
private final Scan scan;
private final ObserverContext<RegionCoprocessorEnvironment> c;
private boolean wasOverriden;
+ // tracks the current phoenix scanner context corresponding to the hbase scanner context
+ private PhoenixScannerContext phoenixScannerContext;
public RegionScannerHolder(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
final RegionScanner scanner) {
@@ -239,10 +241,7 @@
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- overrideDelegate();
- boolean res = super.next(result, scannerContext);
- ScannerContextUtil.incrementSizeProgress(scannerContext, result);
- return res;
+ return nextInternal(result, scannerContext, false);
}
@Override
@@ -253,9 +252,30 @@
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return nextInternal(result, scannerContext, true);
+ }
+
+ private boolean nextInternal(List<Cell> result, ScannerContext scannerContext, boolean isRaw)
+ throws IOException {
overrideDelegate();
- boolean res = super.nextRaw(result, scannerContext);
- ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ if (scannerContext instanceof PhoenixScannerContext) {
+ // This is an optimization to avoid creating multiple Phoenix scanner context objects for
+ // the same scan rpc request when multiple RegionScannerHolder objects are stacked, which
+ // happens when multiple coprocessors (not scanners) process the scan, for example
+ // UngroupedAggregateRegionObserver and GlobalIndexChecker
+ phoenixScannerContext = (PhoenixScannerContext) scannerContext;
+ } else if (PhoenixScannerContext.isNewScanRpcRequest(scannerContext)) {
+ // An open scanner can process multiple scan rpcs during its lifetime.
+ // We need to create a new phoenix scanner context for every new scan rpc request.
+ phoenixScannerContext = new PhoenixScannerContext(scannerContext);
+ }
+ boolean res = isRaw
+ ? super.nextRaw(result, phoenixScannerContext)
+ : super.next(result, phoenixScannerContext);
+ if (!(scannerContext instanceof PhoenixScannerContext)) {
+ // only update the top level hbase scanner context
+ phoenixScannerContext.updateHBaseScannerContext(scannerContext, result);
+ }
return res;
}
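
A minimal sketch of the per-rpc flow the holder above implements when coprocessors stack their scanners (not part of this patch; outer, hbaseContext and row are illustrative placeholders):

// Illustrative only: one scan rpc as seen from RSRpcServices' side of the contract.
// `outer` stands for the outermost RegionScannerHolder; `hbaseContext` is the per-rpc
// ScannerContext created by RSRpcServices.
static boolean scanRpcSketch(RegionScanner outer, ScannerContext hbaseContext, List<Cell> row)
    throws IOException {
  // First next() of the rpc: hbaseContext has no last peeked cell yet, so the outer holder
  // builds one PhoenixScannerContext (capturing the rpc start time) and passes it to the
  // scanners below it. Inner RegionScannerHolder instances recognise the
  // PhoenixScannerContext and reuse it, so there is a single start time and one set of
  // progress counters per rpc.
  boolean hasMore = outer.nextRaw(row, hbaseContext);
  // Before returning, only the outer holder copies progress, metrics and the
  // returnImmediately signal back into hbaseContext via updateHBaseScannerContext, which
  // is what makes RSRpcServices finish the rpc and respond to the client.
  return hasMore;
}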
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index ad64e3a..4563f10 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.coprocessor;
-import static org.apache.phoenix.util.ScanUtil.isDummy;
-
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
@@ -27,7 +25,6 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
public class DelegateRegionScanner implements RegionScanner {
@@ -103,17 +100,7 @@
private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
throws IOException {
if (scannerContext != null) {
- ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(scannerContext);
- boolean hasMore =
- raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result, noLimitContext);
- if (isDummy(result) || ScannerContextUtil.checkTimeLimit(noLimitContext)) {
- // when a dummy row is returned by a lower layer or if the result is valid but the lower
- // layer signals us to return immediately, we need to set returnImmediately
- // on the ScannerContext to force HBase to return a response to the client
- ScannerContextUtil.setReturnImmediately(scannerContext);
- }
- ScannerContextUtil.updateMetrics(noLimitContext, scannerContext);
- return hasMore;
+ return raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
}
return raw ? delegate.nextRaw(result) : delegate.next(result);
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 39d9527..b7163c0 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -81,7 +82,6 @@
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
@@ -605,8 +605,6 @@
private boolean nextInternal(List<Cell> resultsToReturn, ScannerContext scannerContext)
throws IOException {
boolean hasMore;
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
- long now;
Tuple result =
useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
boolean acquiredLock = false;
@@ -643,8 +641,11 @@
// Aggregate values here
aggregators.aggregate(rowAggregators, result);
}
- now = EnvironmentEdgeManager.currentTimeMillis();
- if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) {
+ if (
+ hasMore && groupByCache.size() < limit
+ && (PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs))
+ ) {
return getDummyResult(resultsToReturn);
}
} while (hasMore && groupByCache.size() < limit);
@@ -784,8 +785,7 @@
boolean hasMore;
boolean atLimit;
boolean aggBoundary = false;
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
- long now;
+ boolean pageTimeout = false;
Tuple result =
useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
ImmutableBytesPtr key = null;
@@ -835,8 +835,14 @@
atLimit = rowCount + countOffset >= limit;
// Do rowCount + 1 b/c we don't have to wait for a complete
// row in the case of a DISTINCT with a LIMIT
- now = EnvironmentEdgeManager.currentTimeMillis();
- } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeMs);
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
+ pageTimeout = true;
+ break;
+ }
+ } while (hasMore && !aggBoundary && !atLimit && !pageTimeout);
}
} catch (Exception e) {
LOGGER.error("Ordered group-by scanner next encountered error for region {}",
@@ -850,7 +856,7 @@
if (acquiredLock) region.closeRegionOperation();
}
try {
- if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeMs) {
+ if (hasMore && !aggBoundary && !atLimit && pageTimeout) {
updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan);
return true;
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index e56d902..ab62a7d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
@@ -59,7 +60,6 @@
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ClientUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.TupleUtil;
public class HashJoinRegionScanner implements RegionScanner {
@@ -306,7 +306,6 @@
private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
throws IOException {
try {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (shouldAdvance()) {
if (scannerContext != null) {
hasMore =
@@ -322,7 +321,10 @@
}
Cell cell = result.get(0);
processResults(result, false);
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
byte[] rowKey = CellUtil.cloneRow(cell);
result.clear();
getDummyResult(rowKey, result);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index e3f1d6a..7cde2a3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -34,7 +35,6 @@
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,17 +60,16 @@
private PagingFilter pagingFilter;
private MultiKeyPointLookup multiKeyPointLookup = null;
private boolean initialized = false;
+ private long pageSizeMs;
private class MultiKeyPointLookup {
private SkipScanFilter skipScanFilter;
private List<KeyRange> pointLookupRanges = null;
private int lookupPosition = 0;
private byte[] lookupKeyPrefix = null;
- private long pageSizeMs;
private MultiKeyPointLookup(SkipScanFilter skipScanFilter) throws IOException {
this.skipScanFilter = skipScanFilter;
- pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
pointLookupRanges = skipScanFilter.getPointLookupKeyRanges();
lookupPosition = findLookupPosition(scan.getStartRow());
if (skipScanFilter.getOffset() > 0) {
@@ -133,7 +132,6 @@
private boolean next(List<Cell> results, boolean raw, RegionScanner scanner,
ScannerContext scannerContext) throws IOException {
try {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (true) {
boolean hasMore;
if (scannerContext != null) {
@@ -152,15 +150,19 @@
"Each scan is supposed to return only one row, scan " + scan + ", region " + region);
}
if (!results.isEmpty()) {
+ if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
+ // we got a valid result but the scanner timed out, so return immediately
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
+ }
return hasMore();
}
// The scanner returned an empty result. This means that one of the rows
- // has been deleted.
+ // has been deleted or the row key is not present in the table.
if (!hasMore()) {
return false;
}
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) {
+ if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
byte[] rowKey = pointLookupRanges.get(lookupPosition - 1).getLowerRange();
ScanUtil.getDummyResult(rowKey, results);
return true;
@@ -187,6 +189,7 @@
this.region = region;
this.scan = scan;
pagingFilter = ScanUtil.getPhoenixPagingFilter(scan);
+ pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
}
@VisibleForTesting
@@ -258,10 +261,11 @@
}
}
+ boolean hasMore;
if (multiKeyPointLookup != null) {
return multiKeyPointLookup.next(results, raw, delegate, scannerContext);
}
- boolean hasMore;
+
if (scannerContext != null) {
hasMore =
raw ? delegate.nextRaw(results, scannerContext) : delegate.next(results, scannerContext);
@@ -277,16 +281,23 @@
if (pagingFilter.isStopped()) {
if (results.isEmpty()) {
byte[] rowKey = pagingFilter.getCurrentRowKeyToBeExcluded();
- LOGGER.info("Page filter stopped, generating dummy key {} ",
- Bytes.toStringBinary(rowKey));
+ LOGGER.info("{} Paging filter stopped, generating dummy key {} ",
+ getRegionInfo().getRegionNameAsString(), Bytes.toStringBinary(rowKey));
ScanUtil.getDummyResult(rowKey, results);
+ } else {
+ // we got a valid result but the page filter stopped, so set returnImmediately
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
}
return true;
}
return false;
} else {
// We got a row from the HBase scanner within the configured time (i.e.,
- // the page size). We need to start a new page on the next next() call.
+ // the page size).
+ if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
+ // we got a valid result but the scanner timed out, so return immediately
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
+ }
return true;
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index aa412d6..2a6e3c3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.query.QueryServices;
@@ -181,7 +182,8 @@
return false;
}
- private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore) throws IOException {
+ private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore,
+ ScannerContext scannerContext) throws IOException {
boolean expired = isExpired(result);
if (!expired) {
return hasMore;
@@ -190,23 +192,28 @@
if (!hasMore) {
return false;
}
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
do {
- hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+ hasMore =
+ raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
if (result.isEmpty() || ScanUtil.isDummy(result)) {
- return hasMore;
+ break;
}
+ // non-dummy result, check if it is expired
if (!isExpired(result)) {
- return hasMore;
+ break;
}
+ // result is expired
Cell cell = result.get(0);
result.clear();
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) {
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
ScanUtil.getDummyResult(CellUtil.cloneRow(cell), result);
- return hasMore;
+ break;
}
} while (hasMore);
- return false;
+ return hasMore;
}
private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
@@ -236,7 +243,7 @@
if (result.isEmpty() || ScanUtil.isDummy(result)) {
return hasMore;
}
- hasMore = skipExpired(result, raw, hasMore);
+ hasMore = skipExpired(result, raw, hasMore, scannerContext);
if (result.isEmpty() || ScanUtil.isDummy(result)) {
return hasMore;
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 25062a6..8d98507 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -205,9 +206,8 @@
}
}
- protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
- final byte[] actualStartKey, final int offset, ScannerContext scannerContext)
- throws IOException {
+ protected boolean scanIndexTableRows(List<Cell> result, final byte[] actualStartKey,
+ final int offset, ScannerContext scannerContext) throws IOException {
boolean hasMore = false;
if (actualStartKey != null) {
do {
@@ -226,7 +226,10 @@
firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0
) {
result.clear();
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
byte[] rowKey = CellUtil.cloneRow(firstCell);
ScanUtil.getDummyResult(rowKey, result);
return true;
@@ -266,7 +269,10 @@
viewConstants));
indexRows.add(row);
indexRowCount++;
- if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+ if (
+ hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ || PhoenixScannerContext.isReturnImmediately(scannerContext))
+ ) {
getDummyResult(lastIndexRowKey, result);
// We do not need to change the state, State.SCANNING_INDEX
// since we will continue scanning the index table after
@@ -279,9 +285,9 @@
return hasMore;
}
- protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
- ScannerContext scannerContext) throws IOException {
- return scanIndexTableRows(result, startTime, null, 0, scannerContext);
+ protected boolean scanIndexTableRows(List<Cell> result, ScannerContext scannerContext)
+ throws IOException {
+ return scanIndexTableRows(result, null, 0, scannerContext);
}
private boolean verifyIndexRowAndRepairIfNecessary(Result dataRow, byte[] indexRowKey,
@@ -374,7 +380,9 @@
*/
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long startTime = (scannerContext != null)
+ ? ((PhoenixScannerContext) scannerContext).getStartTime()
+ : EnvironmentEdgeManager.currentTimeMillis();
boolean hasMore;
region.startRegionOperation();
try {
@@ -390,7 +398,7 @@
state = State.SCANNING_INDEX;
}
if (state == State.SCANNING_INDEX) {
- hasMore = scanIndexTableRows(result, startTime, scannerContext);
+ hasMore = scanIndexTableRows(result, scannerContext);
if (isDummy(result)) {
updateDummyWithPrevRowKey(result, initStartRowKey, includeInitStartRowKey, scan);
return hasMore;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 34d9fb6..6b5d124 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -105,9 +105,9 @@
}
@Override
- protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
- ScannerContext scannerContext) throws IOException {
- return scanIndexTableRows(result, startTime, actualStartKey, offset, scannerContext);
+ protected boolean scanIndexTableRows(List<Cell> result, ScannerContext scannerContext)
+ throws IOException {
+ return scanIndexTableRows(result, actualStartKey, offset, scannerContext);
}
@Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 821bbd1..2574fd6 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -62,7 +62,6 @@
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -743,13 +742,6 @@
private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
final Region region, final Scan scan, Configuration config) throws IOException {
- ScannerContext groupScannerContext;
- if (scan.isScanMetricsEnabled()) {
- groupScannerContext =
- ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build();
- } else {
- groupScannerContext = null;
- }
StatsCollectionCallable callable =
new StatsCollectionCallable(stats, region, innerScanner, config, scan);
byte[] asyncBytes =
@@ -803,9 +795,6 @@
@Override
public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
- if (groupScannerContext != null && scannerContext != null) {
- ScannerContextUtil.updateMetrics(groupScannerContext, scannerContext);
- }
return next(results);
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index fd8951f..6673aab 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -60,10 +60,10 @@
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.GlobalCache;
@@ -108,7 +108,6 @@
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
@@ -583,8 +582,6 @@
public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
throws IOException {
boolean hasMore;
- boolean returnImmediately = false;
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
Configuration conf = env.getConfiguration();
final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
try (MemoryManager.MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
@@ -626,10 +623,10 @@
resultsToReturn.addAll(results);
return true;
}
- // we got a dummy result from the lower scanner but hasAny is true which means that
- // we have a valid result which can be returned to the client instead of a dummy.
- // We need to signal the RPC handler to return.
- returnImmediately = true;
+ // we got a page timeout from the lower scanner, but hasAny is true, which means we
+ // have a valid result that we can return to the client instead of a dummy; we still
+ // need to finish the rpc and release the handler
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
break;
}
if (!results.isEmpty()) {
@@ -679,13 +676,16 @@
aggregators.aggregate(rowAggregators, result);
hasAny = true;
}
- } while (
- hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs
- );
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
- // we hit a page scanner timeout, signal the RPC handler to return.
- returnImmediately = true;
- }
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
+ // we could have a valid result which we can return to the client instead of a dummy,
+ // but we still need to finish the rpc and release the handler
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
+ break;
+ }
+ } while (hasMore);
if (!mutations.isEmpty()) {
annotateAndCommit(mutations);
}
@@ -727,10 +727,6 @@
}
resultsToReturn.add(keyValue);
}
- if (returnImmediately && scannerContext != null) {
- // signal the RPC handler to return
- ScannerContextUtil.setReturnImmediately(scannerContext);
- }
return hasMore;
}
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 8e82f17..97f23c4 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -256,7 +257,6 @@
init();
initialized = true;
}
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
do {
if (raw) {
hasMore = (scannerContext == null)
@@ -277,7 +277,10 @@
if (verifyRowAndRepairIfNecessary(result)) {
break;
}
- if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+ if (
+ hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ || PhoenixScannerContext.isReturnImmediately(scannerContext))
+ ) {
byte[] rowKey = CellUtil.cloneRow(cell);
result.clear();
getDummyResult(rowKey, result);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 33de7aa..b398188 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -37,10 +37,10 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
@@ -177,7 +177,11 @@
ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan);
RegionScannerResultIterator iterator = new RegionScannerResultIterator(scan, innerScanner,
getMinMaxQualifiersFromScan(scan), encodingScheme);
- ScannerContext sc = iterator.getRegionScannerContext();
+ // we need to create our own scanner context because we are still opening the scanner
+ // and don't have an rpc scanner context, which is created in the next() call. This scanner
+ // context is used while we are skipping rows until we hit the offset
+ PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled());
+ iterator.setRegionScannerContext(sc);
innerScanner = getOffsetScanner(innerScanner,
new OffsetResultIterator(iterator, scanOffset, getPageSizeMsForRegionScanner(scan),
isIncompatibleClient),
@@ -237,10 +241,15 @@
EncodedColumnsUtil.getQualifierEncodingScheme(scan);
RegionScannerResultIterator inner = new RegionScannerResultIterator(scan, s,
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
+ // we need to create our own scanner context because we are still opening the scanner
+ // and don't have an rpc scanner context, which is created in the next() call. This scanner
+ // context is used while we are iterating over the top n rows before the first next() call
+ PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled());
+ inner.setRegionScannerContext(sc);
OrderedResultIterator iterator = new OrderedResultIterator(inner, orderByExpressions,
spoolingEnabled, thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize,
getPageSizeMsForRegionScanner(scan), scan, s.getRegionInfo());
- return new OrderedResultIteratorWithScannerContext(inner.getRegionScannerContext(), iterator);
+ return new OrderedResultIteratorWithScannerContext(sc, iterator);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
@@ -253,15 +262,15 @@
}
private static class OrderedResultIteratorWithScannerContext {
- private ScannerContext scannerContext;
+ private PhoenixScannerContext scannerContext;
private OrderedResultIterator iterator;
- OrderedResultIteratorWithScannerContext(ScannerContext sc, OrderedResultIterator ori) {
+ OrderedResultIteratorWithScannerContext(PhoenixScannerContext sc, OrderedResultIterator ori) {
this.scannerContext = sc;
this.iterator = ori;
}
- public ScannerContext getScannerContext() {
+ public PhoenixScannerContext getScannerContext() {
return scannerContext;
}
@@ -310,7 +319,7 @@
private RegionScanner getOffsetScanner(final RegionScanner s, final OffsetResultIterator iterator,
final boolean isLastScan, final boolean incompatibleClient, final Scan scan,
- final ScannerContext sc) throws IOException {
+ final PhoenixScannerContext sc) throws IOException {
final Tuple firstTuple;
final Region region = getRegion();
region.startRegionOperation();
@@ -383,7 +392,9 @@
return new BaseRegionScanner(s) {
private Tuple tuple = firstTuple;
private byte[] previousResultRowKey;
- private ScannerContext regionScannerContext = sc;
+ // scanner context used when we are opening the scanner and skipping up to offset rows
+ // We copy this context to the hbase rpc context on the first next call
+ private PhoenixScannerContext regionScannerContext = sc;
@Override
public boolean isFilterDone() {
@@ -401,6 +412,18 @@
if (isFilterDone()) {
return false;
}
+ if (regionScannerContext != null) {
+ regionScannerContext.updateHBaseScannerContext(scannerContext, results);
+ // we no longer need this context
+ regionScannerContext = null;
+ if (PhoenixScannerContext.isReturnImmediately(scannerContext)) {
+ return true;
+ }
+ }
+ RegionScannerResultIterator delegate =
+ (RegionScannerResultIterator) (iterator.getDelegate());
+ // just use the scanner context passed to us from now on
+ delegate.setRegionScannerContext(scannerContext);
Tuple nextTuple = iterator.next();
if (tuple.size() > 0 && !isDummy(tuple)) {
for (int i = 0; i < tuple.size(); i++) {
@@ -425,10 +448,6 @@
}
}
tuple = nextTuple;
- if (regionScannerContext != null) {
- ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext);
- regionScannerContext = null;
- }
return !isFilterDone();
} catch (Throwable t) {
LOGGER.error("Error while iterating Offset scanner.", t);
@@ -488,7 +507,7 @@
* region) since after this everything is held in memory
*/
private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
- final OrderedResultIterator iterator, ImmutableBytesPtr tenantId, ScannerContext sc)
+ final OrderedResultIterator iterator, ImmutableBytesPtr tenantId, PhoenixScannerContext sc)
throws Throwable {
final Tuple firstTuple;
@@ -512,7 +531,9 @@
}
return new BaseRegionScanner(s) {
private Tuple tuple = firstTuple;
- private ScannerContext regionScannerContext = sc;
+ // scanner context used when we are opening the scanner and reading the topN rows
+ // We copy this context to the hbase rpc context on the first next call
+ private PhoenixScannerContext regionScannerContext = sc;
@Override
public boolean isFilterDone() {
@@ -530,6 +551,14 @@
if (isFilterDone()) {
return false;
}
+ if (regionScannerContext != null) {
+ regionScannerContext.updateHBaseScannerContext(scannerContext, results);
+ // we no longer need this context
+ regionScannerContext = null;
+ if (PhoenixScannerContext.isReturnImmediately(scannerContext)) {
+ return true;
+ }
+ }
if (isDummy(tuple)) {
ScanUtil.getDummyResult(CellUtil.cloneRow(tuple.getValue(0)), results);
} else {
@@ -537,11 +566,11 @@
results.add(tuple.getValue(i));
}
}
+ RegionScannerResultIterator delegate =
+ (RegionScannerResultIterator) (iterator.getDelegate());
+ // just use the scanner context passed to us from now on
+ delegate.setRegionScannerContext(scannerContext);
tuple = iterator.next();
- if (regionScannerContext != null) {
- ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext);
- regionScannerContext = null;
- }
return !isFilterDone();
} catch (Throwable t) {
ClientUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index bcc9539..04a9dfc 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -39,7 +39,6 @@
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
@@ -297,11 +296,6 @@
if (extraLimit >= 0 && --extraLimit == 0) {
return false;
}
- // There is a scanattribute set to retrieve the specific array element
- if (scannerContext != null) {
- ScannerContextUtil.incrementSizeProgress(scannerContext, result);
- ScannerContextUtil.updateTimeProgress(scannerContext);
- }
return next;
} catch (Throwable t) {
ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index c0cc6ec..adbca1f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -43,7 +43,7 @@
private final Pair<Integer, Integer> minMaxQualifiers;
private final boolean useQualifierAsIndex;
private final QualifierEncodingScheme encodingScheme;
- private final ScannerContext regionScannerContext;
+ private ScannerContext scannerContext;
public RegionScannerResultIterator(Scan scan, RegionScanner scanner,
Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
@@ -51,12 +51,6 @@
this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
this.minMaxQualifiers = minMaxQualifiers;
this.encodingScheme = encodingScheme;
- if (scan.isScanMetricsEnabled()) {
- regionScannerContext =
- ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build();
- } else {
- regionScannerContext = null;
- }
}
@Override
@@ -74,10 +68,10 @@
// since this is an indication of whether or not there are more values after the
// ones returned
boolean hasMore;
- if (regionScannerContext == null) {
+ if (scannerContext == null) {
hasMore = scanner.nextRaw(results);
} else {
- hasMore = scanner.nextRaw(results, regionScannerContext);
+ hasMore = scanner.nextRaw(results, scannerContext);
}
if (!hasMore && results.isEmpty()) {
@@ -98,8 +92,8 @@
}
}
- public ScannerContext getRegionScannerContext() {
- return regionScannerContext;
+ public void setRegionScannerContext(ScannerContext scannerContext) {
+ this.scannerContext = scannerContext;
}
@Override
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index 1a43161..b3565f3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -19,6 +19,7 @@
import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.commitWithException;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -33,12 +34,17 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PagingRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -60,6 +66,7 @@
@Category(NeedsOwnMiniClusterTest.class)
public class ServerPagingIT extends ParallelStatsDisabledIT {
+ private static final Random RAND = new Random(11);
@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -262,6 +269,210 @@
assertEquals(D2.getTime(), rs.getDate(1).getTime());
assertFalse(rs.next());
assertServerPagingMetric(tablename, rs, true);
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED);
+ assertEquals(6, numRows);
+ long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // 3 regions * (2 rows per region + 1 scanner open rpc), with the page timeout set to 0 ms
+ assertEquals(9, numRpcs);
+ }
+ }
+ }
+
+ @Test
+ public void testMultiKeyPointLookup() throws Exception {
+ final String tablename = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // use a higher timeout value so that we can trigger a page timeout from the scanner
+ // rather than the page filter
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5));
+ String ddl = String.format("CREATE TABLE %s (id VARCHAR NOT NULL, k1 INTEGER NOT NULL, "
+ + "k2 INTEGER NOT NULL, k3 INTEGER, v1 VARCHAR CONSTRAINT pk PRIMARY KEY (id, k1, k2)) "
+ + "\"%s\" = true", tablename, USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(ddl);
+ String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+ PreparedStatement ps = conn.prepareStatement(dml);
+ int totalRows = 10000;
+ for (int i = 0; i < totalRows; ++i) {
+ ps.setString(1, "id_" + i % 3);
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ if (i != 0 && i % 100 == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ int rowKeyCount = 1000;
+ List<String> inList =
+ Stream.generate(() -> "(?, ?, ?)").limit(rowKeyCount).collect(Collectors.toList());
+ String dql = String.format("select id, k1, k2 from %s where (id, k1, k2) IN (%s)", tablename,
+ String.join(",", inList));
+ ps = conn.prepareStatement(dql);
+ int expectedValidRows = 0;
+ for (int i = 0; i < rowKeyCount; i++) {
+ ps.setString(3 * i + 1, "id_" + i % 3);
+ if (RAND.nextBoolean()) {
+ ++expectedValidRows;
+ ps.setInt(3 * i + 2, i % 20);
+ } else {
+ // generate a non-existing row key
+ ps.setInt(3 * i + 2, 78123);
+ }
+ ps.setInt(3 * i + 3, i);
+ }
+ int actualValidRows = 0;
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ ++actualValidRows;
+ }
+ assertEquals(expectedValidRows, actualValidRows);
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED);
+ assertEquals(expectedValidRows, numRows);
+ long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ assertTrue(numRpcs > 1);
+ }
+ }
+ }
+
+ @Test
+ public void testPagingWithTTLMasking() throws Exception {
+ final String tablename = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // use a higher timeout value so that we can trigger a page timeout from the scanner
+ // rather than the page filter
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(10));
+ int ttl = 2; // 2 seconds
+ String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+ + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+ + "CONSTRAINT pk PRIMARY KEY (id, k1, k2)) TTL=" + ttl;
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(ddl);
+ String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+ PreparedStatement ps = conn.prepareStatement(dml);
+ int totalRows = 10000;
+ for (int i = 0; i < totalRows; ++i) {
+ ps.setString(1, "id_" + i % 3);
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ if (i != 0 && i % 100 == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ // Sleep so that the rows expire
+ // Can't use EnvironmentEdgeManager because that messes up page timeout calculations
+ Thread.sleep(ttl * 1000 + 50);
+ String dql = String.format("SELECT count(*) from %s where id = '%s'", tablename, "id_2");
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ assertFalse(rs.next());
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+ }
+ // Insert a few more rows
+ int additionalRows = 5;
+ for (int i = 0; i < additionalRows; ++i) {
+ ps.setString(1, "id_2");
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i + totalRows);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ }
+ conn.commit();
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ assertEquals(additionalRows, rs.getInt(1));
+ assertFalse(rs.next());
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+ }
+ }
+ }
+
+ @Test
+ public void testPagingWithUnverifiedIndexRows() throws Exception {
+ final String tablename = generateUniqueName();
+ final String indexname = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // use a higher timeout value so that we can trigger a page timeout from the scanner
+ // rather than the page filter
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5));
+ String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+ + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+ + "CONSTRAINT pk PRIMARY KEY (id, k1, k2))";
+ String indexddl = "CREATE INDEX " + indexname + " ON " + tablename + "(k3) include(v1)";
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(ddl);
+ conn.createStatement().execute(indexddl);
+ String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+ PreparedStatement ps = conn.prepareStatement(dml);
+ int totalRows = 10000;
+ for (int i = 0; i < totalRows; ++i) {
+ ps.setString(1, "id_" + i % 3);
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ if (i != 0 && i % 100 == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ String dql = String.format("SELECT count(*) from %s where k3 = 5", tablename);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexname));
+ assertTrue(rs.next());
+ assertEquals(totalRows / 10, rs.getInt(1));
+ assertFalse(rs.next());
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+ }
+ // Insert a few unverified index rows by failing phase 2
+ int additionalRows = 10;
+ for (int i = 0; i < additionalRows; ++i) {
+ ps.setString(1, "id_2");
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i + totalRows);
+ ps.setInt(4, 5); // set k3=5
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ }
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ try {
+ commitWithException(conn);
+ } finally {
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ }
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexname));
+ assertTrue(rs.next());
+ assertEquals(totalRows / 10, rs.getInt(1));
+ assertFalse(rs.next());
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", numRpc), numRpc > 1);
}
}
}
@@ -481,24 +692,46 @@
PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
stmt.execute(
"CREATE TABLE " + tableName + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
- for (int i = 1; i <= 200; i++) {
+ final int rowCount = 200;
+ for (int i = 1; i <= rowCount; i++) {
String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
stmt.execute(sql);
}
conn.commit();
// delete every alternate row
- for (int i = 1; i <= 200; i = i + 2) {
+ for (int i = 1; i <= rowCount; i = i + 2) {
stmt.execute("DELETE FROM " + tableName + " WHERE A = " + i);
conn.commit();
}
+ // full table scan
ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
while (rs.next()) {
}
Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
- Assert.assertEquals(101, numRpc);
+ // with a 0 ms page timeout, every row, whether valid or a delete marker, generates a page
+ // timeout, so the number of rpcs will be row count + 1
+ assertEquals(rowCount + 1, numRpc);
+
+ // aggregate query
+ rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals(rowCount / 2, rs.getInt(1));
+ assertFalse(rs.next());
+ metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ assertEquals(rowCount + 1, numRpc);
+
+ // aggregate query with a filter
+ rs = stmt.executeQuery("SELECT count(*) FROM " + tableName + " where Z % 4 = 0");
+ assertTrue(rs.next());
+ assertEquals(rowCount / 4, rs.getInt(1));
+ assertFalse(rs.next());
+ metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ assertEquals(rowCount + 1, numRpc);
}
@Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index c13e9c4..ee9acb4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -1661,7 +1661,7 @@
}
}
- static private void commitWithException(Connection conn) {
+ public static void commitWithException(Connection conn) {
try {
conn.commit();
IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
index d8ca7e0..9758dd5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
@@ -19,6 +19,7 @@
import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -323,6 +324,37 @@
assertEquals(142, count4);
}
+ @Test
+ public void testLimitOffsetWithoutSplit() throws Exception {
+ final String tablename = generateUniqueName();
+ final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
+ "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+ String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+ + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n"
+ + "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTestTable(getUrl(), ddl);
+ for (int i = 0; i < 26; i++) {
+ conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "',"
+ + i + "," + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
+ }
+ conn.commit();
+ int limit = 12;
+ int offset = 5;
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery(
+ "SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
+ int i = 0;
+ while (i < limit) {
+ assertTrue(rs.next());
+ assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
+ rs.getString(1));
+ i++;
+ }
+ assertEquals(limit + offset, getRowsScanned(rs));
+ }
+ }
+
private long countRowsScannedFromSql(Statement stmt, String sql) throws SQLException {
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {