Optimize IoTConsensus: bound the LogDispatcher pending queue and retune batch defaults

Signed-off-by: OneSizeFitQuorum <tanxinyu@apache.org>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 9c676fd..2fe821a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -235,6 +235,8 @@
     private final int maxLogEntriesNumPerBatch;
     private final int maxSizePerBatch;
     private final int maxPendingBatchesNum;
+
+    private final int maxQueueLength;
     private final long maxWaitingTimeForWaitBatchInMs;
     private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
@@ -249,6 +251,7 @@
         int maxLogEntriesNumPerBatch,
         int maxSizePerBatch,
         int maxPendingBatchesNum,
+        int maxQueueLength,
         long maxWaitingTimeForWaitBatchInMs,
         int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
@@ -261,6 +264,7 @@
       this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatchesNum = maxPendingBatchesNum;
+      this.maxQueueLength = maxQueueLength;
       this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
       this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
@@ -284,6 +288,10 @@
       return maxPendingBatchesNum;
     }
 
+    public int getMaxQueueLength() {
+      return maxQueueLength; // capacity of LogDispatcherThread's pendingEntries queue
+    }
+
     public long getMaxWaitingTimeForWaitBatchInMs() {
       return maxWaitingTimeForWaitBatchInMs;
     }
@@ -326,13 +334,12 @@
 
     public static class Builder {
 
-      private int maxLogEntriesNumPerBatch = 30;
+      private int maxLogEntriesNumPerBatch = 1024;
       private int maxSizePerBatch = 16 * 1024 * 1024;
-      // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
-      // in DataRegionStateMachine
-      private int maxPendingBatchesNum = 5;
+      private int maxPendingBatchesNum = 16;
+      private int maxQueueLength = 4096;
       private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L;
-      private int maxWaitingTimeForAccumulatingBatchInMs = 500;
+      private int maxWaitingTimeForAccumulatingBatchInMs = 100;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
       private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
@@ -356,6 +363,11 @@
         return this;
       }
 
+      public Builder setMaxQueueLength(int maxQueueLength) {
+        this.maxQueueLength = maxQueueLength; // LogDispatcherThread queue capacity; must be >= 1
+        return this;
+      }
+
       public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
           long maxWaitingTimeForWaitBatchInMs) {
         this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
@@ -408,6 +420,7 @@
             maxLogEntriesNumPerBatch,
             maxSizePerBatch,
             maxPendingBatchesNum,
+            maxQueueLength,
             maxWaitingTimeForWaitBatchInMs,
             maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index f6da0ef..cd70efe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,9 +46,9 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -214,7 +214,7 @@
     public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
       this.peer = peer;
       this.config = config;
-      this.pendingEntries = new LinkedBlockingQueue<>();
+      this.pendingEntries = new ArrayBlockingQueue<>(config.getReplication().getMaxQueueLength());
       this.controller =
           new IndexController(
               impl.getStorageDir(),
@@ -314,10 +314,6 @@
                 pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
             if (request != null) {
               bufferedEntries.add(request);
-              // If write pressure is low, we simply sleep a little to reduce the number of RPC
-              if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch()) {
-                Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
-              }
             }
           }
           MetricService.getInstance()