| <!DOCTYPE HTML> |
| <html lang="en"> |
| <head> |
| <!-- Generated by javadoc (17) --> |
| <title>Source code</title> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <meta name="description" content="source: package: org.apache.hadoop.hbase.regionserver.wal, class: SequenceIdAccounting"> |
| <meta name="generator" content="javadoc/SourceToHTMLConverter"> |
| <link rel="stylesheet" type="text/css" href="../../../../../../../stylesheet.css" title="Style"> |
| </head> |
| <body class="source-page"> |
| <main role="main"> |
| <div class="source-container"> |
| <pre><span class="source-line-no">001</span><span id="line-1">/*</span> |
| <span class="source-line-no">002</span><span id="line-2"> * Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="source-line-no">003</span><span id="line-3"> * or more contributor license agreements. See the NOTICE file</span> |
| <span class="source-line-no">004</span><span id="line-4"> * distributed with this work for additional information</span> |
| <span class="source-line-no">005</span><span id="line-5"> * regarding copyright ownership. The ASF licenses this file</span> |
| <span class="source-line-no">006</span><span id="line-6"> * to you under the Apache License, Version 2.0 (the</span> |
| <span class="source-line-no">007</span><span id="line-7"> * "License"); you may not use this file except in compliance</span> |
| <span class="source-line-no">008</span><span id="line-8"> * with the License. You may obtain a copy of the License at</span> |
| <span class="source-line-no">009</span><span id="line-9"> *</span> |
| <span class="source-line-no">010</span><span id="line-10"> * http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="source-line-no">011</span><span id="line-11"> *</span> |
| <span class="source-line-no">012</span><span id="line-12"> * Unless required by applicable law or agreed to in writing, software</span> |
| <span class="source-line-no">013</span><span id="line-13"> * distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="source-line-no">014</span><span id="line-14"> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="source-line-no">015</span><span id="line-15"> * See the License for the specific language governing permissions and</span> |
| <span class="source-line-no">016</span><span id="line-16"> * limitations under the License.</span> |
| <span class="source-line-no">017</span><span id="line-17"> */</span> |
| <span class="source-line-no">018</span><span id="line-18">package org.apache.hadoop.hbase.regionserver.wal;</span> |
| <span class="source-line-no">019</span><span id="line-19"></span> |
| <span class="source-line-no">020</span><span id="line-20">import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;</span> |
| <span class="source-line-no">021</span><span id="line-21"></span> |
| <span class="source-line-no">022</span><span id="line-22">import com.google.errorprone.annotations.RestrictedApi;</span> |
| <span class="source-line-no">023</span><span id="line-23">import java.util.ArrayList;</span> |
| <span class="source-line-no">024</span><span id="line-24">import java.util.Collection;</span> |
| <span class="source-line-no">025</span><span id="line-25">import java.util.Collections;</span> |
| <span class="source-line-no">026</span><span id="line-26">import java.util.HashMap;</span> |
| <span class="source-line-no">027</span><span id="line-27">import java.util.List;</span> |
| <span class="source-line-no">028</span><span id="line-28">import java.util.Map;</span> |
| <span class="source-line-no">029</span><span id="line-29">import java.util.Set;</span> |
| <span class="source-line-no">030</span><span id="line-30">import java.util.TreeMap;</span> |
| <span class="source-line-no">031</span><span id="line-31">import java.util.concurrent.ConcurrentHashMap;</span> |
| <span class="source-line-no">032</span><span id="line-32">import java.util.concurrent.ConcurrentMap;</span> |
| <span class="source-line-no">033</span><span id="line-33">import java.util.stream.Collectors;</span> |
| <span class="source-line-no">034</span><span id="line-34">import org.apache.hadoop.hbase.HConstants;</span> |
| <span class="source-line-no">035</span><span id="line-35">import org.apache.hadoop.hbase.util.Bytes;</span> |
| <span class="source-line-no">036</span><span id="line-36">import org.apache.hadoop.hbase.util.ImmutableByteArray;</span> |
| <span class="source-line-no">037</span><span id="line-37">import org.apache.yetus.audience.InterfaceAudience;</span> |
| <span class="source-line-no">038</span><span id="line-38">import org.slf4j.Logger;</span> |
| <span class="source-line-no">039</span><span id="line-39">import org.slf4j.LoggerFactory;</span> |
| <span class="source-line-no">040</span><span id="line-40"></span> |
| <span class="source-line-no">041</span><span id="line-41">/**</span> |
| <span class="source-line-no">042</span><span id="line-42"> * Accounting of sequence ids per region and then by column family. So we can keep our accounting</span> |
| <span class="source-line-no">043</span><span id="line-43"> * current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can</span> |
| <span class="source-line-no">044</span><span id="line-44"> * keep abreast of the state of sequence id persistence. Also call update per append.</span> |
| <span class="source-line-no">045</span><span id="line-45"> * <p></span> |
| <span class="source-line-no">046</span><span id="line-46"> * For the implementation, we assume that all the {@code encodedRegionName} passed in are gotten by</span> |
| <span class="source-line-no">047</span><span id="line-47"> * {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use</span> |
| <span class="source-line-no">048</span><span id="line-48"> * it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because</span> |
| <span class="source-line-no">049</span><span id="line-49"> * hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See</span> |
| <span class="source-line-no">050</span><span id="line-50"> * HBASE-16278 for more details.</span> |
| <span class="source-line-no">051</span><span id="line-51"> * </p></span> |
| <span class="source-line-no">052</span><span id="line-52"> */</span> |
| <span class="source-line-no">053</span><span id="line-53">@InterfaceAudience.Private</span> |
| <span class="source-line-no">054</span><span id="line-54">public class SequenceIdAccounting {</span> |
| <span class="source-line-no">055</span><span id="line-55"> private static final Logger LOG = LoggerFactory.getLogger(SequenceIdAccounting.class);</span> |
| <span class="source-line-no">056</span><span id="line-56"></span> |
| <span class="source-line-no">057</span><span id="line-57"> /**</span> |
| <span class="source-line-no">058</span><span id="line-58"> * This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and</span> |
| <span class="source-line-no">059</span><span id="line-59"> * {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the lowest</span> |
| <span class="source-line-no">060</span><span id="line-60"> * outstanding sequence ids EXCEPT when flushing. When we flush, the current lowest set for the</span> |
| <span class="source-line-no">061</span><span id="line-61"> * region/column family are moved (atomically because of this lock) to</span> |
| <span class="source-line-no">062</span><span id="line-62"> * {@link #flushingSequenceIds}.</span> |
| <span class="source-line-no">063</span><span id="line-63"> * <p></span> |
| <span class="source-line-no">064</span><span id="line-64"> * The two Maps are tied by this locking object EXCEPT when we go to update the lowest entry; see</span> |
| <span class="source-line-no">065</span><span id="line-65"> * {@link #lowestUnflushedSequenceIds}. In here is a putIfAbsent call on</span> |
| <span class="source-line-no">066</span><span id="line-66"> * {@link #lowestUnflushedSequenceIds}. In this latter case, we will add this lowest sequence id</span> |
| <span class="source-line-no">067</span><span id="line-67"> * if we find that there is no entry for the current column family. There will be no entry only if</span> |
| <span class="source-line-no">068</span><span id="line-68"> * we just came up OR we have moved aside current set of lowest sequence ids because the current</span> |
| <span class="source-line-no">069</span><span id="line-69"> * set are being flushed (by putting them into {@link #flushingSequenceIds}). This is how we pick</span> |
| <span class="source-line-no">070</span><span id="line-70"> * up the next 'lowest' sequence id per region per column family to be used figuring what is in</span> |
| <span class="source-line-no">071</span><span id="line-71"> * the next flush.</span> |
| <span class="source-line-no">072</span><span id="line-72"> */</span> |
| <span class="source-line-no">073</span><span id="line-73"> private final Object tieLock = new Object();</span> |
| <span class="source-line-no">074</span><span id="line-74"></span> |
| <span class="source-line-no">075</span><span id="line-75"> /**</span> |
| <span class="source-line-no">076</span><span id="line-76"> * Map of encoded region names and family names to their OLDEST -- i.e. their first, the</span> |
| <span class="source-line-no">077</span><span id="line-77"> * longest-lived, their 'earliest', the 'lowest' -- sequence id.</span> |
| <span class="source-line-no">078</span><span id="line-78"> * <p></span> |
| <span class="source-line-no">079</span><span id="line-79"> * When we flush, the current lowest sequence ids get cleared and added to</span> |
| <span class="source-line-no">080</span><span id="line-80"> * {@link #flushingSequenceIds}. The next append that comes in, is then added here to</span> |
| <span class="source-line-no">081</span><span id="line-81"> * {@link #lowestUnflushedSequenceIds} as the next lowest sequenceid.</span> |
| <span class="source-line-no">082</span><span id="line-82"> * <p></span> |
| <span class="source-line-no">083</span><span id="line-83"> * If flush fails, currently server is aborted so no need to restore previous sequence ids.</span> |
| <span class="source-line-no">084</span><span id="line-84"> * <p></span> |
| <span class="source-line-no">085</span><span id="line-85"> * Needs to be concurrent Maps because we use putIfAbsent updating oldest.</span> |
| <span class="source-line-no">086</span><span id="line-86"> */</span> |
| <span class="source-line-no">087</span><span id="line-87"> private final ConcurrentMap<byte[],</span> |
| <span class="source-line-no">088</span><span id="line-88"> ConcurrentMap<ImmutableByteArray, Long>> lowestUnflushedSequenceIds = new ConcurrentHashMap<>();</span> |
| <span class="source-line-no">089</span><span id="line-89"></span> |
| <span class="source-line-no">090</span><span id="line-90"> /**</span> |
| <span class="source-line-no">091</span><span id="line-91"> * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id</span> |
| <span class="source-line-no">092</span><span id="line-92"> * currently being flushed out to hfiles. Entries are moved here from</span> |
| <span class="source-line-no">093</span><span id="line-93"> * {@link #lowestUnflushedSequenceIds} while the lock {@link #tieLock} is held (so movement</span> |
| <span class="source-line-no">094</span><span id="line-94"> * between the Maps is atomic).</span> |
| <span class="source-line-no">095</span><span id="line-95"> */</span> |
| <span class="source-line-no">096</span><span id="line-96"> private final Map<byte[], Map<ImmutableByteArray, Long>> flushingSequenceIds = new HashMap<>();</span> |
| <span class="source-line-no">097</span><span id="line-97"></span> |
| <span class="source-line-no">098</span><span id="line-98"> /**</span> |
| <span class="source-line-no">099</span><span id="line-99"> * <p></span> |
| <span class="source-line-no">100</span><span id="line-100"> * Map of region encoded names to the latest/highest region sequence id. Updated on each call to</span> |
| <span class="source-line-no">101</span><span id="line-101"> * append.</span> |
| <span class="source-line-no">102</span><span id="line-102"> * </p></span> |
| <span class="source-line-no">103</span><span id="line-103"> * <p></span> |
| <span class="source-line-no">104</span><span id="line-104"> * This map uses byte[] as the key, and uses reference equality. It works in our use case as we</span> |
| <span class="source-line-no">105</span><span id="line-105"> * use {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()} as keys. For a</span> |
| <span class="source-line-no">106</span><span id="line-106"> * given region, it always returns the same array.</span> |
| <span class="source-line-no">107</span><span id="line-107"> * </p></span> |
| <span class="source-line-no">108</span><span id="line-108"> */</span> |
| <span class="source-line-no">109</span><span id="line-109"> private Map<byte[], Long> highestSequenceIds = new HashMap<>();</span> |
| <span class="source-line-no">110</span><span id="line-110"></span> |
| <span class="source-line-no">111</span><span id="line-111"> /**</span> |
| <span class="source-line-no">112</span><span id="line-112"> * Returns the lowest unflushed sequence id for the region.</span> |
| <span class="source-line-no">113</span><span id="line-113"> * @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will return</span> |
| <span class="source-line-no">114</span><span id="line-114"> * {@link HConstants#NO_SEQNUM} when none.</span> |
| <span class="source-line-no">115</span><span id="line-115"> */</span> |
| <span class="source-line-no">116</span><span id="line-116"> @RestrictedApi(explanation = "Should only be called in tests", link = "",</span> |
| <span class="source-line-no">117</span><span id="line-117"> allowedOnPath = ".*/src/test/.*")</span> |
| <span class="source-line-no">118</span><span id="line-118"> public long getLowestSequenceId(final byte[] encodedRegionName) {</span> |
| <span class="source-line-no">119</span><span id="line-119"> synchronized (this.tieLock) {</span> |
| <span class="source-line-no">120</span><span id="line-120"> Map<?, Long> m = this.flushingSequenceIds.get(encodedRegionName);</span> |
| <span class="source-line-no">121</span><span id="line-121"> long flushingLowest = m != null ? getLowestSequenceId(m) : Long.MAX_VALUE;</span> |
| <span class="source-line-no">122</span><span id="line-122"> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);</span> |
| <span class="source-line-no">123</span><span id="line-123"> long unflushedLowest = m != null ? getLowestSequenceId(m) : HConstants.NO_SEQNUM;</span> |
| <span class="source-line-no">124</span><span id="line-124"> return Math.min(flushingLowest, unflushedLowest);</span> |
| <span class="source-line-no">125</span><span id="line-125"> }</span> |
| <span class="source-line-no">126</span><span id="line-126"> }</span> |
| <span class="source-line-no">127</span><span id="line-127"></span> |
| <span class="source-line-no">128</span><span id="line-128"> /**</span> |
| <span class="source-line-no">129</span><span id="line-129"> * @return Lowest outstanding unflushed sequenceid for <code>encodedRegionname</code> and</span> |
| <span class="source-line-no">130</span><span id="line-130"> * <code>familyName</code>. Returned sequenceid may be for an edit currently being</span> |
| <span class="source-line-no">131</span><span id="line-131"> * flushed.</span> |
| <span class="source-line-no">132</span><span id="line-132"> */</span> |
| <span class="source-line-no">133</span><span id="line-133"> long getLowestSequenceId(final byte[] encodedRegionName, final byte[] familyName) {</span> |
| <span class="source-line-no">134</span><span id="line-134"> ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);</span> |
| <span class="source-line-no">135</span><span id="line-135"> synchronized (this.tieLock) {</span> |
| <span class="source-line-no">136</span><span id="line-136"> Map<ImmutableByteArray, Long> m = this.flushingSequenceIds.get(encodedRegionName);</span> |
| <span class="source-line-no">137</span><span id="line-137"> if (m != null) {</span> |
| <span class="source-line-no">138</span><span id="line-138"> Long lowest = m.get(familyNameWrapper);</span> |
| <span class="source-line-no">139</span><span id="line-139"> if (lowest != null) {</span> |
| <span class="source-line-no">140</span><span id="line-140"> return lowest;</span> |
| <span class="source-line-no">141</span><span id="line-141"> }</span> |
| <span class="source-line-no">142</span><span id="line-142"> }</span> |
| <span class="source-line-no">143</span><span id="line-143"> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);</span> |
| <span class="source-line-no">144</span><span id="line-144"> if (m != null) {</span> |
| <span class="source-line-no">145</span><span id="line-145"> Long lowest = m.get(familyNameWrapper);</span> |
| <span class="source-line-no">146</span><span id="line-146"> if (lowest != null) {</span> |
| <span class="source-line-no">147</span><span id="line-147"> return lowest;</span> |
| <span class="source-line-no">148</span><span id="line-148"> }</span> |
| <span class="source-line-no">149</span><span id="line-149"> }</span> |
| <span class="source-line-no">150</span><span id="line-150"> }</span> |
| <span class="source-line-no">151</span><span id="line-151"> return HConstants.NO_SEQNUM;</span> |
| <span class="source-line-no">152</span><span id="line-152"> }</span> |
| <span class="source-line-no">153</span><span id="line-153"></span> |
| <span class="source-line-no">154</span><span id="line-154"> /**</span> |
| <span class="source-line-no">155</span><span id="line-155"> * Reset the accounting of highest sequenceid by regionname.</span> |
| <span class="source-line-no">156</span><span id="line-156"> * @return Return the previous accounting Map of regions to the last sequence id written into</span> |
| <span class="source-line-no">157</span><span id="line-157"> * each.</span> |
| <span class="source-line-no">158</span><span id="line-158"> */</span> |
| <span class="source-line-no">159</span><span id="line-159"> Map<byte[], Long> resetHighest() {</span> |
| <span class="source-line-no">160</span><span id="line-160"> Map<byte[], Long> old = this.highestSequenceIds;</span> |
| <span class="source-line-no">161</span><span id="line-161"> this.highestSequenceIds = new HashMap<>();</span> |
| <span class="source-line-no">162</span><span id="line-162"> return old;</span> |
| <span class="source-line-no">163</span><span id="line-163"> }</span> |
| <span class="source-line-no">164</span><span id="line-164"></span> |
| <span class="source-line-no">165</span><span id="line-165"> /**</span> |
| <span class="source-line-no">166</span><span id="line-166"> * We've been passed a new sequenceid for the region. Set it as highest seen for this region and</span> |
| <span class="source-line-no">167</span><span id="line-167"> * if we are to record oldest, or lowest sequenceids, save it as oldest seen if nothing currently</span> |
| <span class="source-line-no">168</span><span id="line-168"> * older.</span> |
| <span class="source-line-no">169</span><span id="line-169"> * @param lowest Whether to keep running account of oldest sequence id.</span> |
| <span class="source-line-no">170</span><span id="line-170"> */</span> |
| <span class="source-line-no">171</span><span id="line-171"> void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,</span> |
| <span class="source-line-no">172</span><span id="line-172"> final boolean lowest) {</span> |
| <span class="source-line-no">173</span><span id="line-173"> Long l = Long.valueOf(sequenceid);</span> |
| <span class="source-line-no">174</span><span id="line-174"> this.highestSequenceIds.put(encodedRegionName, l);</span> |
| <span class="source-line-no">175</span><span id="line-175"> if (lowest) {</span> |
| <span class="source-line-no">176</span><span id="line-176"> ConcurrentMap<ImmutableByteArray, Long> m = getOrCreateLowestSequenceIds(encodedRegionName);</span> |
| <span class="source-line-no">177</span><span id="line-177"> for (byte[] familyName : families) {</span> |
| <span class="source-line-no">178</span><span id="line-178"> m.putIfAbsent(ImmutableByteArray.wrap(familyName), l);</span> |
| <span class="source-line-no">179</span><span id="line-179"> }</span> |
| <span class="source-line-no">180</span><span id="line-180"> }</span> |
| <span class="source-line-no">181</span><span id="line-181"> }</span> |
| <span class="source-line-no">182</span><span id="line-182"></span> |
| <span class="source-line-no">183</span><span id="line-183"> /**</span> |
| <span class="source-line-no">184</span><span id="line-184"> * Clear all the records of the given region as it is going to be closed.</span> |
| <span class="source-line-no">185</span><span id="line-185"> * <p/></span> |
| <span class="source-line-no">186</span><span id="line-186"> * We will call this once we get the region close marker. We need this because that, if we use</span> |
| <span class="source-line-no">187</span><span id="line-187"> * Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries</span> |
| <span class="source-line-no">188</span><span id="line-188"> * that has not been processed yet, this will lead to orphan records in the</span> |
| <span class="source-line-no">189</span><span id="line-189"> * lowestUnflushedSequenceIds and then cause too many WAL files.</span> |
| <span class="source-line-no">190</span><span id="line-190"> * <p/></span> |
| <span class="source-line-no">191</span><span id="line-191"> * See HBASE-23157 for more details.</span> |
| <span class="source-line-no">192</span><span id="line-192"> */</span> |
| <span class="source-line-no">193</span><span id="line-193"> void onRegionClose(byte[] encodedRegionName) {</span> |
| <span class="source-line-no">194</span><span id="line-194"> synchronized (tieLock) {</span> |
| <span class="source-line-no">195</span><span id="line-195"> this.lowestUnflushedSequenceIds.remove(encodedRegionName);</span> |
| <span class="source-line-no">196</span><span id="line-196"> Map<ImmutableByteArray, Long> flushing = this.flushingSequenceIds.remove(encodedRegionName);</span> |
| <span class="source-line-no">197</span><span id="line-197"> if (flushing != null) {</span> |
| <span class="source-line-no">198</span><span id="line-198"> LOG.warn("Still have flushing records when closing {}, {}",</span> |
| <span class="source-line-no">199</span><span id="line-199"> Bytes.toString(encodedRegionName),</span> |
| <span class="source-line-no">200</span><span id="line-200"> flushing.entrySet().stream().map(e -> e.getKey().toString() + "->" + e.getValue())</span> |
| <span class="source-line-no">201</span><span id="line-201"> .collect(Collectors.joining(",", "{", "}")));</span> |
| <span class="source-line-no">202</span><span id="line-202"> }</span> |
| <span class="source-line-no">203</span><span id="line-203"> }</span> |
| <span class="source-line-no">204</span><span id="line-204"> this.highestSequenceIds.remove(encodedRegionName);</span> |
| <span class="source-line-no">205</span><span id="line-205"> }</span> |
| <span class="source-line-no">206</span><span id="line-206"></span> |
| <span class="source-line-no">207</span><span id="line-207"> /**</span> |
| <span class="source-line-no">208</span><span id="line-208"> * Update the store sequence id, e.g., upon executing in-memory compaction</span> |
| <span class="source-line-no">209</span><span id="line-209"> */</span> |
| <span class="source-line-no">210</span><span id="line-210"> void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceId,</span> |
| <span class="source-line-no">211</span><span id="line-211"> boolean onlyIfGreater) {</span> |
| <span class="source-line-no">212</span><span id="line-212"> if (sequenceId == null) {</span> |
| <span class="source-line-no">213</span><span id="line-213"> return;</span> |
| <span class="source-line-no">214</span><span id="line-214"> }</span> |
| <span class="source-line-no">215</span><span id="line-215"> Long highest = this.highestSequenceIds.get(encodedRegionName);</span> |
| <span class="source-line-no">216</span><span id="line-216"> if (highest == null || sequenceId > highest) {</span> |
| <span class="source-line-no">217</span><span id="line-217"> this.highestSequenceIds.put(encodedRegionName, sequenceId);</span> |
| <span class="source-line-no">218</span><span id="line-218"> }</span> |
| <span class="source-line-no">219</span><span id="line-219"> ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);</span> |
| <span class="source-line-no">220</span><span id="line-220"> synchronized (this.tieLock) {</span> |
| <span class="source-line-no">221</span><span id="line-221"> ConcurrentMap<ImmutableByteArray, Long> m = getOrCreateLowestSequenceIds(encodedRegionName);</span> |
| <span class="source-line-no">222</span><span id="line-222"> boolean replaced = false;</span> |
| <span class="source-line-no">223</span><span id="line-223"> while (!replaced) {</span> |
| <span class="source-line-no">224</span><span id="line-224"> Long oldSeqId = m.get(familyNameWrapper);</span> |
| <span class="source-line-no">225</span><span id="line-225"> if (oldSeqId == null) {</span> |
| <span class="source-line-no">226</span><span id="line-226"> m.put(familyNameWrapper, sequenceId);</span> |
| <span class="source-line-no">227</span><span id="line-227"> replaced = true;</span> |
| <span class="source-line-no">228</span><span id="line-228"> } else if (onlyIfGreater) {</span> |
| <span class="source-line-no">229</span><span id="line-229"> if (sequenceId > oldSeqId) {</span> |
| <span class="source-line-no">230</span><span id="line-230"> replaced = m.replace(familyNameWrapper, oldSeqId, sequenceId);</span> |
| <span class="source-line-no">231</span><span id="line-231"> } else {</span> |
| <span class="source-line-no">232</span><span id="line-232"> return;</span> |
| <span class="source-line-no">233</span><span id="line-233"> }</span> |
| <span class="source-line-no">234</span><span id="line-234"> } else { // replace even if sequence id is not greater than oldSeqId</span> |
| <span class="source-line-no">235</span><span id="line-235"> m.put(familyNameWrapper, sequenceId);</span> |
| <span class="source-line-no">236</span><span id="line-236"> return;</span> |
| <span class="source-line-no">237</span><span id="line-237"> }</span> |
| <span class="source-line-no">238</span><span id="line-238"> }</span> |
| <span class="source-line-no">239</span><span id="line-239"> }</span> |
| <span class="source-line-no">240</span><span id="line-240"> }</span> |
| <span class="source-line-no">241</span><span id="line-241"></span> |
| <span class="source-line-no">242</span><span id="line-242"> ConcurrentMap<ImmutableByteArray, Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) {</span> |
| <span class="source-line-no">243</span><span id="line-243"> // Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append.</span> |
| <span class="source-line-no">244</span><span id="line-244"> return computeIfAbsent(this.lowestUnflushedSequenceIds, encodedRegionName,</span> |
| <span class="source-line-no">245</span><span id="line-245"> ConcurrentHashMap::new);</span> |
| <span class="source-line-no">246</span><span id="line-246"> }</span> |
| <span class="source-line-no">247</span><span id="line-247"></span> |
| <span class="source-line-no">248</span><span id="line-248"> /**</span> |
| <span class="source-line-no">249</span><span id="line-249"> * @param sequenceids Map to search for lowest value.</span> |
| <span class="source-line-no">250</span><span id="line-250"> * @return Lowest value found in <code>sequenceids</code>.</span> |
| <span class="source-line-no">251</span><span id="line-251"> */</span> |
| <span class="source-line-no">252</span><span id="line-252"> private static long getLowestSequenceId(Map<?, Long> sequenceids) {</span> |
| <span class="source-line-no">253</span><span id="line-253"> long lowest = HConstants.NO_SEQNUM;</span> |
| <span class="source-line-no">254</span><span id="line-254"> for (Map.Entry<?, Long> entry : sequenceids.entrySet()) {</span> |
| <span class="source-line-no">255</span><span id="line-255"> if (entry.getKey().toString().equals("METAFAMILY")) {</span> |
| <span class="source-line-no">256</span><span id="line-256"> continue;</span> |
| <span class="source-line-no">257</span><span id="line-257"> }</span> |
| <span class="source-line-no">258</span><span id="line-258"> Long sid = entry.getValue();</span> |
| <span class="source-line-no">259</span><span id="line-259"> if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) {</span> |
| <span class="source-line-no">260</span><span id="line-260"> lowest = sid.longValue();</span> |
| <span class="source-line-no">261</span><span id="line-261"> }</span> |
| <span class="source-line-no">262</span><span id="line-262"> }</span> |
| <span class="source-line-no">263</span><span id="line-263"> return lowest;</span> |
| <span class="source-line-no">264</span><span id="line-264"> }</span> |
| <span class="source-line-no">265</span><span id="line-265"></span> |
| <span class="source-line-no">266</span><span id="line-266"> /**</span> |
| <span class="source-line-no">267</span><span id="line-267"> * @return New Map that has same keys as <code>src</code> but instead of a Map for a value, it</span> |
| <span class="source-line-no">268</span><span id="line-268"> * instead has found the smallest sequence id and it returns that as the value instead.</span> |
| <span class="source-line-no">269</span><span id="line-269"> */</span> |
| <span class="source-line-no">270</span><span id="line-270"> private <T extends Map<?, Long>> Map<byte[], Long> flattenToLowestSequenceId(Map<byte[], T> src) {</span> |
| <span class="source-line-no">271</span><span id="line-271"> if (src == null || src.isEmpty()) {</span> |
| <span class="source-line-no">272</span><span id="line-272"> return null;</span> |
| <span class="source-line-no">273</span><span id="line-273"> }</span> |
| <span class="source-line-no">274</span><span id="line-274"> Map<byte[], Long> tgt = new HashMap<>();</span> |
| <span class="source-line-no">275</span><span id="line-275"> for (Map.Entry<byte[], T> entry : src.entrySet()) {</span> |
| <span class="source-line-no">276</span><span id="line-276"> long lowestSeqId = getLowestSequenceId(entry.getValue());</span> |
| <span class="source-line-no">277</span><span id="line-277"> if (lowestSeqId != HConstants.NO_SEQNUM) {</span> |
| <span class="source-line-no">278</span><span id="line-278"> tgt.put(entry.getKey(), lowestSeqId);</span> |
| <span class="source-line-no">279</span><span id="line-279"> }</span> |
| <span class="source-line-no">280</span><span id="line-280"> }</span> |
| <span class="source-line-no">281</span><span id="line-281"> return tgt;</span> |
| <span class="source-line-no">282</span><span id="line-282"> }</span> |
| <span class="source-line-no">283</span><span id="line-283"></span> |
| <span class="source-line-no">284</span><span id="line-284"> /**</span> |
| <span class="source-line-no">285</span><span id="line-285"> * @param encodedRegionName Region to flush.</span> |
| <span class="source-line-no">286</span><span id="line-286"> * @param families Families to flush. May be a subset of all families in the region.</span> |
| <span class="source-line-no">287</span><span id="line-287"> * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if we are</span> |
| <span class="source-line-no">288</span><span id="line-288"> * flushing a subset of all families but there are no edits in those families not being</span> |
| <span class="source-line-no">289</span><span id="line-289"> * flushed; in other words, this is effectively same as a flush of all of the region</span> |
| <span class="source-line-no">290</span><span id="line-290"> * though we were passed a subset of regions. Otherwise, it returns the sequence id of the</span> |
| <span class="source-line-no">291</span><span id="line-291"> * oldest/lowest outstanding edit.</span> |
| <span class="source-line-no">292</span><span id="line-292"> */</span> |
| <span class="source-line-no">293</span><span id="line-293"> Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {</span> |
| <span class="source-line-no">294</span><span id="line-294"> Map<byte[], Long> familytoSeq = new HashMap<>();</span> |
| <span class="source-line-no">295</span><span id="line-295"> for (byte[] familyName : families) {</span> |
| <span class="source-line-no">296</span><span id="line-296"> familytoSeq.put(familyName, HConstants.NO_SEQNUM);</span> |
| <span class="source-line-no">297</span><span id="line-297"> }</span> |
| <span class="source-line-no">298</span><span id="line-298"> return startCacheFlush(encodedRegionName, familytoSeq);</span> |
| <span class="source-line-no">299</span><span id="line-299"> }</span> |
| <span class="source-line-no">300</span><span id="line-300"></span> |
| <span class="source-line-no">301</span><span id="line-301"> Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> familyToSeq) {</span> |
| <span class="source-line-no">302</span><span id="line-302"> Map<ImmutableByteArray, Long> oldSequenceIds = null;</span> |
| <span class="source-line-no">303</span><span id="line-303"> Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;</span> |
| <span class="source-line-no">304</span><span id="line-304"> synchronized (tieLock) {</span> |
| <span class="source-line-no">305</span><span id="line-305"> Map<ImmutableByteArray, Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);</span> |
| <span class="source-line-no">306</span><span id="line-306"> if (m != null) {</span> |
| <span class="source-line-no">307</span><span id="line-307"> // NOTE: Removal from this.lowestUnflushedSequenceIds must be done in controlled</span> |
| <span class="source-line-no">308</span><span id="line-308"> // circumstance because another concurrent thread now may add sequenceids for this family</span> |
| <span class="source-line-no">309</span><span id="line-309"> // (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it</span> |
| <span class="source-line-no">310</span><span id="line-310"> // is fine because updates are blocked when this method is called. Make sure!!!</span> |
| <span class="source-line-no">311</span><span id="line-311"> for (Map.Entry<byte[], Long> entry : familyToSeq.entrySet()) {</span> |
| <span class="source-line-no">312</span><span id="line-312"> ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap((byte[]) entry.getKey());</span> |
| <span class="source-line-no">313</span><span id="line-313"> Long seqId = null;</span> |
| <span class="source-line-no">314</span><span id="line-314"> if (entry.getValue() == HConstants.NO_SEQNUM) {</span> |
| <span class="source-line-no">315</span><span id="line-315"> seqId = m.remove(familyNameWrapper);</span> |
| <span class="source-line-no">316</span><span id="line-316"> } else {</span> |
| <span class="source-line-no">317</span><span id="line-317"> seqId = m.replace(familyNameWrapper, entry.getValue());</span> |
| <span class="source-line-no">318</span><span id="line-318"> }</span> |
| <span class="source-line-no">319</span><span id="line-319"> if (seqId != null) {</span> |
| <span class="source-line-no">320</span><span id="line-320"> if (oldSequenceIds == null) {</span> |
| <span class="source-line-no">321</span><span id="line-321"> oldSequenceIds = new HashMap<>();</span> |
| <span class="source-line-no">322</span><span id="line-322"> }</span> |
| <span class="source-line-no">323</span><span id="line-323"> oldSequenceIds.put(familyNameWrapper, seqId);</span> |
| <span class="source-line-no">324</span><span id="line-324"> }</span> |
| <span class="source-line-no">325</span><span id="line-325"> }</span> |
| <span class="source-line-no">326</span><span id="line-326"> if (oldSequenceIds != null && !oldSequenceIds.isEmpty()) {</span> |
| <span class="source-line-no">327</span><span id="line-327"> if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) {</span> |
| <span class="source-line-no">328</span><span id="line-328"> LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName)</span> |
| <span class="source-line-no">329</span><span id="line-329"> + ", sequenceid=" + oldSequenceIds);</span> |
| <span class="source-line-no">330</span><span id="line-330"> }</span> |
| <span class="source-line-no">331</span><span id="line-331"> }</span> |
| <span class="source-line-no">332</span><span id="line-332"> if (m.isEmpty()) {</span> |
| <span class="source-line-no">333</span><span id="line-333"> // Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever</span> |
| <span class="source-line-no">334</span><span id="line-334"> // even if the region is already moved to other server.</span> |
| <span class="source-line-no">335</span><span id="line-335"> // Do not worry about data racing, we held write lock of region when calling</span> |
| <span class="source-line-no">336</span><span id="line-336"> // startCacheFlush, so no one can add value to the map we removed.</span> |
| <span class="source-line-no">337</span><span id="line-337"> this.lowestUnflushedSequenceIds.remove(encodedRegionName);</span> |
| <span class="source-line-no">338</span><span id="line-338"> } else {</span> |
| <span class="source-line-no">339</span><span id="line-339"> // Flushing a subset of the region families. Return the sequence id of the oldest entry.</span> |
| <span class="source-line-no">340</span><span id="line-340"> lowestUnflushedInRegion = Collections.min(m.values());</span> |
| <span class="source-line-no">341</span><span id="line-341"> }</span> |
| <span class="source-line-no">342</span><span id="line-342"> }</span> |
| <span class="source-line-no">343</span><span id="line-343"> }</span> |
| <span class="source-line-no">344</span><span id="line-344"> // Do this check outside lock.</span> |
| <span class="source-line-no">345</span><span id="line-345"> if (oldSequenceIds != null && oldSequenceIds.isEmpty()) {</span> |
| <span class="source-line-no">346</span><span id="line-346"> // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either</span> |
| <span class="source-line-no">347</span><span id="line-347"> // the region is already flushing (which would make this call invalid), or there</span> |
| <span class="source-line-no">348</span><span id="line-348"> // were no appends after last flush, so why are we starting flush? Maybe we should</span> |
| <span class="source-line-no">349</span><span id="line-349"> // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.</span> |
| <span class="source-line-no">350</span><span id="line-350"> // For now preserve old logic.</span> |
| <span class="source-line-no">351</span><span id="line-351"> LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName));</span> |
| <span class="source-line-no">352</span><span id="line-352"> }</span> |
| <span class="source-line-no">353</span><span id="line-353"> return lowestUnflushedInRegion;</span> |
| <span class="source-line-no">354</span><span id="line-354"> }</span> |
| <span class="source-line-no">355</span><span id="line-355"></span> |
| <span class="source-line-no">356</span><span id="line-356"> void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {</span> |
| <span class="source-line-no">357</span><span id="line-357"> // This is a simple hack to avoid maxFlushedSeqId go backwards.</span> |
| <span class="source-line-no">358</span><span id="line-358"> // The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going</span> |
| <span class="source-line-no">359</span><span id="line-359"> // to flush all the stores, the maxFlushedSeqId will be next seq id of the region, but we may</span> |
| <span class="source-line-no">360</span><span id="line-360"> // still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush, and</span> |
| <span class="source-line-no">361</span><span id="line-361"> // then it will be recorded as the lowestUnflushedSeqId by the above update method, which is</span> |
| <span class="source-line-no">362</span><span id="line-362"> // less than the current maxFlushedSeqId. And if next time we only flush the family with this</span> |
| <span class="source-line-no">363</span><span id="line-363"> // unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.</span> |
| <span class="source-line-no">364</span><span id="line-364"> // This is an unexpected behavior so we should fix it, otherwise it may cause unexpected</span> |
| <span class="source-line-no">365</span><span id="line-365"> // behavior in other area.</span> |
| <span class="source-line-no">366</span><span id="line-366"> // The solution here is a bit hack but fine. Just replace the lowestUnflushedSeqId with</span> |
| <span class="source-line-no">367</span><span id="line-367"> // maxFlushedSeqId + 1 if it is lesser. The meaning of maxFlushedSeqId is that, all edits less</span> |
| <span class="source-line-no">368</span><span id="line-368"> // than or equal to it have been flushed, i.e, persistent to HFile, so set</span> |
| <span class="source-line-no">369</span><span id="line-369"> // lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.</span> |
| <span class="source-line-no">370</span><span id="line-370"> // And technically, using +1 is fine here. If the maxFlushesSeqId is just the flushOpSeqId, it</span> |
| <span class="source-line-no">371</span><span id="line-371"> // means we have flushed all the stores so the seq id for actual data should be at least plus 1.</span> |
| <span class="source-line-no">372</span><span id="line-372"> // And if we do not flush all the stores, then the maxFlushedSeqId is calculated by</span> |
| <span class="source-line-no">373</span><span id="line-373"> // lowestUnflushedSeqId - 1, so here let's plus the 1 back.</span> |
| <span class="source-line-no">374</span><span id="line-374"> Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);</span> |
| <span class="source-line-no">375</span><span id="line-375"> synchronized (tieLock) {</span> |
| <span class="source-line-no">376</span><span id="line-376"> this.flushingSequenceIds.remove(encodedRegionName);</span> |
| <span class="source-line-no">377</span><span id="line-377"> Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);</span> |
| <span class="source-line-no">378</span><span id="line-378"> if (unflushed == null) {</span> |
| <span class="source-line-no">379</span><span id="line-379"> return;</span> |
| <span class="source-line-no">380</span><span id="line-380"> }</span> |
| <span class="source-line-no">381</span><span id="line-381"> for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {</span> |
| <span class="source-line-no">382</span><span id="line-382"> if (e.getValue().longValue() <= maxFlushedSeqId) {</span> |
| <span class="source-line-no">383</span><span id="line-383"> e.setValue(wrappedSeqId);</span> |
| <span class="source-line-no">384</span><span id="line-384"> }</span> |
| <span class="source-line-no">385</span><span id="line-385"> }</span> |
| <span class="source-line-no">386</span><span id="line-386"> }</span> |
| <span class="source-line-no">387</span><span id="line-387"> }</span> |
| <span class="source-line-no">388</span><span id="line-388"></span> |
| <span class="source-line-no">389</span><span id="line-389"> void abortCacheFlush(final byte[] encodedRegionName) {</span> |
| <span class="source-line-no">390</span><span id="line-390"> // Method is called when we are crashing down because failed write flush AND it is called</span> |
| <span class="source-line-no">391</span><span id="line-391"> // if we fail prepare. The below is for the fail prepare case; we restore the old sequence ids.</span> |
| <span class="source-line-no">392</span><span id="line-392"> Map<ImmutableByteArray, Long> flushing = null;</span> |
| <span class="source-line-no">393</span><span id="line-393"> Map<ImmutableByteArray, Long> tmpMap = new HashMap<>();</span> |
| <span class="source-line-no">394</span><span id="line-394"> // Here we are moving sequenceids from flushing back to unflushed; doing opposite of what</span> |
| <span class="source-line-no">395</span><span id="line-395"> // happened in startCacheFlush. During prepare phase, we have update lock on the region so</span> |
| <span class="source-line-no">396</span><span id="line-396"> // no edits should be coming in via append.</span> |
| <span class="source-line-no">397</span><span id="line-397"> synchronized (tieLock) {</span> |
| <span class="source-line-no">398</span><span id="line-398"> flushing = this.flushingSequenceIds.remove(encodedRegionName);</span> |
| <span class="source-line-no">399</span><span id="line-399"> if (flushing != null) {</span> |
| <span class="source-line-no">400</span><span id="line-400"> Map<ImmutableByteArray, Long> unflushed = getOrCreateLowestSequenceIds(encodedRegionName);</span> |
| <span class="source-line-no">401</span><span id="line-401"> for (Map.Entry<ImmutableByteArray, Long> e : flushing.entrySet()) {</span> |
| <span class="source-line-no">402</span><span id="line-402"> // Set into unflushed the 'old' oldest sequenceid and if any value in flushed with this</span> |
| <span class="source-line-no">403</span><span id="line-403"> // value, it will now be in tmpMap.</span> |
| <span class="source-line-no">404</span><span id="line-404"> tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue()));</span> |
| <span class="source-line-no">405</span><span id="line-405"> }</span> |
| <span class="source-line-no">406</span><span id="line-406"> }</span> |
| <span class="source-line-no">407</span><span id="line-407"> }</span> |
| <span class="source-line-no">408</span><span id="line-408"></span> |
| <span class="source-line-no">409</span><span id="line-409"> // Here we are doing some 'test' to see if edits are going in out of order. What is it for?</span> |
| <span class="source-line-no">410</span><span id="line-410"> // Carried over from old code.</span> |
| <span class="source-line-no">411</span><span id="line-411"> if (flushing != null) {</span> |
| <span class="source-line-no">412</span><span id="line-412"> for (Map.Entry<ImmutableByteArray, Long> e : flushing.entrySet()) {</span> |
| <span class="source-line-no">413</span><span id="line-413"> Long currentId = tmpMap.get(e.getKey());</span> |
| <span class="source-line-no">414</span><span id="line-414"> if (currentId != null && currentId.longValue() < e.getValue().longValue()) {</span> |
| <span class="source-line-no">415</span><span id="line-415"> String errorStr = Bytes.toString(encodedRegionName) + " family " + e.getKey().toString()</span> |
| <span class="source-line-no">416</span><span id="line-416"> + " acquired edits out of order current memstore seq=" + currentId</span> |
| <span class="source-line-no">417</span><span id="line-417"> + ", previous oldest unflushed id=" + e.getValue();</span> |
| <span class="source-line-no">418</span><span id="line-418"> LOG.error(errorStr);</span> |
| <span class="source-line-no">419</span><span id="line-419"> Runtime.getRuntime().halt(1);</span> |
| <span class="source-line-no">420</span><span id="line-420"> }</span> |
| <span class="source-line-no">421</span><span id="line-421"> }</span> |
| <span class="source-line-no">422</span><span id="line-422"> }</span> |
| <span class="source-line-no">423</span><span id="line-423"> }</span> |
| <span class="source-line-no">424</span><span id="line-424"></span> |
| <span class="source-line-no">425</span><span id="line-425"> /**</span> |
| <span class="source-line-no">426</span><span id="line-426"> * See if passed <code>sequenceids</code> are lower -- i.e. earlier -- than any outstanding</span> |
| <span class="source-line-no">427</span><span id="line-427"> * sequenceids, sequenceids we are holding on to in this accounting instance.</span> |
| <span class="source-line-no">428</span><span id="line-428"> * @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make sense for it to</span> |
| <span class="source-line-no">429</span><span id="line-429"> * be null).</span> |
| <span class="source-line-no">430</span><span id="line-430"> * @param keysBlocking An optional collection that is used to return the specific keys that are</span> |
| <span class="source-line-no">431</span><span id="line-431"> * causing this method to return false.</span> |
| <span class="source-line-no">432</span><span id="line-432"> * @return true if all sequenceids are lower, older than, the old sequenceids in this instance.</span> |
| <span class="source-line-no">433</span><span id="line-433"> */</span> |
| <span class="source-line-no">434</span><span id="line-434"> boolean areAllLower(Map<byte[], Long> sequenceids, Collection<byte[]> keysBlocking) {</span> |
| <span class="source-line-no">435</span><span id="line-435"> Map<byte[], Long> flushing = null;</span> |
| <span class="source-line-no">436</span><span id="line-436"> Map<byte[], Long> unflushed = null;</span> |
| <span class="source-line-no">437</span><span id="line-437"> synchronized (this.tieLock) {</span> |
| <span class="source-line-no">438</span><span id="line-438"> // Get a flattened -- only the oldest sequenceid -- copy of current flushing and unflushed</span> |
| <span class="source-line-no">439</span><span id="line-439"> // data structures to use in tests below.</span> |
| <span class="source-line-no">440</span><span id="line-440"> flushing = flattenToLowestSequenceId(this.flushingSequenceIds);</span> |
| <span class="source-line-no">441</span><span id="line-441"> unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds);</span> |
| <span class="source-line-no">442</span><span id="line-442"> }</span> |
| <span class="source-line-no">443</span><span id="line-443"> boolean result = true;</span> |
| <span class="source-line-no">444</span><span id="line-444"> for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {</span> |
| <span class="source-line-no">445</span><span id="line-445"> long oldestFlushing = Long.MAX_VALUE;</span> |
| <span class="source-line-no">446</span><span id="line-446"> long oldestUnflushed = Long.MAX_VALUE;</span> |
| <span class="source-line-no">447</span><span id="line-447"> if (flushing != null && flushing.containsKey(e.getKey())) {</span> |
| <span class="source-line-no">448</span><span id="line-448"> oldestFlushing = flushing.get(e.getKey());</span> |
| <span class="source-line-no">449</span><span id="line-449"> }</span> |
| <span class="source-line-no">450</span><span id="line-450"> if (unflushed != null && unflushed.containsKey(e.getKey())) {</span> |
| <span class="source-line-no">451</span><span id="line-451"> oldestUnflushed = unflushed.get(e.getKey());</span> |
| <span class="source-line-no">452</span><span id="line-452"> }</span> |
| <span class="source-line-no">453</span><span id="line-453"> long min = Math.min(oldestFlushing, oldestUnflushed);</span> |
| <span class="source-line-no">454</span><span id="line-454"> if (min <= e.getValue()) {</span> |
| <span class="source-line-no">455</span><span id="line-455"> if (keysBlocking == null) {</span> |
| <span class="source-line-no">456</span><span id="line-456"> return false;</span> |
| <span class="source-line-no">457</span><span id="line-457"> }</span> |
| <span class="source-line-no">458</span><span id="line-458"> result = false;</span> |
| <span class="source-line-no">459</span><span id="line-459"> keysBlocking.add(e.getKey());</span> |
| <span class="source-line-no">460</span><span id="line-460"> // Continue examining the map so we could log all regions blocking this WAL.</span> |
| <span class="source-line-no">461</span><span id="line-461"> }</span> |
| <span class="source-line-no">462</span><span id="line-462"> }</span> |
| <span class="source-line-no">463</span><span id="line-463"> return result;</span> |
| <span class="source-line-no">464</span><span id="line-464"> }</span> |
| <span class="source-line-no">465</span><span id="line-465"></span> |
| <span class="source-line-no">466</span><span id="line-466"> /**</span> |
| <span class="source-line-no">467</span><span id="line-467"> * Iterates over the given Map and compares sequence ids with corresponding entries in</span> |
| <span class="source-line-no">468</span><span id="line-468"> * {@link #lowestUnflushedSequenceIds}. If a region in {@link #lowestUnflushedSequenceIds} has a</span> |
| <span class="source-line-no">469</span><span id="line-469"> * sequence id less than that passed in <code>sequenceids</code> then return it.</span> |
| <span class="source-line-no">470</span><span id="line-470"> * @param sequenceids Sequenceids keyed by encoded region name.</span> |
| <span class="source-line-no">471</span><span id="line-471"> * @return stores of regions found in this instance with sequence ids less than those passed in.</span> |
| <span class="source-line-no">472</span><span id="line-472"> */</span> |
| <span class="source-line-no">473</span><span id="line-473"> Map<byte[], List<byte[]>> findLower(Map<byte[], Long> sequenceids) {</span> |
| <span class="source-line-no">474</span><span id="line-474"> Map<byte[], List<byte[]>> toFlush = null;</span> |
| <span class="source-line-no">475</span><span id="line-475"> // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.</span> |
| <span class="source-line-no">476</span><span id="line-476"> synchronized (tieLock) {</span> |
| <span class="source-line-no">477</span><span id="line-477"> for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {</span> |
| <span class="source-line-no">478</span><span id="line-478"> Map<ImmutableByteArray, Long> m = this.lowestUnflushedSequenceIds.get(e.getKey());</span> |
| <span class="source-line-no">479</span><span id="line-479"> if (m == null) {</span> |
| <span class="source-line-no">480</span><span id="line-480"> continue;</span> |
| <span class="source-line-no">481</span><span id="line-481"> }</span> |
| <span class="source-line-no">482</span><span id="line-482"> for (Map.Entry<ImmutableByteArray, Long> me : m.entrySet()) {</span> |
| <span class="source-line-no">483</span><span id="line-483"> if (me.getValue() <= e.getValue()) {</span> |
| <span class="source-line-no">484</span><span id="line-484"> if (toFlush == null) {</span> |
| <span class="source-line-no">485</span><span id="line-485"> toFlush = new TreeMap(Bytes.BYTES_COMPARATOR);</span> |
| <span class="source-line-no">486</span><span id="line-486"> }</span> |
| <span class="source-line-no">487</span><span id="line-487"> toFlush.computeIfAbsent(e.getKey(), k -> new ArrayList<>())</span> |
| <span class="source-line-no">488</span><span id="line-488"> .add(Bytes.toBytes(me.getKey().toString()));</span> |
| <span class="source-line-no">489</span><span id="line-489"> }</span> |
| <span class="source-line-no">490</span><span id="line-490"> }</span> |
| <span class="source-line-no">491</span><span id="line-491"> }</span> |
| <span class="source-line-no">492</span><span id="line-492"> }</span> |
| <span class="source-line-no">493</span><span id="line-493"> return toFlush;</span> |
| <span class="source-line-no">494</span><span id="line-494"> }</span> |
| <span class="source-line-no">495</span><span id="line-495">}</span> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| </pre> |
| </div> |
| </main> |
| </body> |
| </html> |