/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
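 * <p>
 * A minimal usage sketch (illustrative only, assuming an already-open {@code HRegion} named
 * {@code region}):
 *
 * <pre>
 * Scan scan = new Scan().withStartRow(startRow).withStopRow(stopRow);
 * try (RegionScanner scanner = region.getScanner(scan)) {
 *   List&lt;Cell&gt; cells = new ArrayList&lt;&gt;();
 *   boolean moreRows;
 *   do {
 *     moreRows = scanner.next(cells); // fetches the cells of (at most) one row
 *     // process cells here ...
 *     cells.clear();
 *   } while (moreRows);
 * }
 * </pre>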
*/
@InterfaceAudience.Private
class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
private static final Logger LOG = LoggerFactory.getLogger(RegionScannerImpl.class);
// Package local for testability
KeyValueHeap storeHeap = null;
/**
* Heap of key-values that are not essential for the provided filters and are thus read on demand,
* if on-demand column family loading is enabled.
*/
KeyValueHeap joinedHeap = null;
/**
* If the joined heap data gathering is interrupted due to scan limits, this will contain the row
* for which we are populating the values.
*/
protected Cell joinedContinuationRow = null;
private boolean filterClosed = false;
protected final byte[] stopRow;
protected final boolean includeStopRow;
protected final HRegion region;
protected final CellComparator comparator;
private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
private final long readPt;
private final long maxResultSize;
private final ScannerContext defaultScannerContext;
private final FilterWrapper filter;
private RegionServerServices rsServices;
@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
}
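  // A nonce is only usable when the client supplied one and this server runs a nonce manager;
  // in that case the mvcc read point of the original operation can be looked up and reused.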
private static boolean hasNonce(HRegion region, long nonce) {
RegionServerServices rsServices = region.getRegionServerServices();
return nonce != HConstants.NO_NONCE && rsServices != null &&
rsServices.getNonceManager() != null;
}
RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
long nonceGroup, long nonce) throws IOException {
this.region = region;
this.maxResultSize = scan.getMaxResultSize();
if (scan.hasFilter()) {
this.filter = new FilterWrapper(scan.getFilter());
} else {
this.filter = null;
}
this.comparator = region.getCellComparator();
/**
* By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
* scanner context that can be used to enforce the batch limit in the event that a
* ScannerContext is not specified during an invocation of next/nextRaw
*/
defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
this.stopRow = scan.getStopRow();
this.includeStopRow = scan.includeStopRow();
    // Synchronize on scannerReadPoints below so that nobody calculates
    // getSmallestReadPoint before scannerReadPoints is updated.
IsolationLevel isolationLevel = scan.getIsolationLevel();
long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
this.scannerReadPoints = region.scannerReadPoints;
this.rsServices = region.getRegionServerServices();
synchronized (scannerReadPoints) {
if (mvccReadPoint > 0) {
this.readPt = mvccReadPoint;
} else if (hasNonce(region, nonce)) {
this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
} else {
this.readPt = region.getReadPoint(isolationLevel);
}
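      // Publish this scanner's read point so that getSmallestReadPoint() accounts for it and
      // flushes/compactions do not discard MVCC versions this scanner may still need.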
scannerReadPoints.put(this, this.readPt);
}
initializeScanners(scan, additionalScanners);
}
private void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
throws IOException {
    // Here we separate all scanners into two lists - scanners that provide data required by the
    // filter to operate (the scanners list) and all others (the joinedScanners list).
List<KeyValueScanner> scanners = new ArrayList<>(scan.getFamilyMap().size());
List<KeyValueScanner> joinedScanners = new ArrayList<>(scan.getFamilyMap().size());
// Store all already instantiated scanners for exception handling
List<KeyValueScanner> instantiatedScanners = new ArrayList<>();
// handle additionalScanners
if (additionalScanners != null && !additionalScanners.isEmpty()) {
scanners.addAll(additionalScanners);
instantiatedScanners.addAll(additionalScanners);
}
try {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
HStore store = region.getStore(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
instantiatedScanners.add(scanner);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() ||
this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
} else {
joinedScanners.add(scanner);
}
}
initializeKVHeap(scanners, joinedScanners, region);
} catch (Throwable t) {
throw handleException(instantiatedScanners, t);
}
}
protected void initializeKVHeap(List<KeyValueScanner> scanners,
List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
this.storeHeap = new KeyValueHeap(scanners, comparator);
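    // The joined heap exists only when on-demand column family loading has set aside at least
    // one non-essential family; see initializeScanners().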
if (!joinedScanners.isEmpty()) {
this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
}
}
private IOException handleException(List<KeyValueScanner> instantiatedScanners, Throwable t) {
    // Remove the scanner's read point before propagating the exception
scannerReadPoints.remove(this);
if (storeHeap != null) {
storeHeap.close();
storeHeap = null;
if (joinedHeap != null) {
joinedHeap.close();
joinedHeap = null;
}
} else {
// close all already instantiated scanners before throwing the exception
for (KeyValueScanner scanner : instantiatedScanners) {
scanner.close();
}
}
return t instanceof IOException ? (IOException) t : new IOException(t);
}
@Override
public long getMaxResultSize() {
return maxResultSize;
}
@Override
public long getMvccReadPoint() {
return this.readPt;
}
@Override
public int getBatch() {
return this.defaultScannerContext.getBatchLimit();
}
/**
 * Resets the filter, if one is set.
* @throws IOException in case a filter raises an I/O exception.
*/
protected final void resetFilters() throws IOException {
if (filter != null) {
filter.reset();
}
}
@Override
public boolean next(List<Cell> outResults) throws IOException {
// apply the batching limit by default
return next(outResults, defaultScannerContext);
}
@Override
public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
throws IOException {
if (this.filterClosed) {
throw new UnknownScannerException("Scanner was closed (timed out?) " +
"after we renewed it. Could be caused by a very slow scanner " +
"or a lengthy garbage collection");
}
region.startRegionOperation(Operation.SCAN);
try {
return nextRaw(outResults, scannerContext);
} finally {
region.closeRegionOperation(Operation.SCAN);
}
}
@Override
public boolean nextRaw(List<Cell> outResults) throws IOException {
// Use the RegionScanner's context by default
return nextRaw(outResults, defaultScannerContext);
}
@Override
public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
if (storeHeap == null) {
// scanner is closed
throw new UnknownScannerException("Scanner was closed");
}
boolean moreValues = false;
if (outResults.isEmpty()) {
      // Usually outResults is empty. This is true when next is called to handle a scan or a get
      // operation.
moreValues = nextInternal(outResults, scannerContext);
} else {
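      // nextInternal() requires an empty list (it checks this with a Precondition), so gather
      // into a temporary list first and then append to the caller's list.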
List<Cell> tmpList = new ArrayList<>();
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
if (!outResults.isEmpty()) {
region.addReadRequestsCount(1);
if (region.getMetrics() != null) {
region.getMetrics().updateReadRequestCount();
}
}
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
if (!scannerContext.mayHaveMoreCellsInRow()) {
resetFilters();
}
if (isFilterDoneInternal()) {
moreValues = false;
}
return moreValues;
}
/**
 * @return true if more cells exist after this batch, false if the scanner is done
*/
private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
throws IOException {
assert joinedContinuationRow != null;
boolean moreValues =
populateResult(results, this.joinedHeap, scannerContext, joinedContinuationRow);
if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
// We are done with this row, reset the continuation.
joinedContinuationRow = null;
}
// As the data is obtained from two independent heaps, we need to
// ensure that result list is sorted, because Result relies on that.
results.sort(comparator);
return moreValues;
}
/**
 * Fetches records for the current row into the results list, until the next row, the batch limit
 * (if not -1), or the remaining result size (if not -1) is reached.
 * @param heap KeyValueHeap to fetch data from. It must be positioned on the correct row before
 *          the call.
* @return state of last call to {@link KeyValueHeap#next()}
*/
private boolean populateResult(List<Cell> results, KeyValueHeap heap,
ScannerContext scannerContext, Cell currentRowCell) throws IOException {
Cell nextKv;
boolean moreCellsInRow = false;
boolean tmpKeepProgress = scannerContext.getKeepProgress();
// Scanning between column families and thus the scope is between cells
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
do {
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
// We want to maintain any progress that is made towards the limits while scanning across
// different column families. To do this, we toggle the keep progress flag on during calls
// to the StoreScanner to ensure that any progress made thus far is not wiped away.
scannerContext.setKeepProgress(true);
heap.next(results, scannerContext);
scannerContext.setKeepProgress(tmpKeepProgress);
nextKv = heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
if (!moreCellsInRow) {
incrementCountOfRowsScannedMetric(scannerContext);
}
if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
} else if (scannerContext.checkSizeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
} else if (scannerContext.checkTimeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
}
} while (moreCellsInRow);
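    // No limit was reached and the row is exhausted; more values exist only if the heap still
    // has cells, i.e. we are not at the end of the region.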
return nextKv != null;
}
/**
* Based on the nextKv in the heap, and the current row, decide whether or not there are more
* cells to be read in the heap. If the row of the nextKv in the heap matches the current row then
* there are more cells to be read in the row.
* @return true When there are more cells in the row to be read
*/
private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
return nextKv != null && CellUtil.matchingRows(nextKv, currentRowCell);
}
/**
 * @return True if a filter has determined that the scan is done.
*/
@Override
public synchronized boolean isFilterDone() throws IOException {
return isFilterDoneInternal();
}
private boolean isFilterDoneInternal() throws IOException {
return this.filter != null && this.filter.filterAllRemaining();
}
private void checkClientDisconnect(Optional<RpcCall> rpcCall) throws CallerDisconnectedException {
if (rpcCall.isPresent()) {
// If a user specifies a too-restrictive or too-slow scanner, the
// client might time out and disconnect while the server side
// is still processing the request. We should abort aggressively
// in that case.
long afterTime = rpcCall.get().disconnectSince();
if (afterTime >= 0) {
throw new CallerDisconnectedException(
"Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this +
" after " + afterTime + " ms, since " + "caller disconnected");
}
}
}
private void resetProgress(ScannerContext scannerContext, int initialBatchProgress,
long initialSizeProgress, long initialHeapSizeProgress) {
// Starting to scan a new row. Reset the scanner progress according to whether or not
// progress should be kept.
if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation.
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress);
} else {
scannerContext.clearProgress();
}
}
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
throws IOException {
Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
// Save the initial progress from the Scanner context in these local variables. The progress
// may need to be reset a few times if rows are being filtered out so we save the initial
// progress.
int initialBatchProgress = scannerContext.getBatchProgress();
long initialSizeProgress = scannerContext.getDataSizeProgress();
long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
// Used to check time limit
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
    // The loop here is used only when, at some point during a call to next, we determine that due
    // to the effects of filters or otherwise we have an empty row in the result. Then we loop and
    // try again. Otherwise, we must get out on the first iteration via return: "true" if there's
    // more data to read, "false" if there isn't (storeHeap is at a stop row, and joinedHeap has
    // no more data to read for the last row, i.e. joinedContinuationRow if set).
while (true) {
resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress);
checkClientDisconnect(rpcCall);
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
// Let's see what we have in the storeHeap.
Cell current = this.storeHeap.peek();
boolean shouldStop = shouldStop(current);
      // When hasFilterRow is true, all the cells for a particular row must be read before a
      // filtering decision can be made. This means that filters for which hasFilterRow is true
      // run the risk of encountering out-of-memory errors when they are applied to a table that
      // has very large rows.
boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
// If filter#hasFilterRow is true, partial results are not allowed since allowing them
// would prevent the filters from being evaluated. Thus, if it is true, change the
// scope of any limits that could potentially create partial results to
// LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
if (hasFilterRow) {
if (LOG.isTraceEnabled()) {
LOG.trace("filter#hasFilterRow is true which prevents partial results from being " +
" formed. Changing scope of limits that may create partials");
}
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
limitScope = LimitScope.BETWEEN_ROWS;
}
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
if (hasFilterRow) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scans that must " +
" stop mid-row because of a limit. ScannerContext:" + scannerContext);
}
return true;
}
// Check if we were getting data from the joinedHeap and hit the limit.
// If not, then it's main path - getting results from storeHeap.
if (joinedContinuationRow == null) {
// First, check if we are at a stop row. If so, there are no more results.
if (shouldStop) {
if (hasFilterRow) {
filter.filterRowCells(results);
}
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(current)) {
incrementCountOfRowsFilteredMetric(scannerContext);
// early check, see HBASE-16296
if (isFilterDoneInternal()) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
          // Typically the count of rows scanned is incremented inside #populateResult. However,
          // here we are filtering a row based purely on its row key, preventing us from calling
          // #populateResult. Thus, increment the rows scanned metric here instead.
incrementCountOfRowsScannedMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
          // Read nothing as the row key was filtered out, but we still need to check the time
          // limit
if (scannerContext.checkTimeLimit(limitScope)) {
return true;
}
continue;
}
// Ok, we are good, let's try to get some results from the main heap.
populateResult(results, this.storeHeap, scannerContext, current);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
if (hasFilterRow) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scans that must " +
" stop mid-row because of a limit. ScannerContext:" + scannerContext);
}
return true;
}
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
Cell nextKv = this.storeHeap.peek();
shouldStop = shouldStop(nextKv);
        // Save whether the row was empty before filters were applied to it.
final boolean isEmptyRow = results.isEmpty();
// We have the part of the row necessary for filtering (all of it, usually).
// First filter with the filterRow(List).
FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
if (hasFilterRow) {
ret = filter.filterRowCellsWithRet(results);
// We don't know how the results have changed after being filtered. Must set progress
// according to contents of results now.
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress);
} else {
scannerContext.clearProgress();
}
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
cell.heapSize());
}
}
if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear();
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
          // This row was totally filtered out; if this is NOT the last row, we should continue
          // on. Otherwise, there is nothing else to do.
if (!shouldStop) {
            // Read nothing as the cells were filtered out, but we still need to check the time
            // limit
if (scannerContext.checkTimeLimit(limitScope)) {
return true;
}
continue;
}
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
        // Ok, we are done with storeHeap for this row. Now we may need to fetch additional,
        // non-essential data for the row. These values are not needed for the filter to work, so
        // we postpone their fetch to (possibly) reduce the amount of data loaded from disk.
if (this.joinedHeap != null) {
boolean mayHaveData = joinedHeapMayHaveData(current);
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
}
} else {
// Populating from the joined heap was stopped by limits, populate some more.
populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
      // We may have just called populateFromJoinedHeap and hit the limits. If that is the case,
      // we need to call it again on the next next() invocation.
if (joinedContinuationRow != null) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
      // Finally, we are done with both joinedHeap and storeHeap. Double check to prevent empty
      // rows from appearing in the result. This can be the case when, for example,
      // SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
incrementCountOfRowsFilteredMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
if (!shouldStop) {
continue;
}
}
if (shouldStop) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} else {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
}
}
private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
region.filteredReadRequestsCount.increment();
if (region.getMetrics() != null) {
region.getMetrics().updateFilteredRecords();
}
if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
return;
}
scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
}
private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
return;
}
scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
}
/**
* @return true when the joined heap may have data for the current row
*/
private boolean joinedHeapMayHaveData(Cell currentRowCell) throws IOException {
Cell nextJoinedKv = joinedHeap.peek();
boolean matchCurrentRow =
nextJoinedKv != null && CellUtil.matchingRows(nextJoinedKv, currentRowCell);
boolean matchAfterSeek = false;
// If the next value in the joined heap does not match the current row, try to seek to the
// correct row
if (!matchCurrentRow) {
Cell firstOnCurrentRow = PrivateCellUtil.createFirstOnRow(currentRowCell);
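      // requestSeek(kv, forward=true, useBloom=true) performs a lazy seek that may consult bloom
      // filters to skip files that cannot contain the row.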
boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
matchAfterSeek = seekSuccessful && joinedHeap.peek() != null &&
CellUtil.matchingRows(joinedHeap.peek(), currentRowCell);
}
return matchCurrentRow || matchAfterSeek;
}
/**
 * This function maintains backward compatibility for 0.94 filters. HBASE-6429 combined the
 * filterRow() and filterRow({@code List<KeyValue> kvs}) methods. Code written for 0.94 or older
 * may not implement hasFilterRow() as HBASE-6429 expects, because in 0.94 hasFilterRow() returned
 * true only when filterRow({@code List<KeyValue> kvs}) was overridden, not filterRow(). Without
 * this check, filterRow() would be skipped for such filters.
*/
private boolean filterRow() throws IOException {
// when hasFilterRow returns true, filter.filterRow() will be called automatically inside
// filterRowCells(List<Cell> kvs) so we skip that scenario here.
return filter != null && (!filter.hasFilterRow()) && filter.filterRow();
}
private boolean filterRowKey(Cell current) throws IOException {
return filter != null && filter.filterRowKey(current);
}
/**
 * A mock list implementation that discards all updates; used by {@link #nextRow} to advance the
 * store heap past the remaining cells of the current row without retaining them.
*/
private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
@Override
public void add(int index, Cell element) {
// do nothing
}
@Override
public boolean addAll(int index, Collection<? extends Cell> c) {
return false; // this list is never changed as a result of an update
}
@Override
public KeyValue get(int index) {
throw new UnsupportedOperationException();
}
@Override
public int size() {
return 0;
}
};
protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
Cell next;
while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
this.storeHeap.next(MOCKED_LIST);
}
resetFilters();
    // Call the coprocessor hook, which allows a coprocessor to fast-forward the scan
return this.region.getCoprocessorHost() == null ||
this.region.getCoprocessorHost().postScannerFilterRow(this, curRowCell);
}
protected boolean shouldStop(Cell currentRowCell) {
if (currentRowCell == null) {
return true;
}
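    // A null or empty stop row means the scan continues to the end of the table.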
if (stopRow == null || Bytes.equals(stopRow, HConstants.EMPTY_END_ROW)) {
return false;
}
int c = comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length);
return c > 0 || (c == 0 && !includeStopRow);
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
justification = "this method is only called inside close which is synchronized")
private void closeInternal() {
if (storeHeap != null) {
storeHeap.close();
storeHeap = null;
}
if (joinedHeap != null) {
joinedHeap.close();
joinedHeap = null;
}
    // No need to synchronize here: scannerReadPoints is a ConcurrentHashMap.
scannerReadPoints.remove(this);
this.filterClosed = true;
}
@Override
public synchronized void close() {
TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
}
@Override
public synchronized boolean reseek(byte[] row) throws IOException {
return TraceUtil.trace(() -> {
if (row == null) {
throw new IllegalArgumentException("Row cannot be null.");
}
boolean result = false;
region.startRegionOperation();
Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
try {
// use request seek to make use of the lazy seek option. See HBASE-5520
result = this.storeHeap.requestSeek(kv, true, true);
if (this.joinedHeap != null) {
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
} finally {
region.closeRegionOperation();
}
return result;
}, () -> region.createRegionSpan("RegionScanner.reseek"));
}
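  // Called once the RPC response carrying the current batch has been shipped to the client,
  // letting the underlying scanners release resources (e.g. block references) held for cells
  // already sent.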
@Override
public void shipped() throws IOException {
if (storeHeap != null) {
storeHeap.shipped();
}
if (joinedHeap != null) {
joinedHeap.shipped();
}
}
@Override
public void run() throws IOException {
    // This is the RPC callback method that gets executed; we close the scanner in this callback.
this.close();
}
}