[IOTDB-4392] Build a separate cache queue per source peer for MultiLeader log sync
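
Previously, DataRegionStateMachine kept a single requestCache, lock, and
nextSyncIndex per data region, so SyncLog batches from different source peers
were ordered through one shared queue. This change gives every source peer its
own cache queue: TSyncLogReq now carries the sender's peerId, LogDispatcher
fills it with the local endpoint, MultiLeaderRPCServiceProcessor propagates it
through BatchIndexedConsensusRequest as sourcePeerId, and DataRegionStateMachine
routes each batch into a per-peer SyncLogCacheQueue (each with its own lock,
condition, priority queue, and nextSyncIndex) held in a ConcurrentHashMap.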

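To illustrate the mechanism, below is a minimal, self-contained Java sketch of the
pattern this patch adopts (not IoTDB code; the names PerSourceReorderSketch,
ReorderQueue, and onSyncLog are hypothetical): incoming batches are routed by
source peer into independent reordering queues, and each queue applies its
batches in startSyncIndex order. The sketch omits the timeout and full-queue
fallback that SyncLogCacheQueue keeps from the previous implementation.

// A minimal, self-contained sketch (not IoTDB code) of the idea behind this change:
// one reordering queue per source peer, so batches from different leaders never
// block or reorder each other. Class and method names here are illustrative only.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PerSourceReorderSketch {

  /** A batch of logs [startIndex, endIndex] coming from one source peer. */
  static final class Batch {
    final long startIndex;
    final long endIndex;

    Batch(long startIndex, long endIndex) {
      this.startIndex = startIndex;
      this.endIndex = endIndex;
    }
  }

  /** Per-source queue: applies batches strictly in index order. */
  static final class ReorderQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition nextTurn = lock.newCondition();
    private long nextIndex = 0; // the real code starts at -1 and adds a timeout fallback

    void awaitAndApply(String source, Batch batch) throws InterruptedException {
      lock.lock();
      try {
        // Wait until this batch is the next one expected for this source.
        while (batch.startIndex != nextIndex) {
          nextTurn.await();
        }
        System.out.printf("apply %s [%d, %d]%n", source, batch.startIndex, batch.endIndex);
        nextIndex = batch.endIndex + 1;
        nextTurn.signalAll(); // wake the threads holding the following batches
      } finally {
        lock.unlock();
      }
    }
  }

  // One queue per source peer, created lazily, mirroring the ConcurrentHashMap
  // of SyncLogCacheQueue instances introduced in DataRegionStateMachine.
  private static final Map<String, ReorderQueue> QUEUES = new ConcurrentHashMap<>();

  static void onSyncLog(String sourcePeerId, Batch batch) throws InterruptedException {
    QUEUES.computeIfAbsent(sourcePeerId, k -> new ReorderQueue()).awaitAndApply(sourcePeerId, batch);
  }

  public static void main(String[] args) {
    // Two sources deliver their batches out of order; each source is still applied
    // in order, and neither source waits for the other.
    deliver("peerA", new Batch(3, 5));
    deliver("peerB", new Batch(0, 2));
    deliver("peerA", new Batch(0, 2));
    deliver("peerB", new Batch(3, 5));
  }

  private static void deliver(String source, Batch batch) {
    new Thread(
            () -> {
              try {
                onSyncLog(source, batch);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            })
        .start();
  }
}

With one queue per source, a slow or restarted leader can only delay its own
batches, whereas the previous single queue shared one nextSyncIndex and one
cache window across all sources.
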
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
index a4a38fc..5b148b6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
@@ -29,8 +29,10 @@
   private long startSyncIndex;
   private long endSyncIndex;
   private final List<IndexedConsensusRequest> requests;
+  private final String sourcePeerId;
 
-  public BatchIndexedConsensusRequest() {
+  public BatchIndexedConsensusRequest(String sourcePeerId) {
+    this.sourcePeerId = sourcePeerId;
     this.requests = new LinkedList<>();
     this.isStartSyncIndexInitialized = false;
   }
@@ -52,6 +54,10 @@
     return endSyncIndex;
   }
 
+  public String getSourcePeerId() {
+    return sourcePeerId;
+  }
+
   public List<IndexedConsensusRequest> getRequests() {
     return requests;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 82d0cd9..0574d12 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -60,6 +60,7 @@
 
   private final MultiLeaderServerImpl impl;
   private final List<LogDispatcherThread> threads;
+  private final String selfPeerId;
   private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager;
   private ExecutorService executorService;
 
@@ -67,6 +68,7 @@
       MultiLeaderServerImpl impl,
       IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
     this.impl = impl;
+    this.selfPeerId = impl.getThisNode().getEndpoint().toString();
     this.clientManager = clientManager;
     this.threads =
         impl.getConfiguration().stream()
@@ -325,7 +327,8 @@
       try {
         AsyncMultiLeaderServiceClient client = clientManager.borrowClient(peer.getEndpoint());
         TSyncLogReq req =
-            new TSyncLogReq(peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
+            new TSyncLogReq(
+                selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches());
         logger.debug(
             "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
             batch.getStartIndex(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 6337e4d..86c3b0f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -76,7 +76,8 @@
             new IoTDBException(message, TSStatusCode.READ_ONLY_SYSTEM_ERROR.getStatusCode()));
         return;
       }
-      BatchIndexedConsensusRequest requestsInThisBatch = new BatchIndexedConsensusRequest();
+      BatchIndexedConsensusRequest requestsInThisBatch =
+          new BatchIndexedConsensusRequest(req.peerId);
       // We use synchronized to ensure atomicity of executing multiple logs
       if (!req.getBatches().isEmpty()) {
         List<IConsensusRequest> consensusRequests = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 7a3a17f..75b18d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -55,6 +55,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -74,14 +75,11 @@
   private static final long CACHE_WINDOW_TIME_IN_MS =
       IoTDBDescriptor.getInstance().getConfig().getCacheWindowTimeInMs();
 
-  private final Lock queueLock = new ReentrantLock();
-  private final Condition queueSortCondition = queueLock.newCondition();
-  private final PriorityQueue<InsertNodeWrapper> requestCache;
-  private long nextSyncIndex = -1;
+  private ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
-    this.requestCache = new PriorityQueue<>();
+    this.cacheQueueMap = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -136,75 +134,95 @@
    * in follower the same as the leader. And besides order insurance, we can make the
    * deserialization of PlanNode to be concurrent
    */
-  private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
-    queueLock.lock();
-    try {
-      requestCache.add(insertNodeWrapper);
-      // If the peek is not hold by current thread, it should notify the corresponding thread to
-      // process the peek when the queue is full
-      if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
-          && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
-        queueSortCondition.signalAll();
-      }
-      while (true) {
-        // If current InsertNode is the next target InsertNode, write it
-        if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
-          requestCache.remove(insertNodeWrapper);
-          nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
-          break;
-        }
-        // If all write thread doesn't hit nextSyncIndex and the heap is full, write
-        // the peek request. This is used to keep the whole write correct when nextSyncIndex
-        // is not set. We won't persist the value of nextSyncIndex to reduce the complexity.
-        // There are some cases that nextSyncIndex is not set:
-        //   1. When the system was just started
-        //   2. When some exception occurs during SyncLog
+  private class SyncLogCacheQueue {
+    private final String sourcePeerId;
+    private final Lock queueLock = new ReentrantLock();
+    private final Condition queueSortCondition = queueLock.newCondition();
+    private final PriorityQueue<InsertNodeWrapper> requestCache;
+    private long nextSyncIndex = -1;
+
+    public SyncLogCacheQueue(String sourcePeerId, int queueSize, long timeout) {
+      this.sourcePeerId = sourcePeerId;
+      this.requestCache = new PriorityQueue<>();
+    }
+
+    /**
+     * This method is used for writes of MultiLeader SyncLog. It keeps the write order on the
+     * follower the same as on the leader, and besides ensuring order, it allows the
+     * deserialization of PlanNodes to run concurrently.
+     */
+    private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
+      queueLock.lock();
+      try {
+        requestCache.add(insertNodeWrapper);
+        // If the peek is not held by the current thread and the queue is full, notify the other
+        // threads so that the thread holding the peek can process it
         if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
-            && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
-          requestCache.remove();
-          nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
-          break;
+            && requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
+          queueSortCondition.signalAll();
         }
-        try {
-          boolean timeout =
-              !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
-          if (timeout) {
-            // although the timeout is triggered, current thread cannot write its request
-            // if current thread does not hold the peek request. And there should be some
-            // other thread who hold the peek request. In this scenario, current thread
-            // should go into await again and wait until its request becoming peek request
-            if (requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
-              // current thread hold the peek request thus it can write the peek immediately.
-              logger.info(
-                  "waiting target request timeout. current index: {}, target index: {}",
-                  insertNodeWrapper.getStartSyncIndex(),
-                  nextSyncIndex);
-              requestCache.remove(insertNodeWrapper);
-              break;
-            }
+        while (true) {
+          // If current InsertNode is the next target InsertNode, write it
+          if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
+            requestCache.remove(insertNodeWrapper);
+            nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
+            break;
           }
-        } catch (InterruptedException e) {
-          logger.warn(
-              "current waiting is interrupted. SyncIndex: {}. Exception: {}",
-              insertNodeWrapper.getStartSyncIndex(),
-              e);
-          Thread.currentThread().interrupt();
+          // If no write thread hits nextSyncIndex and the heap is full, write the peek
+          // request. This keeps the whole write correct when nextSyncIndex is not set. We
+          // don't persist the value of nextSyncIndex, to reduce complexity.
+          // There are some cases in which nextSyncIndex is not set:
+          //   1. When the system has just started
+          //   2. When some exception occurs during SyncLog
+          if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
+              && requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
+            requestCache.remove();
+            nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
+            break;
+          }
+          try {
+            boolean timeout =
+                !queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
+            if (timeout) {
+              // Although the timeout is triggered, the current thread cannot write its request
+              // if it does not hold the peek request; some other thread must hold the peek
+              // request. In this scenario, the current thread should await again until its
+              // request becomes the peek request
+              if (requestCache.peek().getStartSyncIndex()
+                  == insertNodeWrapper.getStartSyncIndex()) {
+                // The current thread holds the peek request, so it can write it immediately.
+                logger.info(
+                    "waiting target request timeout. current index: {}, target index: {}",
+                    insertNodeWrapper.getStartSyncIndex(),
+                    nextSyncIndex);
+                requestCache.remove(insertNodeWrapper);
+                break;
+              }
+            }
+          } catch (InterruptedException e) {
+            logger.warn(
+                "current waiting is interrupted. SyncIndex: {}. Exception: {}",
+                insertNodeWrapper.getStartSyncIndex(),
+                e);
+            Thread.currentThread().interrupt();
+          }
         }
+        logger.debug(
+            "source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
+            sourcePeerId,
+            region.getDataRegionId(),
+            requestCache.size(),
+            insertNodeWrapper.getStartSyncIndex(),
+            insertNodeWrapper.getEndSyncIndex());
+        List<TSStatus> subStatus = new LinkedList<>();
+        for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
+          subStatus.add(write(planNode));
+        }
+        queueSortCondition.signalAll();
+        return new TSStatus().setSubStatus(subStatus);
+      } finally {
+        queueLock.unlock();
       }
-      logger.debug(
-          "region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
-          region.getDataRegionId(),
-          requestCache.size(),
-          insertNodeWrapper.getStartSyncIndex(),
-          insertNodeWrapper.getEndSyncIndex());
-      List<TSStatus> subStatus = new LinkedList<>();
-      for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
-        subStatus.add(write(planNode));
-      }
-      queueSortCondition.signalAll();
-      return new TSStatus().setSubStatus(subStatus);
-    } finally {
-      queueLock.unlock();
     }
   }
 
@@ -299,7 +317,12 @@
       } else if (request instanceof BatchIndexedConsensusRequest) {
         InsertNodeWrapper insertNodeWrapper =
             deserializeAndWrap((BatchIndexedConsensusRequest) request);
-        return cacheAndInsertLatestNode(insertNodeWrapper);
+        String sourcePeerId = ((BatchIndexedConsensusRequest) request).getSourcePeerId();
+        return cacheQueueMap
+            .computeIfAbsent(
+                sourcePeerId,
+                k -> new SyncLogCacheQueue(k, MAX_REQUEST_CACHE_SIZE, CACHE_WINDOW_TIME_IN_MS))
+            .cacheAndInsertLatestNode(insertNodeWrapper);
       } else {
         planNode = getPlanNode(request);
       }
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 2183181..7d315d2 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -27,8 +27,10 @@
 }
 
 struct TSyncLogReq {
-  1: required common.TConsensusGroupId consensusGroupId
-  2: required list<TLogBatch> batches
+  # The source peer where this TSyncLogReq is generated
+  1: required string peerId
+  2: required common.TConsensusGroupId consensusGroupId
+  3: required list<TLogBatch> batches
 }
 
 struct TSyncLogRes {