update
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java
index ff074f5..44f29f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java
@@ -266,30 +266,30 @@
currentChunkList.add(chunkSuit4CPV);
itr.remove();
} else {
+ // TODO modify logic
// the chunk partially overlaps in time with the current M4 interval Ii.
// load this chunk, split it on deletes and all w intervals.
// add to currentChunkList and futureChunkList.
itr.remove();
- // B: loads chunk data from disk to memory
- // C: decompress page data, split time&value buffers
- PageReader pageReader =
- FileLoaderUtils.loadPageReaderList4CPV(
- chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
- // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
- // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
- // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
- // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN
- // DIRECTLY), WHICH WILL INTRODUCE BUGS!
-
- // chunk data read operation (b) get the closest data point after or before a timestamp
- pageReader.split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- currentChunkList,
- splitChunkList,
- chunkMetadata);
+ int numberOfSpans =
+ (int)
+ Math.floor(
+ (Math.min(chunkMetadata.getEndTime(), endTime - 1) - curStartTime)
+ * 1.0
+ / interval)
+ + 1;
+ for (int n = 0; n < numberOfSpans; n++) {
+ ChunkSuit4CPV newChunkSuit4CPV = new ChunkSuit4CPV(chunkMetadata, null, true);
+ newChunkSuit4CPV.needsUpdateStartEndPos =
+                true; // note this: the chunk does not fall within a single time span, but crosses spans
+ if (n == 0) {
+ currentChunkList.add(newChunkSuit4CPV);
+ } else {
+                int globalIdx = curIdx + n; // note: splitChunkList is keyed by the global span index
+ splitChunkList.computeIfAbsent(globalIdx, k -> new ArrayList<>());
+ splitChunkList.get(globalIdx).add(newChunkSuit4CPV);
+ }
+ }
}
}
}
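
The added loop above replicates a partially-overlapping chunk once per time span it reaches, instead of eagerly loading and splitting its data. Below is a minimal standalone sketch of that span-counting arithmetic with illustrative numbers; the class and variable values here are assumptions for the example only, while the real code operates on ChunkSuit4CPV, currentChunkList and splitChunkList as in the hunk above.

// SpanSplitSketch.java -- illustrative only; the concrete values are assumptions, not taken from the PR.
public class SpanSplitSketch {
  public static void main(String[] args) {
    long curStartTime = 0;  // left edge (included) of the current time span
    long interval = 10;     // width of one time span
    long endTime = 100;     // right edge (excluded) of the whole query range
    long chunkEndTime = 25; // the chunk starts inside the current span and ends here
    int curIdx = 0;         // global index of the current span

    // Same formula as the hunk: clamp the chunk end to endTime - 1, measure its
    // distance from curStartTime, and count how many spans the chunk touches.
    int numberOfSpans =
        (int) Math.floor((Math.min(chunkEndTime, endTime - 1) - curStartTime) * 1.0 / interval) + 1;

    for (int n = 0; n < numberOfSpans; n++) {
      if (n == 0) {
        System.out.println("replica for span " + curIdx + " -> currentChunkList");
      } else {
        // later replicas go to splitChunkList under the global span index curIdx + n
        System.out.println("replica for span " + (curIdx + n) + " -> splitChunkList");
      }
    }
    // With these numbers the chunk covers spans [0,10), [10,20), [20,30): 3 replicas.
  }
}
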
@@ -396,6 +396,30 @@
.getPageReader()
.setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList());
}
+ // TODO add logic
+ if (chunkSuit4CPV
+              .needsUpdateStartEndPos) { // otherwise the chunk falls completely within the current
+ // time span
+ chunkSuit4CPV.needsUpdateStartEndPos = false;
+            // update startPos & endPos by applying the current time span bounds as virtual deletes
+ long leftEndIncluded = curStartTime;
+ long rightEndExcluded = curStartTime + interval;
+ int FP_pos = -1;
+ int LP_pos = -1;
+ if (leftEndIncluded > chunkSuit4CPV.statistics.getStartTime()) {
+ FP_pos = chunkSuit4CPV.updateFPwithTheClosetPointEqualOrAfter(leftEndIncluded);
+ }
+ if (rightEndExcluded <= chunkSuit4CPV.statistics.getEndTime()) {
+              // -1 because the right end is excluded
+ LP_pos =
+ chunkSuit4CPV.updateLPwithTheClosetPointEqualOrBefore(rightEndExcluded - 1);
+ }
+ if (FP_pos != -1 && LP_pos != -1 && FP_pos > LP_pos) {
+ // means the chunk has no point in this span
+ currentChunkList.remove(chunkSuit4CPV);
+ }
+ }
+
// chunk data read operation (c)
// chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV);
chunkSuit4CPV.getPageReader().updateBP_withValueIndex(chunkSuit4CPV);
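
The flag-guarded block above clamps the chunk's first/last point positions to the current time span and drops the chunk when no point falls inside it. Below is a minimal standalone sketch of that clamping; the timestamp array, span bounds, and linear scans are assumptions for illustration only, while the real code resolves FP_pos/LP_pos through ChunkSuit4CPV's value-index methods.

// SpanClampSketch.java -- illustrative only; values and the scan are assumptions, not the PR's implementation.
public class SpanClampSketch {
  public static void main(String[] args) {
    long[] timestamps = {3, 7, 12, 18, 25}; // sorted point times of one chunk
    long curStartTime = 30;                 // current span is [30, 40)
    long interval = 10;

    long leftEndIncluded = curStartTime;
    long rightEndExcluded = curStartTime + interval;

    // first position with time >= leftEndIncluded (analogue of FP_pos)
    int fpPos = 0;
    while (fpPos < timestamps.length && timestamps[fpPos] < leftEndIncluded) {
      fpPos++;
    }
    // last position with time <= rightEndExcluded - 1 (analogue of LP_pos)
    int lpPos = timestamps.length - 1;
    while (lpPos >= 0 && timestamps[lpPos] > rightEndExcluded - 1) {
      lpPos--;
    }

    if (fpPos > lpPos) {
      // mirrors the diff: the chunk contributes nothing to this span and is removed from currentChunkList
      System.out.println("no point in [" + leftEndIncluded + ", " + rightEndExcluded + ")");
    } else {
      System.out.println("startPos=" + fpPos + ", endPos=" + lpPos);
    }
  }
}
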
@@ -594,6 +618,30 @@
.getPageReader()
.setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList());
}
+ // TODO add logic
+ if (chunkSuit4CPV
+              .needsUpdateStartEndPos) { // otherwise the chunk falls completely within the current
+ // time span
+ chunkSuit4CPV.needsUpdateStartEndPos = false;
+            // update startPos & endPos by applying the current time span bounds as virtual deletes
+ long leftEndIncluded = curStartTime;
+ long rightEndExcluded = curStartTime + interval;
+ int FP_pos = -1;
+ int LP_pos = -1;
+ if (leftEndIncluded > chunkSuit4CPV.statistics.getStartTime()) {
+ FP_pos = chunkSuit4CPV.updateFPwithTheClosetPointEqualOrAfter(leftEndIncluded);
+ }
+ if (rightEndExcluded <= chunkSuit4CPV.statistics.getEndTime()) {
+              // -1 because the right end is excluded
+ LP_pos =
+ chunkSuit4CPV.updateLPwithTheClosetPointEqualOrBefore(rightEndExcluded - 1);
+ }
+ if (FP_pos != -1 && LP_pos != -1 && FP_pos > LP_pos) {
+ // means the chunk has no point in this span
+ currentChunkList.remove(chunkSuit4CPV);
+ }
+ }
+
// chunk data read operation (c)
// chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV);
chunkSuit4CPV.getPageReader().updateTP_withValueIndex(chunkSuit4CPV); //
@@ -721,229 +769,6 @@
}
}
- private void calculateFirstPoint(
- List<ChunkSuit4CPV> currentChunkList,
- long startTime,
- long endTime,
- long interval,
- long curStartTime)
- throws IOException {
- // IOMonitor2.M4_LSM_status = Operation.M4_LSM_FP;
- while (currentChunkList.size() > 0) { // loop 1
- // sorted by startTime and version, find FP candidate
- currentChunkList.sort(
- new Comparator<ChunkSuit4CPV>() { // double check the sort order logic for different
- // aggregations
- public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
- // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
- int res =
- ((Comparable) (o1.getStatistics().getStartTime()))
- .compareTo(o2.getStatistics().getStartTime());
- if (res != 0) {
- return res;
- } else {
- return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
- .compareTo(
- new MergeReaderPriority(
- o1.getChunkMetadata().getVersion(),
- o1.getChunkMetadata().getOffsetOfChunkHeader()));
- }
- }
- });
-
- ChunkSuit4CPV susp_candidate = currentChunkList.get(0);
- if (susp_candidate.isLazyLoad()) {
- // means the chunk is already lazy loaded, then load the chunk, apply deletes, update
- // statistics,
- // cancel the lazy loaded mark, and back to loop 1
- if (susp_candidate.getPageReader() == null) {
- PageReader pageReader =
- FileLoaderUtils.loadPageReaderList4CPV(
- susp_candidate.getChunkMetadata(), this.timeFilter);
- // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
- // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
- // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
- // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN
- // DIRECTLY), WHICH WILL INTRODUCE BUGS!
- susp_candidate.setPageReader(pageReader);
- }
- // chunk data read operation (b): get the closest data point after or before a timestamp
- susp_candidate.updateFPwithTheClosetPointEqualOrAfter(
- susp_candidate.getStatistics().getStartTime());
- susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!!
- continue; // back to loop 1
- } else {
- // the chunk has not been lazy loaded, then verify whether the candidate point is deleted
- // Note the higher versions of deletes are guaranteed by
- // QueryUtils.modifyChunkMetaData(chunkMetadataList,pathModifications)
- // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
- long candidateTimestamp = susp_candidate.getStatistics().getStartTime(); // check
- Object candidateValue = susp_candidate.getStatistics().getFirstValue(); // check
-
- boolean isDeletedItself = false;
- long deleteEndTime = -1;
- List<TimeRange> deleteIntervalList =
- susp_candidate.getChunkMetadata().getDeleteIntervalList();
- if (deleteIntervalList != null) {
- int deleteCursor = 0;
- while (deleteCursor < deleteIntervalList.size()) {
- if (deleteIntervalList.get(deleteCursor).getMax() < candidateTimestamp) {
- deleteCursor++;
- } else if (deleteIntervalList.get(deleteCursor).contains(candidateTimestamp)) {
- isDeletedItself = true;
- deleteEndTime = deleteIntervalList.get(deleteCursor).getMax();
- break; // since delete intervals are already sorted and merged
- } else {
- break; // since delete intervals are already sorted and merged
- }
- }
- }
- if (isDeletedItself) {
- // deleteEndTime may be after the current endTime,
- // because deleteStartTime can be after the startTime of the whole chunk
- if (deleteEndTime
- >= susp_candidate.getStatistics().getEndTime()) { // NOTE here calculate FP
- // deleted as a whole
- currentChunkList.remove(susp_candidate);
- } else {
- // the candidate point is deleted, then label the chunk as already lazy loaded,
- // update chunkStartTime without loading data, and back to loop 1
- susp_candidate.setLazyLoad(true);
- // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
- susp_candidate.getStatistics().setStartTime(deleteEndTime + 1); // check
- // +1 is because delete is closed interval
- }
- continue; // back to loop 1
- } else {
- // the candidate point is not deleted, then it is the final result
- results
- .get(0)
- .updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
- results
- .get(2)
- .updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
- return;
- }
- }
- }
- }
-
- private void calculateLastPoint(
- List<ChunkSuit4CPV> currentChunkList,
- long startTime,
- long endTime,
- long interval,
- long curStartTime)
- throws IOException {
- // IOMonitor2.M4_LSM_status = Operation.M4_LSM_LP;
- while (currentChunkList.size() > 0) { // loop 1
- // sorted by endTime and version, find LP candidate
- currentChunkList.sort(
- new Comparator<ChunkSuit4CPV>() {
- // aggregations
- public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
- int res =
- ((Comparable) (o2.getStatistics().getEndTime()))
- .compareTo(o1.getStatistics().getEndTime());
- // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
- if (res != 0) {
- return res;
- } else {
- return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
- .compareTo(
- new MergeReaderPriority(
- o1.getChunkMetadata().getVersion(),
- o1.getChunkMetadata().getOffsetOfChunkHeader()));
- }
- }
- });
-
- ChunkSuit4CPV susp_candidate = currentChunkList.get(0);
- if (susp_candidate.isLazyLoad()) {
- // means the chunk is already lazy loaded, then load the chunk, apply deletes, update
- // statistics,
- // cancel the lazy loaded mark, and back to loop 1
- if (susp_candidate.getPageReader() == null) {
- PageReader pageReader =
- FileLoaderUtils.loadPageReaderList4CPV(
- susp_candidate.getChunkMetadata(), this.timeFilter);
- // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK,
- // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
- // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
- // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN
- // DIRECTLY), WHICH WILL INTRODUCE BUGS!
- susp_candidate.setPageReader(pageReader);
- }
- // update LP equal to or before statistics.getEndTime
- // (b) get the closest data point after or before a timestamp
- susp_candidate.updateLPwithTheClosetPointEqualOrBefore(
- susp_candidate.getStatistics().getEndTime()); // DEBUG
- susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!!
- continue; // back to loop 1
- } else {
- // the chunk has not been lazy loaded, then verify whether the candidate point is deleted
- // Note the higher versions of deletes are guaranteed by
- // QueryUtils.modifyChunkMetaData(chunkMetadataList,pathModifications)
- // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata
- long candidateTimestamp = susp_candidate.getStatistics().getEndTime(); // check
- Object candidateValue = susp_candidate.getStatistics().getLastValue(); // check
-
- boolean isDeletedItself = false;
- long deleteStartTime = Long.MAX_VALUE; // check
- List<TimeRange> deleteIntervalList =
- susp_candidate.getChunkMetadata().getDeleteIntervalList();
- if (deleteIntervalList != null) {
- int deleteCursor = 0;
- while (deleteCursor < deleteIntervalList.size()) {
- if (deleteIntervalList.get(deleteCursor).getMax() < candidateTimestamp) {
- deleteCursor++;
- } else if (deleteIntervalList.get(deleteCursor).contains(candidateTimestamp)) {
- isDeletedItself = true;
- deleteStartTime = deleteIntervalList.get(deleteCursor).getMin();
- break; // since delete intervals are already sorted and merged
- } else {
- break; // since delete intervals are already sorted and merged
- }
- }
- }
- if (isDeletedItself) {
- // deleteStartTime may be before the current startTime,
- // because deleteEndTime can be before the endTime of the whole chunk
- if (deleteStartTime <= susp_candidate.getStatistics().getStartTime()) {
- // NOTE here calculate LP.
- // deleted as a whole
- currentChunkList.remove(susp_candidate);
- } else {
- susp_candidate.setLazyLoad(true);
- // NOTE here get statistics from ChunkSuit4CPV, not from
- // ChunkSuit4CPV.ChunkMetadata
- susp_candidate.getStatistics().setEndTime(deleteStartTime - 1);
- // -1 is because delete is closed interval
- // check
- }
- continue; // back to loop 1
- } else {
- // the candidate point is not deleted, then it is the final result
- results
- .get(1)
- .updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
- results
- .get(3)
- .updateResultUsingValues(
- new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
- return;
- }
- }
- }
- }
-
@Override
public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
throws IOException {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest8.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest8.java
index e3a8a2c..df261cf 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest8.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest8.java
@@ -41,6 +41,7 @@
import static org.junit.Assert.fail;
public class MyTest8 {
+
// test MinMax-LSM
private static final String TIMESTAMP_STR = "Time";
@@ -65,6 +66,7 @@
@Before
public void setUp() throws Exception {
TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(true);
+
TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN");
originalCompactionStrategy = config.getCompactionStrategy();
config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
@@ -84,6 +86,7 @@
@After
public void tearDown() throws Exception {
+ TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(false);
EnvironmentUtils.cleanEnv();
config.setCompactionStrategy(originalCompactionStrategy);
config.setEnableCPV(originalEnableCPV);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
index ccab972..9be0b03 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
@@ -45,6 +45,7 @@
public Statistics statistics; // dynamically updated, includes FP/LP/BP/TP info
// [startPos,endPos] definitely for curStartTime interval, thanks to split4CPV
+ public boolean needsUpdateStartEndPos = false;
public int startPos = -1; // the first point position, starting from 0
public int endPos = -1; // the last point position, starting from 0