blob: 49cbb46d817fb8625998413f59f51d16a063dab7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.record;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.base.PhysicalOperator;
/**
 * Base class for record batches that read from two upstream inputs, referred to as the
 * {@code left} and the {@code right} input.
 *
 * <p>It keeps the most recent {@link IterOutcome} observed on each input, offers a helper to
 * fetch the first batch from both inputs and translate the outcomes into a batch state, and
 * publishes per-input and output statistics taken from a {@link RecordBatchMemoryManager}.
 *
 * @param <T> type of the physical operator configuration this batch is created from
 */
public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {

  /** Left input of this operator. */
  protected final RecordBatch left;

  /** Right input of this operator. */
  protected final RecordBatch right;

  // state (IterOutcome) of the left input
  protected IterOutcome leftUpstream = IterOutcome.NONE;

  // state (IterOutcome) of the right input
  protected IterOutcome rightUpstream = IterOutcome.NONE;

  /**
   * Source of the statistics published by {@link #updateBatchMemoryManagerStats()}.
   * It is not assigned anywhere in this class, so a subclass has to set it before
   * {@link #updateBatchMemoryManagerStats()} is invoked (that method dereferences it
   * unconditionally).
   */
  protected RecordBatchMemoryManager batchMemoryManager;

  /** Number of inputs of a binary operator. */
  public final int numInputs = 2;

  /** Index used to address the left input. */
  public static final int LEFT_INDEX = 0;

  /** Index used to address the right input. */
  public static final int RIGHT_INDEX = 1;

  /**
   * Metrics written by {@link #updateBatchMemoryManagerStats()}.
   *
   * <p>{@link #metricId()} is the enum ordinal, so the declaration order determines the
   * metric ids. NOTE(review): reordering or inserting constants would change those ids;
   * confirm how the ids are consumed before touching the order.
   */
  public enum Metric implements MetricDef {
    LEFT_INPUT_BATCH_COUNT,
    LEFT_AVG_INPUT_BATCH_BYTES,
    LEFT_AVG_INPUT_ROW_BYTES,
    LEFT_INPUT_RECORD_COUNT,
    RIGHT_INPUT_BATCH_COUNT,
    RIGHT_AVG_INPUT_BATCH_BYTES,
    RIGHT_AVG_INPUT_ROW_BYTES,
    RIGHT_INPUT_RECORD_COUNT,
    OUTPUT_BATCH_COUNT,
    AVG_OUTPUT_BATCH_BYTES,
    AVG_OUTPUT_ROW_BYTES,
    OUTPUT_RECORD_COUNT;

    /**
     * @return the id of this metric, which is its position in the enum declaration
     */
    @Override
    public int metricId() {
      return ordinal();
    }
  }

  /**
   * Creates a binary batch, handing the superclass a literal {@code true} flag and an operator
   * context freshly obtained from {@code context.newOperatorContext(popConfig)}.
   * NOTE(review): the {@code true} flag is presumably the superclass' {@code buildSchema}
   * argument (see the other constructor) - confirm against {@code AbstractRecordBatch}.
   *
   * @param popConfig physical operator configuration
   * @param context fragment context the operator runs in
   * @param left left input batch
   * @param right right input batch
   * @throws OutOfMemoryException declared by the superclass construction / operator context creation
   */
  protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, RecordBatch left,
      RecordBatch right) throws OutOfMemoryException {
    super(popConfig, context, true, context.newOperatorContext(popConfig));
    this.left = left;
    this.right = right;
  }

  /**
   * Creates a binary batch, forwarding {@code buildSchema} to the superclass constructor.
   *
   * @param popConfig physical operator configuration
   * @param context fragment context the operator runs in
   * @param buildSchema flag passed through unchanged to the superclass
   * @param left left input batch
   * @param right right input batch
   * @throws OutOfMemoryException declared by the superclass construction
   */
  protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema,
      RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    super(popConfig, context, buildSchema);
    this.left = left;
    this.right = right;
  }

  /**
   * Inspects the outcomes of the first batches of both inputs and updates {@code state}
   * accordingly. The checks are applied in this order:
   * <ol>
   *   <li>either side returned {@link IterOutcome#STOP}: state becomes {@code STOP};</li>
   *   <li>{@link #checkForEarlyFinish(IterOutcome, IterOutcome)} reports true: state becomes
   *       {@code DONE};</li>
   *   <li>either side returned {@link IterOutcome#EMIT}: state becomes {@code STOP} and an
   *       exception is thrown.</li>
   * </ol>
   *
   * @param leftOutcome outcome received from the left input
   * @param rightOutcome outcome received from the right input
   * @return true if the caller should continue processing, false if the state was set to
   *         {@code STOP} or {@code DONE} and the caller should exit
   * @throws IllegalStateException if either outcome is {@link IterOutcome#EMIT}
   */
  protected boolean verifyOutcomeToSetBatchState(IterOutcome leftOutcome, IterOutcome rightOutcome) {
    if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) {
      state = BatchState.STOP;
      return false;
    }

    if (checkForEarlyFinish(leftOutcome, rightOutcome)) {
      state = BatchState.DONE;
      return false;
    }

    // EMIT outcome is not expected as part of first batch from either side
    if (leftOutcome == IterOutcome.EMIT || rightOutcome == IterOutcome.EMIT) {
      state = BatchState.STOP;
      throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " +
          "buildSchema phase");
    }
    return true;
  }

  /**
   * Prefetch first batch from both inputs. The left input is polled first; the right input is
   * polled regardless of the left outcome. Both outcomes are stored in {@link #leftUpstream}
   * and {@link #rightUpstream} before being verified.
   *
   * @return true if caller should continue processing
   *         false if caller should stop and exit from processing.
   */
  protected boolean prefetchFirstBatchFromBothSides() {
    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch
    leftUpstream = next(LEFT_INDEX, left);
    rightUpstream = next(RIGHT_INDEX, right);
    return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
  }

  /**
   * Checks for the operator specific early terminal condition. This default implementation
   * finishes early only when both inputs returned {@link IterOutcome#NONE}.
   *
   * @param leftOutcome outcome received from the left input
   * @param rightOutcome outcome received from the right input
   * @return true if the further processing can stop.
   *         false if the further processing is needed.
   */
  protected boolean checkForEarlyFinish(IterOutcome leftOutcome, IterOutcome rightOutcome) {
    return (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE);
  }

  /**
   * @return the batch memory manager currently held by this batch; may be {@code null} if no
   *         subclass has assigned one, since this class never initializes it
   */
  public RecordBatchMemoryManager getBatchMemoryManager() {
    return batchMemoryManager;
  }

  /**
   * Copies the input statistics of both sides and the output statistics from
   * {@link #batchMemoryManager} into the operator stats, one {@link Metric} per value.
   * Requires {@link #batchMemoryManager} to be set.
   */
  protected void updateBatchMemoryManagerStats() {
    // Left input
    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
    stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
    stats.setLongStat(Metric.LEFT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
    stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(LEFT_INDEX));

    // Right input
    stats.setLongStat(Metric.RIGHT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
    stats.setLongStat(Metric.RIGHT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
    stats.setLongStat(Metric.RIGHT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
    stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));

    // Output
    stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, batchMemoryManager.getNumOutgoingBatches());
    stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, batchMemoryManager.getAvgOutputBatchSize());
    stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth());
    stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords());
  }
}