/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.query.reduce;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.DateTimeGranularitySpec;

/**
* Base helper class to reduce and set gapfill results into the BrokerResponseNative.
*/
abstract class BaseGapfillProcessor {
protected final QueryContext _queryContext;
protected final int _limitForAggregatedResult;
protected final DateTimeGranularitySpec _gapfillDateTimeGranularity;
protected final DateTimeGranularitySpec _postGapfillDateTimeGranularity;
protected final DateTimeFormatSpec _dateTimeFormatter;
protected final long _startMs;
protected final long _endMs;
protected final long _gapfillTimeBucketSize;
protected final long _postGapfillTimeBucketSize;
protected final int _numOfTimeBuckets;
protected final List<Integer> _groupByKeyIndexes;
protected final Map<Key, Object[]> _previousByGroupKey;
protected long _count = 0;
protected final List<ExpressionContext> _timeSeries;
protected final int _timeBucketColumnIndex;
protected GapfillFilterHandler _postGapfillFilterHandler = null;
protected GapfillFilterHandler _postAggregateHavingFilterHandler = null;
protected final int _aggregationSize;
protected final GapfillUtils.GapfillType _gapfillType;
protected int _limitForGapfilledResult;
protected boolean[] _isGroupBySelections;
protected ExpressionContext _gapFillSelection;
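
/**
* Initializes the shared gapfill state from the query context. Based on the parsing below, the gapfill
* function's arguments are used positionally: args[1] holds the time column format, args[2] and args[3]
* the start and end of the time range, args[4] the gapfill granularity, and args[5] the optional
* post-gapfill (aggregation) granularity, which defaults to the gapfill granularity when absent.
*/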
BaseGapfillProcessor(QueryContext queryContext, GapfillUtils.GapfillType gapfillType) {
_queryContext = queryContext;
_limitForAggregatedResult = queryContext.getLimit();
_gapfillType = gapfillType;
if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL
|| _gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
_limitForGapfilledResult = queryContext.getLimit();
} else {
_limitForGapfilledResult = queryContext.getSubquery().getLimit();
}
_gapFillSelection = GapfillUtils.getGapfillExpressionContext(queryContext, gapfillType);
_timeBucketColumnIndex = GapfillUtils.findTimeBucketColumnIndex(queryContext, gapfillType);
List<ExpressionContext> args = _gapFillSelection.getFunction().getArguments();
_dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
_gapfillDateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
if (args.get(5).getLiteral() == null) {
_postGapfillDateTimeGranularity = _gapfillDateTimeGranularity;
} else {
_postGapfillDateTimeGranularity = new DateTimeGranularitySpec(args.get(5).getLiteral());
}
String start = args.get(2).getLiteral();
_startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
String end = args.get(3).getLiteral();
_endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
_gapfillTimeBucketSize = _gapfillDateTimeGranularity.granularityToMillis();
_postGapfillTimeBucketSize = _postGapfillDateTimeGranularity.granularityToMillis();
_numOfTimeBuckets = (int) ((_endMs - _startMs) / _gapfillTimeBucketSize);
_aggregationSize = (int) (_postGapfillTimeBucketSize / _gapfillTimeBucketSize);
_previousByGroupKey = new HashMap<>();
_groupByKeyIndexes = new ArrayList<>();
ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(_gapFillSelection);
_timeSeries = timeseriesOn.getFunction().getArguments();
}
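
/**
* Returns the index of the gapfill time bucket (relative to the start of the time range) that the given
* timestamp in millis falls into.
*/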
protected int findGapfillBucketIndex(long time) {
return (int) ((time - _startMs) / _gapfillTimeBucketSize);
}
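
/**
* Rewrites the column names in the given data schema to the aliases declared in the relevant query
* context (the outer query, its subquery, or the inner subquery, depending on the gapfill type).
*/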
protected void replaceColumnNameWithAlias(DataSchema dataSchema) {
QueryContext queryContext;
if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
queryContext = _queryContext.getSubquery().getSubquery();
} else if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
queryContext = _queryContext;
} else {
queryContext = _queryContext.getSubquery();
}
List<String> aliasList = queryContext.getAliasList();
Map<String, String> columnNameToAliasMap = new HashMap<>();
for (int i = 0; i < aliasList.size(); i++) {
if (aliasList.get(i) != null) {
ExpressionContext selection = queryContext.getSelectExpressions().get(i);
if (GapfillUtils.isGapfill(selection)) {
selection = selection.getFunction().getArguments().get(0);
}
columnNameToAliasMap.put(selection.toString(), aliasList.get(i));
}
}
for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
dataSchema.getColumnNames()[i] = columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
}
}
}

/**
* Three things happen here:
* 1. Sort the result sets from all Pinot servers based on timestamp.
* 2. Gapfill the data for missing entities per time bucket.
* 3. Aggregate the dataset per time bucket.
*/
public void process(BrokerResponseNative brokerResponseNative) {
DataSchema dataSchema = brokerResponseNative.getResultTable().getDataSchema();
DataSchema resultTableSchema = getResultTableDataSchema(dataSchema);
if (brokerResponseNative.getResultTable().getRows().isEmpty()) {
brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
return;
}
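// Build a lookup from column name to its index in the server response schema.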
String[] columns = dataSchema.getColumnNames();
Map<String, Integer> indexes = new HashMap<>();
for (int i = 0; i < columns.length; i++) {
indexes.put(columns[i], i);
}
_isGroupBySelections = new boolean[dataSchema.getColumnDataTypes().length];
// The first argument of timeSeries is the time column. The remaining arguments define the entity.
for (ExpressionContext entityColumn : _timeSeries) {
int index = indexes.get(entityColumn.getIdentifier());
_isGroupBySelections[index] = true;
}
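// Collect the indexes of the group-by key columns.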
for (int i = 0; i < _isGroupBySelections.length; i++) {
if (_isGroupBySelections[i]) {
_groupByKeyIndexes.add(i);
}
}
List<Object[]> rows = brokerResponseNative.getResultTable().getRows();
replaceColumnNameWithAlias(dataSchema);
List<Object[]> resultRows = gapFillAndAggregate(rows, dataSchema, resultTableSchema);
brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, resultRows));
}

/**
* Constructs the DataSchema for the ResultTable.
*/
protected DataSchema getResultTableDataSchema(DataSchema dataSchema) {
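// For a pure gapfill query (no outer aggregation), the schema from the servers is reused as-is.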
if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
return dataSchema;
}
int numOfColumns = _queryContext.getSelectExpressions().size();
String[] columnNames = new String[numOfColumns];
ColumnDataType[] columnDataTypes = new ColumnDataType[numOfColumns];
for (int i = 0; i < numOfColumns; i++) {
ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(i);
if (GapfillUtils.isGapfill(expressionContext)) {
expressionContext = expressionContext.getFunction().getArguments().get(0);
}
if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
columnNames[i] = expressionContext.getIdentifier();
columnDataTypes[i] = dataSchema.getColumnDataType(_timeBucketColumnIndex);
} else {
FunctionContext functionContext = expressionContext.getFunction();
AggregationFunction aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(functionContext, _queryContext);
columnDataTypes[i] = aggregationFunction.getFinalResultColumnType();
columnNames[i] = functionContext.toString();
}
}
return new DataSchema(columnNames, columnDataTypes);
}
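
/**
* Builds the group key for a row from the values of its group-by key columns.
*/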
protected Key constructGroupKeys(Object[] row) {
Object[] groupKeys = new Object[_groupByKeyIndexes.size()];
for (int i = 0; i < _groupByKeyIndexes.size(); i++) {
groupKeys[i] = row[_groupByKeyIndexes.get(i)];
}
return new Key(groupKeys);
}
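
/**
* Truncates the given epoch value down to a multiple of the gapfill granularity size. For example, with a
* granularity size of 5, an epoch of 12 truncates to 10.
*/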
protected long truncate(long epoch) {
int sz = _gapfillDateTimeGranularity.getSize();
return epoch / sz * sz;
}
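
/**
* Gap-fills the sorted rows and aggregates them per time bucket. Subclasses are expected to override this
* method; the base implementation throws UnsupportedOperationException.
*/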
protected List<Object[]> gapFillAndAggregate(
List<Object[]> rows, DataSchema dataSchema, DataSchema resultTableSchema) {
throw new UnsupportedOperationException("Not supported");
}
}