blob: 47410a85ffbe5ccfd27c071e824eb49c4a796c06 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.query;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.BitmapDocIdSetOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.ProjectionOperator;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
/**
 * Operator for selection order-by queries.
 * <p>The operator uses a priority queue to sort the rows and return the top rows based on the order-by expressions.
 * <p>It is optimized to fetch only the values needed for the ordering purpose and the final result:
 * <ul>
 *   <li>
 *     When the caller flags all the order-by columns as pre-sorted, the operator fetches all the output expressions
 *     but stops reading as soon as (offset + limit) documents have been consumed.
 *   </li>
 *   <li>
 *     When all the output expressions are ordered, the operator fetches all the output expressions and inserts them
 *     into the priority queue because all the values are needed for ordering.
 *   </li>
 *   <li>
 *     Otherwise, the operator fetches only the order-by expressions and the document ids and inserts them into the
 *     priority queue. After getting the top rows, the operator does a second round scan only on the document ids for
 *     the top rows for the non-order-by output expressions. This optimization can significantly reduce the scanning
 *     and improve the query performance when most/all of the output expressions are not ordered (e.g. SELECT * FROM
 *     table ORDER BY col).
 *   </li>
 * </ul>
 */
public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock> {
  private static final String EXPLAIN_NAME = "SELECT_ORDERBY";

  private final IndexSegment _indexSegment;
  private final boolean _nullHandlingEnabled;
  // Deduped order-by expressions followed by output expressions from SelectionOperatorUtils.extractExpressions()
  private final List<ExpressionContext> _expressions;
  private final TransformOperator _transformOperator;
  private final List<OrderByExpressionContext> _orderByExpressions;
  private final TransformResultMetadata[] _orderByExpressionMetadata;
  // offset + limit: the maximum number of rows held in the priority queue
  private final int _numRowsToKeep;
  private final PriorityQueue<Object[]> _rows;
  private final boolean _allOrderByColsPreSorted;

  private int _numDocsScanned = 0;
  private long _numEntriesScannedPostFilter = 0;

  /**
   * Creates the operator and its row-holding priority queue.
   *
   * @param indexSegment segment used to look up data sources for the second-round scan
   * @param queryContext query context providing the order-by expressions, offset, limit and null-handling flag
   * @param expressions order-by expressions followed by the remaining output expressions
   * @param transformOperator operator producing the blocks to read values from
   * @param allOrderByColsPreSorted whether the caller considers all the order-by columns pre-sorted; when true the
   *                                operator stops reading after (offset + limit) documents
   * @throws BadQueryRequestException if any order-by expression is multi-valued
   */
  public SelectionOrderByOperator(IndexSegment indexSegment, QueryContext queryContext,
      List<ExpressionContext> expressions, TransformOperator transformOperator, boolean allOrderByColsPreSorted) {
    _indexSegment = indexSegment;
    _nullHandlingEnabled = queryContext.isNullHandlingEnabled();
    _expressions = expressions;
    _transformOperator = transformOperator;
    _allOrderByColsPreSorted = allOrderByColsPreSorted;

    _orderByExpressions = queryContext.getOrderByExpressions();
    assert _orderByExpressions != null;
    int numOrderByExpressions = _orderByExpressions.size();
    _orderByExpressionMetadata = new TransformResultMetadata[numOrderByExpressions];
    for (int i = 0; i < numOrderByExpressions; i++) {
      ExpressionContext expression = _orderByExpressions.get(i).getExpression();
      _orderByExpressionMetadata[i] = _transformOperator.getResultMetadata(expression);
    }

    _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
    // NOTE: getComparator() reads _orderByExpressions, _orderByExpressionMetadata and _nullHandlingEnabled, so it must
    //       be called after they are assigned.
    _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
        getComparator());
  }

  /**
   * Returns the explain-plan description: the operator name followed by the comma-separated expression list.
   */
  @Override
  public String toExplainString() {
    StringBuilder stringBuilder = new StringBuilder(EXPLAIN_NAME).append("(selectList:");
    if (!_expressions.isEmpty()) {
      stringBuilder.append(_expressions.get(0));
      for (int i = 1; i < _expressions.size(); i++) {
        stringBuilder.append(", ").append(_expressions.get(i));
      }
    }
    return stringBuilder.append(')').toString();
  }

  /**
   * Builds the comparator used by the priority queue. Rows are compared on the order-by expressions only (the first
   * values of each row), in the order the expressions appear in the ORDER-BY clause.
   *
   * @throws BadQueryRequestException if any order-by expression is multi-valued
   */
  private Comparator<Object[]> getComparator() {
    // Compare all single-value columns
    int numOrderByExpressions = _orderByExpressions.size();
    List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
    for (int i = 0; i < numOrderByExpressions; i++) {
      if (_orderByExpressionMetadata[i].isSingleValue()) {
        valueIndexList.add(i);
      } else {
        // MV columns should not be part of the selection order by only list
        throw new BadQueryRequestException(
            String.format("MV expression: %s should not be included in the ORDER-BY clause",
                _orderByExpressions.get(i)));
      }
    }

    int numValuesToCompare = valueIndexList.size();
    int[] valueIndices = new int[numValuesToCompare];
    DataType[] storedTypes = new DataType[numValuesToCompare];
    // Use multiplier -1 or 1 to control ascending/descending order
    int[] multipliers = new int[numValuesToCompare];
    for (int i = 0; i < numValuesToCompare; i++) {
      int valueIndex = valueIndexList.get(i);
      valueIndices[i] = valueIndex;
      storedTypes[i] = _orderByExpressionMetadata[valueIndex].getDataType().getStoredType();
      multipliers[i] = _orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
    }

    if (_nullHandlingEnabled) {
      return (Object[] o1, Object[] o2) -> {
        for (int i = 0; i < numValuesToCompare; i++) {
          int index = valueIndices[i];
          Object v1 = o1[index];
          Object v2 = o2[index];
          if (v1 == null) {
            if (v2 == null) {
              // Both values are null: this key is a tie, so the next order-by expression must decide. Returning 0
              // here would ignore all the remaining order-by expressions.
              continue;
            }
            // A null compares as smaller than any non-null value before the direction multiplier is applied.
            // NOTE(review): this was previously described as 'NULLS LAST' regardless of the ordering direction, but
            //               the returned sign depends on the direction multiplier - confirm the intended ordering.
            return -multipliers[i];
          } else if (v2 == null) {
            return multipliers[i];
          }
          int result = compareNonNull(storedTypes[i], v1, v2);
          if (result != 0) {
            return result * multipliers[i];
          }
        }
        return 0;
      };
    } else {
      return (Object[] o1, Object[] o2) -> {
        for (int i = 0; i < numValuesToCompare; i++) {
          int index = valueIndices[i];
          int result = compareNonNull(storedTypes[i], o1[index], o2[index]);
          if (result != 0) {
            return result * multipliers[i];
          }
        }
        return 0;
      };
    }
  }

  /**
   * Compares two non-null single values of the given stored type using the natural ordering of that type.
   *
   * @throws IllegalStateException if the stored type is not one of the comparable single-value types
   */
  private static int compareNonNull(DataType storedType, Object v1, Object v2) {
    // TODO: Evaluate the performance of casting to Comparable and avoid the switch
    switch (storedType) {
      case INT:
        return ((Integer) v1).compareTo((Integer) v2);
      case LONG:
        return ((Long) v1).compareTo((Long) v2);
      case FLOAT:
        return ((Float) v1).compareTo((Float) v2);
      case DOUBLE:
        return ((Double) v1).compareTo((Double) v2);
      case BIG_DECIMAL:
        return ((BigDecimal) v1).compareTo((BigDecimal) v2);
      case STRING:
        return ((String) v1).compareTo((String) v2);
      case BYTES:
        return ((ByteArray) v1).compareTo((ByteArray) v2);
      // NOTE: Multi-value columns are not comparable, so we should not reach here
      default:
        throw new IllegalStateException("Unsupported stored type for ORDER-BY comparison: " + storedType);
    }
  }

  /**
   * Collects the null bitmap of each block value set. An entry is null when the value set provides no bitmap.
   */
  private static RoaringBitmap[] getNullBitmaps(BlockValSet[] blockValSets) {
    int numValSets = blockValSets.length;
    RoaringBitmap[] nullBitmaps = new RoaringBitmap[numValSets];
    for (int i = 0; i < numValSets; i++) {
      nullBitmaps[i] = blockValSets[i].getNullBitmap();
    }
    return nullBitmaps;
  }

  /**
   * Overwrites with null every value of the row that the null bitmaps mark for the given row id.
   *
   * @param row row to modify in place
   * @param columnOffset index within the row of the value matching the first null bitmap
   * @param nullBitmaps null bitmap per column (entries may be null)
   * @param rowId id of the row within the current block
   */
  private static void setNulls(Object[] row, int columnOffset, RoaringBitmap[] nullBitmaps, int rowId) {
    for (int colId = 0; colId < nullBitmaps.length; colId++) {
      RoaringBitmap nullBitmap = nullBitmaps[colId];
      if (nullBitmap != null && nullBitmap.contains(rowId)) {
        row[columnOffset + colId] = null;
      }
    }
  }

  /**
   * Picks the computation strategy: pre-sorted, all output expressions ordered, or only part of them ordered.
   */
  @Override
  protected SelectionResultsBlock getNextBlock() {
    if (_allOrderByColsPreSorted) {
      return computeAllPreSorted();
    } else if (_expressions.size() == _orderByExpressions.size()) {
      return computeAllOrdered();
    } else {
      return computePartiallyOrdered();
    }
  }

  /**
   * Helper method to compute the result when all the order-by columns are flagged as pre-sorted. Reading stops once
   * (offset + limit) documents have been consumed.
   */
  private SelectionResultsBlock computeAllPreSorted() {
    int numExpressions = _expressions.size();

    // Fetch all the expressions and insert them into the priority queue
    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
    TransformBlock transformBlock;
    while (_numDocsScanned < _numRowsToKeep && (transformBlock = _transformOperator.nextBlock()) != null) {
      for (int i = 0; i < numExpressions; i++) {
        ExpressionContext expression = _expressions.get(i);
        blockValSets[i] = transformBlock.getBlockValueSet(expression);
      }
      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
      int numDocsFetched = transformBlock.getNumDocs();
      RoaringBitmap[] nullBitmaps = _nullHandlingEnabled ? getNullBitmaps(blockValSets) : null;
      for (int rowId = 0; rowId < numDocsFetched && _numDocsScanned < _numRowsToKeep; rowId++) {
        // NOTE: Every call to blockValueFetcher.getRow() creates a new row instance, so the nulls must be set on the
        //       very same instance that is inserted into the priority queue.
        Object[] row = blockValueFetcher.getRow(rowId);
        if (nullBitmaps != null) {
          setNulls(row, 0, nullBitmaps, rowId);
        }
        SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
        _numDocsScanned++;
      }
    }
    _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected;

    // Create the data schema
    String[] columnNames = new String[numExpressions];
    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
    for (int i = 0; i < numExpressions; i++) {
      ExpressionContext expression = _expressions.get(i);
      columnNames[i] = expression.toString();
      TransformResultMetadata expressionMetadata = _transformOperator.getResultMetadata(expression);
      columnDataTypes[i] =
          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
    }
    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);

    return new SelectionResultsBlock(dataSchema, _rows);
  }

  /**
   * Helper method to compute the result when all the output expressions are ordered.
   */
  private SelectionResultsBlock computeAllOrdered() {
    int numExpressions = _expressions.size();

    // Fetch all the expressions and insert them into the priority queue
    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
    TransformBlock transformBlock;
    while ((transformBlock = _transformOperator.nextBlock()) != null) {
      for (int i = 0; i < numExpressions; i++) {
        ExpressionContext expression = _expressions.get(i);
        blockValSets[i] = transformBlock.getBlockValueSet(expression);
      }
      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
      int numDocsFetched = transformBlock.getNumDocs();
      if (_nullHandlingEnabled) {
        RoaringBitmap[] nullBitmaps = getNullBitmaps(blockValSets);
        for (int rowId = 0; rowId < numDocsFetched; rowId++) {
          // Note: Every time blockValueFetcher.getRow is called, a new row instance is created.
          Object[] row = blockValueFetcher.getRow(rowId);
          setNulls(row, 0, nullBitmaps, rowId);
          SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
        }
      } else {
        for (int i = 0; i < numDocsFetched; i++) {
          SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), _rows, _numRowsToKeep);
        }
      }
      _numDocsScanned += numDocsFetched;
    }
    _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected;

    // Create the data schema. All the expressions are order-by expressions here, so their metadata is already cached.
    String[] columnNames = new String[numExpressions];
    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
    for (int i = 0; i < numExpressions; i++) {
      columnNames[i] = _expressions.get(i).toString();
      TransformResultMetadata expressionMetadata = _orderByExpressionMetadata[i];
      columnDataTypes[i] =
          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
    }
    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);

    return new SelectionResultsBlock(dataSchema, _rows);
  }

  /**
   * Helper method to compute the result when not all the output expressions are ordered.
   */
  private SelectionResultsBlock computePartiallyOrdered() {
    int numExpressions = _expressions.size();
    int numOrderByExpressions = _orderByExpressions.size();

    // Fetch the order-by expressions and docIds and insert them into the priority queue
    BlockValSet[] blockValSets = new BlockValSet[numOrderByExpressions];
    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
    TransformBlock transformBlock;
    while ((transformBlock = _transformOperator.nextBlock()) != null) {
      for (int i = 0; i < numOrderByExpressions; i++) {
        ExpressionContext expression = _orderByExpressions.get(i).getExpression();
        blockValSets[i] = transformBlock.getBlockValueSet(expression);
      }
      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
      int numDocsFetched = transformBlock.getNumDocs();
      int[] docIds = transformBlock.getDocIds();
      RoaringBitmap[] nullBitmaps = _nullHandlingEnabled ? getNullBitmaps(blockValSets) : null;
      for (int rowId = 0; rowId < numDocsFetched; rowId++) {
        // NOTE: We pre-allocate the complete row so that we can fill up the non-order-by output expression values
        //       later without creating extra rows or re-constructing the priority queue. We can change the values
        //       in-place because the comparator only compares the values for the order-by expressions.
        Object[] row = new Object[numExpressions];
        blockValueFetcher.getRow(rowId, row, 0);
        // Temporarily keep the docId in the first non-order-by slot; it is overwritten in the second round
        row[numOrderByExpressions] = docIds[rowId];
        if (nullBitmaps != null) {
          setNulls(row, 0, nullBitmaps, rowId);
        }
        SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
      }
      _numDocsScanned += numDocsFetched;
    }
    _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected;

    // Copy the rows (shallow copy so that any modification will also be reflected to the priority queue) into a list,
    // and store the document ids into a bitmap
    int numRows = _rows.size();
    List<Object[]> rowList = new ArrayList<>(numRows);
    RoaringBitmap docIdBitmap = new RoaringBitmap();
    for (Object[] row : _rows) {
      rowList.add(row);
      int docId = (int) row[numOrderByExpressions];
      docIdBitmap.add(docId);
    }

    // Sort the rows with docIds to match the order of the bitmap (bitmap always returns values in ascending order)
    rowList.sort(Comparator.comparingInt(o -> (int) o[numOrderByExpressions]));

    // Construct a new TransformOperator to fetch the non-order-by expressions for the top rows
    List<ExpressionContext> nonOrderByExpressions = _expressions.subList(numOrderByExpressions, numExpressions);
    Set<String> columns = new HashSet<>();
    for (ExpressionContext expressionContext : nonOrderByExpressions) {
      expressionContext.getColumns(columns);
    }
    int numColumns = columns.size();
    Map<String, DataSource> dataSourceMap = new HashMap<>();
    for (String column : columns) {
      dataSourceMap.put(column, _indexSegment.getDataSource(column));
    }
    ProjectionOperator projectionOperator =
        new ProjectionOperator(dataSourceMap, new BitmapDocIdSetOperator(docIdBitmap, numRows));
    TransformOperator transformOperator = new TransformOperator(projectionOperator, nonOrderByExpressions);

    // Fill the non-order-by expression values
    int numNonOrderByExpressions = nonOrderByExpressions.size();
    blockValSets = new BlockValSet[numNonOrderByExpressions];
    int rowBaseId = 0;
    while ((transformBlock = transformOperator.nextBlock()) != null) {
      for (int i = 0; i < numNonOrderByExpressions; i++) {
        ExpressionContext expression = nonOrderByExpressions.get(i);
        blockValSets[i] = transformBlock.getBlockValueSet(expression);
      }
      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
      int numDocsFetched = transformBlock.getNumDocs();
      // Apply the null bitmaps to the non-order-by values as well, so that all the computation paths return nulls
      // consistently when null handling is enabled
      RoaringBitmap[] nullBitmaps = _nullHandlingEnabled ? getNullBitmaps(blockValSets) : null;
      for (int i = 0; i < numDocsFetched; i++) {
        Object[] row = rowList.get(rowBaseId + i);
        blockValueFetcher.getRow(i, row, numOrderByExpressions);
        if (nullBitmaps != null) {
          setNulls(row, numOrderByExpressions, nullBitmaps, i);
        }
      }
      _numEntriesScannedPostFilter += (long) numDocsFetched * numColumns;
      rowBaseId += numDocsFetched;
    }

    // Create the data schema
    String[] columnNames = new String[numExpressions];
    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
    for (int i = 0; i < numExpressions; i++) {
      columnNames[i] = _expressions.get(i).toString();
    }
    for (int i = 0; i < numOrderByExpressions; i++) {
      TransformResultMetadata expressionMetadata = _orderByExpressionMetadata[i];
      columnDataTypes[i] =
          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
    }
    for (int i = 0; i < numNonOrderByExpressions; i++) {
      TransformResultMetadata expressionMetadata = transformOperator.getResultMetadata(nonOrderByExpressions.get(i));
      columnDataTypes[numOrderByExpressions + i] =
          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
    }
    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);

    return new SelectionResultsBlock(dataSchema, _rows);
  }

  /**
   * Returns the transform operator this operator reads from as its only child.
   */
  @Override
  public List<Operator> getChildOperators() {
    return Collections.singletonList(_transformOperator);
  }

  /**
   * Returns the index segment this operator was created with.
   */
  public IndexSegment getIndexSegment() {
    return _indexSegment;
  }

  /**
   * Returns the execution statistics: documents scanned and entries scanned post filter are tracked by this operator,
   * entries scanned in filter come from the transform operator, and total docs come from the segment metadata.
   */
  @Override
  public ExecutionStatistics getExecutionStatistics() {
    long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, _numEntriesScannedPostFilter,
        numTotalDocs);
  }
}