| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.query.executor; |
| |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.commons.path.AlignedPath; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.engine.StorageEngine; |
| import org.apache.iotdb.db.engine.querycontext.QueryDataSource; |
| import org.apache.iotdb.db.engine.storagegroup.DataRegion; |
| import org.apache.iotdb.db.exception.StorageEngineException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.metadata.utils.MetaUtils; |
| import org.apache.iotdb.db.qp.physical.crud.AggregationPlan; |
| import org.apache.iotdb.db.qp.physical.crud.QueryPlan; |
| import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; |
| import org.apache.iotdb.db.query.aggregation.AggregateResult; |
| import org.apache.iotdb.db.query.context.QueryContext; |
| import org.apache.iotdb.db.query.control.QueryResourceManager; |
| import org.apache.iotdb.db.query.dataset.SingleDataSet; |
| import org.apache.iotdb.db.query.factory.AggregateResultFactory; |
| import org.apache.iotdb.db.query.filter.TsFileFilter; |
| import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader; |
| import org.apache.iotdb.db.query.reader.series.IAggregateReader; |
| import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; |
| import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader; |
| import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp; |
| import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; |
| import org.apache.iotdb.db.utils.QueryUtils; |
| import org.apache.iotdb.db.utils.ValueIterator; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; |
| import org.apache.iotdb.tsfile.read.common.BatchData; |
| import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; |
| import org.apache.iotdb.tsfile.read.common.RowRecord; |
| import org.apache.iotdb.tsfile.read.expression.IExpression; |
| import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; |
| import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import static org.apache.iotdb.tsfile.read.query.executor.ExecutorWithTimeGenerator.markFilterdPaths; |
| |
| @SuppressWarnings("java:S1135") // ignore todos |
| public class AggregationExecutor { |
| |
  private static final Logger logger = LoggerFactory.getLogger(AggregationExecutor.class);

  // deduplicated, exact (wildcard-free) paths of the queried series; see constructor
  private List<PartialPath> selectedSeries;
  // data type of each selected series, parallel to selectedSeries
  protected List<TSDataType> dataTypes;
  // aggregation function name for each selected series, parallel to selectedSeries
  protected List<String> aggregations;
  // query expression; in the no-value-filter path it is cast to GlobalTimeExpression
  protected IExpression expression;
  // whether data is scanned in ascending time order
  protected boolean ascending;
  protected QueryContext context;
  // one result slot per selected series, filled in as series are aggregated
  protected AggregateResult[] aggregateResultList;

  /** aggregation batch calculation size. */
  private int aggregateFetchSize;
| |
  /**
   * Builds an executor over the plan's deduplicated series and aggregations.
   *
   * @param context query context shared by all readers created for this query
   * @param aggregationPlan plan holding deduplicated paths, data types and aggregation names
   */
  protected AggregationExecutor(QueryContext context, AggregationPlan aggregationPlan) {
    this.selectedSeries = new ArrayList<>();
    // transformToExactPath resolves each measurement path to a concrete, exact path
    aggregationPlan
        .getDeduplicatedPaths()
        .forEach(k -> selectedSeries.add(((MeasurementPath) k).transformToExactPath()));
    this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
    this.aggregations = aggregationPlan.getDeduplicatedAggregations();
    this.expression = aggregationPlan.getExpression();
    // batch size bounds how many timestamps are fetched per round in the value-filter path
    this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
    this.ascending = aggregationPlan.isAscending();
    this.context = context;
    this.aggregateResultList = new AggregateResult[selectedSeries.size()];
  }
| |
  /**
   * Execute aggregate functions with only a time filter or no filter, so each series can be
   * aggregated independently from statistics/page data without a timestamp generator.
   *
   * @param aggregationPlan plan providing device measurement sets and group-by-level settings
   * @return a single-row data set containing every aggregate result
   */
  public QueryDataSet executeWithoutValueFilter(AggregationPlan aggregationPlan)
      throws StorageEngineException, IOException, QueryProcessException {

    Filter timeFilter = null;
    if (expression != null) {
      // with no value filter, the expression can only carry a global time condition
      timeFilter = ((GlobalTimeExpression) expression).getFilter();
    }

    // TODO use multi-thread
    // group result indexes by (non-aligned) series so each series is read once
    Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
        MetaUtils.groupAggregationsBySeries(selectedSeries);
    // Attention: this method will REMOVE aligned path from pathToAggrIndexesMap
    Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
        MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);

    List<PartialPath> groupedPathList =
        new ArrayList<>(pathToAggrIndexesMap.size() + alignedPathToAggrIndexesMap.size());
    groupedPathList.addAll(pathToAggrIndexesMap.keySet());
    groupedPathList.addAll(alignedPathToAggrIndexesMap.keySet());

    // TODO-Cluster: group the paths by storage group to reduce communications
    Pair<List<DataRegion>, Map<DataRegion, List<PartialPath>>> lockListAndProcessorToSeriesMapPair =
        StorageEngine.getInstance().mergeLock(groupedPathList);
    List<DataRegion> lockList = lockListAndProcessorToSeriesMapPair.left;
    Map<DataRegion, List<PartialPath>> processorToSeriesMap =
        lockListAndProcessorToSeriesMapPair.right;

    try {
      // init QueryDataSource Cache
      QueryResourceManager.getInstance()
          .initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
    } catch (Exception e) {
      logger.error("Meet error when init QueryDataSource ", e);
      throw new QueryProcessException("Meet error when init QueryDataSource.", e);
    } finally {
      // the merge lock is only needed while the data source cache is being initialized
      StorageEngine.getInstance().mergeUnLock(lockList);
    }

    // aggregate the non-aligned series, one grouped path at a time
    for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
      PartialPath seriesPath = entry.getKey();
      aggregateOneSeries(
          seriesPath,
          entry.getValue(),
          aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()),
          timeFilter);
    }
    // aggregate the aligned series, all sub-sensors of a device in one pass
    for (Map.Entry<AlignedPath, List<List<Integer>>> entry :
        alignedPathToAggrIndexesMap.entrySet()) {
      AlignedPath alignedPath = entry.getKey();
      aggregateOneAlignedSeries(
          alignedPath,
          entry.getValue(),
          aggregationPlan.getAllMeasurementsInDevice(alignedPath.getDevice()),
          timeFilter);
    }

    return constructDataSet(Arrays.asList(aggregateResultList), aggregationPlan);
  }
| |
  /**
   * Get aggregation result for one (non-aligned) series.
   *
   * <p>Aggregations preferring ascending scan order and those preferring descending order are
   * separated, computed by two reader passes, then written back into {@link #aggregateResultList}
   * at their original positions.
   *
   * @param seriesPath the series shared by all aggregations in {@code indexes}
   * @param indexes positions in aggregateResultList that this series serves
   * @param allMeasurementsInDevice all sensors of the series' device, for reader construction
   * @param timeFilter time filter
   */
  protected void aggregateOneSeries(
      PartialPath seriesPath,
      List<Integer> indexes,
      Set<String> allMeasurementsInDevice,
      Filter timeFilter)
      throws IOException, QueryProcessException, StorageEngineException {
    List<AggregateResult> ascAggregateResultList = new ArrayList<>();
    List<AggregateResult> descAggregateResultList = new ArrayList<>();
    // isAsc[i] remembers which direction the result at global index i was assigned to
    boolean[] isAsc = new boolean[aggregateResultList.length];

    // all aggregations in `indexes` target the same series, so they share one data type
    TSDataType tsDataType = dataTypes.get(indexes.get(0));
    for (int i : indexes) {
      // construct AggregateResult
      AggregateResult aggregateResult =
          AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
      if (aggregateResult.isAscending()) {
        ascAggregateResultList.add(aggregateResult);
        isAsc[i] = true;
      } else {
        descAggregateResultList.add(aggregateResult);
      }
    }
    aggregateOneSeries(
        seriesPath,
        allMeasurementsInDevice,
        context,
        timeFilter,
        tsDataType,
        ascAggregateResultList,
        descAggregateResultList,
        null,
        ascending);

    // merge the two partial lists back, preserving each result's original slot
    int ascIndex = 0;
    int descIndex = 0;
    for (int i : indexes) {
      aggregateResultList[i] =
          isAsc[i]
              ? ascAggregateResultList.get(ascIndex++)
              : descAggregateResultList.get(descIndex++);
    }
  }
| |
  /**
   * Get aggregation results for one aligned device in a single pass over its data.
   *
   * <p>{@code subIndexes.get(k)} holds the positions in {@link #aggregateResultList} served by
   * the k-th sub-sensor of {@code alignedPath}. As in the non-aligned case, results are split
   * into ascending/descending groups, computed, then written back to their original slots.
   *
   * @param alignedPath the aligned device path covering all queried sub-sensors
   * @param subIndexes per-sub-sensor lists of result indexes
   * @param allMeasurementsInDevice all sensors of the device, for reader construction
   * @param timeFilter time filter
   */
  protected void aggregateOneAlignedSeries(
      AlignedPath alignedPath,
      List<List<Integer>> subIndexes,
      Set<String> allMeasurementsInDevice,
      Filter timeFilter)
      throws IOException, QueryProcessException, StorageEngineException {
    List<List<AggregateResult>> ascAggregateResultList = new ArrayList<>();
    List<List<AggregateResult>> descAggregateResultList = new ArrayList<>();
    // isAsc[i] remembers which direction the result at global index i was assigned to
    boolean[] isAsc = new boolean[aggregateResultList.length];

    for (List<Integer> subIndex : subIndexes) {
      // all aggregations of one sub-sensor share that sub-sensor's data type
      TSDataType tsDataType = dataTypes.get(subIndex.get(0));
      List<AggregateResult> subAscResultList = new ArrayList<>();
      List<AggregateResult> subDescResultList = new ArrayList<>();
      for (int i : subIndex) {
        // construct AggregateResult
        AggregateResult aggregateResult =
            AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType);
        if (aggregateResult.isAscending()) {
          subAscResultList.add(aggregateResult);
          isAsc[i] = true;
        } else {
          subDescResultList.add(aggregateResult);
        }
      }
      ascAggregateResultList.add(subAscResultList);
      descAggregateResultList.add(subDescResultList);
    }

    aggregateOneAlignedSeries(
        alignedPath,
        allMeasurementsInDevice,
        context,
        timeFilter,
        TSDataType.VECTOR,
        ascAggregateResultList,
        descAggregateResultList,
        null,
        ascending);

    // merge the per-sub-sensor partial lists back into their original slots
    for (int i = 0; i < subIndexes.size(); i++) {
      List<Integer> subIndex = subIndexes.get(i);
      List<AggregateResult> subAscResultList = ascAggregateResultList.get(i);
      List<AggregateResult> subDescResultList = descAggregateResultList.get(i);
      int ascIndex = 0;
      int descIndex = 0;
      for (int index : subIndex) {
        aggregateResultList[index] =
            isAsc[index] ? subAscResultList.get(ascIndex++) : subDescResultList.get(descIndex++);
      }
    }
  }
| |
  /**
   * Aggregate one (non-aligned) series without a value filter.
   *
   * <p>Fetches the series' QueryDataSource, tightens the time filter by the data source's TTL,
   * then runs up to two reader passes: one ascending pass for {@code ascAggregateResultList} and
   * one descending pass for {@code descAggregateResultList}.
   *
   * @param seriesPath series to read
   * @param measurements all sensors of the device, for reader construction
   * @param context query context
   * @param timeFilter time filter, may be null
   * @param tsDataType data type of the series
   * @param ascAggregateResultList results computed with an ascending reader, may be null/empty
   * @param descAggregateResultList results computed with a descending reader, may be null/empty
   * @param fileFilter optional filter excluding whole TsFiles, may be null
   * @param ascending scan order used when fetching the QueryDataSource
   */
  @SuppressWarnings("squid:S107")
  public static void aggregateOneSeries(
      PartialPath seriesPath,
      Set<String> measurements,
      QueryContext context,
      Filter timeFilter,
      TSDataType tsDataType,
      List<AggregateResult> ascAggregateResultList,
      List<AggregateResult> descAggregateResultList,
      TsFileFilter fileFilter,
      boolean ascending)
      throws StorageEngineException, IOException, QueryProcessException {

    // construct series reader without value filter
    QueryDataSource queryDataSource =
        QueryResourceManager.getInstance()
            .getQueryDataSource(seriesPath, context, timeFilter, ascending);
    if (fileFilter != null) {
      QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
    }
    // update filter by TTL
    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);

    if (ascAggregateResultList != null && !ascAggregateResultList.isEmpty()) {
      // order indexes must match the scan direction before each pass
      QueryUtils.fillOrderIndexes(queryDataSource, seriesPath.getDevice(), true);
      IAggregateReader seriesReader =
          new SeriesAggregateReader(
              seriesPath,
              measurements,
              tsDataType,
              context,
              queryDataSource,
              timeFilter,
              null,
              null,
              true);
      aggregateFromReader(seriesReader, ascAggregateResultList);
    }
    if (descAggregateResultList != null && !descAggregateResultList.isEmpty()) {
      QueryUtils.fillOrderIndexes(queryDataSource, seriesPath.getDevice(), false);
      IAggregateReader seriesReader =
          new SeriesAggregateReader(
              seriesPath,
              measurements,
              tsDataType,
              context,
              queryDataSource,
              timeFilter,
              null,
              null,
              false);
      aggregateFromReader(seriesReader, descAggregateResultList);
    }
  }
| |
  /**
   * Aggregate one aligned device without a value filter.
   *
   * <p>Mirror of the non-aligned static {@code aggregateOneSeries}: fetches the data source,
   * applies TTL to the time filter, then runs an ascending and/or descending aligned-reader pass
   * depending on which per-sub-sensor result lists are non-empty.
   *
   * @param alignedPath aligned device path covering all queried sub-sensors
   * @param measurements all sensors of the device, for reader construction
   * @param context query context
   * @param timeFilter time filter, may be null
   * @param tsDataType passed as VECTOR by callers for aligned reads
   * @param ascAggregateResultList per-sub-sensor results for the ascending pass
   * @param descAggregateResultList per-sub-sensor results for the descending pass
   * @param fileFilter optional filter excluding whole TsFiles, may be null
   * @param ascending scan order used when fetching the QueryDataSource
   */
  public static void aggregateOneAlignedSeries(
      AlignedPath alignedPath,
      Set<String> measurements,
      QueryContext context,
      Filter timeFilter,
      TSDataType tsDataType,
      List<List<AggregateResult>> ascAggregateResultList,
      List<List<AggregateResult>> descAggregateResultList,
      TsFileFilter fileFilter,
      boolean ascending)
      throws StorageEngineException, IOException, QueryProcessException {

    // construct series reader without value filter
    QueryDataSource queryDataSource =
        QueryResourceManager.getInstance()
            .getQueryDataSource(alignedPath, context, timeFilter, ascending);
    if (fileFilter != null) {
      QueryUtils.filterQueryDataSource(queryDataSource, fileFilter);
    }
    // update filter by TTL
    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);

    if (!isAggregateResultEmpty(ascAggregateResultList)) {
      // order indexes must match the scan direction before each pass
      QueryUtils.fillOrderIndexes(queryDataSource, alignedPath.getDevice(), true);
      AlignedSeriesAggregateReader seriesReader =
          new AlignedSeriesAggregateReader(
              alignedPath,
              measurements,
              tsDataType,
              context,
              queryDataSource,
              timeFilter,
              null,
              null,
              true);
      aggregateFromAlignedReader(seriesReader, ascAggregateResultList);
    }
    if (!isAggregateResultEmpty(descAggregateResultList)) {
      QueryUtils.fillOrderIndexes(queryDataSource, alignedPath.getDevice(), false);
      AlignedSeriesAggregateReader seriesReader =
          new AlignedSeriesAggregateReader(
              alignedPath,
              measurements,
              tsDataType,
              context,
              queryDataSource,
              timeFilter,
              null,
              null,
              false);
      aggregateFromAlignedReader(seriesReader, descAggregateResultList);
    }
  }
| |
| private static boolean isAggregateResultEmpty(List<List<AggregateResult>> resultList) { |
| for (List<AggregateResult> result : resultList) { |
| if (!result.isEmpty()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
  /**
   * Drive one series reader to completion, updating every result in {@code aggregateResultList}.
   *
   * <p>File- and chunk-level statistics are consumed whenever the reader reports they are usable,
   * skipping the underlying data; otherwise the chunk is processed page by page. Returns early as
   * soon as all results have a final value.
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  private static void aggregateFromReader(
      IAggregateReader seriesReader, List<AggregateResult> aggregateResultList)
      throws QueryProcessException, IOException {
    int remainingToCalculate = aggregateResultList.size();
    boolean[] isCalculatedArray = new boolean[aggregateResultList.size()];

    while (seriesReader.hasNextFile()) {
      // cal by file statistics
      if (seriesReader.canUseCurrentFileStatistics()) {
        Statistics fileStatistics = seriesReader.currentFileStatistics();
        remainingToCalculate =
            aggregateStatistics(
                aggregateResultList, isCalculatedArray, remainingToCalculate, fileStatistics);
        if (remainingToCalculate == 0) {
          return;
        }
        // statistics consumed: skip the file's data entirely
        seriesReader.skipCurrentFile();
        continue;
      }

      while (seriesReader.hasNextChunk()) {
        // cal by chunk statistics
        if (seriesReader.canUseCurrentChunkStatistics()) {
          Statistics chunkStatistics = seriesReader.currentChunkStatistics();
          remainingToCalculate =
              aggregateStatistics(
                  aggregateResultList, isCalculatedArray, remainingToCalculate, chunkStatistics);
          if (remainingToCalculate == 0) {
            return;
          }
          seriesReader.skipCurrentChunk();
          continue;
        }

        // statistics unusable: fall back to page-level aggregation
        remainingToCalculate =
            aggregatePages(
                seriesReader, aggregateResultList, isCalculatedArray, remainingToCalculate);
        if (remainingToCalculate == 0) {
          return;
        }
      }
    }
  }
| |
  /**
   * Aligned counterpart of {@code aggregateFromReader}: drives one aligned reader, updating the
   * per-sub-sensor result lists.
   *
   * <p>When statistics are usable, the reader is iterated over its sub-series
   * ({@code hasNextSubSeries}/{@code nextSeries}) so each sub-sensor consumes its own statistics.
   * {@code resetIndex} rewinds the sub-series cursor before any early return.
   */
  private static void aggregateFromAlignedReader(
      AlignedSeriesAggregateReader seriesReader, List<List<AggregateResult>> aggregateResultList)
      throws QueryProcessException, IOException {
    // total number of unfinished results across all sub-sensors, plus per-sub-sensor flags
    int remainingToCalculate = 0;
    List<boolean[]> isCalculatedArray = new ArrayList<>();
    for (List<AggregateResult> subAggregateResults : aggregateResultList) {
      remainingToCalculate += subAggregateResults.size();
      boolean[] subCalculatedArray = new boolean[subAggregateResults.size()];
      isCalculatedArray.add(subCalculatedArray);
    }

    while (seriesReader.hasNextFile()) {
      // cal by file statistics
      if (seriesReader.canUseCurrentFileStatistics()) {
        while (seriesReader.hasNextSubSeries()) {
          Statistics fileStatistics = seriesReader.currentFileStatistics();
          remainingToCalculate =
              aggregateStatistics(
                  aggregateResultList.get(seriesReader.getCurIndex()),
                  isCalculatedArray.get(seriesReader.getCurIndex()),
                  remainingToCalculate,
                  fileStatistics);
          if (remainingToCalculate == 0) {
            seriesReader.resetIndex();
            return;
          }
          seriesReader.nextSeries();
        }
        seriesReader.skipCurrentFile();
        continue;
      }

      while (seriesReader.hasNextChunk()) {
        // cal by chunk statistics
        if (seriesReader.canUseCurrentChunkStatistics()) {
          while (seriesReader.hasNextSubSeries()) {
            Statistics chunkStatistics = seriesReader.currentChunkStatistics();
            remainingToCalculate =
                aggregateStatistics(
                    aggregateResultList.get(seriesReader.getCurIndex()),
                    isCalculatedArray.get(seriesReader.getCurIndex()),
                    remainingToCalculate,
                    chunkStatistics);
            if (remainingToCalculate == 0) {
              seriesReader.resetIndex();
              return;
            }
            seriesReader.nextSeries();
          }
          seriesReader.skipCurrentChunk();
          continue;
        }

        // statistics unusable: fall back to page-level aggregation
        remainingToCalculate =
            aggregateAlignedPages(
                seriesReader, aggregateResultList, isCalculatedArray, remainingToCalculate);
        if (remainingToCalculate == 0) {
          return;
        }
      }
    }
  }
| |
| /** Aggregate each result in the list with the statistics */ |
| private static int aggregateStatistics( |
| List<AggregateResult> aggregateResultList, |
| boolean[] isCalculatedArray, |
| int remainingToCalculate, |
| Statistics statistics) |
| throws QueryProcessException { |
| // some aligned paths' statistics may be null |
| if (statistics == null) { |
| return remainingToCalculate; |
| } |
| int newRemainingToCalculate = remainingToCalculate; |
| for (int i = 0; i < aggregateResultList.size(); i++) { |
| if (!isCalculatedArray[i]) { |
| AggregateResult aggregateResult = aggregateResultList.get(i); |
| aggregateResult.updateResultFromStatistics(statistics); |
| if (aggregateResult.hasFinalResult()) { |
| isCalculatedArray[i] = true; |
| newRemainingToCalculate--; |
| if (newRemainingToCalculate == 0) { |
| return newRemainingToCalculate; |
| } |
| } |
| } |
| } |
| return newRemainingToCalculate; |
| } |
| |
  /**
   * Aggregate the pages of the reader's current chunk.
   *
   * <p>Per page: consume page statistics when the reader reports they are usable, otherwise
   * materialize the page and feed its batch iterator to every unfinished result.
   *
   * @return the updated number of results that still lack a final value
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  private static int aggregatePages(
      IAggregateReader seriesReader,
      List<AggregateResult> aggregateResultList,
      boolean[] isCalculatedArray,
      int remainingToCalculate)
      throws IOException, QueryProcessException {
    while (seriesReader.hasNextPage()) {
      // cal by page statistics
      if (seriesReader.canUseCurrentPageStatistics()) {
        Statistics pageStatistic = seriesReader.currentPageStatistics();
        remainingToCalculate =
            aggregateStatistics(
                aggregateResultList, isCalculatedArray, remainingToCalculate, pageStatistic);
        if (remainingToCalculate == 0) {
          return 0;
        }
        seriesReader.skipCurrentPage();
        continue;
      }
      // statistics unusable: read the (possibly overlapped) page data itself
      IBatchDataIterator batchDataIterator = seriesReader.nextPage().getBatchDataIterator();
      remainingToCalculate =
          aggregateBatchData(
              aggregateResultList, isCalculatedArray, remainingToCalculate, batchDataIterator);
    }
    return remainingToCalculate;
  }
| |
  /**
   * Aligned counterpart of {@code aggregatePages}: aggregate the pages of the current chunk,
   * iterating the reader's sub-series so each sub-sensor consumes its own statistics or its own
   * column of the materialized page. {@code resetIndex} rewinds the sub-series cursor before any
   * early return.
   *
   * @return the updated number of results that still lack a final value
   */
  private static int aggregateAlignedPages(
      AlignedSeriesAggregateReader seriesReader,
      List<List<AggregateResult>> aggregateResultList,
      List<boolean[]> isCalculatedArray,
      int remainingToCalculate)
      throws IOException, QueryProcessException {
    while (seriesReader.hasNextPage()) {
      // cal by page statistics
      if (seriesReader.canUseCurrentPageStatistics()) {
        while (seriesReader.hasNextSubSeries()) {
          Statistics pageStatistic = seriesReader.currentPageStatistics();
          remainingToCalculate =
              aggregateStatistics(
                  aggregateResultList.get(seriesReader.getCurIndex()),
                  isCalculatedArray.get(seriesReader.getCurIndex()),
                  remainingToCalculate,
                  pageStatistic);
          if (remainingToCalculate == 0) {
            seriesReader.resetIndex();
            return 0;
          }
          seriesReader.nextSeries();
        }
        seriesReader.skipCurrentPage();
        continue;
      }

      // statistics unusable: materialize the page once, then let each sub-sensor consume
      // its own column via a per-sub-sensor batch iterator
      BatchData nextOverlappedPageData = seriesReader.nextPage();
      while (seriesReader.hasNextSubSeries()) {
        int subIndex = seriesReader.getCurIndex();
        IBatchDataIterator batchIterator = nextOverlappedPageData.getBatchDataIterator(subIndex);
        remainingToCalculate =
            aggregateBatchData(
                aggregateResultList.get(subIndex),
                isCalculatedArray.get(subIndex),
                remainingToCalculate,
                batchIterator);
        if (remainingToCalculate == 0) {
          seriesReader.resetIndex();
          return 0;
        }
        seriesReader.nextSeries();
      }
    }
    return remainingToCalculate;
  }
| |
| private static int aggregateBatchData( |
| List<AggregateResult> aggregateResultList, |
| boolean[] isCalculatedArray, |
| int remainingToCalculate, |
| IBatchDataIterator batchIterator) |
| throws QueryProcessException, IOException { |
| int newRemainingToCalculate = remainingToCalculate; |
| for (int i = 0; i < aggregateResultList.size(); i++) { |
| if (!isCalculatedArray[i]) { |
| AggregateResult aggregateResult = aggregateResultList.get(i); |
| aggregateResult.updateResultFromPageData(batchIterator); |
| batchIterator.reset(); |
| if (aggregateResult.hasFinalResult()) { |
| isCalculatedArray[i] = true; |
| remainingToCalculate--; |
| if (remainingToCalculate == 0) { |
| return newRemainingToCalculate; |
| } |
| } |
| } |
| } |
| return newRemainingToCalculate; |
| } |
| |
  /**
   * Execute aggregate functions with a value filter: a TimeGenerator produces the qualifying
   * timestamps and per-series readers-by-timestamp fetch values at those timestamps.
   *
   * @param queryPlan plan providing measurement info and group-by-level settings
   * @return a single-row data set containing every aggregate result
   */
  public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan)
      throws StorageEngineException, IOException, QueryProcessException {
    // flip to descending scan when all aggregations are max_time/last_value
    optimizeLastElementFunc(queryPlan);

    TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan);

    Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>();

    // group by path name
    Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
        MetaUtils.groupAggregationsBySeries(selectedSeries);
    // Attention: this call REMOVES aligned paths from pathToAggrIndexesMap
    Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
        MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);

    List<PartialPath> groupedPathList =
        new ArrayList<>(pathToAggrIndexesMap.size() + alignedPathToAggrIndexesMap.size());
    groupedPathList.addAll(pathToAggrIndexesMap.keySet());
    groupedPathList.addAll(alignedPathToAggrIndexesMap.keySet());

    Pair<List<DataRegion>, Map<DataRegion, List<PartialPath>>> lockListAndProcessorToSeriesMapPair =
        StorageEngine.getInstance().mergeLock(groupedPathList);
    List<DataRegion> lockList = lockListAndProcessorToSeriesMapPair.left;
    Map<DataRegion, List<PartialPath>> processorToSeriesMap =
        lockListAndProcessorToSeriesMapPair.right;

    try {
      // init QueryDataSource Cache
      QueryResourceManager.getInstance()
          .initQueryDataSourceCache(
              processorToSeriesMap, context, timestampGenerator.getTimeFilter());
    } catch (Exception e) {
      logger.error("Meet error when init QueryDataSource ", e);
      throw new QueryProcessException("Meet error when init QueryDataSource.", e);
    } finally {
      // the merge lock is only needed while the data source cache is being initialized
      StorageEngine.getInstance().mergeUnLock(lockList);
    }

    // one reader-by-timestamp per non-aligned series
    for (PartialPath path : pathToAggrIndexesMap.keySet()) {
      IReaderByTimestamp seriesReaderByTimestamp =
          getReaderByTime(path, queryPlan, path.getSeriesType(), context);
      readerToAggrIndexesMap.put(
          seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path)));
    }
    // assign null to be friendly for GC
    pathToAggrIndexesMap = null;
    // one reader-by-timestamp per aligned device, covering all its sub-sensors
    for (AlignedPath vectorPath : alignedPathToAggrIndexesMap.keySet()) {
      IReaderByTimestamp seriesReaderByTimestamp =
          getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context);
      readerToAggrIndexesMap.put(
          seriesReaderByTimestamp, alignedPathToAggrIndexesMap.get(vectorPath));
    }
    // assign null to be friendly for GC
    alignedPathToAggrIndexesMap = null;

    for (int i = 0; i < selectedSeries.size(); i++) {
      aggregateResultList[i] =
          AggregateResultFactory.getAggrResultByName(
              aggregations.get(i), dataTypes.get(i), ascending);
    }
    aggregateWithValueFilter(timestampGenerator, readerToAggrIndexesMap);
    return constructDataSet(Arrays.asList(aggregateResultList), queryPlan);
  }
| |
| private void optimizeLastElementFunc(QueryPlan queryPlan) { |
| int index = 0; |
| for (; index < aggregations.size(); index++) { |
| String aggregationFunc = aggregations.get(index); |
| if (!aggregationFunc.equals(IoTDBConstant.MAX_TIME) |
| && !aggregationFunc.equals(IoTDBConstant.LAST_VALUE)) { |
| break; |
| } |
| } |
| if (index >= aggregations.size()) { |
| queryPlan.setAscending(false); |
| this.ascending = false; |
| } |
| } |
| |
  /**
   * Create the timestamp generator that evaluates the value-filter expression; overridable by
   * subclasses to supply a different implementation.
   */
  protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
      throws StorageEngineException {
    return new ServerTimeGenerator(context, queryPlan);
  }
| |
  /**
   * Create a reader that fetches this path's values at generator-produced timestamps;
   * overridable by subclasses.
   */
  protected IReaderByTimestamp getReaderByTime(
      PartialPath path, RawDataQueryPlan queryPlan, TSDataType dataType, QueryContext context)
      throws StorageEngineException, QueryProcessException {
    return new SeriesReaderByTimestamp(
        path,
        queryPlan.getAllMeasurementsInDevice(path.getDevice()),
        dataType,
        context,
        // no time filter here: timestamps are already constrained by the generator
        QueryResourceManager.getInstance().getQueryDataSource(path, context, null, ascending),
        null,
        ascending);
  }
| |
  /**
   * Calculate aggregation results with a value filter.
   *
   * <p>Timestamps satisfying the filter are drawn from the generator in batches of at most
   * {@link #aggregateFetchSize}; for each batch, values are taken from the generator's own cache
   * when available and otherwise fetched through the reader-by-timestamp.
   *
   * @param timestampGenerator produces the qualifying timestamps in query order
   * @param readerToAggrIndexesMap maps each reader to the result indexes it serves, one inner
   *     list per (sub-)sensor
   */
  private void aggregateWithValueFilter(
      TimeGenerator timestampGenerator,
      Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap)
      throws IOException {
    // cached.get(i) is true when the generator already holds series i's values
    List<Boolean> cached =
        markFilterdPaths(
            expression, new ArrayList<>(selectedSeries), timestampGenerator.hasOrNode());

    while (timestampGenerator.hasNext()) {

      // generate timestamps for aggregate
      long[] timeArray = new long[aggregateFetchSize];
      int timeArrayLength = 0;
      for (int cnt = 0; cnt < aggregateFetchSize; cnt++) {
        if (!timestampGenerator.hasNext()) {
          break;
        }
        timeArray[timeArrayLength++] = timestampGenerator.next();
      }

      // cal part of aggregate result
      for (Entry<IReaderByTimestamp, List<List<Integer>>> entry :
          readerToAggrIndexesMap.entrySet()) {
        // use cache data as much as possible
        boolean[] cachedOrNot = new boolean[entry.getValue().size()];
        for (int i = 0; i < entry.getValue().size(); i++) {
          List<Integer> subIndexes = entry.getValue().get(i);
          // all indexes of one sub-list share a series, so checking the first is enough
          int pathId = subIndexes.get(0);
          // if cached in timeGenerator
          if (cached.get(pathId)) {
            Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId));
            ValueIterator valueIterator = QueryUtils.generateValueIterator(values);
            if (valueIterator != null) {
              for (Integer index : subIndexes) {
                aggregateResultList[index].updateResultUsingValues(
                    timeArray, timeArrayLength, valueIterator);
                // rewind so the next result consumes the same values again
                valueIterator.reset();
              }
              cachedOrNot[i] = true;
            }
          }
        }

        // fetch from the reader only for sub-series the cache could not serve
        if (hasRemaining(cachedOrNot)) {
          // TODO: if we only need to get firstValue/minTime that's not need to traverse all values,
          // it's enough to get the exact number of values for these specific aggregate func
          Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength);
          ValueIterator valueIterator = QueryUtils.generateValueIterator(values);
          if (valueIterator != null) {
            for (int i = 0; i < entry.getValue().size(); i++) {
              if (!cachedOrNot[i]) {
                valueIterator.setSubMeasurementIndex(i);
                for (Integer index : entry.getValue().get(i)) {
                  aggregateResultList[index].updateResultUsingValues(
                      timeArray, timeArrayLength, valueIterator);
                  valueIterator.reset();
                }
              }
            }
          }
        }
      }
    }
  }
| |
| /** Return whether there is result that has not been cached */ |
| private boolean hasRemaining(boolean[] cachedOrNot) { |
| for (int i = 0; i < cachedOrNot.length; i++) { |
| if (!cachedOrNot[i]) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
  /**
   * using aggregate result data list construct QueryDataSet.
   *
   * <p>The data set always holds exactly one row. For group-by-level queries the raw results are
   * first merged per level by the plan before being added to the record.
   *
   * @param aggregateResultList aggregate result list
   */
  private QueryDataSet constructDataSet(
      List<AggregateResult> aggregateResultList, AggregationPlan plan) {
    SingleDataSet dataSet;
    RowRecord record = new RowRecord(0);

    if (plan.isGroupByLevel()) {
      Map<String, AggregateResult> groupPathsResultMap =
          plan.groupAggResultByLevel(aggregateResultList);

      // NOTE(review): paths stays empty in this branch — group-by-level column headers are
      // presumably produced elsewhere; confirm before relying on the data set's paths
      List<PartialPath> paths = new ArrayList<>();
      List<TSDataType> dataTypes = new ArrayList<>();
      for (AggregateResult resultData : groupPathsResultMap.values()) {
        dataTypes.add(resultData.getResultDataType());
        record.addField(resultData.getResult(), resultData.getResultDataType());
      }
      dataSet = new SingleDataSet(paths, dataTypes);
    } else {
      for (AggregateResult resultData : aggregateResultList) {
        TSDataType dataType = resultData.getResultDataType();
        record.addField(resultData.getResult(), dataType);
      }
      // non-grouped case reuses the executor-level selectedSeries/dataTypes as headers
      dataSet = new SingleDataSet(selectedSeries, dataTypes);
    }
    dataSet.setRecord(record);

    return dataSet;
  }
| } |