Pipe/Subscription: Update the way SubscriptionCoordinator and PipeTaskCoordinator acquire locks to prevent procedure success without any effect (#12909)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 35c47e9..b07203e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -74,6 +74,18 @@
}
/**
+ * Lock the pipe task coordinator.
+ *
+ * @return the {@link PipeTaskInfo} holder, which can be used to get the {@link PipeTaskInfo}.
+ * Wait until lock is acquired
+ */
+ public AtomicReference<PipeTaskInfo> lock() {
+ pipeTaskCoordinatorLock.lock();
+ pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
+ return pipeTaskInfoHolder;
+ }
+
+ /**
* Unlock the pipe task coordinator. Calling this method will clear the pipe task info holder,
* which means that the holder will be null after calling this method.
*
@@ -100,7 +112,7 @@
return pipeTaskCoordinatorLock.isLocked();
}
- /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+ /** Caller should ensure that the method is called in the lock {@link #lock()}. */
public TSStatus createPipe(TCreatePipeReq req) {
final TSStatus status = configManager.getProcedureManager().createPipe(req);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -109,7 +121,7 @@
return status;
}
- /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+ /** Caller should ensure that the method is called in the lock {@link #lock()}. */
public TSStatus alterPipe(TAlterPipeReq req) {
final TSStatus status = configManager.getProcedureManager().alterPipe(req);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -118,7 +130,7 @@
return status;
}
- /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+ /** Caller should ensure that the method is called in the lock {@link #lock()}. */
public TSStatus startPipe(String pipeName) {
final TSStatus status = configManager.getProcedureManager().startPipe(pipeName);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -127,7 +139,7 @@
return status;
}
- /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+ /** Caller should ensure that the method is called in the lock {@link #lock()}. */
public TSStatus stopPipe(String pipeName) {
final boolean isStoppedByRuntimeException = pipeTaskInfo.isStoppedByRuntimeException(pipeName);
final TSStatus status = configManager.getProcedureManager().stopPipe(pipeName);
@@ -147,7 +159,7 @@
return status;
}
- /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
+ /** Caller should ensure that the method is called in the lock {@link #lock()}. */
public TSStatus dropPipe(String pipeName) {
final boolean isPipeExistedBeforeDrop = pipeTaskInfo.isPipeExisted(pipeName);
final TSStatus status = configManager.getProcedureManager().dropPipe(pipeName);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index a1a23a1..535cf02 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -38,7 +38,7 @@
private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
private final AtomicLong idGenerator = new AtomicLong(0);
- void lock() {
+ public void lock() {
try {
final long id = idGenerator.incrementAndGet();
LOGGER.info(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 92383f3..2d72f3c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -80,6 +80,18 @@
return null;
}
+ /**
+ * Lock the {@link SubscriptionInfo} coordinator.
+ *
+ * @return the {@link SubscriptionInfo} holder, which can be used to get the {@link
+ * SubscriptionInfo}. Wait until lock is acquired
+ */
+ public AtomicReference<SubscriptionInfo> lock() {
+ coordinatorLock.lock();
+ subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo);
+ return subscriptionInfoHolder;
+ }
+
public boolean unlock() {
if (subscriptionInfoHolder != null) {
subscriptionInfoHolder.set(null);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 477a99f..8decdd0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -90,15 +90,19 @@
private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
"Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
+ protected AtomicReference<PipeTaskInfo> acquireLockInternal(
+ ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .lock();
+ }
+
@Override
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
LOGGER.info("ProcedureId {} try to acquire pipe lock.", getProcId());
- pipeTaskInfo =
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .tryLock();
+ pipeTaskInfo = acquireLockInternal(configNodeProcedureEnv);
if (pipeTaskInfo == null) {
LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
} else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index ed49253..2502a56 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -113,17 +113,7 @@
final PipePluginCoordinator pipePluginCoordinator =
env.getConfigManager().getPipeManager().getPipePluginCoordinator();
- final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.tryLock();
- if (pipeTaskInfo == null) {
- String exceptionMessage =
- String.format(
- "ProcedureId %d failed to acquire pipe lock due to high competition with other pipe operations. "
- + "The PipeTaskInfo is frequently accessed by other operations.",
- getProcId());
- LOGGER.warn(exceptionMessage);
- setFailure(new ProcedureException(exceptionMessage));
- return Flow.NO_MORE_STATE;
- }
+ final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock();
pipePluginCoordinator.lock();
try {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index d777e34..0919894 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
@@ -40,6 +41,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV2 {
@@ -60,6 +62,16 @@
}
@Override
+ protected AtomicReference<PipeTaskInfo> acquireLockInternal(
+ ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .tryLock();
+ }
+
+ @Override
protected PipeTaskOperation getOperation() {
return PipeTaskOperation.HANDLE_PIPE_META_CHANGE;
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index c275958..8b1991a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -23,6 +23,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
@@ -42,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
public class PipeMetaSyncProcedure extends AbstractOperatePipeProcedureV2 {
@@ -57,6 +59,16 @@
}
@Override
+ protected AtomicReference<PipeTaskInfo> acquireLockInternal(
+ ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .tryLock();
+ }
+
+ @Override
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
// Skip the procedure if the last execution time is within the minimum execution interval.
// Often used to prevent the procedure from being executed too frequently when system reboot.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index a15382b..7b0ca8a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -57,15 +57,19 @@
protected AtomicReference<SubscriptionInfo> subscriptionInfo;
+ protected AtomicReference<SubscriptionInfo> acquireLockInternal(
+ ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getSubscriptionManager()
+ .getSubscriptionCoordinator()
+ .lock();
+ }
+
@Override
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
LOGGER.info("ProcedureId {} try to acquire subscription lock.", getProcId());
- subscriptionInfo =
- configNodeProcedureEnv
- .getConfigManager()
- .getSubscriptionManager()
- .getSubscriptionCoordinator()
- .tryLock();
+ subscriptionInfo = acquireLockInternal(configNodeProcedureEnv);
if (subscriptionInfo == null) {
LOGGER.warn("ProcedureId {} failed to acquire subscription lock.", getProcId());
} else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index 5f91511..6e3a538 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -23,6 +23,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.SubscriptionOperation;
@@ -42,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
public class ConsumerGroupMetaSyncProcedure extends AbstractOperateSubscriptionProcedure {
@@ -58,6 +60,16 @@
}
@Override
+ protected AtomicReference<SubscriptionInfo> acquireLockInternal(
+ ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getSubscriptionManager()
+ .getSubscriptionCoordinator()
+ .tryLock();
+ }
+
+ @Override
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
// Skip the procedure if the last execution time is within the minimum execution interval.
// Often used to prevent the procedure from being executed too frequently when system reboot.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index 5026bea..e3d180a8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -48,11 +48,7 @@
LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.", getProcId());
pipeTaskInfo =
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .tryLock();
+ configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
if (pipeTaskInfo == null) {
LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
} else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index 7d8cef7..3d49a76 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -23,6 +23,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.SubscriptionOperation;
@@ -42,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
public class TopicMetaSyncProcedure extends AbstractOperateSubscriptionProcedure {
@@ -57,6 +59,16 @@
}
@Override
+ protected AtomicReference<SubscriptionInfo> acquireLockInternal(
+ ConfigNodeProcedureEnv configNodeProcedureEnv) {
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getSubscriptionManager()
+ .getSubscriptionCoordinator()
+ .tryLock();
+ }
+
+ @Override
protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProcedureEnv) {
// Skip the procedure if the last execution time is within the minimum execution interval.
// Often used to prevent the procedure from being executed too frequently when system reboot.
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
index dedda70..9876256 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
@@ -58,7 +58,7 @@
private void tryAcquireWithRateCheck(final int bytes) {
while (!loadWriteRateLimiter.tryAcquire(
bytes,
- PipeConfig.getInstance().getPipeEndPointRateLimiterDropCheckIntervalMs(),
+ PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(),
TimeUnit.MILLISECONDS)) {
if (reloadParams()) {
return;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 751d7f5..225e0d2 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -210,7 +210,7 @@
private int pipeAsyncConnectorMaxClientNumber = 16;
private double pipeAllSinksRateLimitBytesPerSecond = -1;
- private int pipeEndPointRateLimiterDropCheckIntervalMs = 1000;
+ private int rateLimiterHotReloadCheckIntervalMs = 1000;
private boolean isSeperatedPipeHeartbeatEnabled = true;
private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
@@ -1083,13 +1083,12 @@
this.pipeAllSinksRateLimitBytesPerSecond = pipeAllSinksRateLimitBytesPerSecond;
}
- public int getPipeEndPointRateLimiterDropCheckIntervalMs() {
- return pipeEndPointRateLimiterDropCheckIntervalMs;
+ public int getRateLimiterHotReloadCheckIntervalMs() {
+ return rateLimiterHotReloadCheckIntervalMs;
}
- public void setPipeEndPointRateLimiterDropCheckIntervalMs(
- int pipeEndPointRateLimiterDropCheckIntervalMs) {
- this.pipeEndPointRateLimiterDropCheckIntervalMs = pipeEndPointRateLimiterDropCheckIntervalMs;
+ public void setRateLimiterHotReloadCheckIntervalMs(int rateLimiterHotReloadCheckIntervalMs) {
+ this.rateLimiterHotReloadCheckIntervalMs = rateLimiterHotReloadCheckIntervalMs;
}
public long getTwoStageAggregateMaxCombinerLiveTimeInMs() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 9ba6018..31275dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -397,11 +397,11 @@
properties.getProperty(
"pipe_all_sinks_rate_limit_bytes_per_second",
String.valueOf(config.getPipeAllSinksRateLimitBytesPerSecond()))));
- config.setPipeEndPointRateLimiterDropCheckIntervalMs(
+ config.setRateLimiterHotReloadCheckIntervalMs(
Integer.parseInt(
properties.getProperty(
- "pipe_end_point_rate_limiter_drop_check_interval_ms",
- String.valueOf(config.getPipeEndPointRateLimiterDropCheckIntervalMs()))));
+ "rate_limiter_hot_reload_check_interval_ms",
+ String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
config.setSeperatedPipeHeartbeatEnabled(
Boolean.parseBoolean(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 28a973d..5c0d272 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -134,8 +134,8 @@
return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
- public int getPipeEndPointRateLimiterDropCheckIntervalMs() {
- return COMMON_CONFIG.getPipeEndPointRateLimiterDropCheckIntervalMs();
+ public int getRateLimiterHotReloadCheckIntervalMs() {
+ return COMMON_CONFIG.getRateLimiterHotReloadCheckIntervalMs();
}
public float getPipeLeaderCacheMemoryUsagePercentage() {
@@ -363,8 +363,7 @@
"PipeAllConnectorsRateLimitBytesPerSecond: {}",
getPipeAllConnectorsRateLimitBytesPerSecond());
LOGGER.info(
- "PipeEndPointRateLimiterDropCheckIntervalMs: {}",
- getPipeEndPointRateLimiterDropCheckIntervalMs());
+ "RateLimiterHotReloadCheckIntervalMs: {}", getRateLimiterHotReloadCheckIntervalMs());
LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", isSeperatedPipeHeartbeatEnabled());
LOGGER.info(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
index fbabb60..08190ef 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java
@@ -62,7 +62,7 @@
private void tryAcquireWithRateCheck(final int bytes) {
while (!rateLimiter.tryAcquire(
bytes,
- PipeConfig.getInstance().getPipeEndPointRateLimiterDropCheckIntervalMs(),
+ PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(),
TimeUnit.MILLISECONDS)) {
if (reloadParams()) {
return;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
index 46f16a8..f89c5b4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java
@@ -79,7 +79,7 @@
private boolean tryAcquireWithPipeCheck(final RateLimiter rateLimiter, final int bytes) {
while (!rateLimiter.tryAcquire(
bytes,
- PipeConfig.getInstance().getPipeEndPointRateLimiterDropCheckIntervalMs(),
+ PipeConfig.getInstance().getRateLimiterHotReloadCheckIntervalMs(),
TimeUnit.MILLISECONDS)) {
final PipeTaskAgent finalTaskAgent = taskAgent;
if (Objects.nonNull(finalTaskAgent)