Make local dispatch of data-region fragment instances concurrent
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 72ad599..6194996 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -37,7 +37,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -230,9 +229,7 @@
     }
     analysis.setRedirectNodeList(redirectInfo);
 
-    List<WritePlanNode> res = new ArrayList<>(splitMap.values());
-    Collections.shuffle(res);
-    return res;
+    return new ArrayList<>(splitMap.values()); // splitMap's iteration order; no longer shuffled
   }
 
   @Override
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 2e89294..de756c7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -19,12 +19,15 @@
 
 package org.apache.iotdb.db.queryengine.plan.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -54,9 +57,13 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -85,6 +92,17 @@
 
   private static final String UNEXPECTED_ERRORS = "Unexpected errors: ";
 
+  /**
+   * Single-threaded executors keyed by data region id. Each local data-region fragment
+   * instance is submitted to the executor of its region, so different regions are written
+   * concurrently while writes to the same region stay serialized.
+   *
+   * <p>The map is static so that all dispatcher instances share the same executors instead of
+   * each dispatcher creating (and leaking) its own threads. The executors are never shut down.
+   */
+  private static final Map<Integer, ExecutorService> DATA_REGION_QUEUE_MAP =
+      new ConcurrentHashMap<>();
+
   public FragmentInstanceDispatcherImpl(
       QueryType type,
       MPPQueryContext queryContext,
@@ -192,18 +210,76 @@
     if (!localInstances.isEmpty()) {
-      // sync dispatch to local
+      // dispatch to local: data-region instances are queued on per-region executors and run
+      // concurrently across regions; other instances are dispatched synchronously
       long localScheduleStartTime = System.nanoTime();
-      for (FragmentInstance localInstance : localInstances) {
-        try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
-          dispatchLocally(localInstance);
-        } catch (FragmentInstanceDispatchException e) {
-          dataNodeFailureList.add(e.getFailureStatus());
-        } catch (Throwable t) {
-          logger.warn(DISPATCH_FAILED, t);
-          dataNodeFailureList.add(
-              RpcUtils.getStatus(
-                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+      // assumes all local instances target the same kind of region; only the first is checked
+      if (localInstances.get(0).getRegionReplicaSet().getRegionId().getType()
+          == TConsensusGroupType.DataRegion) {
+        List<Future<Void>> futures = new ArrayList<>(localInstances.size());
+        for (FragmentInstance localInstance : localInstances) {
+          try {
+            int regionId = localInstance.getRegionReplicaSet().getRegionId().getId();
+            // computeIfAbsent is atomic, so concurrent dispatches
+            // never create two executors for the same region
+            ExecutorService regionExecutor =
+                DATA_REGION_QUEUE_MAP.computeIfAbsent(
+                    regionId,
+                    id ->
+                        IoTDBThreadPoolFactory.newFixedThreadPool(
+                            1, ThreadName.DATA_REGION_CONSUMER.getName() + "-" + id));
+            futures.add(regionExecutor.submit(new WriteFragmentInstanceTask(localInstance)));
+          } catch (Throwable t) {
+            logger.warn(DISPATCH_FAILED, t);
+            dataNodeFailureList.add(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+          }
+        }
+
+        for (Future<Void> future : futures) {
+          try {
+            future.get();
+          } catch (ExecutionException e) {
+            // unwrap so the status carries the task's own failure, as in the synchronous path
+            Throwable cause = e.getCause() == null ? e : e.getCause();
+            if (cause instanceof FragmentInstanceDispatchException) {
+              dataNodeFailureList.add(
+                  ((FragmentInstanceDispatchException) cause).getFailureStatus());
+            } else {
+              logger.warn(DISPATCH_FAILED, cause);
+              dataNodeFailureList.add(
+                  RpcUtils.getStatus(
+                      TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + cause.getMessage()));
+            }
+          } catch (InterruptedException e) {
+            // restore the interrupt flag, report a failure and stop waiting for the rest
+            Thread.currentThread().interrupt();
+            logger.warn(DISPATCH_FAILED, e);
+            dataNodeFailureList.add(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + e.getMessage()));
+            break;
+          } catch (Throwable t) {
+            logger.warn(DISPATCH_FAILED, t);
+            dataNodeFailureList.add(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+          }
+        }
+      } else {
+        for (FragmentInstance localInstance : localInstances) {
+          try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
+            dispatchLocally(localInstance);
+          } catch (FragmentInstanceDispatchException e) {
+            dataNodeFailureList.add(e.getFailureStatus());
+          } catch (Throwable t) {
+            logger.warn(DISPATCH_FAILED, t);
+            dataNodeFailureList.add(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + t.getMessage()));
+          }
         }
       }
+
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost(
           System.nanoTime() - localScheduleStartTime);
     }
@@ -239,6 +315,71 @@
     }
   }
 
+  /**
+   * Writes one local fragment instance; instances are submitted to the single-threaded
+   * per-region executors held in {@code DATA_REGION_QUEUE_MAP}.
+   */
+  private static class WriteFragmentInstanceTask implements Callable<Void> {
+
+    private final FragmentInstance instance;
+
+    WriteFragmentInstanceTask(FragmentInstance instance) {
+      this.instance = instance;
+    }
+
+    /**
+     * Executes the instance's plan node tree with a {@link RegionWriteExecutor}, wrapped in a
+     * {@code SetThreadName} for the instance id as the synchronous local path does.
+     *
+     * @return always {@code null}
+     * @throws FragmentInstanceDispatchException if the consensus group id cannot be built, the
+     *     write is not accepted, or it is accepted with a non-success status
+     */
+    @Override
+    public Void call() throws Exception {
+      try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
+        // deserialize ConsensusGroupId
+        ConsensusGroupId groupId = null;
+        if (instance.getExecutorType().isStorageExecutor()) {
+          try {
+            groupId =
+                ConsensusGroupId.Factory.createFromTConsensusGroupId(
+                    instance.getRegionReplicaSet().getRegionId());
+          } catch (Throwable t) {
+            logger.warn("Deserialize ConsensusGroupId failed. ", t);
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(
+                    TSStatusCode.EXECUTE_STATEMENT_ERROR,
+                    "Deserialize ConsensusGroupId failed: " + t.getMessage()));
+          }
+        }
+
+        PlanNode planNode = instance.getFragment().getPlanNodeTree();
+        RegionWriteExecutor writeExecutor = new RegionWriteExecutor();
+        RegionExecutionResult writeResult = writeExecutor.execute(groupId, planNode);
+        if (!writeResult.isAccepted()) {
+          logger.warn(
+              "write locally failed. TSStatus: {}, message: {}",
+              writeResult.getStatus(),
+              writeResult.getMessage());
+          if (writeResult.getStatus() == null) {
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, writeResult.getMessage()));
+          } else {
+            throw new FragmentInstanceDispatchException(writeResult.getStatus());
+          }
+        } else {
+          // some expected and accepted status except SUCCESS_STATUS need to be returned
+          TSStatus status = writeResult.getStatus();
+          if (status != null && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            throw new FragmentInstanceDispatchException(status);
+          }
+        }
+      }
+      return null;
+    }
+  }
+
   private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
     TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index aff32b2..29f7d83 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -45,6 +45,7 @@
   DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
   MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"),
   ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
+  DATA_REGION_CONSUMER("Data-Region-Consumer"), // per-data-region local dispatch executors
 
   // -------------------------- Compaction --------------------------
   COMPACTION_WORKER("Compaction-Worker"),
@@ -198,7 +199,8 @@
               DATANODE_INTERNAL_RPC_SERVICE,
               DATANODE_INTERNAL_RPC_PROCESSOR,
               MPP_COORDINATOR_WRITE_EXECUTOR,
-              ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL));
+              ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL,
+              DATA_REGION_CONSUMER));
   private static final Set<ThreadName> compactionThreadNames =
       new HashSet<>(Arrays.asList(COMPACTION_WORKER, COMPACTION_SUB_TASK, COMPACTION_SCHEDULE));