temporarily support parallel insert and pass session-example
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index d5f4045..088f3f1 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -90,11 +90,11 @@
     //     createTemplate();
     createTimeseries();
     createMultiTimeseries();
-    insertRecord();
-    insertTablet();
+    //    insertRecord();
+    //    insertTablet();
     //    insertTabletWithNullValues();
     //    insertTablets();
-    //    insertRecords();
+    insertRecords();
     //    insertText();
     //    selectInto();
     //    createAndDropContinuousQueries();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 854401d..3b50f45 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1061,6 +1061,9 @@
   // customizedProperties, this should be empty by default.
   private Properties customizedProperties = new Properties();
 
+  private boolean enableMultiThreadingInsert = true;
+  private int insertThreadNum = Runtime.getRuntime().availableProcessors();
+
   // IoTConsensus Config
   private int maxLogEntriesNumPerBatch = 1024;
   private int maxSizePerBatch = 16 * 1024 * 1024;
@@ -3801,4 +3804,20 @@
       double innerCompactionTaskSelectionDiskRedundancy) {
     this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy;
   }
+
+  public boolean isEnableMultiThreadingInsert() {
+    return enableMultiThreadingInsert;
+  }
+
+  public void setEnableMultiThreadingInsert(boolean enableMultiThreadingInsert) {
+    this.enableMultiThreadingInsert = enableMultiThreadingInsert;
+  }
+
+  public int getInsertThreadNum() {
+    return insertThreadNum;
+  }
+
+  public void setInsertThreadNum(int insertThreadNum) {
+    this.insertThreadNum = insertThreadNum;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e4c5c24..13a0b2c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -663,6 +663,16 @@
                 "inner_compaction_task_selection_mods_file_threshold",
                 Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold()))));
 
+    conf.setEnableMultiThreadingInsert(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_multi_threading_insert",
+                String.valueOf(conf.isEnableMultiThreadingInsert()))));
+    conf.setInsertThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "insert_thread_num", Integer.toString(conf.getInsertThreadNum()))));
+
     conf.setEnablePartialInsert(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java
new file mode 100644
index 0000000..4e312b5
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.experiment;
+
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class InsertTask implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertTask.class);
+  private TsFileProcessor processor;
+  private InsertRowNode insertRowNode;
+  private CountDownLatch countDownLatch;
+  private AtomicLong[] costForMetrics;
+  private List<InsertRowNode> executedList;
+
+  public InsertTask(
+      TsFileProcessor processor,
+      InsertRowNode insertRowNode,
+      AtomicLong[] costForMetrics,
+      CountDownLatch countDownLatch,
+      List<InsertRowNode> executedList) {
+    this.processor = processor;
+    this.insertRowNode = insertRowNode;
+    this.countDownLatch = countDownLatch;
+    this.costForMetrics = costForMetrics;
+    this.executedList = executedList;
+  }
+
+  @Override
+  public void run() {
+    try {
+      long[] tempCostForMetrics = new long[4];
+      processor.insert(insertRowNode, tempCostForMetrics);
+      for (int i = 0; i < 4; i++) {
+        costForMetrics[i].addAndGet(tempCostForMetrics[i]);
+      }
+      executedList.add(insertRowNode);
+    } catch (WriteProcessException e) {
+      LOGGER.error("Insertion failed", e);
+    } finally {
+      countDownLatch.countDown();
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java
new file mode 100644
index 0000000..6e254d2
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.experiment;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class InsertWorkers {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertWorkers.class);
+  private static ThreadPoolExecutor threadPool = null;
+
+  static {
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableMultiThreadingInsert()) {
+      LOGGER.info("Initializing thread pool for parallel insert");
+      threadPool =
+          new ThreadPoolExecutor(
+              IoTDBDescriptor.getInstance().getConfig().getInsertThreadNum(),
+              IoTDBDescriptor.getInstance().getConfig().getInsertThreadNum(),
+              10,
+              TimeUnit.SECONDS,
+              new LinkedBlockingQueue<>(8196),
+              new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+  }
+
+  public static void submit(InsertTask task) {
+    threadPool.submit(task);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index c408b73..e3c847d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -163,6 +163,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
@@ -214,6 +215,11 @@
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
       PerformanceOverviewMetrics.getInstance();
 
+  public static final AtomicLong partitionSettingTimeInTotal = new AtomicLong(0);
+  public static final AtomicLong partitionSettingTimeCount = new AtomicLong(0);
+  public static final AtomicLong partitionFetchingTimeInTotal = new AtomicLong(0);
+  public static final AtomicLong partitionFetchingTimeCount = new AtomicLong(0);
+
   public AnalyzeVisitor(IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
@@ -2526,6 +2532,7 @@
 
   private Analysis computeAnalysisForInsertRows(
       Analysis analysis, InsertRowsStatement insertRowsStatement, String userName) {
+    final long startTime = System.nanoTime();
     Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>();
     for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
@@ -2541,7 +2548,15 @@
       dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue()));
       dataPartitionQueryParams.add(dataPartitionQueryParam);
     }
-
+    long timeCost = System.nanoTime() - startTime;
+    long time = partitionSettingTimeInTotal.addAndGet(timeCost);
+    long count = partitionSettingTimeCount.incrementAndGet();
+    if (count % 1000 == 0) {
+      logger.info(
+          String.format(
+              "The average time cost of setting partition is %.2f ms",
+              (double) time / count / 1000000));
+    }
     return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName);
   }
 
@@ -2703,7 +2718,7 @@
   /** get analysis according to statement and params */
   private Analysis getAnalysisForWriting(
       Analysis analysis, List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) {
-
+    final long startTime = System.nanoTime();
     DataPartition dataPartition =
         partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams, userName);
     if (dataPartition.isEmpty()) {
@@ -2715,6 +2730,14 @@
                   + "because enable_auto_create_schema is FALSE."));
     }
     analysis.setDataPartitionInfo(dataPartition);
+    long timeCost = System.nanoTime() - startTime;
+    long time = partitionFetchingTimeInTotal.addAndGet(timeCost);
+    long count = partitionFetchingTimeCount.incrementAndGet();
+    if (count % 1000 == 0) {
+      logger.info(
+          String.format(
+              "Average time cost of fetching partition is %.2f ms", (double) time / count / 1e6));
+    }
     return analysis;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index d5cf082..da1675a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -49,6 +49,8 @@
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
+import org.apache.iotdb.db.experiment.InsertTask;
+import org.apache.iotdb.db.experiment.InsertWorkers;
 import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
@@ -144,6 +146,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Phaser;
@@ -3199,7 +3202,11 @@
                     > lastFlushTimeMap.getFlushedTime(
                         timePartitionIds[i], insertRowNode.getDevicePath().getFullPath());
       }
-      insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableMultiThreadingInsert()) {
+        insertToTsFileProcessorsParallel(insertRowsNode, areSequence, timePartitionIds);
+      } else {
+        insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds);
+      }
       if (!insertRowsNode.getResults().isEmpty()) {
         throw new BatchProcessException("Partial failed inserting rows");
       }
@@ -3208,6 +3215,61 @@
     }
   }
 
+  public void insertToTsFileProcessorsParallel(
+      InsertRowsNode insertRowsNode, boolean[] areSequence, long[] timePartitionIds) {
+    List<InsertRowNode> executedInsertRowNodeList = Collections.synchronizedList(new ArrayList<>());
+    AtomicLong[] costsForMetrics = new AtomicLong[4];
+    for (int i = 0; i < costsForMetrics.length; i++) {
+      costsForMetrics[i] = new AtomicLong(0);
+    }
+    Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing = new HashMap<>();
+    CountDownLatch latch = new CountDownLatch(insertRowsNode.getInsertRowNodeList().size());
+    for (int i = 0; i < areSequence.length; i++) {
+      InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i);
+      if (insertRowNode.allMeasurementFailed()) {
+        latch.countDown();
+        continue;
+      }
+      TsFileProcessor tsFileProcessor =
+          getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]);
+      if (tsFileProcessor == null) {
+        latch.countDown();
+        continue;
+      }
+      tsFileProcessorMapForFlushing.put(tsFileProcessor, areSequence[i]);
+      InsertWorkers.submit(
+          new InsertTask(
+              tsFileProcessor, insertRowNode, costsForMetrics, latch, executedInsertRowNodeList));
+    }
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      logger.error("Insertion is interrupted", e);
+      return;
+    }
+    // check memtable size and may asyncTryToFlush the work memtable
+    for (Map.Entry<TsFileProcessor, Boolean> entry : tsFileProcessorMapForFlushing.entrySet()) {
+      if (entry.getKey().shouldFlush()) {
+        fileFlushPolicy.apply(this, entry.getKey(), entry.getValue());
+      }
+    }
+
+    PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0].get());
+    PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1].get());
+    PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2].get());
+    PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3].get());
+    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
+      if ((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)
+          && insertRowsNode.isSyncFromLeaderWhenUsingIoTConsensus())) {
+        return;
+      }
+      // disable updating last cache on follower
+      long startTime = System.nanoTime();
+      tryToUpdateInsertRowsLastCache(executedInsertRowNodeList);
+      PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime);
+    }
+  }
+
   /**
    * insert batch of tablets belongs to multiple devices
    *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index e54e7ef..62cc8ad 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -51,6 +51,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -70,13 +71,15 @@
   private volatile FlushStatus flushStatus = FlushStatus.WORKING;
   private final int avgSeriesPointNumThreshold =
       IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
+
   /** Memory size of data points, including TEXT values. */
-  private long memSize = 0;
+  private AtomicLong memSize = new AtomicLong(0);
+
   /**
    * Memory usage of all TVLists memory usage regardless of whether these TVLists are full,
    * including TEXT values.
    */
-  private long tvListRamCost = 0;
+  private AtomicLong tvListRamCost = new AtomicLong(0);
 
   private int seriesNumber = 0;
 
@@ -110,13 +113,13 @@
   protected AbstractMemTable() {
     this.database = null;
     this.dataRegionId = null;
-    this.memTableMap = new HashMap<>();
+    this.memTableMap = new ConcurrentHashMap<>();
   }
 
   protected AbstractMemTable(String database, String dataRegionId) {
     this.database = database;
     this.dataRegionId = dataRegionId;
-    this.memTableMap = new HashMap<>();
+    this.memTableMap = new ConcurrentHashMap<>();
   }
 
   protected AbstractMemTable(
@@ -142,13 +145,15 @@
       IDeviceID deviceId, List<IMeasurementSchema> schemaList) {
     IWritableMemChunkGroup memChunkGroup =
         memTableMap.computeIfAbsent(deviceId, k -> new WritableMemChunkGroup());
-    for (IMeasurementSchema schema : schemaList) {
-      if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) {
-        seriesNumber++;
-        totalPointsNumThreshold += avgSeriesPointNumThreshold;
+    synchronized (memChunkGroup) {
+      for (IMeasurementSchema schema : schemaList) {
+        if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) {
+          seriesNumber++;
+          totalPointsNumThreshold += avgSeriesPointNumThreshold;
+        }
       }
+      return memChunkGroup;
     }
-    return memChunkGroup;
   }
 
   private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet(
@@ -199,7 +204,7 @@
         dataTypes.add(schema.getType());
       }
     }
-    memSize += MemUtils.getRowRecordSize(dataTypes, values);
+    memSize.addAndGet(MemUtils.getRowRecordSize(dataTypes, values));
     write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values);
 
     int pointsInserted =
@@ -246,7 +251,7 @@
     if (schemaList.isEmpty()) {
       return;
     }
-    memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values);
+    memSize.addAndGet(MemUtils.getAlignedRowRecordSize(dataTypes, values));
     writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values);
     int pointsInserted =
         insertRowNode.getMeasurements().length - insertRowNode.getFailedMeasurementNumber();
@@ -270,7 +275,7 @@
       throws WriteProcessException {
     try {
       writeTabletNode(insertTabletNode, start, end);
-      memSize += MemUtils.getTabletSize(insertTabletNode, start, end);
+      memSize.addAndGet(MemUtils.getTabletSize(insertTabletNode, start, end));
       int pointsInserted =
           (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
@@ -296,7 +301,7 @@
       throws WriteProcessException {
     try {
       writeAlignedTablet(insertTabletNode, start, end);
-      memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end);
+      memSize.addAndGet(MemUtils.getAlignedTabletSize(insertTabletNode, start, end));
       int pointsInserted =
           (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
@@ -436,7 +441,7 @@
 
   @Override
   public long memSize() {
-    return memSize;
+    return memSize.get();
   }
 
   @Override
@@ -450,11 +455,11 @@
   @Override
   public void clear() {
     memTableMap.clear();
-    memSize = 0;
+    memSize.set(0);
     seriesNumber = 0;
     totalPointsNum = 0;
     totalPointsNumThreshold = 0;
-    tvListRamCost = 0;
+    tvListRamCost.set(0);
     maxPlanIndex = 0;
     minPlanIndex = 0;
   }
@@ -531,27 +536,27 @@
 
   @Override
   public void addTVListRamCost(long cost) {
-    this.tvListRamCost += cost;
+    this.tvListRamCost.addAndGet(cost);
   }
 
   @Override
   public void releaseTVListRamCost(long cost) {
-    this.tvListRamCost -= cost;
+    this.tvListRamCost.addAndGet(-cost);
   }
 
   @Override
   public long getTVListsRamCost() {
-    return tvListRamCost;
+    return tvListRamCost.get();
   }
 
   @Override
   public void addTextDataSize(long textDataSize) {
-    this.memSize += textDataSize;
+    this.memSize.addAndGet(textDataSize);
   }
 
   @Override
   public void releaseTextDataSize(long textDataSize) {
-    this.memSize -= textDataSize;
+    this.memSize.addAndGet(-textDataSize);
   }
 
   @Override
@@ -634,8 +639,8 @@
       return;
     }
     buffer.putInt(seriesNumber);
-    buffer.putLong(memSize);
-    buffer.putLong(tvListRamCost);
+    buffer.putLong(memSize.get());
+    buffer.putLong(tvListRamCost.get());
     buffer.putLong(totalPointsNum);
     buffer.putLong(totalPointsNumThreshold);
     buffer.putLong(maxPlanIndex);
@@ -653,8 +658,8 @@
 
   public void deserialize(DataInputStream stream) throws IOException {
     seriesNumber = stream.readInt();
-    memSize = stream.readLong();
-    tvListRamCost = stream.readLong();
+    memSize = new AtomicLong(stream.readLong());
+    tvListRamCost = new AtomicLong(stream.readLong());
     totalPointsNum = stream.readLong();
     totalPointsNumThreshold = stream.readLong();
     maxPlanIndex = stream.readLong();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
index 70b0f90..b8ec268 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class PrimitiveMemTable extends AbstractMemTable {
   // this constructor only used when deserialize
@@ -39,7 +39,7 @@
 
   @Override
   public IMemTable copy() {
-    Map<IDeviceID, IWritableMemChunkGroup> newMap = new HashMap<>(getMemTableMap());
+    Map<IDeviceID, IWritableMemChunkGroup> newMap = new ConcurrentHashMap<>(getMemTableMap());
 
     return new PrimitiveMemTable(getDatabase(), getDataRegionId(), newMap);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 953606e..6a20269 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -139,7 +139,7 @@
   private volatile boolean shouldClose;
 
   /** working memtable. */
-  private IMemTable workMemTable;
+  private volatile IMemTable workMemTable;
 
   /** last flush time to flush the working memtable. */
   private long lastWorkMemtableFlushTime;
@@ -239,12 +239,16 @@
       throws WriteProcessException {
 
     if (workMemTable == null) {
-      long startTime = System.nanoTime();
-      createNewWorkingMemTable();
-      // recordCreateMemtableBlockCost
-      costsForMetrics[0] += System.nanoTime() - startTime;
-      WritingMetrics.getInstance()
-          .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+      synchronized (this) {
+        if (workMemTable == null) {
+          long startTime = System.nanoTime();
+          createNewWorkingMemTable();
+          // recordCreateMemtableBlockCost
+          costsForMetrics[0] += System.nanoTime() - startTime;
+          WritingMetrics.getInstance()
+              .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+        }
+      }
     }
 
     long[] memIncrements = null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java
index b06ed63..9ebbdbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java
@@ -21,6 +21,8 @@
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /** The TsFileProcessorInfo records the memory cost of this TsFileProcessor. */
 public class TsFileProcessorInfo {
 
@@ -28,13 +30,13 @@
   private final DataRegionInfo dataRegionInfo;
 
   /** memory occupation of unsealed TsFileResource, ChunkMetadata, WAL */
-  private long memCost;
+  private AtomicLong memCost;
 
   private final TsFileProcessorInfoMetrics metrics;
 
   public TsFileProcessorInfo(DataRegionInfo dataRegionInfo) {
     this.dataRegionInfo = dataRegionInfo;
-    this.memCost = 0L;
+    this.memCost = new AtomicLong(0);
     this.metrics =
         new TsFileProcessorInfoMetrics(dataRegionInfo.getDataRegion().getDatabaseName(), this);
     MetricService.getInstance().addMetricSet(metrics);
@@ -42,25 +44,25 @@
 
   /** called in each insert */
   public void addTSPMemCost(long cost) {
-    memCost += cost;
+    memCost.addAndGet(cost);
     dataRegionInfo.addStorageGroupMemCost(cost);
   }
 
   /** called when meet exception */
   public void releaseTSPMemCost(long cost) {
     dataRegionInfo.releaseStorageGroupMemCost(cost);
-    memCost -= cost;
+    memCost.addAndGet(-cost);
   }
 
   /** called when closing TSP */
   public void clear() {
-    dataRegionInfo.releaseStorageGroupMemCost(memCost);
-    memCost = 0L;
+    dataRegionInfo.releaseStorageGroupMemCost(memCost.get());
+    memCost.set(0);
     MetricService.getInstance().removeMetricSet(metrics);
   }
 
   /** get memCost */
   public long getMemCost() {
-    return memCost;
+    return memCost.get();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index c1a71d3..6248026 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -59,28 +59,30 @@
 
   @Override
   public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
-    switch (schema.getType()) {
-      case BOOLEAN:
-        putBoolean(insertTime, (boolean) objectValue);
-        break;
-      case INT32:
-        putInt(insertTime, (int) objectValue);
-        break;
-      case INT64:
-        putLong(insertTime, (long) objectValue);
-        break;
-      case FLOAT:
-        putFloat(insertTime, (float) objectValue);
-        break;
-      case DOUBLE:
-        putDouble(insertTime, (double) objectValue);
-        break;
-      case TEXT:
-        return putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
-      default:
-        throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+    synchronized (this) {
+      switch (schema.getType()) {
+        case BOOLEAN:
+          putBoolean(insertTime, (boolean) objectValue);
+          break;
+        case INT32:
+          putInt(insertTime, (int) objectValue);
+          break;
+        case INT64:
+          putLong(insertTime, (long) objectValue);
+          break;
+        case FLOAT:
+          putFloat(insertTime, (float) objectValue);
+          break;
+        case DOUBLE:
+          putDouble(insertTime, (double) objectValue);
+          break;
+        case TEXT:
+          return putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
+        default:
+          throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
+      }
+      return false;
     }
-    return false;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
index 707eace..2245808 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -29,18 +29,18 @@
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class WritableMemChunkGroup implements IWritableMemChunkGroup {
 
   private Map<String, IWritableMemChunk> memChunkMap;
 
   public WritableMemChunkGroup() {
-    memChunkMap = new HashMap<>();
+    memChunkMap = new ConcurrentHashMap<>();
   }
 
   @Override