| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.NavigableSet; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.locks.ReentrantLock; |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.CellComparator; |
| import org.apache.hadoop.hbase.CellUtil; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.HConstants; |
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
| import org.apache.hadoop.hbase.client.IsolationLevel; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.executor.ExecutorService; |
| import org.apache.hadoop.hbase.filter.Filter; |
| import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; |
| import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; |
| import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; |
| import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; |
| import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; |
| import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; |
| import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; |
| |
| /** |
 * Scanner scans both the memstore and the store files. Coalesces the KeyValue stream into a
 * List&lt;KeyValue&gt; for a single row.
 * <p>
 * The implementation is not thread safe, so callers must ensure that next and close are never
 * called concurrently. The only exception is updateReaders, which is called from the memstore
 * flush thread to indicate that a flush has occurred.
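 * <p>
 * A rough usage sketch for a user scan; the store, scanInfo, scan, columns, read point, and
 * scannerContext objects are assumed to be supplied by the caller:
 * <pre>
 * StoreScanner scanner = new StoreScanner(store, scanInfo, scan, columns, readPt);
 * List&lt;Cell&gt; cells = new ArrayList&lt;&gt;();
 * try {
 *   boolean more;
 *   do {
 *     more = scanner.next(cells, scannerContext);
 *     // hand the batch of cells to the caller, then reuse the list
 *     cells.clear();
 *   } while (more);
 * } finally {
 *   scanner.close();
 * }
 * </pre>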
| */ |
| @InterfaceAudience.Private |
| public class StoreScanner extends NonReversedNonLazyKeyValueScanner |
| implements KeyValueScanner, InternalScanner, ChangedReadersObserver { |
| private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class); |
| // In unit tests, the store could be null |
| protected final HStore store; |
| private final CellComparator comparator; |
| private ScanQueryMatcher matcher; |
| protected KeyValueHeap heap; |
| private boolean cacheBlocks; |
| |
| private long countPerRow = 0; |
| private int storeLimit = -1; |
| private int storeOffset = 0; |
| |
| // Used to indicate that the scanner has closed (see HBASE-1107) |
| private volatile boolean closing = false; |
| private final boolean get; |
| private final boolean explicitColumnQuery; |
| private final boolean useRowColBloom; |
| /** |
| * A flag that enables StoreFileScanner parallel-seeking |
| */ |
| private boolean parallelSeekEnabled = false; |
| private ExecutorService executor; |
| private final Scan scan; |
| private final long oldestUnexpiredTS; |
| private final long now; |
| private final int minVersions; |
| private final long maxRowSize; |
| private final long cellsPerHeartbeatCheck; |
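  // Read counters incremented directly only when there is no HStore instance (i.e. in tests);
  // otherwise the metrics are forwarded to the store, see updateMetricsStore().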
| long memstoreOnlyReads; |
| long mixedReads; |
| |
| // 1) Collects all the KVHeap that are eagerly getting closed during the |
| // course of a scan |
| // 2) Collects the unused memstore scanners. If we close the memstore scanners |
| // before sending data to client, the chunk may be reclaimed by other |
| // updates and the data will be corrupt. |
| private final List<KeyValueScanner> scannersForDelayedClose = new ArrayList<>(); |
| |
| /** |
| * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not |
| * KVs skipped via seeking to next row/column. TODO: estimate them? |
| */ |
| private long kvsScanned = 0; |
| private Cell prevCell = null; |
| |
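  // Threshold and running byte count that drive the pread -> stream switch for DEFAULT read type
  // scans; see trySwitchToStreamRead().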
| private final long preadMaxBytes; |
| private long bytesRead; |
| |
| /** We don't ever expect to change this, the constant is just for clarity. */ |
| static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; |
| public static final String STORESCANNER_PARALLEL_SEEK_ENABLE = |
| "hbase.storescanner.parallel.seek.enable"; |
| |
| /** Used during unit testing to ensure that lazy seek does save seek ops */ |
| private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; |
| |
| /** |
   * The number of cells scanned in between timeout checks. A larger value means that timeout
   * checks occur less frequently; a smaller value leads to more frequent checks.
| */ |
| public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = |
| "hbase.cells.scanned.per.heartbeat.check"; |
| |
| /** |
| * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}. |
| */ |
| public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; |
| |
| /** |
   * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the bytes we have
   * scanned reach this limit, we will reopen the scanner with stream. The default value is 4 times
   * the block size for this store.
| */ |
| public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes"; |
| |
| private final Scan.ReadType readType; |
| |
  // A flag for whether to use pread for the scan; it may change if we use Scan.ReadType.DEFAULT
  // and have already read lots of data.
| private boolean scanUsePread; |
  // Indicates whether there was a flush during the course of the scan
| private volatile boolean flushed = false; |
| // generally we get one file from a flush |
| private final List<KeyValueScanner> flushedstoreFileScanners = new ArrayList<>(1); |
| // Since CompactingMemstore is now default, we get three memstore scanners from a flush |
| private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3); |
| // The current list of scanners |
| final List<KeyValueScanner> currentScanners = new ArrayList<>(); |
| // flush update lock |
| private final ReentrantLock flushLock = new ReentrantLock(); |
| // lock for closing. |
| private final ReentrantLock closeLock = new ReentrantLock(); |
| |
| protected final long readPt; |
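  // Whether the top cell of the heap changed after reopenAfterFlush(); checked in needToReturn()
  // so that we do not mix cells from different rows in one batch.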
| private boolean topChanged = false; |
| |
| /** An internal constructor. */ |
| private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, |
| int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) { |
| this.readPt = readPt; |
| this.store = store; |
| this.cacheBlocks = cacheBlocks; |
| this.comparator = Preconditions.checkNotNull(scanInfo.getComparator()); |
| get = scan.isGetScan(); |
| explicitColumnQuery = numColumns > 0; |
| this.scan = scan; |
| this.now = EnvironmentEdgeManager.currentTime(); |
| this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); |
| this.minVersions = scanInfo.getMinVersions(); |
| |
| // We look up row-column Bloom filters for multi-column queries as part of |
    // the seek operation. However, we also look up the row-column Bloom filter
| // for multi-row (non-"get") scans because this is not done in |
| // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>). |
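    // Note that && binds tighter than ||, so the bloom-type check below only guards the
    // single-column non-get case; any query with more than one column enables row-col Bloom
    // lookups regardless of the configured bloom filter type.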
| this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1) |
| && (store == null || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL); |
| this.maxRowSize = scanInfo.getTableMaxRowSize(); |
| if (get) { |
| this.readType = Scan.ReadType.PREAD; |
| this.scanUsePread = true; |
| } else if (scanType != ScanType.USER_SCAN) { |
      // Compaction scanners never use pread, as we already have stream-based scanners on the
      // store files to be compacted.
| this.readType = Scan.ReadType.STREAM; |
| this.scanUsePread = false; |
| } else { |
| if (scan.getReadType() == Scan.ReadType.DEFAULT) { |
| this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT; |
| } else { |
| this.readType = scan.getReadType(); |
| } |
      // Always start with pread unless the user specifies stream. If the read type is DEFAULT, we
      // will change to stream later, once the scan has been running for a long time.
| this.scanUsePread = this.readType != Scan.ReadType.STREAM; |
| } |
| this.preadMaxBytes = scanInfo.getPreadMaxBytes(); |
| this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); |
    // Parallel seeking is on if the config allows and there is more than one store file.
| if (store != null && store.getStorefilesCount() > 1) { |
| RegionServerServices rsService = store.getHRegion().getRegionServerServices(); |
| if (rsService != null && scanInfo.isParallelSeekEnabled()) { |
| this.parallelSeekEnabled = true; |
| this.executor = rsService.getExecutorService(); |
| } |
| } |
| } |
| |
| private void addCurrentScanners(List<? extends KeyValueScanner> scanners) { |
| this.currentScanners.addAll(scanners); |
| } |
| |
| /** |
| * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we |
| * are not in a compaction. |
| * |
| * @param store who we scan |
| * @param scan the spec |
| * @param columns which columns we are scanning |
| * @throws IOException |
| */ |
| public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, |
| long readPt) throws IOException { |
| this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, |
| scan.getCacheBlocks(), ScanType.USER_SCAN); |
| if (columns != null && scan.isRaw()) { |
| throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); |
| } |
| matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, |
| store.getCoprocessorHost()); |
| |
| store.addChangedReaderObserver(this); |
| |
| List<KeyValueScanner> scanners = null; |
| try { |
| // Pass columns to try to filter out unnecessary StoreFiles. |
| scanners = selectScannersFrom(store, |
| store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), |
| scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt)); |
| |
| // Seek all scanners to the start of the Row (or if the exact matching row |
| // key does not exist, then to the start of the next matching Row). |
| // Always check bloom filter to optimize the top row seek for delete |
| // family marker. |
| seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, |
| parallelSeekEnabled); |
| |
| // set storeLimit |
| this.storeLimit = scan.getMaxResultsPerColumnFamily(); |
| |
| // set rowOffset |
| this.storeOffset = scan.getRowOffsetPerColumnFamily(); |
| addCurrentScanners(scanners); |
| // Combine all seeked scanners with a heap |
| resetKVHeap(scanners, comparator); |
| } catch (IOException e) { |
| clearAndClose(scanners); |
      // Remove us from the HStore#changedReaderObservers here, or we'll have no chance to do so
      // later, which might cause a memory leak.
| store.deleteChangedReaderObserver(this); |
| throw e; |
| } |
| } |
| |
| // a dummy scan instance for compaction. |
| private static final Scan SCAN_FOR_COMPACTION = new Scan(); |
| |
| /** |
| * Used for store file compaction and memstore compaction. |
| * <p> |
| * Opens a scanner across specified StoreFiles/MemStoreSegments. |
| * @param store who we scan |
| * @param scanners ancillary scanners |
| * @param smallestReadPoint the readPoint that we should use for tracking versions |
| */ |
| public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, |
| ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { |
| this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); |
| } |
| |
| /** |
| * Used for compactions that drop deletes from a limited range of rows. |
| * <p> |
| * Opens a scanner across specified StoreFiles. |
| * @param store who we scan |
| * @param scanners ancillary scanners |
| * @param smallestReadPoint the readPoint that we should use for tracking versions |
| * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. |
| * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. |
| */ |
| public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, |
| long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, |
| byte[] dropDeletesToRow) throws IOException { |
| this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, |
| earliestPutTs, dropDeletesFromRow, dropDeletesToRow); |
| } |
| |
| private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners, |
| ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, |
| byte[] dropDeletesToRow) throws IOException { |
| this(store, SCAN_FOR_COMPACTION, scanInfo, 0, |
| store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); |
| assert scanType != ScanType.USER_SCAN; |
| matcher = |
| CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, |
| oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); |
| |
| // Filter the list of scanners using Bloom filters, time range, TTL, etc. |
| scanners = selectScannersFrom(store, scanners); |
| |
| // Seek all scanners to the initial key |
| seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); |
| addCurrentScanners(scanners); |
| // Combine all seeked scanners with a heap |
| resetKVHeap(scanners, comparator); |
| } |
| |
| private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) |
| throws IOException { |
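    // Note: the scanInfo parameter is currently unused; the matcher must already be initialized,
    // since it supplies the start key for the seek below.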
| // Seek all scanners to the initial key |
| seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); |
| addCurrentScanners(scanners); |
| resetKVHeap(scanners, comparator); |
| } |
| |
| // For mob compaction only as we do not have a Store instance when doing mob compaction. |
| public StoreScanner(ScanInfo scanInfo, ScanType scanType, |
| List<? extends KeyValueScanner> scanners) throws IOException { |
| this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); |
| assert scanType != ScanType.USER_SCAN; |
| this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, |
| oldestUnexpiredTS, now, null, null, null); |
| seekAllScanner(scanInfo, scanners); |
| } |
| |
| // Used to instantiate a scanner for user scan in test |
| StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, |
| List<? extends KeyValueScanner> scanners, ScanType scanType) throws IOException { |
| // 0 is passed as readpoint because the test bypasses Store |
| this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), |
| scanType); |
| if (scanType == ScanType.USER_SCAN) { |
| this.matcher = |
| UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); |
| } else { |
| this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, |
| HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); |
| } |
| seekAllScanner(scanInfo, scanners); |
| } |
| |
| // Used to instantiate a scanner for user scan in test |
| StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, |
| List<? extends KeyValueScanner> scanners) throws IOException { |
| // 0 is passed as readpoint because the test bypasses Store |
| this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), |
| ScanType.USER_SCAN); |
| this.matcher = |
| UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); |
| seekAllScanner(scanInfo, scanners); |
| } |
| |
| // Used to instantiate a scanner for compaction in test |
| StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType, |
| List<? extends KeyValueScanner> scanners) throws IOException { |
| // 0 is passed as readpoint because the test bypasses Store |
| this(null, maxVersions > 0 ? new Scan().readVersions(maxVersions) |
| : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType); |
| this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, |
| HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); |
| seekAllScanner(scanInfo, scanners); |
| } |
| |
| boolean isScanUsePread() { |
| return this.scanUsePread; |
| } |
| /** |
| * Seek the specified scanners with the given key |
| * @param scanners |
| * @param seekKey |
| * @param isLazy true if using lazy seek |
| * @param isParallelSeek true if using parallel seek |
| * @throws IOException |
| */ |
| protected void seekScanners(List<? extends KeyValueScanner> scanners, |
| Cell seekKey, boolean isLazy, boolean isParallelSeek) |
| throws IOException { |
| // Seek all scanners to the start of the Row (or if the exact matching row |
| // key does not exist, then to the start of the next matching Row). |
| // Always check bloom filter to optimize the top row seek for delete |
| // family marker. |
| if (isLazy) { |
| for (KeyValueScanner scanner : scanners) { |
| scanner.requestSeek(seekKey, false, true); |
| } |
| } else { |
| if (!isParallelSeek) { |
| long totalScannersSoughtBytes = 0; |
| for (KeyValueScanner scanner : scanners) { |
| if (matcher.isUserScan() && totalScannersSoughtBytes >= maxRowSize) { |
| throw new RowTooBigException("Max row size allowed: " + maxRowSize |
| + ", but row is bigger than that"); |
| } |
| scanner.seek(seekKey); |
| Cell c = scanner.peek(); |
| if (c != null) { |
| totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c); |
| } |
| } |
| } else { |
| parallelSeek(scanners, seekKey); |
| } |
| } |
| } |
| |
| protected void resetKVHeap(List<? extends KeyValueScanner> scanners, |
| CellComparator comparator) throws IOException { |
| // Combine all seeked scanners with a heap |
| heap = newKVHeap(scanners, comparator); |
| } |
| |
| protected KeyValueHeap newKVHeap(List<? extends KeyValueScanner> scanners, |
| CellComparator comparator) throws IOException { |
| return new KeyValueHeap(scanners, comparator); |
| } |
| |
| /** |
| * Filters the given list of scanners using Bloom filter, time range, and TTL. |
| * <p> |
| * Will be overridden by testcase so declared as protected. |
| */ |
| protected List<KeyValueScanner> selectScannersFrom(HStore store, |
| List<? extends KeyValueScanner> allScanners) { |
| boolean memOnly; |
| boolean filesOnly; |
| if (scan instanceof InternalScan) { |
| InternalScan iscan = (InternalScan) scan; |
| memOnly = iscan.isCheckOnlyMemStore(); |
| filesOnly = iscan.isCheckOnlyStoreFiles(); |
| } else { |
| memOnly = false; |
| filesOnly = false; |
| } |
| |
| List<KeyValueScanner> scanners = new ArrayList<>(allScanners.size()); |
| |
| // We can only exclude store files based on TTL if minVersions is set to 0. |
| // Otherwise, we might have to return KVs that have technically expired. |
| long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; |
| |
| // include only those scan files which pass all filters |
| for (KeyValueScanner kvs : allScanners) { |
| boolean isFile = kvs.isFileScanner(); |
| if ((!isFile && filesOnly) || (isFile && memOnly)) { |
| continue; |
| } |
| |
| if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) { |
| scanners.add(kvs); |
| } else { |
| kvs.close(); |
| } |
| } |
| return scanners; |
| } |
| |
| @Override |
| public Cell peek() { |
| return heap != null ? heap.peek() : null; |
| } |
| |
| @Override |
| public KeyValue next() { |
    // Intentionally unsupported; callers must use next(List, ScannerContext) instead.
| throw new RuntimeException("Never call StoreScanner.next()"); |
| } |
| |
| @Override |
| public void close() { |
| close(true); |
| } |
| |
| private void close(boolean withDelayedScannersClose) { |
| closeLock.lock(); |
| // If the closeLock is acquired then any subsequent updateReaders() |
| // call is ignored. |
| try { |
| if (this.closing) { |
| return; |
| } |
| if (withDelayedScannersClose) { |
| this.closing = true; |
| } |
| // For mob compaction, we do not have a store. |
| if (this.store != null) { |
| this.store.deleteChangedReaderObserver(this); |
| } |
| if (withDelayedScannersClose) { |
| clearAndClose(scannersForDelayedClose); |
| clearAndClose(memStoreScannersAfterFlush); |
| clearAndClose(flushedstoreFileScanners); |
| if (this.heap != null) { |
| this.heap.close(); |
| this.currentScanners.clear(); |
| this.heap = null; // CLOSED! |
| } |
| } else { |
| if (this.heap != null) { |
| this.scannersForDelayedClose.add(this.heap); |
| this.currentScanners.clear(); |
| this.heap = null; |
| } |
| } |
| } finally { |
| closeLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean seek(Cell key) throws IOException { |
| if (checkFlushed()) { |
| reopenAfterFlush(); |
| } |
| return this.heap.seek(key); |
| } |
| |
| /** |
| * Get the next row of values from this Store. |
| * @param outResult |
| * @param scannerContext |
| * @return true if there are more rows, false if scanner is done |
| */ |
| @Override |
| public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException { |
| if (scannerContext == null) { |
| throw new IllegalArgumentException("Scanner context cannot be null"); |
| } |
| if (checkFlushed() && reopenAfterFlush()) { |
| return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); |
| } |
| |
    // If the heap was left null, then the scanners had previously run out anyway; close and
    // return.
    if (this.heap == null) {
      // By this time a partial close should have happened, because the heap is already null.
| close(false);// Do all cleanup except heap.close() |
| return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); |
| } |
| |
| Cell cell = this.heap.peek(); |
| if (cell == null) { |
| close(false);// Do all cleanup except heap.close() |
| return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); |
| } |
| |
| // only call setRow if the row changes; avoids confusing the query matcher |
| // if scanning intra-row |
| |
    // If no limit exists in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
    // rows. Otherwise it is possible we are still traversing the same row, so we must perform the
    // row comparison.
| if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) { |
| this.countPerRow = 0; |
| matcher.setToNewRow(cell); |
| } |
| |
| // Clear progress away unless invoker has indicated it should be kept. |
| if (!scannerContext.getKeepProgress()) { |
| scannerContext.clearProgress(); |
| } |
| |
| int count = 0; |
| long totalBytesRead = 0; |
| // track the cells for metrics only if it is a user read request. |
| boolean onlyFromMemstore = matcher.isUserScan(); |
| try { |
| LOOP: do { |
        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck,
        // or if preadMaxBytes has been reached, in which case we may want to return early so we
        // can switch to stream in the shipped method below.
| if (kvsScanned % cellsPerHeartbeatCheck == 0 |
| || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) { |
| if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { |
| return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); |
| } |
| } |
        // Do an object (reference) compare - we set prevCell from the same heap.
| if (prevCell != cell) { |
| ++kvsScanned; |
| } |
| checkScanOrder(prevCell, cell, comparator); |
| int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell); |
| bytesRead += cellSize; |
| if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) { |
          // Return immediately if we want to switch from pread to stream. We need this because we
          // can only switch in the shipped method: if the user uses a filter that filters out
          // everything and the rpc timeout is very large, the shipped method will never be called
          // until the whole scan is finished, and by then we would have already scanned all the
          // data... See HBASE-20457 for more details.
          // There is still one scenario that cannot be handled: if we have a very large row with
          // millions of qualifiers, and filter.filterRow is used, then even if we set the flag
          // here we still need to scan all the qualifiers before returning...
| scannerContext.returnImmediately(); |
| } |
| prevCell = cell; |
| scannerContext.setLastPeekedCell(cell); |
| topChanged = false; |
| ScanQueryMatcher.MatchCode qcode = matcher.match(cell); |
| switch (qcode) { |
| case INCLUDE: |
| case INCLUDE_AND_SEEK_NEXT_ROW: |
| case INCLUDE_AND_SEEK_NEXT_COL: |
| |
| Filter f = matcher.getFilter(); |
| if (f != null) { |
| cell = f.transformCell(cell); |
| } |
| this.countPerRow++; |
| if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { |
| // do what SEEK_NEXT_ROW does. |
| if (!matcher.moreRowsMayExistAfter(cell)) { |
| close(false);// Do all cleanup except heap.close() |
| return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); |
| } |
| matcher.clearCurrentRow(); |
| seekToNextRow(cell); |
| break LOOP; |
| } |
| |
| // add to results only if we have skipped #storeOffset kvs |
| // also update metric accordingly |
| if (this.countPerRow > storeOffset) { |
| outResult.add(cell); |
| |
| // Update local tracking information |
| count++; |
| totalBytesRead += cellSize; |
| |
| /** |
| * Increment the metric if all the cells are from memstore. |
| * If not we will account it for mixed reads |
| */ |
| onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore(); |
| // Update the progress of the scanner context |
| scannerContext.incrementSizeProgress(cellSize, cell.heapSize()); |
| scannerContext.incrementBatchProgress(1); |
| |
| if (matcher.isUserScan() && totalBytesRead > maxRowSize) { |
              String message = "Max row size allowed: " + maxRowSize
                + ", but the row is bigger than that, the row info: "
                + CellUtil.toString(cell, false) + ", already processed row cells = "
                + outResult.size() + ", it belongs to region = "
                + store.getHRegion().getRegionInfo().getRegionNameAsString();
| LOG.warn(message); |
| throw new RowTooBigException(message); |
| } |
| } |
| |
| if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { |
| if (!matcher.moreRowsMayExistAfter(cell)) { |
| close(false);// Do all cleanup except heap.close() |
| return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); |
| } |
| matcher.clearCurrentRow(); |
| seekOrSkipToNextRow(cell); |
| } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { |
| seekOrSkipToNextColumn(cell); |
| } else { |
| this.heap.next(); |
| } |
| |
| if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { |
| break LOOP; |
| } |
| if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { |
| break LOOP; |
| } |
| continue; |
| |
| case DONE: |
| // Optimization for Gets! If DONE, no more to get on this row, early exit! |
| if (get) { |
| // Then no more to this row... exit. |
| close(false);// Do all cleanup except heap.close() |
            // The read metrics are updated in the finally block below.
| return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); |
| } |
| matcher.clearCurrentRow(); |
| return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); |
| |
| case DONE_SCAN: |
| close(false);// Do all cleanup except heap.close() |
| return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); |
| |
| case SEEK_NEXT_ROW: |
          // This is just a relatively simple end-of-scan fix, to short-circuit the scan if there
          // is an endKey in the scan.
| if (!matcher.moreRowsMayExistAfter(cell)) { |
| close(false);// Do all cleanup except heap.close() |
| return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); |
| } |
| matcher.clearCurrentRow(); |
| seekOrSkipToNextRow(cell); |
| NextState stateAfterSeekNextRow = needToReturn(outResult); |
| if (stateAfterSeekNextRow != null) { |
| return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues(); |
| } |
| break; |
| |
| case SEEK_NEXT_COL: |
| seekOrSkipToNextColumn(cell); |
| NextState stateAfterSeekNextColumn = needToReturn(outResult); |
| if (stateAfterSeekNextColumn != null) { |
| return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues(); |
| } |
| break; |
| |
| case SKIP: |
| this.heap.next(); |
| break; |
| |
| case SEEK_NEXT_USING_HINT: |
| Cell nextKV = matcher.getNextKeyHint(cell); |
| if (nextKV != null && comparator.compare(nextKV, cell) > 0) { |
| seekAsDirection(nextKV); |
| NextState stateAfterSeekByHint = needToReturn(outResult); |
| if (stateAfterSeekByHint != null) { |
| return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues(); |
| } |
| } else { |
| heap.next(); |
| } |
| break; |
| |
| default: |
| throw new RuntimeException("UNEXPECTED"); |
| } |
| } while ((cell = this.heap.peek()) != null); |
| |
| if (count > 0) { |
| return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); |
| } |
| |
| // No more keys |
| close(false);// Do all cleanup except heap.close() |
| return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); |
| } finally { |
| // increment only if we have some result |
| if (count > 0 && matcher.isUserScan()) { |
| // if true increment memstore metrics, if not the mixed one |
| updateMetricsStore(onlyFromMemstore); |
| } |
| } |
| } |
| |
| private void updateMetricsStore(boolean memstoreRead) { |
| if (store != null) { |
| store.updateMetricsStore(memstoreRead); |
| } else { |
| // for testing. |
| if (memstoreRead) { |
| memstoreOnlyReads++; |
| } else { |
| mixedReads++; |
| } |
| } |
| } |
| |
| /** |
| * If the top cell won't be flushed into disk, the new top cell may be |
| * changed after #reopenAfterFlush. Because the older top cell only exist |
| * in the memstore scanner but the memstore scanner is replaced by hfile |
| * scanner after #reopenAfterFlush. If the row of top cell is changed, |
| * we should return the current cells. Otherwise, we may return |
| * the cells across different rows. |
| * @param outResult the cells which are visible for user scan |
| * @return null is the top cell doesn't change. Otherwise, the NextState |
| * to return |
| */ |
| private NextState needToReturn(List<Cell> outResult) { |
| if (!outResult.isEmpty() && topChanged) { |
| return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES; |
| } |
| return null; |
| } |
| |
| private void seekOrSkipToNextRow(Cell cell) throws IOException { |
| // If it is a Get Scan, then we know that we are done with this row; there are no more |
| // rows beyond the current one: don't try to optimize. |
| if (!get) { |
| if (trySkipToNextRow(cell)) { |
| return; |
| } |
| } |
| seekToNextRow(cell); |
| } |
| |
| private void seekOrSkipToNextColumn(Cell cell) throws IOException { |
| if (!trySkipToNextColumn(cell)) { |
| seekAsDirection(matcher.getKeyForNextColumn(cell)); |
| } |
| } |
| |
| /** |
| * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109). |
| * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, |
| * or seek to an arbitrary seek key. This method decides whether a seek is the most efficient |
| * _actual_ way to get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, |
| * SKIP inside the current, loaded block). |
| * It does this by looking at the next indexed key of the current HFile. This key |
| * is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible key |
| * on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work with |
| * the current Cell but compare as though it were a seek key; see down in |
| * matcher.compareKeyForNextRow, etc). If the compare gets us onto the |
   * next block we SEEK; otherwise we just SKIP to the next requested cell.
| * |
| * <p>Other notes: |
| * <ul> |
| * <li>Rows can straddle block boundaries</li> |
| * <li>Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a |
| * different block than column C1 at T2)</li> |
| * <li>We want to SKIP if the chance is high that we'll find the desired Cell after a |
| * few SKIPs...</li> |
| * <li>We want to SEEK when the chance is high that we'll be able to seek |
| * past many Cells, especially if we know we need to go to the next block.</li> |
| * </ul> |
| * <p>A good proxy (best effort) to determine whether SKIP is better than SEEK is whether |
| * we'll likely end up seeking to the next block (or past the next block) to get our next column. |
| * Example: |
| * <pre> |
| * | BLOCK 1 | BLOCK 2 | |
| * | r1/c1, r1/c2, r1/c3 | r1/c4, r1/c5, r2/c1 | |
| * ^ ^ |
| * | | |
| * Next Index Key SEEK_NEXT_ROW (before r2/c1) |
| * |
| * |
| * | BLOCK 1 | BLOCK 2 | |
   *   | r1/c1/t5, r1/c1/t4, r1/c1/t3 | r1/c1/t2, r1/c1/t1, r1/c2/t3 |
| * ^ ^ |
| * | | |
| * Next Index Key SEEK_NEXT_COL |
| * </pre> |
   * Now imagine we want columns c1 and c3 (see first diagram above): the 'Next Index Key' of
   * r1/c4 is &gt; r1/c3, so we should seek to get to the c1 on the next row, r2. In the second
   * case, say we only want one version of c1; after we have it, a SEEK_COL will be issued to get
   * to c2. Looking at the 'Next Index Key', it would land us in the next block, so we should
   * SEEK. In other scenarios, where the SEEK will not land us in the next block, it is very
   * likely better to issue a series of SKIPs.
| * @param cell current cell |
| * @return true means skip to next row, false means not |
| */ |
| protected boolean trySkipToNextRow(Cell cell) throws IOException { |
| Cell nextCell = null; |
    // Used to guard against a changed next indexed key by doing an identity comparison;
    // when the identity changes we need to compare the bytes again.
| Cell previousIndexedKey = null; |
| do { |
| Cell nextIndexedKey = getNextIndexedKey(); |
| if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY && |
| (nextIndexedKey == previousIndexedKey || |
| matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0)) { |
| this.heap.next(); |
| ++kvsScanned; |
| previousIndexedKey = nextIndexedKey; |
| } else { |
| return false; |
| } |
| } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRows(cell, nextCell)); |
| return true; |
| } |
| |
| /** |
| * See {@link org.apache.hadoop.hbase.regionserver.StoreScanner#trySkipToNextRow(Cell)} |
| * @param cell current cell |
| * @return true means skip to next column, false means not |
| */ |
| protected boolean trySkipToNextColumn(Cell cell) throws IOException { |
| Cell nextCell = null; |
    // Used to guard against a changed next indexed key by doing an identity comparison;
    // when the identity changes we need to compare the bytes again.
| Cell previousIndexedKey = null; |
| do { |
| Cell nextIndexedKey = getNextIndexedKey(); |
| if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY && |
| (nextIndexedKey == previousIndexedKey || |
| matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0)) { |
| this.heap.next(); |
| ++kvsScanned; |
| previousIndexedKey = nextIndexedKey; |
| } else { |
| return false; |
| } |
| } while ((nextCell = this.heap.peek()) != null && CellUtil.matchingRowColumn(cell, nextCell)); |
    // We need this check because it may happen that the new scanner we get during heap.next()
    // requires a reseek due to a fake KV previously generated for the ROWCOL bloom filter
    // optimization. See HBASE-19863 for more details.
| if (useRowColBloom && nextCell != null && matcher.compareKeyForNextColumn(nextCell, cell) < 0) { |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public long getReadPoint() { |
| return this.readPt; |
| } |
| |
| private static void clearAndClose(List<KeyValueScanner> scanners) { |
| if (scanners == null) { |
| return; |
| } |
| for (KeyValueScanner s : scanners) { |
| s.close(); |
| } |
| scanners.clear(); |
| } |
| |
| // Implementation of ChangedReadersObserver |
| @Override |
| public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners) |
| throws IOException { |
| if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) { |
| return; |
| } |
| boolean updateReaders = false; |
| flushLock.lock(); |
| try { |
| if (!closeLock.tryLock()) { |
        // The reason for doing this is that when the current store scanner does not retrieve any
        // new cells, the scanner is considered to be done. The heap of this scanner is not closed
        // until the shipped() call is completed. Hence, if the partial close (close(false)) has
        // already been called before updateReaders(), there is no need for the updateReaders() to
        // happen.
| LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders"); |
| // no lock acquired. |
| clearAndClose(memStoreScanners); |
| return; |
| } |
| // lock acquired |
| updateReaders = true; |
| if (this.closing) { |
| LOG.debug("StoreScanner already closing. There is no need to updateReaders"); |
| clearAndClose(memStoreScanners); |
| return; |
| } |
| flushed = true; |
| final boolean isCompaction = false; |
| boolean usePread = get || scanUsePread; |
      // See HBASE-19468, where the flushed files were getting compacted even before a scanner
      // called next(). So it's better to create the scanners here rather than in the next() call.
      // Ensure these scanners are properly closed() whether or not the scan completes
      // successfully. Eagerly creating scanners also gets the ref counting ticking on the newly
      // created store files. In the case of stream scanners this eager creation does not incur a
      // performance penalty, because for scans (which use stream scanners) the next() call is
      // bound to happen.
| List<KeyValueScanner> scanners = store.getScanners(sfs, cacheBlocks, get, usePread, |
| isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false); |
| flushedstoreFileScanners.addAll(scanners); |
| if (!CollectionUtils.isEmpty(memStoreScanners)) { |
| clearAndClose(memStoreScannersAfterFlush); |
| memStoreScannersAfterFlush.addAll(memStoreScanners); |
| } |
| } finally { |
| flushLock.unlock(); |
| if (updateReaders) { |
| closeLock.unlock(); |
| } |
| } |
| // Let the next() call handle re-creating and seeking |
| } |
| |
| /** |
| * @return if top of heap has changed (and KeyValueHeap has to try the next KV) |
| */ |
| protected final boolean reopenAfterFlush() throws IOException { |
| // here we can make sure that we have a Store instance so no null check on store. |
| Cell lastTop = heap.peek(); |
| // When we have the scan object, should we not pass it to getScanners() to get a limited set of |
| // scanners? We did so in the constructor and we could have done it now by storing the scan |
| // object from the constructor |
| List<KeyValueScanner> scanners; |
| flushLock.lock(); |
| try { |
| List<KeyValueScanner> allScanners = |
| new ArrayList<>(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size()); |
| allScanners.addAll(flushedstoreFileScanners); |
| allScanners.addAll(memStoreScannersAfterFlush); |
| scanners = selectScannersFrom(store, allScanners); |
      // Clear the current set of flushed store file scanners so that they don't get added again
| flushedstoreFileScanners.clear(); |
| memStoreScannersAfterFlush.clear(); |
| } finally { |
| flushLock.unlock(); |
| } |
| |
| // Seek the new scanners to the last key |
| seekScanners(scanners, lastTop, false, parallelSeekEnabled); |
    // Remove the older memstore scanners; they sit at the end of currentScanners, so iterate from
    // the back and stop at the first file scanner.
    for (int i = currentScanners.size() - 1; i >= 0; i--) {
      if (!currentScanners.get(i).isFileScanner()) {
        scannersForDelayedClose.add(currentScanners.remove(i));
      } else {
        break;
| } |
| } |
| // add the newly created scanners on the flushed files and the current active memstore scanner |
| addCurrentScanners(scanners); |
| // Combine all seeked scanners with a heap |
| resetKVHeap(this.currentScanners, store.getComparator()); |
| resetQueryMatcher(lastTop); |
| if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) { |
      LOG.info("StoreScanner.peek() is changed where before = " + lastTop.toString()
        + ", and after = " + heap.peek());
| topChanged = true; |
| } else { |
| topChanged = false; |
| } |
| return topChanged; |
| } |
| |
| private void resetQueryMatcher(Cell lastTopKey) { |
| // Reset the state of the Query Matcher and set to top row. |
| // Only reset and call setRow if the row changes; avoids confusing the |
| // query matcher if scanning intra-row. |
| Cell cell = heap.peek(); |
| if (cell == null) { |
| cell = lastTopKey; |
| } |
| if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) { |
| this.countPerRow = 0; |
| // The setToNewRow will call reset internally |
| matcher.setToNewRow(cell); |
| } |
| } |
| |
| /** |
| * Check whether scan as expected order |
| * @param prevKV |
| * @param kv |
| * @param comparator |
| * @throws IOException |
| */ |
| protected void checkScanOrder(Cell prevKV, Cell kv, |
| CellComparator comparator) throws IOException { |
| // Check that the heap gives us KVs in an increasing order. |
| assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : "Key " |
| + prevKV + " followed by a smaller key " + kv + " in cf " + store; |
| } |
| |
| protected boolean seekToNextRow(Cell c) throws IOException { |
| return reseek(PrivateCellUtil.createLastOnRow(c)); |
| } |
| |
| /** |
| * Do a reseek in a normal StoreScanner(scan forward) |
| * @param kv |
| * @return true if scanner has values left, false if end of scanner |
| * @throws IOException |
| */ |
| protected boolean seekAsDirection(Cell kv) |
| throws IOException { |
| return reseek(kv); |
| } |
| |
| @Override |
| public boolean reseek(Cell kv) throws IOException { |
| if (checkFlushed()) { |
| reopenAfterFlush(); |
| } |
| if (explicitColumnQuery && lazySeekEnabledGlobally) { |
| return heap.requestSeek(kv, true, useRowColBloom); |
| } |
| return heap.reseek(kv); |
| } |
| |
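  /**
   * If a scan of DEFAULT read type is still using pread and has already read at least
   * preadMaxBytes, reopen the store file scanners in stream mode. Invoked from shipped(), after
   * the blocks tracked by the old scanners have been released; see HBASE-18055.
   */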
| void trySwitchToStreamRead() { |
| if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || |
| heap.peek() == null || bytesRead < preadMaxBytes) { |
| return; |
| } |
| LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead, |
| this.store.getColumnFamilyName()); |
| scanUsePread = false; |
| Cell lastTop = heap.peek(); |
| List<KeyValueScanner> memstoreScanners = new ArrayList<>(); |
| List<KeyValueScanner> scannersToClose = new ArrayList<>(); |
| for (KeyValueScanner kvs : currentScanners) { |
| if (!kvs.isFileScanner()) { |
        // collect memstore scanners here
| memstoreScanners.add(kvs); |
| } else { |
| scannersToClose.add(kvs); |
| } |
| } |
| List<KeyValueScanner> fileScanners = null; |
| List<KeyValueScanner> newCurrentScanners; |
| KeyValueHeap newHeap; |
| try { |
| // We must have a store instance here so no null check |
| // recreate the scanners on the current file scanners |
| fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, |
| matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), |
| scan.includeStopRow(), readPt, false); |
| if (fileScanners == null) { |
| return; |
| } |
| seekScanners(fileScanners, lastTop, false, parallelSeekEnabled); |
| newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size()); |
| newCurrentScanners.addAll(fileScanners); |
| newCurrentScanners.addAll(memstoreScanners); |
| newHeap = newKVHeap(newCurrentScanners, comparator); |
| } catch (Exception e) { |
| LOG.warn("failed to switch to stream read", e); |
| if (fileScanners != null) { |
| fileScanners.forEach(KeyValueScanner::close); |
| } |
| return; |
| } |
| currentScanners.clear(); |
| addCurrentScanners(newCurrentScanners); |
| this.heap = newHeap; |
| resetQueryMatcher(lastTop); |
| scannersToClose.forEach(KeyValueScanner::close); |
| } |
| |
| protected final boolean checkFlushed() { |
    // Check the flag without any lock. Even if we see a stale value here, it is still OK to
    // continue, because we will not reset the heap but will continue with the referenced
    // memstore's snapshot. For compactions we don't need updateReaders to happen at all, as we
    // still continue with the older files.
| if (flushed) { |
      // If there is a flush and the current scan is notified of the flush, ensure that the scan's
      // heap gets reset and that we do a seek on the newly flushed file.
| if (this.closing) { |
| return false; |
| } |
| // reset the flag |
| flushed = false; |
| return true; |
| } |
| return false; |
| } |
| |
| |
| /** |
| * Seek storefiles in parallel to optimize IO latency as much as possible |
| * @param scanners the list {@link KeyValueScanner}s to be read from |
| * @param kv the KeyValue on which the operation is being requested |
| * @throws IOException |
| */ |
| private void parallelSeek(final List<? extends KeyValueScanner> |
| scanners, final Cell kv) throws IOException { |
    if (scanners.isEmpty()) {
      return;
    }
| int storeFileScannerCount = scanners.size(); |
| CountDownLatch latch = new CountDownLatch(storeFileScannerCount); |
| List<ParallelSeekHandler> handlers = new ArrayList<>(storeFileScannerCount); |
| for (KeyValueScanner scanner : scanners) { |
| if (scanner instanceof StoreFileScanner) { |
| ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, |
| this.readPt, latch); |
| executor.submit(seekHandler); |
| handlers.add(seekHandler); |
| } else { |
| scanner.seek(kv); |
| latch.countDown(); |
| } |
| } |
| |
| try { |
| latch.await(); |
| } catch (InterruptedException ie) { |
| throw (InterruptedIOException)new InterruptedIOException().initCause(ie); |
| } |
| |
| for (ParallelSeekHandler handler : handlers) { |
| if (handler.getErr() != null) { |
| throw new IOException(handler.getErr()); |
| } |
| } |
| } |
| |
| /** |
| * Used in testing. |
| * @return all scanners in no particular order |
| */ |
| List<KeyValueScanner> getAllScannersForTesting() { |
| List<KeyValueScanner> allScanners = new ArrayList<>(); |
| KeyValueScanner current = heap.getCurrentForTesting(); |
    if (current != null) {
      allScanners.add(current);
    }
    for (KeyValueScanner scanner : heap.getHeap()) {
      allScanners.add(scanner);
    }
| return allScanners; |
| } |
| |
| static void enableLazySeekGlobally(boolean enable) { |
| lazySeekEnabledGlobally = enable; |
| } |
| |
| /** |
| * @return The estimated number of KVs seen by this scanner (includes some skipped KVs). |
| */ |
| public long getEstimatedNumberOfKvsScanned() { |
| return this.kvsScanned; |
| } |
| |
| @Override |
| public Cell getNextIndexedKey() { |
| return this.heap.getNextIndexedKey(); |
| } |
| |
| @Override |
| public void shipped() throws IOException { |
| if (prevCell != null) { |
| // Do the copy here so that in case the prevCell ref is pointing to the previous |
| // blocks we can safely release those blocks. |
      // This applies to blocks obtained from the bucket cache, the L1 cache, and blocks fetched
      // from HDFS. Copying ensures that we let go of the references to these blocks so that they
      // can be GCed safely (in the case of the bucket cache).
| prevCell = KeyValueUtil.toNewKeyCell(this.prevCell); |
| } |
| matcher.beforeShipped(); |
    // There won't be further fetches of Cells from these scanners. Just close.
| clearAndClose(scannersForDelayedClose); |
| if (this.heap != null) { |
| this.heap.shipped(); |
      // When switching from pread to stream, we will open a new scanner for each store file, but
      // the old scanners may still track HFileBlocks that we have scanned but not yet sent back
      // to the client. If we closed those scanners immediately, the HFileBlocks could be reused
      // by others before we serialize and send the data back to the client. The HFileBlocks are
      // released in the shipped method, so here we also open the new scanners and close the old
      // scanners in the shipped method. See HBASE-18055 for more details.
| trySwitchToStreamRead(); |
| } |
| } |
| } |