blob: e094d568360cb4cd2dbf0d789c85874c59d72fa4 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.drill.exec.physical.impl.TopN;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.CodeCompiler;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.expr.fn.FunctionLookupContext;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
import org.apache.drill.exec.physical.impl.svremover.Copier;
import org.apache.drill.exec.physical.impl.svremover.GenericCopierFactory;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.HyperVectorWrapper;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.SimpleRecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
* Operator batch that implements the TopN functionality. It is more efficient
* than (sort + limit) because, unlike sort, it does not have to store all the
* input data, sort it, and then apply the limit to the sorted data. Instead it
* internally maintains a priority queue backed by a heap whose size equals the
* limit value.
// Record batch for the TopN operator: holds the upstream batch, the operator config, the priority queue and the
// bookkeeping fields used while producing output.
public class TopNBatch extends AbstractRecordBatch<TopN> {
private static final Logger logger = LoggerFactory.getLogger(TopNBatch.class);
// Mapping sets forwarded to the static createNewPriorityQueue(...) factory.
private final MappingSet mainMapping = createMainMappingSet();
private final MappingSet leftMapping = createLeftMappingSet();
private final MappingSet rightMapping = createRightMappingSet();
// Read from ExecConstants.BATCH_PURGE_THRESHOLD; compared against batchCount in innerNext().
private final int batchPurgeThreshold;
// Read from CodeCompiler.ENABLE_SAVE_CODE_FOR_DEBUG_TOPN; forwarded when the priority queue is created.
private final boolean codegenDump;
// Upstream batch this operator pulls from.
private final RecordBatch incoming;
// Schema tracked for the incoming data; null until innerNext() has recorded one.
private BatchSchema schema;
// Set when the tracked schema was merged with a differing incoming schema; incoming batches are then coerced.
private boolean schemaChanged;
private PriorityQueue priorityQueue;
private final TopN config;
// Selection vector returned by getSelectionVector4().
private SelectionVector4 sv4;
// Running total of incoming record counts; zeroed together with batchCount in innerNext() and resetTopNState().
private long countSincePurge;
private int batchCount;
// Created lazily in purge() and recreated in purgeAndResetPriorityQueue().
private Copier copier;
// True until the first pass through the innerNext() loop.
private boolean first = true;
// Value returned by getRecordCount(); reset to 0 at the start of every innerNext() call.
private int recordCount;
private IterOutcome lastKnownOutcome = OK;
// When true, getFinalOutcome() returns OK_NEW_SCHEMA and clears the flag.
private boolean firstBatchForSchema = true;
// Set by prepareOutputContainer(); when true, innerNext() delegates to handleRemainingOutput().
private boolean hasOutputRecords;
/**
 * Creates the TopN batch over the given upstream batch and reads the batch purge threshold and the
 * codegen-dump flag from the Drill configuration.
 *
 * @param popConfig TopN operator configuration; also passed to the superclass constructor
 * @param context fragment context; supplies the DrillConfig read here
 * @param incoming upstream record batch
 * @throws OutOfMemoryException declared by the signature; no throw site is visible in this body
 */
public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
this.config = popConfig;
DrillConfig drillConfig = context.getConfig();
batchPurgeThreshold = drillConfig.getInt(ExecConstants.BATCH_PURGE_THRESHOLD);
codegenDump = drillConfig.getBoolean(CodeCompiler.ENABLE_SAVE_CODE_FOR_DEBUG_TOPN);
/**
 * @return the record count most recently set by this operator (reset to 0 at the start of innerNext())
 */
public int getRecordCount() {
return recordCount;
/**
 * Not supported by this batch; output is exposed through {@link #getSelectionVector4()} instead.
 *
 * @throws UnsupportedOperationException always
 */
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
/**
 * @return the current four-byte selection vector held in {@code sv4}; may be null before innerNext() has created it
 */
public SelectionVector4 getSelectionVector4() {
return sv4;
// NOTE(review): the body of close() is not visible in this view; presumably it releases the operator's
// resources (see releaseResource()) — confirm against the full file.
public void close() {
// Pulls the first batch from upstream and reacts to its outcome: for the data case it mirrors each incoming field
// into the outgoing container, for NONE it marks the operator DONE, and EMIT or any other outcome is rejected.
// NOTE(review): `break` statements, further case labels and closing braces appear to be missing from this view.
public void buildSchema() {
IterOutcome outcome = next(incoming);
switch (outcome) {
case OK:
for (VectorWrapper<?> w : incoming) {
// Create (or fetch) an output vector for every incoming field.
ValueVector v = container.addOrGet(w.getField());
// Container (complex) vectors get special handling; the statement itself is not visible here.
if (v instanceof AbstractContainerVector) {
case NONE:
// Upstream is exhausted before any schema was produced.
state = BatchState.DONE;
case EMIT:
throw new IllegalStateException("Unexpected EMIT outcome received in buildSchema phase");
throw new IllegalStateException("Unexpected outcome received in buildSchema phase");
// Produces the next output of the operator. Returns NONE once the operator is DONE, hands off to
// handleRemainingOutput() when output from a previous record boundary is still pending, and otherwise reads
// upstream batches in a loop, feeding them to the priority queue, until NONE (or, per the comments below, EMIT)
// is seen. The final outcome is computed by getFinalOutcome().
// NOTE(review): many statements (breaks, default labels, queue add/purge calls, closing braces) are missing from
// this view of the method; the comments describe only what the visible lines establish.
public IterOutcome innerNext() {
recordCount = 0;
if (state == BatchState.DONE) {
return NONE;
// Check if anything is remaining from previous record boundary
if (hasOutputRecords) {
return handleRemainingOutput();
// Reset the TopN state for next iteration
// Determine whether the incoming batch carries an SV2; SV4 input is rejected.
boolean incomingHasSv2 = false;
switch (incoming.getSchema().getSelectionVectorMode()) {
case NONE: {
case TWO_BYTE: {
incomingHasSv2 = true;
case FOUR_BYTE: {
throw UserException.internalError(null)
.message("TopN doesn't support incoming with SV4 mode")
throw new UnsupportedOperationException("Unsupported SV mode detected in TopN incoming batch");
outer: while (true) {
Stopwatch watch = Stopwatch.createStarted();
if (first) {
// First pass: no upstream call is made here (buildSchema() already called next(incoming)); the outcome is
// treated as OK_NEW_SCHEMA.
lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
// Create the SV4 object upfront to be used for both empty and non-empty incoming batches at EMIT boundary
sv4 = new SelectionVector4(context.getAllocator(), 0);
first = false;
} else {
lastKnownOutcome = next(incoming);
// An OK outcome while no schema has been recorded yet is promoted to OK_NEW_SCHEMA.
if (lastKnownOutcome == OK && schema == null) {
lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
switch (lastKnownOutcome) {
case NONE:
break outer;
case NOT_YET:
throw new UnsupportedOperationException();
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
// schema change handling in case when EMIT is also seen is same as without EMIT. i.e. only if union type
// is enabled it will be handled.
firstBatchForSchema = true;
if (!incoming.getSchema().equals(schema)) {
if (schema != null) {
// A real schema change is only tolerated when union type is enabled; the schemas are then merged.
if (!unionTypeEnabled) {
throw new UnsupportedOperationException(String.format("TopN currently doesn't support changing " +
"schemas with union type disabled. Please try enabling union type: %s and re-execute the query",
} else {
schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
schemaChanged = true;
} else {
// First schema seen: adopt it as is.
schema = incoming.getSchema();
// fall through.
case OK:
case EMIT:
// Empty incoming batch: its vectors (and SV2, if any) are released instead of being queued.
if (incoming.getRecordCount() == 0) {
for (VectorWrapper<?> w : incoming) {
// Release memory for incoming SV2 vector
if (incomingHasSv2) {
countSincePurge += incoming.getRecordCount();
RecordBatchData batch;
// After a schema merge the incoming batch is coerced to the merged schema before being wrapped.
if (schemaChanged) {
batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
} else {
batch = new RecordBatchData(incoming, oContext.getAllocator());
// Tracks whether the try block completed; checked in the finally block below.
boolean success = false;
try {
if (priorityQueue == null) {
priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit());
} else if (!priorityQueue.isInitialized()) {
// means priority queue is cleaned up after producing output for first record boundary. We should
// initialize it for next record boundary
priorityQueue.init(config.getLimit(), oContext.getAllocator(),
schema.getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
// Based on a static threshold of the number of batches, perform a purge operation to release the memory for
// RecordBatches which are of no use or don't fall under the TopN category
if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
countSincePurge = 0;
batchCount = 0;
success = true;
} catch (SchemaChangeException e) {
throw schemaChangeException(e, logger);
} finally {
if (!success) {
throw new UnsupportedOperationException();
// If the last seen outcome is EMIT then break the loop. We do it here since we want to process the batch
// with records and EMIT outcome in above case statements
if (lastKnownOutcome == EMIT) {
// PriorityQueue can be null here if first batch is received with OK_NEW_SCHEMA and is empty and second next()
// call returned NONE or EMIT.
// PriorityQueue can be uninitialized here if only empty batch is received between 2 EMIT outcome.
if (schema == null || (priorityQueue == null || !priorityQueue.isInitialized())) {
// builder may be null at this point if the first incoming batch is empty
return handleEmptyBatches(lastKnownOutcome);
// Expose the queue's hyper batch and final SV4 through the outgoing container.
prepareOutputContainer(priorityQueue.getHyperBatch(), priorityQueue.getFinalSv4());
// With EMIT outcome control will come here multiple times whereas without EMIT outcome control will only come
// here once. In EMIT outcome case if there is schema change in any iteration then that will be handled by
// lastKnownOutcome.
return getFinalOutcome();
* When the PriorityQueue is built up, it stores the indexes of the limit number
* of records (in heapSv4) that fall under the TopN category. But it also
* stores all the incoming RecordBatches with all their records inside a
* HyperContainer (hyperBatch). When a certain threshold of batches is
* reached, this method is called; it copies the limit number of records
* whose indexes are stored in heapSv4 out of the HyperBatch to a new
* VectorContainer and releases all other records and their batches. Later
* this new VectorContainer is stored inside the HyperBatch and its
* corresponding indexes are stored in the heapSv4 vector. This is done to
* avoid holding on to lots of record batches, which can create an OutOfMemory
* condition.
// Copies the records currently selected by the priority queue's SV4 out of its hyper batch into a fresh
// container (via copyToPurge) and resets the queue on the result, logging the elapsed time.
// NOTE(review): several statements and closing braces are missing from this view (for example nothing visible
// populates newQueue before resetQueue is called).
private void purge() {
Stopwatch watch = Stopwatch.createStarted();
VectorContainer c = priorityQueue.getHyperBatch();
// Simple VectorContainer which stores limit number of records only. The records whose indexes are stored inside
// selectionVector4 below are only copied from Hyper container to this simple container.
VectorContainer newContainer = new VectorContainer(oContext);
// SV4 storing the limit number of indexes
SelectionVector4 selectionVector4 = priorityQueue.getSv4();
SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
// The copier is created on first use; on later purges new target vectors are built per field and the existing
// copier is set up again against the new container.
if (copier == null) {
copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null);
} else {
for (VectorWrapper<?> i : batch) {
ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
copier.setup(batch, newContainer);
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
try {
// Purge all the existing batches to a new batch which only holds the selected records
copyToPurge(newContainer, builder);
// New VectorContainer that contains only limit number of records and is later passed to resetQueue to create a
// HyperContainer backing the priority queue out of it
// NOTE(review): the trailing ";;" below is a stray empty statement.
VectorContainer newQueue = new VectorContainer();;
try {
priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
} catch (SchemaChangeException e) {
throw schemaChangeException(e, logger);
} finally {
logger.debug("Took {} us to purge", watch.elapsed(TimeUnit.MICROSECONDS));
/**
 * Convenience overload that forwards this operator's mapping sets, orderings, union-type flag, codegen-dump
 * flag, allocator, current schema's selection vector mode and fragment context to the static factory.
 *
 * @param batch vectors the ordering expressions are materialized against
 * @param limit value passed to the queue's init call by the static factory
 * @return the priority queue produced by the static factory
 */
private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit) {
return createNewPriorityQueue(
mainMapping, leftMapping, rightMapping, config.getOrderings(), batch, unionTypeEnabled,
codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode(), context);
/**
 * @return a new MappingSet with null read and write index names, using ClassGenerator.DEFAULT_SCALAR_MAP for
 *         both generator mappings
 */
public static MappingSet createMainMappingSet() {
return new MappingSet((String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
/**
 * @return a new MappingSet whose first index name is "leftIndex" (second is null), using
 *         ClassGenerator.DEFAULT_SCALAR_MAP for both generator mappings
 */
public static MappingSet createLeftMappingSet() {
return new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
/**
 * @return a new MappingSet whose first index name is "rightIndex" (second is null), using
 *         ClassGenerator.DEFAULT_SCALAR_MAP for both generator mappings
 */
public static MappingSet createRightMappingSet() {
return new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
/**
 * Generates, instantiates and initializes a PriorityQueue implementation whose comparison logic is built
 * from the given orderings.
 *
 * NOTE(review): in this view the mapping-set parameters and codegenDump are never referenced and the bodies of
 * the direction branches are empty; lines appear to be missing, so confirm against the full file.
 *
 * @param mainMapping mapping set for the main evaluation (not referenced in the visible lines)
 * @param leftMapping mapping set for the left comparison side (not referenced in the visible lines)
 * @param rightMapping mapping set for the right comparison side (not referenced in the visible lines)
 * @param orderings sort orderings; one comparison block is generated per ordering
 * @param batch vectors the ordering expressions are materialized against
 * @param unionTypeEnabled forwarded to the expression materializer
 * @param codegenDump flag for saving generated code (not referenced in the visible lines)
 * @param limit passed to the queue's init call
 * @param allocator passed to the queue's init call
 * @param mode selection vector mode; init is told whether it is TWO_BYTE
 * @param context supplies options, the function registry and the generated implementation class
 * @return the initialized priority queue
 */
public static PriorityQueue createNewPriorityQueue(
MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping,
List<Ordering> orderings, VectorAccessible batch, boolean unionTypeEnabled, boolean codegenDump,
int limit, BufferAllocator allocator, SelectionVectorMode mode, FragmentContext context) {
OptionSet optionSet = context.getOptions();
FunctionLookupContext functionLookupContext = context.getFunctionRegistry();
CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, optionSet);
// Uncomment out this line to debug the generated code.
// cg.saveCodeForDebugging(true);
ClassGenerator<PriorityQueue> g = cg.getRoot();
for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, functionLookupContext, unionTypeEnabled);
// The same materialized expression is added once per comparison side.
HoldingContainer left = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
HoldingContainer right = g.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
// next we wrap the two comparison sides and add the expression block for the comparison.
LogicalExpression fh =
FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right, functionLookupContext);
HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
// Generated code branches only when the comparator result is non-zero.
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
if (od.getDirection() == Direction.ASCENDING) {
} else {
PriorityQueue q = context.getImplementationClass(cg);
try {
q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE);
} catch (SchemaChangeException e) {
throw TopNBatch.schemaChangeException(e, "Top N", logger);
return q;
* Handle schema changes during execution.
* 1. Purge existing batches
* 2. Promote newly created container for new schema.
* 3. Recreate priority queue and reset with coerced container.
// Purges the queue's current contents into a new container (via copyToPurge), coerces a container to the current
// schema, then creates a brand-new priority queue and resets it on the coerced container. Unlike purge(), the
// copier is always recreated here.
// NOTE(review): nothing visible populates oldSchemaContainer before it is coerced, and closing braces are
// missing; lines appear to have been dropped from this view.
public void purgeAndResetPriorityQueue() {
final Stopwatch watch = Stopwatch.createStarted();
final VectorContainer c = priorityQueue.getHyperBatch();
final VectorContainer newContainer = new VectorContainer(oContext);
final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null);
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
try {
// Purge all the existing batches to a new batch which only holds the selected records
copyToPurge(newContainer, builder);
// NOTE(review): the trailing ";;" below is a stray empty statement.
final VectorContainer oldSchemaContainer = new VectorContainer(oContext);;
final VectorContainer newSchemaContainer = SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext);
priorityQueue = createNewPriorityQueue(newSchemaContainer, config.getLimit());
try {
priorityQueue.resetQueue(newSchemaContainer, builder.getSv4().createNewWrapperCurrent());
} catch (SchemaChangeException e) {
throw schemaChangeException(e, logger);
} finally {
logger.debug("Took {} us to purge and recreate queue for new schema", watch.elapsed(TimeUnit.MICROSECONDS));
/**
 * Not supported by this batch.
 *
 * NOTE(review): the message says "sort batch" although this class is TopNBatch; it looks copied from the sort
 * operator. Confirm nothing matches on the text before rewording it.
 *
 * @throws UnsupportedOperationException always
 */
public WritableBatch getWritableBatch() {
throw new UnsupportedOperationException("A sort batch is not writable.");
// NOTE(review): the body of cancelIncoming() is not visible in this view; presumably it propagates cancellation
// to the upstream batch — confirm against the full file.
protected void cancelIncoming() {
* Resets TopNBatch state to process next incoming batches independent of
* already seen incoming batches.
// Restores the per-record-boundary bookkeeping to its initial values: last outcome back to OK, both purge
// counters to zero, and no pending output.
// NOTE(review): any resource-release call that belongs here is not visible in this view.
private void resetTopNState() {
lastKnownOutcome = OK;
countSincePurge = 0;
batchCount = 0;
hasOutputRecords = false;
* Cleanup resources held by TopN Batch such as sv4, priority queue and outgoing container
// Null-guards sv4 and priorityQueue before releasing them.
// NOTE(review): the statements inside both guards are not visible in this view.
private void releaseResource() {
if (sv4 != null) {
if (priorityQueue != null) {
* Returns the final IterOutcome which TopN should return for this next() call. Returns OK_NEW_SCHEMA with the first output
* batch after a new schema is seen. This is indicated by the firstBatchForSchema flag. It is also true for the very first
* output batch after the buildSchema() phase, since in buildSchema() a dummy schema was returned downstream without
* the correct SelectionVectorMode.
* In other cases, when there is no schema change, either OK or EMIT is returned with output batches depending upon
* whether EMIT is seen or not. When EMIT is not seen, OK is always returned with an output batch. When all
* the data has been returned, NONE is sent at the end.
* @return - IterOutcome - outcome to send downstream
// Chooses the outcome for the current call, in priority order:
//   1. first batch for a schema      -> OK_NEW_SCHEMA (and the flag is cleared)
//   2. no records in the output      -> EMIT if the last upstream outcome was EMIT, otherwise NONE
//   3. records and last outcome EMIT -> OK while sv4 has more records, EMIT once it does not
//   4. otherwise                     -> OK
private IterOutcome getFinalOutcome() {
IterOutcome outcomeToReturn;
if (firstBatchForSchema) {
outcomeToReturn = OK_NEW_SCHEMA;
firstBatchForSchema = false;
} else if (recordCount == 0) {
// get the outcome to return before calling refresh since that resets the lastKnownOutcome to OK
// NOTE(review): the reset call this comment refers to is not visible in this view.
outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
} else if (lastKnownOutcome == EMIT) {
// in case of EMIT check if this output batch returns all the data or not. If yes then return EMIT along with this
// output batch else return OK. Remaining data will be sent downstream in subsequent next() call.
final boolean hasMoreRecords = sv4.hasNext();
outcomeToReturn = (hasMoreRecords) ? OK : EMIT;
// Remember whether another call is needed to drain this record boundary.
hasOutputRecords = hasMoreRecords;
} else {
outcomeToReturn = OK;
return outcomeToReturn;
* Copies all the selected records into the new container to purge all the incoming batches into a single batch.
* @param newContainer - New container holding the ValueVectors with selected records
* @param batchBuilder - Builder to build hyper vectors batches
* @throws SchemaChangeException
// Uses the shared copier to copy the records selected by the priority queue's SV4 into newContainer, in a
// do/while loop, asserting that each pass copies exactly queueSv4.getCount() records.
// NOTE(review): the loop condition is truncated ("while (;") and the statements that add batches to batchBuilder
// and release the queue/hyper-batch memory are not visible in this view.
private void copyToPurge(VectorContainer newContainer, SortRecordBatchBuilder batchBuilder) {
final VectorContainer c = priorityQueue.getHyperBatch();
final SelectionVector4 queueSv4 = priorityQueue.getSv4();
// Wraps the target container without a selection vector.
final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
do {
// count is the limit number of records required by TopN batch
final int count = queueSv4.getCount();
// Transfers count number of records from hyperBatch to simple container
final int copiedRecords = copier.copyRecords(0, count);
assert copiedRecords == count;
// Store all the batches containing limit number of records
} while (;
// Release the memory stored for the priority queue heap to store indexes
// Release the memory from HyperBatch container
* Prepares an output container with batches from Priority Queue for each record boundary. In case when this is the
* first batch for the known schema (indicated by true value of firstBatchForSchema) the output container is cleared
* and recreated with new HyperVectorWrapper objects and ValueVectors from PriorityQueue. In cases when the schema
* has not changed then it prepares the container keeping the VectorWrapper and SV4 references as is since that is
* what is needed by downstream operator.
// Points the outgoing container at the given data and selection vector, marks output as pending
// (hasOutputRecords = true) and sets recordCount from sv4. For the first batch of a schema, sv4 is replaced by
// dataSv4; otherwise the existing HyperVectorWrapper objects in the outgoing container are looked up by position
// and reused.
// NOTE(review): the statements inside both loops and the SV4 buffer copy are not visible in this view.
private void prepareOutputContainer(VectorContainer dataContainer, SelectionVector4 dataSv4) {
hasOutputRecords = true;
// Check if this is the first output batch for the new known schema. If yes then prepare the output container
// with the proper vectors, otherwise re-use the previous vectors.
if (firstBatchForSchema) {
for (VectorWrapper<?> w : dataContainer) {
sv4 = dataSv4;
} else {
// Schema didn't change so we should keep the reference of HyperVectorWrapper in outgoing container intact and
// populate the HyperVectorWrapper with new list of vectors. Here the assumption is order of ValueVectors is same
// across multiple record boundary unless a new schema is observed
int index = 0;
for (VectorWrapper<?> w : dataContainer) {
HyperVectorWrapper<?> wrapper = (HyperVectorWrapper<?>) container.getValueVector(index++);
// Since the reference of SV4 is held by downstream operator and there is no schema change, so just copy the
// underlying buffer from priority queue sv4.
recordCount = sv4.getCount();
* Handles returning the correct outcome and setting recordCount for the output container when next() is called
* multiple times for a single record boundary. It handles cases when some output was already returned at the current record
* boundary but either there is more left to return or a proper outcome with an empty batch is left to return.
* Example: For the first EMIT record boundary, if all the records were returned in the previous call with OK_NEW_SCHEMA
* outcome, then this method will handle returning an empty output batch with EMIT outcome in the subsequent next() call.
* @return - Outcome to return downstream
// Sets recordCount for a follow-up call on the same record boundary (sv4's count when the guarded condition
// holds, otherwise 0) and then defers to getFinalOutcome() for the outcome.
// NOTE(review): the second operand of the condition below is truncated in this view ("&& {").
private IterOutcome handleRemainingOutput() {
// if priority queue is not null that means the incoming batches were non-empty. And if there are more records
// to send downstream for this record boundary
if (priorityQueue != null && {
recordCount = sv4.getCount();
} else { // This means that either:
// 1) Priority Queue was not null and all records have been sent downstream for this record boundary
// 2) Or Priority Queue is null, since all the incoming batches were empty for current record boundary (or EMIT
// outcome). In the previous call we must have returned OK_NEW_SCHEMA along with SV4 container, so it will
// return EMIT outcome now
recordCount = 0;
return getFinalOutcome();
* Handles preparing the output container and returning the proper outcome downstream when either NONE or only
* empty batches have been seen but with EMIT outcome. In either case the PriorityQueue is not created yet since no
* actual records have been received so far.
* @param incomingOutcome - outcome received from upstream. Either NONE or EMIT
* @return - outcome to return downstream. NONE when incomingOutcome is NONE. OK_NEW_SCHEMA/EMIT when incomingOutcome
* is EMIT and is first/non-first empty input batch respectively.
// Handles the case where no data-bearing batch reached the priority queue. For NONE the operator is marked DONE
// with a zero record count and NONE is returned unchanged. For EMIT an output container is prepared from the
// (empty) incoming container and the existing sv4, and the outcome comes from getFinalOutcome(). Any other
// incoming outcome is returned as is.
private IterOutcome handleEmptyBatches(IterOutcome incomingOutcome) {
IterOutcome outcomeToReturn = incomingOutcome;
// In case of NONE it will change state to DONE and return NONE whereas in case of
// EMIT it has to still continue working for future records.
if (incomingOutcome == NONE) { // this means we saw NONE
state = BatchState.DONE;
recordCount = 0;
} else if (incomingOutcome == EMIT) {
// since priority queue is null that means it has not seen any batch with data
assert (countSincePurge == 0 && batchCount == 0);
// Wrap the incoming container in a hyper container so the output has the same shape as the non-empty case.
final VectorContainer hyperContainer = new ExpandableHyperContainer(incoming.getContainer());
prepareOutputContainer(hyperContainer, sv4);
// update the outcome to return
outcomeToReturn = getFinalOutcome();
return outcomeToReturn;
/**
 * SimpleRecordBatch variant that carries an optional SelectionVector4. When a selection vector is present the
 * record count is taken from it; otherwise the superclass record count is used.
 */
public static class SimpleSV4RecordBatch extends SimpleRecordBatch {
// May be null (copyToPurge constructs an instance with a null selection vector).
private final SelectionVector4 sv4;
public SimpleSV4RecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
super(container, context);
this.sv4 = sv4;
// Record count comes from the selection vector when there is one.
public int getRecordCount() {
if (sv4 != null) {
return sv4.getCount();
} else {
return super.getRecordCount();
// Returns the selection vector passed at construction (possibly null).
public SelectionVector4 getSelectionVector4() {
return sv4;
// Logs the operator's key state (container, config, schema, sv4 and the counters) at ERROR level.
public void dump() {
logger.error("TopNBatch[container={}, config={}, schema={}, sv4={}, countSincePurge={}, " +
"batchCount={}, recordCount={}]", container, config, schema, sv4, countSincePurge, batchCount, recordCount);