/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.query.reduce;

import com.google.common.base.Preconditions;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.SimpleIndexedTable;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.roaringbitmap.RoaringBitmap;

/**
 * Helper class to reduce data tables and set group-by results into the {@link BrokerResponseNative}.
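 * e.g. for a query like {@code SELECT col, SUM(metric) FROM myTable GROUP BY col} (illustrative names), each server
 * returns a data table of per-group intermediate results, which are merged here and converted into the final result
 * rows.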
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class GroupByDataTableReducer implements DataTableReducer {
private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
private final QueryContext _queryContext;
private final AggregationFunction[] _aggregationFunctions;
private final int _numAggregationFunctions;
private final List<ExpressionContext> _groupByExpressions;
private final int _numGroupByExpressions;
  private final int _numColumns;

GroupByDataTableReducer(QueryContext queryContext) {
_queryContext = queryContext;
_aggregationFunctions = queryContext.getAggregationFunctions();
assert _aggregationFunctions != null;
_numAggregationFunctions = _aggregationFunctions.length;
_groupByExpressions = queryContext.getGroupByExpressions();
assert _groupByExpressions != null;
_numGroupByExpressions = _groupByExpressions.size();
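    // Result rows lay out the group-by key columns first, followed by one column per aggregation function.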
_numColumns = _numAggregationFunctions + _numGroupByExpressions;
  }

/**
   * Reduces and sets group-by results into the {@link ResultTable}.
*/
@Override
public void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponse,
DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
assert dataSchema != null;
if (dataTableMap.isEmpty()) {
PostAggregationHandler postAggregationHandler =
new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema(dataSchema));
DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
brokerResponse.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
return;
}
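    // Servers return intermediate results by default, which must be merged here; when servers are configured to
    // return final results, only a single server's response is supported (see the precondition below).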
if (!_queryContext.isServerReturnFinalResult()) {
try {
reduceWithIntermediateResult(brokerResponse, dataSchema, dataTableMap.values(), reducerContext, tableName,
brokerMetrics);
} catch (TimeoutException e) {
brokerResponse.getProcessingExceptions()
.add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
}
} else {
// TODO: Support merging results from multiple servers when the data is partitioned on the group-by column
Preconditions.checkState(dataTableMap.size() == 1, "Cannot merge final results from multiple servers");
reduceWithFinalResult(dataSchema, dataTableMap.values().iterator().next(), brokerResponse);
}
if (brokerMetrics != null && brokerResponse.getResultTable() != null) {
brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE,
brokerResponse.getResultTable().getRows().size());
}
  }

/**
   * Extracts the group-by and order-by results and sets them into the {@link ResultTable}.
   * @param brokerResponseNative broker response
   * @param dataSchema data schema
   * @param dataTables collection of data tables
   * @param reducerContext DataTableReducer context
   * @param rawTableName raw table name
   * @param brokerMetrics broker metrics (meters)
   * @throws TimeoutException if unable to complete within the timeout
*/
private void reduceWithIntermediateResult(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
BrokerMetrics brokerMetrics)
throws TimeoutException {
IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
if (brokerMetrics != null) {
brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
}
int numRecords = indexedTable.size();
Iterator<Record> sortedIterator = indexedTable.iterator();
DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
PostAggregationHandler postAggregationHandler =
new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
    // Return directly when there are no records or the limit is 0
int limit = _queryContext.getLimit();
if (numRecords == 0 || limit == 0) {
brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
return;
}
// Calculate rows before post-aggregation
List<Object[]> rows;
ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
FilterContext havingFilter = _queryContext.getHavingFilter();
if (havingFilter != null) {
rows = new ArrayList<>();
HavingFilterHandler havingFilterHandler =
new HavingFilterHandler(havingFilter, postAggregationHandler, _queryContext.isNullHandlingEnabled());
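      // Keep consuming sorted records until LIMIT rows pass the HAVING filter or the records run out.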
while (rows.size() < limit && sortedIterator.hasNext()) {
Object[] row = sortedIterator.next().getValues();
extractFinalAggregationResults(row);
        for (int i = 0; i < numColumns; i++) {
          Object value = row[i];
          if (value != null) {
            row[i] = columnDataTypes[i].convert(value);
          }
        }
if (havingFilterHandler.isMatch(row)) {
rows.add(row);
}
}
} else {
int numRows = Math.min(numRecords, limit);
rows = new ArrayList<>(numRows);
for (int i = 0; i < numRows; i++) {
Object[] row = sortedIterator.next().getValues();
extractFinalAggregationResults(row);
        for (int j = 0; j < numColumns; j++) {
          Object value = row[j];
          if (value != null) {
            row[j] = columnDataTypes[j].convert(value);
          }
        }
rows.add(row);
}
}
    // Calculate the final result rows after post-aggregation
List<Object[]> resultRows = calculateFinalResultRows(postAggregationHandler, rows);
brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, resultRows));
  }

/**
* Helper method to extract the final aggregation results for the given row (in-place).
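   * e.g. an AVG function's intermediate (sum, count) pair is reduced to a single double value.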
*/
private void extractFinalAggregationResults(Object[] row) {
for (int i = 0; i < _numAggregationFunctions; i++) {
int valueIndex = i + _numGroupByExpressions;
row[valueIndex] = _aggregationFunctions[i].extractFinalResult(row[valueIndex]);
}
  }

/**
* Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
*/
private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
String[] columnNames = dataSchema.getColumnNames();
ColumnDataType[] columnDataTypes = new ColumnDataType[_numColumns];
System.arraycopy(dataSchema.getColumnDataTypes(), 0, columnDataTypes, 0, _numGroupByExpressions);
for (int i = 0; i < _numAggregationFunctions; i++) {
columnDataTypes[i + _numGroupByExpressions] = _aggregationFunctions[i].getFinalResultColumnType();
}
return new DataSchema(columnNames, columnDataTypes);
  }

private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
DataTableReducerContext reducerContext)
throws TimeoutException {
long start = System.currentTimeMillis();
int numDataTables = dataTablesToReduce.size();
// Get the number of threads to use for reducing.
    // In the case of a single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking
    // calls.
int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
int limit = _queryContext.getLimit();
// TODO: Make minTrimSize configurable
int trimSize = GroupByUtils.getTableCapacity(limit);
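    // The indexed table keeps more groups than the final LIMIT so that trimming preserves the top groups accurately.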
    // NOTE: For queries with a HAVING clause, use trimSize as resultSize to ensure result accuracy.
// TODO: Resolve the HAVING clause within the IndexedTable before returning the result
int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
int trimThreshold = reducerContext.getGroupByTrimThreshold();
IndexedTable indexedTable;
if (numReduceThreadsToUse == 1) {
indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
} else {
if (trimThreshold >= GroupByCombineOperator.MAX_TRIM_THRESHOLD) {
        // Special case where the trim threshold is set to the max value: there won't be any trimming during upsert,
        // so we can avoid the overhead of the read/write locks in the upsert method.
indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
} else {
indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
}
}
// Create groups of data tables that each thread can process concurrently.
// Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
for (int i = 0; i < numReduceThreadsToUse; i++) {
reduceGroups.add(new ArrayList<>());
}
for (int i = 0; i < numDataTables; i++) {
reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
}
Future[] futures = new Future[numReduceThreadsToUse];
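    // The latch is counted down once per data table (see the finally block below), letting the main thread wait for
    // all data tables to be merged into the indexed table.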
CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
for (int i = 0; i < numReduceThreadsToUse; i++) {
List<DataTable> reduceGroup = reduceGroups.get(i);
futures[i] = reducerContext.getExecutorService().submit(new TraceRunnable() {
@Override
public void runJob() {
for (DataTable dataTable : reduceGroup) {
            // Terminate when the thread is interrupted. This is expected when the query has already failed in the
            // main thread.
if (Thread.interrupted()) {
return;
}
try {
boolean nullHandlingEnabled = _queryContext.isNullHandlingEnabled();
RoaringBitmap[] nullBitmaps = null;
if (nullHandlingEnabled) {
nullBitmaps = new RoaringBitmap[_numColumns];
for (int i = 0; i < _numColumns; i++) {
nullBitmaps[i] = dataTable.getNullRowIds(i);
}
}
int numRows = dataTable.getNumberOfRows();
for (int rowId = 0; rowId < numRows; rowId++) {
Object[] values = new Object[_numColumns];
for (int colId = 0; colId < _numColumns; colId++) {
switch (storedColumnDataTypes[colId]) {
case INT:
values[colId] = dataTable.getInt(rowId, colId);
break;
case LONG:
values[colId] = dataTable.getLong(rowId, colId);
break;
case FLOAT:
values[colId] = dataTable.getFloat(rowId, colId);
break;
case DOUBLE:
values[colId] = dataTable.getDouble(rowId, colId);
break;
case BIG_DECIMAL:
values[colId] = dataTable.getBigDecimal(rowId, colId);
break;
case STRING:
values[colId] = dataTable.getString(rowId, colId);
break;
case BYTES:
values[colId] = dataTable.getBytes(rowId, colId);
break;
case OBJECT:
// TODO: Move ser/de into AggregationFunction interface
DataTable.CustomObject customObject = dataTable.getCustomObject(rowId, colId);
if (customObject != null) {
values[colId] = ObjectSerDeUtils.deserialize(customObject);
}
break;
                    // Add support for other aggregation intermediate result / group-by column types here
default:
                      throw new IllegalStateException("Unsupported stored column data type: " + storedColumnDataTypes[colId]);
}
}
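                // Null out the values for columns whose null bitmap marks this row as null.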
if (nullHandlingEnabled) {
for (int colId = 0; colId < _numColumns; colId++) {
if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
values[colId] = null;
}
}
}
indexedTable.upsert(new Record(values));
}
} finally {
countDownLatch.countDown();
}
}
}
});
}
try {
long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out in broker reduce phase");
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in broker reduce phase", e);
} finally {
for (Future future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
}
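    // Finalize the indexed table: sort the records and trim down to the final result size.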
indexedTable.finish(true);
return indexedTable;
  }

/**
* Computes the number of reduce threads to use per query.
* <ul>
   * <li> Use a single thread if the number of data tables to reduce is less than
   * {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
   * <li> Otherwise, use the minimum of the max allowed reduce threads per query and the number of data tables.</li>
* </ul>
*
* @param numDataTables Number of data tables to reduce
* @param maxReduceThreadsPerQuery Max allowed reduce threads per query
* @return Number of reduce threads to use for the query
*/
private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
    // Use a single thread if the number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
return 1;
} else {
return Math.min(numDataTables, maxReduceThreadsPerQuery);
}
  }

private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
BrokerResponseNative brokerResponseNative) {
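    // Servers have already extracted the final aggregation results, so the rows only need type conversion and
    // post-aggregation handling here.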
PostAggregationHandler postAggregationHandler = new PostAggregationHandler(_queryContext, dataSchema);
DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
    // Return directly when there are no records or the limit is 0
int numRows = dataTable.getNumberOfRows();
int limit = _queryContext.getLimit();
if (numRows == 0 || limit == 0) {
brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
return;
}
// Calculate rows before post-aggregation
List<Object[]> rows;
FilterContext havingFilter = _queryContext.getHavingFilter();
if (havingFilter != null) {
rows = new ArrayList<>();
HavingFilterHandler havingFilterHandler =
new HavingFilterHandler(havingFilter, postAggregationHandler, _queryContext.isNullHandlingEnabled());
for (int i = 0; i < numRows; i++) {
Object[] row = getConvertedRowWithFinalResult(dataTable, i);
if (havingFilterHandler.isMatch(row)) {
rows.add(row);
if (rows.size() == limit) {
break;
}
}
}
} else {
numRows = Math.min(numRows, limit);
rows = new ArrayList<>(numRows);
for (int i = 0; i < numRows; i++) {
rows.add(getConvertedRowWithFinalResult(dataTable, i));
}
}
    // Calculate the final result rows after post-aggregation
List<Object[]> resultRows = calculateFinalResultRows(postAggregationHandler, rows);
brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, resultRows));
  }

private List<Object[]> calculateFinalResultRows(PostAggregationHandler postAggregationHandler, List<Object[]> rows) {
List<Object[]> resultRows = new ArrayList<>(rows.size());
ColumnDataType[] resultColumnDataTypes = postAggregationHandler.getResultDataSchema().getColumnDataTypes();
int numResultColumns = resultColumnDataTypes.length;
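    // Format each value into its response representation (e.g. TIMESTAMP values are rendered as strings).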
for (Object[] row : rows) {
Object[] resultRow = postAggregationHandler.getResult(row);
for (int i = 0; i < numResultColumns; i++) {
Object value = resultRow[i];
if (value != null) {
resultRow[i] = resultColumnDataTypes[i].format(value);
}
}
resultRows.add(resultRow);
}
return resultRows;
  }

private Object[] getConvertedRowWithFinalResult(DataTable dataTable, int rowId) {
Object[] row = new Object[_numColumns];
ColumnDataType[] columnDataTypes = dataTable.getDataSchema().getColumnDataTypes();
for (int i = 0; i < _numColumns; i++) {
if (i < _numGroupByExpressions) {
row[i] = getConvertedKey(dataTable, columnDataTypes[i], rowId, i);
} else {
row[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataTypes[i], rowId, i);
}
}
return row;
  }

private Object getConvertedKey(DataTable dataTable, ColumnDataType columnDataType, int rowId, int colId) {
switch (columnDataType) {
case INT:
return dataTable.getInt(rowId, colId);
case LONG:
return dataTable.getLong(rowId, colId);
case FLOAT:
return dataTable.getFloat(rowId, colId);
case DOUBLE:
return dataTable.getDouble(rowId, colId);
case BIG_DECIMAL:
return dataTable.getBigDecimal(rowId, colId);
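      // BOOLEAN keys are stored as INT (1 = true), and TIMESTAMP keys as LONG epoch millis.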
case BOOLEAN:
return dataTable.getInt(rowId, colId) == 1;
case TIMESTAMP:
return new Timestamp(dataTable.getLong(rowId, colId));
case STRING:
case JSON:
return dataTable.getString(rowId, colId);
case BYTES:
return dataTable.getBytes(rowId, colId).getBytes();
default:
throw new IllegalStateException("Illegal column data type in group key: " + columnDataType);
}
}
}