Refactor procedure framework (simplified StateMachineProcedure, and some other things) (#14683) (#14741)

* delete KAWorking

* delete ProcedureSuspended

* delete "aborted" and suspended

* refactor states and delete stateCount

* refactor setNextState

* delete previousState

* spotless

* compile

* Revert "delete KAWorking"

This reverts commit 43176fa105dd2536611ee59f29a7a022c84a4356.

* recover KAWorker but rename it to TemporaryWorker

* spotless

* fix npe

* delete useless code

* delete useless code

* necessary log

(cherry picked from commit 41a49e7c1e6db3e88242f0b38f62bb5074fa8810)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java
index 817a8e3..4732343 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java
@@ -49,10 +49,5 @@
   }
 
   @Override
-  protected boolean abort(Env env) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public void deserialize(ByteBuffer byteBuffer) {}
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 35fc402..d4f97c1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -70,20 +70,6 @@
 
   private int[] stackIndexes = null;
 
-  private boolean persist = true;
-
-  public boolean needPersistance() {
-    return this.persist;
-  }
-
-  public void resetPersistance() {
-    this.persist = true;
-  }
-
-  public final void skipPersistance() {
-    this.persist = false;
-  }
-
   public final boolean hasLock() {
     return locked;
   }
@@ -119,17 +105,6 @@
   protected abstract void rollback(Env env)
       throws IOException, InterruptedException, ProcedureException;
 
-  /**
-   * The abort() call is asynchronous and each procedure must decide how to deal with it, if they
-   * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
-   * abort() method and then the execute() will check if the abort flag is set or not. abort() may
-   * be called multiple times from the client, so the implementation must be idempotent.
-   *
-   * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
-   * procedure implementor abort.
-   */
-  protected abstract boolean abort(Env env);
-
   public void serialize(DataOutputStream stream) throws IOException {
     // procid
     stream.writeLong(this.procId);
@@ -263,28 +238,6 @@
     return clazz;
   }
 
-  public static Procedure<?> newInstance(ByteBuffer byteBuffer) {
-    Class<?> procedureClass = deserializeTypeInfo(byteBuffer);
-    Procedure<?> procedure;
-    try {
-      procedure = (Procedure<?>) procedureClass.newInstance();
-    } catch (InstantiationException | IllegalAccessException e) {
-      throw new RuntimeException("Instantiation failed", e);
-    }
-    return procedure;
-  }
-
-  /**
-   * The {@link #doAcquireLock(Object, IProcedureStore)} will be split into two steps, first, it
-   * will call us to determine whether we need to wait for initialization, second, it will call
-   * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
-   *
-   * @return true means we need to wait until the environment has been initialized, otherwise true.
-   */
-  protected boolean waitInitialized(Env env) {
-    return false;
-  }
-
   /**
    * Acquire a lock, user should override it if necessary.
    *
@@ -315,34 +268,6 @@
   }
 
   /**
-   * Called before the procedure is recovered and added into the queue.
-   *
-   * @param env environment
-   */
-  protected final void beforeRecover(Env env) {
-    // no op
-  }
-
-  /**
-   * Called when the procedure is recovered and added into the queue.
-   *
-   * @param env environment
-   */
-  protected final void afterRecover(Env env) {
-    // no op
-  }
-
-  /**
-   * Called when the procedure is completed (success or rollback). The procedure may use this method
-   * to clean up in-memory states. This operation will not be retried on failure.
-   *
-   * @param env environment
-   */
-  protected void completionCleanup(Env env) {
-    // no op
-  }
-
-  /**
    * To make executor yield between each execution step to give other procedures a chance to run.
    *
    * @param env environment
@@ -393,9 +318,6 @@
    * @return ProcedureLockState
    */
   public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
-    if (waitInitialized(env)) {
-      return ProcedureLockState.LOCK_EVENT_WAIT;
-    }
     if (lockedWhenLoading) {
       lockedWhenLoading = false;
       locked = true;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index d1bbb32..84e2b7f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
@@ -186,24 +185,14 @@
             // executing, we need to set its state to RUNNABLE.
             procedure.setState(ProcedureState.RUNNABLE);
             runnableList.add(procedure);
-          } else {
-            procedure.afterRecover(environment);
           }
         });
     restoreLocks();
 
-    waitingTimeoutList.forEach(
-        procedure -> {
-          procedure.afterRecover(environment);
-          timeoutExecutor.add(procedure);
-        });
+    waitingTimeoutList.forEach(timeoutExecutor::add);
 
     failedList.forEach(scheduler::addBack);
-    runnableList.forEach(
-        procedure -> {
-          procedure.afterRecover(environment);
-          scheduler.addBack(procedure);
-        });
+    runnableList.forEach(scheduler::addBack);
     scheduler.signalAll();
   }
 
@@ -418,21 +407,16 @@
           "The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
       return;
     }
-    boolean suspended = false;
     boolean reExecute;
 
     Procedure<Env>[] subprocs = null;
     do {
       reExecute = false;
-      proc.resetPersistance();
       try {
         subprocs = proc.doExecute(this.environment);
         if (subprocs != null && subprocs.length == 0) {
           subprocs = null;
         }
-      } catch (ProcedureSuspendedException e) {
-        LOG.debug("Suspend {}", proc);
-        suspended = true;
       } catch (ProcedureYieldException e) {
         LOG.debug("Yield {}", proc);
         yieldProcedure(proc);
@@ -455,22 +439,20 @@
           }
         } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
           LOG.info("Added into timeoutExecutor {}", proc);
-        } else if (!suspended) {
+        } else {
           proc.setState(ProcedureState.SUCCESS);
         }
       }
       // add procedure into rollback stack.
       rootProcStack.addRollbackStep(proc);
 
-      if (proc.needPersistance()) {
-        updateStoreOnExecution(rootProcStack, proc, subprocs);
-      }
+      updateStoreOnExecution(rootProcStack, proc, subprocs);
 
       if (!store.isRunning()) {
         return;
       }
 
-      if (proc.isRunnable() && !suspended && proc.isYieldAfterExecution(this.environment)) {
+      if (proc.isRunnable() && proc.isYieldAfterExecution(this.environment)) {
         yieldProcedure(proc);
         return;
       }
@@ -481,7 +463,7 @@
     }
 
     releaseLock(proc, false);
-    if (!suspended && proc.isFinished() && proc.hasParent()) {
+    if (proc.isFinished() && proc.hasParent()) {
       countDownChildren(rootProcStack, proc);
     }
   }
@@ -518,6 +500,7 @@
       subproc.updateMetricsOnSubmit(getEnvironment());
       procedures.put(subproc.getProcId(), subproc);
       scheduler.addFront(subproc);
+      LOG.info("Sub-Procedure pid={} has been submitted", subproc.getProcId());
     }
   }
 
@@ -693,11 +676,6 @@
     if (proc.hasLock()) {
       releaseLock(proc, true);
     }
-    try {
-      proc.completionCleanup(this.environment);
-    } catch (Throwable e) {
-      LOG.error("CODE-BUG:Uncaught runtime exception for procedure {}", proc, e);
-    }
   }
 
   private void rootProcedureCleanup(Procedure<Env> proc) {
@@ -731,7 +709,7 @@
     protected long keepAliveTime = -1;
 
     public WorkerThread(ThreadGroup threadGroup) {
-      this(threadGroup, "ProcExecWorker-");
+      this(threadGroup, "ProcedureCoreWorker-");
     }
 
     public WorkerThread(ThreadGroup threadGroup, String prefix) {
@@ -751,26 +729,28 @@
         while (isRunning() && keepAlive(lastUpdated)) {
           Procedure<Env> procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
           if (procedure == null) {
+            Thread.sleep(1000);
             continue;
           }
           this.activeProcedure.set(procedure);
-          int activeCount = activeExecutorCount.incrementAndGet();
+          activeExecutorCount.incrementAndGet();
           startTime.set(System.currentTimeMillis());
           executeProcedure(procedure);
-          activeCount = activeExecutorCount.decrementAndGet();
-          LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount);
+          activeExecutorCount.decrementAndGet();
+          LOG.trace(
+              "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
           this.activeProcedure.set(null);
           lastUpdated = System.currentTimeMillis();
           startTime.set(lastUpdated);
         }
 
-      } catch (Throwable throwable) {
+      } catch (Exception e) {
         if (this.activeProcedure.get() != null) {
           LOG.warn(
-              "Procedure Worker {} terminated {}",
+              "Exception happened when worker {} execute procedure {}",
               getName(),
               this.activeProcedure.get(),
-              throwable);
+              e);
         }
       } finally {
         LOG.info("Procedure worker {} terminated.", getName());
@@ -796,12 +776,12 @@
     }
   }
 
-  // A worker thread which can be added when core workers are stuck. Will timeout after
-  // keepAliveTime if there is no procedure to run.
-  private final class KeepAliveWorkerThread extends WorkerThread {
+  // A temporary worker thread will be launched when too many core workers are stuck.
+  // They will timeout after keepAliveTime if there is no procedure to run.
+  private final class TemporaryWorkerThread extends WorkerThread {
 
-    public KeepAliveWorkerThread(ThreadGroup group) {
-      super(group, "KAProcExecWorker-");
+    public TemporaryWorkerThread(ThreadGroup group) {
+      super(group, "ProcedureTemporaryWorker-");
       this.keepAliveTime = TimeUnit.SECONDS.toMillis(10);
     }
 
@@ -823,22 +803,25 @@
       updateTimestamp();
     }
 
-    private int checkForStuckWorkers() {
+    private int calculateRunningAndStuckWorkers() {
       // Check if any of the worker is stuck
-      int stuckCount = 0;
+      int runningCount = 0, stuckCount = 0;
       for (WorkerThread worker : workerThreads) {
-        if (worker.activeProcedure.get() == null
-            || worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
+        if (worker.activeProcedure.get() == null) {
           continue;
         }
-
+        runningCount++;
         // WARN the worker is stuck
-        stuckCount++;
-        LOG.warn(
-            "Worker stuck {}({}), run time {} ms",
-            worker,
-            worker.activeProcedure.get().getProcType(),
-            worker.getCurrentRunTime());
+        if (worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
+          stuckCount++;
+          LOG.warn(
+              "Worker stuck {}({}), run time {} ms",
+              worker,
+              worker.activeProcedure.get().getProcType(),
+              worker.getCurrentRunTime());
+        }
+        LOG.info(
+            "Procedure workers: {} is running, {} is running and stuck", runningCount, stuckCount);
       }
       return stuckCount;
     }
@@ -854,7 +837,7 @@
       // Let's add new worker thread more aggressively, as they will timeout finally if there is no
       // work to do.
       if (stuckPerc >= DEFAULT_WORKER_ADD_STUCK_PERCENTAGE && workerThreads.size() < maxPoolSize) {
-        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
+        final TemporaryWorkerThread worker = new TemporaryWorkerThread(threadGroup);
         workerThreads.add(worker);
         worker.start();
         LOG.debug("Added new worker thread {}", worker);
@@ -863,7 +846,7 @@
 
     @Override
     protected void periodicExecute(Env env) {
-      final int stuckCount = checkForStuckWorkers();
+      final int stuckCount = calculateRunningAndStuckWorkers();
       checkThreadCount(stuckCount);
       updateTimestamp();
     }
@@ -942,28 +925,6 @@
     return pushProcedure(procedure);
   }
 
-  /**
-   * Abort a specified procedure.
-   *
-   * @param procId procedure id
-   * @param force whether abort the running procdure.
-   * @return true if the procedure exists and has received the abort.
-   */
-  public boolean abort(long procId, boolean force) {
-    Procedure<Env> procedure = procedures.get(procId);
-    if (procedure != null) {
-      if (!force && procedure.wasExecuted()) {
-        return false;
-      }
-      return procedure.abort(this.environment);
-    }
-    return false;
-  }
-
-  public boolean abort(long procId) {
-    return abort(procId, true);
-  }
-
   public ProcedureScheduler getScheduler() {
     return scheduler;
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index ed32ca6..1c775c2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -32,9 +32,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Procedure described by a series of steps.
@@ -51,24 +50,21 @@
 
   private static final int EOF_STATE = Integer.MIN_VALUE;
 
-  private final AtomicBoolean aborted = new AtomicBoolean(false);
-
   private Flow stateFlow = Flow.HAS_MORE_STATE;
-  protected int stateCount = 0;
-  private int[] states = null;
+  private final LinkedList<Integer> states = new LinkedList<>();
 
-  private List<Procedure<Env>> subProcList = null;
+  private final List<Procedure<?>> subProcList = new ArrayList<>();
 
-  /** Cycles on same state. Good for figuring if we are stuck. */
+  /** Cycles on the same state. Good for figuring if we are stuck. */
   private int cycles = 0;
 
-  /** Ordinal of the previous state. So we can tell if we are progressing or not. */
-  private int previousState;
+  private static final int NO_NEXT_STATE = -1;
+  private int nextState = NO_NEXT_STATE;
 
   /** Mark whether this procedure is called by a pipe forwarded request. */
   protected boolean isGeneratedByPipe;
 
-  private boolean stateDeserialized = false;
+  private boolean isStateDeserialized = false;
 
   protected StateMachineProcedure() {
     this(false);
@@ -136,20 +132,6 @@
    */
   protected void setNextState(final TState state) {
     setNextState(getStateId(state));
-    failIfAborted();
-  }
-
-  /**
-   * By default, the executor will try ro run all the steps of the procedure start to finish. Return
-   * true to make the executor yield between execution steps to give other procedures time to run
-   * their steps.
-   *
-   * @param state the state we are going to execute next.
-   * @return Return true if the executor should yield before the execution of the specified step.
-   *     Defaults to return false.
-   */
-  protected boolean isYieldBeforeExecuteFromState(Env env, TState state) {
-    return false;
   }
 
   /**
@@ -158,105 +140,84 @@
    * @param childProcedure the child procedure
    */
   protected void addChildProcedure(Procedure<Env> childProcedure) {
-    if (childProcedure == null) {
-      return;
-    }
-    if (subProcList == null) {
-      subProcList = new ArrayList<>();
-    }
     subProcList.add(childProcedure);
   }
 
   @Override
-  protected Procedure[] execute(final Env env)
+  protected Procedure<Env>[] execute(final Env env)
       throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
     updateTimestamp();
     try {
-      failIfAborted();
-
-      if (!hasMoreState() || isFailed()) {
+      if (noMoreState() || isFailed()) {
         return null;
       }
 
       TState state = getCurrentState();
-      if (stateCount == 0) {
+      if (states.isEmpty()) {
         setNextState(getStateId(state));
       }
 
-      LOG.debug("{} {}; cycles={}", state, this, cycles);
-      // Keep running count of cycles
-      if (getStateId(state) != this.previousState) {
-        this.previousState = getStateId(state);
-        this.cycles = 0;
-      } else {
-        this.cycles++;
-      }
-
       LOG.trace("{}", this);
       stateFlow = executeFromState(env, state);
+      addNextStateAndCalculateCycles();
       setStateDeserialized(false);
-      if (!hasMoreState()) {
-        setNextState(EOF_STATE);
-      }
 
-      if (subProcList != null && !subProcList.isEmpty()) {
-        Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
-        subProcList = null;
+      if (!subProcList.isEmpty()) {
+        Procedure<Env>[] subProcedures = subProcList.toArray(new Procedure[0]);
+        subProcList.clear();
         return subProcedures;
       }
-      return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
+      return (isWaiting() || isFailed() || noMoreState()) ? null : new Procedure[] {this};
     } finally {
       updateTimestamp();
     }
   }
 
+  private void addNextStateAndCalculateCycles() {
+    int stateToBeAdded = EOF_STATE;
+    if (Flow.HAS_MORE_STATE == stateFlow) {
+      if (nextState == NO_NEXT_STATE) {
+        LOG.error(
+            "StateMachineProcedure pid={} not set next state, but return HAS_MORE_STATE",
+            getProcId());
+      } else {
+        stateToBeAdded = nextState;
+      }
+    } else {
+      if (nextState != NO_NEXT_STATE) {
+        LOG.warn(
+            "StateMachineProcedure pid={} set next state to {}, but return NO_MORE_STATE",
+            getProcId(),
+            nextState);
+      }
+    }
+    if (getStateId(getCurrentState()) == stateToBeAdded) {
+      cycles++;
+    } else {
+      cycles = 0;
+    }
+    states.add(stateToBeAdded);
+    nextState = NO_NEXT_STATE;
+  }
+
   @Override
   protected void rollback(final Env env)
       throws IOException, InterruptedException, ProcedureException {
     if (isEofState()) {
-      stateCount--;
+      states.removeLast();
     }
 
     try {
       updateTimestamp();
       rollbackState(env, getCurrentState());
     } finally {
-      stateCount--;
+      states.removeLast();
       updateTimestamp();
     }
   }
 
   protected boolean isEofState() {
-    return stateCount > 0 && states[stateCount - 1] == EOF_STATE;
-  }
-
-  @Override
-  protected boolean abort(final Env env) {
-    LOG.debug("Abort requested for {}", this);
-    if (!hasMoreState()) {
-      LOG.warn("Ignore abort request on {} because it has already been finished", this);
-      return false;
-    }
-    if (!isRollbackSupported(getCurrentState())) {
-      LOG.warn("Ignore abort request on {} because it does not support rollback", this);
-      return false;
-    }
-    aborted.set(true);
-    return true;
-  }
-
-  /**
-   * If procedure has more states then abort it otherwise procedure is finished and abort can be
-   * ignored.
-   */
-  protected final void failIfAborted() {
-    if (aborted.get()) {
-      if (hasMoreState()) {
-        setAbortFailure(getClass().getSimpleName(), "abort requested");
-      } else {
-        LOG.warn("Ignoring abort request on state='{}' for {}", getCurrentState(), this);
-      }
-    }
+    return !states.isEmpty() && states.getLast() == EOF_STATE;
   }
 
   /**
@@ -267,50 +228,28 @@
     return false;
   }
 
-  @Override
-  protected boolean isYieldAfterExecution(final Env env) {
-    return isYieldBeforeExecuteFromState(env, getCurrentState());
-  }
-
-  private boolean hasMoreState() {
-    return stateFlow != Flow.NO_MORE_STATE;
+  private boolean noMoreState() {
+    return stateFlow == Flow.NO_MORE_STATE;
   }
 
   @Nullable
   protected TState getCurrentState() {
-    if (stateCount > 0) {
-      if (states[stateCount - 1] == EOF_STATE) {
+    if (!states.isEmpty()) {
+      if (states.getLast() == EOF_STATE) {
         return null;
       }
-      return getState(states[stateCount - 1]);
+      return getState(states.getLast());
     }
     return getInitialState();
   }
 
   /**
-   * This method is used from test code as it cannot be assumed that state transition will happen
-   * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in
-   * future.
-   */
-  public int getCurrentStateId() {
-    return getStateId(getCurrentState());
-  }
-
-  /**
    * Set the next state for the procedure.
    *
    * @param stateId the ordinal() of the state enum (or state id)
    */
   private void setNextState(final int stateId) {
-    if (states == null || states.length == stateCount) {
-      int newCapacity = stateCount + 8;
-      if (states != null) {
-        states = Arrays.copyOf(states, newCapacity);
-      } else {
-        states = new int[newCapacity];
-      }
-    }
-    states[stateCount++] = stateId;
+    nextState = stateId;
   }
 
   @Override
@@ -324,26 +263,24 @@
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
-    stream.writeInt(stateCount);
-    for (int i = 0; i < stateCount; ++i) {
-      stream.writeInt(states[i]);
+    stream.writeInt(states.size());
+    for (int state : states) {
+      stream.writeInt(state);
     }
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
-    stateCount = byteBuffer.getInt();
+    int stateCount = byteBuffer.getInt();
+    states.clear();
     if (stateCount > 0) {
-      states = new int[stateCount];
       for (int i = 0; i < stateCount; ++i) {
-        states[i] = byteBuffer.getInt();
+        states.add(byteBuffer.getInt());
       }
       if (isEofState()) {
         stateFlow = Flow.NO_MORE_STATE;
       }
-    } else {
-      states = null;
     }
     this.setStateDeserialized(true);
   }
@@ -355,10 +292,10 @@
    * the code in this stage, which is the purpose of this variable.
    */
   public boolean isStateDeserialized() {
-    return stateDeserialized;
+    return isStateDeserialized;
   }
 
   private void setStateDeserialized(boolean isDeserialized) {
-    this.stateDeserialized = isDeserialized;
+    this.isStateDeserialized = isDeserialized;
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
index 7513d23..d85aba6 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
@@ -21,7 +21,6 @@
 
 import org.apache.iotdb.commons.sync.PipeInfo;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 
@@ -58,11 +57,6 @@
   }
 
   @Override
-  protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    return false;
-  }
-
-  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeShort(ProcedureType.START_PIPE_PROCEDURE.getTypeCode());
     super.serialize(stream);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
index 2a86110..d85e405 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
@@ -52,11 +52,6 @@
   }
 
   @Override
-  protected boolean abort(TestProcEnv testProcEnv) {
-    return true;
-  }
-
-  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeInt(TestProcedureFactory.TestProcedureType.INC_PROCEDURE.ordinal());
     super.serialize(stream);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
index bdaf040..159ee6f 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
@@ -36,9 +36,4 @@
 
   @Override
   protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {}
-
-  @Override
-  protected boolean abort(TestProcEnv testProcEnv) {
-    return false;
-  }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
index 564b980..9675df1 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
@@ -49,11 +49,6 @@
   protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {}
 
   @Override
-  protected boolean abort(TestProcEnv testProcEnv) {
-    return false;
-  }
-
-  @Override
   protected ProcedureLockState acquireLock(TestProcEnv testProcEnv) {
     if (testProcEnv.getEnvLock().tryLock()) {
       testProcEnv.lockAcquireSeq.append(procName);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
index f3b2abf..26a9a9a 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
@@ -41,11 +41,6 @@
   protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {}
 
   @Override
-  protected boolean abort(TestProcEnv testProcEnv) {
-    return false;
-  }
-
-  @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeInt(TestProcedureFactory.TestProcedureType.SLEEP_PROCEDURE.ordinal());
     super.serialize(stream);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
index 0238b6a..1351d21 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
@@ -53,9 +53,4 @@
 
   @Override
   protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {}
-
-  @Override
-  protected boolean abort(TestProcEnv testProcEnv) {
-    return false;
-  }
 }