blob: c8977910a902c6b7f776a337eefa69bf50acba28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.SingleDataSet;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IAggregateReader;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.utils.AggregateUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths;
/**
 * Executes an {@link AggregationPlan} over a list of deduplicated series.
 *
 * <p>Two execution paths exist:
 *
 * <ul>
 *   <li>{@link #executeWithoutValueFilter}: only a time filter (or no filter) is present, so
 *       results can be computed directly from file/chunk/page statistics where possible;
 *   <li>{@link #executeWithValueFilter}: qualified timestamps are produced by a
 *       {@link TimeGenerator} first, then values at those timestamps are aggregated.
 * </ul>
 */
@SuppressWarnings("java:S1135") // ignore todos
public class AggregationExecutor {

  private static final Logger logger = LoggerFactory.getLogger(AggregationExecutor.class);

  // Deduplicated series paths; index-aligned with dataTypes and aggregations below.
  private List<PartialPath> selectedSeries;
  protected List<TSDataType> dataTypes;
  // Aggregation function names (one per selected series entry), e.g. used by
  // AggregateResultFactory.getAggrResultByName.
  protected List<String> aggregations;
  // Optional query filter expression. In the no-value-filter path it is cast to
  // GlobalTimeExpression, i.e. it is assumed to be a pure time filter there.
  protected IExpression expression;
  // Whether results are produced in ascending time order; may be flipped to false by
  // optimizeLastElementFunc when all aggregations are max_time/last_value.
  protected boolean ascending;

  /** aggregation batch calculation size (number of timestamps fetched per round). */
  private int aggregateFetchSize;

  /**
   * Builds an executor from the plan's deduplicated paths/types/aggregations; the fetch size is
   * taken from the global IoTDB configuration.
   */
  protected AggregationExecutor(AggregationPlan aggregationPlan) {
    this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
    this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
    this.aggregations = aggregationPlan.getDeduplicatedAggregations();
    this.expression = aggregationPlan.getExpression();
    this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
    this.ascending = aggregationPlan.isAscending();
  }

  /**
   * execute aggregate function with only time filter or no filter.
   *
   * <p>Storage groups are locked only while the QueryDataSource cache is initialized, then each
   * series is aggregated independently (statistics-based shortcuts are applied downstream).
   *
   * @param context query context
   * @param aggregationPlan plan supplying the per-device measurement sets and group-by-level info
   * @return a dataset holding a single row with one field per requested aggregation
   */
  public QueryDataSet executeWithoutValueFilter(
      QueryContext context, AggregationPlan aggregationPlan)
      throws StorageEngineException, IOException, QueryProcessException {
    Filter timeFilter = null;
    if (expression != null) {
      // This path assumes the expression is a pure time filter (GlobalTimeExpression);
      // presumably guaranteed by the planner — TODO confirm.
      timeFilter = ((GlobalTimeExpression) expression).getFilter();
    }

    // TODO use multi-thread
    // Group duplicate paths so each distinct series is read only once, even if several
    // aggregations target it.
    Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
        groupAggregationsBySeries(selectedSeries);
    // Results are placed back at their original (deduplicated) indexes.
    AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];

    // TODO-Cluster: group the paths by storage group to reduce communications
    // Acquire merge locks for all involved storage groups before touching their data sources.
    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
        lockListAndProcessorToSeriesMapPair =
            StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
        lockListAndProcessorToSeriesMapPair.right;

    try {
      // init QueryDataSource Cache
      QueryResourceManager.getInstance()
          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
    } catch (Exception e) {
      logger.error("Meet error when init QueryDataSource ", e);
      throw new QueryProcessException("Meet error when init QueryDataSource.", e);
    } finally {
      // Locks are released as soon as the cache is initialized — aggregation itself
      // runs without holding the merge locks.
      StorageEngine.getInstance().mergeUnLock(lockList);
    }

    for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
      aggregateOneSeries(
          entry,
          aggregateResultList,
          aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
          timeFilter,
          context);
    }
    return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan);
  }

  /**
   * get aggregation result for one series
   *
   * <p>The aggregations on this series are split into ascending- and descending-order groups so
   * that each group can share one reader pass; afterwards the results are re-interleaved back into
   * {@code aggregateResultList} at their original indexes using the {@code isAsc} flags.
   *
   * @param pathToAggrIndexes entry of path to aggregation indexes map
   * @param aggregateResultList output array, filled at the indexes listed in the entry value
   * @param measurements all measurements of the series' device (needed by the series reader)
   * @param timeFilter time filter
   * @param context query context
   */
  protected void aggregateOneSeries(
      Map.Entry<PartialPath, List<Integer>> pathToAggrIndexes,
      AggregateResult[] aggregateResultList,
      Set<String> measurements,
      Filter timeFilter,
      QueryContext context)
      throws IOException, QueryProcessException, StorageEngineException {
    List<AggregateResult> ascAggregateResultList = new ArrayList<>();
    List<AggregateResult> descAggregateResultList = new ArrayList<>();
    boolean[] isAsc = new boolean[aggregateResultList.length];

    PartialPath seriesPath = pathToAggrIndexes.getKey();
    // All aggregations in this entry target the same series, so one data type suffices.
    TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));

    for (int i : pathToAggrIndexes.getValue()) {
      // construct AggregateResult
      AggregateResult aggregateResult =
          AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
      if (aggregateResult.isAscending()) {
        ascAggregateResultList.add(aggregateResult);
        isAsc[i] = true;
      } else {
        descAggregateResultList.add(aggregateResult);
      }
    }
    aggregateOneSeries(
        seriesPath,
        measurements,
        context,
        timeFilter,
        tsDataType,
        ascAggregateResultList,
        descAggregateResultList,
        null);

    // Re-distribute results: asc and desc lists preserve insertion order, so walking the
    // original index list with two running cursors restores the original positions.
    int ascIndex = 0;
    int descIndex = 0;
    for (int i : pathToAggrIndexes.getValue()) {
      aggregateResultList[i] =
          isAsc[i]
              ? ascAggregateResultList.get(ascIndex++)
              : descAggregateResultList.get(descIndex++);
    }
  }

  /**
   * Aggregates one series without a value filter, running up to two reader passes: one in
   * ascending and one in descending time order, each updating its own result list in place.
   *
   * @param seriesPath the series to read
   * @param measurements all measurements of the series' device
   * @param context query context
   * @param timeFilter time filter (may be null); tightened by TTL before reading
   * @param tsDataType data type of the series
   * @param ascAggregateResultList results needing ascending order (may be null/empty)
   * @param descAggregateResultList results needing descending order (may be null/empty)
   * @param fileFilter optional TsFile-level filter (currently unused in this method body)
   */
  @SuppressWarnings("squid:S107")
  public static void aggregateOneSeries(
      PartialPath seriesPath,
      Set<String> measurements,
      QueryContext context,
      Filter timeFilter,
      TSDataType tsDataType,
      List<AggregateResult> ascAggregateResultList,
      List<AggregateResult> descAggregateResultList,
      TsFileFilter fileFilter)
      throws StorageEngineException, IOException, QueryProcessException {

    // construct series reader without value filter
    QueryDataSource queryDataSource =
        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
    // update filter by TTL
    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);

    if (ascAggregateResultList != null && !ascAggregateResultList.isEmpty()) {
      // Order the data source's resources for an ascending scan before creating the reader.
      QueryUtils.fillOrderIndexes(queryDataSource, seriesPath.getDevice(), true);
      IAggregateReader seriesReader =
          new SeriesAggregateReader(
              seriesPath,
              measurements,
              tsDataType,
              context,
              queryDataSource,
              timeFilter,
              null,
              null,
              true);
      aggregateFromReader(seriesReader, ascAggregateResultList);
    }
    if (descAggregateResultList != null && !descAggregateResultList.isEmpty()) {
      // Re-order the same data source for a descending scan.
      QueryUtils.fillOrderIndexes(queryDataSource, seriesPath.getDevice(), false);
      IAggregateReader seriesReader =
          new SeriesAggregateReader(
              seriesPath,
              measurements,
              tsDataType,
              context,
              queryDataSource,
              timeFilter,
              null,
              null,
              false);
      aggregateFromReader(seriesReader, descAggregateResultList);
    }
  }

  /**
   * Drives the file → chunk → page traversal of the reader, feeding every result in the list.
   *
   * <p>At each level, if the reader reports that the current unit's pre-computed statistics are
   * usable (no overlap/filter conflicts), the statistics are consumed and the unit is skipped;
   * otherwise the traversal descends one level. Returns early once every result has reached its
   * final value ({@code remainingToCalculate == 0}).
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  private static void aggregateFromReader(
      IAggregateReader seriesReader, List<AggregateResult> aggregateResultList)
      throws QueryProcessException, IOException {
    int remainingToCalculate = aggregateResultList.size();
    boolean[] isCalculatedArray = new boolean[aggregateResultList.size()];

    while (seriesReader.hasNextFile()) {
      // cal by file statistics
      if (seriesReader.canUseCurrentFileStatistics()) {
        Statistics fileStatistics = seriesReader.currentFileStatistics();
        remainingToCalculate =
            aggregateStatistics(
                aggregateResultList, isCalculatedArray, remainingToCalculate, fileStatistics);
        if (remainingToCalculate == 0) {
          return;
        }
        seriesReader.skipCurrentFile();
        continue;
      }

      while (seriesReader.hasNextChunk()) {
        // cal by chunk statistics
        if (seriesReader.canUseCurrentChunkStatistics()) {
          Statistics chunkStatistics = seriesReader.currentChunkStatistics();
          remainingToCalculate =
              aggregateStatistics(
                  aggregateResultList, isCalculatedArray, remainingToCalculate, chunkStatistics);
          if (remainingToCalculate == 0) {
            return;
          }
          seriesReader.skipCurrentChunk();
          continue;
        }

        remainingToCalculate =
            aggregatePages(
                seriesReader, aggregateResultList, isCalculatedArray, remainingToCalculate);
        if (remainingToCalculate == 0) {
          return;
        }
      }
    }
  }

  /**
   * Aggregate each result in the list with the statistics
   *
   * <p>Results already marked done in {@code isCalculatedArray} are skipped; a result that reaches
   * its final value here is marked done and decrements the remaining count.
   *
   * @return the updated number of results that still need data
   */
  private static int aggregateStatistics(
      List<AggregateResult> aggregateResultList,
      boolean[] isCalculatedArray,
      int remainingToCalculate,
      Statistics statistics)
      throws QueryProcessException {
    int newRemainingToCalculate = remainingToCalculate;
    for (int i = 0; i < aggregateResultList.size(); i++) {
      if (!isCalculatedArray[i]) {
        AggregateResult aggregateResult = aggregateResultList.get(i);
        aggregateResult.updateResultFromStatistics(statistics);
        if (aggregateResult.hasFinalResult()) {
          isCalculatedArray[i] = true;
          newRemainingToCalculate--;
          if (newRemainingToCalculate == 0) {
            return newRemainingToCalculate;
          }
        }
      }
    }
    return newRemainingToCalculate;
  }

  /**
   * Page-level stage of the traversal: uses page statistics when permitted, otherwise reads the
   * (possibly overlapped) page data and feeds it to each unfinished result.
   *
   * @return the updated number of results that still need data (0 triggers early exit upstream)
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  private static int aggregatePages(
      IAggregateReader seriesReader,
      List<AggregateResult> aggregateResultList,
      boolean[] isCalculatedArray,
      int remainingToCalculate)
      throws IOException, QueryProcessException {
    while (seriesReader.hasNextPage()) {
      // cal by page statistics
      if (seriesReader.canUseCurrentPageStatistics()) {
        Statistics pageStatistic = seriesReader.currentPageStatistics();
        remainingToCalculate =
            aggregateStatistics(
                aggregateResultList, isCalculatedArray, remainingToCalculate, pageStatistic);
        if (remainingToCalculate == 0) {
          return 0;
        }
        seriesReader.skipCurrentPage();
        continue;
      }

      BatchData nextOverlappedPageData = seriesReader.nextPage();
      for (int i = 0; i < aggregateResultList.size(); i++) {
        if (!isCalculatedArray[i]) {
          AggregateResult aggregateResult = aggregateResultList.get(i);
          aggregateResult.updateResultFromPageData(nextOverlappedPageData);
          // Rewind the batch so the next result can consume the same page data from the start.
          nextOverlappedPageData.resetBatchData();
          if (aggregateResult.hasFinalResult()) {
            isCalculatedArray[i] = true;
            remainingToCalculate--;
            if (remainingToCalculate == 0) {
              return 0;
            }
          }
        }
      }
    }
    return remainingToCalculate;
  }

  /**
   * execute aggregate function with value filter.
   *
   * <p>Timestamps satisfying the value filter are produced by a {@link TimeGenerator}; one
   * by-timestamp reader is created per distinct series and the values at the generated timestamps
   * are folded into the aggregate results.
   *
   * @param context query context.
   * @param queryPlan the aggregation plan
   */
  public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
      throws StorageEngineException, IOException, QueryProcessException {
    // May flip the whole query to descending when only max_time/last_value are requested.
    optimizeLastElementFunc(queryPlan);

    TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);
    // group by path name
    Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
        groupAggregationsBySeries(selectedSeries);
    Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>();

    // Lock storage groups while the QueryDataSource cache is initialized (same pattern as
    // executeWithoutValueFilter).
    Pair<List<StorageGroupProcessor>, Map<StorageGroupProcessor, List<PartialPath>>>
        lockListAndProcessorToSeriesMapPair =
            StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
    List<StorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
    Map<StorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
        lockListAndProcessorToSeriesMapPair.right;

    try {
      // init QueryDataSource Cache
      QueryResourceManager.getInstance()
          .initQueryDataSourceCache(
              processorToSeriesMap, context, timestampGenerator.getTimeFilter());
    } catch (Exception e) {
      logger.error("Meet error when init QueryDataSource ", e);
      throw new QueryProcessException("Meet error when init QueryDataSource.", e);
    } finally {
      StorageEngine.getInstance().mergeUnLock(lockList);
    }

    for (int i = 0; i < selectedSeries.size(); i++) {
      PartialPath path = selectedSeries.get(i);
      // remove() ensures exactly one reader is built per distinct path, even when the path
      // appears at several indexes.
      List<Integer> indexes = pathToAggrIndexesMap.remove(path);
      if (indexes != null) {
        IReaderByTimestamp seriesReaderByTimestamp =
            getReaderByTime(path, queryPlan, dataTypes.get(i), context);
        readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes);
      }
    }

    List<AggregateResult> aggregateResults = new ArrayList<>();
    for (int i = 0; i < selectedSeries.size(); i++) {
      AggregateResult result =
          AggregateResultFactory.getAggrResultByName(
              aggregations.get(i), dataTypes.get(i), ascending);
      aggregateResults.add(result);
    }
    aggregateWithValueFilter(aggregateResults, timestampGenerator, readerToAggrIndexesMap);
    return constructDataSet(aggregateResults, queryPlan);
  }

  /**
   * If every requested aggregation is max_time or last_value, switch the plan (and this executor)
   * to descending order — presumably so those functions can finish after seeing the newest data
   * first; TODO confirm.
   */
  private void optimizeLastElementFunc(QueryPlan queryPlan) {
    int index = 0;
    for (; index < aggregations.size(); index++) {
      String aggregationFunc = aggregations.get(index);
      if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME)
          && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) {
        break;
      }
    }
    // Loop ran to completion <=> all aggregations are max_time/last_value.
    if (index >= aggregations.size()) {
      queryPlan.setAscending(false);
      this.ascending = false;
    }
  }

  // Protected factory methods below — presumably to let subclasses (e.g. cluster mode)
  // substitute their own generator/reader implementations; TODO confirm.
  protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
      throws StorageEngineException {
    return new ServerTimeGenerator(context, queryPlan);
  }

  protected IReaderByTimestamp getReaderByTime(
      PartialPath path, RawDataQueryPlan queryPlan, TSDataType dataType, QueryContext context)
      throws StorageEngineException, QueryProcessException {
    return new SeriesReaderByTimestamp(
        path,
        queryPlan.getAllMeasurementsInDevice(path.getDevice()),
        dataType,
        context,
        QueryResourceManager.getInstance().getQueryDataSource(path, context, null),
        null,
        ascending);
  }

  /**
   * calculate aggregation result with value filter.
   *
   * <p>Timestamps are drained from the generator in batches of {@code aggregateFetchSize}. For
   * paths whose values were already materialized inside the time generator (per
   * {@code markFilterdPaths}), values are taken from the generator's cache; otherwise they are
   * fetched through the by-timestamp reader.
   */
  private void aggregateWithValueFilter(
      List<AggregateResult> aggregateResults,
      TimeGenerator timestampGenerator,
      Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap)
      throws IOException {
    List<Boolean> cached =
        markFilterdPaths(
            expression, new ArrayList<>(selectedSeries), timestampGenerator.hasOrNode());

    while (timestampGenerator.hasNext()) {
      // generate timestamps for aggregate
      long[] timeArray = new long[aggregateFetchSize];
      int timeArrayLength = 0;
      for (int cnt = 0; cnt < aggregateFetchSize; cnt++) {
        if (!timestampGenerator.hasNext()) {
          break;
        }
        timeArray[timeArrayLength++] = timestampGenerator.next();
      }

      // cal part of aggregate result
      for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) {
        // All indexes in the entry refer to the same path, so the first one identifies it.
        int pathId = entry.getValue().get(0);
        // cache in timeGenerator
        if (cached.get(pathId)) {
          Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
          for (Integer i : entry.getValue()) {
            aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
          }
        } else {
          if (entry.getValue().size() == 1) {
            // Single consumer: let the result pull values through the reader directly.
            aggregateResults
                .get(entry.getValue().get(0))
                .updateResultUsingTimestamps(timeArray, timeArrayLength, entry.getKey());
          } else {
            // Multiple consumers: fetch values once and share them across all results.
            Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
            if (values != null) {
              for (Integer i : entry.getValue()) {
                aggregateResults.get(i).updateResultUsingValues(timeArray, timeArrayLength, values);
              }
            }
          }
        }
      }
    }
  }

  /**
   * using aggregate result data list construct QueryDataSet.
   *
   * <p>For group-by-level plans, per-series results are first merged by path level via
   * {@link AggregateUtils#mergeRecordByPath} and a fresh row is built from the merged results.
   *
   * @param aggregateResultList aggregate result list
   * @param plan the aggregation plan (consulted for group-by-level handling)
   */
  private QueryDataSet constructDataSet(
      List<AggregateResult> aggregateResultList, AggregationPlan plan)
      throws QueryProcessException {
    RowRecord record = new RowRecord(0);
    for (AggregateResult resultData : aggregateResultList) {
      TSDataType dataType = resultData.getResultDataType();
      record.addField(resultData.getResult(), dataType);
    }

    SingleDataSet dataSet;
    if (plan.isGroupByLevel()) {
      Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
      List<AggregateResult> mergedAggResults =
          AggregateUtils.mergeRecordByPath(plan, aggregateResultList, finalPaths);
      // NOTE(review): `paths` is left empty here — column metadata for group-by-level output is
      // presumably supplied elsewhere (e.g. from the plan's header); verify against callers.
      List<PartialPath> paths = new ArrayList<>();
      List<TSDataType> dataTypes = new ArrayList<>();
      for (int i = 0; i < mergedAggResults.size(); i++) {
        dataTypes.add(mergedAggResults.get(i).getResultDataType());
      }
      RowRecord curRecord = new RowRecord(0);
      for (AggregateResult resultData : mergedAggResults) {
        TSDataType dataType = resultData.getResultDataType();
        curRecord.addField(resultData.getResult(), dataType);
      }
      dataSet = new SingleDataSet(paths, dataTypes);
      dataSet.setRecord(curRecord);
    } else {
      dataSet = new SingleDataSet(selectedSeries, dataTypes);
      dataSet.setRecord(record);
    }

    return dataSet;
  }

  /**
   * Merge same series and convert to series map. For example: Given: paths: s1, s2, s3, s1 and
   * aggregations: count, sum, count, sum. Then: pathToAggrIndexesMap: s1 -> 0, 3; s2 -> 1; s3 -> 2
   *
   * @param selectedSeries selected series
   * @return path to aggregation indexes map
   */
  private Map<PartialPath, List<Integer>> groupAggregationsBySeries(
      List<PartialPath> selectedSeries) {
    Map<PartialPath, List<Integer>> pathToAggrIndexesMap = new HashMap<>();
    for (int i = 0; i < selectedSeries.size(); i++) {
      PartialPath series = selectedSeries.get(i);
      pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>()).add(i);
    }
    return pathToAggrIndexesMap;
  }
}