Optimize IoTConsensus: bound the dispatcher queue, raise batch defaults, and remove the accumulation sleep
Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 9c676fd..2fe821a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -235,6 +235,8 @@
private final int maxLogEntriesNumPerBatch;
private final int maxSizePerBatch;
private final int maxPendingBatchesNum;
+
+ private final int maxQueueLength;
private final long maxWaitingTimeForWaitBatchInMs;
private final int maxWaitingTimeForAccumulatingBatchInMs;
private final long basicRetryWaitTimeMs;
@@ -249,6 +251,7 @@
int maxLogEntriesNumPerBatch,
int maxSizePerBatch,
int maxPendingBatchesNum,
+ int maxQueueLength,
long maxWaitingTimeForWaitBatchInMs,
int maxWaitingTimeForAccumulatingBatchInMs,
long basicRetryWaitTimeMs,
@@ -261,6 +264,7 @@
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
this.maxSizePerBatch = maxSizePerBatch;
this.maxPendingBatchesNum = maxPendingBatchesNum;
+ this.maxQueueLength = maxQueueLength;
this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
@@ -284,6 +288,10 @@
return maxPendingBatchesNum;
}
+ public int getMaxQueueLength() {
+ return maxQueueLength;
+ }
+
public long getMaxWaitingTimeForWaitBatchInMs() {
return maxWaitingTimeForWaitBatchInMs;
}
@@ -326,13 +334,12 @@
public static class Builder {
- private int maxLogEntriesNumPerBatch = 30;
+ private int maxLogEntriesNumPerBatch = 1024;
private int maxSizePerBatch = 16 * 1024 * 1024;
- // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
- // in DataRegionStateMachine
- private int maxPendingBatchesNum = 5;
+ private int maxPendingBatchesNum = 16;
+ private int maxQueueLength = 4096;
private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L;
- private int maxWaitingTimeForAccumulatingBatchInMs = 500;
+ private int maxWaitingTimeForAccumulatingBatchInMs = 100;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
@@ -356,6 +363,11 @@
return this;
}
+ public Builder setMaxQueueLength(int maxQueueLength) {
+ this.maxQueueLength = maxQueueLength;
+ return this;
+ }
+
public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
long maxWaitingTimeForWaitBatchInMs) {
this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
@@ -408,6 +420,7 @@
maxLogEntriesNumPerBatch,
maxSizePerBatch,
maxPendingBatchesNum,
+ maxQueueLength,
maxWaitingTimeForWaitBatchInMs,
maxWaitingTimeForAccumulatingBatchInMs,
basicRetryWaitTimeMs,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index f6da0ef..cd70efe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,9 +46,9 @@
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -214,7 +214,7 @@
public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
this.peer = peer;
this.config = config;
- this.pendingEntries = new LinkedBlockingQueue<>();
+ this.pendingEntries = new ArrayBlockingQueue<>(config.getReplication().getMaxQueueLength());
this.controller =
new IndexController(
impl.getStorageDir(),
@@ -314,10 +314,6 @@
pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
if (request != null) {
bufferedEntries.add(request);
- // If write pressure is low, we simply sleep a little to reduce the number of RPC
- if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch()) {
- Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
- }
}
}
MetricService.getInstance()