/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.join;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch;
import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.common.HashPartition;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.SpilledState;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.planner.common.JoinControl;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.drill.exec.work.filter.BloomFilter;
import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class implements the runtime execution for the Hash-Join operator,
* supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins.
* <p>
* This implementation splits the incoming Build side rows into multiple
* Partitions, thus allowing spilling of some of these partitions to disk if
* memory gets tight. Each partition is implemented as a {@link HashPartition}.
* After the build phase is over, in the most general case, some of the
* partitions have been spilled while the others remain in memory. Each of the
* in-memory partitions then gets a {@link HashTable} built for it.
* <p>
* Next the Probe side is read, and each row is key-matched with a Build
* partition. If that partition is in memory, then the key is used to probe and
* perform the join, and the results are added to the outgoing batch. But if
* that build side partition was spilled, then the matching Probe side partition
* is spilled as well.
* <p>
* After the whole Probe side has been processed, we are left with pairs of
* spilled partitions. Each pair is then processed individually (that Build
* partition should be smaller than the original, hence it is likely to fit
* whole into memory and allow probing; if not -- see below).
* <p>
* Processing of each spilled pair is exactly like processing the original
* Build/Probe incoming batches. (In fact, the {@link #innerNext()} method calls
* itself recursively.) Thus the spilled build partition is read and divided
* into new partitions, which in turn may spill again (and again...). The code
* tracks these spilling "cycles". Normally any such "again" (i.e. a cycle of 2
* or greater) is wasteful, indicating that the number of partitions chosen was
* too small.
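* <p>
* As an illustrative sketch only (the actual logic lives in
* {@link #executeBuildPhase()} and uses the mask kept by {@link SpilledState}),
* a build row's hash code is split into a partition selector and a residual
* hash value roughly as follows, assuming numPartitions is a power of two:
* <pre>{@code
* int partitionMask = numPartitions - 1;   // e.g. 16 partitions -> mask 0x0F
* int currPart = hashCode & partitionMask; // the low bits choose the partition
* hashCode >>>= Integer.numberOfTrailingZeros(numPartitions); // the rest feeds that partition's hash table
* }</pre>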
*/
public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implements RowKeyJoin {
private static final Logger logger = LoggerFactory.getLogger(HashJoinBatch.class);
/**
* The maximum number of records within each internal batch.
*/
private final int RECORDS_PER_BATCH; // internal batches
// Join type: INNER, LEFT, RIGHT or FULL
private final JoinRelType joinType;
private final boolean semiJoin;
private final boolean joinIsLeftOrFull;
private final boolean joinIsRightOrFull;
private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
// Join conditions
private final List<JoinCondition> conditions;
private RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL;
// Runtime generated class implementing HashJoinProbe interface
private HashJoinProbe hashJoinProbe = null;
private final List<NamedExpression> rightExpr;
/**
* Names of the join columns. These names are used to help estimate
* the size of the {@link HashTable}s.
*/
private final Set<String> buildJoinColumns;
// Fields used for partitioning
/**
* The number of {@link HashPartition}s. This is configured via a system
* option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
*/
private int numPartitions = 1; // must be 2 to the power of bitsInMask
/**
* The master class used to generate {@link HashTable}s.
*/
private ChainedHashTable baseHashTable;
private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
private boolean canSpill = true;
private boolean wasKilled; // a kill was received; may need to clean up spilled partitions
/**
* This array holds the currently active {@link HashPartition}s.
*/
HashPartition[] partitions;
// Number of records in the output container
private int outputRecords;
// Schema of the build side
private BatchSchema buildSchema;
// Schema of the probe side
private BatchSchema probeSchema;
// Whether this HashJoin is used for a row-key based join
private final boolean isRowKeyJoin;
private final JoinControl joinControl;
// Whether the build phase has completed (only applicable for row-key joins)
private boolean buildComplete;
// indicates if we have previously returned an output batch
private boolean firstOutputBatch = true;
private int rightHVColPosition;
private final BufferAllocator allocator;
// Local fields for left/right incoming - may be replaced when reading from spilled
private RecordBatch buildBatch;
private RecordBatch probeBatch;
/**
* Flag indicating whether the first data holding build batch has already been fetched.
*/
private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
/**
* Flag indicating whether the first data holding probe batch has already been fetched.
*/
private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
// For handling spilling
private final SpillSet spillSet;
HashJoinPOP popConfig;
private final int originalPartition = -1; // the partition a secondary reads from
IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
private final int maxBatchesInMemory;
private final List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
private boolean enableRuntimeFilter;
private RuntimeFilterReporter runtimeFilterReporter;
private ValueVectorHashHelper.Hash64 hash64;
private final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
private final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
private final List<BloomFilter> bloomFilters = new ArrayList<>();
private boolean bloomFiltersGenerated;
/**
* This holds information about the spilled partitions for the build and probe side.
*/
public static class HashJoinSpilledPartition extends AbstractSpilledPartitionMetadata {
private final int innerSpilledBatches;
private final String innerSpillFile;
private int outerSpilledBatches;
private String outerSpillFile;
private boolean updatedOuter;
public HashJoinSpilledPartition(int cycle,
int originPartition,
int prevOriginPartition,
int innerSpilledBatches,
String innerSpillFile) {
super(cycle, originPartition, prevOriginPartition);
this.innerSpilledBatches = innerSpilledBatches;
this.innerSpillFile = innerSpillFile;
}
public int getInnerSpilledBatches() {
return innerSpilledBatches;
}
public String getInnerSpillFile() {
return innerSpillFile;
}
public int getOuterSpilledBatches() {
Preconditions.checkState(updatedOuter);
return outerSpilledBatches;
}
public String getOuterSpillFile() {
Preconditions.checkState(updatedOuter);
return outerSpillFile;
}
public void updateOuter(int outerSpilledBatches, String outerSpillFile) {
Preconditions.checkState(!updatedOuter);
updatedOuter = true;
this.outerSpilledBatches = outerSpilledBatches;
this.outerSpillFile = outerSpillFile;
}
@Override
public String makeDebugString() {
return String.format("Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).",
this.getOriginPartition(), this.getPrevOriginPartition(), this.getCycle(), outerSpilledBatches, innerSpilledBatches);
}
}
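/**
* Callbacks handed to the {@link SpilledState} queue: cleanup of this operator,
* the failure message used when the inner data cannot be partitioned any further,
* the current memory limit, and whether a partition limit applies.
*/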
public class HashJoinUpdater implements SpilledState.Updater {
@Override
public void cleanup() {
HashJoinBatch.this.cleanup();
}
@Override
public String getFailureMessage() {
return "Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates).";
}
@Override
public long getMemLimit() {
return HashJoinBatch.this.allocator.getLimit();
}
@Override
public boolean hasPartitionLimit() {
return true;
}
}
/**
* Queue of spilled partitions to process.
*/
private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
private HashJoinSpilledPartition[] spilledInners; // for the outer to find the partition
public enum Metric implements MetricDef {
NUM_BUCKETS,
NUM_ENTRIES,
NUM_RESIZING,
RESIZING_TIME_MS,
NUM_PARTITIONS,
SPILLED_PARTITIONS, // number of original partitions spilled to disk
SPILL_MB, // Number of MB of data spilled to disk. This amount is first written,
// then later re-read. So, disk I/O is twice this amount.
SPILL_CYCLE, // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
LEFT_INPUT_BATCH_COUNT,
LEFT_AVG_INPUT_BATCH_BYTES,
LEFT_AVG_INPUT_ROW_BYTES,
LEFT_INPUT_RECORD_COUNT,
RIGHT_INPUT_BATCH_COUNT,
RIGHT_AVG_INPUT_BATCH_BYTES,
RIGHT_AVG_INPUT_ROW_BYTES,
RIGHT_INPUT_RECORD_COUNT,
OUTPUT_BATCH_COUNT,
AVG_OUTPUT_BATCH_BYTES,
AVG_OUTPUT_ROW_BYTES,
OUTPUT_RECORD_COUNT;
// duplicate for hash agg
@Override
public int metricId() { return ordinal(); }
}
@Override
public int getRecordCount() {
return outputRecords;
}
@Override
protected void buildSchema() {
// We must first get the schemas from upstream operators before we can build
// our schema.
boolean validSchema = prefetchFirstBatchFromBothSides();
if (validSchema) {
// We are able to construct a valid schema from the upstream data.
// Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
state = BatchState.BUILD_SCHEMA;
if (leftUpstream == OK_NEW_SCHEMA) {
probeSchema = left.getSchema();
}
if (rightUpstream == OK_NEW_SCHEMA) {
buildSchema = right.getSchema();
// position of the new "column" for keeping the hash values
// (after the real columns)
rightHVColPosition = right.getContainer().getNumberOfColumns();
// In special cases, when the probe side is empty, and
// inner/left join - no need for Hash Table
skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
// Build the hash table only when needed (setupHashTable() is a no-op when skipHashTableBuild is set).
setupHashTable();
}
hashJoinProbe = setupHashJoinProbe();
}
// If we have a valid schema, this will build a valid container.
// If we were unable to obtain a valid schema,
// we still need to build a dummy schema. This code handles both cases for us.
setupOutputContainerSchema();
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
container.setEmpty();
}
/**
* Prefetches the first build side data holding batch.
*/
private void prefetchFirstBuildBatch() {
rightUpstream = prefetchFirstBatch(rightUpstream,
prefetchedBuild,
buildSideIsEmpty,
RIGHT_INDEX,
buildBatch,
() -> {
batchMemoryManager.update(RIGHT_INDEX, 0, true);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX),
getRecordBatchStatsContext());
});
}
/**
* Prefetches the first probe side data holding batch.
*/
private void prefetchFirstProbeBatch() {
leftUpstream = prefetchFirstBatch(leftUpstream,
prefetchedProbe,
probeSideIsEmpty,
LEFT_INDEX,
probeBatch,
() -> {
batchMemoryManager.update(LEFT_INDEX, 0);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX),
getRecordBatchStatsContext());
});
}
/**
* Used to fetch the first data holding batch from either the build or probe side.
* @param outcome The current upstream outcome for either the build or probe side.
* @param prefetched A flag indicating if we have already done a prefetch of the first data holding batch for the probe or build side.
* @param isEmpty A flag indicating if the probe or build side is empty.
* @param index The upstream index of the probe or build batch.
* @param batch The probe or build batch itself.
* @param memoryManagerUpdate A lambda function to execute the memory manager update for the probe or build batch.
* @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
*/
private IterOutcome prefetchFirstBatch(IterOutcome outcome,
MutableBoolean prefetched,
MutableBoolean isEmpty,
int index,
RecordBatch batch,
Runnable memoryManagerUpdate) {
if (prefetched.booleanValue()) {
// We have already prefetched the first data holding batch
return outcome;
}
// If we didn't retrieve our first data holding batch, we need to do it now.
prefetched.setValue(true);
if (outcome != IterOutcome.NONE) {
// We can only get data if there is data available
outcome = sniffNonEmptyBatch(outcome, index, batch);
}
isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
if (outcome == IterOutcome.STOP) {
// We reached a termination state
state = BatchState.STOP;
} else {
// Got our first batch(es)
if (spilledState.isFirstCycle()) {
// Only collect stats for the first cycle
memoryManagerUpdate.run();
}
state = BatchState.FIRST;
}
return outcome;
}
/**
* Currently in order to accurately predict memory usage for spilling, the first non-empty build or probe side batch is needed. This method
* fetches the first non-empty batch from the probe or build side.
* @param curr The current outcome.
* @param inputIndex Index specifying whether to work with the probe or build input.
* @param recordBatch The probe or build record batch.
* @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
*/
private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
while (true) {
if (recordBatch.getRecordCount() != 0) {
return curr;
}
curr = next(inputIndex, recordBatch);
switch (curr) {
case OK:
// We got a data batch
break;
case NOT_YET:
// We need to try again
break;
case EMIT:
throw new UnsupportedOperationException("We do not support " + EMIT);
default:
// Other cases are termination conditions
return curr;
}
}
}
/**
* Determines the memory calculator to use. If maxBatchesInMemory is configured, simple batch counting is used to decide when to spill. Otherwise
* memory calculations are used to determine when to spill.
* @return The memory calculator to use.
*/
public HashJoinMemoryCalculator getCalculatorImpl() {
if (maxBatchesInMemory == 0) {
double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType, semiJoin);
} else {
return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
}
}
@Override
public IterOutcome innerNext() {
if (wasKilled) {
// We have received a kill signal. We need to stop processing.
this.cleanup();
super.close();
return IterOutcome.NONE;
}
prefetchFirstBuildBatch();
if (rightUpstream.isError()) {
// A termination condition was reached while prefetching the first build side data holding batch.
// We need to terminate.
return rightUpstream;
}
try {
/* If we are here for the first time, execute the build phase of the
* hash join and setup the run time generated class for the probe side
*/
if (state == BatchState.FIRST) {
// Build the hash table, using the build side record batches.
IterOutcome buildExecuteTermination = executeBuildPhase();
if (buildExecuteTermination != null) {
// A termination condition was reached while executing the build phase.
// We need to terminate.
return buildExecuteTermination;
}
buildComplete = true;
if (isRowKeyJoin) {
// discard the first left batch which was fetched by buildSchema, and get the new
// one based on rowkey join
leftUpstream = next(left);
if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
state = BatchState.STOP;
return leftUpstream;
}
}
// Update the hash table related stats for the operator
updateStats();
}
// Try to probe and project, or recursively handle a spilled partition
if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
joinIsLeftOrFull) { // or if this is a left/full outer join
prefetchFirstProbeBatch();
if (leftUpstream.isError() ||
( leftUpstream == NONE && ! joinIsRightOrFull )) {
// A termination condition was reached while prefetching the first probe side data holding batch.
// We need to terminate.
return leftUpstream;
}
if (!buildSideIsEmpty.booleanValue() || !probeSideIsEmpty.booleanValue()) {
// Only allocate outgoing vectors and execute probing logic if there is data
if (state == BatchState.FIRST) {
// Initialize various settings for the probe side
hashJoinProbe.setupHashJoinProbe(probeBatch,
this,
joinType,
semiJoin,
leftUpstream,
partitions,
spilledState.getCycle(),
container,
spilledInners,
buildSideIsEmpty.booleanValue(),
numPartitions,
rightHVColPosition);
}
// Allocate the memory for the vectors in the output container
batchMemoryManager.allocateVectors(container);
hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
outputRecords = hashJoinProbe.probeAndProject();
container.setValueCount(outputRecords);
batchMemoryManager.updateOutgoingStats(outputRecords);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
/* We are here because of one of the following
* 1. Completed processing of all the records and we are done
* 2. We've filled up the outgoing batch to the maximum and we need to return upstream
* Either case build the output container's schema and return
*/
if (outputRecords > 0 || state == BatchState.FIRST) {
state = BatchState.NOT_FIRST;
return IterOutcome.OK;
}
}
// Free all partitions' in-memory data structures
// (In case need to start processing spilled partitions)
for (HashPartition partn : partitions) {
partn.cleanup(false); // clean, but do not delete the spill files !!
}
//
// (recursively) Handle the spilled partitions, if any
//
if (!buildSideIsEmpty.booleanValue()) {
while (!spilledState.isEmpty()) { // "while" is only used for skipping; see "continue" below
// Get the next (previously) spilled partition to handle as incoming
HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
// If the outer is empty (and it's not a right/full join) - try the next spilled partition
if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
continue;
}
// Create a BUILD-side "incoming" out of the inner spill file of that partition
buildBatch = new SpilledRecordBatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
// The above ctor call also got the first batch; need to update the outcome
rightUpstream = ((SpilledRecordBatch) buildBatch).getInitialOutcome();
if (currSp.outerSpilledBatches > 0) {
// Create a PROBE-side "incoming" out of the outer spill file of that partition
probeBatch = new SpilledRecordBatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
// The above ctor call also got the first batch; need to update the outcome
leftUpstream = ((SpilledRecordBatch) probeBatch).getInitialOutcome();
} else {
probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
leftUpstream = IterOutcome.NONE;
hashJoinProbe.changeToFinalProbeState();
}
spilledState.updateCycle(stats, currSp, spilledStateUpdater);
state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
prefetchedBuild.setValue(false);
prefetchedProbe.setValue(false);
return innerNext(); // start processing the next spilled partition "recursively"
}
}
} else {
// Our build side is empty, we won't have any matches, clear the probe side
killAndDrainLeftUpstream();
}
// No more output records, clean up and return
state = BatchState.DONE;
cleanup();
return IterOutcome.NONE;
} catch (SchemaChangeException e) {
throw UserException.schemaChangeError(e).build(logger);
}
}
/**
* When the data from an upstream is no longer needed, send a kill and drain any remaining batches
*
* @param batch probe or build batch
* @param upstream which upstream
* @param isLeft whether this is the left (probe) or the right (build) upstream
*/
private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
batch.kill(true);
while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
VectorAccessibleUtilities.clear(batch);
upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
}
}
private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
private void setupHashTable() {
List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
if ( skipHashTableBuild ) { return; }
// Setup the hash table configuration object
List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
// Create named expressions from the conditions
for (int i = 0; i < conditions.size(); i++) {
leftExpr.add(new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)));
}
// Set the left named expression to be null if the probe batch is empty.
if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
leftExpr = null;
} else {
if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
throw UserException.internalError(null)
.message("Hash join does not support probe batch with selection vectors.")
.addContext("Probe batch has selection mode",
(probeBatch.getSchema().getSelectionVectorMode()).toString())
.build(logger);
}
}
HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt());
// Create the chained hash table
baseHashTable =
new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null);
if (enableRuntimeFilter) {
setupHash64(htConfig);
}
}
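/**
* Materializes the build side key expressions and creates the runtime-generated
* 64-bit hasher ({@link ValueVectorHashHelper.Hash64}) used to populate the bloom
* filters for the runtime filter. If any required build side field cannot be
* found, the runtime filter is disabled instead.
*/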
private void setupHash64(HashTableConfig htConfig) {
LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().size()];
ErrorCollector collector = new ErrorCollectorImpl();
int i = 0;
for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
collector.reportErrors(logger);
if (expr == null) {
continue;
}
keyExprsBuild[i] = expr;
i++;
}
i = 0;
boolean missingField = false;
TypedFieldId[] buildSideTypeFieldIds = new TypedFieldId[keyExprsBuild.length];
for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
SchemaPath schemaPath = (SchemaPath) ne.getExpr();
TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
if (typedFieldId == null) {
missingField = true;
break;
}
buildSideTypeFieldIds[i] = typedFieldId;
i++;
}
if (missingField) {
logger.info("As some build side key fields not found, runtime filter was disabled");
enableRuntimeFilter = false;
return;
}
RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
String buildField = bloomFilterDef.getBuildField();
SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
if (typedFieldId == null) {
missingField = true;
break;
}
int fieldId = typedFieldId.getFieldIds()[0];
bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
}
if (missingField) {
logger.info("As some build side join key fields not found, runtime filter was disabled");
enableRuntimeFilter = false;
return;
}
ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context);
try {
hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
} catch (Exception e) {
throw UserException.internalError(e)
.message("Failed to construct a field's hash64 dynamic codes")
.build(logger);
}
}
/**
* Call only after the number of partitions is known
*/
private void delayedSetup() {
//
// Find out the estimated max batch size, etc
// and compute the max numPartitions possible
// See partitionNumTuning()
//
spilledState.initialize(numPartitions);
// Create array for the partitions
partitions = new HashPartition[numPartitions];
}
/**
* Initialize fields (that may be reused when reading spilled partitions)
*/
private void initializeBuild() {
baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
// Recreate the partitions every time build is initialized
for (int part = 0; part < numPartitions; part++ ) {
partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin,
RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions);
}
spilledInners = new HashJoinSpilledPartition[numPartitions];
}
/**
* Note:
* This method cannot be called again as part of a recursive call of {@link #executeBuildPhase()} that handles spilled build partitions.
*/
private void initializeRuntimeFilter() {
if (!enableRuntimeFilter || bloomFiltersGenerated) {
return;
}
runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
// A RuntimeFilter is not a necessary part of a HashJoin operator; only queries that satisfy
// the RuntimeFilterRouter's judgement will have a RuntimeFilterDef.
if (runtimeFilterDef != null) {
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef);
int numBytes = bloomFilterDef.getNumBytes();
String probeField = bloomFilterDef.getProbeField();
probeFields.add(probeField);
BloomFilter bloomFilter = new BloomFilter(numBytes, context.getAllocator());
bloomFilters.add(bloomFilter);
bloomFilter2buildId.put(bloomFilter, buildFieldId);
}
}
bloomFiltersGenerated = true;
}
/**
* Tunes the number of partitions used by {@link HashJoinBatch}. If it is not possible to spill, it gives up and reverts
* to unbounded in-memory operation.
* @param maxBatchSize the maximum number of records in an incoming build or probe batch
* @param buildCalc the current build side partitioning memory calculator
* @return the build side partitioning memory calculator to use
*/
private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(
int maxBatchSize,
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) {
// Get auto tuning result
numPartitions = buildCalc.getNumPartitions();
if (logger.isDebugEnabled()) {
logger.debug(buildCalc.makeDebugString());
}
if (buildCalc.getMaxReservedMemory() > allocator.getLimit()) {
// We don't have enough memory to do any spilling. Give up and do no spilling and have no limits
// TODO dirty hack to prevent regressions. Remove this once batch sizing is implemented.
// We don't have enough memory to do partitioning, we have to do everything in memory
String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " +
"Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
numPartitions,
FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
FileUtils.byteCountToDisplaySize(allocator.getLimit()));
logger.warn(message);
// create a Noop memory calculator
HashJoinMemoryCalculator calc = getCalculatorImpl();
calc.initialize(false);
buildCalc = calc.next();
buildCalc.initialize(true,
true, // TODO Fix after growing hash values bug fixed
buildBatch,
probeBatch,
buildJoinColumns,
leftUpstream == IterOutcome.NONE, // probeEmpty
allocator.getLimit(),
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
maxBatchSize,
maxBatchSize,
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
disableSpilling(null);
}
return buildCalc;
}
/**
* Disable spilling - use only a single partition and set the memory limit to the maximum (10GB).
* @param reason If not null, log this as a warning; else check the fallback setting to either warn or fail.
*/
private void disableSpilling(String reason) {
// Issue a warning if a reason was given or the fallback option is enabled; otherwise fail
if ( reason == null ) {
boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
if (fallbackEnabled) {
logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" +
" to use unbounded memory");
} else {
throw UserException.resourceError().message(String.format("Not enough memory for internal partitioning and fallback mechanism for " +
"HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter " +
"session/system command or increase memory limit for Drillbit", ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
}
} else {
logger.warn(reason);
}
numPartitions = 1; // We are only using one partition
canSpill = false; // We cannot spill
allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory
}
/**
* Execute the BUILD phase; first read incoming and split rows into partitions;
* may decide to spill some of the partitions
*
* @return Returns an {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a termination condition is reached. Otherwise returns null.
* @throws SchemaChangeException
*/
public IterOutcome executeBuildPhase() throws SchemaChangeException {
if (buildSideIsEmpty.booleanValue()) {
// empty right
return null;
}
if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream
killAndDrainRightUpstream();
return null;
}
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
{
// Initializing build calculator
// Limit scope of these variables to this block
int maxBatchSize = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue();
HashJoinMemoryCalculator calc = getCalculatorImpl();
calc.initialize(doMemoryCalculation);
buildCalc = calc.next();
buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
buildBatch,
probeBatch,
buildJoinColumns,
probeSideIsEmpty.booleanValue(),
allocator.getLimit(),
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
maxBatchSize,
maxBatchSize,
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
if (spilledState.isFirstCycle() && doMemoryCalculation) {
// Do auto tuning
buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
}
}
if (spilledState.isFirstCycle()) {
// Do initial setup only on the first cycle
delayedSetup();
}
initializeBuild();
initializeRuntimeFilter();
// Make the calculator aware of our partitions
HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
buildCalc.setPartitionStatSet(partitionStatSet);
boolean moreData = true;
while (moreData) {
switch (rightUpstream) {
case NONE:
case NOT_YET:
case STOP:
moreData = false;
continue;
case OK_NEW_SCHEMA:
if (!buildSchema.equals(buildBatch.getSchema())) {
throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", buildSchema, buildBatch.getSchema());
}
for (HashPartition partn : partitions) { partn.updateBatches(); }
// Fall through
case OK:
batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
int currentRecordCount = buildBatch.getRecordCount();
//create runtime filter
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
//create runtime filter and send out async
for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
int fieldId = bloomFilter2buildId.get(bloomFilter);
for (int ind = 0; ind < currentRecordCount; ind++) {
long hashCode = hash64.hash64Code(ind, 0, fieldId);
bloomFilter.insert(hashCode);
}
}
}
// Special treatment (when no spill, and single partition) -- use the incoming vectors as they are (no row copy)
if ( numPartitions == 1 ) {
partitions[0].appendBatch(buildBatch);
break;
}
if (!spilledState.isFirstCycle()) {
read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
}
// For every record in the build batch, hash the key columns and keep the result
for (int ind = 0; ind < currentRecordCount; ind++) {
int hashCode = spilledState.isFirstCycle() ? partitions[0].getBuildHashCode(ind)
: read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
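// The low bits of the hash select the partition; the bits above the mask (kept
// after the shift below) serve as the hash value within that partition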
int currPart = hashCode & spilledState.getPartitionMask();
hashCode >>>= spilledState.getBitsInMask();
// semi-join skips join-key-duplicate rows
if ( semiJoin ) {
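// (intentionally empty - no per-row handling is done here)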
}
// Append the new inner row to the appropriate partition; spill (that partition) if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc);
}
if ( read_right_HV_vector != null ) {
read_right_HV_vector.clear();
read_right_HV_vector = null;
}
break;
default:
throw new IllegalStateException(rightUpstream.name());
}
// Get the next incoming record batch
rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
}
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
if (bloomFilter2buildId.size() > 0) {
int hashJoinOpId = this.popConfig.getOperatorId();
runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
}
}
// Move the remaining current batches into their temp lists, or spill
// them if the partition is spilled. Add the spilled partitions into
// the spilled partitions list
if ( numPartitions > 1 ) { // a single partition needs no completion
for (HashPartition partn : partitions) {
partn.completeAnInnerBatch(false, partn.isSpilled());
}
}
prefetchFirstProbeBatch();
if (leftUpstream.isError()) {
// A termination condition was reached while prefetching the first probe side data holding batch.
// We need to terminate.
return leftUpstream;
}
HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
// Traverse all the in-memory partitions' incoming batches, and build their hash tables
for (int index = 0; index < partitions.length; index++) {
HashPartition partn = partitions[index];
if (partn.isSpilled()) {
// Don't build hash tables for spilled partitions
continue;
}
try {
if (postBuildCalc.shouldSpill()) {
// Spill this partition if we need to make room
partn.spillThisPartition();
} else {
// Only build hash tables for partitions that are not spilled
partn.buildContainersHashTableAndHelper();
}
} catch (OutOfMemoryException e) {
String message = "Failed building hash table on partition " + index + ":\n"
+ makeDebugString() + "\n"
+ postBuildCalc.makeDebugString();
// Include debug info
throw new OutOfMemoryException(message, e);
}
}
if (logger.isDebugEnabled()) {
logger.debug(postBuildCalc.makeDebugString());
}
for (HashPartition partn : partitions) {
if ( partn.isSpilled() ) {
HashJoinSpilledPartition sp = new HashJoinSpilledPartition(spilledState.getCycle(),
partn.getPartitionNum(),
originalPartition,
partn.getPartitionBatchesCount(),
partn.getSpillFile());
spilledState.addPartition(sp);
spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
partn.closeWriter();
partn.updateProbeRecordsPerBatch(postBuildCalc.getProbeRecordsPerBatch());
}
}
return null;
}
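/**
* Builds the schema of the output container: the build side columns first
* (skipped for a semi-join), followed by the probe side columns. Build side
* columns are made nullable for a left/full join and probe side columns are made
* nullable for a right/full join, except for maps, which cannot be nullable.
*/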
private void setupOutputContainerSchema() {
if (buildSchema != null && ! semiJoin ) {
for (MaterializedField field : buildSchema) {
MajorType inputType = field.getType();
MajorType outputType;
// If left or full outer join, then the output type must be nullable. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2197).
if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
&& inputType.getMinorType() != TypeProtos.MinorType.MAP) {
outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
} else {
outputType = inputType;
}
// make sure to project field with children for children to show up in the schema
MaterializedField projected = field.withType(outputType);
// Add the vector to our output container
container.addOrGet(projected);
}
}
if (probeSchema != null) { // a probe schema was seen (even though the probe may have had no rows)
for (VectorWrapper<?> vv : probeBatch) {
MajorType inputType = vv.getField().getType();
MajorType outputType;
// If right or full outer join then the output type should be optional. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
&& inputType.getMinorType() != TypeProtos.MinorType.MAP) {
outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
} else {
outputType = inputType;
}
ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getName(), outputType));
if (v instanceof AbstractContainerVector) {
vv.getValueVector().makeTransferPair(v);
v.clear();
}
}
}
}
// (After the inner side has been fully read) - has that inner partition spilled?
public boolean isSpilledInner(int part) {
if ( spilledInners == null ) { return false; } // empty inner
return spilledInners[part] != null;
}
/**
* The constructor
*
* @param popConfig the Hash-Join physical operator definition
* @param context the fragment context
* @param left -- probe/outer side incoming input
* @param right -- build/inner side incoming input
* @throws OutOfMemoryException
*/
public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
RecordBatch left, /*Probe side record batch*/
RecordBatch right /*Build side record batch*/
) throws OutOfMemoryException {
super(popConfig, context, true, left, right);
this.buildBatch = right;
this.probeBatch = left;
joinType = popConfig.getJoinType();
semiJoin = popConfig.isSemiJoin();
joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL;
joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
conditions = popConfig.getConditions();
this.popConfig = popConfig;
this.isRowKeyJoin = popConfig.isRowKeyJoin();
this.joinControl = new JoinControl(popConfig.getJoinControl());
rightExpr = new ArrayList<>(conditions.size());
buildJoinColumns = Sets.newHashSet();
List<SchemaPath> rightConditionPaths = new ArrayList<>();
for (int i = 0; i < conditions.size(); i++) {
SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
rightConditionPaths.add(rightPath);
}
for (int i = 0; i < conditions.size(); i++) {
SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment();
buildJoinColumns.add(nameSegment.getPath());
String refName = "build_side_" + i;
rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
}
this.allocator = oContext.getAllocator();
numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
if ( numPartitions == 1 ) {
disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
}
numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
if (memLimit != 0) {
allocator.setLimit(memLimit);
}
RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
logger.info("Memory limit {} bytes", FileUtils.byteCountToDisplaySize(allocator.getLimit()));
spillSet = new SpillSet(context, popConfig);
// Create empty partitions (in the ctor - covers the case where right side is empty)
partitions = new HashPartition[0];
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
double avail_mem_factor = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
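// Use the smaller of the configured batch size and the largest power of two
// that fits within (allocator limit * avail_mem_factor)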
int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(),
configuredBatchSize);
enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null;
}
/**
* This method is called when {@link HashJoinBatch} closes. It cleans up leftover spilled files that are in the spill queue, and closes the
* spillSet.
*/
private void cleanup() {
if ( buildSideIsEmpty.booleanValue() ) { return; } // not set up; nothing to clean
if ( spillSet.getWriteBytes() > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
}
// clean (and deallocate) each partition, and delete its spill file
for (HashPartition partn : partitions) {
partn.close();
}
// delete any spill file left in unread spilled partitions
while (!spilledState.isEmpty()) {
HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition();
try {
spillSet.delete(sp.innerSpillFile);
} catch(IOException e) {
logger.warn("Cleanup: Failed to delete spill file {}",sp.innerSpillFile);
}
try { // outer file is added later; may be null if cleaning prematurely
if ( sp.outerSpillFile != null ) { spillSet.delete(sp.outerSpillFile); }
} catch(IOException e) {
logger.warn("Cleanup: Failed to delete spill file {}",sp.outerSpillFile);
}
}
// Delete the currently handled (if any) spilled files
spillSet.close(); // delete the spill directory(ies)
}
/**
* This creates a string that summarizes the memory usage of the operator.
* @return A memory dump string.
*/
public String makeDebugString() {
StringBuilder sb = new StringBuilder();
for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) {
String partitionPrefix = "Partition " + partitionIndex + ": ";
HashPartition hashPartition = partitions[partitionIndex];
sb.append(partitionPrefix).append(hashPartition.makeDebugString()).append("\n");
}
return sb.toString();
}
/**
* Updates the {@link HashTable} and spilling stats after the original build side is processed.
*
* Note: this does not update all the stats. The cycleNum is updated dynamically in {@link #innerNext()}, and the total bytes
* written are updated at close time in {@link #cleanup()}.
*/
private void updateStats() {
if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
if (!spilledState.isFirstCycle()) { return; } // These stats are only for before processing spilled files
HashTableStats htStats = new HashTableStats();
long numSpilled = 0;
HashTableStats newStats = new HashTableStats();
// sum the stats from all the partitions
for ( HashPartition partn : partitions ) {
if ( partn.isSpilled() ) { numSpilled++; }
partn.getStats(newStats);
htStats.addStats(newStats);
}
stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
}
/**
* Get the hash table iterator that is created for the build side of the hash join if
* this hash join was instantiated as a row-key join.
* @return hash table iterator or null if this hash join was not a row-key join or if it
* was a row-key join but the build has not yet completed.
*/
@Override
public Pair<ValueVector, Integer> nextRowKeyBatch() {
if (buildComplete) {
// partition 0 because Row Key Join has only a single partition - no spilling
Pair<VectorContainer, Integer> pp = partitions[0].nextBatch();
if (pp != null) {
VectorWrapper<?> vw = Iterables.get(pp.getLeft(), 0);
ValueVector vv = vw.getValueVector();
return Pair.of(vv, pp.getRight());
}
} else if (partitions == null && firstOutputBatch) { // if data arrived on the right (build) side during the build-schema stage, use it
firstOutputBatch = false;
if ( right.getRecordCount() > 0 ) {
VectorWrapper<?> vw = Iterables.get(right, 0);
ValueVector vv = vw.getValueVector();
return Pair.of(vv, right.getRecordCount()-1);
}
}
return null;
}
@Override // implement RowKeyJoin interface
public boolean hasRowKeyBatch() {
return buildComplete;
}
@Override // implement RowKeyJoin interface
public BatchState getBatchState() {
return state;
}
@Override // implement RowKeyJoin interface
public void setBatchState(BatchState newState) {
state = newState;
}
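/**
* Propagates the kill to both the probe and build sides, and remembers that a
* kill was received so that spilled partitions can be cleaned up later.
*/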
@Override
public void killIncoming(boolean sendUpstream) {
wasKilled = true;
probeBatch.kill(sendUpstream);
buildBatch.kill(sendUpstream);
}
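/**
* Copies the input and output statistics accumulated by the batch memory manager
* into the operator metrics.
*/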
public void updateMetrics() {
stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(LEFT_INDEX));
stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));
stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, batchMemoryManager.getNumOutgoingBatches());
stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, batchMemoryManager.getAvgOutputBatchSize());
stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords());
}
@Override
public void setRowKeyJoinState(RowKeyJoin.RowKeyJoinState newState) {
this.rkJoinState = newState;
}
@Override
public RowKeyJoin.RowKeyJoinState getRowKeyJoinState() {
return rkJoinState;
}
@Override
public void close() {
if (!spilledState.isFirstCycle()) { // spilling happened
// In case closing due to cancellation, BaseRootExec.close() does not close the open
// SpilledRecordBatch "scanners" as it only knows about the original left/right ops.
killIncoming(false);
}
updateMetrics();
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
cleanup();
super.close();
}
public HashJoinProbe setupHashJoinProbe() {
// No real code generation !!
return new HashJoinProbeTemplate();
}
@Override
public void dump() {
logger.error("HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={}," +
" rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]", container, left, right, leftUpstream, rightUpstream,
joinType, hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema);
}
}