[To dev/1.3] Fixed the concurrency issue between region migration and TsFile load (#16796) (#16823)

* Fixed the concurrency issue between region migration and TsFile load (#16796)

* Renamed DataRegion#getDataRegionId() to getDataRegionIdString() (also on
  IDataRegionForQuery and VirtualDataRegion), updated all call sites, and added a
  parsed DataRegionId field in DataRegion that is used by the new consensus check.

* Rejected loads on a data region whose local IoTConsensus peer is inactive (e.g.,
  while the region is being migrated): loadNewTsFile now throws LoadFileException
  instead of accepting data on a peer that is not ready for write requests.

* Added a @TestOnly DataRegionConsensusImpl.setInstance(...) hook, plus a
  testRejectLoad4NonActiveImpl case in DataNodeInternalRPCServiceImplTest covering
  the rejection path.
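The core of the fix is the guard at the top of DataRegion#loadNewTsFile (see the
DataRegion.java hunk below). A minimal sketch of the check, using only names that
already appear in this diff (IoTConsensus, IoTConsensusServerImpl,
DataRegionConsensusImpl, and DataRegion's new DataRegionId field):

    // Only IoTConsensus exposes per-region peer state; other protocols skip the check.
    if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensus) {
      final IoTConsensusServerImpl impl =
          ((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(dataRegionId);
      // getImpl may return null if this node hosts no peer for the region; an existing
      // but inactive peer (e.g., mid-migration) must reject the load.
      if (impl != null && !impl.isActive()) {
        throw new LoadFileException("Peer is inactive and not ready to write request");
      }
    }

The accompanying test swaps in a local IoTConsensus through the new @TestOnly
setInstance(...) hook, marks the peer inactive with setActive(false), and asserts
that loadNewTsFile throws LoadFileException.
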
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 453bd9a..7dbc720 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -24,6 +24,7 @@
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -58,6 +59,12 @@
     // do nothing
   }
 
+  /** Test-only hook: lets tests swap in a custom consensus implementation. */
+  @TestOnly
+  public static void setInstance(final IConsensus instance) {
+    DataRegionConsensusImplHolder.INSTANCE = instance;
+  }
+
   public static IConsensus getInstance() {
     return DataRegionConsensusImplHolder.INSTANCE;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index c430be4..9a26dba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -93,7 +93,7 @@
       logger.error(
           "Exception occurs when taking snapshot for {}-{} in {}",
           region.getDatabaseName(),
-          region.getDataRegionId(),
+          region.getDataRegionIdString(),
           snapshotDir,
           e);
       return false;
@@ -109,7 +109,7 @@
       logger.error(
           "Exception occurs when taking snapshot for {}-{} in {}",
           region.getDatabaseName(),
-          region.getDataRegionId(),
+          region.getDataRegionIdString(),
           snapshotDir,
           e);
       return false;
@@ -127,7 +127,7 @@
         new SnapshotLoader(
                 latestSnapshotRootDir.getAbsolutePath(),
                 region.getDatabaseName(),
-                region.getDataRegionId())
+                region.getDataRegionIdString())
             .loadSnapshotForStateMachine();
     if (newRegion == null) {
       logger.error("Fail to load snapshot from {}", latestSnapshotRootDir);
@@ -136,7 +136,8 @@
     this.region = newRegion;
     try {
       StorageEngine.getInstance()
-          .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
+          .setDataRegion(
+              new DataRegionId(Integer.parseInt(region.getDataRegionIdString())), region);
       ChunkCache.getInstance().clear();
       TimeSeriesMetadataCache.getInstance().clear();
       BloomFilterCache.getInstance().clear();
@@ -185,13 +186,13 @@
       return new SnapshotLoader(
               latestSnapshotRootDir.getAbsolutePath(),
               region.getDatabaseName(),
-              region.getDataRegionId())
+              region.getDataRegionIdString())
           .getSnapshotFileInfo();
     } catch (IOException e) {
       logger.error(
           "Meets error when getting snapshot files for {}-{}",
           region.getDatabaseName(),
-          region.getDataRegionId(),
+          region.getDataRegionIdString(),
           e);
       return null;
     }
@@ -272,7 +273,7 @@
               + File.separator
               + region.getDatabaseName()
               + "-"
-              + region.getDataRegionId();
+              + region.getDataRegionIdString();
       return new File(snapshotDir).getCanonicalFile();
     } catch (IOException | NullPointerException e) {
       logger.warn("{}: cannot get the canonical file of {} due to {}", this, snapshotDir, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index c8c89d0..2ee3f5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -751,7 +751,7 @@
     if (initQueryDataSourceRetryCount % 10 == 0) {
       LOGGER.warn(
           "Failed to acquire the read lock of DataRegion-{} for {} times",
-          dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionId(),
+          dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionIdString(),
           initQueryDataSourceRetryCount);
     }
     return UNFINISHED_QUERY_DATA_SOURCE;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index d8ce02a..a4a8d46 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -168,7 +168,7 @@
       // We don't need to output the region having ExplainAnalyzeOperator only.
       return false;
     }
-    statistics.setDataRegion(context.getDataRegion().getDataRegionId());
+    statistics.setDataRegion(context.getDataRegion().getDataRegionIdString());
     statistics.setIp(CONFIG.getInternalAddress() + ":" + CONFIG.getInternalPort());
     statistics.setStartTimeInMS(context.getStartTime());
     statistics.setEndTimeInMS(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 65eb21b..e9cd679 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -501,7 +501,7 @@
                           MemTableFlushTask.recordFlushPointsMetricInternal(
                               node.getWritePointCount(),
                               databaseName,
-                              dataRegion.getDataRegionId());
+                              dataRegion.getDataRegionIdString());
 
                           MetricService.getInstance()
                               .count(
@@ -513,7 +513,7 @@
                                   Tag.DATABASE.toString(),
                                   databaseName,
                                   Tag.REGION.toString(),
-                                  dataRegion.getDataRegionId(),
+                                  dataRegion.getDataRegionIdString(),
                                   Tag.TYPE.toString(),
                                   Metric.LOAD_POINT_COUNT.toString());
                           MetricService.getInstance()
@@ -526,7 +526,7 @@
                                   Tag.DATABASE.toString(),
                                   databaseName,
                                   Tag.REGION.toString(),
-                                  dataRegion.getDataRegionId(),
+                                  dataRegion.getDataRegionIdString(),
                                   Tag.TYPE.toString(),
                                   Metric.LOAD_POINT_COUNT.toString());
                         }));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 69cb7bb..5ddf412 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -518,7 +518,8 @@
   }
 
   public void createDataRegionMemoryCostMetrics(DataRegion dataRegion) {
-    DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(dataRegion.getDataRegionId()));
+    DataRegionId dataRegionId =
+        new DataRegionId(Integer.parseInt(dataRegion.getDataRegionIdString()));
     MetricService.getInstance()
         .createAutoGauge(
             Metric.DATA_REGION_MEM_COST.toString(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 0419f26..c8439e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -527,7 +527,7 @@
   public void syncCloseProcessorsInRegion(List<String> dataRegionIds) {
     List<Future<Void>> tasks = new ArrayList<>();
     for (DataRegion dataRegion : dataRegionMap.values()) {
-      if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionId())) {
+      if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionIdString())) {
         tasks.add(
             cachedThreadPool.submit(
                 () -> {
@@ -784,7 +784,7 @@
           // delete wal
           WALManager.getInstance()
               .deleteWALNode(
-                  region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
+                  region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionIdString());
           // delete snapshot
           for (String dataDir : CONFIG.getLocalDataDirs()) {
             File regionSnapshotDir =
@@ -803,12 +803,13 @@
         WRITING_METRICS.removeDataRegionMemoryCostMetrics(regionId);
         WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId);
         WRITING_METRICS.removeActiveMemtableCounterMetrics(regionId);
-        FileMetrics.getInstance().deleteRegion(region.getDatabaseName(), region.getDataRegionId());
+        FileMetrics.getInstance()
+            .deleteRegion(region.getDatabaseName(), region.getDataRegionIdString());
       } catch (Exception e) {
         LOGGER.error(
             "Error occurs when deleting data region {}-{}",
             region.getDatabaseName(),
-            region.getDataRegionId(),
+            region.getDataRegionIdString(),
             e);
       } finally {
         deletingDataRegionMap.remove(regionId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index eefede2..068990c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -36,8 +36,11 @@
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
@@ -247,7 +250,9 @@
       ConcurrentHashMap.newKeySet();
 
   /** data region id. */
-  private final String dataRegionId;
+  private final String dataRegionIdString;
+
+  private final DataRegionId dataRegionId;
 
   /** database name. */
   private final String databaseName;
@@ -316,20 +321,25 @@
    * Construct a database processor.
    *
    * @param systemDir system dir path
-   * @param dataRegionId data region id e.g. 1
+   * @param dataRegionIdString data region id e.g. 1
    * @param fileFlushPolicy file flush policy
    * @param databaseName database name e.g. root.sg1
    */
   public DataRegion(
-      String systemDir, String dataRegionId, TsFileFlushPolicy fileFlushPolicy, String databaseName)
+      String systemDir,
+      String dataRegionIdString,
+      TsFileFlushPolicy fileFlushPolicy,
+      String databaseName)
       throws DataRegionException {
-    this.dataRegionId = dataRegionId;
+    this.dataRegionIdString = dataRegionIdString;
+    this.dataRegionId = new DataRegionId(Integer.parseInt(dataRegionIdString));
     this.databaseName = databaseName;
     this.fileFlushPolicy = fileFlushPolicy;
     acquireDirectBufferMemory();
 
-    dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
-    this.tsFileManager = new TsFileManager(databaseName, dataRegionId, dataRegionSysDir.getPath());
+    dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionIdString);
+    this.tsFileManager =
+        new TsFileManager(databaseName, dataRegionIdString, dataRegionSysDir.getPath());
     if (dataRegionSysDir.mkdirs()) {
       logger.info(
           "Database system Directory {} doesn't exist, create it", dataRegionSysDir.getPath());
@@ -345,17 +355,17 @@
       logger.debug(
           "Skip recovering data region {}[{}] when consensus protocol is ratis and storage engine is not ready.",
           databaseName,
-          dataRegionId);
+          dataRegionIdString);
       for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) {
         File dataRegionFolder =
-            fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId);
+            fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionIdString);
         try {
           fsFactory.deleteDirectory(dataRegionFolder.getPath());
         } catch (IOException e) {
           logger.error(
               "Exception occurs when deleting data region folder for {}-{}",
               databaseName,
-              dataRegionId,
+              dataRegionIdString,
               e);
         }
         if (FSUtils.getFSType(dataRegionFolder) == FSType.LOCAL) {
@@ -379,10 +389,11 @@
   }
 
   @TestOnly
-  public DataRegion(String databaseName, String id) {
+  public DataRegion(String databaseName, String dataRegionIdString) {
     this.databaseName = databaseName;
-    this.dataRegionId = id;
-    this.tsFileManager = new TsFileManager(databaseName, id, "");
+    this.dataRegionIdString = dataRegionIdString;
+    this.dataRegionId = new DataRegionId(Integer.parseInt(this.dataRegionIdString));
+    this.tsFileManager = new TsFileManager(databaseName, dataRegionIdString, "");
     this.partitionMaxFileVersions = new HashMap<>();
     partitionMaxFileVersions.put(0L, 0L);
     this.metrics = new DataRegionMetrics(this);
@@ -451,7 +462,7 @@
           logger.info(
               "The TsFiles of data region {}[{}] has recovered {}/{}.",
               databaseName,
-              dataRegionId,
+              dataRegionIdString,
               recoveredFilesNum,
               numOfFilesToRecover);
           lastLogTime = System.currentTimeMillis();
@@ -460,7 +471,7 @@
         logger.info(
             "The TsFiles of data region {}[{}] has recovered completely {}/{}.",
             databaseName,
-            dataRegionId,
+            dataRegionIdString,
             numOfFilesToRecover,
             numOfFilesToRecover);
       }
@@ -572,7 +583,7 @@
         if (logFile.exists()) {
           try {
             FileTimeIndexCacheReader logReader =
-                new FileTimeIndexCacheReader(logFile, dataRegionId);
+                new FileTimeIndexCacheReader(logFile, dataRegionIdString);
             logReader.read(fileTimeIndexMap);
           } catch (Exception e) {
             throw new RuntimeException(e);
@@ -607,7 +618,7 @@
           TimePartitionManager.getInstance()
               .registerTimePartitionInfo(
                   new TimePartitionInfo(
-                      new DataRegionId(Integer.parseInt(dataRegionId)),
+                      new DataRegionId(Integer.parseInt(dataRegionIdString)),
                       latestPartitionId,
                       false,
                       Long.MAX_VALUE,
@@ -659,11 +670,14 @@
           || config
               .getDataRegionConsensusProtocolClass()
               .equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
-        WALManager.getInstance().applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
+        WALManager.getInstance()
+            .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionIdString);
       }
-      logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId);
+      logger.info(
+          "The data region {}[{}] is created successfully", databaseName, dataRegionIdString);
     } else {
-      logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId);
+      logger.info(
+          "The data region {}[{}] is recovered successfully", databaseName, dataRegionIdString);
     }
   }
 
@@ -717,7 +731,7 @@
 
   private void recoverCompaction() {
     CompactionRecoverManager compactionRecoverManager =
-        new CompactionRecoverManager(tsFileManager, databaseName, dataRegionId);
+        new CompactionRecoverManager(tsFileManager, databaseName, dataRegionIdString);
     compactionRecoverManager.recoverCompaction();
   }
 
@@ -734,7 +748,8 @@
     // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition
     Map<String, File> tsFilePartitionPath2File = new HashMap<>();
     for (String baseDir : folders) {
-      File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
+      File fileFolder =
+          fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionIdString);
       if (!fileFolder.exists()) {
         continue;
       }
@@ -804,7 +819,7 @@
           String.format(
               "data region %s[%s] is down, because the time of tsfile %s is larger than system current time, "
                   + "file time is %d while system current time is %d, please check it.",
-              databaseName, dataRegionId, tsFile.getAbsolutePath(), fileTime, currentTime));
+              databaseName, dataRegionIdString, tsFile.getAbsolutePath(), fileTime, currentTime));
     }
   }
 
@@ -847,10 +862,10 @@
         long timePartitionId = tsFileResource.getTimePartition();
         TimePartitionManager.getInstance()
             .updateAfterOpeningTsFileProcessor(
-                new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId);
+                new DataRegionId(Integer.parseInt(dataRegionIdString)), timePartitionId);
         TsFileProcessor tsFileProcessor =
             new TsFileProcessor(
-                dataRegionId,
+                dataRegionIdString,
                 dataRegionInfo,
                 tsFileResource,
                 this::closeUnsealedTsFileProcessorCallBack,
@@ -951,7 +966,7 @@
         TimePartitionManager.getInstance()
             .registerTimePartitionInfo(
                 new TimePartitionInfo(
-                    new DataRegionId(Integer.parseInt(dataRegionId)),
+                    new DataRegionId(Integer.parseInt(dataRegionIdString)),
                     partitionId,
                     false,
                     Long.MAX_VALUE,
@@ -962,7 +977,7 @@
       }
       TimePartitionManager.getInstance()
           .updateAfterFlushing(
-              new DataRegionId(Integer.parseInt(dataRegionId)),
+              new DataRegionId(Integer.parseInt(dataRegionIdString)),
               partitionId,
               System.currentTimeMillis(),
               lastFlushTimeMap.getMemSize(partitionId),
@@ -1003,7 +1018,7 @@
         TimePartitionManager.getInstance()
             .registerTimePartitionInfo(
                 new TimePartitionInfo(
-                    new DataRegionId(Integer.parseInt(dataRegionId)),
+                    new DataRegionId(Integer.parseInt(dataRegionIdString)),
                     partitionId,
                     false,
                     Long.MAX_VALUE,
@@ -1016,7 +1031,7 @@
       }
       TimePartitionManager.getInstance()
           .updateAfterFlushing(
-              new DataRegionId(Integer.parseInt(dataRegionId)),
+              new DataRegionId(Integer.parseInt(dataRegionIdString)),
               partitionId,
               System.currentTimeMillis(),
               lastFlushTimeMap.getMemSize(partitionId),
@@ -1213,7 +1228,7 @@
       TimePartitionManager.getInstance()
           .registerTimePartitionInfo(
               new TimePartitionInfo(
-                  new DataRegionId(Integer.parseInt(dataRegionId)),
+                  new DataRegionId(Integer.parseInt(dataRegionIdString)),
                   timePartitionId,
                   true,
                   Long.MAX_VALUE,
@@ -1489,7 +1504,7 @@
       // build new processor, memory control module will control the number of memtables
       TimePartitionManager.getInstance()
           .updateAfterOpeningTsFileProcessor(
-              new DataRegionId(Integer.parseInt(dataRegionId)), timeRangeId);
+              new DataRegionId(Integer.parseInt(dataRegionIdString)), timeRangeId);
       res = newTsFileProcessor(sequence, timeRangeId);
       if (workSequenceTsFileProcessors.get(timeRangeId) == null
           && workUnsequenceTsFileProcessors.get(timeRangeId) == null) {
@@ -1511,7 +1526,7 @@
         TsFileNameGenerator.generateNewTsFilePathWithMkdir(
             sequence,
             databaseName,
-            dataRegionId,
+            dataRegionIdString,
             timePartitionId,
             System.currentTimeMillis(),
             version,
@@ -1525,7 +1540,7 @@
       boolean sequence, String filePath, long timePartitionId) throws IOException {
     TsFileProcessor tsFileProcessor =
         new TsFileProcessor(
-            databaseName + FILE_NAME_SEPARATOR + dataRegionId,
+            databaseName + FILE_NAME_SEPARATOR + dataRegionIdString,
             fsFactory.getFileWithParent(filePath),
             dataRegionInfo,
             this::closeUnsealedTsFileProcessorCallBack,
@@ -1600,15 +1615,15 @@
   public void deleteFolder(String systemDir) {
     logger.info(
         "{} will close all files for deleting data folder {}",
-        databaseName + "-" + dataRegionId,
+        databaseName + "-" + dataRegionIdString,
         systemDir);
     FileTimeIndexCacheRecorder.getInstance()
-        .removeFileTimeIndexCache(Integer.parseInt(dataRegionId));
+        .removeFileTimeIndexCache(Integer.parseInt(dataRegionIdString));
     writeLock("deleteFolder");
     try {
       File dataRegionSystemFolder =
           SystemFileFactory.INSTANCE.getFile(
-              systemDir + File.separator + databaseName, dataRegionId);
+              systemDir + File.separator + databaseName, dataRegionIdString);
       org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
           dataRegionSystemFolder);
     } finally {
@@ -1637,7 +1652,7 @@
   /** delete tsfile */
   public void syncDeleteDataFiles() throws TsFileProcessorException {
     logger.info(
-        "{} will close all files for deleting data files", databaseName + "-" + dataRegionId);
+        "{} will close all files for deleting data files", databaseName + "-" + dataRegionIdString);
     writeLock("syncDeleteDataFiles");
     try {
       forceCloseAllWorkingTsFileProcessors();
@@ -1662,11 +1677,11 @@
       lastFlushTimeMap.clearFlushedTime();
       lastFlushTimeMap.clearGlobalFlushedTime();
       TimePartitionManager.getInstance()
-          .removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionId)));
+          .removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionIdString)));
     } catch (InterruptedException e) {
       logger.error(
           "CloseFileNodeCondition error occurs while waiting for closing the storage " + "group {}",
-          databaseName + "-" + dataRegionId,
+          databaseName + "-" + dataRegionIdString,
           e);
       Thread.currentThread().interrupt();
     } finally {
@@ -1677,7 +1692,7 @@
   private void deleteAllSGFolders(List<String> folder) {
     for (String tsfilePath : folder) {
       File dataRegionDataFolder =
-          fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionId);
+          fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionIdString);
       if (FSUtils.getFSType(dataRegionDataFolder) != FSType.LOCAL) {
         try {
           fsFactory.deleteDirectory(dataRegionDataFolder.getPath());
@@ -1707,7 +1722,7 @@
               "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
               tsFileProcessor.getTimeRangeId(),
               databaseName,
-              dataRegionId);
+              dataRegionIdString);
           fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
           count++;
         }
@@ -1733,7 +1748,7 @@
               "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
               tsFileProcessor.getTimeRangeId(),
               databaseName,
-              dataRegionId);
+              dataRegionIdString);
           fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
           count++;
         }
@@ -1756,7 +1771,7 @@
     } catch (InterruptedException | ExecutionException e) {
       logger.error(
           "CloseFileNodeCondition error occurs while waiting for closing tsfile processors of {}",
-          databaseName + "-" + dataRegionId,
+          databaseName + "-" + dataRegionIdString,
           e);
       Thread.currentThread().interrupt();
     }
@@ -1790,7 +1805,7 @@
     } catch (InterruptedException | ExecutionException e) {
       logger.error(
           "CloseFileNodeCondition error occurs while waiting for closing tsfile processors of {}",
-          databaseName + "-" + dataRegionId,
+          databaseName + "-" + dataRegionIdString,
           e);
       Thread.currentThread().interrupt();
     }
@@ -1810,7 +1825,7 @@
       if (System.currentTimeMillis() - startTime > 60_000) {
         logger.warn(
             "{} has spent {}s to wait for closing all TsFiles.",
-            databaseName + "-" + this.dataRegionId,
+            databaseName + "-" + this.dataRegionIdString,
             (System.currentTimeMillis() - startTime) / 1000);
       }
     }
@@ -1822,7 +1837,8 @@
     List<Future<?>> futures = new ArrayList<>();
     int count = 0;
     try {
-      logger.info("async force close all files in database: {}", databaseName + "-" + dataRegionId);
+      logger.info(
+          "async force close all files in database: {}", databaseName + "-" + dataRegionIdString);
       // to avoid concurrent modification problem, we need a new array list
       for (TsFileProcessor tsFileProcessor :
           new ArrayList<>(workSequenceTsFileProcessors.values())) {
@@ -1846,7 +1862,8 @@
   public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
     writeLock("forceCloseAllWorkingTsFileProcessors");
     try {
-      logger.info("force close all processors in database: {}", databaseName + "-" + dataRegionId);
+      logger.info(
+          "force close all processors in database: {}", databaseName + "-" + dataRegionIdString);
       // to avoid concurrent modification problem, we need a new array list
       List<TsFileResource> closedTsFileResources = new ArrayList<>();
       for (TsFileProcessor tsFileProcessor :
@@ -2404,7 +2421,7 @@
       PartialPath pathToDelete, long startTime, long endTime, long searchIndex) throws IOException {
     logger.info(
         "{} will delete data files directly for deleting data between {} and {}",
-        databaseName + "-" + dataRegionId,
+        databaseName + "-" + dataRegionIdString,
         startTime,
         endTime);
 
@@ -2774,7 +2791,7 @@
     if (config.isEnableSeparateData()) {
       TimePartitionManager.getInstance()
           .updateAfterFlushing(
-              new DataRegionId(Integer.parseInt(dataRegionId)),
+              new DataRegionId(Integer.parseInt(dataRegionIdString)),
               processor.getTimeRangeId(),
               systemFlushTime,
               lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
@@ -2915,7 +2932,7 @@
       logger.info(
           "[TTL] {}-{} Totally select {} all-outdated files and {} partial-outdated files.",
           databaseName,
-          dataRegionId,
+          dataRegionIdString,
           context.getFullyDirtyFileNum(),
           context.getPartiallyDirtyFileNum());
     } catch (InterruptedException e) {
@@ -3071,6 +3088,19 @@
       final boolean isGeneratedByPipe,
       final boolean isFromConsensus)
       throws LoadFileException {
+    // Reject the load while this region's IoTConsensus peer is inactive (e.g., during
+    // region migration): an inactive peer is not ready to accept write requests.
+    if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensus) {
+      final IoTConsensusServerImpl impl =
+          ((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(dataRegionId);
+      if (Objects.nonNull(impl) && !impl.isActive()) {
+        throw new LoadFileException(
+            String.format(
+                "Peer is inactive and not ready to write request, %s, DataNode Id: %s",
+                dataRegionId, IoTDBDescriptor.getInstance().getConfig().getDataNodeId()));
+      }
+    }
+
     final File tsfileToBeInserted = newTsFileResource.getTsFile().getAbsoluteFile();
     final long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
 
@@ -3136,7 +3164,8 @@
               newTsFileResource.getTsFile().getName());
 
       if (config.isEnableSeparateData()) {
-        final DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(this.dataRegionId));
+        final DataRegionId dataRegionId =
+            new DataRegionId(Integer.parseInt(this.dataRegionIdString));
         final long timePartitionId = newTsFileResource.getTimePartition();
         initFlushTimeMap(timePartitionId);
         updateDeviceLastFlushTime(newTsFileResource);
@@ -3301,7 +3330,7 @@
     final String fileName =
         databaseName
             + File.separatorChar
-            + dataRegionId
+            + dataRegionIdString
             + File.separatorChar
             + filePartitionId
             + File.separator
@@ -3443,7 +3472,8 @@
     }
 
     // Listen before the tsFile is added into tsFile manager to avoid it being compacted
-    PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, tsFileResource, true);
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToTsFile(dataRegionIdString, tsFileResource, true);
 
     tsFileManager.add(tsFileResource, false);
 
@@ -3554,8 +3584,8 @@
   }
 
   @Override
-  public String getDataRegionId() {
-    return dataRegionId;
+  public String getDataRegionIdString() {
+    return dataRegionIdString;
   }
 
   /**
@@ -3564,14 +3594,15 @@
    * @return data region path, like root.sg1/0
    */
   public String getStorageGroupPath() {
-    return databaseName + File.separator + dataRegionId;
+    return databaseName + File.separator + dataRegionIdString;
   }
 
   public void abortCompaction() {
     tsFileManager.setAllowCompaction(false);
     CompactionScheduleTaskManager.getInstance().unregisterDataRegion(this);
     List<AbstractCompactionTask> runningTasks =
-        CompactionTaskManager.getInstance().abortCompaction(databaseName + "-" + dataRegionId);
+        CompactionTaskManager.getInstance()
+            .abortCompaction(databaseName + "-" + dataRegionIdString);
     while (CompactionTaskManager.getInstance().isAnyTaskInListStillRunning(runningTasks)) {
       try {
         TimeUnit.MILLISECONDS.sleep(10);
@@ -3808,7 +3839,7 @@
         .getAllLocalFilesFolders()
         .forEach(
             folder -> {
-              folder = folder + File.separator + databaseName + File.separator + dataRegionId;
+              folder = folder + File.separator + databaseName + File.separator + dataRegionIdString;
               countFolderDiskSize(folder, diskSize);
             });
     return diskSize.get() / 1024 / 1024;
@@ -3931,7 +3962,7 @@
     // identifier should be same with getTsFileProcessor method
     return Optional.of(
         WALManager.getInstance()
-            .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId));
+            .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionIdString));
   }
 
   /** Wait for this data region successfully deleted */
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
index 14c6db5..adcb613 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
@@ -81,5 +81,5 @@
   /** Get database name of this DataRegion */
   String getDatabaseName();
 
-  String getDataRegionId();
+  String getDataRegionIdString();
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
index 9e2f53b..20cec7c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
@@ -117,7 +117,7 @@
   }
 
   @Override
-  public String getDataRegionId() {
+  public String getDataRegionIdString() {
     return VIRTUAL_DATA_REGION_ID;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java
index 87a4d0f..eea9556 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java
@@ -41,7 +41,7 @@
 
   public RepairTimePartition(DataRegion dataRegion, long timePartitionId, long maxFileTimestamp) {
     this.databaseName = dataRegion.getDatabaseName();
-    this.dataRegionId = dataRegion.getDataRegionId();
+    this.dataRegionId = dataRegion.getDataRegionIdString();
     this.tsFileManager = dataRegion.getTsFileManager();
     this.timePartitionId = timePartitionId;
     this.maxFileTimestamp = maxFileTimestamp;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 2d58015..a508ec4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -273,7 +273,7 @@
       // recordCreateMemtableBlockCost
       costsForMetrics[0] += System.nanoTime() - startTime;
       WritingMetrics.getInstance()
-          .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+          .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), 1);
     }
 
     long[] memIncrements;
@@ -327,7 +327,7 @@
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
+            dataRegionInfo.getDataRegion().getDataRegionIdString(), insertRowNode, tsFileResource);
 
     int pointInserted;
     if (insertRowNode.isAligned()) {
@@ -359,7 +359,7 @@
       // recordCreateMemtableBlockCost
       costsForMetrics[0] += System.nanoTime() - startTime;
       WritingMetrics.getInstance()
-          .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+          .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), 1);
     }
 
     long[] memIncrements;
@@ -426,7 +426,7 @@
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowsNode, tsFileResource);
+            dataRegionInfo.getDataRegion().getDataRegionIdString(), insertRowsNode, tsFileResource);
 
     int pointInserted = 0;
     for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
@@ -455,7 +455,7 @@
         MemTableManager.getInstance()
             .getAvailableMemTable(
                 dataRegionInfo.getDataRegion().getDatabaseName(),
-                dataRegionInfo.getDataRegion().getDataRegionId());
+                dataRegionInfo.getDataRegion().getDataRegionIdString());
     walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
   }
 
@@ -478,7 +478,7 @@
       createNewWorkingMemTable();
       PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(System.nanoTime() - startTime);
       WritingMetrics.getInstance()
-          .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+          .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), 1);
     }
 
     long[] memIncrements;
@@ -542,7 +542,9 @@
     }
     PipeInsertionDataNodeListener.getInstance()
         .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
+            dataRegionInfo.getDataRegion().getDataRegionIdString(),
+            insertTabletNode,
+            tsFileResource);
 
     int pointInserted;
     try {
@@ -1282,7 +1284,7 @@
     WritingMetrics.getInstance()
         .recordMemTableLiveDuration(System.currentTimeMillis() - getWorkMemTableCreatedTime());
     WritingMetrics.getInstance()
-        .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), -1);
+        .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), -1);
     WritingMetrics.getInstance().recordWALEntryNumForOneTsFile(walEntryNum);
     workMemTable = null;
     return FlushManager.getInstance().registerTsFileProcessor(this);
@@ -1378,7 +1380,7 @@
                   memTableToFlush,
                   writer,
                   storageGroupName,
-                  dataRegionInfo.getDataRegion().getDataRegionId());
+                  dataRegionInfo.getDataRegion().getDataRegionIdString());
           flushTask.syncFlushMemTable();
           memTableFlushPointCount = memTableToFlush.getTotalPointsNum();
         } catch (Throwable e) {
@@ -1562,7 +1564,7 @@
           String.format("%.2f", compressionRatio),
           totalMemTableSize,
           writer.getPos());
-      String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionId();
+      String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionIdString();
       WritingMetrics.getInstance()
           .recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId, compressionRatio);
       CompressionRatio.getInstance().updateRatio(totalMemTableSize, writer.getPos());
@@ -1585,7 +1587,8 @@
     // Listen after "endFile" to avoid unnecessary waiting for tsFile close
     // before resource serialization to avoid missing hardlink after restart
     PipeInsertionDataNodeListener.getInstance()
-        .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource, false);
+        .listenToTsFile(
+            dataRegionInfo.getDataRegion().getDataRegionIdString(), tsFileResource, false);
 
     tsFileResource.serialize();
     FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
index 5dae144..34b7aef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
@@ -108,14 +108,14 @@
         LOGGER.warn(
             "Failed to take snapshot for {}-{}, clean up",
             dataRegion.getDatabaseName(),
-            dataRegion.getDataRegionId());
+            dataRegion.getDataRegionIdString());
         cleanUpWhenFail(finalSnapshotId);
       } else {
         snapshotLogger.logEnd();
         LOGGER.info(
             "Successfully take snapshot for {}-{}, snapshot directory is {}",
             dataRegion.getDatabaseName(),
-            dataRegion.getDataRegionId(),
+            dataRegion.getDataRegionIdString(),
             snapshotDir.getParentFile().getAbsolutePath() + File.separator + finalSnapshotId);
       }
 
@@ -124,7 +124,7 @@
       LOGGER.error(
           "Exception occurs when taking snapshot for {}-{}",
           dataRegion.getDatabaseName(),
-          dataRegion.getDataRegionId(),
+          dataRegion.getDataRegionIdString(),
           e);
       return false;
     } finally {
@@ -147,7 +147,9 @@
       StringBuilder pathBuilder = new StringBuilder(dataDir);
       pathBuilder.append(File.separator).append(IoTDBConstant.SNAPSHOT_FOLDER_NAME);
       pathBuilder.append(File.separator).append(dataRegion.getDatabaseName());
-      pathBuilder.append(IoTDBConstant.FILE_NAME_SEPARATOR).append(dataRegion.getDataRegionId());
+      pathBuilder
+          .append(IoTDBConstant.FILE_NAME_SEPARATOR)
+          .append(dataRegion.getDataRegionIdString());
       try {
         String path = pathBuilder.toString();
         if (new File(path).exists()) {
@@ -298,7 +300,7 @@
     stringBuilder.append(File.separator);
     stringBuilder.append(dataRegion.getDatabaseName());
     stringBuilder.append(IoTDBConstant.FILE_NAME_SEPARATOR);
-    stringBuilder.append(dataRegion.getDataRegionId());
+    stringBuilder.append(dataRegion.getDataRegionIdString());
     stringBuilder.append(File.separator);
     stringBuilder.append(snapshotId);
     stringBuilder.append(File.separator);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 4a91f4d..1e94188 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -337,7 +337,7 @@
       final long writePointCount,
       final boolean isGeneratedByPipeConsensusLeader) {
     MemTableFlushTask.recordFlushPointsMetricInternal(
-        writePointCount, databaseName, dataRegion.getDataRegionId());
+        writePointCount, databaseName, dataRegion.getDataRegionIdString());
     MetricService.getInstance()
         .count(
             writePointCount,
@@ -348,7 +348,7 @@
             Tag.DATABASE.toString(),
             databaseName,
             Tag.REGION.toString(),
-            dataRegion.getDataRegionId(),
+            dataRegion.getDataRegionIdString(),
             Tag.TYPE.toString(),
             Metric.LOAD_POINT_COUNT.toString());
     // Because we cannot accurately judge who is the leader here,
@@ -359,7 +359,7 @@
             .getReplicationNum(
                 ConsensusGroupId.Factory.create(
                     TConsensusGroupType.DataRegion.getValue(),
-                    Integer.parseInt(dataRegion.getDataRegionId())));
+                    Integer.parseInt(dataRegion.getDataRegionIdString())));
     // It may happen that the replicationNum is 0 when load and db deletion occurs
     // concurrently, so we can just not to count the number of points in this case
     if (replicationNum != 0 && !isGeneratedByPipeConsensusLeader) {
@@ -373,7 +373,7 @@
               Tag.DATABASE.toString(),
               databaseName,
               Tag.REGION.toString(),
-              dataRegion.getDataRegionId(),
+              dataRegion.getDataRegionIdString(),
               Tag.TYPE.toString(),
               Metric.LOAD_POINT_COUNT.toString());
     }
@@ -803,7 +803,7 @@
       return String.join(
           IoTDBConstant.FILE_NAME_SEPARATOR,
           dataRegion.getDatabaseName(),
-          dataRegion.getDataRegionId(),
+          dataRegion.getDataRegionIdString(),
           Long.toString(timePartitionSlot.getStartTime()));
     }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index fced938..c8ac2c8 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -25,15 +25,23 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl;
 import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -41,6 +49,8 @@
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
 import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq;
@@ -66,21 +76,53 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 public class DataNodeInternalRPCServiceImplTest {
 
   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
   DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl;
+  private static IConsensus instance;
   private static final int dataNodeId = 0;
+  private static final File storageDir = new File("target" + java.io.File.separator + "impl");
+  private static DataRegion dataRegion;
 
   @BeforeClass
-  public static void setUpBeforeClass() throws IOException, MetadataException {
+  public static void setUpBeforeClass() throws IOException, MetadataException, ConsensusException {
     // In standalone mode, we need to set dataNodeId to 0 for RaftPeerId in RatisConsensus
     conf.setDataNodeId(dataNodeId);
 
+    org.apache.iotdb.commons.utils.FileUtils.deleteFileOrDirectory(storageDir);
     SchemaEngine.getInstance().init();
     SchemaEngine.getInstance()
         .createSchemaRegion(new PartialPath("root.ln"), new SchemaRegionId(0));
+    final DataRegionId id = new DataRegionId(1);
+    dataRegion = new DataRegion("root.ln", "1");
+    instance = DataRegionConsensusImpl.getInstance();
+    // Swap in a local IoTConsensus so the test can toggle peer activity (restored in tearDownAfterClass).
+    DataRegionConsensusImpl.setInstance(
+        ConsensusFactory.getConsensusImpl(
+                ConsensusFactory.IOT_CONSENSUS,
+                ConsensusConfig.newBuilder()
+                    .setThisNodeId(1)
+                    .setThisNode(new TEndPoint("0.0.0.0", 6667))
+                    .setStorageDir(storageDir.getAbsolutePath())
+                    .setConsensusGroupType(TConsensusGroupType.DataRegion)
+                    .build(),
+                gid -> new IoTConsensusDataRegionStateMachine(dataRegion))
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            ConsensusFactory.IOT_CONSENSUS))));
+    // Create the local peer for region 1 on first run so getImpl(...) returns a server.
+    if (Objects.isNull(
+        ((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(new DataRegionId(1)))) {
+      DataRegionConsensusImpl.getInstance()
+          .createLocalPeer(
+              id, Collections.singletonList(new Peer(id, 1, new TEndPoint("0.0.0.0", 6667))));
+    }
     DataRegionConsensusImpl.getInstance().start();
     SchemaRegionConsensusImpl.getInstance().start();
     DataNodeRegionManager.getInstance().init();
@@ -109,13 +149,15 @@
   public static void tearDownAfterClass() throws IOException, StorageEngineException {
     DataNodeRegionManager.getInstance().clear();
     DataRegionConsensusImpl.getInstance().stop();
+    DataRegionConsensusImpl.setInstance(instance);
     SchemaRegionConsensusImpl.getInstance().stop();
     SchemaEngine.getInstance().clear();
     EnvironmentUtils.cleanEnv();
+    org.apache.iotdb.commons.utils.FileUtils.deleteFileOrDirectory(storageDir);
   }
 
   @Test
-  public void testCreateTimeseries() throws MetadataException {
+  public void testCreateTimeSeries() throws MetadataException {
     CreateTimeSeriesNode createTimeSeriesNode =
         new CreateTimeSeriesNode(
             new PlanNodeId("0"),
@@ -162,8 +204,24 @@
     Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
+  @Test(expected = LoadFileException.class)
+  public void testRejectLoad4NonActiveImpl() throws LoadFileException {
+    ((IoTConsensus) DataRegionConsensusImpl.getInstance())
+        .getImpl(new DataRegionId(1))
+        .setActive(false);
+    try {
+      dataRegion.loadNewTsFile(new TsFileResource(), false, false, false);
+    } finally {
+      // loadNewTsFile is expected to throw; re-activate the peer in finally so later
+      // tests do not observe an inactive impl.
+      ((IoTConsensus) DataRegionConsensusImpl.getInstance())
+          .getImpl(new DataRegionId(1))
+          .setActive(true);
+    }
+  }
+
   @Test
-  public void testCreateAlignedTimeseries() throws MetadataException {
+  public void testCreateAlignedTimeSeries() throws MetadataException {
     CreateAlignedTimeSeriesNode createAlignedTimeSeriesNode =
         new CreateAlignedTimeSeriesNode(
             new PlanNodeId("0"),
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
index a298c7a..4c4ac20 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
@@ -58,8 +58,8 @@
     DataRegion rg1 = PowerMockito.mock(DataRegion.class);
     DataRegion rg2 = PowerMockito.mock(DataRegion.class);
     DataRegionId id2 = new DataRegionId(2);
-    PowerMockito.when(rg1.getDataRegionId()).thenReturn("1");
-    PowerMockito.when(rg2.getDataRegionId()).thenReturn("2");
+    PowerMockito.when(rg1.getDataRegionIdString()).thenReturn("1");
+    PowerMockito.when(rg2.getDataRegionIdString()).thenReturn("2");
     storageEngine.setDataRegion(id1, rg1);
     storageEngine.setDataRegion(id2, rg2);
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 022d9ed3..5082b8f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -1128,7 +1128,7 @@
       }
       for (DataRegion region : regionsToBeDeleted) {
         StorageEngine.getInstance()
-            .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())));
+            .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionIdString())));
       }
       Thread.sleep(500);
 
@@ -1320,7 +1320,7 @@
     }
     for (DataRegion region : regionsToBeDeleted) {
       StorageEngine.getInstance()
-          .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())));
+          .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionIdString())));
     }
     Thread.sleep(500);
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java
index 92337fe..3108a20 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java
@@ -89,7 +89,7 @@
     DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
     Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
     Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
-    Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+    Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
     Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
 
     TsFileResource seqResource1 = createEmptyFileAndResource(true);
@@ -136,7 +136,7 @@
     DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
     Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
     Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
-    Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+    Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
     Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
 
     TsFileResource seqResource1 = createEmptyFileAndResource(true);
@@ -206,7 +206,7 @@
     DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
     Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
     Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
-    Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+    Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
     Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
 
     TsFileResource seqResource1 = createEmptyFileAndResource(true);
@@ -265,7 +265,7 @@
     DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
     Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
     Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
-    Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+    Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
     Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
 
     TsFileResource seqResource1 = createEmptyFileAndResource(true);
@@ -313,7 +313,7 @@
     DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
     Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
     Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
-    Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+    Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
     Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
 
     TsFileResource seqResource1 = createEmptyFileAndResource(true);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
index 68879cb..7878380 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
@@ -214,7 +214,7 @@
                 + "1-1-0-0.tsfile");
     DataRegion region = Mockito.mock(DataRegion.class);
     Mockito.when(region.getDatabaseName()).thenReturn("root.test");
-    Mockito.when(region.getDataRegionId()).thenReturn("0");
+    Mockito.when(region.getDataRegionIdString()).thenReturn("0");
     File snapshotFile =
         new SnapshotTaker(region).getSnapshotFilePathForTsFile(tsFile, "test-snapshotId");
     Assert.assertEquals(