[IOTDB-4478] [Ratis] Add RatisConsensus config parameters in confignode.properties
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 321383d..f8b0b4e 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -342,4 +342,22 @@
####################
# max payload size for a single log-sync-RPC from leader to follower
-# ratis_log_appender_buffer_size_max = 4194304
\ No newline at end of file
+# ratis_log_appender_buffer_size_max = 4194304
+
+# trigger a snapshot when ratis_snapshot_trigger_threshold logs are written
+# ratis_snapshot_trigger_threshold = 400000
+
+# allow flushing Raft Log asynchronously
+# ratis_log_unsafe_flush_enable = false
+
+# max capacity of a single Raft Log segment (by default 24MB)
+# ratis_log_segment_size_max = 25165824
+
+# flow control window for ratis grpc log appender
+# ratis_grpc_flow_control_window = 4194304
+
+# min election timeout for leader election
+# ratis_rpc_leader_election_timeout_min_ms = 2000
+
+# max election timeout for leader election
+# ratis_rpc_leader_election_timeout_max_ms = 4000
\ No newline at end of file
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 2859686..ad56aad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -132,7 +132,28 @@
private String readConsistencyLevel = "strong";
/** RatisConsensus protocol, Max size for a single log append request from leader */
- private long RatisConsensusLogAppenderBufferSize = 4 * 1024 * 1024L;
+ private long ratisConsensusLogAppenderBufferSize = 4 * 1024 * 1024L;
+
+ /**
+ * RatisConsensus protocol, trigger a snapshot when ratis_snapshot_trigger_threshold logs are
+ * written
+ */
+ private long ratisSnapshotTriggerThreshold = 400000L;
+
+ /** RatisConsensus protocol, allow flushing Raft Log asynchronously */
+ private boolean ratisLogUnsafeFlushEnable = false;
+
+ /** RatisConsensus protocol, max capacity of a single Raft Log segment */
+ private long ratisLogSegmentSizeMax = 24 * 1024 * 1024L;
+
+ /** RatisConsensus protocol, flow control window for ratis grpc log appender */
+ private long ratisGrpcFlowControlWindow = 4 * 1024 * 1024L;
+
+ /** RatisConsensus protocol, min election timeout for leader election */
+ private long ratisRpcLeaderElectionTimeoutMinMs = 2000L;
+
+ /** RatisConsensus protocol, max election timeout for leader election */
+ private long ratisRpcLeaderElectionTimeoutMaxMs = 4000L;
public ConfigNodeConfig() {
// empty constructor
@@ -416,10 +437,58 @@
}
public long getRatisConsensusLogAppenderBufferSize() {
- return RatisConsensusLogAppenderBufferSize;
+ return ratisConsensusLogAppenderBufferSize;
}
public void setRatisConsensusLogAppenderBufferSize(long ratisConsensusLogAppenderBufferSize) {
- RatisConsensusLogAppenderBufferSize = ratisConsensusLogAppenderBufferSize;
+ this.ratisConsensusLogAppenderBufferSize = ratisConsensusLogAppenderBufferSize;
+ }
+
+ public long getRatisSnapshotTriggerThreshold() {
+ return ratisSnapshotTriggerThreshold;
+ }
+
+ public void setRatisSnapshotTriggerThreshold(long ratisSnapshotTriggerThreshold) {
+ this.ratisSnapshotTriggerThreshold = ratisSnapshotTriggerThreshold;
+ }
+
+ public boolean isRatisLogUnsafeFlushEnable() {
+ return ratisLogUnsafeFlushEnable;
+ }
+
+ public void setRatisLogUnsafeFlushEnable(boolean ratisLogUnsafeFlushEnable) {
+ this.ratisLogUnsafeFlushEnable = ratisLogUnsafeFlushEnable;
+ }
+
+ public long getRatisLogSegmentSizeMax() {
+ return ratisLogSegmentSizeMax;
+ }
+
+ public void setRatisLogSegmentSizeMax(long ratisLogSegmentSizeMax) {
+ this.ratisLogSegmentSizeMax = ratisLogSegmentSizeMax;
+ }
+
+ public long getRatisGrpcFlowControlWindow() {
+ return ratisGrpcFlowControlWindow;
+ }
+
+ public void setRatisGrpcFlowControlWindow(long ratisGrpcFlowControlWindow) {
+ this.ratisGrpcFlowControlWindow = ratisGrpcFlowControlWindow;
+ }
+
+ public long getRatisRpcLeaderElectionTimeoutMinMs() {
+ return ratisRpcLeaderElectionTimeoutMinMs;
+ }
+
+ public void setRatisRpcLeaderElectionTimeoutMinMs(long ratisRpcLeaderElectionTimeoutMinMs) {
+ this.ratisRpcLeaderElectionTimeoutMinMs = ratisRpcLeaderElectionTimeoutMinMs;
+ }
+
+ public long getRatisRpcLeaderElectionTimeoutMaxMs() {
+ return ratisRpcLeaderElectionTimeoutMaxMs;
+ }
+
+ public void setRatisRpcLeaderElectionTimeoutMaxMs(long ratisRpcLeaderElectionTimeoutMaxMs) {
+ this.ratisRpcLeaderElectionTimeoutMaxMs = ratisRpcLeaderElectionTimeoutMaxMs;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 147995b..a1092e8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -286,6 +286,41 @@
properties.getProperty(
"ratis_log_appender_buffer_size_max",
String.valueOf(conf.getRatisConsensusLogAppenderBufferSize()))));
+
+ conf.setRatisSnapshotTriggerThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "ratis_snapshot_trigger_threshold",
+ String.valueOf(conf.getRatisSnapshotTriggerThreshold()))));
+
+ conf.setRatisLogUnsafeFlushEnable(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "ratis_log_unsafe_flush_enable",
+ String.valueOf(conf.isRatisLogUnsafeFlushEnable()))));
+
+ conf.setRatisLogSegmentSizeMax(
+ Long.parseLong(
+ properties.getProperty(
+ "ratis_log_segment_size_max", String.valueOf(conf.getRatisLogSegmentSizeMax()))));
+
+ conf.setRatisGrpcFlowControlWindow(
+ Long.parseLong(
+ properties.getProperty(
+ "ratis_grpc_flow_control_window",
+ String.valueOf(conf.getRatisGrpcFlowControlWindow()))));
+
+ conf.setRatisRpcLeaderElectionTimeoutMinMs(
+ Long.parseLong(
+ properties.getProperty(
+ "ratis_rpc_leader_election_timeout_min_ms",
+ String.valueOf(conf.getRatisRpcLeaderElectionTimeoutMinMs()))));
+
+ conf.setRatisRpcLeaderElectionTimeoutMaxMs(
+ Long.parseLong(
+ properties.getProperty(
+ "ratis_rpc_leader_election_timeout_max_ms",
+ String.valueOf(conf.getRatisRpcLeaderElectionTimeoutMaxMs()))));
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 97c73c9..00f9d66 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -134,6 +134,12 @@
final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
TRatisConfig ratisConfig = new TRatisConfig();
ratisConfig.setAppenderBufferSize(conf.getRatisConsensusLogAppenderBufferSize());
+ ratisConfig.setSnapshotTriggerThreshold(conf.getRatisSnapshotTriggerThreshold());
+ ratisConfig.setLogUnsafeFlushEnable(conf.isRatisLogUnsafeFlushEnable());
+ ratisConfig.setLogSegmentSizeMax(conf.getRatisLogSegmentSizeMax());
+ ratisConfig.setGrpcFlowControlWindow(conf.getRatisGrpcFlowControlWindow());
+ ratisConfig.setLeaderElectionTimeoutMin(conf.getRatisRpcLeaderElectionTimeoutMinMs());
+ ratisConfig.setLeaderElectionTimeoutMax(conf.getRatisRpcLeaderElectionTimeoutMaxMs());
dataSet.setRatisConfig(ratisConfig);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5f25249..734e53f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1003,7 +1003,19 @@
/** Maximum wait time of write cache in MultiLeader consensus. Unit: ms */
private long cacheWindowTimeInMs = 60 * 1000;
- private long RatisConsensusLogAppenderBufferSizeMax = 4 * 1024 * 1024L;
+ private long ratisConsensusLogAppenderBufferSizeMax = 4 * 1024 * 1024L;
+
+ private long ratisConsensusSnapshotTriggerThreshold = 400000L;
+
+ private boolean ratisConsensusLogUnsafeFlushEnable = false;
+
+ private long ratisConsensusLogSegmentSizeMax = 24 * 1024 * 1024L;
+
+ private long ratisConsensusGrpcFlowControlWindow = 4 * 1024 * 1024L;
+
+ private long ratisConsensusLeaderElectionTimeoutMinMs = 2000L;
+
+ private long ratisConsensusLeaderElectionTimeoutMaxMs = 4000L;
IoTDBConfig() {}
@@ -3197,12 +3209,12 @@
}
public long getRatisConsensusLogAppenderBufferSizeMax() {
- return RatisConsensusLogAppenderBufferSizeMax;
+ return ratisConsensusLogAppenderBufferSizeMax;
}
public void setRatisConsensusLogAppenderBufferSizeMax(
long ratisConsensusLogAppenderBufferSizeMax) {
- RatisConsensusLogAppenderBufferSizeMax = ratisConsensusLogAppenderBufferSizeMax;
+ this.ratisConsensusLogAppenderBufferSizeMax = ratisConsensusLogAppenderBufferSizeMax;
}
public String getConfigMessage() {
@@ -3237,4 +3249,55 @@
}
return configMessage;
}
+
+ public long getRatisConsensusSnapshotTriggerThreshold() {
+ return ratisConsensusSnapshotTriggerThreshold;
+ }
+
+ public void setRatisConsensusSnapshotTriggerThreshold(
+ long ratisConsensusSnapshotTriggerThreshold) {
+ this.ratisConsensusSnapshotTriggerThreshold = ratisConsensusSnapshotTriggerThreshold;
+ }
+
+ public boolean isRatisConsensusLogUnsafeFlushEnable() {
+ return ratisConsensusLogUnsafeFlushEnable;
+ }
+
+ public void setRatisConsensusLogUnsafeFlushEnable(boolean ratisConsensusLogUnsafeFlushEnable) {
+ this.ratisConsensusLogUnsafeFlushEnable = ratisConsensusLogUnsafeFlushEnable;
+ }
+
+ public long getRatisConsensusLogSegmentSizeMax() {
+ return ratisConsensusLogSegmentSizeMax;
+ }
+
+ public void setRatisConsensusLogSegmentSizeMax(long ratisConsensusLogSegmentSizeMax) {
+ this.ratisConsensusLogSegmentSizeMax = ratisConsensusLogSegmentSizeMax;
+ }
+
+ public long getRatisConsensusGrpcFlowControlWindow() {
+ return ratisConsensusGrpcFlowControlWindow;
+ }
+
+ public void setRatisConsensusGrpcFlowControlWindow(long ratisConsensusGrpcFlowControlWindow) {
+ this.ratisConsensusGrpcFlowControlWindow = ratisConsensusGrpcFlowControlWindow;
+ }
+
+ public long getRatisConsensusLeaderElectionTimeoutMinMs() {
+ return ratisConsensusLeaderElectionTimeoutMinMs;
+ }
+
+ public void setRatisConsensusLeaderElectionTimeoutMinMs(
+ long ratisConsensusLeaderElectionTimeoutMinMs) {
+ this.ratisConsensusLeaderElectionTimeoutMinMs = ratisConsensusLeaderElectionTimeoutMinMs;
+ }
+
+ public long getRatisConsensusLeaderElectionTimeoutMaxMs() {
+ return ratisConsensusLeaderElectionTimeoutMaxMs;
+ }
+
+ public void setRatisConsensusLeaderElectionTimeoutMaxMs(
+ long ratisConsensusLeaderElectionTimeoutMaxMs) {
+ this.ratisConsensusLeaderElectionTimeoutMaxMs = ratisConsensusLeaderElectionTimeoutMaxMs;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 178cfd6..a0117f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1899,6 +1899,12 @@
public void loadRatisConfig(TRatisConfig ratisConfig) {
conf.setRatisConsensusLogAppenderBufferSizeMax(ratisConfig.getAppenderBufferSize());
+ conf.setRatisConsensusSnapshotTriggerThreshold(ratisConfig.getSnapshotTriggerThreshold());
+ conf.setRatisConsensusLogUnsafeFlushEnable(ratisConfig.isLogUnsafeFlushEnable());
+ conf.setRatisConsensusLogSegmentSizeMax(ratisConfig.getLogSegmentSizeMax());
+ conf.setRatisConsensusGrpcFlowControlWindow(ratisConfig.getGrpcFlowControlWindow());
+ conf.setRatisConsensusLeaderElectionTimeoutMinMs(ratisConfig.getLeaderElectionTimeoutMin());
+ conf.setRatisConsensusLeaderElectionTimeoutMaxMs(ratisConfig.getLeaderElectionTimeoutMax());
}
public void initClusterSchemaMemoryAllocate() {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 4fd9dbc..0e493c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -33,6 +33,11 @@
import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.TimeUnit;
+
/**
* We can use DataRegionConsensusImpl.getInstance() to obtain a consensus layer reference for
* dataRegion's reading and writing
@@ -87,7 +92,37 @@
// An empty log is committed after each restart, even if no data is
// written. This setting ensures that compaction work is not discarded
// even if there are frequent restarts
- .setSnapshot(Snapshot.newBuilder().setCreationGap(1).build())
+ .setSnapshot(
+ Snapshot.newBuilder()
+ .setCreationGap(1)
+ .setAutoTriggerThreshold(
+ conf.getRatisConsensusSnapshotTriggerThreshold())
+ .build())
+ .setLog(
+ RatisConfig.Log.newBuilder()
+ .setUnsafeFlushEnabled(
+ conf.isRatisConsensusLogUnsafeFlushEnable())
+ .setSegmentSizeMax(
+ SizeInBytes.valueOf(
+ conf.getRatisConsensusLogSegmentSizeMax()))
+ .build())
+ .setGrpc(
+ RatisConfig.Grpc.newBuilder()
+ .setFlowControlWindow(
+ SizeInBytes.valueOf(
+ conf.getRatisConsensusGrpcFlowControlWindow()))
+ .build())
+ .setRpc(
+ RatisConfig.Rpc.newBuilder()
+ .setTimeoutMin(
+ TimeDuration.valueOf(
+ conf.getRatisConsensusLeaderElectionTimeoutMinMs(),
+ TimeUnit.MILLISECONDS))
+ .setTimeoutMax(
+ TimeDuration.valueOf(
+ conf.getRatisConsensusLeaderElectionTimeoutMaxMs(),
+ TimeUnit.MILLISECONDS))
+ .build())
.setLeaderLogAppender(
RatisConfig.LeaderLogAppender.newBuilder()
.setBufferByteLimit(
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 78de4b0..3151432 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -49,6 +49,12 @@
struct TRatisConfig {
1: optional i64 appenderBufferSize
+ 2: optional i64 snapshotTriggerThreshold
+ 3: optional bool logUnsafeFlushEnable
+ 4: optional i64 logSegmentSizeMax
+ 5: optional i64 grpcFlowControlWindow
+ 6: optional i64 leaderElectionTimeoutMin
+ 7: optional i64 leaderElectionTimeoutMax
}
struct TDataNodeRemoveReq {