| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.physical.impl.join; |
| |
| import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; |
| import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; |
| import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.calcite.rel.core.JoinRelType; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.lang3.mutable.MutableBoolean; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.drill.common.exceptions.UserException; |
| import org.apache.drill.common.expression.ErrorCollector; |
| import org.apache.drill.common.expression.ErrorCollectorImpl; |
| import org.apache.drill.common.expression.ExpressionPosition; |
| import org.apache.drill.common.expression.FieldReference; |
| import org.apache.drill.common.expression.LogicalExpression; |
| import org.apache.drill.common.expression.PathSegment; |
| import org.apache.drill.common.expression.SchemaPath; |
| import org.apache.drill.common.logical.data.JoinCondition; |
| import org.apache.drill.common.logical.data.NamedExpression; |
| import org.apache.drill.common.types.TypeProtos; |
| import org.apache.drill.common.types.TypeProtos.DataMode; |
| import org.apache.drill.common.types.TypeProtos.MajorType; |
| import org.apache.drill.common.types.Types; |
| import org.apache.drill.exec.ExecConstants; |
| import org.apache.drill.exec.exception.OutOfMemoryException; |
| import org.apache.drill.exec.exception.SchemaChangeException; |
| import org.apache.drill.exec.expr.ExpressionTreeMaterializer; |
| import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper; |
| import org.apache.drill.exec.memory.BaseAllocator; |
| import org.apache.drill.exec.memory.BufferAllocator; |
| import org.apache.drill.exec.ops.ExecutorFragmentContext; |
| import org.apache.drill.exec.ops.FragmentContext; |
| import org.apache.drill.exec.ops.MetricDef; |
| import org.apache.drill.exec.physical.base.AbstractBase; |
| import org.apache.drill.exec.physical.config.HashJoinPOP; |
| import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch; |
| import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata; |
| import org.apache.drill.exec.physical.impl.common.ChainedHashTable; |
| import org.apache.drill.exec.physical.impl.common.Comparator; |
| import org.apache.drill.exec.physical.impl.common.HashPartition; |
| import org.apache.drill.exec.physical.impl.common.HashTable; |
| import org.apache.drill.exec.physical.impl.common.HashTableConfig; |
| import org.apache.drill.exec.physical.impl.common.HashTableStats; |
| import org.apache.drill.exec.physical.impl.common.SpilledState; |
| import org.apache.drill.exec.physical.impl.spill.SpillSet; |
| import org.apache.drill.exec.planner.common.JoinControl; |
| import org.apache.drill.exec.record.AbstractBinaryRecordBatch; |
| import org.apache.drill.exec.record.BatchSchema; |
| import org.apache.drill.exec.record.JoinBatchMemoryManager; |
| import org.apache.drill.exec.record.MaterializedField; |
| import org.apache.drill.exec.record.RecordBatch; |
| import org.apache.drill.exec.record.TypedFieldId; |
| import org.apache.drill.exec.record.VectorAccessibleUtilities; |
| import org.apache.drill.exec.record.VectorContainer; |
| import org.apache.drill.exec.record.VectorWrapper; |
| import org.apache.drill.exec.util.record.RecordBatchStats; |
| import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; |
| import org.apache.drill.exec.vector.IntVector; |
| import org.apache.drill.exec.vector.ValueVector; |
| import org.apache.drill.exec.vector.complex.AbstractContainerVector; |
| import org.apache.drill.exec.work.filter.BloomFilter; |
| import org.apache.drill.exec.work.filter.BloomFilterDef; |
| import org.apache.drill.exec.work.filter.RuntimeFilterDef; |
| import org.apache.drill.exec.work.filter.RuntimeFilterReporter; |
| import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; |
| import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; |
| import org.apache.drill.shaded.guava.com.google.common.collect.Lists; |
| import org.apache.drill.shaded.guava.com.google.common.collect.Sets; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class implements the runtime execution for the Hash-Join operator |
| * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins |
| * <p> |
| * This implementation splits the incoming Build side rows into multiple |
| * Partitions, thus allowing spilling of some of these partitions to disk if |
| * memory gets tight. Each partition is implemented as a {@link HashPartition}. |
| * After the build phase is over, in the most general case, some of the |
| * partitions were spilled, and the others are in memory. Each of the partitions |
| * in memory would get a {@link HashTable} built. |
| * <p> |
| * Next the Probe side is read, and each row is key matched with a Build |
| * partition. If that partition is in memory, then the key is used to probe and |
| * perform the join, and the results are added to the outgoing batch. But if |
| * that build side partition was spilled, then the matching Probe size partition |
| * is spilled as well. |
| * <p> |
| * After all the Probe side was processed, we are left with pairs of spilled |
| * partitions. Then each pair is processed individually (that Build partition |
| * should be smaller than the original, hence likely fit whole into memory to |
| * allow probing; if not -- see below). |
| * <p> |
| * Processing of each spilled pair is EXACTLY like processing the original |
| * Build/Probe incomings. (As a fact, the {@link #innerNext()} method calls |
| * itself recursively !!). Thus the spilled build partition is read and divided |
| * into new partitions, which in turn may spill again (and again...). The code |
| * tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or |
| * greater) is a waste, indicating that the number of partitions chosen was too |
| * small. |
| */ |
| |
| public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implements RowKeyJoin { |
| private static final Logger logger = LoggerFactory.getLogger(HashJoinBatch.class); |
| |
| /** |
| * The maximum number of records within each internal batch. |
| */ |
| private final int RECORDS_PER_BATCH; // internal batches |
| |
| // Join type, INNER, LEFT, RIGHT or OUTER |
| private final JoinRelType joinType; |
| private final boolean semiJoin; |
| private final boolean joinIsLeftOrFull; |
| private final boolean joinIsRightOrFull; |
| private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755) |
| |
| // Join conditions |
| private final List<JoinCondition> conditions; |
| |
| private RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL; |
| |
| // Runtime generated class implementing HashJoinProbe interface |
| private HashJoinProbe hashJoinProbe = null; |
| |
| private final List<NamedExpression> rightExpr; |
| |
| /** |
| * Names of the join columns. This names are used in order to help estimate |
| * the size of the {@link HashTable}s. |
| */ |
| private final Set<String> buildJoinColumns; |
| |
| // Fields used for partitioning |
| /** |
| * The number of {@link HashPartition}s. This is configured via a system |
| * option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}. |
| */ |
| private int numPartitions = 1; // must be 2 to the power of bitsInMask |
| |
| /** |
| * The master class used to generate {@link HashTable}s. |
| */ |
| private ChainedHashTable baseHashTable; |
| private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false); |
| private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false); |
| private boolean canSpill = true; |
| private boolean wasKilled; // a kill was received, may need to clean spilled partns |
| |
| /** |
| * This array holds the currently active {@link HashPartition}s. |
| */ |
| HashPartition partitions[]; |
| |
| // Number of records in the output container |
| private int outputRecords; |
| |
| // Schema of the build side |
| private BatchSchema buildSchema; |
| // Schema of the probe side |
| private BatchSchema probeSchema; |
| |
| // Whether this HashJoin is used for a row-key based join |
| private final boolean isRowKeyJoin; |
| |
| private final JoinControl joinControl; |
| |
| // An iterator over the build side hash table (only applicable for row-key joins) |
| private boolean buildComplete; |
| |
| // indicates if we have previously returned an output batch |
| private boolean firstOutputBatch = true; |
| |
| private int rightHVColPosition; |
| private final BufferAllocator allocator; |
| // Local fields for left/right incoming - may be replaced when reading from spilled |
| private RecordBatch buildBatch; |
| private RecordBatch probeBatch; |
| |
| /** |
| * Flag indicating whether or not the first data holding build batch needs to be fetched. |
| */ |
| private final MutableBoolean prefetchedBuild = new MutableBoolean(false); |
| /** |
| * Flag indicating whether or not the first data holding probe batch needs to be fetched. |
| */ |
| private final MutableBoolean prefetchedProbe = new MutableBoolean(false); |
| |
| // For handling spilling |
| private final SpillSet spillSet; |
| HashJoinPOP popConfig; |
| |
| private final int originalPartition = -1; // the partition a secondary reads from |
| IntVector read_right_HV_vector; // HV vector that was read from the spilled batch |
| private final int maxBatchesInMemory; |
| private final List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters |
| private boolean enableRuntimeFilter; |
| private RuntimeFilterReporter runtimeFilterReporter; |
| private ValueVectorHashHelper.Hash64 hash64; |
| private final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>(); |
| private final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>(); |
| private final List<BloomFilter> bloomFilters = new ArrayList<>(); |
| private boolean bloomFiltersGenerated; |
| |
| /** |
| * This holds information about the spilled partitions for the build and probe side. |
| */ |
| public static class HashJoinSpilledPartition extends AbstractSpilledPartitionMetadata { |
| private final int innerSpilledBatches; |
| private final String innerSpillFile; |
| private int outerSpilledBatches; |
| private String outerSpillFile; |
| private boolean updatedOuter; |
| |
| public HashJoinSpilledPartition(int cycle, |
| int originPartition, |
| int prevOriginPartition, |
| int innerSpilledBatches, |
| String innerSpillFile) { |
| super(cycle, originPartition, prevOriginPartition); |
| |
| this.innerSpilledBatches = innerSpilledBatches; |
| this.innerSpillFile = innerSpillFile; |
| } |
| |
| public int getInnerSpilledBatches() { |
| return innerSpilledBatches; |
| } |
| |
| public String getInnerSpillFile() { |
| return innerSpillFile; |
| } |
| |
| public int getOuterSpilledBatches() { |
| Preconditions.checkState(updatedOuter); |
| return outerSpilledBatches; |
| } |
| |
| public String getOuterSpillFile() { |
| Preconditions.checkState(updatedOuter); |
| return outerSpillFile; |
| } |
| |
| public void updateOuter(int outerSpilledBatches, String outerSpillFile) { |
| Preconditions.checkState(!updatedOuter); |
| updatedOuter = true; |
| |
| this.outerSpilledBatches = outerSpilledBatches; |
| this.outerSpillFile = outerSpillFile; |
| } |
| |
| @Override |
| public String makeDebugString() { |
| return String.format("Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).", |
| this.getOriginPartition(), this.getPrevOriginPartition(), this.getCycle(), outerSpilledBatches, innerSpilledBatches); |
| } |
| } |
| |
| public class HashJoinUpdater implements SpilledState.Updater { |
| @Override |
| public void cleanup() { |
| HashJoinBatch.this.cleanup(); |
| } |
| |
| @Override |
| public String getFailureMessage() { |
| return "Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates)."; |
| } |
| |
| @Override |
| public long getMemLimit() { |
| return HashJoinBatch.this.allocator.getLimit(); |
| } |
| |
| @Override |
| public boolean hasPartitionLimit() { |
| return true; |
| } |
| } |
| |
| /** |
| * Queue of spilled partitions to process. |
| */ |
| private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>(); |
| private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater(); |
| private HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition |
| |
| public enum Metric implements MetricDef { |
| NUM_BUCKETS, |
| NUM_ENTRIES, |
| NUM_RESIZING, |
| RESIZING_TIME_MS, |
| NUM_PARTITIONS, |
| SPILLED_PARTITIONS, // number of original partitions spilled to disk |
| SPILL_MB, // Number of MB of data spilled to disk. This amount is first written, |
| // then later re-read. So, disk I/O is twice this amount. |
| SPILL_CYCLE, // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY |
| LEFT_INPUT_BATCH_COUNT, |
| LEFT_AVG_INPUT_BATCH_BYTES, |
| LEFT_AVG_INPUT_ROW_BYTES, |
| LEFT_INPUT_RECORD_COUNT, |
| RIGHT_INPUT_BATCH_COUNT, |
| RIGHT_AVG_INPUT_BATCH_BYTES, |
| RIGHT_AVG_INPUT_ROW_BYTES, |
| RIGHT_INPUT_RECORD_COUNT, |
| OUTPUT_BATCH_COUNT, |
| AVG_OUTPUT_BATCH_BYTES, |
| AVG_OUTPUT_ROW_BYTES, |
| OUTPUT_RECORD_COUNT; |
| |
| // duplicate for hash ag |
| |
| @Override |
| public int metricId() { return ordinal(); } |
| } |
| |
| @Override |
| public int getRecordCount() { |
| return outputRecords; |
| } |
| |
| @Override |
| protected void buildSchema() { |
| // We must first get the schemas from upstream operators before we can build |
| // our schema. |
| boolean validSchema = prefetchFirstBatchFromBothSides(); |
| |
| if (validSchema) { |
| // We are able to construct a valid schema from the upstream data. |
| // Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA |
| state = BatchState.BUILD_SCHEMA; |
| |
| if (leftUpstream == OK_NEW_SCHEMA) { |
| probeSchema = left.getSchema(); |
| } |
| |
| if (rightUpstream == OK_NEW_SCHEMA) { |
| buildSchema = right.getSchema(); |
| // position of the new "column" for keeping the hash values |
| // (after the real columns) |
| rightHVColPosition = right.getContainer().getNumberOfColumns(); |
| // In special cases, when the probe side is empty, and |
| // inner/left join - no need for Hash Table |
| skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull; |
| // We only need the hash tables if we have data on the build side. |
| setupHashTable(); |
| } |
| |
| hashJoinProbe = setupHashJoinProbe(); |
| } |
| |
| // If we have a valid schema, this will build a valid container. |
| // If we were unable to obtain a valid schema, |
| // we still need to build a dummy schema. This code handles both cases for us. |
| setupOutputContainerSchema(); |
| container.buildSchema(BatchSchema.SelectionVectorMode.NONE); |
| container.setEmpty(); |
| } |
| |
| /** |
| * Prefetches the first build side data holding batch. |
| */ |
| private void prefetchFirstBuildBatch() { |
| rightUpstream = prefetchFirstBatch(rightUpstream, |
| prefetchedBuild, |
| buildSideIsEmpty, |
| RIGHT_INDEX, |
| buildBatch, |
| () -> { |
| batchMemoryManager.update(RIGHT_INDEX, 0, true); |
| RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, |
| batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), |
| getRecordBatchStatsContext()); |
| }); |
| } |
| |
| /** |
| * Prefetches the first build side data holding batch. |
| */ |
| private void prefetchFirstProbeBatch() { |
| leftUpstream = prefetchFirstBatch(leftUpstream, |
| prefetchedProbe, |
| probeSideIsEmpty, |
| LEFT_INDEX, |
| probeBatch, |
| () -> { |
| batchMemoryManager.update(LEFT_INDEX, 0); |
| RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, |
| batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), |
| getRecordBatchStatsContext()); |
| }); |
| } |
| |
| /** |
| * Used to fetch the first data holding batch from either the build or probe side. |
| * @param outcome The current upstream outcome for either the build or probe side. |
| * @param prefetched A flag indicating if we have already done a prefetch of the first data holding batch for the probe or build side. |
| * @param isEmpty A flag indicating if the probe or build side is empty. |
| * @param index The upstream index of the probe or build batch. |
| * @param batch The probe or build batch itself. |
| * @param memoryManagerUpdate A lambda function to execute the memory manager update for the probe or build batch. |
| * @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. |
| */ |
| private IterOutcome prefetchFirstBatch(IterOutcome outcome, |
| MutableBoolean prefetched, |
| MutableBoolean isEmpty, |
| int index, |
| RecordBatch batch, |
| Runnable memoryManagerUpdate) { |
| if (prefetched.booleanValue()) { |
| // We have already prefetch the first data holding batch |
| return outcome; |
| } |
| |
| // If we didn't retrieve our first data holding batch, we need to do it now. |
| prefetched.setValue(true); |
| |
| if (outcome != IterOutcome.NONE) { |
| // We can only get data if there is data available |
| outcome = sniffNonEmptyBatch(outcome, index, batch); |
| } |
| |
| isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data. |
| |
| if (outcome == IterOutcome.STOP) { |
| // We reached a termination state |
| state = BatchState.STOP; |
| } else { |
| // Got our first batch(es) |
| if (spilledState.isFirstCycle()) { |
| // Only collect stats for the first cycle |
| memoryManagerUpdate.run(); |
| } |
| state = BatchState.FIRST; |
| } |
| |
| return outcome; |
| } |
| |
| /** |
| * Currently in order to accurately predict memory usage for spilling, the first non-empty build or probe side batch is needed. This method |
| * fetches the first non-empty batch from the probe or build side. |
| * @param curr The current outcome. |
| * @param inputIndex Index specifying whether to work with the prorbe or build input. |
| * @param recordBatch The probe or build record batch. |
| * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch. |
| */ |
| private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) { |
| while (true) { |
| if (recordBatch.getRecordCount() != 0) { |
| return curr; |
| } |
| |
| curr = next(inputIndex, recordBatch); |
| |
| switch (curr) { |
| case OK: |
| // We got a data batch |
| break; |
| case NOT_YET: |
| // We need to try again |
| break; |
| case EMIT: |
| throw new UnsupportedOperationException("We do not support " + EMIT); |
| default: |
| // Other cases are termination conditions |
| return curr; |
| } |
| } |
| } |
| |
| /** |
| * Determines the memory calculator to use. If maxNumBatches is configured simple batch counting is used to spill. Otherwise |
| * memory calculations are used to determine when to spill. |
| * @return The memory calculator to use. |
| */ |
| public HashJoinMemoryCalculator getCalculatorImpl() { |
| if (maxBatchesInMemory == 0) { |
| double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY); |
| double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY); |
| double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY); |
| String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY); |
| |
| return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType, semiJoin); |
| } else { |
| return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory); |
| } |
| } |
| |
| @Override |
| public IterOutcome innerNext() { |
| if (wasKilled) { |
| // We have received a kill signal. We need to stop processing. |
| this.cleanup(); |
| super.close(); |
| return IterOutcome.NONE; |
| } |
| |
| prefetchFirstBuildBatch(); |
| |
| if (rightUpstream.isError()) { |
| // A termination condition was reached while prefetching the first build side data holding batch. |
| // We need to terminate. |
| return rightUpstream; |
| } |
| |
| try { |
| /* If we are here for the first time, execute the build phase of the |
| * hash join and setup the run time generated class for the probe side |
| */ |
| if (state == BatchState.FIRST) { |
| // Build the hash table, using the build side record batches. |
| IterOutcome buildExecuteTermination = executeBuildPhase(); |
| |
| if (buildExecuteTermination != null) { |
| // A termination condition was reached while executing the build phase. |
| // We need to terminate. |
| return buildExecuteTermination; |
| } |
| |
| buildComplete = true; |
| |
| if (isRowKeyJoin) { |
| // discard the first left batch which was fetched by buildSchema, and get the new |
| // one based on rowkey join |
| leftUpstream = next(left); |
| |
| if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) { |
| state = BatchState.STOP; |
| return leftUpstream; |
| } |
| } |
| |
| // Update the hash table related stats for the operator |
| updateStats(); |
| } |
| |
| // Try to probe and project, or recursively handle a spilled partition |
| if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows |
| joinIsLeftOrFull) { // or if this is a left/full outer join |
| |
| prefetchFirstProbeBatch(); |
| |
| if (leftUpstream.isError() || |
| ( leftUpstream == NONE && ! joinIsRightOrFull )) { |
| // A termination condition was reached while prefetching the first probe side data holding batch. |
| // We need to terminate. |
| return leftUpstream; |
| } |
| |
| if (!buildSideIsEmpty.booleanValue() || !probeSideIsEmpty.booleanValue()) { |
| // Only allocate outgoing vectors and execute probing logic if there is data |
| |
| if (state == BatchState.FIRST) { |
| // Initialize various settings for the probe side |
| hashJoinProbe.setupHashJoinProbe(probeBatch, |
| this, |
| joinType, |
| semiJoin, |
| leftUpstream, |
| partitions, |
| spilledState.getCycle(), |
| container, |
| spilledInners, |
| buildSideIsEmpty.booleanValue(), |
| numPartitions, |
| rightHVColPosition); |
| } |
| |
| // Allocate the memory for the vectors in the output container |
| batchMemoryManager.allocateVectors(container); |
| |
| hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount()); |
| |
| outputRecords = hashJoinProbe.probeAndProject(); |
| |
| container.setValueCount(outputRecords); |
| |
| batchMemoryManager.updateOutgoingStats(outputRecords); |
| RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); |
| |
| /* We are here because of one the following |
| * 1. Completed processing of all the records and we are done |
| * 2. We've filled up the outgoing batch to the maximum and we need to return upstream |
| * Either case build the output container's schema and return |
| */ |
| if (outputRecords > 0 || state == BatchState.FIRST) { |
| state = BatchState.NOT_FIRST; |
| |
| return IterOutcome.OK; |
| } |
| } |
| |
| // Free all partitions' in-memory data structures |
| // (In case need to start processing spilled partitions) |
| for (HashPartition partn : partitions) { |
| partn.cleanup(false); // clean, but do not delete the spill files !! |
| } |
| |
| // |
| // (recursively) Handle the spilled partitions, if any |
| // |
| if (!buildSideIsEmpty.booleanValue()) { |
| while (!spilledState.isEmpty()) { // "while" is only used for skipping; see "continue" below |
| |
| // Get the next (previously) spilled partition to handle as incoming |
| HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition(); |
| |
| // If the outer is empty (and it's not a right/full join) - try the next spilled partition |
| if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) { |
| continue; |
| } |
| |
| // Create a BUILD-side "incoming" out of the inner spill file of that partition |
| buildBatch = new SpilledRecordBatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet); |
| // The above ctor call also got the first batch; need to update the outcome |
| rightUpstream = ((SpilledRecordBatch) buildBatch).getInitialOutcome(); |
| |
| if (currSp.outerSpilledBatches > 0) { |
| // Create a PROBE-side "incoming" out of the outer spill file of that partition |
| probeBatch = new SpilledRecordBatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet); |
| // The above ctor call also got the first batch; need to update the outcome |
| leftUpstream = ((SpilledRecordBatch) probeBatch).getInitialOutcome(); |
| } else { |
| probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming() |
| leftUpstream = IterOutcome.NONE; |
| hashJoinProbe.changeToFinalProbeState(); |
| } |
| |
| spilledState.updateCycle(stats, currSp, spilledStateUpdater); |
| state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this |
| |
| prefetchedBuild.setValue(false); |
| prefetchedProbe.setValue(false); |
| |
| return innerNext(); // start processing the next spilled partition "recursively" |
| } |
| } |
| |
| } else { |
| // Our build side is empty, we won't have any matches, clear the probe side |
| killAndDrainLeftUpstream(); |
| } |
| |
| // No more output records, clean up and return |
| state = BatchState.DONE; |
| |
| cleanup(); |
| |
| return IterOutcome.NONE; |
| } catch (SchemaChangeException e) { |
| throw UserException.schemaChangeError(e).build(logger); |
| } |
| } |
| |
| /** |
| * In case an upstream data is no longer needed, send a kill and flush any remaining batch |
| * |
| * @param batch probe or build batch |
| * @param upstream which upstream |
| * @param isLeft is it the left or right |
| */ |
| private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) { |
| batch.kill(true); |
| while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) { |
| VectorAccessibleUtilities.clear(batch); |
| upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch); |
| } |
| } |
| private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); } |
| private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); } |
| |
| private void setupHashTable() { |
| List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size()); |
| conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond))); |
| |
| if ( skipHashTableBuild ) { return; } |
| |
| // Setup the hash table configuration object |
| List<NamedExpression> leftExpr = new ArrayList<>(conditions.size()); |
| |
| // Create named expressions from the conditions |
| for (int i = 0; i < conditions.size(); i++) { |
| leftExpr.add(new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i))); |
| } |
| |
| // Set the left named expression to be null if the probe batch is empty. |
| if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) { |
| leftExpr = null; |
| } else { |
| if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) { |
| throw UserException.internalError(null) |
| .message("Hash join does not support probe batch with selection vectors.") |
| .addContext("Probe batch has selection mode", |
| (probeBatch.getSchema().getSelectionVectorMode()).toString()) |
| .build(logger); |
| } |
| } |
| |
| HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), |
| true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt()); |
| |
| // Create the chained hash table |
| baseHashTable = |
| new ChainedHashTable(htConfig, context, allocator, buildBatch, probeBatch, null); |
| if (enableRuntimeFilter) { |
| setupHash64(htConfig); |
| } |
| } |
| |
| private void setupHash64(HashTableConfig htConfig) { |
| LogicalExpression[] keyExprsBuild = new LogicalExpression[htConfig.getKeyExprsBuild().size()]; |
| ErrorCollector collector = new ErrorCollectorImpl(); |
| int i = 0; |
| for (NamedExpression ne : htConfig.getKeyExprsBuild()) { |
| LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry()); |
| collector.reportErrors(logger); |
| if (expr == null) { |
| continue; |
| } |
| keyExprsBuild[i] = expr; |
| i++; |
| } |
| i = 0; |
| boolean missingField = false; |
| TypedFieldId[] buildSideTypeFieldIds = new TypedFieldId[keyExprsBuild.length]; |
| for (NamedExpression ne : htConfig.getKeyExprsBuild()) { |
| SchemaPath schemaPath = (SchemaPath) ne.getExpr(); |
| TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); |
| if (typedFieldId == null) { |
| missingField = true; |
| break; |
| } |
| buildSideTypeFieldIds[i] = typedFieldId; |
| i++; |
| } |
| if (missingField) { |
| logger.info("As some build side key fields not found, runtime filter was disabled"); |
| enableRuntimeFilter = false; |
| return; |
| } |
| RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); |
| List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); |
| for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { |
| String buildField = bloomFilterDef.getBuildField(); |
| SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN); |
| TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); |
| if (typedFieldId == null) { |
| missingField = true; |
| break; |
| } |
| int fieldId = typedFieldId.getFieldIds()[0]; |
| bloomFilterDef2buildId.put(bloomFilterDef, fieldId); |
| } |
| if (missingField) { |
| logger.info("As some build side join key fields not found, runtime filter was disabled"); |
| enableRuntimeFilter = false; |
| return; |
| } |
| ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context); |
| try { |
| hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds); |
| } catch (Exception e) { |
| throw UserException.internalError(e) |
| .message("Failed to construct a field's hash64 dynamic codes") |
| .build(logger); |
| } |
| } |
| |
| /** |
| * Call only after num partitions is known |
| */ |
| private void delayedSetup() { |
| // |
| // Find out the estimated max batch size, etc |
| // and compute the max numPartitions possible |
| // See partitionNumTuning() |
| // |
| |
| spilledState.initialize(numPartitions); |
| // Create array for the partitions |
| partitions = new HashPartition[numPartitions]; |
| } |
| |
| /** |
| * Initialize fields (that may be reused when reading spilled partitions) |
| */ |
| private void initializeBuild() { |
| baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files |
| // Recreate the partitions every time build is initialized |
| for (int part = 0; part < numPartitions; part++ ) { |
| partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin, |
| RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions); |
| } |
| |
| spilledInners = new HashJoinSpilledPartition[numPartitions]; |
| |
| } |
| |
| /** |
| * Note: |
| * This method can not be called again as part of recursive call of executeBuildPhase() to handle spilled build partitions. |
| */ |
| private void initializeRuntimeFilter() { |
| if (!enableRuntimeFilter || bloomFiltersGenerated) { |
| return; |
| } |
| runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context); |
| RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); |
| //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the |
| //RuntimeFilterRouter's judgement will have the RuntimeFilterDef. |
| if (runtimeFilterDef != null) { |
| List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); |
| for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { |
| int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef); |
| int numBytes = bloomFilterDef.getNumBytes(); |
| String probeField = bloomFilterDef.getProbeField(); |
| probeFields.add(probeField); |
| BloomFilter bloomFilter = new BloomFilter(numBytes, context.getAllocator()); |
| bloomFilters.add(bloomFilter); |
| bloomFilter2buildId.put(bloomFilter, buildFieldId); |
| } |
| } |
| bloomFiltersGenerated = true; |
| } |
| |
| /** |
| * Tunes the number of partitions used by {@link HashJoinBatch}. If it is not possible to spill it gives up and reverts |
| * to unbounded in memory operation. |
| * @param maxBatchSize |
| * @param buildCalc |
| * @return |
| */ |
| private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning( |
| int maxBatchSize, |
| HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) { |
| // Get auto tuning result |
| numPartitions = buildCalc.getNumPartitions(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug(buildCalc.makeDebugString()); |
| } |
| |
| if (buildCalc.getMaxReservedMemory() > allocator.getLimit()) { |
| // We don't have enough memory to do any spilling. Give up and do no spilling and have no limits |
| |
| // TODO dirty hack to prevent regressions. Remove this once batch sizing is implemented. |
| // We don't have enough memory to do partitioning, we have to do everything in memory |
| String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " + |
| "Forcing legacy behavoir of using unbounded memory in order to prevent regressions.", |
| numPartitions, |
| FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()), |
| FileUtils.byteCountToDisplaySize(allocator.getLimit())); |
| logger.warn(message); |
| |
| // create a Noop memory calculator |
| HashJoinMemoryCalculator calc = getCalculatorImpl(); |
| calc.initialize(false); |
| buildCalc = calc.next(); |
| |
| buildCalc.initialize(true, |
| true, // TODO Fix after growing hash values bug fixed |
| buildBatch, |
| probeBatch, |
| buildJoinColumns, |
| leftUpstream == IterOutcome.NONE, // probeEmpty |
| allocator.getLimit(), |
| numPartitions, |
| RECORDS_PER_BATCH, |
| RECORDS_PER_BATCH, |
| maxBatchSize, |
| maxBatchSize, |
| batchMemoryManager.getOutputBatchSize(), |
| HashTable.DEFAULT_LOAD_FACTOR); |
| |
| disableSpilling(null); |
| } |
| |
| return buildCalc; |
| } |
| |
| /** |
| * Disable spilling - use only a single partition and set the memory limit to the max ( 10GB ) |
| * @param reason If not null - log this as warning, else check fallback setting to either warn or fail. |
| */ |
| private void disableSpilling(String reason) { |
| // Fail, or just issue a warning if a reason was given, or a fallback option is enabled |
| if ( reason == null ) { |
| boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val; |
| if (fallbackEnabled) { |
| logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" + |
| " to use unbounded memory"); |
| } else { |
| throw UserException.resourceError().message(String.format("Not enough memory for internal partitioning and fallback mechanism for " + |
| "HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter " + |
| "session/system command or increase memory limit for Drillbit", ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger); |
| } |
| } else { |
| logger.warn(reason); |
| } |
| |
| numPartitions = 1; // We are only using one partition |
| canSpill = false; // We cannot spill |
| allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory |
| } |
| |
| /** |
| * Execute the BUILD phase; first read incoming and split rows into partitions; |
| * may decide to spill some of the partitions |
| * |
| * @return Returns an {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a termination condition is reached. Otherwise returns null. |
| * @throws SchemaChangeException |
| */ |
| public IterOutcome executeBuildPhase() throws SchemaChangeException { |
| if (buildSideIsEmpty.booleanValue()) { |
| // empty right |
| return null; |
| } |
| |
| if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream |
| killAndDrainRightUpstream(); |
| return null; |
| } |
| |
| HashJoinMemoryCalculator.BuildSidePartitioning buildCalc; |
| |
| { |
| // Initializing build calculator |
| // Limit scope of these variables to this block |
| int maxBatchSize = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH; |
| boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue(); |
| HashJoinMemoryCalculator calc = getCalculatorImpl(); |
| |
| calc.initialize(doMemoryCalculation); |
| buildCalc = calc.next(); |
| |
| buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed |
| buildBatch, |
| probeBatch, |
| buildJoinColumns, |
| probeSideIsEmpty.booleanValue(), |
| allocator.getLimit(), |
| numPartitions, |
| RECORDS_PER_BATCH, |
| RECORDS_PER_BATCH, |
| maxBatchSize, |
| maxBatchSize, |
| batchMemoryManager.getOutputBatchSize(), |
| HashTable.DEFAULT_LOAD_FACTOR); |
| |
| if (spilledState.isFirstCycle() && doMemoryCalculation) { |
| // Do auto tuning |
| buildCalc = partitionNumTuning(maxBatchSize, buildCalc); |
| } |
| } |
| |
| if (spilledState.isFirstCycle()) { |
| // Do initial setup only on the first cycle |
| delayedSetup(); |
| } |
| |
| initializeBuild(); |
| |
| initializeRuntimeFilter(); |
| |
| // Make the calculator aware of our partitions |
| HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions); |
| buildCalc.setPartitionStatSet(partitionStatSet); |
| |
| boolean moreData = true; |
| while (moreData) { |
| switch (rightUpstream) { |
| case NONE: |
| case NOT_YET: |
| case STOP: |
| moreData = false; |
| continue; |
| |
| case OK_NEW_SCHEMA: |
| if (!buildSchema.equals(buildBatch.getSchema())) { |
| throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", buildSchema, buildBatch.getSchema()); |
| } |
| for (HashPartition partn : partitions) { partn.updateBatches(); } |
| // Fall through |
| case OK: |
| batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true); |
| int currentRecordCount = buildBatch.getRecordCount(); |
| //create runtime filter |
| if (spilledState.isFirstCycle() && enableRuntimeFilter) { |
| //create runtime filter and send out async |
| for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) { |
| int fieldId = bloomFilter2buildId.get(bloomFilter); |
| for (int ind = 0; ind < currentRecordCount; ind++) { |
| long hashCode = hash64.hash64Code(ind, 0, fieldId); |
| bloomFilter.insert(hashCode); |
| } |
| } |
| } |
| // Special treatment (when no spill, and single partition) -- use the incoming vectors as they are (no row copy) |
| if ( numPartitions == 1 ) { |
| partitions[0].appendBatch(buildBatch); |
| break; |
| } |
| |
| if (!spilledState.isFirstCycle()) { |
| read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast(); |
| } |
| |
| // For every record in the build batch, hash the key columns and keep the result |
| for (int ind = 0; ind < currentRecordCount; ind++) { |
| int hashCode = spilledState.isFirstCycle() ? partitions[0].getBuildHashCode(ind) |
| : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column |
| int currPart = hashCode & spilledState.getPartitionMask(); |
| hashCode >>>= spilledState.getBitsInMask(); |
| // semi-join skips join-key-duplicate rows |
| if ( semiJoin ) { |
| |
| } |
| // Append the new inner row to the appropriate partition; spill (that partition) if needed |
| partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); |
| } |
| |
| if ( read_right_HV_vector != null ) { |
| read_right_HV_vector.clear(); |
| read_right_HV_vector = null; |
| } |
| break; |
| default: |
| throw new IllegalStateException(rightUpstream.name()); |
| } |
| // Get the next incoming record batch |
| rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch); |
| } |
| |
| if (spilledState.isFirstCycle() && enableRuntimeFilter) { |
| if (bloomFilter2buildId.size() > 0) { |
| int hashJoinOpId = this.popConfig.getOperatorId(); |
| runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId); |
| } |
| } |
| |
| // Move the remaining current batches into their temp lists, or spill |
| // them if the partition is spilled. Add the spilled partitions into |
| // the spilled partitions list |
| if ( numPartitions > 1 ) { // a single partition needs no completion |
| for (HashPartition partn : partitions) { |
| partn.completeAnInnerBatch(false, partn.isSpilled()); |
| } |
| } |
| |
| prefetchFirstProbeBatch(); |
| |
| if (leftUpstream.isError()) { |
| // A termination condition was reached while prefetching the first build side data holding batch. |
| // We need to terminate. |
| return leftUpstream; |
| } |
| |
| HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next(); |
| postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty |
| |
| // Traverse all the in-memory partitions' incoming batches, and build their hash tables |
| |
| for (int index = 0; index < partitions.length; index++) { |
| HashPartition partn = partitions[index]; |
| |
| if (partn.isSpilled()) { |
| // Don't build hash tables for spilled partitions |
| continue; |
| } |
| |
| try { |
| if (postBuildCalc.shouldSpill()) { |
| // Spill this partition if we need to make room |
| partn.spillThisPartition(); |
| } else { |
| // Only build hash tables for partitions that are not spilled |
| partn.buildContainersHashTableAndHelper(); |
| } |
| } catch (OutOfMemoryException e) { |
| String message = "Failed building hash table on partition " + index + ":\n" |
| + makeDebugString() + "\n" |
| + postBuildCalc.makeDebugString(); |
| // Include debug info |
| throw new OutOfMemoryException(message, e); |
| } |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug(postBuildCalc.makeDebugString()); |
| } |
| |
| for (HashPartition partn : partitions) { |
| if ( partn.isSpilled() ) { |
| HashJoinSpilledPartition sp = new HashJoinSpilledPartition(spilledState.getCycle(), |
| partn.getPartitionNum(), |
| originalPartition, |
| partn.getPartitionBatchesCount(), |
| partn.getSpillFile()); |
| |
| spilledState.addPartition(sp); |
| spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later |
| partn.closeWriter(); |
| |
| partn.updateProbeRecordsPerBatch(postBuildCalc.getProbeRecordsPerBatch()); |
| } |
| } |
| |
| return null; |
| } |
| |
| private void setupOutputContainerSchema() { |
| |
| if (buildSchema != null && ! semiJoin ) { |
| for (MaterializedField field : buildSchema) { |
| MajorType inputType = field.getType(); |
| MajorType outputType; |
| // If left or full outer join, then the output type must be nullable. However, map types are |
| // not nullable so we must exclude them from the check below (see DRILL-2197). |
| if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED |
| && inputType.getMinorType() != TypeProtos.MinorType.MAP) { |
| outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); |
| } else { |
| outputType = inputType; |
| } |
| |
| // make sure to project field with children for children to show up in the schema |
| MaterializedField projected = field.withType(outputType); |
| // Add the vector to our output container |
| container.addOrGet(projected); |
| } |
| } |
| |
| if (probeSchema != null) { // a probe schema was seen (even though the probe may had no rows) |
| for (VectorWrapper<?> vv : probeBatch) { |
| MajorType inputType = vv.getField().getType(); |
| MajorType outputType; |
| |
| // If right or full outer join then the output type should be optional. However, map types are |
| // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197). |
| if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED |
| && inputType.getMinorType() != TypeProtos.MinorType.MAP) { |
| outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); |
| } else { |
| outputType = inputType; |
| } |
| |
| ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getName(), outputType)); |
| if (v instanceof AbstractContainerVector) { |
| vv.getValueVector().makeTransferPair(v); |
| v.clear(); |
| } |
| } |
| } |
| |
| } |
| |
| // (After the inner side was read whole) - Has that inner partition spilled |
| public boolean isSpilledInner(int part) { |
| if ( spilledInners == null ) { return false; } // empty inner |
| return spilledInners[part] != null; |
| } |
| |
| /** |
| * The constructor |
| * |
| * @param popConfig |
| * @param context |
| * @param left -- probe/outer side incoming input |
| * @param right -- build/iner side incoming input |
| * @throws OutOfMemoryException |
| */ |
| public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, |
| RecordBatch left, /*Probe side record batch*/ |
| RecordBatch right /*Build side record batch*/ |
| ) throws OutOfMemoryException { |
| super(popConfig, context, true, left, right); |
| this.buildBatch = right; |
| this.probeBatch = left; |
| joinType = popConfig.getJoinType(); |
| semiJoin = popConfig.isSemiJoin(); |
| joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL; |
| joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL; |
| conditions = popConfig.getConditions(); |
| this.popConfig = popConfig; |
| this.isRowKeyJoin = popConfig.isRowKeyJoin(); |
| this.joinControl = new JoinControl(popConfig.getJoinControl()); |
| |
| rightExpr = new ArrayList<>(conditions.size()); |
| buildJoinColumns = Sets.newHashSet(); |
| List<SchemaPath> rightConditionPaths = new ArrayList<>(); |
| for (int i = 0; i < conditions.size(); i++) { |
| SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight(); |
| rightConditionPaths.add(rightPath); |
| } |
| |
| for (int i = 0; i < conditions.size(); i++) { |
| SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight(); |
| PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment(); |
| buildJoinColumns.add(nameSegment.getPath()); |
| String refName = "build_side_" + i; |
| rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName))); |
| } |
| |
| this.allocator = oContext.getAllocator(); |
| |
| numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR); |
| if ( numPartitions == 1 ) { // |
| disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1"); |
| } |
| |
| numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2 |
| |
| long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR); |
| |
| if (memLimit != 0) { |
| allocator.setLimit(memLimit); |
| } |
| |
| RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR); |
| maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR); |
| |
| logger.info("Memory limit {} bytes", FileUtils.byteCountToDisplaySize(allocator.getLimit())); |
| spillSet = new SpillSet(context, popConfig); |
| |
| // Create empty partitions (in the ctor - covers the case where right side is empty) |
| partitions = new HashPartition[0]; |
| |
| // get the output batch size from config. |
| int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); |
| double avail_mem_factor = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR); |
| int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor))); |
| |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d", |
| configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize); |
| |
| batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>()); |
| |
| |
| RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(), |
| configuredBatchSize); |
| |
| enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null; |
| } |
| |
| /** |
| * This method is called when {@link HashJoinBatch} closes. It cleans up left over spilled files that are in the spill queue, and closes the |
| * spillSet. |
| */ |
| private void cleanup() { |
| if ( buildSideIsEmpty.booleanValue() ) { return; } // not set up; nothing to clean |
| if ( spillSet.getWriteBytes() > 0 ) { |
| stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled |
| (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); |
| } |
| // clean (and deallocate) each partition, and delete its spill file |
| for (HashPartition partn : partitions) { |
| partn.close(); |
| } |
| |
| // delete any spill file left in unread spilled partitions |
| while (!spilledState.isEmpty()) { |
| HashJoinSpilledPartition sp = spilledState.getNextSpilledPartition(); |
| try { |
| spillSet.delete(sp.innerSpillFile); |
| } catch(IOException e) { |
| logger.warn("Cleanup: Failed to delete spill file {}",sp.innerSpillFile); |
| } |
| try { // outer file is added later; may be null if cleaning prematurely |
| if ( sp.outerSpillFile != null ) { spillSet.delete(sp.outerSpillFile); } |
| } catch(IOException e) { |
| logger.warn("Cleanup: Failed to delete spill file {}",sp.outerSpillFile); |
| } |
| } |
| // Delete the currently handled (if any) spilled files |
| spillSet.close(); // delete the spill directory(ies) |
| } |
| |
| /** |
| * This creates a string that summarizes the memory usage of the operator. |
| * @return A memory dump string. |
| */ |
| public String makeDebugString() { |
| StringBuilder sb = new StringBuilder(); |
| |
| for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) { |
| String partitionPrefix = "Partition " + partitionIndex + ": "; |
| HashPartition hashPartition = partitions[partitionIndex]; |
| sb.append(partitionPrefix).append(hashPartition.makeDebugString()).append("\n"); |
| } |
| |
| return sb.toString(); |
| } |
| |
| /** |
| * Updates the {@link HashTable} and spilling stats after the original build side is processed. |
| * |
| * Note: this does not update all the stats. The cycleNum is updated dynamically in {@link #innerNext()} and the total bytes |
| * written is updated at close time in {@link #cleanup()}. |
| */ |
| private void updateStats() { |
| if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty |
| if (!spilledState.isFirstCycle()) { return; } // These stats are only for before processing spilled files |
| |
| HashTableStats htStats = new HashTableStats(); |
| long numSpilled = 0; |
| HashTableStats newStats = new HashTableStats(); |
| // sum the stats from all the partitions |
| for ( HashPartition partn : partitions ) { |
| if ( partn.isSpilled() ) { numSpilled++; } |
| partn.getStats(newStats); |
| htStats.addStats(newStats); |
| } |
| |
| stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets); |
| stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries); |
| stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing); |
| stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime); |
| stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions); |
| stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill |
| stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled); |
| } |
| |
| /** |
| * Get the hash table iterator that is created for the build side of the hash join if |
| * this hash join was instantiated as a row-key join. |
| * @return hash table iterator or null if this hash join was not a row-key join or if it |
| * was a row-key join but the build has not yet completed. |
| */ |
| @Override |
| public Pair<ValueVector, Integer> nextRowKeyBatch() { |
| if (buildComplete) { |
| // partition 0 because Row Key Join has only a single partition - no spilling |
| Pair<VectorContainer, Integer> pp = partitions[0].nextBatch(); |
| if (pp != null) { |
| VectorWrapper<?> vw = Iterables.get(pp.getLeft(), 0); |
| ValueVector vv = vw.getValueVector(); |
| return Pair.of(vv, pp.getRight()); |
| } |
| } else if(partitions == null && firstOutputBatch) { //if there is data coming to right(build) side in build Schema stage, use it. |
| firstOutputBatch = false; |
| if ( right.getRecordCount() > 0 ) { |
| VectorWrapper<?> vw = Iterables.get(right, 0); |
| ValueVector vv = vw.getValueVector(); |
| return Pair.of(vv, right.getRecordCount()-1); |
| } |
| } |
| return null; |
| } |
| |
| @Override // implement RowKeyJoin interface |
| public boolean hasRowKeyBatch() { |
| return buildComplete; |
| } |
| |
| @Override // implement RowKeyJoin interface |
| public BatchState getBatchState() { |
| return state; |
| } |
| |
| @Override // implement RowKeyJoin interface |
| public void setBatchState(BatchState newState) { |
| state = newState; |
| } |
| |
| @Override |
| public void killIncoming(boolean sendUpstream) { |
| wasKilled = true; |
| probeBatch.kill(sendUpstream); |
| buildBatch.kill(sendUpstream); |
| } |
| |
| public void updateMetrics() { |
| stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX)); |
| stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX)); |
| stats.setLongStat(HashJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX)); |
| stats.setLongStat(HashJoinBatch.Metric.LEFT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(LEFT_INDEX)); |
| |
| stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX)); |
| stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX)); |
| stats.setLongStat(HashJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX)); |
| stats.setLongStat(HashJoinBatch.Metric.RIGHT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(RIGHT_INDEX)); |
| |
| stats.setLongStat(HashJoinBatch.Metric.OUTPUT_BATCH_COUNT, batchMemoryManager.getNumOutgoingBatches()); |
| stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, batchMemoryManager.getAvgOutputBatchSize()); |
| stats.setLongStat(HashJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth()); |
| stats.setLongStat(HashJoinBatch.Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords()); |
| } |
| |
| @Override |
| public void setRowKeyJoinState(RowKeyJoin.RowKeyJoinState newState) { |
| this.rkJoinState = newState; |
| } |
| |
| @Override |
| public RowKeyJoin.RowKeyJoinState getRowKeyJoinState() { |
| return rkJoinState; |
| } |
| |
| @Override |
| public void close() { |
| if (!spilledState.isFirstCycle()) { // spilling happened |
| // In case closing due to cancellation, BaseRootExec.close() does not close the open |
| // SpilledRecordBatch "scanners" as it only knows about the original left/right ops. |
| killIncoming(false); |
| } |
| |
| updateMetrics(); |
| |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", |
| batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), |
| batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); |
| |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", |
| batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), |
| batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); |
| |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", |
| batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), |
| batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); |
| |
| cleanup(); |
| super.close(); |
| } |
| |
| public HashJoinProbe setupHashJoinProbe() { |
| // No real code generation !! |
| return new HashJoinProbeTemplate(); |
| } |
| |
| @Override |
| public void dump() { |
| logger.error("HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={}," + |
| " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]", container, left, right, leftUpstream, rightUpstream, |
| joinType, hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema); |
| } |
| } |