[IOTDB-942] Optimization of query with long time unsequence page (#1824)
* init
* add some todo
* implement function of ascending
* implement desc part
* mark the bug position
* fix bugs
* update currentPageEndPointTime
* fix code style
* add javadoc
Co-authored-by: JackieTien97 <JackieTien@foxmail.com>
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index e74d4b9..a2a9988 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -95,12 +95,13 @@
* page cache
*/
private VersionPageReader firstPageReader;
- private PriorityQueue<VersionPageReader> cachedPageReaders;
+ private final List<VersionPageReader> seqPageReaders = new LinkedList<>();
+ private final PriorityQueue<VersionPageReader> unSeqPageReaders;
/*
* point cache
*/
- private PriorityMergeReader mergeReader;
+ private final PriorityMergeReader mergeReader;
/*
* result cache
@@ -133,7 +134,7 @@
timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics())));
cachedChunkMetadata = new PriorityQueue<>(orderUtils.comparingLong(
chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics())));
- cachedPageReaders = new PriorityQueue<>(orderUtils.comparingLong(
+ unSeqPageReaders = new PriorityQueue<>(orderUtils.comparingLong(
versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
}
@@ -163,7 +164,7 @@
timeSeriesMetadata -> orderUtils.getOrderTime(timeSeriesMetadata.getStatistics())));
cachedChunkMetadata = new PriorityQueue<>(orderUtils.comparingLong(
chunkMetadata -> orderUtils.getOrderTime(chunkMetadata.getStatistics())));
- cachedPageReaders = new PriorityQueue<>(orderUtils.comparingLong(
+ unSeqPageReaders = new PriorityQueue<>(orderUtils.comparingLong(
versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics())));
}
@@ -173,12 +174,12 @@
boolean hasNextFile() throws IOException {
- if (!cachedPageReaders.isEmpty()
+ if (!unSeqPageReaders.isEmpty()
|| firstPageReader != null
|| mergeReader.hasNextTimeValuePair()) {
throw new IOException(
"all cached pages should be consumed first cachedPageReaders.isEmpty() is "
- + cachedPageReaders.isEmpty()
+ + unSeqPageReaders.isEmpty()
+ " firstPageReader != null is "
+ (firstPageReader != null)
+ " mergeReader.hasNextTimeValuePair() = "
@@ -231,12 +232,12 @@
* overlapped chunks are consumed
*/
boolean hasNextChunk() throws IOException {
- if (!cachedPageReaders.isEmpty()
+ if (!unSeqPageReaders.isEmpty()
|| firstPageReader != null
|| mergeReader.hasNextTimeValuePair()) {
throw new IOException(
"all cached pages should be consumed first cachedPageReaders.isEmpty() is "
- + cachedPageReaders.isEmpty()
+ + unSeqPageReaders.isEmpty()
+ " firstPageReader != null is "
+ (firstPageReader != null)
+ " mergeReader.hasNextTimeValuePair() = "
@@ -300,6 +301,7 @@
throws IOException {
List<ChunkMetadata> chunkMetadataList = FileLoaderUtils
.loadChunkMetadataList(timeSeriesMetadata);
+ chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq()));
// try to calculate the total number of chunk and time-value points in chunk
if (IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) {
QueryResourceManager queryResourceManager = QueryResourceManager.getInstance();
@@ -381,8 +383,8 @@
/*
* first chunk metadata is already unpacked, consume cached pages
*/
- if (!cachedPageReaders.isEmpty()) {
- firstPageReader = cachedPageReaders.poll();
+ initFirstPageReader();
+ if (firstPageReader != null) {
long endpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics());
unpackAllOverlappedTsFilesToTimeSeriesMetadata(endpointTime);
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(endpointTime, false);
@@ -390,10 +392,7 @@
}
}
- if (firstPageReader != null
- && !cachedPageReaders.isEmpty()
- && orderUtils
- .isOverlapped(firstPageReader.getStatistics(), cachedPageReaders.peek().getStatistics())) {
+ if (firstPageOverlapped()) {
/*
* next page is overlapped, read overlapped data and cache it
*/
@@ -407,11 +406,11 @@
}
// make sure firstPageReader won't be null while cachedPageReaders has more cached page readers
- while (firstPageReader == null && !cachedPageReaders.isEmpty()) {
- firstPageReader = cachedPageReaders.poll();
- if (!cachedPageReaders.isEmpty()
- && orderUtils.isOverlapped(firstPageReader.getStatistics(),
- cachedPageReaders.peek().getStatistics())) {
+ while (firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders.isEmpty())) {
+
+ initFirstPageReader();
+
+ if (firstPageOverlapped()) {
/*
* next page is overlapped, read overlapped data and cache it
*/
@@ -427,6 +426,13 @@
return firstPageReader != null;
}
+ /**
+  * Checks whether {@code firstPageReader} overlaps the head of either page-reader cache.
+  *
+  * @return {@code false} when {@code firstPageReader} is null; otherwise {@code true} if its
+  *     statistics overlap the first element of {@code seqPageReaders} or the head of
+  *     {@code unSeqPageReaders}
+  */
+ private boolean firstPageOverlapped() {
+   // Guard up front: `&&` binds tighter than `||`, so an inline null check would protect only
+   // the seq branch and the unseq branch could dereference a null firstPageReader.
+   if (firstPageReader == null) {
+     return false;
+   }
+   Statistics firstPageStatistics = firstPageReader.getStatistics();
+   boolean overlapsSeqHead = !seqPageReaders.isEmpty()
+       && orderUtils.isOverlapped(firstPageStatistics, seqPageReaders.get(0).getStatistics());
+   return overlapsSeqHead || (!unSeqPageReaders.isEmpty()
+       && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics()));
+ }
+
private void unpackAllOverlappedChunkMetadataToCachedPageReaders(long endpointTime, boolean init)
throws IOException {
while (!cachedChunkMetadata.isEmpty() &&
@@ -438,17 +444,31 @@
unpackOneChunkMetaData(firstChunkMetadata);
firstChunkMetadata = null;
}
- if (init && firstPageReader == null && !cachedPageReaders.isEmpty()) {
- firstPageReader = cachedPageReaders.poll();
+ if (init && firstPageReader == null && (!seqPageReaders.isEmpty() || !unSeqPageReaders
+ .isEmpty())) {
+ initFirstPageReader();
}
}
- private void unpackOneChunkMetaData(ChunkMetadata chunkMetaData) throws IOException {
- FileLoaderUtils.loadPageReaderList(chunkMetaData, timeFilter)
- .forEach(
- pageReader ->
- cachedPageReaders.add(
- new VersionPageReader(chunkMetaData.getVersion(), pageReader)));
+ /**
+  * Loads the page readers of one chunk and caches them: pages of a seq chunk are appended to
+  * {@code seqPageReaders}, pages of an unseq chunk are added to {@code unSeqPageReaders}.
+  *
+  * @param chunkMetaData the chunk whose pages are loaded; its {@code isSeq()} flag selects the
+  *     target cache and its version is attached to every page reader
+  * @throws IOException propagated from loading the page readers
+  */
+ private void unpackOneChunkMetaData(ChunkMetadata chunkMetaData)
+     throws IOException {
+   // seq pages of this chunk only, arranged in read order
+   LinkedList<VersionPageReader> seqPagesOfChunk = new LinkedList<>();
+   FileLoaderUtils.loadPageReaderList(chunkMetaData, timeFilter).forEach(
+       pageReader -> {
+         if (chunkMetaData.isSeq()) {
+           VersionPageReader seqPage =
+               new VersionPageReader(chunkMetaData.getVersion(), pageReader, true);
+           // addLast for asc; addFirst for desc (reverses the pages within this chunk only)
+           if (orderUtils.getAscending()) {
+             seqPagesOfChunk.addLast(seqPage);
+           } else {
+             seqPagesOfChunk.addFirst(seqPage);
+           }
+         } else {
+           unSeqPageReaders
+               .add(new VersionPageReader(chunkMetaData.getVersion(), pageReader, false));
+         }
+       });
+   // Append behind pages that are already cached. Inserting at index 0 for desc would let the
+   // pages of a later-unpacked chunk jump ahead of still-unconsumed pages of an earlier one.
+   // NOTE(review): assumes chunks are unpacked in read order (cachedChunkMetadata is a
+   // priority queue keyed by orderUtils.getOrderTime) - confirm against the unpack loop.
+   seqPageReaders.addAll(seqPagesOfChunk);
+ }
/**
@@ -469,14 +489,16 @@
/*
* has a non-overlapped page in firstPageReader
*/
- if (mergeReader.hasNextTimeValuePair()) {
+ if (mergeReader.hasNextTimeValuePair()
+ && mergeReader.currentTimeValuePair().getTimestamp() <= firstPageReader.getStatistics()
+ .getEndTime()) {
throw new IOException("overlapped data should be consumed first");
}
Statistics firstPageStatistics = firstPageReader.getStatistics();
- return !cachedPageReaders.isEmpty()
- && orderUtils.isOverlapped(firstPageStatistics, cachedPageReaders.peek().getStatistics());
+ return !unSeqPageReaders.isEmpty()
+ && orderUtils.isOverlapped(firstPageStatistics, unSeqPageReaders.peek().getStatistics());
}
Statistics currentPageStatistics() {
@@ -535,7 +557,7 @@
return true;
}
- tryToPutAllDirectlyOverlappedPageReadersIntoMergeReader();
+ tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader();
while (true) {
@@ -543,7 +565,14 @@
cachedBatchData = BatchDataFactory.createBatchData(dataType);
long currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
-
+ if (firstPageReader != null) {
+ currentPageEndPointTime = orderUtils
+ .getCurrentEndPoint(currentPageEndPointTime, firstPageReader.getStatistics());
+ }
+ if (!seqPageReaders.isEmpty()) {
+ currentPageEndPointTime = orderUtils
+ .getCurrentEndPoint(currentPageEndPointTime, seqPageReaders.get(0).getStatistics());
+ }
while (mergeReader.hasNextTimeValuePair()) {
/*
@@ -552,23 +581,55 @@
TimeValuePair timeValuePair = mergeReader.currentTimeValuePair();
if (orderUtils.isExcessEndpoint(timeValuePair.getTimestamp(), currentPageEndPointTime)) {
- break;
+ if (cachedBatchData.hasCurrent() || firstPageReader != null || !seqPageReaders.isEmpty()) {
+ break;
+ }
+ currentPageEndPointTime = mergeReader.getCurrentReadStopTime();
}
unpackAllOverlappedTsFilesToTimeSeriesMetadata(timeValuePair.getTimestamp());
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
timeValuePair.getTimestamp(), false);
unpackAllOverlappedChunkMetadataToCachedPageReaders(timeValuePair.getTimestamp(), false);
- unpackAllOverlappedCachedPageReadersToMergeReader(timeValuePair.getTimestamp());
+ unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+
+ if (firstPageReader != null) {
+ if ((orderUtils.getAscending() && timeValuePair.getTimestamp() > firstPageReader
+ .getStatistics().getEndTime()) || (!orderUtils.getAscending()
+ && timeValuePair.getTimestamp() < firstPageReader.getStatistics().getStartTime())) {
+ hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
+ return hasCachedNextOverlappedPage;
+ } else {
+ mergeReader
+ .addReader(firstPageReader.getAllSatisfiedPageData(orderUtils.getAscending())
+ .getBatchDataIterator(), firstPageReader.version,
+ orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()));
+ firstPageReader = null;
+ }
+ }
+
+ if (!seqPageReaders.isEmpty()) {
+ if ((orderUtils.getAscending() && timeValuePair.getTimestamp() > seqPageReaders.get(0)
+ .getStatistics().getEndTime()) || (!orderUtils.getAscending()
+ && timeValuePair.getTimestamp() < seqPageReaders.get(0).getStatistics()
+ .getStartTime())) {
+ hasCachedNextOverlappedPage = cachedBatchData.hasCurrent();
+ return hasCachedNextOverlappedPage;
+ } else {
+ VersionPageReader pageReader = seqPageReaders.remove(0);
+ mergeReader.addReader(pageReader.getAllSatisfiedPageData(orderUtils.getAscending())
+ .getBatchDataIterator(), pageReader.version,
+ orderUtils.getOverlapCheckTime(pageReader.getStatistics()));
+ }
+ }
/*
* get the latest first point in mergeReader
*/
timeValuePair = mergeReader.nextTimeValuePair();
- if (valueFilter == null
- || valueFilter.satisfy(
- timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
+ if (valueFilter == null || valueFilter
+ .satisfy(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue())) {
cachedBatchData.putAnObject(
timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
}
@@ -579,6 +640,9 @@
*/
if (hasCachedNextOverlappedPage) {
return true;
+ // condition: seqPage.endTime < mergeReader.currentTime
+ } else if (mergeReader.hasNextTimeValuePair()) {
+ return false;
}
} else {
return false;
@@ -586,12 +650,12 @@
}
}
- private void tryToPutAllDirectlyOverlappedPageReadersIntoMergeReader() throws IOException {
+ private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException {
/*
* no cached page readers
*/
- if (firstPageReader == null && cachedPageReaders.isEmpty()) {
+ if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) {
return;
}
@@ -599,7 +663,7 @@
* init firstPageReader
*/
if (firstPageReader == null) {
- firstPageReader = cachedPageReaders.poll();
+ initFirstPageReader();
}
long currentPageEndpointTime;
@@ -610,18 +674,33 @@
}
/*
- * put all currently directly overlapped page reader to merge reader
+ * put all currently directly overlapped unseq page readers into the merge reader
*/
- unpackAllOverlappedCachedPageReadersToMergeReader(currentPageEndpointTime);
+ unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime);
}
- private void unpackAllOverlappedCachedPageReadersToMergeReader(long endpointTime)
- throws IOException {
- while (!cachedPageReaders.isEmpty()
- && orderUtils.isOverlapped(endpointTime, cachedPageReaders.peek().data.getStatistics())) {
- putPageReaderToMergeReader(cachedPageReaders.poll());
+ /**
+  * Picks the next page reader from the caches and stores it in {@code firstPageReader}.
+  * When both caches hold pages, {@code orderUtils.isTakeSeqAsFirst} decides between the first
+  * seq page and the head of the unseq queue; when only one cache holds pages, its head is
+  * taken. The chosen reader is removed from its cache. If both caches are empty,
+  * {@code firstPageReader} is left unchanged.
+  */
+ private void initFirstPageReader() {
+ if (!seqPageReaders.isEmpty() && !unSeqPageReaders.isEmpty()) {
+ if (orderUtils.isTakeSeqAsFirst(seqPageReaders.get(0).getStatistics(), unSeqPageReaders.peek()
+ .getStatistics())) {
+ firstPageReader = seqPageReaders.remove(0);
+ } else {
+ firstPageReader = unSeqPageReaders.poll();
+ }
+ } else if (!seqPageReaders.isEmpty()) {
+ firstPageReader = seqPageReaders.remove(0);
+ } else if (!unSeqPageReaders.isEmpty()) {
+ firstPageReader = unSeqPageReaders.poll();
}
- if (firstPageReader != null &&
+ }
+
+ private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime)
+ throws IOException {
+ while (!unSeqPageReaders.isEmpty()
+ && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().data.getStatistics())) {
+ putPageReaderToMergeReader(unSeqPageReaders.poll());
+ }
+ if (firstPageReader != null && !firstPageReader.isSeq() &&
orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) {
putPageReaderToMergeReader(firstPageReader);
firstPageReader = null;
@@ -668,6 +747,7 @@
orderUtils.getNextSeqFileResource(seqFileResource, true), seriesPath, context,
getAnyFilter(), allSensors);
if (timeseriesMetadata != null) {
+ timeseriesMetadata.setSeq(true);
seqTimeSeriesMetadata.add(timeseriesMetadata);
}
}
@@ -681,6 +761,7 @@
unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
if (timeseriesMetadata != null) {
timeseriesMetadata.setModified(true);
+ timeseriesMetadata.setSeq(false);
unSeqTimeSeriesMetadata.add(timeseriesMetadata);
}
}
@@ -691,19 +772,14 @@
long endTime = -1L;
if (!seqTimeSeriesMetadata.isEmpty() && unSeqTimeSeriesMetadata.isEmpty()) {
// only has seq
-
endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics());
} else if (seqTimeSeriesMetadata.isEmpty() && !unSeqTimeSeriesMetadata.isEmpty()) {
// only has unseq
endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics());
} else if (!seqTimeSeriesMetadata.isEmpty()) {
// has seq and unseq
- if (orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics())
- <= orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics())) {
- endTime = orderUtils.getOverlapCheckTime(seqTimeSeriesMetadata.get(0).getStatistics());
- } else {
- endTime = orderUtils.getOverlapCheckTime(unSeqTimeSeriesMetadata.peek().getStatistics());
- }
+ endTime = orderUtils.getCurrentEndPoint(seqTimeSeriesMetadata.get(0).getStatistics(),
+ unSeqTimeSeriesMetadata.peek().getStatistics());
}
/*
@@ -724,8 +800,8 @@
firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
} else if (!seqTimeSeriesMetadata.isEmpty()) {
// has seq and unseq
- if (orderUtils.getOrderTime(seqTimeSeriesMetadata.get(0).getStatistics())
- <= orderUtils.getOrderTime(unSeqTimeSeriesMetadata.peek().getStatistics())) {
+ if (orderUtils.isTakeSeqAsFirst(seqTimeSeriesMetadata.get(0).getStatistics(),
+ unSeqTimeSeriesMetadata.peek().getStatistics())) {
firstTimeSeriesMetadata = seqTimeSeriesMetadata.remove(0);
} else {
firstTimeSeriesMetadata = unSeqTimeSeriesMetadata.poll();
@@ -742,6 +818,7 @@
unseqFileResource.remove(0), seriesPath, context, getAnyFilter(), allSensors);
if (timeseriesMetadata != null) {
timeseriesMetadata.setModified(true);
+ timeseriesMetadata.setSeq(false);
unSeqTimeSeriesMetadata.add(timeseriesMetadata);
}
}
@@ -753,6 +830,7 @@
orderUtils.getNextSeqFileResource(seqFileResource, true), seriesPath, context,
getAnyFilter(), allSensors);
if (timeseriesMetadata != null) {
+ timeseriesMetadata.setSeq(true);
seqTimeSeriesMetadata.add(timeseriesMetadata);
}
}
@@ -775,9 +853,12 @@
protected long version;
protected IPageReader data;
- VersionPageReader(long version, IPageReader data) {
+ protected boolean isSeq;
+
+ VersionPageReader(long version, IPageReader data, boolean isSeq) {
this.version = version;
this.data = data;
+ this.isSeq = isSeq;
}
Statistics getStatistics() {
@@ -795,6 +876,10 @@
boolean isModified() {
return data.isModified();
}
+
+ /**
+  * @return the seq flag this page reader was constructed with
+  */
+ public boolean isSeq() {
+   return this.isSeq;
+ }
}
@@ -816,8 +901,19 @@
<T> Comparator<T> comparingLong(ToLongFunction<? super T> keyExtractor);
+ /**
+  * Combines a candidate end point with the time range of the given statistics. The ascending
+  * implementation returns the smaller of {@code time} and the statistics' end time; the
+  * descending implementation returns the larger of {@code time} and the statistics' start
+  * time.
+  *
+  * @param time the current candidate end point
+  * @param statistics statistics whose start/end time bounds the result
+  * @return the combined end point
+  */
+ long getCurrentEndPoint(long time, Statistics<? extends Object> statistics);
+
+ /**
+  * Computes the end point shared by a seq and an unseq statistics. The ascending
+  * implementation returns the smaller of the two end times; the descending implementation
+  * returns the larger of the two start times.
+  *
+  * @param seqStatistics statistics of the seq side
+  * @param unseqStatistics statistics of the unseq side
+  * @return the combined end point
+  */
+ long getCurrentEndPoint(Statistics<? extends Object> seqStatistics,
+ Statistics<? extends Object> unseqStatistics);
+
boolean isExcessEndpoint(long time, long endpointTime);
+ /**
+  * Decides whether the next first reader should be taken from the seq side. The ascending
+  * implementation returns true when the seq start time is strictly smaller than the unseq
+  * start time; the descending implementation returns true when the seq end time is strictly
+  * larger than the unseq end time. On a tie both return false.
+  *
+  * @param seqStatistics statistics of the first seq candidate
+  * @param unseqStatistics statistics of the first unseq candidate
+  * @return true if the seq candidate should be taken first
+  */
+ boolean isTakeSeqAsFirst(Statistics<? extends Object> seqStatistics,
+ Statistics<? extends Object> unseqStatistics);
+
boolean getAscending();
}
@@ -870,6 +966,16 @@
(c1, c2) -> Long.compare(keyExtractor.applyAsLong(c2), keyExtractor.applyAsLong(c1));
}
+ @Override
+ public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) {
+ return Math.max(time, statistics.getStartTime());
+ }
+
+ @Override
+ public long getCurrentEndPoint(Statistics<? extends Object> seqStatistics,
+     Statistics<? extends Object> unseqStatistics) {
+   // descending order: the end point is the larger of the two start times
+   long seqStartTime = seqStatistics.getStartTime();
+   long unseqStartTime = unseqStatistics.getStartTime();
+   return Math.max(seqStartTime, unseqStartTime);
+ }
@Override
public boolean isExcessEndpoint(long time, long endpointTime) {
@@ -877,6 +983,12 @@
}
@Override
+ public boolean isTakeSeqAsFirst(Statistics<? extends Object> seqStatistics,
+     Statistics<? extends Object> unseqStatistics) {
+   // descending order: take seq first only when it ends strictly later than unseq
+   long seqEndTime = seqStatistics.getEndTime();
+   long unseqEndTime = unseqStatistics.getEndTime();
+   return seqEndTime > unseqEndTime;
+ }
+
+ @Override
public boolean getAscending() {
return false;
}
@@ -932,11 +1044,28 @@
}
@Override
+ public long getCurrentEndPoint(long time, Statistics<? extends Object> statistics) {
+ return Math.min(time, statistics.getEndTime());
+ }
+
+ @Override
+ public long getCurrentEndPoint(Statistics<? extends Object> seqStatistics,
+     Statistics<? extends Object> unseqStatistics) {
+   // ascending order: the end point is the smaller of the two end times
+   long seqEndTime = seqStatistics.getEndTime();
+   long unseqEndTime = unseqStatistics.getEndTime();
+   return Math.min(seqEndTime, unseqEndTime);
+ }
+
+ @Override
public boolean isExcessEndpoint(long time, long endpointTime) {
return time > endpointTime;
}
@Override
+ public boolean isTakeSeqAsFirst(Statistics<? extends Object> seqStatistics,
+     Statistics<? extends Object> unseqStatistics) {
+   // ascending order: take seq first only when it starts strictly earlier than unseq
+   long seqStartTime = seqStatistics.getStartTime();
+   long unseqStartTime = unseqStatistics.getStartTime();
+   return seqStartTime < unseqStartTime;
+ }
+
+ @Override
public boolean getAscending() {
return true;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
index da68c93..499bf57 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java
@@ -73,6 +73,9 @@
private static final int CHUNK_METADATA_FIXED_RAM_SIZE = 80;
+ // used for SeriesReader to indicate whether it is a seq/unseq chunk metadata
+ private boolean isSeq = true;
+
private ChunkMetadata() {
}
@@ -269,4 +272,12 @@
this.statistics.mergeStatistics(chunkMetadata.getStatistics());
this.ramSize = calculateRamSize();
}
+
+ /**
+  * Marks this chunk metadata as seq or unseq.
+  *
+  * @param seq the new value of the seq flag
+  */
+ public void setSeq(boolean seq) {
+   this.isSeq = seq;
+ }
+
+ /**
+  * @return the seq flag of this chunk metadata; it is {@code true} unless changed via
+  *     {@code setSeq}
+  */
+ public boolean isSeq() {
+   return this.isSeq;
+ }
}
\ No newline at end of file
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
index 754d48b..ad8fc3f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java
@@ -46,6 +46,9 @@
private long ramSize;
+ // used for SeriesReader to indicate whether it is a seq/unseq timeseries metadata
+ private boolean isSeq = true;
+
public TimeseriesMetadata() {
}
@@ -158,4 +161,12 @@
public long getRamSize() {
return ramSize;
}
+
+ /**
+  * Marks this timeseries metadata as seq or unseq.
+  *
+  * @param seq the new value of the seq flag
+  */
+ public void setSeq(boolean seq) {
+   this.isSeq = seq;
+ }
+
+ /**
+  * @return the seq flag of this timeseries metadata; it is {@code true} unless changed via
+  *     {@code setSeq}
+  */
+ public boolean isSeq() {
+   return this.isSeq;
+ }
}