| <!DOCTYPE HTML> |
| <html lang="en"> |
| <head> |
| <!-- Declares the document encoding up front so browsers do not have to guess it. --> |
| <!-- NOTE(review): assumes this page is written as UTF-8; confirm against the javadoc -docencoding setting. --> |
| <meta charset="utf-8"> |
| <!-- Generated by javadoc (17) --> |
| <title>Source code</title> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <meta name="description" content="source: package: org.apache.hadoop.hbase.regionserver, class: HStore"> |
| <meta name="generator" content="javadoc/SourceToHTMLConverter"> |
| <link rel="stylesheet" type="text/css" href="../../../../../../stylesheet.css" title="Style"> |
| </head> |
| <body class="source-page"> |
| <main role="main"> |
| <div class="source-container"> |
| <pre><span class="source-line-no">001</span><span id="line-1">/*</span> |
| <span class="source-line-no">002</span><span id="line-2"> * Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="source-line-no">003</span><span id="line-3"> * or more contributor license agreements. See the NOTICE file</span> |
| <span class="source-line-no">004</span><span id="line-4"> * distributed with this work for additional information</span> |
| <span class="source-line-no">005</span><span id="line-5"> * regarding copyright ownership. The ASF licenses this file</span> |
| <span class="source-line-no">006</span><span id="line-6"> * to you under the Apache License, Version 2.0 (the</span> |
| <span class="source-line-no">007</span><span id="line-7"> * "License"); you may not use this file except in compliance</span> |
| <span class="source-line-no">008</span><span id="line-8"> * with the License. You may obtain a copy of the License at</span> |
| <span class="source-line-no">009</span><span id="line-9"> *</span> |
| <span class="source-line-no">010</span><span id="line-10"> * http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="source-line-no">011</span><span id="line-11"> *</span> |
| <span class="source-line-no">012</span><span id="line-12"> * Unless required by applicable law or agreed to in writing, software</span> |
| <span class="source-line-no">013</span><span id="line-13"> * distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="source-line-no">014</span><span id="line-14"> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="source-line-no">015</span><span id="line-15"> * See the License for the specific language governing permissions and</span> |
| <span class="source-line-no">016</span><span id="line-16"> * limitations under the License.</span> |
| <span class="source-line-no">017</span><span id="line-17"> */</span> |
| <span class="source-line-no">018</span><span id="line-18">package org.apache.hadoop.hbase.regionserver;</span> |
| <span class="source-line-no">019</span><span id="line-19"></span> |
| <span class="source-line-no">020</span><span id="line-20">import com.google.errorprone.annotations.RestrictedApi;</span> |
| <span class="source-line-no">021</span><span id="line-21">import java.io.IOException;</span> |
| <span class="source-line-no">022</span><span id="line-22">import java.io.InterruptedIOException;</span> |
| <span class="source-line-no">023</span><span id="line-23">import java.net.InetSocketAddress;</span> |
| <span class="source-line-no">024</span><span id="line-24">import java.util.ArrayList;</span> |
| <span class="source-line-no">025</span><span id="line-25">import java.util.Collection;</span> |
| <span class="source-line-no">026</span><span id="line-26">import java.util.Collections;</span> |
| <span class="source-line-no">027</span><span id="line-27">import java.util.HashMap;</span> |
| <span class="source-line-no">028</span><span id="line-28">import java.util.HashSet;</span> |
| <span class="source-line-no">029</span><span id="line-29">import java.util.Iterator;</span> |
| <span class="source-line-no">030</span><span id="line-30">import java.util.List;</span> |
| <span class="source-line-no">031</span><span id="line-31">import java.util.Map;</span> |
| <span class="source-line-no">032</span><span id="line-32">import java.util.Map.Entry;</span> |
| <span class="source-line-no">033</span><span id="line-33">import java.util.NavigableSet;</span> |
| <span class="source-line-no">034</span><span id="line-34">import java.util.Optional;</span> |
| <span class="source-line-no">035</span><span id="line-35">import java.util.OptionalDouble;</span> |
| <span class="source-line-no">036</span><span id="line-36">import java.util.OptionalInt;</span> |
| <span class="source-line-no">037</span><span id="line-37">import java.util.OptionalLong;</span> |
| <span class="source-line-no">038</span><span id="line-38">import java.util.Set;</span> |
| <span class="source-line-no">039</span><span id="line-39">import java.util.concurrent.Callable;</span> |
| <span class="source-line-no">040</span><span id="line-40">import java.util.concurrent.CompletionService;</span> |
| <span class="source-line-no">041</span><span id="line-41">import java.util.concurrent.ConcurrentHashMap;</span> |
| <span class="source-line-no">042</span><span id="line-42">import java.util.concurrent.ExecutionException;</span> |
| <span class="source-line-no">043</span><span id="line-43">import java.util.concurrent.ExecutorCompletionService;</span> |
| <span class="source-line-no">044</span><span id="line-44">import java.util.concurrent.Future;</span> |
| <span class="source-line-no">045</span><span id="line-45">import java.util.concurrent.ThreadPoolExecutor;</span> |
| <span class="source-line-no">046</span><span id="line-46">import java.util.concurrent.atomic.AtomicBoolean;</span> |
| <span class="source-line-no">047</span><span id="line-47">import java.util.concurrent.atomic.AtomicInteger;</span> |
| <span class="source-line-no">048</span><span id="line-48">import java.util.concurrent.atomic.AtomicLong;</span> |
| <span class="source-line-no">049</span><span id="line-49">import java.util.concurrent.atomic.LongAdder;</span> |
| <span class="source-line-no">050</span><span id="line-50">import java.util.concurrent.locks.ReentrantLock;</span> |
| <span class="source-line-no">051</span><span id="line-51">import java.util.function.Consumer;</span> |
| <span class="source-line-no">052</span><span id="line-52">import java.util.function.Supplier;</span> |
| <span class="source-line-no">053</span><span id="line-53">import java.util.function.ToLongFunction;</span> |
| <span class="source-line-no">054</span><span id="line-54">import java.util.stream.Collectors;</span> |
| <span class="source-line-no">055</span><span id="line-55">import java.util.stream.LongStream;</span> |
| <span class="source-line-no">056</span><span id="line-56">import org.apache.hadoop.conf.Configuration;</span> |
| <span class="source-line-no">057</span><span id="line-57">import org.apache.hadoop.fs.FileSystem;</span> |
| <span class="source-line-no">058</span><span id="line-58">import org.apache.hadoop.fs.Path;</span> |
| <span class="source-line-no">059</span><span id="line-59">import org.apache.hadoop.fs.permission.FsAction;</span> |
| <span class="source-line-no">060</span><span id="line-60">import org.apache.hadoop.hbase.Cell;</span> |
| <span class="source-line-no">061</span><span id="line-61">import org.apache.hadoop.hbase.CellComparator;</span> |
| <span class="source-line-no">062</span><span id="line-62">import org.apache.hadoop.hbase.CellUtil;</span> |
| <span class="source-line-no">063</span><span id="line-63">import org.apache.hadoop.hbase.ExtendedCell;</span> |
| <span class="source-line-no">064</span><span id="line-64">import org.apache.hadoop.hbase.HConstants;</span> |
| <span class="source-line-no">065</span><span id="line-65">import org.apache.hadoop.hbase.InnerStoreCellComparator;</span> |
| <span class="source-line-no">066</span><span id="line-66">import org.apache.hadoop.hbase.MemoryCompactionPolicy;</span> |
| <span class="source-line-no">067</span><span id="line-67">import org.apache.hadoop.hbase.MetaCellComparator;</span> |
| <span class="source-line-no">068</span><span id="line-68">import org.apache.hadoop.hbase.TableName;</span> |
| <span class="source-line-no">069</span><span id="line-69">import org.apache.hadoop.hbase.backup.FailedArchiveException;</span> |
| <span class="source-line-no">070</span><span id="line-70">import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;</span> |
| <span class="source-line-no">071</span><span id="line-71">import org.apache.hadoop.hbase.client.RegionInfo;</span> |
| <span class="source-line-no">072</span><span id="line-72">import org.apache.hadoop.hbase.client.Scan;</span> |
| <span class="source-line-no">073</span><span id="line-73">import org.apache.hadoop.hbase.conf.ConfigurationManager;</span> |
| <span class="source-line-no">074</span><span id="line-74">import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;</span> |
| <span class="source-line-no">075</span><span id="line-75">import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;</span> |
| <span class="source-line-no">076</span><span id="line-76">import org.apache.hadoop.hbase.io.HeapSize;</span> |
| <span class="source-line-no">077</span><span id="line-77">import org.apache.hadoop.hbase.io.hfile.CacheConfig;</span> |
| <span class="source-line-no">078</span><span id="line-78">import org.apache.hadoop.hbase.io.hfile.HFile;</span> |
| <span class="source-line-no">079</span><span id="line-79">import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;</span> |
| <span class="source-line-no">080</span><span id="line-80">import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;</span> |
| <span class="source-line-no">081</span><span id="line-81">import org.apache.hadoop.hbase.io.hfile.HFileScanner;</span> |
| <span class="source-line-no">082</span><span id="line-82">import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;</span> |
| <span class="source-line-no">083</span><span id="line-83">import org.apache.hadoop.hbase.monitoring.MonitoredTask;</span> |
| <span class="source-line-no">084</span><span id="line-84">import org.apache.hadoop.hbase.quotas.RegionSizeStore;</span> |
| <span class="source-line-no">085</span><span id="line-85">import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;</span> |
| <span class="source-line-no">086</span><span id="line-86">import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;</span> |
| <span class="source-line-no">087</span><span id="line-87">import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;</span> |
| <span class="source-line-no">088</span><span id="line-88">import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;</span> |
| <span class="source-line-no">089</span><span id="line-89">import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;</span> |
| <span class="source-line-no">090</span><span id="line-90">import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;</span> |
| <span class="source-line-no">091</span><span id="line-91">import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;</span> |
| <span class="source-line-no">092</span><span id="line-92">import org.apache.hadoop.hbase.regionserver.wal.WALUtil;</span> |
| <span class="source-line-no">093</span><span id="line-93">import org.apache.hadoop.hbase.security.EncryptionUtil;</span> |
| <span class="source-line-no">094</span><span id="line-94">import org.apache.hadoop.hbase.security.User;</span> |
| <span class="source-line-no">095</span><span id="line-95">import org.apache.hadoop.hbase.util.Bytes;</span> |
| <span class="source-line-no">096</span><span id="line-96">import org.apache.hadoop.hbase.util.ClassSize;</span> |
| <span class="source-line-no">097</span><span id="line-97">import org.apache.hadoop.hbase.util.CommonFSUtils;</span> |
| <span class="source-line-no">098</span><span id="line-98">import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;</span> |
| <span class="source-line-no">099</span><span id="line-99">import org.apache.hadoop.hbase.util.Pair;</span> |
| <span class="source-line-no">100</span><span id="line-100">import org.apache.hadoop.hbase.util.ReflectionUtils;</span> |
| <span class="source-line-no">101</span><span id="line-101">import org.apache.hadoop.util.StringUtils;</span> |
| <span class="source-line-no">102</span><span id="line-102">import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;</span> |
| <span class="source-line-no">103</span><span id="line-103">import org.apache.yetus.audience.InterfaceAudience;</span> |
| <span class="source-line-no">104</span><span id="line-104">import org.slf4j.Logger;</span> |
| <span class="source-line-no">105</span><span id="line-105">import org.slf4j.LoggerFactory;</span> |
| <span class="source-line-no">106</span><span id="line-106"></span> |
| <span class="source-line-no">107</span><span id="line-107">import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;</span> |
| <span class="source-line-no">108</span><span id="line-108">import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection;</span> |
| <span class="source-line-no">109</span><span id="line-109">import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;</span> |
| <span class="source-line-no">110</span><span id="line-110">import org.apache.hbase.thirdparty.com.google.common.collect.Lists;</span> |
| <span class="source-line-no">111</span><span id="line-111">import org.apache.hbase.thirdparty.com.google.common.collect.Maps;</span> |
| <span class="source-line-no">112</span><span id="line-112">import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;</span> |
| <span class="source-line-no">113</span><span id="line-113">import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;</span> |
| <span class="source-line-no">114</span><span id="line-114"></span> |
| <span class="source-line-no">115</span><span id="line-115">import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;</span> |
| <span class="source-line-no">116</span><span id="line-116">import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;</span> |
| <span class="source-line-no">117</span><span id="line-117"></span> |
| <span class="source-line-no">118</span><span id="line-118">/**</span> |
| <span class="source-line-no">119</span><span id="line-119"> * A Store holds a column family in a Region. It's a memstore and a set of zero or more StoreFiles,</span> |
| <span class="source-line-no">120</span><span id="line-120"> * which stretch backwards over time.</span> |
| <span class="source-line-no">121</span><span id="line-121"> * <p></span> |
| <span class="source-line-no">122</span><span id="line-122"> * There's no reason to consider append-logging at this level; all logging and locking is handled at</span> |
| <span class="source-line-no">123</span><span id="line-123"> * the HRegion level. Store just provides services to manage sets of StoreFiles. One of the most</span> |
| <span class="source-line-no">124</span><span id="line-124"> * important of those services is compaction services where files are aggregated once they pass a</span> |
| <span class="source-line-no">125</span><span id="line-125"> * configurable threshold.</span> |
| <span class="source-line-no">126</span><span id="line-126"> * <p></span> |
| <span class="source-line-no">127</span><span id="line-127"> * Locking and transactions are handled at a higher level. This API should not be called directly</span> |
| <span class="source-line-no">128</span><span id="line-128"> * but by an HRegion manager.</span> |
| <span class="source-line-no">129</span><span id="line-129"> */</span> |
| <span class="source-line-no">130</span><span id="line-130">@InterfaceAudience.Private</span> |
| <span class="source-line-no">131</span><span id="line-131">public class HStore</span> |
| <span class="source-line-no">132</span><span id="line-132"> implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {</span> |
| <span class="source-line-no">133</span><span id="line-133"> public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";</span> |
| <span class="source-line-no">134</span><span id="line-134"> public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =</span> |
| <span class="source-line-no">135</span><span id="line-135"> "hbase.server.compactchecker.interval.multiplier";</span> |
| <span class="source-line-no">136</span><span id="line-136"> public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";</span> |
| <span class="source-line-no">137</span><span id="line-137"> public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";</span> |
| <span class="source-line-no">138</span><span id="line-138"> // "NONE" is not a valid storage policy and means we defer the policy to HDFS</span> |
| <span class="source-line-no">139</span><span id="line-139"> public static final String DEFAULT_BLOCK_STORAGE_POLICY = "NONE";</span> |
| <span class="source-line-no">140</span><span id="line-140"> public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;</span> |
| <span class="source-line-no">141</span><span id="line-141"> public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;</span> |
| <span class="source-line-no">142</span><span id="line-142"></span> |
| <span class="source-line-no">143</span><span id="line-143"> // HBASE-24428 : Update compaction priority for recently split daughter regions</span> |
| <span class="source-line-no">144</span><span id="line-144"> // so as to prioritize their compaction.</span> |
| <span class="source-line-no">145</span><span id="line-145"> // Any compaction candidate with higher priority than compaction of newly split daughter regions</span> |
| <span class="source-line-no">146</span><span id="line-146"> // should have priority value < (Integer.MIN_VALUE + 1000)</span> |
| <span class="source-line-no">147</span><span id="line-147"> private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;</span> |
| <span class="source-line-no">148</span><span id="line-148"></span> |
| <span class="source-line-no">149</span><span id="line-149"> private static final Logger LOG = LoggerFactory.getLogger(HStore.class);</span> |
| <span class="source-line-no">150</span><span id="line-150"></span> |
| <span class="source-line-no">151</span><span id="line-151"> protected final MemStore memstore;</span> |
| <span class="source-line-no">152</span><span id="line-152"> // The region this store belongs to; the store's directory is created through its file system.</span> |
| <span class="source-line-no">153</span><span id="line-153"> private final HRegion region;</span> |
| <span class="source-line-no">154</span><span id="line-154"> protected Configuration conf;</span> |
| <span class="source-line-no">155</span><span id="line-155"> private long lastCompactSize = 0;</span> |
| <span class="source-line-no">156</span><span id="line-156"> volatile boolean forceMajor = false;</span> |
| <span class="source-line-no">157</span><span id="line-157"> private AtomicLong storeSize = new AtomicLong();</span> |
| <span class="source-line-no">158</span><span id="line-158"> private AtomicLong totalUncompressedBytes = new AtomicLong();</span> |
| <span class="source-line-no">159</span><span id="line-159"> private LongAdder memstoreOnlyRowReadsCount = new LongAdder();</span> |
| <span class="source-line-no">160</span><span id="line-160"> // rows that have cells from both memstore and files (or only files)</span> |
| <span class="source-line-no">161</span><span id="line-161"> private LongAdder mixedRowReadsCount = new LongAdder();</span> |
| <span class="source-line-no">162</span><span id="line-162"></span> |
| <span class="source-line-no">163</span><span id="line-163"> /**</span> |
| <span class="source-line-no">164</span><span id="line-164"> * Lock specific to archiving compacted store files. This avoids races around the combination of</span> |
| <span class="source-line-no">165</span><span id="line-165"> * retrieving the list of compacted files and moving them to the archive directory. Since this is</span> |
| <span class="source-line-no">166</span><span id="line-166"> * usually a background process (other than on close), we don't want to handle this with the store</span> |
| <span class="source-line-no">167</span><span id="line-167"> * write lock, which would block readers and degrade performance. Locked by: -</span> |
| <span class="source-line-no">168</span><span id="line-168"> * CompactedHFilesDispatchHandler via closeAndArchiveCompactedFiles() - close()</span> |
| <span class="source-line-no">169</span><span id="line-169"> */</span> |
| <span class="source-line-no">170</span><span id="line-170"> final ReentrantLock archiveLock = new ReentrantLock();</span> |
| <span class="source-line-no">171</span><span id="line-171"></span> |
| <span class="source-line-no">172</span><span id="line-172"> private final boolean verifyBulkLoads;</span> |
| <span class="source-line-no">173</span><span id="line-173"></span> |
| <span class="source-line-no">174</span><span id="line-174"> /**</span> |
| <span class="source-line-no">175</span><span id="line-175"> * Use this counter to track concurrent puts. If TRACE logging is enabled and we are over the</span> |
| <span class="source-line-no">176</span><span id="line-176"> * threshold set by hbase.region.store.parallel.put.print.threshold (Default is 50), we will log a</span> |
| <span class="source-line-no">177</span><span id="line-177"> * message that identifies the Store experiencing this high level of concurrency.</span> |
| <span class="source-line-no">178</span><span id="line-178"> */</span> |
| <span class="source-line-no">179</span><span id="line-179"> private final AtomicInteger currentParallelPutCount = new AtomicInteger(0);</span> |
| <span class="source-line-no">180</span><span id="line-180"> private final int parallelPutCountPrintThreshold;</span> |
| <span class="source-line-no">181</span><span id="line-181"></span> |
| <span class="source-line-no">182</span><span id="line-182"> private ScanInfo scanInfo;</span> |
| <span class="source-line-no">183</span><span id="line-183"></span> |
| <span class="source-line-no">184</span><span id="line-184"> // All access must be synchronized.</span> |
| <span class="source-line-no">185</span><span id="line-185"> // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it.</span> |
| <span class="source-line-no">186</span><span id="line-186"> private final List<HStoreFile> filesCompacting = Lists.newArrayList();</span> |
| <span class="source-line-no">187</span><span id="line-187"></span> |
| <span class="source-line-no">188</span><span id="line-188"> // All access must be synchronized.</span> |
| <span class="source-line-no">189</span><span id="line-189"> private final Set<ChangedReadersObserver> changedReaderObservers =</span> |
| <span class="source-line-no">190</span><span id="line-190"> Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());</span> |
| <span class="source-line-no">191</span><span id="line-191"></span> |
| <span class="source-line-no">192</span><span id="line-192"> private HFileDataBlockEncoder dataBlockEncoder;</span> |
| <span class="source-line-no">193</span><span id="line-193"></span> |
| <span class="source-line-no">194</span><span id="line-194"> final StoreEngine<?, ?, ?, ?> storeEngine;</span> |
| <span class="source-line-no">195</span><span id="line-195"></span> |
| <span class="source-line-no">196</span><span id="line-196"> private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();</span> |
| <span class="source-line-no">197</span><span id="line-197"> private volatile OffPeakHours offPeakHours;</span> |
| <span class="source-line-no">198</span><span id="line-198"></span> |
| <span class="source-line-no">199</span><span id="line-199"> private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;</span> |
| <span class="source-line-no">200</span><span id="line-200"> private int flushRetriesNumber;</span> |
| <span class="source-line-no">201</span><span id="line-201"> private int pauseTime;</span> |
| <span class="source-line-no">202</span><span id="line-202"></span> |
| <span class="source-line-no">203</span><span id="line-203"> private long blockingFileCount;</span> |
| <span class="source-line-no">204</span><span id="line-204"> private int compactionCheckMultiplier;</span> |
| <span class="source-line-no">205</span><span id="line-205"></span> |
| <span class="source-line-no">206</span><span id="line-206"> private AtomicLong flushedCellsCount = new AtomicLong();</span> |
| <span class="source-line-no">207</span><span id="line-207"> private AtomicLong compactedCellsCount = new AtomicLong();</span> |
| <span class="source-line-no">208</span><span id="line-208"> private AtomicLong majorCompactedCellsCount = new AtomicLong();</span> |
| <span class="source-line-no">209</span><span id="line-209"> private AtomicLong flushedCellsSize = new AtomicLong();</span> |
| <span class="source-line-no">210</span><span id="line-210"> private AtomicLong flushedOutputFileSize = new AtomicLong();</span> |
| <span class="source-line-no">211</span><span id="line-211"> private AtomicLong compactedCellsSize = new AtomicLong();</span> |
| <span class="source-line-no">212</span><span id="line-212"> private AtomicLong majorCompactedCellsSize = new AtomicLong();</span> |
| <span class="source-line-no">213</span><span id="line-213"></span> |
| <span class="source-line-no">214</span><span id="line-214"> private final StoreContext storeContext;</span> |
| <span class="source-line-no">215</span><span id="line-215"></span> |
| <span class="source-line-no">216</span><span id="line-216"> // Used to track the store files which are currently being written. For compaction, if we want to</span> |
| <span class="source-line-no">217</span><span id="line-217"> // compact store file [a, b, c] to [d], then here we will record 'd'. And we will also use it to</span> |
| <span class="source-line-no">218</span><span id="line-218"> // track the store files being written when flushing.</span> |
| <span class="source-line-no">219</span><span id="line-219"> // Notice that the creation is in the background compaction or flush thread and we will get the</span> |
| <span class="source-line-no">220</span><span id="line-220"> // files in another thread, so it needs to be thread safe.</span> |
| <span class="source-line-no">221</span><span id="line-221"> private static final class StoreFileWriterCreationTracker implements Consumer<Path> {</span> |
| <span class="source-line-no">222</span><span id="line-222"> /** Paths passed to accept(Path) so far, held in a set backed by a ConcurrentHashMap. */</span> |
| <span class="source-line-no">223</span><span id="line-223"> private final Set<Path> files = Collections.newSetFromMap(new ConcurrentHashMap<>());</span> |
| <span class="source-line-no">224</span><span id="line-224"> /** Records one more path in the tracked set; adding a path already present has no effect. */</span> |
| <span class="source-line-no">225</span><span id="line-225"> @Override</span> |
| <span class="source-line-no">226</span><span id="line-226"> public void accept(Path t) {</span> |
| <span class="source-line-no">227</span><span id="line-227"> files.add(t);</span> |
| <span class="source-line-no">228</span><span id="line-228"> }</span> |
| <span class="source-line-no">229</span><span id="line-229"> /** Returns an unmodifiable view of the tracked paths; it is a view, not a copy. */</span> |
| <span class="source-line-no">230</span><span id="line-230"> public Set<Path> get() {</span> |
| <span class="source-line-no">231</span><span id="line-231"> return Collections.unmodifiableSet(files);</span> |
| <span class="source-line-no">232</span><span id="line-232"> }</span> |
| <span class="source-line-no">233</span><span id="line-233"> }</span> |
| <span class="source-line-no">234</span><span id="line-234"></span> |
| <span class="source-line-no">235</span><span id="line-235"> // We may have multiple compactions running at the same time, and flush can also happen at the same</span> |
| <span class="source-line-no">236</span><span id="line-236"> // time, so here we need to use a collection, and the collection needs to be thread safe.</span> |
| <span class="source-line-no">237</span><span id="line-237"> // The implementation of StoreFileWriterCreationTracker is very simple and we are not likely to</span> |
| <span class="source-line-no">238</span><span id="line-238"> // implement hashCode or equals for it, so here we just use ConcurrentHashMap. Change to</span> |
| <span class="source-line-no">239</span><span id="line-239"> // IdentityHashMap if later we want to implement hashCode or equals.</span> |
| <span class="source-line-no">240</span><span id="line-240"> private final Set<StoreFileWriterCreationTracker> storeFileWriterCreationTrackers =</span> |
| <span class="source-line-no">241</span><span id="line-241"> Collections.newSetFromMap(new ConcurrentHashMap<>());</span> |
| <span class="source-line-no">242</span><span id="line-242"></span> |
| <span class="source-line-no">243</span><span id="line-243"> // For the SFT implementations which write the tmp store file first, we do not need to clean up</span> |
| <span class="source-line-no">244</span><span id="line-244"> // the broken store files under the data directory, which means we do not need to track the store</span> |
| <span class="source-line-no">245</span><span id="line-245"> // file writer creation. So here we abstract a factory to return different trackers for different</span> |
| <span class="source-line-no">246</span><span id="line-246"> // SFT implementations.</span> |
| <span class="source-line-no">247</span><span id="line-247"> private final Supplier<StoreFileWriterCreationTracker> storeFileWriterCreationTrackerFactory;</span> |
| <span class="source-line-no">248</span><span id="line-248"></span> |
| <span class="source-line-no">249</span><span id="line-249"> private final boolean warmup;</span> |
| <span class="source-line-no">250</span><span id="line-250"></span> |
| <span class="source-line-no">251</span><span id="line-251"> /**</span> |
| <span class="source-line-no">252</span><span id="line-252"> * Constructor</span> |
| <span class="source-line-no">253</span><span id="line-253"> * @param family ColumnFamilyDescriptor for this column family</span> |
| <span class="source-line-no">254</span><span id="line-254"> * @param confParam configuration object failed. Can be null.</span> |
| <span class="source-line-no">255</span><span id="line-255"> */</span> |
| <span class="source-line-no">256</span><span id="line-256"> protected HStore(final HRegion region, final ColumnFamilyDescriptor family,</span> |
| <span class="source-line-no">257</span><span id="line-257"> final Configuration confParam, boolean warmup) throws IOException {</span> |
| <span class="source-line-no">258</span><span id="line-258"> this.conf = StoreUtils.createStoreConfiguration(confParam, region.getTableDescriptor(), family);</span> |
| <span class="source-line-no">259</span><span id="line-259"></span> |
| <span class="source-line-no">260</span><span id="line-260"> this.region = region;</span> |
| <span class="source-line-no">261</span><span id="line-261"> this.storeContext = initializeStoreContext(family);</span> |
| <span class="source-line-no">262</span><span id="line-262"></span> |
| <span class="source-line-no">263</span><span id="line-263"> // Assemble the store's home directory and Ensure it exists.</span> |
| <span class="source-line-no">264</span><span id="line-264"> region.getRegionFileSystem().createStoreDir(family.getNameAsString());</span> |
| <span class="source-line-no">265</span><span id="line-265"></span> |
| <span class="source-line-no">266</span><span id="line-266"> // set block storage policy for store directory</span> |
| <span class="source-line-no">267</span><span id="line-267"> String policyName = family.getStoragePolicy();</span> |
| <span class="source-line-no">268</span><span id="line-268"> if (null == policyName) {</span> |
| <span class="source-line-no">269</span><span id="line-269"> policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);</span> |
| <span class="source-line-no">270</span><span id="line-270"> }</span> |
| <span class="source-line-no">271</span><span id="line-271"> region.getRegionFileSystem().setStoragePolicy(family.getNameAsString(), policyName.trim());</span> |
| <span class="source-line-no">272</span><span id="line-272"></span> |
| <span class="source-line-no">273</span><span id="line-273"> this.dataBlockEncoder = new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());</span> |
| <span class="source-line-no">274</span><span id="line-274"></span> |
| <span class="source-line-no">275</span><span id="line-275"> // used by ScanQueryMatcher</span> |
| <span class="source-line-no">276</span><span id="line-276"> long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);</span> |
| <span class="source-line-no">277</span><span id="line-277"> LOG.trace("Time to purge deletes set to {}ms in {}", timeToPurgeDeletes, this);</span> |
| <span class="source-line-no">278</span><span id="line-278"> // Get TTL</span> |
| <span class="source-line-no">279</span><span id="line-279"> long ttl = determineTTLFromFamily(family);</span> |
| <span class="source-line-no">280</span><span id="line-280"> // Why not just pass a ColumnFamilyDescriptor in here altogether? Even if we have</span> |
| <span class="source-line-no">281</span><span id="line-281"> // to clone it?</span> |
| <span class="source-line-no">282</span><span id="line-282"> scanInfo =</span> |
| <span class="source-line-no">283</span><span id="line-283"> new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.storeContext.getComparator());</span> |
| <span class="source-line-no">284</span><span id="line-284"> this.memstore = getMemstore();</span> |
| <span class="source-line-no">285</span><span id="line-285"></span> |
| <span class="source-line-no">286</span><span id="line-286"> this.offPeakHours = OffPeakHours.getInstance(conf);</span> |
| <span class="source-line-no">287</span><span id="line-287"></span> |
| <span class="source-line-no">288</span><span id="line-288"> this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);</span> |
| <span class="source-line-no">289</span><span id="line-289"></span> |
| <span class="source-line-no">290</span><span id="line-290"> this.blockingFileCount = conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);</span> |
| <span class="source-line-no">291</span><span id="line-291"> this.compactionCheckMultiplier = conf.getInt(COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY,</span> |
| <span class="source-line-no">292</span><span id="line-292"> DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);</span> |
| <span class="source-line-no">293</span><span id="line-293"> if (this.compactionCheckMultiplier <= 0) {</span> |
| <span class="source-line-no">294</span><span id="line-294"> LOG.error("Compaction check period multiplier must be positive, setting default: {}",</span> |
| <span class="source-line-no">295</span><span id="line-295"> DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);</span> |
| <span class="source-line-no">296</span><span id="line-296"> this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;</span> |
| <span class="source-line-no">297</span><span id="line-297"> }</span> |
| <span class="source-line-no">298</span><span id="line-298"></span> |
| <span class="source-line-no">299</span><span id="line-299"> this.warmup = warmup;</span> |
| <span class="source-line-no">300</span><span id="line-300"> this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());</span> |
| <span class="source-line-no">301</span><span id="line-301"> storeEngine.initialize(warmup);</span> |
| <span class="source-line-no">302</span><span id="line-302"> // If writing to the tmp dir first is required, the factory returns null, which indicates that</span> |
| <span class="source-line-no">303</span><span id="line-303"> // we do not need to track the creation of store file writers; otherwise it returns a new</span> |
| <span class="source-line-no">304</span><span id="line-304"> // StoreFileWriterCreationTracker.</span> |
| <span class="source-line-no">305</span><span id="line-305"> this.storeFileWriterCreationTrackerFactory = storeEngine.requireWritingToTmpDirFirst()</span> |
| <span class="source-line-no">306</span><span id="line-306"> ? () -> null</span> |
| <span class="source-line-no">307</span><span id="line-307"> : () -> new StoreFileWriterCreationTracker();</span> |
| <span class="source-line-no">308</span><span id="line-308"> refreshStoreSizeAndTotalBytes();</span> |
| <span class="source-line-no">309</span><span id="line-309"></span> |
| <span class="source-line-no">310</span><span id="line-310"> flushRetriesNumber =</span> |
| <span class="source-line-no">311</span><span id="line-311"> conf.getInt("hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);</span> |
| <span class="source-line-no">312</span><span id="line-312"> pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);</span> |
| <span class="source-line-no">313</span><span id="line-313"> if (flushRetriesNumber <= 0) {</span> |
| <span class="source-line-no">314</span><span id="line-314"> throw new IllegalArgumentException(</span> |
| <span class="source-line-no">315</span><span id="line-315"> "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);</span> |
| <span class="source-line-no">316</span><span id="line-316"> }</span> |
| <span class="source-line-no">317</span><span id="line-317"></span> |
| <span class="source-line-no">318</span><span id="line-318"> int confPrintThreshold =</span> |
| <span class="source-line-no">319</span><span id="line-319"> this.conf.getInt("hbase.region.store.parallel.put.print.threshold", 50);</span> |
| <span class="source-line-no">320</span><span id="line-320"> if (confPrintThreshold < 10) {</span> |
| <span class="source-line-no">321</span><span id="line-321"> confPrintThreshold = 10;</span> |
| <span class="source-line-no">322</span><span id="line-322"> }</span> |
| <span class="source-line-no">323</span><span id="line-323"> this.parallelPutCountPrintThreshold = confPrintThreshold;</span> |
| <span class="source-line-no">324</span><span id="line-324"></span> |
| <span class="source-line-no">325</span><span id="line-325"> LOG.info(</span> |
| <span class="source-line-no">326</span><span id="line-326"> "Store={}, memstore type={}, storagePolicy={}, verifyBulkLoads={}, "</span> |
| <span class="source-line-no">327</span><span id="line-327"> + "parallelPutCountPrintThreshold={}, encoding={}, compression={}",</span> |
| <span class="source-line-no">328</span><span id="line-328"> this, memstore.getClass().getSimpleName(), policyName, verifyBulkLoads,</span> |
| <span class="source-line-no">329</span><span id="line-329"> parallelPutCountPrintThreshold, family.getDataBlockEncoding(), family.getCompressionType());</span> |
| <span class="source-line-no">330</span><span id="line-330"> }</span> |
| <span class="source-line-no">331</span><span id="line-331"></span> |
| <span class="source-line-no">332</span><span id="line-332"> private StoreContext initializeStoreContext(ColumnFamilyDescriptor family) throws IOException {</span> |
| <span class="source-line-no">333</span><span id="line-333"> return new StoreContext.Builder().withBlockSize(family.getBlocksize())</span> |
| <span class="source-line-no">334</span><span id="line-334"> .withEncryptionContext(EncryptionUtil.createEncryptionContext(conf, family))</span> |
| <span class="source-line-no">335</span><span id="line-335"> .withBloomType(family.getBloomFilterType()).withCacheConfig(createCacheConf(family))</span> |
| <span class="source-line-no">336</span><span id="line-336"> .withCellComparator(region.getTableDescriptor().isMetaTable() || conf</span> |
| <span class="source-line-no">337</span><span id="line-337"> .getBoolean(HRegion.USE_META_CELL_COMPARATOR, HRegion.DEFAULT_USE_META_CELL_COMPARATOR)</span> |
| <span class="source-line-no">338</span><span id="line-338"> ? MetaCellComparator.META_COMPARATOR</span> |
| <span class="source-line-no">339</span><span id="line-339"> : InnerStoreCellComparator.INNER_STORE_COMPARATOR)</span> |
| <span class="source-line-no">340</span><span id="line-340"> .withColumnFamilyDescriptor(family).withCompactedFilesSupplier(this::getCompactedFiles)</span> |
| <span class="source-line-no">341</span><span id="line-341"> .withRegionFileSystem(region.getRegionFileSystem())</span> |
| <span class="source-line-no">342</span><span id="line-342"> .withFavoredNodesSupplier(this::getFavoredNodes)</span> |
| <span class="source-line-no">343</span><span id="line-343"> .withFamilyStoreDirectoryPath(</span> |
| <span class="source-line-no">344</span><span id="line-344"> region.getRegionFileSystem().getStoreDir(family.getNameAsString()))</span> |
| <span class="source-line-no">345</span><span id="line-345"> .withRegionCoprocessorHost(region.getCoprocessorHost()).build();</span> |
| <span class="source-line-no">346</span><span id="line-346"> }</span> |
| <span class="source-line-no">347</span><span id="line-347"></span> |
| <span class="source-line-no">348</span><span id="line-348"> private InetSocketAddress[] getFavoredNodes() {</span> |
| <span class="source-line-no">349</span><span id="line-349"> InetSocketAddress[] favoredNodes = null;</span> |
| <span class="source-line-no">350</span><span id="line-350"> if (region.getRegionServerServices() != null) {</span> |
| <span class="source-line-no">351</span><span id="line-351"> favoredNodes = region.getRegionServerServices()</span> |
| <span class="source-line-no">352</span><span id="line-352"> .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());</span> |
| <span class="source-line-no">353</span><span id="line-353"> }</span> |
| <span class="source-line-no">354</span><span id="line-354"> return favoredNodes;</span> |
| <span class="source-line-no">355</span><span id="line-355"> }</span> |
| <span class="source-line-no">356</span><span id="line-356"></span> |
| <span class="source-line-no">357</span><span id="line-357"> /** Returns MemStore Instance to use in this store. */</span> |
| <span class="source-line-no">358</span><span id="line-358"> private MemStore getMemstore() {</span> |
| <span class="source-line-no">359</span><span id="line-359"> MemStore ms = null;</span> |
| <span class="source-line-no">360</span><span id="line-360"> // Check if in-memory-compaction configured. Note MemoryCompactionPolicy is an enum!</span> |
| <span class="source-line-no">361</span><span id="line-361"> MemoryCompactionPolicy inMemoryCompaction = null;</span> |
| <span class="source-line-no">362</span><span id="line-362"> if (this.getTableName().isSystemTable()) {</span> |
| <span class="source-line-no">363</span><span id="line-363"> inMemoryCompaction = MemoryCompactionPolicy</span> |
| <span class="source-line-no">364</span><span id="line-364"> .valueOf(conf.get("hbase.systemtables.compacting.memstore.type", "NONE"));</span> |
| <span class="source-line-no">365</span><span id="line-365"> } else {</span> |
| <span class="source-line-no">366</span><span id="line-366"> inMemoryCompaction = getColumnFamilyDescriptor().getInMemoryCompaction();</span> |
| <span class="source-line-no">367</span><span id="line-367"> }</span> |
| <span class="source-line-no">368</span><span id="line-368"> if (inMemoryCompaction == null) {</span> |
| <span class="source-line-no">369</span><span id="line-369"> inMemoryCompaction =</span> |
| <span class="source-line-no">370</span><span id="line-370"> MemoryCompactionPolicy.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,</span> |
| <span class="source-line-no">371</span><span id="line-371"> CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT).toUpperCase());</span> |
| <span class="source-line-no">372</span><span id="line-372"> }</span> |
| <span class="source-line-no">373</span><span id="line-373"></span> |
| <span class="source-line-no">374</span><span id="line-374"> switch (inMemoryCompaction) {</span> |
| <span class="source-line-no">375</span><span id="line-375"> case NONE:</span> |
| <span class="source-line-no">376</span><span id="line-376"> Class<? extends MemStore> memStoreClass =</span> |
| <span class="source-line-no">377</span><span id="line-377"> conf.getClass(MEMSTORE_CLASS_NAME, DefaultMemStore.class, MemStore.class);</span> |
| <span class="source-line-no">378</span><span id="line-378"> ms = ReflectionUtils.newInstance(memStoreClass,</span> |
| <span class="source-line-no">379</span><span id="line-379"> new Object[] { conf, getComparator(), this.getHRegion().getRegionServicesForStores() });</span> |
| <span class="source-line-no">380</span><span id="line-380"> break;</span> |
| <span class="source-line-no">381</span><span id="line-381"> default:</span> |
| <span class="source-line-no">382</span><span id="line-382"> Class<? extends CompactingMemStore> compactingMemStoreClass =</span> |
| <span class="source-line-no">383</span><span id="line-383"> conf.getClass(MEMSTORE_CLASS_NAME, CompactingMemStore.class, CompactingMemStore.class);</span> |
| <span class="source-line-no">384</span><span id="line-384"> ms =</span> |
| <span class="source-line-no">385</span><span id="line-385"> ReflectionUtils.newInstance(compactingMemStoreClass, new Object[] { conf, getComparator(),</span> |
| <span class="source-line-no">386</span><span id="line-386"> this, this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });</span> |
| <span class="source-line-no">387</span><span id="line-387"> }</span> |
| <span class="source-line-no">388</span><span id="line-388"> return ms;</span> |
| <span class="source-line-no">389</span><span id="line-389"> }</span> |
| <span class="source-line-no">390</span><span id="line-390"></span> |
| <span class="source-line-no">391</span><span id="line-391"> /**</span> |
| <span class="source-line-no">392</span><span id="line-392"> * Creates the cache config.</span> |
| <span class="source-line-no">393</span><span id="line-393"> * @param family The current column family.</span> |
| <span class="source-line-no">394</span><span id="line-394"> */</span> |
| <span class="source-line-no">395</span><span id="line-395"> protected CacheConfig createCacheConf(final ColumnFamilyDescriptor family) {</span> |
| <span class="source-line-no">396</span><span id="line-396"> CacheConfig cacheConf = new CacheConfig(conf, family, region.getBlockCache(),</span> |
| <span class="source-line-no">397</span><span id="line-397"> region.getRegionServicesForStores().getByteBuffAllocator());</span> |
| <span class="source-line-no">398</span><span id="line-398"> LOG.info("Created cacheConfig: {}, for column family {} of region {} ", cacheConf,</span> |
| <span class="source-line-no">399</span><span id="line-399"> family.getNameAsString(), region.getRegionInfo().getEncodedName());</span> |
| <span class="source-line-no">400</span><span id="line-400"> return cacheConf;</span> |
| <span class="source-line-no">401</span><span id="line-401"> }</span> |
| <span class="source-line-no">402</span><span id="line-402"></span> |
| <span class="source-line-no">403</span><span id="line-403"> /**</span> |
| <span class="source-line-no">404</span><span id="line-404"> * Creates the store engine configured for the given Store.</span> |
| <span class="source-line-no">405</span><span id="line-405"> * @param store The store. An unfortunate dependency needed due to it being passed to</span> |
| <span class="source-line-no">406</span><span id="line-406"> * coprocessors via the compactor.</span> |
| <span class="source-line-no">407</span><span id="line-407"> * @param conf Store configuration.</span> |
| <span class="source-line-no">408</span><span id="line-408"> * @param kvComparator CellComparator for storeFileManager.</span> |
| <span class="source-line-no">409</span><span id="line-409"> * @return StoreEngine to use.</span> |
| <span class="source-line-no">410</span><span id="line-410"> */</span> |
| <span class="source-line-no">411</span><span id="line-411"> protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,</span> |
| <span class="source-line-no">412</span><span id="line-412"> CellComparator kvComparator) throws IOException {</span> |
| <span class="source-line-no">413</span><span id="line-413"> return StoreEngine.create(store, conf, kvComparator);</span> |
| <span class="source-line-no">414</span><span id="line-414"> }</span> |
| <span class="source-line-no">415</span><span id="line-415"></span> |
| <span class="source-line-no">416</span><span id="line-416"> /** Returns TTL in milliseconds of the specified family; Long.MAX_VALUE when unlimited */</span> |
| <span class="source-line-no">417</span><span id="line-417"> public static long determineTTLFromFamily(final ColumnFamilyDescriptor family) {</span> |
| <span class="source-line-no">418</span><span id="line-418"> // HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.</span> |
| <span class="source-line-no">419</span><span id="line-419"> long ttl = family.getTimeToLive();</span> |
| <span class="source-line-no">420</span><span id="line-420"> if (ttl == HConstants.FOREVER) {</span> |
| <span class="source-line-no">421</span><span id="line-421"> // Default is unlimited ttl.</span> |
| <span class="source-line-no">422</span><span id="line-422"> ttl = Long.MAX_VALUE;</span> |
| <span class="source-line-no">423</span><span id="line-423"> } else if (ttl == -1) {</span> |
| <span class="source-line-no">424</span><span id="line-424"> ttl = Long.MAX_VALUE;</span> |
| <span class="source-line-no">425</span><span id="line-425"> } else {</span> |
| <span class="source-line-no">426</span><span id="line-426"> // Second -> ms adjust for user data</span> |
| <span class="source-line-no">427</span><span id="line-427"> ttl *= 1000;</span> |
| <span class="source-line-no">428</span><span id="line-428"> }</span> |
| <span class="source-line-no">429</span><span id="line-429"> return ttl;</span> |
| <span class="source-line-no">430</span><span id="line-430"> }</span> |
| <span class="source-line-no">431</span><span id="line-431"></span> |
| <span class="source-line-no">432</span><span id="line-432"> StoreContext getStoreContext() {</span> |
| <span class="source-line-no">433</span><span id="line-433"> return storeContext;</span> |
| <span class="source-line-no">434</span><span id="line-434"> }</span> |
| <span class="source-line-no">435</span><span id="line-435"></span> |
| <span class="source-line-no">436</span><span id="line-436"> @Override</span> |
| <span class="source-line-no">437</span><span id="line-437"> public String getColumnFamilyName() {</span> |
| <span class="source-line-no">438</span><span id="line-438"> return this.storeContext.getFamily().getNameAsString();</span> |
| <span class="source-line-no">439</span><span id="line-439"> }</span> |
| <span class="source-line-no">440</span><span id="line-440"></span> |
| <span class="source-line-no">441</span><span id="line-441"> @Override</span> |
| <span class="source-line-no">442</span><span id="line-442"> public TableName getTableName() {</span> |
| <span class="source-line-no">443</span><span id="line-443"> return this.getRegionInfo().getTable();</span> |
| <span class="source-line-no">444</span><span id="line-444"> }</span> |
| <span class="source-line-no">445</span><span id="line-445"></span> |
| <span class="source-line-no">446</span><span id="line-446"> @Override</span> |
| <span class="source-line-no">447</span><span id="line-447"> public FileSystem getFileSystem() {</span> |
| <span class="source-line-no">448</span><span id="line-448"> return storeContext.getRegionFileSystem().getFileSystem();</span> |
| <span class="source-line-no">449</span><span id="line-449"> }</span> |
| <span class="source-line-no">450</span><span id="line-450"></span> |
| <span class="source-line-no">451</span><span id="line-451"> public HRegionFileSystem getRegionFileSystem() {</span> |
| <span class="source-line-no">452</span><span id="line-452"> return storeContext.getRegionFileSystem();</span> |
| <span class="source-line-no">453</span><span id="line-453"> }</span> |
| <span class="source-line-no">454</span><span id="line-454"></span> |
| <span class="source-line-no">455</span><span id="line-455"> /* Implementation of StoreConfigInformation */</span> |
| <span class="source-line-no">456</span><span id="line-456"> @Override</span> |
| <span class="source-line-no">457</span><span id="line-457"> public long getStoreFileTtl() {</span> |
| <span class="source-line-no">458</span><span id="line-458"> // TTL only applies if there's no MIN_VERSIONs setting on the column.</span> |
| <span class="source-line-no">459</span><span id="line-459"> return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;</span> |
| <span class="source-line-no">460</span><span id="line-460"> }</span> |
| <span class="source-line-no">461</span><span id="line-461"></span> |
| <span class="source-line-no">462</span><span id="line-462"> @Override</span> |
| <span class="source-line-no">463</span><span id="line-463"> public long getMemStoreFlushSize() {</span> |
| <span class="source-line-no">464</span><span id="line-464"> // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack</span> |
| <span class="source-line-no">465</span><span id="line-465"> return this.region.memstoreFlushSize;</span> |
| <span class="source-line-no">466</span><span id="line-466"> }</span> |
| <span class="source-line-no">467</span><span id="line-467"></span> |
| <span class="source-line-no">468</span><span id="line-468"> @Override</span> |
| <span class="source-line-no">469</span><span id="line-469"> public MemStoreSize getFlushableSize() {</span> |
| <span class="source-line-no">470</span><span id="line-470"> return this.memstore.getFlushableSize();</span> |
| <span class="source-line-no">471</span><span id="line-471"> }</span> |
| <span class="source-line-no">472</span><span id="line-472"></span> |
| <span class="source-line-no">473</span><span id="line-473"> @Override</span> |
| <span class="source-line-no">474</span><span id="line-474"> public MemStoreSize getSnapshotSize() {</span> |
| <span class="source-line-no">475</span><span id="line-475"> return this.memstore.getSnapshotSize();</span> |
| <span class="source-line-no">476</span><span id="line-476"> }</span> |
| <span class="source-line-no">477</span><span id="line-477"></span> |
| <span class="source-line-no">478</span><span id="line-478"> @Override</span> |
| <span class="source-line-no">479</span><span id="line-479"> public long getCompactionCheckMultiplier() {</span> |
| <span class="source-line-no">480</span><span id="line-480"> return this.compactionCheckMultiplier;</span> |
| <span class="source-line-no">481</span><span id="line-481"> }</span> |
| <span class="source-line-no">482</span><span id="line-482"></span> |
| <span class="source-line-no">483</span><span id="line-483"> @Override</span> |
| <span class="source-line-no">484</span><span id="line-484"> public long getBlockingFileCount() {</span> |
| <span class="source-line-no">485</span><span id="line-485"> return blockingFileCount;</span> |
| <span class="source-line-no">486</span><span id="line-486"> }</span> |
| <span class="source-line-no">487</span><span id="line-487"> /* End implementation of StoreConfigInformation */</span> |
| <span class="source-line-no">488</span><span id="line-488"></span> |
| <span class="source-line-no">489</span><span id="line-489"> @Override</span> |
| <span class="source-line-no">490</span><span id="line-490"> public ColumnFamilyDescriptor getColumnFamilyDescriptor() {</span> |
| <span class="source-line-no">491</span><span id="line-491"> return this.storeContext.getFamily();</span> |
| <span class="source-line-no">492</span><span id="line-492"> }</span> |
| <span class="source-line-no">493</span><span id="line-493"></span> |
| <span class="source-line-no">494</span><span id="line-494"> @Override</span> |
| <span class="source-line-no">495</span><span id="line-495"> public OptionalLong getMaxSequenceId() {</span> |
| <span class="source-line-no">496</span><span id="line-496"> return StoreUtils.getMaxSequenceIdInList(this.getStorefiles());</span> |
| <span class="source-line-no">497</span><span id="line-497"> }</span> |
| <span class="source-line-no">498</span><span id="line-498"></span> |
| <span class="source-line-no">499</span><span id="line-499"> @Override</span> |
| <span class="source-line-no">500</span><span id="line-500"> public OptionalLong getMaxMemStoreTS() {</span> |
| <span class="source-line-no">501</span><span id="line-501"> return StoreUtils.getMaxMemStoreTSInList(this.getStorefiles());</span> |
| <span class="source-line-no">502</span><span id="line-502"> }</span> |
| <span class="source-line-no">503</span><span id="line-503"></span> |
| <span class="source-line-no">504</span><span id="line-504"> /** Returns the data block encoder */</span> |
| <span class="source-line-no">505</span><span id="line-505"> public HFileDataBlockEncoder getDataBlockEncoder() {</span> |
| <span class="source-line-no">506</span><span id="line-506"> return dataBlockEncoder;</span> |
| <span class="source-line-no">507</span><span id="line-507"> }</span> |
| <span class="source-line-no">508</span><span id="line-508"></span> |
| <span class="source-line-no">509</span><span id="line-509"> /**</span> |
| <span class="source-line-no">510</span><span id="line-510"> * Should be used only in tests.</span> |
| <span class="source-line-no">511</span><span id="line-511"> * @param blockEncoder the block delta encoder to use</span> |
| <span class="source-line-no">512</span><span id="line-512"> */</span> |
| <span class="source-line-no">513</span><span id="line-513"> void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {</span> |
| <span class="source-line-no">514</span><span id="line-514"> this.dataBlockEncoder = blockEncoder;</span> |
| <span class="source-line-no">515</span><span id="line-515"> }</span> |
| <span class="source-line-no">516</span><span id="line-516"></span> |
| <span class="source-line-no">517</span><span id="line-517"> private void postRefreshStoreFiles() throws IOException {</span> |
| <span class="source-line-no">518</span><span id="line-518"> // Advance the memstore read point to be at least the new store files seqIds so that</span> |
| <span class="source-line-no">519</span><span id="line-519"> // readers might pick it up. This assumes that the store is not getting any writes (otherwise</span> |
| <span class="source-line-no">520</span><span id="line-520"> // in-flight transactions might be made visible)</span> |
| <span class="source-line-no">521</span><span id="line-521"> getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);</span> |
| <span class="source-line-no">522</span><span id="line-522"> refreshStoreSizeAndTotalBytes();</span> |
| <span class="source-line-no">523</span><span id="line-523"> }</span> |
| <span class="source-line-no">524</span><span id="line-524"></span> |
| <span class="source-line-no">525</span><span id="line-525"> @Override</span> |
| <span class="source-line-no">526</span><span id="line-526"> public void refreshStoreFiles() throws IOException {</span> |
| <span class="source-line-no">527</span><span id="line-527"> storeEngine.refreshStoreFiles();</span> |
| <span class="source-line-no">528</span><span id="line-528"> postRefreshStoreFiles();</span> |
| <span class="source-line-no">529</span><span id="line-529"> }</span> |
| <span class="source-line-no">530</span><span id="line-530"></span> |
| <span class="source-line-no">531</span><span id="line-531"> /**</span> |
| <span class="source-line-no">532</span><span id="line-532"> * Replaces the store files that the store has with the given files. Mainly used by secondary</span> |
| <span class="source-line-no">533</span><span id="line-533"> * region replicas to keep up to date with the primary region files.</span> |
| <span class="source-line-no">534</span><span id="line-534"> */</span> |
| <span class="source-line-no">535</span><span id="line-535"> public void refreshStoreFiles(Collection<String> newFiles) throws IOException {</span> |
| <span class="source-line-no">536</span><span id="line-536"> storeEngine.refreshStoreFiles(newFiles);</span> |
| <span class="source-line-no">537</span><span id="line-537"> postRefreshStoreFiles();</span> |
| <span class="source-line-no">538</span><span id="line-538"> }</span> |
| <span class="source-line-no">539</span><span id="line-539"></span> |
| <span class="source-line-no">540</span><span id="line-540"> /**</span> |
| <span class="source-line-no">541</span><span id="line-541"> * Informs the MemStore that the upcoming updates are going to be part of the edits being</span> |
| <span class="source-line-no">542</span><span id="line-542"> * replayed from the WAL.</span> |
| <span class="source-line-no">543</span><span id="line-543"> */</span> |
| <span class="source-line-no">544</span><span id="line-544"> public void startReplayingFromWAL() {</span> |
| <span class="source-line-no">545</span><span id="line-545"> this.memstore.startReplayingFromWAL();</span> |
| <span class="source-line-no">546</span><span id="line-546"> }</span> |
| <span class="source-line-no">547</span><span id="line-547"></span> |
| <span class="source-line-no">548</span><span id="line-548"> /**</span> |
| <span class="source-line-no">549</span><span id="line-549"> * Informs the MemStore that replaying the edits from the WAL is done.</span> |
| <span class="source-line-no">550</span><span id="line-550"> */</span> |
| <span class="source-line-no">551</span><span id="line-551"> public void stopReplayingFromWAL() {</span> |
| <span class="source-line-no">552</span><span id="line-552"> this.memstore.stopReplayingFromWAL();</span> |
| <span class="source-line-no">553</span><span id="line-553"> }</span> |
| <span class="source-line-no">554</span><span id="line-554"></span> |
| <span class="source-line-no">555</span><span id="line-555"> /**</span> |
| <span class="source-line-no">556</span><span id="line-556"> * Adds a value to the memstore</span> |
| <span class="source-line-no">557</span><span id="line-557"> */</span> |
| <span class="source-line-no">558</span><span id="line-558"> public void add(final ExtendedCell cell, MemStoreSizing memstoreSizing) {</span> |
| <span class="source-line-no">559</span><span id="line-559"> storeEngine.readLock();</span> |
| <span class="source-line-no">560</span><span id="line-560"> try {</span> |
| <span class="source-line-no">561</span><span id="line-561"> if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {</span> |
| <span class="source-line-no">562</span><span id="line-562"> LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",</span> |
| <span class="source-line-no">563</span><span id="line-563"> this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());</span> |
| <span class="source-line-no">564</span><span id="line-564"> }</span> |
| <span class="source-line-no">565</span><span id="line-565"> this.memstore.add(cell, memstoreSizing);</span> |
| <span class="source-line-no">566</span><span id="line-566"> } finally {</span> |
| <span class="source-line-no">567</span><span id="line-567"> storeEngine.readUnlock();</span> |
| <span class="source-line-no">568</span><span id="line-568"> currentParallelPutCount.decrementAndGet();</span> |
| <span class="source-line-no">569</span><span id="line-569"> }</span> |
| <span class="source-line-no">570</span><span id="line-570"> }</span> |
| <span class="source-line-no">571</span><span id="line-571"></span> |
| <span class="source-line-no">572</span><span id="line-572"> /**</span> |
| <span class="source-line-no">573</span><span id="line-573"> * Adds the specified cells to the memstore</span> |
| <span class="source-line-no">574</span><span id="line-574"> */</span> |
| <span class="source-line-no">575</span><span id="line-575"> public void add(final Iterable<ExtendedCell> cells, MemStoreSizing memstoreSizing) {</span> |
| <span class="source-line-no">576</span><span id="line-576"> storeEngine.readLock();</span> |
| <span class="source-line-no">577</span><span id="line-577"> try {</span> |
| <span class="source-line-no">578</span><span id="line-578"> if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {</span> |
| <span class="source-line-no">579</span><span id="line-579"> LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",</span> |
| <span class="source-line-no">580</span><span id="line-580"> this.getTableName(), this.getRegionInfo().getEncodedName(), this.getColumnFamilyName());</span> |
| <span class="source-line-no">581</span><span id="line-581"> }</span> |
| <span class="source-line-no">582</span><span id="line-582"> memstore.add(cells, memstoreSizing);</span> |
| <span class="source-line-no">583</span><span id="line-583"> } finally {</span> |
| <span class="source-line-no">584</span><span id="line-584"> storeEngine.readUnlock();</span> |
| <span class="source-line-no">585</span><span id="line-585"> currentParallelPutCount.decrementAndGet();</span> |
| <span class="source-line-no">586</span><span id="line-586"> }</span> |
| <span class="source-line-no">587</span><span id="line-587"> }</span> |
| <span class="source-line-no">588</span><span id="line-588"></span> |
| <span class="source-line-no">589</span><span id="line-589">  @Override</span> |
| <span class="source-line-no">590</span><span id="line-590">  public long timeOfOldestEdit() {</span> |
| <span class="source-line-no">591</span><span id="line-591">    return this.memstore.timeOfOldestEdit(); // answered entirely by the memstore</span> |
| <span class="source-line-no">592</span><span id="line-592">  }</span> |
| <span class="source-line-no">593</span><span id="line-593"></span> |
| <span class="source-line-no">594</span><span id="line-594">  /** Returns the store files currently held by the store file manager. */</span> |
| <span class="source-line-no">595</span><span id="line-595">  @Override</span> |
| <span class="source-line-no">596</span><span id="line-596">  public Collection<HStoreFile> getStorefiles() {</span> |
| <span class="source-line-no">597</span><span id="line-597">    return storeEngine.getStoreFileManager().getStoreFiles();</span> |
| <span class="source-line-no">598</span><span id="line-598">  }</span> |
| <span class="source-line-no">599</span><span id="line-599"></span> |
| <span class="source-line-no">600</span><span id="line-600">  @Override</span> |
| <span class="source-line-no">601</span><span id="line-601">  public Collection<HStoreFile> getCompactedFiles() {</span> |
| <span class="source-line-no">602</span><span id="line-602">    return storeEngine.getStoreFileManager().getCompactedfiles(); // store file manager's view</span> |
| <span class="source-line-no">603</span><span id="line-603">  }</span> |
| <span class="source-line-no">604</span><span id="line-604"></span> |
| <span class="source-line-no">605</span><span id="line-605"> /**</span> |
| <span class="source-line-no">606</span><span id="line-606"> * This throws a WrongRegionException if the HFile does not fit in this region, or an</span> |
| <span class="source-line-no">607</span><span id="line-607"> * InvalidHFileException if the HFile is not valid.</span> |
| <span class="source-line-no">608</span><span id="line-608"> */</span> |
| <span class="source-line-no">609</span><span id="line-609"> public void assertBulkLoadHFileOk(Path srcPath) throws IOException {</span> |
| <span class="source-line-no">610</span><span id="line-610"> HFile.Reader reader = null;</span> |
| <span class="source-line-no">611</span><span id="line-611"> try {</span> |
| <span class="source-line-no">612</span><span id="line-612"> LOG.info("Validating hfile at " + srcPath + " for inclusion in " + this);</span> |
| <span class="source-line-no">613</span><span id="line-613"> FileSystem srcFs = srcPath.getFileSystem(conf);</span> |
| <span class="source-line-no">614</span><span id="line-614"> srcFs.access(srcPath, FsAction.READ_WRITE);</span> |
| <span class="source-line-no">615</span><span id="line-615"> reader = HFile.createReader(srcFs, srcPath, getCacheConfig(), isPrimaryReplicaStore(), conf);</span> |
| <span class="source-line-no">616</span><span id="line-616"></span> |
| <span class="source-line-no">617</span><span id="line-617"> Optional<byte[]> firstKey = reader.getFirstRowKey();</span> |
| <span class="source-line-no">618</span><span id="line-618"> Preconditions.checkState(firstKey.isPresent(), "First key can not be null");</span> |
| <span class="source-line-no">619</span><span id="line-619"> Optional<ExtendedCell> lk = reader.getLastKey();</span> |
| <span class="source-line-no">620</span><span id="line-620"> Preconditions.checkState(lk.isPresent(), "Last key can not be null");</span> |
| <span class="source-line-no">621</span><span id="line-621"> byte[] lastKey = CellUtil.cloneRow(lk.get());</span> |
| <span class="source-line-no">622</span><span id="line-622"></span> |
| <span class="source-line-no">623</span><span id="line-623"> if (LOG.isDebugEnabled()) {</span> |
| <span class="source-line-no">624</span><span id="line-624"> LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last="</span> |
| <span class="source-line-no">625</span><span id="line-625"> + Bytes.toStringBinary(lastKey));</span> |
| <span class="source-line-no">626</span><span id="line-626"> LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey())</span> |
| <span class="source-line-no">627</span><span id="line-627"> + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));</span> |
| <span class="source-line-no">628</span><span id="line-628"> }</span> |
| <span class="source-line-no">629</span><span id="line-629"></span> |
| <span class="source-line-no">630</span><span id="line-630"> if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {</span> |
| <span class="source-line-no">631</span><span id="line-631"> throw new WrongRegionException("Bulk load file " + srcPath.toString()</span> |
| <span class="source-line-no">632</span><span id="line-632"> + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());</span> |
| <span class="source-line-no">633</span><span id="line-633"> }</span> |
| <span class="source-line-no">634</span><span id="line-634"></span> |
| <span class="source-line-no">635</span><span id="line-635"> if (</span> |
| <span class="source-line-no">636</span><span id="line-636"> reader.length()</span> |
| <span class="source-line-no">637</span><span id="line-637"> > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)</span> |
| <span class="source-line-no">638</span><span id="line-638"> ) {</span> |
| <span class="source-line-no">639</span><span id="line-639"> LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " + reader.length()</span> |
| <span class="source-line-no">640</span><span id="line-640"> + " bytes can be problematic as it may lead to oversplitting.");</span> |
| <span class="source-line-no">641</span><span id="line-641"> }</span> |
| <span class="source-line-no">642</span><span id="line-642"></span> |
| <span class="source-line-no">643</span><span id="line-643"> if (verifyBulkLoads) {</span> |
| <span class="source-line-no">644</span><span id="line-644"> long verificationStartTime = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">645</span><span id="line-645"> LOG.info("Full verification started for bulk load hfile: {}", srcPath);</span> |
| <span class="source-line-no">646</span><span id="line-646"> Cell prevCell = null;</span> |
| <span class="source-line-no">647</span><span id="line-647"> HFileScanner scanner = reader.getScanner(conf, false, false, false);</span> |
| <span class="source-line-no">648</span><span id="line-648"> scanner.seekTo();</span> |
| <span class="source-line-no">649</span><span id="line-649"> do {</span> |
| <span class="source-line-no">650</span><span id="line-650"> Cell cell = scanner.getCell();</span> |
| <span class="source-line-no">651</span><span id="line-651"> if (prevCell != null) {</span> |
| <span class="source-line-no">652</span><span id="line-652"> if (getComparator().compareRows(prevCell, cell) > 0) {</span> |
| <span class="source-line-no">653</span><span id="line-653"> throw new InvalidHFileException("Previous row is greater than" + " current row: path="</span> |
| <span class="source-line-no">654</span><span id="line-654"> + srcPath + " previous=" + CellUtil.getCellKeyAsString(prevCell) + " current="</span> |
| <span class="source-line-no">655</span><span id="line-655"> + CellUtil.getCellKeyAsString(cell));</span> |
| <span class="source-line-no">656</span><span id="line-656"> }</span> |
| <span class="source-line-no">657</span><span id="line-657"> if (CellComparator.getInstance().compareFamilies(prevCell, cell) != 0) {</span> |
| <span class="source-line-no">658</span><span id="line-658"> throw new InvalidHFileException("Previous key had different"</span> |
| <span class="source-line-no">659</span><span id="line-659"> + " family compared to current key: path=" + srcPath + " previous="</span> |
| <span class="source-line-no">660</span><span id="line-660"> + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),</span> |
| <span class="source-line-no">661</span><span id="line-661"> prevCell.getFamilyLength())</span> |
| <span class="source-line-no">662</span><span id="line-662"> + " current=" + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),</span> |
| <span class="source-line-no">663</span><span id="line-663"> cell.getFamilyLength()));</span> |
| <span class="source-line-no">664</span><span id="line-664"> }</span> |
| <span class="source-line-no">665</span><span id="line-665"> }</span> |
| <span class="source-line-no">666</span><span id="line-666"> prevCell = cell;</span> |
| <span class="source-line-no">667</span><span id="line-667"> } while (scanner.next());</span> |
| <span class="source-line-no">668</span><span id="line-668"> LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString() + " took "</span> |
| <span class="source-line-no">669</span><span id="line-669"> + (EnvironmentEdgeManager.currentTime() - verificationStartTime) + " ms");</span> |
| <span class="source-line-no">670</span><span id="line-670"> }</span> |
| <span class="source-line-no">671</span><span id="line-671"> } finally {</span> |
| <span class="source-line-no">672</span><span id="line-672"> if (reader != null) {</span> |
| <span class="source-line-no">673</span><span id="line-673"> reader.close();</span> |
| <span class="source-line-no">674</span><span id="line-674"> }</span> |
| <span class="source-line-no">675</span><span id="line-675"> }</span> |
| <span class="source-line-no">676</span><span id="line-676"> }</span> |
| <span class="source-line-no">677</span><span id="line-677"></span> |
| <span class="source-line-no">678</span><span id="line-678">  /**</span> |
| <span class="source-line-no">679</span><span id="line-679">   * Hands the HFile at {@code srcPathStr} to the region file system's bulkLoadStoreFile for this</span> |
| <span class="source-line-no">680</span><span id="line-680">   * store's column family. Call only from Region, after assertBulkLoadHFileOk has accepted it.</span> |
| <span class="source-line-no">681</span><span id="line-681">   * @param seqNum sequence Id associated with the HFile</span> |
| <span class="source-line-no">682</span><span id="line-682">   */</span> |
| <span class="source-line-no">683</span><span id="line-683">  public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {</span> |
| <span class="source-line-no">684</span><span id="line-684">    final Path hfile = new Path(srcPathStr);</span> |
| <span class="source-line-no">685</span><span id="line-685">    return getRegionFileSystem().bulkLoadStoreFile(getColumnFamilyName(), hfile, seqNum);</span> |
| <span class="source-line-no">686</span><span id="line-686">  }</span> |
| <span class="source-line-no">687</span><span id="line-687"></span> |
| <span class="source-line-no">688</span><span id="line-688">  public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {</span> |
| <span class="source-line-no">689</span><span id="line-689">    final Path source = new Path(srcPathStr);</span> |
| <span class="source-line-no">690</span><span id="line-690">    try {</span> |
| <span class="source-line-no">691</span><span id="line-691">      getRegionFileSystem().commitStoreFile(source, dstPath);</span> |
| <span class="source-line-no">692</span><span id="line-692">    } finally {</span> |
| <span class="source-line-no">693</span><span id="line-693">      // The coprocessor hook runs whether or not the commit above succeeded.</span> |
| <span class="source-line-no">694</span><span id="line-694">      if (getCoprocessorHost() != null) {</span> |
| <span class="source-line-no">695</span><span id="line-695">        getCoprocessorHost().postCommitStoreFile(family, source, dstPath);</span> |
| <span class="source-line-no">696</span><span id="line-696">      }</span> |
| <span class="source-line-no">697</span><span id="line-697">    }</span> |
| <span class="source-line-no">698</span><span id="line-698">    LOG.info("Loaded HFile " + source + " into " + this + " as " + dstPath</span> |
| <span class="source-line-no">699</span><span id="line-699">      + " - updating store file list.");</span> |
| <span class="source-line-no">700</span><span id="line-700"></span> |
| <span class="source-line-no">701</span><span id="line-701">    // Open the committed file and hand it to the HStoreFile overload, which adds it to the store.</span> |
| <span class="source-line-no">702</span><span id="line-702">    bulkLoadHFile(storeEngine.createStoreFileAndReader(dstPath));</span> |
| <span class="source-line-no">703</span><span id="line-703"></span> |
| <span class="source-line-no">704</span><span id="line-704">    LOG.info("Successfully loaded {} into {} (new location: {})", source, this, dstPath);</span> |
| <span class="source-line-no">705</span><span id="line-705"></span> |
| <span class="source-line-no">706</span><span id="line-706">    return dstPath;</span> |
| <span class="source-line-no">707</span><span id="line-707">  }</span> |
| <span class="source-line-no">708</span><span id="line-708"></span> |
| <span class="source-line-no">709</span><span id="line-709">  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {</span> |
| <span class="source-line-no">710</span><span id="line-710">    // Open a store file plus reader for fileInfo, then hand it to the HStoreFile overload.</span> |
| <span class="source-line-no">711</span><span id="line-711">    bulkLoadHFile(storeEngine.createStoreFileAndReader(fileInfo));</span> |
| <span class="source-line-no">712</span><span id="line-712">  }</span> |
| <span class="source-line-no">713</span><span id="line-713"></span> |
| <span class="source-line-no">714</span><span id="line-714"> private void bulkLoadHFile(HStoreFile sf) throws IOException {</span> |
| <span class="source-line-no">715</span><span id="line-715"> StoreFileReader r = sf.getReader();</span> |
| <span class="source-line-no">716</span><span id="line-716"> this.storeSize.addAndGet(r.length());</span> |
| <span class="source-line-no">717</span><span id="line-717"> this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());</span> |
| <span class="source-line-no">718</span><span id="line-718"> storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {</span> |
| <span class="source-line-no">719</span><span id="line-719"> });</span> |
| <span class="source-line-no">720</span><span id="line-720"> LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);</span> |
| <span class="source-line-no">721</span><span id="line-721"> if (LOG.isTraceEnabled()) {</span> |
| <span class="source-line-no">722</span><span id="line-722"> String traceMessage = "BULK LOAD time,size,store size,store files ["</span> |
| <span class="source-line-no">723</span><span id="line-723"> + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + ","</span> |
| <span class="source-line-no">724</span><span id="line-724"> + storeEngine.getStoreFileManager().getStorefileCount() + "]";</span> |
| <span class="source-line-no">725</span><span id="line-725"> LOG.trace(traceMessage);</span> |
| <span class="source-line-no">726</span><span id="line-726"> }</span> |
| <span class="source-line-no">727</span><span id="line-727"> }</span> |
| <span class="source-line-no">728</span><span id="line-728"></span> |
| <span class="source-line-no">729</span><span id="line-729">  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {</span> |
| <span class="source-line-no">730</span><span id="line-730">    // Closes the memstore and every store file without taking any lock itself; close() wraps</span> |
| <span class="source-line-no">731</span><span id="line-731">    // this call in the archive lock and the store engine write lock.</span> |
| <span class="source-line-no">732</span><span id="line-732">    memstore.close();</span> |
| <span class="source-line-no">733</span><span id="line-733">    // Detach the live files first so that metrics no longer find them.</span> |
| <span class="source-line-no">734</span><span id="line-734">    ImmutableCollection<HStoreFile> closing = storeEngine.getStoreFileManager().clearFiles();</span> |
| <span class="source-line-no">735</span><span id="line-735">    Collection<HStoreFile> compacted = storeEngine.getStoreFileManager().clearCompactedFiles();</span> |
| <span class="source-line-no">736</span><span id="line-736">    // Compacted files that were still being tracked are removed as well.</span> |
| <span class="source-line-no">737</span><span id="line-737">    if (CollectionUtils.isNotEmpty(compacted)) {</span> |
| <span class="source-line-no">738</span><span id="line-738">      removeCompactedfiles(compacted,</span> |
| <span class="source-line-no">739</span><span id="line-739">        getCacheConfig() == null || getCacheConfig().shouldEvictOnClose());</span> |
| <span class="source-line-no">740</span><span id="line-740">    }</span> |
| <span class="source-line-no">741</span><span id="line-741">    if (!closing.isEmpty()) {</span> |
| <span class="source-line-no">742</span><span id="line-742">      // Pool on which the store files are closed in parallel.</span> |
| <span class="source-line-no">743</span><span id="line-743">      ThreadPoolExecutor closerPool =</span> |
| <span class="source-line-no">744</span><span id="line-744">        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"</span> |
| <span class="source-line-no">745</span><span id="line-745">          + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());</span> |
| <span class="source-line-no">746</span><span id="line-746"></span> |
| <span class="source-line-no">747</span><span id="line-747">      // One close task per store file; their completions are drained further down.</span> |
| <span class="source-line-no">748</span><span id="line-748">      CompletionService<Void> closes = new ExecutorCompletionService<>(closerPool);</span> |
| <span class="source-line-no">749</span><span id="line-749">      for (HStoreFile file : closing) {</span> |
| <span class="source-line-no">750</span><span id="line-750">        closes.submit(() -> {</span> |
| <span class="source-line-no">751</span><span id="line-751">          // Evaluated inside the task, exactly when the file is about to be closed.</span> |
| <span class="source-line-no">752</span><span id="line-752">          boolean evictOnClose = getCacheConfig() == null || getCacheConfig().shouldEvictOnClose();</span> |
| <span class="source-line-no">753</span><span id="line-753">          file.closeStoreFile(!warmup && evictOnClose);</span> |
| <span class="source-line-no">754</span><span id="line-754">          return null;</span> |
| <span class="source-line-no">755</span><span id="line-755">        });</span> |
| <span class="source-line-no">756</span><span id="line-756">      }</span> |
| <span class="source-line-no">757</span><span id="line-757"></span> |
| <span class="source-line-no">758</span><span id="line-758">      // Wait for every task. Only the first failure is remembered; it is rethrown once the pool</span> |
| <span class="source-line-no">759</span><span id="line-759">      // has been shut down.</span> |
| <span class="source-line-no">760</span><span id="line-760">      IOException firstFailure = null;</span> |
| <span class="source-line-no">761</span><span id="line-761">      try {</span> |
| <span class="source-line-no">762</span><span id="line-762">        for (int remaining = closing.size(); remaining > 0; remaining--) {</span> |
| <span class="source-line-no">763</span><span id="line-763">          try {</span> |
| <span class="source-line-no">764</span><span id="line-764">            Future<Void> done = closes.take();</span> |
| <span class="source-line-no">765</span><span id="line-765">            done.get();</span> |
| <span class="source-line-no">766</span><span id="line-766">          } catch (InterruptedException e) {</span> |
| <span class="source-line-no">767</span><span id="line-767">            if (firstFailure == null) {</span> |
| <span class="source-line-no">768</span><span id="line-768">              firstFailure = new InterruptedIOException();</span> |
| <span class="source-line-no">769</span><span id="line-769">              firstFailure.initCause(e);</span> |
| <span class="source-line-no">770</span><span id="line-770">            }</span> |
| <span class="source-line-no">771</span><span id="line-771">          } catch (ExecutionException e) {</span> |
| <span class="source-line-no">772</span><span id="line-772">            if (firstFailure == null) {</span> |
| <span class="source-line-no">773</span><span id="line-773">              firstFailure = new IOException(e.getCause());</span> |
| <span class="source-line-no">774</span><span id="line-774">            }</span> |
| <span class="source-line-no">775</span><span id="line-775">          }</span> |
| <span class="source-line-no">776</span><span id="line-776">        }</span> |
| <span class="source-line-no">777</span><span id="line-777">      } finally {</span> |
| <span class="source-line-no">778</span><span id="line-778">        closerPool.shutdownNow();</span> |
| <span class="source-line-no">779</span><span id="line-779">      }</span> |
| <span class="source-line-no">780</span><span id="line-780">      if (firstFailure != null) {</span> |
| <span class="source-line-no">781</span><span id="line-781">        throw firstFailure;</span> |
| <span class="source-line-no">782</span><span id="line-782">      }</span> |
| <span class="source-line-no">783</span><span id="line-783">    }</span> |
| <span class="source-line-no">784</span><span id="line-784">    LOG.trace("Closed {}", this);</span> |
| <span class="source-line-no">785</span><span id="line-785">    return closing;</span> |
| <span class="source-line-no">786</span><span id="line-786">  }</span> |
| <span class="source-line-no">787</span><span id="line-787"></span> |
| <span class="source-line-no">788</span><span id="line-788"> /**</span> |
| <span class="source-line-no">789</span><span id="line-789"> * Closes the memstore and all store file readers while holding the archive lock and the store</span> |
| <span class="source-line-no">790</span><span id="line-790"> * engine write lock. Later requests are not a concern: the Region holds a write lock by then.</span> |
| <span class="source-line-no">791</span><span id="line-791"> * @return the {@link StoreFile StoreFiles} that were previously being used.</span> |
| <span class="source-line-no">792</span><span id="line-792"> * @throws IOException on failure</span> |
| <span class="source-line-no">793</span><span id="line-793"> */</span> |
| <span class="source-line-no">794</span><span id="line-794"> public ImmutableCollection<HStoreFile> close() throws IOException {</span> |
| <span class="source-line-no">795</span><span id="line-795"> // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report</span> |
| <span class="source-line-no">796</span><span id="line-796"> // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...</span> |
| <span class="source-line-no">797</span><span id="line-797"> // Change later if findbugs becomes smarter in the future.</span> |
| <span class="source-line-no">798</span><span id="line-798"> this.archiveLock.lock();</span> |
| <span class="source-line-no">799</span><span id="line-799"> try {</span> |
| <span class="source-line-no">800</span><span id="line-800"> this.storeEngine.writeLock();</span> |
| <span class="source-line-no">801</span><span id="line-801"> try {</span> |
| <span class="source-line-no">802</span><span id="line-802"> return closeWithoutLock();</span> |
| <span class="source-line-no">803</span><span id="line-803"> } finally {</span> |
| <span class="source-line-no">804</span><span id="line-804"> this.storeEngine.writeUnlock();</span> |
| <span class="source-line-no">805</span><span id="line-805"> }</span> |
| <span class="source-line-no">806</span><span id="line-806"> } finally {</span> |
| <span class="source-line-no">807</span><span id="line-807"> this.archiveLock.unlock();</span> |
| <span class="source-line-no">808</span><span id="line-808"> }</span> |
| <span class="source-line-no">809</span><span id="line-809"> }</span> |
| <span class="source-line-no">810</span><span id="line-810"></span> |
| <span class="source-line-no">811</span><span id="line-811"> /**</span> |
| <span class="source-line-no">812</span><span id="line-812"> * Writes out the given snapshot, retrying up to flushRetriesNumber times with a pauseTime sleep</span> |
| <span class="source-line-no">813</span><span id="line-813"> * in between. Presumes {@code StoreFlusherImpl.prepare()} has been called previously.</span> |
| <span class="source-line-no">814</span><span id="line-814"> * @param logCacheFlushId flush sequence number</span> |
| <span class="source-line-no">815</span><span id="line-815"> * @return the paths of the tmp files to which the store was flushed, once all of them validate</span> |
| <span class="source-line-no">816</span><span id="line-816"> * @throws IOException the last flush or validation failure, or if interrupted between retries</span> |
| <span class="source-line-no">817</span><span id="line-817"> */</span> |
| <span class="source-line-no">818</span><span id="line-818"> protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,</span> |
| <span class="source-line-no">819</span><span id="line-819"> MonitoredTask status, ThroughputController throughputController, FlushLifeCycleTracker tracker,</span> |
| <span class="source-line-no">820</span><span id="line-820"> Consumer<Path> writerCreationTracker) throws IOException {</span> |
| <span class="source-line-no">821</span><span id="line-821"> // If an exception happens flushing, we let it out without clearing</span> |
| <span class="source-line-no">822</span><span id="line-822"> // the memstore snapshot. The old snapshot will be returned when we say</span> |
| <span class="source-line-no">823</span><span id="line-823"> // 'snapshot', the next time flush comes around.</span> |
| <span class="source-line-no">824</span><span id="line-824"> // Retry after catching exception when flushing, otherwise server will abort</span> |
| <span class="source-line-no">825</span><span id="line-825"> // itself</span> |
| <span class="source-line-no">826</span><span id="line-826"> StoreFlusher flusher = storeEngine.getStoreFlusher();</span> |
| <span class="source-line-no">827</span><span id="line-827"> IOException lastException = null;</span> |
| <span class="source-line-no">828</span><span id="line-828"> for (int i = 0; i < flushRetriesNumber; i++) {</span> |
| <span class="source-line-no">829</span><span id="line-829"> try {</span> |
| <span class="source-line-no">830</span><span id="line-830"> List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status,</span> |
| <span class="source-line-no">831</span><span id="line-831"> throughputController, tracker, writerCreationTracker);</span> |
| <span class="source-line-no">832</span><span id="line-832"> Path lastPathName = null; /* file being validated; named in the warning if validation fails */</span> |
| <span class="source-line-no">833</span><span id="line-833"> try {</span> |
| <span class="source-line-no">834</span><span id="line-834"> for (Path pathName : pathNames) {</span> |
| <span class="source-line-no">835</span><span id="line-835"> lastPathName = pathName;</span> |
| <span class="source-line-no">836</span><span id="line-836"> storeEngine.validateStoreFile(pathName);</span> |
| <span class="source-line-no">837</span><span id="line-837"> }</span> |
| <span class="source-line-no">838</span><span id="line-838"> return pathNames;</span> |
| <span class="source-line-no">839</span><span id="line-839"> } catch (Exception e) {</span> |
| <span class="source-line-no">840</span><span id="line-840"> LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);</span> |
| <span class="source-line-no">841</span><span id="line-841"> if (e instanceof IOException) {</span> |
| <span class="source-line-no">842</span><span id="line-842"> lastException = (IOException) e;</span> |
| <span class="source-line-no">843</span><span id="line-843"> } else {</span> |
| <span class="source-line-no">844</span><span id="line-844"> lastException = new IOException(e);</span> |
| <span class="source-line-no">845</span><span id="line-845"> }</span> |
| <span class="source-line-no">846</span><span id="line-846"> }</span> |
| <span class="source-line-no">847</span><span id="line-847"> } catch (IOException e) {</span> |
| <span class="source-line-no">848</span><span id="line-848"> LOG.warn("Failed flushing store file for {}, retrying num={}", this, i, e);</span> |
| <span class="source-line-no">849</span><span id="line-849"> lastException = e;</span> |
| <span class="source-line-no">850</span><span id="line-850"> }</span> |
| <span class="source-line-no">851</span><span id="line-851"> if (lastException != null && i < (flushRetriesNumber - 1)) { /* no pause after last try */</span> |
| <span class="source-line-no">852</span><span id="line-852"> try {</span> |
| <span class="source-line-no">853</span><span id="line-853"> Thread.sleep(pauseTime);</span> |
| <span class="source-line-no">854</span><span id="line-854"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">855</span><span id="line-855"> IOException iie = new InterruptedIOException();</span> |
| <span class="source-line-no">856</span><span id="line-856"> iie.initCause(e);</span> |
| <span class="source-line-no">857</span><span id="line-857"> throw iie;</span> |
| <span class="source-line-no">858</span><span id="line-858"> }</span> |
| <span class="source-line-no">859</span><span id="line-859"> }</span> |
| <span class="source-line-no">860</span><span id="line-860"> }</span> |
| <span class="source-line-no">861</span><span id="line-861"> throw lastException; /* NOTE(review): null if flushRetriesNumber <= 0 - confirm it is validated */</span> |
| <span class="source-line-no">862</span><span id="line-862"> }</span> |
| <span class="source-line-no">863</span><span id="line-863"></span> |
| <span class="source-line-no">864</span><span id="line-864">  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {</span> |
| <span class="source-line-no">865</span><span id="line-865">    LOG.info("Validating recovered hfile at {} for inclusion in store {}", path, this);</span> |
| <span class="source-line-no">866</span><span id="line-866">    final FileSystem sourceFs = path.getFileSystem(conf);</span> |
| <span class="source-line-no">867</span><span id="line-867">    sourceFs.access(path, FsAction.READ_WRITE);</span> |
| <span class="source-line-no">868</span><span id="line-868">    // The reader is open only long enough to check that the file's row range fits this region.</span> |
| <span class="source-line-no">869</span><span id="line-869">    try (HFile.Reader hfile =</span> |
| <span class="source-line-no">870</span><span id="line-870">      HFile.createReader(sourceFs, path, getCacheConfig(), isPrimaryReplicaStore(), conf)) {</span> |
| <span class="source-line-no">871</span><span id="line-871">      Optional<byte[]> firstRow = hfile.getFirstRowKey();</span> |
| <span class="source-line-no">872</span><span id="line-872">      Preconditions.checkState(firstRow.isPresent(), "First key can not be null");</span> |
| <span class="source-line-no">873</span><span id="line-873">      Optional<ExtendedCell> lastCell = hfile.getLastKey();</span> |
| <span class="source-line-no">874</span><span id="line-874">      Preconditions.checkState(lastCell.isPresent(), "Last key can not be null");</span> |
| <span class="source-line-no">875</span><span id="line-875">      byte[] lastRow = CellUtil.cloneRow(lastCell.get());</span> |
| <span class="source-line-no">876</span><span id="line-876">      if (!getRegionInfo().containsRange(firstRow.get(), lastRow)) {</span> |
| <span class="source-line-no">877</span><span id="line-877">        throw new WrongRegionException("Recovered hfile " + path.toString()</span> |
| <span class="source-line-no">878</span><span id="line-878">          + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());</span> |
| <span class="source-line-no">879</span><span id="line-879">      }</span> |
| <span class="source-line-no">880</span><span id="line-880">    }</span> |
| <span class="source-line-no">881</span><span id="line-881"></span> |
| <span class="source-line-no">882</span><span id="line-882">    // Commit the file into this column family, open it, account for its size, then add it.</span> |
| <span class="source-line-no">883</span><span id="line-883">    Path committed = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);</span> |
| <span class="source-line-no">884</span><span id="line-884">    HStoreFile sf = storeEngine.createStoreFileAndReader(committed);</span> |
| <span class="source-line-no">885</span><span id="line-885">    StoreFileReader reader = sf.getReader();</span> |
| <span class="source-line-no">886</span><span id="line-886">    storeSize.addAndGet(reader.length());</span> |
| <span class="source-line-no">887</span><span id="line-887">    totalUncompressedBytes.addAndGet(reader.getTotalUncompressedBytes());</span> |
| <span class="source-line-no">888</span><span id="line-888">    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {</span> |
| <span class="source-line-no">889</span><span id="line-889">    });</span> |
| <span class="source-line-no">890</span><span id="line-890">    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,</span> |
| <span class="source-line-no">891</span><span id="line-891">      reader.getEntries(), reader.getSequenceID(),</span> |
| <span class="source-line-no">892</span><span id="line-892">      TraditionalBinaryPrefix.long2String(reader.length(), "B", 1));</span> |
| <span class="source-line-no">893</span><span id="line-893">    return sf;</span> |
| <span class="source-line-no">894</span><span id="line-894"></span> |
| <span class="source-line-no">895</span><span id="line-895"> private long getTotalSize(Collection<HStoreFile> sfs) {</span> |
| <span class="source-line-no">896</span><span id="line-896"> return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();</span> |
| <span class="source-line-no">897</span><span id="line-897"> }</span> |
| <span class="source-line-no">898</span><span id="line-898"></span> |
| <span class="source-line-no">899</span><span id="line-899"> private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {</span> |
| <span class="source-line-no">900</span><span id="line-900"> // NOTE: we should keep clearSnapshot method inside the write lock because clearSnapshot may</span> |
| <span class="source-line-no">901</span><span id="line-901"> // close {@link DefaultMemStore#snapshot}, which may be used by</span> |
| <span class="source-line-no">902</span><span id="line-902"> // {@link DefaultMemStore#getScanners}.</span> |
| <span class="source-line-no">903</span><span id="line-903"> storeEngine.addStoreFiles(sfs,</span> |
| <span class="source-line-no">904</span><span id="line-904"> // NOTE: here we must increase the refCount for storeFiles because we would open the</span> |
| <span class="source-line-no">905</span><span id="line-905"> // storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.</span> |
| <span class="source-line-no">906</span><span id="line-906"> // If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by</span> |
| <span class="source-line-no">907</span><span id="line-907"> // CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction. Because</span> |
| <span class="source-line-no">908</span><span id="line-908"> // HStore.requestCompaction is under storeEngine lock, so here we increase the refCount under</span> |
| <span class="source-line-no">909</span><span id="line-909"> // storeEngine lock. see HBASE-27519 for more details.</span> |
| <span class="source-line-no">910</span><span id="line-910"> snapshotId > 0 ? () -> {</span> |
| <span class="source-line-no">911</span><span id="line-911"> this.memstore.clearSnapshot(snapshotId);</span> |
| <span class="source-line-no">912</span><span id="line-912"> HStoreFile.increaseStoreFilesRefeCount(sfs);</span> |
| <span class="source-line-no">913</span><span id="line-913"> } : () -> {</span> |
| <span class="source-line-no">914</span><span id="line-914"> HStoreFile.increaseStoreFilesRefeCount(sfs);</span> |
| <span class="source-line-no">915</span><span id="line-915"> });</span> |
| <span class="source-line-no">916</span><span id="line-916"> // notify to be called here - only in case of flushes</span> |
| <span class="source-line-no">917</span><span id="line-917"> try {</span> |
| <span class="source-line-no">918</span><span id="line-918"> notifyChangedReadersObservers(sfs);</span> |
| <span class="source-line-no">919</span><span id="line-919"> } finally {</span> |
| <span class="source-line-no">920</span><span id="line-920"> HStoreFile.decreaseStoreFilesRefeCount(sfs);</span> |
| <span class="source-line-no">921</span><span id="line-921"> }</span> |
| <span class="source-line-no">922</span><span id="line-922"> if (LOG.isTraceEnabled()) {</span> |
| <span class="source-line-no">923</span><span id="line-923"> long totalSize = getTotalSize(sfs);</span> |
| <span class="source-line-no">924</span><span id="line-924"> String traceMessage = "FLUSH time,count,size,store size,store files ["</span> |
| <span class="source-line-no">925</span><span id="line-925"> + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + ","</span> |
| <span class="source-line-no">926</span><span id="line-926"> + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";</span> |
| <span class="source-line-no">927</span><span id="line-927"> LOG.trace(traceMessage);</span> |
| <span class="source-line-no">928</span><span id="line-928"> }</span> |
| <span class="source-line-no">929</span><span id="line-929"> return needsCompaction();</span> |
| <span class="source-line-no">930</span><span id="line-930"> }</span> |
| <span class="source-line-no">931</span><span id="line-931"></span> |
| <span class="source-line-no">932</span><span id="line-932"> /**</span> |
| <span class="source-line-no">933</span><span id="line-933"> * Notify all observers that set of Readers has changed.</span> |
| <span class="source-line-no">934</span><span id="line-934"> */</span> |
| <span class="source-line-no">935</span><span id="line-935"> private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {</span> |
| <span class="source-line-no">936</span><span id="line-936"> for (ChangedReadersObserver o : this.changedReaderObservers) {</span> |
| <span class="source-line-no">937</span><span id="line-937"> List<KeyValueScanner> memStoreScanners;</span> |
| <span class="source-line-no">938</span><span id="line-938"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">939</span><span id="line-939"> try {</span> |
| <span class="source-line-no">940</span><span id="line-940"> memStoreScanners = this.memstore.getScanners(o.getReadPoint());</span> |
| <span class="source-line-no">941</span><span id="line-941"> } finally {</span> |
| <span class="source-line-no">942</span><span id="line-942"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">943</span><span id="line-943"> }</span> |
| <span class="source-line-no">944</span><span id="line-944"> o.updateReaders(sfs, memStoreScanners);</span> |
| <span class="source-line-no">945</span><span id="line-945"> }</span> |
| <span class="source-line-no">946</span><span id="line-946"> }</span> |
| <span class="source-line-no">947</span><span id="line-947"></span> |
| <span class="source-line-no">948</span><span id="line-948"> /**</span> |
| <span class="source-line-no">949</span><span id="line-949"> * Get all scanners with no filtering based on TTL (that happens further down the line).</span> |
| <span class="source-line-no">950</span><span id="line-950"> * @param cacheBlocks cache the blocks or not</span> |
| <span class="source-line-no">951</span><span id="line-951"> * @param usePread true to use pread, false if not</span> |
| <span class="source-line-no">952</span><span id="line-952"> * @param isCompaction true if the scanner is created for compaction</span> |
| <span class="source-line-no">953</span><span id="line-953"> * @param matcher the scan query matcher</span> |
| <span class="source-line-no">954</span><span id="line-954"> * @param startRow the start row</span> |
| <span class="source-line-no">955</span><span id="line-955"> * @param stopRow the stop row</span> |
| <span class="source-line-no">956</span><span id="line-956"> * @param readPt the read point of the current scan</span> |
| <span class="source-line-no">957</span><span id="line-957"> * @return all scanners for this store</span> |
| <span class="source-line-no">958</span><span id="line-958"> */</span> |
| <span class="source-line-no">959</span><span id="line-959"> public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread,</span> |
| <span class="source-line-no">960</span><span id="line-960"> boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt,</span> |
| <span class="source-line-no">961</span><span id="line-961"> boolean onlyLatestVersion) throws IOException {</span> |
| <span class="source-line-no">962</span><span id="line-962"> return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false,</span> |
| <span class="source-line-no">963</span><span id="line-963"> readPt, onlyLatestVersion);</span> |
| <span class="source-line-no">964</span><span id="line-964"> }</span> |
| <span class="source-line-no">965</span><span id="line-965"></span> |
| <span class="source-line-no">966</span><span id="line-966"> /**</span> |
| <span class="source-line-no">967</span><span id="line-967"> * Get all scanners with no filtering based on TTL (that happens further down the line).</span> |
| <span class="source-line-no">968</span><span id="line-968"> * @param cacheBlocks cache the blocks or not</span> |
| <span class="source-line-no">969</span><span id="line-969"> * @param usePread true to use pread, false if not</span> |
| <span class="source-line-no">970</span><span id="line-970"> * @param isCompaction true if the scanner is created for compaction</span> |
| <span class="source-line-no">971</span><span id="line-971"> * @param matcher the scan query matcher</span> |
| <span class="source-line-no">972</span><span id="line-972"> * @param startRow the start row</span> |
| <span class="source-line-no">973</span><span id="line-973"> * @param includeStartRow true to include start row, false if not</span> |
| <span class="source-line-no">974</span><span id="line-974"> * @param stopRow the stop row</span> |
| <span class="source-line-no">975</span><span id="line-975"> * @param includeStopRow true to include stop row, false if not</span> |
| <span class="source-line-no">976</span><span id="line-976"> * @param readPt the read point of the current scan</span> |
| <span class="source-line-no">977</span><span id="line-977"> * @return all scanners for this store</span> |
| <span class="source-line-no">978</span><span id="line-978"> */</span> |
| <span class="source-line-no">979</span><span id="line-979"> public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,</span> |
| <span class="source-line-no">980</span><span id="line-980"> boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow,</span> |
| <span class="source-line-no">981</span><span id="line-981"> byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion)</span> |
| <span class="source-line-no">982</span><span id="line-982"> throws IOException {</span> |
| <span class="source-line-no">983</span><span id="line-983"> Collection<HStoreFile> storeFilesToScan;</span> |
| <span class="source-line-no">984</span><span id="line-984"> List<KeyValueScanner> memStoreScanners;</span> |
| <span class="source-line-no">985</span><span id="line-985"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">986</span><span id="line-986"> try {</span> |
| <span class="source-line-no">987</span><span id="line-987"> storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,</span> |
| <span class="source-line-no">988</span><span id="line-988"> includeStartRow, stopRow, includeStopRow, onlyLatestVersion);</span> |
| <span class="source-line-no">989</span><span id="line-989"> memStoreScanners = this.memstore.getScanners(readPt);</span> |
| <span class="source-line-no">990</span><span id="line-990"> // NOTE: here we must increase the refCount for storeFiles because we would open the</span> |
| <span class="source-line-no">991</span><span id="line-991"> // storeFiles and get the StoreFileScanners for them. If we don't increase the refCount here,</span> |
| <span class="source-line-no">992</span><span id="line-992"> // HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the</span> |
| <span class="source-line-no">993</span><span id="line-993"> // storeFiles after a concurrent compaction. Because HStore.requestCompaction is under</span> |
| <span class="source-line-no">994</span><span id="line-994"> // storeEngine lock, so here we increase the refCount under storeEngine lock. see HBASE-27484</span> |
| <span class="source-line-no">995</span><span id="line-995"> // for more details.</span> |
| <span class="source-line-no">996</span><span id="line-996"> HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);</span> |
| <span class="source-line-no">997</span><span id="line-997"> } finally {</span> |
| <span class="source-line-no">998</span><span id="line-998"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">999</span><span id="line-999"> }</span> |
| <span class="source-line-no">1000</span><span id="line-1000"> try {</span> |
| <span class="source-line-no">1001</span><span id="line-1001"> // First the store file scanners</span> |
| <span class="source-line-no">1002</span><span id="line-1002"></span> |
| <span class="source-line-no">1003</span><span id="line-1003"> // TODO this used to get the store files in descending order,</span> |
| <span class="source-line-no">1004</span><span id="line-1004"> // but now we get them in ascending order, which I think is</span> |
| <span class="source-line-no">1005</span><span id="line-1005"> // actually more correct, since memstore gets put at the end.</span> |
| <span class="source-line-no">1006</span><span id="line-1006"> List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(</span> |
| <span class="source-line-no">1007</span><span id="line-1007"> storeFilesToScan, cacheBlocks, usePread, isCompaction, false, matcher, readPt);</span> |
| <span class="source-line-no">1008</span><span id="line-1008"> List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);</span> |
| <span class="source-line-no">1009</span><span id="line-1009"> scanners.addAll(sfScanners);</span> |
| <span class="source-line-no">1010</span><span id="line-1010"> // Then the memstore scanners</span> |
| <span class="source-line-no">1011</span><span id="line-1011"> scanners.addAll(memStoreScanners);</span> |
| <span class="source-line-no">1012</span><span id="line-1012"> return scanners;</span> |
| <span class="source-line-no">1013</span><span id="line-1013"> } catch (Throwable t) {</span> |
| <span class="source-line-no">1014</span><span id="line-1014"> clearAndClose(memStoreScanners);</span> |
| <span class="source-line-no">1015</span><span id="line-1015"> throw t instanceof IOException ? (IOException) t : new IOException(t);</span> |
| <span class="source-line-no">1016</span><span id="line-1016"> } finally {</span> |
| <span class="source-line-no">1017</span><span id="line-1017"> HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);</span> |
| <span class="source-line-no">1018</span><span id="line-1018"> }</span> |
| <span class="source-line-no">1019</span><span id="line-1019"> }</span> |
| <span class="source-line-no">1020</span><span id="line-1020"></span> |
| <span class="source-line-no">1021</span><span id="line-1021"> private static void clearAndClose(List<KeyValueScanner> scanners) {</span> |
| <span class="source-line-no">1022</span><span id="line-1022"> if (scanners == null) {</span> |
| <span class="source-line-no">1023</span><span id="line-1023"> return;</span> |
| <span class="source-line-no">1024</span><span id="line-1024"> }</span> |
| <span class="source-line-no">1025</span><span id="line-1025"> for (KeyValueScanner s : scanners) {</span> |
| <span class="source-line-no">1026</span><span id="line-1026"> s.close();</span> |
| <span class="source-line-no">1027</span><span id="line-1027"> }</span> |
| <span class="source-line-no">1028</span><span id="line-1028"> scanners.clear();</span> |
| <span class="source-line-no">1029</span><span id="line-1029"> }</span> |
| <span class="source-line-no">1030</span><span id="line-1030"></span> |
| <span class="source-line-no">1031</span><span id="line-1031"> /**</span> |
| <span class="source-line-no">1032</span><span id="line-1032"> * Create scanners on the given files and if needed on the memstore with no filtering based on TTL</span> |
| <span class="source-line-no">1033</span><span id="line-1033"> * (that happens further down the line).</span> |
| <span class="source-line-no">1034</span><span id="line-1034"> * @param files the list of files on which the scanners have to be created</span> |
| <span class="source-line-no">1035</span><span id="line-1035"> * @param cacheBlocks cache the blocks or not</span> |
| <span class="source-line-no">1036</span><span id="line-1036"> * @param usePread true to use pread, false if not</span> |
| <span class="source-line-no">1037</span><span id="line-1037"> * @param isCompaction true if the scanner is created for compaction</span> |
| <span class="source-line-no">1038</span><span id="line-1038"> * @param matcher the scan query matcher</span> |
| <span class="source-line-no">1039</span><span id="line-1039"> * @param startRow the start row</span> |
| <span class="source-line-no">1040</span><span id="line-1040"> * @param stopRow the stop row</span> |
| <span class="source-line-no">1041</span><span id="line-1041"> * @param readPt the read point of the current scan</span> |
| <span class="source-line-no">1042</span><span id="line-1042"> * @param includeMemstoreScanner true if memstore has to be included</span> |
| <span class="source-line-no">1043</span><span id="line-1043"> * @return scanners on the given files and on the memstore if specified</span> |
| <span class="source-line-no">1044</span><span id="line-1044"> */</span> |
| <span class="source-line-no">1045</span><span id="line-1045"> public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,</span> |
| <span class="source-line-no">1046</span><span id="line-1046"> boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,</span> |
| <span class="source-line-no">1047</span><span id="line-1047"> byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner,</span> |
| <span class="source-line-no">1048</span><span id="line-1048"> boolean onlyLatestVersion) throws IOException {</span> |
| <span class="source-line-no">1049</span><span id="line-1049"> return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow,</span> |
| <span class="source-line-no">1050</span><span id="line-1050"> false, readPt, includeMemstoreScanner, onlyLatestVersion);</span> |
| <span class="source-line-no">1051</span><span id="line-1051"> }</span> |
| <span class="source-line-no">1052</span><span id="line-1052"></span> |
| <span class="source-line-no">1053</span><span id="line-1053"> /**</span> |
| <span class="source-line-no">1054</span><span id="line-1054"> * Create scanners on the given files and if needed on the memstore with no filtering based on TTL</span> |
| <span class="source-line-no">1055</span><span id="line-1055"> * (that happens further down the line).</span> |
| <span class="source-line-no">1056</span><span id="line-1056"> * @param files the list of files on which the scanners have to be created</span> |
| <span class="source-line-no">1057</span><span id="line-1057"> * @param cacheBlocks cache the blocks or not</span> |
| <span class="source-line-no">1058</span><span id="line-1058"> * @param usePread true to use pread, false if not</span> |
| <span class="source-line-no">1059</span><span id="line-1059"> * @param isCompaction true if the scanner is created for compaction</span> |
| <span class="source-line-no">1060</span><span id="line-1060"> * @param matcher the scan query matcher</span> |
| <span class="source-line-no">1061</span><span id="line-1061"> * @param startRow the start row</span> |
| <span class="source-line-no">1062</span><span id="line-1062"> * @param includeStartRow true to include start row, false if not</span> |
| <span class="source-line-no">1063</span><span id="line-1063"> * @param stopRow the stop row</span> |
| <span class="source-line-no">1064</span><span id="line-1064"> * @param includeStopRow true to include stop row, false if not</span> |
| <span class="source-line-no">1065</span><span id="line-1065"> * @param readPt the read point of the current scan</span> |
| <span class="source-line-no">1066</span><span id="line-1066"> * @param includeMemstoreScanner true if memstore has to be included</span> |
| <span class="source-line-no">1067</span><span id="line-1067"> * @return scanners on the given files and on the memstore if specified</span> |
| <span class="source-line-no">1068</span><span id="line-1068"> */</span> |
| <span class="source-line-no">1069</span><span id="line-1069"> public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks,</span> |
| <span class="source-line-no">1070</span><span id="line-1070"> boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,</span> |
| <span class="source-line-no">1071</span><span id="line-1071"> boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,</span> |
| <span class="source-line-no">1072</span><span id="line-1072"> boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException {</span> |
| <span class="source-line-no">1073</span><span id="line-1073"> List<KeyValueScanner> memStoreScanners = null;</span> |
| <span class="source-line-no">1074</span><span id="line-1074"> if (includeMemstoreScanner) {</span> |
| <span class="source-line-no">1075</span><span id="line-1075"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">1076</span><span id="line-1076"> try {</span> |
| <span class="source-line-no">1077</span><span id="line-1077"> memStoreScanners = this.memstore.getScanners(readPt);</span> |
| <span class="source-line-no">1078</span><span id="line-1078"> } finally {</span> |
| <span class="source-line-no">1079</span><span id="line-1079"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">1080</span><span id="line-1080"> }</span> |
| <span class="source-line-no">1081</span><span id="line-1081"> }</span> |
| <span class="source-line-no">1082</span><span id="line-1082"> try {</span> |
| <span class="source-line-no">1083</span><span id="line-1083"> List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,</span> |
| <span class="source-line-no">1084</span><span id="line-1084"> cacheBlocks, usePread, isCompaction, false, matcher, readPt);</span> |
| <span class="source-line-no">1085</span><span id="line-1085"> List<KeyValueScanner> scanners = new ArrayList<>(sfScanners.size() + 1);</span> |
| <span class="source-line-no">1086</span><span id="line-1086"> scanners.addAll(sfScanners);</span> |
| <span class="source-line-no">1087</span><span id="line-1087"> // Then the memstore scanners</span> |
| <span class="source-line-no">1088</span><span id="line-1088"> if (memStoreScanners != null) {</span> |
| <span class="source-line-no">1089</span><span id="line-1089"> scanners.addAll(memStoreScanners);</span> |
| <span class="source-line-no">1090</span><span id="line-1090"> }</span> |
| <span class="source-line-no">1091</span><span id="line-1091"> return scanners;</span> |
| <span class="source-line-no">1092</span><span id="line-1092"> } catch (Throwable t) {</span> |
| <span class="source-line-no">1093</span><span id="line-1093"> clearAndClose(memStoreScanners);</span> |
| <span class="source-line-no">1094</span><span id="line-1094"> throw t instanceof IOException ? (IOException) t : new IOException(t);</span> |
| <span class="source-line-no">1095</span><span id="line-1095"> }</span> |
| <span class="source-line-no">1096</span><span id="line-1096"> }</span> |
| <span class="source-line-no">1097</span><span id="line-1097"></span> |
| <span class="source-line-no">1098</span><span id="line-1098"> /**</span> |
| <span class="source-line-no">1099</span><span id="line-1099"> * @param o Observer who wants to know about changes in set of Readers</span> |
| <span class="source-line-no">1100</span><span id="line-1100"> */</span> |
| <span class="source-line-no">1101</span><span id="line-1101"> public void addChangedReaderObserver(ChangedReadersObserver o) {</span> |
| <span class="source-line-no">1102</span><span id="line-1102"> this.changedReaderObservers.add(o);</span> |
| <span class="source-line-no">1103</span><span id="line-1103"> }</span> |
| <span class="source-line-no">1104</span><span id="line-1104"></span> |
| <span class="source-line-no">1105</span><span id="line-1105"> /**</span> |
| <span class="source-line-no">1106</span><span id="line-1106"> * @param o Observer no longer interested in changes in set of Readers.</span> |
| <span class="source-line-no">1107</span><span id="line-1107"> */</span> |
| <span class="source-line-no">1108</span><span id="line-1108"> public void deleteChangedReaderObserver(ChangedReadersObserver o) {</span> |
| <span class="source-line-no">1109</span><span id="line-1109"> // We don't check if observer present; it may not be (legitimately)</span> |
| <span class="source-line-no">1110</span><span id="line-1110"> this.changedReaderObservers.remove(o);</span> |
| <span class="source-line-no">1111</span><span id="line-1111"> }</span> |
| <span class="source-line-no">1112</span><span id="line-1112"></span> |
| <span class="source-line-no">1113</span><span id="line-1113"> //////////////////////////////////////////////////////////////////////////////</span> |
| <span class="source-line-no">1114</span><span id="line-1114"> // Compaction</span> |
| <span class="source-line-no">1115</span><span id="line-1115"> //////////////////////////////////////////////////////////////////////////////</span> |
| <span class="source-line-no">1116</span><span id="line-1116"></span> |
| <span class="source-line-no">1117</span><span id="line-1117"> /**</span> |
| <span class="source-line-no">1118</span><span id="line-1118"> * Compact the StoreFiles. This method may take some time, so the calling thread must be able to</span> |
| <span class="source-line-no">1119</span><span id="line-1119"> * block for long periods.</span> |
| <span class="source-line-no">1120</span><span id="line-1120"> * <p></span> |
| <span class="source-line-no">1121</span><span id="line-1121"> * During this time, the Store can work as usual, getting values from StoreFiles and writing new</span> |
| <span class="source-line-no">1122</span><span id="line-1122"> * StoreFiles from the memstore. Existing StoreFiles are not destroyed until the new compacted</span> |
| <span class="source-line-no">1123</span><span id="line-1123"> * StoreFile is completely written-out to disk.</span> |
| <span class="source-line-no">1124</span><span id="line-1124"> * <p></span> |
| <span class="source-line-no">1125</span><span id="line-1125"> * The compactLock prevents multiple simultaneous compactions. The structureLock prevents us from</span> |
| <span class="source-line-no">1126</span><span id="line-1126"> * interfering with other write operations.</span> |
| <span class="source-line-no">1127</span><span id="line-1127"> * <p></span> |
| <span class="source-line-no">1128</span><span id="line-1128"> * We don't want to hold the structureLock for the whole time, as a compact() can be lengthy and</span> |
| <span class="source-line-no">1129</span><span id="line-1129"> * we want to allow cache-flushes during this period.</span> |
| <span class="source-line-no">1130</span><span id="line-1130"> * <p></span> |
| <span class="source-line-no">1131</span><span id="line-1131"> * Compaction event should be idempotent, since there is no IO Fencing for the region directory in</span> |
| <span class="source-line-no">1132</span><span id="line-1132"> * hdfs. A region server might still try to complete the compaction after it lost the region. That</span> |
| <span class="source-line-no">1133</span><span id="line-1133"> * is why the following events are carefully ordered for a compaction: 1. Compaction writes new</span> |
| <span class="source-line-no">1134</span><span id="line-1134"> * files under region/.tmp directory (compaction output) 2. Compaction atomically moves the</span> |
| <span class="source-line-no">1135</span><span id="line-1135"> * temporary file under region directory 3. Compaction appends a WAL edit containing the</span> |
| <span class="source-line-no">1136</span><span id="line-1136"> * compaction input and output files. Forces sync on WAL. 4. Compaction deletes the input files</span> |
| <span class="source-line-no">1137</span><span id="line-1137"> * from the region directory. Failure conditions are handled like this: - If RS fails before 2,</span> |
| <span class="source-line-no">1138</span><span id="line-1138"> * compaction won't complete. Even if RS lives on and finishes the compaction later, it will only</span> |
| <span class="source-line-no">1139</span><span id="line-1139"> * write the new data file to the region directory. Since we already have this data, this will be</span> |
| <span class="source-line-no">1140</span><span id="line-1140"> * idempotent but we will have a redundant copy of the data. - If RS fails between 2 and 3, the</span> |
| <span class="source-line-no">1141</span><span id="line-1141"> * region will have a redundant copy of the data. The RS that failed won't be able to finish</span> |
| <span class="source-line-no">1142</span><span id="line-1142"> * sync() for WAL because of lease recovery in WAL. - If RS fails after 3, the region</span> |
| <span class="source-line-no">1143</span><span id="line-1143"> * server who opens the region will pick up the compaction marker from the WAL and replay it</span> |
| <span class="source-line-no">1144</span><span id="line-1144"> * by removing the compaction input files. Failed RS can also attempt to delete those files, but</span> |
| <span class="source-line-no">1145</span><span id="line-1145"> * the operation will be idempotent. See HBASE-2231 for details.</span> |
| <span class="source-line-no">1146</span><span id="line-1146"> * @param compaction compaction details obtained from requestCompaction()</span> |
| <span class="source-line-no">1147</span><span id="line-1147"> * @return Storefile we compacted into or null if we failed or opted out early.</span> |
| <span class="source-line-no">1148</span><span id="line-1148"> */</span> |
| <span class="source-line-no">1149</span><span id="line-1149"> public List<HStoreFile> compact(CompactionContext compaction,</span> |
| <span class="source-line-no">1150</span><span id="line-1150"> ThroughputController throughputController, User user) throws IOException {</span> |
| <span class="source-line-no">1151</span><span id="line-1151"> assert compaction != null;</span> |
| <span class="source-line-no">1152</span><span id="line-1152"> CompactionRequestImpl cr = compaction.getRequest();</span> |
| <span class="source-line-no">1153</span><span id="line-1153"> StoreFileWriterCreationTracker writerCreationTracker =</span> |
| <span class="source-line-no">1154</span><span id="line-1154"> storeFileWriterCreationTrackerFactory.get();</span> |
| <span class="source-line-no">1155</span><span id="line-1155"> if (writerCreationTracker != null) { // only attach and register a tracker when the factory supplied one</span> |
| <span class="source-line-no">1156</span><span id="line-1156"> cr.setWriterCreationTracker(writerCreationTracker);</span> |
| <span class="source-line-no">1157</span><span id="line-1157"> storeFileWriterCreationTrackers.add(writerCreationTracker);</span> |
| <span class="source-line-no">1158</span><span id="line-1158"> }</span> |
| <span class="source-line-no">1159</span><span id="line-1159"> try {</span> |
| <span class="source-line-no">1160</span><span id="line-1160"> // Every sanity check lives inside this try: once we hold a CompactionRequestImpl,</span> |
| <span class="source-line-no">1161</span><span id="line-1161"> // finishCompactionRequest(cr) in the finally block below must run on the way out,</span> |
| <span class="source-line-no">1162</span><span id="line-1162"> // whether the compaction returns normally or throws.</span> |
| <span class="source-line-no">1163</span><span id="line-1163"> long compactionStartTime = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">1164</span><span id="line-1164"> assert compaction.hasSelection();</span> |
| <span class="source-line-no">1165</span><span id="line-1165"> Collection<HStoreFile> filesToCompact = cr.getFiles();</span> |
| <span class="source-line-no">1166</span><span id="line-1166"> assert !filesToCompact.isEmpty();</span> |
| <span class="source-line-no">1167</span><span id="line-1167"> synchronized (filesCompacting) {</span> |
| <span class="source-line-no">1168</span><span id="line-1168"> // sanity check: every file in this request must already be present in filesCompacting</span> |
| <span class="source-line-no">1169</span><span id="line-1169"> // TODO: change this to LOG.error() after more debugging</span> |
| <span class="source-line-no">1170</span><span id="line-1170"> Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));</span> |
| <span class="source-line-no">1171</span><span id="line-1171"> }</span> |
| <span class="source-line-no">1172</span><span id="line-1172"></span> |
| <span class="source-line-no">1173</span><span id="line-1173"> // Ready to go. Have list of files to compact; log them with the temp dir and total size.</span> |
| <span class="source-line-no">1174</span><span id="line-1174"> LOG.info("Starting compaction of " + filesToCompact + " into tmpdir="</span> |
| <span class="source-line-no">1175</span><span id="line-1175"> + getRegionFileSystem().getTempDir() + ", totalSize="</span> |
| <span class="source-line-no">1176</span><span id="line-1176"> + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));</span> |
| <span class="source-line-no">1177</span><span id="line-1177"> // compaction.compact(...) is evaluated as an argument, so it runs before doCompaction is entered; its result becomes newFiles.</span> |
| <span class="source-line-no">1178</span><span id="line-1178"> return doCompaction(cr, filesToCompact, user, compactionStartTime,</span> |
| <span class="source-line-no">1179</span><span id="line-1179"> compaction.compact(throughputController, user));</span> |
| <span class="source-line-no">1180</span><span id="line-1180"> } finally {</span> |
| <span class="source-line-no">1181</span><span id="line-1181"> finishCompactionRequest(cr);</span> |
| <span class="source-line-no">1182</span><span id="line-1182"> }</span> |
| <span class="source-line-no">1183</span><span id="line-1183"> }</span> |
| <span class="source-line-no">1184</span><span id="line-1184"> /** Commits newFiles as store files, notifies the coprocessor host (if any), swaps them in for filesToCompact, refreshes the store size, reports metrics when available, logs the outcome and returns the committed files. */</span> |
| <span class="source-line-no">1185</span><span id="line-1185"> protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,</span> |
| <span class="source-line-no">1186</span><span id="line-1186"> Collection<HStoreFile> filesToCompact, User user, long compactionStartTime, List<Path> newFiles)</span> |
| <span class="source-line-no">1187</span><span id="line-1187"> throws IOException {</span> |
| <span class="source-line-no">1188</span><span id="line-1188"> // Do the steps necessary to complete the compaction, starting with the storage policy of the new files.</span> |
| <span class="source-line-no">1189</span><span id="line-1189"> setStoragePolicyFromFileName(newFiles);</span> |
| <span class="source-line-no">1190</span><span id="line-1190"> List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);</span> |
| <span class="source-line-no">1191</span><span id="line-1191"> if (this.getCoprocessorHost() != null) {</span> |
| <span class="source-line-no">1192</span><span id="line-1192"> for (HStoreFile sf : sfs) {</span> |
| <span class="source-line-no">1193</span><span id="line-1193"> getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);</span> |
| <span class="source-line-no">1194</span><span id="line-1194"> }</span> |
| <span class="source-line-no">1195</span><span id="line-1195"> }</span> |
| <span class="source-line-no">1196</span><span id="line-1196"> replaceStoreFiles(filesToCompact, sfs, true);</span> |
| <span class="source-line-no">1197</span><span id="line-1197"></span> |
| <span class="source-line-no">1198</span><span id="line-1198"> long outputBytes = getTotalSize(sfs);</span> |
| <span class="source-line-no">1199</span><span id="line-1199"></span> |
| <span class="source-line-no">1200</span><span id="line-1200"> // At this point the store will use new files for all new scanners.</span> |
| <span class="source-line-no">1201</span><span id="line-1201"> refreshStoreSizeAndTotalBytes(); // update store size.</span> |
| <span class="source-line-no">1202</span><span id="line-1202"> // Report table name, major flag, duration, file counts and byte sizes, but only when region server services and metrics exist.</span> |
| <span class="source-line-no">1203</span><span id="line-1203"> long now = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">1204</span><span id="line-1204"> if (</span> |
| <span class="source-line-no">1205</span><span id="line-1205"> region.getRegionServerServices() != null</span> |
| <span class="source-line-no">1206</span><span id="line-1206"> && region.getRegionServerServices().getMetrics() != null</span> |
| <span class="source-line-no">1207</span><span id="line-1207"> ) {</span> |
| <span class="source-line-no">1208</span><span id="line-1208"> region.getRegionServerServices().getMetrics().updateCompaction(</span> |
| <span class="source-line-no">1209</span><span id="line-1209"> region.getTableDescriptor().getTableName().getNameAsString(), cr.isMajor(),</span> |
| <span class="source-line-no">1210</span><span id="line-1210"> now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),</span> |
| <span class="source-line-no">1211</span><span id="line-1211"> outputBytes);</span> |
| <span class="source-line-no">1212</span><span id="line-1212"></span> |
| <span class="source-line-no">1213</span><span id="line-1213"> }</span> |
| <span class="source-line-no">1214</span><span id="line-1214"></span> |
| <span class="source-line-no">1215</span><span id="line-1215"> logCompactionEndMessage(cr, sfs, now, compactionStartTime);</span> |
| <span class="source-line-no">1216</span><span id="line-1216"> return sfs;</span> |
| <span class="source-line-no">1217</span><span id="line-1217"> }</span> |
| <span class="source-line-no">1218</span><span id="line-1218"></span> |
| <span class="source-line-no">1219</span><span id="line-1219"> // Set correct storage policy from the file name of DTCP: for each new file whose parent directory name starts with HConstants.STORAGE_POLICY_PREFIX, the rest of that name is applied as the policy.</span> |
| <span class="source-line-no">1220</span><span id="line-1220"> // This is done explicitly because renaming a file will not change its storage policy.</span> |
| <span class="source-line-no">1221</span><span id="line-1221"> private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {</span> |
| <span class="source-line-no">1222</span><span id="line-1222"> String prefix = HConstants.STORAGE_POLICY_PREFIX;</span> |
| <span class="source-line-no">1223</span><span id="line-1223"> for (Path newFile : newFiles) {</span> |
| <span class="source-line-no">1224</span><span id="line-1224"> if (newFile.getParent().getName().startsWith(prefix)) {</span> |
| <span class="source-line-no">1225</span><span id="line-1225"> CommonFSUtils.setStoragePolicy(getRegionFileSystem().getFileSystem(), newFile,</span> |
| <span class="source-line-no">1226</span><span id="line-1226"> newFile.getParent().getName().substring(prefix.length()));</span> |
| <span class="source-line-no">1227</span><span id="line-1227"> }</span> |
| <span class="source-line-no">1228</span><span id="line-1228"> }</span> |
| <span class="source-line-no">1229</span><span id="line-1229"> }</span> |
| <span class="source-line-no">1230</span><span id="line-1230"></span> |
| <span class="source-line-no">1231</span><span id="line-1231"> /**</span> |
| <span class="source-line-no">1232</span><span id="line-1232"> * Writes the compaction WAL record: builds a CompactionDescriptor from the input and output paths and hands it to WALUtil.writeCompactionMarker. Does nothing when the region has no WAL.</span> |
| <span class="source-line-no">1233</span><span id="line-1233"> * @param filesCompacted Files compacted (input).</span> |
| <span class="source-line-no">1234</span><span id="line-1234"> * @param newFiles Files from compaction (output).</span> |
| <span class="source-line-no">1235</span><span id="line-1235"> */</span> |
| <span class="source-line-no">1236</span><span id="line-1236"> private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,</span> |
| <span class="source-line-no">1237</span><span id="line-1237"> Collection<HStoreFile> newFiles) throws IOException {</span> |
| <span class="source-line-no">1238</span><span id="line-1238"> if (region.getWAL() == null) {</span> |
| <span class="source-line-no">1239</span><span id="line-1239"> return;</span> |
| <span class="source-line-no">1240</span><span id="line-1240"> }</span> |
| <span class="source-line-no">1241</span><span id="line-1241"> List<Path> inputPaths =</span> |
| <span class="source-line-no">1242</span><span id="line-1242"> filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList());</span> |
| <span class="source-line-no">1243</span><span id="line-1243"> List<Path> outputPaths =</span> |
| <span class="source-line-no">1244</span><span id="line-1244"> newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList());</span> |
| <span class="source-line-no">1245</span><span id="line-1245"> RegionInfo info = this.region.getRegionInfo();</span> |
| <span class="source-line-no">1246</span><span id="line-1246"> CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,</span> |
| <span class="source-line-no">1247</span><span id="line-1247"> getColumnFamilyDescriptor().getName(), inputPaths, outputPaths,</span> |
| <span class="source-line-no">1248</span><span id="line-1248"> getRegionFileSystem().getStoreDir(getColumnFamilyDescriptor().getNameAsString()));</span> |
| <span class="source-line-no">1249</span><span id="line-1249"> // TODO: Fix reaching into Region to get the maxWaitForSeqId.</span> |
| <span class="source-line-no">1250</span><span id="line-1250"> // Does this method belong in Region altogether given it is making so many references up there?</span> |
| <span class="source-line-no">1251</span><span id="line-1251"> // Could be Region#writeCompactionMarker(compactionDescriptor);</span> |
| <span class="source-line-no">1252</span><span id="line-1252"> WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),</span> |
| <span class="source-line-no">1253</span><span id="line-1253"> this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),</span> |
| <span class="source-line-no">1254</span><span id="line-1254"> region.getRegionReplicationSink().orElse(null));</span> |
| <span class="source-line-no">1255</span><span id="line-1255"> }</span> |
| <span class="source-line-no">1256</span><span id="line-1256"> /** Asks the store engine to replace compactedFiles with result, passing one callback that writes the compaction WAL record (only when writeCompactionMarker is true) and one that removes compactedFiles from filesCompacting; then updates space quota usage if a quota manager is available. */</span> |
| <span class="source-line-no">1257</span><span id="line-1257"> @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",</span> |
| <span class="source-line-no">1258</span><span id="line-1258"> allowedOnPath = ".*/(HStore|TestHStore).java")</span> |
| <span class="source-line-no">1259</span><span id="line-1259"> void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,</span> |
| <span class="source-line-no">1260</span><span id="line-1260"> boolean writeCompactionMarker) throws IOException {</span> |
| <span class="source-line-no">1261</span><span id="line-1261"> storeEngine.replaceStoreFiles(compactedFiles, result, () -> {</span> |
| <span class="source-line-no">1262</span><span id="line-1262"> if (writeCompactionMarker) {</span> |
| <span class="source-line-no">1263</span><span id="line-1263"> writeCompactionWalRecord(compactedFiles, result);</span> |
| <span class="source-line-no">1264</span><span id="line-1264"> }</span> |
| <span class="source-line-no">1265</span><span id="line-1265"> }, () -> {</span> |
| <span class="source-line-no">1266</span><span id="line-1266"> synchronized (filesCompacting) {</span> |
| <span class="source-line-no">1267</span><span id="line-1267"> filesCompacting.removeAll(compactedFiles);</span> |
| <span class="source-line-no">1268</span><span id="line-1268"> }</span> |
| <span class="source-line-no">1269</span><span id="line-1269"> });</span> |
| <span class="source-line-no">1270</span><span id="line-1270"> // These may be null when the RS is shutting down. The space quota Chores will fix the Region</span> |
| <span class="source-line-no">1271</span><span id="line-1271"> // sizes later so it's not super-critical if we miss these.</span> |
| <span class="source-line-no">1272</span><span id="line-1272"> RegionServerServices rsServices = region.getRegionServerServices();</span> |
| <span class="source-line-no">1273</span><span id="line-1273"> if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {</span> |
| <span class="source-line-no">1274</span><span id="line-1274"> updateSpaceQuotaAfterFileReplacement(</span> |
| <span class="source-line-no">1275</span><span id="line-1275"> rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),</span> |
| <span class="source-line-no">1276</span><span id="line-1276"> compactedFiles, result);</span> |
| <span class="source-line-no">1277</span><span id="line-1277"> }</span> |
| <span class="source-line-no">1278</span><span id="line-1278"> }</span> |
| <span class="source-line-no">1279</span><span id="line-1279"></span> |
| <span class="source-line-no">1280</span><span id="line-1280"> /**</span> |
| <span class="source-line-no">1281</span><span id="line-1281"> * Updates the space quota usage for this region, removing the size for files compacted away and</span> |
| <span class="source-line-no">1282</span><span id="line-1282"> * adding in the size for new files. Only files whose isHFile() is true are counted; the net delta is applied once via incrementRegionSize.</span> |
| <span class="source-line-no">1283</span><span id="line-1283"> * @param sizeStore The object tracking changes in region size for space quotas.</span> |
| <span class="source-line-no">1284</span><span id="line-1284"> * @param regionInfo The identifier for the region whose size is being updated.</span> |
| <span class="source-line-no">1285</span><span id="line-1285"> * @param oldFiles Files removed from this store's region; may be null, in which case nothing is subtracted.</span> |
| <span class="source-line-no">1286</span><span id="line-1286"> * @param newFiles Files added to this store's region; may be null, in which case nothing is added.</span> |
| <span class="source-line-no">1287</span><span id="line-1287"> */</span> |
| <span class="source-line-no">1288</span><span id="line-1288"> void updateSpaceQuotaAfterFileReplacement(RegionSizeStore sizeStore, RegionInfo regionInfo,</span> |
| <span class="source-line-no">1289</span><span id="line-1289"> Collection<HStoreFile> oldFiles, Collection<HStoreFile> newFiles) {</span> |
| <span class="source-line-no">1290</span><span id="line-1290"> long delta = 0;</span> |
| <span class="source-line-no">1291</span><span id="line-1291"> if (oldFiles != null) {</span> |
| <span class="source-line-no">1292</span><span id="line-1292"> for (HStoreFile compactedFile : oldFiles) {</span> |
| <span class="source-line-no">1293</span><span id="line-1293"> if (compactedFile.isHFile()) {</span> |
| <span class="source-line-no">1294</span><span id="line-1294"> delta -= compactedFile.getReader().length();</span> |
| <span class="source-line-no">1295</span><span id="line-1295"> }</span> |
| <span class="source-line-no">1296</span><span id="line-1296"> }</span> |
| <span class="source-line-no">1297</span><span id="line-1297"> }</span> |
| <span class="source-line-no">1298</span><span id="line-1298"> if (newFiles != null) {</span> |
| <span class="source-line-no">1299</span><span id="line-1299"> for (HStoreFile newFile : newFiles) {</span> |
| <span class="source-line-no">1300</span><span id="line-1300"> if (newFile.isHFile()) {</span> |
| <span class="source-line-no">1301</span><span id="line-1301"> delta += newFile.getReader().length();</span> |
| <span class="source-line-no">1302</span><span id="line-1302"> }</span> |
| <span class="source-line-no">1303</span><span id="line-1303"> }</span> |
| <span class="source-line-no">1304</span><span id="line-1304"> }</span> |
| <span class="source-line-no">1305</span><span id="line-1305"> sizeStore.incrementRegionSize(regionInfo, delta);</span> |
| <span class="source-line-no">1306</span><span id="line-1306"> }</span> |
| <span class="source-line-no">1307</span><span id="line-1307"></span> |
| <span class="source-line-no">1308</span><span id="line-1308"> /** Log a very elaborate compaction completion message at INFO and, when trace logging is enabled, a comma-separated summary at TRACE.</span> |
| <span class="source-line-no">1309</span><span id="line-1309"> * @param cr Request.</span> |
| <span class="source-line-no">1310</span><span id="line-1310"> * @param sfs Resulting files.</span> |
| <span class="source-line-no">1311</span><span id="line-1311"> * @param now End time; its difference from compactionStartTime is reported as the execution time.</span> |
| <span class="source-line-no">1312</span><span id="line-1312"> * @param compactionStartTime Start time; its difference from cr.getSelectionTime() is reported as the time in queue.</span> |
| <span class="source-line-no">1313</span><span id="line-1313"> */</span> |
| <span class="source-line-no">1314</span><span id="line-1314"> private void logCompactionEndMessage(CompactionRequestImpl cr, List<HStoreFile> sfs, long now,</span> |
| <span class="source-line-no">1315</span><span id="line-1315"> long compactionStartTime) {</span> |
| <span class="source-line-no">1316</span><span id="line-1316"> StringBuilder message = new StringBuilder("Completed" + (cr.isMajor() ? " major" : "")</span> |
| <span class="source-line-no">1317</span><span id="line-1317"> + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "")</span> |
| <span class="source-line-no">1318</span><span id="line-1318"> + " file(s) in " + this + " of " + this.getRegionInfo().getShortNameToLog() + " into ");</span> |
| <span class="source-line-no">1319</span><span id="line-1319"> if (sfs.isEmpty()) {</span> |
| <span class="source-line-no">1320</span><span id="line-1320"> message.append("none, ");</span> |
| <span class="source-line-no">1321</span><span id="line-1321"> } else {</span> |
| <span class="source-line-no">1322</span><span id="line-1322"> for (HStoreFile sf : sfs) {</span> |
| <span class="source-line-no">1323</span><span id="line-1323"> message.append(sf.getPath().getName());</span> |
| <span class="source-line-no">1324</span><span id="line-1324"> message.append("(size=");</span> |
| <span class="source-line-no">1325</span><span id="line-1325"> message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));</span> |
| <span class="source-line-no">1326</span><span id="line-1326"> message.append("), ");</span> |
| <span class="source-line-no">1327</span><span id="line-1327"> }</span> |
| <span class="source-line-no">1328</span><span id="line-1328"> }</span> |
| <span class="source-line-no">1329</span><span id="line-1329"> message.append("total size for store is ")</span> |
| <span class="source-line-no">1330</span><span id="line-1330"> .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))</span> |
| <span class="source-line-no">1331</span><span id="line-1331"> .append(". This selection was in queue for ")</span> |
| <span class="source-line-no">1332</span><span id="line-1332"> .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))</span> |
| <span class="source-line-no">1333</span><span id="line-1333"> .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))</span> |
| <span class="source-line-no">1334</span><span id="line-1334"> .append(" to execute.");</span> |
| <span class="source-line-no">1335</span><span id="line-1335"> LOG.info(message.toString());</span> |
| <span class="source-line-no">1336</span><span id="line-1336"> if (LOG.isTraceEnabled()) { // skip the extra size and file-count lookups unless trace output is wanted</span> |
| <span class="source-line-no">1337</span><span id="line-1337"> int fileCount = storeEngine.getStoreFileManager().getStorefileCount();</span> |
| <span class="source-line-no">1338</span><span id="line-1338"> long resultSize = getTotalSize(sfs);</span> |
| <span class="source-line-no">1339</span><span id="line-1339"> String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"</span> |
| <span class="source-line-no">1340</span><span id="line-1340"> + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","</span> |
| <span class="source-line-no">1341</span><span id="line-1341"> + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";</span> |
| <span class="source-line-no">1342</span><span id="line-1342"> LOG.trace(traceMessage);</span> |
| <span class="source-line-no">1343</span><span id="line-1343"> }</span> |
| <span class="source-line-no">1344</span><span id="line-1344"> }</span> |
| <span class="source-line-no">1345</span><span id="line-1345"> /** Call to complete a compaction. It's for the case where we find in the WAL a compaction that was not finished. We could find one recovering a WAL after a regionserver crash. See HBASE-2231.</span> |
| <span class="source-line-no">1346</span><span id="line-1346"> * @param compaction the compaction descriptor read from the WAL; supplies the input and output file name lists.</span> |
| <span class="source-line-no">1347</span><span id="line-1347"> * @param pickCompactionFiles when true, output files not already among the store files are opened and added to the store.</span> |
| <span class="source-line-no">1348</span><span id="line-1348"> * @param removeFiles NOTE(review): this flag is not read anywhere in this method body; confirm whether callers rely on it.</span> |
| <span class="source-line-no">1349</span><span id="line-1349"> */</span> |
| <span class="source-line-no">1350</span><span id="line-1350"> public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,</span> |
| <span class="source-line-no">1351</span><span id="line-1351"> boolean removeFiles) throws IOException {</span> |
| <span class="source-line-no">1352</span><span id="line-1352"> LOG.debug("Completing compaction from the WAL marker");</span> |
| <span class="source-line-no">1353</span><span id="line-1353"> List<String> compactionInputs = compaction.getCompactionInputList();</span> |
| <span class="source-line-no">1354</span><span id="line-1354"> List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());</span> |
| <span class="source-line-no">1355</span><span id="line-1355"></span> |
| <span class="source-line-no">1356</span><span id="line-1356"> // The Compaction Marker is written after the compaction is completed,</span> |
| <span class="source-line-no">1357</span><span id="line-1357"> // and the files moved into the region/family folder.</span> |
| <span class="source-line-no">1358</span><span id="line-1358"> //</span> |
| <span class="source-line-no">1359</span><span id="line-1359"> // If we crash after the entry is written, we may not have removed the</span> |
| <span class="source-line-no">1360</span><span id="line-1360"> // input files, but the output file is present.</span> |
| <span class="source-line-no">1361</span><span id="line-1361"> // (The unremoved input files will be removed by this function)</span> |
| <span class="source-line-no">1362</span><span id="line-1362"> //</span> |
| <span class="source-line-no">1363</span><span id="line-1363"> // If we scan the directory and the file is not present, it can mean that:</span> |
| <span class="source-line-no">1364</span><span id="line-1364"> // - The file was manually removed by the user</span> |
| <span class="source-line-no">1365</span><span id="line-1365"> // - The file was removed as consequence of subsequent compaction</span> |
| <span class="source-line-no">1366</span><span id="line-1366"> // so, we can't do anything with the "compaction output list" because those</span> |
| <span class="source-line-no">1367</span><span id="line-1367"> // files have already been loaded when opening the region (by virtue of</span> |
| <span class="source-line-no">1368</span><span id="line-1368"> // being in the store's folder) or they may be missing due to a compaction.</span> |
| <span class="source-line-no">1369</span><span id="line-1369"></span> |
| <span class="source-line-no">1370</span><span id="line-1370"> String familyName = this.getColumnFamilyName();</span> |
| <span class="source-line-no">1371</span><span id="line-1371"> Set<String> inputFiles = new HashSet<>();</span> |
| <span class="source-line-no">1372</span><span id="line-1372"> for (String compactionInput : compactionInputs) {</span> |
| <span class="source-line-no">1373</span><span id="line-1373"> Path inputPath = getRegionFileSystem().getStoreFilePath(familyName, compactionInput);</span> |
| <span class="source-line-no">1374</span><span id="line-1374"> inputFiles.add(inputPath.getName());</span> |
| <span class="source-line-no">1375</span><span id="line-1375"> }</span> |
| <span class="source-line-no">1376</span><span id="line-1376"></span> |
| <span class="source-line-no">1377</span><span id="line-1377"> // some of the input files might already be deleted, so keep only those still among the store files</span> |
| <span class="source-line-no">1378</span><span id="line-1378"> List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size());</span> |
| <span class="source-line-no">1379</span><span id="line-1379"> for (HStoreFile sf : this.getStorefiles()) {</span> |
| <span class="source-line-no">1380</span><span id="line-1380"> if (inputFiles.contains(sf.getPath().getName())) {</span> |
| <span class="source-line-no">1381</span><span id="line-1381"> inputStoreFiles.add(sf);</span> |
| <span class="source-line-no">1382</span><span id="line-1382"> }</span> |
| <span class="source-line-no">1383</span><span id="line-1383"> }</span> |
| <span class="source-line-no">1384</span><span id="line-1384"></span> |
| <span class="source-line-no">1385</span><span id="line-1385"> // check whether we need to pick up the new files</span> |
| <span class="source-line-no">1386</span><span id="line-1386"> List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size());</span> |
| <span class="source-line-no">1387</span><span id="line-1387"> // Drop the outputs the store already holds, then open a store file and reader for each output that remains.</span> |
| <span class="source-line-no">1388</span><span id="line-1388"> if (pickCompactionFiles) {</span> |
| <span class="source-line-no">1389</span><span id="line-1389"> for (HStoreFile sf : this.getStorefiles()) {</span> |
| <span class="source-line-no">1390</span><span id="line-1390"> compactionOutputs.remove(sf.getPath().getName());</span> |
| <span class="source-line-no">1391</span><span id="line-1391"> }</span> |
| <span class="source-line-no">1392</span><span id="line-1392"> for (String compactionOutput : compactionOutputs) {</span> |
| <span class="source-line-no">1393</span><span id="line-1393"> StoreFileInfo storeFileInfo =</span> |
| <span class="source-line-no">1394</span><span id="line-1394"> getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);</span> |
| <span class="source-line-no">1395</span><span id="line-1395"> HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);</span> |
| <span class="source-line-no">1396</span><span id="line-1396"> outputStoreFiles.add(storeFile);</span> |
| <span class="source-line-no">1397</span><span id="line-1397"> }</span> |
| <span class="source-line-no">1398</span><span id="line-1398"> }</span> |
| <span class="source-line-no">1399</span><span id="line-1399"> // Only touch the store when there is something to swap; the false argument keeps replaceStoreFiles from writing another compaction marker.</span> |
| <span class="source-line-no">1400</span><span id="line-1400"> if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {</span> |
| <span class="source-line-no">1401</span><span id="line-1401"> LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles</span> |
| <span class="source-line-no">1402</span><span id="line-1402"> + " with output files : " + outputStoreFiles);</span> |
| <span class="source-line-no">1403</span><span id="line-1403"> this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);</span> |
| <span class="source-line-no">1404</span><span id="line-1404"> this.refreshStoreSizeAndTotalBytes();</span> |
| <span class="source-line-no">1405</span><span id="line-1405"> }</span> |
| <span class="source-line-no">1406</span><span id="line-1406"> }</span> |
| <span class="source-line-no">1407</span><span id="line-1407"> // Returns StoreUtils.hasReferences(...) evaluated over the current store files plus the compacted files, read under the store engine's read lock.</span> |
| <span class="source-line-no">1408</span><span id="line-1408"> @Override</span> |
| <span class="source-line-no">1409</span><span id="line-1409"> public boolean hasReferences() {</span> |
| <span class="source-line-no">1410</span><span id="line-1410"> // Grab the read lock here, because we need to ensure that: only when the atomic</span> |
| <span class="source-line-no">1411</span><span id="line-1411"> // replaceStoreFiles(..) finished, we can get all the complete store file list.</span> |
| <span class="source-line-no">1412</span><span id="line-1412"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">1413</span><span id="line-1413"> try {</span> |
| <span class="source-line-no">1414</span><span id="line-1414"> // Merge the current store files with compacted files here due to HBASE-20940; the merge is done on a fresh ArrayList copy.</span> |
| <span class="source-line-no">1415</span><span id="line-1415"> Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());</span> |
| <span class="source-line-no">1416</span><span id="line-1416"> allStoreFiles.addAll(getCompactedFiles());</span> |
| <span class="source-line-no">1417</span><span id="line-1417"> return StoreUtils.hasReferences(allStoreFiles);</span> |
| <span class="source-line-no">1418</span><span id="line-1418"> } finally {</span> |
| <span class="source-line-no">1419</span><span id="line-1419"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">1420</span><span id="line-1420"> }</span> |
| <span class="source-line-no">1421</span><span id="line-1421"> }</span> |
| <span class="source-line-no">1422</span><span id="line-1422"></span> |
| <span class="source-line-no">1423</span><span id="line-1423"> /**</span> |
| <span class="source-line-no">1424</span><span id="line-1424"> * Getter for the CompactionProgress object, obtained from this store engine's compactor.</span> |
| <span class="source-line-no">1425</span><span id="line-1425"> * @return CompactionProgress object; can be null</span> |
| <span class="source-line-no">1426</span><span id="line-1426"> */</span> |
| <span class="source-line-no">1427</span><span id="line-1427"> public CompactionProgress getCompactionProgress() {</span> |
| <span class="source-line-no">1428</span><span id="line-1428"> return this.storeEngine.getCompactor().getProgress();</span> |
| <span class="source-line-no">1429</span><span id="line-1429"> }</span> |
| <span class="source-line-no">1430</span><span id="line-1430"> // Returns false as soon as any store file has a null reader; otherwise defers to the compaction policy. NOTE(review): getStoreFiles() is fetched twice (loop and policy call); confirm both calls see the same list.</span> |
| <span class="source-line-no">1431</span><span id="line-1431"> @Override</span> |
| <span class="source-line-no">1432</span><span id="line-1432"> public boolean shouldPerformMajorCompaction() throws IOException {</span> |
| <span class="source-line-no">1433</span><span id="line-1433"> for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) {</span> |
| <span class="source-line-no">1434</span><span id="line-1434"> // TODO: what are these reader checks all over the place?</span> |
| <span class="source-line-no">1435</span><span id="line-1435"> if (sf.getReader() == null) {</span> |
| <span class="source-line-no">1436</span><span id="line-1436"> LOG.debug("StoreFile {} has null Reader", sf);</span> |
| <span class="source-line-no">1437</span><span id="line-1437"> return false;</span> |
| <span class="source-line-no">1438</span><span id="line-1438"> }</span> |
| <span class="source-line-no">1439</span><span id="line-1439"> }</span> |
| <span class="source-line-no">1440</span><span id="line-1440"> return storeEngine.getCompactionPolicy()</span> |
| <span class="source-line-no">1441</span><span id="line-1441"> .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles());</span> |
| <span class="source-line-no">1442</span><span id="line-1442"> }</span> |
| <span class="source-line-no">1443</span><span id="line-1443"></span> |
| <span class="source-line-no">1444</span><span id="line-1444"> public Optional<CompactionContext> requestCompaction() throws IOException {</span> |
| <span class="source-line-no">1445</span><span id="line-1445"> return requestCompaction(NO_PRIORITY, CompactionLifeCycleTracker.DUMMY, null);</span> |
| <span class="source-line-no">1446</span><span id="line-1446"> }</span> |
| <span class="source-line-no">1447</span><span id="line-1447"></span> |
| <span class="source-line-no">1448</span><span id="line-1448"> public Optional<CompactionContext> requestCompaction(int priority,</span> |
| <span class="source-line-no">1449</span><span id="line-1449"> CompactionLifeCycleTracker tracker, User user) throws IOException {</span> |
| <span class="source-line-no">1450</span><span id="line-1450"> // Don't even select files for compaction if writes are disabled.</span> |
| <span class="source-line-no">1451</span><span id="line-1451"> if (!this.areWritesEnabled()) {</span> |
| <span class="source-line-no">1452</span><span id="line-1452"> return Optional.empty();</span> |
| <span class="source-line-no">1453</span><span id="line-1453"> }</span> |
| <span class="source-line-no">1454</span><span id="line-1454"> // Before selecting, try to get rid of unneeded (expired) files to simplify things.</span> |
| <span class="source-line-no">1455</span><span id="line-1455"> removeUnneededFiles();</span> |
| <span class="source-line-no">1456</span><span id="line-1456"></span> |
| <span class="source-line-no">1457</span><span id="line-1457"> final CompactionContext compaction = storeEngine.createCompaction();</span> |
| <span class="source-line-no">1458</span><span id="line-1458"> CompactionRequestImpl request = null;</span> |
| <span class="source-line-no">1459</span><span id="line-1459"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">1460</span><span id="line-1460"> try {</span> |
| <span class="source-line-no">1461</span><span id="line-1461"> synchronized (filesCompacting) {</span> |
| <span class="source-line-no">1462</span><span id="line-1462"> // First, see if coprocessor would want to override selection.</span> |
| <span class="source-line-no">1463</span><span id="line-1463"> if (this.getCoprocessorHost() != null) {</span> |
| <span class="source-line-no">1464</span><span id="line-1464"> final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);</span> |
| <span class="source-line-no">1465</span><span id="line-1465"> boolean override =</span> |
| <span class="source-line-no">1466</span><span id="line-1466"> getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user);</span> |
| <span class="source-line-no">1467</span><span id="line-1467"> if (override) {</span> |
| <span class="source-line-no">1468</span><span id="line-1468"> // Coprocessor is overriding normal file selection.</span> |
| <span class="source-line-no">1469</span><span id="line-1469"> compaction.forceSelect(new CompactionRequestImpl(candidatesForCoproc));</span> |
| <span class="source-line-no">1470</span><span id="line-1470"> }</span> |
| <span class="source-line-no">1471</span><span id="line-1471"> }</span> |
| <span class="source-line-no">1472</span><span id="line-1472"></span> |
| <span class="source-line-no">1473</span><span id="line-1473"> // Normal case - coprocessor is not overriding file selection.</span> |
| <span class="source-line-no">1474</span><span id="line-1474"> if (!compaction.hasSelection()) {</span> |
| <span class="source-line-no">1475</span><span id="line-1475"> boolean isUserCompaction = priority == Store.PRIORITY_USER;</span> |
| <span class="source-line-no">1476</span><span id="line-1476"> boolean mayUseOffPeak =</span> |
| <span class="source-line-no">1477</span><span id="line-1477"> offPeakHours.isOffPeakHour() && offPeakCompactionTracker.compareAndSet(false, true);</span> |
| <span class="source-line-no">1478</span><span id="line-1478"> try {</span> |
| <span class="source-line-no">1479</span><span id="line-1479"> compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak,</span> |
| <span class="source-line-no">1480</span><span id="line-1480"> forceMajor && filesCompacting.isEmpty());</span> |
| <span class="source-line-no">1481</span><span id="line-1481"> } catch (IOException e) {</span> |
| <span class="source-line-no">1482</span><span id="line-1482"> if (mayUseOffPeak) {</span> |
| <span class="source-line-no">1483</span><span id="line-1483"> offPeakCompactionTracker.set(false);</span> |
| <span class="source-line-no">1484</span><span id="line-1484"> }</span> |
| <span class="source-line-no">1485</span><span id="line-1485"> throw e;</span> |
| <span class="source-line-no">1486</span><span id="line-1486"> }</span> |
| <span class="source-line-no">1487</span><span id="line-1487"> assert compaction.hasSelection();</span> |
| <span class="source-line-no">1488</span><span id="line-1488"> if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {</span> |
| <span class="source-line-no">1489</span><span id="line-1489"> // Compaction policy declined off-peak, so release the flag claimed by compareAndSet above.</span> |
| <span class="source-line-no">1490</span><span id="line-1490"> offPeakCompactionTracker.set(false);</span> |
| <span class="source-line-no">1491</span><span id="line-1491"> }</span> |
| <span class="source-line-no">1492</span><span id="line-1492"> }</span> |
| <span class="source-line-no">1493</span><span id="line-1493"> if (this.getCoprocessorHost() != null) {</span> |
| <span class="source-line-no">1494</span><span id="line-1494"> this.getCoprocessorHost().postCompactSelection(this,</span> |
| <span class="source-line-no">1495</span><span id="line-1495"> ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker,</span> |
| <span class="source-line-no">1496</span><span id="line-1496"> compaction.getRequest(), user);</span> |
| <span class="source-line-no">1497</span><span id="line-1497"> }</span> |
| <span class="source-line-no">1498</span><span id="line-1498"> // Finally, we have the resulting files list. Check if we have any files at all.</span> |
| <span class="source-line-no">1499</span><span id="line-1499"> request = compaction.getRequest();</span> |
| <span class="source-line-no">1500</span><span id="line-1500"> Collection<HStoreFile> selectedFiles = request.getFiles();</span> |
| <span class="source-line-no">1501</span><span id="line-1501"> if (selectedFiles.isEmpty()) {</span> |
| <span class="source-line-no">1502</span><span id="line-1502"> return Optional.empty();</span> |
| <span class="source-line-no">1503</span><span id="line-1503"> }</span> |
| <span class="source-line-no">1504</span><span id="line-1504"></span> |
| <span class="source-line-no">1505</span><span id="line-1505"> addToCompactingFiles(selectedFiles);</span> |
| <span class="source-line-no">1506</span><span id="line-1506"></span> |
| <span class="source-line-no">1507</span><span id="line-1507"> // If we're enqueuing a major, clear the force flag.</span> |
| <span class="source-line-no">1508</span><span id="line-1508"> this.forceMajor = this.forceMajor && !request.isMajor();</span> |
| <span class="source-line-no">1509</span><span id="line-1509"></span> |
| <span class="source-line-no">1510</span><span id="line-1510"> // Set common request properties.</span> |
| <span class="source-line-no">1511</span><span id="line-1511"> // Set priority, either override value supplied by caller or from store.</span> |
| <span class="source-line-no">1512</span><span id="line-1512"> final int compactionPriority =</span> |
| <span class="source-line-no">1513</span><span id="line-1513"> (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();</span> |
| <span class="source-line-no">1514</span><span id="line-1514"> request.setPriority(compactionPriority);</span> |
| <span class="source-line-no">1515</span><span id="line-1515"></span> |
| <span class="source-line-no">1516</span><span id="line-1516"> if (request.isAfterSplit()) {</span> |
| <span class="source-line-no">1517</span><span id="line-1517"> // If the store belongs to a recently split daughter region, it is better to consider</span> |
| <span class="source-line-no">1518</span><span id="line-1518"> // it with a higher priority in the compaction queue.</span> |
| <span class="source-line-no">1519</span><span id="line-1519"> // Override priority if it is lower (higher int value) than</span> |
| <span class="source-line-no">1520</span><span id="line-1520"> // SPLIT_REGION_COMPACTION_PRIORITY</span> |
| <span class="source-line-no">1521</span><span id="line-1521"> final int splitHousekeepingPriority =</span> |
| <span class="source-line-no">1522</span><span id="line-1522"> Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);</span> |
| <span class="source-line-no">1523</span><span id="line-1523"> request.setPriority(splitHousekeepingPriority);</span> |
| <span class="source-line-no">1524</span><span id="line-1524"> LOG.info(</span> |
| <span class="source-line-no">1525</span><span id="line-1525"> "Keeping/Overriding Compaction request priority to {} for CF {} since it"</span> |
| <span class="source-line-no">1526</span><span id="line-1526"> + " belongs to recently split daughter region {}",</span> |
| <span class="source-line-no">1527</span><span id="line-1527"> splitHousekeepingPriority, this.getColumnFamilyName(),</span> |
| <span class="source-line-no">1528</span><span id="line-1528"> getRegionInfo().getRegionNameAsString());</span> |
| <span class="source-line-no">1529</span><span id="line-1529"> }</span> |
| <span class="source-line-no">1530</span><span id="line-1530"> request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());</span> |
| <span class="source-line-no">1531</span><span id="line-1531"> request.setTracker(tracker);</span> |
| <span class="source-line-no">1532</span><span id="line-1532"> }</span> |
| <span class="source-line-no">1533</span><span id="line-1533"> } finally {</span> |
| <span class="source-line-no">1534</span><span id="line-1534"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">1535</span><span id="line-1535"> }</span> |
| <span class="source-line-no">1536</span><span id="line-1536"></span> |
| <span class="source-line-no">1537</span><span id="line-1537"> if (LOG.isDebugEnabled()) {</span> |
| <span class="source-line-no">1538</span><span id="line-1538"> LOG.debug(this + " is initiating " + (request.isMajor() ? "major" : "minor") + " compaction"</span> |
| <span class="source-line-no">1539</span><span id="line-1539"> + (request.isAllFiles() ? " (all files)" : ""));</span> |
| <span class="source-line-no">1540</span><span id="line-1540"> }</span> |
| <span class="source-line-no">1541</span><span id="line-1541"> this.region.reportCompactionRequestStart(request.isMajor());</span> |
| <span class="source-line-no">1542</span><span id="line-1542"> return Optional.of(compaction);</span> |
| <span class="source-line-no">1543</span><span id="line-1543"> }</span> |
| <span class="source-line-no">1544</span><span id="line-1544"></span> |
| <span class="source-line-no">1545</span><span id="line-1545"> /** Adds filesToAdd to filesCompacting and re-sorts it. filesCompacting must be locked. */</span> |
| <span class="source-line-no">1546</span><span id="line-1546"> private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {</span> |
| <span class="source-line-no">1547</span><span id="line-1547"> if (CollectionUtils.isEmpty(filesToAdd)) {</span> |
| <span class="source-line-no">1548</span><span id="line-1548"> return;</span> |
| <span class="source-line-no">1549</span><span id="line-1549"> }</span> |
| <span class="source-line-no">1550</span><span id="line-1550"> // Fail fast if any of the new files is already in filesCompacting (same file twice).</span> |
| <span class="source-line-no">1551</span><span id="line-1551"> if (!Collections.disjoint(filesCompacting, filesToAdd)) {</span> |
| <span class="source-line-no">1552</span><span id="line-1552"> Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);</span> |
| <span class="source-line-no">1553</span><span id="line-1553"> }</span> |
| <span class="source-line-no">1554</span><span id="line-1554"> filesCompacting.addAll(filesToAdd);</span> |
| <span class="source-line-no">1555</span><span id="line-1555"> Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());</span> |
| <span class="source-line-no">1556</span><span id="line-1556"> }</span> |
| <span class="source-line-no">1557</span><span id="line-1557"></span> |
| <span class="source-line-no">1558</span><span id="line-1558"> private void removeUnneededFiles() throws IOException {</span> |
| <span class="source-line-no">1559</span><span id="line-1559"> if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {</span> |
| <span class="source-line-no">1560</span><span id="line-1560"> return;</span> |
| <span class="source-line-no">1561</span><span id="line-1561"> }</span> |
| <span class="source-line-no">1562</span><span id="line-1562"> if (getColumnFamilyDescriptor().getMinVersions() > 0) {</span> |
| <span class="source-line-no">1563</span><span id="line-1563"> LOG.debug("Skipping expired store file removal due to min version of {} being {}", this,</span> |
| <span class="source-line-no">1564</span><span id="line-1564"> getColumnFamilyDescriptor().getMinVersions());</span> |
| <span class="source-line-no">1565</span><span id="line-1565"> return;</span> |
| <span class="source-line-no">1566</span><span id="line-1566"> }</span> |
| <span class="source-line-no">1567</span><span id="line-1567"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">1568</span><span id="line-1568"> Collection<HStoreFile> delSfs = null;</span> |
| <span class="source-line-no">1569</span><span id="line-1569"> try {</span> |
| <span class="source-line-no">1570</span><span id="line-1570"> synchronized (filesCompacting) {</span> |
| <span class="source-line-no">1571</span><span id="line-1571"> long cfTtl = getStoreFileTtl();</span> |
| <span class="source-line-no">1572</span><span id="line-1572"> if (cfTtl != Long.MAX_VALUE) {</span> |
| <span class="source-line-no">1573</span><span id="line-1573"> delSfs = storeEngine.getStoreFileManager()</span> |
| <span class="source-line-no">1574</span><span id="line-1574"> .getUnneededFiles(EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);</span> |
| <span class="source-line-no">1575</span><span id="line-1575"> addToCompactingFiles(delSfs);</span> |
| <span class="source-line-no">1576</span><span id="line-1576"> }</span> |
| <span class="source-line-no">1577</span><span id="line-1577"> }</span> |
| <span class="source-line-no">1578</span><span id="line-1578"> } finally {</span> |
| <span class="source-line-no">1579</span><span id="line-1579"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">1580</span><span id="line-1580"> }</span> |
| <span class="source-line-no">1581</span><span id="line-1581"></span> |
| <span class="source-line-no">1582</span><span id="line-1582"> if (CollectionUtils.isEmpty(delSfs)) {</span> |
| <span class="source-line-no">1583</span><span id="line-1583"> return;</span> |
| <span class="source-line-no">1584</span><span id="line-1584"> }</span> |
| <span class="source-line-no">1585</span><span id="line-1585"></span> |
| <span class="source-line-no">1586</span><span id="line-1586"> Collection<HStoreFile> newFiles = Collections.emptyList(); // Pure removal: no replacement files.</span> |
| <span class="source-line-no">1587</span><span id="line-1587"> replaceStoreFiles(delSfs, newFiles, true);</span> |
| <span class="source-line-no">1588</span><span id="line-1588"> refreshStoreSizeAndTotalBytes();</span> |
| <span class="source-line-no">1589</span><span id="line-1589"> LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this</span> |
| <span class="source-line-no">1590</span><span id="line-1590"> + "; total size is " + TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));</span> |
| <span class="source-line-no">1591</span><span id="line-1591"> }</span> |
| <span class="source-line-no">1592</span><span id="line-1592"></span> |
| <span class="source-line-no">1593</span><span id="line-1593"> public void cancelRequestedCompaction(CompactionContext compaction) {</span> |
| <span class="source-line-no">1594</span><span id="line-1594"> finishCompactionRequest(compaction.getRequest());</span> |
| <span class="source-line-no">1595</span><span id="line-1595"> }</span> |
| <span class="source-line-no">1596</span><span id="line-1596"></span> |
| <span class="source-line-no">1597</span><span id="line-1597"> private void finishCompactionRequest(CompactionRequestImpl cr) {</span> |
| <span class="source-line-no">1598</span><span id="line-1598"> this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());</span> |
| <span class="source-line-no">1599</span><span id="line-1599"> if (cr.isOffPeak()) {</span> |
| <span class="source-line-no">1600</span><span id="line-1600"> offPeakCompactionTracker.set(false);</span> |
| <span class="source-line-no">1601</span><span id="line-1601"> cr.setOffPeak(false);</span> |
| <span class="source-line-no">1602</span><span id="line-1602"> }</span> |
| <span class="source-line-no">1603</span><span id="line-1603"> synchronized (filesCompacting) {</span> |
| <span class="source-line-no">1604</span><span id="line-1604"> filesCompacting.removeAll(cr.getFiles());</span> |
| <span class="source-line-no">1605</span><span id="line-1605"> }</span> |
| <span class="source-line-no">1606</span><span id="line-1606"> // The writer creation tracker may be null, e.g. when the SFT implementation does not need to</span> |
| <span class="source-line-no">1607</span><span id="line-1607"> // track store file writer creation, or when the compaction was canceled.</span> |
| <span class="source-line-no">1608</span><span id="line-1608"> if (cr.getWriterCreationTracker() != null) {</span> |
| <span class="source-line-no">1609</span><span id="line-1609"> storeFileWriterCreationTrackers.remove(cr.getWriterCreationTracker());</span> |
| <span class="source-line-no">1610</span><span id="line-1610"> }</span> |
| <span class="source-line-no">1611</span><span id="line-1611"> }</span> |
| <span class="source-line-no">1612</span><span id="line-1612"></span> |
| <span class="source-line-no">1613</span><span id="line-1613"> /** Recompute size counters; each total is published once so readers never see a partial sum. */</span> |
| <span class="source-line-no">1614</span><span id="line-1614"> protected void refreshStoreSizeAndTotalBytes() throws IOException {</span> |
| <span class="source-line-no">1615</span><span id="line-1615"> long size = 0L;</span> |
| <span class="source-line-no">1616</span><span id="line-1616"> long uncompressedBytes = 0L;</span> |
| <span class="source-line-no">1617</span><span id="line-1617"> for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) {</span> |
| <span class="source-line-no">1618</span><span id="line-1618"> StoreFileReader r = hsf.getReader();</span> |
| <span class="source-line-no">1619</span><span id="line-1619"> if (r == null) {</span> |
| <span class="source-line-no">1620</span><span id="line-1620"> LOG.debug("StoreFile {} has a null Reader", hsf);</span> |
| <span class="source-line-no">1621</span><span id="line-1621"> continue;</span> |
| <span class="source-line-no">1622</span><span id="line-1622"> }</span> |
| <span class="source-line-no">1623</span><span id="line-1623"> size += r.length();</span> |
| <span class="source-line-no">1624</span><span id="line-1624"> uncompressedBytes += r.getTotalUncompressedBytes();</span> |
| <span class="source-line-no">1625</span><span id="line-1625"> }</span> |
| <span class="source-line-no">1626</span><span id="line-1626"> this.storeSize.set(size);</span> |
| <span class="source-line-no">1627</span><span id="line-1627"> this.totalUncompressedBytes.set(uncompressedBytes);</span> |
| <span class="source-line-no">1628</span><span id="line-1628"> }</span> |
| <span class="source-line-no">1629</span><span id="line-1629"></span> |
| <span class="source-line-no">1630</span><span id="line-1630"> /**</span> |
| <span class="source-line-no">1631</span><span id="line-1631"> * Caps a requested version count at the maximum versions configured for this column family.</span> |
| <span class="source-line-no">1632</span><span id="line-1632"> * @param wantedVersions How many versions were asked for; must be greater than zero.</span> |
| <span class="source-line-no">1633</span><span id="line-1633"> * @return wantedVersions, or the column family's max versions if that is smaller.</span> |
| <span class="source-line-no">1634</span><span id="line-1634"> * @throws IllegalArgumentException if wantedVersions is not greater than zero</span> |
| <span class="source-line-no">1635</span><span id="line-1635"> */</span> |
| <span class="source-line-no">1636</span><span id="line-1636"> int versionsToReturn(final int wantedVersions) {</span> |
| <span class="source-line-no">1637</span><span id="line-1637"> if (wantedVersions <= 0) {</span> |
| <span class="source-line-no">1638</span><span id="line-1638"> throw new IllegalArgumentException("Number of versions must be > 0");</span> |
| <span class="source-line-no">1639</span><span id="line-1639"> }</span> |
| <span class="source-line-no">1640</span><span id="line-1640"> return Math.min(wantedVersions, getColumnFamilyDescriptor().getMaxVersions());</span> |
| <span class="source-line-no">1641</span><span id="line-1641"> }</span> |
| <span class="source-line-no">1642</span><span id="line-1642"></span> |
| <span class="source-line-no">1643</span><span id="line-1643"> @Override</span> |
| <span class="source-line-no">1644</span><span id="line-1644"> public boolean canSplit() {</span> |
| <span class="source-line-no">1645</span><span id="line-1645"> // A store that still holds a reference store file cannot be split.</span> |
| <span class="source-line-no">1646</span><span id="line-1646"> if (hasReferences()) {</span> |
| <span class="source-line-no">1647</span><span id="line-1647"> LOG.trace("Not splittable; has references: {}", this);</span> |
| <span class="source-line-no">1648</span><span id="line-1648"> return false;</span> |
| <span class="source-line-no">1649</span><span id="line-1649"> }</span> |
| <span class="source-line-no">1650</span><span id="line-1650"> return true;</span> |
| <span class="source-line-no">1651</span><span id="line-1651"> }</span> |
| <span class="source-line-no">1652</span><span id="line-1652"></span> |
| <span class="source-line-no">1653</span><span id="line-1653"> /**</span> |
| <span class="source-line-no">1654</span><span id="line-1654"> * Returns the store's split point, or empty if it has reference files or the lookup fails.</span> |
| <span class="source-line-no">1655</span><span id="line-1655"> */</span> |
| <span class="source-line-no">1656</span><span id="line-1656"> public Optional<byte[]> getSplitPoint() {</span> |
| <span class="source-line-no">1657</span><span id="line-1657"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">1658</span><span id="line-1658"> try {</span> |
| <span class="source-line-no">1659</span><span id="line-1659"> // Should already be enforced by the split policy!</span> |
| <span class="source-line-no">1660</span><span id="line-1660"> assert !this.getRegionInfo().isMetaRegion();</span> |
| <span class="source-line-no">1661</span><span id="line-1661"> // Not split-able if we find a reference store file present in the store.</span> |
| <span class="source-line-no">1662</span><span id="line-1662"> if (hasReferences()) {</span> |
| <span class="source-line-no">1663</span><span id="line-1663"> LOG.trace("Not splittable; has references: {}", this);</span> |
| <span class="source-line-no">1664</span><span id="line-1664"> return Optional.empty();</span> |
| <span class="source-line-no">1665</span><span id="line-1665"> }</span> |
| <span class="source-line-no">1666</span><span id="line-1666"> return this.storeEngine.getStoreFileManager().getSplitPoint();</span> |
| <span class="source-line-no">1667</span><span id="line-1667"> } catch (IOException e) {</span> |
| <span class="source-line-no">1668</span><span id="line-1668"> LOG.warn("Failed getting split point for {}", this, e);</span> |
| <span class="source-line-no">1669</span><span id="line-1669"> } finally {</span> |
| <span class="source-line-no">1670</span><span id="line-1670"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">1671</span><span id="line-1671"> }</span> |
| <span class="source-line-no">1672</span><span id="line-1672"> return Optional.empty();</span> |
| <span class="source-line-no">1673</span><span id="line-1673"> }</span> |
| <span class="source-line-no">1674</span><span id="line-1674"></span> |
| <span class="source-line-no">1675</span><span id="line-1675"> @Override</span> |
| <span class="source-line-no">1676</span><span id="line-1676"> public long getLastCompactSize() {</span> |
| <span class="source-line-no">1677</span><span id="line-1677"> return lastCompactSize;</span> |
| <span class="source-line-no">1678</span><span id="line-1678"> }</span> |
| <span class="source-line-no">1679</span><span id="line-1679"></span> |
| <span class="source-line-no">1680</span><span id="line-1680"> @Override</span> |
| <span class="source-line-no">1681</span><span id="line-1681"> public long getSize() {</span> |
| <span class="source-line-no">1682</span><span id="line-1682"> return this.storeSize.get();</span> |
| <span class="source-line-no">1683</span><span id="line-1683"> }</span> |
| <span class="source-line-no">1684</span><span id="line-1684"></span> |
| <span class="source-line-no">1685</span><span id="line-1685"> public void triggerMajorCompaction() {</span> |
| <span class="source-line-no">1686</span><span id="line-1686"> forceMajor = true;</span> |
| <span class="source-line-no">1687</span><span id="line-1687"> }</span> |
| <span class="source-line-no">1688</span><span id="line-1688"></span> |
| <span class="source-line-no">1689</span><span id="line-1689"> //////////////////////////////////////////////////////////////////////////////</span> |
| <span class="source-line-no">1690</span><span id="line-1690"> // File administration</span> |
| <span class="source-line-no">1691</span><span id="line-1691"> //////////////////////////////////////////////////////////////////////////////</span> |
| <span class="source-line-no">1692</span><span id="line-1692"></span> |
| <span class="source-line-no">1693</span><span id="line-1693"> /**</span> |
| <span class="source-line-no">1694</span><span id="line-1694"> * Return a scanner for both the memstore and the HStore files. Assumes we are not in a compaction.</span> |
| <span class="source-line-no">1695</span><span id="line-1695"> * @param scan Scan to apply when scanning the stores</span> |
| <span class="source-line-no">1696</span><span id="line-1696"> * @param targetCols columns to scan</span> |
| <span class="source-line-no">1697</span><span id="line-1697"> * @param readPt read point handed to the created scanner</span> |
| <span class="source-line-no">1698</span><span id="line-1698"> * @return a scanner over the current key values</span> |
| <span class="source-line-no">1699</span><span id="line-1699"> * @throws IOException on failure</span> |
| <span class="source-line-no">1700</span><span id="line-1700"> */</span> |
| <span class="source-line-no">1701</span><span id="line-1701"> public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)</span> |
| <span class="source-line-no">1702</span><span id="line-1702"> throws IOException {</span> |
| <span class="source-line-no">1703</span><span id="line-1703"> storeEngine.readLock();</span> |
| <span class="source-line-no">1704</span><span id="line-1704"> try {</span> |
| <span class="source-line-no">1705</span><span id="line-1705"> ScanInfo scanInfo;</span> |
| <span class="source-line-no">1706</span><span id="line-1706"> if (this.getCoprocessorHost() != null) {</span> |
| <span class="source-line-no">1707</span><span id="line-1707"> scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);</span> |
| <span class="source-line-no">1708</span><span id="line-1708"> } else {</span> |
| <span class="source-line-no">1709</span><span id="line-1709"> scanInfo = getScanInfo();</span> |
| <span class="source-line-no">1710</span><span id="line-1710"> }</span> |
| <span class="source-line-no">1711</span><span id="line-1711"> return createScanner(scan, scanInfo, targetCols, readPt);</span> |
| <span class="source-line-no">1712</span><span id="line-1712"> } finally {</span> |
| <span class="source-line-no">1713</span><span id="line-1713"> storeEngine.readUnlock();</span> |
| <span class="source-line-no">1714</span><span id="line-1714"> }</span> |
| <span class="source-line-no">1715</span><span id="line-1715"> }</span> |
| <span class="source-line-no">1716</span><span id="line-1716"></span> |
| <span class="source-line-no">1717</span><span id="line-1717"> // Builds a reversed or forward store scanner per scan.isReversed(); HMobStore will override this.</span> |
| <span class="source-line-no">1718</span><span id="line-1718"> protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,</span> |
| <span class="source-line-no">1719</span><span id="line-1719"> NavigableSet<byte[]> targetCols, long readPt) throws IOException {</span> |
| <span class="source-line-no">1720</span><span id="line-1720"> return scan.isReversed()</span> |
| <span class="source-line-no">1721</span><span id="line-1721"> ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)</span> |
| <span class="source-line-no">1722</span><span id="line-1722"> : new StoreScanner(this, scanInfo, scan, targetCols, readPt);</span> |
| <span class="source-line-no">1723</span><span id="line-1723"> }</span> |
| <span class="source-line-no">1724</span><span id="line-1724"></span> |
| <span class="source-line-no">1725</span><span id="line-1725"> /**</span> |
| <span class="source-line-no">1726</span><span id="line-1726"> * Recreates the scanners on the current list of active store file scanners</span> |
| <span class="source-line-no">1727</span><span id="line-1727"> * @param currentFileScanners the current set of active store file scanners</span> |
| <span class="source-line-no">1728</span><span id="line-1728"> * @param cacheBlocks cache the blocks or not</span> |
| <span class="source-line-no">1729</span><span id="line-1729"> * @param usePread use pread or not</span> |
| <span class="source-line-no">1730</span><span id="line-1730"> * @param isCompaction is the scanner for compaction</span> |
| <span class="source-line-no">1731</span><span id="line-1731"> * @param matcher the scan query matcher</span> |
| <span class="source-line-no">1732</span><span id="line-1732"> * @param startRow the scan's start row</span> |
| <span class="source-line-no">1733</span><span id="line-1733"> * @param includeStartRow should the scan include the start row</span> |
| <span class="source-line-no">1734</span><span id="line-1734"> * @param stopRow the scan's stop row</span> |
| <span class="source-line-no">1735</span><span id="line-1735"> * @param includeStopRow should the scan include the stop row</span> |
| <span class="source-line-no">1736</span><span id="line-1736"> * @param readPt the read point of the current scan</span> |
| <span class="source-line-no">1737</span><span id="line-1737"> * @param includeMemstoreScanner whether the current scanner should include the memstore scanner</span> |
| <span class="source-line-no">1738</span><span id="line-1738"> * @return list of scanners recreated on the current Scanners</span> |
| <span class="source-line-no">1739</span><span id="line-1739"> */</span> |
| <span class="source-line-no">1740</span><span id="line-1740"> public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,</span> |
| <span class="source-line-no">1741</span><span id="line-1741"> boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,</span> |
| <span class="source-line-no">1742</span><span id="line-1742"> byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,</span> |
| <span class="source-line-no">1743</span><span id="line-1743"> boolean includeMemstoreScanner) throws IOException {</span> |
| <span class="source-line-no">1744</span><span id="line-1744"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">1745</span><span id="line-1745"> try {</span> |
| <span class="source-line-no">1746</span><span id="line-1746"> Map<String, HStoreFile> name2File =</span> |
| <span class="source-line-no">1747</span><span id="line-1747"> new HashMap<>(getStorefilesCount() + getCompactedFilesCount());</span> |
| <span class="source-line-no">1748</span><span id="line-1748"> for (HStoreFile file : getStorefiles()) {</span> |
| <span class="source-line-no">1749</span><span id="line-1749"> name2File.put(file.getFileInfo().getActiveFileName(), file);</span> |
| <span class="source-line-no">1750</span><span id="line-1750"> }</span> |
| <span class="source-line-no">1751</span><span id="line-1751"> Collection<HStoreFile> compactedFiles = getCompactedFiles();</span> |
| <span class="source-line-no">1752</span><span id="line-1752"> for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {</span> |
| <span class="source-line-no">1753</span><span id="line-1753"> name2File.put(file.getFileInfo().getActiveFileName(), file);</span> |
| <span class="source-line-no">1754</span><span id="line-1754"> }</span> |
| <span class="source-line-no">1755</span><span id="line-1755"> List<HStoreFile> filesToReopen = new ArrayList<>();</span> |
| <span class="source-line-no">1756</span><span id="line-1756"> for (KeyValueScanner kvs : currentFileScanners) {</span> |
| <span class="source-line-no">1757</span><span id="line-1757"> assert kvs.isFileScanner();</span> |
| <span class="source-line-no">1758</span><span id="line-1758"> if (kvs.peek() == null) {</span> |
| <span class="source-line-no">1759</span><span id="line-1759"> continue;</span> |
| <span class="source-line-no">1760</span><span id="line-1760"> }</span> |
| <span class="source-line-no">1761</span><span id="line-1761"> filesToReopen.add(name2File.get(kvs.getFilePath().getName()));</span> |
| <span class="source-line-no">1762</span><span id="line-1762"> }</span> |
| <span class="source-line-no">1763</span><span id="line-1763"> if (filesToReopen.isEmpty()) {</span> |
| <span class="source-line-no">1764</span><span id="line-1764"> return null;</span> |
| <span class="source-line-no">1765</span><span id="line-1765"> }</span> |
| <span class="source-line-no">1766</span><span id="line-1766"> return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,</span> |
| <span class="source-line-no">1767</span><span id="line-1767"> includeStartRow, stopRow, includeStopRow, readPt, false, false);</span> |
| <span class="source-line-no">1768</span><span id="line-1768"> } finally {</span> |
| <span class="source-line-no">1769</span><span id="line-1769"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">1770</span><span id="line-1770"> }</span> |
| <span class="source-line-no">1771</span><span id="line-1771"> }</span> |
| <span class="source-line-no">1772</span><span id="line-1772"></span> |
| <span class="source-line-no">1773</span><span id="line-1773"> @Override</span> |
| <span class="source-line-no">1774</span><span id="line-1774"> public String toString() { // region short name for logging, a slash, then the column family name</span> |
| <span class="source-line-no">1775</span><span id="line-1775"> return this.getRegionInfo().getShortNameToLog() + "/" + this.getColumnFamilyName();</span> |
| <span class="source-line-no">1776</span><span id="line-1776"> }</span> |
| <span class="source-line-no">1777</span><span id="line-1777"></span> |
| <span class="source-line-no">1778</span><span id="line-1778"> @Override</span> |
| <span class="source-line-no">1779</span><span id="line-1779"> public int getStorefilesCount() { // store file count as reported by the store file manager</span> |
| <span class="source-line-no">1780</span><span id="line-1780"> return this.storeEngine.getStoreFileManager().getStorefileCount();</span> |
| <span class="source-line-no">1781</span><span id="line-1781"> }</span> |
| <span class="source-line-no">1782</span><span id="line-1782"></span> |
| <span class="source-line-no">1783</span><span id="line-1783"> @Override</span> |
| <span class="source-line-no">1784</span><span id="line-1784"> public int getCompactedFilesCount() { // compacted file count as reported by the store file manager</span> |
| <span class="source-line-no">1785</span><span id="line-1785"> return this.storeEngine.getStoreFileManager().getCompactedFilesCount();</span> |
| <span class="source-line-no">1786</span><span id="line-1786"> }</span> |
| <span class="source-line-no">1787</span><span id="line-1787"></span> |
| <span class="source-line-no">1788</span><span id="line-1788"> private LongStream getStoreFileAgeStream() { // one age value per store file that has a reader and is an HFile</span> |
| <span class="source-line-no">1789</span><span id="line-1789"> return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> {</span> |
| <span class="source-line-no">1790</span><span id="line-1790"> if (sf.getReader() == null) { // files without a reader are logged at debug level and left out</span> |
| <span class="source-line-no">1791</span><span id="line-1791"> LOG.debug("StoreFile {} has a null Reader", sf);</span> |
| <span class="source-line-no">1792</span><span id="line-1792"> return false;</span> |
| <span class="source-line-no">1793</span><span id="line-1793"> } else {</span> |
| <span class="source-line-no">1794</span><span id="line-1794"> return true;</span> |
| <span class="source-line-no">1795</span><span id="line-1795"> }</span> |
| <span class="source-line-no">1796</span><span id="line-1796"> }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp())</span> |
| <span class="source-line-no">1797</span><span id="line-1797"> .map(t -> EnvironmentEdgeManager.currentTime() - t); // age is current time minus created timestamp; currentTime() is evaluated per element</span> |
| <span class="source-line-no">1798</span><span id="line-1798"> }</span> |
| <span class="source-line-no">1799</span><span id="line-1799"></span> |
| <span class="source-line-no">1800</span><span id="line-1800"> @Override</span> |
| <span class="source-line-no">1801</span><span id="line-1801"> public OptionalLong getMaxStoreFileAge() { // largest age from getStoreFileAgeStream(); empty when that stream has no element</span> |
| <span class="source-line-no">1802</span><span id="line-1802"> return getStoreFileAgeStream().max();</span> |
| <span class="source-line-no">1803</span><span id="line-1803"> }</span> |
| <span class="source-line-no">1804</span><span id="line-1804"></span> |
| <span class="source-line-no">1805</span><span id="line-1805"> @Override</span> |
| <span class="source-line-no">1806</span><span id="line-1806"> public OptionalLong getMinStoreFileAge() { // smallest age from getStoreFileAgeStream(); empty when that stream has no element</span> |
| <span class="source-line-no">1807</span><span id="line-1807"> return getStoreFileAgeStream().min();</span> |
| <span class="source-line-no">1808</span><span id="line-1808"> }</span> |
| <span class="source-line-no">1809</span><span id="line-1809"></span> |
| <span class="source-line-no">1810</span><span id="line-1810"> @Override</span> |
| <span class="source-line-no">1811</span><span id="line-1811"> public OptionalDouble getAvgStoreFileAge() { // arithmetic mean of getStoreFileAgeStream(); empty when that stream has no element</span> |
| <span class="source-line-no">1812</span><span id="line-1812"> return getStoreFileAgeStream().average();</span> |
| <span class="source-line-no">1813</span><span id="line-1813"> }</span> |
| <span class="source-line-no">1814</span><span id="line-1814"></span> |
| <span class="source-line-no">1815</span><span id="line-1815"> @Override</span> |
| <span class="source-line-no">1816</span><span id="line-1816"> public long getNumReferenceFiles() { // counts the store files for which isReference() holds</span> |
| <span class="source-line-no">1817</span><span id="line-1817"> return this.storeEngine.getStoreFileManager().getStoreFiles().stream()</span> |
| <span class="source-line-no">1818</span><span id="line-1818"> .filter(HStoreFile::isReference).count();</span> |
| <span class="source-line-no">1819</span><span id="line-1819"> }</span> |
| <span class="source-line-no">1820</span><span id="line-1820"></span> |
| <span class="source-line-no">1821</span><span id="line-1821"> @Override</span> |
| <span class="source-line-no">1822</span><span id="line-1822"> public long getNumHFiles() { // counts the store files for which isHFile() holds</span> |
| <span class="source-line-no">1823</span><span id="line-1823"> return this.storeEngine.getStoreFileManager().getStoreFiles().stream()</span> |
| <span class="source-line-no">1824</span><span id="line-1824"> .filter(HStoreFile::isHFile).count();</span> |
| <span class="source-line-no">1825</span><span id="line-1825"> }</span> |
| <span class="source-line-no">1826</span><span id="line-1826"></span> |
| <span class="source-line-no">1827</span><span id="line-1827"> @Override</span> |
| <span class="source-line-no">1828</span><span id="line-1828"> public long getStoreSizeUncompressed() { // current value of the totalUncompressedBytes counter</span> |
| <span class="source-line-no">1829</span><span id="line-1829"> return this.totalUncompressedBytes.get();</span> |
| <span class="source-line-no">1830</span><span id="line-1830"> }</span> |
| <span class="source-line-no">1831</span><span id="line-1831"></span> |
| <span class="source-line-no">1832</span><span id="line-1832"> @Override</span> |
| <span class="source-line-no">1833</span><span id="line-1833"> public long getStorefilesSize() {</span> |
| <span class="source-line-no">1834</span><span id="line-1834"> // Include all StoreFiles: the predicate passed below accepts every file</span> |
| <span class="source-line-no">1835</span><span id="line-1835"> return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),</span> |
| <span class="source-line-no">1836</span><span id="line-1836"> sf -> true);</span> |
| <span class="source-line-no">1837</span><span id="line-1837"> }</span> |
| <span class="source-line-no">1838</span><span id="line-1838"></span> |
| <span class="source-line-no">1839</span><span id="line-1839"> @Override</span> |
| <span class="source-line-no">1840</span><span id="line-1840"> public long getHFilesSize() {</span> |
| <span class="source-line-no">1841</span><span id="line-1841"> // Include only StoreFiles which are HFiles: same helper as getStorefilesSize(), filtered by isHFile</span> |
| <span class="source-line-no">1842</span><span id="line-1842"> return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(),</span> |
| <span class="source-line-no">1843</span><span id="line-1843"> HStoreFile::isHFile);</span> |
| <span class="source-line-no">1844</span><span id="line-1844"> }</span> |
| <span class="source-line-no">1845</span><span id="line-1845"></span> |
| <span class="source-line-no">1846</span><span id="line-1846"> private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) { // sums StoreUtils.getStorefileFieldSize(file, f) over all store files</span> |
| <span class="source-line-no">1847</span><span id="line-1847"> return this.storeEngine.getStoreFileManager().getStoreFiles().stream()</span> |
| <span class="source-line-no">1848</span><span id="line-1848"> .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();</span> |
| <span class="source-line-no">1849</span><span id="line-1849"> }</span> |
| <span class="source-line-no">1850</span><span id="line-1850"></span> |
| <span class="source-line-no">1851</span><span id="line-1851"> @Override</span> |
| <span class="source-line-no">1852</span><span id="line-1852"> public long getStorefilesRootLevelIndexSize() { // aggregates StoreFileReader.indexSize() through getStorefilesFieldSize</span> |
| <span class="source-line-no">1853</span><span id="line-1853"> return getStorefilesFieldSize(StoreFileReader::indexSize);</span> |
| <span class="source-line-no">1854</span><span id="line-1854"> }</span> |
| <span class="source-line-no">1855</span><span id="line-1855"></span> |
| <span class="source-line-no">1856</span><span id="line-1856"> @Override</span> |
| <span class="source-line-no">1857</span><span id="line-1857"> public long getTotalStaticIndexSize() { // aggregates StoreFileReader.getUncompressedDataIndexSize() through getStorefilesFieldSize</span> |
| <span class="source-line-no">1858</span><span id="line-1858"> return getStorefilesFieldSize(StoreFileReader::getUncompressedDataIndexSize);</span> |
| <span class="source-line-no">1859</span><span id="line-1859"> }</span> |
| <span class="source-line-no">1860</span><span id="line-1860"></span> |
| <span class="source-line-no">1861</span><span id="line-1861"> @Override</span> |
| <span class="source-line-no">1862</span><span id="line-1862"> public long getTotalStaticBloomSize() { // aggregates StoreFileReader.getTotalBloomSize() through getStorefilesFieldSize</span> |
| <span class="source-line-no">1863</span><span id="line-1863"> return getStorefilesFieldSize(StoreFileReader::getTotalBloomSize);</span> |
| <span class="source-line-no">1864</span><span id="line-1864"> }</span> |
| <span class="source-line-no">1865</span><span id="line-1865"></span> |
| <span class="source-line-no">1866</span><span id="line-1866"> @Override</span> |
| <span class="source-line-no">1867</span><span id="line-1867"> public MemStoreSize getMemStoreSize() { // size as reported by the memstore itself</span> |
| <span class="source-line-no">1868</span><span id="line-1868"> return this.memstore.size();</span> |
| <span class="source-line-no">1869</span><span id="line-1869"> }</span> |
| <span class="source-line-no">1870</span><span id="line-1870"></span> |
| <span class="source-line-no">1871</span><span id="line-1871"> @Override</span> |
| <span class="source-line-no">1872</span><span id="line-1872"> public int getCompactPriority() { // priority comes from the store file manager</span> |
| <span class="source-line-no">1873</span><span id="line-1873"> int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();</span> |
| <span class="source-line-no">1874</span><span id="line-1874"> if (priority == PRIORITY_USER) { // only a warning is logged; the value is still returned unchanged</span> |
| <span class="source-line-no">1875</span><span id="line-1875"> LOG.warn("Compaction priority is USER despite there being no user compaction");</span> |
| <span class="source-line-no">1876</span><span id="line-1876"> }</span> |
| <span class="source-line-no">1877</span><span id="line-1877"> return priority;</span> |
| <span class="source-line-no">1878</span><span id="line-1878"> }</span> |
| <span class="source-line-no">1879</span><span id="line-1879"></span> |
| <span class="source-line-no">1880</span><span id="line-1880"> public boolean throttleCompaction(long compactionSize) { // the decision is delegated to the store engine's compaction policy</span> |
| <span class="source-line-no">1881</span><span id="line-1881"> return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);</span> |
| <span class="source-line-no">1882</span><span id="line-1882"> }</span> |
| <span class="source-line-no">1883</span><span id="line-1883"></span> |
| <span class="source-line-no">1884</span><span id="line-1884"> public HRegion getHRegion() { // plain accessor for the region field of this store</span> |
| <span class="source-line-no">1885</span><span id="line-1885"> return this.region;</span> |
| <span class="source-line-no">1886</span><span id="line-1886"> }</span> |
| <span class="source-line-no">1887</span><span id="line-1887"></span> |
| <span class="source-line-no">1888</span><span id="line-1888"> public RegionCoprocessorHost getCoprocessorHost() { // the region's coprocessor host; the flush commit path in this file null-checks the result</span> |
| <span class="source-line-no">1889</span><span id="line-1889"> return this.region.getCoprocessorHost();</span> |
| <span class="source-line-no">1890</span><span id="line-1890"> }</span> |
| <span class="source-line-no">1891</span><span id="line-1891"></span> |
| <span class="source-line-no">1892</span><span id="line-1892"> @Override</span> |
| <span class="source-line-no">1893</span><span id="line-1893"> public RegionInfo getRegionInfo() { // region info is obtained from the region file system</span> |
| <span class="source-line-no">1894</span><span id="line-1894"> return getRegionFileSystem().getRegionInfo();</span> |
| <span class="source-line-no">1895</span><span id="line-1895"> }</span> |
| <span class="source-line-no">1896</span><span id="line-1896"></span> |
| <span class="source-line-no">1897</span><span id="line-1897"> @Override</span> |
| <span class="source-line-no">1898</span><span id="line-1898"> public boolean areWritesEnabled() { // answered by the owning region</span> |
| <span class="source-line-no">1899</span><span id="line-1899"> return this.region.areWritesEnabled();</span> |
| <span class="source-line-no">1900</span><span id="line-1900"> }</span> |
| <span class="source-line-no">1901</span><span id="line-1901"></span> |
| <span class="source-line-no">1902</span><span id="line-1902"> @Override</span> |
| <span class="source-line-no">1903</span><span id="line-1903"> public long getSmallestReadPoint() { // answered by the owning region</span> |
| <span class="source-line-no">1904</span><span id="line-1904"> return this.region.getSmallestReadPoint();</span> |
| <span class="source-line-no">1905</span><span id="line-1905"> }</span> |
| <span class="source-line-no">1906</span><span id="line-1906"></span> |
| <span class="source-line-no">1907</span><span id="line-1907"> /**</span> |
| <span class="source-line-no">1908</span><span id="line-1908"> * Adds or replaces the specified KeyValues by delegating to the memstore's upsert while holding the store engine read lock; cells and memstoreSizing are passed through unchanged.</span> |
| <span class="source-line-no">1909</span><span id="line-1909"> * <p></span> |
| <span class="source-line-no">1910</span><span id="line-1910"> * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in</span> |
| <span class="source-line-no">1911</span><span id="line-1911"> * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore.</span> |
| <span class="source-line-no">1912</span><span id="line-1912"> * <p></span> |
| <span class="source-line-no">1913</span><span id="line-1913"> * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic</span> |
| <span class="source-line-no">1914</span><span id="line-1914"> * across all of them.</span> |
| <span class="source-line-no">1915</span><span id="line-1915"> * @param readpoint readpoint below which we can safely remove duplicate KVs</span> |
| <span class="source-line-no">1916</span><span id="line-1916"> */</span> |
| <span class="source-line-no">1917</span><span id="line-1917"> public void upsert(Iterable<ExtendedCell> cells, long readpoint, MemStoreSizing memstoreSizing) {</span> |
| <span class="source-line-no">1918</span><span id="line-1918"> this.storeEngine.readLock();</span> |
| <span class="source-line-no">1919</span><span id="line-1919"> try {</span> |
| <span class="source-line-no">1920</span><span id="line-1920"> this.memstore.upsert(cells, readpoint, memstoreSizing);</span> |
| <span class="source-line-no">1921</span><span id="line-1921"> } finally { // the read lock is released even when the memstore call throws</span> |
| <span class="source-line-no">1922</span><span id="line-1922"> this.storeEngine.readUnlock();</span> |
| <span class="source-line-no">1923</span><span id="line-1923"> }</span> |
| <span class="source-line-no">1924</span><span id="line-1924"> }</span> |
| <span class="source-line-no">1925</span><span id="line-1925"></span> |
| <span class="source-line-no">1926</span><span id="line-1926"> public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { // cacheFlushId becomes the flush sequence number of the new context</span> |
| <span class="source-line-no">1927</span><span id="line-1927"> return new StoreFlusherImpl(cacheFlushId, tracker);</span> |
| <span class="source-line-no">1928</span><span id="line-1928"> }</span> |
| <span class="source-line-no">1929</span><span id="line-1929"></span> |
| <span class="source-line-no">1930</span><span id="line-1930"> private final class StoreFlusherImpl implements StoreFlushContext {</span> |
| <span class="source-line-no">1931</span><span id="line-1931"></span> |
| <span class="source-line-no">1932</span><span id="line-1932"> private final FlushLifeCycleTracker tracker; // handed to HStore.flushCache and to the coprocessor postFlush hook</span> |
| <span class="source-line-no">1933</span><span id="line-1933"> private final StoreFileWriterCreationTracker writerCreationTracker; // may be null; registered with the store in flushCache() and removed at the end of commit()</span> |
| <span class="source-line-no">1934</span><span id="line-1934"> private final long cacheFlushSeqNum; // passed to HStore.flushCache and logged as sequenceid in commit()</span> |
| <span class="source-line-no">1935</span><span id="line-1935"> private MemStoreSnapshot snapshot; // assigned in prepare()</span> |
| <span class="source-line-no">1936</span><span id="line-1936"> private List<Path> tempFiles; // result of HStore.flushCache, assigned in flushCache()</span> |
| <span class="source-line-no">1937</span><span id="line-1937"> private List<Path> committedFiles; // created in prepare(), filled with store file paths in commit()</span> |
| <span class="source-line-no">1938</span><span id="line-1938"> private long cacheFlushCount; // cell count of the snapshot, captured in prepare()</span> |
| <span class="source-line-no">1939</span><span id="line-1939"> private long cacheFlushSize; // data size of the snapshot, captured in prepare()</span> |
| <span class="source-line-no">1940</span><span id="line-1940"> private long outputFileSize; // running sum of the reader lengths of the files handled in commit()</span> |
| <span class="source-line-no">1941</span><span id="line-1941"></span> |
| <span class="source-line-no">1942</span><span id="line-1942"> private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { // stores both arguments and asks the store's factory for a writer creation tracker</span> |
| <span class="source-line-no">1943</span><span id="line-1943"> this.cacheFlushSeqNum = cacheFlushSeqNum;</span> |
| <span class="source-line-no">1944</span><span id="line-1944"> this.tracker = tracker;</span> |
| <span class="source-line-no">1945</span><span id="line-1945"> this.writerCreationTracker = storeFileWriterCreationTrackerFactory.get();</span> |
| <span class="source-line-no">1946</span><span id="line-1946"> }</span> |
| <span class="source-line-no">1947</span><span id="line-1947"></span> |
| <span class="source-line-no">1948</span><span id="line-1948"> /**</span> |
| <span class="source-line-no">1949</span><span id="line-1949"> * Takes the memstore snapshot and records its cell count and data size for the later commit. This is not thread safe. The caller should have a lock on the region or the store. If</span> |
| <span class="source-line-no">1950</span><span id="line-1950"> * necessary, the lock can be added with the patch provided in HBASE-10087</span> |
| <span class="source-line-no">1951</span><span id="line-1951"> */</span> |
| <span class="source-line-no">1952</span><span id="line-1952"> @Override</span> |
| <span class="source-line-no">1953</span><span id="line-1953"> public MemStoreSize prepare() {</span> |
| <span class="source-line-no">1954</span><span id="line-1954"> // passing the current sequence number of the wal - to allow bookkeeping in the memstore (NOTE(review): snapshot() takes no argument here, so this remark looks stale)</span> |
| <span class="source-line-no">1955</span><span id="line-1955"> this.snapshot = memstore.snapshot();</span> |
| <span class="source-line-no">1956</span><span id="line-1956"> this.cacheFlushCount = snapshot.getCellsCount();</span> |
| <span class="source-line-no">1957</span><span id="line-1957"> this.cacheFlushSize = snapshot.getDataSize();</span> |
| <span class="source-line-no">1958</span><span id="line-1958"> committedFiles = new ArrayList<>(1); // filled in by commit()</span> |
| <span class="source-line-no">1959</span><span id="line-1959"> return snapshot.getMemStoreSize();</span> |
| <span class="source-line-no">1960</span><span id="line-1960"> }</span> |
| <span class="source-line-no">1961</span><span id="line-1961"></span> |
| <span class="source-line-no">1962</span><span id="line-1962"> @Override</span> |
| <span class="source-line-no">1963</span><span id="line-1963"> public void flushCache(MonitoredTask status) throws IOException { // flushes the snapshot through HStore.flushCache and keeps the returned paths in tempFiles</span> |
| <span class="source-line-no">1964</span><span id="line-1964"> RegionServerServices rsService = region.getRegionServerServices();</span> |
| <span class="source-line-no">1965</span><span id="line-1965"> ThroughputController throughputController =</span> |
| <span class="source-line-no">1966</span><span id="line-1966"> rsService == null ? null : rsService.getFlushThroughputController(); // null controller when no region server services are available</span> |
| <span class="source-line-no">1967</span><span id="line-1967"> // it could be null if we do not need to track the creation of store file writer due to</span> |
| <span class="source-line-no">1968</span><span id="line-1968"> // different SFT implementation.</span> |
| <span class="source-line-no">1969</span><span id="line-1969"> if (writerCreationTracker != null) {</span> |
| <span class="source-line-no">1970</span><span id="line-1970"> HStore.this.storeFileWriterCreationTrackers.add(writerCreationTracker); // removed again in the finally block of commit()</span> |
| <span class="source-line-no">1971</span><span id="line-1971"> }</span> |
| <span class="source-line-no">1972</span><span id="line-1972"> tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController,</span> |
| <span class="source-line-no">1973</span><span id="line-1973"> tracker, writerCreationTracker);</span> |
| <span class="source-line-no">1974</span><span id="line-1974"> }</span> |
| <span class="source-line-no">1975</span><span id="line-1975"></span> |
| <span class="source-line-no">1976</span><span id="line-1976"> @Override</span> |
| <span class="source-line-no">1977</span><span id="line-1977"> public boolean commit(MonitoredTask status) throws IOException { // returns false when there are no temp files to commit, otherwise the result of completeFlush</span> |
| <span class="source-line-no">1978</span><span id="line-1978"> try {</span> |
| <span class="source-line-no">1979</span><span id="line-1979"> if (CollectionUtils.isEmpty(this.tempFiles)) {</span> |
| <span class="source-line-no">1980</span><span id="line-1980"> return false;</span> |
| <span class="source-line-no">1981</span><span id="line-1981"> }</span> |
| <span class="source-line-no">1982</span><span id="line-1982"> status.setStatus("Flushing " + this + ": reopening flushed file"); // NOTE(review): "this" is the StoreFlusherImpl, confirm the rendered name is the intended one</span> |
| <span class="source-line-no">1983</span><span id="line-1983"> List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, false);</span> |
| <span class="source-line-no">1984</span><span id="line-1984"> for (HStoreFile sf : storeFiles) {</span> |
| <span class="source-line-no">1985</span><span id="line-1985"> StoreFileReader r = sf.getReader();</span> |
| <span class="source-line-no">1986</span><span id="line-1986"> if (LOG.isInfoEnabled()) {</span> |
| <span class="source-line-no">1987</span><span id="line-1987"> LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(),</span> |
| <span class="source-line-no">1988</span><span id="line-1988"> cacheFlushSeqNum, TraditionalBinaryPrefix.long2String(r.length(), "", 1));</span> |
| <span class="source-line-no">1989</span><span id="line-1989"> }</span> |
| <span class="source-line-no">1990</span><span id="line-1990"> outputFileSize += r.length(); // per-flush total, also exposed through getOutputFileSize()</span> |
| <span class="source-line-no">1991</span><span id="line-1991"> storeSize.addAndGet(r.length());</span> |
| <span class="source-line-no">1992</span><span id="line-1992"> totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());</span> |
| <span class="source-line-no">1993</span><span id="line-1993"> committedFiles.add(sf.getPath());</span> |
| <span class="source-line-no">1994</span><span id="line-1994"> }</span> |
| <span class="source-line-no">1995</span><span id="line-1995"></span> |
| <span class="source-line-no">1996</span><span id="line-1996"> flushedCellsCount.addAndGet(cacheFlushCount);</span> |
| <span class="source-line-no">1997</span><span id="line-1997"> flushedCellsSize.addAndGet(cacheFlushSize);</span> |
| <span class="source-line-no">1998</span><span id="line-1998"> flushedOutputFileSize.addAndGet(outputFileSize);</span> |
| <span class="source-line-no">1999</span><span id="line-1999"> RegionCoprocessorHost cpHost = getCoprocessorHost(); // call coprocessor after we have done all the accounting above; the host is read once so the null check and the calls use the same instance</span> |
| <span class="source-line-no">2000</span><span id="line-2000"> if (cpHost != null) {</span> |
| <span class="source-line-no">2001</span><span id="line-2001"> for (HStoreFile sf : storeFiles) {</span> |
| <span class="source-line-no">2002</span><span id="line-2002"> cpHost.postFlush(HStore.this, sf, tracker);</span> |
| <span class="source-line-no">2003</span><span id="line-2003"> }</span> |
| <span class="source-line-no">2004</span><span id="line-2004"> }</span> |
| <span class="source-line-no">2005</span><span id="line-2005"> // Add new file to store files. Clear snapshot too while we have the Store write lock.</span> |
| <span class="source-line-no">2006</span><span id="line-2006"> return completeFlush(storeFiles, snapshot.getId());</span> |
| <span class="source-line-no">2007</span><span id="line-2007"> } finally { // the writer creation tracker registered in flushCache() is removed on every exit path</span> |
| <span class="source-line-no">2008</span><span id="line-2008"> if (writerCreationTracker != null) {</span> |
| <span class="source-line-no">2009</span><span id="line-2009"> HStore.this.storeFileWriterCreationTrackers.remove(writerCreationTracker);</span> |
| <span class="source-line-no">2010</span><span id="line-2010"> }</span> |
| <span class="source-line-no">2011</span><span id="line-2011"> }</span> |
| <span class="source-line-no">2012</span><span id="line-2012"> }</span> |
| <span class="source-line-no">2013</span><span id="line-2013"></span> |
| <span class="source-line-no">2014</span><span id="line-2014"> @Override</span> |
| <span class="source-line-no">2015</span><span id="line-2015"> public long getOutputFileSize() { // sum of the reader lengths accumulated by commit()</span> |
| <span class="source-line-no">2016</span><span id="line-2016"> return outputFileSize;</span> |
| <span class="source-line-no">2017</span><span id="line-2017"> }</span> |
| <span class="source-line-no">2018</span><span id="line-2018"></span> |
| <span class="source-line-no">2019</span><span id="line-2019"> @Override</span> |
| <span class="source-line-no">2020</span><span id="line-2020"> public List<Path> getCommittedFiles() { // the list created in prepare() and filled by commit(); returned as-is, not copied</span> |
| <span class="source-line-no">2021</span><span id="line-2021"> return committedFiles;</span> |
| <span class="source-line-no">2022</span><span id="line-2022"> }</span> |
| <span class="source-line-no">2023</span><span id="line-2023"></span> |
| <span class="source-line-no">2024</span><span id="line-2024"> /**</span> |
| <span class="source-line-no">2025</span><span id="line-2025"> * Similar to commit, but called in secondary region replicas for replaying the flush cache from</span> |
| <span class="source-line-no">2026</span><span id="line-2026"> * primary region. Adds the new files to the store, and drops the snapshot depending on</span> |
| <span class="source-line-no">2027</span><span id="line-2027"> * dropMemstoreSnapshot argument.</span> |
| <span class="source-line-no">2028</span><span id="line-2028"> * @param fileNames names of the flushed files</span> |
| <span class="source-line-no">2029</span><span id="line-2029"> * @param dropMemstoreSnapshot whether to drop the prepared memstore snapshot; has no effect when no snapshot was prepared</span> |
| <span class="source-line-no">2030</span><span id="line-2030"> */</span> |
| <span class="source-line-no">2031</span><span id="line-2031"> @Override</span> |
| <span class="source-line-no">2032</span><span id="line-2032"> public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)</span> |
| <span class="source-line-no">2033</span><span id="line-2033"> throws IOException {</span> |
| <span class="source-line-no">2034</span><span id="line-2034"> List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());</span> |
| <span class="source-line-no">2035</span><span id="line-2035"> for (String file : fileNames) {</span> |
| <span class="source-line-no">2036</span><span id="line-2036"> // open the file as a store file (hfile link, etc)</span> |
| <span class="source-line-no">2037</span><span id="line-2037"> StoreFileInfo storeFileInfo =</span> |
| <span class="source-line-no">2038</span><span id="line-2038"> getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);</span> |
| <span class="source-line-no">2039</span><span id="line-2039"> HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);</span> |
| <span class="source-line-no">2040</span><span id="line-2040"> storeFiles.add(storeFile);</span> |
| <span class="source-line-no">2041</span><span id="line-2041"> HStore.this.storeSize.addAndGet(storeFile.getReader().length()); // the store-level size counters are bumped for every replayed file</span> |
| <span class="source-line-no">2042</span><span id="line-2042"> HStore.this.totalUncompressedBytes</span> |
| <span class="source-line-no">2043</span><span id="line-2043"> .addAndGet(storeFile.getReader().getTotalUncompressedBytes());</span> |
| <span class="source-line-no">2044</span><span id="line-2044"> if (LOG.isInfoEnabled()) { // NOTE(review): "this" in the message below is the StoreFlusherImpl, confirm the rendered name is the intended one</span> |
| <span class="source-line-no">2045</span><span id="line-2045"> LOG.info(this + " added " + storeFile + ", entries=" + storeFile.getReader().getEntries()</span> |
| <span class="source-line-no">2046</span><span id="line-2046"> + ", sequenceid=" + storeFile.getReader().getSequenceID() + ", filesize="</span> |
| <span class="source-line-no">2047</span><span id="line-2047"> + TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));</span> |
| <span class="source-line-no">2048</span><span id="line-2048"> }</span> |
| <span class="source-line-no">2049</span><span id="line-2049"> }</span> |
| <span class="source-line-no">2050</span><span id="line-2050"></span> |
| <span class="source-line-no">2051</span><span id="line-2051"> long snapshotId = -1; // -1 means do not drop</span> |
| <span class="source-line-no">2052</span><span id="line-2052"> if (dropMemstoreSnapshot && snapshot != null) {</span> |
| <span class="source-line-no">2053</span><span id="line-2053"> snapshotId = snapshot.getId();</span> |
| <span class="source-line-no">2054</span><span id="line-2054"> }</span> |
| <span class="source-line-no">2055</span><span id="line-2055"> HStore.this.completeFlush(storeFiles, snapshotId); // snapshotId is still -1 unless a prepared snapshot is to be dropped</span> |
| <span class="source-line-no">2056</span><span id="line-2056"> }</span> |
| <span class="source-line-no">2057</span><span id="line-2057"></span> |
| <span class="source-line-no">2058</span><span id="line-2058"> /**</span> |
| <span class="source-line-no">2059</span><span id="line-2059"> * Abort the snapshot preparation. If a snapshot exists, completes a flush with no files for it.</span> |
| <span class="source-line-no">2060</span><span id="line-2060"> */</span> |
| <span class="source-line-no">2061</span><span id="line-2061"> @Override</span> |
| <span class="source-line-no">2062</span><span id="line-2062"> public void abort() throws IOException {</span> |
| <span class="source-line-no">2063</span><span id="line-2063"> if (snapshot != null) {</span> |
| <span class="source-line-no">2064</span><span id="line-2064"> HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());</span> |
| <span class="source-line-no">2065</span><span id="line-2065"> }</span> |
| <span class="source-line-no">2066</span><span id="line-2066"> }</span> |
| <span class="source-line-no">2067</span><span id="line-2067"> }</span> |
| <span class="source-line-no">2068</span><span id="line-2068"> /** Asks the store engine whether compaction is needed, given a copy of the compacting files. */</span> |
| <span class="source-line-no">2069</span><span id="line-2069"> @Override</span> |
| <span class="source-line-no">2070</span><span id="line-2070"> public boolean needsCompaction() {</span> |
| <span class="source-line-no">2071</span><span id="line-2071"> final List<HStoreFile> compactingSnapshot;</span> |
| <span class="source-line-no">2072</span><span id="line-2072"> synchronized (filesCompacting) {</span> |
| <span class="source-line-no">2073</span><span id="line-2073"> compactingSnapshot = Lists.newArrayList(filesCompacting);</span> |
| <span class="source-line-no">2074</span><span id="line-2074"> }</span> |
| <span class="source-line-no">2075</span><span id="line-2075"> return this.storeEngine.needsCompaction(compactingSnapshot);</span> |
| <span class="source-line-no">2076</span><span id="line-2076"> }</span> |
| <span class="source-line-no">2077</span><span id="line-2077"></span> |
| <span class="source-line-no">2078</span><span id="line-2078"> /**</span> |
| <span class="source-line-no">2079</span><span id="line-2079"> * Returns the cache configuration held by this store's context. Used for tests.</span> |
| <span class="source-line-no">2080</span><span id="line-2080"> * @return cache configuration for this Store, as returned by {@code storeContext}.</span> |
| <span class="source-line-no">2081</span><span id="line-2081"> */</span> |
| <span class="source-line-no">2082</span><span id="line-2082"> public CacheConfig getCacheConfig() {</span> |
| <span class="source-line-no">2083</span><span id="line-2083"> return storeContext.getCacheConf();</span> |
| <span class="source-line-no">2084</span><span id="line-2084"> }</span> |
| <span class="source-line-no">2085</span><span id="line-2085"> /** Base size of an HStore instance as estimated by {@code ClassSize.estimateBase}. */</span> |
| <span class="source-line-no">2086</span><span id="line-2086"> public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HStore.class, false);</span> |
| <span class="source-line-no">2087</span><span id="line-2087"> /** {@link #FIXED_OVERHEAD} plus the listed {@code ClassSize} terms, aligned; see heapSize(). */</span> |
| <span class="source-line-no">2088</span><span id="line-2088"> public static final long DEEP_OVERHEAD = ClassSize.align(</span> |
| <span class="source-line-no">2089</span><span id="line-2089"> FIXED_OVERHEAD + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + ClassSize.CONCURRENT_SKIPLISTMAP</span> |
| <span class="source-line-no">2090</span><span id="line-2090"> + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT + ScanInfo.FIXED_OVERHEAD);</span> |
| <span class="source-line-no">2091</span><span id="line-2091"> /** Returns {@link #DEEP_OVERHEAD} plus the memstore heap size and the store context heap size. */</span> |
| <span class="source-line-no">2092</span><span id="line-2092"> @Override</span> |
| <span class="source-line-no">2093</span><span id="line-2093"> public long heapSize() {</span> |
| <span class="source-line-no">2094</span><span id="line-2094"> return DEEP_OVERHEAD + this.memstore.size().getHeapSize()</span> |
| <span class="source-line-no">2095</span><span id="line-2095"> + storeContext.heapSize();</span> |
| <span class="source-line-no">2096</span><span id="line-2096"> }</span> |
| <span class="source-line-no">2097</span><span id="line-2097"> /** Returns the cell comparator provided by this store's context. */</span> |
| <span class="source-line-no">2098</span><span id="line-2098"> @Override</span> |
| <span class="source-line-no">2099</span><span id="line-2099"> public CellComparator getComparator() {</span> |
| <span class="source-line-no">2100</span><span id="line-2100"> return storeContext.getComparator();</span> |
| <span class="source-line-no">2101</span><span id="line-2101"> }</span> |
| <span class="source-line-no">2102</span><span id="line-2102"> /** Returns the scan info currently set on this store. */</span> |
| <span class="source-line-no">2103</span><span id="line-2103"> public ScanInfo getScanInfo() {</span> |
| <span class="source-line-no">2104</span><span id="line-2104"> return scanInfo;</span> |
| <span class="source-line-no">2105</span><span id="line-2105"> }</span> |
| <span class="source-line-no">2106</span><span id="line-2106"></span> |
| <span class="source-line-no">2107</span><span id="line-2107"> /**</span> |
| <span class="source-line-no">2108</span><span id="line-2108"> * Replaces the scan info held by this store. Used by tests.</span> |
| <span class="source-line-no">2109</span><span id="line-2109"> * @param scanInfo new scan info to use for test</span> |
| <span class="source-line-no">2110</span><span id="line-2110"> */</span> |
| <span class="source-line-no">2111</span><span id="line-2111"> void setScanInfo(ScanInfo scanInfo) {</span> |
| <span class="source-line-no">2112</span><span id="line-2112"> this.scanInfo = scanInfo;</span> |
| <span class="source-line-no">2113</span><span id="line-2113"> }</span> |
| <span class="source-line-no">2114</span><span id="line-2114"> /** Returns true when the store file count is strictly above {@code blockingFileCount}. */</span> |
| <span class="source-line-no">2115</span><span id="line-2115"> @Override</span> |
| <span class="source-line-no">2116</span><span id="line-2116"> public boolean hasTooManyStoreFiles() {</span> |
| <span class="source-line-no">2117</span><span id="line-2117"> return getStorefilesCount() > this.blockingFileCount;</span> |
| <span class="source-line-no">2118</span><span id="line-2118"> }</span> |
| <span class="source-line-no">2119</span><span id="line-2119"> /** Returns the current value of {@code flushedCellsCount}. */</span> |
| <span class="source-line-no">2120</span><span id="line-2120"> @Override</span> |
| <span class="source-line-no">2121</span><span id="line-2121"> public long getFlushedCellsCount() {</span> |
| <span class="source-line-no">2122</span><span id="line-2122"> return flushedCellsCount.get();</span> |
| <span class="source-line-no">2123</span><span id="line-2123"> }</span> |
| <span class="source-line-no">2124</span><span id="line-2124"> /** Returns the current value of {@code flushedCellsSize}. */</span> |
| <span class="source-line-no">2125</span><span id="line-2125"> @Override</span> |
| <span class="source-line-no">2126</span><span id="line-2126"> public long getFlushedCellsSize() {</span> |
| <span class="source-line-no">2127</span><span id="line-2127"> return flushedCellsSize.get();</span> |
| <span class="source-line-no">2128</span><span id="line-2128"> }</span> |
| <span class="source-line-no">2129</span><span id="line-2129"> /** Returns the current value of {@code flushedOutputFileSize}. */</span> |
| <span class="source-line-no">2130</span><span id="line-2130"> @Override</span> |
| <span class="source-line-no">2131</span><span id="line-2131"> public long getFlushedOutputFileSize() {</span> |
| <span class="source-line-no">2132</span><span id="line-2132"> return flushedOutputFileSize.get();</span> |
| <span class="source-line-no">2133</span><span id="line-2133"> }</span> |
| <span class="source-line-no">2134</span><span id="line-2134"> /** Returns the current value of {@code compactedCellsCount}. */</span> |
| <span class="source-line-no">2135</span><span id="line-2135"> @Override</span> |
| <span class="source-line-no">2136</span><span id="line-2136"> public long getCompactedCellsCount() {</span> |
| <span class="source-line-no">2137</span><span id="line-2137"> return compactedCellsCount.get();</span> |
| <span class="source-line-no">2138</span><span id="line-2138"> }</span> |
| <span class="source-line-no">2139</span><span id="line-2139"> /** Returns the current value of {@code compactedCellsSize}. */</span> |
| <span class="source-line-no">2140</span><span id="line-2140"> @Override</span> |
| <span class="source-line-no">2141</span><span id="line-2141"> public long getCompactedCellsSize() {</span> |
| <span class="source-line-no">2142</span><span id="line-2142"> return compactedCellsSize.get();</span> |
| <span class="source-line-no">2143</span><span id="line-2143"> }</span> |
| <span class="source-line-no">2144</span><span id="line-2144"> /** Returns the current value of {@code majorCompactedCellsCount}. */</span> |
| <span class="source-line-no">2145</span><span id="line-2145"> @Override</span> |
| <span class="source-line-no">2146</span><span id="line-2146"> public long getMajorCompactedCellsCount() {</span> |
| <span class="source-line-no">2147</span><span id="line-2147"> return majorCompactedCellsCount.get();</span> |
| <span class="source-line-no">2148</span><span id="line-2148"> }</span> |
| <span class="source-line-no">2149</span><span id="line-2149"> /** Returns the current value of {@code majorCompactedCellsSize}. */</span> |
| <span class="source-line-no">2150</span><span id="line-2150"> @Override</span> |
| <span class="source-line-no">2151</span><span id="line-2151"> public long getMajorCompactedCellsSize() {</span> |
| <span class="source-line-no">2152</span><span id="line-2152"> return majorCompactedCellsSize.get();</span> |
| <span class="source-line-no">2153</span><span id="line-2153"> }</span> |
| <span class="source-line-no">2154</span><span id="line-2154"> /** Adds the progress KV count and compacted size to the major or the regular counters. */</span> |
| <span class="source-line-no">2155</span><span id="line-2155"> public void updateCompactedMetrics(boolean isMajor, CompactionProgress progress) {</span> |
| <span class="source-line-no">2156</span><span id="line-2156"> if (!isMajor) {</span> |
| <span class="source-line-no">2157</span><span id="line-2157"> compactedCellsCount.addAndGet(progress.getTotalCompactingKVs());</span> |
| <span class="source-line-no">2158</span><span id="line-2158"> compactedCellsSize.addAndGet(progress.totalCompactedSize);</span> |
| <span class="source-line-no">2159</span><span id="line-2159"> } else {</span> |
| <span class="source-line-no">2160</span><span id="line-2160"> majorCompactedCellsCount.addAndGet(progress.getTotalCompactingKVs());</span> |
| <span class="source-line-no">2161</span><span id="line-2161"> majorCompactedCellsSize.addAndGet(progress.totalCompactedSize);</span> |
| <span class="source-line-no">2162</span><span id="line-2162"> }</span> |
| <span class="source-line-no">2163</span><span id="line-2163"> }</span> |
| <span class="source-line-no">2164</span><span id="line-2164"></span> |
| <span class="source-line-no">2165</span><span id="line-2165"> /**</span> |
| <span class="source-line-no">2166</span><span id="line-2166"> * Returns the StoreEngine that is backing this concrete implementation of Store.</span> |
| <span class="source-line-no">2167</span><span id="line-2167"> * @return the {@link StoreEngine} object used internally inside this HStore object.</span> |
| <span class="source-line-no">2168</span><span id="line-2168"> */</span> |
| <span class="source-line-no">2169</span><span id="line-2169"> public StoreEngine<?, ?, ?, ?> getStoreEngine() {</span> |
| <span class="source-line-no">2170</span><span id="line-2170"> return this.storeEngine;</span> |
| <span class="source-line-no">2171</span><span id="line-2171"> }</span> |
| <span class="source-line-no">2172</span><span id="line-2172"> /** Returns the {@link OffPeakHours} instance currently held by this store. */</span> |
| <span class="source-line-no">2173</span><span id="line-2173"> protected OffPeakHours getOffPeakHours() {</span> |
| <span class="source-line-no">2174</span><span id="line-2174"> return this.offPeakHours;</span> |
| <span class="source-line-no">2175</span><span id="line-2175"> }</span> |
| <span class="source-line-no">2176</span><span id="line-2176"> /** Rebuilds the store configuration from {@code conf} and applies it to this store. */</span> |
| <span class="source-line-no">2177</span><span id="line-2177"> @Override</span> |
| <span class="source-line-no">2178</span><span id="line-2178"> public void onConfigurationChange(Configuration conf) {</span> |
| <span class="source-line-no">2179</span><span id="line-2179"> Configuration refreshed = StoreUtils.createStoreConfiguration(conf,</span> |
| <span class="source-line-no">2180</span><span id="line-2180"> region.getTableDescriptor(), getColumnFamilyDescriptor());</span> |
| <span class="source-line-no">2181</span><span id="line-2181"> this.conf = refreshed;</span> |
| <span class="source-line-no">2182</span><span id="line-2182"> this.storeEngine.compactionPolicy.setConf(refreshed);</span> |
| <span class="source-line-no">2183</span><span id="line-2183"> this.offPeakHours = OffPeakHours.getInstance(refreshed);</span> |
| <span class="source-line-no">2184</span><span id="line-2184"> }</span> |
| <span class="source-line-no">2185</span><span id="line-2185"></span> |
| <span class="source-line-no">2186</span><span id="line-2186"> /**</span> |
| <span class="source-line-no">2187</span><span id="line-2187"> * {@inheritDoc} Registers the store context's cache configuration, when non-null, as observer.</span> |
| <span class="source-line-no">2188</span><span id="line-2188"> */</span> |
| <span class="source-line-no">2189</span><span id="line-2189"> @Override</span> |
| <span class="source-line-no">2190</span><span id="line-2190"> public void registerChildren(ConfigurationManager manager) {</span> |
| <span class="source-line-no">2191</span><span id="line-2191"> CacheConfig cacheConfig = this.storeContext.getCacheConf();</span> |
| <span class="source-line-no">2192</span><span id="line-2192"> if (cacheConfig != null) {</span> |
| <span class="source-line-no">2193</span><span id="line-2193"> manager.registerObserver(cacheConfig);</span> |
| <span class="source-line-no">2194</span><span id="line-2194"> }</span> |
| <span class="source-line-no">2195</span><span id="line-2195"> }</span> |
| <span class="source-line-no">2196</span><span id="line-2196"></span> |
| <span class="source-line-no">2197</span><span id="line-2197"> /**</span> |
| <span class="source-line-no">2198</span><span id="line-2198"> * {@inheritDoc} This implementation does nothing.</span> |
| <span class="source-line-no">2199</span><span id="line-2199"> */</span> |
| <span class="source-line-no">2200</span><span id="line-2200"> @Override</span> |
| <span class="source-line-no">2201</span><span id="line-2201"> public void deregisterChildren(ConfigurationManager manager) {</span> |
| <span class="source-line-no">2202</span><span id="line-2202"> // NOTE(review): no-op, although registerChildren registers the CacheConfig; confirm intended</span> |
| <span class="source-line-no">2203</span><span id="line-2203"> }</span> |
| <span class="source-line-no">2204</span><span id="line-2204"> /** Returns the compaction pressure reported by the store engine's store file manager. */</span> |
| <span class="source-line-no">2205</span><span id="line-2205"> @Override</span> |
| <span class="source-line-no">2206</span><span id="line-2206"> public double getCompactionPressure() {</span> |
| <span class="source-line-no">2207</span><span id="line-2207"> return storeEngine.getStoreFileManager().getCompactionPressure();</span> |
| <span class="source-line-no">2208</span><span id="line-2208"> }</span> |
| <span class="source-line-no">2209</span><span id="line-2209"> /** Returns true when the region's replica id equals {@code RegionInfo.DEFAULT_REPLICA_ID}. */</span> |
| <span class="source-line-no">2210</span><span id="line-2210"> @Override</span> |
| <span class="source-line-no">2211</span><span id="line-2211"> public boolean isPrimaryReplicaStore() {</span> |
| <span class="source-line-no">2212</span><span id="line-2212"> return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;</span> |
| <span class="source-line-no">2213</span><span id="line-2213"> }</span> |
| <span class="source-line-no">2214</span><span id="line-2214"></span> |
| <span class="source-line-no">2215</span><span id="line-2215"> /**</span> |
| <span class="source-line-no">2216</span><span id="line-2216"> * Sets the store up for a region level snapshot operation by acquiring {@code archiveLock}.</span> |
| <span class="source-line-no">2217</span><span id="line-2217"> * @see #postSnapshotOperation()</span> |
| <span class="source-line-no">2218</span><span id="line-2218"> */</span> |
| <span class="source-line-no">2219</span><span id="line-2219"> public void preSnapshotOperation() {</span> |
| <span class="source-line-no">2220</span><span id="line-2220"> archiveLock.lock();</span> |
| <span class="source-line-no">2221</span><span id="line-2221"> }</span> |
| <span class="source-line-no">2222</span><span id="line-2222"></span> |
| <span class="source-line-no">2223</span><span id="line-2223"> /**</span> |
| <span class="source-line-no">2224</span><span id="line-2224"> * Performs the tasks needed after a snapshot operation completes: releases {@code archiveLock}.</span> |
| <span class="source-line-no">2225</span><span id="line-2225"> * @see #preSnapshotOperation()</span> |
| <span class="source-line-no">2226</span><span id="line-2226"> */</span> |
| <span class="source-line-no">2227</span><span id="line-2227"> public void postSnapshotOperation() {</span> |
| <span class="source-line-no">2228</span><span id="line-2228"> archiveLock.unlock();</span> |
| <span class="source-line-no">2229</span><span id="line-2229"> }</span> |
| <span class="source-line-no">2230</span><span id="line-2230"></span> |
| <span class="source-line-no">2231</span><span id="line-2231"> /**</span> |
| <span class="source-line-no">2232</span><span id="line-2232"> * Closes and archives the compacted files under this store while holding {@code archiveLock}.</span> |
| <span class="source-line-no">2233</span><span id="line-2233"> */</span> |
| <span class="source-line-no">2234</span><span id="line-2234"> public synchronized void closeAndArchiveCompactedFiles() throws IOException {</span> |
| <span class="source-line-no">2235</span><span id="line-2235"> // keep other threads from archiving the same files during close()</span> |
| <span class="source-line-no">2236</span><span id="line-2236"> archiveLock.lock();</span> |
| <span class="source-line-no">2237</span><span id="line-2237"> try {</span> |
| <span class="source-line-no">2238</span><span id="line-2238"> storeEngine.readLock();</span> |
| <span class="source-line-no">2239</span><span id="line-2239"> Collection<HStoreFile> toArchive = null;</span> |
| <span class="source-line-no">2240</span><span id="line-2240"> try {</span> |
| <span class="source-line-no">2241</span><span id="line-2241"> Collection<HStoreFile> current =</span> |
| <span class="source-line-no">2242</span><span id="line-2242"> this.getStoreEngine().getStoreFileManager().getCompactedfiles();</span> |
| <span class="source-line-no">2243</span><span id="line-2243"> if (!CollectionUtils.isNotEmpty(current)) {</span> |
| <span class="source-line-no">2244</span><span id="line-2244"> LOG.trace("No compacted files to archive");</span> |
| <span class="source-line-no">2245</span><span id="line-2245"> } else {</span> |
| <span class="source-line-no">2246</span><span id="line-2246"> // Take the copy while the read lock is still held</span> |
| <span class="source-line-no">2247</span><span id="line-2247"> toArchive = new ArrayList<>(current);</span> |
| <span class="source-line-no">2248</span><span id="line-2248"> }</span> |
| <span class="source-line-no">2249</span><span id="line-2249"> } finally {</span> |
| <span class="source-line-no">2250</span><span id="line-2250"> storeEngine.readUnlock();</span> |
| <span class="source-line-no">2251</span><span id="line-2251"> }</span> |
| <span class="source-line-no">2252</span><span id="line-2252"> if (CollectionUtils.isNotEmpty(toArchive)) {</span> |
| <span class="source-line-no">2253</span><span id="line-2253"> removeCompactedfiles(toArchive, true);</span> |
| <span class="source-line-no">2254</span><span id="line-2254"> }</span> |
| <span class="source-line-no">2255</span><span id="line-2255"> } finally {</span> |
| <span class="source-line-no">2256</span><span id="line-2256"> archiveLock.unlock();</span> |
| <span class="source-line-no">2257</span><span id="line-2257"> }</span> |
| <span class="source-line-no">2258</span><span id="line-2258"> }</span> |
| <span class="source-line-no">2259</span><span id="line-2259"></span> |
| <span class="source-line-no">2260</span><span id="line-2260"> /**</span> |
| <span class="source-line-no">2261</span><span id="line-2261"> * Closes and, on the primary replica only, archives compacted files, then clears and reports them</span> |
| <span class="source-line-no">2262</span><span id="line-2262"> * @param compactedfiles The compacted files in this store that are not active in reads</span> |
| <span class="source-line-no">2263</span><span id="line-2263"> * @param evictOnClose true if blocks should be evicted from the cache when an HFile reader is</span> |
| <span class="source-line-no">2264</span><span id="line-2264"> * closed, false if not</span> |
| <span class="source-line-no">2265</span><span id="line-2265"> */</span> |
| <span class="source-line-no">2266</span><span id="line-2266"> private void removeCompactedfiles(Collection<HStoreFile> compactedfiles, boolean evictOnClose)</span> |
| <span class="source-line-no">2267</span><span id="line-2267"> throws IOException {</span> |
| <span class="source-line-no">2268</span><span id="line-2268"> final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size());</span> |
| <span class="source-line-no">2269</span><span id="line-2269"> final List<Long> storeFileSizes = new ArrayList<>(compactedfiles.size());</span> |
| <span class="source-line-no">2270</span><span id="line-2270"> for (final HStoreFile file : compactedfiles) {</span> |
| <span class="source-line-no">2271</span><span id="line-2271"> synchronized (file) {</span> |
| <span class="source-line-no">2272</span><span id="line-2272"> try {</span> |
| <span class="source-line-no">2273</span><span id="line-2273"> StoreFileReader r = file.getReader();</span> |
| <span class="source-line-no">2274</span><span id="line-2274"> if (r == null) {</span> |
| <span class="source-line-no">2275</span><span id="line-2275"> LOG.debug("The file {} was closed but still not archived", file);</span> |
| <span class="source-line-no">2276</span><span id="line-2276"> // HACK: Temporarily re-open the reader so we can get the size of the file. Ideally,</span> |
| <span class="source-line-no">2277</span><span id="line-2277"> // we should know the size of an HStoreFile without having to ask the HStoreFileReader</span> |
| <span class="source-line-no">2278</span><span id="line-2278"> // for that.</span> |
| <span class="source-line-no">2279</span><span id="line-2279"> long length = getStoreFileSize(file);</span> |
| <span class="source-line-no">2280</span><span id="line-2280"> filesToRemove.add(file);</span> |
| <span class="source-line-no">2281</span><span id="line-2281"> storeFileSizes.add(length);</span> |
| <span class="source-line-no">2282</span><span id="line-2282"> continue;</span> |
| <span class="source-line-no">2283</span><span id="line-2283"> }</span> |
| <span class="source-line-no">2284</span><span id="line-2284"></span> |
| <span class="source-line-no">2285</span><span id="line-2285"> if (file.isCompactedAway() && !file.isReferencedInReads()) {</span> |
| <span class="source-line-no">2286</span><span id="line-2286"> // Even if deleting fails we need not bother as any new scanners won't be</span> |
| <span class="source-line-no">2287</span><span id="line-2287"> // able to use the compacted file as the status is already compactedAway</span> |
| <span class="source-line-no">2288</span><span id="line-2288"> LOG.trace("Closing and archiving the file {}", file);</span> |
| <span class="source-line-no">2289</span><span id="line-2289"> // Copy the file size before closing the reader</span> |
| <span class="source-line-no">2290</span><span id="line-2290"> final long length = r.length();</span> |
| <span class="source-line-no">2291</span><span id="line-2291"> r.close(evictOnClose);</span> |
| <span class="source-line-no">2292</span><span id="line-2292"> // The reader is closed; queue the file for archival</span> |
| <span class="source-line-no">2293</span><span id="line-2293"> filesToRemove.add(file);</span> |
| <span class="source-line-no">2294</span><span id="line-2294"> // Only add the length if we successfully added the file to `filesToRemove`</span> |
| <span class="source-line-no">2295</span><span id="line-2295"> storeFileSizes.add(length);</span> |
| <span class="source-line-no">2296</span><span id="line-2296"> } else {</span> |
| <span class="source-line-no">2297</span><span id="line-2297"> LOG.info("Can't archive compacted file " + file.getPath()</span> |
| <span class="source-line-no">2298</span><span id="line-2298"> + " because of either isCompactedAway=" + file.isCompactedAway()</span> |
| <span class="source-line-no">2299</span><span id="line-2299"> + " or file has reference, isReferencedInReads=" + file.isReferencedInReads()</span> |
| <span class="source-line-no">2300</span><span id="line-2300"> + ", refCount=" + r.getRefCount() + ", skipping for now.");</span> |
| <span class="source-line-no">2301</span><span id="line-2301"> }</span> |
| <span class="source-line-no">2302</span><span id="line-2302"> } catch (Exception e) {</span> |
| <span class="source-line-no">2303</span><span id="line-2303"> LOG.error("Exception while trying to close the compacted store file {}", file.getPath(),</span> |
| <span class="source-line-no">2304</span><span id="line-2304"> e);</span> |
| <span class="source-line-no">2305</span><span id="line-2305"> }</span> |
| <span class="source-line-no">2306</span><span id="line-2306"> }</span> |
| <span class="source-line-no">2307</span><span id="line-2307"> }</span> |
| <span class="source-line-no">2308</span><span id="line-2308"> if (this.isPrimaryReplicaStore()) {</span> |
| <span class="source-line-no">2309</span><span id="line-2309"> // Only the primary region is allowed to move the file to archive.</span> |
| <span class="source-line-no">2310</span><span id="line-2310"> // The secondary region does not move the files to archive. Any active reads from</span> |
| <span class="source-line-no">2311</span><span id="line-2311"> // the secondary region will still work because the file as such has active readers on it.</span> |
| <span class="source-line-no">2312</span><span id="line-2312"> if (!filesToRemove.isEmpty()) {</span> |
| <span class="source-line-no">2313</span><span id="line-2313"> LOG.debug("Moving the files {} to archive", filesToRemove);</span> |
| <span class="source-line-no">2314</span><span id="line-2314"> // Files are cleared from the store file manager only once they were archived</span> |
| <span class="source-line-no">2315</span><span id="line-2315"> try {</span> |
| <span class="source-line-no">2316</span><span id="line-2316"> getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),</span> |
| <span class="source-line-no">2317</span><span id="line-2317"> filesToRemove);</span> |
| <span class="source-line-no">2318</span><span id="line-2318"> } catch (FailedArchiveException fae) {</span> |
| <span class="source-line-no">2319</span><span id="line-2319"> // Even if archiving some files failed, we still need to clear out any of the</span> |
| <span class="source-line-no">2320</span><span id="line-2320"> // files which were successfully archived. Otherwise we will receive a</span> |
| <span class="source-line-no">2321</span><span id="line-2321"> // FileNotFoundException when we attempt to re-archive them in the next go around.</span> |
| <span class="source-line-no">2322</span><span id="line-2322"> Collection<Path> failedFiles = fae.getFailedFiles();</span> |
| <span class="source-line-no">2323</span><span id="line-2323"> Iterator<HStoreFile> iter = filesToRemove.iterator();</span> |
| <span class="source-line-no">2324</span><span id="line-2324"> Iterator<Long> sizeIter = storeFileSizes.iterator();</span> |
| <span class="source-line-no">2325</span><span id="line-2325"> while (iter.hasNext()) {</span> |
| <span class="source-line-no">2326</span><span id="line-2326"> sizeIter.next();</span> |
| <span class="source-line-no">2327</span><span id="line-2327"> if (failedFiles.contains(iter.next().getPath())) {</span> |
| <span class="source-line-no">2328</span><span id="line-2328"> iter.remove();</span> |
| <span class="source-line-no">2329</span><span id="line-2329"> sizeIter.remove();</span> |
| <span class="source-line-no">2330</span><span id="line-2330"> }</span> |
| <span class="source-line-no">2331</span><span id="line-2331"> }</span> |
| <span class="source-line-no">2332</span><span id="line-2332"> if (!filesToRemove.isEmpty()) {</span> |
| <span class="source-line-no">2333</span><span id="line-2333"> clearCompactedfiles(filesToRemove);</span> |
| <span class="source-line-no">2334</span><span id="line-2334"> }</span> |
| <span class="source-line-no">2335</span><span id="line-2335"> throw fae;</span> |
| <span class="source-line-no">2336</span><span id="line-2336"> }</span> |
| <span class="source-line-no">2337</span><span id="line-2337"> }</span> |
| <span class="source-line-no">2338</span><span id="line-2338"> }</span> |
| <span class="source-line-no">2339</span><span id="line-2339"> if (!filesToRemove.isEmpty()) {</span> |
| <span class="source-line-no">2340</span><span id="line-2340"> // Clear the compactedfiles from the store file manager</span> |
| <span class="source-line-no">2341</span><span id="line-2341"> clearCompactedfiles(filesToRemove);</span> |
| <span class="source-line-no">2342</span><span id="line-2342"> // Try to send report of this archival to the Master for updating quota usage faster</span> |
| <span class="source-line-no">2343</span><span id="line-2343"> reportArchivedFilesForQuota(filesToRemove, storeFileSizes);</span> |
| <span class="source-line-no">2344</span><span id="line-2344"> }</span> |
| <span class="source-line-no">2345</span><span id="line-2345"> }</span> |
| <span class="source-line-no">2346</span><span id="line-2346"></span> |
| <span class="source-line-no">2347</span><span id="line-2347"> /**</span> |
| <span class="source-line-no">2348</span><span id="line-2348"> * Opens a reader on the given store file to read its length, then closes the store file again.</span> |
| <span class="source-line-no">2349</span><span id="line-2349"> * An {@link IOException} on open or close is only traced; a failed open yields {@code 0}.</span> |
| <span class="source-line-no">2350</span><span id="line-2350"> * @param file The file to compute the size of.</span> |
| <span class="source-line-no">2351</span><span id="line-2351"> * @return The size in bytes of the provided {@code file}, or {@code 0} if it could not be read.</span> |
| <span class="source-line-no">2352</span><span id="line-2352"> */</span> |
| <span class="source-line-no">2353</span><span id="line-2353"> long getStoreFileSize(HStoreFile file) {</span> |
| <span class="source-line-no">2354</span><span id="line-2354"> long size = 0;</span> |
| <span class="source-line-no">2355</span><span id="line-2355"> try {</span> |
| <span class="source-line-no">2356</span><span id="line-2356"> file.initReader();</span> |
| <span class="source-line-no">2357</span><span id="line-2357"> size = file.getReader().length();</span> |
| <span class="source-line-no">2358</span><span id="line-2358"> } catch (IOException openFailure) {</span> |
| <span class="source-line-no">2359</span><span id="line-2359"> LOG.trace("Failed to open reader when trying to compute store file size for {}, ignoring",</span> |
| <span class="source-line-no">2360</span><span id="line-2360"> file, openFailure);</span> |
| <span class="source-line-no">2361</span><span id="line-2361"> } finally {</span> |
| <span class="source-line-no">2362</span><span id="line-2362"> try {</span> |
| <span class="source-line-no">2363</span><span id="line-2363"> file.closeStoreFile(</span> |
| <span class="source-line-no">2364</span><span id="line-2364"> file.getCacheConf() == null || file.getCacheConf().shouldEvictOnClose());</span> |
| <span class="source-line-no">2365</span><span id="line-2365"> } catch (IOException closeFailure) {</span> |
| <span class="source-line-no">2366</span><span id="line-2366"> LOG.trace("Failed to close reader after computing store file size for {}, ignoring", file,</span> |
| <span class="source-line-no">2367</span><span id="line-2367"> closeFailure);</span> |
| <span class="source-line-no">2368</span><span id="line-2368"> }</span> |
| <span class="source-line-no">2369</span><span id="line-2369"> }</span> |
| <span class="source-line-no">2370</span><span id="line-2370"> return size;</span> |
| <span class="source-line-no">2371</span><span id="line-2371"> }</span> |
| <span class="source-line-no">2372</span><span id="line-2372"></span> |
| <span class="source-line-no">2373</span><span id="line-2373"> public Long preFlushSeqIDEstimation() {</span> |
| <span class="source-line-no">2374</span><span id="line-2374"> return this.memstore.preFlushSeqIDEstimation(); // pure delegation to the memstore</span> |
| <span class="source-line-no">2375</span><span id="line-2375"> }</span> |
| <span class="source-line-no">2376</span><span id="line-2376"></span> |
| <span class="source-line-no">2377</span><span id="line-2377"> @Override</span> |
| <span class="source-line-no">2378</span><span id="line-2378"> public boolean isSloppyMemStore() {</span> |
| <span class="source-line-no">2379</span><span id="line-2379"> return memstore.isSloppy(); // answers with whatever the memstore reports</span> |
| <span class="source-line-no">2380</span><span id="line-2380"> }</span> |
| <span class="source-line-no">2381</span><span id="line-2381"></span> |
| <span class="source-line-no">2382</span><span id="line-2382"> private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {</span> |
| <span class="source-line-no">2383</span><span id="line-2383"> LOG.trace("Clearing the compacted file {} from this store", filesToRemove);</span> |
| <span class="source-line-no">2384</span><span id="line-2384"> this.storeEngine.removeCompactedFiles(filesToRemove); // the store engine does the removal</span> |
| <span class="source-line-no">2385</span><span id="line-2385"> }</span> |
| <span class="source-line-no">2386</span><span id="line-2386"></span> |
| <span class="source-line-no">2387</span><span id="line-2387"> /**</span> |
| <span class="source-line-no">2388</span><span id="line-2388"> * Reports archived HFiles of non-zero size to the Master (through the region server) for quota</span> |
| <span class="source-line-no">2389</span><span id="line-2389"> * accounting. The two lists are walked in step; a size mismatch throws a RuntimeException.</span> |
| <span class="source-line-no">2390</span><span id="line-2390"> */</span> |
| <span class="source-line-no">2391</span><span id="line-2391"> void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, List<Long> fileSizes) {</span> |
| <span class="source-line-no">2392</span><span id="line-2392"> if (archivedFiles.size() != fileSizes.size()) {</span> |
| <span class="source-line-no">2393</span><span id="line-2393"> throw new RuntimeException("Coding error: should never see lists of varying size");</span> |
| <span class="source-line-no">2394</span><span id="line-2394"> }</span> |
| <span class="source-line-no">2395</span><span id="line-2395"> RegionServerServices rss = this.region.getRegionServerServices();</span> |
| <span class="source-line-no">2396</span><span id="line-2396"> if (rss == null) { // no region server services available, so there is nobody to report to</span> |
| <span class="source-line-no">2397</span><span id="line-2397"> return;</span> |
| <span class="source-line-no">2398</span><span id="line-2398"> }</span> |
| <span class="source-line-no">2399</span><span id="line-2399"> List<Entry<String, Long>> filesWithSizes = new ArrayList<>(archivedFiles.size());</span> |
| <span class="source-line-no">2400</span><span id="line-2400"> Iterator<Long> fileSizeIter = fileSizes.iterator();</span> |
| <span class="source-line-no">2401</span><span id="line-2401"> for (StoreFile storeFile : archivedFiles) {</span> |
| <span class="source-line-no">2402</span><span id="line-2402"> final long fileSize = fileSizeIter.next();</span> |
| <span class="source-line-no">2403</span><span id="line-2403"> if (storeFile.isHFile() && fileSize != 0) {</span> |
| <span class="source-line-no">2404</span><span id="line-2404"> filesWithSizes.add(Maps.immutableEntry(storeFile.getPath().getName(), fileSize));</span> |
| <span class="source-line-no">2405</span><span id="line-2405"> }</span> |
| <span class="source-line-no">2406</span><span id="line-2406"> }</span> |
| <span class="source-line-no">2407</span><span id="line-2407"> LOG.trace("Files archived: {}, reporting the following to the Master: {}", archivedFiles,</span> |
| <span class="source-line-no">2408</span><span id="line-2408"> filesWithSizes);</span> |
| <span class="source-line-no">2409</span><span id="line-2409"> if (!rss.reportFileArchivalForQuotas(getTableName(), filesWithSizes)) {</span> |
| <span class="source-line-no">2410</span><span id="line-2410"> LOG.warn("Failed to report archival of files: {}", filesWithSizes);</span> |
| <span class="source-line-no">2411</span><span id="line-2411"> }</span> |
| <span class="source-line-no">2412</span><span id="line-2412"> }</span> |
| <span class="source-line-no">2413</span><span id="line-2413"></span> |
| <span class="source-line-no">2414</span><span id="line-2414"> @Override</span> |
| <span class="source-line-no">2415</span><span id="line-2415"> public int getCurrentParallelPutCount() {</span> |
| <span class="source-line-no">2416</span><span id="line-2416"> return this.currentParallelPutCount.get(); // value of the counter at call time</span> |
| <span class="source-line-no">2417</span><span id="line-2417"> }</span> |
| <span class="source-line-no">2418</span><span id="line-2418"></span> |
| <span class="source-line-no">2419</span><span id="line-2419"> public int getStoreRefCount() {</span> |
| <span class="source-line-no">2420</span><span id="line-2420"> // Sum the ref counts of the store files that are HFiles and have a non-null reader.</span> |
| <span class="source-line-no">2421</span><span id="line-2421"> return storeEngine.getStoreFileManager().getStoreFiles().stream()</span> |
| <span class="source-line-no">2422</span><span id="line-2422"> .filter(f -> f.getReader() != null && f.isHFile()).mapToInt(f -> f.getRefCount()).sum();</span> |
| <span class="source-line-no">2423</span><span id="line-2423"> }</span> |
| <span class="source-line-no">2424</span><span id="line-2424"></span> |
| <span class="source-line-no">2425</span><span id="line-2425"> /** Returns the max ref count among this store's compacted HFiles having a reader, 0 if none. */</span> |
| <span class="source-line-no">2426</span><span id="line-2426"> public int getMaxCompactedStoreFileRefCount() {</span> |
| <span class="source-line-no">2427</span><span id="line-2427"> OptionalInt maxRefCount = storeEngine.getStoreFileManager().getCompactedfiles().stream()</span> |
| <span class="source-line-no">2428</span><span id="line-2428"> .filter(f -> f.getReader() != null && f.isHFile()).mapToInt(f -> f.getRefCount()).max();</span> |
| <span class="source-line-no">2429</span><span id="line-2429"> // Empty when no compacted file qualifies, hence the 0 fallback below.</span> |
| <span class="source-line-no">2430</span><span id="line-2430"> return maxRefCount.orElse(0);</span> |
| <span class="source-line-no">2431</span><span id="line-2431"> }</span> |
| <span class="source-line-no">2432</span><span id="line-2432"></span> |
| <span class="source-line-no">2433</span><span id="line-2433"> @Override</span> |
| <span class="source-line-no">2434</span><span id="line-2434"> public long getMemstoreOnlyRowReadsCount() {</span> |
| <span class="source-line-no">2435</span><span id="line-2435"> return this.memstoreOnlyRowReadsCount.sum(); // counter bumped by updateMetricsStore(true)</span> |
| <span class="source-line-no">2436</span><span id="line-2436"> }</span> |
| <span class="source-line-no">2437</span><span id="line-2437"></span> |
| <span class="source-line-no">2438</span><span id="line-2438"> @Override</span> |
| <span class="source-line-no">2439</span><span id="line-2439"> public long getMixedRowReadsCount() {</span> |
| <span class="source-line-no">2440</span><span id="line-2440"> return this.mixedRowReadsCount.sum(); // counter bumped by updateMetricsStore(false)</span> |
| <span class="source-line-no">2441</span><span id="line-2441"> }</span> |
| <span class="source-line-no">2442</span><span id="line-2442"></span> |
| <span class="source-line-no">2443</span><span id="line-2443"> @Override</span> |
| <span class="source-line-no">2444</span><span id="line-2444"> public Configuration getReadOnlyConfiguration() {</span> |
| <span class="source-line-no">2445</span><span id="line-2445"> return new ReadOnlyConfiguration(conf); // wraps the store's conf anew on each call</span> |
| <span class="source-line-no">2446</span><span id="line-2446"> }</span> |
| <span class="source-line-no">2447</span><span id="line-2447"></span> |
| <span class="source-line-no">2448</span><span id="line-2448"> void updateMetricsStore(boolean memstoreRead) {</span> |
| <span class="source-line-no">2449</span><span id="line-2449"> // Exactly one of the two row-read counters is bumped per call.</span> |
| <span class="source-line-no">2450</span><span id="line-2450"> if (!memstoreRead) {</span> |
| <span class="source-line-no">2451</span><span id="line-2451"> mixedRowReadsCount.increment();</span> |
| <span class="source-line-no">2452</span><span id="line-2452"> return;</span> |
| <span class="source-line-no">2453</span><span id="line-2453"> }</span> |
| <span class="source-line-no">2454</span><span id="line-2454"> memstoreOnlyRowReadsCount.increment();</span> |
| <span class="source-line-no">2455</span><span id="line-2455"> }</span> |
| <span class="source-line-no">2455</span><span id="line-2455"></span> |
| <span class="source-line-no">2456</span><span id="line-2456"> /**</span> |
| <span class="source-line-no">2457</span><span id="line-2457"> * Return the storefiles which are currently being written to, as collected from all of the</span> |
| <span class="source-line-no">2458</span><span id="line-2458"> * {@code storeFileWriterCreationTrackers}. Mainly used by {@link BrokenStoreFileCleaner} to</span> |
| <span class="source-line-no">2459</span><span id="line-2459"> * prevent deleting these files, as they are not present in SFT yet.</span> |
| <span class="source-line-no">2460</span><span id="line-2460"> */</span> |
| <span class="source-line-no">2461</span><span id="line-2461"> public Set<Path> getStoreFilesBeingWritten() {</span> |
| <span class="source-line-no">2462</span><span id="line-2462"> return this.storeFileWriterCreationTrackers.stream()</span> |
| <span class="source-line-no">2463</span><span id="line-2463"> .flatMap(tracker -> tracker.get().stream()).collect(Collectors.toSet());</span> |
| <span class="source-line-no">2464</span><span id="line-2464"> }</span> |
| <span class="source-line-no">2465</span><span id="line-2465"></span> |
| <span class="source-line-no">2466</span><span id="line-2466"> @Override</span> |
| <span class="source-line-no">2467</span><span id="line-2467"> public long getBloomFilterRequestsCount() {</span> |
| <span class="source-line-no">2468</span><span id="line-2468"> return this.storeEngine.getBloomFilterMetrics().getRequestsCount(); // from bloom metrics</span> |
| <span class="source-line-no">2469</span><span id="line-2469"> }</span> |
| <span class="source-line-no">2470</span><span id="line-2470"></span> |
| <span class="source-line-no">2471</span><span id="line-2471"> @Override</span> |
| <span class="source-line-no">2472</span><span id="line-2472"> public long getBloomFilterNegativeResultsCount() {</span> |
| <span class="source-line-no">2473</span><span id="line-2473"> return this.storeEngine.getBloomFilterMetrics().getNegativeResultsCount(); // bloom metrics</span> |
| <span class="source-line-no">2474</span><span id="line-2474"> }</span> |
| <span class="source-line-no">2475</span><span id="line-2475"></span> |
| <span class="source-line-no">2476</span><span id="line-2476"> @Override</span> |
| <span class="source-line-no">2477</span><span id="line-2477"> public long getBloomFilterEligibleRequestsCount() {</span> |
| <span class="source-line-no">2478</span><span id="line-2478"> return this.storeEngine.getBloomFilterMetrics().getEligibleRequestsCount(); // bloom metrics</span> |
| <span class="source-line-no">2479</span><span id="line-2479"> }</span> |
| <span class="source-line-no">2480</span><span id="line-2480">}</span> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| </pre> |
| </div> |
| </main> |
| </body> |
| </html> |