/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.xsort;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;

import com.google.common.collect.Lists;

/**
* Represents the set of in-memory batches accumulated by
* the external sort.
*/
public class BufferedBatches {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferedBatches.class);
/**
* Incoming batches buffered in memory prior to spilling
* or an in-memory merge.
*/
private final LinkedList<InputBatch> bufferedBatches = Lists.newLinkedList();
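
  /** Holds the sorter used to sort each incoming batch; closed when the
   * schema changes so that a new sorter can be generated. */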
private final SorterWrapper sorterWrapper;
private BatchSchema schema;
private final OperatorContext context;
public BufferedBatches(OperatorContext opContext) {
context = opContext;
sorterWrapper = new SorterWrapper(opContext);
}
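
  /**
   * Establish (or update) the schema for the buffered batches. Closes the
   * current sorter so that a new one is generated for the new schema, and
   * coerces any previously buffered batches to that schema.
   *
   * @param schema the schema now in effect for buffered batches
   */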
public void setSchema(BatchSchema schema) {
this.schema = schema;
// New schema: must generate a new sorter and copier.
sorterWrapper.close();
// Coerce all existing batches to the new schema.
for (BatchGroup b : bufferedBatches) {
b.setSchema(schema);
}
}
public int size() { return bufferedBatches.size(); }
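
  /**
   * Add an incoming batch to the in-memory buffer. The batch is converted
   * to the agreed-upon schema, sorted using a selection vector, and retained
   * along with its memory footprint. Empty batches are silently dropped.
   *
   * @param incoming the incoming record batch
   * @param batchSize the net memory size of the batch, in bytes
   */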
public void add(VectorAccessible incoming, long batchSize) {
    // Convert the incoming batch to the agreed-upon schema.
    // No converted batch means we got an empty input batch.
    // Converting the batch transfers memory ownership to our
    // allocator, which provides a roundabout way to learn the
    // batch size: check the memory level before and after the
    // transfer and use the difference as the batch size, in bytes.
VectorContainer convertedBatch = convertBatch(incoming);
if (convertedBatch == null) {
return;
}
SelectionVector2 sv2;
try {
sv2 = makeSelectionVector(incoming);
} catch (Exception e) {
convertedBatch.clear();
throw e;
}
// Sort the incoming batch using either the original selection vector,
// or a new one created here.
sorterWrapper.sortBatch(convertedBatch, sv2);
bufferBatch(convertedBatch, sv2, batchSize);
}
  /**
   * Convert an incoming batch into the agreed-upon format.
   *
   * @param incoming the batch to convert
   * @return the converted batch, or null if the incoming batch is empty
   */
private VectorContainer convertBatch(VectorAccessible incoming) {
// Must accept the batch even if no records. Then clear
// the vectors to release memory since we won't do any
// further processing with the empty batch.
VectorContainer convertedBatch = SchemaUtil.coerceContainer(incoming, schema, context.getAllocator());
if (incoming.getRecordCount() == 0) {
for (VectorWrapper<?> w : convertedBatch) {
w.clear();
}
SelectionVector2 sv2 = incoming.getSelectionVector2();
if (sv2 != null) {
sv2.clear();
}
return null;
}
return convertedBatch;
}
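
  /**
   * Obtain the selection vector for the batch: either a copy of the
   * incoming SV2, if the batch already carries one, or a newly created
   * SV2 that indexes each record in order.
   *
   * @param incoming the batch to index
   * @return the selection vector to use as the sort index
   */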
private SelectionVector2 makeSelectionVector(VectorAccessible incoming) {
if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
return incoming.getSelectionVector2().clone();
} else {
return newSV2(incoming);
}
}
  /**
   * Allocate and initialize the selection vector used as the sort index.
   * Assumes that memory for the vector is available because the memory
   * manager has already reserved the needed space.
   *
   * @param incoming the batch for which to create the sort index
   * @return a new, populated selection vector 2
   */
private SelectionVector2 newSV2(VectorAccessible incoming) {
SelectionVector2 sv2 = new SelectionVector2(context.getAllocator());
if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
throw UserException.resourceError(new OutOfMemoryException("Unable to allocate sv2 buffer"))
.build(logger);
}
for (int i = 0; i < incoming.getRecordCount(); i++) {
sv2.setIndex(i, (char) i);
}
sv2.setRecordCount(incoming.getRecordCount());
return sv2;
}
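
  /**
   * Wrap the converted batch and its selection vector in an
   * {@link InputBatch} and add it to the buffered list, releasing the
   * batch memory on failure.
   *
   * @param convertedBatch the batch, coerced to the common schema
   * @param sv2 the sort index for the batch
   * @param netSize the net memory size of the batch, in bytes
   */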
private void bufferBatch(VectorContainer convertedBatch, SelectionVector2 sv2, long netSize) {
BufferAllocator allocator = context.getAllocator();
RecordBatchData rbd = new RecordBatchData(convertedBatch, allocator);
try {
rbd.setSv2(sv2);
bufferedBatches.add(new InputBatch(rbd.getContainer(), rbd.getSv2(), allocator, netSize));
} catch (Throwable t) {
rbd.clear();
throw t;
}
}
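
  /**
   * Select the initial set of buffered batches to spill in order to create
   * a spill file of roughly the target size.
   *
   * @param targetSpillSize the desired spill file size, in bytes
   * @return the batches removed from the in-memory buffer, to be written
   * to the spill file
   */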
public List<BatchGroup> prepareSpill(long targetSpillSize) {
// Determine the number of batches to spill to create a spill file
// of the desired size. The actual file size might be a bit larger
// or smaller than the target, which is expected.
int spillCount = 0;
long spillSize = 0;
for (InputBatch batch : bufferedBatches) {
long batchSize = batch.getDataSize();
spillSize += batchSize;
spillCount++;
      if (spillSize + batchSize / 2 > targetSpillSize) {
        break;
      }
}
// Must always spill at least 2, even if this creates an over-size
// spill file. But, if this is a final consolidation, we may have only
// a single batch.
spillCount = Math.max(spillCount, 2);
spillCount = Math.min(spillCount, bufferedBatches.size());
return SpilledRuns.prepareSpillBatches(bufferedBatches, spillCount);
}
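
  /**
   * Remove and return all buffered batches, transferring ownership of the
   * batches to the caller and leaving this buffer empty.
   *
   * @return the previously buffered batches
   */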
public List<InputBatch> removeAll() {
    List<InputBatch> batches = new ArrayList<>(bufferedBatches);
bufferedBatches.clear();
return batches;
}
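
  /**
   * Release resources: close any remaining buffered batches and the sorter,
   * rethrowing the first error encountered once all cleanup has been
   * attempted.
   */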
public void close() {
    // Close via the generic BatchGroup mechanism shared with spilled runs.
    // In-memory batches won't normally throw an error on close, but the
    // API allows it, so preserve and rethrow the first failure.
RuntimeException ex = null;
try {
BatchGroup.closeAll(bufferedBatches);
bufferedBatches.clear();
} catch (RuntimeException e) {
ex = e;
}
try {
sorterWrapper.close();
} catch (RuntimeException e) {
ex = (ex == null) ? e : ex;
}
if (ex != null) {
throw ex;
}
}
}