| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.confignode.procedure; |
| |
| import org.apache.iotdb.confignode.procedure.exception.ProcedureException; |
| import org.apache.iotdb.confignode.procedure.state.ProcedureState; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| |
| public class RootProcedureStack<Env> { |
| private static final Logger LOG = LoggerFactory.getLogger(RootProcedureStack.class); |
| |
| private enum State { |
| RUNNING, // The Procedure is running or ready to run |
| FAILED, // The Procedure failed, waiting for the rollback executing |
| ROLLINGBACK, // The Procedure failed and the execution was rolledback |
| } |
| |
| private Set<Procedure<Env>> subprocs = null; |
| private ArrayList<Procedure<Env>> subprocStack = null; |
| private State state = State.RUNNING; |
| private int running = 0; |
| |
| public synchronized boolean isFailed() { |
| switch (state) { |
| case ROLLINGBACK: |
| case FAILED: |
| return true; |
| default: |
| break; |
| } |
| return false; |
| } |
| |
| public synchronized boolean isRollingback() { |
| return state == State.ROLLINGBACK; |
| } |
| |
| /** Called by the ProcedureExecutor to mark rollback execution */ |
| protected synchronized boolean setRollback() { |
| if (running == 0 && state == State.FAILED) { |
| state = State.ROLLINGBACK; |
| return true; |
| } |
| return false; |
| } |
| |
| /** Called by the ProcedureExecutor to mark rollback execution */ |
| protected synchronized void unsetRollback() { |
| assert state == State.ROLLINGBACK; |
| state = State.FAILED; |
| } |
| |
| protected synchronized long[] getSubprocedureIds() { |
| if (subprocs == null) { |
| return null; |
| } |
| return subprocs.stream().mapToLong(Procedure::getProcId).toArray(); |
| } |
| |
| protected synchronized List<Procedure<Env>> getSubproceduresStack() { |
| return subprocStack; |
| } |
| |
| protected synchronized ProcedureException getException() { |
| if (subprocStack != null) { |
| for (Procedure<Env> proc : subprocStack) { |
| if (proc.hasException()) { |
| return proc.getException(); |
| } |
| } |
| } |
| return null; |
| } |
| |
| /** Called by the ProcedureExecutor to mark the procedure step as running. */ |
| protected synchronized boolean acquire() { |
| if (state != State.RUNNING) { |
| return false; |
| } |
| |
| running++; |
| return true; |
| } |
| |
| /** Called by the ProcedureExecutor to mark the procedure step as finished. */ |
| protected synchronized void release() { |
| running--; |
| } |
| |
| protected synchronized void abort() { |
| if (state == State.RUNNING) { |
| state = State.FAILED; |
| } |
| } |
| |
| /** |
| * Called by the ProcedureExecutor after the procedure step is completed, to add the step to the |
| * rollback list (or procedure stack) |
| */ |
| protected synchronized void addRollbackStep(Procedure<Env> proc) { |
| if (proc.isFailed()) { |
| state = State.FAILED; |
| } |
| if (subprocStack == null) { |
| subprocStack = new ArrayList<>(); |
| } |
| proc.addStackIndex(subprocStack.size()); |
| LOG.trace("Add procedure {} as the {}th rollback step", proc, subprocStack.size()); |
| subprocStack.add(proc); |
| } |
| |
| protected synchronized void addSubProcedure(Procedure<Env> proc) { |
| if (!proc.hasParent()) { |
| return; |
| } |
| if (subprocs == null) { |
| subprocs = new HashSet<>(); |
| } |
| subprocs.add(proc); |
| } |
| |
| /** |
| * Called on store load by the ProcedureExecutor to load part of the stack. |
| * |
| * <p>Each procedure has its own stack-positions. Which means we have to write to the store only |
| * the Procedure we executed, and nothing else. on load we recreate the full stack by aggregating |
| * each procedure stack-positions. |
| */ |
| protected synchronized void loadStack(Procedure<Env> proc) { |
| addSubProcedure(proc); |
| int[] stackIndexes = proc.getStackIndexes(); |
| if (stackIndexes != null) { |
| if (subprocStack == null) { |
| subprocStack = new ArrayList<>(); |
| } |
| int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size(); |
| if (diff > 0) { |
| subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]); |
| while (diff-- > 0) { |
| subprocStack.add(null); |
| } |
| } |
| for (int stackIndex : stackIndexes) { |
| subprocStack.set(stackIndex, proc); |
| } |
| } |
| if (proc.getState() == ProcedureState.ROLLEDBACK) { |
| state = State.ROLLINGBACK; |
| } else if (proc.isFailed()) { |
| state = State.FAILED; |
| } |
| } |
| } |