blob: 38adb532c1b5e2459e3a76bc07fa309eb6554db5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.executor.groupby.AlignedGroupByExecutor;
import org.apache.iotdb.db.query.executor.groupby.GroupByExecutor;
import org.apache.iotdb.db.query.executor.groupby.SlidingWindowGroupByExecutor;
import org.apache.iotdb.db.query.executor.groupby.SlidingWindowGroupByExecutorFactory;
import org.apache.iotdb.db.query.executor.groupby.impl.LocalAlignedGroupByExecutor;
import org.apache.iotdb.db.query.executor.groupby.impl.LocalGroupByExecutor;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class GroupByWithoutValueFilterDataSet extends GroupByTimeEngineDataSet {
private static final Logger logger =
LoggerFactory.getLogger(GroupByWithoutValueFilterDataSet.class);
protected Map<PartialPath, GroupByExecutor> pathExecutors = new HashMap<>();
protected Map<AlignedPath, AlignedGroupByExecutor> alignedPathExecutors = new HashMap<>();
/**
* non-aligned path -> result index for each aggregation
*
* <p>e.g.,
*
* <p>deduplicated paths: s1, s2, s1; deduplicated aggregations: count, count, sum
*
* <p>s1 -> 0, 2; s2 -> 1
*/
protected Map<PartialPath, List<Integer>> pathToAggrIndexesMap = new HashMap<>();
/**
* aligned path -> result indexes for each aggregation
*
* <p>e.g.,
*
* <p>deduplicated paths: d1.s1, d1.s2, d1.s1; deduplicated aggregations: count, count, sum
*
* <p>d1[s1, s2] -> [[0, 2], [1]]
*/
protected Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap = new HashMap<>();
public GroupByWithoutValueFilterDataSet() {}
/** constructor. */
public GroupByWithoutValueFilterDataSet(QueryContext context, GroupByTimePlan groupByTimePlan) {
super(context, groupByTimePlan);
}
/** init reader and aggregate function. This method should be called once after initializing */
public void initGroupBy(QueryContext context, GroupByTimePlan groupByTimePlan)
throws StorageEngineException, QueryProcessException {
IExpression expression = groupByTimePlan.getExpression();
Filter timeFilter = null;
if (expression != null) {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
}
if (timeFilter == null) {
throw new QueryProcessException("TimeFilter cannot be null in GroupBy query.");
}
// init resultIndexes, group aligned series
pathToAggrIndexesMap = MetaUtils.groupAggregationsBySeries(paths);
alignedPathToAggrIndexesMap =
MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
List<PartialPath> groupedPathList =
new ArrayList<>(pathToAggrIndexesMap.size() + alignedPathToAggrIndexesMap.size());
groupedPathList.addAll(pathToAggrIndexesMap.keySet());
groupedPathList.addAll(alignedPathToAggrIndexesMap.keySet());
Pair<List<DataRegion>, Map<DataRegion, List<PartialPath>>> lockListAndProcessorToSeriesMapPair =
StorageEngine.getInstance().mergeLock(groupedPathList);
List<DataRegion> lockList = lockListAndProcessorToSeriesMapPair.left;
Map<DataRegion, List<PartialPath>> processorToSeriesMap =
lockListAndProcessorToSeriesMapPair.right;
try {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
} catch (Exception e) {
logger.error("Meet error when init QueryDataSource ", e);
throw new QueryProcessException("Meet error when init QueryDataSource.", e);
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
// init GroupByExecutor for non-aligned series
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
MeasurementPath path = (MeasurementPath) entry.getKey();
List<Integer> indexes = entry.getValue();
if (!pathExecutors.containsKey(path)) {
pathExecutors.put(
path,
getGroupByExecutor(
path,
groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()),
context,
timeFilter.copy(),
null,
ascending));
}
for (int index : indexes) {
AggregateResult aggrResult =
AggregateResultFactory.getAggrResultByName(
groupByTimePlan.getDeduplicatedAggregations().get(index),
path.getSeriesType(),
ascending);
slidingWindowGroupByExecutors[index] =
SlidingWindowGroupByExecutorFactory.getSlidingWindowGroupByExecutor(
groupByTimePlan.getDeduplicatedAggregations().get(index),
path.getSeriesType(),
ascending);
pathExecutors.get(path).addAggregateResult(aggrResult);
}
}
// init GroupByExecutor for aligned series
for (Map.Entry<AlignedPath, List<List<Integer>>> entry :
alignedPathToAggrIndexesMap.entrySet()) {
AlignedPath path = entry.getKey();
List<List<Integer>> indexesList = entry.getValue();
if (!alignedPathExecutors.containsKey(path)) {
alignedPathExecutors.put(
path, getAlignedGroupByExecutor(path, context, timeFilter.copy(), null, ascending));
}
for (int i = 0; i < path.getMeasurementList().size(); i++) {
List<AggregateResult> aggrResultList = new ArrayList<>();
for (int index : indexesList.get(i)) {
AggregateResult aggrResult =
AggregateResultFactory.getAggrResultByName(
groupByTimePlan.getDeduplicatedAggregations().get(index),
path.getSchemaList().get(i).getType(),
ascending);
slidingWindowGroupByExecutors[index] =
SlidingWindowGroupByExecutorFactory.getSlidingWindowGroupByExecutor(
groupByTimePlan.getDeduplicatedAggregations().get(index),
path.getSchemaList().get(i).getType(),
ascending);
aggrResultList.add(aggrResult);
}
alignedPathExecutors.get(path).addAggregateResult(aggrResultList);
}
}
}
@Override
protected AggregateResult[] getNextAggregateResult() throws IOException {
curAggregateResults = new AggregateResult[paths.size()];
for (SlidingWindowGroupByExecutor slidingWindowGroupByExecutor :
slidingWindowGroupByExecutors) {
slidingWindowGroupByExecutor.setTimeRange(
curAggrTimeRange.getMin(), curAggrTimeRange.getMax());
}
try {
while (!isEndCal()) {
// get pre-aggregate results of non-aligned series
for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
MeasurementPath path = (MeasurementPath) entry.getKey();
List<Integer> indexes = entry.getValue();
GroupByExecutor groupByExecutor = pathExecutors.get(path);
List<AggregateResult> aggregations =
groupByExecutor.calcResult(
curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
for (int i = 0; i < aggregations.size(); i++) {
int resultIndex = indexes.get(i);
slidingWindowGroupByExecutors[resultIndex].update(aggregations.get(i).clone());
}
}
// get pre-aggregate results of aligned series
for (Map.Entry<AlignedPath, List<List<Integer>>> entry :
alignedPathToAggrIndexesMap.entrySet()) {
AlignedPath path = entry.getKey();
List<List<Integer>> indexesList = entry.getValue();
AlignedGroupByExecutor groupByExecutor = alignedPathExecutors.get(path);
List<List<AggregateResult>> aggregationsList =
groupByExecutor.calcAlignedResult(
curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
for (int i = 0; i < path.getMeasurementList().size(); i++) {
List<AggregateResult> aggregations = aggregationsList.get(i);
List<Integer> indexes = indexesList.get(i);
for (int j = 0; j < aggregations.size(); j++) {
int resultIndex = indexes.get(j);
slidingWindowGroupByExecutors[resultIndex].update(aggregations.get(j).clone());
}
}
}
updatePreAggrInterval();
}
for (int i = 0; i < curAggregateResults.length; i++) {
curAggregateResults[i] = slidingWindowGroupByExecutors[i].getAggregateResult();
}
} catch (QueryProcessException e) {
logger.error("GroupByWithoutValueFilterDataSet execute has error", e);
throw new IOException(e.getMessage(), e);
}
return curAggregateResults;
}
protected GroupByExecutor getGroupByExecutor(
PartialPath path,
Set<String> allSensors,
QueryContext context,
Filter timeFilter,
TsFileFilter fileFilter,
boolean ascending)
throws StorageEngineException, QueryProcessException {
return new LocalGroupByExecutor(path, allSensors, context, timeFilter, fileFilter, ascending);
}
protected AlignedGroupByExecutor getAlignedGroupByExecutor(
PartialPath path,
QueryContext context,
Filter timeFilter,
TsFileFilter fileFilter,
boolean ascending)
throws StorageEngineException, QueryProcessException {
return new LocalAlignedGroupByExecutor(path, context, timeFilter, fileFilter, ascending);
}
}