/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.utils.MetaUtils;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.executor.groupby.SlidingWindowGroupByExecutor;
import org.apache.iotdb.db.query.executor.groupby.SlidingWindowGroupByExecutorFactory;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.ValueIterator;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
import org.apache.iotdb.tsfile.utils.Pair;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
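/**
* Group-by-time data set for aggregation queries that carry a value filter. A TimeGenerator
* produces the timestamps that satisfy the filter, and the aggregate results are then updated with
* the values read back at those timestamps through IReaderByTimestamp readers, one pre-aggregation
* window at a time.
*/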
public class GroupByWithValueFilterDataSet extends GroupByTimeEngineDataSet {
private static final Logger logger = LoggerFactory.getLogger(GroupByWithValueFilterDataSet.class);
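// maps each series reader to the aggregation indexes it serves; for aligned series the outer list
// holds one entry per sub-measurement, for non-aligned series it holds a single entry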
private Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap;
protected GroupByTimePlan groupByTimePlan;
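// generates the timestamps of rows that satisfy the query's value filter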
private TimeGenerator timestampGenerator;
/** Cached timestamps for the next group-by partition. */
private LinkedList<Long> cachedTimestamps = new LinkedList<>();
/** Number of timestamps fetched per batch of the group-by calculation. */
protected int timeStampFetchSize;
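// last timestamp taken from the timestamp generator or the cache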
private long lastTimestamp;
// aggregate results for the current pre-aggregation window
private AggregateResult[] preAggregateResults;
protected GroupByWithValueFilterDataSet() {}
/** constructor. */
public GroupByWithValueFilterDataSet(QueryContext context, GroupByTimePlan groupByTimePlan) {
super(context, groupByTimePlan);
this.timeStampFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
}
@TestOnly
public GroupByWithValueFilterDataSet(long queryId, GroupByTimePlan groupByTimePlan) {
super(new QueryContext(queryId), groupByTimePlan);
this.readerToAggrIndexesMap = new HashMap<>();
this.timeStampFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
}
/** Initializes the series readers and aggregate functions. This method should be called exactly once, after construction. */
public void initGroupBy(QueryContext context, GroupByTimePlan groupByTimePlan)
throws StorageEngineException, QueryProcessException {
this.timestampGenerator = getTimeGenerator(context, groupByTimePlan);
this.readerToAggrIndexesMap = new HashMap<>();
this.groupByTimePlan = groupByTimePlan;
Filter timeFilter =
FilterFactory.and(
TimeFilter.gtEq(groupByTimePlan.getStartTime()),
TimeFilter.lt(groupByTimePlan.getEndTime()));
List<PartialPath> selectedSeries = new ArrayList<>();
groupByTimePlan
.getDeduplicatedPaths()
.forEach(k -> selectedSeries.add(((MeasurementPath) k).transformToExactPath()));
Map<PartialPath, List<Integer>> pathToAggrIndexesMap =
MetaUtils.groupAggregationsBySeries(selectedSeries);
Map<AlignedPath, List<List<Integer>>> alignedPathToAggrIndexesMap =
MetaUtils.groupAlignedSeriesWithAggregations(pathToAggrIndexesMap);
List<PartialPath> groupedPathList =
new ArrayList<>(pathToAggrIndexesMap.size() + alignedPathToAggrIndexesMap.size());
groupedPathList.addAll(pathToAggrIndexesMap.keySet());
groupedPathList.addAll(alignedPathToAggrIndexesMap.keySet());
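// lock the involved data regions while the query data sources are initialized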
Pair<List<DataRegion>, Map<DataRegion, List<PartialPath>>> lockListAndProcessorToSeriesMapPair =
StorageEngine.getInstance().mergeLock(groupedPathList);
List<DataRegion> lockList = lockListAndProcessorToSeriesMapPair.left;
Map<DataRegion, List<PartialPath>> processorToSeriesMap =
lockListAndProcessorToSeriesMapPair.right;
try {
// init QueryDataSource Cache
QueryResourceManager.getInstance()
.initQueryDataSourceCache(processorToSeriesMap, context, timeFilter);
} catch (Exception e) {
logger.error("Meet error when init QueryDataSource ", e);
throw new QueryProcessException("Meet error when init QueryDataSource.", e);
} finally {
StorageEngine.getInstance().mergeUnLock(lockList);
}
// init non-aligned series reader
for (PartialPath path : pathToAggrIndexesMap.keySet()) {
IReaderByTimestamp seriesReaderByTimestamp = getReaderByTime(path, groupByTimePlan, context);
readerToAggrIndexesMap.put(
seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path)));
}
// drop the reference so that the map can be garbage collected earlier
pathToAggrIndexesMap = null;
// init aligned series reader
for (PartialPath alignedPath : alignedPathToAggrIndexesMap.keySet()) {
IReaderByTimestamp seriesReaderByTimestamp =
getReaderByTime(alignedPath, groupByTimePlan, context);
readerToAggrIndexesMap.put(
seriesReaderByTimestamp, alignedPathToAggrIndexesMap.get(alignedPath));
}
// drop the reference so that the map can be garbage collected earlier
alignedPathToAggrIndexesMap = null;
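// one pre-aggregation result and one sliding-window executor per output column;
// slidingWindowGroupByExecutors is inherited from the parent GroupByTimeEngineDataSet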
preAggregateResults = new AggregateResult[paths.size()];
for (int i = 0; i < paths.size(); i++) {
preAggregateResults[i] =
AggregateResultFactory.getAggrResultByName(
groupByTimePlan.getDeduplicatedAggregations().get(i),
groupByTimePlan.getDeduplicatedDataTypes().get(i),
ascending);
slidingWindowGroupByExecutors[i] =
SlidingWindowGroupByExecutorFactory.getSlidingWindowGroupByExecutor(
groupByTimePlan.getDeduplicatedAggregations().get(i),
groupByTimePlan.getDeduplicatedDataTypes().get(i),
ascending);
}
}
protected TimeGenerator getTimeGenerator(QueryContext context, RawDataQueryPlan queryPlan)
throws StorageEngineException {
return new ServerTimeGenerator(context, queryPlan);
}
protected IReaderByTimestamp getReaderByTime(
PartialPath path, RawDataQueryPlan queryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException {
return new SeriesReaderByTimestamp(
path,
queryPlan.getAllMeasurementsInDevice(path.getDevice()),
path.getSeriesType(),
context,
QueryResourceManager.getInstance()
.getQueryDataSource(path, context, null, queryPlan.isAscending()),
null,
ascending);
}
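/**
* Computes the aggregate results of the next group-by time window: the pre-aggregation window is
* slid across the window and each pre-aggregated result is merged by the sliding window executors.
*/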
@Override
protected AggregateResult[] getNextAggregateResult() throws IOException {
curAggregateResults = new AggregateResult[paths.size()];
for (SlidingWindowGroupByExecutor slidingWindowGroupByExecutor :
slidingWindowGroupByExecutors) {
slidingWindowGroupByExecutor.setTimeRange(
curAggrTimeRange.getMin(), curAggrTimeRange.getMax());
}
while (!isEndCal()) {
AggregateResult[] aggregations =
calcResult(curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
for (int i = 0; i < aggregations.length; i++) {
slidingWindowGroupByExecutors[i].update(aggregations[i].clone());
}
updatePreAggrInterval();
}
for (int i = 0; i < curAggregateResults.length; i++) {
curAggregateResults[i] = slidingWindowGroupByExecutors[i].getAggregateResult().clone();
}
return curAggregateResults;
}
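/**
* Calculates the pre-aggregation results for the time range [curStartTime, curEndTime), using
* timestamps taken from the cache or produced by the value-filter time generator.
*/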
public AggregateResult[] calcResult(long curStartTime, long curEndTime) throws IOException {
// clear result cache
for (AggregateResult result : preAggregateResults) {
result.reset();
}
long[] timestampArray = new long[timeStampFetchSize];
int timeArrayLength = 0;
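// the first cached timestamp may belong to this range or to a later one; consume it only if it
// falls inside [curStartTime, curEndTime)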
if (!cachedTimestamps.isEmpty()) {
long timestamp = cachedTimestamps.remove();
if (timestamp < curEndTime) {
if (!groupByTimePlan.isAscending() && timestamp < curStartTime) {
cachedTimestamps.addFirst(timestamp);
return preAggregateResults;
}
if (timestamp >= curStartTime) {
timestampArray[timeArrayLength++] = timestamp;
}
} else {
cachedTimestamps.addFirst(timestamp);
return preAggregateResults;
}
}
while (!cachedTimestamps.isEmpty() || timestampGenerator.hasNext()) {
// construct timestamp array
timeArrayLength =
constructTimeArrayForOneCal(timestampArray, timeArrayLength, curStartTime, curEndTime);
// calculate results using the timestamp array
calcUsingTimestampArray(timestampArray, timeArrayLength);
timeArrayLength = 0;
// check whether the calculation for the current range has finished
if ((groupByTimePlan.isAscending() && lastTimestamp >= curEndTime)
|| (!groupByTimePlan.isAscending() && lastTimestamp < curStartTime)) {
break;
}
}
if (timeArrayLength > 0) {
// calculate results using the remaining timestamps
calcUsingTimestampArray(timestampArray, timeArrayLength);
}
return preAggregateResults;
}
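/** Updates every relevant pre-aggregation result with the values read at the given timestamps. */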
private void calcUsingTimestampArray(long[] timestampArray, int timeArrayLength)
throws IOException {
for (Entry<IReaderByTimestamp, List<List<Integer>>> entry : readerToAggrIndexesMap.entrySet()) {
IReaderByTimestamp reader = entry.getKey();
List<List<Integer>> subIndexes = entry.getValue();
int subSensorSize = subIndexes.size();
Object[] values = reader.getValuesInTimestamps(timestampArray, timeArrayLength);
ValueIterator valueIterator = QueryUtils.generateValueIterator(values);
if (valueIterator != null) {
for (int curIndex = 0; curIndex < subSensorSize; curIndex++) {
valueIterator.setSubMeasurementIndex(curIndex);
for (Integer index : subIndexes.get(curIndex)) {
preAggregateResults[index].updateResultUsingValues(
timestampArray, timeArrayLength, valueIterator);
valueIterator.reset();
}
}
}
}
}
/**
* Constructs an array of timestamps for one batch of a group-by partition calculation.
*
* @param timestampArray the array to fill with qualifying timestamps
* @param timeArrayLength the number of timestamps already stored in the array
* @param curStartTime start time (inclusive) of the current pre-aggregation range
* @param curEndTime end time (exclusive) of the current pre-aggregation range
* @return the updated number of timestamps stored in the array
*/
@SuppressWarnings("squid:S3776")
private int constructTimeArrayForOneCal(
long[] timestampArray, int timeArrayLength, long curStartTime, long curEndTime)
throws IOException {
for (int cnt = 1;
cnt < timeStampFetchSize - 1
&& (!cachedTimestamps.isEmpty() || timestampGenerator.hasNext());
cnt++) {
if (!cachedTimestamps.isEmpty()) {
lastTimestamp = cachedTimestamps.remove();
} else {
lastTimestamp = timestampGenerator.next();
}
if (groupByTimePlan.isAscending() && lastTimestamp < curEndTime) {
timestampArray[timeArrayLength++] = lastTimestamp;
} else if (!groupByTimePlan.isAscending() && lastTimestamp >= curStartTime) {
timestampArray[timeArrayLength++] = lastTimestamp;
} else {
// lastTimestamp may have been taken from the cache, so put it back at the appropriate position
if (!cachedTimestamps.isEmpty() && lastTimestamp <= cachedTimestamps.peek()) {
cachedTimestamps.addFirst(lastTimestamp);
} else {
cachedTimestamps.add(lastTimestamp);
}
break;
}
}
return timeArrayLength;
}
}