| <!DOCTYPE HTML> |
| <html lang="en"> |
| <head> |
| <!-- Generated by javadoc (17) --> |
| <title>Source code</title> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <meta name="description" content="source: package: org.apache.hadoop.hbase.regionserver.wal, class: AbstractFSWAL"> |
| <meta name="generator" content="javadoc/SourceToHTMLConverter"> |
| <link rel="stylesheet" type="text/css" href="../../../../../../../stylesheet.css" title="Style"> |
| </head> |
| <body class="source-page"> |
| <main role="main"> |
| <div class="source-container"> |
| <pre><span class="source-line-no">001</span><span id="line-1">/*</span> |
| <span class="source-line-no">002</span><span id="line-2"> * Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="source-line-no">003</span><span id="line-3"> * or more contributor license agreements. See the NOTICE file</span> |
| <span class="source-line-no">004</span><span id="line-4"> * distributed with this work for additional information</span> |
| <span class="source-line-no">005</span><span id="line-5"> * regarding copyright ownership. The ASF licenses this file</span> |
| <span class="source-line-no">006</span><span id="line-6"> * to you under the Apache License, Version 2.0 (the</span> |
| <span class="source-line-no">007</span><span id="line-7"> * "License"); you may not use this file except in compliance</span> |
| <span class="source-line-no">008</span><span id="line-8"> * with the License. You may obtain a copy of the License at</span> |
| <span class="source-line-no">009</span><span id="line-9"> *</span> |
| <span class="source-line-no">010</span><span id="line-10"> * http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="source-line-no">011</span><span id="line-11"> *</span> |
| <span class="source-line-no">012</span><span id="line-12"> * Unless required by applicable law or agreed to in writing, software</span> |
| <span class="source-line-no">013</span><span id="line-13"> * distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="source-line-no">014</span><span id="line-14"> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="source-line-no">015</span><span id="line-15"> * See the License for the specific language governing permissions and</span> |
| <span class="source-line-no">016</span><span id="line-16"> * limitations under the License.</span> |
| <span class="source-line-no">017</span><span id="line-17"> */</span> |
| <span class="source-line-no">018</span><span id="line-18">package org.apache.hadoop.hbase.regionserver.wal;</span> |
| <span class="source-line-no">019</span><span id="line-19"></span> |
| <span class="source-line-no">020</span><span id="line-20">import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.ERROR;</span> |
| <span class="source-line-no">021</span><span id="line-21">import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.LOW_REPLICATION;</span> |
| <span class="source-line-no">022</span><span id="line-22">import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SIZE;</span> |
| <span class="source-line-no">023</span><span id="line-23">import static org.apache.hadoop.hbase.regionserver.wal.WALActionsListener.RollRequestReason.SLOW_SYNC;</span> |
| <span class="source-line-no">024</span><span id="line-24">import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.WAL_IMPL;</span> |
| <span class="source-line-no">025</span><span id="line-25">import static org.apache.hadoop.hbase.util.FutureUtils.addListener;</span> |
| <span class="source-line-no">026</span><span id="line-26">import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;</span> |
| <span class="source-line-no">027</span><span id="line-27">import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;</span> |
| <span class="source-line-no">028</span><span id="line-28">import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;</span> |
| <span class="source-line-no">029</span><span id="line-29"></span> |
| <span class="source-line-no">030</span><span id="line-30">import com.google.errorprone.annotations.RestrictedApi;</span> |
| <span class="source-line-no">031</span><span id="line-31">import com.lmax.disruptor.RingBuffer;</span> |
| <span class="source-line-no">032</span><span id="line-32">import com.lmax.disruptor.Sequence;</span> |
| <span class="source-line-no">033</span><span id="line-33">import com.lmax.disruptor.Sequencer;</span> |
| <span class="source-line-no">034</span><span id="line-34">import io.opentelemetry.api.trace.Span;</span> |
| <span class="source-line-no">035</span><span id="line-35">import java.io.FileNotFoundException;</span> |
| <span class="source-line-no">036</span><span id="line-36">import java.io.IOException;</span> |
| <span class="source-line-no">037</span><span id="line-37">import java.io.InterruptedIOException;</span> |
| <span class="source-line-no">038</span><span id="line-38">import java.lang.management.MemoryType;</span> |
| <span class="source-line-no">039</span><span id="line-39">import java.net.URLEncoder;</span> |
| <span class="source-line-no">040</span><span id="line-40">import java.nio.charset.StandardCharsets;</span> |
| <span class="source-line-no">041</span><span id="line-41">import java.util.ArrayDeque;</span> |
| <span class="source-line-no">042</span><span id="line-42">import java.util.ArrayList;</span> |
| <span class="source-line-no">043</span><span id="line-43">import java.util.Arrays;</span> |
| <span class="source-line-no">044</span><span id="line-44">import java.util.Comparator;</span> |
| <span class="source-line-no">045</span><span id="line-45">import java.util.Deque;</span> |
| <span class="source-line-no">046</span><span id="line-46">import java.util.Iterator;</span> |
| <span class="source-line-no">047</span><span id="line-47">import java.util.List;</span> |
| <span class="source-line-no">048</span><span id="line-48">import java.util.Map;</span> |
| <span class="source-line-no">049</span><span id="line-49">import java.util.OptionalLong;</span> |
| <span class="source-line-no">050</span><span id="line-50">import java.util.Set;</span> |
| <span class="source-line-no">051</span><span id="line-51">import java.util.SortedSet;</span> |
| <span class="source-line-no">052</span><span id="line-52">import java.util.TreeSet;</span> |
| <span class="source-line-no">053</span><span id="line-53">import java.util.concurrent.Callable;</span> |
| <span class="source-line-no">054</span><span id="line-54">import java.util.concurrent.CompletableFuture;</span> |
| <span class="source-line-no">055</span><span id="line-55">import java.util.concurrent.ConcurrentHashMap;</span> |
| <span class="source-line-no">056</span><span id="line-56">import java.util.concurrent.ConcurrentNavigableMap;</span> |
| <span class="source-line-no">057</span><span id="line-57">import java.util.concurrent.ConcurrentSkipListMap;</span> |
| <span class="source-line-no">058</span><span id="line-58">import java.util.concurrent.CopyOnWriteArrayList;</span> |
| <span class="source-line-no">059</span><span id="line-59">import java.util.concurrent.ExecutionException;</span> |
| <span class="source-line-no">060</span><span id="line-60">import java.util.concurrent.ExecutorService;</span> |
| <span class="source-line-no">061</span><span id="line-61">import java.util.concurrent.Executors;</span> |
| <span class="source-line-no">062</span><span id="line-62">import java.util.concurrent.Future;</span> |
| <span class="source-line-no">063</span><span id="line-63">import java.util.concurrent.LinkedBlockingQueue;</span> |
| <span class="source-line-no">064</span><span id="line-64">import java.util.concurrent.ThreadPoolExecutor;</span> |
| <span class="source-line-no">065</span><span id="line-65">import java.util.concurrent.TimeUnit;</span> |
| <span class="source-line-no">066</span><span id="line-66">import java.util.concurrent.TimeoutException;</span> |
| <span class="source-line-no">067</span><span id="line-67">import java.util.concurrent.atomic.AtomicBoolean;</span> |
| <span class="source-line-no">068</span><span id="line-68">import java.util.concurrent.atomic.AtomicInteger;</span> |
| <span class="source-line-no">069</span><span id="line-69">import java.util.concurrent.atomic.AtomicLong;</span> |
| <span class="source-line-no">070</span><span id="line-70">import java.util.concurrent.locks.Condition;</span> |
| <span class="source-line-no">071</span><span id="line-71">import java.util.concurrent.locks.Lock;</span> |
| <span class="source-line-no">072</span><span id="line-72">import java.util.concurrent.locks.ReentrantLock;</span> |
| <span class="source-line-no">073</span><span id="line-73">import java.util.function.Supplier;</span> |
| <span class="source-line-no">074</span><span id="line-74">import org.apache.commons.lang3.mutable.MutableLong;</span> |
| <span class="source-line-no">075</span><span id="line-75">import org.apache.hadoop.conf.Configuration;</span> |
| <span class="source-line-no">076</span><span id="line-76">import org.apache.hadoop.fs.FileStatus;</span> |
| <span class="source-line-no">077</span><span id="line-77">import org.apache.hadoop.fs.FileSystem;</span> |
| <span class="source-line-no">078</span><span id="line-78">import org.apache.hadoop.fs.Path;</span> |
| <span class="source-line-no">079</span><span id="line-79">import org.apache.hadoop.fs.PathFilter;</span> |
| <span class="source-line-no">080</span><span id="line-80">import org.apache.hadoop.hbase.Abortable;</span> |
| <span class="source-line-no">081</span><span id="line-81">import org.apache.hadoop.hbase.Cell;</span> |
| <span class="source-line-no">082</span><span id="line-82">import org.apache.hadoop.hbase.HBaseConfiguration;</span> |
| <span class="source-line-no">083</span><span id="line-83">import org.apache.hadoop.hbase.HConstants;</span> |
| <span class="source-line-no">084</span><span id="line-84">import org.apache.hadoop.hbase.PrivateCellUtil;</span> |
| <span class="source-line-no">085</span><span id="line-85">import org.apache.hadoop.hbase.client.ConnectionUtils;</span> |
| <span class="source-line-no">086</span><span id="line-86">import org.apache.hadoop.hbase.client.RegionInfo;</span> |
| <span class="source-line-no">087</span><span id="line-87">import org.apache.hadoop.hbase.exceptions.TimeoutIOException;</span> |
| <span class="source-line-no">088</span><span id="line-88">import org.apache.hadoop.hbase.io.util.MemorySizeUtil;</span> |
| <span class="source-line-no">089</span><span id="line-89">import org.apache.hadoop.hbase.ipc.RpcServer;</span> |
| <span class="source-line-no">090</span><span id="line-90">import org.apache.hadoop.hbase.ipc.ServerCall;</span> |
| <span class="source-line-no">091</span><span id="line-91">import org.apache.hadoop.hbase.log.HBaseMarkers;</span> |
| <span class="source-line-no">092</span><span id="line-92">import org.apache.hadoop.hbase.regionserver.HRegion;</span> |
| <span class="source-line-no">093</span><span id="line-93">import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;</span> |
| <span class="source-line-no">094</span><span id="line-94">import org.apache.hadoop.hbase.trace.TraceUtil;</span> |
| <span class="source-line-no">095</span><span id="line-95">import org.apache.hadoop.hbase.util.Bytes;</span> |
| <span class="source-line-no">096</span><span id="line-96">import org.apache.hadoop.hbase.util.CommonFSUtils;</span> |
| <span class="source-line-no">097</span><span id="line-97">import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;</span> |
| <span class="source-line-no">098</span><span id="line-98">import org.apache.hadoop.hbase.util.Pair;</span> |
| <span class="source-line-no">099</span><span id="line-99">import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;</span> |
| <span class="source-line-no">100</span><span id="line-100">import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;</span> |
| <span class="source-line-no">101</span><span id="line-101">import org.apache.hadoop.hbase.wal.WAL;</span> |
| <span class="source-line-no">102</span><span id="line-102">import org.apache.hadoop.hbase.wal.WALEdit;</span> |
| <span class="source-line-no">103</span><span id="line-103">import org.apache.hadoop.hbase.wal.WALFactory;</span> |
| <span class="source-line-no">104</span><span id="line-104">import org.apache.hadoop.hbase.wal.WALKeyImpl;</span> |
| <span class="source-line-no">105</span><span id="line-105">import org.apache.hadoop.hbase.wal.WALPrettyPrinter;</span> |
| <span class="source-line-no">106</span><span id="line-106">import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;</span> |
| <span class="source-line-no">107</span><span id="line-107">import org.apache.hadoop.hbase.wal.WALSplitter;</span> |
| <span class="source-line-no">108</span><span id="line-108">import org.apache.hadoop.hdfs.protocol.DatanodeInfo;</span> |
| <span class="source-line-no">109</span><span id="line-109">import org.apache.hadoop.util.StringUtils;</span> |
| <span class="source-line-no">110</span><span id="line-110">import org.apache.yetus.audience.InterfaceAudience;</span> |
| <span class="source-line-no">111</span><span id="line-111">import org.slf4j.Logger;</span> |
| <span class="source-line-no">112</span><span id="line-112">import org.slf4j.LoggerFactory;</span> |
| <span class="source-line-no">113</span><span id="line-113"></span> |
| <span class="source-line-no">114</span><span id="line-114">import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;</span> |
| <span class="source-line-no">115</span><span id="line-115">import org.apache.hbase.thirdparty.com.google.common.io.Closeables;</span> |
| <span class="source-line-no">116</span><span id="line-116">import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;</span> |
| <span class="source-line-no">117</span><span id="line-117"></span> |
| <span class="source-line-no">118</span><span id="line-118">/**</span> |
| <span class="source-line-no">119</span><span id="line-119"> * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one</span> |
| <span class="source-line-no">120</span><span id="line-120"> * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled.</span> |
| <span class="source-line-no">121</span><span id="line-121"> * This is done internal to the implementation.</span> |
| <span class="source-line-no">122</span><span id="line-122"> * <p></span> |
| <span class="source-line-no">123</span><span id="line-123"> * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a</span> |
| <span class="source-line-no">124</span><span id="line-124"> * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.</span> |
| <span class="source-line-no">125</span><span id="line-125"> * A bunch of work in the below is done keeping account of these region sequence ids -- what is</span> |
| <span class="source-line-no">126</span><span id="line-126"> * flushed out to hfiles, and what is yet in WAL and in memory only.</span> |
| <span class="source-line-no">127</span><span id="line-127"> * <p></span> |
| <span class="source-line-no">128</span><span id="line-128"> * It is only practical to delete entire files. Thus, we delete an entire on-disk file</span> |
| <span class="source-line-no">129</span><span id="line-129"> * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older</span> |
| <span class="source-line-no">130</span><span id="line-130"> * (smaller) than the most-recent flush.</span> |
| <span class="source-line-no">131</span><span id="line-131"> * <p></span> |
| <span class="source-line-no">132</span><span id="line-132"> * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read,</span> |
| <span class="source-line-no">133</span><span id="line-133"> * call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for</span> |
| <span class="source-line-no">134</span><span id="line-134"> * replication where we may want to tail the active WAL file.</span> |
| <span class="source-line-no">135</span><span id="line-135"> * <h2>Failure Semantic</h2> If an exception occurs on append or sync, roll the WAL because the current WAL</span> |
| <span class="source-line-no">136</span><span id="line-136"> * is now a lame duck; any more appends or syncs will fail also with the same original exception. If</span> |
| <span class="source-line-no">137</span><span id="line-137"> * we have made successful appends to the WAL and we then are unable to sync them, our current</span> |
| <span class="source-line-no">138</span><span id="line-138"> * semantic is to return error to the client that the appends failed but also to abort the current</span> |
| <span class="source-line-no">139</span><span id="line-139"> * context, usually the hosting server. We need to replay the WALs. <br></span> |
| <span class="source-line-no">140</span><span id="line-140"> * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client</span> |
| <span class="source-line-no">141</span><span id="line-141"> * that the append failed. <br></span> |
| <span class="source-line-no">142</span><span id="line-142"> * TODO: replication may pick up these last edits though they have been marked as failed append</span> |
| <span class="source-line-no">143</span><span id="line-143"> * (Need to keep our own file lengths, not rely on HDFS).</span> |
| <span class="source-line-no">144</span><span id="line-144"> */</span> |
| <span class="source-line-no">145</span><span id="line-145">@InterfaceAudience.Private</span> |
| <span class="source-line-no">146</span><span id="line-146">public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {</span> |
| <span class="source-line-no">147</span><span id="line-147"> private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);</span> |
| <span class="source-line-no">148</span><span id="line-148"></span> |
| <span class="source-line-no">149</span><span id="line-149"> private static final Comparator<SyncFuture> SEQ_COMPARATOR =</span> |
| <span class="source-line-no">150</span><span id="line-150"> Comparator.comparingLong(SyncFuture::getTxid).thenComparingInt(System::identityHashCode);</span> |
| <span class="source-line-no">151</span><span id="line-151"></span> |
| <span class="source-line-no">152</span><span id="line-152"> private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec";</span> |
| <span class="source-line-no">153</span><span id="line-153"> private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900;</span> |
| <span class="source-line-no">154</span><span id="line-154"> /** Don't log blocking regions more frequently than this. */</span> |
| <span class="source-line-no">155</span><span id="line-155"> private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);</span> |
| <span class="source-line-no">156</span><span id="line-156"></span> |
| <span class="source-line-no">157</span><span id="line-157"> protected static final String SLOW_SYNC_TIME_MS = "hbase.regionserver.wal.slowsync.ms";</span> |
| <span class="source-line-no">158</span><span id="line-158"> protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms</span> |
| <span class="source-line-no">159</span><span id="line-159"> protected static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.wal.roll.on.sync.ms";</span> |
| <span class="source-line-no">160</span><span id="line-160"> protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms</span> |
| <span class="source-line-no">161</span><span id="line-161"> protected static final String SLOW_SYNC_ROLL_THRESHOLD =</span> |
| <span class="source-line-no">162</span><span id="line-162"> "hbase.regionserver.wal.slowsync.roll.threshold";</span> |
| <span class="source-line-no">163</span><span id="line-163"> protected static final int DEFAULT_SLOW_SYNC_ROLL_THRESHOLD = 100; // 100 slow sync warnings</span> |
| <span class="source-line-no">164</span><span id="line-164"> protected static final String SLOW_SYNC_ROLL_INTERVAL_MS =</span> |
| <span class="source-line-no">165</span><span id="line-165"> "hbase.regionserver.wal.slowsync.roll.interval.ms";</span> |
| <span class="source-line-no">166</span><span id="line-166"> protected static final int DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS = 60 * 1000; // in ms, 1 minute</span> |
| <span class="source-line-no">167</span><span id="line-167"></span> |
| <span class="source-line-no">168</span><span id="line-168"> public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";</span> |
| <span class="source-line-no">169</span><span id="line-169"> protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min</span> |
| <span class="source-line-no">170</span><span id="line-170"></span> |
| <span class="source-line-no">171</span><span id="line-171"> public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";</span> |
| <span class="source-line-no">172</span><span id="line-172"></span> |
| <span class="source-line-no">173</span><span id="line-173"> public static final String MAX_LOGS = "hbase.regionserver.maxlogs";</span> |
| <span class="source-line-no">174</span><span id="line-174"></span> |
| <span class="source-line-no">175</span><span id="line-175"> public static final String RING_BUFFER_SLOT_COUNT =</span> |
| <span class="source-line-no">176</span><span id="line-176"> "hbase.regionserver.wal.disruptor.event.count";</span> |
| <span class="source-line-no">177</span><span id="line-177"></span> |
| <span class="source-line-no">178</span><span id="line-178"> public static final String WAL_SHUTDOWN_WAIT_TIMEOUT_MS = "hbase.wal.shutdown.wait.timeout.ms";</span> |
| <span class="source-line-no">179</span><span id="line-179"> public static final int DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS = 15 * 1000;</span> |
| <span class="source-line-no">180</span><span id="line-180"></span> |
| <span class="source-line-no">181</span><span id="line-181"> public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";</span> |
| <span class="source-line-no">182</span><span id="line-182"> public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;</span> |
| <span class="source-line-no">183</span><span id="line-183"></span> |
| <span class="source-line-no">184</span><span id="line-184"> public static final String WAL_AVOID_LOCAL_WRITES_KEY =</span> |
| <span class="source-line-no">185</span><span id="line-185"> "hbase.regionserver.wal.avoid-local-writes";</span> |
| <span class="source-line-no">186</span><span id="line-186"> public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;</span> |
| <span class="source-line-no">187</span><span id="line-187"></span> |
| <span class="source-line-no">188</span><span id="line-188"> /**</span> |
| <span class="source-line-no">189</span><span id="line-189"> * file system instance</span> |
| <span class="source-line-no">190</span><span id="line-190"> */</span> |
| <span class="source-line-no">191</span><span id="line-191"> protected final FileSystem fs;</span> |
| <span class="source-line-no">192</span><span id="line-192"></span> |
| <span class="source-line-no">193</span><span id="line-193"> /**</span> |
| <span class="source-line-no">194</span><span id="line-194"> * WAL directory, where all WAL files would be placed.</span> |
| <span class="source-line-no">195</span><span id="line-195"> */</span> |
| <span class="source-line-no">196</span><span id="line-196"> protected final Path walDir;</span> |
| <span class="source-line-no">197</span><span id="line-197"></span> |
| <span class="source-line-no">198</span><span id="line-198"> private final FileSystem remoteFs;</span> |
| <span class="source-line-no">199</span><span id="line-199"></span> |
| <span class="source-line-no">200</span><span id="line-200"> private final Path remoteWALDir;</span> |
| <span class="source-line-no">201</span><span id="line-201"></span> |
| <span class="source-line-no">202</span><span id="line-202"> /**</span> |
| <span class="source-line-no">203</span><span id="line-203"> * dir path where old logs are kept.</span> |
| <span class="source-line-no">204</span><span id="line-204"> */</span> |
| <span class="source-line-no">205</span><span id="line-205"> protected final Path walArchiveDir;</span> |
| <span class="source-line-no">206</span><span id="line-206"></span> |
| <span class="source-line-no">207</span><span id="line-207"> /**</span> |
| <span class="source-line-no">208</span><span id="line-208"> * Matches just those wal files that belong to this wal instance.</span> |
| <span class="source-line-no">209</span><span id="line-209"> */</span> |
| <span class="source-line-no">210</span><span id="line-210"> protected final PathFilter ourFiles;</span> |
| <span class="source-line-no">211</span><span id="line-211"></span> |
| <span class="source-line-no">212</span><span id="line-212"> /**</span> |
| <span class="source-line-no">213</span><span id="line-213"> * Prefix of a WAL file, usually the region server name it is hosted on.</span> |
| <span class="source-line-no">214</span><span id="line-214"> */</span> |
| <span class="source-line-no">215</span><span id="line-215"> protected final String walFilePrefix;</span> |
| <span class="source-line-no">216</span><span id="line-216"></span> |
| <span class="source-line-no">217</span><span id="line-217"> /**</span> |
| <span class="source-line-no">218</span><span id="line-218"> * Suffix included on generated wal file names</span> |
| <span class="source-line-no">219</span><span id="line-219"> */</span> |
| <span class="source-line-no">220</span><span id="line-220"> protected final String walFileSuffix;</span> |
| <span class="source-line-no">221</span><span id="line-221"></span> |
| <span class="source-line-no">222</span><span id="line-222"> /**</span> |
| <span class="source-line-no">223</span><span id="line-223"> * Prefix used when checking for wal membership.</span> |
| <span class="source-line-no">224</span><span id="line-224"> */</span> |
| <span class="source-line-no">225</span><span id="line-225"> protected final String prefixPathStr;</span> |
| <span class="source-line-no">226</span><span id="line-226"></span> |
| <span class="source-line-no">227</span><span id="line-227"> protected final WALCoprocessorHost coprocessorHost;</span> |
| <span class="source-line-no">228</span><span id="line-228"></span> |
| <span class="source-line-no">229</span><span id="line-229"> /**</span> |
| <span class="source-line-no">230</span><span id="line-230"> * conf object</span> |
| <span class="source-line-no">231</span><span id="line-231"> */</span> |
| <span class="source-line-no">232</span><span id="line-232"> protected final Configuration conf;</span> |
| <span class="source-line-no">233</span><span id="line-233"></span> |
| <span class="source-line-no">234</span><span id="line-234"> protected final Abortable abortable;</span> |
| <span class="source-line-no">235</span><span id="line-235"></span> |
| <span class="source-line-no">236</span><span id="line-236"> /** Listeners that are called on WAL events. */</span> |
| <span class="source-line-no">237</span><span id="line-237"> protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>();</span> |
| <span class="source-line-no">238</span><span id="line-238"></span> |
| <span class="source-line-no">239</span><span id="line-239"> /** Tracks the logs in the process of being closed. */</span> |
| <span class="source-line-no">240</span><span id="line-240"> protected final Map<String, W> inflightWALClosures = new ConcurrentHashMap<>();</span> |
| <span class="source-line-no">241</span><span id="line-241"></span> |
| <span class="source-line-no">242</span><span id="line-242"> /**</span> |
| <span class="source-line-no">243</span><span id="line-243"> * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence</span> |
| <span class="source-line-no">244</span><span id="line-244"> * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has</span> |
| <span class="source-line-no">245</span><span id="line-245"> * facility for answering questions such as "Is it safe to GC a WAL?".</span> |
| <span class="source-line-no">246</span><span id="line-246"> */</span> |
| <span class="source-line-no">247</span><span id="line-247"> protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();</span> |
| <span class="source-line-no">248</span><span id="line-248"></span> |
| <span class="source-line-no">249</span><span id="line-249"> /** The slow sync will be logged; the very slow sync will cause the WAL to be rolled. */</span> |
| <span class="source-line-no">250</span><span id="line-250"> protected final long slowSyncNs, rollOnSyncNs;</span> |
| <span class="source-line-no">251</span><span id="line-251"> protected final int slowSyncRollThreshold;</span> |
| <span class="source-line-no">252</span><span id="line-252"> protected final int slowSyncCheckInterval;</span> |
| <span class="source-line-no">253</span><span id="line-253"> protected final AtomicInteger slowSyncCount = new AtomicInteger();</span> |
| <span class="source-line-no">254</span><span id="line-254"></span> |
| <span class="source-line-no">255</span><span id="line-255"> private final long walSyncTimeoutNs;</span> |
| <span class="source-line-no">256</span><span id="line-256"></span> |
| <span class="source-line-no">257</span><span id="line-257"> private final long walTooOldNs;</span> |
| <span class="source-line-no">258</span><span id="line-258"></span> |
| <span class="source-line-no">259</span><span id="line-259"> // Roll the log once it grows larger than this size.</span> |
| <span class="source-line-no">260</span><span id="line-260"> protected final long logrollsize;</span> |
| <span class="source-line-no">261</span><span id="line-261"></span> |
| <span class="source-line-no">262</span><span id="line-262"> /**</span> |
| <span class="source-line-no">263</span><span id="line-263"> * Block size to use when writing WAL files.</span> |
| <span class="source-line-no">264</span><span id="line-264"> */</span> |
| <span class="source-line-no">265</span><span id="line-265"> protected final long blocksize;</span> |
| <span class="source-line-no">266</span><span id="line-266"></span> |
| <span class="source-line-no">267</span><span id="line-267"> /*</span> |
| <span class="source-line-no">268</span><span id="line-268"> * If there are more than this many logs, force a flush of the oldest region so that its oldest</span> |
| <span class="source-line-no">269</span><span id="line-269"> * edit goes to disk. Too many logs make replay after a crash very slow, so keep the count tidy.</span> |
| <span class="source-line-no">270</span><span id="line-270"> */</span> |
| <span class="source-line-no">271</span><span id="line-271"> protected final int maxLogs;</span> |
| <span class="source-line-no">272</span><span id="line-272"></span> |
| <span class="source-line-no">273</span><span id="line-273"> protected final boolean useHsync;</span> |
| <span class="source-line-no">274</span><span id="line-274"></span> |
| <span class="source-line-no">275</span><span id="line-275"> /**</span> |
| <span class="source-line-no">276</span><span id="line-276"> * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock</span> |
| <span class="source-line-no">277</span><span id="line-277"> * is held. We don't just use synchronized because that results in bogus and tedious findbugs</span> |
| <span class="source-line-no">278</span><span id="line-278"> * warnings when it thinks synchronized controls writer thread safety. It is held while we are</span> |
| <span class="source-line-no">279</span><span id="line-279"> * actually rolling the log. It is checked when we are looking to see whether we should roll the</span> |
| <span class="source-line-no">280</span><span id="line-280"> * log or not.</span> |
| <span class="source-line-no">281</span><span id="line-281"> */</span> |
| <span class="source-line-no">282</span><span id="line-282"> protected final ReentrantLock rollWriterLock = new ReentrantLock(true); // fair ordering</span> |
| <span class="source-line-no">283</span><span id="line-283"></span> |
| <span class="source-line-no">284</span><span id="line-284"> // The creation timestamp (in ms) of the current log file, which is also its file number.</span> |
| <span class="source-line-no">285</span><span id="line-285"> protected final AtomicLong filenum = new AtomicLong(-1);</span> |
| <span class="source-line-no">286</span><span id="line-286"></span> |
| <span class="source-line-no">287</span><span id="line-287"> // Number of transactions in the current WAL.</span> |
| <span class="source-line-no">288</span><span id="line-288"> protected final AtomicInteger numEntries = new AtomicInteger(0);</span> |
| <span class="source-line-no">289</span><span id="line-289"></span> |
| <span class="source-line-no">290</span><span id="line-290"> /**</span> |
| <span class="source-line-no">291</span><span id="line-291"> * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass</span> |
| <span class="source-line-no">292</span><span id="line-292"> * WALEdits to a background consumer thread, and the transaction id is the sequence number of the</span> |
| <span class="source-line-no">293</span><span id="line-293"> * corresponding entry in the queue.</span> |
| <span class="source-line-no">294</span><span id="line-294"> */</span> |
| <span class="source-line-no">295</span><span id="line-295"> protected volatile long highestUnsyncedTxid = -1;</span> |
| <span class="source-line-no">296</span><span id="line-296"></span> |
| <span class="source-line-no">297</span><span id="line-297"> /**</span> |
| <span class="source-line-no">298</span><span id="line-298"> * Updated to the transaction id of the last successful sync call. This can be less than</span> |
| <span class="source-line-no">299</span><span id="line-299"> * {@link #highestUnsyncedTxid} when an append has been made but no sync has come in for it</span> |
| <span class="source-line-no">300</span><span id="line-300"> * yet.</span> |
| <span class="source-line-no">301</span><span id="line-301"> */</span> |
| <span class="source-line-no">302</span><span id="line-302"> protected final AtomicLong highestSyncedTxid = new AtomicLong(0);</span> |
| <span class="source-line-no">303</span><span id="line-303"></span> |
| <span class="source-line-no">304</span><span id="line-304"> /**</span> |
| <span class="source-line-no">305</span><span id="line-305"> * The total size of the WAL.</span> |
| <span class="source-line-no">306</span><span id="line-306"> */</span> |
| <span class="source-line-no">307</span><span id="line-307"> protected final AtomicLong totalLogSize = new AtomicLong(0);</span> |
| <span class="source-line-no">308</span><span id="line-308"> /**</span> |
| <span class="source-line-no">309</span><span id="line-309"> * Writer for the current log file.</span> |
| <span class="source-line-no">310</span><span id="line-310"> */</span> |
| <span class="source-line-no">311</span><span id="line-311"> volatile W writer;</span> |
| <span class="source-line-no">312</span><span id="line-312"></span> |
| <span class="source-line-no">313</span><span id="line-313"> // Last time we checked for low replication on the WAL's write pipeline</span> |
| <span class="source-line-no">314</span><span id="line-314"> private volatile long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">315</span><span id="line-315"></span> |
| <span class="source-line-no">316</span><span id="line-316"> // Last time we asked to roll the log due to a slow sync</span> |
| <span class="source-line-no">317</span><span id="line-317"> private volatile long lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">318</span><span id="line-318"></span> |
| <span class="source-line-no">319</span><span id="line-319"> protected volatile boolean closed = false;</span> |
| <span class="source-line-no">320</span><span id="line-320"></span> |
| <span class="source-line-no">321</span><span id="line-321"> protected final AtomicBoolean shutdown = new AtomicBoolean(false);</span> |
| <span class="source-line-no">322</span><span id="line-322"></span> |
| <span class="source-line-no">323</span><span id="line-323"> protected final long walShutdownTimeout;</span> |
| <span class="source-line-no">324</span><span id="line-324"></span> |
| <span class="source-line-no">325</span><span id="line-325"> private long nextLogTooOldNs = System.nanoTime();</span> |
| <span class="source-line-no">326</span><span id="line-326"></span> |
| <span class="source-line-no">327</span><span id="line-327"> /**</span> |
| <span class="source-line-no">328</span><span id="line-328"> * WAL Comparator; it compares the timestamp (log filenum) present in the log file name. Throws</span> |
| <span class="source-line-no">329</span><span id="line-329"> * an IllegalArgumentException if used to compare paths from different WALs.</span> |
| <span class="source-line-no">330</span><span id="line-330"> */</span> |
| <span class="source-line-no">331</span><span id="line-331"> final Comparator<Path> LOG_NAME_COMPARATOR =</span> |
| <span class="source-line-no">332</span><span id="line-332"> (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));</span> |
| <span class="source-line-no">333</span><span id="line-333"></span> |
| <span class="source-line-no">334</span><span id="line-334"> private static final class WALProps { // per-file properties, stored in walFile2Props</span> |
| <span class="source-line-no">335</span><span id="line-335"></span> |
| <span class="source-line-no">336</span><span id="line-336"> /**</span> |
| <span class="source-line-no">337</span><span id="line-337"> * Maps each encoded region name to its highest sequence id.</span> |
| <span class="source-line-no">338</span><span id="line-338"> * <p/></span> |
| <span class="source-line-no">339</span><span id="line-339"> * Contains every region that this log file has an entry for.</span> |
| <span class="source-line-no">340</span><span id="line-340"> */</span> |
| <span class="source-line-no">341</span><span id="line-341"> private final Map<byte[], Long> encodedName2HighestSequenceId;</span> |
| <span class="source-line-no">342</span><span id="line-342"></span> |
| <span class="source-line-no">343</span><span id="line-343"> /**</span> |
| <span class="source-line-no">344</span><span id="line-344"> * The log file size. Note that the size may not be accurate if we do asynchronous close in</span> |
| <span class="source-line-no">345</span><span id="line-345"> * subclasses.</span> |
| <span class="source-line-no">346</span><span id="line-346"> */</span> |
| <span class="source-line-no">347</span><span id="line-347"> private final long logSize;</span> |
| <span class="source-line-no">348</span><span id="line-348"></span> |
| <span class="source-line-no">349</span><span id="line-349"> /**</span> |
| <span class="source-line-no">350</span><span id="line-350"> * The System.nanoTime() of the log roll, used to determine how much time has passed since then.</span> |
| <span class="source-line-no">351</span><span id="line-351"> */</span> |
| <span class="source-line-no">352</span><span id="line-352"> private final long rollTimeNs;</span> |
| <span class="source-line-no">353</span><span id="line-353"></span> |
| <span class="source-line-no">354</span><span id="line-354"> /**</span> |
| <span class="source-line-no">355</span><span id="line-355"> * If we do asynchronous close in subclasses, the file may not be closed yet when this WALProps</span> |
| <span class="source-line-no">356</span><span id="line-356"> * is added to the rolled map, so for safety cleanOldLogs should not archive this file until</span> |
| <span class="source-line-no">357</span><span id="line-357"> * this flag is set.</span> |
| <span class="source-line-no">358</span><span id="line-358"> */</span> |
| <span class="source-line-no">359</span><span id="line-359"> private volatile boolean closed = false;</span> |
| <span class="source-line-no">360</span><span id="line-360"></span> |
| <span class="source-line-no">361</span><span id="line-361"> WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {</span> |
| <span class="source-line-no">362</span><span id="line-362"> this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;</span> |
| <span class="source-line-no">363</span><span id="line-363"> this.logSize = logSize;</span> |
| <span class="source-line-no">364</span><span id="line-364"> this.rollTimeNs = System.nanoTime(); // the roll time is the moment these props are created</span> |
| <span class="source-line-no">365</span><span id="line-365"> }</span> |
| <span class="source-line-no">366</span><span id="line-366"> }</span> |
| <span class="source-line-no">367</span><span id="line-367"></span> |
| <span class="source-line-no">368</span><span id="line-368"> /**</span> |
| <span class="source-line-no">369</span><span id="line-369"> * Map of WAL log file to properties. The map is sorted by the log file creation timestamp</span> |
| <span class="source-line-no">370</span><span id="line-370"> * (contained in the log file name).</span> |
| <span class="source-line-no">371</span><span id="line-371"> */</span> |
| <span class="source-line-no">372</span><span id="line-372"> protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =</span> |
| <span class="source-line-no">373</span><span id="line-373"> new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);</span> |
| <span class="source-line-no">374</span><span id="line-374"></span> |
| <span class="source-line-no">375</span><span id="line-375"> /**</span> |
| <span class="source-line-no">376</span><span id="line-376"> * A cache of sync futures reused by threads.</span> |
| <span class="source-line-no">377</span><span id="line-377"> */</span> |
| <span class="source-line-no">378</span><span id="line-378"> protected final SyncFutureCache syncFutureCache;</span> |
| <span class="source-line-no">379</span><span id="line-379"></span> |
| <span class="source-line-no">380</span><span id="line-380"> /**</span> |
| <span class="source-line-no">381</span><span id="line-381"> * The class name of the runtime implementation, used as a prefix for logging/tracing.</span> |
| <span class="source-line-no">382</span><span id="line-382"> * <p></span> |
| <span class="source-line-no">383</span><span id="line-383"> * Performance testing shows getClass().getSimpleName() might be a bottleneck, so we store it here;</span> |
| <span class="source-line-no">384</span><span id="line-384"> * refer to HBASE-17676 for more details.</span> |
| <span class="source-line-no">385</span><span id="line-385"> * </p></span> |
| <span class="source-line-no">386</span><span id="line-386"> */</span> |
| <span class="source-line-no">387</span><span id="line-387"> protected final String implClassName;</span> |
| <span class="source-line-no">388</span><span id="line-388"></span> |
| <span class="source-line-no">389</span><span id="line-389"> protected final AtomicBoolean rollRequested = new AtomicBoolean(false);</span> |
| <span class="source-line-no">390</span><span id="line-390"></span> |
| <span class="source-line-no">391</span><span id="line-391"> protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(</span> |
| <span class="source-line-no">392</span><span id="line-392"> new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());</span> |
| <span class="source-line-no">393</span><span id="line-393"></span> |
| <span class="source-line-no">394</span><span id="line-394"> // Run in the caller if a task is rejected (CallerRunsPolicy), to avoid aborting the region server</span> |
| <span class="source-line-no">395</span><span id="line-395"> // on a rejected execution. Usually this should not happen, but it makes us more robust.</span> |
| <span class="source-line-no">396</span><span id="line-396"> private final ExecutorService logArchiveExecutor =</span> |
| <span class="source-line-no">397</span><span id="line-397"> new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),</span> |
| <span class="source-line-no">398</span><span id="line-398"> new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),</span> |
| <span class="source-line-no">399</span><span id="line-399"> new ThreadPoolExecutor.CallerRunsPolicy());</span> |
| <span class="source-line-no">400</span><span id="line-400"></span> |
| <span class="source-line-no">401</span><span id="line-401"> private final int archiveRetries;</span> |
| <span class="source-line-no">402</span><span id="line-402"></span> |
| <span class="source-line-no">403</span><span id="line-403"> protected ExecutorService consumeExecutor;</span> |
| <span class="source-line-no">404</span><span id="line-404"></span> |
| <span class="source-line-no">405</span><span id="line-405"> private final Lock consumeLock = new ReentrantLock();</span> |
| <span class="source-line-no">406</span><span id="line-406"></span> |
| <span class="source-line-no">407</span><span id="line-407"> protected final Runnable consumer = this::consume;</span> |
| <span class="source-line-no">408</span><span id="line-408"></span> |
| <span class="source-line-no">409</span><span id="line-409"> // Checks whether a consumer task is already pending in the event loop's (executor's) task queue.</span> |
| <span class="source-line-no">410</span><span id="line-410"> protected Supplier<Boolean> hasConsumerTask;</span> |
| <span class="source-line-no">411</span><span id="line-411"></span> |
| <span class="source-line-no">412</span><span id="line-412"> private static final int MAX_EPOCH = 0x3FFFFFFF; // 30 bits: the epoch sits above two state bits</span> |
| <span class="source-line-no">413</span><span id="line-413"> // the lowest bit is waitingRoll: a new writer has been created and we are waiting for the old</span> |
| <span class="source-line-no">414</span><span id="line-414"> // writer to be closed.</span> |
| <span class="source-line-no">415</span><span id="line-415"> // the second-lowest bit is writerBroken: the current writer is broken and rollWriter is</span> |
| <span class="source-line-no">416</span><span id="line-416"> // needed.</span> |
| <span class="source-line-no">417</span><span id="line-417"> // all other bits are the epoch number of the current writer, used to detect whether the writer</span> |
| <span class="source-line-no">418</span><span id="line-418"> // is still the same one as when the sync was issued.</span> |
| <span class="source-line-no">419</span><span id="line-419"> // notice that this field may only be modified under the protection of consumeLock.</span> |
| <span class="source-line-no">420</span><span id="line-420"> private volatile int epochAndState;</span> |
| <span class="source-line-no">421</span><span id="line-421"></span> |
| <span class="source-line-no">422</span><span id="line-422"> private boolean readyForRolling;</span> |
| <span class="source-line-no">423</span><span id="line-423"></span> |
| <span class="source-line-no">424</span><span id="line-424"> private final Condition readyForRollingCond = consumeLock.newCondition();</span> |
| <span class="source-line-no">425</span><span id="line-425"></span> |
| <span class="source-line-no">426</span><span id="line-426"> private final RingBuffer<RingBufferTruck> waitingConsumePayloads;</span> |
| <span class="source-line-no">427</span><span id="line-427"></span> |
| <span class="source-line-no">428</span><span id="line-428"> private final Sequence waitingConsumePayloadsGatingSequence;</span> |
| <span class="source-line-no">429</span><span id="line-429"></span> |
| <span class="source-line-no">430</span><span id="line-430"> private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);</span> |
| <span class="source-line-no">431</span><span id="line-431"></span> |
| <span class="source-line-no">432</span><span id="line-432"> private final long batchSize;</span> |
| <span class="source-line-no">433</span><span id="line-433"></span> |
| <span class="source-line-no">434</span><span id="line-434"> protected final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();</span> |
| <span class="source-line-no">435</span><span id="line-435"></span> |
| <span class="source-line-no">436</span><span id="line-436"> protected final Deque<FSWALEntry> unackedAppends = new ArrayDeque<>();</span> |
| <span class="source-line-no">437</span><span id="line-437"></span> |
| <span class="source-line-no">438</span><span id="line-438"> protected final SortedSet<SyncFuture> syncFutures = new TreeSet<>(SEQ_COMPARATOR);</span> |
| <span class="source-line-no">439</span><span id="line-439"></span> |
| <span class="source-line-no">440</span><span id="line-440"> // the highest txid of WAL entries being processed</span> |
| <span class="source-line-no">441</span><span id="line-441"> protected long highestProcessedAppendTxid;</span> |
| <span class="source-line-no">442</span><span id="line-442"></span> |
| <span class="source-line-no">443</span><span id="line-443"> // file length when we issued the last sync request on the writer</span> |
| <span class="source-line-no">444</span><span id="line-444"> private long fileLengthAtLastSync;</span> |
| <span class="source-line-no">445</span><span id="line-445"></span> |
| <span class="source-line-no">446</span><span id="line-446"> private long highestProcessedAppendTxidAtLastSync;</span> |
| <span class="source-line-no">447</span><span id="line-447"></span> |
| <span class="source-line-no">448</span><span id="line-448"> private int waitOnShutdownInSeconds;</span> |
| <span class="source-line-no">449</span><span id="line-449"></span> |
| <span class="source-line-no">450</span><span id="line-450"> private String waitOnShutdownInSecondsConfigKey;</span> |
| <span class="source-line-no">451</span><span id="line-451"></span> |
| <span class="source-line-no">452</span><span id="line-452"> protected boolean shouldShutDownConsumeExecutorWhenClose = true;</span> |
| <span class="source-line-no">453</span><span id="line-453"></span> |
| <span class="source-line-no">454</span><span id="line-454"> private volatile boolean skipRemoteWAL = false;</span> |
| <span class="source-line-no">455</span><span id="line-455"></span> |
| <span class="source-line-no">456</span><span id="line-456"> private volatile boolean markerEditOnly = false;</span> |
| <span class="source-line-no">457</span><span id="line-457"></span> |
| <span class="source-line-no">458</span><span id="line-458"> public long getFilenum() {</span> |
| <span class="source-line-no">459</span><span id="line-459"> return filenum.get(); // creation timestamp (ms) of the current log file</span> |
| <span class="source-line-no">460</span><span id="line-460"> }</span> |
| <span class="source-line-no">461</span><span id="line-461"></span> |
| <span class="source-line-no">462</span><span id="line-462"> /**</span> |
| <span class="source-line-no">463</span><span id="line-463"> * Extracts the creation timestamp (in ms), i.e. the {@link #filenum}, embedded in a WAL file name</span> |
| <span class="source-line-no">464</span><span id="line-464"> * built by {@link #computeFilename(long filenum)}; files of other WALs are rejected.</span> |
| <span class="source-line-no">465</span><span id="line-465"> * @param fileName a log file belonging to this WAL; must not be null</span> |
| <span class="source-line-no">466</span><span id="line-466"> * @return timestamp, as in the log file name.</span> |
| <span class="source-line-no">467</span><span id="line-467"> */</span> |
| <span class="source-line-no">468</span><span id="line-468"> protected long getFileNumFromFileName(Path fileName) {</span> |
| <span class="source-line-no">469</span><span id="line-469"> checkNotNull(fileName, "file name can't be null");</span> |
| <span class="source-line-no">470</span><span id="line-470"> boolean belongsToThisWal = ourFiles.accept(fileName);</span> |
| <span class="source-line-no">471</span><span id="line-471"> if (!belongsToThisWal) {</span> |
| <span class="source-line-no">472</span><span id="line-472"> throw new IllegalArgumentException("The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")");</span> |
| <span class="source-line-no">473</span><span id="line-473"> }</span> |
| <span class="source-line-no">474</span><span id="line-474"> String full = fileName.toString();</span> |
| <span class="source-line-no">475</span><span id="line-475"> int begin = prefixPathStr.length();</span> |
| <span class="source-line-no">476</span><span id="line-476"> int end = full.length() - walFileSuffix.length();</span> |
| <span class="source-line-no">477</span><span id="line-477"> return Long.parseLong(full.substring(begin, end));</span> |
| <span class="source-line-no">478</span><span id="line-478"> }</span> |
| <span class="source-line-no">479</span><span id="line-479"></span> |
| <span class="source-line-no">480</span><span id="line-480"> private int calculateMaxLogFiles(Configuration conf, long logRollSize) {</span> |
| <span class="source-line-no">481</span><span id="line-481"> checkArgument(logRollSize > 0, "The log roll size cannot be zero or negative when calculating max log files, " + "current value is " + logRollSize);</span> |
| <span class="source-line-no">482</span><span id="line-482"> long globalMemstoreLimit = MemorySizeUtil.getGlobalMemStoreSize(conf).getFirst();</span> |
| <span class="source-line-no">483</span><span id="line-483"> // i.e. how many log-roll-sized files fit into twice the global memstore size</span> |
| <span class="source-line-no">484</span><span id="line-484"> long doubledLimit = globalMemstoreLimit * 2;</span> |
| <span class="source-line-no">485</span><span id="line-485"> return (int) (doubledLimit / logRollSize);</span> |
| <span class="source-line-no">486</span><span id="line-486"> }</span> |
| <span class="source-line-no">487</span><span id="line-487"></span> |
| <span class="source-line-no">488</span><span id="line-488"> // Always returns a power of 2: the configured slot count rounded up, capped at 1 << 30.</span> |
| <span class="source-line-no">489</span><span id="line-489"> protected final int getPreallocatedEventCount() {</span> |
| <span class="source-line-no">490</span><span id="line-490"> // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will</span> |
| <span class="source-line-no">491</span><span id="line-491"> // be stuck and make no progress if the buffer is filled with appends only and there is no sync:</span> |
| <span class="source-line-no">492</span><span id="line-492"> // the handlers would stay outstanding, just waiting on sync completion before they return.</span> |
| <span class="source-line-no">493</span><span id="line-493"> int preallocatedEventCount = this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);</span> |
| <span class="source-line-no">494</span><span id="line-494"> // Reject 0 too: Integer.highestOneBit(0) == 0 would otherwise be returned as the slot count.</span> |
| <span class="source-line-no">495</span><span id="line-495"> checkArgument(preallocatedEventCount > 0, RING_BUFFER_SLOT_COUNT + " must be > 0");</span> |
| <span class="source-line-no">496</span><span id="line-496"> int floor = Integer.highestOneBit(preallocatedEventCount);</span> |
| <span class="source-line-no">497</span><span id="line-497"> if (floor == preallocatedEventCount) {</span> |
| <span class="source-line-no">498</span><span id="line-498"> return floor;</span> |
| <span class="source-line-no">499</span><span id="line-499"> }</span> |
| <span class="source-line-no">500</span><span id="line-500"> // round up to the next power of 2; max capacity is 1 << 30</span> |
| <span class="source-line-no">501</span><span id="line-501"> if (floor >= 1 << 29) {</span> |
| <span class="source-line-no">502</span><span id="line-502"> return 1 << 30;</span> |
| <span class="source-line-no">503</span><span id="line-503"> }</span> |
| <span class="source-line-no">504</span><span id="line-504"> return floor << 1;</span> |
| <span class="source-line-no">505</span><span id="line-505"> }</span> |
| <span class="source-line-no">506</span><span id="line-506"></span> |
| <span class="source-line-no">507</span><span id="line-507"> // Stores the shutdown wait time (in seconds) together with the name of its config key.</span> |
| <span class="source-line-no">508</span><span id="line-508"> protected final void setWaitOnShutdownInSeconds(int waitOnShutdownInSeconds, String waitOnShutdownInSecondsConfigKey) {</span> |
| <span class="source-line-no">509</span><span id="line-509"> this.waitOnShutdownInSecondsConfigKey = waitOnShutdownInSecondsConfigKey;</span> |
| <span class="source-line-no">510</span><span id="line-510"> this.waitOnShutdownInSeconds = waitOnShutdownInSeconds;</span> |
| <span class="source-line-no">511</span><span id="line-511"> }</span> |
| <span class="source-line-no">512</span><span id="line-512"></span> |
| <span class="source-line-no">513</span><span id="line-513"> protected final void createSingleThreadPoolConsumeExecutor(String walType, final Path rootDir, final String prefix) {</span> |
| <span class="source-line-no">514</span><span id="line-514"> // Thread name pattern for String.format ("%d" = thread index); literal '%' must be escaped.</span> |
| <span class="source-line-no">515</span><span id="line-515"> String nameFormat = walType + "-%d-" + rootDir.toString().replace("%", "%%") + "-prefix:"</span> |
| <span class="source-line-no">516</span><span id="line-516"> + (prefix == null ? "default" : prefix).replace("%", "%%");</span> |
| <span class="source-line-no">517</span><span id="line-517"> ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,</span> |
| <span class="source-line-no">518</span><span id="line-518"> new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat(nameFormat).setDaemon(true).build());</span> |
| <span class="source-line-no">519</span><span id="line-519"> hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;</span> |
| <span class="source-line-no">520</span><span id="line-520"> consumeExecutor = threadPool;</span> |
| <span class="source-line-no">521</span><span id="line-521"> this.shouldShutDownConsumeExecutorWhenClose = true;</span> |
| <span class="source-line-no">522</span><span id="line-522"> }</span> |
| <span class="source-line-no">523</span><span id="line-523"></span> |
| <span class="source-line-no">524</span><span id="line-524"> protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir,</span> |
| <span class="source-line-no">525</span><span id="line-525"> final String logDir, final String archiveDir, final Configuration conf,</span> |
| <span class="source-line-no">526</span><span id="line-526"> final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix,</span> |
| <span class="source-line-no">527</span><span id="line-527"> final String suffix, FileSystem remoteFs, Path remoteWALDir)</span> |
| <span class="source-line-no">528</span><span id="line-528"> throws FailedLogCloseException, IOException {</span> |
| <span class="source-line-no">529</span><span id="line-529"> this.fs = fs;</span> |
| <span class="source-line-no">530</span><span id="line-530"> this.walDir = new Path(rootDir, logDir);</span> |
| <span class="source-line-no">531</span><span id="line-531"> this.walArchiveDir = new Path(rootDir, archiveDir);</span> |
| <span class="source-line-no">532</span><span id="line-532"> this.conf = conf;</span> |
| <span class="source-line-no">533</span><span id="line-533"> this.abortable = abortable;</span> |
| <span class="source-line-no">534</span><span id="line-534"> this.remoteFs = remoteFs;</span> |
| <span class="source-line-no">535</span><span id="line-535"> this.remoteWALDir = remoteWALDir;</span> |
| <span class="source-line-no">536</span><span id="line-536"></span> |
| <span class="source-line-no">537</span><span id="line-537"> if (!fs.exists(walDir) && !fs.mkdirs(walDir)) {</span> |
| <span class="source-line-no">538</span><span id="line-538"> throw new IOException("Unable to mkdir " + walDir);</span> |
| <span class="source-line-no">539</span><span id="line-539"> }</span> |
| <span class="source-line-no">540</span><span id="line-540"></span> |
| <span class="source-line-no">541</span><span id="line-541"> if (!fs.exists(this.walArchiveDir)) {</span> |
| <span class="source-line-no">542</span><span id="line-542"> if (!fs.mkdirs(this.walArchiveDir)) {</span> |
| <span class="source-line-no">543</span><span id="line-543"> throw new IOException("Unable to mkdir " + this.walArchiveDir);</span> |
| <span class="source-line-no">544</span><span id="line-544"> }</span> |
| <span class="source-line-no">545</span><span id="line-545"> }</span> |
| <span class="source-line-no">546</span><span id="line-546"></span> |
| <span class="source-line-no">547</span><span id="line-547"> // If prefix is null||empty then just name it wal</span> |
| <span class="source-line-no">548</span><span id="line-548"> this.walFilePrefix = prefix == null || prefix.isEmpty()</span> |
| <span class="source-line-no">549</span><span id="line-549"> ? "wal"</span> |
| <span class="source-line-no">550</span><span id="line-550"> : URLEncoder.encode(prefix, StandardCharsets.UTF_8.name());</span> |
| <span class="source-line-no">551</span><span id="line-551"> // we only correctly differentiate suffices when numeric ones start with '.'</span> |
| <span class="source-line-no">552</span><span id="line-552"> if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {</span> |
| <span class="source-line-no">553</span><span id="line-553"> throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER</span> |
| <span class="source-line-no">554</span><span id="line-554"> + "' but instead was '" + suffix + "'");</span> |
| <span class="source-line-no">555</span><span id="line-555"> }</span> |
| <span class="source-line-no">556</span><span id="line-556"> // Now that it exists, set the storage policy for the entire directory of wal files related to</span> |
| <span class="source-line-no">557</span><span id="line-557"> // this FSHLog instance</span> |
| <span class="source-line-no">558</span><span id="line-558"> String storagePolicy =</span> |
| <span class="source-line-no">559</span><span id="line-559"> conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY);</span> |
| <span class="source-line-no">560</span><span id="line-560"> CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy);</span> |
| <span class="source-line-no">561</span><span id="line-561"> this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");</span> |
| <span class="source-line-no">562</span><span id="line-562"> this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();</span> |
| <span class="source-line-no">563</span><span id="line-563"></span> |
| <span class="source-line-no">564</span><span id="line-564"> this.ourFiles = new PathFilter() {</span> |
| <span class="source-line-no">565</span><span id="line-565"> @Override</span> |
| <span class="source-line-no">566</span><span id="line-566"> public boolean accept(final Path fileName) {</span> |
| <span class="source-line-no">567</span><span id="line-567"> // The path should start with dir/<prefix> and end with our suffix</span> |
| <span class="source-line-no">568</span><span id="line-568"> final String fileNameString = fileName.toString();</span> |
| <span class="source-line-no">569</span><span id="line-569"> if (!fileNameString.startsWith(prefixPathStr)) {</span> |
| <span class="source-line-no">570</span><span id="line-570"> return false;</span> |
| <span class="source-line-no">571</span><span id="line-571"> }</span> |
| <span class="source-line-no">572</span><span id="line-572"> if (walFileSuffix.isEmpty()) {</span> |
| <span class="source-line-no">573</span><span id="line-573"> // in the case of the null suffix, we need to ensure the filename ends with a timestamp.</span> |
| <span class="source-line-no">574</span><span id="line-574"> return org.apache.commons.lang3.StringUtils</span> |
| <span class="source-line-no">575</span><span id="line-575"> .isNumeric(fileNameString.substring(prefixPathStr.length()));</span> |
| <span class="source-line-no">576</span><span id="line-576"> } else if (!fileNameString.endsWith(walFileSuffix)) {</span> |
| <span class="source-line-no">577</span><span id="line-577"> return false;</span> |
| <span class="source-line-no">578</span><span id="line-578"> }</span> |
| <span class="source-line-no">579</span><span id="line-579"> return true;</span> |
| <span class="source-line-no">580</span><span id="line-580"> }</span> |
| <span class="source-line-no">581</span><span id="line-581"> };</span> |
| <span class="source-line-no">582</span><span id="line-582"></span> |
| <span class="source-line-no">583</span><span id="line-583"> if (failIfWALExists) {</span> |
| <span class="source-line-no">584</span><span id="line-584"> final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);</span> |
| <span class="source-line-no">585</span><span id="line-585"> if (null != walFiles && 0 != walFiles.length) {</span> |
| <span class="source-line-no">586</span><span id="line-586"> throw new IOException("Target WAL already exists within directory " + walDir);</span> |
| <span class="source-line-no">587</span><span id="line-587"> }</span> |
| <span class="source-line-no">588</span><span id="line-588"> }</span> |
| <span class="source-line-no">589</span><span id="line-589"></span> |
| <span class="source-line-no">590</span><span id="line-590"> // Register listeners. TODO: Should this exist anymore? We have CPs?</span> |
| <span class="source-line-no">591</span><span id="line-591"> if (listeners != null) {</span> |
| <span class="source-line-no">592</span><span id="line-592"> for (WALActionsListener i : listeners) {</span> |
| <span class="source-line-no">593</span><span id="line-593"> registerWALActionsListener(i);</span> |
| <span class="source-line-no">594</span><span id="line-594"> }</span> |
| <span class="source-line-no">595</span><span id="line-595"> }</span> |
| <span class="source-line-no">596</span><span id="line-596"> this.coprocessorHost = new WALCoprocessorHost(this, conf);</span> |
| <span class="source-line-no">597</span><span id="line-597"></span> |
| <span class="source-line-no">598</span><span id="line-598"> // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block</span> |
| <span class="source-line-no">599</span><span id="line-599"> // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost</span> |
| <span class="source-line-no">600</span><span id="line-600"> // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of</span> |
| <span class="source-line-no">601</span><span id="line-601"> // the block size but experience from the field has it that this was not enough time for the</span> |
| <span class="source-line-no">602</span><span id="line-602"> // roll to happen before end-of-block. So the new accounting makes WALs of about the same</span> |
| <span class="source-line-no">603</span><span id="line-603"> // size as those made in hbase-1 (to prevent surprise), we now have default block size as</span> |
| <span class="source-line-no">604</span><span id="line-604"> // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally</span> |
| <span class="source-line-no">605</span><span id="line-605"> // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.</span> |
| <span class="source-line-no">606</span><span id="line-606"> this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);</span> |
| <span class="source-line-no">607</span><span id="line-607"> float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);</span> |
| <span class="source-line-no">608</span><span id="line-608"> this.logrollsize = (long) (this.blocksize * multiplier);</span> |
| <span class="source-line-no">609</span><span id="line-609"> this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));</span> |
| <span class="source-line-no">610</span><span id="line-610"></span> |
| <span class="source-line-no">611</span><span id="line-611"> LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize="</span> |
| <span class="source-line-no">612</span><span id="line-612"> + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix="</span> |
| <span class="source-line-no">613</span><span id="line-613"> + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir</span> |
| <span class="source-line-no">614</span><span id="line-614"> + ", maxLogs=" + this.maxLogs);</span> |
| <span class="source-line-no">615</span><span id="line-615"> this.slowSyncNs =</span> |
| <span class="source-line-no">616</span><span id="line-616"> TimeUnit.MILLISECONDS.toNanos(conf.getInt(SLOW_SYNC_TIME_MS, DEFAULT_SLOW_SYNC_TIME_MS));</span> |
| <span class="source-line-no">617</span><span id="line-617"> this.rollOnSyncNs = TimeUnit.MILLISECONDS</span> |
| <span class="source-line-no">618</span><span id="line-618"> .toNanos(conf.getInt(ROLL_ON_SYNC_TIME_MS, DEFAULT_ROLL_ON_SYNC_TIME_MS));</span> |
| <span class="source-line-no">619</span><span id="line-619"> this.slowSyncRollThreshold =</span> |
| <span class="source-line-no">620</span><span id="line-620"> conf.getInt(SLOW_SYNC_ROLL_THRESHOLD, DEFAULT_SLOW_SYNC_ROLL_THRESHOLD);</span> |
| <span class="source-line-no">621</span><span id="line-621"> this.slowSyncCheckInterval =</span> |
| <span class="source-line-no">622</span><span id="line-622"> conf.getInt(SLOW_SYNC_ROLL_INTERVAL_MS, DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);</span> |
| <span class="source-line-no">623</span><span id="line-623"> this.walSyncTimeoutNs =</span> |
| <span class="source-line-no">624</span><span id="line-624"> TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS, DEFAULT_WAL_SYNC_TIMEOUT_MS));</span> |
| <span class="source-line-no">625</span><span id="line-625"> this.syncFutureCache = new SyncFutureCache(conf);</span> |
| <span class="source-line-no">626</span><span id="line-626"> this.implClassName = getClass().getSimpleName();</span> |
| <span class="source-line-no">627</span><span id="line-627"> this.walTooOldNs = TimeUnit.SECONDS</span> |
| <span class="source-line-no">628</span><span id="line-628"> .toNanos(conf.getInt(SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));</span> |
| <span class="source-line-no">629</span><span id="line-629"> this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);</span> |
| <span class="source-line-no">630</span><span id="line-630"> archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0);</span> |
| <span class="source-line-no">631</span><span id="line-631"> this.walShutdownTimeout =</span> |
| <span class="source-line-no">632</span><span id="line-632"> conf.getLong(WAL_SHUTDOWN_WAIT_TIMEOUT_MS, DEFAULT_WAL_SHUTDOWN_WAIT_TIMEOUT_MS);</span> |
| <span class="source-line-no">633</span><span id="line-633"></span> |
| <span class="source-line-no">634</span><span id="line-634"> int preallocatedEventCount =</span> |
| <span class="source-line-no">635</span><span id="line-635"> conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);</span> |
| <span class="source-line-no">636</span><span id="line-636"> waitingConsumePayloads =</span> |
| <span class="source-line-no">637</span><span id="line-637"> RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount);</span> |
| <span class="source-line-no">638</span><span id="line-638"> waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);</span> |
| <span class="source-line-no">639</span><span id="line-639"> waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);</span> |
| <span class="source-line-no">640</span><span id="line-640"></span> |
| <span class="source-line-no">641</span><span id="line-641"> // inrease the ringbuffer sequence so our txid is start from 1</span> |
| <span class="source-line-no">642</span><span id="line-642"> waitingConsumePayloads.publish(waitingConsumePayloads.next());</span> |
| <span class="source-line-no">643</span><span id="line-643"> waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());</span> |
| <span class="source-line-no">644</span><span id="line-644"></span> |
| <span class="source-line-no">645</span><span id="line-645"> batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);</span> |
| <span class="source-line-no">646</span><span id="line-646"> }</span> |
| <span class="source-line-no">647</span><span id="line-647"></span> |
| <span class="source-line-no">648</span><span id="line-648"> /**</span> |
| <span class="source-line-no">649</span><span id="line-649"> * Used to initialize the WAL. Usually just call rollWriter to create the first log writer.</span> |
| <span class="source-line-no">650</span><span id="line-650"> */</span> |
| <span class="source-line-no">651</span><span id="line-651"> @Override</span> |
| <span class="source-line-no">652</span><span id="line-652"> public void init() throws IOException {</span> |
| <span class="source-line-no">653</span><span id="line-653"> rollWriter();</span> |
| <span class="source-line-no">654</span><span id="line-654"> }</span> |
| <span class="source-line-no">655</span><span id="line-655"></span> |
| <span class="source-line-no">656</span><span id="line-656"> @Override</span> |
| <span class="source-line-no">657</span><span id="line-657"> public void registerWALActionsListener(WALActionsListener listener) {</span> |
| <span class="source-line-no">658</span><span id="line-658"> this.listeners.add(listener);</span> |
| <span class="source-line-no">659</span><span id="line-659"> }</span> |
| <span class="source-line-no">660</span><span id="line-660"></span> |
| <span class="source-line-no">661</span><span id="line-661"> @Override</span> |
| <span class="source-line-no">662</span><span id="line-662"> public boolean unregisterWALActionsListener(WALActionsListener listener) {</span> |
| <span class="source-line-no">663</span><span id="line-663"> return this.listeners.remove(listener);</span> |
| <span class="source-line-no">664</span><span id="line-664"> }</span> |
| <span class="source-line-no">665</span><span id="line-665"></span> |
| <span class="source-line-no">666</span><span id="line-666"> @Override</span> |
| <span class="source-line-no">667</span><span id="line-667"> public WALCoprocessorHost getCoprocessorHost() {</span> |
| <span class="source-line-no">668</span><span id="line-668"> return coprocessorHost;</span> |
| <span class="source-line-no">669</span><span id="line-669"> }</span> |
| <span class="source-line-no">670</span><span id="line-670"></span> |
| <span class="source-line-no">671</span><span id="line-671"> @Override</span> |
| <span class="source-line-no">672</span><span id="line-672"> public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {</span> |
| <span class="source-line-no">673</span><span id="line-673"> return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);</span> |
| <span class="source-line-no">674</span><span id="line-674"> }</span> |
| <span class="source-line-no">675</span><span id="line-675"></span> |
| <span class="source-line-no">676</span><span id="line-676"> @Override</span> |
| <span class="source-line-no">677</span><span id="line-677"> public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {</span> |
| <span class="source-line-no">678</span><span id="line-678"> return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);</span> |
| <span class="source-line-no">679</span><span id="line-679"> }</span> |
| <span class="source-line-no">680</span><span id="line-680"></span> |
| <span class="source-line-no">681</span><span id="line-681"> @Override</span> |
| <span class="source-line-no">682</span><span id="line-682"> public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {</span> |
| <span class="source-line-no">683</span><span id="line-683"> this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);</span> |
| <span class="source-line-no">684</span><span id="line-684"> }</span> |
| <span class="source-line-no">685</span><span id="line-685"></span> |
| <span class="source-line-no">686</span><span id="line-686"> @Override</span> |
| <span class="source-line-no">687</span><span id="line-687"> public void abortCacheFlush(byte[] encodedRegionName) {</span> |
| <span class="source-line-no">688</span><span id="line-688"> this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);</span> |
| <span class="source-line-no">689</span><span id="line-689"> }</span> |
| <span class="source-line-no">690</span><span id="line-690"></span> |
| <span class="source-line-no">691</span><span id="line-691"> @Override</span> |
| <span class="source-line-no">692</span><span id="line-692"> public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {</span> |
| <span class="source-line-no">693</span><span id="line-693"> // This method is used by tests and for figuring if we should flush or not because our</span> |
| <span class="source-line-no">694</span><span id="line-694"> // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use</span> |
| <span class="source-line-no">695</span><span id="line-695"> // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId</span> |
| <span class="source-line-no">696</span><span id="line-696"> // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the</span> |
| <span class="source-line-no">697</span><span id="line-697"> // currently flushing sequence ids, and if anything found there, it is returning these. This is</span> |
| <span class="source-line-no">698</span><span id="line-698"> // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if</span> |
| <span class="source-line-no">699</span><span id="line-699"> // we crash during the flush. For figuring what to flush, we might get requeued if our sequence</span> |
| <span class="source-line-no">700</span><span id="line-700"> // id is old even though we are currently flushing. This may mean we do too much flushing.</span> |
| <span class="source-line-no">701</span><span id="line-701"> return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);</span> |
| <span class="source-line-no">702</span><span id="line-702"> }</span> |
| <span class="source-line-no">703</span><span id="line-703"></span> |
| <span class="source-line-no">704</span><span id="line-704"> @Override</span> |
| <span class="source-line-no">705</span><span id="line-705"> public Map<byte[], List<byte[]>> rollWriter() throws FailedLogCloseException, IOException {</span> |
| <span class="source-line-no">706</span><span id="line-706"> return rollWriter(false);</span> |
| <span class="source-line-no">707</span><span id="line-707"> }</span> |
| <span class="source-line-no">708</span><span id="line-708"></span> |
| <span class="source-line-no">709</span><span id="line-709"> @Override</span> |
| <span class="source-line-no">710</span><span id="line-710"> public final void sync() throws IOException {</span> |
| <span class="source-line-no">711</span><span id="line-711"> sync(useHsync);</span> |
| <span class="source-line-no">712</span><span id="line-712"> }</span> |
| <span class="source-line-no">713</span><span id="line-713"></span> |
| <span class="source-line-no">714</span><span id="line-714"> @Override</span> |
| <span class="source-line-no">715</span><span id="line-715"> public final void sync(long txid) throws IOException {</span> |
| <span class="source-line-no">716</span><span id="line-716"> sync(txid, useHsync);</span> |
| <span class="source-line-no">717</span><span id="line-717"> }</span> |
| <span class="source-line-no">718</span><span id="line-718"></span> |
| <span class="source-line-no">719</span><span id="line-719"> @Override</span> |
| <span class="source-line-no">720</span><span id="line-720"> public final void sync(boolean forceSync) throws IOException {</span> |
| <span class="source-line-no">721</span><span id="line-721"> TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));</span> |
| <span class="source-line-no">722</span><span id="line-722"> }</span> |
| <span class="source-line-no">723</span><span id="line-723"></span> |
| <span class="source-line-no">724</span><span id="line-724"> @Override</span> |
| <span class="source-line-no">725</span><span id="line-725"> public final void sync(long txid, boolean forceSync) throws IOException {</span> |
| <span class="source-line-no">726</span><span id="line-726"> TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));</span> |
| <span class="source-line-no">727</span><span id="line-727"> }</span> |
| <span class="source-line-no">728</span><span id="line-728"></span> |
| <span class="source-line-no">729</span><span id="line-729"> @RestrictedApi(explanation = "Should only be called in tests", link = "",</span> |
| <span class="source-line-no">730</span><span id="line-730"> allowedOnPath = ".*/src/test/.*")</span> |
| <span class="source-line-no">731</span><span id="line-731"> public SequenceIdAccounting getSequenceIdAccounting() {</span> |
| <span class="source-line-no">732</span><span id="line-732"> return sequenceIdAccounting;</span> |
| <span class="source-line-no">733</span><span id="line-733"> }</span> |
| <span class="source-line-no">734</span><span id="line-734"></span> |
| <span class="source-line-no">735</span><span id="line-735"> /**</span> |
| <span class="source-line-no">736</span><span id="line-736"> * This is a convenience method that computes a new filename with a given file-number.</span> |
| <span class="source-line-no">737</span><span id="line-737"> * @param filenum to use</span> |
| <span class="source-line-no">738</span><span id="line-738"> */</span> |
| <span class="source-line-no">739</span><span id="line-739"> protected Path computeFilename(final long filenum) {</span> |
| <span class="source-line-no">740</span><span id="line-740"> if (filenum < 0) {</span> |
| <span class="source-line-no">741</span><span id="line-741"> throw new RuntimeException("WAL file number can't be < 0");</span> |
| <span class="source-line-no">742</span><span id="line-742"> }</span> |
| <span class="source-line-no">743</span><span id="line-743"> String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix;</span> |
| <span class="source-line-no">744</span><span id="line-744"> return new Path(walDir, child);</span> |
| <span class="source-line-no">745</span><span id="line-745"> }</span> |
| <span class="source-line-no">746</span><span id="line-746"></span> |
| <span class="source-line-no">747</span><span id="line-747"> /**</span> |
| <span class="source-line-no">748</span><span id="line-748"> * This is a convenience method that computes a new filename with a given using the current WAL</span> |
| <span class="source-line-no">749</span><span id="line-749"> * file-number</span> |
| <span class="source-line-no">750</span><span id="line-750"> */</span> |
| <span class="source-line-no">751</span><span id="line-751"> public Path getCurrentFileName() {</span> |
| <span class="source-line-no">752</span><span id="line-752"> return computeFilename(this.filenum.get());</span> |
| <span class="source-line-no">753</span><span id="line-753"> }</span> |
| <span class="source-line-no">754</span><span id="line-754"></span> |
| <span class="source-line-no">755</span><span id="line-755"> /**</span> |
| <span class="source-line-no">756</span><span id="line-756"> * retrieve the next path to use for writing. Increments the internal filenum.</span> |
| <span class="source-line-no">757</span><span id="line-757"> */</span> |
| <span class="source-line-no">758</span><span id="line-758"> private Path getNewPath() throws IOException {</span> |
| <span class="source-line-no">759</span><span id="line-759"> this.filenum.set(Math.max(getFilenum() + 1, EnvironmentEdgeManager.currentTime()));</span> |
| <span class="source-line-no">760</span><span id="line-760"> Path newPath = getCurrentFileName();</span> |
| <span class="source-line-no">761</span><span id="line-761"> return newPath;</span> |
| <span class="source-line-no">762</span><span id="line-762"> }</span> |
| <span class="source-line-no">763</span><span id="line-763"></span> |
| <span class="source-line-no">764</span><span id="line-764"> public Path getOldPath() {</span> |
| <span class="source-line-no">765</span><span id="line-765"> long currentFilenum = this.filenum.get();</span> |
| <span class="source-line-no">766</span><span id="line-766"> Path oldPath = null;</span> |
| <span class="source-line-no">767</span><span id="line-767"> if (currentFilenum > 0) {</span> |
| <span class="source-line-no">768</span><span id="line-768"> // ComputeFilename will take care of meta wal filename</span> |
| <span class="source-line-no">769</span><span id="line-769"> oldPath = computeFilename(currentFilenum);</span> |
| <span class="source-line-no">770</span><span id="line-770"> } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine?</span> |
| <span class="source-line-no">771</span><span id="line-771"> return oldPath;</span> |
| <span class="source-line-no">772</span><span id="line-772"> }</span> |
| <span class="source-line-no">773</span><span id="line-773"></span> |
| <span class="source-line-no">774</span><span id="line-774"> /**</span> |
| <span class="source-line-no">775</span><span id="line-775"> * Tell listeners about pre log roll.</span> |
| <span class="source-line-no">776</span><span id="line-776"> */</span> |
| <span class="source-line-no">777</span><span id="line-777"> private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)</span> |
| <span class="source-line-no">778</span><span id="line-778"> throws IOException {</span> |
| <span class="source-line-no">779</span><span id="line-779"> coprocessorHost.preWALRoll(oldPath, newPath);</span> |
| <span class="source-line-no">780</span><span id="line-780"></span> |
| <span class="source-line-no">781</span><span id="line-781"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">782</span><span id="line-782"> for (WALActionsListener i : this.listeners) {</span> |
| <span class="source-line-no">783</span><span id="line-783"> i.preLogRoll(oldPath, newPath);</span> |
| <span class="source-line-no">784</span><span id="line-784"> }</span> |
| <span class="source-line-no">785</span><span id="line-785"> }</span> |
| <span class="source-line-no">786</span><span id="line-786"> }</span> |
| <span class="source-line-no">787</span><span id="line-787"></span> |
| <span class="source-line-no">788</span><span id="line-788"> /**</span> |
| <span class="source-line-no">789</span><span id="line-789"> * Tell listeners about post log roll.</span> |
| <span class="source-line-no">790</span><span id="line-790"> */</span> |
| <span class="source-line-no">791</span><span id="line-791"> private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)</span> |
| <span class="source-line-no">792</span><span id="line-792"> throws IOException {</span> |
| <span class="source-line-no">793</span><span id="line-793"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">794</span><span id="line-794"> for (WALActionsListener i : this.listeners) {</span> |
| <span class="source-line-no">795</span><span id="line-795"> i.postLogRoll(oldPath, newPath);</span> |
| <span class="source-line-no">796</span><span id="line-796"> }</span> |
| <span class="source-line-no">797</span><span id="line-797"> }</span> |
| <span class="source-line-no">798</span><span id="line-798"></span> |
| <span class="source-line-no">799</span><span id="line-799"> coprocessorHost.postWALRoll(oldPath, newPath);</span> |
| <span class="source-line-no">800</span><span id="line-800"> }</span> |
| <span class="source-line-no">801</span><span id="line-801"></span> |
| <span class="source-line-no">802</span><span id="line-802"> // public only until class moves to o.a.h.h.wal</span> |
| <span class="source-line-no">803</span><span id="line-803"> /** Returns the number of rolled log files */</span> |
| <span class="source-line-no">804</span><span id="line-804"> public int getNumRolledLogFiles() {</span> |
| <span class="source-line-no">805</span><span id="line-805"> return walFile2Props.size();</span> |
| <span class="source-line-no">806</span><span id="line-806"> }</span> |
| <span class="source-line-no">807</span><span id="line-807"></span> |
| <span class="source-line-no">808</span><span id="line-808"> // public only until class moves to o.a.h.h.wal</span> |
| <span class="source-line-no">809</span><span id="line-809"> /** Returns the number of log files in use */</span> |
| <span class="source-line-no">810</span><span id="line-810"> public int getNumLogFiles() {</span> |
| <span class="source-line-no">811</span><span id="line-811"> // +1 for current use log</span> |
| <span class="source-line-no">812</span><span id="line-812"> return getNumRolledLogFiles() + 1;</span> |
| <span class="source-line-no">813</span><span id="line-813"> }</span> |
| <span class="source-line-no">814</span><span id="line-814"></span> |
| <span class="source-line-no">815</span><span id="line-815"> /**</span> |
| <span class="source-line-no">816</span><span id="line-816"> * If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed, check the</span> |
| <span class="source-line-no">817</span><span id="line-817"> * first (oldest) WAL, and return those regions which should be flushed so that it can be</span> |
| <span class="source-line-no">818</span><span id="line-818"> * let-go/'archived'.</span> |
| <span class="source-line-no">819</span><span id="line-819"> * @return stores of regions (encodedRegionNames) to flush in order to archive the oldest WAL file</span> |
| <span class="source-line-no">820</span><span id="line-820"> */</span> |
| <span class="source-line-no">821</span><span id="line-821"> Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {</span> |
| <span class="source-line-no">822</span><span id="line-822"> Map<byte[], List<byte[]>> regions = null;</span> |
| <span class="source-line-no">823</span><span id="line-823"> int logCount = getNumRolledLogFiles();</span> |
| <span class="source-line-no">824</span><span id="line-824"> if (logCount > this.maxLogs && logCount > 0) {</span> |
| <span class="source-line-no">825</span><span id="line-825"> Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();</span> |
| <span class="source-line-no">826</span><span id="line-826"> regions =</span> |
| <span class="source-line-no">827</span><span id="line-827"> this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);</span> |
| <span class="source-line-no">828</span><span id="line-828"> }</span> |
| <span class="source-line-no">829</span><span id="line-829"> if (regions != null) {</span> |
| <span class="source-line-no">830</span><span id="line-830"> List<String> listForPrint = new ArrayList<>();</span> |
| <span class="source-line-no">831</span><span id="line-831"> for (Map.Entry<byte[], List<byte[]>> r : regions.entrySet()) {</span> |
| <span class="source-line-no">832</span><span id="line-832"> StringBuilder families = new StringBuilder();</span> |
| <span class="source-line-no">833</span><span id="line-833"> for (int i = 0; i < r.getValue().size(); i++) {</span> |
| <span class="source-line-no">834</span><span id="line-834"> if (i > 0) {</span> |
| <span class="source-line-no">835</span><span id="line-835"> families.append(",");</span> |
| <span class="source-line-no">836</span><span id="line-836"> }</span> |
| <span class="source-line-no">837</span><span id="line-837"> families.append(Bytes.toString(r.getValue().get(i)));</span> |
| <span class="source-line-no">838</span><span id="line-838"> }</span> |
| <span class="source-line-no">839</span><span id="line-839"> listForPrint.add(Bytes.toStringBinary(r.getKey()) + "[" + families.toString() + "]");</span> |
| <span class="source-line-no">840</span><span id="line-840"> }</span> |
| <span class="source-line-no">841</span><span id="line-841"> LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs</span> |
| <span class="source-line-no">842</span><span id="line-842"> + "; forcing (partial) flush of " + regions.size() + " region(s): "</span> |
| <span class="source-line-no">843</span><span id="line-843"> + StringUtils.join(",", listForPrint));</span> |
| <span class="source-line-no">844</span><span id="line-844"> }</span> |
| <span class="source-line-no">845</span><span id="line-845"> return regions;</span> |
| <span class="source-line-no">846</span><span id="line-846"> }</span> |
| <span class="source-line-no">847</span><span id="line-847"></span> |
| <span class="source-line-no">848</span><span id="line-848"> /**</span> |
| <span class="source-line-no">849</span><span id="line-849"> * Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.</span> |
| <span class="source-line-no">850</span><span id="line-850"> */</span> |
| <span class="source-line-no">851</span><span id="line-851"> private void markClosedAndClean(Path path) {</span> |
| <span class="source-line-no">852</span><span id="line-852"> WALProps props = walFile2Props.get(path);</span> |
| <span class="source-line-no">853</span><span id="line-853"> // typically this should not be null, but if there is no big issue if it is already null, so</span> |
| <span class="source-line-no">854</span><span id="line-854"> // let's make the code more robust</span> |
| <span class="source-line-no">855</span><span id="line-855"> if (props != null) {</span> |
| <span class="source-line-no">856</span><span id="line-856"> props.closed = true;</span> |
| <span class="source-line-no">857</span><span id="line-857"> cleanOldLogs();</span> |
| <span class="source-line-no">858</span><span id="line-858"> }</span> |
| <span class="source-line-no">859</span><span id="line-859"> }</span> |
| <span class="source-line-no">860</span><span id="line-860"></span> |
| <span class="source-line-no">861</span><span id="line-861"> /**</span> |
| <span class="source-line-no">862</span><span id="line-862"> * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.</span> |
| <span class="source-line-no">863</span><span id="line-863"> * <p/></span> |
| <span class="source-line-no">864</span><span id="line-864"> * Use synchronized because we may call this method in different threads, normally when replacing</span> |
| <span class="source-line-no">865</span><span id="line-865"> * writer, and since now close writer may be asynchronous, we will also call this method in the</span> |
| <span class="source-line-no">866</span><span id="line-866"> * closeExecutor, right after we actually close a WAL writer.</span> |
| <span class="source-line-no">867</span><span id="line-867"> */</span> |
| <span class="source-line-no">868</span><span id="line-868"> private synchronized void cleanOldLogs() {</span> |
| <span class="source-line-no">869</span><span id="line-869"> List<Pair<Path, Long>> logsToArchive = null;</span> |
| <span class="source-line-no">870</span><span id="line-870"> long now = System.nanoTime();</span> |
| <span class="source-line-no">871</span><span id="line-871"> boolean mayLogTooOld = nextLogTooOldNs <= now;</span> |
| <span class="source-line-no">872</span><span id="line-872"> ArrayList<byte[]> regionsBlockingWal = null;</span> |
| <span class="source-line-no">873</span><span id="line-873"> // For each log file, look at its Map of regions to the highest sequence id; if all sequence ids</span> |
| <span class="source-line-no">874</span><span id="line-874"> // are older than what is currently in memory, the WAL can be GC'd.</span> |
| <span class="source-line-no">875</span><span id="line-875"> for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {</span> |
| <span class="source-line-no">876</span><span id="line-876"> if (!e.getValue().closed) {</span> |
| <span class="source-line-no">877</span><span id="line-877"> LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());</span> |
| <span class="source-line-no">878</span><span id="line-878"> continue;</span> |
| <span class="source-line-no">879</span><span id="line-879"> }</span> |
| <span class="source-line-no">880</span><span id="line-880"> Path log = e.getKey();</span> |
| <span class="source-line-no">881</span><span id="line-881"> ArrayList<byte[]> regionsBlockingThisWal = null;</span> |
| <span class="source-line-no">882</span><span id="line-882"> long ageNs = now - e.getValue().rollTimeNs;</span> |
| <span class="source-line-no">883</span><span id="line-883"> if (ageNs > walTooOldNs) {</span> |
| <span class="source-line-no">884</span><span id="line-884"> if (mayLogTooOld && regionsBlockingWal == null) {</span> |
| <span class="source-line-no">885</span><span id="line-885"> regionsBlockingWal = new ArrayList<>();</span> |
| <span class="source-line-no">886</span><span id="line-886"> }</span> |
| <span class="source-line-no">887</span><span id="line-887"> regionsBlockingThisWal = regionsBlockingWal;</span> |
| <span class="source-line-no">888</span><span id="line-888"> }</span> |
| <span class="source-line-no">889</span><span id="line-889"> Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;</span> |
| <span class="source-line-no">890</span><span id="line-890"> if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) {</span> |
| <span class="source-line-no">891</span><span id="line-891"> if (logsToArchive == null) {</span> |
| <span class="source-line-no">892</span><span id="line-892"> logsToArchive = new ArrayList<>();</span> |
| <span class="source-line-no">893</span><span id="line-893"> }</span> |
| <span class="source-line-no">894</span><span id="line-894"> logsToArchive.add(Pair.newPair(log, e.getValue().logSize));</span> |
| <span class="source-line-no">895</span><span id="line-895"> if (LOG.isTraceEnabled()) {</span> |
| <span class="source-line-no">896</span><span id="line-896"> LOG.trace("WAL file ready for archiving " + log);</span> |
| <span class="source-line-no">897</span><span id="line-897"> }</span> |
| <span class="source-line-no">898</span><span id="line-898"> } else if (regionsBlockingThisWal != null) {</span> |
| <span class="source-line-no">899</span><span id="line-899"> StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ")</span> |
| <span class="source-line-no">900</span><span id="line-900"> .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: ");</span> |
| <span class="source-line-no">901</span><span id="line-901"> boolean isFirst = true;</span> |
| <span class="source-line-no">902</span><span id="line-902"> for (byte[] region : regionsBlockingThisWal) {</span> |
| <span class="source-line-no">903</span><span id="line-903"> if (!isFirst) {</span> |
| <span class="source-line-no">904</span><span id="line-904"> sb.append("; ");</span> |
| <span class="source-line-no">905</span><span id="line-905"> }</span> |
| <span class="source-line-no">906</span><span id="line-906"> isFirst = false;</span> |
| <span class="source-line-no">907</span><span id="line-907"> sb.append(Bytes.toString(region));</span> |
| <span class="source-line-no">908</span><span id="line-908"> }</span> |
| <span class="source-line-no">909</span><span id="line-909"> LOG.warn(sb.toString());</span> |
| <span class="source-line-no">910</span><span id="line-910"> nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS;</span> |
| <span class="source-line-no">911</span><span id="line-911"> regionsBlockingThisWal.clear();</span> |
| <span class="source-line-no">912</span><span id="line-912"> }</span> |
| <span class="source-line-no">913</span><span id="line-913"> }</span> |
| <span class="source-line-no">914</span><span id="line-914"></span> |
| <span class="source-line-no">915</span><span id="line-915"> if (logsToArchive != null) {</span> |
| <span class="source-line-no">916</span><span id="line-916"> final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;</span> |
| <span class="source-line-no">917</span><span id="line-917"> // make it async</span> |
| <span class="source-line-no">918</span><span id="line-918"> for (Pair<Path, Long> log : localLogsToArchive) {</span> |
| <span class="source-line-no">919</span><span id="line-919"> logArchiveExecutor.execute(() -> {</span> |
| <span class="source-line-no">920</span><span id="line-920"> archive(log);</span> |
| <span class="source-line-no">921</span><span id="line-921"> });</span> |
| <span class="source-line-no">922</span><span id="line-922"> this.walFile2Props.remove(log.getFirst());</span> |
| <span class="source-line-no">923</span><span id="line-923"> }</span> |
| <span class="source-line-no">924</span><span id="line-924"> }</span> |
| <span class="source-line-no">925</span><span id="line-925"> }</span> |
| <span class="source-line-no">926</span><span id="line-926"></span> |
| <span class="source-line-no">927</span><span id="line-927"> protected void archive(final Pair<Path, Long> log) {</span> |
| <span class="source-line-no">928</span><span id="line-928"> totalLogSize.addAndGet(-log.getSecond());</span> |
| <span class="source-line-no">929</span><span id="line-929"> int retry = 1;</span> |
| <span class="source-line-no">930</span><span id="line-930"> while (true) {</span> |
| <span class="source-line-no">931</span><span id="line-931"> try {</span> |
| <span class="source-line-no">932</span><span id="line-932"> archiveLogFile(log.getFirst());</span> |
| <span class="source-line-no">933</span><span id="line-933"> // successful</span> |
| <span class="source-line-no">934</span><span id="line-934"> break;</span> |
| <span class="source-line-no">935</span><span id="line-935"> } catch (Throwable e) {</span> |
| <span class="source-line-no">936</span><span id="line-936"> if (retry > archiveRetries) {</span> |
| <span class="source-line-no">937</span><span id="line-937"> LOG.error("Failed log archiving for the log {},", log.getFirst(), e);</span> |
| <span class="source-line-no">938</span><span id="line-938"> if (this.abortable != null) {</span> |
| <span class="source-line-no">939</span><span id="line-939"> this.abortable.abort("Failed log archiving", e);</span> |
| <span class="source-line-no">940</span><span id="line-940"> break;</span> |
| <span class="source-line-no">941</span><span id="line-941"> }</span> |
| <span class="source-line-no">942</span><span id="line-942"> } else {</span> |
| <span class="source-line-no">943</span><span id="line-943"> LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, e);</span> |
| <span class="source-line-no">944</span><span id="line-944"> }</span> |
| <span class="source-line-no">945</span><span id="line-945"> retry++;</span> |
| <span class="source-line-no">946</span><span id="line-946"> }</span> |
| <span class="source-line-no">947</span><span id="line-947"> }</span> |
| <span class="source-line-no">948</span><span id="line-948"> }</span> |
| <span class="source-line-no">949</span><span id="line-949"></span> |
| <span class="source-line-no">950</span><span id="line-950"> /*</span> |
| <span class="source-line-no">951</span><span id="line-951"> * only public so WALSplitter can use.</span> |
| <span class="source-line-no">952</span><span id="line-952"> * @return archived location of a WAL file with the given path p</span> |
| <span class="source-line-no">953</span><span id="line-953"> */</span> |
| <span class="source-line-no">954</span><span id="line-954"> public static Path getWALArchivePath(Path archiveDir, Path p) {</span> |
| <span class="source-line-no">955</span><span id="line-955"> return new Path(archiveDir, p.getName());</span> |
| <span class="source-line-no">956</span><span id="line-956"> }</span> |
| <span class="source-line-no">957</span><span id="line-957"></span> |
| <span class="source-line-no">958</span><span id="line-958"> protected void archiveLogFile(final Path p) throws IOException {</span> |
| <span class="source-line-no">959</span><span id="line-959"> Path newPath = getWALArchivePath(this.walArchiveDir, p);</span> |
| <span class="source-line-no">960</span><span id="line-960"> // Tell our listeners that a log is going to be archived.</span> |
| <span class="source-line-no">961</span><span id="line-961"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">962</span><span id="line-962"> for (WALActionsListener i : this.listeners) {</span> |
| <span class="source-line-no">963</span><span id="line-963"> i.preLogArchive(p, newPath);</span> |
| <span class="source-line-no">964</span><span id="line-964"> }</span> |
| <span class="source-line-no">965</span><span id="line-965"> }</span> |
| <span class="source-line-no">966</span><span id="line-966"> LOG.info("Archiving " + p + " to " + newPath);</span> |
| <span class="source-line-no">967</span><span id="line-967"> if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {</span> |
| <span class="source-line-no">968</span><span id="line-968"> throw new IOException("Unable to rename " + p + " to " + newPath);</span> |
| <span class="source-line-no">969</span><span id="line-969"> }</span> |
| <span class="source-line-no">970</span><span id="line-970"> // Tell our listeners that a log has been archived.</span> |
| <span class="source-line-no">971</span><span id="line-971"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">972</span><span id="line-972"> for (WALActionsListener i : this.listeners) {</span> |
| <span class="source-line-no">973</span><span id="line-973"> i.postLogArchive(p, newPath);</span> |
| <span class="source-line-no">974</span><span id="line-974"> }</span> |
| <span class="source-line-no">975</span><span id="line-975"> }</span> |
| <span class="source-line-no">976</span><span id="line-976"> }</span> |
| <span class="source-line-no">977</span><span id="line-977"></span> |
| <span class="source-line-no">978</span><span id="line-978"> protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {</span> |
| <span class="source-line-no">979</span><span id="line-979"> int oldNumEntries = this.numEntries.getAndSet(0);</span> |
| <span class="source-line-no">980</span><span id="line-980"> String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;</span> |
| <span class="source-line-no">981</span><span id="line-981"> if (oldPath != null) {</span> |
| <span class="source-line-no">982</span><span id="line-982"> this.walFile2Props.put(oldPath,</span> |
| <span class="source-line-no">983</span><span id="line-983"> new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));</span> |
| <span class="source-line-no">984</span><span id="line-984"> this.totalLogSize.addAndGet(oldFileLen);</span> |
| <span class="source-line-no">985</span><span id="line-985"> LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",</span> |
| <span class="source-line-no">986</span><span id="line-986"> CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),</span> |
| <span class="source-line-no">987</span><span id="line-987"> newPathString);</span> |
| <span class="source-line-no">988</span><span id="line-988"> } else {</span> |
| <span class="source-line-no">989</span><span id="line-989"> LOG.info("New WAL {}", newPathString);</span> |
| <span class="source-line-no">990</span><span id="line-990"> }</span> |
| <span class="source-line-no">991</span><span id="line-991"> }</span> |
| <span class="source-line-no">992</span><span id="line-992"></span> |
| <span class="source-line-no">993</span><span id="line-993"> private Span createSpan(String name) {</span> |
| <span class="source-line-no">994</span><span id="line-994"> return TraceUtil.createSpan(name).setAttribute(WAL_IMPL, implClassName);</span> |
| <span class="source-line-no">995</span><span id="line-995"> }</span> |
| <span class="source-line-no">996</span><span id="line-996"></span> |
| <span class="source-line-no">997</span><span id="line-997"> /**</span> |
| <span class="source-line-no">998</span><span id="line-998"> * Cleans up current writer closing it and then puts in place the passed in {@code nextWriter}.</span> |
| <span class="source-line-no">999</span><span id="line-999"> * <p/></span> |
| <span class="source-line-no">1000</span><span id="line-1000"> * <ul></span> |
| <span class="source-line-no">1001</span><span id="line-1001"> * <li>In the case of creating a new WAL, oldPath will be null.</li></span> |
| <span class="source-line-no">1002</span><span id="line-1002"> * <li>In the case of rolling over from one file to the next, none of the parameters will be null.</span> |
| <span class="source-line-no">1003</span><span id="line-1003"> * </li></span> |
| <span class="source-line-no">1004</span><span id="line-1004"> * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be</span> |
| <span class="source-line-no">1005</span><span id="line-1005"> * null.</li></span> |
| <span class="source-line-no">1006</span><span id="line-1006"> * </ul></span> |
| <span class="source-line-no">1007</span><span id="line-1007"> * @param oldPath may be null</span> |
| <span class="source-line-no">1008</span><span id="line-1008"> * @param newPath may be null</span> |
| <span class="source-line-no">1009</span><span id="line-1009"> * @param nextWriter may be null</span> |
| <span class="source-line-no">1010</span><span id="line-1010"> * @return the passed in <code>newPath</code></span> |
| <span class="source-line-no">1011</span><span id="line-1011"> * @throws IOException if there is a problem flushing or closing the underlying FS</span> |
| <span class="source-line-no">1012</span><span id="line-1012"> */</span> |
| <span class="source-line-no">1013</span><span id="line-1013"> Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {</span> |
| <span class="source-line-no">1014</span><span id="line-1014"> return TraceUtil.trace(() -> {</span> |
| <span class="source-line-no">1015</span><span id="line-1015"> doReplaceWriter(oldPath, newPath, nextWriter);</span> |
| <span class="source-line-no">1016</span><span id="line-1016"> return newPath;</span> |
| <span class="source-line-no">1017</span><span id="line-1017"> }, () -> createSpan("WAL.replaceWriter"));</span> |
| <span class="source-line-no">1018</span><span id="line-1018"> }</span> |
| <span class="source-line-no">1019</span><span id="line-1019"></span> |
| <span class="source-line-no">1020</span><span id="line-1020"> protected final void blockOnSync(SyncFuture syncFuture) throws IOException {</span> |
| <span class="source-line-no">1021</span><span id="line-1021"> // Now we have published the ringbuffer, halt the current thread until we get an answer back.</span> |
| <span class="source-line-no">1022</span><span id="line-1022"> try {</span> |
| <span class="source-line-no">1023</span><span id="line-1023"> if (syncFuture != null) {</span> |
| <span class="source-line-no">1024</span><span id="line-1024"> if (closed) {</span> |
| <span class="source-line-no">1025</span><span id="line-1025"> throw new IOException("WAL has been closed");</span> |
| <span class="source-line-no">1026</span><span id="line-1026"> } else {</span> |
| <span class="source-line-no">1027</span><span id="line-1027"> syncFuture.get(walSyncTimeoutNs);</span> |
| <span class="source-line-no">1028</span><span id="line-1028"> }</span> |
| <span class="source-line-no">1029</span><span id="line-1029"> }</span> |
| <span class="source-line-no">1030</span><span id="line-1030"> } catch (TimeoutIOException tioe) {</span> |
| <span class="source-line-no">1031</span><span id="line-1031"> throw new WALSyncTimeoutIOException(tioe);</span> |
| <span class="source-line-no">1032</span><span id="line-1032"> } catch (InterruptedException ie) {</span> |
| <span class="source-line-no">1033</span><span id="line-1033"> LOG.warn("Interrupted", ie);</span> |
| <span class="source-line-no">1034</span><span id="line-1034"> throw convertInterruptedExceptionToIOException(ie);</span> |
| <span class="source-line-no">1035</span><span id="line-1035"> } catch (ExecutionException e) {</span> |
| <span class="source-line-no">1036</span><span id="line-1036"> throw ensureIOException(e.getCause());</span> |
| <span class="source-line-no">1037</span><span id="line-1037"> }</span> |
| <span class="source-line-no">1038</span><span id="line-1038"> }</span> |
| <span class="source-line-no">1039</span><span id="line-1039"></span> |
| <span class="source-line-no">1040</span><span id="line-1040"> private static IOException ensureIOException(final Throwable t) {</span> |
| <span class="source-line-no">1041</span><span id="line-1041"> return (t instanceof IOException) ? (IOException) t : new IOException(t);</span> |
| <span class="source-line-no">1042</span><span id="line-1042"> }</span> |
| <span class="source-line-no">1043</span><span id="line-1043"></span> |
| <span class="source-line-no">1044</span><span id="line-1044"> private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {</span> |
| <span class="source-line-no">1045</span><span id="line-1045"> Thread.currentThread().interrupt();</span> |
| <span class="source-line-no">1046</span><span id="line-1046"> IOException ioe = new InterruptedIOException();</span> |
| <span class="source-line-no">1047</span><span id="line-1047"> ioe.initCause(ie);</span> |
| <span class="source-line-no">1048</span><span id="line-1048"> return ioe;</span> |
| <span class="source-line-no">1049</span><span id="line-1049"> }</span> |
| <span class="source-line-no">1050</span><span id="line-1050"></span> |
| <span class="source-line-no">1051</span><span id="line-1051"> private W createCombinedWriter(W localWriter, Path localPath)</span> |
| <span class="source-line-no">1052</span><span id="line-1052"> throws IOException, CommonFSUtils.StreamLacksCapabilityException {</span> |
| <span class="source-line-no">1053</span><span id="line-1053"> // retry forever if we can not create the remote writer to prevent aborting the RS due to log</span> |
| <span class="source-line-no">1054</span><span id="line-1054"> // rolling error, unless the skipRemoteWal is set to true.</span> |
| <span class="source-line-no">1055</span><span id="line-1055"> // TODO: since for now we only have one thread doing log rolling, this may block the rolling for</span> |
| <span class="source-line-no">1056</span><span id="line-1056"> // other wals</span> |
| <span class="source-line-no">1057</span><span id="line-1057"> Path remoteWAL = new Path(remoteWALDir, localPath.getName());</span> |
| <span class="source-line-no">1058</span><span id="line-1058"> for (int retry = 0;; retry++) {</span> |
| <span class="source-line-no">1059</span><span id="line-1059"> if (skipRemoteWAL) {</span> |
| <span class="source-line-no">1060</span><span id="line-1060"> return localWriter;</span> |
| <span class="source-line-no">1061</span><span id="line-1061"> }</span> |
| <span class="source-line-no">1062</span><span id="line-1062"> W remoteWriter;</span> |
| <span class="source-line-no">1063</span><span id="line-1063"> try {</span> |
| <span class="source-line-no">1064</span><span id="line-1064"> remoteWriter = createWriterInstance(remoteFs, remoteWAL);</span> |
| <span class="source-line-no">1065</span><span id="line-1065"> } catch (IOException e) {</span> |
| <span class="source-line-no">1066</span><span id="line-1066"> LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);</span> |
| <span class="source-line-no">1067</span><span id="line-1067"> try {</span> |
| <span class="source-line-no">1068</span><span id="line-1068"> Thread.sleep(ConnectionUtils.getPauseTime(100, retry));</span> |
| <span class="source-line-no">1069</span><span id="line-1069"> } catch (InterruptedException ie) {</span> |
| <span class="source-line-no">1070</span><span id="line-1070"> // restore the interrupt state</span> |
| <span class="source-line-no">1071</span><span id="line-1071"> Thread.currentThread().interrupt();</span> |
| <span class="source-line-no">1072</span><span id="line-1072"> // must close local writer here otherwise no one will close it for us</span> |
| <span class="source-line-no">1073</span><span id="line-1073"> Closeables.close(localWriter, true);</span> |
| <span class="source-line-no">1074</span><span id="line-1074"> throw (IOException) new InterruptedIOException().initCause(ie);</span> |
| <span class="source-line-no">1075</span><span id="line-1075"> }</span> |
| <span class="source-line-no">1076</span><span id="line-1076"> continue;</span> |
| <span class="source-line-no">1077</span><span id="line-1077"> }</span> |
| <span class="source-line-no">1078</span><span id="line-1078"> return createCombinedWriter(localWriter, remoteWriter);</span> |
| <span class="source-line-no">1079</span><span id="line-1079"> }</span> |
| <span class="source-line-no">1080</span><span id="line-1080"> }</span> |
| <span class="source-line-no">1081</span><span id="line-1081"></span> |
| <span class="source-line-no">1082</span><span id="line-1082"> private Map<byte[], List<byte[]>> rollWriterInternal(boolean force) throws IOException {</span> |
| <span class="source-line-no">1083</span><span id="line-1083"> rollWriterLock.lock();</span> |
| <span class="source-line-no">1084</span><span id="line-1084"> try {</span> |
| <span class="source-line-no">1085</span><span id="line-1085"> if (this.closed) {</span> |
| <span class="source-line-no">1086</span><span id="line-1086"> throw new WALClosedException("WAL has been closed");</span> |
| <span class="source-line-no">1087</span><span id="line-1087"> }</span> |
| <span class="source-line-no">1088</span><span id="line-1088"> // Return if nothing to flush.</span> |
| <span class="source-line-no">1089</span><span id="line-1089"> if (!force && this.writer != null && this.numEntries.get() <= 0) {</span> |
| <span class="source-line-no">1090</span><span id="line-1090"> return null;</span> |
| <span class="source-line-no">1091</span><span id="line-1091"> }</span> |
| <span class="source-line-no">1092</span><span id="line-1092"> Map<byte[], List<byte[]>> regionsToFlush = null;</span> |
| <span class="source-line-no">1093</span><span id="line-1093"> try {</span> |
| <span class="source-line-no">1094</span><span id="line-1094"> Path oldPath = getOldPath();</span> |
| <span class="source-line-no">1095</span><span id="line-1095"> Path newPath = getNewPath();</span> |
| <span class="source-line-no">1096</span><span id="line-1096"> // Any exception from here on is catastrophic, non-recoverable, so we currently abort.</span> |
| <span class="source-line-no">1097</span><span id="line-1097"> W nextWriter = this.createWriterInstance(fs, newPath);</span> |
| <span class="source-line-no">1098</span><span id="line-1098"> if (remoteFs != null) {</span> |
| <span class="source-line-no">1099</span><span id="line-1099"> // create a remote wal if necessary</span> |
| <span class="source-line-no">1100</span><span id="line-1100"> nextWriter = createCombinedWriter(nextWriter, newPath);</span> |
| <span class="source-line-no">1101</span><span id="line-1101"> }</span> |
| <span class="source-line-no">1102</span><span id="line-1102"> tellListenersAboutPreLogRoll(oldPath, newPath);</span> |
| <span class="source-line-no">1103</span><span id="line-1103"> // NewPath could be equal to oldPath if replaceWriter fails.</span> |
| <span class="source-line-no">1104</span><span id="line-1104"> newPath = replaceWriter(oldPath, newPath, nextWriter);</span> |
| <span class="source-line-no">1105</span><span id="line-1105"> tellListenersAboutPostLogRoll(oldPath, newPath);</span> |
| <span class="source-line-no">1106</span><span id="line-1106"> if (LOG.isDebugEnabled()) {</span> |
| <span class="source-line-no">1107</span><span id="line-1107"> LOG.debug("Create new " + implClassName + " writer with pipeline: "</span> |
| <span class="source-line-no">1108</span><span id="line-1108"> + Arrays.toString(getPipeline()));</span> |
| <span class="source-line-no">1109</span><span id="line-1109"> }</span> |
| <span class="source-line-no">1110</span><span id="line-1110"> // We got a new writer, so reset the slow sync count</span> |
| <span class="source-line-no">1111</span><span id="line-1111"> lastTimeCheckSlowSync = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">1112</span><span id="line-1112"> slowSyncCount.set(0);</span> |
| <span class="source-line-no">1113</span><span id="line-1113"> // Can we delete any of the old log files?</span> |
| <span class="source-line-no">1114</span><span id="line-1114"> if (getNumRolledLogFiles() > 0) {</span> |
| <span class="source-line-no">1115</span><span id="line-1115"> cleanOldLogs();</span> |
| <span class="source-line-no">1116</span><span id="line-1116"> regionsToFlush = findRegionsToForceFlush();</span> |
| <span class="source-line-no">1117</span><span id="line-1117"> }</span> |
| <span class="source-line-no">1118</span><span id="line-1118"> } catch (CommonFSUtils.StreamLacksCapabilityException exception) {</span> |
| <span class="source-line-no">1119</span><span id="line-1119"> // If the underlying FileSystem can't do what we ask, treat as IO failure, so</span> |
| <span class="source-line-no">1120</span><span id="line-1120"> // we'll abort.</span> |
| <span class="source-line-no">1121</span><span id="line-1121"> throw new IOException(</span> |
| <span class="source-line-no">1122</span><span id="line-1122"> "Underlying FileSystem can't meet stream requirements. See RS log " + "for details.",</span> |
| <span class="source-line-no">1123</span><span id="line-1123"> exception);</span> |
| <span class="source-line-no">1124</span><span id="line-1124"> }</span> |
| <span class="source-line-no">1125</span><span id="line-1125"> return regionsToFlush;</span> |
| <span class="source-line-no">1126</span><span id="line-1126"> } finally {</span> |
| <span class="source-line-no">1127</span><span id="line-1127"> rollWriterLock.unlock();</span> |
| <span class="source-line-no">1128</span><span id="line-1128"> }</span> |
| <span class="source-line-no">1129</span><span id="line-1129"> }</span> |
| <span class="source-line-no">1130</span><span id="line-1130"></span> |
| <span class="source-line-no">1131</span><span id="line-1131"> @Override</span> |
| <span class="source-line-no">1132</span><span id="line-1132"> public Map<byte[], List<byte[]>> rollWriter(boolean force) throws IOException {</span> |
| <span class="source-line-no">1133</span><span id="line-1133"> return TraceUtil.trace(() -> rollWriterInternal(force), () -> createSpan("WAL.rollWriter"));</span> |
| <span class="source-line-no">1134</span><span id="line-1134"> }</span> |
| <span class="source-line-no">1135</span><span id="line-1135"></span> |
| <span class="source-line-no">1136</span><span id="line-1136"> // public only until class moves to o.a.h.h.wal</span> |
| <span class="source-line-no">1137</span><span id="line-1137"> /** Returns the size of log files in use */</span> |
| <span class="source-line-no">1138</span><span id="line-1138"> public long getLogFileSize() {</span> |
| <span class="source-line-no">1139</span><span id="line-1139"> return this.totalLogSize.get();</span> |
| <span class="source-line-no">1140</span><span id="line-1140"> }</span> |
| <span class="source-line-no">1141</span><span id="line-1141"></span> |
| <span class="source-line-no">1142</span><span id="line-1142"> // public only until class moves to o.a.h.h.wal</span> |
| <span class="source-line-no">1143</span><span id="line-1143"> public void requestLogRoll() {</span> |
| <span class="source-line-no">1144</span><span id="line-1144"> requestLogRoll(ERROR);</span> |
| <span class="source-line-no">1145</span><span id="line-1145"> }</span> |
| <span class="source-line-no">1146</span><span id="line-1146"></span> |
| <span class="source-line-no">1147</span><span id="line-1147"> /**</span> |
| <span class="source-line-no">1148</span><span id="line-1148"> * Get the backing files associated with this WAL.</span> |
| <span class="source-line-no">1149</span><span id="line-1149"> * @return may be null if there are no files.</span> |
| <span class="source-line-no">1150</span><span id="line-1150"> */</span> |
| <span class="source-line-no">1151</span><span id="line-1151"> FileStatus[] getFiles() throws IOException {</span> |
| <span class="source-line-no">1152</span><span id="line-1152"> return CommonFSUtils.listStatus(fs, walDir, ourFiles);</span> |
| <span class="source-line-no">1153</span><span id="line-1153"> }</span> |
| <span class="source-line-no">1154</span><span id="line-1154"></span> |
| <span class="source-line-no">1155</span><span id="line-1155"> @Override</span> |
| <span class="source-line-no">1156</span><span id="line-1156"> public void shutdown() throws IOException {</span> |
| <span class="source-line-no">1157</span><span id="line-1157"> if (!shutdown.compareAndSet(false, true)) {</span> |
| <span class="source-line-no">1158</span><span id="line-1158"> return;</span> |
| <span class="source-line-no">1159</span><span id="line-1159"> }</span> |
| <span class="source-line-no">1160</span><span id="line-1160"> closed = true;</span> |
| <span class="source-line-no">1161</span><span id="line-1161"> // Tell our listeners that the log is closing</span> |
| <span class="source-line-no">1162</span><span id="line-1162"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">1163</span><span id="line-1163"> for (WALActionsListener i : this.listeners) {</span> |
| <span class="source-line-no">1164</span><span id="line-1164"> i.logCloseRequested();</span> |
| <span class="source-line-no">1165</span><span id="line-1165"> }</span> |
| <span class="source-line-no">1166</span><span id="line-1166"> }</span> |
| <span class="source-line-no">1167</span><span id="line-1167"></span> |
| <span class="source-line-no">1168</span><span id="line-1168"> ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(</span> |
| <span class="source-line-no">1169</span><span id="line-1169"> new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());</span> |
| <span class="source-line-no">1170</span><span id="line-1170"></span> |
| <span class="source-line-no">1171</span><span id="line-1171"> Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {</span> |
| <span class="source-line-no">1172</span><span id="line-1172"> @Override</span> |
| <span class="source-line-no">1173</span><span id="line-1173"> public Void call() throws Exception {</span> |
| <span class="source-line-no">1174</span><span id="line-1174"> if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) {</span> |
| <span class="source-line-no">1175</span><span id="line-1175"> try {</span> |
| <span class="source-line-no">1176</span><span id="line-1176"> doShutdown();</span> |
| <span class="source-line-no">1177</span><span id="line-1177"> if (syncFutureCache != null) {</span> |
| <span class="source-line-no">1178</span><span id="line-1178"> syncFutureCache.clear();</span> |
| <span class="source-line-no">1179</span><span id="line-1179"> }</span> |
| <span class="source-line-no">1180</span><span id="line-1180"> } finally {</span> |
| <span class="source-line-no">1181</span><span id="line-1181"> rollWriterLock.unlock();</span> |
| <span class="source-line-no">1182</span><span id="line-1182"> }</span> |
| <span class="source-line-no">1183</span><span id="line-1183"> } else {</span> |
| <span class="source-line-no">1184</span><span id="line-1184"> throw new IOException("Waiting for rollWriterLock timeout");</span> |
| <span class="source-line-no">1185</span><span id="line-1185"> }</span> |
| <span class="source-line-no">1186</span><span id="line-1186"> return null;</span> |
| <span class="source-line-no">1187</span><span id="line-1187"> }</span> |
| <span class="source-line-no">1188</span><span id="line-1188"> });</span> |
| <span class="source-line-no">1189</span><span id="line-1189"> shutdownExecutor.shutdown();</span> |
| <span class="source-line-no">1190</span><span id="line-1190"></span> |
| <span class="source-line-no">1191</span><span id="line-1191"> try {</span> |
| <span class="source-line-no">1192</span><span id="line-1192"> future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);</span> |
| <span class="source-line-no">1193</span><span id="line-1193"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">1194</span><span id="line-1194"> throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");</span> |
| <span class="source-line-no">1195</span><span id="line-1195"> } catch (TimeoutException e) {</span> |
| <span class="source-line-no">1196</span><span id="line-1196"> throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"</span> |
| <span class="source-line-no">1197</span><span id="line-1197"> + " the shutdown of WAL doesn't complete! Please check the status of underlying "</span> |
| <span class="source-line-no">1198</span><span id="line-1198"> + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS</span> |
| <span class="source-line-no">1199</span><span id="line-1199"> + "\"", e);</span> |
| <span class="source-line-no">1200</span><span id="line-1200"> } catch (ExecutionException e) {</span> |
| <span class="source-line-no">1201</span><span id="line-1201"> if (e.getCause() instanceof IOException) {</span> |
| <span class="source-line-no">1202</span><span id="line-1202"> throw (IOException) e.getCause();</span> |
| <span class="source-line-no">1203</span><span id="line-1203"> } else {</span> |
| <span class="source-line-no">1204</span><span id="line-1204"> throw new IOException(e.getCause());</span> |
| <span class="source-line-no">1205</span><span id="line-1205"> }</span> |
| <span class="source-line-no">1206</span><span id="line-1206"> } finally {</span> |
| <span class="source-line-no">1207</span><span id="line-1207"> // in shutdown, we may call cleanOldLogs so shutdown this executor in the end.</span> |
| <span class="source-line-no">1208</span><span id="line-1208"> // In sync replication implementation, we may shut down a WAL without shutting down the whole</span> |
| <span class="source-line-no">1209</span><span id="line-1209"> // region server, if we shut down this executor earlier we may get reject execution exception</span> |
| <span class="source-line-no">1210</span><span id="line-1210"> // and abort the region server</span> |
| <span class="source-line-no">1211</span><span id="line-1211"> logArchiveExecutor.shutdown();</span> |
| <span class="source-line-no">1212</span><span id="line-1212"> }</span> |
| <span class="source-line-no">1213</span><span id="line-1213"> // we also need to wait logArchive to finish if we want to a graceful shutdown as we may still</span> |
| <span class="source-line-no">1214</span><span id="line-1214"> // have some pending archiving tasks not finished yet, and in close we may archive all the</span> |
| <span class="source-line-no">1215</span><span id="line-1215"> // remaining WAL files, there could be race if we do not wait for the background archive task</span> |
| <span class="source-line-no">1216</span><span id="line-1216"> // finish</span> |
| <span class="source-line-no">1217</span><span id="line-1217"> try {</span> |
| <span class="source-line-no">1218</span><span id="line-1218"> if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {</span> |
| <span class="source-line-no">1219</span><span id="line-1219"> throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"</span> |
| <span class="source-line-no">1220</span><span id="line-1220"> + " the shutdown of WAL doesn't complete! Please check the status of underlying "</span> |
| <span class="source-line-no">1221</span><span id="line-1221"> + "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS</span> |
| <span class="source-line-no">1222</span><span id="line-1222"> + "\"");</span> |
| <span class="source-line-no">1223</span><span id="line-1223"> }</span> |
| <span class="source-line-no">1224</span><span id="line-1224"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">1225</span><span id="line-1225"> throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");</span> |
| <span class="source-line-no">1226</span><span id="line-1226"> }</span> |
| <span class="source-line-no">1227</span><span id="line-1227"> }</span> |
| <span class="source-line-no">1228</span><span id="line-1228"></span> |
| <span class="source-line-no">1229</span><span id="line-1229"> @Override</span> |
| <span class="source-line-no">1230</span><span id="line-1230"> public void close() throws IOException {</span> |
| <span class="source-line-no">1231</span><span id="line-1231"> shutdown();</span> |
| <span class="source-line-no">1232</span><span id="line-1232"> final FileStatus[] files = getFiles();</span> |
| <span class="source-line-no">1233</span><span id="line-1233"> if (null != files && 0 != files.length) {</span> |
| <span class="source-line-no">1234</span><span id="line-1234"> for (FileStatus file : files) {</span> |
| <span class="source-line-no">1235</span><span id="line-1235"> Path p = getWALArchivePath(this.walArchiveDir, file.getPath());</span> |
| <span class="source-line-no">1236</span><span id="line-1236"> // Tell our listeners that a log is going to be archived.</span> |
| <span class="source-line-no">1237</span><span id="line-1237"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">1238</span><span id="line-1238"> for (WALActionsListener i : this.listeners) {</span> |
| <span class="source-line-no">1239</span><span id="line-1239"> i.preLogArchive(file.getPath(), p);</span> |
| <span class="source-line-no">1240</span><span id="line-1240"> }</span> |
| <span class="source-line-no">1241</span><span id="line-1241"> }</span> |
| <span class="source-line-no">1242</span><span id="line-1242"></span> |
| <span class="source-line-no">1243</span><span id="line-1243"> if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {</span> |
| <span class="source-line-no">1244</span><span id="line-1244"> throw new IOException("Unable to rename " + file.getPath() + " to " + p);</span> |
| <span class="source-line-no">1245</span><span id="line-1245"> }</span> |
| <span class="source-line-no">1246</span><span id="line-1246"> // Tell our listeners that a log was archived.</span> |
| <span class="source-line-no">1247</span><span id="line-1247"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">1248</span><span id="line-1248"> for (WALActionsListener i : this.listeners) {</span> |
| <span class="source-line-no">1249</span><span id="line-1249"> i.postLogArchive(file.getPath(), p);</span> |
| <span class="source-line-no">1250</span><span id="line-1250"> }</span> |
| <span class="source-line-no">1251</span><span id="line-1251"> }</span> |
| <span class="source-line-no">1252</span><span id="line-1252"> }</span> |
| <span class="source-line-no">1253</span><span id="line-1253"> LOG.debug(</span> |
| <span class="source-line-no">1254</span><span id="line-1254"> "Moved " + files.length + " WAL file(s) to " + CommonFSUtils.getPath(this.walArchiveDir));</span> |
| <span class="source-line-no">1255</span><span id="line-1255"> }</span> |
| <span class="source-line-no">1256</span><span id="line-1256"> LOG.info("Closed WAL: " + toString());</span> |
| <span class="source-line-no">1257</span><span id="line-1257"> }</span> |
| <span class="source-line-no">1258</span><span id="line-1258"></span> |
| <span class="source-line-no">1259</span><span id="line-1259"> /** Returns number of WALs currently in the process of closing. */</span> |
| <span class="source-line-no">1260</span><span id="line-1260"> public int getInflightWALCloseCount() {</span> |
| <span class="source-line-no">1261</span><span id="line-1261"> return inflightWALClosures.size();</span> |
| <span class="source-line-no">1262</span><span id="line-1262"> }</span> |
| <span class="source-line-no">1263</span><span id="line-1263"></span> |
| <span class="source-line-no">1264</span><span id="line-1264"> /**</span> |
| <span class="source-line-no">1265</span><span id="line-1265"> * updates the sequence number of a specific store. depending on the flag: replaces current seq</span> |
| <span class="source-line-no">1266</span><span id="line-1266"> * number if the given seq id is bigger, or even if it is lower than existing one</span> |
| <span class="source-line-no">1267</span><span id="line-1267"> */</span> |
| <span class="source-line-no">1268</span><span id="line-1268"> @Override</span> |
| <span class="source-line-no">1269</span><span id="line-1269"> public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,</span> |
| <span class="source-line-no">1270</span><span id="line-1270"> boolean onlyIfGreater) {</span> |
| <span class="source-line-no">1271</span><span id="line-1271"> sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);</span> |
| <span class="source-line-no">1272</span><span id="line-1272"> }</span> |
| <span class="source-line-no">1273</span><span id="line-1273"></span> |
| <span class="source-line-no">1274</span><span id="line-1274"> protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {</span> |
| <span class="source-line-no">1275</span><span id="line-1275"> return syncFutureCache.getIfPresentOrNew().reset(sequence, forceSync);</span> |
| <span class="source-line-no">1276</span><span id="line-1276"> }</span> |
| <span class="source-line-no">1277</span><span id="line-1277"></span> |
| <span class="source-line-no">1278</span><span id="line-1278"> protected boolean isLogRollRequested() {</span> |
| <span class="source-line-no">1279</span><span id="line-1279"> return rollRequested.get();</span> |
| <span class="source-line-no">1280</span><span id="line-1280"> }</span> |
| <span class="source-line-no">1281</span><span id="line-1281"></span> |
| <span class="source-line-no">1282</span><span id="line-1282"> protected final void requestLogRoll(final WALActionsListener.RollRequestReason reason) {</span> |
| <span class="source-line-no">1283</span><span id="line-1283"> // If we have already requested a roll, don't do it again</span> |
| <span class="source-line-no">1284</span><span id="line-1284"> // And only set rollRequested to true when there is a registered listener</span> |
| <span class="source-line-no">1285</span><span id="line-1285"> if (!this.listeners.isEmpty() && rollRequested.compareAndSet(false, true)) {</span> |
| <span class="source-line-no">1286</span><span id="line-1286"> for (WALActionsListener i : this.listeners) {</span> |
| <span class="source-line-no">1287</span><span id="line-1287"> i.logRollRequested(reason);</span> |
| <span class="source-line-no">1288</span><span id="line-1288"> }</span> |
| <span class="source-line-no">1289</span><span id="line-1289"> }</span> |
| <span class="source-line-no">1290</span><span id="line-1290"> }</span> |
| <span class="source-line-no">1291</span><span id="line-1291"></span> |
| <span class="source-line-no">1292</span><span id="line-1292"> long getUnflushedEntriesCount() {</span> |
| <span class="source-line-no">1293</span><span id="line-1293"> long highestSynced = this.highestSyncedTxid.get();</span> |
| <span class="source-line-no">1294</span><span id="line-1294"> long highestUnsynced = this.highestUnsyncedTxid;</span> |
| <span class="source-line-no">1295</span><span id="line-1295"> return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced;</span> |
| <span class="source-line-no">1296</span><span id="line-1296"> }</span> |
| <span class="source-line-no">1297</span><span id="line-1297"></span> |
| <span class="source-line-no">1298</span><span id="line-1298"> boolean isUnflushedEntries() {</span> |
| <span class="source-line-no">1299</span><span id="line-1299"> return getUnflushedEntriesCount() > 0;</span> |
| <span class="source-line-no">1300</span><span id="line-1300"> }</span> |
| <span class="source-line-no">1301</span><span id="line-1301"></span> |
| <span class="source-line-no">1302</span><span id="line-1302"> /**</span> |
| <span class="source-line-no">1303</span><span id="line-1303"> * Exposed for testing only. Use to tricks like halt the ring buffer appending.</span> |
| <span class="source-line-no">1304</span><span id="line-1304"> */</span> |
| <span class="source-line-no">1305</span><span id="line-1305"> protected void atHeadOfRingBufferEventHandlerAppend() {</span> |
| <span class="source-line-no">1306</span><span id="line-1306"> // Noop</span> |
| <span class="source-line-no">1307</span><span id="line-1307"> }</span> |
| <span class="source-line-no">1308</span><span id="line-1308"></span> |
| <span class="source-line-no">1309</span><span id="line-1309"> protected final boolean appendEntry(W writer, FSWALEntry entry) throws IOException {</span> |
| <span class="source-line-no">1310</span><span id="line-1310"> // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.</span> |
| <span class="source-line-no">1311</span><span id="line-1311"> atHeadOfRingBufferEventHandlerAppend();</span> |
| <span class="source-line-no">1312</span><span id="line-1312"> long start = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">1313</span><span id="line-1313"> byte[] encodedRegionName = entry.getKey().getEncodedRegionName();</span> |
| <span class="source-line-no">1314</span><span id="line-1314"> long regionSequenceId = entry.getKey().getSequenceId();</span> |
| <span class="source-line-no">1315</span><span id="line-1315"></span> |
| <span class="source-line-no">1316</span><span id="line-1316"> // Edits are empty, there is nothing to append. Maybe empty when we are looking for a</span> |
| <span class="source-line-no">1317</span><span id="line-1317"> // region sequence id only, a region edit/sequence id that is not associated with an actual</span> |
| <span class="source-line-no">1318</span><span id="line-1318"> // edit. It has to go through all the rigmarole to be sure we have the right ordering.</span> |
| <span class="source-line-no">1319</span><span id="line-1319"> if (entry.getEdit().isEmpty()) {</span> |
| <span class="source-line-no">1320</span><span id="line-1320"> return false;</span> |
| <span class="source-line-no">1321</span><span id="line-1321"> }</span> |
| <span class="source-line-no">1322</span><span id="line-1322"></span> |
| <span class="source-line-no">1323</span><span id="line-1323"> // Coprocessor hook.</span> |
| <span class="source-line-no">1324</span><span id="line-1324"> coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());</span> |
| <span class="source-line-no">1325</span><span id="line-1325"> if (!listeners.isEmpty()) {</span> |
| <span class="source-line-no">1326</span><span id="line-1326"> for (WALActionsListener i : listeners) {</span> |
| <span class="source-line-no">1327</span><span id="line-1327"> i.visitLogEntryBeforeWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());</span> |
| <span class="source-line-no">1328</span><span id="line-1328"> }</span> |
| <span class="source-line-no">1329</span><span id="line-1329"> }</span> |
| <span class="source-line-no">1330</span><span id="line-1330"> doAppend(writer, entry);</span> |
| <span class="source-line-no">1331</span><span id="line-1331"> assert highestUnsyncedTxid < entry.getTxid();</span> |
| <span class="source-line-no">1332</span><span id="line-1332"> highestUnsyncedTxid = entry.getTxid();</span> |
| <span class="source-line-no">1333</span><span id="line-1333"> if (entry.isCloseRegion()) {</span> |
| <span class="source-line-no">1334</span><span id="line-1334"> // let's clean all the records of this region</span> |
| <span class="source-line-no">1335</span><span id="line-1335"> sequenceIdAccounting.onRegionClose(encodedRegionName);</span> |
| <span class="source-line-no">1336</span><span id="line-1336"> } else {</span> |
| <span class="source-line-no">1337</span><span id="line-1337"> sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,</span> |
| <span class="source-line-no">1338</span><span id="line-1338"> entry.isInMemStore());</span> |
| <span class="source-line-no">1339</span><span id="line-1339"> }</span> |
| <span class="source-line-no">1340</span><span id="line-1340"> coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());</span> |
| <span class="source-line-no">1341</span><span id="line-1341"> // Update metrics.</span> |
| <span class="source-line-no">1342</span><span id="line-1342"> postAppend(entry, EnvironmentEdgeManager.currentTime() - start);</span> |
| <span class="source-line-no">1343</span><span id="line-1343"> numEntries.incrementAndGet();</span> |
| <span class="source-line-no">1344</span><span id="line-1344"> return true;</span> |
| <span class="source-line-no">1345</span><span id="line-1345"> }</span> |
| <span class="source-line-no">1346</span><span id="line-1346"></span> |
| <span class="source-line-no">1347</span><span id="line-1347"> private long postAppend(final Entry e, final long elapsedTime) throws IOException {</span> |
| <span class="source-line-no">1348</span><span id="line-1348"> long len = 0;</span> |
| <span class="source-line-no">1349</span><span id="line-1349"> if (!listeners.isEmpty()) {</span> |
| <span class="source-line-no">1350</span><span id="line-1350"> for (Cell cell : e.getEdit().getCells()) {</span> |
| <span class="source-line-no">1351</span><span id="line-1351"> len += PrivateCellUtil.estimatedSerializedSizeOf(cell);</span> |
| <span class="source-line-no">1352</span><span id="line-1352"> }</span> |
| <span class="source-line-no">1353</span><span id="line-1353"> for (WALActionsListener listener : listeners) {</span> |
| <span class="source-line-no">1354</span><span id="line-1354"> listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit());</span> |
| <span class="source-line-no">1355</span><span id="line-1355"> }</span> |
| <span class="source-line-no">1356</span><span id="line-1356"> }</span> |
| <span class="source-line-no">1357</span><span id="line-1357"> return len;</span> |
| <span class="source-line-no">1358</span><span id="line-1358"> }</span> |
| <span class="source-line-no">1359</span><span id="line-1359"></span> |
| <span class="source-line-no">1360</span><span id="line-1360"> protected final void postSync(long timeInNanos, int handlerSyncs) {</span> |
| <span class="source-line-no">1361</span><span id="line-1361"> if (timeInNanos > this.slowSyncNs) {</span> |
| <span class="source-line-no">1362</span><span id="line-1362"> String msg = new StringBuilder().append("Slow sync cost: ")</span> |
| <span class="source-line-no">1363</span><span id="line-1363"> .append(TimeUnit.NANOSECONDS.toMillis(timeInNanos)).append(" ms, current pipeline: ")</span> |
| <span class="source-line-no">1364</span><span id="line-1364"> .append(Arrays.toString(getPipeline())).toString();</span> |
| <span class="source-line-no">1365</span><span id="line-1365"> LOG.info(msg);</span> |
| <span class="source-line-no">1366</span><span id="line-1366"> if (timeInNanos > this.rollOnSyncNs) {</span> |
| <span class="source-line-no">1367</span><span id="line-1367"> // A single sync took too long.</span> |
| <span class="source-line-no">1368</span><span id="line-1368"> // Elsewhere in checkSlowSync, called from checkLogRoll, we will look at cumulative</span> |
| <span class="source-line-no">1369</span><span id="line-1369"> // effects. Here we have a single data point that indicates we should take immediate</span> |
| <span class="source-line-no">1370</span><span id="line-1370"> // action, so do so.</span> |
| <span class="source-line-no">1371</span><span id="line-1371"> LOG.warn("Requesting log roll because we exceeded slow sync threshold; time="</span> |
| <span class="source-line-no">1372</span><span id="line-1372"> + TimeUnit.NANOSECONDS.toMillis(timeInNanos) + " ms, threshold="</span> |
| <span class="source-line-no">1373</span><span id="line-1373"> + TimeUnit.NANOSECONDS.toMillis(rollOnSyncNs) + " ms, current pipeline: "</span> |
| <span class="source-line-no">1374</span><span id="line-1374"> + Arrays.toString(getPipeline()));</span> |
| <span class="source-line-no">1375</span><span id="line-1375"> requestLogRoll(SLOW_SYNC);</span> |
| <span class="source-line-no">1376</span><span id="line-1376"> }</span> |
| <span class="source-line-no">1377</span><span id="line-1377"> slowSyncCount.incrementAndGet(); // it's fine to unconditionally increment this</span> |
| <span class="source-line-no">1378</span><span id="line-1378"> }</span> |
| <span class="source-line-no">1379</span><span id="line-1379"> if (!listeners.isEmpty()) {</span> |
| <span class="source-line-no">1380</span><span id="line-1380"> for (WALActionsListener listener : listeners) {</span> |
| <span class="source-line-no">1381</span><span id="line-1381"> listener.postSync(timeInNanos, handlerSyncs);</span> |
| <span class="source-line-no">1382</span><span id="line-1382"> }</span> |
| <span class="source-line-no">1383</span><span id="line-1383"> }</span> |
| <span class="source-line-no">1384</span><span id="line-1384"> }</span> |
| <span class="source-line-no">1385</span><span id="line-1385"></span> |
| <span class="source-line-no">1386</span><span id="line-1386"> protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,</span> |
| <span class="source-line-no">1387</span><span id="line-1387"> WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException {</span> |
| <span class="source-line-no">1388</span><span id="line-1388"> if (this.closed) {</span> |
| <span class="source-line-no">1389</span><span id="line-1389"> throw new IOException(</span> |
| <span class="source-line-no">1390</span><span id="line-1390"> "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());</span> |
| <span class="source-line-no">1391</span><span id="line-1391"> }</span> |
| <span class="source-line-no">1392</span><span id="line-1392"> MutableLong txidHolder = new MutableLong();</span> |
| <span class="source-line-no">1393</span><span id="line-1393"> MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {</span> |
| <span class="source-line-no">1394</span><span id="line-1394"> txidHolder.setValue(ringBuffer.next());</span> |
| <span class="source-line-no">1395</span><span id="line-1395"> });</span> |
| <span class="source-line-no">1396</span><span id="line-1396"> long txid = txidHolder.longValue();</span> |
| <span class="source-line-no">1397</span><span id="line-1397"> ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);</span> |
| <span class="source-line-no">1398</span><span id="line-1398"> try {</span> |
| <span class="source-line-no">1399</span><span id="line-1399"> FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);</span> |
| <span class="source-line-no">1400</span><span id="line-1400"> entry.stampRegionSequenceId(we);</span> |
| <span class="source-line-no">1401</span><span id="line-1401"> ringBuffer.get(txid).load(entry);</span> |
| <span class="source-line-no">1402</span><span id="line-1402"> } finally {</span> |
| <span class="source-line-no">1403</span><span id="line-1403"> ringBuffer.publish(txid);</span> |
| <span class="source-line-no">1404</span><span id="line-1404"> }</span> |
| <span class="source-line-no">1405</span><span id="line-1405"> return txid;</span> |
| <span class="source-line-no">1406</span><span id="line-1406"> }</span> |
| <span class="source-line-no">1407</span><span id="line-1407"></span> |
| <span class="source-line-no">1408</span><span id="line-1408"> @Override</span> |
| <span class="source-line-no">1409</span><span id="line-1409"> public String toString() {</span> |
| <span class="source-line-no">1410</span><span id="line-1410"> return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")";</span> |
| <span class="source-line-no">1411</span><span id="line-1411"> }</span> |
| <span class="source-line-no">1412</span><span id="line-1412"></span> |
| <span class="source-line-no">1413</span><span id="line-1413"> /**</span> |
| <span class="source-line-no">1414</span><span id="line-1414"> * if the given {@code path} is being written currently, then return its length.</span> |
| <span class="source-line-no">1415</span><span id="line-1415"> * <p></span> |
| <span class="source-line-no">1416</span><span id="line-1416"> * This is used by replication to prevent replicating unacked log entries. See</span> |
| <span class="source-line-no">1417</span><span id="line-1417"> * https://issues.apache.org/jira/browse/HBASE-14004 for more details.</span> |
| <span class="source-line-no">1418</span><span id="line-1418"> */</span> |
| <span class="source-line-no">1419</span><span id="line-1419"> @Override</span> |
| <span class="source-line-no">1420</span><span id="line-1420"> public OptionalLong getLogFileSizeIfBeingWritten(Path path) {</span> |
| <span class="source-line-no">1421</span><span id="line-1421"> rollWriterLock.lock();</span> |
| <span class="source-line-no">1422</span><span id="line-1422"> try {</span> |
| <span class="source-line-no">1423</span><span id="line-1423"> Path currentPath = getOldPath();</span> |
| <span class="source-line-no">1424</span><span id="line-1424"> if (path.equals(currentPath)) {</span> |
| <span class="source-line-no">1425</span><span id="line-1425"> // Currently active path.</span> |
| <span class="source-line-no">1426</span><span id="line-1426"> W writer = this.writer;</span> |
| <span class="source-line-no">1427</span><span id="line-1427"> return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();</span> |
| <span class="source-line-no">1428</span><span id="line-1428"> } else {</span> |
| <span class="source-line-no">1429</span><span id="line-1429"> W temp = inflightWALClosures.get(path.getName());</span> |
| <span class="source-line-no">1430</span><span id="line-1430"> if (temp != null) {</span> |
| <span class="source-line-no">1431</span><span id="line-1431"> // In the process of being closed, trailer bytes may or may not be flushed.</span> |
| <span class="source-line-no">1432</span><span id="line-1432"> // Ensuring that we read all the bytes in a file is critical for correctness of tailing</span> |
| <span class="source-line-no">1433</span><span id="line-1433"> // use cases like replication, see HBASE-25924/HBASE-25932.</span> |
| <span class="source-line-no">1434</span><span id="line-1434"> return OptionalLong.of(temp.getSyncedLength());</span> |
| <span class="source-line-no">1435</span><span id="line-1435"> }</span> |
| <span class="source-line-no">1436</span><span id="line-1436"> // Log rolled successfully.</span> |
| <span class="source-line-no">1437</span><span id="line-1437"> return OptionalLong.empty();</span> |
| <span class="source-line-no">1438</span><span id="line-1438"> }</span> |
| <span class="source-line-no">1439</span><span id="line-1439"> } finally {</span> |
| <span class="source-line-no">1440</span><span id="line-1440"> rollWriterLock.unlock();</span> |
| <span class="source-line-no">1441</span><span id="line-1441"> }</span> |
| <span class="source-line-no">1442</span><span id="line-1442"> }</span> |
| <span class="source-line-no">1443</span><span id="line-1443"></span> |
| <span class="source-line-no">1444</span><span id="line-1444"> @Override</span> |
| <span class="source-line-no">1445</span><span id="line-1445"> public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {</span> |
| <span class="source-line-no">1446</span><span id="line-1446"> return TraceUtil.trace(() -> append(info, key, edits, true),</span> |
| <span class="source-line-no">1447</span><span id="line-1447"> () -> createSpan("WAL.appendData"));</span> |
| <span class="source-line-no">1448</span><span id="line-1448"> }</span> |
| <span class="source-line-no">1449</span><span id="line-1449"></span> |
| <span class="source-line-no">1450</span><span id="line-1450"> @Override</span> |
| <span class="source-line-no">1451</span><span id="line-1451"> public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {</span> |
| <span class="source-line-no">1452</span><span id="line-1452"> return TraceUtil.trace(() -> append(info, key, edits, false),</span> |
| <span class="source-line-no">1453</span><span id="line-1453"> () -> createSpan("WAL.appendMarker"));</span> |
| <span class="source-line-no">1454</span><span id="line-1454"> }</span> |
| <span class="source-line-no">1455</span><span id="line-1455"></span> |
| <span class="source-line-no">1456</span><span id="line-1456"> /**</span> |
| <span class="source-line-no">1457</span><span id="line-1457"> * Helper that marks the future as DONE and offers it back to the cache.</span> |
| <span class="source-line-no">1458</span><span id="line-1458"> */</span> |
| <span class="source-line-no">1459</span><span id="line-1459"> protected void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {</span> |
| <span class="source-line-no">1460</span><span id="line-1460"> future.done(txid, t);</span> |
| <span class="source-line-no">1461</span><span id="line-1461"> syncFutureCache.offer(future);</span> |
| <span class="source-line-no">1462</span><span id="line-1462"> }</span> |
| <span class="source-line-no">1463</span><span id="line-1463"></span> |
| <span class="source-line-no">1464</span><span id="line-1464"> private static boolean waitingRoll(int epochAndState) {</span> |
| <span class="source-line-no">1465</span><span id="line-1465"> return (epochAndState & 1) != 0;</span> |
| <span class="source-line-no">1466</span><span id="line-1466"> }</span> |
| <span class="source-line-no">1467</span><span id="line-1467"></span> |
| <span class="source-line-no">1468</span><span id="line-1468"> private static boolean writerBroken(int epochAndState) {</span> |
| <span class="source-line-no">1469</span><span id="line-1469"> return ((epochAndState >>> 1) & 1) != 0;</span> |
| <span class="source-line-no">1470</span><span id="line-1470"> }</span> |
| <span class="source-line-no">1471</span><span id="line-1471"></span> |
| <span class="source-line-no">1472</span><span id="line-1472"> private static int epoch(int epochAndState) {</span> |
| <span class="source-line-no">1473</span><span id="line-1473"> return epochAndState >>> 2;</span> |
| <span class="source-line-no">1474</span><span id="line-1474"> }</span> |
| <span class="source-line-no">1475</span><span id="line-1475"></span> |
| <span class="source-line-no">1476</span><span id="line-1476"> // Returns whether we have successfully set readyForRolling to true (and signalled waiters).</span> |
| <span class="source-line-no">1477</span><span id="line-1477"> private boolean trySetReadyForRolling() {</span> |
| <span class="source-line-no">1478</span><span id="line-1478"> // Check without holding the lock first. Usually we will just return here.</span> |
| <span class="source-line-no">1479</span><span id="line-1479"> // waitingRoll is volatile and unackedAppends is only accessed inside the event loop, so it is safe</span> |
| <span class="source-line-no">1480</span><span id="line-1480"> // to check them outside the consumeLock.</span> |
| <span class="source-line-no">1481</span><span id="line-1481"> if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {</span> |
| <span class="source-line-no">1482</span><span id="line-1482"> return false;</span> |
| <span class="source-line-no">1483</span><span id="line-1483"> }</span> |
| <span class="source-line-no">1484</span><span id="line-1484"> consumeLock.lock();</span> |
| <span class="source-line-no">1485</span><span id="line-1485"> try {</span> |
| <span class="source-line-no">1486</span><span id="line-1486"> // 1. a roll is requested</span> |
| <span class="source-line-no">1487</span><span id="line-1487"> // 2. all outgoing entries have been acked (we have confirmed this above).</span> |
| <span class="source-line-no">1488</span><span id="line-1488"> if (waitingRoll(epochAndState)) { // re-read the roll request while holding consumeLock</span> |
| <span class="source-line-no">1489</span><span id="line-1489"> readyForRolling = true;</span> |
| <span class="source-line-no">1490</span><span id="line-1490"> readyForRollingCond.signalAll(); // wake every thread waiting on readyForRollingCond</span> |
| <span class="source-line-no">1491</span><span id="line-1491"> return true;</span> |
| <span class="source-line-no">1492</span><span id="line-1492"> } else {</span> |
| <span class="source-line-no">1493</span><span id="line-1493"> return false;</span> |
| <span class="source-line-no">1494</span><span id="line-1494"> }</span> |
| <span class="source-line-no">1495</span><span id="line-1495"> } finally {</span> |
| <span class="source-line-no">1496</span><span id="line-1496"> consumeLock.unlock();</span> |
| <span class="source-line-no">1497</span><span id="line-1497"> }</span> |
| <span class="source-line-no">1498</span><span id="line-1498"> }</span> |
| <span class="source-line-no">1499</span><span id="line-1499"></span> |
| <span class="source-line-no">1500</span><span id="line-1500"> private void syncFailed(long epochWhenSync, Throwable error) { // failure callback of sync()</span> |
| <span class="source-line-no">1501</span><span id="line-1501"> LOG.warn("sync failed", error);</span> |
| <span class="source-line-no">1502</span><span id="line-1502"> onException(epochWhenSync, error);</span> |
| <span class="source-line-no">1503</span><span id="line-1503"> }</span> |
| <span class="source-line-no">1504</span><span id="line-1504"></span> |
| <span class="source-line-no">1505</span><span id="line-1505"> private void onException(long epochWhenSync, Throwable error) { // marks the writer broken once, requeues unacked appends</span> |
| <span class="source-line-no">1506</span><span id="line-1506"> boolean shouldRequestLogRoll = true;</span> |
| <span class="source-line-no">1507</span><span id="line-1507"> consumeLock.lock();</span> |
| <span class="source-line-no">1508</span><span id="line-1508"> try {</span> |
| <span class="source-line-no">1509</span><span id="line-1509"> int currentEpochAndState = epochAndState; // read the field once and work on the snapshot</span> |
| <span class="source-line-no">1510</span><span id="line-1510"> if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {</span> |
| <span class="source-line-no">1511</span><span id="line-1511"> // Either this is not the writer the failed sync was issued on, i.e. we have already rolled</span> |
| <span class="source-line-no">1512</span><span id="line-1512"> // the writer, or it is still the current writer but we have already marked it as broken and</span> |
| <span class="source-line-no">1513</span><span id="line-1513"> // requested a roll. Either way there is nothing more to do.</span> |
| <span class="source-line-no">1514</span><span id="line-1514"> return;</span> |
| <span class="source-line-no">1515</span><span id="line-1515"> }</span> |
| <span class="source-line-no">1516</span><span id="line-1516"> this.epochAndState = currentEpochAndState | 0b10; // set writerBroken, keep epoch and waitingRoll</span> |
| <span class="source-line-no">1517</span><span id="line-1517"> if (waitingRoll(currentEpochAndState)) {</span> |
| <span class="source-line-no">1518</span><span id="line-1518"> readyForRolling = true;</span> |
| <span class="source-line-no">1519</span><span id="line-1519"> readyForRollingCond.signalAll();</span> |
| <span class="source-line-no">1520</span><span id="line-1520"> // We are already in the middle of a rollWriter, so just tell the roller thread that it can</span> |
| <span class="source-line-no">1521</span><span id="line-1521"> // continue; there is no need to request an extra log roll.</span> |
| <span class="source-line-no">1522</span><span id="line-1522"> shouldRequestLogRoll = false;</span> |
| <span class="source-line-no">1523</span><span id="line-1523"> }</span> |
| <span class="source-line-no">1524</span><span id="line-1524"> } finally {</span> |
| <span class="source-line-no">1525</span><span id="line-1525"> consumeLock.unlock();</span> |
| <span class="source-line-no">1526</span><span id="line-1526"> }</span> |
| <span class="source-line-no">1527</span><span id="line-1527"> for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) { // keep original order</span> |
| <span class="source-line-no">1528</span><span id="line-1528"> toWriteAppends.addFirst(iter.next()); // unackedAppends itself is not cleared here</span> |
| <span class="source-line-no">1529</span><span id="line-1529"> }</span> |
| <span class="source-line-no">1530</span><span id="line-1530"> highestUnsyncedTxid = highestSyncedTxid.get(); // reset to the last successfully synced txid</span> |
| <span class="source-line-no">1531</span><span id="line-1531"> if (shouldRequestLogRoll) {</span> |
| <span class="source-line-no">1532</span><span id="line-1532"> // no roll in progress: request one because of the error.</span> |
| <span class="source-line-no">1533</span><span id="line-1533"> requestLogRoll(ERROR);</span> |
| <span class="source-line-no">1534</span><span id="line-1534"> }</span> |
| <span class="source-line-no">1535</span><span id="line-1535"> }</span> |
| <span class="source-line-no">1536</span><span id="line-1536"></span> |
| <span class="source-line-no">1537</span><span id="line-1537"> private void syncCompleted(long epochWhenSync, W writer, long processedTxid, long startTimeNs) { // success callback of sync()</span> |
| <span class="source-line-no">1538</span><span id="line-1538"> // Please see the last several comments on HBASE-22761: it is possible that we get a</span> |
| <span class="source-line-no">1539</span><span id="line-1539"> // syncCompleted which acks a previous sync request after we received a syncFailed on the same</span> |
| <span class="source-line-no">1540</span><span id="line-1540"> // writer. So here we also check the epoch and state: if the epoch has already been</span> |
| <span class="source-line-no">1541</span><span id="line-1541"> // changed, i.e., we have already rolled the writer, or the writer is already broken, we should</span> |
| <span class="source-line-no">1542</span><span id="line-1542"> // just skip here, to avoid messing up the state or accidentally releasing some WAL entries and</span> |
| <span class="source-line-no">1543</span><span id="line-1543"> // causing data corruption.</span> |
| <span class="source-line-no">1544</span><span id="line-1544"> // The syncCompleted call is on the critical write path, so we should try our best to make it</span> |
| <span class="source-line-no">1545</span><span id="line-1545"> // fast. So here we do not hold consumeLock, for better performance. It is safe because</span> |
| <span class="source-line-no">1546</span><span id="line-1546"> // there are only 3 possible situations:</span> |
| <span class="source-line-no">1547</span><span id="line-1547"> // 1. In the normal case, the only place where we change epochAndState is when rolling the writer.</span> |
| <span class="source-line-no">1548</span><span id="line-1548"> // Before the roll actually happens, we only change the state to waitingRoll, which is a</span> |
| <span class="source-line-no">1549</span><span id="line-1549"> // different bit from writerBroken, and when we actually change the epoch, we make sure there is</span> |
| <span class="source-line-no">1550</span><span id="line-1550"> // no outgoing sync request. So we will always pass the check here and there is no problem.</span> |
| <span class="source-line-no">1551</span><span id="line-1551"> // 2. The writer is broken, but we have not called syncFailed yet. In this case, since</span> |
| <span class="source-line-no">1552</span><span id="line-1552"> // syncFailed and syncCompleted are executed in the same thread, we just face the same</span> |
| <span class="source-line-no">1553</span><span id="line-1553"> // situation as #1.</span> |
| <span class="source-line-no">1554</span><span id="line-1554"> // 3. The writer is broken, and syncFailed has been called. Then when we arrive here, there are</span> |
| <span class="source-line-no">1555</span><span id="line-1555"> // only 2 possible situations:</span> |
| <span class="source-line-no">1556</span><span id="line-1556"> // a. we arrive before we actually roll the writer, then we find out the writer is broken</span> |
| <span class="source-line-no">1557</span><span id="line-1557"> // and give up.</span> |
| <span class="source-line-no">1558</span><span id="line-1558"> // b. we arrive after we actually roll the writer, then we find out the epoch has changed</span> |
| <span class="source-line-no">1559</span><span id="line-1559"> // and give up.</span> |
| <span class="source-line-no">1560</span><span id="line-1560"> // For both #a and #b, we do not need to hold the consumeLock as we always update the</span> |
| <span class="source-line-no">1561</span><span id="line-1561"> // epochAndState as a whole.</span> |
| <span class="source-line-no">1562</span><span id="line-1562"> // So in general, for all the cases above, we do not need to hold the consumeLock.</span> |
| <span class="source-line-no">1563</span><span id="line-1563"> int epochAndState = this.epochAndState; // single read of the field</span> |
| <span class="source-line-no">1564</span><span id="line-1564"> if (epoch(epochAndState) != epochWhenSync || writerBroken(epochAndState)) {</span> |
| <span class="source-line-no">1565</span><span id="line-1565"> LOG.warn("Got a sync complete call after the writer is broken, skip");</span> |
| <span class="source-line-no">1566</span><span id="line-1566"> return;</span> |
| <span class="source-line-no">1567</span><span id="line-1567"> }</span> |
| <span class="source-line-no">1568</span><span id="line-1568"></span> |
| <span class="source-line-no">1569</span><span id="line-1569"> if (processedTxid < highestSyncedTxid.get()) { // stale ack: highestSyncedTxid is already past it</span> |
| <span class="source-line-no">1570</span><span id="line-1570"> return;</span> |
| <span class="source-line-no">1571</span><span id="line-1571"> }</span> |
| <span class="source-line-no">1572</span><span id="line-1572"> highestSyncedTxid.set(processedTxid);</span> |
| <span class="source-line-no">1573</span><span id="line-1573"> for (Iterator<FSWALEntry> iter = unackedAppends.iterator(); iter.hasNext();) { // release acked head entries</span> |
| <span class="source-line-no">1574</span><span id="line-1574"> FSWALEntry entry = iter.next();</span> |
| <span class="source-line-no">1575</span><span id="line-1575"> if (entry.getTxid() <= processedTxid) {</span> |
| <span class="source-line-no">1576</span><span id="line-1576"> entry.release();</span> |
| <span class="source-line-no">1577</span><span id="line-1577"> iter.remove();</span> |
| <span class="source-line-no">1578</span><span id="line-1578"> } else {</span> |
| <span class="source-line-no">1579</span><span id="line-1579"> break;</span> |
| <span class="source-line-no">1580</span><span id="line-1580"> }</span> |
| <span class="source-line-no">1581</span><span id="line-1581"> }</span> |
| <span class="source-line-no">1582</span><span id="line-1582"> postSync(System.nanoTime() - startTimeNs, finishSync()); // finishSync() returns how many futures it completed</span> |
| <span class="source-line-no">1583</span><span id="line-1583"></span> |
| <span class="source-line-no">1584</span><span id="line-1584"> // This call is used to be compatible with the original logic of {@link FSHLog}.</span> |
| <span class="source-line-no">1585</span><span id="line-1585"> // It runs after postSync() and before the roll checks below.</span> |
| <span class="source-line-no">1586</span><span id="line-1586"> checkSlowSyncCount();</span> |
| <span class="source-line-no">1587</span><span id="line-1587"> if (trySetReadyForRolling()) {</span> |
| <span class="source-line-no">1588</span><span id="line-1588"> // a roll is pending and we have just marked it ready, so there is no need to check for log</span> |
| <span class="source-line-no">1589</span><span id="line-1589"> // rolling; the writer will be closed soon.</span> |
| <span class="source-line-no">1590</span><span id="line-1590"> return;</span> |
| <span class="source-line-no">1591</span><span id="line-1591"> }</span> |
| <span class="source-line-no">1592</span><span id="line-1592"> // If we haven't already requested a roll, check if we have exceeded logrollsize</span> |
| <span class="source-line-no">1593</span><span id="line-1593"> if (!isLogRollRequested() && writer.getLength() > logrollsize) {</span> |
| <span class="source-line-no">1594</span><span id="line-1594"> if (LOG.isDebugEnabled()) {</span> |
| <span class="source-line-no">1595</span><span id="line-1595"> LOG.debug("Requesting log roll because of file size threshold; length=" + writer.getLength()</span> |
| <span class="source-line-no">1596</span><span id="line-1596"> + ", logrollsize=" + logrollsize);</span> |
| <span class="source-line-no">1597</span><span id="line-1597"> }</span> |
| <span class="source-line-no">1598</span><span id="line-1598"> requestLogRoll(SIZE);</span> |
| <span class="source-line-no">1599</span><span id="line-1599"> }</span> |
| <span class="source-line-no">1600</span><span id="line-1600"> }</span> |
| <span class="source-line-no">1601</span><span id="line-1601"></span> |
| <span class="source-line-no">1602</span><span id="line-1602"> // Looks at the sync futures whose txid lies between beginTxid and endTxid to decide whether this</span> |
| <span class="source-line-no">1603</span><span id="line-1603"> // sync must be an hsync; when there are no such futures, the configured default is used.</span> |
| <span class="source-line-no">1604</span><span id="line-1604"> private boolean isHsync(long beginTxid, long endTxid) {</span> |
| <span class="source-line-no">1605</span><span id="line-1605"> SyncFuture fromInclusive = new SyncFuture().reset(beginTxid, false);</span> |
| <span class="source-line-no">1606</span><span id="line-1606"> SyncFuture toExclusive = new SyncFuture().reset(endTxid + 1, false);</span> |
| <span class="source-line-no">1607</span><span id="line-1607"> SortedSet<SyncFuture> pending = syncFutures.subSet(fromInclusive, toExclusive);</span> |
| <span class="source-line-no">1608</span><span id="line-1608"> if (pending.isEmpty()) {</span> |
| <span class="source-line-no">1609</span><span id="line-1609"> // nobody asked for a particular kind of sync in this range, so fall back to the</span> |
| <span class="source-line-no">1610</span><span id="line-1610"> // configured default</span> |
| <span class="source-line-no">1611</span><span id="line-1611"> return useHsync;</span> |
| <span class="source-line-no">1612</span><span id="line-1612"> }</span> |
| <span class="source-line-no">1613</span><span id="line-1613"> // a single forced-sync request upgrades the whole batch to hsync; otherwise every</span> |
| <span class="source-line-no">1614</span><span id="line-1614"> // explicit request in range asked for a plain sync</span> |
| <span class="source-line-no">1615</span><span id="line-1615"> return pending.stream().anyMatch(SyncFuture::isForceSync);</span> |
| <span class="source-line-no">1616</span><span id="line-1616"> }</span> |
| <span class="source-line-no">1617</span><span id="line-1617"></span> |
| <span class="source-line-no">1618</span><span id="line-1618"> private void sync(W writer) { // issues an async sync for everything appended so far</span> |
| <span class="source-line-no">1619</span><span id="line-1619"> fileLengthAtLastSync = writer.getLength();</span> |
| <span class="source-line-no">1620</span><span id="line-1620"> long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;</span> |
| <span class="source-line-no">1621</span><span id="line-1621"> boolean shouldUseHsync = isHsync(highestProcessedAppendTxidAtLastSync, currentHighestProcessedAppendTxid);</span> |
| <span class="source-line-no">1622</span><span id="line-1622"> highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;</span> |
| <span class="source-line-no">1623</span><span id="line-1623"> final long startTimeNs = System.nanoTime();</span> |
| <span class="source-line-no">1624</span><span id="line-1624"> // decode like syncCompleted/onException do; (long) x >>> 2 differs from epoch(x) once x < 0</span> |
| <span class="source-line-no">1625</span><span id="line-1625"> final long epoch = epoch(epochAndState);</span> |
| <span class="source-line-no">1626</span><span id="line-1626"> addListener(doWriterSync(writer, shouldUseHsync, currentHighestProcessedAppendTxid),</span> |
| <span class="source-line-no">1627</span><span id="line-1627"> (result, error) -> {</span> |
| <span class="source-line-no">1628</span><span id="line-1628"> if (error != null) {</span> |
| <span class="source-line-no">1629</span><span id="line-1629"> syncFailed(epoch, error);</span> |
| <span class="source-line-no">1630</span><span id="line-1630"> } else {</span> |
| <span class="source-line-no">1631</span><span id="line-1631"> long syncedTxid = getSyncedTxid(currentHighestProcessedAppendTxid, result);</span> |
| <span class="source-line-no">1632</span><span id="line-1632"> syncCompleted(epoch, writer, syncedTxid, startTimeNs);</span> |
| <span class="source-line-no">1633</span><span id="line-1633"> }</span> |
| <span class="source-line-no">1634</span><span id="line-1634"> }, consumeExecutor); // presumably the callback runs on consumeExecutor -- confirm in addListener</span> |
| <span class="source-line-no">1635</span><span id="line-1635"> }</span> |
| <span class="source-line-no">1636</span><span id="line-1636"></span> |
| <span class="source-line-no">1637</span><span id="line-1637"> /**</span> |
| <span class="source-line-no">1638</span><span id="line-1638"> * Adapts the synced txid to both {@link FSHLog} and {@link AsyncFSWAL}. For {@link AsyncFSWAL}, the</span> |
| <span class="source-line-no">1639</span><span id="line-1639"> * value of {@link AbstractFSWAL#highestProcessedAppendTxid} captured when</span> |
| <span class="source-line-no">1640</span><span id="line-1640"> * {@link AsyncFSWAL#doWriterSync} is called is the successful syncedTxid. For {@link FSHLog}, which</span> |
| <span class="source-line-no">1641</span><span id="line-1641"> * uses multiple {@code SyncRunner} threads, the result of the {@link CompletableFuture} is used as</span> |
| <span class="source-line-no">1642</span><span id="line-1642"> * the successful syncedTxid. This default implementation simply returns {@code processedTxid}.</span> |
| <span class="source-line-no">1643</span><span id="line-1643"> */</span> |
| <span class="source-line-no">1644</span><span id="line-1644"> protected long getSyncedTxid(long processedTxid, long completableFutureResult) {</span> |
| <span class="source-line-no">1645</span><span id="line-1645"> return processedTxid; // completableFutureResult is ignored here; subclasses may override</span> |
| <span class="source-line-no">1646</span><span id="line-1646"> }</span> |
| <span class="source-line-no">1647</span><span id="line-1647"></span> |
| <span class="source-line-no">1648</span><span id="line-1648"> protected abstract CompletableFuture<Long> doWriterSync(W writer, boolean shouldUseHsync, // result is fed to getSyncedTxid</span> |
| <span class="source-line-no">1649</span><span id="line-1649"> long txidWhenSync); // txidWhenSync: highestProcessedAppendTxid captured when sync() issued it</span> |
| <span class="source-line-no">1650</span><span id="line-1650"></span> |
| <span class="source-line-no">1651</span><span id="line-1651"> private int finishSyncLowerThanTxid(long txid) { // completes pending futures with txid <= txid</span> |
| <span class="source-line-no">1652</span><span id="line-1652"> int count = 0;</span> |
| <span class="source-line-no">1653</span><span id="line-1653"> Iterator<SyncFuture> it = syncFutures.iterator();</span> |
| <span class="source-line-no">1654</span><span id="line-1654"> while (it.hasNext()) {</span> |
| <span class="source-line-no">1655</span><span id="line-1655"> SyncFuture candidate = it.next();</span> |
| <span class="source-line-no">1656</span><span id="line-1656"> if (candidate.getTxid() > txid) {</span> |
| <span class="source-line-no">1657</span><span id="line-1657"> break; // relies on syncFutures iterating in ascending txid order</span> |
| <span class="source-line-no">1658</span><span id="line-1658"> }</span> |
| <span class="source-line-no">1659</span><span id="line-1659"> markFutureDoneAndOffer(candidate, txid, null);</span> |
| <span class="source-line-no">1660</span><span id="line-1660"> it.remove();</span> |
| <span class="source-line-no">1661</span><span id="line-1661"> count++;</span> |
| <span class="source-line-no">1662</span><span id="line-1662"> }</span> |
| <span class="source-line-no">1663</span><span id="line-1663"> return count;</span> |
| <span class="source-line-no">1664</span><span id="line-1664"> }</span> |
| <span class="source-line-no">1665</span><span id="line-1665"></span> |
| <span class="source-line-no">1666</span><span id="line-1666"> // Tries to advance highestSyncedTxid as far as possible, completing the sync futures it covers.</span> |
| <span class="source-line-no">1667</span><span id="line-1667"> private int finishSync() { // returns the number of sync futures finished</span> |
| <span class="source-line-no">1668</span><span id="line-1668"> if (unackedAppends.isEmpty()) {</span> |
| <span class="source-line-no">1669</span><span id="line-1669"> // All outstanding appends have been acked.</span> |
| <span class="source-line-no">1670</span><span id="line-1670"> if (toWriteAppends.isEmpty()) {</span> |
| <span class="source-line-no">1671</span><span id="line-1671"> // There are also no appends waiting to be written out, so just finish all pending syncs.</span> |
| <span class="source-line-no">1672</span><span id="line-1672"> long maxSyncTxid = highestSyncedTxid.get();</span> |
| <span class="source-line-no">1673</span><span id="line-1673"> for (SyncFuture sync : syncFutures) {</span> |
| <span class="source-line-no">1674</span><span id="line-1674"> maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());</span> |
| <span class="source-line-no">1675</span><span id="line-1675"> markFutureDoneAndOffer(sync, maxSyncTxid, null); // completed with the running maximum txid</span> |
| <span class="source-line-no">1676</span><span id="line-1676"> }</span> |
| <span class="source-line-no">1677</span><span id="line-1677"> highestSyncedTxid.set(maxSyncTxid);</span> |
| <span class="source-line-no">1678</span><span id="line-1678"> int finished = syncFutures.size();</span> |
| <span class="source-line-no">1679</span><span id="line-1679"> syncFutures.clear();</span> |
| <span class="source-line-no">1680</span><span id="line-1680"> return finished;</span> |
| <span class="source-line-no">1681</span><span id="line-1681"> } else {</span> |
| <span class="source-line-no">1682</span><span id="line-1682"> // There is no append between highestProcessedAppendTxid and lowestUnprocessedAppendTxid, so</span> |
| <span class="source-line-no">1683</span><span id="line-1683"> // if highestSyncedTxid >= highestProcessedAppendTxid, then all syncs whose txids are between</span> |
| <span class="source-line-no">1684</span><span id="line-1684"> // highestProcessedAppendTxid and lowestUnprocessedAppendTxid can be finished.</span> |
| <span class="source-line-no">1685</span><span id="line-1685"> long lowestUnprocessedAppendTxid = toWriteAppends.peek().getTxid();</span> |
| <span class="source-line-no">1686</span><span id="line-1686"> assert lowestUnprocessedAppendTxid > highestProcessedAppendTxid;</span> |
| <span class="source-line-no">1687</span><span id="line-1687"> long doneTxid = lowestUnprocessedAppendTxid - 1;</span> |
| <span class="source-line-no">1688</span><span id="line-1688"> highestSyncedTxid.set(doneTxid);</span> |
| <span class="source-line-no">1689</span><span id="line-1689"> return finishSyncLowerThanTxid(doneTxid);</span> |
| <span class="source-line-no">1690</span><span id="line-1690"> }</span> |
| <span class="source-line-no">1691</span><span id="line-1691"> } else {</span> |
| <span class="source-line-no">1692</span><span id="line-1692"> // There are still unacked appends. So let's move the highestSyncedTxid to the txid of the</span> |
| <span class="source-line-no">1693</span><span id="line-1693"> // first unacked append minus 1.</span> |
| <span class="source-line-no">1694</span><span id="line-1694"> long lowestUnackedAppendTxid = unackedAppends.peek().getTxid();</span> |
| <span class="source-line-no">1695</span><span id="line-1695"> long doneTxid = Math.max(lowestUnackedAppendTxid - 1, highestSyncedTxid.get()); // never move backwards</span> |
| <span class="source-line-no">1696</span><span id="line-1696"> highestSyncedTxid.set(doneTxid);</span> |
| <span class="source-line-no">1697</span><span id="line-1697"> return finishSyncLowerThanTxid(doneTxid);</span> |
| <span class="source-line-no">1698</span><span id="line-1698"> }</span> |
| <span class="source-line-no">1699</span><span id="line-1699"> }</span> |
| <span class="source-line-no">1700</span><span id="line-1700"></span> |
| <span class="source-line-no">1701</span><span id="line-1701"> // Returns the txid of the last entry. Confirm non-empty before calling: peekLast() yields null.</span> |
| <span class="source-line-no">1702</span><span id="line-1702"> private static long getLastTxid(Deque<FSWALEntry> queue) {</span> |
| <span class="source-line-no">1703</span><span id="line-1703"> return queue.peekLast().getTxid();</span> |
| <span class="source-line-no">1704</span><span id="line-1704"> }</span> |
| <span class="source-line-no">1705</span><span id="line-1705"></span> |
| <span class="source-line-no">1706</span><span id="line-1706"> private void appendAndSync() throws IOException { // drains toWriteAppends; syncs once batchSize bytes are pending</span> |
| <span class="source-line-no">1707</span><span id="line-1707"> final W writer = this.writer;</span> |
| <span class="source-line-no">1708</span><span id="line-1708"> // maybe a sync request is not queued when we issue a sync, so check here to see if we could</span> |
| <span class="source-line-no">1709</span><span id="line-1709"> // finish some.</span> |
| <span class="source-line-no">1710</span><span id="line-1710"> finishSync();</span> |
| <span class="source-line-no">1711</span><span id="line-1711"> long newHighestProcessedAppendTxid = -1L;</span> |
| <span class="source-line-no">1712</span><span id="line-1712"> // this is used to avoid calling peekLast every time on unackedAppends; appendAndSync is single</span> |
| <span class="source-line-no">1713</span><span id="line-1713"> // threaded, so this could save us some cycles</span> |
| <span class="source-line-no">1714</span><span id="line-1714"> boolean addedToUnackedAppends = false;</span> |
| <span class="source-line-no">1715</span><span id="line-1715"> for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) {</span> |
| <span class="source-line-no">1716</span><span id="line-1716"> FSWALEntry entry = iter.next();</span> |
| <span class="source-line-no">1717</span><span id="line-1717"> // For {@link FSHLog}, appendEntry may throw IOException; for {@link AsyncFSWAL} it will not.</span> |
| <span class="source-line-no">1718</span><span id="line-1718"> // The returned flag controls whether the entry is tracked in unackedAppends and checked</span> |
| <span class="source-line-no">1719</span><span id="line-1719"> // against batchSize below; newHighestProcessedAppendTxid advances and the entry leaves</span> |
| <span class="source-line-no">1720</span><span id="line-1720"> // toWriteAppends either way.</span> |
| <span class="source-line-no">1721</span><span id="line-1721"> boolean appended = appendEntry(writer, entry);</span> |
| <span class="source-line-no">1722</span><span id="line-1722"> newHighestProcessedAppendTxid = entry.getTxid();</span> |
| <span class="source-line-no">1723</span><span id="line-1723"> iter.remove();</span> |
| <span class="source-line-no">1724</span><span id="line-1724"> if (appended) {</span> |
| <span class="source-line-no">1725</span><span id="line-1725"> // This check is needed: when we fail to sync, we add the unackedAppends back to</span> |
| <span class="source-line-no">1726</span><span id="line-1726"> // toWriteAppends, so here we may get an entry which is already in the unackedAppends.</span> |
| <span class="source-line-no">1727</span><span id="line-1727"> if (</span> |
| <span class="source-line-no">1728</span><span id="line-1728"> addedToUnackedAppends || unackedAppends.isEmpty()</span> |
| <span class="source-line-no">1729</span><span id="line-1729"> || getLastTxid(unackedAppends) < entry.getTxid()</span> |
| <span class="source-line-no">1730</span><span id="line-1730"> ) {</span> |
| <span class="source-line-no">1731</span><span id="line-1731"> unackedAppends.addLast(entry);</span> |
| <span class="source-line-no">1732</span><span id="line-1732"> addedToUnackedAppends = true;</span> |
| <span class="source-line-no">1733</span><span id="line-1733"> }</span> |
| <span class="source-line-no">1734</span><span id="line-1734"> // See HBASE-25905, here we need to make sure that we will always write all the entries in</span> |
| <span class="source-line-no">1735</span><span id="line-1735"> // unackedAppends out, as the code in the consume method assumes that the entries in</span> |
| <span class="source-line-no">1736</span><span id="line-1736"> // unackedAppends have all been sent out, so if there is a roll request and unackedAppends is</span> |
| <span class="source-line-no">1737</span><span id="line-1737"> // not empty, we could just return as later there will be a syncCompleted call to clear the</span> |
| <span class="source-line-no">1738</span><span id="line-1738"> // unackedAppends, or a syncFailed to lead us to another state.</span> |
| <span class="source-line-no">1739</span><span id="line-1739"> // There could be other ways to fix this, such as changing the logic in the consume method, but</span> |
| <span class="source-line-no">1740</span><span id="line-1740"> // that would break the assumption and then (may) lead to a big refactoring. So here let's use</span> |
| <span class="source-line-no">1741</span><span id="line-1741"> // this way to fix it first; we can optimize later.</span> |
| <span class="source-line-no">1742</span><span id="line-1742"> if (</span> |
| <span class="source-line-no">1743</span><span id="line-1743"> writer.getLength() - fileLengthAtLastSync >= batchSize</span> |
| <span class="source-line-no">1744</span><span id="line-1744"> && (addedToUnackedAppends || entry.getTxid() >= getLastTxid(unackedAppends))</span> |
| <span class="source-line-no">1745</span><span id="line-1745"> ) {</span> |
| <span class="source-line-no">1746</span><span id="line-1746"> break;</span> |
| <span class="source-line-no">1747</span><span id="line-1747"> }</span> |
| <span class="source-line-no">1748</span><span id="line-1748"> }</span> |
| <span class="source-line-no">1749</span><span id="line-1749"> }</span> |
| <span class="source-line-no">1750</span><span id="line-1750"> // if we have a newer transaction id, update it.</span> |
| <span class="source-line-no">1751</span><span id="line-1751"> // otherwise, use the previous transaction id.</span> |
| <span class="source-line-no">1752</span><span id="line-1752"> if (newHighestProcessedAppendTxid > 0) {</span> |
| <span class="source-line-no">1753</span><span id="line-1753"> highestProcessedAppendTxid = newHighestProcessedAppendTxid;</span> |
| <span class="source-line-no">1754</span><span id="line-1754"> } else {</span> |
| <span class="source-line-no">1755</span><span id="line-1755"> newHighestProcessedAppendTxid = highestProcessedAppendTxid;</span> |
| <span class="source-line-no">1756</span><span id="line-1756"> }</span> |
| <span class="source-line-no">1757</span><span id="line-1757"></span> |
| <span class="source-line-no">1758</span><span id="line-1758"> if (writer.getLength() - fileLengthAtLastSync >= batchSize) {</span> |
| <span class="source-line-no">1759</span><span id="line-1759"> // sync because buffer size limit.</span> |
| <span class="source-line-no">1760</span><span id="line-1760"> sync(writer);</span> |
| <span class="source-line-no">1761</span><span id="line-1761"> return;</span> |
| <span class="source-line-no">1762</span><span id="line-1762"> }</span> |
| <span class="source-line-no">1763</span><span id="line-1763"> if (writer.getLength() == fileLengthAtLastSync) {</span> |
| <span class="source-line-no">1764</span><span id="line-1764"> // we haven't written anything out, just advance the highestSyncedSequence since we may only</span> |
| <span class="source-line-no">1765</span><span id="line-1765"> // stamp some region sequence id.</span> |
| <span class="source-line-no">1766</span><span id="line-1766"> if (unackedAppends.isEmpty()) { // advance only when no append is still awaiting an ack</span> |
| <span class="source-line-no">1767</span><span id="line-1767"> highestSyncedTxid.set(highestProcessedAppendTxid);</span> |
| <span class="source-line-no">1768</span><span id="line-1768"> finishSync();</span> |
| <span class="source-line-no">1769</span><span id="line-1769"> trySetReadyForRolling();</span> |
| <span class="source-line-no">1770</span><span id="line-1770"> }</span> |
| <span class="source-line-no">1771</span><span id="line-1771"> return;</span> |
| <span class="source-line-no">1772</span><span id="line-1772"> }</span> |
| <span class="source-line-no">1773</span><span id="line-1773"> // reach here means that we have some unsynced data but haven't reached the batch size yet,</span> |
| <span class="source-line-no">1774</span><span id="line-1774"> // but we will not issue a sync directly here even if there are sync requests because we may</span> |
| <span class="source-line-no">1775</span><span id="line-1775"> // have some new data in the ringbuffer, so let's just return here and delay the decision of</span> |
| <span class="source-line-no">1776</span><span id="line-1776"> // whether to issue a sync in the caller method.</span> |
| <span class="source-line-no">1777</span><span id="line-1777"> }</span> |
| <span class="source-line-no">1778</span><span id="line-1778"></span> |
| <span class="source-line-no">1779</span><span id="line-1779"> private void consume() {</span> |
| <span class="source-line-no">1780</span><span id="line-1780"> consumeLock.lock();</span> |
| <span class="source-line-no">1781</span><span id="line-1781"> try {</span> |
| <span class="source-line-no">1782</span><span id="line-1782"> int currentEpochAndState = epochAndState;</span> |
| <span class="source-line-no">1783</span><span id="line-1783"> if (writerBroken(currentEpochAndState)) {</span> |
| <span class="source-line-no">1784</span><span id="line-1784"> return;</span> |
| <span class="source-line-no">1785</span><span id="line-1785"> }</span> |
| <span class="source-line-no">1786</span><span id="line-1786"> if (waitingRoll(currentEpochAndState)) {</span> |
| <span class="source-line-no">1787</span><span id="line-1787"> if (writer.getLength() > fileLengthAtLastSync) {</span> |
| <span class="source-line-no">1788</span><span id="line-1788"> // issue a sync</span> |
| <span class="source-line-no">1789</span><span id="line-1789"> sync(writer);</span> |
| <span class="source-line-no">1790</span><span id="line-1790"> } else {</span> |
| <span class="source-line-no">1791</span><span id="line-1791"> if (unackedAppends.isEmpty()) {</span> |
| <span class="source-line-no">1792</span><span id="line-1792"> readyForRolling = true;</span> |
| <span class="source-line-no">1793</span><span id="line-1793"> readyForRollingCond.signalAll();</span> |
| <span class="source-line-no">1794</span><span id="line-1794"> }</span> |
| <span class="source-line-no">1795</span><span id="line-1795"> }</span> |
| <span class="source-line-no">1796</span><span id="line-1796"> return;</span> |
| <span class="source-line-no">1797</span><span id="line-1797"> }</span> |
| <span class="source-line-no">1798</span><span id="line-1798"> } finally {</span> |
| <span class="source-line-no">1799</span><span id="line-1799"> consumeLock.unlock();</span> |
| <span class="source-line-no">1800</span><span id="line-1800"> }</span> |
| <span class="source-line-no">1801</span><span id="line-1801"> long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;</span> |
| <span class="source-line-no">1802</span><span id="line-1802"> for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor</span> |
| <span class="source-line-no">1803</span><span id="line-1803"> <= cursorBound; nextCursor++) {</span> |
| <span class="source-line-no">1804</span><span id="line-1804"> if (!waitingConsumePayloads.isPublished(nextCursor)) {</span> |
| <span class="source-line-no">1805</span><span id="line-1805"> break;</span> |
| <span class="source-line-no">1806</span><span id="line-1806"> }</span> |
| <span class="source-line-no">1807</span><span id="line-1807"> RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);</span> |
| <span class="source-line-no">1808</span><span id="line-1808"> switch (truck.type()) {</span> |
| <span class="source-line-no">1809</span><span id="line-1809"> case APPEND:</span> |
| <span class="source-line-no">1810</span><span id="line-1810"> toWriteAppends.addLast(truck.unloadAppend());</span> |
| <span class="source-line-no">1811</span><span id="line-1811"> break;</span> |
| <span class="source-line-no">1812</span><span id="line-1812"> case SYNC:</span> |
| <span class="source-line-no">1813</span><span id="line-1813"> syncFutures.add(truck.unloadSync());</span> |
| <span class="source-line-no">1814</span><span id="line-1814"> break;</span> |
| <span class="source-line-no">1815</span><span id="line-1815"> default:</span> |
| <span class="source-line-no">1816</span><span id="line-1816"> LOG.warn("RingBufferTruck with unexpected type: " + truck.type());</span> |
| <span class="source-line-no">1817</span><span id="line-1817"> break;</span> |
| <span class="source-line-no">1818</span><span id="line-1818"> }</span> |
| <span class="source-line-no">1819</span><span id="line-1819"> waitingConsumePayloadsGatingSequence.set(nextCursor);</span> |
| <span class="source-line-no">1820</span><span id="line-1820"> }</span> |
| <span class="source-line-no">1821</span><span id="line-1821"></span> |
| <span class="source-line-no">1822</span><span id="line-1822"> /**</span> |
| <span class="source-line-no">1823</span><span id="line-1823"> * This method is used to be compatible with the original logic of {@link AsyncFSWAL}.</span> |
| <span class="source-line-no">1824</span><span id="line-1824"> */</span> |
| <span class="source-line-no">1825</span><span id="line-1825"> if (markerEditOnly) {</span> |
| <span class="source-line-no">1826</span><span id="line-1826"> drainNonMarkerEditsAndFailSyncs();</span> |
| <span class="source-line-no">1827</span><span id="line-1827"> }</span> |
| <span class="source-line-no">1828</span><span id="line-1828"> try {</span> |
| <span class="source-line-no">1829</span><span id="line-1829"> appendAndSync();</span> |
| <span class="source-line-no">1830</span><span id="line-1830"> } catch (IOException exception) {</span> |
| <span class="source-line-no">1831</span><span id="line-1831"> /**</span> |
| <span class="source-line-no">1832</span><span id="line-1832"> * For {@link FSHog},here may catch IOException,but for {@link AsyncFSWAL}, the code doesn't</span> |
| <span class="source-line-no">1833</span><span id="line-1833"> * go in here.</span> |
| <span class="source-line-no">1834</span><span id="line-1834"> */</span> |
| <span class="source-line-no">1835</span><span id="line-1835"> LOG.error("appendAndSync throws IOException.", exception);</span> |
| <span class="source-line-no">1836</span><span id="line-1836"> onAppendEntryFailed(exception);</span> |
| <span class="source-line-no">1837</span><span id="line-1837"> return;</span> |
| <span class="source-line-no">1838</span><span id="line-1838"> }</span> |
| <span class="source-line-no">1839</span><span id="line-1839"> if (hasConsumerTask.get()) {</span> |
| <span class="source-line-no">1840</span><span id="line-1840"> return;</span> |
| <span class="source-line-no">1841</span><span id="line-1841"> }</span> |
| <span class="source-line-no">1842</span><span id="line-1842"> if (toWriteAppends.isEmpty()) {</span> |
| <span class="source-line-no">1843</span><span id="line-1843"> if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {</span> |
| <span class="source-line-no">1844</span><span id="line-1844"> consumerScheduled.set(false);</span> |
| <span class="source-line-no">1845</span><span id="line-1845"> // recheck here since in append and sync we do not hold the consumeLock. Thing may</span> |
| <span class="source-line-no">1846</span><span id="line-1846"> // happen like</span> |
| <span class="source-line-no">1847</span><span id="line-1847"> // 1. we check cursor, no new entry</span> |
| <span class="source-line-no">1848</span><span id="line-1848"> // 2. someone publishes a new entry to ringbuffer and the consumerScheduled is true and</span> |
| <span class="source-line-no">1849</span><span id="line-1849"> // give up scheduling the consumer task.</span> |
| <span class="source-line-no">1850</span><span id="line-1850"> // 3. we set consumerScheduled to false and also give up scheduling consumer task.</span> |
| <span class="source-line-no">1851</span><span id="line-1851"> if (waitingConsumePayloadsGatingSequence.get() == waitingConsumePayloads.getCursor()) {</span> |
| <span class="source-line-no">1852</span><span id="line-1852"> // we will give up consuming so if there are some unsynced data we need to issue a sync.</span> |
| <span class="source-line-no">1853</span><span id="line-1853"> if (</span> |
| <span class="source-line-no">1854</span><span id="line-1854"> writer.getLength() > fileLengthAtLastSync && !syncFutures.isEmpty()</span> |
| <span class="source-line-no">1855</span><span id="line-1855"> && syncFutures.last().getTxid() > highestProcessedAppendTxidAtLastSync</span> |
| <span class="source-line-no">1856</span><span id="line-1856"> ) {</span> |
| <span class="source-line-no">1857</span><span id="line-1857"> // no new data in the ringbuffer and we have at least one sync request</span> |
| <span class="source-line-no">1858</span><span id="line-1858"> sync(writer);</span> |
| <span class="source-line-no">1859</span><span id="line-1859"> }</span> |
| <span class="source-line-no">1860</span><span id="line-1860"> return;</span> |
| <span class="source-line-no">1861</span><span id="line-1861"> } else {</span> |
| <span class="source-line-no">1862</span><span id="line-1862"> // maybe someone has grabbed this before us</span> |
| <span class="source-line-no">1863</span><span id="line-1863"> if (!consumerScheduled.compareAndSet(false, true)) {</span> |
| <span class="source-line-no">1864</span><span id="line-1864"> return;</span> |
| <span class="source-line-no">1865</span><span id="line-1865"> }</span> |
| <span class="source-line-no">1866</span><span id="line-1866"> }</span> |
| <span class="source-line-no">1867</span><span id="line-1867"> }</span> |
| <span class="source-line-no">1868</span><span id="line-1868"> }</span> |
| <span class="source-line-no">1869</span><span id="line-1869"> // reschedule if we still have something to write.</span> |
| <span class="source-line-no">1870</span><span id="line-1870"> consumeExecutor.execute(consumer);</span> |
| <span class="source-line-no">1871</span><span id="line-1871"> }</span> |
| <span class="source-line-no">1872</span><span id="line-1872"></span> |
| <span class="source-line-no">1873</span><span id="line-1873"> private boolean shouldScheduleConsumer() {</span> |
| <span class="source-line-no">1874</span><span id="line-1874"> int currentEpochAndState = epochAndState;</span> |
| <span class="source-line-no">1875</span><span id="line-1875"> if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {</span> |
| <span class="source-line-no">1876</span><span id="line-1876"> return false;</span> |
| <span class="source-line-no">1877</span><span id="line-1877"> }</span> |
| <span class="source-line-no">1878</span><span id="line-1878"> return consumerScheduled.compareAndSet(false, true);</span> |
| <span class="source-line-no">1879</span><span id="line-1879"> }</span> |
| <span class="source-line-no">1880</span><span id="line-1880"></span> |
| <span class="source-line-no">1881</span><span id="line-1881"> /**</span> |
| <span class="source-line-no">1882</span><span id="line-1882"> * Append a set of edits to the WAL.</span> |
| <span class="source-line-no">1883</span><span id="line-1883"> * <p/></span> |
| <span class="source-line-no">1884</span><span id="line-1884"> * The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must</span> |
| <span class="source-line-no">1885</span><span id="line-1885"> * have its region edit/sequence id assigned else it messes up our unification of mvcc and</span> |
| <span class="source-line-no">1886</span><span id="line-1886"> * sequenceid. On return <code>key</code> will have the region edit/sequence id filled in.</span> |
| <span class="source-line-no">1887</span><span id="line-1887"> * <p/></span> |
| <span class="source-line-no">1888</span><span id="line-1888"> * NOTE: This appends, at a time that is usually after this call returns, starts a mvcc</span> |
| <span class="source-line-no">1889</span><span id="line-1889"> * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment</span> |
| <span class="source-line-no">1890</span><span id="line-1890"> * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must</span> |
| <span class="source-line-no">1891</span><span id="line-1891"> * 'complete' the transaction this mvcc transaction by calling</span> |
| <span class="source-line-no">1892</span><span id="line-1892"> * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it</span> |
| <span class="source-line-no">1893</span><span id="line-1893"> * in the finally of a try/finally block within which this appends lives and any subsequent</span> |
| <span class="source-line-no">1894</span><span id="line-1894"> * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the</span> |
| <span class="source-line-no">1895</span><span id="line-1895"> * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not</span> |
| <span class="source-line-no">1896</span><span id="line-1896"> * immediately available on return from this method. It WILL be available subsequent to a sync of</span> |
| <span class="source-line-no">1897</span><span id="line-1897"> * this append; otherwise, you will just have to wait on the WriteEntry to get filled in.</span> |
| <span class="source-line-no">1898</span><span id="line-1898"> * @param hri the regioninfo associated with append</span> |
| <span class="source-line-no">1899</span><span id="line-1899"> * @param key Modified by this call; we add to it this edits region edit/sequence id.</span> |
| <span class="source-line-no">1900</span><span id="line-1900"> * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit</span> |
| <span class="source-line-no">1901</span><span id="line-1901"> * sequence id that is after all currently appended edits.</span> |
| <span class="source-line-no">1902</span><span id="line-1902"> * @param inMemstore Always true except for case where we are writing a region event meta marker</span> |
| <span class="source-line-no">1903</span><span id="line-1903"> * edit, for example, a compaction completion record into the WAL or noting a</span> |
| <span class="source-line-no">1904</span><span id="line-1904"> * Region Open event. In these cases the entry is just so we can finish an</span> |
| <span class="source-line-no">1905</span><span id="line-1905"> * unfinished compaction after a crash when the new Server reads the WAL on</span> |
| <span class="source-line-no">1906</span><span id="line-1906"> * recovery, etc. These transition event 'Markers' do not go via the memstore.</span> |
| <span class="source-line-no">1907</span><span id="line-1907"> * When memstore is false, we presume a Marker event edit.</span> |
| <span class="source-line-no">1908</span><span id="line-1908"> * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id</span> |
| <span class="source-line-no">1909</span><span id="line-1909"> * in it.</span> |
| <span class="source-line-no">1910</span><span id="line-1910"> */</span> |
| <span class="source-line-no">1911</span><span id="line-1911"> protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)</span> |
| <span class="source-line-no">1912</span><span id="line-1912"> throws IOException {</span> |
| <span class="source-line-no">1913</span><span id="line-1913"> if (markerEditOnly && !edits.isMetaEdit()) {</span> |
| <span class="source-line-no">1914</span><span id="line-1914"> throw new IOException("WAL is closing, only marker edit is allowed");</span> |
| <span class="source-line-no">1915</span><span id="line-1915"> }</span> |
| <span class="source-line-no">1916</span><span id="line-1916"> long txid =</span> |
| <span class="source-line-no">1917</span><span id="line-1917"> stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);</span> |
| <span class="source-line-no">1918</span><span id="line-1918"> if (shouldScheduleConsumer()) {</span> |
| <span class="source-line-no">1919</span><span id="line-1919"> consumeExecutor.execute(consumer);</span> |
| <span class="source-line-no">1920</span><span id="line-1920"> }</span> |
| <span class="source-line-no">1921</span><span id="line-1921"> return txid;</span> |
| <span class="source-line-no">1922</span><span id="line-1922"> }</span> |
| <span class="source-line-no">1923</span><span id="line-1923"></span> |
| <span class="source-line-no">1924</span><span id="line-1924"> protected void doSync(boolean forceSync) throws IOException {</span> |
| <span class="source-line-no">1925</span><span id="line-1925"> long txid = waitingConsumePayloads.next();</span> |
| <span class="source-line-no">1926</span><span id="line-1926"> SyncFuture future;</span> |
| <span class="source-line-no">1927</span><span id="line-1927"> try {</span> |
| <span class="source-line-no">1928</span><span id="line-1928"> future = getSyncFuture(txid, forceSync);</span> |
| <span class="source-line-no">1929</span><span id="line-1929"> RingBufferTruck truck = waitingConsumePayloads.get(txid);</span> |
| <span class="source-line-no">1930</span><span id="line-1930"> truck.load(future);</span> |
| <span class="source-line-no">1931</span><span id="line-1931"> } finally {</span> |
| <span class="source-line-no">1932</span><span id="line-1932"> waitingConsumePayloads.publish(txid);</span> |
| <span class="source-line-no">1933</span><span id="line-1933"> }</span> |
| <span class="source-line-no">1934</span><span id="line-1934"> if (shouldScheduleConsumer()) {</span> |
| <span class="source-line-no">1935</span><span id="line-1935"> consumeExecutor.execute(consumer);</span> |
| <span class="source-line-no">1936</span><span id="line-1936"> }</span> |
| <span class="source-line-no">1937</span><span id="line-1937"> blockOnSync(future);</span> |
| <span class="source-line-no">1938</span><span id="line-1938"> }</span> |
| <span class="source-line-no">1939</span><span id="line-1939"></span> |
| <span class="source-line-no">1940</span><span id="line-1940"> protected void doSync(long txid, boolean forceSync) throws IOException {</span> |
| <span class="source-line-no">1941</span><span id="line-1941"> if (highestSyncedTxid.get() >= txid) {</span> |
| <span class="source-line-no">1942</span><span id="line-1942"> return;</span> |
| <span class="source-line-no">1943</span><span id="line-1943"> }</span> |
| <span class="source-line-no">1944</span><span id="line-1944"> // here we do not use ring buffer sequence as txid</span> |
| <span class="source-line-no">1945</span><span id="line-1945"> long sequence = waitingConsumePayloads.next();</span> |
| <span class="source-line-no">1946</span><span id="line-1946"> SyncFuture future;</span> |
| <span class="source-line-no">1947</span><span id="line-1947"> try {</span> |
| <span class="source-line-no">1948</span><span id="line-1948"> future = getSyncFuture(txid, forceSync);</span> |
| <span class="source-line-no">1949</span><span id="line-1949"> RingBufferTruck truck = waitingConsumePayloads.get(sequence);</span> |
| <span class="source-line-no">1950</span><span id="line-1950"> truck.load(future);</span> |
| <span class="source-line-no">1951</span><span id="line-1951"> } finally {</span> |
| <span class="source-line-no">1952</span><span id="line-1952"> waitingConsumePayloads.publish(sequence);</span> |
| <span class="source-line-no">1953</span><span id="line-1953"> }</span> |
| <span class="source-line-no">1954</span><span id="line-1954"> if (shouldScheduleConsumer()) {</span> |
| <span class="source-line-no">1955</span><span id="line-1955"> consumeExecutor.execute(consumer);</span> |
| <span class="source-line-no">1956</span><span id="line-1956"> }</span> |
| <span class="source-line-no">1957</span><span id="line-1957"> blockOnSync(future);</span> |
| <span class="source-line-no">1958</span><span id="line-1958"> }</span> |
| <span class="source-line-no">1959</span><span id="line-1959"></span> |
| <span class="source-line-no">1960</span><span id="line-1960"> private void drainNonMarkerEditsAndFailSyncs() {</span> |
| <span class="source-line-no">1961</span><span id="line-1961"> if (toWriteAppends.isEmpty()) {</span> |
| <span class="source-line-no">1962</span><span id="line-1962"> return;</span> |
| <span class="source-line-no">1963</span><span id="line-1963"> }</span> |
| <span class="source-line-no">1964</span><span id="line-1964"> boolean hasNonMarkerEdits = false;</span> |
| <span class="source-line-no">1965</span><span id="line-1965"> Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();</span> |
| <span class="source-line-no">1966</span><span id="line-1966"> while (iter.hasNext()) {</span> |
| <span class="source-line-no">1967</span><span id="line-1967"> FSWALEntry entry = iter.next();</span> |
| <span class="source-line-no">1968</span><span id="line-1968"> if (!entry.getEdit().isMetaEdit()) {</span> |
| <span class="source-line-no">1969</span><span id="line-1969"> entry.release();</span> |
| <span class="source-line-no">1970</span><span id="line-1970"> hasNonMarkerEdits = true;</span> |
| <span class="source-line-no">1971</span><span id="line-1971"> break;</span> |
| <span class="source-line-no">1972</span><span id="line-1972"> }</span> |
| <span class="source-line-no">1973</span><span id="line-1973"> }</span> |
| <span class="source-line-no">1974</span><span id="line-1974"> if (hasNonMarkerEdits) {</span> |
| <span class="source-line-no">1975</span><span id="line-1975"> for (;;) {</span> |
| <span class="source-line-no">1976</span><span id="line-1976"> iter.remove();</span> |
| <span class="source-line-no">1977</span><span id="line-1977"> if (!iter.hasNext()) {</span> |
| <span class="source-line-no">1978</span><span id="line-1978"> break;</span> |
| <span class="source-line-no">1979</span><span id="line-1979"> }</span> |
| <span class="source-line-no">1980</span><span id="line-1980"> iter.next().release();</span> |
| <span class="source-line-no">1981</span><span id="line-1981"> }</span> |
| <span class="source-line-no">1982</span><span id="line-1982"> for (FSWALEntry entry : unackedAppends) {</span> |
| <span class="source-line-no">1983</span><span id="line-1983"> entry.release();</span> |
| <span class="source-line-no">1984</span><span id="line-1984"> }</span> |
| <span class="source-line-no">1985</span><span id="line-1985"> unackedAppends.clear();</span> |
| <span class="source-line-no">1986</span><span id="line-1986"> // fail the sync futures which are under the txid of the first remaining edit, if none, fail</span> |
| <span class="source-line-no">1987</span><span id="line-1987"> // all the sync futures.</span> |
| <span class="source-line-no">1988</span><span id="line-1988"> long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();</span> |
| <span class="source-line-no">1989</span><span id="line-1989"> IOException error = new IOException("WAL is closing, only marker edit is allowed");</span> |
| <span class="source-line-no">1990</span><span id="line-1990"> for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {</span> |
| <span class="source-line-no">1991</span><span id="line-1991"> SyncFuture future = syncIter.next();</span> |
| <span class="source-line-no">1992</span><span id="line-1992"> if (future.getTxid() < txid) {</span> |
| <span class="source-line-no">1993</span><span id="line-1993"> markFutureDoneAndOffer(future, future.getTxid(), error);</span> |
| <span class="source-line-no">1994</span><span id="line-1994"> syncIter.remove();</span> |
| <span class="source-line-no">1995</span><span id="line-1995"> } else {</span> |
| <span class="source-line-no">1996</span><span id="line-1996"> break;</span> |
| <span class="source-line-no">1997</span><span id="line-1997"> }</span> |
| <span class="source-line-no">1998</span><span id="line-1998"> }</span> |
| <span class="source-line-no">1999</span><span id="line-1999"> }</span> |
| <span class="source-line-no">2000</span><span id="line-2000"> }</span> |
| <span class="source-line-no">2001</span><span id="line-2001"></span> |
| <span class="source-line-no">2002</span><span id="line-2002"> protected abstract W createWriterInstance(FileSystem fs, Path path)</span> |
| <span class="source-line-no">2003</span><span id="line-2003"> throws IOException, CommonFSUtils.StreamLacksCapabilityException;</span> |
| <span class="source-line-no">2004</span><span id="line-2004"></span> |
| <span class="source-line-no">2005</span><span id="line-2005"> protected abstract W createCombinedWriter(W localWriter, W remoteWriter);</span> |
| <span class="source-line-no">2006</span><span id="line-2006"></span> |
| <span class="source-line-no">2007</span><span id="line-2007"> protected final void waitForSafePoint() {</span> |
| <span class="source-line-no">2008</span><span id="line-2008"> consumeLock.lock();</span> |
| <span class="source-line-no">2009</span><span id="line-2009"> try {</span> |
| <span class="source-line-no">2010</span><span id="line-2010"> int currentEpochAndState = epochAndState;</span> |
| <span class="source-line-no">2011</span><span id="line-2011"> if (writerBroken(currentEpochAndState) || this.writer == null) {</span> |
| <span class="source-line-no">2012</span><span id="line-2012"> return;</span> |
| <span class="source-line-no">2013</span><span id="line-2013"> }</span> |
| <span class="source-line-no">2014</span><span id="line-2014"> consumerScheduled.set(true);</span> |
| <span class="source-line-no">2015</span><span id="line-2015"> epochAndState = currentEpochAndState | 1;</span> |
| <span class="source-line-no">2016</span><span id="line-2016"> readyForRolling = false;</span> |
| <span class="source-line-no">2017</span><span id="line-2017"> consumeExecutor.execute(consumer);</span> |
| <span class="source-line-no">2018</span><span id="line-2018"> while (!readyForRolling) {</span> |
| <span class="source-line-no">2019</span><span id="line-2019"> readyForRollingCond.awaitUninterruptibly();</span> |
| <span class="source-line-no">2020</span><span id="line-2020"> }</span> |
| <span class="source-line-no">2021</span><span id="line-2021"> } finally {</span> |
| <span class="source-line-no">2022</span><span id="line-2022"> consumeLock.unlock();</span> |
| <span class="source-line-no">2023</span><span id="line-2023"> }</span> |
| <span class="source-line-no">2024</span><span id="line-2024"> }</span> |
| <span class="source-line-no">2025</span><span id="line-2025"></span> |
| <span class="source-line-no">2026</span><span id="line-2026"> private void recoverLease(FileSystem fs, Path p, Configuration conf) {</span> |
| <span class="source-line-no">2027</span><span id="line-2027"> try {</span> |
| <span class="source-line-no">2028</span><span id="line-2028"> RecoverLeaseFSUtils.recoverFileLease(fs, p, conf, null);</span> |
| <span class="source-line-no">2029</span><span id="line-2029"> } catch (IOException ex) {</span> |
| <span class="source-line-no">2030</span><span id="line-2030"> LOG.error("Unable to recover lease after several attempts. Give up.", ex);</span> |
| <span class="source-line-no">2031</span><span id="line-2031"> }</span> |
| <span class="source-line-no">2032</span><span id="line-2032"> }</span> |
| <span class="source-line-no">2033</span><span id="line-2033"></span> |
| <span class="source-line-no">2034</span><span id="line-2034"> protected final void closeWriter(W writer, Path path) {</span> |
| <span class="source-line-no">2035</span><span id="line-2035"> inflightWALClosures.put(path.getName(), writer);</span> |
| <span class="source-line-no">2036</span><span id="line-2036"> closeExecutor.execute(() -> {</span> |
| <span class="source-line-no">2037</span><span id="line-2037"> try {</span> |
| <span class="source-line-no">2038</span><span id="line-2038"> writer.close();</span> |
| <span class="source-line-no">2039</span><span id="line-2039"> } catch (IOException e) {</span> |
| <span class="source-line-no">2040</span><span id="line-2040"> LOG.warn("close old writer failed.", e);</span> |
| <span class="source-line-no">2041</span><span id="line-2041"> recoverLease(this.fs, path, conf);</span> |
| <span class="source-line-no">2042</span><span id="line-2042"> } finally {</span> |
| <span class="source-line-no">2043</span><span id="line-2043"> // call this even if the above close fails, as there is no other chance we can set closed to</span> |
| <span class="source-line-no">2044</span><span id="line-2044"> // true, it will not cause big problems.</span> |
| <span class="source-line-no">2045</span><span id="line-2045"> markClosedAndClean(path);</span> |
| <span class="source-line-no">2046</span><span id="line-2046"> inflightWALClosures.remove(path.getName());</span> |
| <span class="source-line-no">2047</span><span id="line-2047"> }</span> |
| <span class="source-line-no">2048</span><span id="line-2048"> });</span> |
| <span class="source-line-no">2049</span><span id="line-2049"> }</span> |
| <span class="source-line-no">2050</span><span id="line-2050"></span> |
| <span class="source-line-no">2051</span><span id="line-2051"> /**</span> |
| <span class="source-line-no">2052</span><span id="line-2052"> * Installs {@code nextWriter} as the current writer: waits for a safe point, closes the old writer</span> |
| <span class="source-line-no">2053</span><span id="line-2053"> * (if any), bumps the epoch and submits the consumer again. Notice that you need to clear the</span> |
| <span class="source-line-no">2054</span><span id="line-2054"> * {@link #rollRequested} flag in this method, as the new writer will begin to work before returning</span> |
| <span class="source-line-no">2055</span><span id="line-2055"> * from this method; clearing it afterwards may miss a roll request. This implementation clears it</span> |
| <span class="source-line-no">2056</span><span id="line-2056"> * under {@code consumeLock}, right before the consumer is scheduled to write to the new writer.</span> |
| <span class="source-line-no">2057</span><span id="line-2057"> */</span> |
| <span class="source-line-no">2058</span><span id="line-2058"> protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {</span> |
| <span class="source-line-no">2059</span><span id="line-2059"> Preconditions.checkNotNull(nextWriter);</span> |
| <span class="source-line-no">2060</span><span id="line-2060"> waitForSafePoint();</span> |
| <span class="source-line-no">2061</span><span id="line-2061"> /**</span> |
| <span class="source-line-no">2062</span><span id="line-2062"> * Hook for subclasses; for {@link FSHLog}, this is where {@link FSHLog.SyncRunner} is shut down.</span> |
| <span class="source-line-no">2063</span><span id="line-2063"> */</span> |
| <span class="source-line-no">2064</span><span id="line-2064"> doCleanUpResources();</span> |
| <span class="source-line-no">2065</span><span id="line-2065"> // rollWriter is also called from the init method to create the first writer; there is no</span> |
| <span class="source-line-no">2066</span><span id="line-2066"> // previous writer then, hence this null check. logRollAndSetupWalProps must be called before</span> |
| <span class="source-line-no">2067</span><span id="line-2067"> // closeWriter: markClosedAndClean runs after the writer is closed asynchronously, so the</span> |
| <span class="source-line-no">2068</span><span id="line-2068"> // WALProps must already be in walFile2Props by the time markClosedAndClean is called. For the</span> |
| <span class="source-line-no">2069</span><span id="line-2069"> // first writer there is nothing to close, so the old file length is recorded as 0.</span> |
| <span class="source-line-no">2070</span><span id="line-2070"> if (writer != null) {</span> |
| <span class="source-line-no">2071</span><span id="line-2071"> long oldFileLen = writer.getLength();</span> |
| <span class="source-line-no">2072</span><span id="line-2072"> logRollAndSetupWalProps(oldPath, newPath, oldFileLen);</span> |
| <span class="source-line-no">2073</span><span id="line-2073"> closeWriter(writer, oldPath);</span> |
| <span class="source-line-no">2074</span><span id="line-2074"> } else {</span> |
| <span class="source-line-no">2075</span><span id="line-2075"> logRollAndSetupWalProps(oldPath, newPath, 0);</span> |
| <span class="source-line-no">2076</span><span id="line-2076"> }</span> |
| <span class="source-line-no">2077</span><span id="line-2077"> this.writer = nextWriter;</span> |
| <span class="source-line-no">2078</span><span id="line-2078"> /**</span> |
| <span class="source-line-no">2079</span><span id="line-2079"> * Lets {@link AsyncFSWAL} and {@link FSHLog} switch their underlying filesystem output over to</span> |
| <span class="source-line-no">2080</span><span id="line-2080"> * the new writer, now that it has been installed.</span> |
| <span class="source-line-no">2081</span><span id="line-2081"> */</span> |
| <span class="source-line-no">2082</span><span id="line-2082"> onWriterReplaced(nextWriter);</span> |
| <span class="source-line-no">2083</span><span id="line-2083"> this.fileLengthAtLastSync = nextWriter.getLength();</span> |
| <span class="source-line-no">2084</span><span id="line-2084"> this.highestProcessedAppendTxidAtLastSync = 0L;</span> |
| <span class="source-line-no">2085</span><span id="line-2085"> consumeLock.lock();</span> |
| <span class="source-line-no">2086</span><span id="line-2086"> try {</span> |
| <span class="source-line-no">2087</span><span id="line-2087"> consumerScheduled.set(true);</span> |
| <span class="source-line-no">2088</span><span id="line-2088"> int currentEpoch = epochAndState >>> 2;</span> |
| <span class="source-line-no">2089</span><span id="line-2089"> int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;</span> |
| <span class="source-line-no">2090</span><span id="line-2090"> // Store the next epoch (wrapping after MAX_EPOCH); the low 2 bits (waitingRoll, writerBroken) become 0</span> |
| <span class="source-line-no">2091</span><span id="line-2091"> this.epochAndState = nextEpoch << 2;</span> |
| <span class="source-line-no">2092</span><span id="line-2092"> // Clear rollRequested before the consumer runs against the new writer so no roll request is lost</span> |
| <span class="source-line-no">2093</span><span id="line-2093"> rollRequested.set(false);</span> |
| <span class="source-line-no">2094</span><span id="line-2094"> consumeExecutor.execute(consumer);</span> |
| <span class="source-line-no">2095</span><span id="line-2095"> } finally {</span> |
| <span class="source-line-no">2096</span><span id="line-2096"> consumeLock.unlock();</span> |
| <span class="source-line-no">2097</span><span id="line-2097"> }</span> |
| <span class="source-line-no">2098</span><span id="line-2098"> }</span> |
| <span class="source-line-no">2099</span><span id="line-2099"></span> |
| <span class="source-line-no">2100</span><span id="line-2100"> protected abstract void onWriterReplaced(W nextWriter); // invoked by doReplaceWriter right after this.writer = nextWriter</span> |
| <span class="source-line-no">2101</span><span id="line-2101"></span> |
| <span class="source-line-no">2102</span><span id="line-2102"> protected void doShutdown() throws IOException { // close the writer, then fail every pending sync request</span> |
| <span class="source-line-no">2103</span><span id="line-2103"> waitForSafePoint();</span> |
| <span class="source-line-no">2104</span><span id="line-2104"> /**</span> |
| <span class="source-line-no">2105</span><span id="line-2105"> * Hook for subclasses; for {@link FSHLog}, this is where {@link FSHLog.SyncRunner} is shut down.</span> |
| <span class="source-line-no">2106</span><span id="line-2106"> */</span> |
| <span class="source-line-no">2107</span><span id="line-2107"> doCleanUpResources();</span> |
| <span class="source-line-no">2108</span><span id="line-2108"> if (this.writer != null) {</span> |
| <span class="source-line-no">2109</span><span id="line-2109"> closeWriter(this.writer, getOldPath());</span> |
| <span class="source-line-no">2110</span><span id="line-2110"> this.writer = null;</span> |
| <span class="source-line-no">2111</span><span id="line-2111"> }</span> |
| <span class="source-line-no">2112</span><span id="line-2112"> closeExecutor.shutdown();</span> |
| <span class="source-line-no">2113</span><span id="line-2113"> try {</span> |
| <span class="source-line-no">2114</span><span id="line-2114"> if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {</span> |
| <span class="source-line-no">2115</span><span id="line-2115"> LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"</span> |
| <span class="source-line-no">2116</span><span id="line-2116"> + " the close of async writer doesn't complete."</span> |
| <span class="source-line-no">2117</span><span id="line-2117"> + " Please check the status of underlying filesystem"</span> |
| <span class="source-line-no">2118</span><span id="line-2118"> + " or increase the wait time by the config \"" + this.waitOnShutdownInSecondsConfigKey</span> |
| <span class="source-line-no">2119</span><span id="line-2119"> + "\"");</span> |
| <span class="source-line-no">2120</span><span id="line-2120"> }</span> |
| <span class="source-line-no">2121</span><span id="line-2121"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">2122</span><span id="line-2122"> LOG.error("The wait for close of async writer is interrupted", e);</span> |
| <span class="source-line-no">2123</span><span id="line-2123"> Thread.currentThread().interrupt();</span> |
| <span class="source-line-no">2124</span><span id="line-2124"> }</span> |
| <span class="source-line-no">2125</span><span id="line-2125"> IOException error = new IOException("WAL has been closed");</span> |
| <span class="source-line-no">2126</span><span id="line-2126"> long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;</span> |
| <span class="source-line-no">2127</span><span id="line-2127"> // Collect the SYNC requests published in waitingConsumePayloads after the gating sequence...</span> |
| <span class="source-line-no">2128</span><span id="line-2128"> for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor</span> |
| <span class="source-line-no">2129</span><span id="line-2129"> <= cursorBound; nextCursor++) {</span> |
| <span class="source-line-no">2130</span><span id="line-2130"> if (!waitingConsumePayloads.isPublished(nextCursor)) {</span> |
| <span class="source-line-no">2131</span><span id="line-2131"> break;</span> |
| <span class="source-line-no">2132</span><span id="line-2132"> }</span> |
| <span class="source-line-no">2133</span><span id="line-2133"> RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);</span> |
| <span class="source-line-no">2134</span><span id="line-2134"> switch (truck.type()) {</span> |
| <span class="source-line-no">2135</span><span id="line-2135"> case SYNC:</span> |
| <span class="source-line-no">2136</span><span id="line-2136"> syncFutures.add(truck.unloadSync());</span> |
| <span class="source-line-no">2137</span><span id="line-2137"> break;</span> |
| <span class="source-line-no">2138</span><span id="line-2138"> default:</span> |
| <span class="source-line-no">2139</span><span id="line-2139"> break;</span> |
| <span class="source-line-no">2140</span><span id="line-2140"> }</span> |
| <span class="source-line-no">2141</span><span id="line-2141"> }</span> |
| <span class="source-line-no">2142</span><span id="line-2142"> // ...and fail all of them with the "WAL has been closed" error</span> |
| <span class="source-line-no">2143</span><span id="line-2143"> syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));</span> |
| <span class="source-line-no">2144</span><span id="line-2144"> if (this.shouldShutDownConsumeExecutorWhenClose) {</span> |
| <span class="source-line-no">2145</span><span id="line-2145"> consumeExecutor.shutdown();</span> |
| <span class="source-line-no">2146</span><span id="line-2146"> }</span> |
| <span class="source-line-no">2147</span><span id="line-2147"> }</span> |
| <span class="source-line-no">2148</span><span id="line-2148"></span> |
| <span class="source-line-no">2149</span><span id="line-2149"> protected void doCleanUpResources() { // no-op by default; called before the current writer is closed</span> |
| <span class="source-line-no">2150</span><span id="line-2150"> }</span> |
| <span class="source-line-no">2151</span><span id="line-2151"></span> |
| <span class="source-line-no">2152</span><span id="line-2152"> protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; // NOTE(review): presumably appends entry to writer - confirm</span> |
| <span class="source-line-no">2153</span><span id="line-2153"></span> |
| <span class="source-line-no">2154</span><span id="line-2154"> /**</span> |
| <span class="source-line-no">2155</span><span id="line-2155"> * Returns the datanode pipeline of the current WAL; it is included in the slow sync roll log.</span> |
| <span class="source-line-no">2156</span><span id="line-2156"> */</span> |
| <span class="source-line-no">2157</span><span id="line-2157"> abstract DatanodeInfo[] getPipeline();</span> |
| <span class="source-line-no">2158</span><span id="line-2158"></span> |
| <span class="source-line-no">2159</span><span id="line-2159"> /**</span> |
| <span class="source-line-no">2160</span><span id="line-2160"> * Returns the datanode replication count for the current WAL.</span> |
| <span class="source-line-no">2161</span><span id="line-2161"> */</span> |
| <span class="source-line-no">2162</span><span id="line-2162"> abstract int getLogReplication();</span> |
| <span class="source-line-no">2163</span><span id="line-2163"></span> |
| <span class="source-line-no">2164</span><span id="line-2164"> protected abstract boolean doCheckLogLowReplication(); // true makes checkLogLowReplication request a LOW_REPLICATION roll</span> |
| <span class="source-line-no">2165</span><span id="line-2165"></span> |
| <span class="source-line-no">2166</span><span id="line-2166"> protected boolean isWriterBroken() {</span> |
| <span class="source-line-no">2167</span><span id="line-2167"> return writerBroken(this.epochAndState); // tests the writerBroken flag packed into epochAndState</span> |
| <span class="source-line-no">2168</span><span id="line-2168"> }</span> |
| <span class="source-line-no">2169</span><span id="line-2169"></span> |
| <span class="source-line-no">2170</span><span id="line-2170"> private void onAppendEntryFailed(IOException exception) {</span> |
| <span class="source-line-no">2171</span><span id="line-2171"> LOG.warn("append entry failed", exception);</span> |
| <span class="source-line-no">2172</span><span id="line-2172"> // Report the failure against the current epoch, i.e. epochAndState without its 2 state bits.</span> |
| <span class="source-line-no">2173</span><span id="line-2173"> onException((long) epochAndState >>> 2L, exception);</span> |
| <span class="source-line-no">2174</span><span id="line-2174"> }</span> |
| <span class="source-line-no">2175</span><span id="line-2175"></span> |
| <span class="source-line-no">2176</span><span id="line-2176"> protected void checkSlowSyncCount() { // no-op by default; subclasses may override</span> |
| <span class="source-line-no">2177</span><span id="line-2177"> }</span> |
| <span class="source-line-no">2178</span><span id="line-2178"></span> |
| <span class="source-line-no">2179</span><span id="line-2179"> /** Returns true if the slow sync roll threshold was reached within the last check interval; resets the count once per interval */</span> |
| <span class="source-line-no">2180</span><span id="line-2180"> protected boolean doCheckSlowSync() {</span> |
| <span class="source-line-no">2181</span><span id="line-2181"> boolean result = false;</span> |
| <span class="source-line-no">2182</span><span id="line-2182"> long now = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">2183</span><span id="line-2183"> long elapsedTime = now - lastTimeCheckSlowSync;</span> |
| <span class="source-line-no">2184</span><span id="line-2184"> if (elapsedTime >= slowSyncCheckInterval) {</span> |
| <span class="source-line-no">2185</span><span id="line-2185"> if (slowSyncCount.get() >= slowSyncRollThreshold) {</span> |
| <span class="source-line-no">2186</span><span id="line-2186"> if (elapsedTime >= (2 * slowSyncCheckInterval)) {</span> |
| <span class="source-line-no">2187</span><span id="line-2187"> // If two or more slowSyncCheckIntervals have elapsed, this is a corner case</span> |
| <span class="source-line-no">2188</span><span id="line-2188"> // where a train of slow syncs almost triggered us, but then there was a long</span> |
| <span class="source-line-no">2189</span><span id="line-2189"> // interval until the one extra slow sync that pushed us over. If so, we</span> |
| <span class="source-line-no">2190</span><span id="line-2190"> // should do nothing and just let the count reset.</span> |
| <span class="source-line-no">2191</span><span id="line-2191"> if (LOG.isDebugEnabled()) {</span> |
| <span class="source-line-no">2192</span><span id="line-2192"> LOG.debug("checkSlowSync triggered but we decided to ignore it; " + "count="</span> |
| <span class="source-line-no">2193</span><span id="line-2193"> + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", elapsedTime="</span> |
| <span class="source-line-no">2194</span><span id="line-2194"> + elapsedTime + " ms, slowSyncCheckInterval=" + slowSyncCheckInterval + " ms");</span> |
| <span class="source-line-no">2195</span><span id="line-2195"> }</span> |
| <span class="source-line-no">2196</span><span id="line-2196"> // Fall through to count reset below</span> |
| <span class="source-line-no">2197</span><span id="line-2197"> } else {</span> |
| <span class="source-line-no">2198</span><span id="line-2198"> LOG.warn("Requesting log roll because we exceeded slow sync threshold; count="</span> |
| <span class="source-line-no">2199</span><span id="line-2199"> + slowSyncCount.get() + ", threshold=" + slowSyncRollThreshold + ", current pipeline: "</span> |
| <span class="source-line-no">2200</span><span id="line-2200"> + Arrays.toString(getPipeline()));</span> |
| <span class="source-line-no">2201</span><span id="line-2201"> result = true;</span> |
| <span class="source-line-no">2202</span><span id="line-2202"> }</span> |
| <span class="source-line-no">2203</span><span id="line-2203"> }</span> |
| <span class="source-line-no">2204</span><span id="line-2204"> lastTimeCheckSlowSync = now; // start a new check interval</span> |
| <span class="source-line-no">2205</span><span id="line-2205"> slowSyncCount.set(0);</span> |
| <span class="source-line-no">2206</span><span id="line-2206"> }</span> |
| <span class="source-line-no">2207</span><span id="line-2207"> return result;</span> |
| <span class="source-line-no">2208</span><span id="line-2208"> }</span> |
| <span class="source-line-no">2209</span><span id="line-2209"></span> |
| <span class="source-line-no">2210</span><span id="line-2210"> public void checkLogLowReplication(long checkInterval) {</span> |
| <span class="source-line-no">2211</span><span id="line-2211"> final long now = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">2212</span><span id="line-2212"> // Nothing to do if the previous check was less than checkInterval ago. Otherwise only proceed</span> |
| <span class="source-line-no">2213</span><span id="line-2213"> // when rollWriterLock is free: a failed tryLock means a WAL roll is in progress right now.</span> |
| <span class="source-line-no">2214</span><span id="line-2214"> if (now - lastTimeCheckLowReplication < checkInterval || !rollWriterLock.tryLock()) {</span> |
| <span class="source-line-no">2215</span><span id="line-2215"> return;</span> |
| <span class="source-line-no">2216</span><span id="line-2216"> }</span> |
| <span class="source-line-no">2217</span><span id="line-2217"> try {</span> |
| <span class="source-line-no">2218</span><span id="line-2218"> // The check time is only recorded once the lock is held.</span> |
| <span class="source-line-no">2219</span><span id="line-2219"> lastTimeCheckLowReplication = now;</span> |
| <span class="source-line-no">2220</span><span id="line-2220"> final boolean lowReplication = doCheckLogLowReplication();</span> |
| <span class="source-line-no">2221</span><span id="line-2221"> if (lowReplication) {</span> |
| <span class="source-line-no">2222</span><span id="line-2222"> requestLogRoll(LOW_REPLICATION);</span> |
| <span class="source-line-no">2223</span><span id="line-2223"> }</span> |
| <span class="source-line-no">2224</span><span id="line-2224"> } finally {</span> |
| <span class="source-line-no">2225</span><span id="line-2225"> rollWriterLock.unlock();</span> |
| <span class="source-line-no">2226</span><span id="line-2226"> }</span> |
| <span class="source-line-no">2227</span><span id="line-2227"> }</span> |
| <span class="source-line-no">2228</span><span id="line-2228"></span> |
| <span class="source-line-no">2229</span><span id="line-2229"> // Temporarily skip creating the remote writer. When writing to the remote dfs cluster fails, we</span> |
| <span class="source-line-no">2230</span><span id="line-2230"> // need to reopen the regions and switch back to the original wal writer, but closing a region</span> |
| <span class="source-line-no">2231</span><span id="line-2231"> // requires writing a close marker, and if that fails the whole region server aborts. Skipping</span> |
| <span class="source-line-no">2232</span><span id="line-2232"> // the creation of the remote writer makes it possible to write the region close marker.</span> |
| <span class="source-line-no">2233</span><span id="line-2233"> // markerEditOnly=true is used when transitioning from A to S: pending wal entries must be given</span> |
| <span class="source-line-no">2234</span><span id="line-2234"> // up, since they will be discarded and the remote cluster will replicate the correct data back</span> |
| <span class="source-line-no">2235</span><span id="line-2235"> // later, but marker edits such as the close region event must still be writable so that regions</span> |
| <span class="source-line-no">2236</span><span id="line-2236"> // can be closed. Note that this method only ever sets the two flags to true; it never clears</span> |
| <span class="source-line-no">2237</span><span id="line-2237"> // them.</span> |
| <span class="source-line-no">2238</span><span id="line-2238"> @Override</span> |
| <span class="source-line-no">2239</span><span id="line-2239"> public void skipRemoteWAL(boolean markerEditOnly) {</span> |
| <span class="source-line-no">2240</span><span id="line-2240"> if (markerEditOnly) {</span> |
| <span class="source-line-no">2241</span><span id="line-2241"> this.markerEditOnly = true;</span> |
| <span class="source-line-no">2242</span><span id="line-2242"> }</span> |
| <span class="source-line-no">2243</span><span id="line-2243"> this.skipRemoteWAL = true;</span> |
| <span class="source-line-no">2244</span><span id="line-2244"> }</span> |
| <span class="source-line-no">2245</span><span id="line-2245"></span> |
| <span class="source-line-no">2246</span><span id="line-2246"> private static void split(final Configuration conf, final Path p) throws IOException {</span> |
| <span class="source-line-no">2247</span><span id="line-2247"> // Use the WAL filesystem (not necessarily the default one) for the checks and for the split.</span> |
| <span class="source-line-no">2248</span><span id="line-2248"> final FileSystem walFs = CommonFSUtils.getWALFileSystem(conf);</span> |
| <span class="source-line-no">2249</span><span id="line-2249"> if (!walFs.exists(p)) {</span> |
| <span class="source-line-no">2250</span><span id="line-2250"> throw new FileNotFoundException(p.toString());</span> |
| <span class="source-line-no">2251</span><span id="line-2251"> }</span> |
| <span class="source-line-no">2252</span><span id="line-2252"> if (!walFs.getFileStatus(p).isDirectory()) {</span> |
| <span class="source-line-no">2253</span><span id="line-2253"> throw new IOException(p + " is not a directory");</span> |
| <span class="source-line-no">2254</span><span id="line-2254"> }</span> |
| <span class="source-line-no">2255</span><span id="line-2255"></span> |
| <span class="source-line-no">2256</span><span id="line-2256"> // The archive dir is HREGION_OLDLOGDIR_NAME under the WAL root, or a sub-directory of it named</span> |
| <span class="source-line-no">2257</span><span id="line-2257"> // after p when SEPARATE_OLDLOGDIR is enabled.</span> |
| <span class="source-line-no">2258</span><span id="line-2258"> final Path walRootDir = CommonFSUtils.getWALRootDir(conf);</span> |
| <span class="source-line-no">2259</span><span id="line-2259"> final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);</span> |
| <span class="source-line-no">2260</span><span id="line-2260"> final boolean separateOldLogDir = conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,</span> |
| <span class="source-line-no">2261</span><span id="line-2261"> AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR);</span> |
| <span class="source-line-no">2262</span><span id="line-2262"> final Path archiveDir = separateOldLogDir ? new Path(oldLogDir, p.getName()) : oldLogDir;</span> |
| <span class="source-line-no">2263</span><span id="line-2263"> WALSplitter.split(walRootDir, p, archiveDir, walFs, conf, WALFactory.getInstance(conf));</span> |
| <span class="source-line-no">2264</span><span id="line-2264"> }</span> |
| <span class="source-line-no">2265</span><span id="line-2265"></span> |
| <span class="source-line-no">2266</span><span id="line-2266"> W getWriter() {</span> |
| <span class="source-line-no">2267</span><span id="line-2267"> return writer; // null before the first writer is installed and after doShutdown</span> |
| <span class="source-line-no">2268</span><span id="line-2268"> }</span> |
| <span class="source-line-no">2269</span><span id="line-2269"></span> |
| <span class="source-line-no">2270</span><span id="line-2270"> private static void usage() {</span> |
| <span class="source-line-no">2271</span><span id="line-2271"> final java.io.PrintStream out = System.err;</span> |
| <span class="source-line-no">2272</span><span id="line-2272"> out.println("Usage: AbstractFSWAL <ARGS>");</span> |
| <span class="source-line-no">2273</span><span id="line-2273"> out.println("Arguments:");</span> |
| <span class="source-line-no">2274</span><span id="line-2274"> out.println(" --dump Dump textual representation of passed one or more files");</span> |
| <span class="source-line-no">2275</span><span id="line-2275"> out.println(" For example: " + "AbstractFSWAL --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");</span> |
| <span class="source-line-no">2276</span><span id="line-2276"> out.println(" --split Split the passed directory of WAL logs");</span> |
| <span class="source-line-no">2277</span><span id="line-2277"> out.println(</span> |
| <span class="source-line-no">2278</span><span id="line-2278"> " For example: AbstractFSWAL --split hdfs://example.com:9000/hbase/WALs/DIR");</span> |
| <span class="source-line-no">2279</span><span id="line-2279"> }</span> |
| <span class="source-line-no">2280</span><span id="line-2280"></span> |
| <span class="source-line-no">2281</span><span id="line-2281"> /**</span> |
| <span class="source-line-no">2282</span><span id="line-2282"> * Command line tool: {@code --dump} prints a text version of one or more WAL files on</span> |
| <span class="source-line-no">2283</span><span id="line-2283"> * {@code stdout}, {@code --split} splits each given WAL directory; {@code --perf} only redirects to WALPerformanceEvaluation.</span> |
| <span class="source-line-no">2284</span><span id="line-2284"> */</span> |
| <span class="source-line-no">2285</span><span id="line-2285"> public static void main(String[] args) throws IOException {</span> |
| <span class="source-line-no">2286</span><span id="line-2286"> if (args.length < 2) {</span> |
| <span class="source-line-no">2287</span><span id="line-2287"> usage();</span> |
| <span class="source-line-no">2288</span><span id="line-2288"> System.exit(-1);</span> |
| <span class="source-line-no">2289</span><span id="line-2289"> }</span> |
| <span class="source-line-no">2290</span><span id="line-2290"> // either dump using the WALPrettyPrinter or split, depending on args</span> |
| <span class="source-line-no">2291</span><span id="line-2291"> if ("--dump".equals(args[0])) {</span> |
| <span class="source-line-no">2292</span><span id="line-2292"> WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));</span> |
| <span class="source-line-no">2293</span><span id="line-2293"> } else if ("--perf".equals(args[0])) {</span> |
| <span class="source-line-no">2294</span><span id="line-2294"> LOG.error(HBaseMarkers.FATAL, "Please use the WALPerformanceEvaluation tool instead. i.e.:");</span> |
| <span class="source-line-no">2295</span><span id="line-2295"> LOG.error(HBaseMarkers.FATAL,</span> |
| <span class="source-line-no">2296</span><span id="line-2296"> "\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " + args[1]);</span> |
| <span class="source-line-no">2297</span><span id="line-2297"> System.exit(-1);</span> |
| <span class="source-line-no">2298</span><span id="line-2298"> } else if ("--split".equals(args[0])) {</span> |
| <span class="source-line-no">2299</span><span id="line-2299"> Configuration conf = HBaseConfiguration.create();</span> |
| <span class="source-line-no">2300</span><span id="line-2300"> for (int i = 1; i < args.length; i++) {</span> |
| <span class="source-line-no">2301</span><span id="line-2301"> try {</span> |
| <span class="source-line-no">2302</span><span id="line-2302"> Path logPath = new Path(args[i]);</span> |
| <span class="source-line-no">2303</span><span id="line-2303"> CommonFSUtils.setFsDefault(conf, logPath);</span> |
| <span class="source-line-no">2304</span><span id="line-2304"> split(conf, logPath);</span> |
| <span class="source-line-no">2305</span><span id="line-2305"> } catch (IOException t) {</span> |
| <span class="source-line-no">2306</span><span id="line-2306"> t.printStackTrace(System.err);</span> |
| <span class="source-line-no">2307</span><span id="line-2307"> System.exit(-1);</span> |
| <span class="source-line-no">2308</span><span id="line-2308"> }</span> |
| <span class="source-line-no">2309</span><span id="line-2309"> }</span> |
| <span class="source-line-no">2310</span><span id="line-2310"> } else {</span> |
| <span class="source-line-no">2311</span><span id="line-2311"> usage();</span> |
| <span class="source-line-no">2312</span><span id="line-2312"> System.exit(-1);</span> |
| <span class="source-line-no">2313</span><span id="line-2313"> }</span> |
| <span class="source-line-no">2314</span><span id="line-2314"> }</span> |
| <span class="source-line-no">2315</span><span id="line-2315">}</span> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| </pre> |
| </div> |
| </main> |
| </body> |
| </html> |