blob: e4bba22e93932a9585ac719c6073af0b85fd5d79 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.procedure2;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
* Thread Pool that executes the submitted procedures.
* The executor has a ProcedureStore associated.
* Each operation is logged and on restart the pending procedures are resumed.
* Unless the Procedure code throws an error (e.g. invalid user input)
* the procedure will complete (at some point in time), On restart the pending
* procedures are resumed and the once failed will be rolledback.
* The user can add procedures to the executor via submitProcedure(proc)
* check for the finished state via isFinished(procId)
* and get the result via getResult(procId)
public class ProcedureExecutor<TEnvironment> {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);
public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
private static final boolean DEFAULT_CHECK_OWNER_SET = false;
public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);
public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min
public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl";
static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min
* {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
* break PE having it fail at various junctures. When non-null, testing is set to an instance of
* the below internal {@link Testing} class with flags set for the particular test.
volatile Testing testing = null;
* Class with parameters describing how to fail/die when in testing-context.
public static class Testing {
protected volatile boolean killIfHasParent = true;
protected volatile boolean killIfSuspended = false;
* Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
* persisting all the state it needs to recover after a crash.
protected volatile boolean killBeforeStoreUpdate = false;
protected volatile boolean toggleKillBeforeStoreUpdate = false;
* Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
* is about a case where memory-state was being set after store to WAL where a crash could
* cause us to get stuck. This flag allows killing at what was a vulnerable time.
protected volatile boolean killAfterStoreUpdate = false;
protected volatile boolean toggleKillAfterStoreUpdate = false;
protected boolean shouldKillBeforeStoreUpdate() {
final boolean kill = this.killBeforeStoreUpdate;
if (this.toggleKillBeforeStoreUpdate) {
this.killBeforeStoreUpdate = !kill;
LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
return kill;
protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
if (isSuspended && !killIfSuspended) {
return false;
if (hasParent && !killIfHasParent) {
return false;
return shouldKillBeforeStoreUpdate();
protected boolean shouldKillAfterStoreUpdate() {
final boolean kill = this.killAfterStoreUpdate;
if (this.toggleKillAfterStoreUpdate) {
this.killAfterStoreUpdate = !kill;
LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
return kill;
protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
public interface ProcedureExecutorListener {
void procedureLoaded(long procId);
void procedureAdded(long procId);
void procedureFinished(long procId);
* Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
* Once a Root-Procedure completes (success or failure), the result will be added to this map.
* The user of ProcedureExecutor should call getResult(procId) to get the result.
private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
new ConcurrentHashMap<>();
* Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
* The RootProcedureState contains the execution stack of the Root-Procedure,
* It is added to the map by submitProcedure() and removed on procedure completion.
private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
new ConcurrentHashMap<>();
* Helper map to lookup the live procedures by ID.
* This map contains every procedure. root-procedures and subprocedures.
private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
new ConcurrentHashMap<>();
* Helper map to lookup whether the procedure already issued from the same client. This map
* contains every root procedure.
private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
new CopyOnWriteArrayList<>();
private Configuration conf;
* Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
* resource handling rather than observing in a #join is unexpected).
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
* (Should be ok).
private ThreadGroup threadGroup;
* Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
* resource handling rather than observing in a #join is unexpected).
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
* (Should be ok).
private CopyOnWriteArrayList<WorkerThread> workerThreads;
* Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
* resource handling rather than observing in a #join is unexpected).
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
* (Should be ok).
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;
* WorkerMonitor check for stuck workers and new worker thread when necessary, for example if
* there is no worker to assign meta, it will new worker thread for it, so it is very important.
* TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore
* and so on, some tasks may execute for a long time so will block other tasks like
* WorkerMonitor, so use a dedicated thread for executing WorkerMonitor.
private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;
private int corePoolSize;
private int maxPoolSize;
private volatile long keepAliveTime;
* Scheduler/Queue that contains runnable procedures.
private final ProcedureScheduler scheduler;
private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
private final AtomicLong lastProcId = new AtomicLong(-1);
private final AtomicLong workerId = new AtomicLong(0);
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false);
private final TEnvironment environment;
private final ProcedureStore store;
private final boolean checkOwnerSet;
// To prevent concurrent execution of the same procedure.
// For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
// procedure is woken up before we finish the suspend which causes the same procedures to be
// executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
// a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
// execution of the same procedure.
private final IdLock procExecutionLock = new IdLock();
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
final ProcedureStore store) {
this(conf, environment, store, new SimpleProcedureScheduler());
private boolean isRootFinished(Procedure<?> proc) {
Procedure<?> rootProc = procedures.get(proc.getRootProcId());
return rootProc == null || rootProc.isFinished();
private void forceUpdateProcedure(long procId) throws IOException {
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
try {
Procedure<TEnvironment> proc = procedures.get(procId);
if (proc != null) {
if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
LOG.debug("Procedure {} has already been finished and parent is succeeded," +
" skip force updating", proc);
} else {
CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
LOG.debug("Procedure {} has already been finished and expired, skip force updating",
proc = retainer.getProcedure();
LOG.debug("Force update procedure {}", proc);
} finally {
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
final ProcedureStore store, final ProcedureScheduler scheduler) {
this.environment = environment;
this.scheduler = scheduler; = store;
this.conf = conf;
this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
store.registerListener(new ProcedureStoreListener() {
public void forceUpdate(long[] procIds) { -> forceUpdateExecutor.execute(() -> {
try {
} catch (IOException e) {
LOG.warn("Failed to force update procedure with pid={}", procId);
private void load(final boolean abortOnCorruption) throws IOException {
Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
store.load(new ProcedureStore.ProcedureLoader() {
public void setMaxProcId(long maxProcId) {
assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
public void load(ProcedureIterator procIter) throws IOException {
loadProcedures(procIter, abortOnCorruption);
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
int corruptedCount = 0;
while (procIter.hasNext()) {
Procedure<?> proc =;
LOG.error("Corrupt " + proc);
if (abortOnCorruption && corruptedCount > 0) {
throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
while (!stack.isEmpty()) {
restoreLock(stack.pop(), restored);
// Restore the locks for all the procedures.
// Notice that we need to restore the locks starting from the root proc, otherwise there will be
// problem that a sub procedure may hold the exclusive lock first and then we are stuck when
// calling the acquireLock method for the parent procedure.
// The algorithm is straight-forward:
// 1. Use a set to record the procedures which locks have already been restored.
// 2. Use a stack to store the hierarchy of the procedures
// 3. For all the procedure, we will first try to find its parent and push it into the stack,
// unless
// a. We have no parent, i.e, we are the root procedure
// b. The lock has already been restored(by checking the set introduced in #1)
// then we start to pop the stack and call acquireLock for each procedure.
// Notice that this should be done for all procedures, not only the ones in runnableList.
private void restoreLocks() {
Set<Long> restored = new HashSet<>();
Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
procedures.values().forEach(proc -> {
for (;;) {
if (restored.contains(proc.getProcId())) {
restoreLocks(stack, restored);
if (!proc.hasParent()) {
restoreLock(proc, restored);
restoreLocks(stack, restored);
proc = procedures.get(proc.getParentProcId());
private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
throws IOException {
// 1. Build the rollback stack
int runnableCount = 0;
int failedCount = 0;
int waitingCount = 0;
int waitingTimeoutCount = 0;
while (procIter.hasNext()) {
boolean finished = procIter.isNextFinished();
Procedure<TEnvironment> proc =;
NonceKey nonceKey = proc.getNonceKey();
long procId = proc.getProcId();
if (finished) {
completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
LOG.debug("Completed {}", proc);
} else {
if (!proc.hasParent()) {
assert !proc.isFinished() : "unexpected finished procedure";
rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
// add the procedure to the map
procedures.put(proc.getProcId(), proc);
switch (proc.getState()) {
case FAILED:
if (nonceKey != null) {
nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
// 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
// push it into the ProcedureScheduler directly to execute the rollback. But this does not work
// after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
// the queue from runQueue in scheduler, and then when a procedure which has lock access, for
// example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
// we will add the queue back to let the workers poll from it. The assumption here is that, the
// procedure which has the xlock should have been polled out already, so when loading we can not
// add the procedure to scheduler first and then call acquireLock, since the procedure is still
// in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
// then there is a dead lock
List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
while (procIter.hasNext()) {
if (procIter.isNextFinished()) {
Procedure<TEnvironment> proc =;
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
LOG.debug("Loading {}", proc);
Long rootProcId = getRootProcedureId(proc);
// The orphan procedures will be passed to handleCorrupted, so add an assert here
assert rootProcId != null;
if (proc.hasParent()) {
Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
if (parent != null && !proc.isFinished()) {
RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
switch (proc.getState()) {
case FAILED:
String msg = "Unexpected " + proc.getState() + " state for " + proc;
throw new UnsupportedOperationException(msg);
// 3. Check the waiting procedures to see if some of them can be added to runnable.
waitingList.forEach(proc -> {
if (!proc.hasChildren()) {
// Normally, WAITING procedures should be waken by its children. But, there is a case that,
// all the children are successful and before they can wake up their parent procedure, the
// master was killed. So, during recovering the procedures from ProcedureWal, its children
// are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
// procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
// exception will throw:
// Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
// "NOT RUNNABLE! " + procedure.toString());
} else {
// 4. restore locks
// 5. Push the procedures to the timeout executor
waitingTimeoutList.forEach(proc -> {
// 6. Push the procedure to the scheduler
runnableList.forEach(p -> {
if (!p.hasParent()) {
// After all procedures put into the queue, signal the worker threads.
// Otherwise, there is a race condition. See HBASE-21364.
* Initialize the procedure executor, but do not start workers. We will start them later.
* <p/>
* It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
* ensure a single executor, and start the procedure replay to resume and recover the previous
* pending and in-progress procedures.
* @param numThreads number of threads available for procedure execution.
* @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
* is found on replay. otherwise false.
public void init(int numThreads, boolean abortOnCorruption) throws IOException {
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
this.maxPoolSize = 10 * numThreads;"Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
corePoolSize, maxPoolSize);
this.threadGroup = new ThreadGroup("PEWorkerGroup");
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");
// Create the workers
workerThreads = new CopyOnWriteArrayList<>();
for (int i = 0; i < corePoolSize; ++i) {
workerThreads.add(new WorkerThread(threadGroup));
long st, et;
// Acquire the store lease.
st = System.nanoTime();
et = System.nanoTime();"Recovered {} lease in {}", store.getClass().getSimpleName(),
StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
// start the procedure scheduler
// TODO: Split in two steps.
// TODO: Handle corrupted procedures (currently just a warn)
// The first one will make sure that we have the latest id,
// so we can start the threads and accept new procedures.
// The second step will do the actual load of old procedures.
st = System.nanoTime();
et = System.nanoTime();"Loaded {} in {}", store.getClass().getSimpleName(),
StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
* Start the workers.
public void startWorkers() throws IOException {
if (!running.compareAndSet(false, true)) {
LOG.warn("Already running");
// Start the executors. Here we must have the lastProcId set.
LOG.trace("Start workers {}", workerThreads.size());
for (WorkerThread worker: workerThreads) {
// Internal chores
workerMonitorExecutor.add(new WorkerMonitor());
// Add completed cleaner chore
addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
public void stop() {
if (!running.getAndSet(false)) {
public void join() {
assert !isRunning() : "expected not running";
// stop the timeout executor
// stop the work monitor executor
// stop the worker threads
for (WorkerThread worker: workerThreads) {
// Destroy the Thread Group for the executors
// TODO: Fix. #join is not place to destroy resources.
try {
} catch (IllegalThreadStateException e) {
LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
this.threadGroup, e.getMessage());
// This dumps list of threads on STDOUT.
// reset the in-memory state for testing
public void refreshConfiguration(final Configuration conf) {
this.conf = conf;
// ==========================================================================
// Accessors
// ==========================================================================
public boolean isRunning() {
return running.get();
* @return the current number of worker threads.
public int getWorkerThreadCount() {
return workerThreads.size();
* @return the core pool size settings.
public int getCorePoolSize() {
return corePoolSize;
public int getActiveExecutorCount() {
return activeExecutorCount.get();
public TEnvironment getEnvironment() {
return this.environment;
public ProcedureStore getStore() {
ProcedureScheduler getScheduler() {
return scheduler;
public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
public long getKeepAliveTime(final TimeUnit timeUnit) {
return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
// ==========================================================================
// Submit/Remove Chores
// ==========================================================================
* Add a chore procedure to the executor
* @param chore the chore to add
public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
if (chore == null) {
* Remove a chore procedure from the executor
* @param chore the chore to remove
* @return whether the chore is removed, or it will be removed later
public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
if (chore == null) {
return true;
return timeoutExecutor.remove(chore);
// ==========================================================================
// Nonce Procedure helpers
// ==========================================================================
* Create a NonceKey from the specified nonceGroup and nonce.
* @param nonceGroup the group to use for the {@link NonceKey}
* @param nonce the nonce to use in the {@link NonceKey}
* @return the generated NonceKey
public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
* Register a nonce for a procedure that is going to be submitted.
* A procId will be reserved and on submitProcedure(),
* the procedure with the specified nonce will take the reserved ProcId.
* If someone already reserved the nonce, this method will return the procId reserved,
* otherwise an invalid procId will be returned. and the caller should procede
* and submit the procedure.
* @param nonceKey A unique identifier for this operation from the client or process.
* @return the procId associated with the nonce, if any otherwise an invalid procId.
public long registerNonce(final NonceKey nonceKey) {
if (nonceKey == null) {
return -1;
// check if we have already a Reserved ID for the nonce
Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
if (oldProcId == null) {
// reserve a new Procedure ID, this will be associated with the nonce
// and the procedure submitted with the specified nonce will use this ID.
final long newProcId = nextProcId();
oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
if (oldProcId == null) {
return -1;
// we found a registered nonce, but the procedure may not have been submitted yet.
// since the client expect the procedure to be submitted, spin here until it is.
final boolean traceEnabled = LOG.isTraceEnabled();
while (isRunning() &&
!(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) &&
nonceKeysToProcIdsMap.containsKey(nonceKey)) {
if (traceEnabled) {
LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
return oldProcId.longValue();
* Remove the NonceKey if the procedure was not submitted to the executor.
* @param nonceKey A unique identifier for this operation from the client or process.
public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
if (nonceKey == null) {
final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
if (procId == null) {
// if the procedure was not submitted, remove the nonce
if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
* If the failure failed before submitting it, we may want to give back the
* same error to the requests with the same nonceKey.
* @param nonceKey A unique identifier for this operation from the client or process
* @param procName name of the procedure, used to inform the user
* @param procOwner name of the owner of the procedure, used to inform the user
* @param exception the failure to report to the user
public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner,
IOException exception) {
if (nonceKey == null) {
Long procId = nonceKeysToProcIdsMap.get(nonceKey);
if (procId == null || completed.containsKey(procId)) {
completed.computeIfAbsent(procId, (key) -> {
Procedure<TEnvironment> proc = new FailedProcedure<>(procId.longValue(),
procName, procOwner, nonceKey, exception);
return new CompletedProcedureRetainer<>(proc);
// ==========================================================================
// Submit/Abort Procedure
// ==========================================================================
* Add a new root-procedure to the executor.
* @param proc the new procedure to execute.
* @return the procedure id, that can be used to monitor the operation
public long submitProcedure(Procedure<TEnvironment> proc) {
return submitProcedure(proc, null);
* Bypass a procedure. If the procedure is set to bypass, all the logic in
* execute/rollback will be ignored and it will return success, whatever.
* It is used to recover buggy stuck procedures, releasing the lock resources
* and letting other procedures run. Bypassing one procedure (and its ancestors will
* be bypassed automatically) may leave the cluster in a middle state, e.g. region
* not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
* the operators may have to do some clean up on hdfs or schedule some assign procedures
* to let region online. DO AT YOUR OWN RISK.
* <p>
* A procedure can be bypassed only if
* 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
* or it is a root procedure without any child.
* 2. No other worker thread is executing it
* 3. No child procedure has been submitted
* <p>
* If all the requirements are meet, the procedure and its ancestors will be
* bypassed and persisted to WAL.
* <p>
* If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
* @param pids the procedure id
* @param lockWait time to wait lock
* @param force if force set to true, we will bypass the procedure even if it is executing.
* This is for procedures which can't break out during executing(due to bug, mostly)
* In this case, bypassing the procedure is not enough, since it is already stuck
* there. We need to restart the master after bypassing, and letting the problematic
* procedure to execute wth bypass=true, so in that condition, the procedure can be
* successfully bypassed.
* @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
* @return true if bypass success
* @throws IOException IOException
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
boolean recursive)
throws IOException {
List<Boolean> result = new ArrayList<Boolean>(pids.size());
for(long pid: pids) {
result.add(bypassProcedure(pid, lockWait, force, recursive));
return result;
boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
throws IOException {
Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
final Procedure<TEnvironment> procedure = getProcedure(pid);
if (procedure == null) {
LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
return false;
LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}",
procedure, lockWait, override, recursive);
IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
if (lockEntry == null && !override) {
LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}",
lockWait, procedure, override);
return false;
} else if (lockEntry == null) {
LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}",
lockWait, procedure, override);
try {
// check whether the procedure is already finished
if (procedure.isFinished()) {
LOG.debug("{} is already finished, skipping bypass", procedure);
return false;
if (procedure.hasChildren()) {
if (recursive) {
// EXPENSIVE. Checks each live procedure of which there could be many!!!
// Is there another way to get children of a procedure?"Recursive bypass on children of pid={}", procedure.getProcId());
this.procedures.forEachValue(1 /*Single-threaded*/,
// Transformer
v -> v.getParentProcId() == procedure.getProcId()? v: null,
// Consumer
v -> {
try {
bypassProcedure(v.getProcId(), lockWait, override, recursive);
} catch (IOException e) {
LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
} else {
LOG.debug("{} has children, skipping bypass", procedure);
return false;
// If the procedure has no parent or no child, we are safe to bypass it in whatever state
if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
&& procedure.getState() != ProcedureState.WAITING
&& procedure.getState() != ProcedureState.WAITING_TIMEOUT) {
LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
+ "(with no parent), {}",
// Question: how is the bypass done here?
return false;
// Now, the procedure is not finished, and no one can execute it since we take the lock now
// And we can be sure that its ancestor is not running too, since their child has not
// finished yet
Procedure<TEnvironment> current = procedure;
while (current != null) {
LOG.debug("Bypassing {}", current);
long parentID = current.getParentProcId();
current = getProcedure(parentID);
//wake up waiting procedure, already checked there is no child
if (procedure.getState() == ProcedureState.WAITING) {
// If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
// Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
if (timeoutExecutor.remove(procedure)) {
LOG.debug("removed procedure {} from timeoutExecutor", procedure);
} else if (lockEntry != null) {
LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
} else {
// If we don't have the lock, we can't re-submit the queue,
// since it is already executing. To get rid of the stuck situation, we
// need to restart the master. With the procedure set to bypass, the procedureExecutor
// will bypass it and won't get stuck again.
LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
+ "skipping add to queue",
return true;
} finally {
if (lockEntry != null) {
* Add a new root-procedure to the executor.
* @param proc the new procedure to execute.
* @param nonceKey the registered unique identifier for this operation from the client or process.
* @return the procedure id, that can be used to monitor the operation
justification = "FindBugs is blind to the check-for-null")
public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
Preconditions.checkArgument(lastProcId.get() >= 0);
final Long currentProcId;
if (nonceKey != null) {
currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
Preconditions.checkArgument(currentProcId != null,
"Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc);
} else {
currentProcId = nextProcId();
// Initialize the procedure
// Commit the transaction
store.insert(proc, null);
LOG.debug("Stored {}", proc);
// Add the procedure to the executor
return pushProcedure(proc);
* Add a set of new root-procedure to the executor.
* @param procs the new procedures to execute.
// TODO: Do we need to take nonces here?
public void submitProcedures(Procedure<TEnvironment>[] procs) {
Preconditions.checkArgument(lastProcId.get() >= 0);
if (procs == null || procs.length <= 0) {
// Prepare procedure
for (int i = 0; i < procs.length; ++i) {
// Commit the transaction
if (LOG.isDebugEnabled()) {
LOG.debug("Stored " + Arrays.toString(procs));
// Add the procedure to the executor
for (int i = 0; i < procs.length; ++i) {
private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
if (this.checkOwnerSet) {
Preconditions.checkArgument(proc.hasOwner(), "missing owner");
return proc;
private long pushProcedure(Procedure<TEnvironment> proc) {
final long currentProcId = proc.getProcId();
// Update metrics on start of a procedure
// Create the rollback stack for the procedure
RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
rollbackStack.put(currentProcId, stack);
// Submit the new subprocedures
assert !procedures.containsKey(currentProcId);
procedures.put(currentProcId, proc);
return proc.getProcId();
* Send an abort notification the specified procedure.
* Depending on the procedure implementation the abort can be considered or ignored.
* @param procId the procedure to abort
* @return true if the procedure exists and has received the abort, otherwise false.
public boolean abort(long procId) {
return abort(procId, true);
* Send an abort notification to the specified procedure.
* Depending on the procedure implementation, the abort can be considered or ignored.
* @param procId the procedure to abort
* @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
* @return true if the procedure exists and has received the abort, otherwise false.
public boolean abort(long procId, boolean mayInterruptIfRunning) {
Procedure<TEnvironment> proc = procedures.get(procId);
if (proc != null) {
if (!mayInterruptIfRunning && proc.wasExecuted()) {
return false;
return proc.abort(getEnvironment());
return false;
// ==========================================================================
// Executor query helpers
// ==========================================================================
public Procedure<TEnvironment> getProcedure(final long procId) {
return procedures.get(procId);
public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
Procedure<TEnvironment> proc = getProcedure(procId);
if (clazz.isInstance(proc)) {
return clazz.cast(proc);
return null;
public Procedure<TEnvironment> getResult(long procId) {
CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
return null;
} else {
return retainer.getProcedure();
* Return true if the procedure is finished.
* The state may be "completed successfully" or "failed and rolledback".
* Use getResult() to check the state or get the result data.
* @param procId the ID of the procedure to check
* @return true if the procedure execution is finished, otherwise false.
public boolean isFinished(final long procId) {
return !procedures.containsKey(procId);
* Return true if the procedure is started.
* @param procId the ID of the procedure to check
* @return true if the procedure execution is started, otherwise false.
public boolean isStarted(long procId) {
Procedure<?> proc = procedures.get(procId);
if (proc == null) {
return completed.get(procId) != null;
return proc.wasExecuted();
* Mark the specified completed procedure, as ready to remove.
* @param procId the ID of the procedure to remove
public void removeResult(long procId) {
CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
LOG.debug("pid={} already removed by the cleaner.", procId);
// The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
public Procedure<TEnvironment> getResultOrProcedure(long procId) {
CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer == null) {
return procedures.get(procId);
} else {
return retainer.getProcedure();
* Check if the user is this procedure's owner
* @param procId the target procedure
* @param user the user
* @return true if the user is the owner of the procedure,
* false otherwise or the owner is unknown.
public boolean isProcedureOwner(long procId, User user) {
if (user == null) {
return false;
final Procedure<TEnvironment> runningProc = procedures.get(procId);
if (runningProc != null) {
return runningProc.getOwner().equals(user.getShortName());
final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
if (retainer != null) {
return retainer.getProcedure().getOwner().equals(user.getShortName());
// Procedure either does not exist or has already completed and got cleaned up.
// At this time, we cannot check the owner of the procedure
return false;
* Should only be used when starting up, where the procedure workers have not been started.
* <p/>
* If the procedure works has been started, the return values maybe changed when you are
* processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
* it will do a copy, and also include the finished procedures.
public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
return procedures.values();
* Get procedures.
* @return the procedures in a list
public List<Procedure<TEnvironment>> getProcedures() {
List<Procedure<TEnvironment>> procedureList =
new ArrayList<>(procedures.size() + completed.size());
// Note: The procedure could show up twice in the list with different state, as
// it could complete after we walk through procedures list and insert into
// procedureList - it is ok, as we will use the information in the Procedure
// to figure it out; to prevent this would increase the complexity of the logic.
return procedureList;
// ==========================================================================
// Listeners helpers
// ==========================================================================
public void registerListener(ProcedureExecutorListener listener) {
public boolean unregisterListener(ProcedureExecutorListener listener) {
return this.listeners.remove(listener);
private void sendProcedureLoadedNotification(final long procId) {
if (!this.listeners.isEmpty()) {
for (ProcedureExecutorListener listener: this.listeners) {
try {
} catch (Throwable e) {
LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
private void sendProcedureAddedNotification(final long procId) {
if (!this.listeners.isEmpty()) {
for (ProcedureExecutorListener listener: this.listeners) {
try {
} catch (Throwable e) {
LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
private void sendProcedureFinishedNotification(final long procId) {
if (!this.listeners.isEmpty()) {
for (ProcedureExecutorListener listener: this.listeners) {
try {
} catch (Throwable e) {
LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
// ==========================================================================
// Procedure IDs helpers
// ==========================================================================
private long nextProcId() {
long procId = lastProcId.incrementAndGet();
if (procId < 0) {
while (!lastProcId.compareAndSet(procId, 0)) {
procId = lastProcId.get();
if (procId >= 0) {
while (procedures.containsKey(procId)) {
procId = lastProcId.incrementAndGet();
assert procId >= 0 : "Invalid procId " + procId;
return procId;
protected long getLastProcId() {
return lastProcId.get();
public Set<Long> getActiveProcIds() {
return procedures.keySet();
Long getRootProcedureId(Procedure<TEnvironment> proc) {
return Procedure.getRootProcedureId(procedures, proc);
// ==========================================================================
// Executions
// ==========================================================================
private void executeProcedure(Procedure<TEnvironment> proc) {
if (proc.isFinished()) {
LOG.debug("{} is already finished, skipping execution", proc);
final Long rootProcId = getRootProcedureId(proc);
if (rootProcId == null) {
// The 'proc' was ready to run but the root procedure was rolledback
LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
if (procStack == null) {
LOG.warn("RootProcedureState is null for " + proc.getProcId());
do {
// Try to acquire the execution
if (!procStack.acquire(proc)) {
if (procStack.setRollback()) {
// we have the 'rollback-lock' we can start rollingback
switch (executeRollback(rootProcId, procStack)) {
case LOCK_EVENT_WAIT:"LOCK_EVENT_WAIT rollback..." + proc);
throw new UnsupportedOperationException();
} else {
// if we can't rollback means that some child is still running.
// the rollback will be executed after all the children are done.
// If the procedure was never executed, remove and mark it as rolledback.
if (!proc.wasExecuted()) {
switch (executeRollback(proc)) {
case LOCK_EVENT_WAIT:"LOCK_EVENT_WAIT can't rollback child running?..." + proc);
throw new UnsupportedOperationException();
// Execute the procedure
assert proc.getState() == ProcedureState.RUNNABLE : proc;
// Note that lock is NOT about concurrency but rather about ensuring
// ownership of a procedure of an entity such as a region or table
LockState lockState = acquireLock(proc);
switch (lockState) {
execProcedure(procStack, proc);
case LOCK_YIELD_WAIT: + " " + proc);
// Someone will wake us up when the lock is available
LOG.debug(lockState + " " + proc);
throw new UnsupportedOperationException();
if (proc.isSuccess()) {
// update metrics on finishing the procedure
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);"Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
// Finalize the procedure state
if (proc.getProcId() == rootProcId) {
} else {
} while (procStack.isFailed());
private LockState acquireLock(Procedure<TEnvironment> proc) {
TEnvironment env = getEnvironment();
// if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
// hasLock is true.
if (proc.hasLock()) {
return LockState.LOCK_ACQUIRED;
return proc.doAcquireLock(env, store);
private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
TEnvironment env = getEnvironment();
// For how the framework works, we know that we will always have the lock
// when we call releaseLock(), so we can avoid calling proc.hasLock()
if (force || !proc.holdLock(env) || proc.isFinished()) {
proc.doReleaseLock(env, store);
* Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
* root-procedure will be visible as finished to user, and the result will be the fatal exception.
private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
RemoteProcedureException exception = rootProc.getException();
// TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
// rolling back because the subprocedure does. Clarify.
if (exception == null) {
exception = procStack.getException();
List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;
int stackTail = subprocStack.size();
while (stackTail-- > 0) {
Procedure<TEnvironment> proc = subprocStack.get(stackTail);
IdLock.Entry lockEntry = null;
// Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
// this check, as the worker will hold the lock before executing a procedure. This is the only
// place where we may hold two procedure execution locks, and there is a fence in the
// RootProcedureState where we can make sure that only one worker can execute the rollback of
// a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
// prevent race between us and the force update thread.
if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
try {
lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
} catch (IOException e) {
// can only happen if interrupted, so not a big deal to propagate it
throw new UncheckedIOException(e);
try {
// For the sub procedures which are successfully finished, we do not rollback them.
// Typically, if we want to rollback a procedure, we first need to rollback it, and then
// recursively rollback its ancestors. The state changes which are done by sub procedures
// should be handled by parent procedures when rolling back. For example, when rolling back
// a MergeTableProcedure, we will schedule new procedures to bring the offline regions
// online, instead of rolling back the original procedures which offlined the regions(in
// fact these procedures can not be rolled back...).
if (proc.isSuccess()) {
// Just do the cleanup work, without actually executing the rollback
LockState lockState = acquireLock(proc);
if (lockState != LockState.LOCK_ACQUIRED) {
// can't take a lock on the procedure, add the root-proc back on the
// queue waiting for the lock availability
return lockState;
lockState = executeRollback(proc);
releaseLock(proc, false);
boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
abortRollback |= !isRunning() || !store.isRunning();
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (abortRollback) {
return lockState;
// if the procedure is kind enough to pass the slot to someone else, yield
// if the proc is already finished, do not yield
if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
return LockState.LOCK_YIELD_WAIT;
if (proc != rootProc) {
} finally {
if (lockEntry != null) {
// Finalize the procedure state"Rolled back {} exec-time={}", rootProc,
return LockState.LOCK_ACQUIRED;
private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
if (proc.removeStackIndex()) {
if (!proc.isSuccess()) {
// update metrics on finishing the procedure (fail)
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);
if (proc.hasParent()) {
} else {
final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
if (childProcIds != null) {
store.delete(proc, childProcIds);
} else {
} else {
* Execute the rollback of the procedure step.
* It updates the store with the new state (stack index)
* or will remove completly the procedure in case it is a child.
private LockState executeRollback(Procedure<TEnvironment> proc) {
try {
} catch (IOException e) {
LOG.debug("Roll back attempt failed for {}", proc, e);
return LockState.LOCK_YIELD_WAIT;
} catch (InterruptedException e) {
handleInterruptedException(proc, e);
return LockState.LOCK_YIELD_WAIT;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
String msg = "TESTING: Kill before store update";
throw new RuntimeException(msg);
return LockState.LOCK_ACQUIRED;
private void yieldProcedure(Procedure<TEnvironment> proc) {
releaseLock(proc, false);
* Executes <code>procedure</code>
* <ul>
* <li>Calls the doExecute() of the procedure
* <li>If the procedure execution didn't fail (i.e. valid user input)
* <ul>
* <li>...and returned subprocedures
* <ul><li>The subprocedures are initialized.
* <li>The subprocedures are added to the store
* <li>The subprocedures are added to the runnable queue
* <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
* </ul>
* </li>
* <li>...if there are no subprocedure
* <ul><li>the procedure completed successfully
* <li>if there is a parent (WAITING)
* <li>the parent state will be set to RUNNABLE
* </ul>
* </li>
* </ul>
* </li>
* <li>In case of failure
* <ul>
* <li>The store is updated with the new state</li>
* <li>The executor (caller of this method) will start the rollback of the procedure</li>
* </ul>
* </li>
* </ul>
private void execProcedure(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure) {
Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
"NOT RUNNABLE! " + procedure.toString());
// Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
// The exception is caught below and then we hurry to the exit without disturbing state. The
// idea is that the processing of this procedure will be unsuspended later by an external event
// such the report of a region open.
boolean suspended = false;
// Whether to 're-' -execute; run through the loop again.
boolean reExecute = false;
Procedure<TEnvironment>[] subprocs = null;
do {
reExecute = false;
try {
subprocs = procedure.doExecute(getEnvironment());
if (subprocs != null && subprocs.length == 0) {
subprocs = null;
} catch (ProcedureSuspendedException e) {
LOG.trace("Suspend {}", procedure);
suspended = true;
} catch (ProcedureYieldException e) {
LOG.trace("Yield {}", procedure, e);
} catch (InterruptedException e) {
LOG.trace("Yield interrupt {}", procedure, e);
handleInterruptedException(procedure, e);
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
LOG.error(msg, e);
procedure.setFailure(new RemoteProcedureException(msg, e));
if (!procedure.isFailed()) {
if (subprocs != null) {
if (subprocs.length == 1 && subprocs[0] == procedure) {
// Procedure returned itself. Quick-shortcut for a state machine-like procedure;
// i.e. we go around this loop again rather than go back out on the scheduler queue.
subprocs = null;
reExecute = true;
LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
} else {
// Yield the current procedure, and make the subprocedure runnable
// subprocs may come back 'null'.
subprocs = initializeChildren(procStack, procedure, subprocs);"Initialized subprocedures=" +
(subprocs == null? null:
Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
} else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
LOG.trace("Added to timeoutExecutor {}", procedure);
} else if (!suspended) {
// No subtask, so we are done
// Add the procedure to the stack
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (testing != null &&
testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) {
kill("TESTING: Kill BEFORE store update: " + procedure);
// TODO: The code here doesn't check if store is running before persisting to the store as
// it relies on the method call below to throw RuntimeException to wind up the stack and
// executor thread to stop. The statement following the method call below seems to check if
// store is not running, to prevent scheduling children procedures, re-execution or yield
// of this procedure. This may need more scrutiny and subsequent cleanup in future
// Commit the transaction even if a suspend (state may have changed). Note this append
// can take a bunch of time to complete.
if (procedure.needPersistence()) {
updateStoreOnExec(procStack, procedure, subprocs);
// if the store is not running we are aborting
if (!store.isRunning()) {
// if the procedure is kind enough to pass the slot to someone else, yield
if (procedure.isRunnable() && !suspended &&
procedure.isYieldAfterExecutionStep(getEnvironment())) {
assert (reExecute && subprocs == null) || !reExecute;
} while (reExecute);
// Allows to kill the executor after something is stored to the WAL but before the below
// state settings are done -- in particular the one on the end where we make parent
// RUNNABLE again when its children are done; see countDownChildren.
if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
kill("TESTING: Kill AFTER store update: " + procedure);
// Submit the new subprocedures
if (subprocs != null && !procedure.isFailed()) {
// we need to log the release lock operation before waking up the parent procedure, as there
// could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
// the sub procedures from store and cause problems...
releaseLock(procedure, false);
// if the procedure is complete and has a parent, count down the children latch.
// If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
if (!suspended && procedure.isFinished() && procedure.hasParent()) {
countDownChildren(procStack, procedure);
private void kill(String msg) {
throw new RuntimeException(msg);
private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
assert subprocs != null : "expected subprocedures";
final long rootProcId = getRootProcedureId(procedure);
for (int i = 0; i < subprocs.length; ++i) {
Procedure<TEnvironment> subproc = subprocs[i];
if (subproc == null) {
String msg = "subproc[" + i + "] is null, aborting the procedure";
procedure.setFailure(new RemoteProcedureException(msg,
new IllegalArgumentIOException(msg)));
return null;
assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
if (!procedure.isFailed()) {
switch (procedure.getState()) {
return subprocs;
private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
for (int i = 0; i < subprocs.length; ++i) {
Procedure<TEnvironment> subproc = subprocs[i];
assert !procedures.containsKey(subproc.getProcId());
procedures.put(subproc.getProcId(), subproc);
private void countDownChildren(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure) {
Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
if (parent == null) {
assert procStack.isRollingback();
// If this procedure is the last child awake the parent procedure
if (parent.tryRunnable()) {
// If we succeeded in making the parent runnable -- i.e. all of its
// children have completed, move parent to front of the queue.
scheduler.addFront(parent);"Finished subprocedure(s) of " + parent + "; resume parent processing.");
private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack,
Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) {
if (subprocs != null && !procedure.isFailed()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
store.insert(procedure, subprocs);
} else {
LOG.trace("Store update {}", procedure);
if (procedure.isFinished() && !procedure.hasParent()) {
// remove child procedures
final long[] childProcIds = procStack.getSubprocedureIds();
if (childProcIds != null) {
store.delete(procedure, childProcIds);
for (int i = 0; i < childProcIds.length; ++i) {
} else {
} else {
private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
// NOTE: We don't call Thread.currentThread().interrupt()
// because otherwise all the subsequent calls e.g. Thread.sleep() will throw
// the InterruptedException. If the master is going down, we will be notified
// and the executor/store will be stopped.
// (The interrupted procedure will be retried on the next run)
private void execCompletionCleanup(Procedure<TEnvironment> proc) {
final TEnvironment env = getEnvironment();
if (proc.hasLock()) {
LOG.warn("Usually this should not happen, we will release the lock before if the procedure" +
" is finished, even if the holdLock is true, arrive here means we have some holes where" +
" we do not release the lock. And the releaseLock below may fail since the procedure may" +
" have already been deleted from the procedure store.");
releaseLock(proc, true);
try {
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
private void procedureFinished(Procedure<TEnvironment> proc) {
// call the procedure completion cleanup handler
CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);
// update the executor internal state maps
if (!proc.shouldWaitClientAck(getEnvironment())) {
completed.put(proc.getProcId(), retainer);
// call the runnableSet completion cleanup handler
try {
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
// Notify the listeners
RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
return rollbackStack.get(rootProcId);
ProcedureScheduler getProcedureScheduler() {
return scheduler;
int getCompletedSize() {
return completed.size();
public IdLock getProcExecutionLock() {
return procExecutionLock;
// ==========================================================================
// Worker Thread
// ==========================================================================
private class WorkerThread extends StoppableThread {
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
private volatile Procedure<TEnvironment> activeProcedure;
public WorkerThread(ThreadGroup group) {
this(group, "PEWorker-");
protected WorkerThread(ThreadGroup group, String prefix) {
super(group, prefix + workerId.incrementAndGet());
public void sendStopSignal() {
public void run() {
long lastUpdate = EnvironmentEdgeManager.currentTime();
try {
while (isRunning() && keepAlive(lastUpdate)) {
Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (proc == null) {
this.activeProcedure = proc;
int activeCount = activeExecutorCount.incrementAndGet();
int runningCount = store.setRunningProcedureCount(activeCount);
LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
runningCount, activeCount);
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
try {
} catch (AssertionError e) {"ASSERT pid=" + proc.getProcId(), e);
throw e;
} finally {
activeCount = activeExecutorCount.decrementAndGet();
runningCount = store.setRunningProcedureCount(activeCount);
LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
runningCount, activeCount);
this.activeProcedure = null;
lastUpdate = EnvironmentEdgeManager.currentTime();
} catch (Throwable t) {
LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
} finally {
LOG.trace("Worker terminated.");
public String toString() {
Procedure<?> p = this.activeProcedure;
return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")");
* @return the time since the current procedure is running
public long getCurrentRunTime() {
return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
// core worker never timeout
protected boolean keepAlive(long lastUpdate) {
return true;
// A worker thread which can be added when core workers are stuck. Will timeout after
// keepAliveTime if there is no procedure to run.
private final class KeepAliveWorkerThread extends WorkerThread {
public KeepAliveWorkerThread(ThreadGroup group) {
super(group, "KeepAlivePEWorker-");
protected boolean keepAlive(long lastUpdate) {
return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
// ----------------------------------------------------------------------------
// TODO-MAYBE: Should we provide a InlineChore to notify the store with the
// full set of procedures pending and completed to write a compacted
// version of the log (in case is a log)?
// In theory no, procedures are have a short life, so at some point the store
// will have the tracker saying everything is in the last log.
// ----------------------------------------------------------------------------
private final class WorkerMonitor extends InlineChore {
public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck
private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
public WorkerMonitor() {
public void run() {
final int stuckCount = checkForStuckWorkers();
// refresh interval (poor man dynamic conf update)
private int checkForStuckWorkers() {
// check if any of the worker is stuck
int stuckCount = 0;
for (WorkerThread worker : workerThreads) {
if (worker.getCurrentRunTime() < stuckThreshold) {
// WARN the worker is stuck
LOG.warn("Worker stuck {}, run time {}", worker,
return stuckCount;
private void checkThreadCount(final int stuckCount) {
// nothing to do if there are no runnable tasks
if (stuckCount < 1 || !scheduler.hasRunnables()) {
// add a new thread if the worker stuck percentage exceed the threshold limit
// and every handler is active.
final float stuckPerc = ((float) stuckCount) / workerThreads.size();
// let's add new worker thread more aggressively, as they will timeout finally if there is no
// work to do.
if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
LOG.debug("Added new worker thread {}", worker);
private void refreshConfig() {
addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,
timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,
stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,
public int getTimeoutInterval() {
return timeoutInterval;