DRILL-7439: Batch count fixes for six additional operators
Enables vector checks and fixes batch count and vector issues for:
* StreamingAggBatch
* RuntimeFilterRecordBatch
* FlattenRecordBatch
* MergeJoinBatch
* NestedLoopJoinBatch
* LimitRecordBatch
Also fixes a zero-size batch validity issue in the CSV reader when
none of the input files contain data.
Includes code cleanup for files touched in this PR.
closes #1893
diff --git a/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java b/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
index f4fc9ab..446825a 100644
--- a/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
+++ b/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
@@ -18,11 +18,9 @@
package org.apache.drill.exec.util;
public class AssertionUtil {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssertionUtil.class);
-
public static final boolean ASSERT_ENABLED;
- static{
+ static {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
@@ -32,6 +30,5 @@
return ASSERT_ENABLED;
}
- private AssertionUtil() {
- }
+ private AssertionUtil() { }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 6cec50e..d166353 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -17,6 +17,9 @@
*/
package org.apache.drill.exec.physical.impl.aggregate;
+import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
+import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -31,7 +34,6 @@
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
-
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.VectorSerializer.Writer;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
@@ -40,7 +42,6 @@
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.TypeHelper;
-
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -55,40 +56,30 @@
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
-
import org.apache.drill.exec.physical.impl.common.SpilledState;
-import org.apache.drill.exec.record.RecordBatchSizer;
-
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.planner.physical.AggPrelBase;
-
-import org.apache.drill.exec.record.MaterializedField;
-
-import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.BatchSchema;
-
-import org.apache.drill.exec.record.VectorContainer;
-
-import org.apache.drill.exec.record.TypedFieldId;
-
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.AllocationHelper;
-
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ObjectVector;
import org.apache.drill.exec.vector.ValueVector;
-
import org.apache.drill.exec.vector.VariableWidthVector;
-
-import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK;
-import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class HashAggTemplate implements HashAggregator {
- protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
+ protected static final Logger logger = LoggerFactory.getLogger(HashAggregator.class);
private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50;
private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
@@ -98,40 +89,40 @@
private static final boolean EXTRA_DEBUG_SPILL = false;
// Fields needed for partitioning (the groups into partitions)
- private int nextPartitionToReturn = 0; // which partition to return the next batch from
+ private int nextPartitionToReturn; // which partition to return the next batch from
// The following members are used for logging, metrics, etc.
- private int rowsInPartition = 0; // counts #rows in each partition
- private int rowsNotSpilled = 0;
- private int rowsSpilled = 0;
- private int rowsSpilledReturned = 0;
- private int rowsReturnedEarly = 0;
+ private int rowsInPartition; // counts #rows in each partition
+ private int rowsNotSpilled;
+ private int rowsSpilled;
+ private int rowsSpilledReturned;
+ private int rowsReturnedEarly;
private AggPrelBase.OperatorPhase phase;
private boolean canSpill = true; // make it false in case can not spill/return-early
private ChainedHashTable baseHashTable;
- private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
- private int earlyPartition = 0; // which partition to return early
- private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry
- private boolean useMemoryPrediction = false; // whether to use memory prediction to decide when to spill
- private long estMaxBatchSize = 0; // used for adjusting #partitions and deciding when to spill
- private long estRowWidth = 0; // the size of the internal "row" (keys + values + extra columns)
- private long estValuesRowWidth = 0; // the size of the internal values ( values + extra )
- private long estOutputRowWidth = 0; // the size of the output "row" (no extra columns)
- private long estValuesBatchSize = 0; // used for "reserving" memory for the Values batch to overcome an OOM
- private long estOutgoingAllocSize = 0; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
+ private boolean earlyOutput; // when 1st phase returns a partition due to no memory
+ private int earlyPartition; // which partition to return early
+ private boolean retrySameIndex; // in case put failed during 1st phase - need to output early, then retry
+ private boolean useMemoryPrediction; // whether to use memory prediction to decide when to spill
+ private long estMaxBatchSize; // used for adjusting #partitions and deciding when to spill
+ private long estRowWidth; // the size of the internal "row" (keys + values + extra columns)
+ private long estValuesRowWidth; // the size of the internal values ( values + extra )
+ private long estOutputRowWidth; // the size of the output "row" (no extra columns)
+ private long estValuesBatchSize; // used for "reserving" memory for the Values batch to overcome an OOM
+ private long estOutgoingAllocSize; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
private long reserveValueBatchMemory; // keep "reserve memory" for Values Batch
private long reserveOutgoingMemory; // keep "reserve memory" for the Outgoing (Values only) output
private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
private long minBatchesPerPartition; // for tuning - num partitions and spill decision
- private long plannedBatches = 0; // account for planned, but not yet allocated batches
+ private long plannedBatches; // account for planned, but not yet allocated batches
- private int underlyingIndex = 0;
- private int currentIndex = 0;
+ private int underlyingIndex;
+ private int currentIndex;
private IterOutcome outcome;
- private int numGroupedRecords = 0;
- private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
+ private int numGroupedRecords;
+ private int currentBatchRecordCount; // Performance: Avoid repeated calls to getRecordCount()
- private int lastBatchOutputCount = 0;
+ private int lastBatchOutputCount;
private RecordBatch incoming;
private BatchSchema schema;
private HashAggBatch outgoing;
@@ -148,7 +139,7 @@
// For handling spilling
private HashAggUpdater updater;
- private SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>();
+ private final SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>();
private SpillSet spillSet;
SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
private Writer writers[]; // a vector writer for each spilled partition
@@ -157,17 +148,17 @@
private int originalPartition = -1; // the partition a secondary reads from
private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
- private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
+ private int numGroupByOutFields; // Note: this should be <= number of group-by fields
private TypedFieldId[] groupByOutFieldIds;
private MaterializedField[] materializedValueFields;
- private boolean allFlushed = false;
- private boolean buildComplete = false;
- private boolean handlingSpills = false; // True once starting to process spill files
- private boolean handleEmit = false; // true after receiving an EMIT, till finish handling it
+ private boolean allFlushed;
+ private boolean buildComplete;
+ private boolean handlingSpills; // True once starting to process spill files
+ private boolean handleEmit; // true after receiving an EMIT, till finish handling it
- private OperatorStats stats = null;
- private HashTableStats htStats = new HashTableStats();
+ private OperatorStats stats;
+ private final HashTableStats htStats = new HashTableStats();
public enum Metric implements MetricDef {
@@ -198,9 +189,9 @@
}
public class BatchHolder {
- private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
+ private final VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
private int maxOccupiedIdx = -1;
- private int targetBatchRowCount = 0;
+ private int targetBatchRowCount;
public int getTargetBatchRowCount() {
return targetBatchRowCount;
@@ -1009,11 +1000,7 @@
this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, numOutputRecords);
// set the value count for outgoing batch value vectors
- for (VectorWrapper<?> v : outgoing) {
- v.getValueVector().getMutator().setValueCount(numOutputRecords);
- }
-
- outContainer.setRecordCount(numOutputRecords);
+ outContainer.setValueCount(numOutputRecords);
WritableBatch batch = WritableBatch.getBatchNoHVWrap(numOutputRecords, outContainer, false);
try {
writers[part].write(batch, null);
@@ -1067,10 +1054,8 @@
if ( handleEmit && ( batchHolders == null || batchHolders[0].size() == 0 ) ) {
lastBatchOutputCount = 0; // empty
allocateOutgoing(0);
- for (VectorWrapper<?> v : outgoing) {
- v.getValueVector().getMutator().setValueCount(0);
- }
- outgoing.getContainer().setRecordCount(0);
+ outgoing.getContainer().setValueCount(0);
+
// When returning the last outgoing batch (following an incoming EMIT), then replace OK with EMIT
this.outcome = IterOutcome.EMIT;
handleEmit = false; // finish handling EMIT
@@ -1184,9 +1169,7 @@
this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, numPendingOutput);
// set the value count for outgoing batch value vectors
- for (VectorWrapper<?> v : outgoing) {
- v.getValueVector().getMutator().setValueCount(numOutputRecords);
- }
+ outgoing.getContainer().setValueCount(numOutputRecords);
outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 471904f..7d02c99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -17,11 +17,15 @@
*/
package org.apache.drill.exec.physical.impl.aggregate;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
import java.io.IOException;
import java.util.List;
-import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -63,24 +67,22 @@
import org.apache.drill.exec.vector.UntypedNullHolder;
import org.apache.drill.exec.vector.UntypedNullVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JVar;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
+ static final Logger logger = LoggerFactory.getLogger(StreamingAggBatch.class);
protected StreamingAggregator aggregator;
protected final RecordBatch incoming;
private List<BaseWriter.ComplexWriter> complexWriters;
- //
+
// Streaming agg can be in (a) a normal pipeline or (b) it may be in a pipeline that is part of a subquery involving
// lateral and unnest. In case(a), the aggregator proceeds normally until it sees a group change or a NONE. If a
// group has changed, the aggregated data is sent downstream and the aggregation continues with the next group. If
@@ -100,33 +102,37 @@
// Schema change within a Data Set is not supported.
//
// We will define some states for internal management
- //
- private boolean done = false; // END of all data
+
+ private boolean done; // END of all data
private boolean first = true; // Beginning of new data set. True during the build schema phase. False once the first
// call to inner next is made.
- private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set
- // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration,
- // we send out an empty batch with EMIT.
+ private boolean sendEmit; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set
+ // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration,
+ // we send out an empty batch with EMIT.
private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next
private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA
private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
- private int recordCount = 0; // number of records output in the current data set
+ private int recordCount; // number of records output in the current data set
private BatchSchema incomingSchema;
/*
- * DRILL-2277, DRILL-2411: For straight aggregates without a group by clause we need to perform special handling when
- * the incoming batch is empty. In the case of the empty input into the streaming aggregate we need
- * to return a single batch with one row. For count we need to return 0 and for all other aggregate
- * functions like sum, avg etc we need to return an explicit row with NULL. Since we correctly allocate the type of
- * the outgoing vectors (required for count and nullable for other aggregate functions) all we really need to do
- * is simply set the record count to be 1 in such cases. For nullable vectors we don't need to do anything because
- * if we don't set anything the output will be NULL, however for required vectors we explicitly zero out the vector
- * since we don't zero it out while allocating it.
+ * DRILL-2277, DRILL-2411: For straight aggregates without a group by clause
+ * we need to perform special handling when the incoming batch is empty. In
+ * the case of the empty input into the streaming aggregate we need to return
+ * a single batch with one row. For count we need to return 0 and for all
+ * other aggregate functions like sum, avg etc we need to return an explicit
+ * row with NULL. Since we correctly allocate the type of the outgoing vectors
+ * (required for count and nullable for other aggregate functions) all we
+ * really need to do is simply set the record count to be 1 in such cases. For
+ * nullable vectors we don't need to do anything because if we don't set
+ * anything the output will be NULL, however for required vectors we
+ * explicitly zero out the vector since we don't zero it out while allocating
+ * it.
*
* We maintain some state to remember that we have done such special handling.
*/
- private boolean specialBatchSent = false;
+ private boolean specialBatchSent;
private static final int SPECIAL_BATCH_COUNT = 1;
// TODO: Needs to adapt to batch sizing rather than hardcoded constant value
@@ -156,7 +162,7 @@
@Override
public VectorContainer getOutgoingContainer() {
- return this.container;
+ return container;
}
@Override
@@ -177,17 +183,18 @@
break;
}
- this.incomingSchema = incoming.getSchema();
+ incomingSchema = incoming.getSchema();
if (!createAggregator()) {
state = BatchState.DONE;
}
- for (final VectorWrapper<?> w : container) {
+ for (VectorWrapper<?> w : container) {
w.getValueVector().allocateNew();
}
if (complexWriters != null) {
container.buildSchema(SelectionVectorMode.NONE);
}
+ container.setRecordCount(0);
}
@Override
@@ -207,6 +214,7 @@
firstBatchForDataSet = true;
firstBatchForSchema = false;
recordCount = 0;
+ container.setEmpty();
specialBatchSent = false;
return EMIT;
}
@@ -379,7 +387,7 @@
private void allocateComplexWriters() {
// Allocate the complex writers before processing the incoming batch
if (complexWriters != null) {
- for (final BaseWriter.ComplexWriter writer : complexWriters) {
+ for (BaseWriter.ComplexWriter writer : complexWriters) {
writer.allocate();
}
}
@@ -393,8 +401,8 @@
*/
private void constructSpecialBatch() {
int exprIndex = 0;
- for (final VectorWrapper<?> vw: container) {
- final ValueVector vv = vw.getValueVector();
+ for (VectorWrapper<?> vw: container) {
+ ValueVector vv = vw.getValueVector();
AllocationHelper.allocateNew(vv, SPECIAL_BATCH_COUNT);
vv.getMutator().setValueCount(SPECIAL_BATCH_COUNT);
if (vv.getField().getType().getMode() == TypeProtos.DataMode.REQUIRED) {
@@ -442,7 +450,7 @@
}
}
- public void addComplexWriter(final BaseWriter.ComplexWriter writer) {
+ public void addComplexWriter(BaseWriter.ComplexWriter writer) {
complexWriters.add(writer);
}
@@ -460,21 +468,21 @@
ErrorCollector collector = new ErrorCollectorImpl();
for (int i = 0; i < keyExprs.length; i++) {
- final NamedExpression ne = popConfig.getKeys().get(i);
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
+ NamedExpression ne = popConfig.getKeys().get(i);
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
if (expr == null) {
continue;
}
keyExprs[i] = expr;
- final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
+ MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
expr.getMajorType());
- final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
keyOutputIds[i] = container.add(vector);
}
for (int i = 0; i < valueExprs.length; i++) {
- final NamedExpression ne = popConfig.getExprs().get(i);
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+ NamedExpression ne = popConfig.getExprs().get(i);
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
if (expr instanceof IfExpression) {
throw UserException.unsupportedError(new UnsupportedOperationException("Union type not supported in aggregate functions")).build(logger);
}
@@ -498,7 +506,7 @@
container.add(new UntypedNullVector(field, container.getAllocator()));
valueExprs[i] = expr;
} else {
- final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
+ MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
expr.getMajorType());
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
TypedFieldId id = container.add(vector);
@@ -532,40 +540,41 @@
protected void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
cg.setMappingSet(IS_SAME_I1);
- for (final LogicalExpression expr : keyExprs) {
+ for (LogicalExpression expr : keyExprs) {
// first, we rewrite the evaluation stack for each side of the comparison.
cg.setMappingSet(IS_SAME_I1);
- final HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+ HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
cg.setMappingSet(IS_SAME_I2);
- final HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+ HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
- final LogicalExpression fh =
+ LogicalExpression fh =
FunctionGenerationHelper
.getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry());
- final HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+ HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
cg.getEvalBlock()._return(JExpr.TRUE);
}
- private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSamePrev", "isSamePrev", null, null); // the internal batch changes each time so we need to redo setup.
+ // the internal batch changes each time so we need to redo setup.
+ private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSamePrev", "isSamePrev", null, null);
private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSamePrev", null, null);
private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
protected void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
cg.setMappingSet(ISA_B1);
- for (final LogicalExpression expr : keyExprs) {
+ for (LogicalExpression expr : keyExprs) {
// first, we rewrite the evaluation stack for each side of the comparison.
cg.setMappingSet(ISA_B1);
- final HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+ HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
cg.setMappingSet(ISA_B2);
- final HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
+ HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
- final LogicalExpression fh =
+ LogicalExpression fh =
FunctionGenerationHelper
.getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry());
- final HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+ HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
cg.getEvalBlock()._return(JExpr.TRUE);
@@ -577,7 +586,7 @@
protected void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) {
cg.setMappingSet(EVAL);
- for (final LogicalExpression ex : valueExprs) {
+ for (LogicalExpression ex : valueExprs) {
cg.addExpr(ex);
}
}
@@ -601,11 +610,13 @@
cg.setMappingSet(RECORD_KEYS_PREV);
for (int i = 0; i < keyExprs.length; i++) {
- // IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same. This is possible because InternalBatch guarantees this.
+ // IMPORTANT: there is an implicit assertion here that the TypedFieldIds
+ // for the previous batch and the current batch are the same. This is
+ // possible because InternalBatch guarantees this.
logger.debug("Writing out expr {}", keyExprs[i]);
cg.rotateBlock();
cg.setMappingSet(RECORD_KEYS_PREV);
- final HoldingContainer innerExpression = cg.addExpr(keyExprs[i], ClassGenerator.BlkCreateMode.FALSE);
+ HoldingContainer innerExpression = cg.addExpr(keyExprs[i], ClassGenerator.BlkCreateMode.FALSE);
cg.setMappingSet(RECORD_KEYS_PREV_OUT);
cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), ClassGenerator.BlkCreateMode.FALSE);
}
@@ -655,11 +666,6 @@
}
@Override
- public void close() {
- super.close();
- }
-
- @Override
protected void killIncoming(boolean sendUpstream) {
incoming.kill(sendUpstream);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index cc89f23..173ed6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -17,6 +17,11 @@
*/
package org.apache.drill.exec.physical.impl.aggregate;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -25,32 +30,29 @@
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class StreamingAggTemplate implements StreamingAggregator {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
+ private static final Logger logger = LoggerFactory.getLogger(StreamingAggregator.class);
private static final boolean EXTRA_DEBUG = false;
private int maxOutputRows = ValueVector.MAX_ROW_COUNT;
// lastOutcome is set ONLY if the lastOutcome was NONE or STOP
- private IterOutcome lastOutcome = null;
+ private IterOutcome lastOutcome;
// First batch after build schema phase
private boolean first = true;
- private boolean firstBatchForSchema = false; // true if the current batch came in with an OK_NEW_SCHEMA.
+ private boolean firstBatchForSchema; // true if the current batch came in with an OK_NEW_SCHEMA.
private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
- private boolean newSchema = false;
+ private boolean newSchema;
// End of all data
- private boolean done = false;
+ private boolean done;
// index in the incoming (sv4/sv2/vector)
- private int underlyingIndex = 0;
+ private int underlyingIndex;
// The indexes below refer to the actual record indexes in input batch
// (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself)
private int previousIndex = -1; // the last index that has been processed. Initialized to -1 every time a new
@@ -59,19 +61,18 @@
/**
* Number of records added to the current aggregation group.
*/
- private long addedRecordCount = 0;
+ private long addedRecordCount;
// There are two outcomes from the aggregator. One is the aggregator's outcome defined in
// StreamingAggregator.AggOutcome. The other is the outcome from the last call to incoming.next
private IterOutcome outcome;
// Number of aggregation groups added into the output batch
- private int outputCount = 0;
+ private int outputCount;
private RecordBatch incoming;
// the Streaming Agg Batch that this aggregator belongs to
private StreamingAggBatch outgoing;
private OperatorContext context;
-
@Override
public void setup(OperatorContext context, RecordBatch incoming,
StreamingAggBatch outgoing, int outputRowCount) throws SchemaChangeException {
@@ -166,7 +167,6 @@
}
}
-
if (newSchema) {
return AggOutcome.UPDATE_AGGREGATOR;
}
@@ -350,7 +350,6 @@
first = false;
}
}
-
}
@Override
@@ -432,11 +431,9 @@
} else {
outcomeToReturn = OK;
}
- this.outcome = outcomeToReturn;
+ outcome = outcomeToReturn;
- for (VectorWrapper<?> v : outgoing) {
- v.getValueVector().getMutator().setValueCount(outputCount);
- }
+ outgoing.getContainer().setValueCount(outputCount);
return (seenOutcome == EMIT) ? AggOutcome.RETURN_AND_RESET : AggOutcome.RETURN_OUTCOME;
}
@@ -455,11 +452,9 @@
} else {
outcomeToReturn = EMIT;
}
- this.outcome = outcomeToReturn;
+ outcome = outcomeToReturn;
- for (VectorWrapper<?> v : outgoing) {
- v.getValueVector().getMutator().setValueCount(outputCount);
- }
+ outgoing.getContainer().setValueCount(outputCount);
return AggOutcome.RETURN_AND_RESET;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index be3c51e..0257abc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -17,12 +17,20 @@
*/
package org.apache.drill.exec.physical.impl.common;
-import com.carrotsearch.hppc.IntArrayList;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.cache.VectorSerializer;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -47,16 +55,12 @@
import org.apache.drill.exec.vector.ObjectVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE;
+import com.carrotsearch.hppc.IntArrayList;
/**
* <h2>Overview</h2>
@@ -72,14 +76,14 @@
* </p>
*/
public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class);
+ static final Logger logger = LoggerFactory.getLogger(HashPartition.class);
public static final String HASH_VALUE_COLUMN_NAME = "$Hash_Values$";
private int partitionNum = -1; // the current number of this partition, as used by the operator
private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
- private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
+ private final int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
public static final MajorType HVtype = MajorType.newBuilder()
.setMinorType(MinorType.INT /* dataType */ )
@@ -93,7 +97,7 @@
// While build data is incoming - temporarily keep the list of in-memory
// incoming batches, per each partition (these may be spilled at some point)
- private List<VectorContainer> tmpBatchesList;
+ private final List<VectorContainer> tmpBatchesList;
// A batch and HV vector to hold incoming rows - per each partition
private VectorContainer currentBatch; // The current (newest) batch
private IntVector currHVVector; // The HV vectors for the currentBatches
@@ -112,21 +116,21 @@
private int partitionBatchesCount; // count number of batches spilled
private String spillFile;
- private BufferAllocator allocator;
+ private final BufferAllocator allocator;
private int recordsPerBatch;
- private SpillSet spillSet;
+ private final SpillSet spillSet;
private boolean isSpilled; // is this partition spilled ?
private boolean processingOuter; // is (inner done spilling and) now the outer is processed?
private boolean outerBatchAllocNotNeeded; // when the inner is whole in memory
- private RecordBatch buildBatch;
- private RecordBatch probeBatch;
- private int cycleNum;
- private int numPartitions;
- private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
+ private final RecordBatch buildBatch;
+ private final RecordBatch probeBatch;
+ private final int cycleNum;
+ private final int numPartitions;
+ private final List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
private long partitionInMemorySize;
private long numInMemoryRecords;
- private boolean updatedRecordsPerBatch = false;
- private boolean semiJoin;
+ private boolean updatedRecordsPerBatch;
+ private final boolean semiJoin;
public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin,
@@ -188,7 +192,7 @@
try {
while (vci.hasNext()) {
- VectorWrapper vw = vci.next();
+ VectorWrapper<?> vw = vci.next();
// If processing a spilled container, skip the last column (HV)
if ( cycleNum > 0 && ! vci.hasNext() ) { break; }
ValueVector vv = vw.getValueVector();
@@ -254,9 +258,11 @@
public void completeAnOuterBatch(boolean toInitialize) {
completeABatch(toInitialize, true);
}
+
public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) {
completeABatch(toInitialize, needsSpill);
}
+
/**
* A current batch is full (or no more rows incoming) - complete processing this batch
* I.e., add it to its partition's tmp list, if needed - spill that list, and if needed -
@@ -349,15 +355,13 @@
numInMemoryRecords = 0L;
inMemoryBatchStats.clear();
- while ( tmpBatchesList.size() > 0 ) {
+ while (tmpBatchesList.size() > 0) {
VectorContainer vc = tmpBatchesList.remove(0);
int numRecords = vc.getRecordCount();
// set the value count for outgoing batch value vectors
- for (VectorWrapper<?> v : vc) {
- v.getValueVector().getMutator().setValueCount(numRecords);
- }
+ vc.setValueCount(numRecords);
WritableBatch wBatch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false);
try {
@@ -381,6 +385,7 @@
public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException {
return hashTable.probeForKey(recordsProcessed, hashCode);
}
+
public Pair<Integer, Boolean> getStartIndex(int probeIndex) {
/* The current probe record has a key that matches. Get the index
* of the first row in the build side that matches the current key
@@ -393,16 +398,20 @@
boolean matchExists = hjHelper.setRecordMatched(compositeIndex);
return Pair.of(compositeIndex, matchExists);
}
+
public int getNextIndex(int compositeIndex) {
// in case of inner rows with duplicate keys, get the next one
return hjHelper.getNextIndex(compositeIndex);
}
+
public boolean setRecordMatched(int compositeIndex) {
return hjHelper.setRecordMatched(compositeIndex);
}
+
public IntArrayList getNextUnmatchedIndex() {
return hjHelper.getNextUnmatchedIndex();
}
+
//
// =====================================================================================
//
@@ -410,9 +419,11 @@
public int getBuildHashCode(int ind) throws SchemaChangeException {
return hashTable.getBuildHashCode(ind);
}
+
public int getProbeHashCode(int ind) throws SchemaChangeException {
return hashTable.getProbeHashCode(ind);
}
+
public ArrayList<VectorContainer> getContainers() {
return containers;
}
@@ -457,6 +468,7 @@
public int getPartitionBatchesCount() {
return partitionBatchesCount;
}
+
public int getPartitionNum() {
return partitionNum;
}
@@ -468,6 +480,7 @@
closeWriterInternal(false);
processingOuter = true; // After the spill file was closed
}
+
/**
* If exists - close the writer for this partition
*
@@ -601,4 +614,4 @@
return String.format("[hashTable = %s]",
hashTable == null ? "None": hashTable.makeDebugString());
}
-} // class HashPartition
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index ac6718c..28de51f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -17,6 +17,13 @@
*/
package org.apache.drill.exec.physical.impl.filter;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.LogicalExpression;
@@ -39,41 +46,39 @@
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.work.filter.BloomFilter;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch participates
- * in the HashJoinBatch and can be applied by a RuntimeFilter, it will generate a filtered
- * SV2, otherwise will generate a same recordCount-originalRecordCount SV2 which will not affect
- * the Query's performance ,but just do a memory transfer by the later RemovingRecordBatch op.
+ * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch
+ * participates in the HashJoinBatch and can be applied by a RuntimeFilter, it
+ * will generate a filtered SV2; otherwise it will generate an SV2 whose
+ * record count equals the original record count, which will not affect the
+ * query's performance, but just does a memory transfer in the later
+ * RemovingRecordBatch op.
*/
public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeFilterPOP> {
- private SelectionVector2 sv2;
+ private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
+ private SelectionVector2 sv2;
private ValueVectorHashHelper.Hash64 hash64;
- private Map<String, Integer> field2id = new HashMap<>();
+ private final Map<String, Integer> field2id = new HashMap<>();
private List<String> toFilterFields;
private List<BloomFilter> bloomFilters;
private RuntimeFilterWritable current;
private int originalRecordCount;
- private long filteredRows = 0l;
- private long appliedTimes = 0l;
- private int batchTimes = 0;
- private boolean waited = false;
- private boolean enableRFWaiting;
- private long maxWaitingTime;
- private long rfIdentifier;
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
+ private long filteredRows;
+ private long appliedTimes;
+ private int batchTimes;
+ private boolean waited;
+ private final boolean enableRFWaiting;
+ private final long maxWaitingTime;
+ private final long rfIdentifier;
public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
- enableRFWaiting = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val;
- maxWaitingTime = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val;
+ enableRFWaiting = context.getOptions().getBoolean(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY);
+ maxWaitingTime = context.getOptions().getLong(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY);
this.rfIdentifier = pop.getIdentifier();
}
@@ -107,6 +112,7 @@
throw new UnsupportedOperationException(e);
}
container.transferIn(incoming.getContainer());
+ container.setRecordCount(originalRecordCount);
updateStats();
return getFinalOutcome(false);
}
@@ -258,7 +264,6 @@
}
}
-
appliedTimes++;
sv2.setRecordCount(svIndex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 7877c6b..cee7625 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -43,16 +43,16 @@
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.FlattenPOP;
-import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.ValueVector;
@@ -60,20 +60,23 @@
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.carrotsearch.hppc.IntHashSet;
import com.sun.codemodel.JExpr;
-// TODO - handle the case where a user tries to flatten a scalar, should just act as a project all of the columns exactly
-// as they come in
+// TODO - handle the case where a user tries to flatten a scalar; it should
+// just act as a projection of all the columns exactly as they come in
+
public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(FlattenRecordBatch.class);
private Flattener flattener;
private List<ValueVector> allocationVectors;
private List<ComplexWriter> complexWriters;
- private boolean hasRemainder = false;
- private int remainderIndex = 0;
+ private boolean hasRemainder;
+ private int remainderIndex;
private int recordCount;
private final FlattenMemoryManager flattenMemoryManager;
@@ -81,7 +84,7 @@
@Override
public int getBufferSizeFor(int recordCount) {
int bufferSize = 0;
- for(final ValueVector vv : allocationVectors) {
+ for (ValueVector vv : allocationVectors) {
bufferSize += vv.getBufferSizeFor(recordCount);
}
return bufferSize;
@@ -98,7 +101,7 @@
outputNames.clear();
}
- // note: don't clear the internal maps since they have cumulative data..
+ // note: don't clear the internal maps since they have cumulative data.
}
}
@@ -129,28 +132,28 @@
// Get sizing information for the batch.
setRecordBatchSizer(new RecordBatchSizer(incoming));
- final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
- final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+ TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+ MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
// Get column size of flatten column.
RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(field.getName());
// Average rowWidth of flatten column
- final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
+ int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry();
// Average rowWidth excluding the flatten column.
- final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn;
+ int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn;
// Average rowWidth of single element in the flatten list.
// subtract the offset vector size from column data size.
- final int avgRowWidthSingleFlattenEntry =
+ int avgRowWidthSingleFlattenEntry =
RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()),
columnSize.getElementCount());
// Average rowWidth of outgoing batch.
- final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
+ int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
- final int outputBatchSize = getOutputBatchSize();
+ int outputBatchSize = getOutputBatchSize();
// Number of rows in outgoing batch
setOutputRowCount(outputBatchSize, avgOutgoingRowWidth);
@@ -182,14 +185,12 @@
return recordCount;
}
-
@Override
protected void killIncoming(boolean sendUpstream) {
super.killIncoming(sendUpstream);
hasRemainder = false;
}
-
@Override
public IterOutcome innerNext() {
if (hasRemainder) {
@@ -202,14 +203,14 @@
@Override
public VectorContainer getOutgoingContainer() {
- return this.container;
+ return container;
}
private void setFlattenVector() {
- final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
- final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
- final RepeatedValueVector vector;
- final ValueVector inVV = incoming.getValueAccessorById(
+ TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+ MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+ RepeatedValueVector vector;
+ ValueVector inVV = incoming.getValueAccessorById(
field.getValueClass(), typedFieldId.getFieldIds()).getValueVector();
if (! (inVV instanceof RepeatedValueVector)) {
@@ -232,10 +233,7 @@
int incomingRecordCount = incoming.getRecordCount();
- if (!doAlloc(flattenMemoryManager.getOutputRowCount())) {
- outOfMemory = true;
- return IterOutcome.OUT_OF_MEMORY;
- }
+ doAlloc(flattenMemoryManager.getOutputRowCount());
// we call this in setupSchema, but we also need to call it here so we have a reference to the appropriate vector
// inside of the the flattener for the current batch
@@ -244,18 +242,13 @@
int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount();
int outputRecords = childCount == 0 ? 0: flattener.flattenRecords(incomingRecordCount, 0, monitor);
// TODO - change this to be based on the repeated vector length
+ setValueCount(outputRecords);
if (outputRecords < childCount) {
- setValueCount(outputRecords);
hasRemainder = true;
remainderIndex = outputRecords;
- this.recordCount = remainderIndex;
} else {
- setValueCount(outputRecords);
flattener.resetGroupIndex();
- for(VectorWrapper<?> v: incoming) {
- v.clear();
- }
- this.recordCount = outputRecords;
+ VectorAccessibleUtilities.clear(incoming.getContainer());
}
// In case of complex writer expression, vectors would be added to batch run-time.
// We have to re-build the schema.
@@ -276,25 +269,18 @@
// remainingRecordCount can be much higher than number of rows we will have in outgoing batch.
// Do memory allocation only for number of rows we are going to have in the batch.
- if (!doAlloc(Math.min(remainingRecordCount, flattenMemoryManager.getOutputRowCount()))) {
- outOfMemory = true;
- return;
- }
+ doAlloc(Math.min(remainingRecordCount, flattenMemoryManager.getOutputRowCount()));
int projRecords = flattener.flattenRecords(remainingRecordCount, 0, monitor);
if (projRecords < remainingRecordCount) {
setValueCount(projRecords);
- this.recordCount = projRecords;
remainderIndex += projRecords;
} else {
setValueCount(remainingRecordCount);
hasRemainder = false;
remainderIndex = 0;
- for (VectorWrapper<?> v : incoming) {
- v.clear();
- }
+ VectorAccessibleUtilities.clear(incoming.getContainer());
flattener.resetGroupIndex();
- this.recordCount = remainingRecordCount;
}
// In case of complex writer expression, vectors would be added to batch run-time.
// We have to re-build the schema.
@@ -303,45 +289,38 @@
}
flattenMemoryManager.updateOutgoingStats(projRecords);
-
}
public void addComplexWriter(ComplexWriter writer) {
complexWriters.add(writer);
}
- private boolean doAlloc(int recordCount) {
-
- for (ValueVector v : this.allocationVectors) {
+ private void doAlloc(int recordCount) {
+ for (ValueVector v : allocationVectors) {
// This will iteratively allocate memory for nested columns underneath.
RecordBatchSizer.ColumnSize colSize = flattenMemoryManager.getColumnSize(v.getField().getName());
colSize.allocateVector(v, recordCount);
}
- //Allocate vv for complexWriters.
- if (complexWriters == null) {
- return true;
+ // Allocate vv for complexWriters.
+ if (complexWriters != null) {
+ for (ComplexWriter writer : complexWriters) {
+ writer.allocate();
+ }
}
-
- for (ComplexWriter writer : complexWriters) {
- writer.allocate();
- }
-
- return true;
}
private void setValueCount(int count) {
- for (ValueVector v : allocationVectors) {
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(count);
+ recordCount = count;
+ if (count == 0) {
+ container.setEmpty();
+ } else {
+ container.setValueCount(count);
}
-
- if (complexWriters == null) {
- return;
- }
-
- for (ComplexWriter writer : complexWriters) {
- writer.setValueCount(count);
+ if (complexWriters != null) {
+ for (ComplexWriter writer : complexWriters) {
+ writer.setValueCount(count);
+ }
}
}
@@ -350,33 +329,40 @@
}
/**
- * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for
- * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to
- * represent the lists. As the data layout for the actual values in the same in the repeated vector as in the
- * scalar vector of the same type, we can avoid making individual copies for the column being flattened, and just
- * use vector copies between the inner vector of the repeated field to the resulting scalar vector from the flatten
- * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or
- * the end of one of the other vectors while we are copying the data of the other vectors alongside each new flattened
- * value coming out of the repeated field.)
+ * The data layout is the same for the actual data within a repeated field, as
+ * it is in a scalar vector for the same sql type. For example, a repeated int
+ * vector has a vector of offsets into a regular int vector to represent the
+ * lists. As the data layout for the actual values is the same in the repeated
+ * vector as in the scalar vector of the same type, we can avoid making
+ * individual copies for the column being flattened, and just use vector
+ * copies between the inner vector of the repeated field to the resulting
+ * scalar vector from the flatten operation. This is completed after we
+ * determine how many records will fit (as we will hit either a batch end, or
+ * the end of one of the other vectors while we are copying the data of the
+ * other vectors alongside each new flattened value coming out of the repeated
+ * field.)
*/
private TransferPair getFlattenFieldTransferPair(FieldReference reference) {
- final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
- final Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
- final ValueVector flattenField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector();
+ TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
+ Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
+ ValueVector flattenField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector();
TransferPair tp = null;
if (flattenField instanceof AbstractRepeatedMapVector) {
- tp = ((AbstractRepeatedMapVector) flattenField).getTransferPairToSingleMap(reference.getAsNamePart().getName(), oContext.getAllocator());
- } else if ( !(flattenField instanceof RepeatedValueVector) ) {
+ tp = ((AbstractRepeatedMapVector) flattenField).getTransferPairToSingleMap(
+ reference.getAsNamePart().getName(), oContext.getAllocator());
+ } else if (!(flattenField instanceof RepeatedValueVector)) {
if(incoming.getRecordCount() != 0) {
- throw UserException.unsupportedError().message("Flatten does not support inputs of non-list values.").build(logger);
+ throw UserException.unsupportedError().message(
+ "Flatten does not support inputs of non-list values.").build(logger);
}
logger.error("Cannot cast {} to RepeatedValueVector", flattenField);
//when incoming recordCount is 0, don't throw exception since the type being seen here is not solid
- final ValueVector vv = new RepeatedMapVector(flattenField.getField(), oContext.getAllocator(), null);
- tp = RepeatedValueVector.class.cast(vv).getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
+ ValueVector vv = new RepeatedMapVector(flattenField.getField(), oContext.getAllocator(), null);
+ tp = RepeatedValueVector.class.cast(vv).getTransferPair(
+ reference.getAsNamePart().getName(), oContext.getAllocator());
} else {
- final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector();
+ ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector();
// vvIn may be null because of fast schema return for repeated list vectors
if (vvIn != null) {
tp = vvIn.getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator());
@@ -387,28 +373,32 @@
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
- this.allocationVectors = new ArrayList<>();
+ allocationVectors = new ArrayList<>();
container.clear();
- final List<NamedExpression> exprs = getExpressionList();
- final ErrorCollector collector = new ErrorCollectorImpl();
- final List<TransferPair> transfers = new ArrayList<>();
+ List<NamedExpression> exprs = getExpressionList();
+ ErrorCollector collector = new ErrorCollectorImpl();
+ List<TransferPair> transfers = new ArrayList<>();
- final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getOptions());
+ ClassGenerator<Flattener> cg = CodeGenerator.getRoot(
+ Flattener.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
- final IntHashSet transferFieldIds = new IntHashSet();
+ IntHashSet transferFieldIds = new IntHashSet();
- final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
- final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
- final FieldReference fieldReference = flattenExpr.getRef();
- final TransferPair transferPair = getFlattenFieldTransferPair(fieldReference);
+ NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(),
+ new FieldReference(popConfig.getColumn()));
+ ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(
+ flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
+ FieldReference fieldReference = flattenExpr.getRef();
+ TransferPair transferPair = getFlattenFieldTransferPair(fieldReference);
if (transferPair != null) {
- final ValueVector flattenVector = transferPair.getTo();
+ ValueVector flattenVector = transferPair.getTo();
// checks that list has only default ValueVector and replaces resulting ValueVector to INT typed ValueVector
if (exprs.size() == 0 && flattenVector.getField().getType().equals(Types.LATE_BIND_TYPE)) {
- final MaterializedField outputField = MaterializedField.create(fieldReference.getAsNamePart().getName(), Types.OPTIONAL_INT);
- final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ MaterializedField outputField = MaterializedField.create(
+ fieldReference.getAsNamePart().getName(), Types.OPTIONAL_INT);
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
container.add(vector);
} else {
@@ -435,9 +425,11 @@
}
}
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(),
+ incoming, collector, context.getFunctionRegistry(), true);
if (collector.hasErrors()) {
- throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ throw new SchemaChangeException(String.format(
+ "Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
if (expr instanceof DrillFuncHolderExpr &&
((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) {
@@ -452,10 +444,11 @@
cg.addExpr(expr);
} else {
// need to do evaluation.
- final MaterializedField outputField;
+ MaterializedField outputField;
if (expr instanceof ValueVectorReadExpression) {
- final TypedFieldId id = ValueVectorReadExpression.class.cast(expr).getFieldId();
- final ValueVector incomingVector = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+ TypedFieldId id = ValueVectorReadExpression.class.cast(expr).getFieldId();
+ ValueVector incomingVector = incoming.getValueAccessorById(
+ id.getIntermediateClass(), id.getFieldIds()).getValueVector();
// outputField is taken from the incoming schema to avoid the loss of nested fields
// when the first batch will be empty.
if (incomingVector != null) {
@@ -466,7 +459,7 @@
} else {
outputField = MaterializedField.create(outputName, expr.getMajorType());
}
- final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
allocationVectors.add(vector);
TypedFieldId fid = container.add(vector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
@@ -482,7 +475,7 @@
container.buildSchema(SelectionVectorMode.NONE);
try {
- this.flattener = context.getImplementationClass(cg.getCodeGenerator());
+ flattener = context.getImplementationClass(cg.getCodeGenerator());
flattener.setup(context, incoming, this, transfers);
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 7c59b4d..ef477b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -17,8 +17,10 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.sun.codemodel.JClass;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
@@ -74,7 +76,7 @@
*/
public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(MergeJoinBatch.class);
private final MappingSet setupMapping =
new MappingSet("null", "null",
@@ -177,6 +179,7 @@
}
allocateBatch(true);
+ container.setEmpty();
}
@Override
@@ -269,11 +272,7 @@
}
private void setRecordCountInContainer() {
- for (VectorWrapper vw : container) {
- Preconditions.checkArgument(!vw.isHyper());
- vw.getValueVector().getMutator().setValueCount(getRecordCount());
- }
-
+ container.setValueCount(getRecordCount());
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
batchMemoryManager.updateOutgoingStats(getRecordCount());
}
@@ -487,7 +486,7 @@
// Allocate memory for the vectors.
// This will iteratively allocate memory for all nested columns underneath.
int outputRowCount = batchMemoryManager.getOutputRowCount();
- for (VectorWrapper w : container) {
+ for (VectorWrapper<?> w : container) {
RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
colSize.allocateVector(w.getValueVector(), outputRowCount);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 7513ebd..6b7edd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -23,6 +23,8 @@
import java.util.Map;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -66,23 +68,23 @@
* RecordBatch implementation for the nested loop join operator
*/
public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(NestedLoopJoinBatch.class);
// Input indexes to correctly update the stats
protected static final int LEFT_INPUT = 0;
protected static final int RIGHT_INPUT = 1;
// Schema on the left side
- private BatchSchema leftSchema = null;
+ private BatchSchema leftSchema;
// Schema on the right side
- private BatchSchema rightSchema = null;
+ private BatchSchema rightSchema;
// Runtime generated class implementing the NestedLoopJoin interface
- private NestedLoopJoin nljWorker = null;
+ private NestedLoopJoin nljWorker;
// Number of output records in the current outgoing batch
- private int outputRecords = 0;
+ private int outputRecords;
// We accumulate all the batches on the right side in a hyper container.
private ExpandableHyperContainer rightContainer = new ExpandableHyperContainer();
@@ -198,12 +200,7 @@
outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
// Set the record count
- for (final VectorWrapper<?> vw : container) {
- vw.getValueVector().getMutator().setValueCount(outputRecords);
- }
-
- // Set the record count in the container
- container.setRecordCount(outputRecords);
+ container.setValueCount(outputRecords);
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
@@ -413,6 +410,7 @@
batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setEmpty();
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException(e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index bb49187..e9d7dd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -32,12 +32,14 @@
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(LimitRecordBatch.class);
private SelectionVector2 outgoingSv;
private SelectionVector2 incomingSv;
@@ -122,7 +124,7 @@
container.clear();
transfers.clear();
- for(final VectorWrapper<?> v : incoming) {
+ for (final VectorWrapper<?> v : incoming) {
final TransferPair pair = v.getValueVector().makeTransferPair(
container.addOrGet(v.getField(), callBack));
transfers.add(pair);
@@ -130,7 +132,7 @@
final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
- switch(svMode) {
+ switch (svMode) {
case NONE:
break;
case TWO_BYTE:
@@ -174,10 +176,11 @@
final int inputRecordCount = incoming.getRecordCount();
if (inputRecordCount == 0) {
setOutgoingRecordCount(0);
+ container.setEmpty();
return getFinalOutcome(false);
}
- for(final TransferPair tp : transfers) {
+ for (final TransferPair tp : transfers) {
tp.transfer();
}
// Check if current input record count is less than start offset. If yes then adjust the start offset since we
@@ -185,6 +188,7 @@
if (inputRecordCount <= recordStartOffset) {
recordStartOffset -= inputRecordCount;
setOutgoingRecordCount(0);
+ container.setEmpty();
} else {
// Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
// batch to output record batch and later an SV2Remover copies the needed records.
@@ -194,7 +198,9 @@
// clear memory for incoming sv (if any)
if (incomingSv != null) {
- outgoingSv.setBatchActualRecordCount(incomingSv.getBatchActualRecordCount());
+ int incomingCount = incomingSv.getBatchActualRecordCount();
+ outgoingSv.setBatchActualRecordCount(incomingCount);
+ container.setRecordCount(incomingCount);
incomingSv.clear();
}
@@ -218,7 +224,7 @@
}
int svIndex = 0;
- for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
+ for (int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
if (incomingSv != null) {
outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
} else {
@@ -226,12 +232,17 @@
}
}
outgoingSv.setRecordCount(svIndex);
+ outgoingSv.setBatchActualRecordCount(inputRecordCount);
+ // Actual number of values in the container; not the number in
+ // the SV.
+ container.setRecordCount(inputRecordCount);
// Update the start offset
recordStartOffset = 0;
}
private void setOutgoingRecordCount(int outputCount) {
outgoingSv.setRecordCount(outputCount);
+ outgoingSv.setBatchActualRecordCount(outputCount);
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
index 639a25a..07f5069 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java
@@ -32,6 +32,8 @@
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Modular implementation of the standard Drill record batch iterator
@@ -51,7 +53,7 @@
*/
public class OperatorRecordBatch implements CloseableRecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorRecordBatch.class);
+ static final Logger logger = LoggerFactory.getLogger(OperatorRecordBatch.class);
private final OperatorDriver driver;
private final BatchAccessor batchAccessor;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index 496f326..866b988 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -21,6 +21,8 @@
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.accessor.InvalidConversionError;
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Manages a row batch reader through its lifecycle. Created when the reader
@@ -128,7 +130,7 @@
*/
class ReaderState {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderState.class);
+ static final Logger logger = LoggerFactory.getLogger(ReaderState.class);
private enum State {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index 701c82b..53664af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -25,6 +25,8 @@
import org.apache.drill.exec.physical.impl.protocol.OperatorExec;
import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation of the revised scan operator that uses a mutator aware of
@@ -155,7 +157,7 @@
CLOSED
}
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanOperatorExec.class);
+ static final Logger logger = LoggerFactory.getLogger(ScanOperatorExec.class);
private final ScanOperatorEvents factory;
private final boolean allowEmptyResult;
@@ -295,6 +297,13 @@
if (allowEmptyResult &&
containerAccessor.batchCount() == 0 &&
containerAccessor.schemaVersion() > 0) {
+
+ // We've exhausted all readers, none had data, but at least
+ // one had a schema. Any zero-sized batch produced by a reader
+ // was cleared when closing the reader. Recreate a valid empty
+ // batch here to return downstream.
+
+ containerAccessor.getOutgoingContainer().setEmpty();
state = State.EMPTY;
} else {
state = State.END;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
index 712d21c..095372c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
@@ -23,6 +23,8 @@
import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ScanOrchestratorBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Basic scan framework for a "managed" reader which uses the scan schema
@@ -112,7 +114,7 @@
public class ManagedScanFramework implements ScanOperatorEvents {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ManagedScanFramework.class);
+ static final Logger logger = LoggerFactory.getLogger(ManagedScanFramework.class);
/**
* Creates a batch reader on demand. Unlike earlier versions of Drill,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
index 5fa2187..8628873 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
@@ -202,10 +202,10 @@
public AbstractMapVector buildMap() {
if (parentColumn.sourceIndex() != -1) {
- final ResolvedTuple parentTuple = parentColumn.parent();
+ ResolvedTuple parentTuple = parentColumn.parent();
inputMap = (AbstractMapVector) parentTuple.vector(parentColumn.sourceIndex());
}
- final MaterializedField colSchema = parentColumn.schema();
+ MaterializedField colSchema = parentColumn.schema();
outputMap = createMap(inputMap,
MaterializedField.create(
colSchema.getName(), colSchema.getType()),
@@ -275,8 +275,8 @@
// Create a new map array, reusing the offset vector from
// the original input map.
- final RepeatedMapVector source = (RepeatedMapVector) inputMap;
- final UInt4Vector offsets = source.getOffsetVector();
+ RepeatedMapVector source = (RepeatedMapVector) inputMap;
+ UInt4Vector offsets = source.getOffsetVector();
valueCount = offsets.getAccessor().getValueCount();
return new RepeatedMapVector(schema,
offsets, null);
@@ -342,7 +342,7 @@
nullBuilder.build(vectorCache);
}
if (children != null) {
- for (final ResolvedTuple child : children) {
+ for (ResolvedTuple child : children) {
child.buildNulls(vectorCache.childCache(child.name()));
}
}
@@ -353,7 +353,7 @@
nullBuilder.load(rowCount);
}
if (children != null) {
- for (final ResolvedTuple child : children) {
+ for (ResolvedTuple child : children) {
child.loadNulls(innerCardinality(rowCount));
}
}
@@ -402,7 +402,7 @@
if (children == null) {
return;
}
- for (final ResolvedTuple child : children) {
+ for (ResolvedTuple child : children) {
child.setRowCount(rowCount);
}
}
@@ -426,7 +426,7 @@
nullBuilder.close();
}
if (children != null) {
- for (final ResolvedTuple child : children) {
+ for (ResolvedTuple child : children) {
child.close();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 695e26a..f7e63e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -23,7 +23,13 @@
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
+import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
+import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
+import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
+import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch;
+import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
@@ -44,6 +50,8 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VarDecimalVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.ZeroVector;
import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
import org.apache.drill.exec.vector.complex.DictVector;
@@ -184,14 +192,20 @@
*/
private static Map<Class<? extends CloseableRecordBatch>, CheckMode> buildRules() {
Map<Class<? extends CloseableRecordBatch>, CheckMode> rules = new IdentityHashMap<>();
- rules.put(OperatorRecordBatch.class, CheckMode.VECTORS);
- rules.put(ScanBatch.class, CheckMode.VECTORS);
+ rules.put(OperatorRecordBatch.class, CheckMode.ZERO_SIZE);
+ rules.put(ScanBatch.class, CheckMode.ZERO_SIZE);
rules.put(ProjectRecordBatch.class, CheckMode.ZERO_SIZE);
rules.put(FilterRecordBatch.class, CheckMode.VECTORS);
rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS);
rules.put(UnnestRecordBatch.class, CheckMode.VECTORS);
rules.put(HashAggBatch.class, CheckMode.VECTORS);
rules.put(RemovingRecordBatch.class, CheckMode.ZERO_SIZE);
+ rules.put(StreamingAggBatch.class, CheckMode.ZERO_SIZE);
+ rules.put(RuntimeFilterRecordBatch.class, CheckMode.ZERO_SIZE);
+ rules.put(FlattenRecordBatch.class, CheckMode.ZERO_SIZE);
+ rules.put(MergeJoinBatch.class, CheckMode.ZERO_SIZE);
+ rules.put(NestedLoopJoinBatch.class, CheckMode.ZERO_SIZE);
+ rules.put(LimitRecordBatch.class, CheckMode.ZERO_SIZE);
return rules;
}
@@ -349,6 +363,8 @@
validateDictVector(name, (DictVector) vector, topLevel);
} else if (vector instanceof UnionVector) {
validateUnionVector(name, (UnionVector) vector);
+ } else if (vector instanceof VarDecimalVector) {
+ validateVarDecimalVector(name, (VarDecimalVector) vector, topLevel);
} else {
logger.debug("Don't know how to validate vector: {} of class {}",
name, vector.getClass().getSimpleName());
@@ -369,20 +385,22 @@
}
private void validateVarCharVector(String name, VarCharVector vector, boolean topLevel) {
- int valueCount = vector.getAccessor().getValueCount();
-
- // Disabled because a large number of operators
- // set up offset vectors incorrectly.
- if (valueCount == 0 && (!topLevel || !checkZeroSize)) {
- return;
- }
-
int dataLength = vector.getBuffer().writerIndex();
- validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false,
- valueCount, dataLength, topLevel);
+ validateVarWidthVector(name, vector, dataLength, topLevel);
}
private void validateVarBinaryVector(String name, VarBinaryVector vector, boolean topLevel) {
+ int dataLength = vector.getBuffer().writerIndex();
+ validateVarWidthVector(name, vector, dataLength, topLevel);
+ }
+
+ private void validateVarDecimalVector(String name, VarDecimalVector vector,
+ boolean topLevel) {
+ int dataLength = vector.getBuffer().writerIndex();
+ validateVarWidthVector(name, vector, dataLength, topLevel);
+ }
+
+ private void validateVarWidthVector(String name, VariableWidthVector vector, int dataLength, boolean topLevel) {
int valueCount = vector.getAccessor().getValueCount();
// Disabled because a large number of operators
@@ -391,8 +409,7 @@
return;
}
- int dataLength = vector.getBuffer().writerIndex();
- validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false,
+ validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
valueCount, dataLength, topLevel);
}
@@ -401,7 +418,7 @@
int dataLength = dataVector.getAccessor().getValueCount();
int valueCount = vector.getAccessor().getValueCount();
int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
- true, valueCount, dataLength, topLevel);
+ valueCount, dataLength, topLevel);
if (dataLength != itemCount) {
error(name, vector, String.format(
@@ -419,7 +436,7 @@
int valueCount = vector.getAccessor().getValueCount();
int maxBitCount = valueCount * 8;
int elementCount = validateOffsetVector(name + "-offsets",
- vector.getOffsetVector(), true, valueCount, maxBitCount, topLevel);
+ vector.getOffsetVector(), valueCount, maxBitCount, topLevel);
BitVector dataVector = vector.getDataVector();
if (dataVector.getAccessor().getValueCount() != elementCount) {
error(name, vector, String.format(
@@ -452,7 +469,7 @@
RepeatedMapVector vector, boolean topLevel) {
int valueCount = vector.getAccessor().getValueCount();
int elementCount = validateOffsetVector(name + "-offsets",
- vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+ vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
for (ValueVector child: vector) {
validateVector(elementCount, child, false);
}
@@ -461,7 +478,7 @@
private void validateDictVector(String name, DictVector vector, boolean topLevel) {
int valueCount = vector.getAccessor().getValueCount();
int elementCount = validateOffsetVector(name + "-offsets",
- vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+ vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
validateVector(elementCount, vector.getKeys(), false);
validateVector(elementCount, vector.getValues(), false);
}
@@ -470,7 +487,7 @@
RepeatedListVector vector, boolean topLevel) {
int valueCount = vector.getAccessor().getValueCount();
int elementCount = validateOffsetVector(name + "-offsets",
- vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel);
+ vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
validateVector(elementCount, vector.getDataVector(), false);
}
@@ -500,16 +517,17 @@
}
private int validateOffsetVector(String name, UInt4Vector offsetVector,
- boolean repeated, int valueCount, int maxOffset, boolean topLevel) {
+ int valueCount, int maxOffset, boolean topLevel) {
UInt4Vector.Accessor accessor = offsetVector.getAccessor();
int offsetCount = accessor.getValueCount();
// TODO: Disabled because a large number of operators
- // set up offset vectors incorrectly.
-// if (!repeated && offsetCount == 0) {
-// System.out.println(String.format(
-// "Offset vector for %s: [0] has length 0, expected 1+",
-// name));
-// return false;
+ // set up offset vectors incorrectly. Either that, or semantics
+ // are ill-defined: some vectors assume an offset vector length
+ // of 0 if the "outer" value count is zero (which, while fiddly, is
+ // a workable definition).
+// if (checkZeroSize && topLevel && offsetCount == 0) {
+// error(name, offsetVector,
+// "Offset vector has length 0, expected 1+");
// }
if (valueCount == 0 && offsetCount > 1 || valueCount > 0 && offsetCount != valueCount + 1) {
error(name, offsetVector, String.format(
@@ -520,7 +538,8 @@
return 0;
}
- // First value must be zero in current version.
+ // First value must be zero. (This is why offset vectors have one more
+ // value than the number of rows.)
int prevOffset;
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 4d70065..07f919d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -30,6 +30,8 @@
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation of the result set loader. Caches vectors
@@ -163,7 +165,7 @@
CLOSED
}
- protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResultSetLoaderImpl.class);
+ protected static final Logger logger = LoggerFactory.getLogger(ResultSetLoaderImpl.class);
/**
* Options provided to this loader.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
index 057c0f2..de8a77e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
@@ -17,8 +17,8 @@
*/
package org.apache.drill.exec.nested;
-import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.BaseTestQuery;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -26,7 +26,6 @@
@Test
public void test() throws Exception {
-// test("select r.r_name, t1.f from cp.`tpch/region.parquet` r join (select flatten(x) as f from (select convert_from('[0, 1]', 'json') as x from cp.`tpch/region.parquet`)) t1 on t1.f = r.r_regionkey");
test("SELECT r.r_name, \n" +
" t1.f \n" +
"FROM cp.`tpch/region.parquet` r \n" +
@@ -111,5 +110,4 @@
"(select first_name from cp.`employee.json` where first_name='Sheri')",
"Flatten does not support inputs of non-list values");
}
-
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 07b34c0..7610e71 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -28,22 +28,21 @@
import java.nio.file.Paths;
import java.util.List;
-import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.OperatorTest;
-import org.apache.drill.test.TestBuilder;
import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.exec.fn.interp.TestConstantFolding;
import org.apache.drill.exec.store.easy.json.JSONRecordReader;
import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.SubDirTestWatcher;
+import org.apache.drill.test.TestBuilder;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
@Category(OperatorTest.class)
public class TestFlatten extends BaseTestQuery {
@@ -88,7 +87,6 @@
.setRecord(jsonRecords)
.createFiles(1, numCopies, "json");
- @SuppressWarnings("unchecked")
List<JsonStringHashMap<String,Object>> data = Lists.newArrayList(
mapOf("uid", 1l,
"lst_lst_0", listOf(1l, 2l, 3l, 4l, 5l),
@@ -123,7 +121,6 @@
@Test
public void testFlattenReferenceImpl() throws Exception {
- @SuppressWarnings("unchecked")
List<JsonStringHashMap<String,Object>> data = Lists.newArrayList(
mapOf("a",1,
"b",2,
@@ -133,7 +130,6 @@
listOf(1000,999)
)));
List<JsonStringHashMap<String, Object>> result = flatten(flatten(flatten(data, "list_col"), "nested_list_col"), "nested_list_col");
- @SuppressWarnings("unchecked")
List<JsonStringHashMap<String, Object>> expectedResult = Lists.newArrayList(
mapOf("nested_list_col", 100, "list_col", 10,"a", 1, "b",2),
mapOf("nested_list_col", 99, "list_col", 10,"a", 1, "b",2),
@@ -195,7 +191,6 @@
.setRecord(jsonRecord)
.createFiles(1, numRecords, "json");
- @SuppressWarnings("unchecked")
List<JsonStringHashMap<String,Object>> data = Lists.newArrayList(
mapOf("int_list", inputList)
);
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index 6d408b8..0278749 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -102,7 +102,7 @@
public FieldReader getReader() { return reader; }
@Override
- public int getBufferSizeFor(final int valueCount) {
+ public int getBufferSizeFor(int valueCount) {
if (valueCount == 0) {
return 0;
}
@@ -121,8 +121,8 @@
public Mutator getMutator() { return mutator; }
@Override
- public void setInitialCapacity(final int valueCount) {
- final long size = (long) valueCount * VALUE_WIDTH;
+ public void setInitialCapacity(int valueCount) {
+ long size = (long) valueCount * VALUE_WIDTH;
// TODO: Replace this with MAX_BUFFER_SIZE once all
// code is aware of the maximum vector size.
if (size > MAX_ALLOCATION_SIZE) {
@@ -170,7 +170,7 @@
* if it can't allocate the new buffer
*/
@Override
- public void allocateNew(final int valueCount) {
+ public void allocateNew(int valueCount) {
allocateBytes(valueCount * VALUE_WIDTH);
}
@@ -182,14 +182,14 @@
super.reset();
}
- private void allocateBytes(final long size) {
+ private void allocateBytes(long size) {
// TODO: Replace this with MAX_BUFFER_SIZE once all
// code is aware of the maximum vector size.
if (size > MAX_ALLOCATION_SIZE) {
throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
}
- final int curSize = (int)size;
+ int curSize = (int)size;
clear();
data = allocator.buffer(curSize);
data.readerIndex(0);
@@ -214,7 +214,7 @@
? 256
: allocationSizeInBytes * 2L;
- final int currentCapacity = data.capacity();
+ int currentCapacity = data.capacity();
// Some operations, such as Value Vector#exchange, can be change DrillBuf data field without corresponding allocation size changes.
// Check that the size of the allocation is sufficient to copy the old buffer.
while (newAllocationSize < currentCapacity) {
@@ -238,7 +238,7 @@
if (newAllocationSize == 0) {
throw new IllegalStateException("Attempt to reAlloc a zero-sized vector");
}
- final DrillBuf newBuf = allocator.buffer(newAllocationSize);
+ DrillBuf newBuf = allocator.buffer(newAllocationSize);
newBuf.setBytes(0, data, 0, data.capacity());
newBuf.writerIndex(data.writerIndex());
data.release(1);
@@ -259,9 +259,9 @@
public void load(SerializedField metadata, DrillBuf buffer) {
Preconditions.checkArgument(this.field.getName().equals(metadata.getNamePart().getName()),
"The field %s doesn't match the provided metadata %s.", this.field, metadata);
- final int actualLength = metadata.getBufferLength();
- final int valueCount = metadata.getValueCount();
- final int expectedLength = valueCount * VALUE_WIDTH;
+ int actualLength = metadata.getBufferLength();
+ int valueCount = metadata.getValueCount();
+ int expectedLength = valueCount * VALUE_WIDTH;
assert actualLength == expectedLength : String.format("Expected to load %d bytes but actually loaded %d bytes", expectedLength, actualLength);
clear();
@@ -296,8 +296,8 @@
}
public void splitAndTransferTo(int startIndex, int length, ${minor.class}Vector target) {
- final int startPoint = startIndex * VALUE_WIDTH;
- final int sliceLength = length * VALUE_WIDTH;
+ int startPoint = startIndex * VALUE_WIDTH;
+ int sliceLength = length * VALUE_WIDTH;
target.clear();
target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
target.data.writerIndex(sliceLength);
@@ -402,14 +402,14 @@
<#if (minor.class == "Interval")>
public void get(int index, ${minor.class}Holder holder) {
- final int offsetIndex = index * VALUE_WIDTH;
+ int offsetIndex = index * VALUE_WIDTH;
holder.months = data.getInt(offsetIndex);
holder.days = data.getInt(offsetIndex + ${minor.daysOffset});
holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
}
public void get(int index, Nullable${minor.class}Holder holder) {
- final int offsetIndex = index * VALUE_WIDTH;
+ int offsetIndex = index * VALUE_WIDTH;
holder.isSet = 1;
holder.months = data.getInt(offsetIndex);
holder.days = data.getInt(offsetIndex + ${minor.daysOffset});
@@ -418,30 +418,30 @@
@Override
public ${friendlyType} getObject(int index) {
- final int offsetIndex = index * VALUE_WIDTH;
- final int months = data.getInt(offsetIndex);
- final int days = data.getInt(offsetIndex + ${minor.daysOffset});
- final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+ int offsetIndex = index * VALUE_WIDTH;
+ int months = data.getInt(offsetIndex);
+ int days = data.getInt(offsetIndex + ${minor.daysOffset});
+ int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
return DateUtilities.fromInterval(months, days, millis);
}
public StringBuilder getAsStringBuilder(int index) {
- final int offsetIndex = index * VALUE_WIDTH;
- final int months = data.getInt(offsetIndex);
- final int days = data.getInt(offsetIndex + ${minor.daysOffset});
- final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+ int offsetIndex = index * VALUE_WIDTH;
+ int months = data.getInt(offsetIndex);
+ int days = data.getInt(offsetIndex + ${minor.daysOffset});
+ int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
return DateUtilities.intervalStringBuilder(months, days, millis);
}
<#elseif (minor.class == "IntervalDay")>
public void get(int index, ${minor.class}Holder holder) {
- final int offsetIndex = index * VALUE_WIDTH;
+ int offsetIndex = index * VALUE_WIDTH;
holder.days = data.getInt(offsetIndex);
holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
}
public void get(int index, Nullable${minor.class}Holder holder) {
- final int offsetIndex = index * VALUE_WIDTH;
+ int offsetIndex = index * VALUE_WIDTH;
holder.isSet = 1;
holder.days = data.getInt(offsetIndex);
holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset});
@@ -449,16 +449,16 @@
@Override
public ${friendlyType} getObject(int index) {
- final int offsetIndex = index * VALUE_WIDTH;
- final int days = data.getInt(offsetIndex);
- final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+ int offsetIndex = index * VALUE_WIDTH;
+ int days = data.getInt(offsetIndex);
+ int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
return DateUtilities.fromIntervalDay(days, millis);
}
public StringBuilder getAsStringBuilder(int index) {
- final int offsetIndex = index * VALUE_WIDTH;
- final int days = data.getInt(offsetIndex);
- final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
+ int offsetIndex = index * VALUE_WIDTH;
+ int days = data.getInt(offsetIndex);
+ int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset});
return DateUtilities.intervalDayStringBuilder(days, millis);
}
<#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense">
@@ -551,7 +551,7 @@
@Override
public ${friendlyType} getObject(int index) {
- final BigInteger value = BigInteger.valueOf(((${type.boxedType})get(index)).${type.javaType}Value());
+ BigInteger value = BigInteger.valueOf(((${type.boxedType})get(index)).${type.javaType}Value());
return new BigDecimal(value, getField().getScale());
}
<#else>
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
index 9f0b1d5..1ca1f5e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -215,8 +215,8 @@
// This is performance critical code; every operation counts.
// Please be thoughtful when changing the code.
- final int valueIndex = prepareFill();
- final int fillCount = valueIndex - lastWriteIndex - 1;
+ int valueIndex = prepareFill();
+ int fillCount = valueIndex - lastWriteIndex - 1;
if (fillCount > 0) {
fillEmpties(fillCount);
}
@@ -228,7 +228,7 @@
}
public final int prepareFill() {
- final int valueIndex = vectorIndex.vectorIndex();
+ int valueIndex = vectorIndex.vectorIndex();
if (valueIndex + 1 < capacity) {
return valueIndex;
}
@@ -240,32 +240,32 @@
}
@Override
- protected final void fillEmpties(final int fillCount) {
+ protected final void fillEmpties(int fillCount) {
for (int i = 0; i < fillCount; i++) {
fillOffset(nextOffset);
}
}
@Override
- public final void setNextOffset(final int newOffset) {
- final int writeIndex = prepareWrite();
+ public final void setNextOffset(int newOffset) {
+ int writeIndex = prepareWrite();
drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
nextOffset = newOffset;
}
- public final void reviseOffset(final int newOffset) {
- final int writeIndex = vectorIndex.vectorIndex() + 1;
+ public final void reviseOffset(int newOffset) {
+ int writeIndex = vectorIndex.vectorIndex() + 1;
drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
nextOffset = newOffset;
}
- public final void fillOffset(final int newOffset) {
+ public final void fillOffset(int newOffset) {
drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, newOffset);
nextOffset = newOffset;
}
@Override
- public final void setValue(final Object value) {
+ public final void setValue(Object value) {
throw new InvalidConversionError(
"setValue() not supported for the offset vector writer: " + value);
}
@@ -298,19 +298,22 @@
@Override
public void postRollover() {
- final int newNext = nextOffset - rowStartOffset;
+ int newNext = nextOffset - rowStartOffset;
super.postRollover();
nextOffset = newNext;
}
@Override
public void setValueCount(int valueCount) {
- mandatoryResize(valueCount);
- // Value count is in row positions.
+ // Value count is in row positions, not index
+ // positions. (There is one more index position
+ // than there are row positions.)
+ int offsetCount = valueCount + 1;
+ mandatoryResize(offsetCount);
fillEmpties(valueCount - lastWriteIndex - 1);
- vector().getBuffer().writerIndex((valueCount + 1) * VALUE_WIDTH);
+ vector().getBuffer().writerIndex(offsetCount * VALUE_WIDTH);
}
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
index f19709d..046641b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.vector.complex;
-import io.netty.buffer.DrillBuf;
-
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
@@ -27,8 +25,8 @@
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeRuntimeException;
import org.apache.drill.exec.expr.BasicTypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
@@ -38,10 +36,11 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VectorDescriptor;
import org.apache.drill.exec.vector.ZeroVector;
-
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.ObjectArrays;
+import io.netty.buffer.DrillBuf;
+
public abstract class BaseRepeatedValueVector extends BaseValueVector implements RepeatedValueVector {
public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
@@ -349,6 +348,12 @@
@Override
public void setValueCount(int valueCount) {
// TODO: populate offset end points
+ // Convention seems to be that if the "outer" value count
+ // is zero, then the offset vector can also be zero-length:
+ // the normal initial zero position is omitted. While this
+ // saves a bit of memory, it greatly complicates code that
+ // works with vectors because of the special case for zero-length
+ // vectors.
offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1);
final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount);
vector.getMutator().setValueCount(childValueCount);