Pipe/Subscription: Update the way SubscriptionCoordinator and PipeTaskCoordinator acquire locks to prevent procedure success without any effect (#12909)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 35c47e9..b07203e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -74,6 +74,18 @@
   }
 
   /**
+   * Lock the pipe task coordinator.
+   *
+   * @return the {@link PipeTaskInfo} holder, which can be used to get the {@link PipeTaskInfo}.
+   *     Wait until lock is acquired
+   */
+  public AtomicReference<PipeTaskInfo> lock() {
+    pipeTaskCoordinatorLock.lock();
+    pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
+    return pipeTaskInfoHolder;
+  }
+
+  /**
    * Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder,
    * which means that the holder will be null after calling this method.
    *
@@ -100,7 +112,7 @@
     return pipeTaskCoordinatorLock.isLocked();
   }
 
-  /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+  /** Caller should ensure that the method is called in the lock {@link #lock()}. */
   public TSStatus createPipe(TCreatePipeReq req) {
     final TSStatus status = configManager.getProcedureManager().createPipe(req);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -109,7 +121,7 @@
     return status;
   }
 
-  /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+  /** Caller should ensure that the method is called in the lock {@link #lock()}. */
   public TSStatus alterPipe(TAlterPipeReq req) {
     final TSStatus status = configManager.getProcedureManager().alterPipe(req);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -118,7 +130,7 @@
     return status;
   }
 
-  /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+  /** Caller should ensure that the method is called in the lock {@link #lock()}. */
   public TSStatus startPipe(String pipeName) {
     final TSStatus status = configManager.getProcedureManager().startPipe(pipeName);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -127,7 +139,7 @@
     return status;
   }
 
-  /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+  /** Caller should ensure that the method is called in the lock {@link #lock()}. */
   public TSStatus stopPipe(String pipeName) {
     final boolean isStoppedByRuntimeException = pipeTaskInfo.isStoppedByRuntimeException(pipeName);
     final TSStatus status = configManager.getProcedureManager().stopPipe(pipeName);
@@ -147,7 +159,7 @@
     return status;
   }
 
-  /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+  /** Caller should ensure that the method is called in the lock {@link #lock()}. */
   public TSStatus dropPipe(String pipeName) {
     final boolean isPipeExistedBeforeDrop = pipeTaskInfo.isPipeExisted(pipeName);
     final TSStatus status = configManager.getProcedureManager().dropPipe(pipeName);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index a1a23a1..535cf02 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -38,7 +38,7 @@
   private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
   private final AtomicLong idGenerator = new AtomicLong(0);
 
-  void lock() {
+  public void lock() {
     try {
       final long id = idGenerator.incrementAndGet();
       LOGGER.info(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 92383f3..2d72f3c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -80,6 +80,18 @@
     return null;
   }
 
+  /**
+   * Lock the {@link SubscriptionInfo} coordinator.
+   *
+   * @return the {@link SubscriptionInfo} holder, which can be used to get the {@link
+   *     SubscriptionInfo}. Wait until lock is acquired
+   */
+  public AtomicReference<SubscriptionInfo> lock() {
+    coordinatorLock.lock();
+    subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo);
+    return subscriptionInfoHolder;
+  }
+
   public boolean unlock() {
     if (subscriptionInfoHolder != null) {
       subscriptionInfoHolder.set(null);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 477a99f..8decdd0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -90,15 +90,19 @@
   private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
       "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
 
+  protected AtomicReference<PipeTaskInfo> acquireLockInternal(
+      ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .lock();
+  }
+
   @Override
   protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
     LOGGER.info("ProcedureId {} try to acquire pipe lock.", getProcId());
-    pipeTaskInfo =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .tryLock();
+    pipeTaskInfo = acquireLockInternal(configNodeProcedureEnv);
     if (pipeTaskInfo == null) {
       LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
     } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index ed49253..2502a56 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -113,17 +113,7 @@
     final PipePluginCoordinator pipePluginCoordinator =
         env.getConfigManager().getPipeManager().getPipePluginCoordinator();
 
-    final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.tryLock();
-    if (pipeTaskInfo == null) {
-      String exceptionMessage =
-          String.format(
-              "ProcedureId %d failed to acquire pipe lock due to high competition with other pipe operations. "
-                  + "The PipeTaskInfo is frequently accessed by other operations.",
-              getProcId());
-      LOGGER.warn(exceptionMessage);
-      setFailure(new ProcedureException(exceptionMessage));
-      return Flow.NO_MORE_STATE;
-    }
+    final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock();
     pipePluginCoordinator.lock();
 
     try {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index d777e34..0919894 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -22,6 +22,7 @@
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
@@ -40,6 +41,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV2 {
 
@@ -60,6 +62,16 @@
   }
 
   @Override
+  protected AtomicReference<PipeTaskInfo> acquireLockInternal(
+      ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .tryLock();
+  }
+
+  @Override
   protected PipeTaskOperation getOperation() {
     return PipeTaskOperation.HANDLE_PIPE_META_CHANGE;
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index c275958..8b1991a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
@@ -42,6 +43,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
 
@@ -57,6 +59,16 @@
   }
 
   @Override
+  protected AtomicReference<PipeTaskInfo> acquireLockInternal(
+      ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getPipeManager()
+        .getPipeTaskCoordinator()
+        .tryLock();
+  }
+
+  @Override
   protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
     // Skip the procedure if the last execution time is within the minimum execution interval.
     // Often used to prevent the procedure from being executed too frequently when system reboot.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index a15382b..7b0ca8a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -57,15 +57,19 @@
 
   protected AtomicReference<SubscriptionInfo> subscriptionInfo;
 
+  protected AtomicReference<SubscriptionInfo> acquireLockInternal(
+      ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getSubscriptionManager()
+        .getSubscriptionCoordinator()
+        .lock();
+  }
+
   @Override
   protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
     LOGGER.info("ProcedureId {} try to acquire subscription lock.", getProcId());
-    subscriptionInfo =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getSubscriptionManager()
-            .getSubscriptionCoordinator()
-            .tryLock();
+    subscriptionInfo = acquireLockInternal(configNodeProcedureEnv);
     if (subscriptionInfo == null) {
       LOGGER.warn("ProcedureId {} failed to acquire subscription lock.", getProcId());
     } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index 5f91511..6e3a538 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
 import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure;
 import org.apache.iotdb.confignode.procedure.impl.subscription.SubscriptionOperation;
@@ -42,6 +43,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class ConsumerGroupMetaSyncProcedure extends AbstractOperateSubscriptionProcedure {
 
@@ -58,6 +60,16 @@
   }
 
   @Override
+  protected AtomicReference<SubscriptionInfo> acquireLockInternal(
+      ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getSubscriptionManager()
+        .getSubscriptionCoordinator()
+        .tryLock();
+  }
+
+  @Override
   protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
     // Skip the procedure if the last execution time is within the minimum execution interval.
     // Often used to prevent the procedure from being executed too frequently when system reboot.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index 5026bea..e3d180a8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -48,11 +48,7 @@
     LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.", getProcId());
 
     pipeTaskInfo =
-        configNodeProcedureEnv
-            .getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .tryLock();
+        configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
     if (pipeTaskInfo == null) {
       LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
     } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index 7d8cef7..3d49a76 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure;
 import org.apache.iotdb.confignode.procedure.impl.subscription.SubscriptionOperation;
@@ -42,6 +43,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TopicMetaSyncProcedure extends AbstractOperateSubscriptionProcedure {
 
@@ -57,6 +59,16 @@
   }
 
   @Override
+  protected AtomicReference<SubscriptionInfo> acquireLockInternal(
+      ConfigNodeProcedureEnv configNodeProcedureEnv) {
+    return configNodeProcedureEnv
+        .getConfigManager()
+        .getSubscriptionManager()
+        .getSubscriptionCoordinator()
+        .tryLock();
+  }
+
+  @Override
   protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
     // Skip the procedure if the last execution time is within the minimum execution interval.
     // Often used to prevent the procedure from being executed too frequently when system reboot.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
index dedda70..9876256 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
@@ -58,7 +58,7 @@
   private void tryAcquireWithRateCheck(final int bytes) {
     while (!loadWriteRateLimiter.tryAcquire(
         bytes,
-        PipeConfig.getInstance().getPipeEndPointRateLimiterDropCheckIntervalMs(),
+        PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(),
         TimeUnit.MILLISECONDS)) {
       if (reloadParams()) {
         return;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 751d7f5..225e0d2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -210,7 +210,7 @@
   private int pipeAsyncConnectorMaxClientNumber = 16;
 
   private double pipeAllSinksRateLimitBytesPerSecond = -1;
-  private int pipeEndPointRateLimiterDropCheckIntervalMs = 1000;
+  private int rateLimiterHotReloadCheckIntervalMs = 1000;
 
   private boolean isSeperatedPipeHeartbeatEnabled = true;
   private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
@@ -1083,13 +1083,12 @@
     this.pipeAllSinksRateLimitBytesPerSecond = pipeAllSinksRateLimitBytesPerSecond;
   }
 
-  public int getPipeEndPointRateLimiterDropCheckIntervalMs() {
-    return pipeEndPointRateLimiterDropCheckIntervalMs;
+  public int getRateLimiterHotReloadCheckIntervalMs() {
+    return rateLimiterHotReloadCheckIntervalMs;
   }
 
-  public void setPipeEndPointRateLimiterDropCheckIntervalMs(
-      int pipeEndPointRateLimiterDropCheckIntervalMs) {
-    this.pipeEndPointRateLimiterDropCheckIntervalMs = pipeEndPointRateLimiterDropCheckIntervalMs;
+  public void setRateLimiterHotReloadCheckIntervalMs(int rateLimiterHotReloadCheckIntervalMs) {
+    this.rateLimiterHotReloadCheckIntervalMs = rateLimiterHotReloadCheckIntervalMs;
   }
 
   public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 9ba6018..31275dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -397,11 +397,11 @@
             properties.getProperty(
                 "pipe_all_sinks_rate_limit_bytes_per_second",
                 String.valueOf(config.getPipeAllSinksRateLimitBytesPerSecond()))));
-    config.setPipeEndPointRateLimiterDropCheckIntervalMs(
+    config.setRateLimiterHotReloadCheckIntervalMs(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_end_point_rate_limiter_drop_check_interval_ms",
-                String.valueOf(config.getPipeEndPointRateLimiterDropCheckIntervalMs()))));
+                "rate_limiter_hot_reload_check_interval_ms",
+                String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
 
     config.setSeperatedPipeHeartbeatEnabled(
         Boolean.parseBoolean(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 28a973d..5c0d272 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -134,8 +134,8 @@
     return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
 
-  public int getPipeEndPointRateLimiterDropCheckIntervalMs() {
-    return COMMON_CONFIG.getPipeEndPointRateLimiterDropCheckIntervalMs();
+  public int getRateLimiterHotReloadCheckIntervalMs() {
+    return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
   }
 
   public float getPipeLeaderCacheMemoryUsagePercentage() {
@@ -363,8 +363,7 @@
         "PipeAllConnectorsRateLimitBytesPerSecond: {}",
         getPipeAllConnectorsRateLimitBytesPerSecond());
     LOGGER.info(
-        "PipeEndPointRateLimiterDropCheckIntervalMs: {}",
-        getPipeEndPointRateLimiterDropCheckIntervalMs());
+        "RateLimiterHotReloadCheckIntervalMs: {}", getRateLimiterHotReloadCheckIntervalMs());
 
     LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", isSeperatedPipeHeartbeatEnabled());
     LOGGER.info(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
index fbabb60..08190ef 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
@@ -62,7 +62,7 @@
   private void tryAcquireWithRateCheck(final int bytes) {
     while (!rateLimiter.tryAcquire(
         bytes,
-        PipeConfig.getInstance().getPipeEndPointRateLimiterDropCheckIntervalMs(),
+        PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(),
         TimeUnit.MILLISECONDS)) {
       if (reloadParams()) {
         return;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
index 46f16a8..f89c5b4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
@@ -79,7 +79,7 @@
   private boolean tryAcquireWithPipeCheck(final RateLimiter rateLimiter, final int bytes) {
     while (!rateLimiter.tryAcquire(
         bytes,
-        PipeConfig.getInstance().getPipeEndPointRateLimiterDropCheckIntervalMs(),
+        PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(),
         TimeUnit.MILLISECONDS)) {
       final PipeTaskAgent finalTaskAgent = taskAgent;
       if (Objects.nonNull(finalTaskAgent)