blob: 9ad0f8ca09beffbeb8a00588066157ec236f74ba [file] [log] [blame]
<!DOCTYPE HTML>
<html lang="en">
<head>
<!-- Generated by javadoc (17) -->
<title>Source code</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="description" content="source: package: org.apache.hadoop.hbase.regionserver, class: StoreScanner">
<meta name="generator" content="javadoc/SourceToHTMLConverter">
<link rel="stylesheet" type="text/css" href="../../../../../../stylesheet.css" title="Style">
</head>
<body class="source-page">
<main role="main">
<div class="source-container">
<pre><span class="source-line-no">001</span><span id="line-1">/*</span>
<span class="source-line-no">002</span><span id="line-2"> * Licensed to the Apache Software Foundation (ASF) under one</span>
<span class="source-line-no">003</span><span id="line-3"> * or more contributor license agreements. See the NOTICE file</span>
<span class="source-line-no">004</span><span id="line-4"> * distributed with this work for additional information</span>
<span class="source-line-no">005</span><span id="line-5"> * regarding copyright ownership. The ASF licenses this file</span>
<span class="source-line-no">006</span><span id="line-6"> * to you under the Apache License, Version 2.0 (the</span>
<span class="source-line-no">007</span><span id="line-7"> * "License"); you may not use this file except in compliance</span>
<span class="source-line-no">008</span><span id="line-8"> * with the License. You may obtain a copy of the License at</span>
<span class="source-line-no">009</span><span id="line-9"> *</span>
<span class="source-line-no">010</span><span id="line-10"> * http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="source-line-no">011</span><span id="line-11"> *</span>
<span class="source-line-no">012</span><span id="line-12"> * Unless required by applicable law or agreed to in writing, software</span>
<span class="source-line-no">013</span><span id="line-13"> * distributed under the License is distributed on an "AS IS" BASIS,</span>
<span class="source-line-no">014</span><span id="line-14"> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="source-line-no">015</span><span id="line-15"> * See the License for the specific language governing permissions and</span>
<span class="source-line-no">016</span><span id="line-16"> * limitations under the License.</span>
<span class="source-line-no">017</span><span id="line-17"> */</span>
<span class="source-line-no">018</span><span id="line-18">package org.apache.hadoop.hbase.regionserver;</span>
<span class="source-line-no">019</span><span id="line-19"></span>
<span class="source-line-no">020</span><span id="line-20">import java.io.IOException;</span>
<span class="source-line-no">021</span><span id="line-21">import java.io.InterruptedIOException;</span>
<span class="source-line-no">022</span><span id="line-22">import java.util.ArrayList;</span>
<span class="source-line-no">023</span><span id="line-23">import java.util.List;</span>
<span class="source-line-no">024</span><span id="line-24">import java.util.NavigableSet;</span>
<span class="source-line-no">025</span><span id="line-25">import java.util.Optional;</span>
<span class="source-line-no">026</span><span id="line-26">import java.util.concurrent.CountDownLatch;</span>
<span class="source-line-no">027</span><span id="line-27">import java.util.concurrent.locks.ReentrantLock;</span>
<span class="source-line-no">028</span><span id="line-28">import org.apache.hadoop.hbase.Cell;</span>
<span class="source-line-no">029</span><span id="line-29">import org.apache.hadoop.hbase.CellComparator;</span>
<span class="source-line-no">030</span><span id="line-30">import org.apache.hadoop.hbase.CellUtil;</span>
<span class="source-line-no">031</span><span id="line-31">import org.apache.hadoop.hbase.DoNotRetryIOException;</span>
<span class="source-line-no">032</span><span id="line-32">import org.apache.hadoop.hbase.ExtendedCell;</span>
<span class="source-line-no">033</span><span id="line-33">import org.apache.hadoop.hbase.HConstants;</span>
<span class="source-line-no">034</span><span id="line-34">import org.apache.hadoop.hbase.KeyValue;</span>
<span class="source-line-no">035</span><span id="line-35">import org.apache.hadoop.hbase.KeyValueUtil;</span>
<span class="source-line-no">036</span><span id="line-36">import org.apache.hadoop.hbase.PrivateCellUtil;</span>
<span class="source-line-no">037</span><span id="line-37">import org.apache.hadoop.hbase.PrivateConstants;</span>
<span class="source-line-no">038</span><span id="line-38">import org.apache.hadoop.hbase.client.IsolationLevel;</span>
<span class="source-line-no">039</span><span id="line-39">import org.apache.hadoop.hbase.client.Scan;</span>
<span class="source-line-no">040</span><span id="line-40">import org.apache.hadoop.hbase.executor.ExecutorService;</span>
<span class="source-line-no">041</span><span id="line-41">import org.apache.hadoop.hbase.filter.Filter;</span>
<span class="source-line-no">042</span><span id="line-42">import org.apache.hadoop.hbase.ipc.RpcCall;</span>
<span class="source-line-no">043</span><span id="line-43">import org.apache.hadoop.hbase.ipc.RpcServer;</span>
<span class="source-line-no">044</span><span id="line-44">import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;</span>
<span class="source-line-no">045</span><span id="line-45">import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;</span>
<span class="source-line-no">046</span><span id="line-46">import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;</span>
<span class="source-line-no">047</span><span id="line-47">import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;</span>
<span class="source-line-no">048</span><span id="line-48">import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;</span>
<span class="source-line-no">049</span><span id="line-49">import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;</span>
<span class="source-line-no">050</span><span id="line-50">import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;</span>
<span class="source-line-no">051</span><span id="line-51">import org.apache.yetus.audience.InterfaceAudience;</span>
<span class="source-line-no">052</span><span id="line-52">import org.slf4j.Logger;</span>
<span class="source-line-no">053</span><span id="line-53">import org.slf4j.LoggerFactory;</span>
<span class="source-line-no">054</span><span id="line-54"></span>
<span class="source-line-no">055</span><span id="line-55">import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;</span>
<span class="source-line-no">056</span><span id="line-56">import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;</span>
<span class="source-line-no">057</span><span id="line-57"></span>
<span class="source-line-no">058</span><span id="line-58">/**</span>
<span class="source-line-no">059</span><span id="line-59"> * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List&amp;lt;KeyValue&amp;gt;</span>
<span class="source-line-no">060</span><span id="line-60"> * for a single row.</span>
<span class="source-line-no">061</span><span id="line-61"> * &lt;p&gt;</span>
<span class="source-line-no">062</span><span id="line-62"> * The implementation is not thread safe. So there will be no race between next and close. The only</span>
<span class="source-line-no">063</span><span id="line-63"> * exception is updateReaders, it will be called in the memstore flush thread to indicate that there</span>
<span class="source-line-no">064</span><span id="line-64"> * is a flush.</span>
<span class="source-line-no">065</span><span id="line-65"> */</span>
<span class="source-line-no">066</span><span id="line-66">@InterfaceAudience.Private</span>
<span class="source-line-no">067</span><span id="line-67">public class StoreScanner extends NonReversedNonLazyKeyValueScanner</span>
<span class="source-line-no">068</span><span id="line-68"> implements KeyValueScanner, InternalScanner, ChangedReadersObserver {</span>
<span class="source-line-no">069</span><span id="line-69"> private static final Logger LOG = LoggerFactory.getLogger(StoreScanner.class);</span>
<span class="source-line-no">070</span><span id="line-70"> // In unit tests, the store could be null</span>
<span class="source-line-no">071</span><span id="line-71"> protected final HStore store;</span>
<span class="source-line-no">072</span><span id="line-72"> private final CellComparator comparator;</span>
<span class="source-line-no">073</span><span id="line-73"> private ScanQueryMatcher matcher;</span>
<span class="source-line-no">074</span><span id="line-74"> protected KeyValueHeap heap;</span>
<span class="source-line-no">075</span><span id="line-75"> private boolean cacheBlocks;</span>
<span class="source-line-no">076</span><span id="line-76"></span>
<span class="source-line-no">077</span><span id="line-77"> private long countPerRow = 0;</span>
<span class="source-line-no">078</span><span id="line-78"> private int storeLimit = -1;</span>
<span class="source-line-no">079</span><span id="line-79"> private int storeOffset = 0;</span>
<span class="source-line-no">080</span><span id="line-80"></span>
<span class="source-line-no">081</span><span id="line-81"> // Used to indicate that the scanner has closed (see HBASE-1107)</span>
<span class="source-line-no">082</span><span id="line-82"> private volatile boolean closing = false;</span>
<span class="source-line-no">083</span><span id="line-83"> private final boolean get;</span>
<span class="source-line-no">084</span><span id="line-84"> private final boolean explicitColumnQuery;</span>
<span class="source-line-no">085</span><span id="line-85"> private final boolean useRowColBloom;</span>
<span class="source-line-no">086</span><span id="line-86"> /**</span>
<span class="source-line-no">087</span><span id="line-87"> * A flag that enables StoreFileScanner parallel-seeking</span>
<span class="source-line-no">088</span><span id="line-88"> */</span>
<span class="source-line-no">089</span><span id="line-89"> private boolean parallelSeekEnabled = false;</span>
<span class="source-line-no">090</span><span id="line-90"> private ExecutorService executor;</span>
<span class="source-line-no">091</span><span id="line-91"> private final Scan scan;</span>
<span class="source-line-no">092</span><span id="line-92"> private final long oldestUnexpiredTS;</span>
<span class="source-line-no">093</span><span id="line-93"> private final long now;</span>
<span class="source-line-no">094</span><span id="line-94"> private final int minVersions;</span>
<span class="source-line-no">095</span><span id="line-95"> private final long maxRowSize;</span>
<span class="source-line-no">096</span><span id="line-96"> private final long cellsPerHeartbeatCheck;</span>
<span class="source-line-no">097</span><span id="line-97"> long memstoreOnlyReads;</span>
<span class="source-line-no">098</span><span id="line-98"> long mixedReads;</span>
<span class="source-line-no">099</span><span id="line-99"></span>
<span class="source-line-no">100</span><span id="line-100"> // 1) Collects all the KVHeap that are eagerly getting closed during the</span>
<span class="source-line-no">101</span><span id="line-101"> // course of a scan</span>
<span class="source-line-no">102</span><span id="line-102"> // 2) Collects the unused memstore scanners. If we close the memstore scanners</span>
<span class="source-line-no">103</span><span id="line-103"> // before sending data to client, the chunk may be reclaimed by other</span>
<span class="source-line-no">104</span><span id="line-104"> // updates and the data will be corrupt.</span>
<span class="source-line-no">105</span><span id="line-105"> private final List&lt;KeyValueScanner&gt; scannersForDelayedClose = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">106</span><span id="line-106"></span>
<span class="source-line-no">107</span><span id="line-107"> /**</span>
<span class="source-line-no">108</span><span id="line-108"> * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not KVs skipped via</span>
<span class="source-line-no">109</span><span id="line-109"> * seeking to next row/column. TODO: estimate them?</span>
<span class="source-line-no">110</span><span id="line-110"> */</span>
<span class="source-line-no">111</span><span id="line-111"> private long kvsScanned = 0;</span>
<span class="source-line-no">112</span><span id="line-112"> private ExtendedCell prevCell = null;</span>
<span class="source-line-no">113</span><span id="line-113"></span>
<span class="source-line-no">114</span><span id="line-114"> private final long preadMaxBytes;</span>
<span class="source-line-no">115</span><span id="line-115"> private long bytesRead;</span>
<span class="source-line-no">116</span><span id="line-116"></span>
<span class="source-line-no">117</span><span id="line-117"> /** We don't ever expect to change this, the constant is just for clarity. */</span>
<span class="source-line-no">118</span><span id="line-118"> static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;</span>
<span class="source-line-no">119</span><span id="line-119"> public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =</span>
<span class="source-line-no">120</span><span id="line-120"> "hbase.storescanner.parallel.seek.enable";</span>
<span class="source-line-no">121</span><span id="line-121"></span>
<span class="source-line-no">122</span><span id="line-122"> /** Used during unit testing to ensure that lazy seek does save seek ops */</span>
<span class="source-line-no">123</span><span id="line-123"> private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;</span>
<span class="source-line-no">124</span><span id="line-124"></span>
<span class="source-line-no">125</span><span id="line-125"> /**</span>
<span class="source-line-no">126</span><span id="line-126"> * The number of cells scanned in between timeout checks. Specifying a larger value means that</span>
<span class="source-line-no">127</span><span id="line-127"> * timeout checks will occur less frequently. Specifying a small value will lead to more frequent</span>
<span class="source-line-no">128</span><span id="line-128"> * timeout checks.</span>
<span class="source-line-no">129</span><span id="line-129"> */</span>
<span class="source-line-no">130</span><span id="line-130"> public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =</span>
<span class="source-line-no">131</span><span id="line-131"> "hbase.cells.scanned.per.heartbeat.check";</span>
<span class="source-line-no">132</span><span id="line-132"></span>
<span class="source-line-no">133</span><span id="line-133"> /**</span>
<span class="source-line-no">134</span><span id="line-134"> * Default value of {@link #HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK}.</span>
<span class="source-line-no">135</span><span id="line-135"> */</span>
<span class="source-line-no">136</span><span id="line-136"> public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;</span>
<span class="source-line-no">137</span><span id="line-137"></span>
<span class="source-line-no">138</span><span id="line-138"> /**</span>
<span class="source-line-no">139</span><span id="line-139"> * If the read type is Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned</span>
<span class="source-line-no">140</span><span id="line-140"> * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of</span>
<span class="source-line-no">141</span><span id="line-141"> * block size for this store. If configured with a value &lt;0, for all scans with ReadType DEFAULT,</span>
<span class="source-line-no">142</span><span id="line-142"> * we will open scanner with stream mode itself.</span>
<span class="source-line-no">143</span><span id="line-143"> */</span>
<span class="source-line-no">144</span><span id="line-144"> public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";</span>
<span class="source-line-no">145</span><span id="line-145"></span>
<span class="source-line-no">146</span><span id="line-146"> private final Scan.ReadType readType;</span>
<span class="source-line-no">147</span><span id="line-147"></span>
<span class="source-line-no">148</span><span id="line-148"> // A flag whether use pread for scan</span>
<span class="source-line-no">149</span><span id="line-149"> // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data.</span>
<span class="source-line-no">150</span><span id="line-150"> private boolean scanUsePread;</span>
<span class="source-line-no">151</span><span id="line-151"> // Indicates whether there was flush during the course of the scan</span>
<span class="source-line-no">152</span><span id="line-152"> private volatile boolean flushed = false;</span>
<span class="source-line-no">153</span><span id="line-153"> // generally we get one file from a flush</span>
<span class="source-line-no">154</span><span id="line-154"> private final List&lt;KeyValueScanner&gt; flushedstoreFileScanners = new ArrayList&lt;&gt;(1);</span>
<span class="source-line-no">155</span><span id="line-155"> // Since CompactingMemstore is now default, we get three memstore scanners from a flush</span>
<span class="source-line-no">156</span><span id="line-156"> private final List&lt;KeyValueScanner&gt; memStoreScannersAfterFlush = new ArrayList&lt;&gt;(3);</span>
<span class="source-line-no">157</span><span id="line-157"> // The current list of scanners</span>
<span class="source-line-no">158</span><span id="line-158"> final List&lt;KeyValueScanner&gt; currentScanners = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">159</span><span id="line-159"> // flush update lock</span>
<span class="source-line-no">160</span><span id="line-160"> private final ReentrantLock flushLock = new ReentrantLock();</span>
<span class="source-line-no">161</span><span id="line-161"> // lock for closing.</span>
<span class="source-line-no">162</span><span id="line-162"> private final ReentrantLock closeLock = new ReentrantLock();</span>
<span class="source-line-no">163</span><span id="line-163"></span>
<span class="source-line-no">164</span><span id="line-164"> protected final long readPt;</span>
<span class="source-line-no">165</span><span id="line-165"> private boolean topChanged = false;</span>
<span class="source-line-no">166</span><span id="line-166"></span>
<span class="source-line-no">167</span><span id="line-167"> /** An internal constructor. */</span>
<span class="source-line-no">168</span><span id="line-168"> private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,</span>
<span class="source-line-no">169</span><span id="line-169"> boolean cacheBlocks, ScanType scanType) {</span>
<span class="source-line-no">170</span><span id="line-170"> this.readPt = readPt;</span>
<span class="source-line-no">171</span><span id="line-171"> this.store = store;</span>
<span class="source-line-no">172</span><span id="line-172"> this.cacheBlocks = cacheBlocks;</span>
<span class="source-line-no">173</span><span id="line-173"> this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());</span>
<span class="source-line-no">174</span><span id="line-174"> get = scan.isGetScan();</span>
<span class="source-line-no">175</span><span id="line-175"> explicitColumnQuery = numColumns &gt; 0;</span>
<span class="source-line-no">176</span><span id="line-176"> this.scan = scan;</span>
<span class="source-line-no">177</span><span id="line-177"> this.now = EnvironmentEdgeManager.currentTime();</span>
<span class="source-line-no">178</span><span id="line-178"> this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();</span>
<span class="source-line-no">179</span><span id="line-179"> this.minVersions = scanInfo.getMinVersions();</span>
<span class="source-line-no">180</span><span id="line-180"></span>
<span class="source-line-no">181</span><span id="line-181"> // We look up row-column Bloom filters for multi-column queries as part of</span>
<span class="source-line-no">182</span><span id="line-182"> // the seek operation. However, we also look the row-column Bloom filter</span>
<span class="source-line-no">183</span><span id="line-183"> // for multi-row (non-"get") scans because this is not done in</span>
<span class="source-line-no">184</span><span id="line-184"> // StoreFile.passesBloomFilter(Scan, SortedSet&lt;byte[]&gt;).</span>
<span class="source-line-no">185</span><span id="line-185"> this.useRowColBloom = numColumns &gt; 1 || (!get &amp;&amp; numColumns == 1) &amp;&amp; (store == null</span>
<span class="source-line-no">186</span><span id="line-186"> || store.getColumnFamilyDescriptor().getBloomFilterType() == BloomType.ROWCOL);</span>
<span class="source-line-no">187</span><span id="line-187"> this.maxRowSize = scanInfo.getTableMaxRowSize();</span>
<span class="source-line-no">188</span><span id="line-188"> this.preadMaxBytes = scanInfo.getPreadMaxBytes();</span>
<span class="source-line-no">189</span><span id="line-189"> if (get) {</span>
<span class="source-line-no">190</span><span id="line-190"> this.readType = Scan.ReadType.PREAD;</span>
<span class="source-line-no">191</span><span id="line-191"> this.scanUsePread = true;</span>
<span class="source-line-no">192</span><span id="line-192"> } else if (scanType != ScanType.USER_SCAN) {</span>
<span class="source-line-no">193</span><span id="line-193"> // For compaction scanners never use Pread as already we have stream based scanners on the</span>
<span class="source-line-no">194</span><span id="line-194"> // store files to be compacted</span>
<span class="source-line-no">195</span><span id="line-195"> this.readType = Scan.ReadType.STREAM;</span>
<span class="source-line-no">196</span><span id="line-196"> this.scanUsePread = false;</span>
<span class="source-line-no">197</span><span id="line-197"> } else {</span>
<span class="source-line-no">198</span><span id="line-198"> if (scan.getReadType() == Scan.ReadType.DEFAULT) {</span>
<span class="source-line-no">199</span><span id="line-199"> if (scanInfo.isUsePread()) {</span>
<span class="source-line-no">200</span><span id="line-200"> this.readType = Scan.ReadType.PREAD;</span>
<span class="source-line-no">201</span><span id="line-201"> } else if (this.preadMaxBytes &lt; 0) {</span>
<span class="source-line-no">202</span><span id="line-202"> this.readType = Scan.ReadType.STREAM;</span>
<span class="source-line-no">203</span><span id="line-203"> } else {</span>
<span class="source-line-no">204</span><span id="line-204"> this.readType = Scan.ReadType.DEFAULT;</span>
<span class="source-line-no">205</span><span id="line-205"> }</span>
<span class="source-line-no">206</span><span id="line-206"> } else {</span>
<span class="source-line-no">207</span><span id="line-207"> this.readType = scan.getReadType();</span>
<span class="source-line-no">208</span><span id="line-208"> }</span>
<span class="source-line-no">209</span><span id="line-209"> // Always start with pread unless user specific stream. Will change to stream later if</span>
<span class="source-line-no">210</span><span id="line-210"> // readType is default if the scan keeps running for a long time.</span>
<span class="source-line-no">211</span><span id="line-211"> this.scanUsePread = this.readType != Scan.ReadType.STREAM;</span>
<span class="source-line-no">212</span><span id="line-212"> }</span>
<span class="source-line-no">213</span><span id="line-213"> this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();</span>
<span class="source-line-no">214</span><span id="line-214"> // Parallel seeking is on if the config allows and more there is more than one store file.</span>
<span class="source-line-no">215</span><span id="line-215"> if (store != null &amp;&amp; store.getStorefilesCount() &gt; 1) {</span>
<span class="source-line-no">216</span><span id="line-216"> RegionServerServices rsService = store.getHRegion().getRegionServerServices();</span>
<span class="source-line-no">217</span><span id="line-217"> if (rsService != null &amp;&amp; scanInfo.isParallelSeekEnabled()) {</span>
<span class="source-line-no">218</span><span id="line-218"> this.parallelSeekEnabled = true;</span>
<span class="source-line-no">219</span><span id="line-219"> this.executor = rsService.getExecutorService();</span>
<span class="source-line-no">220</span><span id="line-220"> }</span>
<span class="source-line-no">221</span><span id="line-221"> }</span>
<span class="source-line-no">222</span><span id="line-222"> }</span>
<span class="source-line-no">223</span><span id="line-223"></span>
<span class="source-line-no">224</span><span id="line-224"> private void addCurrentScanners(List&lt;? extends KeyValueScanner&gt; scanners) {</span>
<span class="source-line-no">225</span><span id="line-225"> this.currentScanners.addAll(scanners);</span>
<span class="source-line-no">226</span><span id="line-226"> }</span>
<span class="source-line-no">227</span><span id="line-227"></span>
<span class="source-line-no">228</span><span id="line-228"> private static boolean isOnlyLatestVersionScan(Scan scan) {</span>
<span class="source-line-no">229</span><span id="line-229"> // No need to check for Scan#getMaxVersions because live version files generated by store file</span>
<span class="source-line-no">230</span><span id="line-230"> // writer retains max versions specified in ColumnFamilyDescriptor for the given CF</span>
<span class="source-line-no">231</span><span id="line-231"> return !scan.isRaw() &amp;&amp; scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;</span>
<span class="source-line-no">232</span><span id="line-232"> }</span>
<span class="source-line-no">233</span><span id="line-233"></span>
<span class="source-line-no">234</span><span id="line-234"> /**</span>
<span class="source-line-no">235</span><span id="line-235"> * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a</span>
<span class="source-line-no">236</span><span id="line-236"> * compaction.</span>
<span class="source-line-no">237</span><span id="line-237"> * @param store who we scan</span>
<span class="source-line-no">238</span><span id="line-238"> * @param scan the spec</span>
<span class="source-line-no">239</span><span id="line-239"> * @param columns which columns we are scanning</span>
<span class="source-line-no">240</span><span id="line-240"> */</span>
<span class="source-line-no">241</span><span id="line-241"> public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet&lt;byte[]&gt; columns,</span>
<span class="source-line-no">242</span><span id="line-242"> long readPt) throws IOException {</span>
<span class="source-line-no">243</span><span id="line-243"> this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(),</span>
<span class="source-line-no">244</span><span id="line-244"> ScanType.USER_SCAN);</span>
<span class="source-line-no">245</span><span id="line-245"> if (columns != null &amp;&amp; scan.isRaw()) {</span>
<span class="source-line-no">246</span><span id="line-246"> throw new DoNotRetryIOException("Cannot specify any column for a raw scan");</span>
<span class="source-line-no">247</span><span id="line-247"> }</span>
<span class="source-line-no">248</span><span id="line-248"> matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,</span>
<span class="source-line-no">249</span><span id="line-249"> store.getCoprocessorHost());</span>
<span class="source-line-no">250</span><span id="line-250"></span>
<span class="source-line-no">251</span><span id="line-251"> store.addChangedReaderObserver(this);</span>
<span class="source-line-no">252</span><span id="line-252"></span>
<span class="source-line-no">253</span><span id="line-253"> List&lt;KeyValueScanner&gt; scanners = null;</span>
<span class="source-line-no">254</span><span id="line-254"> try {</span>
<span class="source-line-no">255</span><span id="line-255"> // Pass columns to try to filter out unnecessary StoreFiles.</span>
<span class="source-line-no">256</span><span id="line-256"> scanners = selectScannersFrom(store,</span>
<span class="source-line-no">257</span><span id="line-257"> store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),</span>
<span class="source-line-no">258</span><span id="line-258"> scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt,</span>
<span class="source-line-no">259</span><span id="line-259"> isOnlyLatestVersionScan(scan)));</span>
<span class="source-line-no">260</span><span id="line-260"></span>
<span class="source-line-no">261</span><span id="line-261"> // Seek all scanners to the start of the Row (or if the exact matching row</span>
<span class="source-line-no">262</span><span id="line-262"> // key does not exist, then to the start of the next matching Row).</span>
<span class="source-line-no">263</span><span id="line-263"> // Always check bloom filter to optimize the top row seek for delete</span>
<span class="source-line-no">264</span><span id="line-264"> // family marker.</span>
<span class="source-line-no">265</span><span id="line-265"> seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery &amp;&amp; lazySeekEnabledGlobally,</span>
<span class="source-line-no">266</span><span id="line-266"> parallelSeekEnabled);</span>
<span class="source-line-no">267</span><span id="line-267"></span>
<span class="source-line-no">268</span><span id="line-268"> // set storeLimit</span>
<span class="source-line-no">269</span><span id="line-269"> this.storeLimit = scan.getMaxResultsPerColumnFamily();</span>
<span class="source-line-no">270</span><span id="line-270"></span>
<span class="source-line-no">271</span><span id="line-271"> // set rowOffset</span>
<span class="source-line-no">272</span><span id="line-272"> this.storeOffset = scan.getRowOffsetPerColumnFamily();</span>
<span class="source-line-no">273</span><span id="line-273"> addCurrentScanners(scanners);</span>
<span class="source-line-no">274</span><span id="line-274"> // Combine all seeked scanners with a heap</span>
<span class="source-line-no">275</span><span id="line-275"> resetKVHeap(scanners, comparator);</span>
<span class="source-line-no">276</span><span id="line-276"> } catch (IOException e) {</span>
<span class="source-line-no">277</span><span id="line-277"> clearAndClose(scanners);</span>
<span class="source-line-no">278</span><span id="line-278"> // remove us from the HStore#changedReaderObservers here or we'll have no chance to</span>
<span class="source-line-no">279</span><span id="line-279"> // and might cause memory leak</span>
<span class="source-line-no">280</span><span id="line-280"> store.deleteChangedReaderObserver(this);</span>
<span class="source-line-no">281</span><span id="line-281"> throw e;</span>
<span class="source-line-no">282</span><span id="line-282"> }</span>
<span class="source-line-no">283</span><span id="line-283"> }</span>
<span class="source-line-no">284</span><span id="line-284"></span>
<span class="source-line-no">285</span><span id="line-285"> // a dummy scan instance for compaction.</span>
<span class="source-line-no">286</span><span id="line-286"> private static final Scan SCAN_FOR_COMPACTION = new Scan();</span>
<span class="source-line-no">287</span><span id="line-287"></span>
<span class="source-line-no">288</span><span id="line-288"> /**</span>
<span class="source-line-no">289</span><span id="line-289"> * Used for store file compaction and memstore compaction.</span>
<span class="source-line-no">290</span><span id="line-290"> * &lt;p&gt;</span>
<span class="source-line-no">291</span><span id="line-291"> * Opens a scanner across specified StoreFiles/MemStoreSegments.</span>
<span class="source-line-no">292</span><span id="line-292"> * @param store who we scan</span>
<span class="source-line-no">293</span><span id="line-293"> * @param scanners ancillary scanners</span>
<span class="source-line-no">294</span><span id="line-294"> * @param smallestReadPoint the readPoint that we should use for tracking versions</span>
<span class="source-line-no">295</span><span id="line-295"> */</span>
<span class="source-line-no">296</span><span id="line-296"> public StoreScanner(HStore store, ScanInfo scanInfo, List&lt;? extends KeyValueScanner&gt; scanners,</span>
<span class="source-line-no">297</span><span id="line-297"> ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {</span>
<span class="source-line-no">298</span><span id="line-298"> this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);</span>
<span class="source-line-no">299</span><span id="line-299"> }</span>
<span class="source-line-no">300</span><span id="line-300"></span>
<span class="source-line-no">301</span><span id="line-301"> /**</span>
<span class="source-line-no">302</span><span id="line-302"> * Used for compactions that drop deletes from a limited range of rows.</span>
<span class="source-line-no">303</span><span id="line-303"> * &lt;p&gt;</span>
<span class="source-line-no">304</span><span id="line-304"> * Opens a scanner across specified StoreFiles.</span>
<span class="source-line-no">305</span><span id="line-305"> * @param store who we scan</span>
<span class="source-line-no">306</span><span id="line-306"> * @param scanners ancillary scanners</span>
<span class="source-line-no">307</span><span id="line-307"> * @param smallestReadPoint the readPoint that we should use for tracking versions</span>
<span class="source-line-no">308</span><span id="line-308"> * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.</span>
<span class="source-line-no">309</span><span id="line-309"> * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.</span>
<span class="source-line-no">310</span><span id="line-310"> */</span>
<span class="source-line-no">311</span><span id="line-311"> public StoreScanner(HStore store, ScanInfo scanInfo, List&lt;? extends KeyValueScanner&gt; scanners,</span>
<span class="source-line-no">312</span><span id="line-312"> long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow)</span>
<span class="source-line-no">313</span><span id="line-313"> throws IOException {</span>
<span class="source-line-no">314</span><span id="line-314"> this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,</span>
<span class="source-line-no">315</span><span id="line-315"> earliestPutTs, dropDeletesFromRow, dropDeletesToRow);</span>
<span class="source-line-no">316</span><span id="line-316"> }</span>
<span class="source-line-no">317</span><span id="line-317"></span>
<span class="source-line-no">318</span><span id="line-318"> private StoreScanner(HStore store, ScanInfo scanInfo, List&lt;? extends KeyValueScanner&gt; scanners,</span>
<span class="source-line-no">319</span><span id="line-319"> ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,</span>
<span class="source-line-no">320</span><span id="line-320"> byte[] dropDeletesToRow) throws IOException {</span>
<span class="source-line-no">321</span><span id="line-321"> this(store, SCAN_FOR_COMPACTION, scanInfo, 0,</span>
<span class="source-line-no">322</span><span id="line-322"> store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);</span>
<span class="source-line-no">323</span><span id="line-323"> assert scanType != ScanType.USER_SCAN;</span>
<span class="source-line-no">324</span><span id="line-324"> matcher =</span>
<span class="source-line-no">325</span><span id="line-325"> CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,</span>
<span class="source-line-no">326</span><span id="line-326"> oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());</span>
<span class="source-line-no">327</span><span id="line-327"></span>
<span class="source-line-no">328</span><span id="line-328"> // Filter the list of scanners using Bloom filters, time range, TTL, etc.</span>
<span class="source-line-no">329</span><span id="line-329"> scanners = selectScannersFrom(store, scanners);</span>
<span class="source-line-no">330</span><span id="line-330"></span>
<span class="source-line-no">331</span><span id="line-331"> // Seek all scanners to the initial key</span>
<span class="source-line-no">332</span><span id="line-332"> seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);</span>
<span class="source-line-no">333</span><span id="line-333"> addCurrentScanners(scanners);</span>
<span class="source-line-no">334</span><span id="line-334"> // Combine all seeked scanners with a heap</span>
<span class="source-line-no">335</span><span id="line-335"> resetKVHeap(scanners, comparator);</span>
<span class="source-line-no">336</span><span id="line-336"> }</span>
<span class="source-line-no">337</span><span id="line-337"></span>
<span class="source-line-no">338</span><span id="line-338"> private void seekAllScanner(ScanInfo scanInfo, List&lt;? extends KeyValueScanner&gt; scanners)</span>
<span class="source-line-no">339</span><span id="line-339"> throws IOException {</span>
<span class="source-line-no">340</span><span id="line-340"> // Seek all scanners to the initial key</span>
<span class="source-line-no">341</span><span id="line-341"> seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);</span>
<span class="source-line-no">342</span><span id="line-342"> addCurrentScanners(scanners);</span>
<span class="source-line-no">343</span><span id="line-343"> resetKVHeap(scanners, comparator);</span>
<span class="source-line-no">344</span><span id="line-344"> }</span>
<span class="source-line-no">345</span><span id="line-345"></span>
<span class="source-line-no">346</span><span id="line-346"> // For mob compaction only as we do not have a Store instance when doing mob compaction.</span>
<span class="source-line-no">347</span><span id="line-347"> public StoreScanner(ScanInfo scanInfo, ScanType scanType,</span>
<span class="source-line-no">348</span><span id="line-348"> List&lt;? extends KeyValueScanner&gt; scanners) throws IOException {</span>
<span class="source-line-no">349</span><span id="line-349"> this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);</span>
<span class="source-line-no">350</span><span id="line-350"> assert scanType != ScanType.USER_SCAN;</span>
<span class="source-line-no">351</span><span id="line-351"> this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,</span>
<span class="source-line-no">352</span><span id="line-352"> oldestUnexpiredTS, now, null, null, null);</span>
<span class="source-line-no">353</span><span id="line-353"> seekAllScanner(scanInfo, scanners);</span>
<span class="source-line-no">354</span><span id="line-354"> }</span>
<span class="source-line-no">355</span><span id="line-355"></span>
<span class="source-line-no">356</span><span id="line-356"> // Used to instantiate a scanner for user scan in test</span>
<span class="source-line-no">357</span><span id="line-357"> StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet&lt;byte[]&gt; columns,</span>
<span class="source-line-no">358</span><span id="line-358"> List&lt;? extends KeyValueScanner&gt; scanners, ScanType scanType) throws IOException {</span>
<span class="source-line-no">359</span><span id="line-359"> // 0 is passed as readpoint because the test bypasses Store</span>
<span class="source-line-no">360</span><span id="line-360"> this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),</span>
<span class="source-line-no">361</span><span id="line-361"> scanType);</span>
<span class="source-line-no">362</span><span id="line-362"> if (scanType == ScanType.USER_SCAN) {</span>
<span class="source-line-no">363</span><span id="line-363"> this.matcher =</span>
<span class="source-line-no">364</span><span id="line-364"> UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);</span>
<span class="source-line-no">365</span><span id="line-365"> } else {</span>
<span class="source-line-no">366</span><span id="line-366"> this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,</span>
<span class="source-line-no">367</span><span id="line-367"> PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);</span>
<span class="source-line-no">368</span><span id="line-368"> }</span>
<span class="source-line-no">369</span><span id="line-369"> seekAllScanner(scanInfo, scanners);</span>
<span class="source-line-no">370</span><span id="line-370"> }</span>
<span class="source-line-no">371</span><span id="line-371"></span>
<span class="source-line-no">372</span><span id="line-372"> // Used to instantiate a scanner for user scan in test</span>
<span class="source-line-no">373</span><span id="line-373"> StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet&lt;byte[]&gt; columns,</span>
<span class="source-line-no">374</span><span id="line-374"> List&lt;? extends KeyValueScanner&gt; scanners) throws IOException {</span>
<span class="source-line-no">375</span><span id="line-375"> // 0 is passed as readpoint because the test bypasses Store</span>
<span class="source-line-no">376</span><span id="line-376"> this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),</span>
<span class="source-line-no">377</span><span id="line-377"> ScanType.USER_SCAN);</span>
<span class="source-line-no">378</span><span id="line-378"> this.matcher =</span>
<span class="source-line-no">379</span><span id="line-379"> UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);</span>
<span class="source-line-no">380</span><span id="line-380"> seekAllScanner(scanInfo, scanners);</span>
<span class="source-line-no">381</span><span id="line-381"> }</span>
<span class="source-line-no">382</span><span id="line-382"></span>
<span class="source-line-no">383</span><span id="line-383"> // Used to instantiate a scanner for compaction in test</span>
<span class="source-line-no">384</span><span id="line-384"> StoreScanner(ScanInfo scanInfo, int maxVersions, ScanType scanType,</span>
<span class="source-line-no">385</span><span id="line-385"> List&lt;? extends KeyValueScanner&gt; scanners) throws IOException {</span>
<span class="source-line-no">386</span><span id="line-386"> // 0 is passed as readpoint because the test bypasses Store</span>
<span class="source-line-no">387</span><span id="line-387"> this(null, maxVersions &gt; 0 ? new Scan().readVersions(maxVersions) : SCAN_FOR_COMPACTION,</span>
<span class="source-line-no">388</span><span id="line-388"> scanInfo, 0, 0L, false, scanType);</span>
<span class="source-line-no">389</span><span id="line-389"> this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,</span>
<span class="source-line-no">390</span><span id="line-390"> PrivateConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);</span>
<span class="source-line-no">391</span><span id="line-391"> seekAllScanner(scanInfo, scanners);</span>
<span class="source-line-no">392</span><span id="line-392"> }</span>
<span class="source-line-no">393</span><span id="line-393"></span>
<span class="source-line-no">394</span><span id="line-394"> boolean isScanUsePread() {</span>
<span class="source-line-no">395</span><span id="line-395"> return this.scanUsePread;</span>
<span class="source-line-no">396</span><span id="line-396"> }</span>
<span class="source-line-no">397</span><span id="line-397"></span>
<span class="source-line-no">398</span><span id="line-398"> /**</span>
<span class="source-line-no">399</span><span id="line-399"> * Seek the specified scanners with the given key</span>
<span class="source-line-no">400</span><span id="line-400"> * @param isLazy true if using lazy seek</span>
<span class="source-line-no">401</span><span id="line-401"> * @param isParallelSeek true if using parallel seek</span>
<span class="source-line-no">402</span><span id="line-402"> */</span>
<span class="source-line-no">403</span><span id="line-403"> protected void seekScanners(List&lt;? extends KeyValueScanner&gt; scanners, ExtendedCell seekKey,</span>
<span class="source-line-no">404</span><span id="line-404"> boolean isLazy, boolean isParallelSeek) throws IOException {</span>
<span class="source-line-no">405</span><span id="line-405"> // Seek all scanners to the start of the Row (or if the exact matching row</span>
<span class="source-line-no">406</span><span id="line-406"> // key does not exist, then to the start of the next matching Row).</span>
<span class="source-line-no">407</span><span id="line-407"> // Always check bloom filter to optimize the top row seek for delete</span>
<span class="source-line-no">408</span><span id="line-408"> // family marker.</span>
<span class="source-line-no">409</span><span id="line-409"> if (isLazy) {</span>
<span class="source-line-no">410</span><span id="line-410"> for (KeyValueScanner scanner : scanners) {</span>
<span class="source-line-no">411</span><span id="line-411"> scanner.requestSeek(seekKey, false, true);</span>
<span class="source-line-no">412</span><span id="line-412"> }</span>
<span class="source-line-no">413</span><span id="line-413"> } else {</span>
<span class="source-line-no">414</span><span id="line-414"> if (!isParallelSeek) {</span>
<span class="source-line-no">415</span><span id="line-415"> long totalScannersSoughtBytes = 0;</span>
<span class="source-line-no">416</span><span id="line-416"> for (KeyValueScanner scanner : scanners) {</span>
<span class="source-line-no">417</span><span id="line-417"> if (matcher.isUserScan() &amp;&amp; totalScannersSoughtBytes &gt;= maxRowSize) {</span>
<span class="source-line-no">418</span><span id="line-418"> throw new RowTooBigException(</span>
<span class="source-line-no">419</span><span id="line-419"> "Max row size allowed: " + maxRowSize + ", but row is bigger than that");</span>
<span class="source-line-no">420</span><span id="line-420"> }</span>
<span class="source-line-no">421</span><span id="line-421"> scanner.seek(seekKey);</span>
<span class="source-line-no">422</span><span id="line-422"> Cell c = scanner.peek();</span>
<span class="source-line-no">423</span><span id="line-423"> if (c != null) {</span>
<span class="source-line-no">424</span><span id="line-424"> totalScannersSoughtBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);</span>
<span class="source-line-no">425</span><span id="line-425"> }</span>
<span class="source-line-no">426</span><span id="line-426"> }</span>
<span class="source-line-no">427</span><span id="line-427"> } else {</span>
<span class="source-line-no">428</span><span id="line-428"> parallelSeek(scanners, seekKey);</span>
<span class="source-line-no">429</span><span id="line-429"> }</span>
<span class="source-line-no">430</span><span id="line-430"> }</span>
<span class="source-line-no">431</span><span id="line-431"> }</span>
<span class="source-line-no">432</span><span id="line-432"></span>
<span class="source-line-no">433</span><span id="line-433"> protected void resetKVHeap(List&lt;? extends KeyValueScanner&gt; scanners, CellComparator comparator)</span>
<span class="source-line-no">434</span><span id="line-434"> throws IOException {</span>
<span class="source-line-no">435</span><span id="line-435"> // Combine all seeked scanners with a heap</span>
<span class="source-line-no">436</span><span id="line-436"> heap = newKVHeap(scanners, comparator);</span>
<span class="source-line-no">437</span><span id="line-437"> }</span>
<span class="source-line-no">438</span><span id="line-438"></span>
<span class="source-line-no">439</span><span id="line-439"> protected KeyValueHeap newKVHeap(List&lt;? extends KeyValueScanner&gt; scanners,</span>
<span class="source-line-no">440</span><span id="line-440"> CellComparator comparator) throws IOException {</span>
<span class="source-line-no">441</span><span id="line-441"> return new KeyValueHeap(scanners, comparator);</span>
<span class="source-line-no">442</span><span id="line-442"> }</span>
<span class="source-line-no">443</span><span id="line-443"></span>
<span class="source-line-no">444</span><span id="line-444"> /**</span>
<span class="source-line-no">445</span><span id="line-445"> * Filters the given list of scanners using Bloom filter, time range, and TTL.</span>
<span class="source-line-no">446</span><span id="line-446"> * &lt;p&gt;</span>
<span class="source-line-no">447</span><span id="line-447"> * Will be overridden by testcase so declared as protected.</span>
<span class="source-line-no">448</span><span id="line-448"> */</span>
<span class="source-line-no">449</span><span id="line-449"> protected List&lt;KeyValueScanner&gt; selectScannersFrom(HStore store,</span>
<span class="source-line-no">450</span><span id="line-450"> List&lt;? extends KeyValueScanner&gt; allScanners) {</span>
<span class="source-line-no">451</span><span id="line-451"> boolean memOnly;</span>
<span class="source-line-no">452</span><span id="line-452"> boolean filesOnly;</span>
<span class="source-line-no">453</span><span id="line-453"> if (scan instanceof InternalScan) {</span>
<span class="source-line-no">454</span><span id="line-454"> InternalScan iscan = (InternalScan) scan;</span>
<span class="source-line-no">455</span><span id="line-455"> memOnly = iscan.isCheckOnlyMemStore();</span>
<span class="source-line-no">456</span><span id="line-456"> filesOnly = iscan.isCheckOnlyStoreFiles();</span>
<span class="source-line-no">457</span><span id="line-457"> } else {</span>
<span class="source-line-no">458</span><span id="line-458"> memOnly = false;</span>
<span class="source-line-no">459</span><span id="line-459"> filesOnly = false;</span>
<span class="source-line-no">460</span><span id="line-460"> }</span>
<span class="source-line-no">461</span><span id="line-461"></span>
<span class="source-line-no">462</span><span id="line-462"> List&lt;KeyValueScanner&gt; scanners = new ArrayList&lt;&gt;(allScanners.size());</span>
<span class="source-line-no">463</span><span id="line-463"></span>
<span class="source-line-no">464</span><span id="line-464"> // We can only exclude store files based on TTL if minVersions is set to 0.</span>
<span class="source-line-no">465</span><span id="line-465"> // Otherwise, we might have to return KVs that have technically expired.</span>
<span class="source-line-no">466</span><span id="line-466"> long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;</span>
<span class="source-line-no">467</span><span id="line-467"></span>
<span class="source-line-no">468</span><span id="line-468"> // include only those scan files which pass all filters</span>
<span class="source-line-no">469</span><span id="line-469"> for (KeyValueScanner kvs : allScanners) {</span>
<span class="source-line-no">470</span><span id="line-470"> boolean isFile = kvs.isFileScanner();</span>
<span class="source-line-no">471</span><span id="line-471"> if ((!isFile &amp;&amp; filesOnly) || (isFile &amp;&amp; memOnly)) {</span>
<span class="source-line-no">472</span><span id="line-472"> kvs.close();</span>
<span class="source-line-no">473</span><span id="line-473"> continue;</span>
<span class="source-line-no">474</span><span id="line-474"> }</span>
<span class="source-line-no">475</span><span id="line-475"></span>
<span class="source-line-no">476</span><span id="line-476"> if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {</span>
<span class="source-line-no">477</span><span id="line-477"> scanners.add(kvs);</span>
<span class="source-line-no">478</span><span id="line-478"> } else {</span>
<span class="source-line-no">479</span><span id="line-479"> kvs.close();</span>
<span class="source-line-no">480</span><span id="line-480"> }</span>
<span class="source-line-no">481</span><span id="line-481"> }</span>
<span class="source-line-no">482</span><span id="line-482"> return scanners;</span>
<span class="source-line-no">483</span><span id="line-483"> }</span>
<span class="source-line-no">484</span><span id="line-484"></span>
<span class="source-line-no">485</span><span id="line-485"> @Override</span>
<span class="source-line-no">486</span><span id="line-486"> public ExtendedCell peek() {</span>
<span class="source-line-no">487</span><span id="line-487"> return heap != null ? heap.peek() : null;</span>
<span class="source-line-no">488</span><span id="line-488"> }</span>
<span class="source-line-no">489</span><span id="line-489"></span>
<span class="source-line-no">490</span><span id="line-490"> @Override</span>
<span class="source-line-no">491</span><span id="line-491"> public KeyValue next() {</span>
<span class="source-line-no">492</span><span id="line-492"> // throw runtime exception perhaps?</span>
<span class="source-line-no">493</span><span id="line-493"> throw new RuntimeException("Never call StoreScanner.next()");</span>
<span class="source-line-no">494</span><span id="line-494"> }</span>
<span class="source-line-no">495</span><span id="line-495"></span>
<span class="source-line-no">496</span><span id="line-496"> @Override</span>
<span class="source-line-no">497</span><span id="line-497"> public void close() {</span>
<span class="source-line-no">498</span><span id="line-498"> close(true);</span>
<span class="source-line-no">499</span><span id="line-499"> }</span>
<span class="source-line-no">500</span><span id="line-500"></span>
<span class="source-line-no">501</span><span id="line-501"> private void close(boolean withDelayedScannersClose) {</span>
<span class="source-line-no">502</span><span id="line-502"> closeLock.lock();</span>
<span class="source-line-no">503</span><span id="line-503"> // If the closeLock is acquired then any subsequent updateReaders()</span>
<span class="source-line-no">504</span><span id="line-504"> // call is ignored.</span>
<span class="source-line-no">505</span><span id="line-505"> try {</span>
<span class="source-line-no">506</span><span id="line-506"> if (this.closing) {</span>
<span class="source-line-no">507</span><span id="line-507"> return;</span>
<span class="source-line-no">508</span><span id="line-508"> }</span>
<span class="source-line-no">509</span><span id="line-509"> if (withDelayedScannersClose) {</span>
<span class="source-line-no">510</span><span id="line-510"> this.closing = true;</span>
<span class="source-line-no">511</span><span id="line-511"> }</span>
<span class="source-line-no">512</span><span id="line-512"> // For mob compaction, we do not have a store.</span>
<span class="source-line-no">513</span><span id="line-513"> if (this.store != null) {</span>
<span class="source-line-no">514</span><span id="line-514"> this.store.deleteChangedReaderObserver(this);</span>
<span class="source-line-no">515</span><span id="line-515"> }</span>
<span class="source-line-no">516</span><span id="line-516"> if (withDelayedScannersClose) {</span>
<span class="source-line-no">517</span><span id="line-517"> clearAndClose(scannersForDelayedClose);</span>
<span class="source-line-no">518</span><span id="line-518"> clearAndClose(memStoreScannersAfterFlush);</span>
<span class="source-line-no">519</span><span id="line-519"> clearAndClose(flushedstoreFileScanners);</span>
<span class="source-line-no">520</span><span id="line-520"> if (this.heap != null) {</span>
<span class="source-line-no">521</span><span id="line-521"> this.heap.close();</span>
<span class="source-line-no">522</span><span id="line-522"> this.currentScanners.clear();</span>
<span class="source-line-no">523</span><span id="line-523"> this.heap = null; // CLOSED!</span>
<span class="source-line-no">524</span><span id="line-524"> }</span>
<span class="source-line-no">525</span><span id="line-525"> } else {</span>
<span class="source-line-no">526</span><span id="line-526"> if (this.heap != null) {</span>
<span class="source-line-no">527</span><span id="line-527"> this.scannersForDelayedClose.add(this.heap);</span>
<span class="source-line-no">528</span><span id="line-528"> this.currentScanners.clear();</span>
<span class="source-line-no">529</span><span id="line-529"> this.heap = null;</span>
<span class="source-line-no">530</span><span id="line-530"> }</span>
<span class="source-line-no">531</span><span id="line-531"> }</span>
<span class="source-line-no">532</span><span id="line-532"> } finally {</span>
<span class="source-line-no">533</span><span id="line-533"> closeLock.unlock();</span>
<span class="source-line-no">534</span><span id="line-534"> }</span>
<span class="source-line-no">535</span><span id="line-535"> }</span>
<span class="source-line-no">536</span><span id="line-536"></span>
<span class="source-line-no">537</span><span id="line-537"> @Override</span>
<span class="source-line-no">538</span><span id="line-538"> public boolean seek(ExtendedCell key) throws IOException {</span>
<span class="source-line-no">539</span><span id="line-539"> if (checkFlushed()) {</span>
<span class="source-line-no">540</span><span id="line-540"> reopenAfterFlush();</span>
<span class="source-line-no">541</span><span id="line-541"> }</span>
<span class="source-line-no">542</span><span id="line-542"> return this.heap.seek(key);</span>
<span class="source-line-no">543</span><span id="line-543"> }</span>
<span class="source-line-no">544</span><span id="line-544"></span>
<span class="source-line-no">545</span><span id="line-545"> /**</span>
<span class="source-line-no">546</span><span id="line-546"> * Get the next row of values from this Store.</span>
<span class="source-line-no">547</span><span id="line-547"> * @return true if there are more rows, false if scanner is done</span>
<span class="source-line-no">548</span><span id="line-548"> */</span>
<span class="source-line-no">549</span><span id="line-549"> @Override</span>
<span class="source-line-no">550</span><span id="line-550"> public boolean next(List&lt;? super ExtendedCell&gt; outResult, ScannerContext scannerContext)</span>
<span class="source-line-no">551</span><span id="line-551"> throws IOException {</span>
<span class="source-line-no">552</span><span id="line-552"> if (scannerContext == null) {</span>
<span class="source-line-no">553</span><span id="line-553"> throw new IllegalArgumentException("Scanner context cannot be null");</span>
<span class="source-line-no">554</span><span id="line-554"> }</span>
<span class="source-line-no">555</span><span id="line-555"> if (checkFlushed() &amp;&amp; reopenAfterFlush()) {</span>
<span class="source-line-no">556</span><span id="line-556"> return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">557</span><span id="line-557"> }</span>
<span class="source-line-no">558</span><span id="line-558"></span>
<span class="source-line-no">559</span><span id="line-559"> // if the heap was left null, then the scanners had previously run out anyways, close and</span>
<span class="source-line-no">560</span><span id="line-560"> // return.</span>
<span class="source-line-no">561</span><span id="line-561"> if (this.heap == null) {</span>
<span class="source-line-no">562</span><span id="line-562"> // By this time partial close should happened because already heap is null</span>
<span class="source-line-no">563</span><span id="line-563"> close(false);// Do all cleanup except heap.close()</span>
<span class="source-line-no">564</span><span id="line-564"> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">565</span><span id="line-565"> }</span>
<span class="source-line-no">566</span><span id="line-566"></span>
<span class="source-line-no">567</span><span id="line-567"> ExtendedCell cell = this.heap.peek();</span>
<span class="source-line-no">568</span><span id="line-568"> if (cell == null) {</span>
<span class="source-line-no">569</span><span id="line-569"> close(false);// Do all cleanup except heap.close()</span>
<span class="source-line-no">570</span><span id="line-570"> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">571</span><span id="line-571"> }</span>
<span class="source-line-no">572</span><span id="line-572"></span>
<span class="source-line-no">573</span><span id="line-573"> // only call setRow if the row changes; avoids confusing the query matcher</span>
<span class="source-line-no">574</span><span id="line-574"> // if scanning intra-row</span>
<span class="source-line-no">575</span><span id="line-575"></span>
<span class="source-line-no">576</span><span id="line-576"> // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing</span>
<span class="source-line-no">577</span><span id="line-577"> // rows. Else it is possible we are still traversing the same row so we must perform the row</span>
<span class="source-line-no">578</span><span id="line-578"> // comparison.</span>
<span class="source-line-no">579</span><span id="line-579"> if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {</span>
<span class="source-line-no">580</span><span id="line-580"> this.countPerRow = 0;</span>
<span class="source-line-no">581</span><span id="line-581"> matcher.setToNewRow(cell);</span>
<span class="source-line-no">582</span><span id="line-582"> }</span>
<span class="source-line-no">583</span><span id="line-583"></span>
<span class="source-line-no">584</span><span id="line-584"> // Clear progress away unless invoker has indicated it should be kept.</span>
<span class="source-line-no">585</span><span id="line-585"> if (!scannerContext.getKeepProgress() &amp;&amp; !scannerContext.getSkippingRow()) {</span>
<span class="source-line-no">586</span><span id="line-586"> scannerContext.clearProgress();</span>
<span class="source-line-no">587</span><span id="line-587"> }</span>
<span class="source-line-no">588</span><span id="line-588"></span>
<span class="source-line-no">589</span><span id="line-589"> Optional&lt;RpcCall&gt; rpcCall =</span>
<span class="source-line-no">590</span><span id="line-590"> matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();</span>
<span class="source-line-no">591</span><span id="line-591"></span>
<span class="source-line-no">592</span><span id="line-592"> int count = 0;</span>
<span class="source-line-no">593</span><span id="line-593"> long totalBytesRead = 0;</span>
<span class="source-line-no">594</span><span id="line-594"> // track the cells for metrics only if it is a user read request.</span>
<span class="source-line-no">595</span><span id="line-595"> boolean onlyFromMemstore = matcher.isUserScan();</span>
<span class="source-line-no">596</span><span id="line-596"> try {</span>
<span class="source-line-no">597</span><span id="line-597"> LOOP: do {</span>
<span class="source-line-no">598</span><span id="line-598"> // Update and check the time limit based on the configured value of cellsPerTimeoutCheck</span>
<span class="source-line-no">599</span><span id="line-599"> // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream</span>
<span class="source-line-no">600</span><span id="line-600"> // in</span>
<span class="source-line-no">601</span><span id="line-601"> // the shipped method below.</span>
<span class="source-line-no">602</span><span id="line-602"> if (</span>
<span class="source-line-no">603</span><span id="line-603"> kvsScanned % cellsPerHeartbeatCheck == 0</span>
<span class="source-line-no">604</span><span id="line-604"> || (scanUsePread &amp;&amp; readType == Scan.ReadType.DEFAULT &amp;&amp; bytesRead &gt; preadMaxBytes)</span>
<span class="source-line-no">605</span><span id="line-605"> ) {</span>
<span class="source-line-no">606</span><span id="line-606"> if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {</span>
<span class="source-line-no">607</span><span id="line-607"> return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();</span>
<span class="source-line-no">608</span><span id="line-608"> }</span>
<span class="source-line-no">609</span><span id="line-609"> }</span>
<span class="source-line-no">610</span><span id="line-610"> // Do object compare - we set prevKV from the same heap.</span>
<span class="source-line-no">611</span><span id="line-611"> if (prevCell != cell) {</span>
<span class="source-line-no">612</span><span id="line-612"> ++kvsScanned;</span>
<span class="source-line-no">613</span><span id="line-613"> }</span>
<span class="source-line-no">614</span><span id="line-614"> checkScanOrder(prevCell, cell, comparator);</span>
<span class="source-line-no">615</span><span id="line-615"> int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);</span>
<span class="source-line-no">616</span><span id="line-616"> bytesRead += cellSize;</span>
<span class="source-line-no">617</span><span id="line-617"> if (scanUsePread &amp;&amp; readType == Scan.ReadType.DEFAULT &amp;&amp; bytesRead &gt; preadMaxBytes) {</span>
<span class="source-line-no">618</span><span id="line-618"> // return immediately if we want to switch from pread to stream. We need this because we</span>
<span class="source-line-no">619</span><span id="line-619"> // can</span>
<span class="source-line-no">620</span><span id="line-620"> // only switch in the shipped method, if user use a filter to filter out everything and</span>
<span class="source-line-no">621</span><span id="line-621"> // rpc</span>
<span class="source-line-no">622</span><span id="line-622"> // timeout is very large then the shipped method will never be called until the whole scan</span>
<span class="source-line-no">623</span><span id="line-623"> // is finished, but at that time we have already scan all the data...</span>
<span class="source-line-no">624</span><span id="line-624"> // See HBASE-20457 for more details.</span>
<span class="source-line-no">625</span><span id="line-625"> // And there is still a scenario that can not be handled. If we have a very large row,</span>
<span class="source-line-no">626</span><span id="line-626"> // which</span>
<span class="source-line-no">627</span><span id="line-627"> // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag</span>
<span class="source-line-no">628</span><span id="line-628"> // here, we still need to scan all the qualifiers before returning...</span>
<span class="source-line-no">629</span><span id="line-629"> scannerContext.returnImmediately();</span>
<span class="source-line-no">630</span><span id="line-630"> }</span>
<span class="source-line-no">631</span><span id="line-631"></span>
<span class="source-line-no">632</span><span id="line-632"> heap.recordBlockSize(blockSize -&gt; {</span>
<span class="source-line-no">633</span><span id="line-633"> if (rpcCall.isPresent()) {</span>
<span class="source-line-no">634</span><span id="line-634"> rpcCall.get().incrementBlockBytesScanned(blockSize);</span>
<span class="source-line-no">635</span><span id="line-635"> }</span>
<span class="source-line-no">636</span><span id="line-636"> scannerContext.incrementBlockProgress(blockSize);</span>
<span class="source-line-no">637</span><span id="line-637"> });</span>
<span class="source-line-no">638</span><span id="line-638"></span>
<span class="source-line-no">639</span><span id="line-639"> prevCell = cell;</span>
<span class="source-line-no">640</span><span id="line-640"> scannerContext.setLastPeekedCell(cell);</span>
<span class="source-line-no">641</span><span id="line-641"> topChanged = false;</span>
<span class="source-line-no">642</span><span id="line-642"> ScanQueryMatcher.MatchCode qcode = matcher.match(cell);</span>
<span class="source-line-no">643</span><span id="line-643"> switch (qcode) {</span>
<span class="source-line-no">644</span><span id="line-644"> case INCLUDE:</span>
<span class="source-line-no">645</span><span id="line-645"> case INCLUDE_AND_SEEK_NEXT_ROW:</span>
<span class="source-line-no">646</span><span id="line-646"> case INCLUDE_AND_SEEK_NEXT_COL:</span>
<span class="source-line-no">647</span><span id="line-647"> Filter f = matcher.getFilter();</span>
<span class="source-line-no">648</span><span id="line-648"> if (f != null) {</span>
<span class="source-line-no">649</span><span id="line-649"> Cell transformedCell = f.transformCell(cell);</span>
<span class="source-line-no">650</span><span id="line-650"> // fast path, most filters just return the same cell instance</span>
<span class="source-line-no">651</span><span id="line-651"> if (transformedCell != cell) {</span>
<span class="source-line-no">652</span><span id="line-652"> if (transformedCell instanceof ExtendedCell) {</span>
<span class="source-line-no">653</span><span id="line-653"> cell = (ExtendedCell) transformedCell;</span>
<span class="source-line-no">654</span><span id="line-654"> } else {</span>
<span class="source-line-no">655</span><span id="line-655"> throw new DoNotRetryIOException("Incorrect filter implementation, "</span>
<span class="source-line-no">656</span><span id="line-656"> + "the Cell returned by transformCell is not an ExtendedCell. Filter class: "</span>
<span class="source-line-no">657</span><span id="line-657"> + f.getClass().getName());</span>
<span class="source-line-no">658</span><span id="line-658"> }</span>
<span class="source-line-no">659</span><span id="line-659"> }</span>
<span class="source-line-no">660</span><span id="line-660"> }</span>
<span class="source-line-no">661</span><span id="line-661"> this.countPerRow++;</span>
<span class="source-line-no">662</span><span id="line-662"></span>
<span class="source-line-no">663</span><span id="line-663"> // add to results only if we have skipped #storeOffset kvs</span>
<span class="source-line-no">664</span><span id="line-664"> // also update metric accordingly</span>
<span class="source-line-no">665</span><span id="line-665"> if (this.countPerRow &gt; storeOffset) {</span>
<span class="source-line-no">666</span><span id="line-666"> outResult.add(cell);</span>
<span class="source-line-no">667</span><span id="line-667"></span>
<span class="source-line-no">668</span><span id="line-668"> // Update local tracking information</span>
<span class="source-line-no">669</span><span id="line-669"> count++;</span>
<span class="source-line-no">670</span><span id="line-670"> totalBytesRead += cellSize;</span>
<span class="source-line-no">671</span><span id="line-671"></span>
<span class="source-line-no">672</span><span id="line-672"> /**</span>
<span class="source-line-no">673</span><span id="line-673"> * Increment the metric if all the cells are from memstore. If not we will account it</span>
<span class="source-line-no">674</span><span id="line-674"> * for mixed reads</span>
<span class="source-line-no">675</span><span id="line-675"> */</span>
<span class="source-line-no">676</span><span id="line-676"> onlyFromMemstore = onlyFromMemstore &amp;&amp; heap.isLatestCellFromMemstore();</span>
<span class="source-line-no">677</span><span id="line-677"> // Update the progress of the scanner context</span>
<span class="source-line-no">678</span><span id="line-678"> scannerContext.incrementSizeProgress(cellSize, cell.heapSize());</span>
<span class="source-line-no">679</span><span id="line-679"> scannerContext.incrementBatchProgress(1);</span>
<span class="source-line-no">680</span><span id="line-680"></span>
<span class="source-line-no">681</span><span id="line-681"> if (matcher.isUserScan() &amp;&amp; totalBytesRead &gt; maxRowSize) {</span>
<span class="source-line-no">682</span><span id="line-682"> String message = "Max row size allowed: " + maxRowSize</span>
<span class="source-line-no">683</span><span id="line-683"> + ", but the row is bigger than that, the row info: "</span>
<span class="source-line-no">684</span><span id="line-684"> + CellUtil.toString(cell, false) + ", already have process row cells = "</span>
<span class="source-line-no">685</span><span id="line-685"> + outResult.size() + ", it belong to region = "</span>
<span class="source-line-no">686</span><span id="line-686"> + store.getHRegion().getRegionInfo().getRegionNameAsString();</span>
<span class="source-line-no">687</span><span id="line-687"> LOG.warn(message);</span>
<span class="source-line-no">688</span><span id="line-688"> throw new RowTooBigException(message);</span>
<span class="source-line-no">689</span><span id="line-689"> }</span>
<span class="source-line-no">690</span><span id="line-690"></span>
<span class="source-line-no">691</span><span id="line-691"> if (storeLimit &gt; -1 &amp;&amp; this.countPerRow &gt;= (storeLimit + storeOffset)) {</span>
<span class="source-line-no">692</span><span id="line-692"> // do what SEEK_NEXT_ROW does.</span>
<span class="source-line-no">693</span><span id="line-693"> if (!matcher.moreRowsMayExistAfter(cell)) {</span>
<span class="source-line-no">694</span><span id="line-694"> close(false);// Do all cleanup except heap.close()</span>
<span class="source-line-no">695</span><span id="line-695"> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">696</span><span id="line-696"> }</span>
<span class="source-line-no">697</span><span id="line-697"> matcher.clearCurrentRow();</span>
<span class="source-line-no">698</span><span id="line-698"> seekToNextRow(cell);</span>
<span class="source-line-no">699</span><span id="line-699"> break LOOP;</span>
<span class="source-line-no">700</span><span id="line-700"> }</span>
<span class="source-line-no">701</span><span id="line-701"> }</span>
<span class="source-line-no">702</span><span id="line-702"></span>
<span class="source-line-no">703</span><span id="line-703"> if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {</span>
<span class="source-line-no">704</span><span id="line-704"> if (!matcher.moreRowsMayExistAfter(cell)) {</span>
<span class="source-line-no">705</span><span id="line-705"> close(false);// Do all cleanup except heap.close()</span>
<span class="source-line-no">706</span><span id="line-706"> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">707</span><span id="line-707"> }</span>
<span class="source-line-no">708</span><span id="line-708"> matcher.clearCurrentRow();</span>
<span class="source-line-no">709</span><span id="line-709"> seekOrSkipToNextRow(cell);</span>
<span class="source-line-no">710</span><span id="line-710"> } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {</span>
<span class="source-line-no">711</span><span id="line-711"> seekOrSkipToNextColumn(cell);</span>
<span class="source-line-no">712</span><span id="line-712"> } else {</span>
<span class="source-line-no">713</span><span id="line-713"> this.heap.next();</span>
<span class="source-line-no">714</span><span id="line-714"> }</span>
<span class="source-line-no">715</span><span id="line-715"></span>
<span class="source-line-no">716</span><span id="line-716"> if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {</span>
<span class="source-line-no">717</span><span id="line-717"> break LOOP;</span>
<span class="source-line-no">718</span><span id="line-718"> }</span>
<span class="source-line-no">719</span><span id="line-719"> if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {</span>
<span class="source-line-no">720</span><span id="line-720"> break LOOP;</span>
<span class="source-line-no">721</span><span id="line-721"> }</span>
<span class="source-line-no">722</span><span id="line-722"> continue;</span>
<span class="source-line-no">723</span><span id="line-723"></span>
<span class="source-line-no">724</span><span id="line-724"> case DONE:</span>
<span class="source-line-no">725</span><span id="line-725"> // Optimization for Gets! If DONE, no more to get on this row, early exit!</span>
<span class="source-line-no">726</span><span id="line-726"> if (get) {</span>
<span class="source-line-no">727</span><span id="line-727"> // Then no more to this row... exit.</span>
<span class="source-line-no">728</span><span id="line-728"> close(false);// Do all cleanup except heap.close()</span>
<span class="source-line-no">729</span><span id="line-729"> // update metric</span>
<span class="source-line-no">730</span><span id="line-730"> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">731</span><span id="line-731"> }</span>
<span class="source-line-no">732</span><span id="line-732"> matcher.clearCurrentRow();</span>
<span class="source-line-no">733</span><span id="line-733"> return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">734</span><span id="line-734"></span>
<span class="source-line-no">735</span><span id="line-735"> case DONE_SCAN:</span>
<span class="source-line-no">736</span><span id="line-736"> close(false);// Do all cleanup except heap.close()</span>
<span class="source-line-no">737</span><span id="line-737"> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">738</span><span id="line-738"></span>
<span class="source-line-no">739</span><span id="line-739"> case SEEK_NEXT_ROW:</span>
<span class="source-line-no">740</span><span id="line-740"> // This is just a relatively simple end of scan fix, to short-cut end</span>
<span class="source-line-no">741</span><span id="line-741"> // us if there is an endKey in the scan.</span>
<span class="source-line-no">742</span><span id="line-742"> if (!matcher.moreRowsMayExistAfter(cell)) {</span>
<span class="source-line-no">743</span><span id="line-743"> close(false);// Do all cleanup except heap.close()</span>
<span class="source-line-no">744</span><span id="line-744"> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">745</span><span id="line-745"> }</span>
<span class="source-line-no">746</span><span id="line-746"> matcher.clearCurrentRow();</span>
<span class="source-line-no">747</span><span id="line-747"> seekOrSkipToNextRow(cell);</span>
<span class="source-line-no">748</span><span id="line-748"> NextState stateAfterSeekNextRow = needToReturn(outResult);</span>
<span class="source-line-no">749</span><span id="line-749"> if (stateAfterSeekNextRow != null) {</span>
<span class="source-line-no">750</span><span id="line-750"> return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();</span>
<span class="source-line-no">751</span><span id="line-751"> }</span>
<span class="source-line-no">752</span><span id="line-752"> break;</span>
<span class="source-line-no">753</span><span id="line-753"></span>
<span class="source-line-no">754</span><span id="line-754"> case SEEK_NEXT_COL:</span>
<span class="source-line-no">755</span><span id="line-755"> seekOrSkipToNextColumn(cell);</span>
<span class="source-line-no">756</span><span id="line-756"> NextState stateAfterSeekNextColumn = needToReturn(outResult);</span>
<span class="source-line-no">757</span><span id="line-757"> if (stateAfterSeekNextColumn != null) {</span>
<span class="source-line-no">758</span><span id="line-758"> return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();</span>
<span class="source-line-no">759</span><span id="line-759"> }</span>
<span class="source-line-no">760</span><span id="line-760"> break;</span>
<span class="source-line-no">761</span><span id="line-761"></span>
<span class="source-line-no">762</span><span id="line-762"> case SKIP:</span>
<span class="source-line-no">763</span><span id="line-763"> this.heap.next();</span>
<span class="source-line-no">764</span><span id="line-764"> break;</span>
<span class="source-line-no">765</span><span id="line-765"></span>
<span class="source-line-no">766</span><span id="line-766"> case SEEK_NEXT_USING_HINT:</span>
<span class="source-line-no">767</span><span id="line-767"> ExtendedCell nextKV = matcher.getNextKeyHint(cell);</span>
<span class="source-line-no">768</span><span id="line-768"> if (nextKV != null) {</span>
<span class="source-line-no">769</span><span id="line-769"> int difference = comparator.compare(nextKV, cell);</span>
<span class="source-line-no">770</span><span id="line-770"> if (</span>
<span class="source-line-no">771</span><span id="line-771"> ((!scan.isReversed() &amp;&amp; difference &gt; 0) || (scan.isReversed() &amp;&amp; difference &lt; 0))</span>
<span class="source-line-no">772</span><span id="line-772"> ) {</span>
<span class="source-line-no">773</span><span id="line-773"> seekAsDirection(nextKV);</span>
<span class="source-line-no">774</span><span id="line-774"> NextState stateAfterSeekByHint = needToReturn(outResult);</span>
<span class="source-line-no">775</span><span id="line-775"> if (stateAfterSeekByHint != null) {</span>
<span class="source-line-no">776</span><span id="line-776"> return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();</span>
<span class="source-line-no">777</span><span id="line-777"> }</span>
<span class="source-line-no">778</span><span id="line-778"> break;</span>
<span class="source-line-no">779</span><span id="line-779"> }</span>
<span class="source-line-no">780</span><span id="line-780"> }</span>
<span class="source-line-no">781</span><span id="line-781"> heap.next();</span>
<span class="source-line-no">782</span><span id="line-782"> break;</span>
<span class="source-line-no">783</span><span id="line-783"></span>
<span class="source-line-no">784</span><span id="line-784"> default:</span>
<span class="source-line-no">785</span><span id="line-785"> throw new RuntimeException("UNEXPECTED");</span>
<span class="source-line-no">786</span><span id="line-786"> }</span>
<span class="source-line-no">787</span><span id="line-787"></span>
<span class="source-line-no">788</span><span id="line-788"> // One last chance to break due to size limit. The INCLUDE* cases above already check</span>
<span class="source-line-no">789</span><span id="line-789"> // limit and continue. For the various filtered cases, we need to check because block</span>
<span class="source-line-no">790</span><span id="line-790"> // size limit may have been exceeded even if we don't add cells to result list.</span>
<span class="source-line-no">791</span><span id="line-791"> if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {</span>
<span class="source-line-no">792</span><span id="line-792"> return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">793</span><span id="line-793"> }</span>
<span class="source-line-no">794</span><span id="line-794"> } while ((cell = this.heap.peek()) != null);</span>
<span class="source-line-no">795</span><span id="line-795"></span>
<span class="source-line-no">796</span><span id="line-796"> if (count &gt; 0) {</span>
<span class="source-line-no">797</span><span id="line-797"> return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">798</span><span id="line-798"> }</span>
<span class="source-line-no">799</span><span id="line-799"></span>
<span class="source-line-no">800</span><span id="line-800"> // No more keys</span>
<span class="source-line-no">801</span><span id="line-801"> close(false);// Do all cleanup except heap.close()</span>
<span class="source-line-no">802</span><span id="line-802"> return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();</span>
<span class="source-line-no">803</span><span id="line-803"> } finally {</span>
<span class="source-line-no">804</span><span id="line-804"> // increment only if we have some result</span>
<span class="source-line-no">805</span><span id="line-805"> if (count &gt; 0 &amp;&amp; matcher.isUserScan()) {</span>
<span class="source-line-no">806</span><span id="line-806"> // if true increment memstore metrics, if not the mixed one</span>
<span class="source-line-no">807</span><span id="line-807"> updateMetricsStore(onlyFromMemstore);</span>
<span class="source-line-no">808</span><span id="line-808"> }</span>
<span class="source-line-no">809</span><span id="line-809"> }</span>
<span class="source-line-no">810</span><span id="line-810"> }</span>
<span class="source-line-no">811</span><span id="line-811"></span>
<span class="source-line-no">812</span><span id="line-812"> private void updateMetricsStore(boolean memstoreRead) {</span>
<span class="source-line-no">813</span><span id="line-813"> if (store != null) {</span>
<span class="source-line-no">814</span><span id="line-814"> store.updateMetricsStore(memstoreRead);</span>
<span class="source-line-no">815</span><span id="line-815"> } else {</span>
<span class="source-line-no">816</span><span id="line-816"> // for testing.</span>
<span class="source-line-no">817</span><span id="line-817"> if (memstoreRead) {</span>
<span class="source-line-no">818</span><span id="line-818"> memstoreOnlyReads++;</span>
<span class="source-line-no">819</span><span id="line-819"> } else {</span>
<span class="source-line-no">820</span><span id="line-820"> mixedReads++;</span>
<span class="source-line-no">821</span><span id="line-821"> }</span>
<span class="source-line-no">822</span><span id="line-822"> }</span>
<span class="source-line-no">823</span><span id="line-823"> }</span>
<span class="source-line-no">824</span><span id="line-824"></span>
<span class="source-line-no">825</span><span id="line-825"> /**</span>
<span class="source-line-no">826</span><span id="line-826"> * If the top cell won't be flushed into disk, the new top cell may be changed after</span>
<span class="source-line-no">827</span><span id="line-827"> * #reopenAfterFlush. Because the older top cell only exist in the memstore scanner but the</span>
<span class="source-line-no">828</span><span id="line-828"> * memstore scanner is replaced by hfile scanner after #reopenAfterFlush. If the row of top cell</span>
<span class="source-line-no">829</span><span id="line-829"> * is changed, we should return the current cells. Otherwise, we may return the cells across</span>
<span class="source-line-no">830</span><span id="line-830"> * different rows.</span>
<span class="source-line-no">831</span><span id="line-831"> * @param outResult the cells which are visible for user scan</span>
<span class="source-line-no">832</span><span id="line-832"> * @return null is the top cell doesn't change. Otherwise, the NextState to return</span>
<span class="source-line-no">833</span><span id="line-833"> */</span>
<span class="source-line-no">834</span><span id="line-834"> private NextState needToReturn(List&lt;? super ExtendedCell&gt; outResult) {</span>
<span class="source-line-no">835</span><span id="line-835"> if (!outResult.isEmpty() &amp;&amp; topChanged) {</span>
<span class="source-line-no">836</span><span id="line-836"> return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;</span>
<span class="source-line-no">837</span><span id="line-837"> }</span>
<span class="source-line-no">838</span><span id="line-838"> return null;</span>
<span class="source-line-no">839</span><span id="line-839"> }</span>
<span class="source-line-no">840</span><span id="line-840"></span>
<span class="source-line-no">841</span><span id="line-841"> private void seekOrSkipToNextRow(ExtendedCell cell) throws IOException {</span>
<span class="source-line-no">842</span><span id="line-842"> // If it is a Get Scan, then we know that we are done with this row; there are no more</span>
<span class="source-line-no">843</span><span id="line-843"> // rows beyond the current one: don't try to optimize.</span>
<span class="source-line-no">844</span><span id="line-844"> if (!get) {</span>
<span class="source-line-no">845</span><span id="line-845"> if (trySkipToNextRow(cell)) {</span>
<span class="source-line-no">846</span><span id="line-846"> return;</span>
<span class="source-line-no">847</span><span id="line-847"> }</span>
<span class="source-line-no">848</span><span id="line-848"> }</span>
<span class="source-line-no">849</span><span id="line-849"> seekToNextRow(cell);</span>
<span class="source-line-no">850</span><span id="line-850"> }</span>
<span class="source-line-no">851</span><span id="line-851"></span>
<span class="source-line-no">852</span><span id="line-852"> private void seekOrSkipToNextColumn(ExtendedCell cell) throws IOException {</span>
<span class="source-line-no">853</span><span id="line-853"> if (!trySkipToNextColumn(cell)) {</span>
<span class="source-line-no">854</span><span id="line-854"> seekAsDirection(matcher.getKeyForNextColumn(cell));</span>
<span class="source-line-no">855</span><span id="line-855"> }</span>
<span class="source-line-no">856</span><span id="line-856"> }</span>
<span class="source-line-no">857</span><span id="line-857"></span>
<span class="source-line-no">858</span><span id="line-858"> /**</span>
<span class="source-line-no">859</span><span id="line-859"> * See if we should actually SEEK or rather just SKIP to the next Cell (see HBASE-13109).</span>
<span class="source-line-no">860</span><span id="line-860"> * ScanQueryMatcher may issue SEEK hints, such as seek to next column, next row, or seek to an</span>
<span class="source-line-no">861</span><span id="line-861"> * arbitrary seek key. This method decides whether a seek is the most efficient _actual_ way to</span>
<span class="source-line-no">862</span><span id="line-862"> * get us to the requested cell (SEEKs are more expensive than SKIP, SKIP, SKIP inside the</span>
<span class="source-line-no">863</span><span id="line-863"> * current, loaded block). It does this by looking at the next indexed key of the current HFile.</span>
<span class="source-line-no">864</span><span id="line-864"> * This key is then compared with the _SEEK_ key, where a SEEK key is an artificial 'last possible</span>
<span class="source-line-no">865</span><span id="line-865"> * key on the row' (only in here, we avoid actually creating a SEEK key; in the compare we work</span>
<span class="source-line-no">866</span><span id="line-866"> * with the current Cell but compare as though it were a seek key; see down in</span>
<span class="source-line-no">867</span><span id="line-867"> * matcher.compareKeyForNextRow, etc). If the compare gets us onto the next block we *_SEEK,</span>
<span class="source-line-no">868</span><span id="line-868"> * otherwise we just SKIP to the next requested cell.</span>
<span class="source-line-no">869</span><span id="line-869"> * &lt;p&gt;</span>
<span class="source-line-no">870</span><span id="line-870"> * Other notes:</span>
<span class="source-line-no">871</span><span id="line-871"> * &lt;ul&gt;</span>
<span class="source-line-no">872</span><span id="line-872"> * &lt;li&gt;Rows can straddle block boundaries&lt;/li&gt;</span>
<span class="source-line-no">873</span><span id="line-873"> * &lt;li&gt;Versions of columns can straddle block boundaries (i.e. column C1 at T1 might be in a</span>
<span class="source-line-no">874</span><span id="line-874"> * different block than column C1 at T2)&lt;/li&gt;</span>
<span class="source-line-no">875</span><span id="line-875"> * &lt;li&gt;We want to SKIP if the chance is high that we'll find the desired Cell after a few</span>
<span class="source-line-no">876</span><span id="line-876"> * SKIPs...&lt;/li&gt;</span>
<span class="source-line-no">877</span><span id="line-877"> * &lt;li&gt;We want to SEEK when the chance is high that we'll be able to seek past many Cells,</span>
<span class="source-line-no">878</span><span id="line-878"> * especially if we know we need to go to the next block.&lt;/li&gt;</span>
<span class="source-line-no">879</span><span id="line-879"> * &lt;/ul&gt;</span>
<span class="source-line-no">880</span><span id="line-880"> * &lt;p&gt;</span>
<span class="source-line-no">881</span><span id="line-881"> * A good proxy (best effort) to determine whether SKIP is better than SEEK is whether we'll</span>
<span class="source-line-no">882</span><span id="line-882"> * likely end up seeking to the next block (or past the next block) to get our next column.</span>
<span class="source-line-no">883</span><span id="line-883"> * Example:</span>
<span class="source-line-no">884</span><span id="line-884"> *</span>
<span class="source-line-no">885</span><span id="line-885"> * &lt;pre&gt;</span>
<span class="source-line-no">886</span><span id="line-886"> * | BLOCK 1 | BLOCK 2 |</span>
<span class="source-line-no">887</span><span id="line-887"> * | r1/c1, r1/c2, r1/c3 | r1/c4, r1/c5, r2/c1 |</span>
<span class="source-line-no">888</span><span id="line-888"> * ^ ^</span>
<span class="source-line-no">889</span><span id="line-889"> * | |</span>
<span class="source-line-no">890</span><span id="line-890"> * Next Index Key SEEK_NEXT_ROW (before r2/c1)</span>
<span class="source-line-no">891</span><span id="line-891"> *</span>
<span class="source-line-no">892</span><span id="line-892"> *</span>
<span class="source-line-no">893</span><span id="line-893"> * | BLOCK 1 | BLOCK 2 |</span>
<span class="source-line-no">894</span><span id="line-894"> * | r1/c1/t5, r1/c1/t4, r1/c1/t3 | r1/c1/t2, r1/c1/T1, r1/c2/T3 |</span>
<span class="source-line-no">895</span><span id="line-895"> * ^ ^</span>
<span class="source-line-no">896</span><span id="line-896"> * | |</span>
<span class="source-line-no">897</span><span id="line-897"> * Next Index Key SEEK_NEXT_COL</span>
<span class="source-line-no">898</span><span id="line-898"> * &lt;/pre&gt;</span>
<span class="source-line-no">899</span><span id="line-899"> *</span>
<span class="source-line-no">900</span><span id="line-900"> * Now imagine we want columns c1 and c3 (see first diagram above), the 'Next Index Key' of r1/c4</span>
<span class="source-line-no">901</span><span id="line-901"> * is &gt; r1/c3 so we should seek to get to the c1 on the next row, r2. In second case, say we only</span>
<span class="source-line-no">902</span><span id="line-902"> * want one version of c1, after we have it, a SEEK_COL will be issued to get to c2. Looking at</span>
<span class="source-line-no">903</span><span id="line-903"> * the 'Next Index Key', it would land us in the next block, so we should SEEK. In other scenarios</span>
<span class="source-line-no">904</span><span id="line-904"> * where the SEEK will not land us in the next block, it is very likely better to issues a series</span>
<span class="source-line-no">905</span><span id="line-905"> * of SKIPs.</span>
<span class="source-line-no">906</span><span id="line-906"> * @param cell current cell</span>
<span class="source-line-no">907</span><span id="line-907"> * @return true means skip to next row, false means not</span>
<span class="source-line-no">908</span><span id="line-908"> */</span>
<span class="source-line-no">909</span><span id="line-909"> protected boolean trySkipToNextRow(ExtendedCell cell) throws IOException {</span>
<span class="source-line-no">910</span><span id="line-910"> ExtendedCell nextCell = null;</span>
<span class="source-line-no">911</span><span id="line-911"> // used to guard against a changed next indexed key by doing a identity comparison</span>
<span class="source-line-no">912</span><span id="line-912"> // when the identity changes we need to compare the bytes again</span>
<span class="source-line-no">913</span><span id="line-913"> ExtendedCell previousIndexedKey = null;</span>
<span class="source-line-no">914</span><span id="line-914"> do {</span>
<span class="source-line-no">915</span><span id="line-915"> ExtendedCell nextIndexedKey = getNextIndexedKey();</span>
<span class="source-line-no">916</span><span id="line-916"> if (</span>
<span class="source-line-no">917</span><span id="line-917"> nextIndexedKey != null &amp;&amp; nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY</span>
<span class="source-line-no">918</span><span id="line-918"> &amp;&amp; (nextIndexedKey == previousIndexedKey</span>
<span class="source-line-no">919</span><span id="line-919"> || matcher.compareKeyForNextRow(nextIndexedKey, cell) &gt;= 0)</span>
<span class="source-line-no">920</span><span id="line-920"> ) {</span>
<span class="source-line-no">921</span><span id="line-921"> this.heap.next();</span>
<span class="source-line-no">922</span><span id="line-922"> ++kvsScanned;</span>
<span class="source-line-no">923</span><span id="line-923"> previousIndexedKey = nextIndexedKey;</span>
<span class="source-line-no">924</span><span id="line-924"> } else {</span>
<span class="source-line-no">925</span><span id="line-925"> return false;</span>
<span class="source-line-no">926</span><span id="line-926"> }</span>
<span class="source-line-no">927</span><span id="line-927"> } while ((nextCell = this.heap.peek()) != null &amp;&amp; CellUtil.matchingRows(cell, nextCell));</span>
<span class="source-line-no">928</span><span id="line-928"> return true;</span>
<span class="source-line-no">929</span><span id="line-929"> }</span>
<span class="source-line-no">930</span><span id="line-930"></span>
<span class="source-line-no">931</span><span id="line-931"> /**</span>
<span class="source-line-no">932</span><span id="line-932"> * See {@link #trySkipToNextRow(ExtendedCell)}</span>
<span class="source-line-no">933</span><span id="line-933"> * @param cell current cell</span>
<span class="source-line-no">934</span><span id="line-934"> * @return true means skip to next column, false means not</span>
<span class="source-line-no">935</span><span id="line-935"> */</span>
<span class="source-line-no">936</span><span id="line-936"> protected boolean trySkipToNextColumn(ExtendedCell cell) throws IOException {</span>
<span class="source-line-no">937</span><span id="line-937"> ExtendedCell nextCell = null;</span>
<span class="source-line-no">938</span><span id="line-938"> // used to guard against a changed next indexed key by doing a identity comparison</span>
<span class="source-line-no">939</span><span id="line-939"> // when the identity changes we need to compare the bytes again</span>
<span class="source-line-no">940</span><span id="line-940"> ExtendedCell previousIndexedKey = null;</span>
<span class="source-line-no">941</span><span id="line-941"> do {</span>
<span class="source-line-no">942</span><span id="line-942"> ExtendedCell nextIndexedKey = getNextIndexedKey();</span>
<span class="source-line-no">943</span><span id="line-943"> if (</span>
<span class="source-line-no">944</span><span id="line-944"> nextIndexedKey != null &amp;&amp; nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY</span>
<span class="source-line-no">945</span><span id="line-945"> &amp;&amp; (nextIndexedKey == previousIndexedKey</span>
<span class="source-line-no">946</span><span id="line-946"> || matcher.compareKeyForNextColumn(nextIndexedKey, cell) &gt;= 0)</span>
<span class="source-line-no">947</span><span id="line-947"> ) {</span>
<span class="source-line-no">948</span><span id="line-948"> this.heap.next();</span>
<span class="source-line-no">949</span><span id="line-949"> ++kvsScanned;</span>
<span class="source-line-no">950</span><span id="line-950"> previousIndexedKey = nextIndexedKey;</span>
<span class="source-line-no">951</span><span id="line-951"> } else {</span>
<span class="source-line-no">952</span><span id="line-952"> return false;</span>
<span class="source-line-no">953</span><span id="line-953"> }</span>
<span class="source-line-no">954</span><span id="line-954"> } while ((nextCell = this.heap.peek()) != null &amp;&amp; CellUtil.matchingRowColumn(cell, nextCell));</span>
<span class="source-line-no">955</span><span id="line-955"> // We need this check because it may happen that the new scanner that we get</span>
<span class="source-line-no">956</span><span id="line-956"> // during heap.next() is requiring reseek due of fake KV previously generated for</span>
<span class="source-line-no">957</span><span id="line-957"> // ROWCOL bloom filter optimization. See HBASE-19863 for more details</span>
<span class="source-line-no">958</span><span id="line-958"> if (</span>
<span class="source-line-no">959</span><span id="line-959"> useRowColBloom &amp;&amp; nextCell != null &amp;&amp; cell.getTimestamp() == PrivateConstants.OLDEST_TIMESTAMP</span>
<span class="source-line-no">960</span><span id="line-960"> ) {</span>
<span class="source-line-no">961</span><span id="line-961"> return false;</span>
<span class="source-line-no">962</span><span id="line-962"> }</span>
<span class="source-line-no">963</span><span id="line-963"> return true;</span>
<span class="source-line-no">964</span><span id="line-964"> }</span>
<span class="source-line-no">965</span><span id="line-965"></span>
<span class="source-line-no">966</span><span id="line-966"> @Override</span>
<span class="source-line-no">967</span><span id="line-967"> public long getReadPoint() {</span>
<span class="source-line-no">968</span><span id="line-968"> return this.readPt;</span>
<span class="source-line-no">969</span><span id="line-969"> }</span>
<span class="source-line-no">970</span><span id="line-970"></span>
<span class="source-line-no">971</span><span id="line-971"> private static void clearAndClose(List&lt;KeyValueScanner&gt; scanners) {</span>
<span class="source-line-no">972</span><span id="line-972"> if (scanners == null) {</span>
<span class="source-line-no">973</span><span id="line-973"> return;</span>
<span class="source-line-no">974</span><span id="line-974"> }</span>
<span class="source-line-no">975</span><span id="line-975"> for (KeyValueScanner s : scanners) {</span>
<span class="source-line-no">976</span><span id="line-976"> s.close();</span>
<span class="source-line-no">977</span><span id="line-977"> }</span>
<span class="source-line-no">978</span><span id="line-978"> scanners.clear();</span>
<span class="source-line-no">979</span><span id="line-979"> }</span>
<span class="source-line-no">980</span><span id="line-980"></span>
<span class="source-line-no">981</span><span id="line-981"> // Implementation of ChangedReadersObserver</span>
<span class="source-line-no">982</span><span id="line-982"> @Override</span>
<span class="source-line-no">983</span><span id="line-983"> public void updateReaders(List&lt;HStoreFile&gt; sfs, List&lt;KeyValueScanner&gt; memStoreScanners)</span>
<span class="source-line-no">984</span><span id="line-984"> throws IOException {</span>
<span class="source-line-no">985</span><span id="line-985"> if (CollectionUtils.isEmpty(sfs) &amp;&amp; CollectionUtils.isEmpty(memStoreScanners)) {</span>
<span class="source-line-no">986</span><span id="line-986"> return;</span>
<span class="source-line-no">987</span><span id="line-987"> }</span>
<span class="source-line-no">988</span><span id="line-988"> boolean updateReaders = false;</span>
<span class="source-line-no">989</span><span id="line-989"> flushLock.lock();</span>
<span class="source-line-no">990</span><span id="line-990"> try {</span>
<span class="source-line-no">991</span><span id="line-991"> if (!closeLock.tryLock()) {</span>
<span class="source-line-no">992</span><span id="line-992"> // The reason for doing this is that when the current store scanner does not retrieve</span>
<span class="source-line-no">993</span><span id="line-993"> // any new cells, then the scanner is considered to be done. The heap of this scanner</span>
<span class="source-line-no">994</span><span id="line-994"> // is not closed till the shipped() call is completed. Hence in that case if at all</span>
<span class="source-line-no">995</span><span id="line-995"> // the partial close (close (false)) has been called before updateReaders(), there is no</span>
<span class="source-line-no">996</span><span id="line-996"> // need for the updateReaders() to happen.</span>
<span class="source-line-no">997</span><span id="line-997"> LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");</span>
<span class="source-line-no">998</span><span id="line-998"> // no lock acquired.</span>
<span class="source-line-no">999</span><span id="line-999"> clearAndClose(memStoreScanners);</span>
<span class="source-line-no">1000</span><span id="line-1000"> return;</span>
<span class="source-line-no">1001</span><span id="line-1001"> }</span>
<span class="source-line-no">1002</span><span id="line-1002"> // lock acquired</span>
<span class="source-line-no">1003</span><span id="line-1003"> updateReaders = true;</span>
<span class="source-line-no">1004</span><span id="line-1004"> if (this.closing) {</span>
<span class="source-line-no">1005</span><span id="line-1005"> LOG.debug("StoreScanner already closing. There is no need to updateReaders");</span>
<span class="source-line-no">1006</span><span id="line-1006"> clearAndClose(memStoreScanners);</span>
<span class="source-line-no">1007</span><span id="line-1007"> return;</span>
<span class="source-line-no">1008</span><span id="line-1008"> }</span>
<span class="source-line-no">1009</span><span id="line-1009"> flushed = true;</span>
<span class="source-line-no">1010</span><span id="line-1010"> final boolean isCompaction = false;</span>
<span class="source-line-no">1011</span><span id="line-1011"> boolean usePread = get || scanUsePread;</span>
<span class="source-line-no">1012</span><span id="line-1012"> // SEE HBASE-19468 where the flushed files are getting compacted even before a scanner</span>
<span class="source-line-no">1013</span><span id="line-1013"> // calls next(). So its better we create scanners here rather than next() call. Ensure</span>
<span class="source-line-no">1014</span><span id="line-1014"> // these scanners are properly closed() whether or not the scan is completed successfully</span>
<span class="source-line-no">1015</span><span id="line-1015"> // Eagerly creating scanners so that we have the ref counting ticking on the newly created</span>
<span class="source-line-no">1016</span><span id="line-1016"> // store files. In case of stream scanners this eager creation does not induce performance</span>
<span class="source-line-no">1017</span><span id="line-1017"> // penalty because in scans (that uses stream scanners) the next() call is bound to happen.</span>
<span class="source-line-no">1018</span><span id="line-1018"> List&lt;KeyValueScanner&gt; scanners =</span>
<span class="source-line-no">1019</span><span id="line-1019"> store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher,</span>
<span class="source-line-no">1020</span><span id="line-1020"> scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan));</span>
<span class="source-line-no">1021</span><span id="line-1021"> flushedstoreFileScanners.addAll(scanners);</span>
<span class="source-line-no">1022</span><span id="line-1022"> if (!CollectionUtils.isEmpty(memStoreScanners)) {</span>
<span class="source-line-no">1023</span><span id="line-1023"> clearAndClose(memStoreScannersAfterFlush);</span>
<span class="source-line-no">1024</span><span id="line-1024"> memStoreScannersAfterFlush.addAll(memStoreScanners);</span>
<span class="source-line-no">1025</span><span id="line-1025"> }</span>
<span class="source-line-no">1026</span><span id="line-1026"> } finally {</span>
<span class="source-line-no">1027</span><span id="line-1027"> flushLock.unlock();</span>
<span class="source-line-no">1028</span><span id="line-1028"> if (updateReaders) {</span>
<span class="source-line-no">1029</span><span id="line-1029"> closeLock.unlock();</span>
<span class="source-line-no">1030</span><span id="line-1030"> }</span>
<span class="source-line-no">1031</span><span id="line-1031"> }</span>
<span class="source-line-no">1032</span><span id="line-1032"> // Let the next() call handle re-creating and seeking</span>
<span class="source-line-no">1033</span><span id="line-1033"> }</span>
<span class="source-line-no">1034</span><span id="line-1034"></span>
<span class="source-line-no">1035</span><span id="line-1035"> /** Returns if top of heap has changed (and KeyValueHeap has to try the next KV) */</span>
<span class="source-line-no">1036</span><span id="line-1036"> protected final boolean reopenAfterFlush() throws IOException {</span>
<span class="source-line-no">1037</span><span id="line-1037"> // here we can make sure that we have a Store instance so no null check on store.</span>
<span class="source-line-no">1038</span><span id="line-1038"> ExtendedCell lastTop = heap.peek();</span>
<span class="source-line-no">1039</span><span id="line-1039"> // When we have the scan object, should we not pass it to getScanners() to get a limited set of</span>
<span class="source-line-no">1040</span><span id="line-1040"> // scanners? We did so in the constructor and we could have done it now by storing the scan</span>
<span class="source-line-no">1041</span><span id="line-1041"> // object from the constructor</span>
<span class="source-line-no">1042</span><span id="line-1042"> List&lt;KeyValueScanner&gt; scanners;</span>
<span class="source-line-no">1043</span><span id="line-1043"> flushLock.lock();</span>
<span class="source-line-no">1044</span><span id="line-1044"> try {</span>
<span class="source-line-no">1045</span><span id="line-1045"> List&lt;KeyValueScanner&gt; allScanners =</span>
<span class="source-line-no">1046</span><span id="line-1046"> new ArrayList&lt;&gt;(flushedstoreFileScanners.size() + memStoreScannersAfterFlush.size());</span>
<span class="source-line-no">1047</span><span id="line-1047"> allScanners.addAll(flushedstoreFileScanners);</span>
<span class="source-line-no">1048</span><span id="line-1048"> allScanners.addAll(memStoreScannersAfterFlush);</span>
<span class="source-line-no">1049</span><span id="line-1049"> scanners = selectScannersFrom(store, allScanners);</span>
<span class="source-line-no">1050</span><span id="line-1050"> // Clear the current set of flushed store files scanners so that they don't get added again</span>
<span class="source-line-no">1051</span><span id="line-1051"> flushedstoreFileScanners.clear();</span>
<span class="source-line-no">1052</span><span id="line-1052"> memStoreScannersAfterFlush.clear();</span>
<span class="source-line-no">1053</span><span id="line-1053"> } finally {</span>
<span class="source-line-no">1054</span><span id="line-1054"> flushLock.unlock();</span>
<span class="source-line-no">1055</span><span id="line-1055"> }</span>
<span class="source-line-no">1056</span><span id="line-1056"></span>
<span class="source-line-no">1057</span><span id="line-1057"> // Seek the new scanners to the last key</span>
<span class="source-line-no">1058</span><span id="line-1058"> seekScanners(scanners, lastTop, false, parallelSeekEnabled);</span>
<span class="source-line-no">1059</span><span id="line-1059"> // remove the older memstore scanner</span>
<span class="source-line-no">1060</span><span id="line-1060"> for (int i = currentScanners.size() - 1; i &gt;= 0; i--) {</span>
<span class="source-line-no">1061</span><span id="line-1061"> if (!currentScanners.get(i).isFileScanner()) {</span>
<span class="source-line-no">1062</span><span id="line-1062"> scannersForDelayedClose.add(currentScanners.remove(i));</span>
<span class="source-line-no">1063</span><span id="line-1063"> } else {</span>
<span class="source-line-no">1064</span><span id="line-1064"> // we add the memstore scanner to the end of currentScanners</span>
<span class="source-line-no">1065</span><span id="line-1065"> break;</span>
<span class="source-line-no">1066</span><span id="line-1066"> }</span>
<span class="source-line-no">1067</span><span id="line-1067"> }</span>
<span class="source-line-no">1068</span><span id="line-1068"> // add the newly created scanners on the flushed files and the current active memstore scanner</span>
<span class="source-line-no">1069</span><span id="line-1069"> addCurrentScanners(scanners);</span>
<span class="source-line-no">1070</span><span id="line-1070"> // Combine all seeked scanners with a heap</span>
<span class="source-line-no">1071</span><span id="line-1071"> resetKVHeap(this.currentScanners, store.getComparator());</span>
<span class="source-line-no">1072</span><span id="line-1072"> resetQueryMatcher(lastTop);</span>
<span class="source-line-no">1073</span><span id="line-1073"> if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {</span>
<span class="source-line-no">1074</span><span id="line-1074"> LOG.info("Storescanner.peek() is changed where before = " + lastTop.toString()</span>
<span class="source-line-no">1075</span><span id="line-1075"> + ",and after = " + heap.peek());</span>
<span class="source-line-no">1076</span><span id="line-1076"> topChanged = true;</span>
<span class="source-line-no">1077</span><span id="line-1077"> } else {</span>
<span class="source-line-no">1078</span><span id="line-1078"> topChanged = false;</span>
<span class="source-line-no">1079</span><span id="line-1079"> }</span>
<span class="source-line-no">1080</span><span id="line-1080"> return topChanged;</span>
<span class="source-line-no">1081</span><span id="line-1081"> }</span>
<span class="source-line-no">1082</span><span id="line-1082"></span>
<span class="source-line-no">1083</span><span id="line-1083"> private void resetQueryMatcher(ExtendedCell lastTopKey) {</span>
<span class="source-line-no">1084</span><span id="line-1084"> // Reset the state of the Query Matcher and set to top row.</span>
<span class="source-line-no">1085</span><span id="line-1085"> // Only reset and call setRow if the row changes; avoids confusing the</span>
<span class="source-line-no">1086</span><span id="line-1086"> // query matcher if scanning intra-row.</span>
<span class="source-line-no">1087</span><span id="line-1087"> ExtendedCell cell = heap.peek();</span>
<span class="source-line-no">1088</span><span id="line-1088"> if (cell == null) {</span>
<span class="source-line-no">1089</span><span id="line-1089"> cell = lastTopKey;</span>
<span class="source-line-no">1090</span><span id="line-1090"> }</span>
<span class="source-line-no">1091</span><span id="line-1091"> if ((matcher.currentRow() == null) || !CellUtil.matchingRows(cell, matcher.currentRow())) {</span>
<span class="source-line-no">1092</span><span id="line-1092"> this.countPerRow = 0;</span>
<span class="source-line-no">1093</span><span id="line-1093"> // The setToNewRow will call reset internally</span>
<span class="source-line-no">1094</span><span id="line-1094"> matcher.setToNewRow(cell);</span>
<span class="source-line-no">1095</span><span id="line-1095"> }</span>
<span class="source-line-no">1096</span><span id="line-1096"> }</span>
<span class="source-line-no">1097</span><span id="line-1097"></span>
<span class="source-line-no">1098</span><span id="line-1098"> /**</span>
<span class="source-line-no">1099</span><span id="line-1099"> * Check whether scan as expected order</span>
<span class="source-line-no">1100</span><span id="line-1100"> */</span>
<span class="source-line-no">1101</span><span id="line-1101"> protected void checkScanOrder(Cell prevKV, Cell kv, CellComparator comparator)</span>
<span class="source-line-no">1102</span><span id="line-1102"> throws IOException {</span>
<span class="source-line-no">1103</span><span id="line-1103"> // Check that the heap gives us KVs in an increasing order.</span>
<span class="source-line-no">1104</span><span id="line-1104"> assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) &lt;= 0</span>
<span class="source-line-no">1105</span><span id="line-1105"> : "Key " + prevKV + " followed by a smaller key " + kv + " in cf " + store;</span>
<span class="source-line-no">1106</span><span id="line-1106"> }</span>
<span class="source-line-no">1107</span><span id="line-1107"></span>
<span class="source-line-no">1108</span><span id="line-1108"> protected boolean seekToNextRow(ExtendedCell c) throws IOException {</span>
<span class="source-line-no">1109</span><span id="line-1109"> return reseek(PrivateCellUtil.createLastOnRow(c));</span>
<span class="source-line-no">1110</span><span id="line-1110"> }</span>
<span class="source-line-no">1111</span><span id="line-1111"></span>
<span class="source-line-no">1112</span><span id="line-1112"> /**</span>
<span class="source-line-no">1113</span><span id="line-1113"> * Do a reseek in a normal StoreScanner(scan forward)</span>
<span class="source-line-no">1114</span><span id="line-1114"> * @return true if scanner has values left, false if end of scanner</span>
<span class="source-line-no">1115</span><span id="line-1115"> */</span>
<span class="source-line-no">1116</span><span id="line-1116"> protected boolean seekAsDirection(ExtendedCell kv) throws IOException {</span>
<span class="source-line-no">1117</span><span id="line-1117"> return reseek(kv);</span>
<span class="source-line-no">1118</span><span id="line-1118"> }</span>
<span class="source-line-no">1119</span><span id="line-1119"></span>
<span class="source-line-no">1120</span><span id="line-1120"> @Override</span>
<span class="source-line-no">1121</span><span id="line-1121"> public boolean reseek(ExtendedCell kv) throws IOException {</span>
<span class="source-line-no">1122</span><span id="line-1122"> if (checkFlushed()) {</span>
<span class="source-line-no">1123</span><span id="line-1123"> reopenAfterFlush();</span>
<span class="source-line-no">1124</span><span id="line-1124"> }</span>
<span class="source-line-no">1125</span><span id="line-1125"> if (explicitColumnQuery &amp;&amp; lazySeekEnabledGlobally) {</span>
<span class="source-line-no">1126</span><span id="line-1126"> return heap.requestSeek(kv, true, useRowColBloom);</span>
<span class="source-line-no">1127</span><span id="line-1127"> }</span>
<span class="source-line-no">1128</span><span id="line-1128"> return heap.reseek(kv);</span>
<span class="source-line-no">1129</span><span id="line-1129"> }</span>
<span class="source-line-no">1130</span><span id="line-1130"></span>
<span class="source-line-no">1131</span><span id="line-1131"> void trySwitchToStreamRead() {</span>
<span class="source-line-no">1132</span><span id="line-1132"> if (</span>
<span class="source-line-no">1133</span><span id="line-1133"> readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null</span>
<span class="source-line-no">1134</span><span id="line-1134"> || bytesRead &lt; preadMaxBytes</span>
<span class="source-line-no">1135</span><span id="line-1135"> ) {</span>
<span class="source-line-no">1136</span><span id="line-1136"> return;</span>
<span class="source-line-no">1137</span><span id="line-1137"> }</span>
<span class="source-line-no">1138</span><span id="line-1138"> LOG.debug("Switch to stream read (scanned={} bytes) of {}", bytesRead,</span>
<span class="source-line-no">1139</span><span id="line-1139"> this.store.getColumnFamilyName());</span>
<span class="source-line-no">1140</span><span id="line-1140"> scanUsePread = false;</span>
<span class="source-line-no">1141</span><span id="line-1141"> ExtendedCell lastTop = heap.peek();</span>
<span class="source-line-no">1142</span><span id="line-1142"> List&lt;KeyValueScanner&gt; memstoreScanners = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">1143</span><span id="line-1143"> List&lt;KeyValueScanner&gt; scannersToClose = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">1144</span><span id="line-1144"> for (KeyValueScanner kvs : currentScanners) {</span>
<span class="source-line-no">1145</span><span id="line-1145"> if (!kvs.isFileScanner()) {</span>
<span class="source-line-no">1146</span><span id="line-1146"> // collect memstorescanners here</span>
<span class="source-line-no">1147</span><span id="line-1147"> memstoreScanners.add(kvs);</span>
<span class="source-line-no">1148</span><span id="line-1148"> } else {</span>
<span class="source-line-no">1149</span><span id="line-1149"> scannersToClose.add(kvs);</span>
<span class="source-line-no">1150</span><span id="line-1150"> }</span>
<span class="source-line-no">1151</span><span id="line-1151"> }</span>
<span class="source-line-no">1152</span><span id="line-1152"> List&lt;KeyValueScanner&gt; fileScanners = null;</span>
<span class="source-line-no">1153</span><span id="line-1153"> List&lt;KeyValueScanner&gt; newCurrentScanners;</span>
<span class="source-line-no">1154</span><span id="line-1154"> KeyValueHeap newHeap;</span>
<span class="source-line-no">1155</span><span id="line-1155"> try {</span>
<span class="source-line-no">1156</span><span id="line-1156"> // We must have a store instance here so no null check</span>
<span class="source-line-no">1157</span><span id="line-1157"> // recreate the scanners on the current file scanners</span>
<span class="source-line-no">1158</span><span id="line-1158"> fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher,</span>
<span class="source-line-no">1159</span><span id="line-1159"> scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),</span>
<span class="source-line-no">1160</span><span id="line-1160"> readPt, false);</span>
<span class="source-line-no">1161</span><span id="line-1161"> if (fileScanners == null) {</span>
<span class="source-line-no">1162</span><span id="line-1162"> return;</span>
<span class="source-line-no">1163</span><span id="line-1163"> }</span>
<span class="source-line-no">1164</span><span id="line-1164"> seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);</span>
<span class="source-line-no">1165</span><span id="line-1165"> newCurrentScanners = new ArrayList&lt;&gt;(fileScanners.size() + memstoreScanners.size());</span>
<span class="source-line-no">1166</span><span id="line-1166"> newCurrentScanners.addAll(fileScanners);</span>
<span class="source-line-no">1167</span><span id="line-1167"> newCurrentScanners.addAll(memstoreScanners);</span>
<span class="source-line-no">1168</span><span id="line-1168"> newHeap = newKVHeap(newCurrentScanners, comparator);</span>
<span class="source-line-no">1169</span><span id="line-1169"> } catch (Exception e) {</span>
<span class="source-line-no">1170</span><span id="line-1170"> LOG.warn("failed to switch to stream read", e);</span>
<span class="source-line-no">1171</span><span id="line-1171"> if (fileScanners != null) {</span>
<span class="source-line-no">1172</span><span id="line-1172"> fileScanners.forEach(KeyValueScanner::close);</span>
<span class="source-line-no">1173</span><span id="line-1173"> }</span>
<span class="source-line-no">1174</span><span id="line-1174"> return;</span>
<span class="source-line-no">1175</span><span id="line-1175"> }</span>
<span class="source-line-no">1176</span><span id="line-1176"> currentScanners.clear();</span>
<span class="source-line-no">1177</span><span id="line-1177"> addCurrentScanners(newCurrentScanners);</span>
<span class="source-line-no">1178</span><span id="line-1178"> this.heap = newHeap;</span>
<span class="source-line-no">1179</span><span id="line-1179"> resetQueryMatcher(lastTop);</span>
<span class="source-line-no">1180</span><span id="line-1180"> scannersToClose.forEach(KeyValueScanner::close);</span>
<span class="source-line-no">1181</span><span id="line-1181"> }</span>
<span class="source-line-no">1182</span><span id="line-1182"></span>
<span class="source-line-no">1183</span><span id="line-1183"> protected final boolean checkFlushed() {</span>
<span class="source-line-no">1184</span><span id="line-1184"> // check the var without any lock. Suppose even if we see the old</span>
<span class="source-line-no">1185</span><span id="line-1185"> // value here still it is ok to continue because we will not be resetting</span>
<span class="source-line-no">1186</span><span id="line-1186"> // the heap but will continue with the referenced memstore's snapshot. For compactions</span>
<span class="source-line-no">1187</span><span id="line-1187"> // any way we don't need the updateReaders at all to happen as we still continue with</span>
<span class="source-line-no">1188</span><span id="line-1188"> // the older files</span>
<span class="source-line-no">1189</span><span id="line-1189"> if (flushed) {</span>
<span class="source-line-no">1190</span><span id="line-1190"> // If there is a flush and the current scan is notified on the flush ensure that the</span>
<span class="source-line-no">1191</span><span id="line-1191"> // scan's heap gets reset and we do a seek on the newly flushed file.</span>
<span class="source-line-no">1192</span><span id="line-1192"> if (this.closing) {</span>
<span class="source-line-no">1193</span><span id="line-1193"> return false;</span>
<span class="source-line-no">1194</span><span id="line-1194"> }</span>
<span class="source-line-no">1195</span><span id="line-1195"> // reset the flag</span>
<span class="source-line-no">1196</span><span id="line-1196"> flushed = false;</span>
<span class="source-line-no">1197</span><span id="line-1197"> return true;</span>
<span class="source-line-no">1198</span><span id="line-1198"> }</span>
<span class="source-line-no">1199</span><span id="line-1199"> return false;</span>
<span class="source-line-no">1200</span><span id="line-1200"> }</span>
<span class="source-line-no">1201</span><span id="line-1201"></span>
<span class="source-line-no">1202</span><span id="line-1202"> /**</span>
<span class="source-line-no">1203</span><span id="line-1203"> * Seek storefiles in parallel to optimize IO latency as much as possible</span>
<span class="source-line-no">1204</span><span id="line-1204"> * @param scanners the list {@link KeyValueScanner}s to be read from</span>
<span class="source-line-no">1205</span><span id="line-1205"> * @param kv the KeyValue on which the operation is being requested</span>
<span class="source-line-no">1206</span><span id="line-1206"> */</span>
<span class="source-line-no">1207</span><span id="line-1207"> private void parallelSeek(final List&lt;? extends KeyValueScanner&gt; scanners, final ExtendedCell kv)</span>
<span class="source-line-no">1208</span><span id="line-1208"> throws IOException {</span>
<span class="source-line-no">1209</span><span id="line-1209"> if (scanners.isEmpty()) return;</span>
<span class="source-line-no">1210</span><span id="line-1210"> int storeFileScannerCount = scanners.size();</span>
<span class="source-line-no">1211</span><span id="line-1211"> CountDownLatch latch = new CountDownLatch(storeFileScannerCount);</span>
<span class="source-line-no">1212</span><span id="line-1212"> List&lt;ParallelSeekHandler&gt; handlers = new ArrayList&lt;&gt;(storeFileScannerCount);</span>
<span class="source-line-no">1213</span><span id="line-1213"> for (KeyValueScanner scanner : scanners) {</span>
<span class="source-line-no">1214</span><span id="line-1214"> if (scanner instanceof StoreFileScanner) {</span>
<span class="source-line-no">1215</span><span id="line-1215"> ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, this.readPt, latch);</span>
<span class="source-line-no">1216</span><span id="line-1216"> executor.submit(seekHandler);</span>
<span class="source-line-no">1217</span><span id="line-1217"> handlers.add(seekHandler);</span>
<span class="source-line-no">1218</span><span id="line-1218"> } else {</span>
<span class="source-line-no">1219</span><span id="line-1219"> scanner.seek(kv);</span>
<span class="source-line-no">1220</span><span id="line-1220"> latch.countDown();</span>
<span class="source-line-no">1221</span><span id="line-1221"> }</span>
<span class="source-line-no">1222</span><span id="line-1222"> }</span>
<span class="source-line-no">1223</span><span id="line-1223"></span>
<span class="source-line-no">1224</span><span id="line-1224"> try {</span>
<span class="source-line-no">1225</span><span id="line-1225"> latch.await();</span>
<span class="source-line-no">1226</span><span id="line-1226"> } catch (InterruptedException ie) {</span>
<span class="source-line-no">1227</span><span id="line-1227"> throw (InterruptedIOException) new InterruptedIOException().initCause(ie);</span>
<span class="source-line-no">1228</span><span id="line-1228"> }</span>
<span class="source-line-no">1229</span><span id="line-1229"></span>
<span class="source-line-no">1230</span><span id="line-1230"> for (ParallelSeekHandler handler : handlers) {</span>
<span class="source-line-no">1231</span><span id="line-1231"> if (handler.getErr() != null) {</span>
<span class="source-line-no">1232</span><span id="line-1232"> throw new IOException(handler.getErr());</span>
<span class="source-line-no">1233</span><span id="line-1233"> }</span>
<span class="source-line-no">1234</span><span id="line-1234"> }</span>
<span class="source-line-no">1235</span><span id="line-1235"> }</span>
<span class="source-line-no">1236</span><span id="line-1236"></span>
<span class="source-line-no">1237</span><span id="line-1237"> /**</span>
<span class="source-line-no">1238</span><span id="line-1238"> * Used in testing.</span>
<span class="source-line-no">1239</span><span id="line-1239"> * @return all scanners in no particular order</span>
<span class="source-line-no">1240</span><span id="line-1240"> */</span>
<span class="source-line-no">1241</span><span id="line-1241"> List&lt;KeyValueScanner&gt; getAllScannersForTesting() {</span>
<span class="source-line-no">1242</span><span id="line-1242"> List&lt;KeyValueScanner&gt; allScanners = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">1243</span><span id="line-1243"> KeyValueScanner current = heap.getCurrentForTesting();</span>
<span class="source-line-no">1244</span><span id="line-1244"> if (current != null) allScanners.add(current);</span>
<span class="source-line-no">1245</span><span id="line-1245"> for (KeyValueScanner scanner : heap.getHeap())</span>
<span class="source-line-no">1246</span><span id="line-1246"> allScanners.add(scanner);</span>
<span class="source-line-no">1247</span><span id="line-1247"> return allScanners;</span>
<span class="source-line-no">1248</span><span id="line-1248"> }</span>
<span class="source-line-no">1249</span><span id="line-1249"></span>
<span class="source-line-no">1250</span><span id="line-1250"> static void enableLazySeekGlobally(boolean enable) {</span>
<span class="source-line-no">1251</span><span id="line-1251"> lazySeekEnabledGlobally = enable;</span>
<span class="source-line-no">1252</span><span id="line-1252"> }</span>
<span class="source-line-no">1253</span><span id="line-1253"></span>
<span class="source-line-no">1254</span><span id="line-1254"> /** Returns The estimated number of KVs seen by this scanner (includes some skipped KVs). */</span>
<span class="source-line-no">1255</span><span id="line-1255"> public long getEstimatedNumberOfKvsScanned() {</span>
<span class="source-line-no">1256</span><span id="line-1256"> return this.kvsScanned;</span>
<span class="source-line-no">1257</span><span id="line-1257"> }</span>
<span class="source-line-no">1258</span><span id="line-1258"></span>
<span class="source-line-no">1259</span><span id="line-1259"> @Override</span>
<span class="source-line-no">1260</span><span id="line-1260"> public ExtendedCell getNextIndexedKey() {</span>
<span class="source-line-no">1261</span><span id="line-1261"> return this.heap.getNextIndexedKey();</span>
<span class="source-line-no">1262</span><span id="line-1262"> }</span>
<span class="source-line-no">1263</span><span id="line-1263"></span>
<span class="source-line-no">1264</span><span id="line-1264"> @Override</span>
<span class="source-line-no">1265</span><span id="line-1265"> public void shipped() throws IOException {</span>
<span class="source-line-no">1266</span><span id="line-1266"> if (prevCell != null) {</span>
<span class="source-line-no">1267</span><span id="line-1267"> // Do the copy here so that in case the prevCell ref is pointing to the previous</span>
<span class="source-line-no">1268</span><span id="line-1268"> // blocks we can safely release those blocks.</span>
<span class="source-line-no">1269</span><span id="line-1269"> // This applies to blocks that are got from Bucket cache, L1 cache and the blocks</span>
<span class="source-line-no">1270</span><span id="line-1270"> // fetched from HDFS. Copying this would ensure that we let go the references to these</span>
<span class="source-line-no">1271</span><span id="line-1271"> // blocks so that they can be GCed safely(in case of bucket cache)</span>
<span class="source-line-no">1272</span><span id="line-1272"> prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);</span>
<span class="source-line-no">1273</span><span id="line-1273"> }</span>
<span class="source-line-no">1274</span><span id="line-1274"> matcher.beforeShipped();</span>
<span class="source-line-no">1275</span><span id="line-1275"> // There wont be further fetch of Cells from these scanners. Just close.</span>
<span class="source-line-no">1276</span><span id="line-1276"> clearAndClose(scannersForDelayedClose);</span>
<span class="source-line-no">1277</span><span id="line-1277"> if (this.heap != null) {</span>
<span class="source-line-no">1278</span><span id="line-1278"> this.heap.shipped();</span>
<span class="source-line-no">1279</span><span id="line-1279"> // When switching from pread to stream, we will open a new scanner for each store file, but</span>
<span class="source-line-no">1280</span><span id="line-1280"> // the old scanner may still track the HFileBlocks we have scanned but not sent back to client</span>
<span class="source-line-no">1281</span><span id="line-1281"> // yet. If we close the scanner immediately then the HFileBlocks may be messed up by others</span>
<span class="source-line-no">1282</span><span id="line-1282"> // before we serialize and send it back to client. The HFileBlocks will be released in shipped</span>
<span class="source-line-no">1283</span><span id="line-1283"> // method, so we here will also open new scanners and close old scanners in shipped method.</span>
<span class="source-line-no">1284</span><span id="line-1284"> // See HBASE-18055 for more details.</span>
<span class="source-line-no">1285</span><span id="line-1285"> trySwitchToStreamRead();</span>
<span class="source-line-no">1286</span><span id="line-1286"> }</span>
<span class="source-line-no">1287</span><span id="line-1287"> }</span>
<span class="source-line-no">1288</span><span id="line-1288">}</span>
</pre>
</div>
</main>
</body>
</html>