| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.physical.impl.join; |
| |
| import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; |
| import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; |
| import org.apache.calcite.rel.core.JoinRelType; |
| import org.apache.drill.common.expression.SchemaPath; |
| import org.apache.drill.common.types.TypeProtos; |
| import org.apache.drill.common.types.Types; |
| import org.apache.drill.exec.ExecConstants; |
| import org.apache.drill.exec.exception.OutOfMemoryException; |
| import org.apache.drill.exec.exception.SchemaChangeException; |
| import org.apache.drill.exec.ops.FragmentContext; |
| import org.apache.drill.exec.physical.base.LateralContract; |
| import org.apache.drill.exec.physical.base.PhysicalOperator; |
| import org.apache.drill.exec.physical.config.LateralJoinPOP; |
| import org.apache.drill.exec.record.AbstractBinaryRecordBatch; |
| import org.apache.drill.exec.record.BatchSchema; |
| import org.apache.drill.exec.record.JoinBatchMemoryManager; |
| import org.apache.drill.exec.record.MaterializedField; |
| import org.apache.drill.exec.record.RecordBatch; |
| import org.apache.drill.exec.record.RecordBatchSizer; |
| import org.apache.drill.exec.record.SchemaBuilder; |
| import org.apache.drill.exec.record.VectorAccessibleUtilities; |
| import org.apache.drill.exec.record.VectorWrapper; |
| import org.apache.drill.exec.util.record.RecordBatchStats; |
| import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; |
| import org.apache.drill.exec.vector.IntVector; |
| import org.apache.drill.exec.vector.ValueVector; |
| |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; |
| import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; |
| import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; |
| import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; |
| import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; |
| |
| /** |
| * RecordBatch implementation for the lateral join operator. Currently it's expected LATERAL to co-exists with UNNEST |
| * operator. Both LATERAL and UNNEST will share a contract with each other defined at {@link LateralContract} |
| */ |
| public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract { |
| private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class); |
| |
| // Maximum number records in the outgoing batch |
| private int maxOutputRowCount; |
| |
| // Schema on the left side |
| private BatchSchema leftSchema; |
| |
| // Schema on the right side |
| private BatchSchema rightSchema; |
| |
| // Index in output batch to populate next row |
| private int outputIndex; |
| |
| // Current index of record in left incoming which is being processed |
| private int leftJoinIndex = -1; |
| |
| // Current index of record in right incoming which is being processed |
| private int rightJoinIndex = -1; |
| |
| // flag to keep track if current left batch needs to be processed in future next call |
| private boolean processLeftBatchInFuture; |
| |
| // Keep track if any matching right record was found for current left index record |
| private boolean matchedRecordFound; |
| |
| // Used only for testing |
| private boolean useMemoryManager = true; |
| |
| // Flag to keep track of new left batch so that update on memory manager is called only once per left batch |
| private boolean isNewLeftBatch = false; |
| |
| private final HashSet<String> excludedFieldNames = new HashSet<>(); |
| |
| private final String implicitColumn; |
| |
| private boolean hasRemainderForLeftJoin = false; |
| |
| private ValueVector implicitVector; |
| |
| // Map to cache reference of input and corresponding output vectors for left and right batches |
| private final Map<ValueVector, ValueVector> leftInputOutputVector = new HashMap<>(); |
| |
| private final Map<ValueVector, ValueVector> rightInputOutputVector = new HashMap<>(); |
| |
| /* **************************************************************************************************************** |
| * Public Methods |
| * ****************************************************************************************************************/ |
| public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context, |
| RecordBatch left, RecordBatch right) throws OutOfMemoryException { |
| super(popConfig, context, left, right); |
| Preconditions.checkNotNull(left); |
| Preconditions.checkNotNull(right); |
| final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); |
| RecordBatchStats.printConfiguredBatchSize(getRecordBatchStatsContext(), configOutputBatchSize); |
| implicitColumn = popConfig.getImplicitRIDColumn(); |
| |
| populateExcludedField(popConfig); |
| batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames); |
| |
| // Initially it's set to default value of 64K and later for each new output row it will be set to the computed |
| // row count |
| maxOutputRowCount = batchMemoryManager.getOutputRowCount(); |
| } |
| |
| /** |
| * Handles cases where previous output batch got full after processing all the batches from right side for a left |
| * side batch. But there are still few unprocessed rows in left batch which cannot be ignored because JoinType is |
| * LeftJoin. |
| * @return - true if all the rows in left batch is produced in output container |
| * false if there is still some rows pending in left incoming container |
| */ |
| private boolean handleRemainingLeftRows() { |
| Preconditions.checkState(popConfig.getJoinType() == JoinRelType.LEFT, |
| "Unexpected leftover rows from previous left batch when join type is not left join"); |
| |
| while(leftJoinIndex < left.getRecordCount() && !isOutgoingBatchFull()) { |
| emitLeft(leftJoinIndex, outputIndex, 1); |
| ++outputIndex; |
| ++leftJoinIndex; |
| } |
| |
| // Check if there is still pending left rows |
| return leftJoinIndex >= left.getRecordCount(); |
| } |
| |
| /** |
| * Method that get's left and right incoming batch and produce the output batch. If the left incoming batch is |
| * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty |
| * left incoming batch is received then it call's next on right branch to get an incoming and finally produces |
| * output. |
| * @return IterOutcome state of the lateral join batch |
| */ |
| @Override |
| public IterOutcome innerNext() { |
| |
| if (hasRemainderForLeftJoin) { // if set that means there is spill over from previous left batch and no |
| // corresponding right rows and it is left join scenario |
| allocateVectors(); |
| |
| boolean hasMoreRows = !handleRemainingLeftRows(); |
| if (leftUpstream == EMIT || hasMoreRows) { |
| logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " + |
| "consumed now in output batch"); |
| hasRemainderForLeftJoin = hasMoreRows; |
| finalizeOutputContainer(); |
| return (leftUpstream == EMIT) ? EMIT : OK; |
| } else { |
| // release memory for previous left batch |
| leftJoinIndex = -1; |
| VectorAccessibleUtilities.clear(left); |
| } |
| } |
| |
| // We don't do anything special on FIRST state. Process left batch first and then right batch if need be |
| IterOutcome childOutcome = processLeftBatch(); |
| logger.debug("Received left batch with outcome {}", childOutcome); |
| |
| if (processLeftBatchInFuture && hasRemainderForLeftJoin) { |
| finalizeOutputContainer(); |
| hasRemainderForLeftJoin = false; |
| return OK; |
| } |
| |
| // reset this state after calling processLeftBatch above. |
| processLeftBatchInFuture = false; |
| hasRemainderForLeftJoin = false; |
| |
| // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned |
| // from left side is terminal state then just return the IterOutcome and don't call next() on right branch |
| if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) { |
| container.setRecordCount(0); |
| return childOutcome; |
| } |
| |
| // Left side has some records in the batch so let's process right batch |
| childOutcome = processRightBatch(); |
| logger.debug("Received right batch with outcome {}", childOutcome); |
| |
| // reset the left & right outcomes to OK here and send the empty batch downstream. Non-Empty right batch with |
| // OK_NEW_SCHEMA will be handled in subsequent next call |
| if (childOutcome == OK_NEW_SCHEMA) { |
| leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream; |
| rightUpstream = OK; |
| return childOutcome; |
| } |
| |
| if (isTerminalOutcome(childOutcome)) { |
| return childOutcome; |
| } |
| |
| // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in |
| // output container based on new left schema and old right schema. If schema change failed then return STOP |
| // downstream |
| if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) { |
| return STOP; |
| } |
| |
| // Setup the references of left, right and outgoing container in generated operator |
| state = BatchState.NOT_FIRST; |
| |
| // Update the memory manager only if its a brand new incoming i.e. leftJoinIndex and rightJoinIndex is 0 |
| // Otherwise there will be a case where while filling last output batch, some records from previous left or |
| // right batch are still left to be sent in output for which we will count this batch twice. The actual checks |
| // are done in updateMemoryManager |
| updateMemoryManager(LEFT_INDEX); |
| |
| // We have to call update on memory manager for empty batches (rightJoinIndex = -1) as well since other wise while |
| // allocating memory for vectors below it can fail. Since in that case colSize will not have any info on right side |
| // vectors and throws NPE. The actual checks are done in updateMemoryManager |
| updateMemoryManager(RIGHT_INDEX); |
| |
| if (outputIndex > 0) { |
| // this means batch is already allocated but because of new incoming the width and output row count might have |
| // changed. So update the maxOutputRowCount with new value |
| if (useMemoryManager) { |
| setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount()); |
| } |
| } |
| // if output is not allocated then maxRowCount will be set correctly below |
| // allocate space for the outgoing batch |
| allocateVectors(); |
| |
| return produceOutputBatch(); |
| } |
| |
| @Override |
| public void close() { |
| updateBatchMemoryManagerStats(); |
| |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, " + |
| "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), |
| batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), |
| batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), |
| batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); |
| |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, " + |
| "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), |
| batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), |
| batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), |
| batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); |
| |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, " + |
| "record count : %d", batchMemoryManager.getNumOutgoingBatches(), |
| batchMemoryManager.getAvgOutputBatchSize(), |
| batchMemoryManager.getAvgOutputRowWidth(), |
| batchMemoryManager.getTotalOutputRecords()); |
| |
| super.close(); |
| } |
| |
| @Override |
| public int getRecordCount() { |
| return container.getRecordCount(); |
| } |
| |
| /** |
| * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral |
| * to process the records at leftJoinIndex. |
| * |
| * @return - RecordBatch received as left side incoming |
| */ |
| @Override |
| public RecordBatch getIncoming() { |
| Preconditions.checkState (left != null, |
| "Retuning null left batch. It's unexpected since right side will only be called iff " + |
| "there is any valid left batch"); |
| return left; |
| } |
| |
| /** |
| * Returns the current row index which the calling operator should process in current incoming left record batch. |
| * LATERAL should never return it as -1 since that indicated current left batch is empty and LATERAL will never |
| * call next on right side with empty left batch |
| * |
| * @return - int - index of row to process. |
| */ |
| @Override |
| public int getRecordIndex() { |
| Preconditions.checkState (leftJoinIndex < left.getRecordCount(), |
| "Left join index: %s is out of bounds: %s", leftJoinIndex, left.getRecordCount()); |
| return leftJoinIndex; |
| } |
| |
| /** |
| * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch |
| */ |
| @Override |
| public IterOutcome getLeftOutcome() { |
| return leftUpstream; |
| } |
| |
| /* **************************************************************************************************************** |
| * Protected Methods |
| * ****************************************************************************************************************/ |
| |
| /** |
| * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a |
| * failure outcome then we don't even call next on right branch, since there is no left incoming. |
| * @return true if both the left/right batch was received without failure outcome. |
| * false if either of batch is received with failure outcome. |
| */ |
| @Override |
| protected boolean prefetchFirstBatchFromBothSides() { |
| // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch |
| leftUpstream = next(0, left); |
| |
| boolean validBatch = setBatchState(leftUpstream); |
| |
| if (validBatch) { |
| isNewLeftBatch = true; |
| rightUpstream = next(1, right); |
| validBatch = setBatchState(rightUpstream); |
| } |
| |
| // EMIT outcome is not expected as part of first batch from either side |
| if (leftUpstream == EMIT || rightUpstream == EMIT) { |
| state = BatchState.STOP; |
| throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " + |
| "buildSchema phase"); |
| } |
| return validBatch; |
| } |
| |
| /** |
| * Prefetch a batch from left and right branch to know about the schema of each side. Then adds value vector in |
| * output container based on those schemas. For this phase LATERAL always expect's an empty batch from right side |
| * which UNNEST should abide by. |
| * |
| * @throws SchemaChangeException if batch schema was changed during execution |
| */ |
| @Override |
| protected void buildSchema() throws SchemaChangeException { |
| // Prefetch a RecordBatch from both left and right branch |
| if (!prefetchFirstBatchFromBothSides()) { |
| return; |
| } |
| Preconditions.checkState(right.getRecordCount() == 0, |
| "Unexpected non-empty first right batch received"); |
| |
| // Setup output container schema based on known left and right schema |
| setupNewSchema(); |
| |
| // Release the vectors received from right side |
| VectorAccessibleUtilities.clear(right); |
| |
| // Set join index as invalid (-1) if the left side is empty, else set it to 0 |
| leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0; |
| rightJoinIndex = -1; |
| |
| // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly |
| // by buildSchema caller and we should treat the batch as received with OK outcome. |
| leftUpstream = OK; |
| rightUpstream = OK; |
| } |
| |
| @Override |
| protected void killIncoming(boolean sendUpstream) { |
| this.left.kill(sendUpstream); |
| // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming |
| // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream |
| // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to |
| // UNNEST. |
| leftUpstream = STOP; |
| this.right.kill(sendUpstream); |
| } |
| |
| /* **************************************************************************************************************** |
| * Private Methods |
| * ****************************************************************************************************************/ |
| |
| private boolean handleSchemaChange() { |
| try { |
| stats.startSetup(); |
| logger.debug("Setting up new schema based on incoming batch. Old output schema: {}", container.getSchema()); |
| setupNewSchema(); |
| return true; |
| } catch (SchemaChangeException ex) { |
| logger.error("Failed to handle schema change hence killing the query"); |
| context.getExecutorState().fail(ex); |
| left.kill(true); // Can have exchange receivers on left so called with true |
| right.kill(false); // Won't have exchange receivers on right side |
| return false; |
| } finally { |
| stats.stopSetup(); |
| } |
| } |
| |
| private boolean isTerminalOutcome(IterOutcome outcome) { |
| return (outcome == STOP || outcome == NONE); |
| } |
| |
| /** |
| * Process left incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is |
| * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator. Also |
| * when we populate the outgoing container then this method is called to get next left batch if current one is |
| * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of |
| * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome. |
| * @return IterOutcome after processing current left batch |
| */ |
| private IterOutcome processLeftBatch() { |
| |
| boolean needLeftBatch = leftJoinIndex == -1; |
| |
| // If left batch is empty |
| while (needLeftBatch) { |
| |
| if (!processLeftBatchInFuture) { |
| leftUpstream = next(LEFT_INDEX, left); |
| isNewLeftBatch = true; |
| } |
| |
| final boolean emptyLeftBatch = left.getRecordCount() <=0; |
| logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch); |
| |
| switch (leftUpstream) { |
| case OK_NEW_SCHEMA: |
| // This OK_NEW_SCHEMA is received post build schema phase and from left side |
| if (outputIndex > 0) { // can only reach here from produceOutputBatch |
| // This means there is already some records from previous join inside left batch |
| // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call |
| processLeftBatchInFuture = true; |
| return OK_NEW_SCHEMA; |
| } |
| |
| // If left batch is empty with actual schema change then just rebuild the output container and send empty |
| // batch downstream |
| if (emptyLeftBatch) { |
| if (handleSchemaChange()) { |
| leftJoinIndex = -1; |
| return OK_NEW_SCHEMA; |
| } else { |
| return STOP; |
| } |
| } // else - setup the new schema information after getting it from right side too. |
| case OK: |
| // With OK outcome we will keep calling next until we get a batch with >0 records |
| if (emptyLeftBatch) { |
| leftJoinIndex = -1; |
| continue; |
| } else { |
| leftJoinIndex = 0; |
| } |
| break; |
| case EMIT: |
| // don't call next on right batch |
| if (emptyLeftBatch) { |
| leftJoinIndex = -1; |
| return EMIT; |
| } else { |
| leftJoinIndex = 0; |
| } |
| break; |
| case NONE: |
| case STOP: |
| // Not using =0 since if outgoing container is empty then no point returning anything |
| if (outputIndex > 0) { // can only reach here from produceOutputBatch |
| processLeftBatchInFuture = true; |
| } |
| return leftUpstream; |
| case NOT_YET: |
| try { |
| Thread.sleep(5); |
| } catch (InterruptedException ex) { |
| logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " + |
| "received NOT_YET"); |
| } |
| break; |
| } |
| needLeftBatch = leftJoinIndex == -1; |
| } |
| return leftUpstream; |
| } |
| |
| /** |
| * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is |
| * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator and if |
| * left batch has some records in it. Also when we populate the outgoing container then this method is called to |
| * get next right batch if current one is fully processed. |
| * @return IterOutcome after processing current left batch |
| */ |
| private IterOutcome processRightBatch() { |
| // Check if we still have records left to process in left incoming from new batch or previously half processed |
| // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new |
| // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then it will be set |
| // to -1. |
| // Whereas rightJoinIndex is to keep track of record in right batch being joined with record in left batch. |
| // So when there are cases such that all records in right batch is not consumed by the output, then rightJoinIndex |
| // will be a valid index. When all records are consumed it will be set to -1. |
| boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1); |
| while (needNewRightBatch) { |
| rightUpstream = next(RIGHT_INDEX, right); |
| switch (rightUpstream) { |
| case OK_NEW_SCHEMA: |
| |
| // If there is some records in the output batch that means left batch didn't came with OK_NEW_SCHEMA, |
| // otherwise it would have been marked for processInFuture and output will be returned. This means for |
| // current non processed left or new left non-empty batch there is unexpected right batch schema change |
| if (outputIndex > 0) { |
| throw new IllegalStateException("SchemaChange on right batch is not expected in between the rows of " + |
| "current left batch or a new non-empty left batch with no schema change"); |
| } |
| // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a |
| // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through |
| // |
| // Right batch with OK_NEW_SCHEMA can be non-empty so update the rightJoinIndex correctly and pass the |
| // new schema downstream with empty batch and later with subsequent next() call the join output will be |
| // produced |
| if (handleSchemaChange()) { |
| container.setRecordCount(0); |
| rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1; |
| return OK_NEW_SCHEMA; |
| } else { |
| return STOP; |
| } |
| case OK: |
| case EMIT: |
| // Even if there are no records we should not call next() again because in case of LEFT join empty batch is |
| // of importance too |
| rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1; |
| needNewRightBatch = false; |
| break; |
| case NONE: |
| case STOP: |
| needNewRightBatch = false; |
| break; |
| case NOT_YET: |
| try { |
| Thread.sleep(10); |
| } catch (InterruptedException ex) { |
| logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " + |
| "received NOT_YET"); |
| } |
| break; |
| } |
| } |
| return rightUpstream; |
| } |
| |
| /** |
| * Get's the current left and right incoming batch and does the cross join to fill the output batch. If all the |
| * records in the either or both the batches are consumed then it get's next batch from that branch depending upon |
| * if output batch still has some space left. If output batch is full then the output is finalized to be sent |
| * downstream. Subsequent call's knows how to consume previously half consumed (if any) batches and producing the |
| * output using that. |
| * |
| * @return - IterOutcome to be send along with output batch to downstream operator |
| */ |
| private IterOutcome produceOutputBatch() { |
| |
| boolean isLeftProcessed = false; |
| |
| // Try to fully pack the outgoing container |
| while (!isOutgoingBatchFull()) { |
| // perform the cross join between records in left and right batch and populate the output container |
| crossJoinAndOutputRecords(); |
| |
| // rightJoinIndex should move by number of records in output batch for current right batch only. For cases when |
| // right batch is fully consumed rightJoinIndex will be equal to record count. For cases when only part of it is |
| // consumed in current output batch rightJoinIndex will point to next row to be consumed |
| final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount(); |
| |
| // Check if above join to produce output resulted in fully consuming right side batch |
| if (isRightProcessed) { |
| // Release vectors of right batch. This will happen for both rightUpstream = EMIT/OK |
| VectorAccessibleUtilities.clear(right); |
| rightJoinIndex = -1; |
| } |
| |
| // Check if all rows in right batch is processed and there was a match for last rowId and this is last |
| // right batch for this left batch, then increment the leftJoinIndex. If this is not the last right batch we |
| // cannot increase the leftJoinIndex even though a match is found because next right batch can contain more |
| // records for the same implicit rowId |
| if (isRightProcessed && rightUpstream == EMIT && matchedRecordFound) { |
| ++leftJoinIndex; |
| matchedRecordFound = false; |
| } |
| |
| // left is only declared as processed if this is last right batch for current left batch and we have processed |
| // all the rows in it. |
| isLeftProcessed = (rightUpstream == EMIT) && leftJoinIndex >= left.getRecordCount(); |
| |
| // Even though if left batch is not fully processed but we have received EMIT outcome from right side. |
| // In this case if left batch has some unprocessed rows and it's left join emit left side for these rows. |
| // If it's inner join then just set treat left batch as processed. |
| if (!isLeftProcessed && rightUpstream == EMIT && isRightProcessed) { |
| if (popConfig.getJoinType() == JoinRelType.LEFT) { |
| // If outgoing batch got full that means we still have some leftJoinIndex to output but right side is done |
| // producing the batches. So mark hasRemainderForLeftJoin=true and we will take care of it in future next call. |
| isLeftProcessed = handleRemainingLeftRows(); |
| hasRemainderForLeftJoin = !isLeftProcessed; |
| } else { |
| // not left join hence ignore rows pending in left batch since right side is done producing the output |
| isLeftProcessed = true; |
| } |
| } |
| |
| if (isLeftProcessed) { |
| leftJoinIndex = -1; |
| VectorAccessibleUtilities.clear(left); |
| matchedRecordFound = false; |
| } |
| |
| // Check if output batch still has some space |
| if (!isOutgoingBatchFull()) { |
| // Check if left side still has records or not |
| if (isLeftProcessed) { |
| // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer before |
| // getting next batch |
| if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) { |
| break; |
| } else { |
| logger.debug("Output batch still has some space left, getting new batches from left and right. OutIndex: {}", |
| outputIndex); |
| // Get both left batch and the right batch and make sure indexes are properly set |
| leftUpstream = processLeftBatch(); |
| |
| logger.debug("Received left batch with outcome {}", leftUpstream); |
| |
| // output batch is not empty and we have new left batch with OK_NEW_SCHEMA or terminal outcome |
| if (processLeftBatchInFuture) { |
| logger.debug("Received left batch such that we have to return the current outgoing batch and process " + |
| "the new batch in subsequent next call"); |
| // We should return the current output batch with OK outcome and don't reset the leftUpstream |
| finalizeOutputContainer(); |
| return OK; |
| } |
| |
| // If left batch received a terminal outcome then don't call right batch |
| if (isTerminalOutcome(leftUpstream)) { |
| finalizeOutputContainer(); |
| return leftUpstream; |
| } |
| |
| // If we have received the left batch with EMIT outcome and is empty then we should return previous output |
| // batch with EMIT outcome |
| if ((leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) && left.getRecordCount() == 0) { |
| isLeftProcessed = true; |
| break; |
| } |
| |
| // Update the batch memory manager to use new left incoming batch |
| updateMemoryManager(LEFT_INDEX); |
| } |
| } |
| |
| // If we are here it means one of the below: |
| // 1) Either previous left batch was not fully processed and it came with OK outcome. There is still some space |
| // left in outgoing batch so let's get next right batch. |
| // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing |
| // batch. Now we have got new left batch with OK outcome. Let's get next right batch |
| // 3) OR previous left & right batch was fully processed and left came with OK outcome. Outgoing batch is |
| // empty since all right batches were empty for all left rows. Now we got another non-empty left batch with |
| // OK_NEW_SCHEMA. |
| rightUpstream = processRightBatch(); |
| |
| logger.debug("Received right batch with outcome {}", rightUpstream); |
| |
| if (rightUpstream == OK_NEW_SCHEMA) { |
| leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream; |
| rightUpstream = OK; |
| finalizeOutputContainer(); |
| return OK_NEW_SCHEMA; |
| } |
| |
| if (isTerminalOutcome(rightUpstream)) { |
| finalizeOutputContainer(); |
| return rightUpstream; |
| } |
| |
| // Update the batch memory manager to use new right incoming batch |
| updateMemoryManager(RIGHT_INDEX); |
| |
| // If previous left batch is fully processed and it didn't produced any output rows and later we got a new |
| // non-empty left batch with OK_NEW_SCHEMA with schema change only on left side vectors, then setup schema |
| // in output container based on new left schema and old right schema. If schema change failed then return STOP |
| // downstream |
| if (leftUpstream == OK_NEW_SCHEMA && outputIndex == 0) { |
| if (!handleSchemaChange()) { |
| return STOP; |
| } |
| // Since schema has change so we have new empty vectors in output container hence allocateMemory for them |
| allocateVectors(); |
| } else { |
| // means we are using already allocated output batch so row count may have changed based on new incoming |
| // batch hence update it |
| if (useMemoryManager) { |
| setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount()); |
| } |
| } |
| } |
| } // output batch is full to its max capacity |
| |
| finalizeOutputContainer(); |
| |
| // Check if output batch was full and left was fully consumed or not. Since if left is not consumed entirely |
| // but output batch is full, then if the left batch came with EMIT outcome we should send this output batch along |
| // with OK outcome not with EMIT. Whereas if output is full and left is also fully consumed then we should send |
| // EMIT outcome. |
| if (leftUpstream == EMIT && isLeftProcessed) { |
| logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " + |
| "consumed in output batch"); |
| return EMIT; |
| } |
| |
| if (leftUpstream == OK_NEW_SCHEMA) { |
| // return output batch with OK_NEW_SCHEMA and reset the state to OK |
| logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set" + |
| " of batches"); |
| leftUpstream = OK; |
| return OK_NEW_SCHEMA; |
| } |
| return OK; |
| } |
| /** |
| * Finalizes the current output container with the records produced so far before sending it downstream |
| */ |
| private void finalizeOutputContainer() { |
| VectorAccessibleUtilities.setValueCount(container, outputIndex); |
| |
| // Set the record count in the container |
| container.setRecordCount(outputIndex); |
| |
| batchMemoryManager.updateOutgoingStats(outputIndex); |
| |
| RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "Number of records emitted: %d and Allocator Stats: [AllocatedMem: %d, PeakMem: %d]", |
| outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation()); |
| |
| // Update the output index for next output batch to zero |
| outputIndex = 0; |
| } |
| |
| /** |
| * Check if the schema changed between provided newSchema and oldSchema. It relies on |
| * {@link BatchSchema#isEquivalent(BatchSchema)}. |
| * @param newSchema - New Schema information |
| * @param oldSchema - - New Schema information to compare with |
| * |
| * @return - true - if newSchema is not same as oldSchema |
| * - false - if newSchema is same as oldSchema |
| */ |
| private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) { |
| return (newSchema == null || oldSchema == null) || !newSchema.isEquivalent(oldSchema); |
| } |
| |
| /** |
| * Validate if the input schema is not null and doesn't contain any Selection Vector. |
| * @param schema - input schema to verify |
| * @return - true: valid input schema |
| * false: invalid input schema |
| */ |
| private boolean verifyInputSchema(BatchSchema schema) { |
| |
| boolean isValid = true; |
| if (schema == null) { |
| logger.error("Null schema found for the incoming batch"); |
| isValid = false; |
| } else { |
| final BatchSchema.SelectionVectorMode svMode = schema.getSelectionVectorMode(); |
| if (svMode != BatchSchema.SelectionVectorMode.NONE) { |
| logger.error("Incoming batch schema found with selection vector which is not supported. SVMode: {}", |
| svMode.toString()); |
| isValid = false; |
| } |
| } |
| return isValid; |
| } |
| |
| private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema, boolean isRightBatch) { |
| if (excludedFieldNames.size() == 0) { |
| return originSchema; |
| } |
| |
| final SchemaBuilder newSchemaBuilder = |
| BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode()); |
| for (MaterializedField field : originSchema) { |
| // Don't ignore implicit column from left side in multilevel case where plan is generated such that lower lateral |
| // is on the right side of upper lateral. |
| if (!excludedFieldNames.contains(field.getName()) || |
| (field.getName().equals(implicitColumn) && !isRightBatch)) { |
| newSchemaBuilder.addField(field); |
| } |
| } |
| return newSchemaBuilder.build(); |
| } |
| |
| /** |
| * Helps to create the outgoing container vectors based on known left and right batch schemas |
| * @throws SchemaChangeException |
| */ |
| private void setupNewSchema() throws SchemaChangeException { |
| |
| logger.debug("Setting up new schema based on incoming batch. New left schema: {} and New right schema: {}", |
| left.getSchema(), right.getSchema()); |
| |
| // Clear up the container |
| container.clear(); |
| leftInputOutputVector.clear(); |
| rightInputOutputVector.clear(); |
| |
| leftSchema = batchSchemaWithNoExcludedCols(left.getSchema(), false); |
| rightSchema = batchSchemaWithNoExcludedCols(right.getSchema(), true); |
| |
| if (!verifyInputSchema(leftSchema)) { |
| throw new SchemaChangeException("Invalid Schema found for left incoming batch"); |
| } |
| |
| if (!verifyInputSchema(rightSchema)) { |
| throw new SchemaChangeException("Invalid Schema found for right incoming batch"); |
| } |
| |
| // Setup LeftSchema in outgoing container and also include implicit column if present in left side for multilevel |
| // case if plan is generated such that lower lateral is right child of upper lateral |
| for (final VectorWrapper<?> vectorWrapper : left) { |
| final MaterializedField leftField = vectorWrapper.getField(); |
| if (excludedFieldNames.contains(leftField.getName()) && !(leftField.getName().equals(implicitColumn))) { |
| continue; |
| } |
| container.addOrGet(leftField); |
| } |
| |
| // Setup RightSchema in the outgoing container |
| for (final VectorWrapper<?> vectorWrapper : right) { |
| MaterializedField rightField = vectorWrapper.getField(); |
| if (excludedFieldNames.contains(rightField.getName())) { |
| if (rightField.getName().equals(implicitColumn)) { |
| implicitVector = vectorWrapper.getValueVector(); |
| } |
| continue; |
| } |
| |
| TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType(); |
| |
| // make right input schema optional if we have LEFT join |
| if (popConfig.getJoinType() == JoinRelType.LEFT && |
| rightFieldType.getMode() == TypeProtos.DataMode.REQUIRED) { |
| final TypeProtos.MajorType outputType = |
| Types.overrideMode(rightField.getType(), TypeProtos.DataMode.OPTIONAL); |
| |
| // Create the right field with optional type. This will also take care of creating |
| // children fields in case of ValueVectors of map type |
| rightField = rightField.withType(outputType); |
| } |
| container.addOrGet(rightField); |
| } |
| |
| Preconditions.checkState(implicitVector != null, |
| "Implicit column vector %s not found in right incoming batch", implicitColumn); |
| |
| // Let's build schema for the container |
| outputIndex = 0; |
| container.setRecordCount(outputIndex); |
| container.buildSchema(BatchSchema.SelectionVectorMode.NONE); |
| |
| // Setup left vectors |
| setupInputOutputVectors(left, 0, leftSchema.getFieldCount(), 0, false); |
| |
| // Setup right vectors |
| setupInputOutputVectors(right, 0, rightSchema.getFieldCount(), |
| leftSchema.getFieldCount(), true); |
| |
| logger.debug("Output Schema created {} based on input left schema {} and right schema {}", container.getSchema(), |
| leftSchema, rightSchema); |
| } |
| |
| /** |
| * Simple method to allocate space for all the vectors in the container. |
| */ |
| private void allocateVectors() { |
| // This check is here and will be true only in case of left join where the pending rows from previous left batch is |
| // copied to the new output batch. Then same output batch is used to fill remaining memory using new left & right |
| // batches. |
| if (outputIndex > 0) { |
| logger.trace("Allocation is already done for output container vectors since it already holds some record"); |
| return; |
| } |
| |
| // Set this as max output rows to be filled in output batch since memory for that many rows are allocated |
| if (useMemoryManager) { |
| setMaxOutputRowCount(batchMemoryManager.getOutputRowCount()); |
| } |
| |
| for (VectorWrapper w : container) { |
| RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName()); |
| colSize.allocateVector(w.getValueVector(), maxOutputRowCount); |
| } |
| |
| logger.debug("Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", container.getAllocator().getAllocatedMemory(), |
| container.getAllocator().getPeakMemoryAllocation()); |
| } |
| |
| private boolean setBatchState(IterOutcome outcome) { |
| switch(outcome) { |
| case STOP: |
| case EMIT: |
| state = BatchState.STOP; |
| return false; |
| case NONE: |
| case NOT_YET: |
| state = BatchState.DONE; |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Creates a map of rowId to number of rows with that rowId in the right incoming batch of Lateral Join. It is |
| * expected from UnnestRecordBatch to add an implicit column of IntVectorType with each output row. All the array |
| * records belonging to same row in left incoming will have same rowId in the Unnest output batch. |
| * @return - map of rowId to rowCount in right batch |
| */ |
| private Map<Integer, Integer> getRowIdToRowCountMapping() { |
| final Map<Integer, Integer> indexToFreq = new HashMap<>(); |
| final IntVector rowIdVector = (IntVector) implicitVector; |
| int prevRowId = rowIdVector.getAccessor().get(rightJoinIndex); |
| int countRows = 1; |
| for (int i=rightJoinIndex + 1; i < right.getRecordCount(); ++i) { |
| int currentRowId = rowIdVector.getAccessor().get(i); |
| if (prevRowId == currentRowId) { |
| ++countRows; |
| } else { |
| indexToFreq.put(prevRowId, countRows); |
| prevRowId = currentRowId; |
| countRows = 1; |
| } |
| } |
| indexToFreq.put(prevRowId, countRows); |
| return indexToFreq; |
| } |
| |
| /** |
| * Main entry point for producing the output records. This method populates the output batch after cross join of |
| * the record in a given left batch at left index and all the corresponding rows in right batches produced by Unnest |
| * for current left batch. For each call to this function number of records copied in output batch is limited to |
| * maximum rows output batch can hold or the number of rows in right incoming batch |
| */ |
| private void crossJoinAndOutputRecords() { |
| final int rightRecordCount = right.getRecordCount(); |
| |
| // If there is no record in right batch just return current index in output batch |
| if (rightRecordCount <= 0) { |
| return; |
| } |
| |
| // Check if right batch is empty since we have to handle left join case |
| Preconditions.checkState(rightJoinIndex != -1, "Right batch record count is >0 but index is -1"); |
| |
| int currentOutIndex = outputIndex; |
| // Number of rows that can be copied in output batch |
| int maxAvailableRowSlot = maxOutputRowCount - currentOutIndex; |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {}, outputIndex: {} and " + |
| "availableSlotInOutput: {}", leftJoinIndex, rightJoinIndex, rightRecordCount, outputIndex, maxAvailableRowSlot); |
| logger.debug("Output Batch stats before copying new data: {}", new RecordBatchSizer(this)); |
| } |
| |
| // Assuming that first vector in right batch is for implicitColumn. |
| // get a mapping of number of rows for each rowId present in current right side batch |
| //final Map<Integer, Integer> indexToFreq = getRowIdToRowCountMapping(); |
| final IntVector rowIdVector = (IntVector) implicitVector; |
| final int leftRecordCount = left.getRecordCount(); |
| |
| // we need to have both conditions because in left join case we can exceed the maxAvailableRowSlot before reaching |
| // rightBatch end or vice-versa |
| while(maxAvailableRowSlot > 0 && rightJoinIndex < rightRecordCount) { |
| // Get rowId from current right row |
| int currentRowId = rowIdVector.getAccessor().get(rightJoinIndex); |
| int leftRowId = leftJoinIndex + 1; |
| int numRowsCopied = 0; |
| |
| if (currentRowId > leftRecordCount || leftJoinIndex > leftRecordCount) { |
| // Not using Preconditions.checkState here since along with condition evaluation there will be cost of boxing |
| // the arguments. |
| throw new IllegalStateException(String.format("Either RowId in right batch is greater than total records in " + |
| "left batch or all rows in left batch is processed but there are still rows in right batch. " + |
| "Details[RightRowId: %s, LeftRecordCount: %s, LeftJoinIndex: %s, RightJoinIndex: %s]", |
| currentRowId, leftRecordCount, leftJoinIndex, rightJoinIndex)); |
| } |
| |
| if (logger.isTraceEnabled()) { |
| // Inside the if condition to eliminate parameter boxing cost |
| logger.trace("leftRowId and currentRowId are: {}, {}", leftRowId, currentRowId); |
| } |
| |
| // If leftRowId matches the rowId in right row then emit left and right row. Increment outputIndex, rightJoinIndex |
| // and numRowsCopied. Also set leftMatchFound to true to indicate when to increase leftJoinIndex. |
| if (leftRowId == currentRowId) { |
| // there is a match |
| matchedRecordFound = true; |
| numRowsCopied = 1; |
| //numRowsCopied = Math.min(indexToFreq.get(currentRowId), maxAvailableRowSlot); |
| emitRight(rightJoinIndex, outputIndex, numRowsCopied); |
| emitLeft(leftJoinIndex, outputIndex, numRowsCopied); |
| outputIndex += numRowsCopied; |
| rightJoinIndex += numRowsCopied; |
| } else if (leftRowId < currentRowId) { |
| // If a matching record for leftRowId was found in right batch in previous iteration, increase the leftJoinIndex |
| // and reset the matchedRecordFound flag |
| if (matchedRecordFound) { |
| matchedRecordFound = false; |
| ++leftJoinIndex; |
| continue; |
| } else { // If no matching row was found in right batch then in case of left join produce left row in output |
| // and increase the indexes properly to reflect that |
| if (JoinRelType.LEFT == popConfig.getJoinType()) { |
| numRowsCopied = 1; |
| emitLeft(leftJoinIndex, outputIndex, numRowsCopied); |
| ++outputIndex; |
| } |
| ++leftJoinIndex; |
| } |
| } else { |
| Preconditions.checkState(leftRowId <= currentRowId, "Unexpected case where rowId " + |
| "%s in right batch of lateral is smaller than rowId %s in left batch being processed", |
| currentRowId, leftRowId); |
| } |
| // Update the max available rows slot in output batch |
| maxAvailableRowSlot -= numRowsCopied; |
| } |
| } |
| |
| /** |
| * Get's references of vector's from input and output vector container and create the mapping between them in |
| * respective maps. Example: for right incoming batch the references of input vector to corresponding output |
| * vector will be stored in {@link LateralJoinBatch#rightInputOutputVector}. This is done here such that during |
| * copy we don't have to figure out this mapping everytime for each input and output vector and then do actual copy. |
| * There was overhead seen with functions {@link MaterializedField#getValueClass()} and |
| * {@link RecordBatch#getValueAccessorById(Class, int...)} since it will be called for each row copy. |
| * @param batch - Incoming RecordBatch |
| * @param startVectorIndex - StartIndex of output vector container |
| * @param endVectorIndex - endIndex of output vector container |
| * @param baseVectorIndex - delta to add in startIndex for getting vectors in output container |
| * @param isRightBatch - is batch passed left or right child |
| */ |
| private void setupInputOutputVectors(RecordBatch batch, int startVectorIndex, int endVectorIndex, |
| int baseVectorIndex, boolean isRightBatch) { |
| // Get the vectors using field index rather than Materialized field since input batch field can be different from |
| // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output |
| // container. |
| int inputIndex = 0; |
| final Map<ValueVector, ValueVector> mappingToUse = (isRightBatch) ? rightInputOutputVector : leftInputOutputVector; |
| |
| for (int i = startVectorIndex; i < endVectorIndex; ++i) { |
| // Get output vector |
| final int outputVectorIndex = i + baseVectorIndex; |
| final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass(); |
| final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector(); |
| final String outputFieldName = outputVector.getField().getName(); |
| |
| ValueVector inputVector; |
| Class<?> inputValueClass; |
| String inputFieldName; |
| do { |
| // Get input vector |
| inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass(); |
| inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector(); |
| inputFieldName = inputVector.getField().getName(); |
| |
| // If implicit column is in left batch then preserve it |
| if (inputFieldName.equals(implicitColumn) && !isRightBatch) { |
| ++inputIndex; |
| break; |
| } |
| |
| ++inputIndex; |
| } while (excludedFieldNames.contains(inputFieldName)); |
| |
| Preconditions.checkState(outputFieldName.equals(inputFieldName), |
| "Non-excluded Input and output container fields are not in same order. " + |
| "[Output Schema: %s and Input Schema:%s]", this.getSchema(), batch.getSchema()); |
| |
| mappingToUse.put(inputVector, outputVector); |
| } |
| } |
| |
| /** |
| * Given a vector reference mapping between source and destination vector, copies data from all the source vectors |
| * at fromRowIndex to all the destination vectors in output batch at toRowIndex. |
| * |
| * @param fromRowIndex - row index of all the vectors in batch to copy data from |
| * @param toRowIndex - row index of all the vectors in outgoing batch to copy data to |
| * @param mapping - source record batch holding vectors with data |
| * @param numRowsToCopy - Number of rows to copy into output batch |
| * @param isRightBatch - boolean to indicate if the fromIndex should also be increased or not. Since in case of |
| * copying data from left vector fromIndex is constant whereas in case of copying data from right |
| * vector fromIndex will move along with output index. |
| */ |
| private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, Map<ValueVector, ValueVector> mapping, |
| int numRowsToCopy, boolean isRightBatch) { |
| for (Map.Entry<ValueVector, ValueVector> entry : mapping.entrySet()) { |
| |
| if (logger.isTraceEnabled()) { |
| // Inside the if condition to eliminate parameter boxing cost |
| logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " + |
| "(RowIndex: {}, ColumnName: {}), OutputBatch: (RowIndex: {}, ColumnName: {}) and Other: (TimeEachValue: {})]", |
| fromRowIndex, entry.getKey().getField().getName(), toRowIndex, entry.getValue().getField().getName(), |
| numRowsToCopy); |
| } |
| |
| // Copy data from input vector to output vector for numRowsToCopy times. |
| for (int j = 0; j < numRowsToCopy; ++j) { |
| entry.getValue().copyEntry(toRowIndex + j, entry.getKey(), (isRightBatch) ? fromRowIndex + j : fromRowIndex); |
| } |
| } |
| } |
| |
| /** |
| * Copies data at leftIndex from each of vector's in left incoming batch to outIndex at corresponding vectors in |
| * outgoing record batch |
| * @param leftIndex - index to copy data from left incoming batch vectors |
| * @param outIndex - index to copy data to in outgoing batch vectors |
| * @param numRowsToCopy - number of rows to copy from source vector to destination vectors |
| */ |
| private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) { |
| if (logger.isTraceEnabled()) { |
| // Inside the if condition to eliminate parameter boxing cost |
| logger.trace("Copying the left batch data. Details: [leftIndex: {}, outputIndex: {}, numsCopy: {}]", |
| leftIndex, outIndex, numRowsToCopy); |
| } |
| copyDataToOutputVectors(leftIndex, outIndex, leftInputOutputVector, numRowsToCopy, false); |
| } |
| |
| /** |
| * Copies data at rightIndex from each of vector's in right incoming batch to outIndex at corresponding vectors in |
| * outgoing record batch |
| * @param rightIndex - index to copy data from right incoming batch vectors |
| * @param outIndex - index to copy data to in outgoing batch vectors |
| * @param numRowsToCopy - number of rows to copy from source vector to destination vectors |
| */ |
| private void emitRight(int rightIndex, int outIndex, int numRowsToCopy) { |
| if (logger.isTraceEnabled()) { |
| // Inside the if condition to eliminate parameter boxing cost |
| logger.trace("Copying the right batch data. Details: [rightIndex: {}, outputIndex: {}, numsCopy: {}]", |
| rightIndex, outIndex, numRowsToCopy); |
| } |
| copyDataToOutputVectors(rightIndex, outIndex, rightInputOutputVector, numRowsToCopy, true); |
| } |
| |
| /** |
| * Used only for testing for cases when multiple output batches are produced for same input set |
| * @param outputRowCount - Max rows that output batch can hold |
| */ |
| @VisibleForTesting |
| public void setMaxOutputRowCount(int outputRowCount) { |
| if (isRecordBatchStatsLoggingEnabled()) { |
| RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), |
| "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, outputRowCount); |
| } |
| maxOutputRowCount = outputRowCount; |
| } |
| |
| /** |
| * Used only for testing to disable output batch calculation using memory manager and instead use the static max |
| * value set by {@link LateralJoinBatch#setMaxOutputRowCount(int)} |
| * @param useMemoryManager - false - disable memory manager update to take effect, true enable memory manager update |
| */ |
| @VisibleForTesting |
| public void setUseMemoryManager(boolean useMemoryManager) { |
| this.useMemoryManager = useMemoryManager; |
| } |
| |
| private boolean isOutgoingBatchFull() { |
| return outputIndex >= maxOutputRowCount; |
| } |
| |
| private void updateMemoryManager(int inputIndex) { |
| if (inputIndex == LEFT_INDEX && isNewLeftBatch) { |
| // reset state and continue to update |
| isNewLeftBatch = false; |
| } else if (inputIndex == RIGHT_INDEX && (rightJoinIndex == 0 || rightJoinIndex == -1)) { |
| // continue to update |
| } else { |
| return; |
| } |
| |
| // For cases where all the previous input were consumed and send with previous output batch. But now we are building |
| // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0 |
| batchMemoryManager.update(inputIndex, outputIndex); |
| |
| if (isRecordBatchStatsLoggingEnabled()) { |
| RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT; |
| RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), |
| getRecordBatchStatsContext()); |
| } |
| } |
| |
| private void populateExcludedField(PhysicalOperator lateralPop) { |
| excludedFieldNames.add(implicitColumn); |
| final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns(); |
| if (excludedCols != null) { |
| for (SchemaPath currentPath : excludedCols) { |
| excludedFieldNames.add(currentPath.rootName()); |
| } |
| } |
| } |
| |
| @Override |
| public void dump() { |
| logger.error("LateralJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, leftSchema={}, " + |
| "rightSchema={}, outputIndex={}, leftJoinIndex={}, rightJoinIndex={}, hasRemainderForLeftJoin={}]", |
| container, left, right, leftUpstream, rightUpstream, leftSchema, rightSchema, outputIndex, |
| leftJoinIndex, rightJoinIndex, hasRemainderForLeftJoin); |
| } |
| } |