Merge pull request #2483 from neuyilan/apache_master_0113_fix_node_tools
Add authentication in the node tool and modify the node tools doc
diff --git a/cluster/pom.xml b/cluster/pom.xml
index 5529690..251c310 100644
--- a/cluster/pom.xml
+++ b/cluster/pom.xml
@@ -20,12 +20,13 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>iotdb-parent</artifactId>
- <groupId>org.apache.iotdb</groupId>
- <version>0.12.0-SNAPSHOT</version>
- </parent>
<modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-parent</artifactId>
+ <version>0.12.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
<artifactId>cluster</artifactId>
<name>cluster</name>
<properties>
@@ -168,6 +169,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly.version}</version>
<executions>
<!-- Package binaries-->
<execution>
diff --git a/cluster/src/assembly/cluster.xml b/cluster/src/assembly/cluster.xml
index 5c0c581..9fa6af5 100644
--- a/cluster/src/assembly/cluster.xml
+++ b/cluster/src/assembly/cluster.xml
@@ -17,11 +17,11 @@
<fileSets>
<fileSet>
<directory>src/assembly/resources</directory>
- <outputDirectory>/</outputDirectory>
+ <outputDirectory>${file.separator}</outputDirectory>
</fileSet>
<fileSet>
<directory>${maven.multiModuleProjectDirectory}/server/src/assembly/resources</directory>
- <outputDirectory>/</outputDirectory>
+ <outputDirectory>${file.separator}</outputDirectory>
</fileSet>
</fileSets>
</assembly>
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 017b3aa..2469f5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -262,7 +262,6 @@
targetResource, writer, modificationCache);
}
writer.endChunkGroup();
- writer.writeVersion(maxVersion);
} else {
long maxVersion = Long.MIN_VALUE;
for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap
@@ -291,7 +290,6 @@
}
}
writer.endChunkGroup();
- writer.writeVersion(maxVersion);
}
if (compactionLogger != null) {
compactionLogger.logDevice(device, writer.getPos());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 7168a0d..916d2d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -68,8 +68,8 @@
this.storageGroup = storageGroup;
this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
- LOGGER.debug("flush task of Storage group {} memtable {} is created ",
- storageGroup, memTable.getVersion());
+ LOGGER.debug("flush task of Storage group {} memtable is created, flushing to file {}.",
+ storageGroup, writer.getFile().getName());
}
/**
@@ -103,8 +103,8 @@
noMoreEncodingTask = true;
LOGGER.debug(
- "Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
- storageGroup, memTable.getVersion(), sortTime);
+ "Storage group {} memtable flushing into file {}: data sort time cost {} ms.",
+ storageGroup, writer.getFile().getName(), sortTime);
try {
encodingTaskFuture.get();
@@ -118,7 +118,6 @@
ioTaskFuture.get();
try {
- writer.writeVersion(memTable.getVersion());
writer.writePlanIndices();
} catch (IOException e) {
throw new ExecutionException(e);
@@ -172,8 +171,8 @@
public void run() {
long memSerializeTime = 0;
boolean noMoreMessages = false;
- LOGGER.debug("Storage group {} memtable {}, starts to encoding data.", storageGroup,
- memTable.getVersion());
+ LOGGER.debug("Storage group {} memtable flushing to file {} starts to encoding data.",
+ storageGroup, writer.getFile().getName());
while (true) {
if (noMoreEncodingTask) {
noMoreMessages = true;
@@ -186,8 +185,8 @@
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- LOGGER.error("Storage group {} memtable {}, encoding task is interrupted.",
- storageGroup, memTable.getVersion(), e);
+ LOGGER.error("Storage group {} memtable flushing to file {}, encoding task is interrupted.",
+ storageGroup, writer.getFile().getName(), e);
// generally it is because the thread pool is shutdown so the task should be aborted
break;
}
@@ -205,9 +204,9 @@
}
}
noMoreIOTask = true;
- LOGGER.debug("Storage group {}, flushing memtable {} into disk: Encoding data cost "
+ LOGGER.debug("Storage group {}, flushing memtable into file {}: Encoding data cost "
+ "{} ms.",
- storageGroup, memTable.getVersion(), memSerializeTime);
+ storageGroup, writer.getFile().getName(), memSerializeTime);
}
};
@@ -215,7 +214,8 @@
private Runnable ioTask = () -> {
long ioTime = 0;
boolean returnWhenNoTask = false;
- LOGGER.debug("Storage group {} memtable {}, start io.", storageGroup, memTable.getVersion());
+ LOGGER.debug("Storage group {} memtable flushing to file {} start io.",
+ storageGroup, writer.getFile().getName());
while (true) {
if (noMoreIOTask) {
returnWhenNoTask = true;
@@ -228,8 +228,8 @@
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (@SuppressWarnings("squid:S2142") InterruptedException e) {
- LOGGER.error("Storage group {} memtable {}, io task is interrupted.", storageGroup
- , memTable.getVersion());
+ LOGGER.error("Storage group {} memtable flushing to file {}, io task is interrupted.",
+ storageGroup, writer.getFile().getName(), e); // pass e as the trailing Throwable so the stack trace is logged, like the encoding task does
// generally it is because the thread pool is shutdown so the task should be aborted
break;
}
@@ -247,15 +247,15 @@
this.writer.endChunkGroup();
}
} catch (IOException e) {
- LOGGER.error("Storage group {} memtable {}, io task meets error.", storageGroup,
- memTable.getVersion(), e);
+ LOGGER.error("Storage group {} memtable flushing to file {}, io task meets error.",
+ storageGroup, writer.getFile().getName(), e);
throw new FlushRunTimeException(e);
}
ioTime += System.currentTimeMillis() - starTime;
}
}
- LOGGER.debug("flushing a memtable {} in storage group {}, io cost {}ms", memTable.getVersion(),
- storageGroup, ioTime);
+ LOGGER.debug("flushing a memtable to file {} in storage group {}, io cost {}ms",
+ writer.getFile().getName(), storageGroup, ioTime);
};
static class EndChunkGroupIoTask {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index e97f5c0..7e31b28 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -19,15 +19,12 @@
package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.modification.Deletion;
-import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -50,8 +47,6 @@
* The initial value is true because we want calculate the text data size when recover memTable!!
*/
protected boolean disableMemControl = true;
- private long version = Long.MAX_VALUE;
- private List<Modification> modifications = new ArrayList<>();
private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig()
.getAvgSeriesPointNumberThreshold();
/**
@@ -223,7 +218,6 @@
@Override
public void clear() {
memTableMap.clear();
- modifications.clear();
memSize = 0;
seriesNumber = 0;
totalPointsNum = 0;
@@ -239,37 +233,18 @@
@Override
public ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
- TSEncoding encoding, Map<String, String> props, long timeLowerBound)
+ TSEncoding encoding, Map<String, String> props, long timeLowerBound, List<TimeRange> deletionList)
throws IOException, QueryProcessException, MetadataException {
if (!checkPath(deviceId, measurement)) {
return null;
}
- List<TimeRange> deletionList = constructDeletionList(deviceId, measurement, timeLowerBound);
-
IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement);
// get sorted tv list is synchronized so different query can get right sorted list reference
TVList chunkCopy = memChunk.getSortedTVListForQuery();
int curSize = chunkCopy.size();
- return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion(),
- curSize, deletionList);
- }
-
- private List<TimeRange> constructDeletionList(String deviceId, String measurement,
- long timeLowerBound) throws MetadataException {
- List<TimeRange> deletionList = new ArrayList<>();
- deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound));
- for (Modification modification : modifications) {
- if (modification instanceof Deletion) {
- Deletion deletion = (Deletion) modification;
- if (deletion.getPath().matchFullPath(new PartialPath(deviceId, measurement))
- && deletion.getEndTime() > timeLowerBound) {
- long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound);
- deletionList.add(new TimeRange(lowerBound, deletion.getEndTime()));
- }
- }
- }
- return TimeRange.sortAndMerge(deletionList);
+ return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props,
+ curSize, deletionList);
}
@Override
@@ -296,19 +271,6 @@
}
@Override
- public void delete(Deletion deletion) {
- this.modifications.add(deletion);
- }
-
- public long getVersion() {
- return version;
- }
-
- public void setVersion(long version) {
- this.version = version;
- }
-
- @Override
public void addTVListRamCost(long cost) {
this.tvListRamCost += cost;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 81435c9..ad45279 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.engine.memtable;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
-import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -30,6 +30,7 @@
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
/**
@@ -88,7 +89,7 @@
throws WriteProcessException;
ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType,
- TSEncoding encoding, Map<String, String> props, long timeLowerBound)
+ TSEncoding encoding, Map<String, String> props, long timeLowerBound, List<TimeRange> deletionList)
throws IOException, QueryProcessException, MetadataException;
/**
@@ -110,14 +111,6 @@
void delete(PartialPath path, PartialPath devicePath, long startTimestamp, long endTimestamp);
/**
- * Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries
- * deviceId.measurementId. Only called for flushing MemTable.
- *
- * @param deletion and object representing this deletion
- */
- void delete(Deletion deletion);
-
- /**
* Make a copy of this MemTable.
*
* @return a MemTable with the same data as this one.
@@ -126,10 +119,6 @@
boolean isSignalMemTable();
- long getVersion();
-
- void setVersion(long version);
-
void release();
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index cbddabb..299f46a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -54,14 +54,6 @@
}
@Override
- public int hashCode() {return (int) getVersion();}
-
- @Override
- public boolean equals(Object obj) {
- return this == obj;
- }
-
- @Override
public String toString() {
return "PrimitiveMemTable{planIndex=[" + getMinPlanIndex() +"," + getMaxPlanIndex() + "]}";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 673ac34..a21e1a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -239,14 +239,11 @@
TsFileSequenceReader reader, TsFileIOWriter fileWriter)
throws IOException {
fileWriter.startChunkGroup(device);
- long maxVersion = 0;
for (ChunkMetadata chunkMetaData : chunkMetadataList) {
Chunk chunk = reader.readMemChunk(chunkMetaData);
fileWriter.writeChunk(chunk, chunkMetaData);
- maxVersion = Math.max(chunkMetaData.getVersion(), maxVersion);
context.incTotalPointWritten(chunkMetaData.getNumOfPoints());
}
- fileWriter.writeVersion(maxVersion);
fileWriter.endChunkGroup();
}
@@ -275,7 +272,7 @@
}
fileWriter.startChunkGroup(path.getDevice());
- long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetadataList,
+ writeUnmergedChunks(chunkStartTimes, chunkMetadataList,
resource.getFileReader(seqFile), fileWriter);
if (Thread.interrupted()) {
@@ -283,7 +280,6 @@
return;
}
- fileWriter.writeVersion(maxVersion + 1);
fileWriter.endChunkGroup();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 82ec837..53098b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -217,7 +217,6 @@
boolean dataWritten = mergeChunks(seqChunkMeta, isLastFile, fileSequenceReader, unseqReaders,
mergeFileWriter, currTsFile);
if (dataWritten) {
- mergeFileWriter.writeVersion(0L);
mergeFileWriter.endChunkGroup();
mergeLogger.logFilePosition(mergeFileWriter.getFile());
currTsFile.updateStartTime(deviceId, currDeviceMinTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
index acddc41..c364b2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
@@ -38,8 +38,8 @@
* @param endTime end time of delete interval
* @param path time series path
*/
- public Deletion(PartialPath path, long versionNum, long endTime) {
- super(Type.DELETION, path, versionNum);
+ public Deletion(PartialPath path, long fileOffset, long endTime) {
+ super(Type.DELETION, path, fileOffset);
this.startTime = Long.MIN_VALUE;
this.endTime = endTime;
}
@@ -50,8 +50,8 @@
* @param endTime end time of delete interval
* @param path time series path
*/
- public Deletion(PartialPath path, long versionNum, long startTime, long endTime) {
- super(Type.DELETION, path, versionNum);
+ public Deletion(PartialPath path, long fileOffset, long startTime, long endTime) {
+ super(Type.DELETION, path, fileOffset);
this.startTime = startTime;
this.endTime = endTime;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
index 3e3f46e..33e878c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
@@ -29,12 +29,12 @@
protected Type type;
protected PartialPath path;
- protected long versionNum;
+ protected long fileOffset;
- Modification(Type type, PartialPath path, long versionNum) {
+ Modification(Type type, PartialPath path, long fileOffset) {
this.type = type;
this.path = path;
- this.versionNum = versionNum;
+ this.fileOffset = fileOffset;
}
public String getPathString() {
@@ -57,12 +57,12 @@
this.path = path;
}
- public long getVersionNum() {
- return versionNum;
+ public long getFileOffset() {
+ return fileOffset;
}
- public void setVersionNum(long versionNum) {
- this.versionNum = versionNum;
+ public void setFileOffset(long fileOffset) {
+ this.fileOffset = fileOffset;
}
public Type getType() {
@@ -87,11 +87,11 @@
}
Modification mod = (Modification) obj;
return mod.type.equals(this.type) && mod.path.equals(this.path)
- && mod.versionNum == this.versionNum;
+ && mod.fileOffset == this.fileOffset;
}
@Override
public int hashCode() {
- return Objects.hash(type, path, versionNum);
+ return Objects.hash(type, path, fileOffset);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
index 8e09964..abc8c41 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -124,7 +124,7 @@
private static String encodeDeletion(Deletion del) {
return del.getType().toString() + SEPARATOR + del.getPathString()
- + SEPARATOR + del.getVersionNum() + SEPARATOR
+ + SEPARATOR + del.getFileOffset() + SEPARATOR
+ del.getStartTime() + SEPARATOR + del.getEndTime();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index d87a4ea..9ac9d12 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -46,7 +46,6 @@
private TSEncoding encoding;
private static final Logger logger = LoggerFactory.getLogger(ReadOnlyMemChunk.class);
- private long version;
private int floatPrecision = TSFileDescriptor.getInstance().getConfig().getFloatPrecision();
@@ -59,13 +58,11 @@
private int chunkDataSize;
public ReadOnlyMemChunk(String measurementUid, TSDataType dataType, TSEncoding encoding,
- TVList tvList, Map<String, String> props, long version, int size,
- List<TimeRange> deletionList)
+ TVList tvList, Map<String, String> props, int size, List<TimeRange> deletionList)
throws IOException, QueryProcessException {
this.measurementUid = measurementUid;
this.dataType = dataType;
this.encoding = encoding;
- this.version = version;
if (props != null && props.containsKey(Encoder.MAX_POINT_NUMBER)) {
try {
this.floatPrecision = Integer.parseInt(props.get(Encoder.MAX_POINT_NUMBER));
@@ -143,10 +140,6 @@
return chunkPointReader;
}
- public long getVersion() {
- return version;
- }
-
public String getMeasurementUid() {
return measurementUid;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index d32386f..8e84416 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -433,24 +433,6 @@
}
}
- /**
- * get version controller by time partition Id Thread-safety should be ensure by caller
- *
- * @param timePartitionId time partition Id
- * @return version controller
- */
- private VersionController getVersionControllerByTimePartitionId(long timePartitionId) {
- return timePartitionIdVersionControllerMap.computeIfAbsent(timePartitionId,
- id -> {
- try {
- return new SimpleFileVersionController(storageGroupSysDir.getPath(), timePartitionId);
- } catch (IOException e) {
- logger.error("can't build a version controller for time partition {}", timePartitionId);
- return null;
- }
- });
- }
-
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
throws IOException {
@@ -572,8 +554,7 @@
long timePartitionId = tsFileResource.getTimePartition();
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(
- storageGroupName + FILE_NAME_SEPARATOR,
- getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, isSeq,
+ storageGroupName + FILE_NAME_SEPARATOR, tsFileResource, isSeq,
i == tsFiles.size() - 1);
RestorableTsFileIOWriter writer;
@@ -605,7 +586,6 @@
TsFileProcessor tsFileProcessor;
if (isSeq) {
tsFileProcessor = new TsFileProcessor(storageGroupName, storageGroupInfo, tsFileResource,
- getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack, this::updateLatestFlushTimeCallback,
true, writer);
if (enableMemControl) {
@@ -618,7 +598,6 @@
workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
} else {
tsFileProcessor = new TsFileProcessor(storageGroupName, storageGroupInfo, tsFileResource,
- getVersionControllerByTimePartitionId(timePartitionId),
this::closeUnsealedTsFileProcessorCallBack, this::unsequenceFlushCallback, false,
writer);
if (enableMemControl) {
@@ -1046,11 +1025,10 @@
+ getNewTsFileName(timePartitionId);
TsFileProcessor tsFileProcessor;
- VersionController versionController = getVersionControllerByTimePartitionId(timePartitionId);
if (sequence) {
tsFileProcessor = new TsFileProcessor(storageGroupName,
fsFactory.getFileWithParent(filePath), storageGroupInfo,
- versionController, this::closeUnsealedTsFileProcessorCallBack,
+ this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true, deviceNumInLastClosedTsFile);
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
@@ -1062,7 +1040,7 @@
} else {
tsFileProcessor = new TsFileProcessor(storageGroupName,
fsFactory.getFileWithParent(filePath), storageGroupInfo,
- versionController, this::closeUnsealedTsFileProcessorCallBack,
+ this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false, deviceNumInLastClosedTsFile);
if (enableMemControl) {
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
@@ -1553,17 +1531,14 @@
}
private void deleteDataInFiles(Collection<TsFileResource> tsFileResourceList, Deletion deletion,
- Set<PartialPath> devicePaths, List<ModificationFile> updatedModFiles, long planIndex)
- throws IOException {
+ Set<PartialPath> devicePaths, List<ModificationFile> updatedModFiles, long planIndex) throws IOException {
for (TsFileResource tsFileResource : tsFileResourceList) {
if (canSkipDelete(tsFileResource, devicePaths, deletion.getStartTime(),
deletion.getEndTime())) {
continue;
}
- long partitionId = tsFileResource.getTimePartition();
- deletion.setVersionNum(getVersionControllerByTimePartitionId(partitionId).nextVersion());
-
+ deletion.setFileOffset(tsFileResource.getTsFileSize());
// write deletion into modification file
tsFileResource.getModFile().write(deletion);
// remember to close mod file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5b404c9..f827877 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,7 +46,6 @@
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack;
-import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
@@ -70,7 +70,9 @@
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +93,7 @@
* sync this object in query() and asyncTryToFlush()
*/
private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
+ private List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>();
private RestorableTsFileIOWriter writer;
private final TsFileResource tsFileResource;
// time range index to indicate this processor belongs to which time range
@@ -107,8 +110,6 @@
private volatile boolean shouldClose;
private IMemTable workMemTable;
- private final VersionController versionController;
-
/**
* this callback is called before the workMemtable is added into the flushingMemTables.
*/
@@ -127,7 +128,6 @@
@SuppressWarnings("squid:S107")
TsFileProcessor(String storageGroupName, File tsfile,
StorageGroupInfo storageGroupInfo,
- VersionController versionController,
CloseFileListener closeTsFileCallback,
UpdateEndTimeCallBack updateLatestFlushTimeCallback, boolean sequence,
int deviceNumInLastClosedTsFile)
@@ -137,7 +137,6 @@
if (enableMemControl) {
this.storageGroupInfo = storageGroupInfo;
}
- this.versionController = versionController;
this.writer = new RestorableTsFileIOWriter(tsfile);
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
@@ -148,8 +147,7 @@
@SuppressWarnings("java:S107") // ignore number of arguments
public TsFileProcessor(String storageGroupName, StorageGroupInfo storageGroupInfo,
- TsFileResource tsFileResource,
- VersionController versionController, CloseFileListener closeUnsealedTsFileProcessor,
+ TsFileResource tsFileResource, CloseFileListener closeUnsealedTsFileProcessor,
UpdateEndTimeCallBack updateLatestFlushTimeCallback, boolean sequence,
RestorableTsFileIOWriter writer) {
this.storageGroupName = storageGroupName;
@@ -157,7 +155,6 @@
if (enableMemControl) {
this.storageGroupInfo = storageGroupInfo;
}
- this.versionController = versionController;
this.writer = writer;
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
@@ -405,8 +402,9 @@
}
}
// flushing memTables are immutable, only record this deletion in these memTables for query
- for (IMemTable memTable : flushingMemTables) {
- memTable.delete(deletion);
+ if (!flushingMemTables.isEmpty()) {
+ logger.debug("{}", modsToMemtable); // diagnostic dump of the pending list: debug level, not warn
+ modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast())); // pair the deletion with the last memtable in the flushing deque
}
} finally {
flushQueryLock.writeLock().unlock();
@@ -651,8 +649,6 @@
storageGroupName, tsFileResource.getTsFile().getName(),
tobeFlushed.isSignalMemTable(), flushingMemTables.size());
}
- long cur = versionController.nextVersion();
- tobeFlushed.setVersion(cur);
if (!tobeFlushed.isSignalMemTable()) {
totalMemTableSize += tobeFlushed.memSize();
@@ -757,6 +753,23 @@
flushListener.onFlushEnd(memTableToFlush);
}
+ try {
+ Iterator<Pair<Modification, IMemTable>> iterator = modsToMemtable.iterator(); // NOTE(review): plain ArrayList; confirm this iteration cannot race with the add done in the delete path
+ logger.debug("{}", modsToMemtable); // diagnostic dump of the pending list: debug level, not warn
+ while (iterator.hasNext()) {
+ Pair<Modification, IMemTable> entry = iterator.next();
+ if (entry.right.equals(memTableToFlush)) { // modification was recorded against the memtable handled here
+ entry.left.setFileOffset(tsFileResource.getTsFileSize()); // stamp with the TsFile size read at this point
+ this.tsFileResource.getModFile().write(entry.left);
+ tsFileResource.getModFile().close();
+ iterator.remove(); // written to the mod file, so drop it from the pending list
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Meet error when writing into ModificationFile file of {} ",
+ tsFileResource.getTsFile().getName(), e);
+ }
+
if (logger.isDebugEnabled()) {
logger.debug("{}: {} try get lock to release a memtable (signal={})", storageGroupName,
tsFileResource.getTsFile().getName(), memTableToFlush.isSignalMemTable());
@@ -899,6 +912,35 @@
return storageGroupName;
}
+ private List<Modification> getModificationsForMemtable(IMemTable memTable) { // modifications paired with memTable plus every entry after the first match
+ List<Modification> modifications = new ArrayList<>();
+ boolean foundMemtable = false;
+ for (Pair<Modification, IMemTable> entry : modsToMemtable) {
+ if (foundMemtable || entry.right.equals(memTable)) { // once the first match is seen, all following entries are taken too
+ modifications.add(entry.left);
+ foundMemtable = true;
+ }
+ }
+ return modifications; // empty when no entry is paired with memTable
+ }
+
+ private List<TimeRange> constructDeletionList(IMemTable memTable, String deviceId, String measurement,
+ long timeLowerBound) throws MetadataException { // builds the ranges handed to IMemTable.query as deletionList
+ List<TimeRange> deletionList = new ArrayList<>();
+ deletionList.add(new TimeRange(Long.MIN_VALUE, timeLowerBound)); // always present: everything up to the query lower bound
+ for (Modification modification : getModificationsForMemtable(memTable)) {
+ if (modification instanceof Deletion) { // other modification types are ignored
+ Deletion deletion = (Deletion) modification;
+ if (deletion.getPath().matchFullPath(new PartialPath(deviceId, measurement))
+ && deletion.getEndTime() > timeLowerBound) { // a deletion ending at or before the bound adds nothing to the first range
+ long lowerBound = Math.max(deletion.getStartTime(), timeLowerBound); // clip the start to the lower bound
+ deletionList.add(new TimeRange(lowerBound, deletion.getEndTime()));
+ }
+ }
+ }
+ return TimeRange.sortAndMerge(deletionList);
+ }
+
/**
* get the chunk(s) in the memtable (one from work memtable and the other ones in flushing
* memtables and then compact them into one TimeValuePairSorter). Then get the related
@@ -925,15 +967,17 @@
if (flushingMemTable.isSignalMemTable()) {
continue;
}
+ List<TimeRange> deletionList = constructDeletionList(flushingMemTable,
+ deviceId, measurementId, context.getQueryTimeLowerBound());
ReadOnlyMemChunk memChunk = flushingMemTable.query(deviceId, measurementId,
- dataType, encoding, props, context.getQueryTimeLowerBound());
+ dataType, encoding, props, context.getQueryTimeLowerBound(), deletionList);
if (memChunk != null) {
readOnlyMemChunks.add(memChunk);
}
}
if (workMemTable != null) {
ReadOnlyMemChunk memChunk = workMemTable.query(deviceId, measurementId, dataType, encoding,
- props, context.getQueryTimeLowerBound());
+ props, context.getQueryTimeLowerBound(), null);
if (memChunk != null) {
readOnlyMemChunks.add(memChunk);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 4f57cad..0519851 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -42,6 +42,7 @@
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.service.UpgradeSevice;
+import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -151,6 +152,8 @@
*/
protected long minPlanIndex = Long.MAX_VALUE;
+ private long version = 0;
+
public TsFileResource() {
}
@@ -170,6 +173,7 @@
this.fsFactory = other.fsFactory;
this.maxPlanIndex = other.maxPlanIndex;
this.minPlanIndex = other.minPlanIndex;
+ this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
}
/**
@@ -177,6 +181,7 @@
*/
public TsFileResource(File file) {
this.file = file;
+ this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
this.timeIndex = config.getTimeIndexLevel().getTimeIndex();
this.timeIndexType = (byte) config.getTimeIndexLevel().ordinal();
}
@@ -186,6 +191,7 @@
*/
public TsFileResource(File file, TsFileProcessor processor, int deviceNumInLastClosedTsFile) {
this.file = file;
+ this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
this.timeIndex = config.getTimeIndexLevel().getTimeIndex(deviceNumInLastClosedTsFile);
this.timeIndexType = (byte) config.getTimeIndexLevel().ordinal();
this.processor = processor;
@@ -203,6 +209,7 @@
this.chunkMetadataList = chunkMetadataList;
this.readOnlyMemChunk = readOnlyMemChunk;
this.originTsFileResource = originTsFileResource;
+ this.version = originTsFileResource.version;
generateTimeSeriesMetadata();
}
@@ -719,4 +726,12 @@
public void setMinPlanIndex(long minPlanIndex) {
this.minPlanIndex = minPlanIndex;
}
+ /** Overrides the version of this resource (the file-based constructors parse it from the file name). */
+ public void setVersion(long version) {
+ this.version = version;
+ }
+ /** Returns the version of this resource; 0 unless a constructor or setVersion assigned another value. */
+ public long getVersion() {
+ return version;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index b31d5aa..b3416a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -873,11 +873,9 @@
}
Map<Path, MeasurementSchema> schemaMap = new HashMap<>();
- List<Pair<Long, Long>> versionInfo = new ArrayList<>();
-
List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- reader.selfCheck(schemaMap, chunkGroupMetadataList, versionInfo, false);
+ reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
}
FileLoaderUtils.checkTsFileResource(tsFileResource);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
index 384ac00..e1021f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
@@ -52,9 +52,11 @@
private QueryDataSource dataSource;
+ private ChunkMetadata cachedLastChunk;
+
private List<TimeseriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();
- public LastPointReader() {
+ public LastPointReader() {
}
@@ -70,22 +72,22 @@
}
public TimeValuePair readLastPoint() throws IOException {
- TimeValuePair lastPointResult = retrieveValidLastPointFromSeqFiles();
- UnpackOverlappedUnseqFiles(lastPointResult.getTimestamp());
+ TimeValuePair resultPoint = retrieveValidLastPointFromSeqFiles();
+ UnpackOverlappedUnseqFiles(resultPoint.getTimestamp());
- long lastVersion = 0;
PriorityQueue<ChunkMetadata> sortedChunkMetatdataList = sortUnseqChunkMetadatasByEndtime();
while (!sortedChunkMetatdataList.isEmpty()
- && lastPointResult.getTimestamp() <= sortedChunkMetatdataList.peek().getEndTime()) {
+ && resultPoint.getTimestamp() <= sortedChunkMetatdataList.peek().getEndTime()) {
ChunkMetadata chunkMetadata = sortedChunkMetatdataList.poll();
- TimeValuePair lastChunkPoint = getChunkLastPoint(chunkMetadata);
- if (shouldUpdate(lastPointResult.getTimestamp(), lastVersion,
- lastChunkPoint.getTimestamp(), chunkMetadata.getVersion())) {
- lastPointResult = lastChunkPoint;
- lastVersion = chunkMetadata.getVersion();
+ TimeValuePair chunkLastPoint = getChunkLastPoint(chunkMetadata);
+ if (chunkLastPoint.getTimestamp() > resultPoint.getTimestamp() ||
+ (chunkLastPoint.getTimestamp() == resultPoint.getTimestamp() &&
+ (cachedLastChunk == null || shouldUpdate(cachedLastChunk, chunkMetadata)))) {
+ cachedLastChunk = chunkMetadata;
+ resultPoint = chunkLastPoint;
}
}
- return lastPointResult;
+ return resultPoint;
}
/** Pick up and cache the last sequence TimeseriesMetadata that satisfies timeFilter */
@@ -178,8 +180,10 @@
return lastPoint;
}
- private boolean shouldUpdate(long time, long version, long newTime, long newVersion) {
- return time < newTime || (time == newTime && version < newVersion);
+ private boolean shouldUpdate(ChunkMetadata cachedChunk, ChunkMetadata newChunk) {
+ return (newChunk.getVersion() > cachedChunk.getVersion()) ||
+ (newChunk.getVersion() == cachedChunk.getVersion() &&
+ newChunk.getOffsetOfChunkHeader() > cachedChunk.getOffsetOfChunkHeader());
}
private PriorityQueue<TsFileResource> sortUnSeqFileResourcesInDecendingOrder(
@@ -206,7 +210,11 @@
} else if (endTime1 > endTime2) {
return -1;
}
- return Long.compare(o2.getVersion(), o1.getVersion());
+ if (o2.getVersion() > o1.getVersion()) {
+ return 1;
+ }
+ return (o2.getVersion() < o1.getVersion() ? -1 :
+ Long.compare(o2.getOffsetOfChunkHeader(), o1.getOffsetOfChunkHeader()));
});
for (TimeseriesMetadata timeseriesMetadata : unseqTimeseriesMetadataList) {
if (timeseriesMetadata != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
index 95466e9..ec13354 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java
@@ -62,6 +62,10 @@
chunkMetadataList.removeIf(chunkMetaData -> (filter != null && !filter
.satisfyStartEndTime(chunkMetaData.getStartTime(), chunkMetaData.getEndTime()))
|| chunkMetaData.getStartTime() > chunkMetaData.getEndTime());
+
+ for (ChunkMetadata metadata : chunkMetadataList) {
+ metadata.setVersion(resource.getVersion());
+ }
return chunkMetadataList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
index ad2a4e3..396cb24 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java
@@ -63,6 +63,10 @@
chunkMetadataList.removeIf(chunkMetaData -> (timeFilter != null && !timeFilter
.satisfyStartEndTime(chunkMetaData.getStartTime(), chunkMetaData.getEndTime()))
|| chunkMetaData.getStartTime() > chunkMetaData.getEndTime());
+
+ for (ChunkMetadata metadata : chunkMetadataList) {
+ metadata.setVersion(resource.getVersion());
+ }
return chunkMetadataList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 0d15e76..3d50e9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -38,6 +38,7 @@
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.universal.DescPriorityMergeReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader.MergeReaderPriority;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.TestOnly;
@@ -485,15 +486,18 @@
// addLast for asc; addFirst for desc
if (orderUtils.getAscending()) {
seqPageReaders
- .add(new VersionPageReader(chunkMetaData.getVersion(), pageReader, true));
+ .add(new VersionPageReader(chunkMetaData.getVersion(),
+ chunkMetaData.getOffsetOfChunkHeader(), pageReader, true));
} else {
seqPageReaders
- .add(0, new VersionPageReader(chunkMetaData.getVersion(), pageReader, true));
+ .add(0, new VersionPageReader(chunkMetaData.getVersion(),
+ chunkMetaData.getOffsetOfChunkHeader(), pageReader, true));
}
} else {
unSeqPageReaders
- .add(new VersionPageReader(chunkMetaData.getVersion(), pageReader, false));
+ .add(new VersionPageReader(chunkMetaData.getVersion(),
+ chunkMetaData.getOffsetOfChunkHeader(), pageReader, false));
}
});
}
@@ -893,13 +897,13 @@
private class VersionPageReader {
- protected long version;
+ protected PriorityMergeReader.MergeReaderPriority version;
protected IPageReader data;
protected boolean isSeq;
- VersionPageReader(long version, IPageReader data, boolean isSeq) {
- this.version = version;
+ VersionPageReader(long version, long offset, IPageReader data, boolean isSeq) {
+ this.version = new MergeReaderPriority(version, offset);
this.data = data;
this.isSeq = isSeq;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
index 499587c..6479bc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/DescPriorityMergeReader.java
@@ -29,7 +29,7 @@
super.heap = new PriorityQueue<>((o1, o2) -> {
int timeCompare = Long.compare(o2.timeValuePair.getTimestamp(),
o1.timeValuePair.getTimestamp());
- return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, o1.priority);
+ return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority);
});
}
@@ -40,7 +40,7 @@
* @throws IOException
*/
@Override
- public void addReader(IPointReader reader, long priority, long endTime) throws IOException {
+ public void addReader(IPointReader reader, MergeReaderPriority priority, long endTime) throws IOException {
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
super.currentReadStopTime = Math.min(currentReadStopTime, endTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index d5998b4..5bf7cfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import java.util.PriorityQueue;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
@@ -39,7 +40,7 @@
heap = new PriorityQueue<>((o1, o2) -> {
int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
o2.timeValuePair.getTimestamp());
- return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, o1.priority);
+ return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority);
});
}
@@ -49,7 +50,7 @@
heap = new PriorityQueue<>((o1, o2) -> {
int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
o2.timeValuePair.getTimestamp());
- return timeCompare != 0 ? timeCompare : Long.compare(o2.priority, o1.priority);
+ return timeCompare != 0 ? timeCompare : o2.priority.compareTo(o1.priority);
});
for (IPointReader reader : prioritySeriesReaders) {
addReader(reader, startPriority++);
@@ -58,13 +59,13 @@
public void addReader(IPointReader reader, long priority) throws IOException {
if (reader.hasNextTimeValuePair()) {
- heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
+ heap.add(new Element(reader, reader.nextTimeValuePair(), new MergeReaderPriority(priority, 0)));
} else {
reader.close();
}
}
- public void addReader(IPointReader reader, long priority, long endTime) throws IOException {
+ public void addReader(IPointReader reader, MergeReaderPriority priority, long endTime) throws IOException {
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
currentReadStopTime = Math.max(currentReadStopTime, endTime);
@@ -141,9 +142,9 @@
IPointReader reader;
TimeValuePair timeValuePair;
- long priority;
+ MergeReaderPriority priority;
- Element(IPointReader reader, TimeValuePair timeValuePair, long priority) {
+ Element(IPointReader reader, TimeValuePair timeValuePair, MergeReaderPriority priority) {
this.reader = reader;
this.timeValuePair = timeValuePair;
this.priority = priority;
@@ -169,4 +170,39 @@
reader.close();
}
}
-}
\ No newline at end of file
+
+ /** Priority of a reader in the merge heap: compared by version first, then by offset. */
+ public static class MergeReaderPriority implements Comparable<MergeReaderPriority> {
+ long version;
+ long offset;
+
+ public MergeReaderPriority(long version, long offset) {
+ this.version = version;
+ this.offset = offset;
+ }
+
+ /** Orders by version; priorities with equal versions are ordered by offset. */
+ @Override
+ public int compareTo(MergeReaderPriority o) {
+ int byVersion = Long.compare(version, o.version);
+ return byVersion != 0 ? byVersion : Long.compare(offset, o.offset);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ MergeReaderPriority other = (MergeReaderPriority) object;
+ return version == other.version && offset == other.offset;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(version, offset);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index a558015..f48fb35 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -23,7 +23,6 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -60,12 +59,7 @@
try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
TsFileMetadata tsFileMetaData = reader.readFileMetadata();
List<ChunkGroupMetadata> allChunkGroupMetadata = new ArrayList<>();
- List<Pair<Long, Long>> versionInfo = new ArrayList<>();
- reader.selfCheck(null, allChunkGroupMetadata, versionInfo, false);
- Map<Long, Long> versionMap = new HashMap<>();
- for (Pair<Long, Long> versionPair : versionInfo) {
- versionMap.put(versionPair.left - Long.BYTES - 1, versionPair.right);
- }
+ reader.selfCheck(null, allChunkGroupMetadata, false);
// begin print
StringBuilder str1 = new StringBuilder();
@@ -115,16 +109,6 @@
.getNumberOfChunks());
printlnBoth(pw, str1.toString() + "\t[Chunk Group] of "
+ chunkGroupMetadata.getDevice() + " ends");
- // versionInfo begins if there is a versionInfo
- if (versionMap.containsKey(chunkEndPos + chunkGroupFooter.getSerializedSize())) {
- printlnBoth(pw,
- String.format("%20s", chunkEndPos + chunkGroupFooter.getSerializedSize())
- + "|\t[Version Info]");
- printlnBoth(pw, String.format("%20s", "") + "|\t\t[marker] 3");
- printlnBoth(pw,
- String.format("%20s", "") + "|\t\t[version] "
- + versionMap.get(chunkEndPos + chunkGroupFooter.getSerializedSize()));
- }
}
// metadata begins
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 59c2b96..307210c 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -49,7 +49,6 @@
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.v1.file.metadata.ChunkGroupMetaDataV1;
import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataIndexV1;
import org.apache.iotdb.tsfile.v1.file.metadata.TsDeviceMetadataV1;
import org.apache.iotdb.tsfile.v1.file.metadata.TsFileMetadataV1;
@@ -329,13 +328,8 @@
return;
}
- // ChunkGroupOffset -> version
- Map<Long, Long> oldVersionInfo = getVersionInfo();
-
// start to scan chunks and chunkGroups
- long startOffsetOfChunkGroup = 0;
boolean newChunkGroup = true;
- long versionOfChunkGroup = 0;
int chunkGroupCount = 0;
List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
@@ -349,8 +343,6 @@
// this is the first chunk of a new ChunkGroup.
if (newChunkGroup) {
newChunkGroup = false;
- startOffsetOfChunkGroup = this.position() - 1;
- versionOfChunkGroup = oldVersionInfo.get(startOffsetOfChunkGroup);
}
ChunkHeader header = this.readChunkHeader();
MeasurementSchema measurementSchema = new MeasurementSchema(header.getMeasurementID(),
@@ -380,7 +372,7 @@
ChunkGroupFooter chunkGroupFooter = this.readChunkGroupFooter();
String deviceID = chunkGroupFooter.getDeviceID();
rewrite(oldTsFile, deviceID, measurementSchemaList, pageHeadersInChunkGroup,
- pageDataInChunkGroup, versionOfChunkGroup, pagePartitionInfoInChunkGroup);
+ pageDataInChunkGroup, pagePartitionInfoInChunkGroup);
pageHeadersInChunkGroup.clear();
pageDataInChunkGroup.clear();
@@ -422,7 +414,7 @@
*/
private void rewrite(File oldTsFile, String deviceId, List<MeasurementSchema> schemas,
List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup,
- long versionOfChunkGroup, List<List<Boolean>> pagePartitionInfoInChunkGroup)
+ List<List<Boolean>> pagePartitionInfoInChunkGroup)
throws IOException, PageException {
Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
for (int i = 0; i < schemas.size(); i++) {
@@ -453,7 +445,6 @@
chunkWriter.writeToFileWriter(tsFileIOWriter);
}
tsFileIOWriter.endChunkGroup();
- tsFileIOWriter.writeVersion(versionOfChunkGroup);
}
}
@@ -578,27 +569,6 @@
return true;
}
- private Map<Long, Long> getVersionInfo() throws IOException {
- Map<Long, Long> versionInfo = new HashMap<>();
- TsFileMetadataV1 fileMetadata = readFileMetadata();
- List<TsDeviceMetadataV1> oldDeviceMetadataList = new ArrayList<>();
- for (TsDeviceMetadataIndexV1 index : fileMetadata.getDeviceMap().values()) {
- TsDeviceMetadataV1 oldDeviceMetadata = readTsDeviceMetaData(index);
- oldDeviceMetadataList.add(oldDeviceMetadata);
- }
-
- for (TsDeviceMetadataV1 oldTsDeviceMetadata : oldDeviceMetadataList) {
- for (ChunkGroupMetaDataV1 oldChunkGroupMetadata : oldTsDeviceMetadata
- .getChunkGroupMetaDataList()) {
- long version = oldChunkGroupMetadata.getVersion();
- long offsetOfChunkGroup = oldChunkGroupMetadata.getStartOffsetOfChunkGroup();
- // get version informations
- versionInfo.put(offsetOfChunkGroup, version);
- }
- }
- return versionInfo;
- }
-
private TsFileResource endFileAndGenerateResource(TsFileIOWriter tsFileIOWriter)
throws IOException {
tsFileIOWriter.endFile();
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index b1f5101..64a8e7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.utils;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
@@ -199,4 +201,11 @@
return resultSet;
}
+ /** Returns the version field of a TsFile name, or 0 if the name does not split into 3 parts. */
+ public static long splitAndGetTsFileVersion(String tsFileName) {
+ String[] parts = tsFileName.split(FILE_NAME_SEPARATOR);
+ boolean hasThreeParts = parts.length == 3;
+ // the version is the second separator-delimited part of the name
+ return hasThreeParts ? Long.parseLong(parts[1]) : 0;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
index 6d38744..9265083 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -48,7 +48,10 @@
for (int metaIndex = 0; metaIndex < chunkMetaData.size(); metaIndex++) {
ChunkMetadata metaData = chunkMetaData.get(metaIndex);
for (Modification modification : modifications) {
- if (modification.getVersionNum() > metaData.getVersion()) {
+ // The case modification.getFileOffset() == metaData.getOffsetOfChunkHeader()
+ // is not expected to occur: getFileOffset() is an offset that covers whole chunks,
+ // while getOffsetOfChunkHeader() returns the offset of a chunk header.
+ if (modification.getFileOffset() > metaData.getOffsetOfChunkHeader()) {
doModifyChunkMetaData(modification, metaData);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 4be75cc..7ed64ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -28,7 +28,6 @@
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -59,7 +58,6 @@
private String logNodePrefix;
private String insertFilePath;
private ModificationFile modFile;
- private VersionController versionController;
private TsFileResource currentTsFileResource;
private IMemTable recoverMemTable;
@@ -70,12 +68,10 @@
private Map<String, Long> tempEndTimeMap = new HashMap<>();
public LogReplayer(String logNodePrefix, String insertFilePath, ModificationFile modFile,
- VersionController versionController, TsFileResource currentTsFileResource,
- IMemTable memTable, boolean sequence) {
+ TsFileResource currentTsFileResource, IMemTable memTable, boolean sequence) {
this.logNodePrefix = logNodePrefix;
this.insertFilePath = insertFilePath;
this.modFile = modFile;
- this.versionController = versionController;
this.currentTsFileResource = currentTsFileResource;
this.recoverMemTable = memTable;
this.sequence = sequence;
@@ -127,7 +123,7 @@
}
modFile
.write(
- new Deletion(path, versionController.nextVersion(), deletePlan.getDeleteStartTime(),
+ new Deletion(path, currentTsFileResource.getTsFileSize(), deletePlan.getDeleteStartTime(),
deletePlan.getDeleteEndTime()));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 8120d00..2a84fd9 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -32,7 +32,6 @@
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -56,18 +55,16 @@
private final String filePath;
private final String logNodePrefix;
- private final VersionController versionController;
private final TsFileResource tsFileResource;
private final boolean sequence;
/**
* @param isLastFile whether this TsFile is the last file of its partition
*/
- public TsFileRecoverPerformer(String logNodePrefix, VersionController versionController,
- TsFileResource currentTsFileResource, boolean sequence, boolean isLastFile) {
+ public TsFileRecoverPerformer(String logNodePrefix,
+ TsFileResource currentTsFileResource, boolean sequence, boolean isLastFile) {
this.filePath = currentTsFileResource.getTsFilePath();
this.logNodePrefix = logNodePrefix;
- this.versionController = versionController;
this.tsFileResource = currentTsFileResource;
this.sequence = sequence;
}
@@ -199,9 +196,9 @@
private void redoLogs(RestorableTsFileIOWriter restorableTsFileIOWriter)
throws StorageGroupProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
- recoverMemTable.setVersion(versionController.nextVersion());
- LogReplayer logReplayer = new LogReplayer(logNodePrefix, filePath, tsFileResource.getModFile(),
- versionController, tsFileResource, recoverMemTable, sequence);
+ LogReplayer logReplayer = new LogReplayer(
+ logNodePrefix, filePath, tsFileResource.getModFile(),
+ tsFileResource, recoverMemTable, sequence);
logReplayer.replayLogs();
try {
if (!recoverMemTable.isEmpty()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index b967fe1..68ffe10 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -87,7 +87,7 @@
}
ReadOnlyMemChunk memChunk = memTable
.query(deviceId, measurementId[0], TSDataType.INT32, TSEncoding.RLE, Collections.emptyMap(),
- Long.MIN_VALUE);
+ Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
for (int i = 0; i < dataSize; i++) {
iterator.hasNextTimeValuePair();
@@ -106,7 +106,7 @@
aRet.getValue().getValue());
}
IPointReader tvPair = memTable
- .query(deviceId, sensorId, dataType, encoding, Collections.emptyMap(), Long.MIN_VALUE)
+ .query(deviceId, sensorId, dataType, encoding, Collections.emptyMap(), Long.MIN_VALUE, null)
.getPointReader();
Arrays.sort(ret);
TimeValuePair last = null;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index fb73a27..bcbafc5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -218,7 +218,7 @@
public void mergeWithDeletionTest() throws Exception {
try {
PartialPath device = new PartialPath(deviceIds[0]);
- seqResources.get(0).getModFile().write(new Deletion(device.concatNode(measurementSchemas[0].getMeasurementId()), 10000, 0, 49));
+ seqResources.get(0).getModFile().write(new Deletion(device.concatNode(measurementSchemas[0].getMeasurementId()), seqResources.get(0).getTsFileSize(), 0, 49));
} finally {
seqResources.get(0).getModFile().close();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 139ac70..e296014 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -130,6 +130,7 @@
tsFileResource.setClosed(true);
tsFileResource.setMinPlanIndex(i);
tsFileResource.setMaxPlanIndex(i);
+ tsFileResource.setVersion(i);
seqResources.add(tsFileResource);
prepareFile(tsFileResource, i * ptNum, ptNum, 0);
}
@@ -143,6 +144,7 @@
tsFileResource.setClosed(true);
tsFileResource.setMinPlanIndex(i + seqFileNum);
tsFileResource.setMaxPlanIndex(i + seqFileNum);
+ tsFileResource.setVersion(i + seqFileNum);
unseqResources.add(tsFileResource);
prepareFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 10000);
}
@@ -155,6 +157,7 @@
tsFileResource.setClosed(true);
tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum);
+ tsFileResource.setVersion(seqFileNum + unseqFileNum);
unseqResources.add(tsFileResource);
prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index d819780..5ea165e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -27,12 +27,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -88,7 +86,7 @@
public void testWriteAndFlush() throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndFlush begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
- SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
+ this::closeTsFileProcessor,
(tsFileProcessor) -> true, true, INIT_ARRAY_SIZE);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -144,7 +142,7 @@
throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
- SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
+ this::closeTsFileProcessor,
(tsFileProcessor) -> true, true, INIT_ARRAY_SIZE);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -227,7 +225,7 @@
public void testMultiFlush() throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
- SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
+ this::closeTsFileProcessor,
(tsFileProcessor) -> true, true, INIT_ARRAY_SIZE);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -268,7 +266,7 @@
public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
- SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
+ this::closeTsFileProcessor,
(tsFileProcessor) -> true, true, INIT_ARRAY_SIZE);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
index f62170c..2ecc041 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
@@ -29,6 +29,7 @@
import java.sql.Statement;
import java.util.Locale;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -280,6 +281,45 @@
}
@Test
+ public void testDelMultipleFlushingMemtable() throws SQLException {
+ long size = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
+ // Adjust memtable threshold size to make it flush automatically
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1000000);
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+ "root");
+ Statement statement = connection.createStatement()) {
+
+ for (int i = 1; i <= 100000; i++) {
+ statement.execute(
+ String.format(insertTemplate, i, i, i, (double) i, "'" + i + "'",
+ i % 2 == 0));
+ }
+
+ statement.execute("DELETE FROM root.vehicle.d0.s0 WHERE time > 15000 and time <= 30000");
+ statement.execute("DELETE FROM root.vehicle.d0.s0 WHERE time > 30000 and time <= 40000");
+ for (int i = 100001; i <= 200000; i++) {
+ statement.execute(
+ String.format(insertTemplate, i, i, i, (double) i, "'" + i + "'",
+ i % 2 == 0));
+ }
+ statement.execute("DELETE FROM root.vehicle.d0.s0 WHERE time > 50000 and time <= 80000");
+ statement.execute("DELETE FROM root.vehicle.d0.s0 WHERE time > 90000 and time <= 110000");
+ statement.execute("DELETE FROM root.vehicle.d0.s0 WHERE time > 150000 and time <= 165000");
+ statement.execute("flush");
+ try (ResultSet set = statement.executeQuery("SELECT s0 FROM root.vehicle.d0")) {
+ int cnt = 0;
+ while (set.next()) {
+ cnt++;
+ }
+ assertEquals(110000, cnt);
+ }
+ cleanData();
+ }
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(size);
+ }
+
+ @Test
public void testDelSeriesWithSpecialSymbol() throws SQLException {
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 3997f94..6bff6c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -93,6 +93,7 @@
tsFileResource.setClosed(true);
tsFileResource.setMinPlanIndex(i);
tsFileResource.setMaxPlanIndex(i);
+ tsFileResource.setVersion(i);
seqResources.add(tsFileResource);
prepareFile(tsFileResource, i * ptNum, ptNum, 0, measurementSchemas, deviceIds);
}
@@ -106,6 +107,7 @@
tsFileResource.setClosed(true);
tsFileResource.setMinPlanIndex(i + seqFileNum);
tsFileResource.setMaxPlanIndex(i + seqFileNum);
+ tsFileResource.setVersion(i + seqFileNum);
unseqResources.add(tsFileResource);
prepareFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 10000,
measurementSchemas, deviceIds);
@@ -119,6 +121,7 @@
tsFileResource.setClosed(true);
tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum);
tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum);
+ tsFileResource.setVersion(seqFileNum + unseqFileNum);
unseqResources.add(tsFileResource);
prepareFile(tsFileResource, 0, ptNum * 2, 20000, measurementSchemas,
deviceIds);
@@ -149,7 +152,6 @@
}
if ((i + 1) % flushInterval == 0) {
fileWriter.flushAllChunkGroups();
- fileWriter.writeVersion(tsFileResource.getMaxPlanIndex());
}
}
fileWriter.close();
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/VersionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/utils/VersionUtilsTest.java
deleted file mode 100644
index bdfdf46..0000000
--- a/server/src/test/java/org/apache/iotdb/db/utils/VersionUtilsTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.utils.VersionUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class VersionUtilsTest {
-
- @Test
- public void uncompleteFileTest() {
- List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
- chunkMetadataList.add(new ChunkMetadata("s1", TSDataType.INT32, 10, null));
- chunkMetadataList.add(new ChunkMetadata("s1", TSDataType.INT32, 20, null));
- chunkMetadataList.add(new ChunkMetadata("s1", TSDataType.INT32, 30, null));
- chunkMetadataList.add(new ChunkMetadata("s1", TSDataType.INT32, 40, null));
- chunkMetadataList.add(new ChunkMetadata("s1", TSDataType.INT32, 50, null));
-
- List<Pair<Long, Long>> versionInfo = new ArrayList<>();
- versionInfo.add(new Pair<>(25L, 1L));
- versionInfo.add(new Pair<>(45L, 2L));
-
- VersionUtils.applyVersion(chunkMetadataList, versionInfo);
-
- Assert.assertEquals(1L, chunkMetadataList.get(0).getVersion());
- Assert.assertEquals(1L, chunkMetadataList.get(1).getVersion());
- Assert.assertEquals(2L, chunkMetadataList.get(2).getVersion());
- Assert.assertEquals(2L, chunkMetadataList.get(3).getVersion());
- Assert.assertEquals(0L, chunkMetadataList.get(4).getVersion());
- }
-
-
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index f7a5c63..9d81855 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -80,17 +80,6 @@
File tsFile = SystemFileFactory.INSTANCE.getFile("temp", "1-1-1.tsfile");
File modF = SystemFileFactory.INSTANCE.getFile("test.mod");
ModificationFile modFile = new ModificationFile(modF.getPath());
- VersionController versionController = new VersionController() {
- @Override
- public long nextVersion() {
- return 5;
- }
-
- @Override
- public long currVersion() {
- return 5;
- }
- };
TsFileResource tsFileResource = new TsFileResource(tsFile);
IMemTable memTable = new PrimitiveMemTable();
@@ -106,7 +95,7 @@
}
LogReplayer replayer = new LogReplayer(logNodePrefix, tsFile.getPath(), modFile,
- versionController, tsFileResource, memTable, false);
+ tsFileResource, memTable, false);
WriteLogNode node =
MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
@@ -129,7 +118,7 @@
for (int i = 0; i < 5; i++) {
ReadOnlyMemChunk memChunk = memTable
.query("root.sg.device" + i, "sensor" + i, TSDataType.INT64,
- TSEncoding.RLE, Collections.emptyMap(), Long.MIN_VALUE);
+ TSEncoding.RLE, Collections.emptyMap(), Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
if (i == 0) {
assertFalse(iterator.hasNextTimeValuePair());
@@ -145,7 +134,7 @@
Modification[] mods = modFile.getModifications().toArray(new Modification[0]);
assertEquals(1, mods.length);
assertEquals("root.sg.device0.sensor0", mods[0].getPathString());
- assertEquals(5, mods[0].getVersionNum());
+ assertEquals(0, mods[0].getFileOffset());
assertEquals(200, ((Deletion) mods[0]).getEndTime());
assertEquals(2, tsFileResource.getStartTime("root.sg.device0"));
@@ -159,7 +148,7 @@
for (int i = 0; i < 2 ; i++) {
ReadOnlyMemChunk memChunk = memTable
.query("root.sg.device5", "sensor" + i, TSDataType.INT64,
- TSEncoding.PLAIN, Collections.emptyMap(), Long.MIN_VALUE);
+ TSEncoding.PLAIN, Collections.emptyMap(), Long.MIN_VALUE, null);
//s0 has datatype boolean, but required INT64, will return null
if (i == 0) {
assertNull(memChunk);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
index 94d4feb..f093cc7 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/RecoverResourceFromReaderTest.java
@@ -181,7 +181,7 @@
ReadWriteIOUtils.write(123, outputStream);
}
- TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
+ TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
resource, false, false);
performer.recover(true).close();
assertEquals(1, resource.getStartTime("root.sg.device99"));
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index f4ff7cf..9627d1d 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -169,7 +169,7 @@
@Test
public void testNonLastRecovery() throws StorageGroupProcessorException, IOException {
- TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
+ TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
resource, false, false);
RestorableTsFileIOWriter writer = performer.recover(true);
assertFalse(writer.canWrite());
@@ -218,7 +218,7 @@
@Test
public void testLastRecovery() throws StorageGroupProcessorException, IOException {
- TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix, versionController,
+ TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
resource, false, true);
RestorableTsFileIOWriter writer = performer.recover(true);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 8caa93a..6e1351b 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -179,7 +179,7 @@
@Test
public void test() throws StorageGroupProcessorException, IOException {
TsFileRecoverPerformer performer = new TsFileRecoverPerformer(logNodePrefix,
- versionController, resource, false, false);
+ resource, false, false);
performer.recover(true).close();
assertEquals(1, resource.getStartTime("root.sg.device99"));
@@ -201,8 +201,7 @@
Chunk chunk = chunkLoader.loadChunk(chunkMetaData);
ChunkReader chunkReader = new ChunkReader(chunk, null);
unSeqMergeReader
- .addReader(new ChunkDataIterator(chunkReader), priorityValue);
- priorityValue++;
+ .addReader(new ChunkDataIterator(chunkReader), priorityValue++);
}
for (int i = 0; i < 10; i++) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
index f2313f4..3378c7f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
@@ -34,14 +34,9 @@
*/
class AutoResizingBuffer {
- // if resizeIfNecessary is called continuously with a small size for more than
- // MAX_BUFFER_OVERSIZE_TIME times, we will shrink the buffer to reclaim space
- private static final int MAX_BUFFER_OVERSIZE_TIME = 5;
- private static final long MIN_SHRINK_INTERVAL = 60_000L;
-
private byte[] array;
- private int bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
- private int initialCapacity;
+ private int bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
+ private final int initialCapacity;
private long lastShrinkTime;
@@ -60,15 +55,15 @@
int growCapacity = currentCapacity + (currentCapacity >> 1);
int newCapacity = Math.max(growCapacity, size);
this.array = Arrays.copyOf(array, newCapacity);
- bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+ bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
logger
.debug("{} expand from {} to {}, request: {}", this, currentCapacity, newCapacity, size);
} else if (size > initialCapacity && currentCapacity * loadFactor > size
&& bufTooLargeCounter-- <= 0
- && System.currentTimeMillis() - lastShrinkTime > MIN_SHRINK_INTERVAL) {
+ && System.currentTimeMillis() - lastShrinkTime > RpcUtils.MIN_SHRINK_INTERVAL) {
// do not resize if it is reading the request size and do not shrink too often
array = Arrays.copyOf(array, size + (currentCapacity - size) / 2);
- bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+ bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
lastShrinkTime = System.currentTimeMillis();
logger.debug("{} shrink from {} to {}", this, currentCapacity, size);
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
index 7566906..950c52d 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
@@ -76,4 +76,13 @@
public void resizeIfNecessary(int size) {
buf.resizeIfNecessary(size);
}
+
+ public void limit(int newLimit) {
+ this.limit = newLimit;
+ }
+
+ public void position(int newPosition) {
+ this.pos = newPosition;
+ }
+
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
index 080d1f5..35cb203 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
@@ -57,7 +57,8 @@
buf.resizeIfNecessary(size);
}
- public AutoResizingBuffer getBuf() {
- return buf;
+ @Override
+ public byte[] getBuffer() {
+ return buf.array();
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 9192de6..4afccd6 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -51,6 +51,13 @@
*/
public static final int FRAME_HARD_MAX_LENGTH = 536870912;
+ /**
+ * if resizeIfNecessary is called continuously with a small size for more than
+ * MAX_BUFFER_OVERSIZE_TIME times, we will shrink the buffer to reclaim space.
+ */
+ public static final int MAX_BUFFER_OVERSIZE_TIME = 5;
+ public static final long MIN_SHRINK_INTERVAL = 60_000L;
+
private RpcUtils() {
// util class
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 4dd8cb9..cca104f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -19,27 +19,20 @@
package org.apache.iotdb.rpc;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.thrift.transport.TByteBuffer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public abstract class TCompressedElasticFramedTransport extends TElasticFramedTransport {
- private TByteBuffer writeCompressBuffer;
- private TByteBuffer readCompressBuffer;
-
- private static final long MIN_SHRINK_INTERVAL = 60_000L;
- private static final int MAX_BUFFER_OVERSIZE_TIME = 5;
- private long lastShrinkTime;
- private int bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+ private AutoScalingBufferWriteTransport writeCompressBuffer;
+ private AutoScalingBufferReadTransport readCompressBuffer;
protected TCompressedElasticFramedTransport(TTransport underlying, int initialBufferCapacity,
int softMaxLength) {
super(underlying, initialBufferCapacity, softMaxLength);
- writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
- readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
+ writeCompressBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
+ readCompressBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
}
@Override
@@ -58,59 +51,29 @@
try {
int uncompressedLength = uncompressedLength(readBuffer.getBuffer(), 0, size);
RpcStat.readBytes.addAndGet(uncompressedLength);
- readCompressBuffer = resizeCompressBuf(uncompressedLength, readCompressBuffer);
- uncompress(readBuffer.getBuffer(), 0, size, readCompressBuffer.getByteBuffer().array(), 0);
- readCompressBuffer.getByteBuffer().limit(uncompressedLength);
- readCompressBuffer.getByteBuffer().position(0);
-
+ readCompressBuffer.resizeIfNecessary(uncompressedLength);
+ uncompress(readBuffer.getBuffer(), 0, size, readCompressBuffer.getBuffer(), 0);
+ readCompressBuffer.limit(uncompressedLength);
+ readCompressBuffer.position(0);
readBuffer.fill(readCompressBuffer, uncompressedLength);
} catch (IOException e) {
throw new TTransportException(e);
}
}
- private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer)
- throws TTransportException {
- if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
- close();
- throw new TTransportException(TTransportException.CORRUPTED_DATA,
- "Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
- + ")!");
- }
-
- final int currentCapacity = byteBuffer.getByteBuffer().capacity();
- final double loadFactor = 0.6;
- if (currentCapacity < size) {
- // Increase by a factor of 1.5x
- int growCapacity = currentCapacity + (currentCapacity >> 1);
- int newCapacity = Math.max(growCapacity, size);
- byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCapacity));
- bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
- } else if (currentCapacity > softMaxLength && currentCapacity * loadFactor > size
- && bufTooLargeCounter-- <= 0
- && System.currentTimeMillis() - lastShrinkTime > MIN_SHRINK_INTERVAL) {
- // do not shrink beneath the initial size and do not shrink too often
- byteBuffer = new TByteBuffer(ByteBuffer.allocate(size + (currentCapacity - size) / 2));
- lastShrinkTime = System.currentTimeMillis();
- bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
- }
- return byteBuffer;
- }
-
@Override
public void flush() throws TTransportException {
int length = writeBuffer.getPos();
RpcStat.writeBytes.addAndGet(length);
try {
int maxCompressedLength = maxCompressedLength(length);
- writeCompressBuffer = resizeCompressBuf(maxCompressedLength, writeCompressBuffer);
- int compressedLength = compress(writeBuffer.getBuf().array(), 0, length,
- writeCompressBuffer.getByteBuffer().array(), 0);
+ writeCompressBuffer.resizeIfNecessary(maxCompressedLength);
+ int compressedLength = compress(writeBuffer.getBuffer(), 0, length,
+ writeCompressBuffer.getBuffer(), 0);
RpcStat.writeCompressedBytes.addAndGet(compressedLength);
TFramedTransport.encodeFrameSize(compressedLength, i32buf);
underlying.write(i32buf, 0, 4);
-
- underlying.write(writeCompressBuffer.getByteBuffer().array(), 0, compressedLength);
+ underlying.write(writeCompressBuffer.getBuffer(), 0, compressedLength);
} catch (IOException e) {
throw new TTransportException(e);
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 87ca996..5c31b04 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -56,7 +56,8 @@
this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
}
- public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int softMaxLength) {
+ public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity,
+ int softMaxLength) {
this.underlying = underlying;
this.softMaxLength = softMaxLength;
readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
@@ -66,9 +67,9 @@
/**
* The capacity of the underlying buffer is allowed to exceed maxSoftLength, but if adjacent
* requests all have sizes smaller than maxSoftLength, the underlying buffer will be shrunk
- * beneath maxSoftLength.
- * The shrinking is limited at most once per minute to reduce overhead when maxSoftLength is
- * set unreasonably or the workload naturally contains both ver large and very small requests.
+ * beneath maxSoftLength. The shrinking is limited at most once per minute to reduce overhead when
+ * maxSoftLength is set unreasonably or the workload naturally contains both very large and very
+ * small requests.
*/
protected final int softMaxLength;
protected final TTransport underlying;
@@ -116,7 +117,8 @@
if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA,
- "Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+ "Frame size (" + size + ") larger than protect max length ("
+ + RpcUtils.FRAME_HARD_MAX_LENGTH
+ ")!");
}
@@ -131,7 +133,7 @@
int length = writeBuffer.getPos();
TFramedTransport.encodeFrameSize(length, i32buf);
underlying.write(i32buf, 0, 4);
- underlying.write(writeBuffer.getBuf().array(), 0, length);
+ underlying.write(writeBuffer.getBuffer(), 0, length);
writeBuffer.reset();
if (length > softMaxLength) {
writeBuffer.resizeIfNecessary(softMaxLength);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index b34e979..89f8248 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -33,10 +33,12 @@
this.underlyingSocket = underlying;
}
+ @Override
public void setTimeout(int timeout) {
underlyingSocket.setTimeout(timeout);
}
+ @Override
public int getTimeOut() throws SocketException {
return underlyingSocket.getSocket().getSoTimeout();
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index 5b3ea59..9dc9dc1 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -33,10 +33,12 @@
this.underlyingSocket = underlying;
}
+ @Override
public void setTimeout(int timeout) {
underlyingSocket.setTimeout(timeout);
}
+ @Override
public int getTimeOut() throws SocketException {
return underlyingSocket.getSocket().getSoTimeout();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
index 6098284..5a039b7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadata.java
@@ -22,13 +22,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Set;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BloomFilter;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
/**
@@ -49,9 +46,6 @@
// List of <name, offset, childMetadataIndexType>
private MetadataIndexNode metadataIndex;
- // offset -> version
- private List<Pair<Long, Long>> versionInfo;
-
// offset of MetaMarker.SEPARATOR
private long metaOffset;
@@ -69,16 +63,6 @@
fileMetaData.totalChunkNum = ReadWriteIOUtils.readInt(buffer);
fileMetaData.invalidChunkNum = ReadWriteIOUtils.readInt(buffer);
- // versionInfo
- List<Pair<Long, Long>> versionInfo = new ArrayList<>();
- int versionSize = ReadWriteIOUtils.readInt(buffer);
- for (int i = 0; i < versionSize; i++) {
- long versionPos = ReadWriteIOUtils.readLong(buffer);
- long version = ReadWriteIOUtils.readLong(buffer);
- versionInfo.add(new Pair<>(versionPos, version));
- }
- fileMetaData.setVersionInfo(versionInfo);
-
// metaOffset
long metaOffset = ReadWriteIOUtils.readLong(buffer);
fileMetaData.setMetaOffset(metaOffset);
@@ -118,13 +102,6 @@
byteLen += ReadWriteIOUtils.write(totalChunkNum, outputStream);
byteLen += ReadWriteIOUtils.write(invalidChunkNum, outputStream);
- // versionInfo
- byteLen += ReadWriteIOUtils.write(versionInfo.size(), outputStream);
- for (Pair<Long, Long> versionPair : versionInfo) {
- byteLen += ReadWriteIOUtils.write(versionPair.left, outputStream);
- byteLen += ReadWriteIOUtils.write(versionPair.right, outputStream);
- }
-
// metaOffset
byteLen += ReadWriteIOUtils.write(metaOffset, outputStream);
@@ -197,12 +174,4 @@
public void setMetadataIndex(MetadataIndexNode metadataIndex) {
this.metadataIndex = metadataIndex;
}
-
- public void setVersionInfo(List<Pair<Long, Long>> versionInfo) {
- this.versionInfo = versionInfo;
- }
-
- public List<Pair<Long, Long>> getVersionInfo() {
- return versionInfo;
- }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index e1c0e03..d13c631 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -29,7 +29,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -61,7 +60,6 @@
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.utils.VersionUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -591,11 +589,6 @@
.add(chunkMetadata);
}
- // set version in ChunkMetadata
- List<Pair<Long, Long>> versionInfo = tsFileMetaData.getVersionInfo();
- for (Entry<String, List<ChunkMetadata>> entry : seriesMetadata.entrySet()) {
- VersionUtils.applyVersion(entry.getValue(), versionInfo);
- }
return seriesMetadata;
}
@@ -977,7 +970,6 @@
*
* @param newSchema the schema on each time series in the file
* @param chunkGroupMetadataList ChunkGroupMetadata List
- * @param versionInfo version pair List
* @param fastFinish if true and the file is complete, then newSchema and
* chunkGroupMetadataList parameter will be not modified.
* @return the position of the file that is fine. All data after the position in the file should
@@ -986,7 +978,6 @@
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public long selfCheck(Map<Path, MeasurementSchema> newSchema,
List<ChunkGroupMetadata> chunkGroupMetadataList,
- List<Pair<Long, Long>> versionInfo,
boolean fastFinish) throws IOException {
File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
long fileSize;
@@ -1081,8 +1072,6 @@
measurementSchemaList = new ArrayList<>();
break;
case MetaMarker.VERSION:
- long version = readVersion();
- versionInfo.add(new Pair<>(position(), version));
truncatedSize = this.position();
break;
case MetaMarker.OPERATION_INDEX_RANGE:
@@ -1134,7 +1123,6 @@
throws IOException {
try {
readFileMetadata();
- List<Pair<Long, Long>> versionInfo = tsFileMetaData.getVersionInfo();
ArrayList<ChunkMetadata> chunkMetadataList = new ArrayList<>();
long startOffsetOfChunkMetadataList = timeseriesMetaData.getOffsetOfChunkMetaDataList();
int dataSizeOfChunkMetadataList = timeseriesMetaData.getDataSizeOfChunkMetaDataList();
@@ -1144,8 +1132,6 @@
chunkMetadataList.add(ChunkMetadata.deserializeFrom(buffer));
}
- VersionUtils.applyVersion(chunkMetadataList, versionInfo);
-
// minimize the storage of an ArrayList instance.
chunkMetadataList.trimToSize();
return chunkMetadataList;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/VersionUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/VersionUtils.java
deleted file mode 100644
index 1eaaafc..0000000
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/VersionUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.tsfile.utils;
-
-import java.util.List;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-
-public class VersionUtils {
-
- private VersionUtils() {
- throw new IllegalStateException("Utility class");
- }
-
- public static void applyVersion(List<ChunkMetadata> chunkMetadataList, List<Pair<Long, Long>> versionInfo) {
- if (versionInfo == null || versionInfo.isEmpty()) {
- return;
- }
- int versionIndex = 0;
- for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-
- while (chunkMetadata.getOffsetOfChunkHeader() >= versionInfo.get(versionIndex).left) {
- versionIndex++;
- // When the TsFile is uncompleted,
- // skip the chunkMetadatas those don't have their version information
- if (versionIndex >= versionInfo.size()) {
- return;
- }
- }
-
- chunkMetadata.setVersion(versionInfo.get(versionIndex).right);
- }
- }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index daa9f65..1986758 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -354,7 +354,6 @@
public void close() throws IOException {
LOG.info("start close file");
flushAllChunkGroups();
- fileWriter.setDefaultVersionPair();
fileWriter.endFile();
}
@@ -366,8 +365,4 @@
public TsFileIOWriter getIOWriter() {
return this.fileWriter;
}
-
- public void writeVersion(long versionPair) throws IOException {
- fileWriter.writeVersion(versionPair);
- }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
index 9dfccf8..d635034 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/ForceAppendTsFileWriter.java
@@ -65,7 +65,6 @@
truncatePosition = tsFileMetadata.getMetaOffset();
canWrite = true;
- versionInfo = tsFileMetadata.getVersionInfo();
totalChunkNum = tsFileMetadata.getTotalChunkNum();
invalidChunkNum = tsFileMetadata.getInvalidChunkNum();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index d230984..5821770 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -36,7 +36,6 @@
import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.VersionUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,7 +92,7 @@
if (file.exists()) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, versionInfo, true);
+ truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
minPlanIndex = reader.getMinPlanIndex();
maxPlanIndex = reader.getMaxPlanIndex();
totalChunkNum = reader.getTotalChunkNum();
@@ -175,7 +174,6 @@
// all the stale data.
if (dataType == null || dataType.equals(chunkMetaData.getDataType())) {
chunkMetadataList.add(chunkMetaData);
- VersionUtils.applyVersion(chunkMetadataList, versionInfo);
}
}
}
@@ -197,8 +195,6 @@
for (ChunkGroupMetadata chunkGroupMetadata : newlyFlushedMetadataList) {
List<ChunkMetadata> rowMetaDataList = chunkGroupMetadata.getChunkMetadataList();
- VersionUtils.applyVersion(rowMetaDataList, versionInfo);
-
String device = chunkGroupMetadata.getDevice();
for (ChunkMetadata chunkMetaData : rowMetaDataList) {
String measurementId = chunkMetaData.getMeasurementUid();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 0275909..3faa377 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -46,10 +46,8 @@
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import org.apache.iotdb.tsfile.utils.VersionUtils;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,8 +84,7 @@
private long markedPosition;
private String currentChunkGroupDeviceId;
private long currentChunkGroupStartOffset;
- protected List<Pair<Long, Long>> versionInfo = new ArrayList<>();
-
+
// for upgrade tool
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap;
@@ -243,7 +240,6 @@
MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
TsFileMetadata tsFileMetaData = new TsFileMetadata();
tsFileMetaData.setMetadataIndex(metadataIndex);
- tsFileMetaData.setVersionInfo(versionInfo);
tsFileMetaData.setTotalChunkNum(totalChunkNum);
tsFileMetaData.setInvalidChunkNum(invalidChunkNum);
tsFileMetaData.setMetaOffset(metaOffset);
@@ -333,7 +329,6 @@
Map<String, List<ChunkMetadata>> deviceChunkMetadataMap = new HashMap<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
- VersionUtils.applyVersion(chunkGroupMetadata.getChunkMetadataList(), versionInfo);
deviceChunkMetadataMap.computeIfAbsent(chunkGroupMetadata.getDevice(), k -> new ArrayList<>())
.addAll(chunkGroupMetadata.getChunkMetadataList());
}
@@ -421,28 +416,12 @@
}
}
- /**
- * write MetaMarker.VERSION with version Then, cache offset-version in versionInfo
- */
- public void writeVersion(long version) throws IOException {
- ReadWriteIOUtils.write(MetaMarker.VERSION, out.wrapAsStream());
- ReadWriteIOUtils.write(version, out.wrapAsStream());
- versionInfo.add(new Pair<>(getPos(), version));
- }
-
public void writePlanIndices() throws IOException {
ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream());
ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream());
ReadWriteIOUtils.write(maxPlanIndex, out.wrapAsStream());
}
- public void setDefaultVersionPair() {
- // only happen when using tsfile module write api
- if (versionInfo.isEmpty()) {
- versionInfo.add(new Pair<>(Long.MAX_VALUE, 0L));
- }
- }
-
/**
* this function is only for Test.
*
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
index 89c414d..a952de7 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/file/metadata/utils/TestHelper.java
@@ -38,7 +38,6 @@
public static TsFileMetadata createSimpleFileMetaData() {
TsFileMetadata metaData = new TsFileMetadata();
metaData.setMetadataIndex(generateMetaDataIndex());
- metaData.setVersionInfo(generateVersionInfo());
return metaData;
}
@@ -50,14 +49,6 @@
return metaDataIndex;
}
- private static List<Pair<Long, Long>> generateVersionInfo() {
- List<Pair<Long, Long>> versionInfo = new ArrayList<>();
- for (int i = 0; i < 5; i++) {
- versionInfo.add(new Pair<>((long) i * 5, 0L));
- }
- return versionInfo;
- }
-
public static MeasurementSchema createSimpleMeasurementSchema(String measurementuid) {
return new MeasurementSchema(measurementuid, TSDataType.INT64, TSEncoding.RLE);
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index c6050dd..3f81d58 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -65,7 +65,6 @@
writer.endCurrentChunk();
writer.endChunkGroup();
- writer.writeVersion(0L);
writer.setMinPlanIndex(100);
writer.setMaxPlanIndex(10000);
writer.writePlanIndices();
@@ -102,11 +101,6 @@
ChunkGroupFooter footer = reader.readChunkGroupFooter();
Assert.assertEquals(deviceId, footer.getDeviceID());
- // separator
- Assert.assertEquals(MetaMarker.VERSION, reader.readMarker());
-
- reader.readVersion();
-
Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker());
reader.readPlanIndex();
Assert.assertEquals(100, reader.getMinPlanIndex());
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java
index fd2393f..5c68d89 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileWriterTest.java
@@ -238,7 +238,6 @@
public void flushForTestWithVersion() throws IOException {
//The interface is just for test
writer.flushAllChunkGroups();
- writer.writeVersion(10L);
closeFile();
readNothing();
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
index 87b88fc..9daf498 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriterTest.java
@@ -155,16 +155,23 @@
writer.write(new TSRecord(2, "d1").addTuple(new FloatDataPoint("s1", 5))
.addTuple(new FloatDataPoint("s2", 4)));
writer.flushAllChunkGroups();
- writer.writeVersion(0);
- long pos = writer.getIOWriter().getPos();
+ long pos1 = writer.getIOWriter().getPos();
+ writer.registerTimeseries(new Path("d2", "s1"),
+ new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
+ writer.registerTimeseries(new Path("d2", "s2"),
+ new MeasurementSchema("s2", TSDataType.FLOAT, TSEncoding.RLE));
+ writer.write(new TSRecord(1, "d1").addTuple(new FloatDataPoint("s1", 5))
+ .addTuple(new FloatDataPoint("s2", 4)));
+ writer.flushAllChunkGroups();
+ long pos2 = writer.getIOWriter().getPos();
- // let's delete one byte. the version is broken
+ // delete the last byte, so the data flushed after pos1 is left incomplete
- writer.getIOWriter().out.truncate(pos - 1);
+ writer.getIOWriter().out.truncate(pos2 - 1);
writer.getIOWriter().close();
RestorableTsFileIOWriter rWriter = new RestorableTsFileIOWriter(file);
writer = new TsFileWriter(rWriter);
writer.close();
- // truncate version marker and version
+ // the incomplete data after pos1 is discarded, so the file is truncated back to pos1
- assertEquals(pos - 1 - Long.BYTES, rWriter.getTruncatedSize());
+ assertEquals(pos1, rWriter.getTruncatedSize());
assertTrue(file.delete());
}