/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.orderedpartitioner;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.cache.CachedVectorContainer;
import org.apache.drill.exec.cache.Counter;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.cache.DistributedCache.CacheConfig;
import org.apache.drill.exec.cache.DistributedCache.SerializationMode;
import org.apache.drill.exec.cache.DistributedMap;
import org.apache.drill.exec.cache.DistributedMultiMap;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
import org.apache.drill.exec.physical.impl.sort.SortBatch;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
import org.apache.drill.exec.physical.impl.sort.Sorter;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
/**
* Generates an ordered partition, rather than a random hash partition. This
* could be used to do a total order sort, for example. This operator reads in a
* few incoming record batches, samples their records, and stores the samples in
* the distributed cache. The samples from all the parallel-running fragments are
* merged, and a partition table is built and stored in the distributed cache
* for use by all fragments. A new column is added to the outgoing batch, whose
* value is determined by where each record falls in the partition table. This
* column is used by PartitionSenderRootExec to determine which bucket to assign
* each record to.
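* <p>
* For illustration (hypothetical values): with four downstream partitions the
* partition table holds three boundary records; a record that sorts before the
* first boundary is assigned partition 0, one that sorts between the first and
* second boundaries is assigned partition 1, and so on.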
*/
public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> {
static final Logger logger = LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
public static final CacheConfig<String, CachedVectorContainer> SINGLE_CACHE_CONFIG = CacheConfig
.newBuilder(CachedVectorContainer.class)
.name("SINGLE-" + CachedVectorContainer.class.getSimpleName())
.mode(SerializationMode.DRILL_SERIALIZIABLE)
.build();
public static final CacheConfig<String, CachedVectorContainer> MULTI_CACHE_CONFIG = CacheConfig
.newBuilder(CachedVectorContainer.class)
.name("MULTI-" + CachedVectorContainer.class.getSimpleName())
.mode(SerializationMode.DRILL_SERIALIZIABLE)
.build();
private final MappingSet mainMapping = new MappingSet( (String) null, null,
ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
private final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null,
ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
private final MappingSet partitionMapping = new MappingSet("partitionIndex",
null, "partitionVectors", null,
ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
private final int recordsToSample; // How many records must be received before analyzing
private final int samplingFactor; // Will collect samplingFactor * number of partitions to send to distributed cache
private final float completionFactor; // What fraction of fragments must be completed before attempting to build the partition table
protected final RecordBatch incoming;
private boolean first = true;
private OrderedPartitionProjector projector;
private final VectorContainer partitionVectors = new VectorContainer();
private final int partitions;
private Queue<VectorContainer> batchQueue;
private int recordsSampled;
private final int sendingMajorFragmentWidth;
private boolean startedUnsampledBatches;
private boolean upstreamNone;
private int recordCount;
private final IntVector partitionKeyVector;
private final DistributedMap<String, CachedVectorContainer> tableMap;
private final Counter minorFragmentSampleCount;
private final DistributedMultiMap<String, CachedVectorContainer> mmap;
private final String mapKey;
private List<VectorContainer> sampledIncomingBatches;
public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context);
this.incoming = incoming;
this.partitions = pop.getDestinations().size();
this.sendingMajorFragmentWidth = pop.getSendingWidth();
this.recordsToSample = pop.getRecordsToSample();
this.samplingFactor = pop.getSamplingFactor();
this.completionFactor = pop.getCompletionFactor();
DistributedCache cache = null;
// Clearly, this code is not used!
// cache can only be null here!
this.mmap = cache.getMultiMap(MULTI_CACHE_CONFIG);
this.tableMap = cache.getMap(SINGLE_CACHE_CONFIG);
Preconditions.checkNotNull(tableMap);
this.mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
this.minorFragmentSampleCount = cache.getCounter(mapKey);
SchemaPath outputPath = popConfig.getRef();
MaterializedField outputField = MaterializedField.create(outputPath.getAsNamePart().getName(), Types.required(TypeProtos.MinorType.INT));
this.partitionKeyVector = (IntVector) TypeHelper.getNewVector(outputField, oContext.getAllocator());
}
@Override
public void close() {
super.close();
partitionVectors.clear();
partitionKeyVector.clear();
}
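/**
* Collects incoming batches until at least recordsToSample records have been
* read, sorts the sampled records according to the configured orderings,
* projects every Nth record into a sample container, and publishes that sample
* to the distributed multimap under the major fragment's key.
*
* @return true once the sample has been stored in the distributed cache
*/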
private boolean saveSamples() {
recordsSampled = 0;
IterOutcome upstream;
// Start collecting batches until recordsToSample records have been collected
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
WritableBatch batch = null;
CachedVectorContainer sampleToSave = null;
VectorContainer containerToCache = new VectorContainer();
try {
builder.add(incoming);
recordsSampled += incoming.getRecordCount();
outer: while (recordsSampled < recordsToSample) {
upstream = next(incoming);
switch (upstream) {
case NONE:
case NOT_YET:
case STOP:
upstreamNone = true;
break outer;
default:
// fall through
}
builder.add(incoming);
recordsSampled += incoming.getRecordCount();
if (upstream == IterOutcome.NONE) {
break;
}
}
VectorContainer sortedSamples = new VectorContainer();
builder.build(sortedSamples);
// Sort the records according to the orderings given in the configuration
Sorter sorter = SortBatch.createNewSorter(context, popConfig.getOrderings(), sortedSamples);
SelectionVector4 sv4 = builder.getSv4();
try {
sorter.setup(context, sv4, sortedSamples);
} catch (SchemaChangeException e) {
throw schemaChangeException(e, logger);
}
sorter.sort(sv4, sortedSamples);
// Project every Nth record to a new vector container, where N = recordsSampled / (samplingFactor * partitions).
// Uses the expressions from the Orderings to populate each column. There is one column for each Ordering in
// popConfig.orderings.
List<ValueVector> localAllocationVectors = Lists.newArrayList();
SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings(), localAllocationVectors);
int allocationSize = 50;
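// Copy the sampled records into the cache container; if the output vectors run out of space, zero them,
// double the per-value allocation and retry.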
while (true) {
for (ValueVector vv : localAllocationVectors) {
AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
}
if (copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions)) {
break;
} else {
containerToCache.zeroVectors();
allocationSize *= 2;
}
}
containerToCache.setValueCount(copier.getOutputRecords());
// Wrap the vectors from the new vector container in a serializable wrapper object,
// and add the wrapper to the distributed multimap
batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
sampleToSave = new CachedVectorContainer(batch, context.getAllocator());
mmap.put(mapKey, sampleToSave);
this.sampledIncomingBatches = builder.getHeldRecordBatches();
} finally {
builder.clear();
builder.close();
if (batch != null) {
batch.clear();
}
containerToCache.clear();
if (sampleToSave != null) {
sampleToSave.clear();
}
}
return true;
}
/**
* Sleeps for at least the given timeout before returning. If the sleep is
* interrupted, the query is assumed to have been cancelled and a
* {@link QueryCancelledException} is thrown.
*
* @param timeout
* Timeout in milliseconds.
*/
private void waitUntilTimeOut(final long timeout) {
while(true) {
try {
Thread.sleep(timeout);
return;
} catch (final InterruptedException e) {
throw new QueryCancelledException();
}
}
}
/**
* Called when the first batch comes in. Incoming batches are collected until
* a threshold is met. At that point, the records in the batches are sorted
* and sampled, and the sampled records are stored in the distributed cache.
* Once a sufficient fraction of the fragments have shared their samples, each
* fragment grabs all the samples, sorts all the records, builds a partition
* table, and attempts to push the partition table to the distributed cache.
* Whichever table gets pushed first becomes the table used by all fragments
* for partitioning.
*
* @return true if successful, false otherwise.
*/
private boolean getPartitionVectors() {
if (!saveSamples()) {
return false;
}
CachedVectorContainer finalTable = null;
long val = minorFragmentSampleCount.incrementAndGet();
logger.debug("Incremented mfsc, got {}", val);
long fragmentsBeforeProceed =
(long) Math.ceil(sendingMajorFragmentWidth * completionFactor);
String finalTableKey = mapKey + "final";
if (val == fragmentsBeforeProceed) { // we crossed the barrier, build table and get data.
buildTable();
finalTable = tableMap.get(finalTableKey);
} else {
// Wait until a sufficient number of fragments have submitted samples, or proceed after xx ms have passed
// TODO: this should be polling.
if (val < fragmentsBeforeProceed) {
waitUntilTimeOut(10);
}
for (int i = 0; i < 100 && finalTable == null; i++) {
finalTable = tableMap.get(finalTableKey);
if (finalTable != null) {
break;
}
waitUntilTimeOut(10);
}
if (finalTable == null) {
buildTable();
}
finalTable = tableMap.get(finalTableKey);
}
Preconditions.checkState(finalTable != null);
// Extract vectors from the wrapper, and add to partition vectors. These vectors will be used for partitioning in
// the rest of this operator
for (VectorWrapper<?> w : finalTable.get()) {
partitionVectors.add(w.getValueVector());
}
return true;
}
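/**
* Gathers the samples contributed by all fragments from the distributed
* multimap, sorts them, copies every Nth record (N = total samples / partitions)
* into a candidate partition table of partitions - 1 boundary records, and
* publishes it with putIfAbsent, so only the first candidate pushed becomes the
* shared partition table.
*/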
private void buildTable() {
// Get all samples from distributed map
SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator());
VectorContainer allSamplesContainer = new VectorContainer();
VectorContainer candidatePartitionTable = new VectorContainer();
CachedVectorContainer wrap = null;
try {
for (CachedVectorContainer w : mmap.get(mapKey)) {
containerBuilder.add(w.get());
}
containerBuilder.build(allSamplesContainer);
List<Ordering> orderDefs = Lists.newArrayList();
int i = 0;
for (Ordering od : popConfig.getOrderings()) {
SchemaPath sp = SchemaPath.getSimplePath("f" + i++);
orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
}
// Sort the incoming samples.
SelectionVector4 newSv4 = containerBuilder.getSv4();
Sorter sorter = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer);
try {
sorter.setup(context, newSv4, allSamplesContainer);
} catch (SchemaChangeException e) {
throw schemaChangeException(e, logger);
}
sorter.sort(newSv4, allSamplesContainer);
// Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions
// Attempt to push this to the distributed map. Only the first candidate to get pushed will be used.
SampleCopier copier = null;
List<ValueVector> localAllocationVectors = Lists.newArrayList();
copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs, localAllocationVectors);
int allocationSize = 50;
while (true) {
for (ValueVector vv : localAllocationVectors) {
AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
}
int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) {
assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
candidatePartitionTable.setValueCount(copier.getOutputRecords());
break;
} else {
candidatePartitionTable.zeroVectors();
allocationSize *= 2;
}
}
candidatePartitionTable.setRecordCount(copier.getOutputRecords());
WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
wrap = new CachedVectorContainer(batch, context.getAllocator());
tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
} finally {
candidatePartitionTable.clear();
allSamplesContainer.clear();
containerBuilder.clear();
containerBuilder.close();
if (wrap != null) {
wrap.clear();
}
}
}
/**
* Creates a copier that projects every Nth record from the incoming VectorContainer into the outgoing
* VectorContainer. Each Ordering in orderings generates a column, and evaluation of the expression associated
* with each Ordering determines the value of each column. These records will later be sorted based on the values
* in each column, in the same order as the orderings.
*
* @param sv4 selection vector over the sorted incoming records
* @param incoming container holding the sampled records
* @param outgoing container that receives the projected sample columns
* @param orderings orderings whose expressions populate the output columns
* @param localAllocationVectors receives the newly created output vectors so the caller can (re)allocate them
* @return the generated and initialized SampleCopier
*/
private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
List<Ordering> orderings, List<ValueVector> localAllocationVectors) {
ErrorCollector collector = new ErrorCollectorImpl();
ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getOptions());
// Note: disabled for now. This may require some debugging:
// no tests are available for this operator.
// cg.getCodeGenerator().plainOldJavaCapable(true);
// Uncomment this line to debug the generated code.
// cg.getCodeGenerator().saveCodeForDebugging(true);
int i = 0;
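// For each ordering: materialize its expression against the incoming container, create a REQUIRED output
// vector named "f0", "f1", ..., and generate a write of the evaluated value. The generated copier returns
// false as soon as a write fails so the caller can re-allocate the output vectors and retry.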
for (Ordering od : orderings) {
LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType())
.clearMode().setMode(TypeProtos.DataMode.REQUIRED);
TypeProtos.MajorType newType = builder.build();
MaterializedField outputField = MaterializedField.create("f" + i++, newType);
collector.reportErrors(logger);
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
localAllocationVectors.add(vector);
TypedFieldId fid = outgoing.add(vector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
HoldingContainer hc = cg.addExpr(write);
cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
cg.rotateBlock();
cg.getEvalBlock()._return(JExpr.TRUE);
outgoing.buildSchema(BatchSchema.SelectionVectorMode.NONE);
try {
SampleCopier sampleCopier = context.getImplementationClass(cg);
sampleCopier.setupCopier(context, sv4, incoming, outgoing);
return sampleCopier;
} catch (SchemaChangeException e) {
throw schemaChangeException(e, logger);
}
}
@Override
protected void killIncoming(boolean sendUpstream) {
incoming.kill(sendUpstream);
}
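/**
* Drives the operator: on the first call, samples the incoming batches and
* obtains the shared partition table; afterwards, drains the queue of sampled
* batches before processing batches directly from the incoming operator,
* tagging each record with its partition index.
*/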
@Override
public IterOutcome innerNext() {
recordCount = 0;
container.zeroVectors();
// if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
// done
if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) {
return IterOutcome.NONE;
}
// if there are batches on the queue, process them first, rather than calling incoming.next()
if (batchQueue != null && batchQueue.size() > 0) {
VectorContainer vc = batchQueue.poll();
recordCount = vc.getRecordCount();
// Must set up a new schema each time, because ValueVectors are not reused between containers in queue
setupNewSchema(vc);
doWork(vc);
vc.zeroVectors();
return IterOutcome.OK_NEW_SCHEMA;
}
// Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are
// more incoming
IterOutcome upstream = next(incoming);
if (this.first && upstream == IterOutcome.OK) {
throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
}
// If this is the first iteration, we need to generate the partition vectors before we can proceed
if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
getPartitionVectors();
batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches);
first = false;
// Now that we have the partition vectors, we immediately process the first batch on the queue
VectorContainer vc = batchQueue.poll();
setupNewSchema(vc);
doWork(vc);
vc.zeroVectors();
recordCount = vc.getRecordCount();
return IterOutcome.OK_NEW_SCHEMA;
}
// Now that all the batches on the queue have been processed, we begin processing the incoming batches. For the
// first one we need to generate a new schema, even if the outcome is IterOutcome.OK. After that we can reuse the
// schema.
if (!this.startedUnsampledBatches) {
this.startedUnsampledBatches = true;
if (upstream == IterOutcome.OK) {
upstream = IterOutcome.OK_NEW_SCHEMA;
}
}
switch (upstream) {
case NONE:
case NOT_YET:
case STOP:
close();
recordCount = 0;
return upstream;
case OK_NEW_SCHEMA:
setupNewSchema(incoming);
// fall through.
case OK:
doWork(incoming);
recordCount = incoming.getRecordCount();
return upstream; // change if upstream changed, otherwise normal.
default:
throw new UnsupportedOperationException();
}
}
@Override
public int getRecordCount() {
return recordCount;
}
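/**
* Allocates the partition key vector and runs the generated projector, which
* transfers the incoming columns and writes the partition index for each record.
*
* @param batch the batch (or queued container) whose records are projected
*/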
protected void doWork(VectorAccessible batch) {
int recordCount = batch.getRecordCount();
AllocationHelper.allocate(partitionKeyVector, recordCount, 50);
projector.projectRecords(recordCount, 0);
container.setValueCount(recordCount);
}
/**
* Sets up a projection that will transfer all of the columns in batch, and also
* populate the partition column based on which partition a record falls into
* in the partition table.
*
* @param batch the incoming batch (or queued container) whose columns are transferred to the output container
*/
protected void setupNewSchema(VectorAccessible batch) {
container.clear();
ErrorCollector collector = new ErrorCollectorImpl();
List<TransferPair> transfers = Lists.newArrayList();
ClassGenerator<OrderedPartitionProjector> cg = CodeGenerator.getRoot(
OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getOptions());
// Note: disabled for now. This may require some debugging:
// no tests are available for this operator.
// cg.getCodeGenerator().plainOldJavaCapable(true);
// Uncomment this line to debug the generated code.
// cg.getCodeGenerator().saveCodeForDebugging(true);
for (VectorWrapper<?> vw : batch) {
TransferPair tp = vw.getValueVector().getTransferPair(oContext.getAllocator());
transfers.add(tp);
container.add(tp.getTo());
}
cg.setMappingSet(mainMapping);
int count = 0;
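// Generate the comparison used by the projector: for each ordering, evaluate the expression on the incoming
// record (left) and read the matching column of the partition table (right); return the first non-zero
// comparison result, negated for descending orderings, so the projector can locate each record's partition.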
for (Ordering od : popConfig.getOrderings()) {
LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
collector.reportErrors(logger);
cg.setMappingSet(incomingMapping);
ClassGenerator.HoldingContainer left = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
cg.setMappingSet(partitionMapping);
TypedFieldId fieldId = new TypedFieldId.Builder().finalType(expr.getMajorType())
.addId(count++)
.build();
ClassGenerator.HoldingContainer right = cg.addExpr(
new ValueVectorReadExpression(fieldId), ClassGenerator.BlkCreateMode.FALSE);
cg.setMappingSet(mainMapping);
// next we wrap the two comparison sides and add the expression block for the comparison.
LogicalExpression fh =
FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right,
context.getFunctionRegistry());
ClassGenerator.HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
} else {
jc._then()._return(out.getValue().minus());
}
}
cg.getEvalBlock()._return(JExpr.lit(0));
container.add(this.partitionKeyVector);
container.buildSchema(batch.getSchema().getSelectionVectorMode());
projector = context.getImplementationClass(cg);
try {
projector.setup(context, batch, this, transfers, partitionVectors, partitions, popConfig.getRef());
} catch (SchemaChangeException e) {
throw schemaChangeException(e, logger);
}
}
@Override
public void dump() {
logger.error("OrderedPartitionRecordBatch[container={}, popConfig={}, partitionVectors={}, partitions={}, " +
"recordsSampled={}, recordCount={}]",
container, popConfig, partitionVectors, partitions, recordsSampled, recordCount);
}
}