blob: 238fb557b161c09c1b7bbc4b748f22f7f4155565 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.roaringbitmap.RoaringBitmap;
/**
* Helper class to reduce and set Aggregation results into the BrokerResponseNative
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class AggregationDataTableReducer implements DataTableReducer {
private final QueryContext _queryContext;
private final AggregationFunction[] _aggregationFunctions;
AggregationDataTableReducer(QueryContext queryContext) {
_queryContext = queryContext;
_aggregationFunctions = queryContext.getAggregationFunctions();
}
/**
* Reduces data tables and sets aggregations results into ResultTable.
*/
@Override
public void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
assert dataSchema != null;
if (dataTableMap.isEmpty()) {
DataSchema resultTableSchema =
new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema()).getResultDataSchema();
brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
return;
}
if (!_queryContext.isServerReturnFinalResult()) {
reduceWithIntermediateResult(dataSchema, dataTableMap.values(), brokerResponseNative);
} else {
Preconditions.checkState(dataTableMap.size() == 1, "Cannot merge final results from multiple servers");
reduceWithFinalResult(dataSchema, dataTableMap.values().iterator().next(), brokerResponseNative);
}
}
private void reduceWithIntermediateResult(DataSchema dataSchema, Collection<DataTable> dataTables,
BrokerResponseNative brokerResponseNative) {
int numAggregationFunctions = _aggregationFunctions.length;
Object[] intermediateResults = new Object[numAggregationFunctions];
for (DataTable dataTable : dataTables) {
for (int i = 0; i < numAggregationFunctions; i++) {
Object intermediateResultToMerge;
ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
if (_queryContext.isNullHandlingEnabled()) {
RoaringBitmap nullBitmap = dataTable.getNullRowIds(i);
if (nullBitmap != null && nullBitmap.contains(0)) {
intermediateResultToMerge = null;
} else {
intermediateResultToMerge = AggregationFunctionUtils.getIntermediateResult(dataTable, columnDataType, 0, i);
}
} else {
intermediateResultToMerge = AggregationFunctionUtils.getIntermediateResult(dataTable, columnDataType, 0, i);
}
Object mergedIntermediateResult = intermediateResults[i];
if (mergedIntermediateResult == null) {
intermediateResults[i] = intermediateResultToMerge;
} else {
intermediateResults[i] = _aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge);
}
}
}
Object[] finalResults = new Object[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
AggregationFunction aggregationFunction = _aggregationFunctions[i];
Comparable result = aggregationFunction.extractFinalResult(intermediateResults[i]);
finalResults[i] = result == null ? null : aggregationFunction.getFinalResultColumnType().convert(result);
}
brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
}
private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
BrokerResponseNative brokerResponseNative) {
int numAggregationFunctions = _aggregationFunctions.length;
Object[] finalResults = new Object[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
if (_queryContext.isNullHandlingEnabled()) {
RoaringBitmap nullBitmap = dataTable.getNullRowIds(i);
if (nullBitmap != null && nullBitmap.contains(0)) {
finalResults[i] = null;
} else {
finalResults[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, i);
}
} else {
finalResults[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, i);
}
}
brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
}
/**
* Sets aggregation results into ResultsTable
*/
private ResultTable reduceToResultTable(Object[] finalResults) {
PostAggregationHandler postAggregationHandler =
new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema());
DataSchema dataSchema = postAggregationHandler.getResultDataSchema();
Object[] row = postAggregationHandler.getResult(finalResults);
ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
for (int i = 0; i < numColumns; i++) {
row[i] = columnDataTypes[i].format(row[i]);
}
return new ResultTable(dataSchema, Collections.singletonList(row));
}
/**
* Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
*/
private DataSchema getPrePostAggregationDataSchema() {
int numAggregationFunctions = _aggregationFunctions.length;
String[] columnNames = new String[numAggregationFunctions];
ColumnDataType[] columnDataTypes = new ColumnDataType[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
AggregationFunction aggregationFunction = _aggregationFunctions[i];
columnNames[i] = aggregationFunction.getResultColumnName();
columnDataTypes[i] = aggregationFunction.getFinalResultColumnType();
}
return new DataSchema(columnNames, columnDataTypes);
}
}