blob: 2606625d8615012c9672e6469970126252355c0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.executor.groupby.impl;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.executor.groupby.AlignedGroupByExecutor;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
private final AlignedSeriesAggregateReader reader;
private BatchData preCachedData;
// Aggregate result buffer
private final List<List<AggregateResult>> results = new ArrayList<>();
private final TimeRange timeRange;
// used for resetting the batch data to the last index
private int lastReadCurArrayIndex;
private int lastReadCurListIndex;
private final boolean ascending;
private final QueryDataSource queryDataSource;
public LocalAlignedGroupByExecutor(
PartialPath path,
QueryContext context,
Filter timeFilter,
TsFileFilter fileFilter,
boolean ascending)
throws StorageEngineException, QueryProcessException {
queryDataSource =
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter, ascending);
// update filter by TTL
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
// init AlignedSeriesAggregateReader for aligned series
Set<String> allSensors = new HashSet<>(((AlignedPath) path).getMeasurementList());
reader =
new AlignedSeriesAggregateReader(
(AlignedPath) path,
allSensors,
TSDataType.VECTOR,
context,
queryDataSource,
timeFilter,
null,
fileFilter,
ascending);
preCachedData = null;
timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
this.ascending = ascending;
}
@Override
public void addAggregateResult(List<AggregateResult> aggregateResults) {
results.add(aggregateResults);
}
@Override
public List<List<AggregateResult>> calcAlignedResult(long curStartTime, long curEndTime)
throws IOException, QueryProcessException {
// clear result cache
for (List<AggregateResult> resultsOfOneMeasurement : results) {
for (AggregateResult result : resultsOfOneMeasurement) {
result.reset();
}
}
timeRange.set(curStartTime, curEndTime - 1);
if (calcFromCacheData(curStartTime, curEndTime)) {
return results;
}
// read page data firstly
if (readAndCalcFromPage(curStartTime, curEndTime)) {
return results;
}
// read chunk data secondly
if (readAndCalcFromChunk(curStartTime, curEndTime)) {
return results;
}
// read from file
while (reader.hasNextFile()) {
// try to calc from fileMetaData
if (reader.canUseCurrentFileStatistics()) {
Statistics fileTimeStatistics = reader.currentFileTimeStatistics();
if (fileTimeStatistics.getStartTime() >= curEndTime) {
if (ascending) {
return results;
} else {
reader.skipCurrentFile();
continue;
}
}
if (timeRange.contains(
fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) {
// calc from fileMetaData
while (reader.hasNextSubSeries()) {
Statistics currentFileStatistics = reader.currentFileStatistics();
calcFromStatistics(currentFileStatistics, results.get(reader.getCurIndex()));
reader.nextSeries();
}
reader.skipCurrentFile();
continue;
}
}
// read chunk
if (readAndCalcFromChunk(curStartTime, curEndTime)) {
return results;
}
}
return results;
}
private void calcFromStatistics(Statistics statistics, List<AggregateResult> aggregateResultList)
throws QueryProcessException {
// statistics may be null for aligned time series
if (statistics == null) {
return;
}
if (statistics.getStartTime() == Long.MAX_VALUE && statistics.getEndTime() == Long.MIN_VALUE) {
return;
}
for (AggregateResult result : aggregateResultList) {
if (result.hasFinalResult()) {
continue;
}
result.updateResultFromStatistics(statistics);
}
}
private boolean readAndCalcFromChunk(long curStartTime, long curEndTime)
throws IOException, QueryProcessException {
while (reader.hasNextChunk()) {
// try to calc from chunkMetaData
if (reader.canUseCurrentChunkStatistics()) {
Statistics chunkTimeStatistics = reader.currentChunkTimeStatistics();
if (chunkTimeStatistics.getStartTime() >= curEndTime) {
if (ascending) {
return true;
} else {
reader.skipCurrentChunk();
continue;
}
}
if (timeRange.contains(
chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) {
// calc from chunkMetaData
while (reader.hasNextSubSeries()) {
Statistics currentChunkStatistics = reader.currentChunkStatistics();
calcFromStatistics(currentChunkStatistics, results.get(reader.getCurIndex()));
reader.nextSeries();
}
reader.skipCurrentChunk();
continue;
}
}
// read page
if (readAndCalcFromPage(curStartTime, curEndTime)) {
return true;
}
}
return false;
}
private boolean readAndCalcFromPage(long curStartTime, long curEndTime)
throws IOException, QueryProcessException {
while (reader.hasNextPage()) {
// try to calc from pageHeader
if (reader.canUseCurrentPageStatistics()) {
Statistics pageTimeStatistics = reader.currentPageTimeStatistics();
// current page max than time range
if (pageTimeStatistics.getStartTime() >= curEndTime) {
if (ascending) {
return true;
} else {
reader.skipCurrentPage();
continue;
}
}
if (timeRange.contains(
pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) {
// calc from pageHeader
while (reader.hasNextSubSeries()) {
int subIndex = reader.getCurIndex();
Statistics currentPageStatistics = reader.currentPageStatistics();
calcFromStatistics(currentPageStatistics, results.get(subIndex));
reader.nextSeries();
}
reader.skipCurrentPage();
if (isEndCalc()) {
return true;
}
continue;
}
}
// calc from page data
BatchData batchData = reader.nextPage();
if (batchData == null || !batchData.hasCurrent()) {
continue;
}
// set initial Index
lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
lastReadCurListIndex = batchData.getReadCurListIndex();
// stop calc and cached current batchData
if (ascending && batchData.currentTime() >= curEndTime) {
preCachedData = batchData;
return true;
}
// calc from batch data
calcFromBatch(batchData, curStartTime, curEndTime);
// judge whether the calculation finished
if (isEndCalc()
|| (batchData.hasCurrent()
&& (ascending
? batchData.currentTime() >= curEndTime
: batchData.currentTime() < curStartTime))) {
return true;
}
}
return false;
}
private boolean calcFromCacheData(long curStartTime, long curEndTime) throws IOException {
calcFromBatch(preCachedData, curStartTime, curEndTime);
// The result is calculated from the cache, judge whether the calculation finished
return ((preCachedData != null
&& (ascending
? preCachedData.getMaxTimestamp() >= curEndTime
: preCachedData.getMinTimestamp() < curStartTime))
|| isEndCalc());
}
private void calcFromBatch(BatchData batchData, long curStartTime, long curEndTime)
throws IOException {
// check if the batchData does not contain points in current interval
if (!satisfied(batchData, curStartTime, curEndTime)) {
return;
}
boolean hasCached = false;
int curReadCurArrayIndex = lastReadCurArrayIndex;
while (reader.hasNextSubSeries()) {
int subIndex = reader.getCurIndex();
batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
List<AggregateResult> aggregateResultList = results.get(subIndex);
for (AggregateResult result : aggregateResultList) {
// current agg method has been calculated
if (result.hasFinalResult()) {
continue;
}
// lazy reset batch data for calculation
batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
IBatchDataIterator batchDataIterator = batchData.getBatchDataIterator(subIndex);
if (ascending) {
// skip points that cannot be calculated
while (batchDataIterator.hasNext(curStartTime, curEndTime)
&& batchDataIterator.currentTime() < curStartTime) {
batchDataIterator.next();
}
} else {
while (batchDataIterator.hasNext(curStartTime, curEndTime)
&& batchDataIterator.currentTime() >= curEndTime) {
batchDataIterator.next();
}
}
if (batchDataIterator.hasNext(curStartTime, curEndTime)) {
result.updateResultFromPageData(batchDataIterator, curStartTime, curEndTime);
}
curReadCurArrayIndex =
ascending
? Math.max(curReadCurArrayIndex, batchData.getReadCurArrayIndex())
: Math.min(curReadCurArrayIndex, batchData.getReadCurArrayIndex());
}
// can calc for next interval
if (!hasCached && batchData.hasCurrent()) {
preCachedData = batchData;
hasCached = true;
}
reader.nextSeries();
}
// reset the last position to current Index
lastReadCurArrayIndex = curReadCurArrayIndex;
lastReadCurListIndex = batchData.getReadCurListIndex();
batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
}
private boolean isEndCalc() {
for (List<AggregateResult> resultsOfOneMeasurement : results) {
for (AggregateResult result : resultsOfOneMeasurement) {
if (!result.hasFinalResult()) {
return false;
}
}
}
return true;
}
private boolean satisfied(BatchData batchData, long curStartTime, long curEndTime) {
if (batchData == null || !batchData.hasCurrent()) {
return false;
}
if (ascending
&& (batchData.getMaxTimestamp() < curStartTime || batchData.currentTime() >= curEndTime)) {
return false;
}
if (!ascending
&& (batchData.getTimeByIndex(0) >= curEndTime || batchData.currentTime() < curStartTime)) {
preCachedData = batchData;
return false;
}
return true;
}
}