blob: 18082efccca3a7d1292f03a4ce29bda19fc9877b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator;
import org.apache.iotdb.commons.udf.builtin.BuiltinAggregationFunction;
import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.SingleTimeWindowIterator;
import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.TimeRangeIteratorFactory;
import org.apache.iotdb.db.queryengine.execution.operator.window.IWindow;
import org.apache.iotdb.db.queryengine.execution.operator.window.TimeWindow;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.queryengine.statistics.StatisticsManager;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.BooleanColumn;
import org.apache.tsfile.read.common.block.column.DoubleColumn;
import org.apache.tsfile.read.common.block.column.FloatColumn;
import org.apache.tsfile.read.common.block.column.IntColumn;
import org.apache.tsfile.read.common.block.column.LongColumn;
import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.Pair;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
public class AggregationUtil {
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
private static final int INVALID_END_TIME = -1;
private static final String PARTIAL_SUFFIX = "_partial";
private AggregationUtil() {
// Forbidding instantiation
}
/**
* If groupByTimeParameter is null, which means it's an aggregation query without down sampling.
* Aggregation query has only one time window and the result set of it does not contain a
* timestamp, so it doesn't matter what the time range returns.
*/
public static ITimeRangeIterator initTimeRangeIterator(
GroupByTimeParameter groupByTimeParameter,
boolean ascending,
boolean outputPartialTimeWindow) {
if (groupByTimeParameter == null) {
return new SingleTimeWindowIterator(Long.MIN_VALUE, Long.MAX_VALUE);
} else {
return TimeRangeIteratorFactory.getTimeRangeIterator(
groupByTimeParameter.getStartTime(),
groupByTimeParameter.getEndTime(),
groupByTimeParameter.getInterval(),
groupByTimeParameter.getSlidingStep(),
ascending,
groupByTimeParameter.isLeftCRightO(),
outputPartialTimeWindow);
}
}
/**
* Calculate aggregation value on the time range from the tsBlock containing raw data.
*
* @return left - whether the aggregation calculation of the current time range has done; right -
* remaining tsBlock
*/
public static Pair<Boolean, TsBlock> calculateAggregationFromRawData(
TsBlock inputTsBlock,
List<Aggregator> aggregators,
TimeRange curTimeRange,
boolean ascending) {
if (inputTsBlock == null || inputTsBlock.isEmpty()) {
return new Pair<>(false, inputTsBlock);
}
// check if the tsBlock does not contain points in current interval
if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) {
// skip points that cannot be calculated
if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin())
|| (!ascending && inputTsBlock.getStartTime() > curTimeRange.getMax())) {
inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending);
}
inputTsBlock = process(inputTsBlock, curTimeRange, aggregators);
}
// judge whether the calculation finished
boolean isTsBlockOutOfBound =
inputTsBlock != null
&& (ascending
? inputTsBlock.getEndTime() > curTimeRange.getMax()
: inputTsBlock.getEndTime() < curTimeRange.getMin());
return new Pair<>(
isAllAggregatorsHasFinalResult(aggregators) || isTsBlockOutOfBound, inputTsBlock);
}
private static TsBlock process(
TsBlock inputTsBlock, TimeRange curTimeRange, List<Aggregator> aggregators) {
// Get the row which need to be processed by aggregator
IWindow curWindow = new TimeWindow(curTimeRange);
TimeColumn timeColumn = inputTsBlock.getTimeColumn();
int lastIndexToProcess = 0;
for (int i = 0; i < inputTsBlock.getPositionCount(); i++) {
if (!curWindow.satisfy(timeColumn, i)) {
break;
}
lastIndexToProcess = i;
}
TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
for (Aggregator aggregator : aggregators) {
// current agg method has been calculated
if (aggregator.hasFinalResult()) {
continue;
}
aggregator.processTsBlock(inputRegion, null);
}
int lastReadRowIndex = lastIndexToProcess + 1;
if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
return null;
} else {
return inputTsBlock.subTsBlock(lastReadRowIndex);
}
}
/** Append a row of aggregation results to the result tsBlock. */
public static void appendAggregationResult(
TsBlockBuilder tsBlockBuilder,
List<? extends Aggregator> aggregators,
long outputTime,
long endTime) {
TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
// Use start time of current time range as time column
timeColumnBuilder.writeLong(outputTime);
ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
int columnIndex = 0;
if (endTime != INVALID_END_TIME) {
columnBuilders[columnIndex].writeLong(endTime);
columnIndex++;
}
for (Aggregator aggregator : aggregators) {
ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
columnBuilder[0] = columnBuilders[columnIndex++];
if (columnBuilder.length > 1) {
columnBuilder[1] = columnBuilders[columnIndex++];
}
aggregator.outputResult(columnBuilder);
}
tsBlockBuilder.declarePosition();
}
public static void appendAggregationResult(
TsBlockBuilder tsBlockBuilder, List<? extends Aggregator> aggregators, long outputTime) {
appendAggregationResult(tsBlockBuilder, aggregators, outputTime, INVALID_END_TIME);
}
/** return whether the tsBlock contains the data of the current time window. */
public static boolean satisfiedTimeRange(
TsBlock tsBlock, TimeRange curTimeRange, boolean ascending) {
if (tsBlock == null || tsBlock.isEmpty()) {
return false;
}
return ascending
? (tsBlock.getEndTime() >= curTimeRange.getMin()
&& tsBlock.getStartTime() <= curTimeRange.getMax())
: (tsBlock.getEndTime() <= curTimeRange.getMax()
&& tsBlock.getStartTime() >= curTimeRange.getMin());
}
public static boolean isAllAggregatorsHasFinalResult(List<Aggregator> aggregators) {
for (Aggregator aggregator : aggregators) {
if (!aggregator.hasFinalResult()) {
return false;
}
}
return true;
}
public static long calculateMaxAggregationResultSize(
List<? extends AggregationDescriptor> aggregationDescriptors,
ITimeRangeIterator timeRangeIterator,
TypeProvider typeProvider) {
long timeValueColumnsSizePerLine = TimeColumn.SIZE_IN_BYTES_PER_POSITION;
for (AggregationDescriptor descriptor : aggregationDescriptors) {
List<TSDataType> outPutDataTypes =
descriptor.getOutputColumnNames().stream()
.map(typeProvider::getType)
.collect(Collectors.toList());
for (TSDataType tsDataType : outPutDataTypes) {
timeValueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType);
}
}
return Math.min(
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
Math.min(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
timeRangeIterator.getTotalIntervalNum())
* timeValueColumnsSizePerLine);
}
public static long calculateMaxAggregationResultSizeForLastQuery(List<Aggregator> aggregators) {
long timeValueColumnsSizePerLine = TimeColumn.SIZE_IN_BYTES_PER_POSITION;
List<TSDataType> outPutDataTypes =
aggregators.stream()
.flatMap(aggregator -> Arrays.stream(aggregator.getOutputType()))
.collect(Collectors.toList());
for (TSDataType tsDataType : outPutDataTypes) {
timeValueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType);
}
return timeValueColumnsSizePerLine;
}
public static long getOutputColumnSizePerLine(TSDataType tsDataType) {
switch (tsDataType) {
case INT32:
return IntColumn.SIZE_IN_BYTES_PER_POSITION;
case INT64:
return LongColumn.SIZE_IN_BYTES_PER_POSITION;
case FLOAT:
return FloatColumn.SIZE_IN_BYTES_PER_POSITION;
case DOUBLE:
return DoubleColumn.SIZE_IN_BYTES_PER_POSITION;
case BOOLEAN:
return BooleanColumn.SIZE_IN_BYTES_PER_POSITION;
case TEXT:
return StatisticsManager.getInstance().getMaxBinarySizeInBytes();
default:
throw new UnsupportedOperationException("Unknown data type " + tsDataType);
}
}
public static String addPartialSuffix(String aggregationName) {
return aggregationName + PARTIAL_SUFFIX;
}
public static boolean isBuiltinAggregationName(String functionName) {
return BuiltinAggregationFunction.getNativeFunctionNames().contains(functionName);
}
}