Refactor procedure framework (simplified StateMachineProcedure, and some other things) (#14683) (#14741)
* delete KAWorking
* delete ProcedureSuspended
* delete "aborted" and suspended
* refactor states and delete stateCount
* refactor setNextState
* delete previousState
* spotless
* compile
* Revert "delete KAWorking"
This reverts commit 43176fa105dd2536611ee59f29a7a022c84a4356.
* recover KAWorker but rename it to TemporaryWorker
* spotless
* fix npe
* delete useless code
* delete useless code
* necessary log
(cherry picked from commit 41a49e7c1e6db3e88242f0b38f62bb5074fa8810)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java
index 817a8e3..4732343 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java
@@ -49,10 +49,5 @@
}
@Override
- protected boolean abort(Env env) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public void deserialize(ByteBuffer byteBuffer) {}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 35fc402..d4f97c1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -70,20 +70,6 @@
private int[] stackIndexes = null;
- private boolean persist = true;
-
- public boolean needPersistance() {
- return this.persist;
- }
-
- public void resetPersistance() {
- this.persist = true;
- }
-
- public final void skipPersistance() {
- this.persist = false;
- }
-
public final boolean hasLock() {
return locked;
}
@@ -119,17 +105,6 @@
protected abstract void rollback(Env env)
throws IOException, InterruptedException, ProcedureException;
- /**
- * The abort() call is asynchronous and each procedure must decide how to deal with it, if they
- * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
- * abort() method and then the execute() will check if the abort flag is set or not. abort() may
- * be called multiple times from the client, so the implementation must be idempotent.
- *
- * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
- * procedure implementor abort.
- */
- protected abstract boolean abort(Env env);
-
public void serialize(DataOutputStream stream) throws IOException {
// procid
stream.writeLong(this.procId);
@@ -263,28 +238,6 @@
return clazz;
}
- public static Procedure<?> newInstance(ByteBuffer byteBuffer) {
- Class<?> procedureClass = deserializeTypeInfo(byteBuffer);
- Procedure<?> procedure;
- try {
- procedure = (Procedure<?>) procedureClass.newInstance();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new RuntimeException("Instantiation failed", e);
- }
- return procedure;
- }
-
- /**
- * The {@link #doAcquireLock(Object, IProcedureStore)} will be split into two steps, first, it
- * will call us to determine whether we need to wait for initialization, second, it will call
- * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
- *
- * @return true means we need to wait until the environment has been initialized, otherwise true.
- */
- protected boolean waitInitialized(Env env) {
- return false;
- }
-
/**
* Acquire a lock, user should override it if necessary.
*
@@ -315,34 +268,6 @@
}
/**
- * Called before the procedure is recovered and added into the queue.
- *
- * @param env environment
- */
- protected final void beforeRecover(Env env) {
- // no op
- }
-
- /**
- * Called when the procedure is recovered and added into the queue.
- *
- * @param env environment
- */
- protected final void afterRecover(Env env) {
- // no op
- }
-
- /**
- * Called when the procedure is completed (success or rollback). The procedure may use this method
- * to clean up in-memory states. This operation will not be retried on failure.
- *
- * @param env environment
- */
- protected void completionCleanup(Env env) {
- // no op
- }
-
- /**
* To make executor yield between each execution step to give other procedures a chance to run.
*
* @param env environment
@@ -393,9 +318,6 @@
* @return ProcedureLockState
*/
public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
- if (waitInitialized(env)) {
- return ProcedureLockState.LOCK_EVENT_WAIT;
- }
if (lockedWhenLoading) {
lockedWhenLoading = false;
locked = true;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index d1bbb32..84e2b7f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
@@ -186,24 +185,14 @@
// executing, we need to set its state to RUNNABLE.
procedure.setState(ProcedureState.RUNNABLE);
runnableList.add(procedure);
- } else {
- procedure.afterRecover(environment);
}
});
restoreLocks();
- waitingTimeoutList.forEach(
- procedure -> {
- procedure.afterRecover(environment);
- timeoutExecutor.add(procedure);
- });
+ waitingTimeoutList.forEach(timeoutExecutor::add);
failedList.forEach(scheduler::addBack);
- runnableList.forEach(
- procedure -> {
- procedure.afterRecover(environment);
- scheduler.addBack(procedure);
- });
+ runnableList.forEach(scheduler::addBack);
scheduler.signalAll();
}
@@ -418,21 +407,16 @@
"The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
return;
}
- boolean suspended = false;
boolean reExecute;
Procedure<Env>[] subprocs = null;
do {
reExecute = false;
- proc.resetPersistance();
try {
subprocs = proc.doExecute(this.environment);
if (subprocs != null && subprocs.length == 0) {
subprocs = null;
}
- } catch (ProcedureSuspendedException e) {
- LOG.debug("Suspend {}", proc);
- suspended = true;
} catch (ProcedureYieldException e) {
LOG.debug("Yield {}", proc);
yieldProcedure(proc);
@@ -455,22 +439,20 @@
}
} else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
LOG.info("Added into timeoutExecutor {}", proc);
- } else if (!suspended) {
+ } else {
proc.setState(ProcedureState.SUCCESS);
}
}
// add procedure into rollback stack.
rootProcStack.addRollbackStep(proc);
- if (proc.needPersistance()) {
- updateStoreOnExecution(rootProcStack, proc, subprocs);
- }
+ updateStoreOnExecution(rootProcStack, proc, subprocs);
if (!store.isRunning()) {
return;
}
- if (proc.isRunnable() && !suspended && proc.isYieldAfterExecution(this.environment)) {
+ if (proc.isRunnable() && proc.isYieldAfterExecution(this.environment)) {
yieldProcedure(proc);
return;
}
@@ -481,7 +463,7 @@
}
releaseLock(proc, false);
- if (!suspended && proc.isFinished() && proc.hasParent()) {
+ if (proc.isFinished() && proc.hasParent()) {
countDownChildren(rootProcStack, proc);
}
}
@@ -518,6 +500,7 @@
subproc.updateMetricsOnSubmit(getEnvironment());
procedures.put(subproc.getProcId(), subproc);
scheduler.addFront(subproc);
+ LOG.info("Sub-Procedure pid={} has been submitted", subproc.getProcId());
}
}
@@ -693,11 +676,6 @@
if (proc.hasLock()) {
releaseLock(proc, true);
}
- try {
- proc.completionCleanup(this.environment);
- } catch (Throwable e) {
- LOG.error("CODE-BUG:Uncaught runtime exception for procedure {}", proc, e);
- }
}
private void rootProcedureCleanup(Procedure<Env> proc) {
@@ -731,7 +709,7 @@
protected long keepAliveTime = -1;
public WorkerThread(ThreadGroup threadGroup) {
- this(threadGroup, "ProcExecWorker-");
+ this(threadGroup, "ProcedureCoreWorker-");
}
public WorkerThread(ThreadGroup threadGroup, String prefix) {
@@ -751,26 +729,28 @@
while (isRunning() && keepAlive(lastUpdated)) {
Procedure<Env> procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (procedure == null) {
+ Thread.sleep(1000);
continue;
}
this.activeProcedure.set(procedure);
- int activeCount = activeExecutorCount.incrementAndGet();
+ activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
executeProcedure(procedure);
- activeCount = activeExecutorCount.decrementAndGet();
- LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount);
+ activeExecutorCount.decrementAndGet();
+ LOG.trace(
+ "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}
- } catch (Throwable throwable) {
+ } catch (Exception e) {
if (this.activeProcedure.get() != null) {
LOG.warn(
- "Procedure Worker {} terminated {}",
+ "Exception happened when worker {} execute procedure {}",
getName(),
this.activeProcedure.get(),
- throwable);
+ e);
}
} finally {
LOG.info("Procedure worker {} terminated.", getName());
@@ -796,12 +776,12 @@
}
}
- // A worker thread which can be added when core workers are stuck. Will timeout after
- // keepAliveTime if there is no procedure to run.
- private final class KeepAliveWorkerThread extends WorkerThread {
+ // A temporary worker thread will be launched when too many core workers are stuck.
+ // They will timeout after keepAliveTime if there is no procedure to run.
+ private final class TemporaryWorkerThread extends WorkerThread {
- public KeepAliveWorkerThread(ThreadGroup group) {
- super(group, "KAProcExecWorker-");
+ public TemporaryWorkerThread(ThreadGroup group) {
+ super(group, "ProcedureTemporaryWorker-");
this.keepAliveTime = TimeUnit.SECONDS.toMillis(10);
}
@@ -823,22 +803,25 @@
updateTimestamp();
}
- private int checkForStuckWorkers() {
+ private int calculateRunningAndStuckWorkers() {
// Check if any of the worker is stuck
- int stuckCount = 0;
+ int runningCount = 0, stuckCount = 0;
for (WorkerThread worker : workerThreads) {
- if (worker.activeProcedure.get() == null
- || worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
+ if (worker.activeProcedure.get() == null) {
continue;
}
-
+ runningCount++;
// WARN the worker is stuck
- stuckCount++;
- LOG.warn(
- "Worker stuck {}({}), run time {} ms",
- worker,
- worker.activeProcedure.get().getProcType(),
- worker.getCurrentRunTime());
+ if (worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
+ stuckCount++;
+ LOG.warn(
+ "Worker stuck {}({}), run time {} ms",
+ worker,
+ worker.activeProcedure.get().getProcType(),
+ worker.getCurrentRunTime());
+ }
+ LOG.info(
+ "Procedure workers: {} is running, {} is running and stuck", runningCount, stuckCount);
}
return stuckCount;
}
@@ -854,7 +837,7 @@
// Let's add new worker thread more aggressively, as they will timeout finally if there is no
// work to do.
if (stuckPerc >= DEFAULT_WORKER_ADD_STUCK_PERCENTAGE && workerThreads.size() < maxPoolSize) {
- final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
+ final TemporaryWorkerThread worker = new TemporaryWorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
LOG.debug("Added new worker thread {}", worker);
@@ -863,7 +846,7 @@
@Override
protected void periodicExecute(Env env) {
- final int stuckCount = checkForStuckWorkers();
+ final int stuckCount = calculateRunningAndStuckWorkers();
checkThreadCount(stuckCount);
updateTimestamp();
}
@@ -942,28 +925,6 @@
return pushProcedure(procedure);
}
- /**
- * Abort a specified procedure.
- *
- * @param procId procedure id
- * @param force whether abort the running procdure.
- * @return true if the procedure exists and has received the abort.
- */
- public boolean abort(long procId, boolean force) {
- Procedure<Env> procedure = procedures.get(procId);
- if (procedure != null) {
- if (!force && procedure.wasExecuted()) {
- return false;
- }
- return procedure.abort(this.environment);
- }
- return false;
- }
-
- public boolean abort(long procId) {
- return abort(procId, true);
- }
-
public ProcedureScheduler getScheduler() {
return scheduler;
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
index ed32ca6..1c775c2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java
@@ -32,9 +32,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Procedure described by a series of steps.
@@ -51,24 +50,21 @@
private static final int EOF_STATE = Integer.MIN_VALUE;
- private final AtomicBoolean aborted = new AtomicBoolean(false);
-
private Flow stateFlow = Flow.HAS_MORE_STATE;
- protected int stateCount = 0;
- private int[] states = null;
+ private final LinkedList<Integer> states = new LinkedList<>();
- private List<Procedure<Env>> subProcList = null;
+ private final List<Procedure<?>> subProcList = new ArrayList<>();
- /** Cycles on same state. Good for figuring if we are stuck. */
+ /** Cycles on the same state. Good for figuring if we are stuck. */
private int cycles = 0;
- /** Ordinal of the previous state. So we can tell if we are progressing or not. */
- private int previousState;
+ private static final int NO_NEXT_STATE = -1;
+ private int nextState = NO_NEXT_STATE;
/** Mark whether this procedure is called by a pipe forwarded request. */
protected boolean isGeneratedByPipe;
- private boolean stateDeserialized = false;
+ private boolean isStateDeserialized = false;
protected StateMachineProcedure() {
this(false);
@@ -136,20 +132,6 @@
*/
protected void setNextState(final TState state) {
setNextState(getStateId(state));
- failIfAborted();
- }
-
- /**
- * By default, the executor will try ro run all the steps of the procedure start to finish. Return
- * true to make the executor yield between execution steps to give other procedures time to run
- * their steps.
- *
- * @param state the state we are going to execute next.
- * @return Return true if the executor should yield before the execution of the specified step.
- * Defaults to return false.
- */
- protected boolean isYieldBeforeExecuteFromState(Env env, TState state) {
- return false;
}
/**
@@ -158,105 +140,84 @@
* @param childProcedure the child procedure
*/
protected void addChildProcedure(Procedure<Env> childProcedure) {
- if (childProcedure == null) {
- return;
- }
- if (subProcList == null) {
- subProcList = new ArrayList<>();
- }
subProcList.add(childProcedure);
}
@Override
- protected Procedure[] execute(final Env env)
+ protected Procedure<Env>[] execute(final Env env)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
updateTimestamp();
try {
- failIfAborted();
-
- if (!hasMoreState() || isFailed()) {
+ if (noMoreState() || isFailed()) {
return null;
}
TState state = getCurrentState();
- if (stateCount == 0) {
+ if (states.isEmpty()) {
setNextState(getStateId(state));
}
- LOG.debug("{} {}; cycles={}", state, this, cycles);
- // Keep running count of cycles
- if (getStateId(state) != this.previousState) {
- this.previousState = getStateId(state);
- this.cycles = 0;
- } else {
- this.cycles++;
- }
-
LOG.trace("{}", this);
stateFlow = executeFromState(env, state);
+ addNextStateAndCalculateCycles();
setStateDeserialized(false);
- if (!hasMoreState()) {
- setNextState(EOF_STATE);
- }
- if (subProcList != null && !subProcList.isEmpty()) {
- Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
- subProcList = null;
+ if (!subProcList.isEmpty()) {
+ Procedure<Env>[] subProcedures = subProcList.toArray(new Procedure[0]);
+ subProcList.clear();
return subProcedures;
}
- return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this};
+ return (isWaiting() || isFailed() || noMoreState()) ? null : new Procedure[] {this};
} finally {
updateTimestamp();
}
}
+ private void addNextStateAndCalculateCycles() {
+ int stateToBeAdded = EOF_STATE;
+ if (Flow.HAS_MORE_STATE == stateFlow) {
+ if (nextState == NO_NEXT_STATE) {
+ LOG.error(
+ "StateMachineProcedure pid={} not set next state, but return HAS_MORE_STATE",
+ getProcId());
+ } else {
+ stateToBeAdded = nextState;
+ }
+ } else {
+ if (nextState != NO_NEXT_STATE) {
+ LOG.warn(
+ "StateMachineProcedure pid={} set next state to {}, but return NO_MORE_STATE",
+ getProcId(),
+ nextState);
+ }
+ }
+ if (getStateId(getCurrentState()) == stateToBeAdded) {
+ cycles++;
+ } else {
+ cycles = 0;
+ }
+ states.add(stateToBeAdded);
+ nextState = NO_NEXT_STATE;
+ }
+
@Override
protected void rollback(final Env env)
throws IOException, InterruptedException, ProcedureException {
if (isEofState()) {
- stateCount--;
+ states.removeLast();
}
try {
updateTimestamp();
rollbackState(env, getCurrentState());
} finally {
- stateCount--;
+ states.removeLast();
updateTimestamp();
}
}
protected boolean isEofState() {
- return stateCount > 0 && states[stateCount - 1] == EOF_STATE;
- }
-
- @Override
- protected boolean abort(final Env env) {
- LOG.debug("Abort requested for {}", this);
- if (!hasMoreState()) {
- LOG.warn("Ignore abort request on {} because it has already been finished", this);
- return false;
- }
- if (!isRollbackSupported(getCurrentState())) {
- LOG.warn("Ignore abort request on {} because it does not support rollback", this);
- return false;
- }
- aborted.set(true);
- return true;
- }
-
- /**
- * If procedure has more states then abort it otherwise procedure is finished and abort can be
- * ignored.
- */
- protected final void failIfAborted() {
- if (aborted.get()) {
- if (hasMoreState()) {
- setAbortFailure(getClass().getSimpleName(), "abort requested");
- } else {
- LOG.warn("Ignoring abort request on state='{}' for {}", getCurrentState(), this);
- }
- }
+ return !states.isEmpty() && states.getLast() == EOF_STATE;
}
/**
@@ -267,50 +228,28 @@
return false;
}
- @Override
- protected boolean isYieldAfterExecution(final Env env) {
- return isYieldBeforeExecuteFromState(env, getCurrentState());
- }
-
- private boolean hasMoreState() {
- return stateFlow != Flow.NO_MORE_STATE;
+ private boolean noMoreState() {
+ return stateFlow == Flow.NO_MORE_STATE;
}
@Nullable
protected TState getCurrentState() {
- if (stateCount > 0) {
- if (states[stateCount - 1] == EOF_STATE) {
+ if (!states.isEmpty()) {
+ if (states.getLast() == EOF_STATE) {
return null;
}
- return getState(states[stateCount - 1]);
+ return getState(states.getLast());
}
return getInitialState();
}
/**
- * This method is used from test code as it cannot be assumed that state transition will happen
- * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in
- * future.
- */
- public int getCurrentStateId() {
- return getStateId(getCurrentState());
- }
-
- /**
* Set the next state for the procedure.
*
* @param stateId the ordinal() of the state enum (or state id)
*/
private void setNextState(final int stateId) {
- if (states == null || states.length == stateCount) {
- int newCapacity = stateCount + 8;
- if (states != null) {
- states = Arrays.copyOf(states, newCapacity);
- } else {
- states = new int[newCapacity];
- }
- }
- states[stateCount++] = stateId;
+ nextState = stateId;
}
@Override
@@ -324,26 +263,24 @@
@Override
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
- stream.writeInt(stateCount);
- for (int i = 0; i < stateCount; ++i) {
- stream.writeInt(states[i]);
+ stream.writeInt(states.size());
+ for (int state : states) {
+ stream.writeInt(state);
}
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
- stateCount = byteBuffer.getInt();
+ int stateCount = byteBuffer.getInt();
+ states.clear();
if (stateCount > 0) {
- states = new int[stateCount];
for (int i = 0; i < stateCount; ++i) {
- states[i] = byteBuffer.getInt();
+ states.add(byteBuffer.getInt());
}
if (isEofState()) {
stateFlow = Flow.NO_MORE_STATE;
}
- } else {
- states = null;
}
this.setStateDeserialized(true);
}
@@ -355,10 +292,10 @@
* the code in this stage, which is the purpose of this variable.
*/
public boolean isStateDeserialized() {
- return stateDeserialized;
+ return isStateDeserialized;
}
private void setStateDeserialized(boolean isDeserialized) {
- this.stateDeserialized = isDeserialized;
+ this.isStateDeserialized = isDeserialized;
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
index 7513d23..d85aba6 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java
@@ -21,7 +21,6 @@
import org.apache.iotdb.commons.sync.PipeInfo;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
@@ -58,11 +57,6 @@
}
@Override
- protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) {
- return false;
- }
-
- @Override
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(ProcedureType.START_PIPE_PROCEDURE.getTypeCode());
super.serialize(stream);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
index 2a86110..d85e405 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java
@@ -52,11 +52,6 @@
}
@Override
- protected boolean abort(TestProcEnv testProcEnv) {
- return true;
- }
-
- @Override
public void serialize(DataOutputStream stream) throws IOException {
stream.writeInt(TestProcedureFactory.TestProcedureType.INC_PROCEDURE.ordinal());
super.serialize(stream);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
index bdaf040..159ee6f 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java
@@ -36,9 +36,4 @@
@Override
protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {}
-
- @Override
- protected boolean abort(TestProcEnv testProcEnv) {
- return false;
- }
}
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
index 564b980..9675df1 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java
@@ -49,11 +49,6 @@
protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {}
@Override
- protected boolean abort(TestProcEnv testProcEnv) {
- return false;
- }
-
- @Override
protected ProcedureLockState acquireLock(TestProcEnv testProcEnv) {
if (testProcEnv.getEnvLock().tryLock()) {
testProcEnv.lockAcquireSeq.append(procName);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
index f3b2abf..26a9a9a 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java
@@ -41,11 +41,6 @@
protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {}
@Override
- protected boolean abort(TestProcEnv testProcEnv) {
- return false;
- }
-
- @Override
public void serialize(DataOutputStream stream) throws IOException {
stream.writeInt(TestProcedureFactory.TestProcedureType.SLEEP_PROCEDURE.ordinal());
super.serialize(stream);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
index 0238b6a..1351d21 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java
@@ -53,9 +53,4 @@
@Override
protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {}
-
- @Override
- protected boolean abort(TestProcEnv testProcEnv) {
- return false;
- }
}