// blob: 88057c97de5b21a7cb8373936e7ec0fafef169d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Bookkeeping for one root procedure and the sub-procedures that belong to it.
 *
 * <p>The instance keeps three pieces of information: the set of registered sub-procedures, the
 * ordered list of executed steps (the "stack", used for rollback), and a counter of the steps
 * that are currently running. Every method is {@code synchronized} on this instance.
 *
 * @param <Env> the environment type the tracked procedures are parameterized with
 */
public class RootProcedureStack<Env> {
  private static final Logger LOG = LoggerFactory.getLogger(RootProcedureStack.class);

  private enum State {
    RUNNING, // The Procedure is running or ready to run
    FAILED, // The Procedure failed, waiting for the rollback executing
    ROLLINGBACK, // The Procedure failed and the rollback is executing (or was already executed)
  }

  /** Sub-procedures registered through {@link #addSubProcedure}; {@code null} until the first. */
  private Set<Procedure<Env>> subprocs = null;

  /**
   * Executed steps in execution order; {@code null} until the first step is recorded. May contain
   * {@code null} placeholders while the stack is being rebuilt by {@link #loadStack}.
   */
  private ArrayList<Procedure<Env>> subprocStack = null;

  private State state = State.RUNNING;

  /** Number of steps acquired through {@link #acquire()} and not yet released. */
  private int running = 0;

  /**
   * Tells whether the root procedure has failed.
   *
   * @return {@code true} when the state is {@code FAILED} or {@code ROLLINGBACK}
   */
  public synchronized boolean isFailed() {
    return state == State.FAILED || state == State.ROLLINGBACK;
  }

  /**
   * Tells whether the stack is marked as rolling back.
   *
   * @return {@code true} when the state is {@code ROLLINGBACK}
   */
  public synchronized boolean isRollingback() {
    return state == State.ROLLINGBACK;
  }

  /**
   * Called by the ProcedureExecutor to mark rollback execution.
   *
   * <p>The transition {@code FAILED -> ROLLINGBACK} only happens when no step is running.
   *
   * @return {@code true} if the state was switched to {@code ROLLINGBACK}, {@code false} if a step
   *     is still running or the state is not {@code FAILED}
   */
  protected synchronized boolean setRollback() {
    if (running == 0 && state == State.FAILED) {
      state = State.ROLLINGBACK;
      return true;
    }
    return false;
  }

  /**
   * Called by the ProcedureExecutor to undo {@link #setRollback()}: the state goes back from
   * {@code ROLLINGBACK} to {@code FAILED}.
   */
  protected synchronized void unsetRollback() {
    assert state == State.ROLLINGBACK;
    state = State.FAILED;
  }

  /**
   * Returns the ids of all registered sub-procedures.
   *
   * @return the procedure ids, or {@code null} if no sub-procedure was ever registered
   */
  protected synchronized long[] getSubprocedureIds() {
    if (subprocs == null) {
      return null;
    }
    return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
  }

  /**
   * Returns the list of recorded steps.
   *
   * @return the internal list itself (not a copy), or {@code null} if no step was recorded yet
   */
  protected synchronized List<Procedure<Env>> getSubproceduresStack() {
    return subprocStack;
  }

  /**
   * Looks for the first recorded step that carries an exception.
   *
   * @return the exception of the first step (in stack order) for which {@code hasException()} is
   *     true, or {@code null} if there is none
   */
  protected synchronized ProcedureException getException() {
    if (subprocStack != null) {
      for (Procedure<Env> proc : subprocStack) {
        // loadStack() pads the list with null placeholders until every procedure has been
        // loaded, so an entry may legitimately be missing here.
        if (proc != null && proc.hasException()) {
          return proc.getException();
        }
      }
    }
    return null;
  }

  /**
   * Called by the ProcedureExecutor to mark the procedure step as running.
   *
   * @return {@code true} if the running counter was incremented, {@code false} if the state is
   *     not {@code RUNNING}
   */
  protected synchronized boolean acquire() {
    if (state != State.RUNNING) {
      return false;
    }
    running++;
    return true;
  }

  /** Called by the ProcedureExecutor to mark the procedure step as finished. */
  protected synchronized void release() {
    running--;
  }

  /** Moves the state from {@code RUNNING} to {@code FAILED}; any other state is left untouched. */
  protected synchronized void abort() {
    if (state == State.RUNNING) {
      state = State.FAILED;
    }
  }

  /**
   * Called by the ProcedureExecutor after the procedure step is completed, to add the step to the
   * rollback list (or procedure stack).
   *
   * <p>If the step reports failure, the state is switched to {@code FAILED}. The step is told its
   * position through {@code addStackIndex} before it is appended.
   *
   * @param proc the procedure whose step just completed
   */
  protected synchronized void addRollbackStep(Procedure<Env> proc) {
    if (proc.isFailed()) {
      state = State.FAILED;
    }
    if (subprocStack == null) {
      subprocStack = new ArrayList<>();
    }
    proc.addStackIndex(subprocStack.size());
    LOG.trace("Add procedure {} as the {}th rollback step", proc, subprocStack.size());
    subprocStack.add(proc);
  }

  /**
   * Registers a sub-procedure. Procedures without a parent are ignored.
   *
   * @param proc the procedure to register
   */
  protected synchronized void addSubProcedure(Procedure<Env> proc) {
    if (!proc.hasParent()) {
      return;
    }
    if (subprocs == null) {
      subprocs = new HashSet<>();
    }
    subprocs.add(proc);
  }

  /**
   * Called on store load by the ProcedureExecutor to load part of the stack.
   *
   * <p>Each procedure has its own stack-positions, which means we have to write to the store only
   * the Procedure we executed, and nothing else. On load we recreate the full stack by aggregating
   * each procedure's stack-positions.
   *
   * <p>A procedure with no stack positions ({@code null} or empty array) contributes nothing to
   * the stack but is still registered as a sub-procedure and still influences the state.
   *
   * @param proc the procedure being loaded
   */
  protected synchronized void loadStack(Procedure<Env> proc) {
    addSubProcedure(proc);
    int[] stackIndexes = proc.getStackIndexes();
    if (stackIndexes != null && stackIndexes.length > 0) {
      if (subprocStack == null) {
        subprocStack = new ArrayList<>();
      }
      // The last index is used as the largest one; addRollbackStep() hands out increasing
      // positions. NOTE(review): assumes Procedure keeps them in insertion order - confirm.
      int requiredSize = 1 + stackIndexes[stackIndexes.length - 1];
      int diff = requiredSize - subprocStack.size();
      if (diff > 0) {
        subprocStack.ensureCapacity(requiredSize);
        // Pad with placeholders; they are overwritten as the owning procedures get loaded.
        while (diff-- > 0) {
          subprocStack.add(null);
        }
      }
      for (int stackIndex : stackIndexes) {
        subprocStack.set(stackIndex, proc);
      }
    }
    if (proc.getState() == ProcedureState.ROLLEDBACK) {
      state = State.ROLLINGBACK;
    } else if (proc.isFailed()) {
      state = State.FAILED;
    }
  }
}