| <!DOCTYPE HTML> |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <!-- Encoding is declared explicitly (and first) so rendering does not depend on the server's Content-Type header. --> |
| <!-- Generated by javadoc (17) --> |
| <title>ProcedureExecutor source code</title> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <meta name="description" content="source: package: org.apache.hadoop.hbase.procedure2, class: ProcedureExecutor"> |
| <meta name="generator" content="javadoc/SourceToHTMLConverter"> |
| <link rel="stylesheet" href="../../../../../../stylesheet.css" title="Style"> |
| </head> |
| <body class="source-page"> |
| <main role="main"> |
| <div class="source-container"> |
| <pre><span class="source-line-no">001</span><span id="line-1">/*</span> |
| <span class="source-line-no">002</span><span id="line-2"> * Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="source-line-no">003</span><span id="line-3"> * or more contributor license agreements. See the NOTICE file</span> |
| <span class="source-line-no">004</span><span id="line-4"> * distributed with this work for additional information</span> |
| <span class="source-line-no">005</span><span id="line-5"> * regarding copyright ownership. The ASF licenses this file</span> |
| <span class="source-line-no">006</span><span id="line-6"> * to you under the Apache License, Version 2.0 (the</span> |
| <span class="source-line-no">007</span><span id="line-7"> * "License"); you may not use this file except in compliance</span> |
| <span class="source-line-no">008</span><span id="line-8"> * with the License. You may obtain a copy of the License at</span> |
| <span class="source-line-no">009</span><span id="line-9"> *</span> |
| <span class="source-line-no">010</span><span id="line-10"> * http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="source-line-no">011</span><span id="line-11"> *</span> |
| <span class="source-line-no">012</span><span id="line-12"> * Unless required by applicable law or agreed to in writing, software</span> |
| <span class="source-line-no">013</span><span id="line-13"> * distributed under the License is distributed on an "AS IS" BASIS,</span> |
| <span class="source-line-no">014</span><span id="line-14"> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span> |
| <span class="source-line-no">015</span><span id="line-15"> * See the License for the specific language governing permissions and</span> |
| <span class="source-line-no">016</span><span id="line-16"> * limitations under the License.</span> |
| <span class="source-line-no">017</span><span id="line-17"> */</span> |
| <span class="source-line-no">018</span><span id="line-18">package org.apache.hadoop.hbase.procedure2;</span> |
| <span class="source-line-no">019</span><span id="line-19"></span> |
| <span class="source-line-no">020</span><span id="line-20">import edu.umd.cs.findbugs.annotations.Nullable;</span> |
| <span class="source-line-no">021</span><span id="line-21">import java.io.IOException;</span> |
| <span class="source-line-no">022</span><span id="line-22">import java.io.UncheckedIOException;</span> |
| <span class="source-line-no">023</span><span id="line-23">import java.util.ArrayDeque;</span> |
| <span class="source-line-no">024</span><span id="line-24">import java.util.ArrayList;</span> |
| <span class="source-line-no">025</span><span id="line-25">import java.util.Arrays;</span> |
| <span class="source-line-no">026</span><span id="line-26">import java.util.Collection;</span> |
| <span class="source-line-no">027</span><span id="line-27">import java.util.Comparator;</span> |
| <span class="source-line-no">028</span><span id="line-28">import java.util.Deque;</span> |
| <span class="source-line-no">029</span><span id="line-29">import java.util.HashSet;</span> |
| <span class="source-line-no">030</span><span id="line-30">import java.util.List;</span> |
| <span class="source-line-no">031</span><span id="line-31">import java.util.PriorityQueue;</span> |
| <span class="source-line-no">032</span><span id="line-32">import java.util.Set;</span> |
| <span class="source-line-no">033</span><span id="line-33">import java.util.concurrent.ConcurrentHashMap;</span> |
| <span class="source-line-no">034</span><span id="line-34">import java.util.concurrent.CopyOnWriteArrayList;</span> |
| <span class="source-line-no">035</span><span id="line-35">import java.util.concurrent.ExecutorService;</span> |
| <span class="source-line-no">036</span><span id="line-36">import java.util.concurrent.Executors;</span> |
| <span class="source-line-no">037</span><span id="line-37">import java.util.concurrent.LinkedBlockingQueue;</span> |
| <span class="source-line-no">038</span><span id="line-38">import java.util.concurrent.ThreadFactory;</span> |
| <span class="source-line-no">039</span><span id="line-39">import java.util.concurrent.ThreadPoolExecutor;</span> |
| <span class="source-line-no">040</span><span id="line-40">import java.util.concurrent.TimeUnit;</span> |
| <span class="source-line-no">041</span><span id="line-41">import java.util.concurrent.atomic.AtomicBoolean;</span> |
| <span class="source-line-no">042</span><span id="line-42">import java.util.concurrent.atomic.AtomicInteger;</span> |
| <span class="source-line-no">043</span><span id="line-43">import java.util.concurrent.atomic.AtomicLong;</span> |
| <span class="source-line-no">044</span><span id="line-44">import java.util.stream.Collectors;</span> |
| <span class="source-line-no">045</span><span id="line-45">import java.util.stream.Stream;</span> |
| <span class="source-line-no">046</span><span id="line-46">import org.apache.hadoop.conf.Configuration;</span> |
| <span class="source-line-no">047</span><span id="line-47">import org.apache.hadoop.hbase.HConstants;</span> |
| <span class="source-line-no">048</span><span id="line-48">import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;</span> |
| <span class="source-line-no">049</span><span id="line-49">import org.apache.hadoop.hbase.log.HBaseMarkers;</span> |
| <span class="source-line-no">050</span><span id="line-50">import org.apache.hadoop.hbase.procedure2.Procedure.LockState;</span> |
| <span class="source-line-no">051</span><span id="line-51">import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;</span> |
| <span class="source-line-no">052</span><span id="line-52">import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;</span> |
| <span class="source-line-no">053</span><span id="line-53">import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;</span> |
| <span class="source-line-no">054</span><span id="line-54">import org.apache.hadoop.hbase.procedure2.trace.ProcedureSpanBuilder;</span> |
| <span class="source-line-no">055</span><span id="line-55">import org.apache.hadoop.hbase.procedure2.util.StringUtils;</span> |
| <span class="source-line-no">056</span><span id="line-56">import org.apache.hadoop.hbase.security.User;</span> |
| <span class="source-line-no">057</span><span id="line-57">import org.apache.hadoop.hbase.trace.TraceUtil;</span> |
| <span class="source-line-no">058</span><span id="line-58">import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;</span> |
| <span class="source-line-no">059</span><span id="line-59">import org.apache.hadoop.hbase.util.IdLock;</span> |
| <span class="source-line-no">060</span><span id="line-60">import org.apache.hadoop.hbase.util.NonceKey;</span> |
| <span class="source-line-no">061</span><span id="line-61">import org.apache.hadoop.hbase.util.Threads;</span> |
| <span class="source-line-no">062</span><span id="line-62">import org.apache.yetus.audience.InterfaceAudience;</span> |
| <span class="source-line-no">063</span><span id="line-63">import org.slf4j.Logger;</span> |
| <span class="source-line-no">064</span><span id="line-64">import org.slf4j.LoggerFactory;</span> |
| <span class="source-line-no">065</span><span id="line-65"></span> |
| <span class="source-line-no">066</span><span id="line-66">import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;</span> |
| <span class="source-line-no">067</span><span id="line-67">import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;</span> |
| <span class="source-line-no">068</span><span id="line-68"></span> |
| <span class="source-line-no">069</span><span id="line-69">import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;</span> |
| <span class="source-line-no">070</span><span id="line-70"></span> |
| <span class="source-line-no">071</span><span id="line-71">/**</span> |
| <span class="source-line-no">072</span><span id="line-72"> * Thread pool that executes the submitted procedures. The executor has an associated ProcedureStore.</span> |
| <span class="source-line-no">073</span><span id="line-73"> * Each operation is logged, and on restart the pending procedures are resumed. Unless the Procedure</span> |
| <span class="source-line-no">074</span><span id="line-74"> * code throws an error (e.g. invalid user input), the procedure will complete (at some point in</span> |
| <span class="source-line-no">075</span><span id="line-75"> * time). On restart the pending procedures are resumed and the failed ones are rolled back. The</span> |
| <span class="source-line-no">076</span><span id="line-76"> * user can add procedures to the executor via submitProcedure(proc), check for the finished state</span> |
| <span class="source-line-no">077</span><span id="line-77"> * via isFinished(procId), and get the result via getResult(procId).</span> |
| <span class="source-line-no">078</span><span id="line-78"> */</span> |
| <span class="source-line-no">079</span><span id="line-79">@InterfaceAudience.Private</span> |
| <span class="source-line-no">080</span><span id="line-80">public class ProcedureExecutor<TEnvironment> {</span> |
| <span class="source-line-no">081</span><span id="line-81"> private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);</span> |
| <span class="source-line-no">082</span><span id="line-82"></span> |
| <span class="source-line-no">083</span><span id="line-83"> public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";</span> |
| <span class="source-line-no">084</span><span id="line-84"> private static final boolean DEFAULT_CHECK_OWNER_SET = false;</span> |
| <span class="source-line-no">085</span><span id="line-85"></span> |
| <span class="source-line-no">086</span><span id="line-86"> public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =</span> |
| <span class="source-line-no">087</span><span id="line-87"> "hbase.procedure.worker.keep.alive.time.msec";</span> |
| <span class="source-line-no">088</span><span id="line-88"> private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);</span> |
| <span class="source-line-no">089</span><span id="line-89"></span> |
| <span class="source-line-no">090</span><span id="line-90"> public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";</span> |
| <span class="source-line-no">091</span><span id="line-91"> static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min</span> |
| <span class="source-line-no">092</span><span id="line-92"></span> |
| <span class="source-line-no">093</span><span id="line-93"> public static final String EVICT_ACKED_TTL_CONF_KEY = "hbase.procedure.cleaner.acked.evict.ttl";</span> |
| <span class="source-line-no">094</span><span id="line-94"> static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min</span> |
| <span class="source-line-no">095</span><span id="line-95"></span> |
| <span class="source-line-no">096</span><span id="line-96"> /**</span> |
| <span class="source-line-no">097</span><span id="line-97"> * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests try to break the PE</span> |
| <span class="source-line-no">098</span><span id="line-98"> * by having it fail at various junctures. When non-null, testing is set to an instance of the below</span> |
| <span class="source-line-no">099</span><span id="line-99"> * internal {@link Testing} class with flags set for the particular test.</span> |
| <span class="source-line-no">100</span><span id="line-100"> */</span> |
| <span class="source-line-no">101</span><span id="line-101"> volatile Testing testing = null;</span> |
| <span class="source-line-no">102</span><span id="line-102"></span> |
| <span class="source-line-no">103</span><span id="line-103"> /**</span> |
| <span class="source-line-no">104</span><span id="line-104"> * Class with parameters describing how to fail/die when in testing-context.</span> |
| <span class="source-line-no">105</span><span id="line-105"> */</span> |
| <span class="source-line-no">106</span><span id="line-106"> public static class Testing {</span> |
| <span class="source-line-no">107</span><span id="line-107"> protected volatile boolean killIfHasParent = true;</span> |
| <span class="source-line-no">108</span><span id="line-108"> protected volatile boolean killIfSuspended = false;</span> |
| <span class="source-line-no">109</span><span id="line-109"></span> |
| <span class="source-line-no">110</span><span id="line-110"> /**</span> |
| <span class="source-line-no">111</span><span id="line-111"> * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is</span> |
| <span class="source-line-no">112</span><span id="line-112"> * persisting all the state it needs to recover after a crash.</span> |
| <span class="source-line-no">113</span><span id="line-113"> */</span> |
| <span class="source-line-no">114</span><span id="line-114"> protected volatile boolean killBeforeStoreUpdate = false;</span> |
| <span class="source-line-no">115</span><span id="line-115"> protected volatile boolean toggleKillBeforeStoreUpdate = false;</span> |
| <span class="source-line-no">116</span><span id="line-116"></span> |
| <span class="source-line-no">117</span><span id="line-117"> /**</span> |
| <span class="source-line-no">118</span><span id="line-118"> * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978</span> |
| <span class="source-line-no">119</span><span id="line-119"> * is about a case where memory-state was being set after store to WAL where a crash could cause</span> |
| <span class="source-line-no">120</span><span id="line-120"> * us to get stuck. This flag allows killing at what was a vulnerable time.</span> |
| <span class="source-line-no">121</span><span id="line-121"> */</span> |
| <span class="source-line-no">122</span><span id="line-122"> protected volatile boolean killAfterStoreUpdate = false;</span> |
| <span class="source-line-no">123</span><span id="line-123"> protected volatile boolean toggleKillAfterStoreUpdate = false;</span> |
| <span class="source-line-no">124</span><span id="line-124"></span> |
| <span class="source-line-no">125</span><span id="line-125"> protected volatile boolean killBeforeStoreUpdateInRollback = false;</span> |
| <span class="source-line-no">126</span><span id="line-126"> protected volatile boolean toggleKillBeforeStoreUpdateInRollback = false;</span> |
| <span class="source-line-no">127</span><span id="line-127"></span> |
| <span class="source-line-no">128</span><span id="line-128"> protected boolean shouldKillBeforeStoreUpdate() {</span> |
| <span class="source-line-no">129</span><span id="line-129"> final boolean kill = this.killBeforeStoreUpdate;</span> |
| <span class="source-line-no">130</span><span id="line-130"> if (this.toggleKillBeforeStoreUpdate) {</span> |
| <span class="source-line-no">131</span><span id="line-131"> this.killBeforeStoreUpdate = !kill;</span> |
| <span class="source-line-no">132</span><span id="line-132"> LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);</span> |
| <span class="source-line-no">133</span><span id="line-133"> }</span> |
| <span class="source-line-no">134</span><span id="line-134"> return kill;</span> |
| <span class="source-line-no">135</span><span id="line-135"> }</span> |
| <span class="source-line-no">136</span><span id="line-136"></span> |
| <span class="source-line-no">137</span><span id="line-137"> protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {</span> |
| <span class="source-line-no">138</span><span id="line-138"> if (isSuspended && !killIfSuspended) {</span> |
| <span class="source-line-no">139</span><span id="line-139"> return false;</span> |
| <span class="source-line-no">140</span><span id="line-140"> }</span> |
| <span class="source-line-no">141</span><span id="line-141"> if (hasParent && !killIfHasParent) {</span> |
| <span class="source-line-no">142</span><span id="line-142"> return false;</span> |
| <span class="source-line-no">143</span><span id="line-143"> }</span> |
| <span class="source-line-no">144</span><span id="line-144"> return shouldKillBeforeStoreUpdate();</span> |
| <span class="source-line-no">145</span><span id="line-145"> }</span> |
| <span class="source-line-no">146</span><span id="line-146"></span> |
| <span class="source-line-no">147</span><span id="line-147"> protected boolean shouldKillAfterStoreUpdate() {</span> |
| <span class="source-line-no">148</span><span id="line-148"> final boolean kill = this.killAfterStoreUpdate;</span> |
| <span class="source-line-no">149</span><span id="line-149"> if (this.toggleKillAfterStoreUpdate) {</span> |
| <span class="source-line-no">150</span><span id="line-150"> this.killAfterStoreUpdate = !kill;</span> |
| <span class="source-line-no">151</span><span id="line-151"> LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);</span> |
| <span class="source-line-no">152</span><span id="line-152"> }</span> |
| <span class="source-line-no">153</span><span id="line-153"> return kill;</span> |
| <span class="source-line-no">154</span><span id="line-154"> }</span> |
| <span class="source-line-no">155</span><span id="line-155"></span> |
| <span class="source-line-no">156</span><span id="line-156"> protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {</span> |
| <span class="source-line-no">157</span><span id="line-157"> return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();</span> |
| <span class="source-line-no">158</span><span id="line-158"> }</span> |
| <span class="source-line-no">159</span><span id="line-159"></span> |
| <span class="source-line-no">160</span><span id="line-160"> protected boolean shouldKillBeforeStoreUpdateInRollback() {</span> |
| <span class="source-line-no">161</span><span id="line-161"> final boolean kill = this.killBeforeStoreUpdateInRollback;</span> |
| <span class="source-line-no">162</span><span id="line-162"> if (this.toggleKillBeforeStoreUpdateInRollback) {</span> |
| <span class="source-line-no">163</span><span id="line-163"> this.killBeforeStoreUpdateInRollback = !kill;</span> |
| <span class="source-line-no">164</span><span id="line-164"> LOG.warn("Toggle KILL before store update in rollback to: "</span> |
| <span class="source-line-no">165</span><span id="line-165"> + this.killBeforeStoreUpdateInRollback);</span> |
| <span class="source-line-no">166</span><span id="line-166"> }</span> |
| <span class="source-line-no">167</span><span id="line-167"> return kill;</span> |
| <span class="source-line-no">168</span><span id="line-168"> }</span> |
| <span class="source-line-no">169</span><span id="line-169"> }</span> |
| <span class="source-line-no">170</span><span id="line-170"></span> |
| <span class="source-line-no">171</span><span id="line-171"> public interface ProcedureExecutorListener {</span> |
| <span class="source-line-no">172</span><span id="line-172"> void procedureLoaded(long procId);</span> |
| <span class="source-line-no">173</span><span id="line-173"></span> |
| <span class="source-line-no">174</span><span id="line-174"> void procedureAdded(long procId);</span> |
| <span class="source-line-no">175</span><span id="line-175"></span> |
| <span class="source-line-no">176</span><span id="line-176"> void procedureFinished(long procId);</span> |
| <span class="source-line-no">177</span><span id="line-177"> }</span> |
| <span class="source-line-no">178</span><span id="line-178"></span> |
| <span class="source-line-no">179</span><span id="line-179"> /**</span> |
| <span class="source-line-no">180</span><span id="line-180"> * Map the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. Once a</span> |
| <span class="source-line-no">181</span><span id="line-181"> * Root-Procedure completes (success or failure), the result will be added to this map. The user</span> |
| <span class="source-line-no">182</span><span id="line-182"> * of ProcedureExecutor should call getResult(procId) to get the result.</span> |
| <span class="source-line-no">183</span><span id="line-183"> */</span> |
| <span class="source-line-no">184</span><span id="line-184"> private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =</span> |
| <span class="source-line-no">185</span><span id="line-185"> new ConcurrentHashMap<>();</span> |
| <span class="source-line-no">186</span><span id="line-186"></span> |
| <span class="source-line-no">187</span><span id="line-187"> /**</span> |
| <span class="source-line-no">188</span><span id="line-188"> * Map the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.</span> |
| <span class="source-line-no">189</span><span id="line-189"> * The RootProcedureState contains the execution stack of the Root-Procedure. It is added to the</span> |
| <span class="source-line-no">190</span><span id="line-190"> * map by submitProcedure() and removed on procedure completion.</span> |
| <span class="source-line-no">191</span><span id="line-191"> */</span> |
| <span class="source-line-no">192</span><span id="line-192"> private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =</span> |
| <span class="source-line-no">193</span><span id="line-193"> new ConcurrentHashMap<>();</span> |
| <span class="source-line-no">194</span><span id="line-194"></span> |
| <span class="source-line-no">195</span><span id="line-195"> /**</span> |
| <span class="source-line-no">196</span><span id="line-196"> * Helper map to look up the live procedures by ID. This map contains every procedure:</span> |
| <span class="source-line-no">197</span><span id="line-197"> * root-procedures and subprocedures.</span> |
| <span class="source-line-no">198</span><span id="line-198"> */</span> |
| <span class="source-line-no">199</span><span id="line-199"> private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =</span> |
| <span class="source-line-no">200</span><span id="line-200"> new ConcurrentHashMap<>();</span> |
| <span class="source-line-no">201</span><span id="line-201"></span> |
| <span class="source-line-no">202</span><span id="line-202"> /**</span> |
| <span class="source-line-no">203</span><span id="line-203"> * Helper map to look up whether the procedure was already issued by the same client. This map</span> |
| <span class="source-line-no">204</span><span id="line-204"> * contains every root procedure.</span> |
| <span class="source-line-no">205</span><span id="line-205"> */</span> |
| <span class="source-line-no">206</span><span id="line-206"> private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();</span> |
| <span class="source-line-no">207</span><span id="line-207"></span> |
| <span class="source-line-no">208</span><span id="line-208"> private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =</span> |
| <span class="source-line-no">209</span><span id="line-209"> new CopyOnWriteArrayList<>();</span> |
| <span class="source-line-no">210</span><span id="line-210"></span> |
| <span class="source-line-no">211</span><span id="line-211"> private Configuration conf;</span> |
| <span class="source-line-no">212</span><span id="line-212"></span> |
| <span class="source-line-no">213</span><span id="line-213"> /**</span> |
| <span class="source-line-no">214</span><span id="line-214"> * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing</span> |
| <span class="source-line-no">215</span><span id="line-215"> * resource handling rather than observing in a #join is unexpected). Overridden when we do the</span> |
| <span class="source-line-no">216</span><span id="line-216"> * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).</span> |
| <span class="source-line-no">217</span><span id="line-217"> */</span> |
| <span class="source-line-no">218</span><span id="line-218"> private ThreadGroup threadGroup;</span> |
| <span class="source-line-no">219</span><span id="line-219"></span> |
| <span class="source-line-no">220</span><span id="line-220"> /**</span> |
| <span class="source-line-no">221</span><span id="line-221"> * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing</span> |
| <span class="source-line-no">222</span><span id="line-222"> * resource handling rather than observing in a #join is unexpected). Overridden when we do the</span> |
| <span class="source-line-no">223</span><span id="line-223"> * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).</span> |
| <span class="source-line-no">224</span><span id="line-224"> */</span> |
| <span class="source-line-no">225</span><span id="line-225"> private CopyOnWriteArrayList<WorkerThread> workerThreads;</span> |
| <span class="source-line-no">226</span><span id="line-226"></span> |
| <span class="source-line-no">227</span><span id="line-227"> /**</span> |
| <span class="source-line-no">228</span><span id="line-228"> * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing</span> |
| <span class="source-line-no">229</span><span id="line-229"> * resource handling rather than observing in a #join is unexpected). Overridden when we do the</span> |
| <span class="source-line-no">230</span><span id="line-230"> * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).</span> |
| <span class="source-line-no">231</span><span id="line-231"> */</span> |
| <span class="source-line-no">232</span><span id="line-232"> private TimeoutExecutorThread<TEnvironment> timeoutExecutor;</span> |
| <span class="source-line-no">233</span><span id="line-233"></span> |
| <span class="source-line-no">234</span><span id="line-234"> /**</span> |
| <span class="source-line-no">235</span><span id="line-235"> * WorkerMonitor checks for stuck workers and creates new worker threads when necessary. For</span> |
| <span class="source-line-no">236</span><span id="line-236"> * example, if there is no worker to assign meta, it creates a new one, so it is very important.</span> |
| <span class="source-line-no">237</span><span id="line-237"> * TimeoutExecutor executes many tasks, like DeadServerMetricRegionChore, RegionInTransitionChore and</span> |
| <span class="source-line-no">238</span><span id="line-238"> * so on. Some of these tasks may run for a long time and would block other tasks like WorkerMonitor,</span> |
| <span class="source-line-no">239</span><span id="line-239"> * so a dedicated thread is used for executing WorkerMonitor.</span> |
| <span class="source-line-no">240</span><span id="line-240"> */</span> |
| <span class="source-line-no">241</span><span id="line-241"> private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;</span> |
| <span class="source-line-no">242</span><span id="line-242"></span> |
| <span class="source-line-no">243</span><span id="line-243"> private ExecutorService forceUpdateExecutor;</span> |
| <span class="source-line-no">244</span><span id="line-244"></span> |
| <span class="source-line-no">245</span><span id="line-245"> // A thread pool for executing some asynchronous tasks for procedures; see the references to</span> |
| <span class="source-line-no">246</span><span id="line-246"> // getAsyncTaskExecutor for its usage.</span> |
| <span class="source-line-no">247</span><span id="line-247"> private ExecutorService asyncTaskExecutor;</span> |
| <span class="source-line-no">248</span><span id="line-248"></span> |
| <span class="source-line-no">249</span><span id="line-249"> private int corePoolSize;</span> |
| <span class="source-line-no">250</span><span id="line-250"> private int maxPoolSize;</span> |
| <span class="source-line-no">251</span><span id="line-251"></span> |
| <span class="source-line-no">252</span><span id="line-252"> private volatile long keepAliveTime;</span> |
| <span class="source-line-no">253</span><span id="line-253"></span> |
| <span class="source-line-no">254</span><span id="line-254"> /**</span> |
| <span class="source-line-no">255</span><span id="line-255"> * Scheduler/Queue that contains runnable procedures.</span> |
| <span class="source-line-no">256</span><span id="line-256"> */</span> |
| <span class="source-line-no">257</span><span id="line-257"> private final ProcedureScheduler scheduler;</span> |
| <span class="source-line-no">258</span><span id="line-258"></span> |
| <span class="source-line-no">259</span><span id="line-259"> private final AtomicLong lastProcId = new AtomicLong(-1);</span> |
| <span class="source-line-no">260</span><span id="line-260"> private final AtomicLong workerId = new AtomicLong(0);</span> |
| <span class="source-line-no">261</span><span id="line-261"> private final AtomicInteger activeExecutorCount = new AtomicInteger(0);</span> |
| <span class="source-line-no">262</span><span id="line-262"> private final AtomicBoolean running = new AtomicBoolean(false);</span> |
| <span class="source-line-no">263</span><span id="line-263"> private final TEnvironment environment;</span> |
| <span class="source-line-no">264</span><span id="line-264"> private final ProcedureStore store;</span> |
| <span class="source-line-no">265</span><span id="line-265"></span> |
| <span class="source-line-no">266</span><span id="line-266"> private final boolean checkOwnerSet;</span> |
| <span class="source-line-no">267</span><span id="line-267"></span> |
| <span class="source-line-no">268</span><span id="line-268"> // To prevent concurrent execution of the same procedure.</span> |
| <span class="source-line-no">269</span><span id="line-269"> // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the</span> |
| <span class="source-line-no">270</span><span id="line-270"> // procedure is woken up before we finish the suspend which causes the same procedures to be</span> |
| <span class="source-line-no">271</span><span id="line-271"> // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also</span> |
| <span class="source-line-no">272</span><span id="line-272"> // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent</span> |
| <span class="source-line-no">273</span><span id="line-273"> // execution of the same procedure.</span> |
| <span class="source-line-no">274</span><span id="line-274"> private final IdLock procExecutionLock = new IdLock();</span> |
| <span class="source-line-no">275</span><span id="line-275"></span> |
| <span class="source-line-no">276</span><span id="line-276"> public ProcedureExecutor(final Configuration conf, final TEnvironment environment,</span> |
| <span class="source-line-no">277</span><span id="line-277"> final ProcedureStore store) {</span> |
| <span class="source-line-no">278</span><span id="line-278"> this(conf, environment, store, new SimpleProcedureScheduler());</span> |
| <span class="source-line-no">279</span><span id="line-279"> }</span> |
| <span class="source-line-no">280</span><span id="line-280"></span> |
| <span class="source-line-no">281</span><span id="line-281"> private boolean isRootFinished(Procedure<?> proc) {</span> |
| <span class="source-line-no">282</span><span id="line-282"> Procedure<?> rootProc = procedures.get(proc.getRootProcId());</span> |
| <span class="source-line-no">283</span><span id="line-283"> return rootProc == null || rootProc.isFinished(); // a root id missing from the procedures map counts as finished</span> |
| <span class="source-line-no">284</span><span id="line-284"> }</span> |
| <span class="source-line-no">285</span><span id="line-285"></span> |
| <span class="source-line-no">286</span><span id="line-286"> private void forceUpdateProcedure(long procId) throws IOException { // re-persists procId's current state via store.update</span> |
| <span class="source-line-no">287</span><span id="line-287"> IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); // serializes with execution of the same procedure</span> |
| <span class="source-line-no">288</span><span id="line-288"> try {</span> |
| <span class="source-line-no">289</span><span id="line-289"> Procedure<TEnvironment> proc = procedures.get(procId);</span> |
| <span class="source-line-no">290</span><span id="line-290"> if (proc != null) {</span> |
| <span class="source-line-no">291</span><span id="line-291"> if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {</span> |
| <span class="source-line-no">292</span><span id="line-292"> LOG.debug("Procedure {} has already been finished and its root procedure has finished,"</span> |
| <span class="source-line-no">293</span><span id="line-293"> + " skip force updating", proc);</span> |
| <span class="source-line-no">294</span><span id="line-294"> return;</span> |
| <span class="source-line-no">295</span><span id="line-295"> }</span> |
| <span class="source-line-no">296</span><span id="line-296"> } else {</span> |
| <span class="source-line-no">297</span><span id="line-297"> CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);</span> |
| <span class="source-line-no">298</span><span id="line-298"> if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) { // nothing to persist</span> |
| <span class="source-line-no">299</span><span id="line-299"> LOG.debug("No pending procedure with id = {}, skip force updating.", procId);</span> |
| <span class="source-line-no">300</span><span id="line-300"> return;</span> |
| <span class="source-line-no">301</span><span id="line-301"> }</span> |
| <span class="source-line-no">302</span><span id="line-302"> long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);</span> |
| <span class="source-line-no">303</span><span id="line-303"> long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);</span> |
| <span class="source-line-no">304</span><span id="line-304"> if (retainer.isExpired(EnvironmentEdgeManager.currentTime(), evictTtl, evictAckTtl)) { // past eviction TTL</span> |
| <span class="source-line-no">305</span><span id="line-305"> LOG.debug("Procedure {} has already been finished and expired, skip force updating",</span> |
| <span class="source-line-no">306</span><span id="line-306"> procId);</span> |
| <span class="source-line-no">307</span><span id="line-307"> return;</span> |
| <span class="source-line-no">308</span><span id="line-308"> }</span> |
| <span class="source-line-no">309</span><span id="line-309"> proc = retainer.getProcedure(); // completed but still retained: persist the retained copy</span> |
| <span class="source-line-no">310</span><span id="line-310"> }</span> |
| <span class="source-line-no">311</span><span id="line-311"> LOG.debug("Force update procedure {}", proc);</span> |
| <span class="source-line-no">312</span><span id="line-312"> store.update(proc);</span> |
| <span class="source-line-no">313</span><span id="line-313"> } finally {</span> |
| <span class="source-line-no">314</span><span id="line-314"> procExecutionLock.releaseLockEntry(lockEntry);</span> |
| <span class="source-line-no">315</span><span id="line-315"> }</span> |
| <span class="source-line-no">316</span><span id="line-316"> }</span> |
| <span class="source-line-no">317</span><span id="line-317"></span> |
| <span class="source-line-no">318</span><span id="line-318"> public ProcedureExecutor(final Configuration conf, final TEnvironment environment,</span> |
| <span class="source-line-no">319</span><span id="line-319"> final ProcedureStore store, final ProcedureScheduler scheduler) {</span> |
| <span class="source-line-no">320</span><span id="line-320"> this.environment = environment;</span> |
| <span class="source-line-no">321</span><span id="line-321"> this.scheduler = scheduler;</span> |
| <span class="source-line-no">322</span><span id="line-322"> this.store = store;</span> |
| <span class="source-line-no">323</span><span id="line-323"> this.conf = conf;</span> |
| <span class="source-line-no">324</span><span id="line-324"> this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); // read once; the field is final</span> |
| <span class="source-line-no">325</span><span id="line-325"> refreshConfiguration(conf);</span> |
| <span class="source-line-no">326</span><span id="line-326"> }</span> |
| <span class="source-line-no">327</span><span id="line-327"></span> |
| <span class="source-line-no">328</span><span id="line-328"> private void load(final boolean abortOnCorruption) throws IOException { // replay the store into empty in-memory state</span> |
| <span class="source-line-no">329</span><span id="line-329"> Preconditions.checkArgument(completed.isEmpty(), "completed not empty: %s", completed);</span> |
| <span class="source-line-no">330</span><span id="line-330"> Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty: %s",</span> |
| <span class="source-line-no">331</span><span id="line-331"> rollbackStack);</span> |
| <span class="source-line-no">332</span><span id="line-332"> Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty: %s", procedures);</span> |
| <span class="source-line-no">333</span><span id="line-333"> Preconditions.checkArgument(scheduler.size() == 0, "scheduler queue not empty: %s", scheduler);</span> |
| <span class="source-line-no">334</span><span id="line-334"></span> |
| <span class="source-line-no">335</span><span id="line-335"> store.load(new ProcedureStore.ProcedureLoader() {</span> |
| <span class="source-line-no">336</span><span id="line-336"> @Override</span> |
| <span class="source-line-no">337</span><span id="line-337"> public void setMaxProcId(long maxProcId) {</span> |
| <span class="source-line-no">338</span><span id="line-338"> assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";</span> |
| <span class="source-line-no">339</span><span id="line-339"> lastProcId.set(maxProcId);</span> |
| <span class="source-line-no">340</span><span id="line-340"> }</span> |
| <span class="source-line-no">341</span><span id="line-341"></span> |
| <span class="source-line-no">342</span><span id="line-342"> @Override</span> |
| <span class="source-line-no">343</span><span id="line-343"> public void load(ProcedureIterator procIter) throws IOException {</span> |
| <span class="source-line-no">344</span><span id="line-344"> loadProcedures(procIter);</span> |
| <span class="source-line-no">345</span><span id="line-345"> }</span> |
| <span class="source-line-no">346</span><span id="line-346"></span> |
| <span class="source-line-no">347</span><span id="line-347"> @Override</span> |
| <span class="source-line-no">348</span><span id="line-348"> public void handleCorrupted(ProcedureIterator procIter) throws IOException {</span> |
| <span class="source-line-no">349</span><span id="line-349"> int corruptedCount = 0;</span> |
| <span class="source-line-no">350</span><span id="line-350"> while (procIter.hasNext()) {</span> |
| <span class="source-line-no">351</span><span id="line-351"> Procedure<?> proc = procIter.next();</span> |
| <span class="source-line-no">352</span><span id="line-352"> LOG.error("Corrupt {}", proc);</span> |
| <span class="source-line-no">353</span><span id="line-353"> corruptedCount++;</span> |
| <span class="source-line-no">354</span><span id="line-354"> }</span> |
| <span class="source-line-no">355</span><span id="line-355"> if (abortOnCorruption && corruptedCount > 0) { // otherwise corrupted entries are only logged</span> |
| <span class="source-line-no">356</span><span id="line-356"> throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");</span> |
| <span class="source-line-no">357</span><span id="line-357"> }</span> |
| <span class="source-line-no">358</span><span id="line-358"> }</span> |
| <span class="source-line-no">359</span><span id="line-359"> });</span> |
| <span class="source-line-no">360</span><span id="line-360"> }</span> |
| <span class="source-line-no">361</span><span id="line-361"></span> |
| <span class="source-line-no">362</span><span id="line-362"> private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) { // restore one lock, then mark it done</span> |
| <span class="source-line-no">363</span><span id="line-363"> proc.restoreLock(getEnvironment());</span> |
| <span class="source-line-no">364</span><span id="line-364"> restored.add(proc.getProcId());</span> |
| <span class="source-line-no">365</span><span id="line-365"> }</span> |
| <span class="source-line-no">366</span><span id="line-366"></span> |
| <span class="source-line-no">367</span><span id="line-367"> private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {</span> |
| <span class="source-line-no">368</span><span id="line-368"> while (!stack.isEmpty()) {</span> |
| <span class="source-line-no">369</span><span id="line-369"> restoreLock(stack.pop(), restored); // LIFO: the entry pushed last (closest to the root) goes first</span> |
| <span class="source-line-no">370</span><span id="line-370"> }</span> |
| <span class="source-line-no">371</span><span id="line-371"> }</span> |
| <span class="source-line-no">372</span><span id="line-372"></span> |
| <span class="source-line-no">373</span><span id="line-373"> // Restore the locks for all the procedures.</span> |
| <span class="source-line-no">374</span><span id="line-374"> // Notice that we need to restore the locks starting from the root proc, otherwise there will be a</span> |
| <span class="source-line-no">375</span><span id="line-375"> // problem where a sub procedure may hold the exclusive lock first and then we are stuck when</span> |
| <span class="source-line-no">376</span><span id="line-376"> // calling the acquireLock method for the parent procedure.</span> |
| <span class="source-line-no">377</span><span id="line-377"> // The algorithm is straightforward:</span> |
| <span class="source-line-no">378</span><span id="line-378"> // 1. Use a set to record the procedures whose locks have already been restored.</span> |
| <span class="source-line-no">379</span><span id="line-379"> // 2. Use a stack to store the hierarchy of the procedures.</span> |
| <span class="source-line-no">380</span><span id="line-380"> // 3. For each procedure, we first push it into the stack and move on to its parent,</span> |
| <span class="source-line-no">381</span><span id="line-381"> // until either</span> |
| <span class="source-line-no">382</span><span id="line-382"> // a. we have no parent, i.e., we are the root procedure, or</span> |
| <span class="source-line-no">383</span><span id="line-383"> // b. the lock has already been restored (by checking the set introduced in #1);</span> |
| <span class="source-line-no">384</span><span id="line-384"> // then we pop the stack and restore the lock of each procedure, root first.</span> |
| <span class="source-line-no">385</span><span id="line-385"> // Notice that this should be done for all procedures, not only the ones in runnableList.</span> |
| <span class="source-line-no">386</span><span id="line-386"> private void restoreLocks() {</span> |
| <span class="source-line-no">387</span><span id="line-387"> Set<Long> restored = new HashSet<>();</span> |
| <span class="source-line-no">388</span><span id="line-388"> Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();</span> |
| <span class="source-line-no">389</span><span id="line-389"> procedures.values().forEach(proc -> {</span> |
| <span class="source-line-no">390</span><span id="line-390"> for (;;) { // walk up the parent chain, pushing unrestored ancestors</span> |
| <span class="source-line-no">391</span><span id="line-391"> if (restored.contains(proc.getProcId())) { // already restored: unwind what we pushed</span> |
| <span class="source-line-no">392</span><span id="line-392"> restoreLocks(stack, restored);</span> |
| <span class="source-line-no">393</span><span id="line-393"> return;</span> |
| <span class="source-line-no">394</span><span id="line-394"> }</span> |
| <span class="source-line-no">395</span><span id="line-395"> if (!proc.hasParent()) { // reached the root: restore it, then its pushed descendants</span> |
| <span class="source-line-no">396</span><span id="line-396"> restoreLock(proc, restored);</span> |
| <span class="source-line-no">397</span><span id="line-397"> restoreLocks(stack, restored);</span> |
| <span class="source-line-no">398</span><span id="line-398"> return;</span> |
| <span class="source-line-no">399</span><span id="line-399"> }</span> |
| <span class="source-line-no">400</span><span id="line-400"> stack.push(proc);</span> |
| <span class="source-line-no">401</span><span id="line-401"> proc = procedures.get(proc.getParentProcId()); // NOTE(review): assumes no orphans reach here</span> |
| <span class="source-line-no">402</span><span id="line-402"> }</span> |
| <span class="source-line-no">403</span><span id="line-403"> });</span> |
| <span class="source-line-no">404</span><span id="line-404"> }</span> |
| <span class="source-line-no">405</span><span id="line-405"></span> |
| <span class="source-line-no">406</span><span id="line-406"> private void initializeStacks(ProcedureIterator procIter,</span> |
| <span class="source-line-no">407</span><span id="line-407"> List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList,</span> |
| <span class="source-line-no">408</span><span id="line-408"> List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> waitingTimeoutList)</span> |
| <span class="source-line-no">409</span><span id="line-409"> throws IOException {</span> |
| <span class="source-line-no">410</span><span id="line-410"> procIter.reset(); // second pass over the same iterator used by loadProcedures</span> |
| <span class="source-line-no">411</span><span id="line-411"> while (procIter.hasNext()) {</span> |
| <span class="source-line-no">412</span><span id="line-412"> if (procIter.isNextFinished()) {</span> |
| <span class="source-line-no">413</span><span id="line-413"> procIter.skipNext(); // finished procedures were already recorded in completed</span> |
| <span class="source-line-no">414</span><span id="line-414"> continue;</span> |
| <span class="source-line-no">415</span><span id="line-415"> }</span> |
| <span class="source-line-no">416</span><span id="line-416"></span> |
| <span class="source-line-no">417</span><span id="line-417"> @SuppressWarnings("unchecked")</span> |
| <span class="source-line-no">418</span><span id="line-418"> Procedure<TEnvironment> proc = procIter.next();</span> |
| <span class="source-line-no">419</span><span id="line-419"> assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;</span> |
| <span class="source-line-no">420</span><span id="line-420"> LOG.debug("Loading {}", proc);</span> |
| <span class="source-line-no">421</span><span id="line-421"> Long rootProcId = getRootProcedureId(proc);</span> |
| <span class="source-line-no">422</span><span id="line-422"> // The orphan procedures will be passed to handleCorrupted, so add an assert here</span> |
| <span class="source-line-no">423</span><span id="line-423"> assert rootProcId != null;</span> |
| <span class="source-line-no">424</span><span id="line-424"></span> |
| <span class="source-line-no">425</span><span id="line-425"> if (proc.hasParent()) {</span> |
| <span class="source-line-no">426</span><span id="line-426"> Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());</span> |
| <span class="source-line-no">427</span><span id="line-427"> if (parent != null && !proc.isFinished()) {</span> |
| <span class="source-line-no">428</span><span id="line-428"> parent.incChildrenLatch(); // only unfinished children are counted</span> |
| <span class="source-line-no">429</span><span id="line-429"> }</span> |
| <span class="source-line-no">430</span><span id="line-430"> }</span> |
| <span class="source-line-no">431</span><span id="line-431"></span> |
| <span class="source-line-no">432</span><span id="line-432"> RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);</span> |
| <span class="source-line-no">433</span><span id="line-433"> procStack.loadStack(proc);</span> |
| <span class="source-line-no">434</span><span id="line-434"></span> |
| <span class="source-line-no">435</span><span id="line-435"> proc.setRootProcId(rootProcId);</span> |
| <span class="source-line-no">436</span><span id="line-436"> switch (proc.getState()) { // bucket by state; the caller schedules each list later</span> |
| <span class="source-line-no">437</span><span id="line-437"> case RUNNABLE:</span> |
| <span class="source-line-no">438</span><span id="line-438"> runnableList.add(proc);</span> |
| <span class="source-line-no">439</span><span id="line-439"> break;</span> |
| <span class="source-line-no">440</span><span id="line-440"> case WAITING:</span> |
| <span class="source-line-no">441</span><span id="line-441"> waitingList.add(proc);</span> |
| <span class="source-line-no">442</span><span id="line-442"> break;</span> |
| <span class="source-line-no">443</span><span id="line-443"> case WAITING_TIMEOUT:</span> |
| <span class="source-line-no">444</span><span id="line-444"> waitingTimeoutList.add(proc);</span> |
| <span class="source-line-no">445</span><span id="line-445"> break;</span> |
| <span class="source-line-no">446</span><span id="line-446"> case FAILED:</span> |
| <span class="source-line-no">447</span><span id="line-447"> failedList.add(proc);</span> |
| <span class="source-line-no">448</span><span id="line-448"> break;</span> |
| <span class="source-line-no">449</span><span id="line-449"> case ROLLEDBACK:</span> |
| <span class="source-line-no">450</span><span id="line-450"> case INITIALIZING:</span> |
| <span class="source-line-no">451</span><span id="line-451"> String msg = "Unexpected " + proc.getState() + " state for " + proc;</span> |
| <span class="source-line-no">452</span><span id="line-452"> LOG.error(msg);</span> |
| <span class="source-line-no">453</span><span id="line-453"> throw new UnsupportedOperationException(msg);</span> |
| <span class="source-line-no">454</span><span id="line-454"> default:</span> |
| <span class="source-line-no">455</span><span id="line-455"> break;</span> |
| <span class="source-line-no">456</span><span id="line-456"> }</span> |
| <span class="source-line-no">457</span><span id="line-457"> }</span> |
| <span class="source-line-no">458</span><span id="line-458"> rollbackStack.forEach((rootProcId, procStack) -> {</span> |
| <span class="source-line-no">459</span><span id="line-459"> if (procStack.getSubproceduresStack() != null) {</span> |
| <span class="source-line-no">460</span><span id="line-460"> // if we have already recorded some stack ids, it means we support rollback</span> |
| <span class="source-line-no">461</span><span id="line-461"> procStack.setRollbackSupported(true);</span> |
| <span class="source-line-no">462</span><span id="line-462"> } else {</span> |
| <span class="source-line-no">463</span><span id="line-463"> // otherwise, ask the root procedure whether it supports rollback</span> |
| <span class="source-line-no">464</span><span id="line-464"> procStack.setRollbackSupported(procedures.get(rootProcId).isRollbackSupported());</span> |
| <span class="source-line-no">465</span><span id="line-465"> }</span> |
| <span class="source-line-no">466</span><span id="line-466"> });</span> |
| <span class="source-line-no">467</span><span id="line-467"> }</span> |
| <span class="source-line-no">468</span><span id="line-468"></span> |
| <span class="source-line-no">469</span><span id="line-469"> private void processWaitingProcedures(List<Procedure<TEnvironment>> waitingList,</span> |
| <span class="source-line-no">470</span><span id="line-470"> List<Procedure<TEnvironment>> runnableList) {</span> |
| <span class="source-line-no">471</span><span id="line-471"> waitingList.forEach(proc -> {</span> |
| <span class="source-line-no">472</span><span id="line-472"> if (!proc.hasChildren()) {</span> |
| <span class="source-line-no">473</span><span id="line-473"> // Normally, WAITING procedures are woken up by their children. But there is a case where</span> |
| <span class="source-line-no">474</span><span id="line-474"> // all the children succeeded, but before they could wake up their parent procedure, the</span> |
| <span class="source-line-no">475</span><span id="line-475"> // master was killed. So, when recovering the procedures from ProcedureWal, its children</span> |
| <span class="source-line-no">476</span><span id="line-476"> // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING</span> |
| <span class="source-line-no">477</span><span id="line-477"> // procedure. But before executing it, we need to set its state to RUNNABLE, otherwise an</span> |
| <span class="source-line-no">478</span><span id="line-478"> // exception will be thrown:</span> |
| <span class="source-line-no">479</span><span id="line-479"> // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,</span> |
| <span class="source-line-no">480</span><span id="line-480"> // "NOT RUNNABLE! " + procedure.toString());</span> |
| <span class="source-line-no">481</span><span id="line-481"> proc.setState(ProcedureState.RUNNABLE);</span> |
| <span class="source-line-no">482</span><span id="line-482"> runnableList.add(proc);</span> |
| <span class="source-line-no">483</span><span id="line-483"> } else {</span> |
| <span class="source-line-no">484</span><span id="line-484"> proc.afterReplay(getEnvironment()); // still has children: it stays WAITING</span> |
| <span class="source-line-no">485</span><span id="line-485"> }</span> |
| <span class="source-line-no">486</span><span id="line-486"> });</span> |
| <span class="source-line-no">487</span><span id="line-487"> }</span> |
| <span class="source-line-no">488</span><span id="line-488"></span> |
| <span class="source-line-no">489</span><span id="line-489"> private void processWaitingTimeoutProcedures(List<Procedure<TEnvironment>> waitingTimeoutList) {</span> |
| <span class="source-line-no">490</span><span id="line-490"> waitingTimeoutList.forEach(proc -> {</span> |
| <span class="source-line-no">491</span><span id="line-491"> proc.afterReplay(getEnvironment());</span> |
| <span class="source-line-no">492</span><span id="line-492"> timeoutExecutor.add(proc); // hand the replayed procedure to the timeout executor</span> |
| <span class="source-line-no">493</span><span id="line-493"> });</span> |
| <span class="source-line-no">494</span><span id="line-494"> }</span> |
| <span class="source-line-no">495</span><span id="line-495"></span> |
| <span class="source-line-no">496</span><span id="line-496"> private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList,</span> |
| <span class="source-line-no">497</span><span id="line-497"> List<Procedure<TEnvironment>> failedList) {</span> |
| <span class="source-line-no">498</span><span id="line-498"> failedList.forEach(scheduler::addBack); // failed procedures are queued before runnable ones</span> |
| <span class="source-line-no">499</span><span id="line-499"> runnableList.forEach(p -> {</span> |
| <span class="source-line-no">500</span><span id="line-500"> p.afterReplay(getEnvironment());</span> |
| <span class="source-line-no">501</span><span id="line-501"> if (!p.hasParent()) { // loaded notification is sent for root procedures only</span> |
| <span class="source-line-no">502</span><span id="line-502"> sendProcedureLoadedNotification(p.getProcId());</span> |
| <span class="source-line-no">503</span><span id="line-503"> }</span> |
| <span class="source-line-no">504</span><span id="line-504"> scheduler.addBack(p);</span> |
| <span class="source-line-no">505</span><span id="line-505"> });</span> |
| <span class="source-line-no">506</span><span id="line-506"> }</span> |
| <span class="source-line-no">507</span><span id="line-507"></span> |
| <span class="source-line-no">508</span><span id="line-508"> private void loadProcedures(ProcedureIterator procIter) throws IOException {</span> |
| <span class="source-line-no">509</span><span id="line-509"> // 1. Register completed and live procedures, create rollback stacks for roots, record nonces</span> |
| <span class="source-line-no">510</span><span id="line-510"> int runnableCount = 0;</span> |
| <span class="source-line-no">511</span><span id="line-511"> int failedCount = 0;</span> |
| <span class="source-line-no">512</span><span id="line-512"> int waitingCount = 0;</span> |
| <span class="source-line-no">513</span><span id="line-513"> int waitingTimeoutCount = 0;</span> |
| <span class="source-line-no">514</span><span id="line-514"> while (procIter.hasNext()) {</span> |
| <span class="source-line-no">515</span><span id="line-515"> boolean finished = procIter.isNextFinished();</span> |
| <span class="source-line-no">516</span><span id="line-516"> @SuppressWarnings("unchecked")</span> |
| <span class="source-line-no">517</span><span id="line-517"> Procedure<TEnvironment> proc = procIter.next();</span> |
| <span class="source-line-no">518</span><span id="line-518"> NonceKey nonceKey = proc.getNonceKey();</span> |
| <span class="source-line-no">519</span><span id="line-519"> long procId = proc.getProcId();</span> |
| <span class="source-line-no">520</span><span id="line-520"></span> |
| <span class="source-line-no">521</span><span id="line-521"> if (finished) {</span> |
| <span class="source-line-no">522</span><span id="line-522"> completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));</span> |
| <span class="source-line-no">523</span><span id="line-523"> LOG.debug("Completed {}", proc);</span> |
| <span class="source-line-no">524</span><span id="line-524"> } else {</span> |
| <span class="source-line-no">525</span><span id="line-525"> if (!proc.hasParent()) {</span> |
| <span class="source-line-no">526</span><span id="line-526"> assert !proc.isFinished() : "unexpected finished procedure";</span> |
| <span class="source-line-no">527</span><span id="line-527"> rollbackStack.put(proc.getProcId(), new RootProcedureState<>());</span> |
| <span class="source-line-no">528</span><span id="line-528"> }</span> |
| <span class="source-line-no">529</span><span id="line-529"></span> |
| <span class="source-line-no">530</span><span id="line-530"> // add the procedure to the map</span> |
| <span class="source-line-no">531</span><span id="line-531"> proc.beforeReplay(getEnvironment());</span> |
| <span class="source-line-no">532</span><span id="line-532"> procedures.put(proc.getProcId(), proc);</span> |
| <span class="source-line-no">533</span><span id="line-533"> switch (proc.getState()) { // counts only size the lists allocated in step 2</span> |
| <span class="source-line-no">534</span><span id="line-534"> case RUNNABLE:</span> |
| <span class="source-line-no">535</span><span id="line-535"> runnableCount++;</span> |
| <span class="source-line-no">536</span><span id="line-536"> break;</span> |
| <span class="source-line-no">537</span><span id="line-537"> case FAILED:</span> |
| <span class="source-line-no">538</span><span id="line-538"> failedCount++;</span> |
| <span class="source-line-no">539</span><span id="line-539"> break;</span> |
| <span class="source-line-no">540</span><span id="line-540"> case WAITING:</span> |
| <span class="source-line-no">541</span><span id="line-541"> waitingCount++;</span> |
| <span class="source-line-no">542</span><span id="line-542"> break;</span> |
| <span class="source-line-no">543</span><span id="line-543"> case WAITING_TIMEOUT:</span> |
| <span class="source-line-no">544</span><span id="line-544"> waitingTimeoutCount++;</span> |
| <span class="source-line-no">545</span><span id="line-545"> break;</span> |
| <span class="source-line-no">546</span><span id="line-546"> default:</span> |
| <span class="source-line-no">547</span><span id="line-547"> break;</span> |
| <span class="source-line-no">548</span><span id="line-548"> }</span> |
| <span class="source-line-no">549</span><span id="line-549"> }</span> |
| <span class="source-line-no">550</span><span id="line-550"></span> |
| <span class="source-line-no">551</span><span id="line-551"> if (nonceKey != null) {</span> |
| <span class="source-line-no">552</span><span id="line-552"> nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map</span> |
| <span class="source-line-no">553</span><span id="line-553"> }</span> |
| <span class="source-line-no">554</span><span id="line-554"> }</span> |
| <span class="source-line-no">555</span><span id="line-555"></span> |
| <span class="source-line-no">556</span><span id="line-556"> // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we would</span> |
| <span class="source-line-no">557</span><span id="line-557"> // push them into the ProcedureScheduler directly to execute the rollback. But this does not work</span> |
| <span class="source-line-no">558</span><span id="line-558"> // after we introduced the restore lock stage. Now, when we acquire an xlock, we will remove</span> |
| <span class="source-line-no">559</span><span id="line-559"> // the queue from runQueue in scheduler, and then when a procedure which has lock access, for</span> |
| <span class="source-line-no">560</span><span id="line-560"> // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,</span> |
| <span class="source-line-no">561</span><span id="line-561"> // we will add the queue back to let the workers poll from it. The assumption here is that the</span> |
| <span class="source-line-no">562</span><span id="line-562"> // procedure which has the xlock should have been polled out already, so when loading we cannot</span> |
| <span class="source-line-no">563</span><span id="line-563"> // add the procedure to scheduler first and then call acquireLock, since the procedure is still</span> |
| <span class="source-line-no">564</span><span id="line-564"> // in the queue, and since we will remove the queue from runQueue, no one can poll it out,</span> |
| <span class="source-line-no">565</span><span id="line-565"> // and then there is a deadlock.</span> |
| <span class="source-line-no">566</span><span id="line-566"> List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);</span> |
| <span class="source-line-no">567</span><span id="line-567"> List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);</span> |
| <span class="source-line-no">568</span><span id="line-568"> List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);</span> |
| <span class="source-line-no">569</span><span id="line-569"> List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);</span> |
| <span class="source-line-no">570</span><span id="line-570"></span> |
| <span class="source-line-no">571</span><span id="line-571"> initializeStacks(procIter, runnableList, failedList, waitingList, waitingTimeoutList);</span> |
| <span class="source-line-no">572</span><span id="line-572"></span> |
| <span class="source-line-no">573</span><span id="line-573"> // 3. Check the waiting procedures to see if some of them can be added to runnable.</span> |
| <span class="source-line-no">574</span><span id="line-574"> processWaitingProcedures(waitingList, runnableList);</span> |
| <span class="source-line-no">575</span><span id="line-575"></span> |
| <span class="source-line-no">576</span><span id="line-576"> // 4. restore locks</span> |
| <span class="source-line-no">577</span><span id="line-577"> restoreLocks();</span> |
| <span class="source-line-no">578</span><span id="line-578"></span> |
| <span class="source-line-no">579</span><span id="line-579"> // 5. Push the procedures to the timeout executor</span> |
| <span class="source-line-no">580</span><span id="line-580"> processWaitingTimeoutProcedures(waitingTimeoutList);</span> |
| <span class="source-line-no">581</span><span id="line-581"></span> |
| <span class="source-line-no">582</span><span id="line-582"> // 6. Push the procedure to the scheduler</span> |
| <span class="source-line-no">583</span><span id="line-583"> pushProceduresAfterLoad(runnableList, failedList);</span> |
| <span class="source-line-no">584</span><span id="line-584"> // After all procedures put into the queue, signal the worker threads.</span> |
| <span class="source-line-no">585</span><span id="line-585"> // Otherwise, there is a race condition. See HBASE-21364.</span> |
| <span class="source-line-no">586</span><span id="line-586"> scheduler.signalAll();</span> |
| <span class="source-line-no">587</span><span id="line-587"> }</span> |
| <span class="source-line-no">588</span><span id="line-588"></span> |
| <span class="source-line-no">589</span><span id="line-589"> /**</span> |
| <span class="source-line-no">590</span><span id="line-590"> * Initialize the procedure executor, but do not start workers. We will start them later.</span> |
| <span class="source-line-no">591</span><span id="line-591"> * <p></span> |
| <span class="source-line-no">592</span><span id="line-592"> * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, ensure a</span> |
| <span class="source-line-no">593</span><span id="line-593"> * single executor, and start the procedure replay to resume and recover the previous pending and</span> |
| <span class="source-line-no">594</span><span id="line-594"> * in-progress procedures.</span> |
| <span class="source-line-no">595</span><span id="line-595"> * @param numThreads number of core worker threads; bursts may use up to 10x this many workers.</span> |
| <span class="source-line-no">596</span><span id="line-596"> * @param abortOnCorruption true to abort the service if a corrupted procedure is found on replay.</span> |
| <span class="source-line-no">597</span><span id="line-597"> * @throws IOException if the store lease cannot be recovered or the procedures cannot be loaded.</span> |
| <span class="source-line-no">598</span><span id="line-598"> */</span> |
| <span class="source-line-no">599</span><span id="line-599"> public void init(int numThreads, boolean abortOnCorruption) throws IOException {</span> |
| <span class="source-line-no">600</span><span id="line-600"> // numThreads core workers (maxPoolSize allows bursting to 10x), plus dedicated timeout and</span> |
| <span class="source-line-no">601</span><span id="line-601"> // worker-monitor threads, an async task pool and a single force-update thread.</span> |
| <span class="source-line-no">602</span><span id="line-602"> this.corePoolSize = numThreads;</span> |
| <span class="source-line-no">603</span><span id="line-603"> this.maxPoolSize = 10 * numThreads;</span> |
| <span class="source-line-no">604</span><span id="line-604"> LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",</span> |
| <span class="source-line-no">605</span><span id="line-605"> corePoolSize, maxPoolSize);</span> |
| <span class="source-line-no">606</span><span id="line-606"></span> |
| <span class="source-line-no">607</span><span id="line-607"> this.threadGroup = new ThreadGroup("PEWorkerGroup");</span> |
| <span class="source-line-no">608</span><span id="line-608"> this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");</span> |
| <span class="source-line-no">609</span><span id="line-609"> this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");</span> |
| <span class="source-line-no">610</span><span id="line-610"> ThreadFactory backingThreadFactory = new ThreadFactory() {</span> |
| <span class="source-line-no">611</span><span id="line-611"></span> |
| <span class="source-line-no">612</span><span id="line-612"> @Override</span> |
| <span class="source-line-no">613</span><span id="line-613"> public Thread newThread(Runnable r) {</span> |
| <span class="source-line-no">614</span><span id="line-614"> return new Thread(threadGroup, r);</span> |
| <span class="source-line-no">615</span><span id="line-615"> }</span> |
| <span class="source-line-no">616</span><span id="line-616"> };</span> |
| <span class="source-line-no">617</span><span id="line-617"> int size = Math.max(2, Runtime.getRuntime().availableProcessors());</span> |
| <span class="source-line-no">618</span><span id="line-618"> ThreadPoolExecutor executor =</span> |
| <span class="source-line-no">619</span><span id="line-619"> new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),</span> |
| <span class="source-line-no">620</span><span id="line-620"> new ThreadFactoryBuilder().setDaemon(true)</span> |
| <span class="source-line-no">621</span><span id="line-621"> .setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d")</span> |
| <span class="source-line-no">622</span><span id="line-622"> .setThreadFactory(backingThreadFactory).build());</span> |
| <span class="source-line-no">623</span><span id="line-623"> executor.allowCoreThreadTimeOut(true);</span> |
| <span class="source-line-no">624</span><span id="line-624"> this.asyncTaskExecutor = executor;</span> |
| <span class="source-line-no">625</span><span id="line-625"> forceUpdateExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)</span> |
| <span class="source-line-no">626</span><span id="line-626"> .setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build());</span> |
| <span class="source-line-no">627</span><span id="line-627"> store.registerListener(new ProcedureStoreListener() {</span> |
| <span class="source-line-no">628</span><span id="line-628"></span> |
| <span class="source-line-no">629</span><span id="line-629"> @Override</span> |
| <span class="source-line-no">630</span><span id="line-630"> public void forceUpdate(long[] procIds) {</span> |
| <span class="source-line-no">631</span><span id="line-631"> Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {</span> |
| <span class="source-line-no">632</span><span id="line-632"> try {</span> |
| <span class="source-line-no">633</span><span id="line-633"> forceUpdateProcedure(procId);</span> |
| <span class="source-line-no">634</span><span id="line-634"> } catch (IOException e) {</span> |
| <span class="source-line-no">635</span><span id="line-635"> LOG.warn("Failed to force update procedure with pid={}", procId, e);</span> |
| <span class="source-line-no">636</span><span id="line-636"> }</span> |
| <span class="source-line-no">637</span><span id="line-637"> }));</span> |
| <span class="source-line-no">638</span><span id="line-638"> }</span> |
| <span class="source-line-no">639</span><span id="line-639"> });</span> |
| <span class="source-line-no">640</span><span id="line-640"></span> |
| <span class="source-line-no">641</span><span id="line-641"> // Create the workers</span> |
| <span class="source-line-no">642</span><span id="line-642"> workerId.set(0);</span> |
| <span class="source-line-no">643</span><span id="line-643"> workerThreads = new CopyOnWriteArrayList<>();</span> |
| <span class="source-line-no">644</span><span id="line-644"> for (int i = 0; i < corePoolSize; ++i) {</span> |
| <span class="source-line-no">645</span><span id="line-645"> workerThreads.add(new WorkerThread(threadGroup));</span> |
| <span class="source-line-no">646</span><span id="line-646"> }</span> |
| <span class="source-line-no">647</span><span id="line-647"></span> |
| <span class="source-line-no">648</span><span id="line-648"> long st, et;</span> |
| <span class="source-line-no">649</span><span id="line-649"></span> |
| <span class="source-line-no">650</span><span id="line-650"> // Acquire the store lease.</span> |
| <span class="source-line-no">651</span><span id="line-651"> st = System.nanoTime();</span> |
| <span class="source-line-no">652</span><span id="line-652"> store.recoverLease();</span> |
| <span class="source-line-no">653</span><span id="line-653"> et = System.nanoTime();</span> |
| <span class="source-line-no">654</span><span id="line-654"> LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),</span> |
| <span class="source-line-no">655</span><span id="line-655"> StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));</span> |
| <span class="source-line-no">656</span><span id="line-656"></span> |
| <span class="source-line-no">657</span><span id="line-657"> // start the procedure scheduler</span> |
| <span class="source-line-no">658</span><span id="line-658"> scheduler.start();</span> |
| <span class="source-line-no">659</span><span id="line-659"></span> |
| <span class="source-line-no">660</span><span id="line-660"> // TODO: Split in two steps.</span> |
| <span class="source-line-no">661</span><span id="line-661"> // TODO: Handle corrupted procedures (currently just a warn)</span> |
| <span class="source-line-no">662</span><span id="line-662"> // The first one will make sure that we have the latest id,</span> |
| <span class="source-line-no">663</span><span id="line-663"> // so we can start the threads and accept new procedures.</span> |
| <span class="source-line-no">664</span><span id="line-664"> // The second step will do the actual load of old procedures.</span> |
| <span class="source-line-no">665</span><span id="line-665"> st = System.nanoTime();</span> |
| <span class="source-line-no">666</span><span id="line-666"> load(abortOnCorruption);</span> |
| <span class="source-line-no">667</span><span id="line-667"> et = System.nanoTime();</span> |
| <span class="source-line-no">668</span><span id="line-668"> LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),</span> |
| <span class="source-line-no">669</span><span id="line-669"> StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));</span> |
| <span class="source-line-no">670</span><span id="line-670"> }</span> |
| <span class="source-line-no">671</span><span id="line-671"></span> |
| <span class="source-line-no">672</span><span id="line-672"> /**</span> |
| <span class="source-line-no">673</span><span id="line-673"> * Start the timeout and worker-monitor threads, every core worker thread and the internal chores.</span> |
| <span class="source-line-no">674</span><span id="line-674"> * <p></span> |
| <span class="source-line-no">675</span><span id="line-675"> * If the executor is already running, this only logs a warning and returns.</span> |
| <span class="source-line-no">676</span><span id="line-676"> */</span> |
| <span class="source-line-no">677</span><span id="line-677"> public void startWorkers() throws IOException {</span> |
| <span class="source-line-no">678</span><span id="line-678"> final boolean alreadyRunning = !running.compareAndSet(false, true);</span> |
| <span class="source-line-no">679</span><span id="line-679"> if (alreadyRunning) {</span> |
| <span class="source-line-no">680</span><span id="line-680"> LOG.warn("Already running");</span> |
| <span class="source-line-no">681</span><span id="line-681"> return;</span> |
| <span class="source-line-no">682</span><span id="line-682"> }</span> |
| <span class="source-line-no">683</span><span id="line-683"> // lastProcId must already be set before any of the executor threads is started.</span> |
| <span class="source-line-no">684</span><span id="line-684"> LOG.trace("Start workers {}", workerThreads.size());</span> |
| <span class="source-line-no">685</span><span id="line-685"> timeoutExecutor.start();</span> |
| <span class="source-line-no">686</span><span id="line-686"> workerMonitorExecutor.start();</span> |
| <span class="source-line-no">687</span><span id="line-687"> workerThreads.forEach(WorkerThread::start);</span> |
| <span class="source-line-no">688</span><span id="line-688"></span> |
| <span class="source-line-no">689</span><span id="line-689"> // Internal chores: the worker monitor goes to its own executor, while the completed-procedure</span> |
| <span class="source-line-no">690</span><span id="line-690"> // cleaner is scheduled through addChore().</span> |
| <span class="source-line-no">691</span><span id="line-691"> workerMonitorExecutor.add(new WorkerMonitor());</span> |
| <span class="source-line-no">692</span><span id="line-692"> addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,</span> |
| <span class="source-line-no">693</span><span id="line-693"> nonceKeysToProcIdsMap));</span> |
| <span class="source-line-no">694</span><span id="line-694"> }</span> |
| <span class="source-line-no">695</span><span id="line-695"> /** Signal the scheduler, timer threads and executor services to stop; join() waits for them. */</span> |
| <span class="source-line-no">696</span><span id="line-696"> public void stop() {</span> |
| <span class="source-line-no">697</span><span id="line-697"> // init() may fail while loading procedures, leaving running false although the scheduler and</span> |
| <span class="source-line-no">698</span><span id="line-698"> // the two ExecutorServices were already started; so do not check the running state here and</span> |
| <span class="source-line-no">699</span><span id="line-699"> // always stop everything.</span> |
| <span class="source-line-no">700</span><span id="line-700"> running.set(false);</span> |
| <span class="source-line-no">701</span><span id="line-701"> LOG.info("Stopping");</span> |
| <span class="source-line-no">702</span><span id="line-702"> scheduler.stop();</span> |
| <span class="source-line-no">703</span><span id="line-703"> timeoutExecutor.sendStopSignal();</span> |
| <span class="source-line-no">704</span><span id="line-704"> workerMonitorExecutor.sendStopSignal();</span> |
| <span class="source-line-no">705</span><span id="line-705"> forceUpdateExecutor.shutdown(); // already-queued tasks still run; join() waits for them</span> |
| <span class="source-line-no">706</span><span id="line-706"> asyncTaskExecutor.shutdown();</span> |
| <span class="source-line-no">707</span><span id="line-707"> }</span> |
| <span class="source-line-no">708</span><span id="line-708"> /** Wait for the components signalled by stop() to exit, then reset the in-memory state. */</span> |
| <span class="source-line-no">709</span><span id="line-709"> public void join() {</span> |
| <span class="source-line-no">710</span><span id="line-710"> assert !isRunning() : "expected not running";</span> |
| <span class="source-line-no">711</span><span id="line-711"> // stop() must have been called first; it clears the running flag checked above</span> |
| <span class="source-line-no">712</span><span id="line-712"> // wait for the timeout executor thread to exit</span> |
| <span class="source-line-no">713</span><span id="line-713"> timeoutExecutor.awaitTermination();</span> |
| <span class="source-line-no">714</span><span id="line-714"> // wait for the worker monitor thread to exit</span> |
| <span class="source-line-no">715</span><span id="line-715"> workerMonitorExecutor.awaitTermination();</span> |
| <span class="source-line-no">716</span><span id="line-716"></span> |
| <span class="source-line-no">717</span><span id="line-717"> // wait for every worker thread to exit</span> |
| <span class="source-line-no">718</span><span id="line-718"> for (WorkerThread worker : workerThreads) {</span> |
| <span class="source-line-no">719</span><span id="line-719"> worker.awaitTermination();</span> |
| <span class="source-line-no">720</span><span id="line-720"> }</span> |
| <span class="source-line-no">721</span><span id="line-721"> try { // forceUpdateExecutor was shut down in stop(); wait at most 5 seconds for it</span> |
| <span class="source-line-no">722</span><span id="line-722"> if (!forceUpdateExecutor.awaitTermination(5, TimeUnit.SECONDS)) {</span> |
| <span class="source-line-no">723</span><span id="line-723"> LOG.warn("There are still pending tasks in forceUpdateExecutor");</span> |
| <span class="source-line-no">724</span><span id="line-724"> }</span> |
| <span class="source-line-no">725</span><span id="line-725"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">726</span><span id="line-726"> LOG.warn("interrupted while waiting for forceUpdateExecutor termination", e);</span> |
| <span class="source-line-no">727</span><span id="line-727"> Thread.currentThread().interrupt();</span> |
| <span class="source-line-no">728</span><span id="line-728"> }</span> |
| <span class="source-line-no">729</span><span id="line-729"> try { // asyncTaskExecutor was shut down in stop(); wait at most 5 seconds for it</span> |
| <span class="source-line-no">730</span><span id="line-730"> if (!asyncTaskExecutor.awaitTermination(5, TimeUnit.SECONDS)) {</span> |
| <span class="source-line-no">731</span><span id="line-731"> LOG.warn("There are still pending tasks in asyncTaskExecutor");</span> |
| <span class="source-line-no">732</span><span id="line-732"> }</span> |
| <span class="source-line-no">733</span><span id="line-733"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">734</span><span id="line-734"> LOG.warn("interrupted while waiting for asyncTaskExecutor termination", e);</span> |
| <span class="source-line-no">735</span><span id="line-735"> Thread.currentThread().interrupt();</span> |
| <span class="source-line-no">736</span><span id="line-736"> }</span> |
| <span class="source-line-no">737</span><span id="line-737"></span> |
| <span class="source-line-no">738</span><span id="line-738"> // ThreadGroup.destroy is deprecated in JDK 17 and destroying the group is not necessary here,</span> |
| <span class="source-line-no">739</span><span id="line-739"> // so just report any thread that is still active (list() prints the group to STDOUT).</span> |
| <span class="source-line-no">740</span><span id="line-740"> if (threadGroup.activeCount() > 0) {</span> |
| <span class="source-line-no">741</span><span id="line-741"> LOG.error("There are still active thread in group {}, see STDOUT", threadGroup);</span> |
| <span class="source-line-no">742</span><span id="line-742"> threadGroup.list();</span> |
| <span class="source-line-no">743</span><span id="line-743"> }</span> |
| <span class="source-line-no">744</span><span id="line-744"></span> |
| <span class="source-line-no">745</span><span id="line-745"> // reset the in-memory state (relied upon by tests)</span> |
| <span class="source-line-no">746</span><span id="line-746"> completed.clear();</span> |
| <span class="source-line-no">747</span><span id="line-747"> rollbackStack.clear();</span> |
| <span class="source-line-no">748</span><span id="line-748"> procedures.clear();</span> |
| <span class="source-line-no">749</span><span id="line-749"> nonceKeysToProcIdsMap.clear();</span> |
| <span class="source-line-no">750</span><span id="line-750"> scheduler.clear();</span> |
| <span class="source-line-no">751</span><span id="line-751"> lastProcId.set(-1);</span> |
| <span class="source-line-no">752</span><span id="line-752"> }</span> |
| <span class="source-line-no">753</span><span id="line-753"> /** Swap in a new configuration and re-read the worker keep-alive time (in ms) from it. */</span> |
| <span class="source-line-no">754</span><span id="line-754"> public void refreshConfiguration(final Configuration conf) {</span> |
| <span class="source-line-no">755</span><span id="line-755"> this.conf = conf;</span> |
| <span class="source-line-no">756</span><span id="line-756"> long keepAlive = conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, DEFAULT_WORKER_KEEP_ALIVE_TIME);</span> |
| <span class="source-line-no">757</span><span id="line-757"> setKeepAliveTime(keepAlive, TimeUnit.MILLISECONDS);</span> |
| <span class="source-line-no">758</span><span id="line-758"> }</span> |
| <span class="source-line-no">759</span><span id="line-759"></span> |
| <span class="source-line-no">760</span><span id="line-760"> // ==========================================================================</span> |
| <span class="source-line-no">761</span><span id="line-761"> // Accessors</span> |
| <span class="source-line-no">762</span><span id="line-762"> // ==========================================================================</span> |
| <span class="source-line-no">763</span><span id="line-763"> public boolean isRunning() {</span> |
| <span class="source-line-no">764</span><span id="line-764"> return this.running.get();</span> |
| <span class="source-line-no">765</span><span id="line-765"> }</span> |
| <span class="source-line-no">766</span><span id="line-766"></span> |
| <span class="source-line-no">767</span><span id="line-767"> /** Returns how many worker threads are currently in the worker list. */</span> |
| <span class="source-line-no">768</span><span id="line-768"> public int getWorkerThreadCount() {</span> |
| <span class="source-line-no">769</span><span id="line-769"> return this.workerThreads.size();</span> |
| <span class="source-line-no">770</span><span id="line-770"> }</span> |
| <span class="source-line-no">771</span><span id="line-771"></span> |
| <span class="source-line-no">772</span><span id="line-772"> /** Returns the configured number of core worker threads. */</span> |
| <span class="source-line-no">773</span><span id="line-773"> public int getCorePoolSize() {</span> |
| <span class="source-line-no">774</span><span id="line-774"> return this.corePoolSize;</span> |
| <span class="source-line-no">775</span><span id="line-775"> }</span> |
| <span class="source-line-no">776</span><span id="line-776"></span> |
| <span class="source-line-no">777</span><span id="line-777"> public int getActiveExecutorCount() {</span> |
| <span class="source-line-no">778</span><span id="line-778"> return this.activeExecutorCount.get();</span> |
| <span class="source-line-no">779</span><span id="line-779"> }</span> |
| <span class="source-line-no">780</span><span id="line-780"></span> |
| <span class="source-line-no">781</span><span id="line-781"> public TEnvironment getEnvironment() {</span> |
| <span class="source-line-no">782</span><span id="line-782"> return environment;</span> |
| <span class="source-line-no">783</span><span id="line-783"> }</span> |
| <span class="source-line-no">784</span><span id="line-784"></span> |
| <span class="source-line-no">785</span><span id="line-785"> public ProcedureStore getStore() {</span> |
| <span class="source-line-no">786</span><span id="line-786"> return store;</span> |
| <span class="source-line-no">787</span><span id="line-787"> }</span> |
| <span class="source-line-no">788</span><span id="line-788"></span> |
| <span class="source-line-no">789</span><span id="line-789"> ProcedureScheduler getScheduler() {</span> |
| <span class="source-line-no">790</span><span id="line-790"> return this.scheduler;</span> |
| <span class="source-line-no">791</span><span id="line-791"> }</span> |
| <span class="source-line-no">792</span><span id="line-792"></span> |
| <span class="source-line-no">793</span><span id="line-793"> public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {</span> |
| <span class="source-line-no">794</span><span id="line-794"> this.keepAliveTime = TimeUnit.MILLISECONDS.convert(keepAliveTime, timeUnit);</span> |
| <span class="source-line-no">795</span><span id="line-795"> scheduler.signalAll();</span> |
| <span class="source-line-no">796</span><span id="line-796"> }</span> |
| <span class="source-line-no">797</span><span id="line-797"></span> |
| <span class="source-line-no">798</span><span id="line-798"> public long getKeepAliveTime(final TimeUnit timeUnit) {</span> |
| <span class="source-line-no">799</span><span id="line-799"> return timeUnit.convert(this.keepAliveTime, TimeUnit.MILLISECONDS);</span> |
| <span class="source-line-no">800</span><span id="line-800"> }</span> |
| <span class="source-line-no">801</span><span id="line-801"></span> |
| <span class="source-line-no">802</span><span id="line-802"> // ==========================================================================</span> |
| <span class="source-line-no">803</span><span id="line-803"> // Submit/Remove Chores</span> |
| <span class="source-line-no">804</span><span id="line-804"> // ==========================================================================</span> |
| <span class="source-line-no">805</span><span id="line-805"></span> |
| <span class="source-line-no">806</span><span id="line-806"> /**</span> |
| <span class="source-line-no">807</span><span id="line-807"> * Hand an in-memory chore to the timeout executor, putting it in WAITING_TIMEOUT state.</span> |
| <span class="source-line-no">808</span><span id="line-808"> * @param chore the chore to schedule; {@code null} is silently ignored</span> |
| <span class="source-line-no">809</span><span id="line-809"> */</span> |
| <span class="source-line-no">810</span><span id="line-810"> public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {</span> |
| <span class="source-line-no">811</span><span id="line-811"> if (chore != null) {</span> |
| <span class="source-line-no">812</span><span id="line-812"> // the chore waits inside the timeout executor in WAITING_TIMEOUT state</span> |
| <span class="source-line-no">813</span><span id="line-813"> chore.setState(ProcedureState.WAITING_TIMEOUT);</span> |
| <span class="source-line-no">814</span><span id="line-814"> timeoutExecutor.add(chore);</span> |
| <span class="source-line-no">815</span><span id="line-815"> }</span> |
| <span class="source-line-no">816</span><span id="line-816"> }</span> |
| <span class="source-line-no">817</span><span id="line-817"></span> |
| <span class="source-line-no">818</span><span id="line-818"> /**</span> |
| <span class="source-line-no">819</span><span id="line-819"> * Take a chore out of the timeout executor after marking it SUCCESS.</span> |
| <span class="source-line-no">820</span><span id="line-820"> * @param chore the chore to remove; {@code null} counts as already removed</span> |
| <span class="source-line-no">821</span><span id="line-821"> * @return whether the chore is removed, or it will be removed later (always true for null)</span> |
| <span class="source-line-no">822</span><span id="line-822"> */</span> |
| <span class="source-line-no">823</span><span id="line-823"> public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {</span> |
| <span class="source-line-no">824</span><span id="line-824"> if (chore != null) {</span> |
| <span class="source-line-no">825</span><span id="line-825"> chore.setState(ProcedureState.SUCCESS);</span> |
| <span class="source-line-no">826</span><span id="line-826"> return timeoutExecutor.remove(chore);</span> |
| <span class="source-line-no">827</span><span id="line-827"> }</span> |
| <span class="source-line-no">828</span><span id="line-828"> return true;</span> |
| <span class="source-line-no">829</span><span id="line-829"> }</span> |
| <span class="source-line-no">830</span><span id="line-830"></span> |
| <span class="source-line-no">831</span><span id="line-831"> // ==========================================================================</span> |
| <span class="source-line-no">832</span><span id="line-832"> // Nonce Procedure helpers</span> |
| <span class="source-line-no">833</span><span id="line-833"> // ==========================================================================</span> |
| <span class="source-line-no">834</span><span id="line-834"> /**</span> |
| <span class="source-line-no">835</span><span id="line-835"> * Build the {@link NonceKey} for a request, or none if the request carries no nonce.</span> |
| <span class="source-line-no">836</span><span id="line-836"> * @param nonceGroup the group part of the {@link NonceKey}</span> |
| <span class="source-line-no">837</span><span id="line-837"> * @param nonce the nonce part of the {@link NonceKey}; {@code HConstants.NO_NONCE} means none</span> |
| <span class="source-line-no">838</span><span id="line-838"> * @return a new NonceKey, or {@code null} when {@code nonce == HConstants.NO_NONCE}</span> |
| <span class="source-line-no">839</span><span id="line-839"> */</span> |
| <span class="source-line-no">840</span><span id="line-840"> public NonceKey createNonceKey(final long nonceGroup, final long nonce) {</span> |
| <span class="source-line-no">841</span><span id="line-841"> return nonce != HConstants.NO_NONCE ? new NonceKey(nonceGroup, nonce) : null;</span> |
| <span class="source-line-no">842</span><span id="line-842"> }</span> |
| <span class="source-line-no">843</span><span id="line-843"></span> |
| <span class="source-line-no">844</span><span id="line-844"> /**</span> |
| <span class="source-line-no">845</span><span id="line-845"> * Register a nonce for a procedure that is going to be submitted. A procId is reserved and, on</span> |
| <span class="source-line-no">846</span><span id="line-846"> * submitProcedure(), the procedure with the specified nonce takes the reserved procId. If someone</span> |
| <span class="source-line-no">847</span><span id="line-847"> * already reserved the nonce, this call polls every 100 ms, while the executor is running, until</span> |
| <span class="source-line-no">848</span><span id="line-848"> * that procedure is submitted or completed (or the nonce is unregistered) and returns its procId.</span> |
| <span class="source-line-no">849</span><span id="line-849"> * @param nonceKey A unique identifier for this operation from the client or process; null yields -1.</span> |
| <span class="source-line-no">850</span><span id="line-850"> * @return the procId reserved by an earlier caller, or -1 if this caller should submit it.</span> |
| <span class="source-line-no">851</span><span id="line-851"> */</span> |
| <span class="source-line-no">852</span><span id="line-852"> public long registerNonce(final NonceKey nonceKey) {</span> |
| <span class="source-line-no">853</span><span id="line-853"> if (nonceKey == null) {</span> |
| <span class="source-line-no">854</span><span id="line-854"> return -1;</span> |
| <span class="source-line-no">855</span><span id="line-855"> }</span> |
| <span class="source-line-no">856</span><span id="line-856"></span> |
| <span class="source-line-no">857</span><span id="line-857"> // check if we have already a Reserved ID for the nonce</span> |
| <span class="source-line-no">858</span><span id="line-858"> Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);</span> |
| <span class="source-line-no">859</span><span id="line-859"> if (oldProcId == null) {</span> |
| <span class="source-line-no">860</span><span id="line-860"> // reserve a new Procedure ID, this will be associated with the nonce</span> |
| <span class="source-line-no">861</span><span id="line-861"> // and the procedure submitted with the specified nonce will use this ID.</span> |
| <span class="source-line-no">862</span><span id="line-862"> final long newProcId = nextProcId();</span> |
| <span class="source-line-no">863</span><span id="line-863"> oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);</span> |
| <span class="source-line-no">864</span><span id="line-864"> if (oldProcId == null) {</span> |
| <span class="source-line-no">865</span><span id="line-865"> return -1;</span> |
| <span class="source-line-no">866</span><span id="line-866"> }</span> |
| <span class="source-line-no">867</span><span id="line-867"> }</span> |
| <span class="source-line-no">868</span><span id="line-868"></span> |
| <span class="source-line-no">869</span><span id="line-869"> // we found a registered nonce, but the procedure may not have been submitted yet.</span> |
| <span class="source-line-no">870</span><span id="line-870"> // since the client expects the procedure to be submitted, spin here until it is.</span> |
| <span class="source-line-no">871</span><span id="line-871"> final boolean traceEnabled = LOG.isTraceEnabled();</span> |
| <span class="source-line-no">872</span><span id="line-872"> while (</span> |
| <span class="source-line-no">873</span><span id="line-873"> isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId))</span> |
| <span class="source-line-no">874</span><span id="line-874"> && nonceKeysToProcIdsMap.containsKey(nonceKey)</span> |
| <span class="source-line-no">875</span><span id="line-875"> ) {</span> |
| <span class="source-line-no">876</span><span id="line-876"> if (traceEnabled) {</span> |
| <span class="source-line-no">877</span><span id="line-877"> LOG.trace("Waiting for pid={} to be submitted", oldProcId);</span> |
| <span class="source-line-no">878</span><span id="line-878"> }</span> |
| <span class="source-line-no">879</span><span id="line-879"> Threads.sleep(100);</span> |
| <span class="source-line-no">880</span><span id="line-880"> }</span> |
| <span class="source-line-no">881</span><span id="line-881"> return oldProcId;</span> |
| <span class="source-line-no">882</span><span id="line-882"> }</span> |
| <span class="source-line-no">883</span><span id="line-883"></span> |
| <span class="source-line-no">884</span><span id="line-884"> /**</span> |
| <span class="source-line-no">885</span><span id="line-885"> * Remove the NonceKey if the procedure reserved for it was neither submitted nor completed.</span> |
| <span class="source-line-no">886</span><span id="line-886"> * @param nonceKey A unique identifier for this operation from the client or process; may be null.</span> |
| <span class="source-line-no">887</span><span id="line-887"> */</span> |
| <span class="source-line-no">888</span><span id="line-888"> public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {</span> |
| <span class="source-line-no">889</span><span id="line-889"> if (nonceKey == null) {</span> |
| <span class="source-line-no">890</span><span id="line-890"> return;</span> |
| <span class="source-line-no">891</span><span id="line-891"> }</span> |
| <span class="source-line-no">892</span><span id="line-892"></span> |
| <span class="source-line-no">893</span><span id="line-893"> final Long procId = nonceKeysToProcIdsMap.get(nonceKey);</span> |
| <span class="source-line-no">894</span><span id="line-894"> if (procId == null) {</span> |
| <span class="source-line-no">895</span><span id="line-895"> return;</span> |
| <span class="source-line-no">896</span><span id="line-896"> }</span> |
| <span class="source-line-no">897</span><span id="line-897"> // If the procedure was not submitted, drop the nonce. Use the conditional remove so that a</span> |
| <span class="source-line-no">898</span><span id="line-898"> // mapping re-registered by another caller since the get() above is left untouched.</span> |
| <span class="source-line-no">899</span><span id="line-899"> if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {</span> |
| <span class="source-line-no">900</span><span id="line-900"> nonceKeysToProcIdsMap.remove(nonceKey, procId);</span> |
| <span class="source-line-no">901</span><span id="line-901"> }</span> |
| <span class="source-line-no">902</span><span id="line-902"> }</span> |
| <span class="source-line-no">903</span><span id="line-903"></span> |
| <span class="source-line-no">904</span><span id="line-904"> /**</span> |
| <span class="source-line-no">905</span><span id="line-905"> * Record a failure for a procedure that failed before it could be submitted, so that later</span> |
| <span class="source-line-no">906</span><span id="line-906"> * requests carrying the same nonceKey are given back the same error.</span> |
| <span class="source-line-no">907</span><span id="line-907"> * <p></span> |
| <span class="source-line-no">908</span><span id="line-908"> * Does nothing when nonceKey is null, has no reserved procedure id, or a completed result is</span> |
| <span class="source-line-no">909</span><span id="line-909"> * already recorded for that id.</span> |
| <span class="source-line-no">910</span><span id="line-910"> * @param nonceKey A unique identifier for this operation from the client or process</span> |
| <span class="source-line-no">911</span><span id="line-911"> * @param procName name of the procedure, used to inform the user</span> |
| <span class="source-line-no">912</span><span id="line-912"> * @param procOwner name of the owner of the procedure, used to inform the user</span> |
| <span class="source-line-no">913</span><span id="line-913"> * @param exception the failure to report to the user</span> |
| <span class="source-line-no">914</span><span id="line-914"> */</span> |
| <span class="source-line-no">915</span><span id="line-915"> public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,</span> |
| <span class="source-line-no">916</span><span id="line-916"> IOException exception) {</span> |
| <span class="source-line-no">917</span><span id="line-917"> final Long reservedProcId = nonceKey == null ? null : nonceKeysToProcIdsMap.get(nonceKey);</span> |
| <span class="source-line-no">918</span><span id="line-918"> if (reservedProcId == null || completed.containsKey(reservedProcId)) {</span> |
| <span class="source-line-no">919</span><span id="line-919"> // no reservation for this nonce, or a result has already been recorded</span> |
| <span class="source-line-no">920</span><span id="line-920"> return;</span> |
| <span class="source-line-no">921</span><span id="line-921"> }</span> |
| <span class="source-line-no">922</span><span id="line-922"></span> |
| <span class="source-line-no">923</span><span id="line-923"> // computeIfAbsent never overwrites a result already present for this id</span> |
| <span class="source-line-no">924</span><span id="line-924"> completed.computeIfAbsent(reservedProcId, (ignoredKey) -> {</span> |
| <span class="source-line-no">925</span><span id="line-925"> final Procedure<TEnvironment> failedProc = new FailedProcedure<>(</span> |
| <span class="source-line-no">926</span><span id="line-926"> reservedProcId.longValue(), procName, procOwner, nonceKey, exception);</span> |
| <span class="source-line-no">927</span><span id="line-927"> return new CompletedProcedureRetainer<>(failedProc);</span> |
| <span class="source-line-no">928</span><span id="line-928"> });</span> |
| <span class="source-line-no">929</span><span id="line-929"> }</span> |
| <span class="source-line-no">930</span><span id="line-930"></span> |
| <span class="source-line-no">931</span><span id="line-931"> // ==========================================================================</span> |
| <span class="source-line-no">932</span><span id="line-932"> // Submit/Abort Procedure</span> |
| <span class="source-line-no">933</span><span id="line-933"> // ==========================================================================</span> |
| <span class="source-line-no">934</span><span id="line-934"> /**</span> |
| <span class="source-line-no">935</span><span id="line-935"> * Submit a new root procedure that carries no nonce; same as passing a null nonceKey.</span> |
| <span class="source-line-no">936</span><span id="line-936"> * @param proc the new procedure to execute.</span> |
| <span class="source-line-no">937</span><span id="line-937"> * @return the procedure id, which can be used to monitor the operation</span> |
| <span class="source-line-no">938</span><span id="line-938"> */</span> |
| <span class="source-line-no">939</span><span id="line-939"> public long submitProcedure(Procedure<TEnvironment> proc) {</span> |
| <span class="source-line-no">940</span><span id="line-940"> return submitProcedure(proc, (NonceKey) null);</span> |
| <span class="source-line-no">941</span><span id="line-941"> }</span> |
| <span class="source-line-no">942</span><span id="line-942"></span> |
| <span class="source-line-no">943</span><span id="line-943"> /**</span> |
| <span class="source-line-no">944</span><span id="line-944"> * Bypass a batch of procedures. A bypassed procedure skips all of its execute/rollback logic and</span> |
| <span class="source-line-no">945</span><span id="line-945"> * reports success regardless of its state. It is meant for recovering buggy stuck procedures,</span> |
| <span class="source-line-no">946</span><span id="line-946"> * releasing their lock resources and letting other procedures run. Bypassing one procedure also</span> |
| <span class="source-line-no">947</span><span id="line-947"> * bypasses its ancestors, and may leave the cluster in an intermediate state, e.g. a region not</span> |
| <span class="source-line-no">948</span><span id="line-948"> * assigned, or some hdfs files left behind. After getting rid of those stuck procedures,</span> |
| <span class="source-line-no">949</span><span id="line-949"> * operators may have to clean up hdfs or schedule assign procedures to bring regions online.</span> |
| <span class="source-line-no">950</span><span id="line-950"> * DO AT YOUR OWN RISK.</span> |
| <span class="source-line-no">951</span><span id="line-951"> * <p></span> |
| <span class="source-line-no">952</span><span id="line-952"> * A procedure can be bypassed only if 1. it is in state RUNNABLE, WAITING or WAITING_TIMEOUT,</span> |
| <span class="source-line-no">953</span><span id="line-953"> * or it is a root procedure without any child; 2. no other worker thread is executing it;</span> |
| <span class="source-line-no">954</span><span id="line-954"> * 3. no child procedure has been submitted, unless {@code recursive} is set.</span> |
| <span class="source-line-no">955</span><span id="line-955"> * <p></span> |
| <span class="source-line-no">956</span><span id="line-956"> * If all the requirements are met, the procedure and its ancestors will be bypassed and</span> |
| <span class="source-line-no">957</span><span id="line-957"> * persisted to WAL.</span> |
| <span class="source-line-no">958</span><span id="line-958"> * <p></span> |
| <span class="source-line-no">959</span><span id="line-959"> * If the procedure is in WAITING state, it is set to RUNNABLE and added to the run queue.</span> |
| <span class="source-line-no">960</span><span id="line-960"> * Each pid is handled independently, in list order.</span> |
| <span class="source-line-no">961</span><span id="line-961"> * @param pids the procedure ids</span> |
| <span class="source-line-no">962</span><span id="line-962"> * @param lockWait time to wait for the execution lock of each procedure</span> |
| <span class="source-line-no">963</span><span id="line-963"> * @param force if true, bypass the procedure even if it is executing. This is for procedures</span> |
| <span class="source-line-no">964</span><span id="line-964"> * which can't break out of execution (due to a bug, mostly). In that case bypassing</span> |
| <span class="source-line-no">965</span><span id="line-965"> * alone is not enough, since the procedure is already stuck: the master has to be</span> |
| <span class="source-line-no">966</span><span id="line-966"> * restarted after bypassing, letting the problematic procedure execute with</span> |
| <span class="source-line-no">967</span><span id="line-967"> * bypass=true, so that it can be successfully bypassed.</span> |
| <span class="source-line-no">968</span><span id="line-968"> * @param recursive also bypass the children of each pid, found by scanning every live</span> |
| <span class="source-line-no">969</span><span id="line-969"> * procedure. EXPENSIVE!</span> |
| <span class="source-line-no">970</span><span id="line-970"> * @return one entry per pid, in the same order, true if that bypass succeeded</span> |
| <span class="source-line-no">971</span><span id="line-971"> * @throws IOException if bypassing any of the procedures fails</span> |
| <span class="source-line-no">972</span><span id="line-972"> */</span> |
| <span class="source-line-no">973</span><span id="line-973"> public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,</span> |
| <span class="source-line-no">974</span><span id="line-974"> boolean recursive) throws IOException {</span> |
| <span class="source-line-no">975</span><span id="line-975"> final List<Boolean> outcomes = new ArrayList<>(pids.size());</span> |
| <span class="source-line-no">976</span><span id="line-976"> for (Long pid : pids) {</span> |
| <span class="source-line-no">977</span><span id="line-977"> outcomes.add(bypassProcedure(pid.longValue(), lockWait, force, recursive));</span> |
| <span class="source-line-no">978</span><span id="line-978"> }</span> |
| <span class="source-line-no">979</span><span id="line-979"> return outcomes;</span> |
| <span class="source-line-no">980</span><span id="line-980"> }</span> |
| <span class="source-line-no">981</span><span id="line-981"> /** Bypass a single procedure and its ancestors; returns true only if the bypass happened. */</span> |
| <span class="source-line-no">982</span><span id="line-982"> boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)</span> |
| <span class="source-line-no">983</span><span id="line-983"> throws IOException {</span> |
| <span class="source-line-no">984</span><span id="line-984"> Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");</span> |
| <span class="source-line-no">985</span><span id="line-985"> final Procedure<TEnvironment> procedure = getProcedure(pid);</span> |
| <span class="source-line-no">986</span><span id="line-986"> if (procedure == null) {</span> |
| <span class="source-line-no">987</span><span id="line-987"> LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);</span> |
| <span class="source-line-no">988</span><span id="line-988"> return false;</span> |
| <span class="source-line-no">989</span><span id="line-989"> }</span> |
| <span class="source-line-no">990</span><span id="line-990"></span> |
| <span class="source-line-no">991</span><span id="line-991"> LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", procedure, lockWait,</span> |
| <span class="source-line-no">992</span><span id="line-992"> override, recursive);</span> |
| <span class="source-line-no">993</span><span id="line-993"> IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);</span> |
| <span class="source-line-no">994</span><span id="line-994"> if (lockEntry == null && !override) {</span> |
| <span class="source-line-no">995</span><span id="line-995"> LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", lockWait,</span> |
| <span class="source-line-no">996</span><span id="line-996"> procedure, override);</span> |
| <span class="source-line-no">997</span><span id="line-997"> return false;</span> |
| <span class="source-line-no">998</span><span id="line-998"> } else if (lockEntry == null) {</span> |
| <span class="source-line-no">999</span><span id="line-999"> LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", lockWait,</span> |
| <span class="source-line-no">1000</span><span id="line-1000"> procedure, override);</span> |
| <span class="source-line-no">1001</span><span id="line-1001"> }</span> |
| <span class="source-line-no">1002</span><span id="line-1002"> try {</span> |
| <span class="source-line-no">1003</span><span id="line-1003"> // check whether the procedure is already finished</span> |
| <span class="source-line-no">1004</span><span id="line-1004"> if (procedure.isFinished()) {</span> |
| <span class="source-line-no">1005</span><span id="line-1005"> LOG.debug("{} is already finished, skipping bypass", procedure);</span> |
| <span class="source-line-no">1006</span><span id="line-1006"> return false;</span> |
| <span class="source-line-no">1007</span><span id="line-1007"> }</span> |
| <span class="source-line-no">1008</span><span id="line-1008"></span> |
| <span class="source-line-no">1009</span><span id="line-1009"> if (procedure.hasChildren()) {</span> |
| <span class="source-line-no">1010</span><span id="line-1010"> if (recursive) {</span> |
| <span class="source-line-no">1011</span><span id="line-1011"> // EXPENSIVE. Checks each live procedure of which there could be many!!!</span> |
| <span class="source-line-no">1012</span><span id="line-1012"> // Is there another way to get children of a procedure?</span> |
| <span class="source-line-no">1013</span><span id="line-1013"> LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());</span> |
| <span class="source-line-no">1014</span><span id="line-1014"> this.procedures.forEachValue(1 /* Single-threaded */,</span> |
| <span class="source-line-no">1015</span><span id="line-1015"> // Transformer: keep only direct children (null results are skipped)</span> |
| <span class="source-line-no">1016</span><span id="line-1016"> v -> v.getParentProcId() == procedure.getProcId() ? v : null,</span> |
| <span class="source-line-no">1017</span><span id="line-1017"> // Consumer</span> |
| <span class="source-line-no">1018</span><span id="line-1018"> v -> {</span> |
| <span class="source-line-no">1019</span><span id="line-1019"> try {</span> |
| <span class="source-line-no">1020</span><span id="line-1020"> bypassProcedure(v.getProcId(), lockWait, override, recursive);</span> |
| <span class="source-line-no">1021</span><span id="line-1021"> } catch (IOException e) {</span> |
| <span class="source-line-no">1022</span><span id="line-1022"> LOG.warn("Recursive bypass of pid={} failed", v.getProcId(), e);</span> |
| <span class="source-line-no">1023</span><span id="line-1023"> }</span> |
| <span class="source-line-no">1024</span><span id="line-1024"> });</span> |
| <span class="source-line-no">1025</span><span id="line-1025"> } else {</span> |
| <span class="source-line-no">1026</span><span id="line-1026"> LOG.debug("{} has children, skipping bypass", procedure);</span> |
| <span class="source-line-no">1027</span><span id="line-1027"> return false;</span> |
| <span class="source-line-no">1028</span><span id="line-1028"> }</span> |
| <span class="source-line-no">1029</span><span id="line-1029"> }</span> |
| <span class="source-line-no">1030</span><span id="line-1030"></span> |
| <span class="source-line-no">1031</span><span id="line-1031"> // A root procedure can be bypassed in any state; one with a parent only in these states</span> |
| <span class="source-line-no">1032</span><span id="line-1032"> if (</span> |
| <span class="source-line-no">1033</span><span id="line-1033"> procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE</span> |
| <span class="source-line-no">1034</span><span id="line-1034"> && procedure.getState() != ProcedureState.WAITING</span> |
| <span class="source-line-no">1035</span><span id="line-1035"> && procedure.getState() != ProcedureState.WAITING_TIMEOUT</span> |
| <span class="source-line-no">1036</span><span id="line-1036"> ) {</span> |
| <span class="source-line-no">1037</span><span id="line-1037"> LOG.debug("{} has a parent and is not in RUNNABLE, WAITING or WAITING_TIMEOUT state, "</span> |
| <span class="source-line-no">1038</span><span id="line-1038"> + "skipping bypass", procedure);</span> |
| <span class="source-line-no">1039</span><span id="line-1039"> // nothing has been bypassed on this path</span> |
| <span class="source-line-no">1040</span><span id="line-1040"> return false;</span> |
| <span class="source-line-no">1041</span><span id="line-1041"> }</span> |
| <span class="source-line-no">1042</span><span id="line-1042"></span> |
| <span class="source-line-no">1043</span><span id="line-1043"> // Now the procedure is not finished, and no one can execute it since we hold the lock (unless</span> |
| <span class="source-line-no">1044</span><span id="line-1044"> // override let us proceed without it). Its ancestors are not running either, since their</span> |
| <span class="source-line-no">1045</span><span id="line-1045"> // child has not finished yet</span> |
| <span class="source-line-no">1046</span><span id="line-1046"> Procedure<TEnvironment> current = procedure;</span> |
| <span class="source-line-no">1047</span><span id="line-1047"> while (current != null) {</span> |
| <span class="source-line-no">1048</span><span id="line-1048"> LOG.debug("Bypassing {}", current);</span> |
| <span class="source-line-no">1049</span><span id="line-1049"> current.bypass(getEnvironment());</span> |
| <span class="source-line-no">1050</span><span id="line-1050"> store.update(current);</span> |
| <span class="source-line-no">1051</span><span id="line-1051"> long parentID = current.getParentProcId();</span> |
| <span class="source-line-no">1052</span><span id="line-1052"> current = getProcedure(parentID);</span> |
| <span class="source-line-no">1053</span><span id="line-1053"> }</span> |
| <span class="source-line-no">1054</span><span id="line-1054"></span> |
| <span class="source-line-no">1055</span><span id="line-1055"> // wake up waiting procedure (if it has children we only got here because recursive is set)</span> |
| <span class="source-line-no">1056</span><span id="line-1056"> if (procedure.getState() == ProcedureState.WAITING) {</span> |
| <span class="source-line-no">1057</span><span id="line-1057"> procedure.setState(ProcedureState.RUNNABLE);</span> |
| <span class="source-line-no">1058</span><span id="line-1058"> store.update(procedure);</span> |
| <span class="source-line-no">1059</span><span id="line-1059"> }</span> |
| <span class="source-line-no">1060</span><span id="line-1060"></span> |
| <span class="source-line-no">1061</span><span id="line-1061"> // If state of procedure is WAITING_TIMEOUT, we can't directly submit it to the scheduler.</span> |
| <span class="source-line-no">1062</span><span id="line-1062"> // Instead we should remove it from timeout Executor queue and transfer its state to RUNNABLE</span> |
| <span class="source-line-no">1063</span><span id="line-1063"> if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {</span> |
| <span class="source-line-no">1064</span><span id="line-1064"> LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);</span> |
| <span class="source-line-no">1065</span><span id="line-1065"> if (timeoutExecutor.remove(procedure)) {</span> |
| <span class="source-line-no">1066</span><span id="line-1066"> LOG.debug("removed procedure {} from timeoutExecutor", procedure);</span> |
| <span class="source-line-no">1067</span><span id="line-1067"> timeoutExecutor.executeTimedoutProcedure(procedure);</span> |
| <span class="source-line-no">1068</span><span id="line-1068"> }</span> |
| <span class="source-line-no">1069</span><span id="line-1069"> } else if (lockEntry != null) {</span> |
| <span class="source-line-no">1070</span><span id="line-1070"> scheduler.addFront(procedure);</span> |
| <span class="source-line-no">1071</span><span id="line-1071"> LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);</span> |
| <span class="source-line-no">1072</span><span id="line-1072"> } else {</span> |
| <span class="source-line-no">1073</span><span id="line-1073"> // If we don't have the lock, we can't re-submit it to the queue,</span> |
| <span class="source-line-no">1074</span><span id="line-1074"> // since it is already executing. To get rid of the stuck situation, we</span> |
| <span class="source-line-no">1075</span><span id="line-1075"> // need to restart the master. With the procedure set to bypass, the procedureExecutor</span> |
| <span class="source-line-no">1076</span><span id="line-1076"> // will bypass it and won't get stuck again.</span> |
| <span class="source-line-no">1077</span><span id="line-1077"> LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "</span> |
| <span class="source-line-no">1078</span><span id="line-1078"> + "skipping add to queue", procedure);</span> |
| <span class="source-line-no">1079</span><span id="line-1079"> }</span> |
| <span class="source-line-no">1080</span><span id="line-1080"> return true;</span> |
| <span class="source-line-no">1081</span><span id="line-1081"></span> |
| <span class="source-line-no">1082</span><span id="line-1082"> } finally {</span> |
| <span class="source-line-no">1083</span><span id="line-1083"> if (lockEntry != null) {</span> |
| <span class="source-line-no">1084</span><span id="line-1084"> procExecutionLock.releaseLockEntry(lockEntry);</span> |
| <span class="source-line-no">1085</span><span id="line-1085"> }</span> |
| <span class="source-line-no">1086</span><span id="line-1086"> }</span> |
| <span class="source-line-no">1087</span><span id="line-1087"> }</span> |
| <span class="source-line-no">1088</span><span id="line-1088"></span> |
| <span class="source-line-no">1089</span><span id="line-1089"> /**</span> |
| <span class="source-line-no">1090</span><span id="line-1090"> * Submit a new root procedure to the executor, persisting it to the store before scheduling.</span> |
| <span class="source-line-no">1091</span><span id="line-1091"> * @param proc the new procedure to execute.</span> |
| <span class="source-line-no">1092</span><span id="line-1092"> * @param nonceKey the unique identifier reserved via registerNonce(), or null for no nonce.</span> |
| <span class="source-line-no">1093</span><span id="line-1093"> * @return the procedure id, which can be used to monitor the operation</span> |
| <span class="source-line-no">1094</span><span id="line-1094"> */</span> |
| <span class="source-line-no">1095</span><span id="line-1095"> @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",</span> |
| <span class="source-line-no">1096</span><span id="line-1096"> justification = "FindBugs is blind to the check-for-null")</span> |
| <span class="source-line-no">1097</span><span id="line-1097"> public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {</span> |
| <span class="source-line-no">1098</span><span id="line-1098"> Preconditions.checkArgument(lastProcId.get() >= 0);</span> |
| <span class="source-line-no">1099</span><span id="line-1099"> prepareProcedure(proc);</span> |
| <span class="source-line-no">1100</span><span id="line-1100"></span> |
| <span class="source-line-no">1101</span><span id="line-1101"> // Pick the id: a fresh one, or the one reserved for this nonce</span> |
| <span class="source-line-no">1102</span><span id="line-1102"> final Long assignedProcId;</span> |
| <span class="source-line-no">1103</span><span id="line-1103"> if (nonceKey == null) {</span> |
| <span class="source-line-no">1104</span><span id="line-1104"> assignedProcId = nextProcId();</span> |
| <span class="source-line-no">1105</span><span id="line-1105"> } else {</span> |
| <span class="source-line-no">1106</span><span id="line-1106"> assignedProcId = nonceKeysToProcIdsMap.get(nonceKey);</span> |
| <span class="source-line-no">1107</span><span id="line-1107"> Preconditions.checkArgument(assignedProcId != null,</span> |
| <span class="source-line-no">1108</span><span id="line-1108"> "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);</span> |
| <span class="source-line-no">1109</span><span id="line-1109"> }</span> |
| <span class="source-line-no">1110</span><span id="line-1110"></span> |
| <span class="source-line-no">1111</span><span id="line-1111"> // Stamp the procedure with its nonce and id</span> |
| <span class="source-line-no">1112</span><span id="line-1112"> proc.setNonceKey(nonceKey);</span> |
| <span class="source-line-no">1113</span><span id="line-1113"> proc.setProcId(assignedProcId.longValue());</span> |
| <span class="source-line-no">1114</span><span id="line-1114"></span> |
| <span class="source-line-no">1115</span><span id="line-1115"> // Persist the procedure in the store before it becomes visible to the executor</span> |
| <span class="source-line-no">1116</span><span id="line-1116"> store.insert(proc, null);</span> |
| <span class="source-line-no">1117</span><span id="line-1117"> LOG.debug("Stored {}", proc);</span> |
| <span class="source-line-no">1118</span><span id="line-1118"></span> |
| <span class="source-line-no">1119</span><span id="line-1119"> // Register and schedule it</span> |
| <span class="source-line-no">1120</span><span id="line-1120"> return pushProcedure(proc);</span> |
| <span class="source-line-no">1121</span><span id="line-1121"> }</span> |
| <span class="source-line-no">1122</span><span id="line-1122"></span> |
| <span class="source-line-no">1123</span><span id="line-1123"> /**</span> |
| <span class="source-line-no">1124</span><span id="line-1124"> * Submit a batch of new root procedures: each one gets a fresh id, the whole batch is persisted</span> |
| <span class="source-line-no">1125</span><span id="line-1125"> * with a single store insert, and then every procedure is pushed to the executor.</span> |
| <span class="source-line-no">1126</span><span id="line-1126"> * @param procs the new procedures to execute; a null or empty array is ignored.</span> |
| <span class="source-line-no">1127</span><span id="line-1127"> */</span> |
| <span class="source-line-no">1128</span><span id="line-1128"> public void submitProcedures(Procedure<TEnvironment>[] procs) {</span> |
| <span class="source-line-no">1129</span><span id="line-1129"> Preconditions.checkArgument(lastProcId.get() >= 0);</span> |
| <span class="source-line-no">1130</span><span id="line-1130"> if (procs == null || procs.length == 0) {</span> |
| <span class="source-line-no">1131</span><span id="line-1131"> return;</span> |
| <span class="source-line-no">1132</span><span id="line-1132"> }</span> |
| <span class="source-line-no">1133</span><span id="line-1133"></span> |
| <span class="source-line-no">1134</span><span id="line-1134"> // TODO: Do we need to take nonces here?</span> |
| <span class="source-line-no">1135</span><span id="line-1135"> for (Procedure<TEnvironment> pending : procs) {</span> |
| <span class="source-line-no">1136</span><span id="line-1136"> prepareProcedure(pending).setProcId(nextProcId());</span> |
| <span class="source-line-no">1137</span><span id="line-1137"> }</span> |
| <span class="source-line-no">1138</span><span id="line-1138"></span> |
| <span class="source-line-no">1139</span><span id="line-1139"> // one store insert for the whole batch</span> |
| <span class="source-line-no">1140</span><span id="line-1140"> store.insert(procs);</span> |
| <span class="source-line-no">1141</span><span id="line-1141"> if (LOG.isDebugEnabled()) {</span> |
| <span class="source-line-no">1142</span><span id="line-1142"> LOG.debug("Stored " + Arrays.toString(procs));</span> |
| <span class="source-line-no">1143</span><span id="line-1143"> }</span> |
| <span class="source-line-no">1144</span><span id="line-1144"></span> |
| <span class="source-line-no">1145</span><span id="line-1145"> // only now hand them to the executor</span> |
| <span class="source-line-no">1146</span><span id="line-1146"> for (Procedure<TEnvironment> stored : procs) {</span> |
| <span class="source-line-no">1147</span><span id="line-1147"> pushProcedure(stored);</span> |
| <span class="source-line-no">1148</span><span id="line-1148"> }</span> |
| <span class="source-line-no">1149</span><span id="line-1149"> }</span> |
| <span class="source-line-no">1150</span><span id="line-1150"> /** Validate a new root procedure before submission; returns it unchanged for chaining. */</span> |
| <span class="source-line-no">1151</span><span id="line-1151"> private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {</span> |
| <span class="source-line-no">1152</span><span id="line-1152"> // must be fresh (INITIALIZING), a root (no parent) and, when checkOwnerSet, owned</span> |
| <span class="source-line-no">1153</span><span id="line-1153"> Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);</span> |
| <span class="source-line-no">1154</span><span id="line-1154"> Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);</span> |
| <span class="source-line-no">1155</span><span id="line-1155"> Preconditions.checkArgument(!this.checkOwnerSet || proc.hasOwner(), "missing owner");</span> |
| <span class="source-line-no">1156</span><span id="line-1156"></span> |
| <span class="source-line-no">1157</span><span id="line-1157"> return proc;</span> |
| <span class="source-line-no">1158</span><span id="line-1158"> }</span> |
| <span class="source-line-no">1159</span><span id="line-1159"> /** Register an already-stored root procedure with the executor and queue it; returns its id. */</span> |
| <span class="source-line-no">1160</span><span id="line-1160"> private long pushProcedure(Procedure<TEnvironment> proc) {</span> |
| <span class="source-line-no">1161</span><span id="line-1161"> final long procId = proc.getProcId();</span> |
| <span class="source-line-no">1162</span><span id="line-1162"> // update metrics on start of a procedure</span> |
| <span class="source-line-no">1163</span><span id="line-1163"> proc.updateMetricsOnSubmit(getEnvironment());</span> |
| <span class="source-line-no">1164</span><span id="line-1164"></span> |
| <span class="source-line-no">1165</span><span id="line-1165"> // every root procedure gets its own rollback stack</span> |
| <span class="source-line-no">1166</span><span id="line-1166"> final RootProcedureState<TEnvironment> rootState = new RootProcedureState<>();</span> |
| <span class="source-line-no">1167</span><span id="line-1167"> rootState.setRollbackSupported(proc.isRollbackSupported());</span> |
| <span class="source-line-no">1168</span><span id="line-1168"> rollbackStack.put(procId, rootState);</span> |
| <span class="source-line-no">1169</span><span id="line-1169"></span> |
| <span class="source-line-no">1170</span><span id="line-1170"> // publish it in the procedures map, send the added notification, then schedule it</span> |
| <span class="source-line-no">1171</span><span id="line-1171"> assert !procedures.containsKey(procId);</span> |
| <span class="source-line-no">1172</span><span id="line-1172"> procedures.put(procId, proc);</span> |
| <span class="source-line-no">1173</span><span id="line-1173"> sendProcedureAddedNotification(procId);</span> |
| <span class="source-line-no">1174</span><span id="line-1174"> scheduler.addBack(proc);</span> |
| <span class="source-line-no">1175</span><span id="line-1175"></span> |
| <span class="source-line-no">1176</span><span id="line-1176"> return proc.getProcId();</span> |
| <span class="source-line-no">1177</span><span id="line-1177"> }</span> |
| <span class="source-line-no">1178</span><span id="line-1178"></span> |
| <span class="source-line-no">1179</span><span id="line-1179"> /**</span> |
| <span class="source-line-no">1180</span><span id="line-1180"> * Send an abort notification to the specified procedure, even if it has already executed some</span> |
| <span class="source-line-no">1181</span><span id="line-1181"> * steps. Depending on the procedure implementation the abort can be considered or ignored.</span> |
| <span class="source-line-no">1182</span><span id="line-1182"> * @param procId the procedure to abort</span> |
| <span class="source-line-no">1183</span><span id="line-1183"> * @return true if the procedure exists and has received the abort, otherwise false.</span> |
| <span class="source-line-no">1184</span><span id="line-1184"> */</span> |
| <span class="source-line-no">1185</span><span id="line-1185"> public boolean abort(long procId) {</span> |
| <span class="source-line-no">1186</span><span id="line-1186"> return abort(procId, /* mayInterruptIfRunning= */ true);</span> |
| <span class="source-line-no">1187</span><span id="line-1187"> }</span> |
| <span class="source-line-no">1188</span><span id="line-1188"></span> |
| <span class="source-line-no">1189</span><span id="line-1189"> /**</span> |
| <span class="source-line-no">1190</span><span id="line-1190"> * Send an abort notification to the specified procedure. Depending on the procedure</span> |
| <span class="source-line-no">1191</span><span id="line-1191"> * implementation, the abort can be considered or ignored.</span> |
| <span class="source-line-no">1192</span><span id="line-1192"> * @param procId the procedure to abort</span> |
| <span class="source-line-no">1193</span><span id="line-1193"> * @param mayInterruptIfRunning if false, a procedure that has already executed is not aborted</span> |
| <span class="source-line-no">1194</span><span id="line-1194"> * @return true if the procedure exists and has received the abort, otherwise false.</span> |
| <span class="source-line-no">1195</span><span id="line-1195"> */</span> |
| <span class="source-line-no">1196</span><span id="line-1196"> public boolean abort(long procId, boolean mayInterruptIfRunning) {</span> |
| <span class="source-line-no">1197</span><span id="line-1197"> final Procedure<TEnvironment> target = procedures.get(procId);</span> |
| <span class="source-line-no">1198</span><span id="line-1198"> if (target == null) {</span> |
| <span class="source-line-no">1199</span><span id="line-1199"> return false;</span> |
| <span class="source-line-no">1200</span><span id="line-1200"> }</span> |
| <span class="source-line-no">1201</span><span id="line-1201"> if (!mayInterruptIfRunning && target.wasExecuted()) {</span> |
| <span class="source-line-no">1202</span><span id="line-1202"> return false;</span> |
| <span class="source-line-no">1203</span><span id="line-1203"> }</span> |
| <span class="source-line-no">1204</span><span id="line-1204"> return target.abort(getEnvironment());</span> |
| <span class="source-line-no">1205</span><span id="line-1205"> }</span> |
| <span class="source-line-no">1206</span><span id="line-1206"></span> |
| <span class="source-line-no">1207</span><span id="line-1207"> // ==========================================================================</span> |
| <span class="source-line-no">1208</span><span id="line-1208"> // Executor query helpers</span> |
| <span class="source-line-no">1209</span><span id="line-1209"> // ==========================================================================</span> |
| <span class="source-line-no">1210</span><span id="line-1210"> public Procedure<TEnvironment> getProcedure(final long procId) {</span> |
| <span class="source-line-no">1211</span><span id="line-1211"> return procedures.get(procId);</span> |
| <span class="source-line-no">1212</span><span id="line-1212"> }</span> |
| <span class="source-line-no">1213</span><span id="line-1213"></span> |
| <span class="source-line-no">1214</span><span id="line-1214"> public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {</span> |
| <span class="source-line-no">1215</span><span id="line-1215"> Procedure<TEnvironment> proc = getProcedure(procId);</span> |
| <span class="source-line-no">1216</span><span id="line-1216"> if (clazz.isInstance(proc)) {</span> |
| <span class="source-line-no">1217</span><span id="line-1217"> return clazz.cast(proc);</span> |
| <span class="source-line-no">1218</span><span id="line-1218"> }</span> |
| <span class="source-line-no">1219</span><span id="line-1219"> return null;</span> |
| <span class="source-line-no">1220</span><span id="line-1220"> }</span> |
| <span class="source-line-no">1221</span><span id="line-1221"></span> |
| <span class="source-line-no">1222</span><span id="line-1222"> public Procedure<TEnvironment> getResult(long procId) {</span> |
| <span class="source-line-no">1223</span><span id="line-1223"> CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);</span> |
| <span class="source-line-no">1224</span><span id="line-1224"> if (retainer == null) {</span> |
| <span class="source-line-no">1225</span><span id="line-1225"> return null;</span> |
| <span class="source-line-no">1226</span><span id="line-1226"> } else {</span> |
| <span class="source-line-no">1227</span><span id="line-1227"> return retainer.getProcedure();</span> |
| <span class="source-line-no">1228</span><span id="line-1228"> }</span> |
| <span class="source-line-no">1229</span><span id="line-1229"> }</span> |
| <span class="source-line-no">1230</span><span id="line-1230"></span> |
| <span class="source-line-no">1231</span><span id="line-1231"> /**</span> |
| <span class="source-line-no">1232</span><span id="line-1232"> * Return true if the procedure is finished. The state may be "completed successfully" or "failed</span> |
| <span class="source-line-no">1233</span><span id="line-1233"> * and rolledback". Use getResult() to check the state or get the result data.</span> |
| <span class="source-line-no">1234</span><span id="line-1234"> * @param procId the ID of the procedure to check</span> |
| <span class="source-line-no">1235</span><span id="line-1235"> * @return true if the procedure execution is finished, otherwise false.</span> |
| <span class="source-line-no">1236</span><span id="line-1236"> */</span> |
| <span class="source-line-no">1237</span><span id="line-1237"> public boolean isFinished(final long procId) {</span> |
| <span class="source-line-no">1238</span><span id="line-1238"> return !procedures.containsKey(procId);</span> |
| <span class="source-line-no">1239</span><span id="line-1239"> }</span> |
| <span class="source-line-no">1240</span><span id="line-1240"></span> |
| <span class="source-line-no">1241</span><span id="line-1241"> /**</span> |
| <span class="source-line-no">1242</span><span id="line-1242"> * Return true if the procedure is started.</span> |
| <span class="source-line-no">1243</span><span id="line-1243"> * @param procId the ID of the procedure to check</span> |
| <span class="source-line-no">1244</span><span id="line-1244"> * @return true if the procedure execution is started, otherwise false.</span> |
| <span class="source-line-no">1245</span><span id="line-1245"> */</span> |
| <span class="source-line-no">1246</span><span id="line-1246"> public boolean isStarted(long procId) {</span> |
| <span class="source-line-no">1247</span><span id="line-1247"> Procedure<?> proc = procedures.get(procId);</span> |
| <span class="source-line-no">1248</span><span id="line-1248"> if (proc == null) {</span> |
| <span class="source-line-no">1249</span><span id="line-1249"> return completed.get(procId) != null;</span> |
| <span class="source-line-no">1250</span><span id="line-1250"> }</span> |
| <span class="source-line-no">1251</span><span id="line-1251"> return proc.wasExecuted();</span> |
| <span class="source-line-no">1252</span><span id="line-1252"> }</span> |
| <span class="source-line-no">1253</span><span id="line-1253"></span> |
| <span class="source-line-no">1254</span><span id="line-1254"> /**</span> |
| <span class="source-line-no">1255</span><span id="line-1255"> * Mark the specified completed procedure, as ready to remove.</span> |
| <span class="source-line-no">1256</span><span id="line-1256"> * @param procId the ID of the procedure to remove</span> |
| <span class="source-line-no">1257</span><span id="line-1257"> */</span> |
| <span class="source-line-no">1258</span><span id="line-1258"> public void removeResult(long procId) {</span> |
| <span class="source-line-no">1259</span><span id="line-1259"> CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);</span> |
| <span class="source-line-no">1260</span><span id="line-1260"> if (retainer == null) {</span> |
| <span class="source-line-no">1261</span><span id="line-1261"> assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";</span> |
| <span class="source-line-no">1262</span><span id="line-1262"> LOG.debug("pid={} already removed by the cleaner.", procId);</span> |
| <span class="source-line-no">1263</span><span id="line-1263"> return;</span> |
| <span class="source-line-no">1264</span><span id="line-1264"> }</span> |
| <span class="source-line-no">1265</span><span id="line-1265"></span> |
| <span class="source-line-no">1266</span><span id="line-1266"> // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.</span> |
| <span class="source-line-no">1267</span><span id="line-1267"> retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());</span> |
| <span class="source-line-no">1268</span><span id="line-1268"> }</span> |
| <span class="source-line-no">1269</span><span id="line-1269"></span> |
| <span class="source-line-no">1270</span><span id="line-1270"> public Procedure<TEnvironment> getResultOrProcedure(long procId) {</span> |
| <span class="source-line-no">1271</span><span id="line-1271"> CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);</span> |
| <span class="source-line-no">1272</span><span id="line-1272"> if (retainer == null) {</span> |
| <span class="source-line-no">1273</span><span id="line-1273"> return procedures.get(procId);</span> |
| <span class="source-line-no">1274</span><span id="line-1274"> } else {</span> |
| <span class="source-line-no">1275</span><span id="line-1275"> return retainer.getProcedure();</span> |
| <span class="source-line-no">1276</span><span id="line-1276"> }</span> |
| <span class="source-line-no">1277</span><span id="line-1277"> }</span> |
| <span class="source-line-no">1278</span><span id="line-1278"></span> |
| <span class="source-line-no">1279</span><span id="line-1279"> /**</span> |
| <span class="source-line-no">1280</span><span id="line-1280"> * Check if the user is this procedure's owner</span> |
| <span class="source-line-no">1281</span><span id="line-1281"> * @param procId the target procedure</span> |
| <span class="source-line-no">1282</span><span id="line-1282"> * @param user the user</span> |
| <span class="source-line-no">1283</span><span id="line-1283"> * @return true if the user is the owner of the procedure, false otherwise or the owner is</span> |
| <span class="source-line-no">1284</span><span id="line-1284"> * unknown.</span> |
| <span class="source-line-no">1285</span><span id="line-1285"> */</span> |
| <span class="source-line-no">1286</span><span id="line-1286"> public boolean isProcedureOwner(long procId, User user) {</span> |
| <span class="source-line-no">1287</span><span id="line-1287"> if (user == null) {</span> |
| <span class="source-line-no">1288</span><span id="line-1288"> return false;</span> |
| <span class="source-line-no">1289</span><span id="line-1289"> }</span> |
| <span class="source-line-no">1290</span><span id="line-1290"> final Procedure<TEnvironment> runningProc = procedures.get(procId);</span> |
| <span class="source-line-no">1291</span><span id="line-1291"> if (runningProc != null) {</span> |
| <span class="source-line-no">1292</span><span id="line-1292"> return runningProc.getOwner().equals(user.getShortName());</span> |
| <span class="source-line-no">1293</span><span id="line-1293"> }</span> |
| <span class="source-line-no">1294</span><span id="line-1294"></span> |
| <span class="source-line-no">1295</span><span id="line-1295"> final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);</span> |
| <span class="source-line-no">1296</span><span id="line-1296"> if (retainer != null) {</span> |
| <span class="source-line-no">1297</span><span id="line-1297"> return retainer.getProcedure().getOwner().equals(user.getShortName());</span> |
| <span class="source-line-no">1298</span><span id="line-1298"> }</span> |
| <span class="source-line-no">1299</span><span id="line-1299"></span> |
| <span class="source-line-no">1300</span><span id="line-1300"> // Procedure either does not exist or has already completed and got cleaned up.</span> |
| <span class="source-line-no">1301</span><span id="line-1301"> // At this time, we cannot check the owner of the procedure</span> |
| <span class="source-line-no">1302</span><span id="line-1302"> return false;</span> |
| <span class="source-line-no">1303</span><span id="line-1303"> }</span> |
| <span class="source-line-no">1304</span><span id="line-1304"></span> |
| <span class="source-line-no">1305</span><span id="line-1305"> /**</span> |
| <span class="source-line-no">1306</span><span id="line-1306"> * Should only be used when starting up, where the procedure workers have not been started.</span> |
| <span class="source-line-no">1307</span><span id="line-1307"> * <p/></span> |
| <span class="source-line-no">1308</span><span id="line-1308"> * If the procedure works has been started, the return values maybe changed when you are</span> |
| <span class="source-line-no">1309</span><span id="line-1309"> * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as</span> |
| <span class="source-line-no">1310</span><span id="line-1310"> * it will do a copy, and also include the finished procedures.</span> |
| <span class="source-line-no">1311</span><span id="line-1311"> */</span> |
| <span class="source-line-no">1312</span><span id="line-1312"> public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {</span> |
| <span class="source-line-no">1313</span><span id="line-1313"> return procedures.values();</span> |
| <span class="source-line-no">1314</span><span id="line-1314"> }</span> |
| <span class="source-line-no">1315</span><span id="line-1315"></span> |
| <span class="source-line-no">1316</span><span id="line-1316"> /**</span> |
| <span class="source-line-no">1317</span><span id="line-1317"> * Get procedures.</span> |
| <span class="source-line-no">1318</span><span id="line-1318"> * @return the procedures in a list</span> |
| <span class="source-line-no">1319</span><span id="line-1319"> */</span> |
| <span class="source-line-no">1320</span><span id="line-1320"> public List<Procedure<TEnvironment>> getProcedures() {</span> |
| <span class="source-line-no">1321</span><span id="line-1321"> List<Procedure<TEnvironment>> procedureList =</span> |
| <span class="source-line-no">1322</span><span id="line-1322"> new ArrayList<>(procedures.size() + completed.size());</span> |
| <span class="source-line-no">1323</span><span id="line-1323"> procedureList.addAll(procedures.values());</span> |
| <span class="source-line-no">1324</span><span id="line-1324"> // Note: The procedure could show up twice in the list with different state, as</span> |
| <span class="source-line-no">1325</span><span id="line-1325"> // it could complete after we walk through procedures list and insert into</span> |
| <span class="source-line-no">1326</span><span id="line-1326"> // procedureList - it is ok, as we will use the information in the Procedure</span> |
| <span class="source-line-no">1327</span><span id="line-1327"> // to figure it out; to prevent this would increase the complexity of the logic.</span> |
| <span class="source-line-no">1328</span><span id="line-1328"> completed.values().stream().map(CompletedProcedureRetainer::getProcedure)</span> |
| <span class="source-line-no">1329</span><span id="line-1329"> .forEach(procedureList::add);</span> |
| <span class="source-line-no">1330</span><span id="line-1330"> return procedureList;</span> |
| <span class="source-line-no">1331</span><span id="line-1331"> }</span> |
| <span class="source-line-no">1332</span><span id="line-1332"></span> |
| <span class="source-line-no">1333</span><span id="line-1333"> // ==========================================================================</span> |
| <span class="source-line-no">1334</span><span id="line-1334"> // Listeners helpers</span> |
| <span class="source-line-no">1335</span><span id="line-1335"> // ==========================================================================</span> |
| <span class="source-line-no">1336</span><span id="line-1336"> public void registerListener(ProcedureExecutorListener listener) {</span> |
| <span class="source-line-no">1337</span><span id="line-1337"> this.listeners.add(listener);</span> |
| <span class="source-line-no">1338</span><span id="line-1338"> }</span> |
| <span class="source-line-no">1339</span><span id="line-1339"></span> |
| <span class="source-line-no">1340</span><span id="line-1340"> public boolean unregisterListener(ProcedureExecutorListener listener) {</span> |
| <span class="source-line-no">1341</span><span id="line-1341"> return this.listeners.remove(listener);</span> |
| <span class="source-line-no">1342</span><span id="line-1342"> }</span> |
| <span class="source-line-no">1343</span><span id="line-1343"></span> |
| <span class="source-line-no">1344</span><span id="line-1344"> private void sendProcedureLoadedNotification(final long procId) {</span> |
| <span class="source-line-no">1345</span><span id="line-1345"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">1346</span><span id="line-1346"> for (ProcedureExecutorListener listener : this.listeners) {</span> |
| <span class="source-line-no">1347</span><span id="line-1347"> try {</span> |
| <span class="source-line-no">1348</span><span id="line-1348"> listener.procedureLoaded(procId);</span> |
| <span class="source-line-no">1349</span><span id="line-1349"> } catch (Throwable e) {</span> |
| <span class="source-line-no">1350</span><span id="line-1350"> LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);</span> |
| <span class="source-line-no">1351</span><span id="line-1351"> }</span> |
| <span class="source-line-no">1352</span><span id="line-1352"> }</span> |
| <span class="source-line-no">1353</span><span id="line-1353"> }</span> |
| <span class="source-line-no">1354</span><span id="line-1354"> }</span> |
| <span class="source-line-no">1355</span><span id="line-1355"></span> |
| <span class="source-line-no">1356</span><span id="line-1356"> private void sendProcedureAddedNotification(final long procId) {</span> |
| <span class="source-line-no">1357</span><span id="line-1357"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">1358</span><span id="line-1358"> for (ProcedureExecutorListener listener : this.listeners) {</span> |
| <span class="source-line-no">1359</span><span id="line-1359"> try {</span> |
| <span class="source-line-no">1360</span><span id="line-1360"> listener.procedureAdded(procId);</span> |
| <span class="source-line-no">1361</span><span id="line-1361"> } catch (Throwable e) {</span> |
| <span class="source-line-no">1362</span><span id="line-1362"> LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);</span> |
| <span class="source-line-no">1363</span><span id="line-1363"> }</span> |
| <span class="source-line-no">1364</span><span id="line-1364"> }</span> |
| <span class="source-line-no">1365</span><span id="line-1365"> }</span> |
| <span class="source-line-no">1366</span><span id="line-1366"> }</span> |
| <span class="source-line-no">1367</span><span id="line-1367"></span> |
| <span class="source-line-no">1368</span><span id="line-1368"> private void sendProcedureFinishedNotification(final long procId) {</span> |
| <span class="source-line-no">1369</span><span id="line-1369"> if (!this.listeners.isEmpty()) {</span> |
| <span class="source-line-no">1370</span><span id="line-1370"> for (ProcedureExecutorListener listener : this.listeners) {</span> |
| <span class="source-line-no">1371</span><span id="line-1371"> try {</span> |
| <span class="source-line-no">1372</span><span id="line-1372"> listener.procedureFinished(procId);</span> |
| <span class="source-line-no">1373</span><span id="line-1373"> } catch (Throwable e) {</span> |
| <span class="source-line-no">1374</span><span id="line-1374"> LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);</span> |
| <span class="source-line-no">1375</span><span id="line-1375"> }</span> |
| <span class="source-line-no">1376</span><span id="line-1376"> }</span> |
| <span class="source-line-no">1377</span><span id="line-1377"> }</span> |
| <span class="source-line-no">1378</span><span id="line-1378"> }</span> |
| <span class="source-line-no">1379</span><span id="line-1379"></span> |
| <span class="source-line-no">1380</span><span id="line-1380"> // ==========================================================================</span> |
| <span class="source-line-no">1381</span><span id="line-1381"> // Procedure IDs helpers</span> |
| <span class="source-line-no">1382</span><span id="line-1382"> // ==========================================================================</span> |
| <span class="source-line-no">1383</span><span id="line-1383"> private long nextProcId() {</span> |
| <span class="source-line-no">1384</span><span id="line-1384"> long procId = lastProcId.incrementAndGet();</span> |
| <span class="source-line-no">1385</span><span id="line-1385"> if (procId < 0) {</span> |
| <span class="source-line-no">1386</span><span id="line-1386"> while (!lastProcId.compareAndSet(procId, 0)) {</span> |
| <span class="source-line-no">1387</span><span id="line-1387"> procId = lastProcId.get();</span> |
| <span class="source-line-no">1388</span><span id="line-1388"> if (procId >= 0) {</span> |
| <span class="source-line-no">1389</span><span id="line-1389"> break;</span> |
| <span class="source-line-no">1390</span><span id="line-1390"> }</span> |
| <span class="source-line-no">1391</span><span id="line-1391"> }</span> |
| <span class="source-line-no">1392</span><span id="line-1392"> while (procedures.containsKey(procId)) {</span> |
| <span class="source-line-no">1393</span><span id="line-1393"> procId = lastProcId.incrementAndGet();</span> |
| <span class="source-line-no">1394</span><span id="line-1394"> }</span> |
| <span class="source-line-no">1395</span><span id="line-1395"> }</span> |
| <span class="source-line-no">1396</span><span id="line-1396"> assert procId >= 0 : "Invalid procId " + procId;</span> |
| <span class="source-line-no">1397</span><span id="line-1397"> return procId;</span> |
| <span class="source-line-no">1398</span><span id="line-1398"> }</span> |
| <span class="source-line-no">1399</span><span id="line-1399"></span> |
| <span class="source-line-no">1400</span><span id="line-1400"> protected long getLastProcId() {</span> |
| <span class="source-line-no">1401</span><span id="line-1401"> return lastProcId.get();</span> |
| <span class="source-line-no">1402</span><span id="line-1402"> }</span> |
| <span class="source-line-no">1403</span><span id="line-1403"></span> |
| <span class="source-line-no">1404</span><span id="line-1404"> public Set<Long> getActiveProcIds() {</span> |
| <span class="source-line-no">1405</span><span id="line-1405"> return procedures.keySet();</span> |
| <span class="source-line-no">1406</span><span id="line-1406"> }</span> |
| <span class="source-line-no">1407</span><span id="line-1407"></span> |
| <span class="source-line-no">1408</span><span id="line-1408"> Long getRootProcedureId(Procedure<TEnvironment> proc) {</span> |
| <span class="source-line-no">1409</span><span id="line-1409"> return Procedure.getRootProcedureId(procedures, proc);</span> |
| <span class="source-line-no">1410</span><span id="line-1410"> }</span> |
| <span class="source-line-no">1411</span><span id="line-1411"></span> |
| <span class="source-line-no">1412</span><span id="line-1412"> // ==========================================================================</span> |
| <span class="source-line-no">1413</span><span id="line-1413"> // Executions</span> |
| <span class="source-line-no">1414</span><span id="line-1414"> // ==========================================================================</span> |
| <span class="source-line-no">1415</span><span id="line-1415"> private void executeProcedure(Procedure<TEnvironment> proc) {</span> |
| <span class="source-line-no">1416</span><span id="line-1416"> if (proc.isFinished()) {</span> |
| <span class="source-line-no">1417</span><span id="line-1417"> LOG.debug("{} is already finished, skipping execution", proc);</span> |
| <span class="source-line-no">1418</span><span id="line-1418"> return;</span> |
| <span class="source-line-no">1419</span><span id="line-1419"> }</span> |
| <span class="source-line-no">1420</span><span id="line-1420"> final Long rootProcId = getRootProcedureId(proc);</span> |
| <span class="source-line-no">1421</span><span id="line-1421"> if (rootProcId == null) {</span> |
| <span class="source-line-no">1422</span><span id="line-1422"> // The 'proc' was ready to run but the root procedure was rolledback</span> |
| <span class="source-line-no">1423</span><span id="line-1423"> LOG.warn("Rollback because parent is done/rolledback proc=" + proc);</span> |
| <span class="source-line-no">1424</span><span id="line-1424"> executeRollback(proc);</span> |
| <span class="source-line-no">1425</span><span id="line-1425"> return;</span> |
| <span class="source-line-no">1426</span><span id="line-1426"> }</span> |
| <span class="source-line-no">1427</span><span id="line-1427"></span> |
| <span class="source-line-no">1428</span><span id="line-1428"> RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);</span> |
| <span class="source-line-no">1429</span><span id="line-1429"> if (procStack == null) {</span> |
| <span class="source-line-no">1430</span><span id="line-1430"> LOG.warn("RootProcedureState is null for " + proc.getProcId());</span> |
| <span class="source-line-no">1431</span><span id="line-1431"> return;</span> |
| <span class="source-line-no">1432</span><span id="line-1432"> }</span> |
| <span class="source-line-no">1433</span><span id="line-1433"> do {</span> |
| <span class="source-line-no">1434</span><span id="line-1434"> // Try to acquire the execution</span> |
| <span class="source-line-no">1435</span><span id="line-1435"> if (!procStack.acquire(proc)) {</span> |
| <span class="source-line-no">1436</span><span id="line-1436"> if (procStack.setRollback()) {</span> |
| <span class="source-line-no">1437</span><span id="line-1437"> // we have the 'rollback-lock' we can start rollingback</span> |
| <span class="source-line-no">1438</span><span id="line-1438"> switch (executeRollback(rootProcId, procStack)) {</span> |
| <span class="source-line-no">1439</span><span id="line-1439"> case LOCK_ACQUIRED:</span> |
| <span class="source-line-no">1440</span><span id="line-1440"> break;</span> |
| <span class="source-line-no">1441</span><span id="line-1441"> case LOCK_YIELD_WAIT:</span> |
| <span class="source-line-no">1442</span><span id="line-1442"> procStack.unsetRollback();</span> |
| <span class="source-line-no">1443</span><span id="line-1443"> scheduler.yield(proc);</span> |
| <span class="source-line-no">1444</span><span id="line-1444"> break;</span> |
| <span class="source-line-no">1445</span><span id="line-1445"> case LOCK_EVENT_WAIT:</span> |
| <span class="source-line-no">1446</span><span id="line-1446"> LOG.info("LOCK_EVENT_WAIT rollback..." + proc);</span> |
| <span class="source-line-no">1447</span><span id="line-1447"> procStack.unsetRollback();</span> |
| <span class="source-line-no">1448</span><span id="line-1448"> break;</span> |
| <span class="source-line-no">1449</span><span id="line-1449"> default:</span> |
| <span class="source-line-no">1450</span><span id="line-1450"> throw new UnsupportedOperationException();</span> |
| <span class="source-line-no">1451</span><span id="line-1451"> }</span> |
| <span class="source-line-no">1452</span><span id="line-1452"> } else {</span> |
| <span class="source-line-no">1453</span><span id="line-1453"> // if we can't rollback means that some child is still running.</span> |
| <span class="source-line-no">1454</span><span id="line-1454"> // the rollback will be executed after all the children are done.</span> |
| <span class="source-line-no">1455</span><span id="line-1455"> // If the procedure was never executed, remove and mark it as rolledback.</span> |
| <span class="source-line-no">1456</span><span id="line-1456"> if (!proc.wasExecuted()) {</span> |
| <span class="source-line-no">1457</span><span id="line-1457"> switch (executeRollback(proc)) {</span> |
| <span class="source-line-no">1458</span><span id="line-1458"> case LOCK_ACQUIRED:</span> |
| <span class="source-line-no">1459</span><span id="line-1459"> break;</span> |
| <span class="source-line-no">1460</span><span id="line-1460"> case LOCK_YIELD_WAIT:</span> |
| <span class="source-line-no">1461</span><span id="line-1461"> scheduler.yield(proc);</span> |
| <span class="source-line-no">1462</span><span id="line-1462"> break;</span> |
| <span class="source-line-no">1463</span><span id="line-1463"> case LOCK_EVENT_WAIT:</span> |
| <span class="source-line-no">1464</span><span id="line-1464"> LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);</span> |
| <span class="source-line-no">1465</span><span id="line-1465"> break;</span> |
| <span class="source-line-no">1466</span><span id="line-1466"> default:</span> |
| <span class="source-line-no">1467</span><span id="line-1467"> throw new UnsupportedOperationException();</span> |
| <span class="source-line-no">1468</span><span id="line-1468"> }</span> |
| <span class="source-line-no">1469</span><span id="line-1469"> }</span> |
| <span class="source-line-no">1470</span><span id="line-1470"> }</span> |
| <span class="source-line-no">1471</span><span id="line-1471"> break;</span> |
| <span class="source-line-no">1472</span><span id="line-1472"> }</span> |
| <span class="source-line-no">1473</span><span id="line-1473"></span> |
| <span class="source-line-no">1474</span><span id="line-1474"> // Execute the procedure</span> |
| <span class="source-line-no">1475</span><span id="line-1475"> assert proc.getState() == ProcedureState.RUNNABLE : proc;</span> |
| <span class="source-line-no">1476</span><span id="line-1476"> // Note that lock is NOT about concurrency but rather about ensuring</span> |
| <span class="source-line-no">1477</span><span id="line-1477"> // ownership of a procedure of an entity such as a region or table</span> |
| <span class="source-line-no">1478</span><span id="line-1478"> LockState lockState = acquireLock(proc);</span> |
| <span class="source-line-no">1479</span><span id="line-1479"> switch (lockState) {</span> |
| <span class="source-line-no">1480</span><span id="line-1480"> case LOCK_ACQUIRED:</span> |
| <span class="source-line-no">1481</span><span id="line-1481"> execProcedure(procStack, proc);</span> |
| <span class="source-line-no">1482</span><span id="line-1482"> break;</span> |
| <span class="source-line-no">1483</span><span id="line-1483"> case LOCK_YIELD_WAIT:</span> |
| <span class="source-line-no">1484</span><span id="line-1484"> LOG.info(lockState + " " + proc);</span> |
| <span class="source-line-no">1485</span><span id="line-1485"> scheduler.yield(proc);</span> |
| <span class="source-line-no">1486</span><span id="line-1486"> break;</span> |
| <span class="source-line-no">1487</span><span id="line-1487"> case LOCK_EVENT_WAIT:</span> |
| <span class="source-line-no">1488</span><span id="line-1488"> // Someone will wake us up when the lock is available</span> |
| <span class="source-line-no">1489</span><span id="line-1489"> LOG.debug(lockState + " " + proc);</span> |
| <span class="source-line-no">1490</span><span id="line-1490"> break;</span> |
| <span class="source-line-no">1491</span><span id="line-1491"> default:</span> |
| <span class="source-line-no">1492</span><span id="line-1492"> throw new UnsupportedOperationException();</span> |
| <span class="source-line-no">1493</span><span id="line-1493"> }</span> |
| <span class="source-line-no">1494</span><span id="line-1494"> procStack.release(proc);</span> |
| <span class="source-line-no">1495</span><span id="line-1495"></span> |
| <span class="source-line-no">1496</span><span id="line-1496"> if (proc.isSuccess()) {</span> |
| <span class="source-line-no">1497</span><span id="line-1497"> // update metrics on finishing the procedure</span> |
| <span class="source-line-no">1498</span><span id="line-1498"> proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);</span> |
| <span class="source-line-no">1499</span><span id="line-1499"> LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));</span> |
| <span class="source-line-no">1500</span><span id="line-1500"> // Finalize the procedure state</span> |
| <span class="source-line-no">1501</span><span id="line-1501"> if (proc.getProcId() == rootProcId) {</span> |
| <span class="source-line-no">1502</span><span id="line-1502"> procedureFinished(proc);</span> |
| <span class="source-line-no">1503</span><span id="line-1503"> } else {</span> |
| <span class="source-line-no">1504</span><span id="line-1504"> execCompletionCleanup(proc);</span> |
| <span class="source-line-no">1505</span><span id="line-1505"> }</span> |
| <span class="source-line-no">1506</span><span id="line-1506"> break;</span> |
| <span class="source-line-no">1507</span><span id="line-1507"> }</span> |
| <span class="source-line-no">1508</span><span id="line-1508"> } while (procStack.isFailed());</span> |
| <span class="source-line-no">1509</span><span id="line-1509"> }</span> |
| <span class="source-line-no">1510</span><span id="line-1510"></span> |
| <span class="source-line-no">1511</span><span id="line-1511"> private LockState acquireLock(Procedure<TEnvironment> proc) {</span><!--
  Review note (acquireLock): if proc.hasLock() is already true this returns
  LockState.LOCK_ACQUIRED without calling into the procedure; the in-line comment ties this
  to holdLock, i.e. the lock may still be held from an earlier step. Otherwise the result of
  proc.doAcquireLock(env, store) is returned unchanged. The caller visible in this part of
  the file (executeNormalRollback) treats any value other than LOCK_ACQUIRED as "lock not
  obtained" and hands it back to its own caller.
  (Annotation only: the rendered source above is generated from ProcedureExecutor.java.)
--> |
| <span class="source-line-no">1512</span><span id="line-1512"> TEnvironment env = getEnvironment();</span> |
| <span class="source-line-no">1513</span><span id="line-1513"> // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if</span> |
| <span class="source-line-no">1514</span><span id="line-1514"> // hasLock is true.</span> |
| <span class="source-line-no">1515</span><span id="line-1515"> if (proc.hasLock()) {</span> |
| <span class="source-line-no">1516</span><span id="line-1516"> return LockState.LOCK_ACQUIRED;</span> |
| <span class="source-line-no">1517</span><span id="line-1517"> }</span> |
| <span class="source-line-no">1518</span><span id="line-1518"> return proc.doAcquireLock(env, store);</span> |
| <span class="source-line-no">1519</span><span id="line-1519"> }</span> |
| <span class="source-line-no">1520</span><span id="line-1520"></span> |
| <span class="source-line-no">1521</span><span id="line-1521"> private void releaseLock(Procedure<TEnvironment> proc, boolean force) {</span><!--
  Review note (releaseLock): calls proc.doReleaseLock(env, store) only when force is true,
  when proc.holdLock(env) is false, or when proc.isFinished() is true. In the remaining case
  (holdLock true, not finished, not forced) the lock is intentionally left held, presumably
  so the procedure keeps it for its next step. Per the in-line comment, callers are expected
  to already hold the lock, so hasLock() is not re-checked. The callers visible in this part
  of the file (executeNormalRollback, yieldProcedure) pass force=false.
  (Annotation only: the rendered source above is generated from ProcedureExecutor.java.)
--> |
| <span class="source-line-no">1522</span><span id="line-1522"> TEnvironment env = getEnvironment();</span> |
| <span class="source-line-no">1523</span><span id="line-1523"> // For how the framework works, we know that we will always have the lock</span> |
| <span class="source-line-no">1524</span><span id="line-1524"> // when we call releaseLock(), so we can avoid calling proc.hasLock()</span> |
| <span class="source-line-no">1525</span><span id="line-1525"> if (force || !proc.holdLock(env) || proc.isFinished()) {</span> |
| <span class="source-line-no">1526</span><span id="line-1526"> proc.doReleaseLock(env, store);</span> |
| <span class="source-line-no">1527</span><span id="line-1527"> }</span> |
| <span class="source-line-no">1528</span><span id="line-1528"> }</span> |
| <span class="source-line-no">1529</span><span id="line-1529"></span> |
| <span class="source-line-no">1530</span><span id="line-1530"> // Returning null means we have already held the execution lock, so you do not need to get the</span> |
| <span class="source-line-no">1531</span><span id="line-1531"> // lock entry for releasing</span> |
| <span class="source-line-no">1532</span><span id="line-1532"> private IdLock.Entry getLockEntryForRollback(long procId) {</span><!--
  Review note (getLockEntryForRollback): when the current thread does not already hold the
  execution lock for procId, the entry obtained from procExecutionLock.getLockEntry(procId)
  is returned; the caller owns it and must release it with
  procExecutionLock.releaseLockEntry(entry), which every caller in this part of the file does
  in a finally block. When the current thread already holds it, null is returned and the
  caller must NOT release anything. An IOException from getLockEntry is rethrown wrapped in
  UncheckedIOException (the in-line comment says this only happens on interruption).
  (Annotation only: the rendered source above is generated from ProcedureExecutor.java.)
--> |
| <span class="source-line-no">1533</span><span id="line-1533"> // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need</span> |
| <span class="source-line-no">1534</span><span id="line-1534"> // this check, as the worker will hold the lock before executing a procedure. This is the only</span> |
| <span class="source-line-no">1535</span><span id="line-1535"> // place where we may hold two procedure execution locks, and there is a fence in the</span> |
| <span class="source-line-no">1536</span><span id="line-1536"> // RootProcedureState where we can make sure that only one worker can execute the rollback of</span> |
| <span class="source-line-no">1537</span><span id="line-1537"> // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to</span> |
| <span class="source-line-no">1538</span><span id="line-1538"> // prevent race between us and the force update thread.</span> |
| <span class="source-line-no">1539</span><span id="line-1539"> if (!procExecutionLock.isHeldByCurrentThread(procId)) {</span> |
| <span class="source-line-no">1540</span><span id="line-1540"> try {</span> |
| <span class="source-line-no">1541</span><span id="line-1541"> return procExecutionLock.getLockEntry(procId);</span> |
| <span class="source-line-no">1542</span><span id="line-1542"> } catch (IOException e) {</span> |
| <span class="source-line-no">1543</span><span id="line-1543"> // can only happen if interrupted, so not a big deal to propagate it</span> |
| <span class="source-line-no">1544</span><span id="line-1544"> throw new UncheckedIOException(e);</span> |
| <span class="source-line-no">1545</span><span id="line-1545"> }</span> |
| <span class="source-line-no">1546</span><span id="line-1546"> }</span> |
| <span class="source-line-no">1547</span><span id="line-1547"> return null;</span> |
| <span class="source-line-no">1548</span><span id="line-1548"> }</span> |
| <span class="source-line-no">1549</span><span id="line-1549"></span> |
| <span class="source-line-no">1550</span><span id="line-1550"> private void executeUnexpectedRollback(Procedure<TEnvironment> rootProc,</span><!--
  Review note (executeUnexpectedRollback): cleanup path used by
  executeRollback(long, RootProcedureState) when procStack.isRollbackSupported() is false.
  It never calls doRollback on any procedure. Subprocedures from procStack.getSubprocs()
  (when non-null) are drained from a PriorityQueue with a reversed proc id comparator, i.e.
  highest proc id first; ids no longer present in the procedures map are skipped as already
  rolled back. Each remaining subprocedure gets cleanupAfterRollbackOneStep followed by
  execCompletionCleanup while its execution lock entry is held. The root procedure then only
  gets cleanupAfterRollbackOneStep; finalizing the root is left to the caller.
  (Annotation only: the rendered source above is generated from ProcedureExecutor.java.)
--> |
| <span class="source-line-no">1551</span><span id="line-1551"> RootProcedureState<TEnvironment> procStack) {</span> |
| <span class="source-line-no">1552</span><span id="line-1552"> if (procStack.getSubprocs() != null) {</span> |
| <span class="source-line-no">1553</span><span id="line-1553"> // comparing proc id in reverse order, so we will delete later procedures first, otherwise we</span> |
| <span class="source-line-no">1554</span><span id="line-1554"> // may delete parent procedure first and if we fail in the middle of this operation, when</span> |
| <span class="source-line-no">1555</span><span id="line-1555"> // loading we will find some orphan procedures</span> |
| <span class="source-line-no">1556</span><span id="line-1556"> PriorityQueue<Procedure<TEnvironment>> pq =</span> |
| <span class="source-line-no">1557</span><span id="line-1557"> new PriorityQueue<>(procStack.getSubprocs().size(),</span> |
| <span class="source-line-no">1558</span><span id="line-1558"> Comparator.<Procedure<TEnvironment>> comparingLong(Procedure::getProcId).reversed());</span> |
| <span class="source-line-no">1559</span><span id="line-1559"> pq.addAll(procStack.getSubprocs());</span> |
| <span class="source-line-no">1560</span><span id="line-1560"> for (;;) {</span> |
| <span class="source-line-no">1561</span><span id="line-1561"> Procedure<TEnvironment> subproc = pq.poll();</span> |
| <span class="source-line-no">1562</span><span id="line-1562"> if (subproc == null) {</span> |
| <span class="source-line-no">1563</span><span id="line-1563"> break;</span> |
| <span class="source-line-no">1564</span><span id="line-1564"> }</span> |
| <span class="source-line-no">1565</span><span id="line-1565"> if (!procedures.containsKey(subproc.getProcId())) {</span> |
| <span class="source-line-no">1566</span><span id="line-1566"> // this means it has already been rolledback</span> |
| <span class="source-line-no">1567</span><span id="line-1567"> continue;</span> |
| <span class="source-line-no">1568</span><span id="line-1568"> }</span> |
| <span class="source-line-no">1569</span><span id="line-1569"> IdLock.Entry lockEntry = getLockEntryForRollback(subproc.getProcId());</span> |
| <span class="source-line-no">1570</span><span id="line-1570"> try {</span> |
| <span class="source-line-no">1571</span><span id="line-1571"> cleanupAfterRollbackOneStep(subproc);</span> |
| <span class="source-line-no">1572</span><span id="line-1572"> execCompletionCleanup(subproc);</span> |
| <span class="source-line-no">1573</span><span id="line-1573"> } finally {</span> |
| <span class="source-line-no">1574</span><span id="line-1574"> if (lockEntry != null) {</span> |
| <span class="source-line-no">1575</span><span id="line-1575"> procExecutionLock.releaseLockEntry(lockEntry);</span> |
| <span class="source-line-no">1576</span><span id="line-1576"> }</span> |
| <span class="source-line-no">1577</span><span id="line-1577"> }</span> |
| <span class="source-line-no">1578</span><span id="line-1578"> }</span> |
| <span class="source-line-no">1579</span><span id="line-1579"> }</span> |
| <span class="source-line-no">1580</span><span id="line-1580"> IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId());</span> |
| <span class="source-line-no">1581</span><span id="line-1581"> try {</span> |
| <span class="source-line-no">1582</span><span id="line-1582"> cleanupAfterRollbackOneStep(rootProc);</span> |
| <span class="source-line-no">1583</span><span id="line-1583"> } finally {</span> |
| <span class="source-line-no">1584</span><span id="line-1584"> if (lockEntry != null) {</span> |
| <span class="source-line-no">1585</span><span id="line-1585"> procExecutionLock.releaseLockEntry(lockEntry);</span> |
| <span class="source-line-no">1586</span><span id="line-1586"> }</span> |
| <span class="source-line-no">1587</span><span id="line-1587"> }</span> |
| <span class="source-line-no">1588</span><span id="line-1588"> }</span> |
| <span class="source-line-no">1589</span><span id="line-1589"></span> |
| <span class="source-line-no">1590</span><span id="line-1590"> private LockState executeNormalRollback(Procedure<TEnvironment> rootProc,</span><!--
  Review note (executeNormalRollback): walks procStack.getSubproceduresStack() from the last
  element towards index 0, holding the execution lock entry of each element while it is
  processed. Elements that already succeeded are removed and cleaned up without calling
  doRollback. For the others: if acquireLock does not return LOCK_ACQUIRED, that state is
  returned and the element stays on the stack (presumably so the rollback resumes from it
  later); otherwise one step is rolled back via executeRollback(proc) and the lock is
  released with force=false. The element is removed only when that step returned
  LOCK_ACQUIRED and both the executor and the store are still running. After removal it
  returns LOCK_YIELD_WAIT if an unfinished procedure asks to yield; otherwise a non-root
  procedure gets execCompletionCleanup. Returns LOCK_ACQUIRED once the whole stack is done.
  (Annotation only: the rendered source above is generated from ProcedureExecutor.java.)
--> |
| <span class="source-line-no">1591</span><span id="line-1591"> RootProcedureState<TEnvironment> procStack) {</span> |
| <span class="source-line-no">1592</span><span id="line-1592"> List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();</span> |
| <span class="source-line-no">1593</span><span id="line-1593"> assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;</span> |
| <span class="source-line-no">1594</span><span id="line-1594"></span> |
| <span class="source-line-no">1595</span><span id="line-1595"> int stackTail = subprocStack.size();</span> |
| <span class="source-line-no">1596</span><span id="line-1596"> while (stackTail-- > 0) {</span> |
| <span class="source-line-no">1597</span><span id="line-1597"> Procedure<TEnvironment> proc = subprocStack.get(stackTail);</span> |
| <span class="source-line-no">1598</span><span id="line-1598"> IdLock.Entry lockEntry = getLockEntryForRollback(proc.getProcId());</span> |
| <span class="source-line-no">1599</span><span id="line-1599"> try {</span> |
| <span class="source-line-no">1600</span><span id="line-1600"> // For the sub procedures which are successfully finished, we do not rollback them.</span> |
| <span class="source-line-no">1601</span><span id="line-1601"> // Typically, if we want to rollback a procedure, we first need to rollback it, and then</span> |
| <span class="source-line-no">1602</span><span id="line-1602"> // recursively rollback its ancestors. The state changes which are done by sub procedures</span> |
| <span class="source-line-no">1603</span><span id="line-1603"> // should be handled by parent procedures when rolling back. For example, when rolling back</span> |
| <span class="source-line-no">1604</span><span id="line-1604"> // a MergeTableProcedure, we will schedule new procedures to bring the offline regions</span> |
| <span class="source-line-no">1605</span><span id="line-1605"> // online, instead of rolling back the original procedures which offlined the regions(in</span> |
| <span class="source-line-no">1606</span><span id="line-1606"> // fact these procedures can not be rolled back...).</span> |
| <span class="source-line-no">1607</span><span id="line-1607"> if (proc.isSuccess()) {</span> |
| <span class="source-line-no">1608</span><span id="line-1608"> // Just do the cleanup work, without actually executing the rollback</span> |
| <span class="source-line-no">1609</span><span id="line-1609"> subprocStack.remove(stackTail);</span> |
| <span class="source-line-no">1610</span><span id="line-1610"> cleanupAfterRollbackOneStep(proc);</span> |
| <span class="source-line-no">1611</span><span id="line-1611"> continue;</span> |
| <span class="source-line-no">1612</span><span id="line-1612"> }</span> |
| <span class="source-line-no">1613</span><span id="line-1613"> LockState lockState = acquireLock(proc);</span> |
| <span class="source-line-no">1614</span><span id="line-1614"> if (lockState != LockState.LOCK_ACQUIRED) {</span> |
| <span class="source-line-no">1615</span><span id="line-1615"> // can't take a lock on the procedure, add the root-proc back on the</span> |
| <span class="source-line-no">1616</span><span id="line-1616"> // queue waiting for the lock availability</span> |
| <span class="source-line-no">1617</span><span id="line-1617"> return lockState;</span> |
| <span class="source-line-no">1618</span><span id="line-1618"> }</span> |
| <span class="source-line-no">1619</span><span id="line-1619"></span> |
| <span class="source-line-no">1620</span><span id="line-1620"> lockState = executeRollback(proc);</span> |
| <span class="source-line-no">1621</span><span id="line-1621"> releaseLock(proc, false);</span> |
| <span class="source-line-no">1622</span><span id="line-1622"> boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;</span> |
| <span class="source-line-no">1623</span><span id="line-1623"> abortRollback |= !isRunning() || !store.isRunning();</span> |
| <span class="source-line-no">1624</span><span id="line-1624"></span> |
| <span class="source-line-no">1625</span><span id="line-1625"> // allows to kill the executor before something is stored to the wal.</span> |
| <span class="source-line-no">1626</span><span id="line-1626"> // useful to test the procedure recovery.</span> |
| <span class="source-line-no">1627</span><span id="line-1627"> if (abortRollback) {</span> |
| <span class="source-line-no">1628</span><span id="line-1628"> return lockState;</span><!--
  NOTE(review): when abortRollback is true only because isRunning() or store.isRunning() is
  false, lockState is still LOCK_ACQUIRED here. executeRollback(long, RootProcedureState)
  treats LOCK_ACQUIRED as success and goes on to log the root as rolled back and call
  procedureFinished(rootProc), although this stack was not fully unwound. Confirm whether
  this path should return a non-LOCK_ACQUIRED state instead.
--> |
| <span class="source-line-no">1629</span><span id="line-1629"> }</span> |
| <span class="source-line-no">1630</span><span id="line-1630"></span> |
| <span class="source-line-no">1631</span><span id="line-1631"> subprocStack.remove(stackTail);</span> |
| <span class="source-line-no">1632</span><span id="line-1632"></span> |
| <span class="source-line-no">1633</span><span id="line-1633"> // if the procedure is kind enough to pass the slot to someone else, yield</span> |
| <span class="source-line-no">1634</span><span id="line-1634"> // if the proc is already finished, do not yield</span> |
| <span class="source-line-no">1635</span><span id="line-1635"> if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {</span> |
| <span class="source-line-no">1636</span><span id="line-1636"> return LockState.LOCK_YIELD_WAIT;</span> |
| <span class="source-line-no">1637</span><span id="line-1637"> }</span> |
| <span class="source-line-no">1638</span><span id="line-1638"></span> |
| <span class="source-line-no">1639</span><span id="line-1639"> if (proc != rootProc) {</span> |
| <span class="source-line-no">1640</span><span id="line-1640"> execCompletionCleanup(proc);</span> |
| <span class="source-line-no">1641</span><span id="line-1641"> }</span> |
| <span class="source-line-no">1642</span><span id="line-1642"> } finally {</span> |
| <span class="source-line-no">1643</span><span id="line-1643"> if (lockEntry != null) {</span> |
| <span class="source-line-no">1644</span><span id="line-1644"> procExecutionLock.releaseLockEntry(lockEntry);</span> |
| <span class="source-line-no">1645</span><span id="line-1645"> }</span> |
| <span class="source-line-no">1646</span><span id="line-1646"> }</span> |
| <span class="source-line-no">1647</span><span id="line-1647"> }</span> |
| <span class="source-line-no">1648</span><span id="line-1648"> return LockState.LOCK_ACQUIRED;</span> |
| <span class="source-line-no">1649</span><span id="line-1649"> }</span> |
| <span class="source-line-no">1650</span><span id="line-1650"></span> |
| <span class="source-line-no">1651</span><span id="line-1651"> /**</span> |
| <span class="source-line-no">1652</span><span id="line-1652"> * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the</span> |
| <span class="source-line-no">1653</span><span id="line-1653"> * root-procedure will be visible as finished to user, and the result will be the fatal exception.</span> |
| <span class="source-line-no">1654</span><span id="line-1654"> */</span> |
| <span class="source-line-no">1655</span><span id="line-1655"> private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {</span><!--
  Review note (executeRollback(long, RootProcedureState)): looks the root procedure up in the
  procedures map without a null check, so rootProcId is assumed to still be registered.
  If the root carries no exception yet, procStack.getException() is copied onto it with
  setFailure and persisted with store.update. Then either executeNormalRollback runs (and any
  result other than LOCK_ACQUIRED is returned immediately, leaving the root unfinished), or,
  when procStack.isRollbackSupported() is false, a FATAL error is logged and
  executeUnexpectedRollback performs cleanup only. Finally, while holding the root's
  execution lock entry, the root is logged as rolled back and finalized with
  procedureFinished; LOCK_ACQUIRED is returned.
  (Annotation only: the rendered source above is generated from ProcedureExecutor.java.)
--> |
| <span class="source-line-no">1656</span><span id="line-1656"> Procedure<TEnvironment> rootProc = procedures.get(rootProcId);</span> |
| <span class="source-line-no">1657</span><span id="line-1657"> RemoteProcedureException exception = rootProc.getException();</span> |
| <span class="source-line-no">1658</span><span id="line-1658"> // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are</span> |
| <span class="source-line-no">1659</span><span id="line-1659"> // rolling back because the subprocedure does. Clarify.</span> |
| <span class="source-line-no">1660</span><span id="line-1660"> if (exception == null) {</span> |
| <span class="source-line-no">1661</span><span id="line-1661"> exception = procStack.getException();</span> |
| <span class="source-line-no">1662</span><span id="line-1662"> rootProc.setFailure(exception);</span> |
| <span class="source-line-no">1663</span><span id="line-1663"> store.update(rootProc);</span> |
| <span class="source-line-no">1664</span><span id="line-1664"> }</span> |
| <span class="source-line-no">1665</span><span id="line-1665"></span> |
| <span class="source-line-no">1666</span><span id="line-1666"> if (procStack.isRollbackSupported()) {</span> |
| <span class="source-line-no">1667</span><span id="line-1667"> LockState lockState = executeNormalRollback(rootProc, procStack);</span> |
| <span class="source-line-no">1668</span><span id="line-1668"> if (lockState != LockState.LOCK_ACQUIRED) {</span> |
| <span class="source-line-no">1669</span><span id="line-1669"> return lockState;</span> |
| <span class="source-line-no">1670</span><span id="line-1670"> }</span> |
| <span class="source-line-no">1671</span><span id="line-1671"> } else {</span> |
| <span class="source-line-no">1672</span><span id="line-1672"> // the procedure does not support rollback, so typically we should not reach here, this</span> |
| <span class="source-line-no">1673</span><span id="line-1673"> // usually means there are code bugs, let's just wait all the subprocedures to finish and then</span> |
| <span class="source-line-no">1674</span><span id="line-1674"> // mark the root procedure as failure.</span> |
| <span class="source-line-no">1675</span><span id="line-1675"> LOG.error(HBaseMarkers.FATAL,</span> |
| <span class="source-line-no">1676</span><span id="line-1676"> "Root Procedure {} does not support rollback but the execution failed"</span> |
| <span class="source-line-no">1677</span><span id="line-1677"> + " and try to rollback, code bug?",</span> |
| <span class="source-line-no">1678</span><span id="line-1678"> rootProc, exception);</span> |
| <span class="source-line-no">1679</span><span id="line-1679"> executeUnexpectedRollback(rootProc, procStack);</span> |
| <span class="source-line-no">1680</span><span id="line-1680"> }</span> |
| <span class="source-line-no">1681</span><span id="line-1681"></span> |
| <span class="source-line-no">1682</span><span id="line-1682"> IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId());</span> |
| <span class="source-line-no">1683</span><span id="line-1683"> try {</span> |
| <span class="source-line-no">1684</span><span id="line-1684"> // Finalize the procedure state</span> |
| <span class="source-line-no">1685</span><span id="line-1685"> LOG.info("Rolled back {} exec-time={}", rootProc,</span> |
| <span class="source-line-no">1686</span><span id="line-1686"> StringUtils.humanTimeDiff(rootProc.elapsedTime()));</span> |
| <span class="source-line-no">1687</span><span id="line-1687"> procedureFinished(rootProc);</span> |
| <span class="source-line-no">1688</span><span id="line-1688"> } finally {</span> |
| <span class="source-line-no">1689</span><span id="line-1689"> if (lockEntry != null) {</span> |
| <span class="source-line-no">1690</span><span id="line-1690"> procExecutionLock.releaseLockEntry(lockEntry);</span> |
| <span class="source-line-no">1691</span><span id="line-1691"> }</span> |
| <span class="source-line-no">1692</span><span id="line-1692"> }</span> |
| <span class="source-line-no">1693</span><span id="line-1693"></span> |
| <span class="source-line-no">1694</span><span id="line-1694"> return LockState.LOCK_ACQUIRED;</span> |
| <span class="source-line-no">1695</span><span id="line-1695"> }</span> |
| <span class="source-line-no">1696</span><span id="line-1696"></span> |
| <span class="source-line-no">1697</span><span id="line-1697"> private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {</span><!--
  Review note (cleanupAfterRollbackOneStep): persists the outcome of one rollback step.
  When the testing hook asks for it, kill() is invoked first (test only).
  If proc.removeStackIndex() returns true (presumably: no more stack indexes left to undo;
  confirm in Procedure), a procedure that is not successful is set to ROLLEDBACK, finish
  metrics are recorded, and then: a procedure with a parent is deleted from the store and
  removed from the procedures map; a root procedure is passed to store.delete(proc, ids)
  with the child ids from its rollbackStack entry, or to store.update(proc) when there are
  none. rollbackStack.get(...) is not null-checked, so the root is assumed to have an entry.
  When removeStackIndex() returns false, the procedure is only updated in the store.
  (Annotation only: the rendered source above is generated from ProcedureExecutor.java.)
--> |
| <span class="source-line-no">1698</span><span id="line-1698"> if (testing != null && testing.shouldKillBeforeStoreUpdateInRollback()) {</span> |
| <span class="source-line-no">1699</span><span id="line-1699"> kill("TESTING: Kill BEFORE store update in rollback: " + proc);</span> |
| <span class="source-line-no">1700</span><span id="line-1700"> }</span> |
| <span class="source-line-no">1701</span><span id="line-1701"> if (proc.removeStackIndex()) {</span> |
| <span class="source-line-no">1702</span><span id="line-1702"> if (!proc.isSuccess()) {</span> |
| <span class="source-line-no">1703</span><span id="line-1703"> proc.setState(ProcedureState.ROLLEDBACK);</span> |
| <span class="source-line-no">1704</span><span id="line-1704"> }</span> |
| <span class="source-line-no">1705</span><span id="line-1705"></span> |
| <span class="source-line-no">1706</span><span id="line-1706"> // update metrics on finishing the procedure (fail)</span> |
| <span class="source-line-no">1707</span><span id="line-1707"> proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);</span><!--
  NOTE(review): success=false is passed unconditionally, including when proc.isSuccess() is
  true (executeNormalRollback routes already successful subprocedures through here).
  Confirm that counting those as failures in the metrics is intended.
--> |
| <span class="source-line-no">1708</span><span id="line-1708"></span> |
| <span class="source-line-no">1709</span><span id="line-1709"> if (proc.hasParent()) {</span> |
| <span class="source-line-no">1710</span><span id="line-1710"> store.delete(proc.getProcId());</span> |
| <span class="source-line-no">1711</span><span id="line-1711"> procedures.remove(proc.getProcId());</span> |
| <span class="source-line-no">1712</span><span id="line-1712"> } else {</span> |
| <span class="source-line-no">1713</span><span id="line-1713"> final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();</span> |
| <span class="source-line-no">1714</span><span id="line-1714"> if (childProcIds != null) {</span> |
| <span class="source-line-no">1715</span><span id="line-1715"> store.delete(proc, childProcIds);</span> |
| <span class="source-line-no">1716</span><span id="line-1716"> } else {</span> |
| <span class="source-line-no">1717</span><span id="line-1717"> store.update(proc);</span> |
| <span class="source-line-no">1718</span><span id="line-1718"> }</span> |
| <span class="source-line-no">1719</span><span id="line-1719"> }</span> |
| <span class="source-line-no">1720</span><span id="line-1720"> } else {</span> |
| <span class="source-line-no">1721</span><span id="line-1721"> store.update(proc);</span> |
| <span class="source-line-no">1722</span><span id="line-1722"> }</span> |
| <span class="source-line-no">1723</span><span id="line-1723"> }</span> |
| <span class="source-line-no">1724</span><span id="line-1724"></span> |
| <span class="source-line-no">1725</span><span id="line-1725"> /**</span> |
| <span class="source-line-no">1726</span><span id="line-1726"> * Execute the rollback of the procedure step. It updates the store with the new state (stack</span> |
| <span class="source-line-no">1727</span><span id="line-1727"> * index) or will remove completly the procedure in case it is a child.</span> |
| <span class="source-line-no">1728</span><span id="line-1728"> */</span> |
| <span class="source-line-no">1729</span><span id="line-1729"> private LockState executeRollback(Procedure<TEnvironment> proc) {</span><!--
  Review note (executeRollback(Procedure)): runs one step of proc.doRollback.
  IOException: logged at debug level and LOCK_YIELD_WAIT is returned without touching the
  store, so the caller keeps the step for a later attempt. InterruptedException: passed to
  handleInterruptedException, then LOCK_YIELD_WAIT. Only when doRollback returns normally
  or throws some other Throwable does cleanupAfterRollbackOneStep run and LOCK_ACQUIRED get
  returned. The javadoc above misspells "completely" ("completly"); fix it in
  ProcedureExecutor.java rather than in this generated page.
--> |
| <span class="source-line-no">1730</span><span id="line-1730"> try {</span> |
| <span class="source-line-no">1731</span><span id="line-1731"> proc.doRollback(getEnvironment());</span> |
| <span class="source-line-no">1732</span><span id="line-1732"> } catch (IOException e) {</span> |
| <span class="source-line-no">1733</span><span id="line-1733"> LOG.debug("Roll back attempt failed for {}", proc, e);</span> |
| <span class="source-line-no">1734</span><span id="line-1734"> return LockState.LOCK_YIELD_WAIT;</span> |
| <span class="source-line-no">1735</span><span id="line-1735"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">1736</span><span id="line-1736"> handleInterruptedException(proc, e);</span> |
| <span class="source-line-no">1737</span><span id="line-1737"> return LockState.LOCK_YIELD_WAIT;</span> |
| <span class="source-line-no">1738</span><span id="line-1738"> } catch (Throwable e) {</span> |
| <span class="source-line-no">1739</span><span id="line-1739"> // Catch NullPointerExceptions or similar errors...</span> |
| <span class="source-line-no">1740</span><span id="line-1740"> LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);</span><!--
  Review note: no return in this catch, so after logging, execution falls through to
  cleanupAfterRollbackOneStep and the step is treated as rolled back.
--> |
| <span class="source-line-no">1741</span><span id="line-1741"> }</span> |
| <span class="source-line-no">1742</span><span id="line-1742"></span> |
| <span class="source-line-no">1743</span><span id="line-1743"> cleanupAfterRollbackOneStep(proc);</span> |
| <span class="source-line-no">1744</span><span id="line-1744"></span> |
| <span class="source-line-no">1745</span><span id="line-1745"> return LockState.LOCK_ACQUIRED;</span> |
| <span class="source-line-no">1746</span><span id="line-1746"> }</span> |
| <span class="source-line-no">1747</span><span id="line-1747"></span> |
| <span class="source-line-no">1748</span><span id="line-1748"> private void yieldProcedure(Procedure<TEnvironment> proc) {</span><!--
  Review note (yieldProcedure): first calls releaseLock(proc, false), which keeps the lock
  when the procedure holds it across steps and is not finished, then hands the procedure
  back to the scheduler with scheduler.yield(proc).
  (Annotation only: the rendered source above is generated from ProcedureExecutor.java.)
--> |
| <span class="source-line-no">1749</span><span id="line-1749"> releaseLock(proc, false);</span> |
| <span class="source-line-no">1750</span><span id="line-1750"> scheduler.yield(proc);</span> |
| <span class="source-line-no">1751</span><span id="line-1751"> }</span> |
| <span class="source-line-no">1752</span><span id="line-1752"></span> |
| <span class="source-line-no">1753</span><span id="line-1753"> /**</span> |
| <span class="source-line-no">1754</span><span id="line-1754"> * Executes <code>procedure</code></span> |
| <span class="source-line-no">1755</span><span id="line-1755"> * <ul></span> |
| <span class="source-line-no">1756</span><span id="line-1756"> * <li>Calls the doExecute() of the procedure</span> |
| <span class="source-line-no">1757</span><span id="line-1757"> * <li>If the procedure execution didn't fail (i.e. valid user input)</span> |
| <span class="source-line-no">1758</span><span id="line-1758"> * <ul></span> |
| <span class="source-line-no">1759</span><span id="line-1759"> * <li>...and returned subprocedures</span> |
| <span class="source-line-no">1760</span><span id="line-1760"> * <ul></span> |
| <span class="source-line-no">1761</span><span id="line-1761"> * <li>The subprocedures are initialized.</span> |
| <span class="source-line-no">1762</span><span id="line-1762"> * <li>The subprocedures are added to the store</span> |
| <span class="source-line-no">1763</span><span id="line-1763"> * <li>The subprocedures are added to the runnable queue</span> |
| <span class="source-line-no">1764</span><span id="line-1764"> * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete</span> |
| <span class="source-line-no">1765</span><span id="line-1765"> * </ul></span> |
| <span class="source-line-no">1766</span><span id="line-1766"> * </li></span> |
| <span class="source-line-no">1767</span><span id="line-1767"> * <li>...if there are no subprocedure</span> |
| <span class="source-line-no">1768</span><span id="line-1768"> * <ul></span> |
| <span class="source-line-no">1769</span><span id="line-1769"> * <li>the procedure completed successfully</span> |
| <span class="source-line-no">1770</span><span id="line-1770"> * <li>if there is a parent (WAITING)</span> |
| <span class="source-line-no">1771</span><span id="line-1771"> * <li>the parent state will be set to RUNNABLE</span> |
| <span class="source-line-no">1772</span><span id="line-1772"> * </ul></span> |
| <span class="source-line-no">1773</span><span id="line-1773"> * </li></span> |
| <span class="source-line-no">1774</span><span id="line-1774"> * </ul></span> |
| <span class="source-line-no">1775</span><span id="line-1775"> * </li></span> |
| <span class="source-line-no">1776</span><span id="line-1776"> * <li>In case of failure</span> |
| <span class="source-line-no">1777</span><span id="line-1777"> * <ul></span> |
| <span class="source-line-no">1778</span><span id="line-1778"> * <li>The store is updated with the new state</li></span> |
| <span class="source-line-no">1779</span><span id="line-1779"> * <li>The executor (caller of this method) will start the rollback of the procedure</li></span> |
| <span class="source-line-no">1780</span><span id="line-1780"> * </ul></span> |
| <span class="source-line-no">1781</span><span id="line-1781"> * </li></span> |
| <span class="source-line-no">1782</span><span id="line-1782"> * </ul></span> |
| <span class="source-line-no">1783</span><span id="line-1783"> */</span> |
| <span class="source-line-no">1784</span><span id="line-1784"> private void execProcedure(RootProcedureState<TEnvironment> procStack,</span> |
| <span class="source-line-no">1785</span><span id="line-1785"> Procedure<TEnvironment> procedure) {</span> |
| <span class="source-line-no">1786</span><span id="line-1786"> Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,</span> |
| <span class="source-line-no">1787</span><span id="line-1787"> "NOT RUNNABLE! " + procedure.toString());</span> |
| <span class="source-line-no">1788</span><span id="line-1788"></span> |
| <span class="source-line-no">1789</span><span id="line-1789"> // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.</span> |
| <span class="source-line-no">1790</span><span id="line-1790"> // The exception is caught below and then we hurry to the exit without disturbing state. The</span> |
| <span class="source-line-no">1791</span><span id="line-1791"> // idea is that the processing of this procedure will be unsuspended later by an external event</span> |
| <span class="source-line-no">1792</span><span id="line-1792"> // such the report of a region open.</span> |
| <span class="source-line-no">1793</span><span id="line-1793"> boolean suspended = false;</span> |
| <span class="source-line-no">1794</span><span id="line-1794"></span> |
| <span class="source-line-no">1795</span><span id="line-1795"> // Whether to 're-' -execute; run through the loop again.</span> |
| <span class="source-line-no">1796</span><span id="line-1796"> boolean reExecute = false;</span> |
| <span class="source-line-no">1797</span><span id="line-1797"></span> |
| <span class="source-line-no">1798</span><span id="line-1798"> Procedure<TEnvironment>[] subprocs = null;</span> |
| <span class="source-line-no">1799</span><span id="line-1799"> do {</span> |
| <span class="source-line-no">1800</span><span id="line-1800"> reExecute = false;</span> |
| <span class="source-line-no">1801</span><span id="line-1801"> procedure.resetPersistence();</span> |
| <span class="source-line-no">1802</span><span id="line-1802"> try {</span> |
| <span class="source-line-no">1803</span><span id="line-1803"> subprocs = procedure.doExecute(getEnvironment());</span> |
| <span class="source-line-no">1804</span><span id="line-1804"> if (subprocs != null && subprocs.length == 0) {</span> |
| <span class="source-line-no">1805</span><span id="line-1805"> subprocs = null;</span> |
| <span class="source-line-no">1806</span><span id="line-1806"> }</span> |
| <span class="source-line-no">1807</span><span id="line-1807"> } catch (ProcedureSuspendedException e) {</span> |
| <span class="source-line-no">1808</span><span id="line-1808"> LOG.trace("Suspend {}", procedure);</span> |
| <span class="source-line-no">1809</span><span id="line-1809"> suspended = true;</span> |
| <span class="source-line-no">1810</span><span id="line-1810"> } catch (ProcedureYieldException e) {</span> |
| <span class="source-line-no">1811</span><span id="line-1811"> LOG.trace("Yield {}", procedure, e);</span> |
| <span class="source-line-no">1812</span><span id="line-1812"> yieldProcedure(procedure);</span> |
| <span class="source-line-no">1813</span><span id="line-1813"> return;</span> |
| <span class="source-line-no">1814</span><span id="line-1814"> } catch (InterruptedException e) {</span> |
| <span class="source-line-no">1815</span><span id="line-1815"> LOG.trace("Yield interrupt {}", procedure, e);</span> |
| <span class="source-line-no">1816</span><span id="line-1816"> handleInterruptedException(procedure, e);</span> |
| <span class="source-line-no">1817</span><span id="line-1817"> yieldProcedure(procedure);</span> |
| <span class="source-line-no">1818</span><span id="line-1818"> return;</span> |
| <span class="source-line-no">1819</span><span id="line-1819"> } catch (Throwable e) {</span> |
| <span class="source-line-no">1820</span><span id="line-1820"> // Catch NullPointerExceptions or similar errors...</span> |
| <span class="source-line-no">1821</span><span id="line-1821"> String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;</span> |
| <span class="source-line-no">1822</span><span id="line-1822"> LOG.error(msg, e);</span> |
| <span class="source-line-no">1823</span><span id="line-1823"> procedure.setFailure(new RemoteProcedureException(msg, e));</span> |
| <span class="source-line-no">1824</span><span id="line-1824"> }</span> |
| <span class="source-line-no">1825</span><span id="line-1825"></span> |
| <span class="source-line-no">1826</span><span id="line-1826"> if (!procedure.isFailed()) {</span> |
| <span class="source-line-no">1827</span><span id="line-1827"> if (subprocs != null) {</span> |
| <span class="source-line-no">1828</span><span id="line-1828"> if (subprocs.length == 1 && subprocs[0] == procedure) {</span> |
| <span class="source-line-no">1829</span><span id="line-1829"> // Procedure returned itself. Quick-shortcut for a state machine-like procedure;</span> |
| <span class="source-line-no">1830</span><span id="line-1830"> // i.e. we go around this loop again rather than go back out on the scheduler queue.</span> |
| <span class="source-line-no">1831</span><span id="line-1831"> subprocs = null;</span> |
| <span class="source-line-no">1832</span><span id="line-1832"> reExecute = true;</span> |
| <span class="source-line-no">1833</span><span id="line-1833"> LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());</span> |
| <span class="source-line-no">1834</span><span id="line-1834"> } else {</span> |
| <span class="source-line-no">1835</span><span id="line-1835"> // Yield the current procedure, and make the subprocedure runnable</span> |
| <span class="source-line-no">1836</span><span id="line-1836"> // subprocs may come back 'null'.</span> |
| <span class="source-line-no">1837</span><span id="line-1837"> subprocs = initializeChildren(procStack, procedure, subprocs);</span> |
| <span class="source-line-no">1838</span><span id="line-1838"> LOG.info("Initialized subprocedures=" + (subprocs == null</span> |
| <span class="source-line-no">1839</span><span id="line-1839"> ? null</span> |
| <span class="source-line-no">1840</span><span id="line-1840"> : Stream.of(subprocs).map(e -> "{" + e.toString() + "}").collect(Collectors.toList())</span> |
| <span class="source-line-no">1841</span><span id="line-1841"> .toString()));</span> |
| <span class="source-line-no">1842</span><span id="line-1842"> }</span> |
| <span class="source-line-no">1843</span><span id="line-1843"> } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {</span> |
| <span class="source-line-no">1844</span><span id="line-1844"> LOG.trace("Added to timeoutExecutor {}", procedure);</span> |
| <span class="source-line-no">1845</span><span id="line-1845"> timeoutExecutor.add(procedure);</span> |
| <span class="source-line-no">1846</span><span id="line-1846"> } else if (!suspended) {</span> |
| <span class="source-line-no">1847</span><span id="line-1847"> // No subtask, so we are done</span> |
| <span class="source-line-no">1848</span><span id="line-1848"> procedure.setState(ProcedureState.SUCCESS);</span> |
| <span class="source-line-no">1849</span><span id="line-1849"> }</span> |
| <span class="source-line-no">1850</span><span id="line-1850"> }</span> |
| <span class="source-line-no">1851</span><span id="line-1851"></span> |
| <span class="source-line-no">1852</span><span id="line-1852"> // allows to kill the executor before something is stored to the wal.</span> |
| <span class="source-line-no">1853</span><span id="line-1853"> // useful to test the procedure recovery.</span> |
| <span class="source-line-no">1854</span><span id="line-1854"> if (</span> |
| <span class="source-line-no">1855</span><span id="line-1855"> testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())</span> |
| <span class="source-line-no">1856</span><span id="line-1856"> ) {</span> |
| <span class="source-line-no">1857</span><span id="line-1857"> kill("TESTING: Kill BEFORE store update: " + procedure);</span> |
| <span class="source-line-no">1858</span><span id="line-1858"> }</span> |
| <span class="source-line-no">1859</span><span id="line-1859"></span> |
| <span class="source-line-no">1860</span><span id="line-1860"> // TODO: The code here doesn't check if store is running before persisting to the store as</span> |
| <span class="source-line-no">1861</span><span id="line-1861"> // it relies on the method call below to throw RuntimeException to wind up the stack and</span> |
| <span class="source-line-no">1862</span><span id="line-1862"> // executor thread to stop. The statement following the method call below seems to check if</span> |
| <span class="source-line-no">1863</span><span id="line-1863"> // store is not running, to prevent scheduling children procedures, re-execution or yield</span> |
| <span class="source-line-no">1864</span><span id="line-1864"> // of this procedure. This may need more scrutiny and subsequent cleanup in future</span> |
| <span class="source-line-no">1865</span><span id="line-1865"> //</span> |
| <span class="source-line-no">1866</span><span id="line-1866"> // Commit the transaction even if a suspend (state may have changed). Note this append</span> |
| <span class="source-line-no">1867</span><span id="line-1867"> // can take a bunch of time to complete.</span> |
| <span class="source-line-no">1868</span><span id="line-1868"> if (procedure.needPersistence()) {</span> |
| <span class="source-line-no">1869</span><span id="line-1869"> // Add the procedure to the stack</span> |
| <span class="source-line-no">1870</span><span id="line-1870"> // See HBASE-28210 on why we need synchronized here</span> |
| <span class="source-line-no">1871</span><span id="line-1871"> boolean needUpdateStoreOutsideLock = false;</span> |
| <span class="source-line-no">1872</span><span id="line-1872"> synchronized (procStack) {</span> |
| <span class="source-line-no">1873</span><span id="line-1873"> if (procStack.addRollbackStep(procedure)) {</span> |
| <span class="source-line-no">1874</span><span id="line-1874"> updateStoreOnExec(procStack, procedure, subprocs);</span> |
| <span class="source-line-no">1875</span><span id="line-1875"> } else {</span> |
| <span class="source-line-no">1876</span><span id="line-1876"> needUpdateStoreOutsideLock = true;</span> |
| <span class="source-line-no">1877</span><span id="line-1877"> }</span> |
| <span class="source-line-no">1878</span><span id="line-1878"> }</span> |
| <span class="source-line-no">1879</span><span id="line-1879"> // this is an optimization if we do not need to maintain rollback step, as all subprocedures</span> |
| <span class="source-line-no">1880</span><span id="line-1880"> // of the same root procedure share the same root procedure state, if we can only update</span> |
| <span class="source-line-no">1881</span><span id="line-1881"> // store under the above lock, the sub procedures of the same root procedure can only be</span> |
| <span class="source-line-no">1882</span><span id="line-1882"> // persistent sequentially, which will have a bad performance. See HBASE-28212 for more</span> |
| <span class="source-line-no">1883</span><span id="line-1883"> // details.</span> |
| <span class="source-line-no">1884</span><span id="line-1884"> if (needUpdateStoreOutsideLock) {</span> |
| <span class="source-line-no">1885</span><span id="line-1885"> updateStoreOnExec(procStack, procedure, subprocs);</span> |
| <span class="source-line-no">1886</span><span id="line-1886"> }</span> |
| <span class="source-line-no">1887</span><span id="line-1887"> }</span> |
| <span class="source-line-no">1888</span><span id="line-1888"></span> |
| <span class="source-line-no">1889</span><span id="line-1889"> // if the store is not running we are aborting</span> |
| <span class="source-line-no">1890</span><span id="line-1890"> if (!store.isRunning()) {</span> |
| <span class="source-line-no">1891</span><span id="line-1891"> return;</span> |
| <span class="source-line-no">1892</span><span id="line-1892"> }</span> |
| <span class="source-line-no">1893</span><span id="line-1893"> // if the procedure is kind enough to pass the slot to someone else, yield</span> |
| <span class="source-line-no">1894</span><span id="line-1894"> if (</span> |
| <span class="source-line-no">1895</span><span id="line-1895"> procedure.isRunnable() && !suspended</span> |
| <span class="source-line-no">1896</span><span id="line-1896"> && procedure.isYieldAfterExecutionStep(getEnvironment())</span> |
| <span class="source-line-no">1897</span><span id="line-1897"> ) {</span> |
| <span class="source-line-no">1898</span><span id="line-1898"> yieldProcedure(procedure);</span> |
| <span class="source-line-no">1899</span><span id="line-1899"> return;</span> |
| <span class="source-line-no">1900</span><span id="line-1900"> }</span> |
| <span class="source-line-no">1901</span><span id="line-1901"></span> |
| <span class="source-line-no">1902</span><span id="line-1902"> assert (reExecute && subprocs == null) || !reExecute;</span> |
| <span class="source-line-no">1903</span><span id="line-1903"> } while (reExecute);</span> |
| <span class="source-line-no">1904</span><span id="line-1904"></span> |
| <span class="source-line-no">1905</span><span id="line-1905"> // Allows to kill the executor after something is stored to the WAL but before the below</span> |
| <span class="source-line-no">1906</span><span id="line-1906"> // state settings are done -- in particular the one on the end where we make parent</span> |
| <span class="source-line-no">1907</span><span id="line-1907"> // RUNNABLE again when its children are done; see countDownChildren.</span> |
| <span class="source-line-no">1908</span><span id="line-1908"> if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {</span> |
| <span class="source-line-no">1909</span><span id="line-1909"> kill("TESTING: Kill AFTER store update: " + procedure);</span> |
| <span class="source-line-no">1910</span><span id="line-1910"> }</span> |
| <span class="source-line-no">1911</span><span id="line-1911"></span> |
| <span class="source-line-no">1912</span><span id="line-1912"> // Submit the new subprocedures</span> |
| <span class="source-line-no">1913</span><span id="line-1913"> if (subprocs != null && !procedure.isFailed()) {</span> |
| <span class="source-line-no">1914</span><span id="line-1914"> submitChildrenProcedures(subprocs);</span> |
| <span class="source-line-no">1915</span><span id="line-1915"> }</span> |
| <span class="source-line-no">1916</span><span id="line-1916"></span> |
| <span class="source-line-no">1917</span><span id="line-1917"> // we need to log the release lock operation before waking up the parent procedure, as there</span> |
| <span class="source-line-no">1918</span><span id="line-1918"> // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all</span> |
| <span class="source-line-no">1919</span><span id="line-1919"> // the sub procedures from store and cause problems...</span> |
| <span class="source-line-no">1920</span><span id="line-1920"> releaseLock(procedure, false);</span> |
| <span class="source-line-no">1921</span><span id="line-1921"></span> |
| <span class="source-line-no">1922</span><span id="line-1922"> // if the procedure is complete and has a parent, count down the children latch.</span> |
| <span class="source-line-no">1923</span><span id="line-1923"> // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.</span> |
| <span class="source-line-no">1924</span><span id="line-1924"> if (!suspended && procedure.isFinished() && procedure.hasParent()) {</span> |
| <span class="source-line-no">1925</span><span id="line-1925"> countDownChildren(procStack, procedure);</span> |
| <span class="source-line-no">1926</span><span id="line-1926"> }</span> |
| <span class="source-line-no">1927</span><span id="line-1927"> }</span> |
| <span class="source-line-no">1928</span><span id="line-1928"></span> |
| <span class="source-line-no">1929</span><span id="line-1929"> private void kill(final String msg) { // Used by the TESTING kill points: log, stop, then throw.</span> |
| <span class="source-line-no">1930</span><span id="line-1930"> LOG.debug(msg);</span> |
| <span class="source-line-no">1931</span><span id="line-1931"> stop(); // halt this executor before unwinding the caller</span> |
| <span class="source-line-no">1932</span><span id="line-1932"> throw new RuntimeException(msg); // never returns normally</span> |
| <span class="source-line-no">1933</span><span id="line-1933"> }</span> |
| <span class="source-line-no">1934</span><span id="line-1934"></span> |
| <span class="source-line-no">1935</span><span id="line-1935"> private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,</span> |
| <span class="source-line-no">1936</span><span id="line-1936"> Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {</span> |
| <span class="source-line-no">1937</span><span id="line-1937"> assert subprocs != null : "expected subprocedures";</span> |
| <span class="source-line-no">1938</span><span id="line-1938"> final long rootProcId = getRootProcedureId(procedure);</span> |
| <span class="source-line-no">1939</span><span id="line-1939"> int index = 0;</span> |
| <span class="source-line-no">1940</span><span id="line-1940"> for (Procedure<TEnvironment> child : subprocs) {</span> |
| <span class="source-line-no">1941</span><span id="line-1941"> if (child == null) {</span> |
| <span class="source-line-no">1942</span><span id="line-1942"> // A null entry fails the parent; null is returned so that no children get submitted.</span> |
| <span class="source-line-no">1943</span><span id="line-1943"> String msg = "subproc[" + index + "] is null, aborting the procedure";</span> |
| <span class="source-line-no">1944</span><span id="line-1944"> procedure.setFailure(new RemoteProcedureException(msg, new IllegalArgumentIOException(msg)));</span> |
| <span class="source-line-no">1945</span><span id="line-1945"> return null;</span> |
| <span class="source-line-no">1946</span><span id="line-1946"> }</span> |
| <span class="source-line-no">1947</span><span id="line-1947"> assert child.getState() == ProcedureState.INITIALIZING : child;</span> |
| <span class="source-line-no">1948</span><span id="line-1948"> // Link the child to its parent and root, give it a fresh id and track it on the root stack.</span> |
| <span class="source-line-no">1949</span><span id="line-1949"> child.setParentProcId(procedure.getProcId());</span> |
| <span class="source-line-no">1950</span><span id="line-1950"> child.setRootProcId(rootProcId);</span> |
| <span class="source-line-no">1951</span><span id="line-1951"> child.setProcId(nextProcId());</span> |
| <span class="source-line-no">1952</span><span id="line-1952"> procStack.addSubProcedure(child);</span> |
| <span class="source-line-no">1953</span><span id="line-1953"> index++;</span> |
| <span class="source-line-no">1954</span><span id="line-1954"> }</span> |
| <span class="source-line-no">1955</span><span id="line-1955"></span> |
| <span class="source-line-no">1956</span><span id="line-1956"> if (procedure.isFailed()) {</span> |
| <span class="source-line-no">1957</span><span id="line-1957"> return subprocs;</span> |
| <span class="source-line-no">1958</span><span id="line-1958"> }</span> |
| <span class="source-line-no">1959</span><span id="line-1959"> // The parent waits on a latch sized to its number of children.</span> |
| <span class="source-line-no">1960</span><span id="line-1960"> procedure.setChildrenLatch(subprocs.length);</span> |
| <span class="source-line-no">1961</span><span id="line-1961"> final ProcedureState parentState = procedure.getState();</span> |
| <span class="source-line-no">1962</span><span id="line-1962"> if (parentState == ProcedureState.RUNNABLE) {</span> |
| <span class="source-line-no">1963</span><span id="line-1963"> procedure.setState(ProcedureState.WAITING);</span> |
| <span class="source-line-no">1964</span><span id="line-1964"> } else if (parentState == ProcedureState.WAITING_TIMEOUT) {</span> |
| <span class="source-line-no">1965</span><span id="line-1965"> // Register the parent with the timeout executor; other states are left untouched.</span> |
| <span class="source-line-no">1966</span><span id="line-1966"> timeoutExecutor.add(procedure);</span> |
| <span class="source-line-no">1967</span><span id="line-1967"> }</span> |
| <span class="source-line-no">1968</span><span id="line-1968"> return subprocs;</span> |
| <span class="source-line-no">1969</span><span id="line-1969"> }</span> |
| <span class="source-line-no">1970</span><span id="line-1970"></span> |
| <span class="source-line-no">1971</span><span id="line-1971"> private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { // track + enqueue each child</span> |
| <span class="source-line-no">1972</span><span id="line-1972"> for (Procedure<TEnvironment> child : subprocs) {</span> |
| <span class="source-line-no">1973</span><span id="line-1973"> child.updateMetricsOnSubmit(getEnvironment());</span> |
| <span class="source-line-no">1974</span><span id="line-1974"> final long childId = child.getProcId();</span> |
| <span class="source-line-no">1975</span><span id="line-1975"> assert !procedures.containsKey(childId);</span> |
| <span class="source-line-no">1976</span><span id="line-1976"> procedures.put(childId, child);</span> |
| <span class="source-line-no">1977</span><span id="line-1977"> scheduler.addFront(child);</span> |
| <span class="source-line-no">1978</span><span id="line-1978"> }</span> |
| <span class="source-line-no">1979</span><span id="line-1979"> }</span> |
| <span class="source-line-no">1980</span><span id="line-1980"></span> |
| <span class="source-line-no">1981</span><span id="line-1981"> private void countDownChildren(RootProcedureState<TEnvironment> procStack,</span> |
| <span class="source-line-no">1982</span><span id="line-1982"> Procedure<TEnvironment> procedure) {</span> |
| <span class="source-line-no">1983</span><span id="line-1983"> final Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());</span> |
| <span class="source-line-no">1984</span><span id="line-1984"> if (parent == null) {</span> |
| <span class="source-line-no">1985</span><span id="line-1985"> // The parent is no longer tracked; this is only expected while the root is rolling back.</span> |
| <span class="source-line-no">1986</span><span id="line-1986"> assert procStack.isRollingback();</span> |
| <span class="source-line-no">1987</span><span id="line-1987"> return;</span> |
| <span class="source-line-no">1988</span><span id="line-1988"> }</span> |
| <span class="source-line-no">1989</span><span id="line-1989"></span> |
| <span class="source-line-no">1990</span><span id="line-1990"> // tryRunnable() succeeds only once all of the parent's children have completed, i.e. when</span> |
| <span class="source-line-no">1991</span><span id="line-1991"> // this procedure is the last child; otherwise there is nothing more to do here.</span> |
| <span class="source-line-no">1992</span><span id="line-1992"> if (!parent.tryRunnable()) {</span> |
| <span class="source-line-no">1993</span><span id="line-1993"> return;</span> |
| <span class="source-line-no">1994</span><span id="line-1994"> }</span> |
| <span class="source-line-no">1995</span><span id="line-1995"> // Record the parent's new state in the store and move it to the front of the queue.</span> |
| <span class="source-line-no">1996</span><span id="line-1996"> store.update(parent);</span> |
| <span class="source-line-no">1997</span><span id="line-1997"> scheduler.addFront(parent);</span> |
| <span class="source-line-no">1998</span><span id="line-1998"> LOG.info("Finished subprocedure pid={}, resume processing ppid={}", procedure.getProcId(), parent.getProcId());</span> |
| <span class="source-line-no">1999</span><span id="line-1999"> }</span> |
| <span class="source-line-no">2000</span><span id="line-2000"></span> |
| <span class="source-line-no">2001</span><span id="line-2001"> private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,</span> |
| <span class="source-line-no">2002</span><span id="line-2002"> Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {</span> |
| <span class="source-line-no">2003</span><span id="line-2003"> if (subprocs != null && !procedure.isFailed()) {</span> |
| <span class="source-line-no">2004</span><span id="line-2004"> // New children were produced: insert the procedure together with its children.</span> |
| <span class="source-line-no">2005</span><span id="line-2005"> if (LOG.isTraceEnabled()) {</span> |
| <span class="source-line-no">2006</span><span id="line-2006"> LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));</span> |
| <span class="source-line-no">2007</span><span id="line-2007"> }</span> |
| <span class="source-line-no">2008</span><span id="line-2008"> store.insert(procedure, subprocs);</span> |
| <span class="source-line-no">2009</span><span id="line-2009"> return;</span> |
| <span class="source-line-no">2010</span><span id="line-2010"> }</span> |
| <span class="source-line-no">2011</span><span id="line-2011"> LOG.trace("Store update {}", procedure);</span> |
| <span class="source-line-no">2012</span><span id="line-2012"> // Only a finished root procedure (no parent) cleans up the children tracked on its stack.</span> |
| <span class="source-line-no">2013</span><span id="line-2013"> final long[] childProcIds =</span> |
| <span class="source-line-no">2014</span><span id="line-2014"> procedure.isFinished() && !procedure.hasParent() ? procStack.getSubprocedureIds() : null;</span> |
| <span class="source-line-no">2015</span><span id="line-2015"> if (childProcIds == null) {</span> |
| <span class="source-line-no">2016</span><span id="line-2016"> // Not a finished root, or no children were tracked: a plain update is enough.</span> |
| <span class="source-line-no">2017</span><span id="line-2017"> store.update(procedure);</span> |
| <span class="source-line-no">2018</span><span id="line-2018"> return;</span> |
| <span class="source-line-no">2019</span><span id="line-2019"> }</span> |
| <span class="source-line-no">2020</span><span id="line-2020"> // Delete the children from the store first, then drop them from the in-memory map.</span> |
| <span class="source-line-no">2021</span><span id="line-2021"> store.delete(procedure, childProcIds);</span> |
| <span class="source-line-no">2022</span><span id="line-2022"> for (long childProcId : childProcIds) {</span> |
| <span class="source-line-no">2023</span><span id="line-2023"> procedures.remove(childProcId);</span> |
| <span class="source-line-no">2024</span><span id="line-2024"> }</span> |
| <span class="source-line-no">2025</span><span id="line-2025"> }</span> |
| <span class="source-line-no">2026</span><span id="line-2026"></span> |
| <span class="source-line-no">2027</span><span id="line-2027"> private void handleInterruptedException(final Procedure<TEnvironment> proc, final InterruptedException e) {</span> |
| <span class="source-line-no">2028</span><span id="line-2028"> LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);</span> |
| <span class="source-line-no">2029</span><span id="line-2029"> // Deliberately do NOT restore the interrupt flag with Thread.currentThread().interrupt():</span> |
| <span class="source-line-no">2030</span><span id="line-2030"> // a still-set flag would make every later blocking call on this thread (e.g. Thread.sleep())</span> |
| <span class="source-line-no">2031</span><span id="line-2031"> // throw InterruptedException again. A master shutdown is signalled separately, and the</span> |
| <span class="source-line-no">2032</span><span id="line-2032"> // executor/store are stopped through that path. The interrupted procedure is not dropped;</span> |
| <span class="source-line-no">2033</span><span id="line-2033"> // it is retried on its next run.</span> |
| <span class="source-line-no">2034</span><span id="line-2034"> }</span> |
| <span class="source-line-no">2035</span><span id="line-2035"></span> |
| <span class="source-line-no">2036</span><span id="line-2036"> private void execCompletionCleanup(Procedure<TEnvironment> proc) { // runs proc's completion hook</span> |
| <span class="source-line-no">2037</span><span id="line-2037"> final TEnvironment env = getEnvironment();</span> |
| <span class="source-line-no">2038</span><span id="line-2038"> if (proc.hasLock()) {</span> |
| <span class="source-line-no">2039</span><span id="line-2039"> LOG.warn("Usually this should not happen, we will release the lock before if the procedure"</span> |
| <span class="source-line-no">2040</span><span id="line-2040"> + " is finished, even if the holdLock is true, arrive here means we have some holes where"</span> |
| <span class="source-line-no">2041</span><span id="line-2041"> + " we do not release the lock. And the releaseLock below may fail since the procedure may"</span> |
| <span class="source-line-no">2042</span><span id="line-2042"> + " have already been deleted from the procedure store.");</span> |
| <span class="source-line-no">2043</span><span id="line-2043"> releaseLock(proc, true); // force-release the lock that is unexpectedly still held</span> |
| <span class="source-line-no">2044</span><span id="line-2044"> }</span> |
| <span class="source-line-no">2045</span><span id="line-2045"> try {</span> |
| <span class="source-line-no">2046</span><span id="line-2046"> proc.completionCleanup(env);</span> |
| <span class="source-line-no">2047</span><span id="line-2047"> } catch (Throwable e) {</span> |
| <span class="source-line-no">2048</span><span id="line-2048"> // Best-effort: a failing cleanup hook is a code bug; it is logged rather than propagated.</span> |
| <span class="source-line-no">2049</span><span id="line-2049"> LOG.error("CODE-BUG: Uncaught runtime exception for procedure: {}", proc, e);</span> |
| <span class="source-line-no">2050</span><span id="line-2050"> }</span> |
| <span class="source-line-no">2051</span><span id="line-2051"> }</span> |
| <span class="source-line-no">2052</span><span id="line-2052"></span> |
| <span class="source-line-no">2053</span><span id="line-2053"> private void procedureFinished(Procedure<TEnvironment> proc) {</span> |
| <span class="source-line-no">2054</span><span id="line-2054"> // Run the procedure's own completion cleanup (force-releases a lock that is still held).</span> |
| <span class="source-line-no">2055</span><span id="line-2055"> execCompletionCleanup(proc);</span> |
| <span class="source-line-no">2056</span><span id="line-2056"> final long procId = proc.getProcId();</span> |
| <span class="source-line-no">2057</span><span id="line-2057"></span> |
| <span class="source-line-no">2058</span><span id="line-2058"> // Retain the result; procedures that do not wait for a client ack get an ack time of 0.</span> |
| <span class="source-line-no">2059</span><span id="line-2059"> CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);</span> |
| <span class="source-line-no">2060</span><span id="line-2060"> if (!proc.shouldWaitClientAck(getEnvironment())) {</span> |
| <span class="source-line-no">2061</span><span id="line-2061"> retainer.setClientAckTime(0);</span> |
| <span class="source-line-no">2062</span><span id="line-2062"> }</span> |
| <span class="source-line-no">2063</span><span id="line-2063"></span> |
| <span class="source-line-no">2064</span><span id="line-2064"> // Update the executor's internal state maps: completed in, live/rollback entries out.</span> |
| <span class="source-line-no">2065</span><span id="line-2065"> completed.put(procId, retainer);</span> |
| <span class="source-line-no">2066</span><span id="line-2066"> rollbackStack.remove(procId);</span> |
| <span class="source-line-no">2067</span><span id="line-2067"> procedures.remove(procId);</span> |
| <span class="source-line-no">2068</span><span id="line-2068"></span> |
| <span class="source-line-no">2069</span><span id="line-2069"> // Let the scheduler clean up; failures here are logged as code bugs, not propagated.</span> |
| <span class="source-line-no">2070</span><span id="line-2070"> try {</span> |
| <span class="source-line-no">2071</span><span id="line-2071"> scheduler.completionCleanup(proc);</span> |
| <span class="source-line-no">2072</span><span id="line-2072"> } catch (Throwable e) {</span> |
| <span class="source-line-no">2073</span><span id="line-2073"> LOG.error("CODE-BUG: Uncaught runtime exception for completion cleanup: {}", proc, e);</span> |
| <span class="source-line-no">2074</span><span id="line-2074"> }</span> |
| <span class="source-line-no">2075</span><span id="line-2075"></span> |
| <span class="source-line-no">2076</span><span id="line-2076"> // Notify the listeners</span> |
| <span class="source-line-no">2077</span><span id="line-2077"> sendProcedureFinishedNotification(procId);</span> |
| <span class="source-line-no">2078</span><span id="line-2078"> }</span> |
| <span class="source-line-no">2079</span><span id="line-2079"></span> |
| <span class="source-line-no">2080</span><span id="line-2080"> RootProcedureState<TEnvironment> getProcStack(long rootProcId) { // rollback state kept per root proc id</span> |
| <span class="source-line-no">2081</span><span id="line-2081"> return this.rollbackStack.get(rootProcId);</span> |
| <span class="source-line-no">2082</span><span id="line-2082"> }</span> |
| <span class="source-line-no">2083</span><span id="line-2083"></span> |
| <span class="source-line-no">2084</span><span id="line-2084"> ProcedureScheduler getProcedureScheduler() { // the scheduler this executor queues procedures on</span> |
| <span class="source-line-no">2085</span><span id="line-2085"> return this.scheduler;</span> |
| <span class="source-line-no">2086</span><span id="line-2086"> }</span> |
| <span class="source-line-no">2087</span><span id="line-2087"></span> |
| <span class="source-line-no">2088</span><span id="line-2088"> int getCompletedSize() { // number of entries currently held in the completed map</span> |
| <span class="source-line-no">2089</span><span id="line-2089"> return this.completed.size();</span> |
| <span class="source-line-no">2090</span><span id="line-2090"> }</span> |
| <span class="source-line-no">2091</span><span id="line-2091"></span> |
| <span class="source-line-no">2092</span><span id="line-2092"> public IdLock getProcExecutionLock() { // exposes this executor's procedure execution IdLock</span> |
| <span class="source-line-no">2093</span><span id="line-2093"> return this.procExecutionLock;</span> |
| <span class="source-line-no">2094</span><span id="line-2094"> }</span> |
| <span class="source-line-no">2095</span><span id="line-2095"></span> |
| <span class="source-line-no">2096</span><span id="line-2096"> /**</span> |
| <span class="source-line-no">2097</span><span id="line-2097"> * Returns the thread pool this executor exposes for running asynchronous tasks.</span> |
| <span class="source-line-no">2098</span><span id="line-2098"> */</span> |
| <span class="source-line-no">2099</span><span id="line-2099"> public ExecutorService getAsyncTaskExecutor() {</span> |
| <span class="source-line-no">2100</span><span id="line-2100"> return this.asyncTaskExecutor;</span> |
| <span class="source-line-no">2101</span><span id="line-2101"> }</span> |
| <span class="source-line-no">2102</span><span id="line-2102"></span> |
| <span class="source-line-no">2103</span><span id="line-2103"> // ==========================================================================</span> |
| <span class="source-line-no">2104</span><span id="line-2104"> // Worker Thread</span> |
| <span class="source-line-no">2105</span><span id="line-2105"> // ==========================================================================</span> |
| <span class="source-line-no">2106</span><span id="line-2106"> private class WorkerThread extends StoppableThread {</span> |
| <span class="source-line-no">2107</span><span id="line-2107"> private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);</span> |
| <span class="source-line-no">2108</span><span id="line-2108"> private volatile Procedure<TEnvironment> activeProcedure;</span> |
| <span class="source-line-no">2109</span><span id="line-2109"></span> |
| <span class="source-line-no">2110</span><span id="line-2110"> public WorkerThread(ThreadGroup group) {</span> |
| <span class="source-line-no">2111</span><span id="line-2111"> this(group, "PEWorker-");</span> |
| <span class="source-line-no">2112</span><span id="line-2112"> }</span> |
| <span class="source-line-no">2113</span><span id="line-2113"></span> |
| <span class="source-line-no">2114</span><span id="line-2114"> protected WorkerThread(ThreadGroup group, String prefix) {</span> |
| <span class="source-line-no">2115</span><span id="line-2115"> super(group, prefix + workerId.incrementAndGet());</span> |
| <span class="source-line-no">2116</span><span id="line-2116"> setDaemon(true);</span> |
| <span class="source-line-no">2117</span><span id="line-2117"> }</span> |
| <span class="source-line-no">2118</span><span id="line-2118"></span> |
| <span class="source-line-no">2119</span><span id="line-2119"> @Override</span> |
| <span class="source-line-no">2120</span><span id="line-2120"> public void sendStopSignal() {</span> |
| <span class="source-line-no">2121</span><span id="line-2121"> scheduler.signalAll();</span> |
| <span class="source-line-no">2122</span><span id="line-2122"> }</span> |
| <span class="source-line-no">2123</span><span id="line-2123"></span> |
| <span class="source-line-no">2124</span><span id="line-2124"> /**</span> |
| <span class="source-line-no">2125</span><span id="line-2125"> * Encapsulates execution of the current {@link #activeProcedure} for easy tracing.</span> |
| <span class="source-line-no">2126</span><span id="line-2126"> */</span> |
| <span class="source-line-no">2127</span><span id="line-2127"> private long runProcedure() throws IOException {</span> |
| <span class="source-line-no">2128</span><span id="line-2128"> final Procedure<TEnvironment> proc = this.activeProcedure;</span> |
| <span class="source-line-no">2129</span><span id="line-2129"> int activeCount = activeExecutorCount.incrementAndGet();</span> |
| <span class="source-line-no">2130</span><span id="line-2130"> int runningCount = store.setRunningProcedureCount(activeCount);</span> |
| <span class="source-line-no">2131</span><span id="line-2131"> LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount,</span> |
| <span class="source-line-no">2132</span><span id="line-2132"> activeCount);</span> |
| <span class="source-line-no">2133</span><span id="line-2133"> executionStartTime.set(EnvironmentEdgeManager.currentTime());</span> |
| <span class="source-line-no">2134</span><span id="line-2134"> IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());</span> |
| <span class="source-line-no">2135</span><span id="line-2135"> try {</span> |
| <span class="source-line-no">2136</span><span id="line-2136"> executeProcedure(proc);</span> |
| <span class="source-line-no">2137</span><span id="line-2137"> } catch (AssertionError e) {</span> |
| <span class="source-line-no">2138</span><span id="line-2138"> LOG.info("ASSERT pid=" + proc.getProcId(), e);</span> |
| <span class="source-line-no">2139</span><span id="line-2139"> throw e;</span> |
| <span class="source-line-no">2140</span><span id="line-2140"> } finally {</span> |
| <span class="source-line-no">2141</span><span id="line-2141"> procExecutionLock.releaseLockEntry(lockEntry);</span> |
| <span class="source-line-no">2142</span><span id="line-2142"> activeCount = activeExecutorCount.decrementAndGet();</span> |
| <span class="source-line-no">2143</span><span id="line-2143"> runningCount = store.setRunningProcedureCount(activeCount);</span> |
| <span class="source-line-no">2144</span><span id="line-2144"> LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount,</span> |
| <span class="source-line-no">2145</span><span id="line-2145"> activeCount);</span> |
| <span class="source-line-no">2146</span><span id="line-2146"> this.activeProcedure = null;</span> |
| <span class="source-line-no">2147</span><span id="line-2147"> executionStartTime.set(Long.MAX_VALUE);</span> |
| <span class="source-line-no">2148</span><span id="line-2148"> }</span> |
| <span class="source-line-no">2149</span><span id="line-2149"> return EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">2150</span><span id="line-2150"> }</span> |
| <span class="source-line-no">2151</span><span id="line-2151"></span> |
| <span class="source-line-no">2152</span><span id="line-2152"> @Override</span> |
| <span class="source-line-no">2153</span><span id="line-2153"> public void run() {</span> |
| <span class="source-line-no">2154</span><span id="line-2154"> long lastUpdate = EnvironmentEdgeManager.currentTime();</span> |
| <span class="source-line-no">2155</span><span id="line-2155"> try {</span> |
| <span class="source-line-no">2156</span><span id="line-2156"> while (isRunning() && keepAlive(lastUpdate)) {</span> |
| <span class="source-line-no">2157</span><span id="line-2157"> @SuppressWarnings("unchecked")</span> |
| <span class="source-line-no">2158</span><span id="line-2158"> Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);</span> |
| <span class="source-line-no">2159</span><span id="line-2159"> if (proc == null) {</span> |
| <span class="source-line-no">2160</span><span id="line-2160"> continue;</span> |
| <span class="source-line-no">2161</span><span id="line-2161"> }</span> |
| <span class="source-line-no">2162</span><span id="line-2162"> this.activeProcedure = proc;</span> |
| <span class="source-line-no">2163</span><span id="line-2163"> lastUpdate = TraceUtil.trace(this::runProcedure, new ProcedureSpanBuilder(proc));</span> |
| <span class="source-line-no">2164</span><span id="line-2164"> }</span> |
| <span class="source-line-no">2165</span><span id="line-2165"> } catch (Throwable t) {</span> |
| <span class="source-line-no">2166</span><span id="line-2166"> LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);</span> |
| <span class="source-line-no">2167</span><span id="line-2167"> } finally {</span> |
| <span class="source-line-no">2168</span><span id="line-2168"> LOG.trace("Worker terminated.");</span> |
| <span class="source-line-no">2169</span><span id="line-2169"> }</span> |
| <span class="source-line-no">2170</span><span id="line-2170"> workerThreads.remove(this);</span> |
| <span class="source-line-no">2171</span><span id="line-2171"> }</span> |
| <span class="source-line-no">2172</span><span id="line-2172"></span> |
| <span class="source-line-no">2173</span><span id="line-2173"> @Override</span> |
| <span class="source-line-no">2174</span><span id="line-2174"> public String toString() {</span> |
| <span class="source-line-no">2175</span><span id="line-2175"> Procedure<?> p = this.activeProcedure;</span> |
| <span class="source-line-no">2176</span><span id="line-2176"> return getName() + "(pid=" + (p == null ? Procedure.NO_PROC_ID : p.getProcId() + ")");</span> |
| <span class="source-line-no">2177</span><span id="line-2177"> }</span> |
| <span class="source-line-no">2178</span><span id="line-2178"></span> |
| <span class="source-line-no">2179</span><span id="line-2179"> /** Returns the time since the current procedure is running */</span> |
| <span class="source-line-no">2180</span><span id="line-2180"> public long getCurrentRunTime() {</span> |
| <span class="source-line-no">2181</span><span id="line-2181"> return EnvironmentEdgeManager.currentTime() - executionStartTime.get();</span> |
| <span class="source-line-no">2182</span><span id="line-2182"> }</span> |
| <span class="source-line-no">2183</span><span id="line-2183"></span> |
| <span class="source-line-no">2184</span><span id="line-2184"> // core worker never timeout</span> |
| <span class="source-line-no">2185</span><span id="line-2185"> protected boolean keepAlive(long lastUpdate) {</span> |
| <span class="source-line-no">2186</span><span id="line-2186"> return true;</span> |
| <span class="source-line-no">2187</span><span id="line-2187"> }</span> |
| <span class="source-line-no">2188</span><span id="line-2188"> }</span> |
| <span class="source-line-no">2189</span><span id="line-2189"></span> |
| <span class="source-line-no">2190</span><span id="line-2190"> // A worker thread which can be added when core workers are stuck. Will timeout after</span> |
| <span class="source-line-no">2191</span><span id="line-2191"> // keepAliveTime if there is no procedure to run.</span> |
| <span class="source-line-no">2192</span><span id="line-2192"> private final class KeepAliveWorkerThread extends WorkerThread {</span> |
| <span class="source-line-no">2193</span><span id="line-2193"> public KeepAliveWorkerThread(ThreadGroup group) {</span> |
| <span class="source-line-no">2194</span><span id="line-2194"> super(group, "KeepAlivePEWorker-");</span> |
| <span class="source-line-no">2195</span><span id="line-2195"> }</span> |
| <span class="source-line-no">2196</span><span id="line-2196"></span> |
| <span class="source-line-no">2197</span><span id="line-2197"> @Override</span> |
| <span class="source-line-no">2198</span><span id="line-2198"> protected boolean keepAlive(long lastUpdate) {</span> |
| <span class="source-line-no">2199</span><span id="line-2199"> return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;</span> |
| <span class="source-line-no">2200</span><span id="line-2200"> }</span> |
| <span class="source-line-no">2201</span><span id="line-2201"> }</span> |
| <span class="source-line-no">2202</span><span id="line-2202"></span> |
| <span class="source-line-no">2203</span><span id="line-2203"> // ----------------------------------------------------------------------------</span> |
| <span class="source-line-no">2204</span><span id="line-2204"> // TODO-MAYBE: Should we provide a InlineChore to notify the store with the</span> |
| <span class="source-line-no">2205</span><span id="line-2205"> // full set of procedures pending and completed to write a compacted</span> |
| <span class="source-line-no">2206</span><span id="line-2206"> // version of the log (in case is a log)?</span> |
| <span class="source-line-no">2207</span><span id="line-2207"> // In theory no, procedures are have a short life, so at some point the store</span> |
| <span class="source-line-no">2208</span><span id="line-2208"> // will have the tracker saying everything is in the last log.</span> |
| <span class="source-line-no">2209</span><span id="line-2209"> // ----------------------------------------------------------------------------</span> |
| <span class="source-line-no">2210</span><span id="line-2210"></span> |
| <span class="source-line-no">2211</span><span id="line-2211"> private final class WorkerMonitor extends InlineChore {</span> |
| <span class="source-line-no">2212</span><span id="line-2212"> public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =</span> |
| <span class="source-line-no">2213</span><span id="line-2213"> "hbase.procedure.worker.monitor.interval.msec";</span> |
| <span class="source-line-no">2214</span><span id="line-2214"> private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec</span> |
| <span class="source-line-no">2215</span><span id="line-2215"></span> |
| <span class="source-line-no">2216</span><span id="line-2216"> public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =</span> |
| <span class="source-line-no">2217</span><span id="line-2217"> "hbase.procedure.worker.stuck.threshold.msec";</span> |
| <span class="source-line-no">2218</span><span id="line-2218"> private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec</span> |
| <span class="source-line-no">2219</span><span id="line-2219"></span> |
| <span class="source-line-no">2220</span><span id="line-2220"> public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =</span> |
| <span class="source-line-no">2221</span><span id="line-2221"> "hbase.procedure.worker.add.stuck.percentage";</span> |
| <span class="source-line-no">2222</span><span id="line-2222"> private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck</span> |
| <span class="source-line-no">2223</span><span id="line-2223"></span> |
| <span class="source-line-no">2224</span><span id="line-2224"> private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;</span> |
| <span class="source-line-no">2225</span><span id="line-2225"> private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;</span> |
| <span class="source-line-no">2226</span><span id="line-2226"> private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;</span> |
| <span class="source-line-no">2227</span><span id="line-2227"></span> |
| <span class="source-line-no">2228</span><span id="line-2228"> public WorkerMonitor() {</span> |
| <span class="source-line-no">2229</span><span id="line-2229"> refreshConfig();</span> |
| <span class="source-line-no">2230</span><span id="line-2230"> }</span> |
| <span class="source-line-no">2231</span><span id="line-2231"></span> |
| <span class="source-line-no">2232</span><span id="line-2232"> @Override</span> |
| <span class="source-line-no">2233</span><span id="line-2233"> public void run() {</span> |
| <span class="source-line-no">2234</span><span id="line-2234"> final int stuckCount = checkForStuckWorkers();</span> |
| <span class="source-line-no">2235</span><span id="line-2235"> checkThreadCount(stuckCount);</span> |
| <span class="source-line-no">2236</span><span id="line-2236"></span> |
| <span class="source-line-no">2237</span><span id="line-2237"> // refresh interval (poor man dynamic conf update)</span> |
| <span class="source-line-no">2238</span><span id="line-2238"> refreshConfig();</span> |
| <span class="source-line-no">2239</span><span id="line-2239"> }</span> |
| <span class="source-line-no">2240</span><span id="line-2240"></span> |
| <span class="source-line-no">2241</span><span id="line-2241"> private int checkForStuckWorkers() {</span> |
| <span class="source-line-no">2242</span><span id="line-2242"> // check if any of the worker is stuck</span> |
| <span class="source-line-no">2243</span><span id="line-2243"> int stuckCount = 0;</span> |
| <span class="source-line-no">2244</span><span id="line-2244"> for (WorkerThread worker : workerThreads) {</span> |
| <span class="source-line-no">2245</span><span id="line-2245"> if (worker.getCurrentRunTime() < stuckThreshold) {</span> |
| <span class="source-line-no">2246</span><span id="line-2246"> continue;</span> |
| <span class="source-line-no">2247</span><span id="line-2247"> }</span> |
| <span class="source-line-no">2248</span><span id="line-2248"></span> |
| <span class="source-line-no">2249</span><span id="line-2249"> // WARN the worker is stuck</span> |
| <span class="source-line-no">2250</span><span id="line-2250"> stuckCount++;</span> |
| <span class="source-line-no">2251</span><span id="line-2251"> LOG.warn("Worker stuck {}, run time {}", worker,</span> |
| <span class="source-line-no">2252</span><span id="line-2252"> StringUtils.humanTimeDiff(worker.getCurrentRunTime()));</span> |
| <span class="source-line-no">2253</span><span id="line-2253"> }</span> |
| <span class="source-line-no">2254</span><span id="line-2254"> return stuckCount;</span> |
| <span class="source-line-no">2255</span><span id="line-2255"> }</span> |
| <span class="source-line-no">2256</span><span id="line-2256"></span> |
| <span class="source-line-no">2257</span><span id="line-2257"> private void checkThreadCount(final int stuckCount) {</span> |
| <span class="source-line-no">2258</span><span id="line-2258"> // nothing to do if there are no runnable tasks</span> |
| <span class="source-line-no">2259</span><span id="line-2259"> if (stuckCount < 1 || !scheduler.hasRunnables()) {</span> |
| <span class="source-line-no">2260</span><span id="line-2260"> return;</span> |
| <span class="source-line-no">2261</span><span id="line-2261"> }</span> |
| <span class="source-line-no">2262</span><span id="line-2262"></span> |
| <span class="source-line-no">2263</span><span id="line-2263"> // add a new thread if the worker stuck percentage exceed the threshold limit</span> |
| <span class="source-line-no">2264</span><span id="line-2264"> // and every handler is active.</span> |
| <span class="source-line-no">2265</span><span id="line-2265"> final float stuckPerc = ((float) stuckCount) / workerThreads.size();</span> |
| <span class="source-line-no">2266</span><span id="line-2266"> // let's add new worker thread more aggressively, as they will timeout finally if there is no</span> |
| <span class="source-line-no">2267</span><span id="line-2267"> // work to do.</span> |
| <span class="source-line-no">2268</span><span id="line-2268"> if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {</span> |
| <span class="source-line-no">2269</span><span id="line-2269"> final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);</span> |
| <span class="source-line-no">2270</span><span id="line-2270"> workerThreads.add(worker);</span> |
| <span class="source-line-no">2271</span><span id="line-2271"> worker.start();</span> |
| <span class="source-line-no">2272</span><span id="line-2272"> LOG.debug("Added new worker thread {}", worker);</span> |
| <span class="source-line-no">2273</span><span id="line-2273"> }</span> |
| <span class="source-line-no">2274</span><span id="line-2274"> }</span> |
| <span class="source-line-no">2275</span><span id="line-2275"></span> |
| <span class="source-line-no">2276</span><span id="line-2276"> private void refreshConfig() {</span> |
| <span class="source-line-no">2277</span><span id="line-2277"> addWorkerStuckPercentage =</span> |
| <span class="source-line-no">2278</span><span id="line-2278"> conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);</span> |
| <span class="source-line-no">2279</span><span id="line-2279"> timeoutInterval =</span> |
| <span class="source-line-no">2280</span><span id="line-2280"> conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, DEFAULT_WORKER_MONITOR_INTERVAL);</span> |
| <span class="source-line-no">2281</span><span id="line-2281"> stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, DEFAULT_WORKER_STUCK_THRESHOLD);</span> |
| <span class="source-line-no">2282</span><span id="line-2282"> }</span> |
| <span class="source-line-no">2283</span><span id="line-2283"></span> |
| <span class="source-line-no">2284</span><span id="line-2284"> @Override</span> |
| <span class="source-line-no">2285</span><span id="line-2285"> public int getTimeoutInterval() {</span> |
| <span class="source-line-no">2286</span><span id="line-2286"> return timeoutInterval;</span> |
| <span class="source-line-no">2287</span><span id="line-2287"> }</span> |
| <span class="source-line-no">2288</span><span id="line-2288"> }</span> |
| <span class="source-line-no">2289</span><span id="line-2289">}</span> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| </pre> |
| </div> |
| </main> |
| </body> |
| </html> |