/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.common;
import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.cache.VectorSerializer;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.join.HashJoinHelper;
import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ObjectVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.carrotsearch.hppc.IntArrayList;
/**
 * <h2>Overview</h2>
 * <p>
 * Represents an active partition for the Hash-Join operator
 * (active means: currently receiving data, or having its data probed; as opposed to a fully
 * spilled partition).
 * After all the build/inner data has been read for this partition: if all of its data is in
 * memory, then a hash table and a helper are created, and later this data is probed.
 * If all of this partition's build/inner data was spilled, then it begins to work as an outer
 * partition (see the flag "processingOuter"), reusing some of the fields (e.g., currentBatch,
 * currHVVector, writer, spillFile, partitionBatchesCount) for the outer side.
 * </p>
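 * <p>
 * A minimal lifecycle sketch (hypothetical driver code, simplified from what the
 * Hash-Join operator actually does; {@code rowIndex}, {@code hashCode} and {@code calc}
 * are placeholders):
 * </p>
 * <pre>{@code
 * HashPartition partn = new HashPartition(context, allocator, baseHashTable,
 *     buildBatch, probeBatch, semiJoin, recordsPerBatch, spillSet,
 *     partNum, cycleNum, numPartitions);
 * // Build phase: append each incoming build-side row along with its hash value
 * partn.appendInnerRow(buildBatch.getContainer(), rowIndex, hashCode, calc);
 * // After the build side is exhausted, complete the last batch ...
 * partn.completeAnInnerBatch(false, partn.isSpilled());
 * // ... and, if the partition stayed in memory, build its hash table and helper
 * partn.buildContainersHashTableAndHelper();
 * // Probe phase: look up the key of each probe-side row
 * int probeIndex = partn.probeForKey(rowIndex, partn.getProbeHashCode(rowIndex));
 * // Cleanup: free memory and delete the spill file, if any
 * partn.close();
 * }</pre>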
*/
public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
static final Logger logger = LoggerFactory.getLogger(HashPartition.class);
public static final String HASH_VALUE_COLUMN_NAME = "$Hash_Values$";
private int partitionNum = -1; // the current number of this partition, as used by the operator
private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
private final int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
public static final MajorType HVtype = MajorType.newBuilder()
.setMinorType(MinorType.INT /* dataType */)
.setMode(DataMode.REQUIRED /* mode */)
.build();
  // The vector containers storing all the inner rows. Records are retrieved from
  // these containers when there is a matching record on the probe side.
private ArrayList<VectorContainer> containers;
// While build data is incoming - temporarily keep the list of in-memory
// incoming batches, per each partition (these may be spilled at some point)
private final List<VectorContainer> tmpBatchesList;
// A batch and HV vector to hold incoming rows - per each partition
private VectorContainer currentBatch; // The current (newest) batch
private IntVector currHVVector; // The HV vectors for the currentBatches
/* Helper class
* Maintains linked list of build side records with the same key
* Keeps information about which build records have a corresponding
* matching key in the probe side (for outer, right joins)
*/
private HashJoinHelper hjHelper;
// Underlying hashtable used by the hash join
private HashTable hashTable;
private VectorSerializer.Writer writer; // a vector writer for each spilled partition
private int partitionBatchesCount; // count number of batches spilled
private String spillFile;
private final BufferAllocator allocator;
private int recordsPerBatch;
private final SpillSet spillSet;
  private boolean isSpilled; // is this partition spilled?
private boolean processingOuter; // is (inner done spilling and) now the outer is processed?
private boolean outerBatchAllocNotNeeded; // when the inner is whole in memory
private final RecordBatch buildBatch;
private final RecordBatch probeBatch;
private final int cycleNum;
private final int numPartitions;
private final List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
private long partitionInMemorySize;
private long numInMemoryRecords;
private boolean updatedRecordsPerBatch;
private final boolean semiJoin;
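  /**
   * Create a new partition.
   *
   * @param context fragment context
   * @param allocator allocator used for this partition's batches and hash table
   * @param baseHashTable template used to create and set up this partition's hash table
   * @param buildBatch the incoming build (inner) side batch
   * @param probeBatch the incoming probe (outer) side batch
   * @param semiJoin if true, no join helper is created
   * @param recordsPerBatch number of records to allocate per temporary batch
   * @param spillSet manages the spill files for this operator
   * @param partNum the number of this partition
   * @param cycleNum the spill cycle (0 for first-time partitions, higher when re-partitioning spilled data)
   * @param numPartitions the total number of partitions
   */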
public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin,
int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum, int numPartitions) {
this.allocator = allocator;
this.buildBatch = buildBatch;
this.probeBatch = probeBatch;
this.recordsPerBatch = recordsPerBatch;
this.spillSet = spillSet;
this.partitionNum = partNum;
this.cycleNum = cycleNum;
this.numPartitions = numPartitions;
this.semiJoin = semiJoin;
try {
this.hashTable = baseHashTable.createAndSetupHashTable(null);
} catch (ClassTransformationException e) {
throw UserException.unsupportedError(e)
.message("Code generation error - likely an error in the code.")
.build(logger);
} catch (IOException e) {
throw UserException.resourceError(e)
.message("IO Error while creating a hash table.")
.build(logger);
} catch (SchemaChangeException sce) {
      throw new IllegalStateException("Unexpected Schema Change while creating a hash table", sce);
}
this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
tmpBatchesList = new ArrayList<>();
if (numPartitions > 1) {
allocateNewCurrentBatchAndHV();
}
}
/**
* Configure a different temporary batch size when spilling probe batches.
* @param newRecordsPerBatch The new temporary batch size to use.
*/
public void updateProbeRecordsPerBatch(int newRecordsPerBatch) {
Preconditions.checkArgument(newRecordsPerBatch > 0);
Preconditions.checkState(!updatedRecordsPerBatch); // Only allow updating once
Preconditions.checkState(processingOuter); // We can only update the records per batch when probing.
recordsPerBatch = newRecordsPerBatch;
}
  /**
   * Allocate a new vector container matching either the right or the left record batch,
   * plus an additional special vector for the hash values.
   * Note: this call may OOM!
   * @param rb either the right or the left record batch
   * @return the new vector container
   */
private VectorContainer allocateNewVectorContainer(RecordBatch rb) {
VectorContainer newVC = new VectorContainer();
VectorContainer fromVC = rb.getContainer();
Iterator<VectorWrapper<?>> vci = fromVC.iterator();
boolean success = false;
try {
while (vci.hasNext()) {
VectorWrapper<?> vw = vci.next();
// If processing a spilled container, skip the last column (HV)
if (cycleNum > 0 && ! vci.hasNext()) { break; }
ValueVector vv = vw.getValueVector();
ValueVector newVV = TypeHelper.getNewVector(vv.getField(), allocator);
newVC.add(newVV); // add first to allow dealloc in case of an OOM
if (newVV instanceof FixedWidthVector) {
((FixedWidthVector) newVV).allocateNew(recordsPerBatch);
} else if (newVV instanceof VariableWidthVector) {
((VariableWidthVector) newVV).allocateNew(maxColumnWidth * recordsPerBatch, recordsPerBatch);
} else if (newVV instanceof ObjectVector) {
((ObjectVector) newVV).allocateNew(recordsPerBatch);
} else {
newVV.allocateNew();
}
}
newVC.setRecordCount(0);
success = true;
} finally {
if (!success) {
newVC.clear(); // in case of an OOM
}
}
return newVC;
}
/**
* Allocate a new current Vector Container and current HV vector
*/
public void allocateNewCurrentBatchAndHV() {
if (outerBatchAllocNotNeeded) { return; } // skip when the inner is whole in memory
currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
currHVVector.allocateNew(recordsPerBatch);
}
  /**
   * Append a build-side row (and its hash value) into this partition's current batch;
   * when the batch fills up, complete it - spilling if this partition was already
   * spilled, or if the memory calculator says to.
   */
public void appendInnerRow(VectorContainer buildContainer, int ind,
int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
int pos = currentBatch.appendRow(buildContainer, ind);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
if (pos == recordsPerBatch) {
boolean needsSpill = isSpilled || calc.shouldSpill();
completeAnInnerBatch(true, needsSpill);
}
}
  /**
   * Append a probe-side row (and its hash value) into this partition's current batch;
   * the outer side always spills once a batch is full.
   */
public void appendOuterRow(int hashCode, int recordsProcessed) {
    int pos = currentBatch.appendRow(probeBatch.getContainer(), recordsProcessed);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
if (pos == recordsPerBatch) {
completeAnOuterBatch(true);
}
}
public void completeAnOuterBatch(boolean toInitialize) {
completeABatch(toInitialize, true);
}
public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) {
completeABatch(toInitialize, needsSpill);
}
  /**
   * The current batch is full (or no more rows are incoming) - complete processing this
   * batch: add it to this partition's tmp list; if needed, spill that list; and if more
   * rows are coming, allocate a new current batch for this partition.
   */
private void completeABatch(boolean toInitialize, boolean needsSpill) {
if (currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
currentBatch.add(currHVVector);
currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
tmpBatchesList.add(currentBatch);
partitionBatchesCount++;
long batchSize = new RecordBatchSizer(currentBatch).getActualSize();
inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
partitionInMemorySize += batchSize;
numInMemoryRecords += currentBatch.getRecordCount();
} else {
freeCurrentBatchAndHVVector();
}
if (needsSpill) { // spill this batch/partition and free its memory
spillThisPartition();
}
if (toInitialize) { // allocate a new batch and HV vector
allocateNewCurrentBatchAndHV();
} else {
currentBatch = null;
currHVVector = null;
}
}
  /**
   * Append the incoming batch (actually only the vectors of that batch) into the tmp list.
   * Used only when there is a single partition; the incoming vectors are transferred, not copied.
   */
public void appendBatch(VectorAccessible batch) {
assert numPartitions == 1;
int recordCount = batch.getRecordCount();
currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
currHVVector.allocateNew(recordCount /* recordsPerBatch */);
try {
// For every record in the build batch, hash the key columns and keep the result
for (int ind = 0; ind < recordCount; ind++) {
int hashCode = getBuildHashCode(ind);
currHVVector.getMutator().set(ind, hashCode); // store the hash value in the new HV column
}
    } catch (SchemaChangeException sce) {
      throw new IllegalStateException("Unexpected Schema Change while hashing the build batch", sce);
    }
VectorContainer container = new VectorContainer();
List<ValueVector> vectors = Lists.newArrayList();
for (VectorWrapper<?> v : batch) {
TransferPair tp = v.getValueVector().getTransferPair(allocator);
tp.transfer();
vectors.add(tp.getTo());
}
container.addCollection(vectors);
container.add(currHVVector); // the HV vector is added as an extra "column"
container.setRecordCount(recordCount);
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
tmpBatchesList.add(container);
partitionBatchesCount++;
currHVVector = null;
numInMemoryRecords += recordCount;
}
public void spillThisPartition() {
    if (tmpBatchesList.isEmpty()) { return; } // in case empty - nothing to spill
logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size());
// If this is the first spill for this partition, create an output stream
if (writer == null) {
final String side = processingOuter ? "outer" : "inner";
      final String suffix = cycleNum > 0 ? side + "_" + cycleNum : side;
spillFile = spillSet.getNextSpillFile(suffix);
try {
writer = spillSet.writer(spillFile);
} catch (IOException ioe) {
throw UserException.resourceError(ioe)
.message("Hash Join failed to open spill file: " + spillFile)
.build(logger);
}
isSpilled = true;
}
partitionInMemorySize = 0L;
numInMemoryRecords = 0L;
inMemoryBatchStats.clear();
    while (!tmpBatchesList.isEmpty()) {
VectorContainer vc = tmpBatchesList.remove(0);
int numRecords = vc.getRecordCount();
// set the value count for outgoing batch value vectors
vc.setValueCount(numRecords);
WritableBatch wBatch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false);
try {
writer.write(wBatch, null);
} catch (IOException ioe) {
throw UserException.dataWriteError(ioe)
.message("Hash Join failed to write to output file: " + spillFile)
.build(logger);
} finally {
wBatch.clear();
}
vc.zeroVectors();
logger.trace("HASH JOIN: Took {} us to spill {} records", writer.time(TimeUnit.MICROSECONDS), numRecords);
}
}
//
// ===== Methods to probe the hash table and to get indices out of the helper =======
//
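  /**
   * Probe the hash table with the key of the given probe-side record (delegates to the
   * underlying hash table).
   */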
public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException {
return hashTable.probeForKey(recordsProcessed, hashCode);
}
public int getRecordNumForKey(int currentIndex) {
return hashTable.getRecordNumForKey(currentIndex);
}
public void setRecordNumForKey(int currentIndex, int num) {
hashTable.setRecordNumForKey(currentIndex, num);
}
public void decreaseRecordNumForKey(int currentIndex) {
hashTable.decreaseRecordNumForKey(currentIndex);
}
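  /**
   * For a probe record whose key was matched: get the composite index of the first
   * build-side row with that key, and mark that row as matched (used by FULL/RIGHT
   * joins to know which build rows to project at the end).
   */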
public Pair<Integer, Boolean> getStartIndex(int probeIndex) {
/* The current probe record has a key that matches. Get the index
* of the first row in the build side that matches the current key
*/
int compositeIndex = hjHelper.getStartIndex(probeIndex);
/* Record in the build side at currentCompositeIdx has a matching record in the probe
* side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
* join we keep track of which records we need to project at the end
*/
boolean matchExists = hjHelper.setRecordMatched(compositeIndex);
return Pair.of(compositeIndex, matchExists);
}
public int getNextIndex(int compositeIndex) {
// in case of inner rows with duplicate keys, get the next one
return hjHelper.getNextIndex(compositeIndex);
}
public boolean setRecordMatched(int compositeIndex) {
return hjHelper.setRecordMatched(compositeIndex);
}
public IntArrayList getNextUnmatchedIndex() {
return hjHelper.getNextUnmatchedIndex();
}
//
// =====================================================================================
//
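  // Hash-code computation for the build and probe sides - delegated to the hash table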
public int getBuildHashCode(int ind) throws SchemaChangeException {
return hashTable.getBuildHashCode(ind);
}
public int getProbeHashCode(int ind) throws SchemaChangeException {
return hashTable.getProbeHashCode(ind);
}
public ArrayList<VectorContainer> getContainers() {
return containers;
}
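  // Pass-through operations on the underlying hash table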
public void updateBatches() throws SchemaChangeException {
hashTable.updateBatches();
}
public Pair<VectorContainer, Integer> nextBatch() {
return hashTable.nextBatch();
}
@Override
public List<HashJoinMemoryCalculator.BatchStat> getInMemoryBatches() {
return inMemoryBatchStats;
}
@Override
public int getNumInMemoryBatches() {
return inMemoryBatchStats.size();
}
@Override
public boolean isSpilled() {
return isSpilled;
}
@Override
public long getNumInMemoryRecords() {
return numInMemoryRecords;
}
@Override
public long getInMemorySize() {
return partitionInMemorySize;
}
public String getSpillFile() {
return spillFile;
}
public int getPartitionBatchesCount() {
return partitionBatchesCount;
}
public int getPartitionNum() {
return partitionNum;
}
/**
* Close the writer without deleting the spill file
*/
public void closeWriter() { // no deletion !!
closeWriterInternal(false);
processingOuter = true; // After the spill file was closed
}
/**
   * Close the writer for this partition, if one exists.
   *
   * @param doDeleteFile whether to also delete the associated spill file
*/
private void closeWriterInternal(boolean doDeleteFile) {
try {
if (writer != null) {
spillSet.close(writer);
}
if (doDeleteFile && spillFile != null) {
spillSet.delete(spillFile);
}
} catch (IOException ioe) {
      throw UserException.resourceError(ioe)
        .message("IO Error while closing%s spill file %s",
          doDeleteFile ? " and deleting" : "",
          spillFile)
        .build(logger);
}
spillFile = null;
writer = null;
partitionBatchesCount = 0;
}
/**
* Creates the hash table and join helper for this partition.
* This method should only be called after all the build side records
* have been consumed.
*/
public void buildContainersHashTableAndHelper() throws SchemaChangeException {
if (isSpilled) { return; } // no building for spilled partitions
containers = new ArrayList<>();
hashTable.updateInitialCapacity((int) getNumInMemoryRecords());
for (int curr = 0; curr < partitionBatchesCount; curr++) {
VectorContainer nextBatch = tmpBatchesList.get(curr);
final int currentRecordCount = nextBatch.getRecordCount();
// For every incoming build batch, we create a matching helper batch
if (! semiJoin) { hjHelper.addNewBatch(currentRecordCount); }
// Holder contains the global index where the key is hashed into using the hash table
final IndexPointer htIndex = new IndexPointer();
assert nextBatch != null;
assert probeBatch != null;
hashTable.updateIncoming(nextBatch, probeBatch);
      IntVector hvVector = (IntVector) nextBatch.getLast();
      for (int recInd = 0; recInd < currentRecordCount; recInd++) {
        int hashCode = hvVector.getAccessor().get(recInd);
try {
hashTable.put(recInd, htIndex, hashCode, BATCH_SIZE);
        } catch (RetryAfterSpillException e) { // Hash Join does not retry
          throw new OutOfMemoryException("HT put");
        }
/* Use the global index returned by the hash table, to store
* the current record index and batch index. This will be used
* later when we probe and find a match.
*/
if (! semiJoin) { hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); }
}
containers.add(nextBatch);
}
outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need for an outer batch
}
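  /** Collect the underlying hash table's statistics into the given holder. */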
public void getStats(HashTableStats newStats) {
hashTable.getStats(newStats);
}
/**
* Frees memory allocated to the {@link HashTable} and {@link HashJoinHelper}.
*/
private void clearHashTableAndHelper() {
if (hashTable != null) {
hashTable.clear();
hashTable = null;
}
if (hjHelper != null) {
hjHelper.clear();
hjHelper = null;
}
}
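  /** Release the current (in-flight) batch and HV vector, if allocated. */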
private void freeCurrentBatchAndHVVector() {
if (currentBatch != null) {
currentBatch.clear();
currentBatch = null;
}
if (currHVVector != null) {
currHVVector.clear();
currHVVector = null;
}
}
/**
* Free all in-memory allocated structures.
* @param deleteFile - whether to delete the spill file or not
*/
public void cleanup(boolean deleteFile) {
freeCurrentBatchAndHVVector();
if (containers != null && !containers.isEmpty()) {
for (VectorContainer vc : containers) {
vc.clear();
}
}
    while (!tmpBatchesList.isEmpty()) {
VectorContainer vc = tmpBatchesList.remove(0);
vc.clear();
}
closeWriterInternal(deleteFile);
clearHashTableAndHelper();
if (containers != null) {
containers.clear();
}
}
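  /** Free all in-memory structures and delete the spill file, if any. */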
public void close() {
cleanup(true);
}
/**
* Creates a debugging string containing information about memory usage.
* @return A debugging string.
*/
public String makeDebugString() {
return String.format("[hashTable = %s]",
hashTable == null ? "None": hashTable.makeDebugString());
}
}