blob: 16ca57d91ff5ce96b1bc34e28abf08ad4336f5ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.selection;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.roaringbitmap.RoaringBitmap;
/**
* The <code>SelectionOperatorService</code> class provides the services for selection queries with
* <code>ORDER BY</code>.
* <p>Expected behavior:
* <ul>
* <li>
* Return selection results with the same order of columns as user passed in.
* <ul>
* <li>Eg. SELECT colB, colA, colC FROM table -> [valB, valA, valC]</li>
* </ul>
* </li>
* <li>
* For 'SELECT *', return columns with alphabetically order.
* <ul>
* <li>Eg. SELECT * FROM table -> [valA, valB, valC]</li>
* </ul>
* </li>
* <li>
* Order by does not change the order of columns in selection results.
* <ul>
* <li>Eg. SELECT colB, colA, colC FROM table ORDER BY calC -> [valB, valA, valC]</li>
* </ul>
* </li>
* </ul>
*/
@SuppressWarnings("rawtypes")
public class SelectionOperatorService {
private final QueryContext _queryContext;
private final List<String> _selectionColumns;
private final DataSchema _dataSchema;
private final int _offset;
private final int _numRowsToKeep;
private final PriorityQueue<Object[]> _rows;
/**
* Constructor for <code>SelectionOperatorService</code> with {@link DataSchema}. (Inter segment)
*
* @param queryContext Selection order-by query
* @param dataSchema data schema.
*/
public SelectionOperatorService(QueryContext queryContext, DataSchema dataSchema) {
_queryContext = queryContext;
_selectionColumns = SelectionOperatorUtils.getSelectionColumns(queryContext, dataSchema);
_dataSchema = dataSchema;
// Select rows from offset to offset + limit.
_offset = queryContext.getOffset();
_numRowsToKeep = _offset + queryContext.getLimit();
assert queryContext.getOrderByExpressions() != null;
_rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
SelectionOperatorUtils.getTypeCompatibleComparator(queryContext.getOrderByExpressions(), _dataSchema,
_queryContext.isNullHandlingEnabled()));
}
/**
* Get the selection results.
*
* @return selection results.
*/
public PriorityQueue<Object[]> getRows() {
return _rows;
}
/**
* Reduces a collection of {@link DataTable}s to selection rows for selection queries with <code>ORDER BY</code>.
* (Broker side)
*/
public void reduceWithOrdering(Collection<DataTable> dataTables, boolean nullHandlingEnabled) {
for (DataTable dataTable : dataTables) {
int numRows = dataTable.getNumberOfRows();
if (nullHandlingEnabled) {
RoaringBitmap[] nullBitmaps = new RoaringBitmap[dataTable.getDataSchema().size()];
for (int colId = 0; colId < nullBitmaps.length; colId++) {
nullBitmaps[colId] = dataTable.getNullRowIds(colId);
}
for (int rowId = 0; rowId < numRows; rowId++) {
Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
for (int colId = 0; colId < nullBitmaps.length; colId++) {
if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
row[colId] = null;
}
}
SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
}
} else {
for (int rowId = 0; rowId < numRows; rowId++) {
Object[] row = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
}
}
}
}
/**
* Render the selection rows to a {@link ResultTable} object for selection queries with <code>ORDER BY</code>.
* (Broker side)
* <p>{@link ResultTable} object will be used to build the broker response.
* <p>Should be called after method "reduceWithOrdering()".
*/
public ResultTable renderResultTableWithOrdering() {
int[] columnIndices = SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
int numColumns = columnIndices.length;
// Construct the result data schema
String[] columnNames = _dataSchema.getColumnNames();
ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
String[] resultColumnNames = new String[numColumns];
ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
for (int i = 0; i < numColumns; i++) {
int columnIndex = columnIndices[i];
resultColumnNames[i] = columnNames[columnIndex];
resultColumnDataTypes[i] = columnDataTypes[columnIndex];
}
DataSchema resultDataSchema = new DataSchema(resultColumnNames, resultColumnDataTypes);
// Extract the result rows
LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
while (_rows.size() > _offset) {
Object[] row = _rows.poll();
assert row != null;
Object[] extractedRow = new Object[numColumns];
for (int i = 0; i < numColumns; i++) {
Object value = row[columnIndices[i]];
if (value != null) {
extractedRow[i] = resultColumnDataTypes[i].convertAndFormat(value);
}
}
rowsInSelectionResults.addFirst(extractedRow);
}
return new ResultTable(resultDataSchema, rowsInSelectionResults);
}
}