blob: 440f9e7d6ec10ff24d978e6972b826a2259daa24 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
/**
* Internal state of the ProcedureExecutor that describes the state of a "Root Procedure".
* A "Root Procedure" is a Procedure without parent, each subprocedure will be
* added to the "Root Procedure" stack (or rollback-stack).
*
* RootProcedureState is used and managed only by the ProcedureExecutor.
* Long rootProcId = getRootProcedureId(proc);
* rollbackStack.get(rootProcId).acquire(proc)
* rollbackStack.get(rootProcId).release(proc)
* ...
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class RootProcedureState<TEnvironment> {
private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);
private enum State {
RUNNING, // The Procedure is running or ready to run
FAILED, // The Procedure failed, waiting for the rollback executing
ROLLINGBACK, // The Procedure failed and the execution was rolledback
}
private Set<Procedure<TEnvironment>> subprocs = null;
private ArrayList<Procedure<TEnvironment>> subprocStack = null;
private State state = State.RUNNING;
private int running = 0;
public synchronized boolean isFailed() {
switch (state) {
case ROLLINGBACK:
case FAILED:
return true;
default:
break;
}
return false;
}
public synchronized boolean isRollingback() {
return state == State.ROLLINGBACK;
}
/**
* Called by the ProcedureExecutor to mark rollback execution
*/
protected synchronized boolean setRollback() {
if (running == 0 && state == State.FAILED) {
state = State.ROLLINGBACK;
return true;
}
return false;
}
/**
* Called by the ProcedureExecutor to mark rollback execution
*/
protected synchronized void unsetRollback() {
assert state == State.ROLLINGBACK;
state = State.FAILED;
}
protected synchronized long[] getSubprocedureIds() {
if (subprocs == null) {
return null;
}
return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
}
protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() {
return subprocStack;
}
protected synchronized RemoteProcedureException getException() {
if (subprocStack != null) {
for (Procedure<TEnvironment> proc: subprocStack) {
if (proc.hasException()) {
return proc.getException();
}
}
}
return null;
}
/**
* Called by the ProcedureExecutor to mark the procedure step as running.
*/
protected synchronized boolean acquire(Procedure<TEnvironment> proc) {
if (state != State.RUNNING) {
return false;
}
running++;
return true;
}
/**
* Called by the ProcedureExecutor to mark the procedure step as finished.
*/
protected synchronized void release(Procedure<TEnvironment> proc) {
running--;
}
protected synchronized void abort() {
if (state == State.RUNNING) {
state = State.FAILED;
}
}
/**
* Called by the ProcedureExecutor after the procedure step is completed,
* to add the step to the rollback list (or procedure stack)
*/
protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) {
if (proc.isFailed()) {
state = State.FAILED;
}
if (subprocStack == null) {
subprocStack = new ArrayList<>();
}
proc.addStackIndex(subprocStack.size());
LOG.trace("Add procedure {} as the {}th rollback step", proc, subprocStack.size());
subprocStack.add(proc);
}
protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) {
if (!proc.hasParent()) {
return;
}
if (subprocs == null) {
subprocs = new HashSet<>();
}
subprocs.add(proc);
}
/**
* Called on store load by the ProcedureExecutor to load part of the stack.
*
* Each procedure has its own stack-positions. Which means we have to write
* to the store only the Procedure we executed, and nothing else.
* on load we recreate the full stack by aggregating each procedure stack-positions.
*/
protected synchronized void loadStack(Procedure<TEnvironment> proc) {
addSubProcedure(proc);
int[] stackIndexes = proc.getStackIndexes();
if (stackIndexes != null) {
if (subprocStack == null) {
subprocStack = new ArrayList<>();
}
int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size();
if (diff > 0) {
subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
while (diff-- > 0) {
subprocStack.add(null);
}
}
for (int i = 0; i < stackIndexes.length; ++i) {
subprocStack.set(stackIndexes[i], proc);
}
}
if (proc.getState() == ProcedureState.ROLLEDBACK) {
state = State.ROLLINGBACK;
} else if (proc.isFailed()) {
state = State.FAILED;
}
}
/**
* Called on store load by the ProcedureExecutor to validate the procedure stack.
*/
protected synchronized boolean isValid() {
if (subprocStack != null) {
for (Procedure<TEnvironment> proc : subprocStack) {
if (proc == null) {
return false;
}
}
}
return true;
}
}