blob: e7eb64f47d31bb0bc775ec25ce93709d212d9778 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.executor.fill;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
/**
 * Reads the most recent point of a single series from a {@link QueryDataSource}, consulting both
 * its sequence and its unsequence TsFile resources.
 *
 * <p>A candidate is first taken from the sequence resources (walked from the last resource to the
 * first). Unsequence chunks whose end time is not earlier than that candidate are then examined
 * and may replace it. Throughout this class a {@link TimeValuePair} whose value is {@code null}
 * is treated as "no valid point".
 */
public class LastPointReader {
  private PartialPath seriesPath;
  // upper bound handed to BatchData#getLastPairBeforeOrEqualTimestamp when a page must be scanned
  long queryTime;
  TSDataType dataType;
  private QueryContext context;
  // measurements of the same device as "seriesPath"
  private Set<String> deviceMeasurements;
  private Filter timeFilter;
  private QueryDataSource dataSource;
  // unsequence chunk that produced the current result; used to break ties on equal timestamps
  private ChunkMetadata cachedLastChunk;
  // metadata of the unsequence files selected by unpackOverlappedUnseqFiles for the current read
  private final List<TimeseriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();

  /** Creates a reader with no fields set; none of the read methods assign them afterwards. */
  public LastPointReader() {}

  /**
   * Creates a reader for one series.
   *
   * @param seriesPath path of the series; a {@link VectorPartialPath} is handled specially when
   *     loading sequence metadata
   * @param dataType type used to wrap values taken from statistics
   * @param deviceMeasurements measurements of the same device as {@code seriesPath}
   * @param context query context forwarded to {@link FileLoaderUtils}
   * @param dataSource provider of the sequence and unsequence resources
   * @param queryTime only points at or before this time are taken when a page is scanned
   * @param timeFilter time filter, or {@code null} to accept every end time
   */
  public LastPointReader(
      PartialPath seriesPath,
      TSDataType dataType,
      Set<String> deviceMeasurements,
      QueryContext context,
      QueryDataSource dataSource,
      long queryTime,
      Filter timeFilter) {
    this.seriesPath = seriesPath;
    this.dataType = dataType;
    this.dataSource = dataSource;
    this.context = context;
    this.queryTime = queryTime;
    this.deviceMeasurements = deviceMeasurements;
    this.timeFilter = timeFilter;
  }

  /**
   * Finds the last point of the series.
   *
   * @return the last point found; if nothing qualifies the returned pair has a {@code null} value
   * @throws IOException if loading metadata or page data fails
   */
  public TimeValuePair readLastPoint() throws IOException {
    // Reset per-read state. Without this a second call would see the chunk cached by the first
    // call, so an unsequence point with the same timestamp as the sequence point would no longer
    // win the tie, and the unsequence metadata list would accumulate duplicates.
    cachedLastChunk = null;
    unseqTimeseriesMetadataList.clear();

    TimeValuePair resultPoint = retrieveValidLastPointFromSeqFiles();
    unpackOverlappedUnseqFiles(resultPoint.getTimestamp());

    PriorityQueue<ChunkMetadata> sortedChunkMetadataList = sortUnseqChunkMetadatasByEndtime();
    // The queue is ordered by descending end time, so once its head ends before the current
    // result no remaining chunk can hold a later point.
    while (!sortedChunkMetadataList.isEmpty()
        && resultPoint.getTimestamp() <= sortedChunkMetadataList.peek().getEndTime()) {
      ChunkMetadata chunkMetadata = sortedChunkMetadataList.poll();
      TimeValuePair chunkLastPoint = getChunkLastPoint(chunkMetadata);
      // On equal timestamps the first unsequence chunk always wins over the sequence result;
      // after that a chunk only wins if shouldUpdate ranks it above the cached one.
      if (chunkLastPoint.getTimestamp() > resultPoint.getTimestamp()
          || (chunkLastPoint.getTimestamp() == resultPoint.getTimestamp()
              && (cachedLastChunk == null || shouldUpdate(cachedLastChunk, chunkMetadata)))) {
        cachedLastChunk = chunkMetadata;
        resultPoint = chunkLastPoint;
      }
    }
    return resultPoint;
  }

  /**
   * Walks the sequence resources from last to first and returns the first valid last point.
   *
   * <p>If a file's series metadata is unmodified and its end time passes the time filter, the
   * point is built directly from the statistics. Otherwise the file's chunks are checked from
   * last to first.
   *
   * @return the point found, or a pair with a {@code null} value if no sequence file yields one
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  private TimeValuePair retrieveValidLastPointFromSeqFiles() throws IOException {
    List<TsFileResource> seqFileResource = dataSource.getSeqResources();
    TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
    for (int index = seqFileResource.size() - 1; index >= 0; index--) {
      TsFileResource resource = seqFileResource.get(index);
      ITimeSeriesMetadata timeseriesMetadata;
      if (seriesPath instanceof VectorPartialPath) {
        TimeseriesMetadata vectorMetadata =
            FileLoaderUtils.loadTimeSeriesMetadata(
                resource, seriesPath, context, timeFilter, deviceMeasurements);
        TimeseriesMetadata subSensorMetadata =
            FileLoaderUtils.loadTimeSeriesMetadata(
                resource,
                ((VectorPartialPath) seriesPath).getSubSensorsPathList().get(0),
                context,
                timeFilter,
                deviceMeasurements);
        // loadTimeSeriesMetadata may return null. Check before wrapping, because the wrapper
        // itself is never null and would defeat the null check below.
        if (vectorMetadata == null || subSensorMetadata == null) {
          continue;
        }
        timeseriesMetadata =
            new VectorTimeSeriesMetadata(
                vectorMetadata, Collections.singletonList(subSensorMetadata));
      } else {
        timeseriesMetadata =
            FileLoaderUtils.loadTimeSeriesMetadata(
                resource, seriesPath, context, timeFilter, deviceMeasurements);
      }
      if (timeseriesMetadata == null) {
        continue;
      }
      if (!timeseriesMetadata.isModified()
          && endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
        return constructLastPair(
            timeseriesMetadata.getStatistics().getEndTime(),
            timeseriesMetadata.getStatistics().getLastValue(),
            dataType);
      }
      List<IChunkMetadata> seqChunkMetadataList = timeseriesMetadata.loadChunkMetadataList();
      for (int i = seqChunkMetadataList.size() - 1; i >= 0; i--) {
        lastPoint = getChunkLastPoint((ChunkMetadata) seqChunkMetadataList.get(i));
        // last point of this sequence chunk is valid, quit the loop
        if (lastPoint.getValue() != null) {
          return lastPoint;
        }
      }
    }
    return lastPoint;
  }

  /**
   * Collects the series metadata of every unsequence file that may hold a point at or after
   * {@code lBoundTime} into {@link #unseqTimeseriesMetadataList}.
   *
   * <p>Files are visited in descending order of the device's end time, and the loop stops at the
   * first file that ends before the bound. The bound is raised whenever unmodified metadata is
   * collected.
   *
   * @param lBoundTime initial lower bound, normally the timestamp of the sequence result
   */
  private void unpackOverlappedUnseqFiles(long lBoundTime) throws IOException {
    PriorityQueue<TsFileResource> unseqFileResource =
        sortUnSeqFileResourcesInDescendingOrder(dataSource.getUnseqResources());
    while (!unseqFileResource.isEmpty()
        && (lBoundTime <= unseqFileResource.peek().getEndTime(seriesPath.getDevice()))) {
      TimeseriesMetadata timeseriesMetadata =
          FileLoaderUtils.loadTimeSeriesMetadata(
              unseqFileResource.poll(), seriesPath, context, timeFilter, deviceMeasurements);
      // Skip files without this series, and unmodified series that end before the bound.
      if (timeseriesMetadata == null
          || (!timeseriesMetadata.isModified()
              && timeseriesMetadata.getStatistics().getEndTime() < lBoundTime)) {
        continue;
      }
      unseqTimeseriesMetadataList.add(timeseriesMetadata);
      if (!timeseriesMetadata.isModified()) {
        // Use the end time as the new bound when the filter accepts it, else the start time.
        // NOTE(review): presumably the start time is used because the series still holds a point
        // at or after it even though its end time is filtered out - confirm.
        if (endtimeContainedByTimeFilter(timeseriesMetadata.getStatistics())) {
          lBoundTime = Math.max(lBoundTime, timeseriesMetadata.getStatistics().getEndTime());
        } else {
          lBoundTime = Math.max(lBoundTime, timeseriesMetadata.getStatistics().getStartTime());
        }
      }
    }
  }

  /**
   * Returns the last point of one chunk.
   *
   * <p>Statistics are used when the chunk (or page) is unmodified and its end time passes the
   * time filter. Otherwise pages are scanned from last to first for the latest point at or before
   * {@link #queryTime}.
   *
   * @param chunkMetaData chunk to inspect, may be {@code null}
   * @return the chunk's last point, or a pair with a {@code null} value if none is found
   */
  private TimeValuePair getChunkLastPoint(ChunkMetadata chunkMetaData) throws IOException {
    TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
    if (chunkMetaData == null) {
      return lastPoint;
    }
    Statistics chunkStatistics = chunkMetaData.getStatistics();
    if (!chunkMetaData.isModified() && endtimeContainedByTimeFilter(chunkStatistics)) {
      return constructLastPair(
          chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), dataType);
    }
    List<IPageReader> pageReaders = FileLoaderUtils.loadPageReaderList(chunkMetaData, timeFilter);
    for (int i = pageReaders.size() - 1; i >= 0; i--) {
      IPageReader pageReader = pageReaders.get(i);
      Statistics pageStatistics = pageReader.getStatistics();
      if (!pageReader.isModified() && endtimeContainedByTimeFilter(pageStatistics)) {
        lastPoint =
            constructLastPair(pageStatistics.getEndTime(), pageStatistics.getLastValue(), dataType);
      } else {
        BatchData batchData = pageReader.getAllSatisfiedPageData();
        lastPoint = batchData.getLastPairBeforeOrEqualTimestamp(queryTime);
      }
      // the latest page that yields a value holds the chunk's last point
      if (lastPoint.getValue() != null) {
        return lastPoint;
      }
    }
    return lastPoint;
  }

  /**
   * Decides a tie between two chunks whose last points share a timestamp.
   *
   * @return {@code true} if {@code newChunk} has a higher version than {@code cachedChunk}, or
   *     the same version and a larger chunk header offset
   */
  private boolean shouldUpdate(ChunkMetadata cachedChunk, ChunkMetadata newChunk) {
    return (newChunk.getVersion() > cachedChunk.getVersion())
        || (newChunk.getVersion() == cachedChunk.getVersion()
            && newChunk.getOffsetOfChunkHeader() > cachedChunk.getOffsetOfChunkHeader());
  }

  /**
   * Builds a queue of the given resources whose head is the resource with the greatest end time
   * for this series' device.
   */
  private PriorityQueue<TsFileResource> sortUnSeqFileResourcesInDescendingOrder(
      List<TsFileResource> tsFileResources) {
    PriorityQueue<TsFileResource> unseqTsFilesSet =
        new PriorityQueue<>(
            (o1, o2) -> {
              // primitives avoid boxing on every comparison
              long maxTimeOfO1 = o1.getEndTime(seriesPath.getDevice());
              long maxTimeOfO2 = o2.getEndTime(seriesPath.getDevice());
              return Long.compare(maxTimeOfO2, maxTimeOfO1);
            });
    unseqTsFilesSet.addAll(tsFileResources);
    return unseqTsFilesSet;
  }

  /**
   * Loads the chunks of every collected unsequence series metadata into a queue ordered by
   * descending end time, then descending version, then descending chunk header offset.
   */
  private PriorityQueue<ChunkMetadata> sortUnseqChunkMetadatasByEndtime() throws IOException {
    PriorityQueue<ChunkMetadata> chunkMetadataList =
        new PriorityQueue<>(
            (o1, o2) -> {
              int byEndTime = Long.compare(o2.getEndTime(), o1.getEndTime());
              if (byEndTime != 0) {
                return byEndTime;
              }
              int byVersion = Long.compare(o2.getVersion(), o1.getVersion());
              if (byVersion != 0) {
                return byVersion;
              }
              return Long.compare(o2.getOffsetOfChunkHeader(), o1.getOffsetOfChunkHeader());
            });
    for (TimeseriesMetadata timeseriesMetadata : unseqTimeseriesMetadataList) {
      if (timeseriesMetadata != null) {
        for (IChunkMetadata chunkMetadata : timeseriesMetadata.loadChunkMetadataList()) {
          chunkMetadataList.add((ChunkMetadata) chunkMetadata);
        }
      }
    }
    return chunkMetadataList;
  }

  /**
   * Tells whether the end time recorded in {@code statistics} passes the time filter.
   *
   * @return {@code true} if there is no filter or the filter contains the end time
   */
  private boolean endtimeContainedByTimeFilter(Statistics statistics) {
    if (timeFilter == null) {
      return true;
    }
    // the end time is passed as both bounds so that only that single instant is tested
    return timeFilter.containStartEndTime(statistics.getEndTime(), statistics.getEndTime());
  }

  /** Wraps a raw statistics value in a {@link TimeValuePair} of the given data type. */
  private TimeValuePair constructLastPair(long timestamp, Object value, TSDataType dataType) {
    return new TimeValuePair(timestamp, TsPrimitiveType.getByType(dataType, value));
  }
}