blob: daf23523da93553c6958a3c9ef056f9ff9b8c9d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import javax.inject.Named;
import java.util.LinkedList;
import java.util.List;
import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
/*
* Template class that combined with the runtime generated source implements the NestedLoopJoin interface. This
* class contains the main nested loop join logic.
*/
public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
// Current left input batch being processed
private RecordBatch left;
// Record count of the left batch currently being processed
private int leftRecordCount;
// List of record counts per batch in the hyper container
private List<Integer> rightCounts;
// Output batch
private NestedLoopJoinBatch outgoing;
// Iteration status tracker
private final IterationStatusTracker tracker = new IterationStatusTracker();
private int targetOutputRecords;
/**
* Method initializes necessary state and invokes the doSetup() to set the
* input and output value vector references.
*
* @param context Fragment context
* @param left Current left input batch being processed
* @param rightContainer Hyper container
* @param rightCounts Counts for each right container
* @param outgoing Output batch
*/
@Override
public void setupNestedLoopJoin(FragmentContext context,
RecordBatch left,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing) {
this.left = left;
this.leftRecordCount = left.getRecordCount();
this.rightCounts = rightCounts;
this.outgoing = outgoing;
doSetup(context, rightContainer, left, outgoing);
}
@Override
public void setTargetOutputCount(int targetOutputRecords) {
this.targetOutputRecords = targetOutputRecords;
}
/**
* Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
* controls which left batch we are processing and fetches the next left input batch once we exhaust the current one.
*
* @param joinType join type (INNER ot LEFT)
* @return the number of records produced in the output batch
*/
@Override
public int outputRecords(JoinRelType joinType) {
int outputIndex = 0;
while (leftRecordCount != 0) {
outputIndex = populateOutgoingBatch(joinType, outputIndex);
if (outputIndex >= targetOutputRecords) {
break;
}
// reset state and get next left batch
resetAndGetNextLeft(outputIndex);
}
return outputIndex;
}
/**
* This method is the core of the nested loop join.For each left batch record looks for matching record
* from the list of right batches. Match is checked by calling {@link #doEval(int, int, int)} method.
* If matching record is found both left and right records are written into output batch,
* otherwise if join type is LEFT, than only left record is written, right batch record values will be null.
*
* @param joinType join type (INNER or LEFT)
* @param outputIndex index to start emitting records at
* @return final outputIndex after producing records in the output batch
*/
private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
// copy index and match counters as local variables to speed up processing
int nextRightBatchToProcess = tracker.getNextRightBatchToProcess();
int nextRightRecordToProcess = tracker.getNextRightRecordToProcess();
int nextLeftRecordToProcess = tracker.getNextLeftRecordToProcess();
boolean rightRecordMatched = tracker.isRightRecordMatched();
outer:
// for every record in the left batch
for (; nextLeftRecordToProcess < leftRecordCount; nextLeftRecordToProcess++) {
// for every batch on the right
for (; nextRightBatchToProcess < rightCounts.size(); nextRightBatchToProcess++) {
int rightRecordCount = rightCounts.get(nextRightBatchToProcess);
// Since right container is a hyper container, in doEval generated code it expects the
// batch index in the 2 MSBytes of the index variable. See DRILL-6128 for details
final int currentRightBatchIndex = nextRightBatchToProcess << 16;
// for every record in right batch
for (; nextRightRecordToProcess < rightRecordCount; nextRightRecordToProcess++) {
if (doEval(nextLeftRecordToProcess, currentRightBatchIndex, nextRightRecordToProcess)) {
// project records from the left and right batches
emitLeft(nextLeftRecordToProcess, outputIndex);
emitRight(nextRightBatchToProcess, nextRightRecordToProcess, outputIndex);
outputIndex++;
rightRecordMatched = true;
if (outputIndex >= targetOutputRecords) {
nextRightRecordToProcess++;
// no more space left in the batch, stop processing
break outer;
}
}
}
nextRightRecordToProcess = 0;
}
nextRightBatchToProcess = 0;
if (joinType == JoinRelType.LEFT && !rightRecordMatched) {
// project records from the left side only, records from right will be null
emitLeft(nextLeftRecordToProcess, outputIndex);
outputIndex++;
if (outputIndex >= targetOutputRecords) {
nextLeftRecordToProcess++;
// no more space left in the batch, stop processing
break;
}
} else {
// reset match indicator if matching record was found
rightRecordMatched = false;
}
}
// update iteration status tracker with actual index and match counters
tracker.update(nextRightBatchToProcess, nextRightRecordToProcess, nextLeftRecordToProcess, rightRecordMatched);
return outputIndex;
}
/**
* Utility method to clear the memory in the left input batch once we have completed processing it.
* Resets some internal state which indicates the next records to process in the left and right batches,
* also fetches the next left input batch.
*/
private void resetAndGetNextLeft(int outputIndex) {
for (VectorWrapper<?> vw : left) {
vw.getValueVector().clear();
}
tracker.reset();
RecordBatch.IterOutcome leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
switch (leftOutcome) {
case OK_NEW_SCHEMA:
throw new DrillRuntimeException("Nested loop join does not handle schema change. Schema change" +
" found on the left side of NLJ.");
case NONE:
case NOT_YET:
leftRecordCount = 0;
break;
case OK:
outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex);
setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
outgoing.getRecordBatchStatsContext());
leftRecordCount = left.getRecordCount();
break;
default:
}
}
@Override
public abstract void doSetup(@Named("context") FragmentContext context,
@Named("rightContainer") VectorContainer rightContainer,
@Named("leftBatch") RecordBatch leftBatch,
@Named("outgoing") RecordBatch outgoing);
@Override
public abstract void emitRight(@Named("batchIndex") int batchIndex,
@Named("recordIndexWithinBatch") int recordIndexWithinBatch,
@Named("outIndex") int outIndex);
@Override
public abstract void emitLeft(@Named("leftIndex") int leftIndex,
@Named("outIndex") int outIndex);
protected abstract boolean doEval(@Named("leftIndex") int leftIndex,
@Named("rightBatchIndex") int batchIndex,
@Named("rightRecordIndexWithinBatch") int recordIndexWithinBatch);
/**
* Helper class to track position of left and record batches during iteration
* and match status of record from the right batch.
*/
private static class IterationStatusTracker {
// Next right batch to process
private int nextRightBatchToProcess;
// Next record in the current right batch to process
private int nextRightRecordToProcess;
// Next record in the left batch to process
private int nextLeftRecordToProcess;
// Flag to indicate if record from the left found matching record from the right, applicable during left join
private boolean rightRecordMatched;
int getNextRightBatchToProcess() {
return nextRightBatchToProcess;
}
boolean isRightRecordMatched() {
return rightRecordMatched;
}
int getNextLeftRecordToProcess() {
return nextLeftRecordToProcess;
}
int getNextRightRecordToProcess() {
return nextRightRecordToProcess;
}
void update(int nextRightBatchToProcess,
int nextRightRecordToProcess,
int nextLeftRecordToProcess,
boolean rightRecordMatchFound) {
this.nextRightBatchToProcess = nextRightBatchToProcess;
this.nextRightRecordToProcess = nextRightRecordToProcess;
this.nextLeftRecordToProcess = nextLeftRecordToProcess;
this.rightRecordMatched = rightRecordMatchFound;
}
void reset() {
nextRightBatchToProcess = nextRightRecordToProcess = nextLeftRecordToProcess = 0;
rightRecordMatched = false;
}
}
}