[To dev/1.3] Fixed the concurrency issue of region migrate and load (#16796) (#16823)
* Fixed the concurrency issue of region migrate and load (#16796)
* rq
* gra
* fix
* fix
* coverage
* fix
* fix
* fix
* cp
* fix
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 453bd9a..7dbc720 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -24,6 +24,7 @@
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.config.ConsensusConfig;
@@ -58,6 +59,11 @@
// do nothing
}
+ @TestOnly
+ public static void setInstance(final IConsensus instance) {
+ DataRegionConsensusImplHolder.INSTANCE = instance;
+ }
+
public static IConsensus getInstance() {
return DataRegionConsensusImplHolder.INSTANCE;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index c430be4..9a26dba 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -93,7 +93,7 @@
logger.error(
"Exception occurs when taking snapshot for {}-{} in {}",
region.getDatabaseName(),
- region.getDataRegionId(),
+ region.getDataRegionIdString(),
snapshotDir,
e);
return false;
@@ -109,7 +109,7 @@
logger.error(
"Exception occurs when taking snapshot for {}-{} in {}",
region.getDatabaseName(),
- region.getDataRegionId(),
+ region.getDataRegionIdString(),
snapshotDir,
e);
return false;
@@ -127,7 +127,7 @@
new SnapshotLoader(
latestSnapshotRootDir.getAbsolutePath(),
region.getDatabaseName(),
- region.getDataRegionId())
+ region.getDataRegionIdString())
.loadSnapshotForStateMachine();
if (newRegion == null) {
logger.error("Fail to load snapshot from {}", latestSnapshotRootDir);
@@ -136,7 +136,8 @@
this.region = newRegion;
try {
StorageEngine.getInstance()
- .setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
+ .setDataRegion(
+ new DataRegionId(Integer.parseInt(region.getDataRegionIdString())), region);
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
BloomFilterCache.getInstance().clear();
@@ -185,13 +186,13 @@
return new SnapshotLoader(
latestSnapshotRootDir.getAbsolutePath(),
region.getDatabaseName(),
- region.getDataRegionId())
+ region.getDataRegionIdString())
.getSnapshotFileInfo();
} catch (IOException e) {
logger.error(
"Meets error when getting snapshot files for {}-{}",
region.getDatabaseName(),
- region.getDataRegionId(),
+ region.getDataRegionIdString(),
e);
return null;
}
@@ -272,7 +273,7 @@
+ File.separator
+ region.getDatabaseName()
+ "-"
- + region.getDataRegionId();
+ + region.getDataRegionIdString();
return new File(snapshotDir).getCanonicalFile();
} catch (IOException | NullPointerException e) {
logger.warn("{}: cannot get the canonical file of {} due to {}", this, snapshotDir, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index c8c89d0..2ee3f5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -751,7 +751,7 @@
if (initQueryDataSourceRetryCount % 10 == 0) {
LOGGER.warn(
"Failed to acquire the read lock of DataRegion-{} for {} times",
- dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionId(),
+ dataRegion == null ? "UNKNOWN" : dataRegion.getDataRegionIdString(),
initQueryDataSourceRetryCount);
}
return UNFINISHED_QUERY_DATA_SOURCE;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index d8ce02a..a4a8d46 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -168,7 +168,7 @@
// We don't need to output the region having ExplainAnalyzeOperator only.
return false;
}
- statistics.setDataRegion(context.getDataRegion().getDataRegionId());
+ statistics.setDataRegion(context.getDataRegion().getDataRegionIdString());
statistics.setIp(CONFIG.getInternalAddress() + ":" + CONFIG.getInternalPort());
statistics.setStartTimeInMS(context.getStartTime());
statistics.setEndTimeInMS(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 65eb21b..e9cd679 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -501,7 +501,7 @@
MemTableFlushTask.recordFlushPointsMetricInternal(
node.getWritePointCount(),
databaseName,
- dataRegion.getDataRegionId());
+ dataRegion.getDataRegionIdString());
MetricService.getInstance()
.count(
@@ -513,7 +513,7 @@
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
+ dataRegion.getDataRegionIdString(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
MetricService.getInstance()
@@ -526,7 +526,7 @@
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
+ dataRegion.getDataRegionIdString(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
}));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 69cb7bb..5ddf412 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -518,7 +518,8 @@
}
public void createDataRegionMemoryCostMetrics(DataRegion dataRegion) {
- DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(dataRegion.getDataRegionId()));
+ DataRegionId dataRegionId =
+ new DataRegionId(Integer.parseInt(dataRegion.getDataRegionIdString()));
MetricService.getInstance()
.createAutoGauge(
Metric.DATA_REGION_MEM_COST.toString(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 0419f26..c8439e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -527,7 +527,7 @@
public void syncCloseProcessorsInRegion(List<String> dataRegionIds) {
List<Future<Void>> tasks = new ArrayList<>();
for (DataRegion dataRegion : dataRegionMap.values()) {
- if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionId())) {
+ if (dataRegion != null && dataRegionIds.contains(dataRegion.getDataRegionIdString())) {
tasks.add(
cachedThreadPool.submit(
() -> {
@@ -784,7 +784,7 @@
// delete wal
WALManager.getInstance()
.deleteWALNode(
- region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
+ region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionIdString());
// delete snapshot
for (String dataDir : CONFIG.getLocalDataDirs()) {
File regionSnapshotDir =
@@ -803,12 +803,13 @@
WRITING_METRICS.removeDataRegionMemoryCostMetrics(regionId);
WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId);
WRITING_METRICS.removeActiveMemtableCounterMetrics(regionId);
- FileMetrics.getInstance().deleteRegion(region.getDatabaseName(), region.getDataRegionId());
+ FileMetrics.getInstance()
+ .deleteRegion(region.getDatabaseName(), region.getDataRegionIdString());
} catch (Exception e) {
LOGGER.error(
"Error occurs when deleting data region {}-{}",
region.getDatabaseName(),
- region.getDataRegionId(),
+ region.getDataRegionIdString(),
e);
} finally {
deletingDataRegionMap.remove(regionId);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index eefede2..068990c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -36,8 +36,11 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
@@ -247,7 +250,9 @@
ConcurrentHashMap.newKeySet();
/** data region id. */
- private final String dataRegionId;
+ private final String dataRegionIdString;
+
+ private final DataRegionId dataRegionId;
/** database name. */
private final String databaseName;
@@ -316,20 +321,25 @@
* Construct a database processor.
*
* @param systemDir system dir path
- * @param dataRegionId data region id e.g. 1
+ * @param dataRegionIdString data region id e.g. 1
* @param fileFlushPolicy file flush policy
* @param databaseName database name e.g. root.sg1
*/
public DataRegion(
- String systemDir, String dataRegionId, TsFileFlushPolicy fileFlushPolicy, String databaseName)
+ String systemDir,
+ String dataRegionIdString,
+ TsFileFlushPolicy fileFlushPolicy,
+ String databaseName)
throws DataRegionException {
- this.dataRegionId = dataRegionId;
+ this.dataRegionIdString = dataRegionIdString;
+ this.dataRegionId = new DataRegionId(Integer.parseInt(dataRegionIdString));
this.databaseName = databaseName;
this.fileFlushPolicy = fileFlushPolicy;
acquireDirectBufferMemory();
- dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId);
- this.tsFileManager = new TsFileManager(databaseName, dataRegionId, dataRegionSysDir.getPath());
+ dataRegionSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionIdString);
+ this.tsFileManager =
+ new TsFileManager(databaseName, dataRegionIdString, dataRegionSysDir.getPath());
if (dataRegionSysDir.mkdirs()) {
logger.info(
"Database system Directory {} doesn't exist, create it", dataRegionSysDir.getPath());
@@ -345,17 +355,17 @@
logger.debug(
"Skip recovering data region {}[{}] when consensus protocol is ratis and storage engine is not ready.",
databaseName,
- dataRegionId);
+ dataRegionIdString);
for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) {
File dataRegionFolder =
- fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId);
+ fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionIdString);
try {
fsFactory.deleteDirectory(dataRegionFolder.getPath());
} catch (IOException e) {
logger.error(
"Exception occurs when deleting data region folder for {}-{}",
databaseName,
- dataRegionId,
+ dataRegionIdString,
e);
}
if (FSUtils.getFSType(dataRegionFolder) == FSType.LOCAL) {
@@ -379,10 +389,11 @@
}
@TestOnly
- public DataRegion(String databaseName, String id) {
+ public DataRegion(String databaseName, String dataRegionIdString) {
this.databaseName = databaseName;
- this.dataRegionId = id;
- this.tsFileManager = new TsFileManager(databaseName, id, "");
+ this.dataRegionIdString = dataRegionIdString;
+ this.dataRegionId = new DataRegionId(Integer.parseInt(this.dataRegionIdString));
+ this.tsFileManager = new TsFileManager(databaseName, dataRegionIdString, "");
this.partitionMaxFileVersions = new HashMap<>();
partitionMaxFileVersions.put(0L, 0L);
this.metrics = new DataRegionMetrics(this);
@@ -451,7 +462,7 @@
logger.info(
"The TsFiles of data region {}[{}] has recovered {}/{}.",
databaseName,
- dataRegionId,
+ dataRegionIdString,
recoveredFilesNum,
numOfFilesToRecover);
lastLogTime = System.currentTimeMillis();
@@ -460,7 +471,7 @@
logger.info(
"The TsFiles of data region {}[{}] has recovered completely {}/{}.",
databaseName,
- dataRegionId,
+ dataRegionIdString,
numOfFilesToRecover,
numOfFilesToRecover);
}
@@ -572,7 +583,7 @@
if (logFile.exists()) {
try {
FileTimeIndexCacheReader logReader =
- new FileTimeIndexCacheReader(logFile, dataRegionId);
+ new FileTimeIndexCacheReader(logFile, dataRegionIdString);
logReader.read(fileTimeIndexMap);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -607,7 +618,7 @@
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
+ new DataRegionId(Integer.parseInt(dataRegionIdString)),
latestPartitionId,
false,
Long.MAX_VALUE,
@@ -659,11 +670,14 @@
|| config
.getDataRegionConsensusProtocolClass()
.equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
- WALManager.getInstance().applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId);
+ WALManager.getInstance()
+ .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionIdString);
}
- logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId);
+ logger.info(
+ "The data region {}[{}] is created successfully", databaseName, dataRegionIdString);
} else {
- logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId);
+ logger.info(
+ "The data region {}[{}] is recovered successfully", databaseName, dataRegionIdString);
}
}
@@ -717,7 +731,7 @@
private void recoverCompaction() {
CompactionRecoverManager compactionRecoverManager =
- new CompactionRecoverManager(tsFileManager, databaseName, dataRegionId);
+ new CompactionRecoverManager(tsFileManager, databaseName, dataRegionIdString);
compactionRecoverManager.recoverCompaction();
}
@@ -734,7 +748,8 @@
// "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition
Map<String, File> tsFilePartitionPath2File = new HashMap<>();
for (String baseDir : folders) {
- File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
+ File fileFolder =
+ fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionIdString);
if (!fileFolder.exists()) {
continue;
}
@@ -804,7 +819,7 @@
String.format(
"data region %s[%s] is down, because the time of tsfile %s is larger than system current time, "
+ "file time is %d while system current time is %d, please check it.",
- databaseName, dataRegionId, tsFile.getAbsolutePath(), fileTime, currentTime));
+ databaseName, dataRegionIdString, tsFile.getAbsolutePath(), fileTime, currentTime));
}
}
@@ -847,10 +862,10 @@
long timePartitionId = tsFileResource.getTimePartition();
TimePartitionManager.getInstance()
.updateAfterOpeningTsFileProcessor(
- new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId);
+ new DataRegionId(Integer.parseInt(dataRegionIdString)), timePartitionId);
TsFileProcessor tsFileProcessor =
new TsFileProcessor(
- dataRegionId,
+ dataRegionIdString,
dataRegionInfo,
tsFileResource,
this::closeUnsealedTsFileProcessorCallBack,
@@ -951,7 +966,7 @@
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
+ new DataRegionId(Integer.parseInt(dataRegionIdString)),
partitionId,
false,
Long.MAX_VALUE,
@@ -962,7 +977,7 @@
}
TimePartitionManager.getInstance()
.updateAfterFlushing(
- new DataRegionId(Integer.parseInt(dataRegionId)),
+ new DataRegionId(Integer.parseInt(dataRegionIdString)),
partitionId,
System.currentTimeMillis(),
lastFlushTimeMap.getMemSize(partitionId),
@@ -1003,7 +1018,7 @@
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
+ new DataRegionId(Integer.parseInt(dataRegionIdString)),
partitionId,
false,
Long.MAX_VALUE,
@@ -1016,7 +1031,7 @@
}
TimePartitionManager.getInstance()
.updateAfterFlushing(
- new DataRegionId(Integer.parseInt(dataRegionId)),
+ new DataRegionId(Integer.parseInt(dataRegionIdString)),
partitionId,
System.currentTimeMillis(),
lastFlushTimeMap.getMemSize(partitionId),
@@ -1213,7 +1228,7 @@
TimePartitionManager.getInstance()
.registerTimePartitionInfo(
new TimePartitionInfo(
- new DataRegionId(Integer.parseInt(dataRegionId)),
+ new DataRegionId(Integer.parseInt(dataRegionIdString)),
timePartitionId,
true,
Long.MAX_VALUE,
@@ -1489,7 +1504,7 @@
// build new processor, memory control module will control the number of memtables
TimePartitionManager.getInstance()
.updateAfterOpeningTsFileProcessor(
- new DataRegionId(Integer.parseInt(dataRegionId)), timeRangeId);
+ new DataRegionId(Integer.parseInt(dataRegionIdString)), timeRangeId);
res = newTsFileProcessor(sequence, timeRangeId);
if (workSequenceTsFileProcessors.get(timeRangeId) == null
&& workUnsequenceTsFileProcessors.get(timeRangeId) == null) {
@@ -1511,7 +1526,7 @@
TsFileNameGenerator.generateNewTsFilePathWithMkdir(
sequence,
databaseName,
- dataRegionId,
+ dataRegionIdString,
timePartitionId,
System.currentTimeMillis(),
version,
@@ -1525,7 +1540,7 @@
boolean sequence, String filePath, long timePartitionId) throws IOException {
TsFileProcessor tsFileProcessor =
new TsFileProcessor(
- databaseName + FILE_NAME_SEPARATOR + dataRegionId,
+ databaseName + FILE_NAME_SEPARATOR + dataRegionIdString,
fsFactory.getFileWithParent(filePath),
dataRegionInfo,
this::closeUnsealedTsFileProcessorCallBack,
@@ -1600,15 +1615,15 @@
public void deleteFolder(String systemDir) {
logger.info(
"{} will close all files for deleting data folder {}",
- databaseName + "-" + dataRegionId,
+ databaseName + "-" + dataRegionIdString,
systemDir);
FileTimeIndexCacheRecorder.getInstance()
- .removeFileTimeIndexCache(Integer.parseInt(dataRegionId));
+ .removeFileTimeIndexCache(Integer.parseInt(dataRegionIdString));
writeLock("deleteFolder");
try {
File dataRegionSystemFolder =
SystemFileFactory.INSTANCE.getFile(
- systemDir + File.separator + databaseName, dataRegionId);
+ systemDir + File.separator + databaseName, dataRegionIdString);
org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
dataRegionSystemFolder);
} finally {
@@ -1637,7 +1652,7 @@
/** delete tsfile */
public void syncDeleteDataFiles() throws TsFileProcessorException {
logger.info(
- "{} will close all files for deleting data files", databaseName + "-" + dataRegionId);
+ "{} will close all files for deleting data files", databaseName + "-" + dataRegionIdString);
writeLock("syncDeleteDataFiles");
try {
forceCloseAllWorkingTsFileProcessors();
@@ -1662,11 +1677,11 @@
lastFlushTimeMap.clearFlushedTime();
lastFlushTimeMap.clearGlobalFlushedTime();
TimePartitionManager.getInstance()
- .removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionId)));
+ .removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionIdString)));
} catch (InterruptedException e) {
logger.error(
"CloseFileNodeCondition error occurs while waiting for closing the storage " + "group {}",
- databaseName + "-" + dataRegionId,
+ databaseName + "-" + dataRegionIdString,
e);
Thread.currentThread().interrupt();
} finally {
@@ -1677,7 +1692,7 @@
private void deleteAllSGFolders(List<String> folder) {
for (String tsfilePath : folder) {
File dataRegionDataFolder =
- fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionId);
+ fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionIdString);
if (FSUtils.getFSType(dataRegionDataFolder) != FSType.LOCAL) {
try {
fsFactory.deleteDirectory(dataRegionDataFolder.getPath());
@@ -1707,7 +1722,7 @@
"Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
tsFileProcessor.getTimeRangeId(),
databaseName,
- dataRegionId);
+ dataRegionIdString);
fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
count++;
}
@@ -1733,7 +1748,7 @@
"Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]",
tsFileProcessor.getTimeRangeId(),
databaseName,
- dataRegionId);
+ dataRegionIdString);
fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
count++;
}
@@ -1756,7 +1771,7 @@
} catch (InterruptedException | ExecutionException e) {
logger.error(
"CloseFileNodeCondition error occurs while waiting for closing tsfile processors of {}",
- databaseName + "-" + dataRegionId,
+ databaseName + "-" + dataRegionIdString,
e);
Thread.currentThread().interrupt();
}
@@ -1790,7 +1805,7 @@
} catch (InterruptedException | ExecutionException e) {
logger.error(
"CloseFileNodeCondition error occurs while waiting for closing tsfile processors of {}",
- databaseName + "-" + dataRegionId,
+ databaseName + "-" + dataRegionIdString,
e);
Thread.currentThread().interrupt();
}
@@ -1810,7 +1825,7 @@
if (System.currentTimeMillis() - startTime > 60_000) {
logger.warn(
"{} has spent {}s to wait for closing all TsFiles.",
- databaseName + "-" + this.dataRegionId,
+ databaseName + "-" + this.dataRegionIdString,
(System.currentTimeMillis() - startTime) / 1000);
}
}
@@ -1822,7 +1837,8 @@
List<Future<?>> futures = new ArrayList<>();
int count = 0;
try {
- logger.info("async force close all files in database: {}", databaseName + "-" + dataRegionId);
+ logger.info(
+ "async force close all files in database: {}", databaseName + "-" + dataRegionIdString);
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor :
new ArrayList<>(workSequenceTsFileProcessors.values())) {
@@ -1846,7 +1862,8 @@
public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException {
writeLock("forceCloseAllWorkingTsFileProcessors");
try {
- logger.info("force close all processors in database: {}", databaseName + "-" + dataRegionId);
+ logger.info(
+ "force close all processors in database: {}", databaseName + "-" + dataRegionIdString);
// to avoid concurrent modification problem, we need a new array list
List<TsFileResource> closedTsFileResources = new ArrayList<>();
for (TsFileProcessor tsFileProcessor :
@@ -2404,7 +2421,7 @@
PartialPath pathToDelete, long startTime, long endTime, long searchIndex) throws IOException {
logger.info(
"{} will delete data files directly for deleting data between {} and {}",
- databaseName + "-" + dataRegionId,
+ databaseName + "-" + dataRegionIdString,
startTime,
endTime);
@@ -2774,7 +2791,7 @@
if (config.isEnableSeparateData()) {
TimePartitionManager.getInstance()
.updateAfterFlushing(
- new DataRegionId(Integer.parseInt(dataRegionId)),
+ new DataRegionId(Integer.parseInt(dataRegionIdString)),
processor.getTimeRangeId(),
systemFlushTime,
lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
@@ -2915,7 +2932,7 @@
logger.info(
"[TTL] {}-{} Totally select {} all-outdated files and {} partial-outdated files.",
databaseName,
- dataRegionId,
+ dataRegionIdString,
context.getFullyDirtyFileNum(),
context.getPartiallyDirtyFileNum());
} catch (InterruptedException e) {
@@ -3071,6 +3088,17 @@
final boolean isGeneratedByPipe,
final boolean isFromConsensus)
throws LoadFileException {
+ if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensus) {
+ final IoTConsensusServerImpl impl =
+ ((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(dataRegionId);
+ if (Objects.nonNull(impl) && !impl.isActive()) {
+ throw new LoadFileException(
+ String.format(
+ "Peer is inactive and not ready to write request, %s, DataNode Id: %s",
+ dataRegionId, IoTDBDescriptor.getInstance().getConfig().getDataNodeId()));
+ }
+ }
+
final File tsfileToBeInserted = newTsFileResource.getTsFile().getAbsoluteFile();
final long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
@@ -3136,7 +3164,8 @@
newTsFileResource.getTsFile().getName());
if (config.isEnableSeparateData()) {
- final DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(this.dataRegionId));
+ final DataRegionId dataRegionId =
+ new DataRegionId(Integer.parseInt(this.dataRegionIdString));
final long timePartitionId = newTsFileResource.getTimePartition();
initFlushTimeMap(timePartitionId);
updateDeviceLastFlushTime(newTsFileResource);
@@ -3301,7 +3330,7 @@
final String fileName =
databaseName
+ File.separatorChar
- + dataRegionId
+ + dataRegionIdString
+ File.separatorChar
+ filePartitionId
+ File.separator
@@ -3443,7 +3472,8 @@
}
// Listen before the tsFile is added into tsFile manager to avoid it being compacted
- PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, tsFileResource, true);
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToTsFile(dataRegionIdString, tsFileResource, true);
tsFileManager.add(tsFileResource, false);
@@ -3554,8 +3584,8 @@
}
@Override
- public String getDataRegionId() {
- return dataRegionId;
+ public String getDataRegionIdString() {
+ return dataRegionIdString;
}
/**
@@ -3564,14 +3594,15 @@
* @return data region path, like root.sg1/0
*/
public String getStorageGroupPath() {
- return databaseName + File.separator + dataRegionId;
+ return databaseName + File.separator + dataRegionIdString;
}
public void abortCompaction() {
tsFileManager.setAllowCompaction(false);
CompactionScheduleTaskManager.getInstance().unregisterDataRegion(this);
List<AbstractCompactionTask> runningTasks =
- CompactionTaskManager.getInstance().abortCompaction(databaseName + "-" + dataRegionId);
+ CompactionTaskManager.getInstance()
+ .abortCompaction(databaseName + "-" + dataRegionIdString);
while (CompactionTaskManager.getInstance().isAnyTaskInListStillRunning(runningTasks)) {
try {
TimeUnit.MILLISECONDS.sleep(10);
@@ -3808,7 +3839,7 @@
.getAllLocalFilesFolders()
.forEach(
folder -> {
- folder = folder + File.separator + databaseName + File.separator + dataRegionId;
+ folder = folder + File.separator + databaseName + File.separator + dataRegionIdString;
countFolderDiskSize(folder, diskSize);
});
return diskSize.get() / 1024 / 1024;
@@ -3931,7 +3962,7 @@
// identifier should be same with getTsFileProcessor method
return Optional.of(
WALManager.getInstance()
- .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId));
+ .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionIdString));
}
/** Wait for this data region successfully deleted */
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
index 14c6db5..adcb613 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/IDataRegionForQuery.java
@@ -81,5 +81,5 @@
/** Get database name of this DataRegion */
String getDatabaseName();
- String getDataRegionId();
+ String getDataRegionIdString();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
index 9e2f53b..20cec7c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/VirtualDataRegion.java
@@ -117,7 +117,7 @@
}
@Override
- public String getDataRegionId() {
+ public String getDataRegionIdString() {
return VIRTUAL_DATA_REGION_ID;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java
index 87a4d0f..eea9556 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartition.java
@@ -41,7 +41,7 @@
public RepairTimePartition(DataRegion dataRegion, long timePartitionId, long maxFileTimestamp) {
this.databaseName = dataRegion.getDatabaseName();
- this.dataRegionId = dataRegion.getDataRegionId();
+ this.dataRegionId = dataRegion.getDataRegionIdString();
this.tsFileManager = dataRegion.getTsFileManager();
this.timePartitionId = timePartitionId;
this.maxFileTimestamp = maxFileTimestamp;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 2d58015..a508ec4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -273,7 +273,7 @@
// recordCreateMemtableBlockCost
costsForMetrics[0] += System.nanoTime() - startTime;
WritingMetrics.getInstance()
- .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+ .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), 1);
}
long[] memIncrements;
@@ -327,7 +327,7 @@
}
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
+ dataRegionInfo.getDataRegion().getDataRegionIdString(), insertRowNode, tsFileResource);
int pointInserted;
if (insertRowNode.isAligned()) {
@@ -359,7 +359,7 @@
// recordCreateMemtableBlockCost
costsForMetrics[0] += System.nanoTime() - startTime;
WritingMetrics.getInstance()
- .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+ .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), 1);
}
long[] memIncrements;
@@ -426,7 +426,7 @@
}
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionId(), insertRowsNode, tsFileResource);
+ dataRegionInfo.getDataRegion().getDataRegionIdString(), insertRowsNode, tsFileResource);
int pointInserted = 0;
for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
@@ -455,7 +455,7 @@
MemTableManager.getInstance()
.getAvailableMemTable(
dataRegionInfo.getDataRegion().getDatabaseName(),
- dataRegionInfo.getDataRegion().getDataRegionId());
+ dataRegionInfo.getDataRegion().getDataRegionIdString());
walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
}
@@ -478,7 +478,7 @@
createNewWorkingMemTable();
PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(System.nanoTime() - startTime);
WritingMetrics.getInstance()
- .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1);
+ .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), 1);
}
long[] memIncrements;
@@ -542,7 +542,9 @@
}
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
- dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
+ dataRegionInfo.getDataRegion().getDataRegionIdString(),
+ insertTabletNode,
+ tsFileResource);
int pointInserted;
try {
@@ -1282,7 +1284,7 @@
WritingMetrics.getInstance()
.recordMemTableLiveDuration(System.currentTimeMillis() - getWorkMemTableCreatedTime());
WritingMetrics.getInstance()
- .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), -1);
+ .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionIdString(), -1);
WritingMetrics.getInstance().recordWALEntryNumForOneTsFile(walEntryNum);
workMemTable = null;
return FlushManager.getInstance().registerTsFileProcessor(this);
@@ -1378,7 +1380,7 @@
memTableToFlush,
writer,
storageGroupName,
- dataRegionInfo.getDataRegion().getDataRegionId());
+ dataRegionInfo.getDataRegion().getDataRegionIdString());
flushTask.syncFlushMemTable();
memTableFlushPointCount = memTableToFlush.getTotalPointsNum();
} catch (Throwable e) {
@@ -1562,7 +1564,7 @@
String.format("%.2f", compressionRatio),
totalMemTableSize,
writer.getPos());
- String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionId();
+ String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionIdString();
WritingMetrics.getInstance()
.recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId, compressionRatio);
CompressionRatio.getInstance().updateRatio(totalMemTableSize, writer.getPos());
@@ -1585,7 +1587,8 @@
// Listen after "endFile" to avoid unnecessary waiting for tsFile close
// before resource serialization to avoid missing hardlink after restart
PipeInsertionDataNodeListener.getInstance()
- .listenToTsFile(dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource, false);
+ .listenToTsFile(
+ dataRegionInfo.getDataRegion().getDataRegionIdString(), tsFileResource, false);
tsFileResource.serialize();
FileTimeIndexCacheRecorder.getInstance().logFileTimeIndex(tsFileResource);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
index 5dae144..34b7aef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java
@@ -108,14 +108,14 @@
LOGGER.warn(
"Failed to take snapshot for {}-{}, clean up",
dataRegion.getDatabaseName(),
- dataRegion.getDataRegionId());
+ dataRegion.getDataRegionIdString());
cleanUpWhenFail(finalSnapshotId);
} else {
snapshotLogger.logEnd();
LOGGER.info(
"Successfully take snapshot for {}-{}, snapshot directory is {}",
dataRegion.getDatabaseName(),
- dataRegion.getDataRegionId(),
+ dataRegion.getDataRegionIdString(),
snapshotDir.getParentFile().getAbsolutePath() + File.separator + finalSnapshotId);
}
@@ -124,7 +124,7 @@
LOGGER.error(
"Exception occurs when taking snapshot for {}-{}",
dataRegion.getDatabaseName(),
- dataRegion.getDataRegionId(),
+ dataRegion.getDataRegionIdString(),
e);
return false;
} finally {
@@ -147,7 +147,9 @@
StringBuilder pathBuilder = new StringBuilder(dataDir);
pathBuilder.append(File.separator).append(IoTDBConstant.SNAPSHOT_FOLDER_NAME);
pathBuilder.append(File.separator).append(dataRegion.getDatabaseName());
- pathBuilder.append(IoTDBConstant.FILE_NAME_SEPARATOR).append(dataRegion.getDataRegionId());
+ pathBuilder
+ .append(IoTDBConstant.FILE_NAME_SEPARATOR)
+ .append(dataRegion.getDataRegionIdString());
try {
String path = pathBuilder.toString();
if (new File(path).exists()) {
@@ -298,7 +300,7 @@
stringBuilder.append(File.separator);
stringBuilder.append(dataRegion.getDatabaseName());
stringBuilder.append(IoTDBConstant.FILE_NAME_SEPARATOR);
- stringBuilder.append(dataRegion.getDataRegionId());
+ stringBuilder.append(dataRegion.getDataRegionIdString());
stringBuilder.append(File.separator);
stringBuilder.append(snapshotId);
stringBuilder.append(File.separator);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 4a91f4d..1e94188 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -337,7 +337,7 @@
final long writePointCount,
final boolean isGeneratedByPipeConsensusLeader) {
MemTableFlushTask.recordFlushPointsMetricInternal(
- writePointCount, databaseName, dataRegion.getDataRegionId());
+ writePointCount, databaseName, dataRegion.getDataRegionIdString());
MetricService.getInstance()
.count(
writePointCount,
@@ -348,7 +348,7 @@
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
+ dataRegion.getDataRegionIdString(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
// Because we cannot accurately judge who is the leader here,
@@ -359,7 +359,7 @@
.getReplicationNum(
ConsensusGroupId.Factory.create(
TConsensusGroupType.DataRegion.getValue(),
- Integer.parseInt(dataRegion.getDataRegionId())));
+ Integer.parseInt(dataRegion.getDataRegionIdString())));
// It may happen that the replicationNum is 0 when load and db deletion occurs
// concurrently, so we can just not to count the number of points in this case
if (replicationNum != 0 && !isGeneratedByPipeConsensusLeader) {
@@ -373,7 +373,7 @@
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
- dataRegion.getDataRegionId(),
+ dataRegion.getDataRegionIdString(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
}
@@ -803,7 +803,7 @@
return String.join(
IoTDBConstant.FILE_NAME_SEPARATOR,
dataRegion.getDatabaseName(),
- dataRegion.getDataRegionId(),
+ dataRegion.getDataRegionIdString(),
Long.toString(timePartitionSlot.getStartTime()));
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index fced938..c8ac2c8 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -25,15 +25,23 @@
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
+import org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -41,6 +49,8 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq;
@@ -66,21 +76,51 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class DataNodeInternalRPCServiceImplTest {
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl;
+ private static IConsensus instance;
private static final int dataNodeId = 0;
+ private static final File storageDir = new File("target" + java.io.File.separator + "impl");
+ private static DataRegion dataRegion;
@BeforeClass
- public static void setUpBeforeClass() throws IOException, MetadataException {
+ public static void setUpBeforeClass() throws IOException, MetadataException, ConsensusException {
// In standalone mode, we need to set dataNodeId to 0 for RaftPeerId in RatisConsensus
conf.setDataNodeId(dataNodeId);
+ org.apache.iotdb.commons.utils.FileUtils.deleteFileOrDirectory(storageDir);
SchemaEngine.getInstance().init();
SchemaEngine.getInstance()
.createSchemaRegion(new PartialPath("root.ln"), new SchemaRegionId(0));
+ final DataRegionId id = new DataRegionId(1);
+ dataRegion = new DataRegion("root.ln", "1");
+ instance = DataRegionConsensusImpl.getInstance();
+ DataRegionConsensusImpl.setInstance(
+ ConsensusFactory.getConsensusImpl(
+ ConsensusFactory.IOT_CONSENSUS,
+ ConsensusConfig.newBuilder()
+ .setThisNodeId(1)
+ .setThisNode(new TEndPoint("0.0.0.0", 6667))
+ .setStorageDir(storageDir.getAbsolutePath())
+ .setConsensusGroupType(TConsensusGroupType.DataRegion)
+ .build(),
+ gid -> new IoTConsensusDataRegionStateMachine(dataRegion))
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ ConsensusFactory.IOT_CONSENSUS))));
+ if (Objects.isNull(
+ ((IoTConsensus) DataRegionConsensusImpl.getInstance()).getImpl(new DataRegionId(1)))) {
+ DataRegionConsensusImpl.getInstance()
+ .createLocalPeer(
+ id, Collections.singletonList(new Peer(id, 1, new TEndPoint("0.0.0.0", 6667))));
+ }
DataRegionConsensusImpl.getInstance().start();
SchemaRegionConsensusImpl.getInstance().start();
DataNodeRegionManager.getInstance().init();
@@ -109,13 +149,15 @@
public static void tearDownAfterClass() throws IOException, StorageEngineException {
DataNodeRegionManager.getInstance().clear();
DataRegionConsensusImpl.getInstance().stop();
+ DataRegionConsensusImpl.setInstance(instance);
SchemaRegionConsensusImpl.getInstance().stop();
SchemaEngine.getInstance().clear();
EnvironmentUtils.cleanEnv();
+ org.apache.iotdb.commons.utils.FileUtils.deleteFileOrDirectory(storageDir);
}
@Test
- public void testCreateTimeseries() throws MetadataException {
+ public void testCreateTimeSeries() throws MetadataException {
CreateTimeSeriesNode createTimeSeriesNode =
new CreateTimeSeriesNode(
new PlanNodeId("0"),
@@ -162,8 +204,19 @@
Assert.assertTrue(response.getResponses().get(0).accepted);
}
+ @Test(expected = LoadFileException.class)
+ public void testRejectLoad4NonActiveImpl() throws LoadFileException {
+ ((IoTConsensus) DataRegionConsensusImpl.getInstance())
+ .getImpl(new DataRegionId(1))
+ .setActive(false);
+ dataRegion.loadNewTsFile(new TsFileResource(), false, false, false);
+ ((IoTConsensus) DataRegionConsensusImpl.getInstance())
+ .getImpl(new DataRegionId(1))
+ .setActive(true);
+ }
+
@Test
- public void testCreateAlignedTimeseries() throws MetadataException {
+ public void testCreateAlignedTimeSeries() throws MetadataException {
CreateAlignedTimeSeriesNode createAlignedTimeSeriesNode =
new CreateAlignedTimeSeriesNode(
new PlanNodeId("0"),
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
index a298c7a..4c4ac20 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/StorageEngineTest.java
@@ -58,8 +58,8 @@
DataRegion rg1 = PowerMockito.mock(DataRegion.class);
DataRegion rg2 = PowerMockito.mock(DataRegion.class);
DataRegionId id2 = new DataRegionId(2);
- PowerMockito.when(rg1.getDataRegionId()).thenReturn("1");
- PowerMockito.when(rg2.getDataRegionId()).thenReturn("2");
+ PowerMockito.when(rg1.getDataRegionIdString()).thenReturn("1");
+ PowerMockito.when(rg2.getDataRegionIdString()).thenReturn("2");
storageEngine.setDataRegion(id1, rg1);
storageEngine.setDataRegion(id2, rg2);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 022d9ed3..5082b8f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -1128,7 +1128,7 @@
}
for (DataRegion region : regionsToBeDeleted) {
StorageEngine.getInstance()
- .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())));
+ .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionIdString())));
}
Thread.sleep(500);
@@ -1320,7 +1320,7 @@
}
for (DataRegion region : regionsToBeDeleted) {
StorageEngine.getInstance()
- .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())));
+ .deleteDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionIdString())));
}
Thread.sleep(500);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java
index 92337fe..3108a20 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileSchedulerTest.java
@@ -89,7 +89,7 @@
DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
- Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+ Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
TsFileResource seqResource1 = createEmptyFileAndResource(true);
@@ -136,7 +136,7 @@
DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
- Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+ Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
TsFileResource seqResource1 = createEmptyFileAndResource(true);
@@ -206,7 +206,7 @@
DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
- Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+ Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
TsFileResource seqResource1 = createEmptyFileAndResource(true);
@@ -265,7 +265,7 @@
DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
- Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+ Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
TsFileResource seqResource1 = createEmptyFileAndResource(true);
@@ -313,7 +313,7 @@
DataRegion mockDataRegion = Mockito.mock(DataRegion.class);
Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(tsFileManager);
Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("root.testsg");
- Mockito.when(mockDataRegion.getDataRegionId()).thenReturn("0");
+ Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
Mockito.when(mockDataRegion.getTimePartitions()).thenReturn(Collections.singletonList(0L));
TsFileResource seqResource1 = createEmptyFileAndResource(true);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
index 68879cb..7878380 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java
@@ -214,7 +214,7 @@
+ "1-1-0-0.tsfile");
DataRegion region = Mockito.mock(DataRegion.class);
Mockito.when(region.getDatabaseName()).thenReturn("root.test");
- Mockito.when(region.getDataRegionId()).thenReturn("0");
+ Mockito.when(region.getDataRegionIdString()).thenReturn("0");
File snapshotFile =
new SnapshotTaker(region).getSnapshotFilePathForTsFile(tsFile, "test-snapshotId");
Assert.assertEquals(