/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
import org.apache.pinot.core.query.aggregation.function.CountAggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.GapfillUtils;

/**
* Helper class to reduce and set gap fill results into the BrokerResponseNative
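 *
 * <p>For illustration, here is a sketch of the kind of gapfill query this processor reduces,
 * based on the documented GapFill/FILL/TIMESERIESON syntax (the table and column names are
 * hypothetical):
 * <pre>
 *   -- hypothetical table and columns, shown only to illustrate the shape of the query
 *   SELECT GapFill(ts, '1:MILLISECONDS:EPOCH',
 *       '1636257600000', '1636279200000', '1:HOURS',
 *       FILL(status, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(lotId)), lotId, status
 *   FROM parkingData
 *   WHERE ts >= 1636257600000 AND ts <= 1636279200000
 *   LIMIT 100
 * </pre>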
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class GapfillProcessor extends BaseGapfillProcessor {
private final Set<Key> _groupByKeys;
private final Map<String, ExpressionContext> _fillExpressions;
private int[] _sourceColumnIndexForResultSchema = null;
GapfillProcessor(QueryContext queryContext, GapfillUtils.GapfillType gapfillType) {
super(queryContext, gapfillType);
_fillExpressions = GapfillUtils.getFillExpressions(_gapFillSelection);
_groupByKeys = new HashSet<>();
}
  /**
   * Reduces the gapfill results into the BrokerResponseNative in three steps:
   * 1. Sort the rows from all Pinot servers into time buckets based on timestamp.
   * 2. Gapfill the data for missing entities per time bucket.
   * 3. Aggregate the dataset per time bucket (when aggregation functions are present).
   */
public void process(BrokerResponseNative brokerResponseNative) {
DataSchema dataSchema = brokerResponseNative.getResultTable().getDataSchema();
DataSchema resultTableSchema = getResultTableDataSchema(dataSchema);
if (brokerResponseNative.getResultTable().getRows().isEmpty()) {
brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
return;
}
String[] columns = dataSchema.getColumnNames();
Map<String, Integer> indexes = new HashMap<>();
for (int i = 0; i < columns.length; i++) {
indexes.put(columns[i], i);
}
_isGroupBySelections = new boolean[dataSchema.getColumnDataTypes().length];
    // The first argument of timeSeries is the time column; the remaining ones define the entity.
    for (ExpressionContext entityColumn : _timeSeries) {
      int index = indexes.get(entityColumn.getIdentifier());
_isGroupBySelections[index] = true;
}
for (int i = 0; i < _isGroupBySelections.length; i++) {
if (_isGroupBySelections[i]) {
_groupByKeyIndexes.add(i);
}
}
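    // Distribute the raw rows into gapfill time buckets; rows before the time range seed
    // FILL_PREVIOUS_VALUE candidates, rows after it are dropped.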
List<Object[]>[] timeBucketedRawRows = putRawRowsIntoTimeBucket(brokerResponseNative.getResultTable().getRows());
replaceColumnNameWithAlias(dataSchema);
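    // For selection-only queries (no aggregation), build a mapping from each result-schema column
    // back to its index in the source schema so gapfilled rows can be projected directly.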
if (_queryContext.getAggregationFunctions() == null) {
Map<String, Integer> sourceColumnsIndexes = new HashMap<>();
for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
sourceColumnsIndexes.put(dataSchema.getColumnName(i), i);
}
_sourceColumnIndexForResultSchema = new int[resultTableSchema.getColumnNames().length];
for (int i = 0; i < _sourceColumnIndexForResultSchema.length; i++) {
_sourceColumnIndexForResultSchema[i] = sourceColumnsIndexes.get(resultTableSchema.getColumnName(i));
}
}
List<Object[]> resultRows = gapFillAndAggregate(timeBucketedRawRows, resultTableSchema, dataSchema);
brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, resultRows));
}
private List<Object[]> gapFillAndAggregate(List<Object[]>[] timeBucketedRawRows,
DataSchema dataSchemaForAggregatedResult, DataSchema dataSchema) {
List<Object[]> result = new ArrayList<>();
GapfillFilterHandler postGapfillFilterHandler = null;
if (_queryContext.getSubquery() != null && _queryContext.getFilter() != null) {
postGapfillFilterHandler = new GapfillFilterHandler(_queryContext.getFilter(), dataSchema);
}
GapfillFilterHandler postAggregateHavingFilterHandler = null;
if (_queryContext.getHavingFilter() != null) {
postAggregateHavingFilterHandler =
new GapfillFilterHandler(_queryContext.getHavingFilter(), dataSchemaForAggregatedResult);
}
long start = _startMs;
ColumnDataType[] resultColumnDataTypes = dataSchema.getColumnDataTypes();
List<Object[]> bucketedResult = new ArrayList<>();
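    // Walk the time range one gapfill bucket at a time: gapfill each bucket, then either emit the
    // rows directly (selection-only) or aggregate once an aggregation window of buckets completes.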
for (long time = _startMs; time < _endMs; time += _gapfillTimeBucketSize) {
int index = findGapfillBucketIndex(time);
gapfill(time, bucketedResult, timeBucketedRawRows[index], dataSchema, postGapfillFilterHandler);
if (_queryContext.getAggregationFunctions() == null) {
for (Object[] row : bucketedResult) {
Object[] resultRow = new Object[_sourceColumnIndexForResultSchema.length];
for (int i = 0; i < _sourceColumnIndexForResultSchema.length; i++) {
resultRow[i] = row[_sourceColumnIndexForResultSchema[i]];
}
result.add(resultRow);
}
bucketedResult.clear();
} else if (index % _aggregationSize == _aggregationSize - 1) {
        if (!bucketedResult.isEmpty()) {
Object timeCol;
if (resultColumnDataTypes[_timeBucketColumnIndex] == ColumnDataType.LONG) {
timeCol = Long.valueOf(_dateTimeFormatter.fromMillisToFormat(start));
} else {
timeCol = _dateTimeFormatter.fromMillisToFormat(start);
}
List<Object[]> aggregatedRows = aggregateGapfilledData(timeCol, bucketedResult, dataSchema);
for (Object[] aggregatedRow : aggregatedRows) {
if (postAggregateHavingFilterHandler == null || postAggregateHavingFilterHandler.isMatch(aggregatedRow)) {
result.add(aggregatedRow);
}
if (result.size() >= _limitForAggregatedResult) {
return result;
}
}
bucketedResult.clear();
}
start = time + _gapfillTimeBucketSize;
}
}
return result;
}
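
  /**
   * Gapfills one time bucket: emits the raw rows whose timestamp matches the bucket time and
   * synthesizes a row, via the configured fill strategy, for every group-by key that has no raw
   * row in this bucket.
   */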
private void gapfill(long bucketTime, List<Object[]> bucketedResult, List<Object[]> rawRowsForBucket,
DataSchema dataSchema, GapfillFilterHandler postGapfillFilterHandler) {
ColumnDataType[] resultColumnDataTypes = dataSchema.getColumnDataTypes();
int numResultColumns = resultColumnDataTypes.length;
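    // Start from all known group-by keys; any key still present after scanning the raw rows has
    // no data in this bucket and needs a synthesized row.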
Set<Key> keys = new HashSet<>(_groupByKeys);
if (rawRowsForBucket != null) {
for (Object[] resultRow : rawRowsForBucket) {
for (int i = 0; i < resultColumnDataTypes.length; i++) {
resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
}
long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRow[0]));
if (timeCol > bucketTime) {
break;
}
if (timeCol == bucketTime) {
if (postGapfillFilterHandler == null || postGapfillFilterHandler.isMatch(resultRow)) {
if (bucketedResult.size() >= _limitForGapfilledResult) {
_limitForGapfilledResult = 0;
break;
} else {
bucketedResult.add(resultRow);
}
}
Key key = constructGroupKeys(resultRow);
keys.remove(key);
_previousByGroupKey.put(key, resultRow);
}
}
}
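    // Synthesize a gapfill row for every group-by key that had no raw row in this bucket.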
for (Key key : keys) {
Object[] gapfillRow = new Object[numResultColumns];
int keyIndex = 0;
if (resultColumnDataTypes[_timeBucketColumnIndex] == ColumnDataType.LONG) {
gapfillRow[0] = Long.valueOf(_dateTimeFormatter.fromMillisToFormat(bucketTime));
} else {
gapfillRow[0] = _dateTimeFormatter.fromMillisToFormat(bucketTime);
}
for (int i = 1; i < _isGroupBySelections.length; i++) {
if (_isGroupBySelections[i]) {
gapfillRow[i] = key.getValues()[keyIndex++];
} else {
gapfillRow[i] = getFillValue(i, dataSchema.getColumnName(i), key, resultColumnDataTypes[i]);
}
}
if (postGapfillFilterHandler == null || postGapfillFilterHandler.isMatch(gapfillRow)) {
if (bucketedResult.size() >= _limitForGapfilledResult) {
break;
} else {
bucketedResult.add(gapfillRow);
}
}
}
if (_limitForGapfilledResult > _groupByKeys.size()) {
_limitForGapfilledResult -= _groupByKeys.size();
} else {
_limitForGapfilledResult = 0;
}
}
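
  /**
   * Aggregates the gapfilled rows of one aggregation window: rows are grouped by the group-by
   * expressions and fed through the regular aggregation functions via row-based block value sets.
   */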
private List<Object[]> aggregateGapfilledData(Object timeCol, List<Object[]> bucketedRows, DataSchema dataSchema) {
List<ExpressionContext> groupbyExpressions = _queryContext.getGroupByExpressions();
Preconditions.checkArgument(groupbyExpressions != null, "No GroupBy Clause.");
Map<String, Integer> indexes = new HashMap<>();
for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
indexes.put(dataSchema.getColumnName(i), i);
}
for (Object[] bucketedRow : bucketedRows) {
bucketedRow[_timeBucketColumnIndex] = timeCol;
}
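    // Assign each row a dense group id so the aggregation functions can use array-backed
    // group-by result holders.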
Map<List<Object>, Integer> groupKeyIndexes = new HashMap<>();
int[] groupKeyArray = new int[bucketedRows.size()];
List<Object[]> aggregatedResult = new ArrayList<>();
for (int i = 0; i < bucketedRows.size(); i++) {
Object[] bucketedRow = bucketedRows.get(i);
List<Object> groupKey = new ArrayList<>(groupbyExpressions.size());
for (ExpressionContext groupbyExpression : groupbyExpressions) {
int columnIndex = indexes.get(groupbyExpression.toString());
groupKey.add(bucketedRow[columnIndex]);
}
if (groupKeyIndexes.containsKey(groupKey)) {
groupKeyArray[i] = groupKeyIndexes.get(groupKey);
} else {
        // Create a new group-by result row and fill in the group-by key (non-aggregation) columns.
groupKeyArray[i] = groupKeyIndexes.size();
groupKeyIndexes.put(groupKey, groupKeyIndexes.size());
Object[] row = new Object[_queryContext.getSelectExpressions().size()];
for (int j = 0; j < _queryContext.getSelectExpressions().size(); j++) {
ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(j);
if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
row[j] = bucketedRow[indexes.get(expressionContext.toString())];
}
}
aggregatedResult.add(row);
}
}
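    // Wrap each non-time column of the bucketed rows as a BlockValSet so the regular aggregation
    // functions can consume the gapfilled data.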
Map<ExpressionContext, BlockValSet> blockValSetMap = new HashMap<>();
for (int i = 1; i < dataSchema.getColumnNames().length; i++) {
blockValSetMap.put(ExpressionContext.forIdentifier(dataSchema.getColumnName(i)),
new RowBasedBlockValSet(dataSchema.getColumnDataType(i), bucketedRows, i));
}
for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(i);
if (expressionContext.getType() == ExpressionContext.Type.FUNCTION) {
FunctionContext functionContext = expressionContext.getFunction();
AggregationFunction aggregationFunction =
AggregationFunctionFactory.getAggregationFunction(functionContext, _queryContext);
GroupByResultHolder groupByResultHolder =
aggregationFunction.createGroupByResultHolder(groupKeyIndexes.size(), groupKeyIndexes.size());
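        // COUNT does not read any input column, so an empty value-set map suffices.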
if (aggregationFunction instanceof CountAggregationFunction) {
          aggregationFunction.aggregateGroupBySV(bucketedRows.size(), groupKeyArray, groupByResultHolder,
              new HashMap<>());
} else {
aggregationFunction
.aggregateGroupBySV(bucketedRows.size(), groupKeyArray, groupByResultHolder, blockValSetMap);
}
for (int j = 0; j < groupKeyIndexes.size(); j++) {
Object[] row = aggregatedResult.get(j);
row[i] = aggregationFunction.extractGroupByResult(groupByResultHolder, j);
row[i] = aggregationFunction.extractFinalResult(row[i]);
}
}
}
return aggregatedResult;
}
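
  /**
   * Resolves the fill value for a non-group-by column of a synthesized row. FILL_PREVIOUS_VALUE
   * returns the last seen value for the group-by key (falling back to the type default);
   * otherwise the type default for the column data type is used.
   */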
private Object getFillValue(int columnIndex, String columnName, Object key, ColumnDataType dataType) {
ExpressionContext expressionContext = _fillExpressions.get(columnName);
if (expressionContext != null && expressionContext.getFunction() != null && GapfillUtils
.isFill(expressionContext)) {
List<ExpressionContext> args = expressionContext.getFunction().getArguments();
      if (args.get(1).getLiteral() == null) {
        throw new UnsupportedOperationException("The second argument of FILL must be a literal fill type.");
      }
GapfillUtils.FillType fillType = GapfillUtils.FillType.valueOf(args.get(1).getLiteral());
if (fillType == GapfillUtils.FillType.FILL_DEFAULT_VALUE) {
        // TODO: support filling a user-specified default value from the SQL in the future.
return GapfillUtils.getDefaultValue(dataType);
} else if (fillType == GapfillUtils.FillType.FILL_PREVIOUS_VALUE) {
Object[] row = _previousByGroupKey.get(key);
if (row != null) {
return row[columnIndex];
} else {
return GapfillUtils.getDefaultValue(dataType);
}
} else {
throw new UnsupportedOperationException("unsupported fill type.");
}
} else {
return GapfillUtils.getDefaultValue(dataType);
}
}
  /**
   * Distributes the merged rows from all Pinot servers into their time buckets. Rows before the
   * gapfill time range are kept as candidates for FILL_PREVIOUS_VALUE; rows after it are skipped.
   */
private List<Object[]>[] putRawRowsIntoTimeBucket(List<Object[]> rows) {
List<Object[]>[] bucketedItems = new List[_numOfTimeBuckets];
for (Object[] row : rows) {
long timeBucket = _dateTimeFormatter.fromFormatToMillis(String.valueOf(row[_timeBucketColumnIndex]));
int index = findGapfillBucketIndex(timeBucket);
if (index >= _numOfTimeBuckets) {
        // The row falls after the gapfill time range and will not be used; skip it.
continue;
}
Key key = constructGroupKeys(row);
_groupByKeys.add(key);
if (index < 0) {
        // The row falls before the gapfill time range; it may supply FILL_PREVIOUS_VALUE.
_previousByGroupKey.compute(key, (k, previousRow) -> {
if (previousRow == null) {
return row;
} else {
long previousTimeBucket =
_dateTimeFormatter.fromFormatToMillis(String.valueOf(previousRow[_timeBucketColumnIndex]));
if (timeBucket > previousTimeBucket) {
return row;
} else {
return previousRow;
}
}
});
} else {
if (bucketedItems[index] == null) {
bucketedItems[index] = new ArrayList<>();
}
bucketedItems[index].add(row);
}
}
return bucketedItems;
}
}