// blob: dfdc11f648417aca237d6d18b1cd412d702cad35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.xsort;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.xsort.SortImpl.SortResults;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorInitializer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
/**
 * Manages a {@link PriorityQueueCopier} instance produced from code generation.
 * Provides a wrapper around a copier "session" to simplify reading batches
 * from the copier.
 */
public class PriorityQueueCopierWrapper extends BaseSortWrapper {
  private static final Logger logger = LoggerFactory.getLogger(PriorityQueueCopierWrapper.class);

  private static final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
  private final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);

  /**
   * A single PriorityQueueCopier instance is used for 2 purposes:
   * 1. Merge sorted batches before spilling
   * 2. Merge sorted batches when all incoming data fits in memory
   */
  private PriorityQueueCopier copier;

  /**
   * Creates a wrapper bound to the given operator context.
   *
   * @param opContext operator context passed to the base sort wrapper
   */
  public PriorityQueueCopierWrapper(OperatorContext opContext) {
    super(opContext);
  }

  /**
   * Returns the copier, generating it on first use. Once a copier exists it is
   * returned as-is (the {@code batch} argument is then ignored) until
   * {@link #close()} discards it.
   *
   * @param batch batch used to generate the copier code when none exists yet
   * @return the cached or newly generated copier
   */
  public PriorityQueueCopier getCopier(VectorAccessible batch) {
    if (copier == null) {
      copier = newCopier(batch);
    }
    return copier;
  }

  /**
   * Generates the copier class for the given batch and instantiates it.
   *
   * @param batch batch for which comparison and copy code is generated
   * @return a new copier instance
   */
  private PriorityQueueCopier newCopier(VectorAccessible batch) {
    // Generate the copier code and obtain the resulting class
    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFragmentContext().getOptions());
    ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
    cg.plainJavaCapable(true);
    // Uncomment out this line to debug the generated code.
    // cg.saveCodeForDebugging(true);

    generateComparisons(g, batch, logger);

    // The copy code is generated under the copier mapping set; the main
    // mapping is restored afterwards.
    g.setMappingSet(COPIER_MAPPING_SET);
    CopyUtil.generateCopies(g, batch, true);
    g.setMappingSet(MAIN_MAPPING);
    return getInstance(cg, logger);
  }

  /**
   * Start a merge operation using the specified vector container. Used for
   * the final merge operation.
   *
   * @param schema schema for the input and output batches
   * @param batchGroupList the input batches to merge
   * @param outputContainer container that receives each merged output batch
   * @param targetRecordCount number of records requested for each output batch
   * @param allocHelper allocator helper used to size output vectors; may be
   * {@code null}, in which case vectors are allocated from
   * {@code targetRecordCount} alone
   * @return a merger from which merged batches are read via
   * {@link BatchMerger#next()}
   */
  public BatchMerger startMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList,
      VectorContainer outputContainer, int targetRecordCount, VectorInitializer allocHelper) {
    return new BatchMerger(this, schema, batchGroupList, outputContainer, targetRecordCount, allocHelper);
  }

  /**
   * Prepare a copier which will write a collection of vectors to disk. The copier
   * uses generated code to do the actual writes. If the copier has not yet been
   * created, generate code and create it. If it has already been created, the
   * existing instance is reused and set up again for the new collection of
   * batches.
   *
   * @param batch the (hyper) batch of vectors to be copied
   * @param batchGroupList same batches as above, but represented as a list
   * of individual batches
   * @param outputContainer the container into which to copy the batches
   * @throws UserException if the copier setup reports a schema change
   */
  private void createCopier(VectorAccessible batch, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer) {
    copier = getCopier(batch);

    // Initialize the value vectors for the output container
    for (VectorWrapper<?> i : batch) {
      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
      outputContainer.add(v);
    }

    // The copier's setup() is handed a List<BatchGroup>; the wildcard list is
    // cast to match. The warning suppression is limited to this one cast.
    @SuppressWarnings("unchecked")
    List<BatchGroup> groups = (List<BatchGroup>) batchGroupList;
    try {
      copier.setup(context.getAllocator(), batch, groups, outputContainer);
    } catch (SchemaChangeException e) {
      throw UserException.unsupportedError(e)
          .message("Unexpected schema change - likely code error.")
          .build(logger);
    }
    logger.debug("Copier setup complete");
  }

  /**
   * @return the allocator of the operator context
   */
  public BufferAllocator getAllocator() {
    return context.getAllocator();
  }

  /**
   * Closes and discards the current copier, if any. Does nothing when no
   * copier exists. If closing fails, the copier reference is retained.
   *
   * @throws UserException if closing the copier raises an {@link IOException}
   */
  public void close() {
    if (copier == null) {
      return;
    }
    try {
      copier.close();
      copier = null;
    } catch (IOException e) {
      throw UserException.dataWriteError(e)
          .message("Failure while flushing spilled data")
          .build(logger);
    }
  }

  /**
   * We've gathered a set of batches, each of which has been sorted. The batches
   * may have passed through a filter and thus may have "holes" where rows have
   * been filtered out. We will spill records in blocks of targetRecordCount.
   * To prepare, copy that many records into an outputContainer as a set of
   * contiguous values in new vectors. The result is a single batch with
   * vectors that combine a collection of input batches up to the
   * given threshold.
   * <p>
   * Input. Here the top line is a selection vector of indexes.
   * The second line is a set of batch groups (separated by underscores)
   * with letters indicating individual records:<pre>
   * [3 7 4 8 0 6 1] [5 3 6 8 2 0]
   * [eh_ad_ibf] [r_qm_kn_p]</pre>
   * <p>
   * Output, assuming blocks of 5 records. The brackets represent
   * batches, the line represents the set of batches copied to the
   * spill file.<pre>
   * [abcde] [fhikm] [npqr]</pre>
   * <p>
   * The copying operation does a merge as well: copying
   * values from the sources in ordered fashion. Consider a different example,
   * we want to merge two input batches to produce a single output batch:
   * <pre>
   * Input: [aceg] [bdfh]
   * Output: [abcdefgh]</pre>
   * <p>
   * In the above, the input consists of two sorted batches. (In reality,
   * the input batches have an associated selection vector, but that is omitted
   * here and just the sorted values shown.) The output is a single batch
   * with the merged records (indicated by letters) from the two input batches.
   * <p>
   * Here we bind the copier to the batchGroupList of sorted, buffered batches
   * to be merged. We bind the copier output to outputContainer: the copier will write its
   * merged "batches" of records to that container.
   * <p>
   * Calls to the {@link #next()} method sequentially return merged batches
   * of the desired row count.
   */
  public static class BatchMerger implements SortResults, AutoCloseable {

    private final PriorityQueueCopierWrapper holder;
    private final VectorContainer hyperBatch;
    private final VectorContainer outputContainer;
    private final VectorInitializer allocHelper;
    private final int targetRecordCount;
    private int batchCount;
    private long estBatchSize;

    /**
     * Creates a merger with a temporary output container.
     *
     * @param holder the copier that does the work
     * @param schema schema for the input and output batches
     * @param batchGroupList the input batches
     * @param targetRecordCount number of records for each output batch
     * @param allocHelper allocator helper used to size output vectors, or
     * {@code null} to allocate from {@code targetRecordCount} alone
     */
    private BatchMerger(PriorityQueueCopierWrapper holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
        int targetRecordCount, VectorInitializer allocHelper) {
      this(holder, schema, batchGroupList, new VectorContainer(), targetRecordCount, allocHelper);
    }

    /**
     * Creates a merger with the specified output container
     *
     * @param holder the copier that does the work
     * @param schema schema for the input and output batches
     * @param batchGroupList the input batches
     * @param outputContainer merges output batch into the given output container
     * @param targetRecordCount number of records for each output batch
     * @param allocHelper allocator helper used to size output vectors, or
     * {@code null} to allocate from {@code targetRecordCount} alone
     */
    private BatchMerger(PriorityQueueCopierWrapper holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
        VectorContainer outputContainer, int targetRecordCount, VectorInitializer allocHelper) {
      this.holder = holder;
      this.allocHelper = allocHelper;
      hyperBatch = constructHyperBatch(schema, batchGroupList);
      this.targetRecordCount = targetRecordCount;
      this.outputContainer = outputContainer;
      holder.createCopier(hyperBatch, batchGroupList, outputContainer);
    }

    /**
     * Read the next merged batch into the output container. The batch holds
     * up to the target row count, but may hold fewer if this is the last batch.
     * The output container's schema and record count are set on every call,
     * including the final call that yields no rows.
     *
     * @return {@code true} if the batch contains at least one row,
     * {@code false} if no more rows are available
     */
    @Override
    public boolean next() {
      long start = holder.getAllocator().getAllocatedMemory();

      // Allocate an outgoing container the "dumb" way (based on static sizes)
      // for testing, or the "smart" way (based on actual observed data sizes)
      // for production code.
      if (allocHelper == null) {
        VectorAccessibleUtilities.allocateVectors(outputContainer, targetRecordCount);
      } else {
        allocHelper.allocateBatch(outputContainer, targetRecordCount);
      }
      logger.trace("Initial output batch allocation: {} bytes, {} records",
          holder.getAllocator().getAllocatedMemory() - start,
          targetRecordCount);

      Stopwatch w = Stopwatch.createStarted();
      int count = holder.copier.next(targetRecordCount);
      if (count > 0) {
        long t = w.elapsed(TimeUnit.MICROSECONDS);
        batchCount++;
        long size = holder.getAllocator().getAllocatedMemory() - start;
        logger.trace("Took {} us to merge {} records, consuming {} bytes of memory",
            t, count, size);
        // Track the largest batch seen so far; reported by getEstBatchSize().
        estBatchSize = Math.max(estBatchSize, size);
      } else {
        logger.trace("copier returned 0 records");
      }

      // Initialize output container metadata.
      outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
      outputContainer.setRecordCount(count);

      return count > 0;
    }

    /**
     * Construct a vector container that holds a list of batches, each represented as an
     * array of vectors. The entire collection of vectors has a common schema.
     * <p>
     * To build the collection, we go through the current schema (which has been
     * devised to be common for all batches.) For each field in the schema, we create
     * an array of vectors. To create the elements, we iterate over all the incoming
     * batches and search for the vector that matches the current column.
     * <p>
     * Finally, we build a new schema for the combined container. That new schema must,
     * because of the way the container was created, match the current schema.
     *
     * @param schema schema for the hyper batch
     * @param batchGroupList list of batches to combine
     * @return a container where each column is represented as an array of vectors
     * (hence the "hyper" in the method name)
     */
    private VectorContainer constructHyperBatch(BatchSchema schema, List<? extends BatchGroup> batchGroupList) {
      VectorContainer cont = new VectorContainer();
      for (MaterializedField field : schema) {
        // One vector per input batch for this column, in batch-list order.
        ValueVector[] vectors = new ValueVector[batchGroupList.size()];
        int i = 0;
        for (BatchGroup group : batchGroupList) {
          vectors[i++] = group.getValueAccessorById(
              field.getValueClass(),
              group.getValueVectorId(SchemaPath.getSimplePath(field.getName())).getFieldIds())
              .getValueVector();
        }
        cont.add(vectors);
      }
      cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
      return cont;
    }

    /**
     * Clears the hyper batch and closes the copier held by the wrapper. The
     * copier is closed even if clearing the hyper batch fails.
     */
    @Override
    public void close() {
      try {
        hyperBatch.clear();
      } finally {
        holder.close();
      }
    }

    /**
     * @return the record count currently set on the output container
     */
    @Override
    public int getRecordCount() {
      return outputContainer.getRecordCount();
    }

    /**
     * @return the number of non-empty batches produced so far by {@link #next()}
     */
    @Override
    public int getBatchCount() {
      return batchCount;
    }

    /**
     * Gets the estimated batch size, in bytes. Use for estimating the memory
     * needed to process the batches that this operator created.
     * @return the size of the largest batch created by this operation,
     * in bytes
     */
    public long getEstBatchSize() {
      return estBatchSize;
    }

    /**
     * @return always {@code null}; the output schema is built with
     * {@code SelectionVectorMode.NONE}
     */
    @Override
    public SelectionVector4 getSv4() {
      return null;
    }

    /**
     * Publishes the current merged batch into the given container. On the
     * first call for a container (no columns yet) the output vectors are added
     * to it and its schema is built; on later calls the data is transferred
     * into the existing vectors. The record count is set in both cases.
     *
     * @param container the container to populate
     * @param sv4 not used by this implementation
     * @param outcome the iteration outcome; {@code EMIT} is not supported
     * @param schema not used by this implementation
     * @throws UnsupportedOperationException if {@code outcome} is {@code EMIT}
     */
    @Override
    public void updateOutputContainer(VectorContainer container, SelectionVector4 sv4,
        RecordBatch.IterOutcome outcome, BatchSchema schema) {
      if (outcome == EMIT) {
        throw new UnsupportedOperationException("It looks like Sort is hitting memory pressure and forced to spill " +
            "for cases with EMIT outcome. This Sort is most likely used within the subquery between Lateral and Unnest " +
            "in which case spilling is unexpected.");
      }

      VectorContainer dataContainer = getContainer();
      // First output batch of current schema, populate container with ValueVectors
      if (container.getNumberOfColumns() == 0) {
        for (VectorWrapper<?> vw : dataContainer) {
          container.add(vw.getValueVector());
        }
        // In future when we want to support spilling with EMIT outcome then we have to create SV4 container all the
        // time. But that will have effect of copying data again by SelectionVectorRemover from SV4 to SV_None. Other
        // than that we have to send OK_NEW_SCHEMA each time. There can be other operators like StreamAgg in downstream
        // as well, so we cannot have special handling in SVRemover for EMIT phase.
        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
      } else { // preserve ValueVectors references for subsequent output batches
        container.transferIn(dataContainer);
      }
      // Set the record count on output container
      container.setRecordCount(getRecordCount());
    }

    /**
     * @return always {@code null}; the output schema is built with
     * {@code SelectionVectorMode.NONE}
     */
    @Override
    public SelectionVector2 getSv2() {
      return null;
    }

    /**
     * @return the output container that receives merged batches
     */
    @Override
    public VectorContainer getContainer() {
      return outputContainer;
    }
  }
}