/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.util.GapfillUtils;

/**
* Helper class to reduce and set gap fill results into the BrokerResponseNative
*
* {@link SumAvgGapfillProcessor} only applies the sum and/or avg aggregation functions to the gapfilled result for
* each time bucket.
*
* {@link SumAvgGapfillProcessor} differs from {@link GapfillProcessor} in that {@link GapfillProcessor} materializes
* the gapfilled entries, whereas {@link SumAvgGapfillProcessor} does not generate them; it just updates the
* aggregated sum and/or avg counters as necessary.
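*
* For illustration, a query handled by this processor might look like the sketch below (table and column names are
* hypothetical; see the Pinot gapfill documentation for the exact GAPFILL/FILL/TIMESERIESON syntax):
* <pre>{@code
* SELECT timeCol, SUM(metric), AVG(metric)
* FROM (
*   SELECT GAPFILL(timeCol, '1:MILLISECONDS:EPOCH', '1636257500000', '1636257940000', '1:MINUTES',
*          FILL(metric, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(deviceId)) AS timeCol, deviceId, metric
*   FROM myTable)
* GROUP BY timeCol
* }</pre>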
*/
class SumAvgGapfillProcessor extends BaseGapfillProcessor {
private final double[] _sums;
private final int[] _columnTypes;
private final int[] _sumArgIndexes;
private static final int COLUMN_TYPE_SUM = 1;
private static final int COLUMN_TYPE_AVG = 2;
protected Map<Integer, Integer> _filteredMap;
protected final Map<Key, Integer> _groupByKeys;

SumAvgGapfillProcessor(QueryContext queryContext, GapfillUtils.GapfillType gapfillType) {
super(queryContext, gapfillType);
_groupByKeys = new HashMap<>();
_columnTypes = new int[_queryContext.getSelectExpressions().size()];
_sumArgIndexes = new int[_columnTypes.length];
_sums = new double[_columnTypes.length];
}
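
/**
 * Records, for each select expression, whether it is a SUM or AVG aggregation and which data-schema column feeds
 * it, then seeds the running sums from each group's latest pre-window row, subject to the post-gapfill filter.
 */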
protected void initializeAggregationValues(List<Object[]> rows, DataSchema dataSchema) {
for (int i = 0; i < _columnTypes.length; i++) {
ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(i);
if (expressionContext.getType() == ExpressionContext.Type.FUNCTION) {
if (expressionContext.getFunction().getFunctionName().equalsIgnoreCase("sum")) {
_columnTypes[i] = COLUMN_TYPE_SUM;
} else {
_columnTypes[i] = COLUMN_TYPE_AVG;
}
ExpressionContext arg = expressionContext.getFunction().getArguments().get(0);
for (int j = 0; j < dataSchema.getColumnNames().length; j++) {
if (arg.getLiteral().equalsIgnoreCase(dataSchema.getColumnName(j))) {
_sumArgIndexes[i] = j;
break;
}
}
}
}
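// Seed the running sums with each group's most recent pre-window row so the first time bucket starts from the
// carried-forward previous values (subject to the post-gapfill filter).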
for (Map.Entry<Key, Integer> entry : _groupByKeys.entrySet()) {
if (_previousByGroupKey.containsKey(entry.getKey())) {
if (_postGapfillFilterHandler == null
|| _postGapfillFilterHandler.isMatch(_previousByGroupKey.get(entry.getKey()))) {
_filteredMap.put(entry.getValue(), entry.getValue());
for (int i = 0; i < _columnTypes.length; i++) {
if (_columnTypes[i] != 0) {
_sums[i] += ((Number) rows.get(entry.getValue())[_sumArgIndexes[i]]).doubleValue();
}
}
_count++;
}
}
}
}
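
/**
 * Performs two passes over the raw rows. The first pass partitions the rows into gapfill time buckets, recording
 * for each bucket the index of its first raw row and, for rows before the window, the latest row per group as the
 * previous value. The second pass walks the buckets in time order, maintaining running per-group sums, and emits
 * one aggregated row per aggregation window.
 */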
@Override
protected List<Object[]> gapFillAndAggregate(
List<Object[]> rows, DataSchema dataSchema, DataSchema resultTableSchema) {
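// First pass: for each gapfill time bucket, record the index of the first raw row falling in or after it. Rows
// before the window only update the per-group previous value; the scan stops at the first row beyond the window.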
int[] timeBucketedRawRows = new int[_numOfTimeBuckets + 1];
int timeBucketedRawRowsIndex = 0;
for (int i = 0; i < rows.size(); i++) {
Object[] row = rows.get(i);
long time = _dateTimeFormatter.fromFormatToMillis(String.valueOf(row[_timeBucketColumnIndex]));
int index = findGapfillBucketIndex(time);
if (index >= _numOfTimeBuckets) {
timeBucketedRawRows[timeBucketedRawRowsIndex++] = i;
break;
}
Key key = constructGroupKeys(row);
_groupByKeys.putIfAbsent(key, _groupByKeys.size());
if (index < 0) {
// This row falls before the gapfill window; keep the latest such row per group as the previous value.
_previousByGroupKey.compute(key, (k, previousRow) -> {
if (previousRow == null) {
return row;
} else {
if ((Long) row[_timeBucketColumnIndex] > (Long) previousRow[_timeBucketColumnIndex]) {
return row;
} else {
return previousRow;
}
}
});
} else if (index >= timeBucketedRawRowsIndex) {
while (index >= timeBucketedRawRowsIndex) {
timeBucketedRawRows[timeBucketedRawRowsIndex++] = i;
}
}
}
while (timeBucketedRawRowsIndex < _numOfTimeBuckets + 1) {
timeBucketedRawRows[timeBucketedRawRowsIndex++] = rows.size();
}
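// Install the post-gapfill filter (the outer query's filter over the gapfill subquery) and the post-aggregate
// HAVING filter, if present.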
_filteredMap = new HashMap<>();
if (_queryContext.getSubquery() != null && _queryContext.getFilter() != null) {
_postGapfillFilterHandler = new GapfillFilterHandler(_queryContext.getFilter(), dataSchema);
}
if (_queryContext.getHavingFilter() != null) {
_postAggregateHavingFilterHandler =
new GapfillFilterHandler(_queryContext.getHavingFilter(), resultTableSchema);
}
initializeAggregationValues(rows, dataSchema);
List<Object[]> result = new ArrayList<>();
double[] aggregatedSum = new double[_columnTypes.length];
long aggregatedCount = 0;
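// Second pass: walk every gapfill time bucket in order, carrying each group's latest matching row forward so the
// running sums always reflect the current value of every group.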
for (long time = _startMs; time < _endMs; time += _gapfillTimeBucketSize) {
int timeBucketIndex = findGapfillBucketIndex(time);
for (int i = timeBucketedRawRows[timeBucketIndex]; i < timeBucketedRawRows[timeBucketIndex + 1]; i++) {
Object[] resultRow = rows.get(i);
Key key = constructGroupKeys(resultRow);
int groupKeyIndex = _groupByKeys.get(key);
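// The group already contributed a row to the running sums; retract it before considering this newer row.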
if (_filteredMap.containsKey(groupKeyIndex)) {
for (int j = 0; j < _columnTypes.length; j++) {
if (_columnTypes[j] == 0) {
continue;
}
_sums[j] -= ((Number) rows.get(_filteredMap.get(groupKeyIndex))[_sumArgIndexes[j]]).doubleValue();
}
_filteredMap.remove(groupKeyIndex);
_count--;
}
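// If the newer row passes the post-gapfill filter, add it to the running sums and remember its index so it can
// be retracted later.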
if (_postGapfillFilterHandler == null || _postGapfillFilterHandler.isMatch(resultRow)) {
_count++;
for (int j = 0; j < _columnTypes.length; j++) {
if (_columnTypes[j] == 0) {
continue;
}
_sums[j] += ((Number) resultRow[_sumArgIndexes[j]]).doubleValue();
}
_filteredMap.put(groupKeyIndex, i);
}
}
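// Fold this bucket's running sums and count into the aggregation window totals.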
if (_count > 0) {
aggregatedCount += _count;
for (int i = 0; i < _columnTypes.length; i++) {
if (_columnTypes[i] != 0) {
aggregatedSum[i] += _sums[i];
}
}
}
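// Emit one aggregated row per aggregation window of _aggregationSize gapfill buckets, stamped with the window's
// start time, then reset the window totals.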
if ((timeBucketIndex + 1) % _aggregationSize == 0 && aggregatedCount > 0) {
Object[] aggregatedRow = new Object[_queryContext.getSelectExpressions().size()];
for (int i = 0; i < _columnTypes.length; i++) {
if (_columnTypes[i] == 0) {
if (dataSchema.getColumnDataType(_timeBucketColumnIndex) == DataSchema.ColumnDataType.LONG) {
aggregatedRow[i] = time - (_aggregationSize - 1) * _gapfillTimeBucketSize;
} else {
aggregatedRow[i] = _dateTimeFormatter.fromMillisToFormat(
time - (_aggregationSize - 1) * _gapfillTimeBucketSize);
}
} else if (_columnTypes[i] == COLUMN_TYPE_SUM) {
aggregatedRow[i] = aggregatedSum[i];
} else { //COLUMN_TYPE_AVG
aggregatedRow[i] = aggregatedSum[i] / aggregatedCount;
}
}
aggregatedSum = new double[_columnTypes.length];
aggregatedCount = 0;
if (_postAggregateHavingFilterHandler == null || _postAggregateHavingFilterHandler.isMatch(aggregatedRow)) {
result.add(aggregatedRow);
}
if (result.size() >= _limitForAggregatedResult) {
return result;
}
}
}
return result;
}
}