Revert "Fix CN pipe procedures restore dead lock (#16324)"

This reverts commit b422e9a5c59391a2aa17234c3cf5a230bbce9830.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index aaafcda..1df5ac0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -146,7 +146,7 @@
 
   private boolean autoRestartWithLock() {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-        configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+        configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
     if (pipeTaskInfo == null) {
       LOGGER.warn("Failed to acquire pipe lock for auto restart pipe task.");
       return false;
@@ -160,7 +160,7 @@
 
   private boolean handleSuccessfulRestartWithLock() {
     final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-        configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+        configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
     if (pipeTaskInfo == null) {
       LOGGER.warn("Failed to acquire pipe lock for handling successful restart.");
       return false;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 8e13a046..ace07f5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -99,7 +99,7 @@
         .submit(
             () -> {
               final AtomicReference<PipeTaskInfo> pipeTaskInfo =
-                  configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+                  configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
               if (pipeTaskInfo == null) {
                 LOGGER.warn(
                     "Failed to acquire lock when parseHeartbeat from node (id={}).", nodeId);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 25f8e0c..4aaf3ab 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -37,7 +37,6 @@
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,11 +68,10 @@
    * @return the pipe task info holder, which can be used to get the pipe task info. The holder is
    *     null if the lock is not acquired.
    */
-  public Pair<AtomicReference<PipeTaskInfo>, Long> tryLock() {
-    long lockSeqId = pipeTaskCoordinatorLock.tryLock();
-    if (lockSeqId != -1) {
+  public AtomicReference<PipeTaskInfo> tryLock() {
+    if (pipeTaskCoordinatorLock.tryLock()) {
       pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
-      return new Pair<>(pipeTaskInfoHolder, lockSeqId);
+      return pipeTaskInfoHolder;
     }
 
     return null;
@@ -85,10 +83,10 @@
    * @return the {@link PipeTaskInfo} holder, which can be used to get the {@link PipeTaskInfo}.
    *     Wait until lock is acquired
    */
-  public Pair<AtomicReference<PipeTaskInfo>, Long> lock() {
-    long lockSeqId = pipeTaskCoordinatorLock.lock();
+  public AtomicReference<PipeTaskInfo> lock() {
+    pipeTaskCoordinatorLock.lock();
     pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
-    return new Pair<>(pipeTaskInfoHolder, lockSeqId);
+    return pipeTaskInfoHolder;
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 0caf620..e57add9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -37,9 +37,8 @@
 
   private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
   private final AtomicLong idGenerator = new AtomicLong(0);
-  private final AtomicLong lockSeqIdGenerator = new AtomicLong(0);
 
-  public long lock() {
+  public void lock() {
     try {
       final long id = idGenerator.incrementAndGet();
       LOGGER.debug(
@@ -51,17 +50,15 @@
           "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
           id,
           Thread.currentThread().getName());
-      return lockSeqIdGenerator.incrementAndGet();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.error(
           "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
           Thread.currentThread().getName());
-      return -1;
     }
   }
 
-  public long tryLock() {
+  public boolean tryLock() {
     try {
       final long id = idGenerator.incrementAndGet();
       LOGGER.debug(
@@ -73,20 +70,20 @@
             "PipeTaskCoordinator lock (id: {}) acquired by thread {}",
             id,
             Thread.currentThread().getName());
-        return lockSeqIdGenerator.incrementAndGet();
+        return true;
       } else {
         LOGGER.info(
             "PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} because of timeout",
             id,
             Thread.currentThread().getName());
-        return -1;
+        return false;
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       LOGGER.error(
           "Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
           Thread.currentThread().getName());
-      return -1;
+      return false;
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index d90e1bb..b52f958 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -79,7 +79,7 @@
   /////////////////////////////// Lock ///////////////////////////////
 
   public AtomicReference<SubscriptionInfo> tryLock() {
-    if (coordinatorLock.tryLock() != -1) {
+    if (coordinatorLock.tryLock()) {
       subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo);
       return subscriptionInfoHolder;
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index f0f28ec..0d83685 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
@@ -38,12 +37,10 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,7 +49,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
 
@@ -120,41 +116,9 @@
     recover();
   }
 
-  /***
-   * Filter out pipe procedures that do not need to re-acquire lock and re-execute when there are multiple locked pipe procedures during restore.
-   * @return non pipe procedures and one pipe procedure with max lock seq id (if there is.)
-   */
-  private List<Procedure<Env>> filteredProcedureList(final List<Procedure<Env>> procedures) {
-    List<Procedure<Env>> nonPipeOrLockedProcedures =
-        procedures.stream()
-            .filter(p -> !(p instanceof AbstractOperatePipeProcedureV2) || !p.isLockedWhenLoading())
-            .collect(Collectors.toList());
-
-    List<AbstractOperatePipeProcedureV2> lockedPipeProcedures =
-        procedures.stream()
-            .filter(p -> p instanceof AbstractOperatePipeProcedureV2 && p.isLockedWhenLoading())
-            .map(AbstractOperatePipeProcedureV2.class::cast)
-            .collect(Collectors.toList());
-    Optional<Procedure<Env>> maxPipeProcedure =
-        lockedPipeProcedures.stream()
-            .max(Comparator.comparingLong(AbstractOperatePipeProcedureV2::getLockSeqId))
-            .map(p -> (Procedure<Env>) p);
-
-    if (lockedPipeProcedures.size() > 1) {
-      LOG.warn(
-          "[Procedure restore]Detected multiple locked pipe procedures in procedure executor {}, only keep last one {}",
-          lockedPipeProcedures,
-          maxPipeProcedure.get());
-    }
-
-    maxPipeProcedure.ifPresent(nonPipeOrLockedProcedures::add);
-    return nonPipeOrLockedProcedures;
-  }
-
   private void recover() {
     // 1.Build rollback stack
-    List<Procedure<Env>> procedureList =
-        filteredProcedureList(getProcedureListFromDifferentVersion());
+    List<Procedure<Env>> procedureList = getProcedureListFromDifferentVersion();
     // Load procedure wal file
     for (Procedure<Env> proc : procedureList) {
       if (proc.isFinished()) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 0dde52a..25466d3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -37,7 +37,6 @@
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,17 +94,17 @@
   // This variable should not be serialized into procedure store,
   // putting it here is just for convenience
   protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
-  protected long lockSeqId;
 
   private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
       "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
 
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
-        configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
-    lockSeqId = lockRes.right;
-    return lockRes.left;
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .lock();
   }
 
   @Override
@@ -189,10 +188,6 @@
     }
   }
 
-  public long getLockSeqId() {
-    return lockSeqId;
-  }
-
   protected abstract PipeTaskOperation getOperation();
 
   /**
@@ -617,15 +612,11 @@
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
     ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream);
-    ReadWriteIOUtils.write(lockSeqId, stream);
   }
 
   @Override
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     isRollbackFromOperateOnDataNodesSuccessful = ReadWriteIOUtils.readBool(byteBuffer);
-    if (byteBuffer.remaining() >= Long.BYTES) {
-      lockSeqId = ReadWriteIOUtils.readLong(byteBuffer);
-    }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 28c5993..665a378 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -122,7 +122,7 @@
     final SubscriptionCoordinator subscriptionCoordinator =
         env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator();
 
-    final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock().left;
+    final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock();
     pipePluginCoordinator.lock();
     SubscriptionInfo subscriptionInfo = subscriptionCoordinator.getSubscriptionInfo();
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 6b2fec6..401859f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -31,7 +31,6 @@
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,14 +64,11 @@
   @Override
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .tryLock();
-    lockSeqId = lockRes.right;
-    return lockRes.left;
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .tryLock();
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 81ed75b..393a8bd 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -39,7 +39,6 @@
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,14 +73,11 @@
   @Override
   protected AtomicReference<PipeTaskInfo> acquireLockInternal(
       ConfigNodeProcedureEnv configNodeProcedureEnv) {
-    Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .tryLock();
-    lockSeqId = lockRes.right;
-    return lockRes.left;
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .tryLock();
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index 16391a9..de90951 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -50,12 +50,7 @@
     LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.", getProcId());
 
     pipeTaskInfo =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .lock()
-            .left;
+        configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
     if (pipeTaskInfo == null) {
       LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
     } else {