DRILL-7456: Batch count fixes for 12 operators
Enables batch validation for 12 additional operators:
* MergingRecordBatch
* OrderedPartitionRecordBatch
* RangePartitionRecordBatch
* TraceRecordBatch
* UnionAllRecordBatch
* UnorderedReceiverBatch
* UnpivotMapsRecordBatch
* WindowFrameRecordBatch
* TopNBatch
* HashJoinBatch
* ExternalSortBatch
* WriterRecordBatch
Fixes issues found with those checks so that this set of
operators passes all checks.
Includes code cleanup in many files touched during this
work.
closes #1906
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 79b40ee..3e658cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -58,6 +58,8 @@
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.netty.buffer.DrillBuf;
@@ -65,7 +67,7 @@
* Record batch used for a particular scan. Operators against one or more
*/
public class ScanBatch implements CloseableRecordBatch {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(ScanBatch.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScanBatch.class);
/** Main collection of fields' value vectors. */
@@ -79,7 +81,7 @@
private BatchSchema schema;
private final Mutator mutator;
private boolean done;
- private Iterator<Map<String, String>> implicitColumns;
+ private final Iterator<Map<String, String>> implicitColumns;
private Map<String, String> implicitValues;
private final BufferAllocator allocator;
private final List<Map<String, String>> implicitColumnList;
@@ -179,18 +181,19 @@
}
/**
- * This method is to perform scan specific actions when the scan needs to clean/reset readers and return NONE status
+ * This method is to perform scan specific actions when the scan needs to
+ * clean/reset readers and return NONE status
+ *
* @return NONE
*/
private IterOutcome cleanAndReturnNone() {
if (isRepeatableScan) {
readers = readerList.iterator();
- return IterOutcome.NONE;
} else {
releaseAssets(); // All data has been read. Release resource.
done = true;
- return IterOutcome.NONE;
}
+ return IterOutcome.NONE;
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 0257abc..52fe0a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -86,8 +86,8 @@
private final int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
public static final MajorType HVtype = MajorType.newBuilder()
- .setMinorType(MinorType.INT /* dataType */ )
- .setMode(DataMode.REQUIRED /* mode */ )
+ .setMinorType(MinorType.INT /* dataType */)
+ .setMode(DataMode.REQUIRED /* mode */)
.build();
// The vector containers storing all the inner rows
@@ -160,7 +160,7 @@
}
this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
tmpBatchesList = new ArrayList<>();
- if ( numPartitions > 1 ) {
+ if (numPartitions > 1) {
allocateNewCurrentBatchAndHV();
}
}
@@ -194,7 +194,7 @@
while (vci.hasNext()) {
VectorWrapper<?> vw = vci.next();
// If processing a spilled container, skip the last column (HV)
- if ( cycleNum > 0 && ! vci.hasNext() ) { break; }
+ if (cycleNum > 0 && ! vci.hasNext()) { break; }
ValueVector vv = vw.getValueVector();
ValueVector newVV = TypeHelper.getNewVector(vv.getField(), allocator);
newVC.add(newVV); // add first to allow dealloc in case of an OOM
@@ -213,7 +213,7 @@
newVC.setRecordCount(0);
success = true;
} finally {
- if ( !success ) {
+ if (!success) {
newVC.clear(); // in case of an OOM
}
}
@@ -233,11 +233,12 @@
/**
* Spills if needed
*/
- public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
+ public void appendInnerRow(VectorContainer buildContainer, int ind,
+ int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
- int pos = currentBatch.appendRow(buildContainer,ind);
+ int pos = currentBatch.appendRow(buildContainer, ind);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
- if ( pos == recordsPerBatch ) {
+ if (pos == recordsPerBatch) {
boolean needsSpill = isSpilled || calc.shouldSpill();
completeAnInnerBatch(true, needsSpill);
}
@@ -245,12 +246,11 @@
/**
* Outer always spills when batch is full
- *
*/
public void appendOuterRow(int hashCode, int recordsProcessed) {
int pos = currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
- if ( pos == recordsPerBatch ) {
+ if (pos == recordsPerBatch) {
completeAnOuterBatch(true);
}
}
@@ -269,7 +269,7 @@
* (that is, more rows are coming) - initialize with a new current batch for that partition
* */
private void completeABatch(boolean toInitialize, boolean needsSpill) {
- if ( currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
+ if (currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
currentBatch.add(currHVVector);
currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
tmpBatchesList.add(currentBatch);
@@ -283,10 +283,10 @@
} else {
freeCurrentBatchAndHVVector();
}
- if ( needsSpill ) { // spill this batch/partition and free its memory
+ if (needsSpill) { // spill this batch/partition and free its memory
spillThisPartition();
}
- if ( toInitialize ) { // allocate a new batch and HV vector
+ if (toInitialize) { // allocate a new batch and HV vector
allocateNewCurrentBatchAndHV();
} else {
currentBatch = null;
@@ -331,11 +331,11 @@
}
public void spillThisPartition() {
- if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing to spill
+ if (tmpBatchesList.size() == 0) { return; } // in case empty - nothing to spill
logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size());
// If this is the first spill for this partition, create an output stream
- if ( writer == null ) {
+ if (writer == null) {
final String side = processingOuter ? "outer" : "inner";
final String suffix = cycleNum > 0 ? side + "_" + Integer.toString(cycleNum) : side;
spillFile = spillSet.getNextSpillFile(suffix);
@@ -488,10 +488,10 @@
*/
private void closeWriterInternal(boolean doDeleteFile) {
try {
- if ( writer != null ) {
+ if (writer != null) {
spillSet.close(writer);
}
- if ( doDeleteFile && spillFile != null ) {
+ if (doDeleteFile && spillFile != null) {
spillSet.delete(spillFile);
}
} catch (IOException ioe) {
@@ -512,7 +512,7 @@
* have been consumed.
*/
public void buildContainersHashTableAndHelper() throws SchemaChangeException {
- if ( isSpilled ) { return; } // no building for spilled partitions
+ if (isSpilled) { return; } // no building for spilled partitions
containers = new ArrayList<>();
hashTable.updateInitialCapacity((int) getNumInMemoryRecords());
for (int curr = 0; curr < partitionBatchesCount; curr++) {
@@ -520,7 +520,7 @@
final int currentRecordCount = nextBatch.getRecordCount();
// For every incoming build batch, we create a matching helper batch
- if ( ! semiJoin ) { hjHelper.addNewBatch(currentRecordCount); }
+ if (! semiJoin) { hjHelper.addNewBatch(currentRecordCount); }
// Holder contains the global index where the key is hashed into using the hash table
final IndexPointer htIndex = new IndexPointer();
@@ -528,7 +528,7 @@
assert nextBatch != null;
assert probeBatch != null;
- hashTable.updateIncoming(nextBatch, probeBatch );
+ hashTable.updateIncoming(nextBatch, probeBatch);
IntVector HV_vector = (IntVector) nextBatch.getLast();
@@ -543,7 +543,7 @@
* the current record index and batch index. This will be used
* later when we probe and find a match.
*/
- if ( ! semiJoin ) { hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); }
+ if (! semiJoin) { hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); }
}
containers.add(nextBatch);
@@ -570,11 +570,11 @@
}
private void freeCurrentBatchAndHVVector() {
- if ( currentBatch != null ) {
+ if (currentBatch != null) {
currentBatch.clear();
currentBatch = null;
}
- if ( currHVVector != null ) {
+ if (currHVVector != null) {
currHVVector.clear();
currHVVector = null;
}
@@ -591,13 +591,13 @@
vc.clear();
}
}
- while ( tmpBatchesList.size() > 0 ) {
+ while (tmpBatchesList.size() > 0) {
VectorContainer vc = tmpBatchesList.remove(0);
vc.clear();
}
closeWriterInternal(deleteFile);
clearHashTableAndHelper();
- if ( containers != null ) {
+ if (containers != null) {
containers.clear();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index dc063f9..eab38ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,6 +17,10 @@
*/
package org.apache.drill.exec.physical.impl.join;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -25,18 +29,14 @@
import java.util.Map;
import java.util.Set;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
@@ -51,7 +51,6 @@
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
import org.apache.drill.exec.memory.BaseAllocator;
@@ -65,10 +64,10 @@
import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.Comparator;
+import org.apache.drill.exec.physical.impl.common.HashPartition;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
-import org.apache.drill.exec.physical.impl.common.HashPartition;
import org.apache.drill.exec.physical.impl.common.SpilledState;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.planner.common.JoinControl;
@@ -85,49 +84,61 @@
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.exec.work.filter.BloomFilter;
import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
-
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This class implements the runtime execution for the Hash-Join operator
- * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
- *
- * This implementation splits the incoming Build side rows into multiple Partitions, thus allowing spilling of
- * some of these partitions to disk if memory gets tight. Each partition is implemented as a {@link HashPartition}.
- * After the build phase is over, in the most general case, some of the partitions were spilled, and the others
- * are in memory. Each of the partitions in memory would get a {@link HashTable} built.
- * Next the Probe side is read, and each row is key matched with a Build partition. If that partition is in
- * memory, then the key is used to probe and perform the join, and the results are added to the outgoing batch.
- * But if that build side partition was spilled, then the matching Probe size partition is spilled as well.
- * After all the Probe side was processed, we are left with pairs of spilled partitions. Then each pair is
- * processed individually (that Build partition should be smaller than the original, hence likely fit whole into
- * memory to allow probing; if not -- see below).
- * Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact,
- * the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is
- * read and divided into new partitions, which in turn may spill again (and again...).
- * The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
- * indicating that the number of partitions chosen was too small.
+ * This class implements the runtime execution for the Hash-Join operator
+ * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
+ * <p>
+ * This implementation splits the incoming Build side rows into multiple
+ * Partitions, thus allowing spilling of some of these partitions to disk if
+ * memory gets tight. Each partition is implemented as a {@link HashPartition}.
+ * After the build phase is over, in the most general case, some of the
+ * partitions were spilled, and the others are in memory. Each of the partitions
+ * in memory would get a {@link HashTable} built.
+ * <p>
+ * Next the Probe side is read, and each row is key matched with a Build
+ * partition. If that partition is in memory, then the key is used to probe and
+ * perform the join, and the results are added to the outgoing batch. But if
+ * that build side partition was spilled, then the matching Probe size partition
+ * is spilled as well.
+ * <p>
+ * After all the Probe side was processed, we are left with pairs of spilled
+ * partitions. Then each pair is processed individually (that Build partition
+ * should be smaller than the original, hence likely fit whole into memory to
+ * allow probing; if not -- see below).
+ * <p>
+ * Processing of each spilled pair is EXACTLY like processing the original
+ * Build/Probe incomings. (As a fact, the {@link #innerNext()} method calls
+ * itself recursively !!). Thus the spilled build partition is read and divided
+ * into new partitions, which in turn may spill again (and again...). The code
+ * tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or
+ * greater) is a waste, indicating that the number of partitions chosen was too
+ * small.
*/
+
public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implements RowKeyJoin {
- protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(HashJoinBatch.class);
/**
* The maximum number of records within each internal batch.
*/
- private int RECORDS_PER_BATCH; // internal batches
+ private final int RECORDS_PER_BATCH; // internal batches
// Join type, INNER, LEFT, RIGHT or OUTER
private final JoinRelType joinType;
- private boolean semiJoin;
- private boolean joinIsLeftOrFull;
- private boolean joinIsRightOrFull;
+ private final boolean semiJoin;
+ private final boolean joinIsLeftOrFull;
+ private final boolean joinIsRightOrFull;
private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
// Join conditions
@@ -141,13 +152,15 @@
private final List<NamedExpression> rightExpr;
/**
- * Names of the join columns. This names are used in order to help estimate the size of the {@link HashTable}s.
+ * Names of the join columns. This names are used in order to help estimate
+ * the size of the {@link HashTable}s.
*/
private final Set<String> buildJoinColumns;
// Fields used for partitioning
/**
- * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
+ * The number of {@link HashPartition}s. This is configured via a system
+ * option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
*/
private int numPartitions = 1; // must be 2 to the power of bitsInMask
@@ -155,8 +168,8 @@
* The master class used to generate {@link HashTable}s.
*/
private ChainedHashTable baseHashTable;
- private MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
- private MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
+ private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
+ private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
private boolean canSpill = true;
private boolean wasKilled; // a kill was received, may need to clean spilled partns
@@ -174,18 +187,18 @@
private BatchSchema probeSchema;
// Whether this HashJoin is used for a row-key based join
- private boolean isRowKeyJoin = false;
+ private final boolean isRowKeyJoin;
- private JoinControl joinControl;
+ private final JoinControl joinControl;
// An iterator over the build side hash table (only applicable for row-key joins)
- private boolean buildComplete = false;
+ private boolean buildComplete;
// indicates if we have previously returned an output batch
private boolean firstOutputBatch = true;
private int rightHVColPosition;
- private BufferAllocator allocator;
+ private final BufferAllocator allocator;
// Local fields for left/right incoming - may be replaced when reading from spilled
private RecordBatch buildBatch;
private RecordBatch probeBatch;
@@ -193,27 +206,27 @@
/**
* Flag indicating whether or not the first data holding build batch needs to be fetched.
*/
- private MutableBoolean prefetchedBuild = new MutableBoolean(false);
+ private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
/**
* Flag indicating whether or not the first data holding probe batch needs to be fetched.
*/
- private MutableBoolean prefetchedProbe = new MutableBoolean(false);
+ private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
// For handling spilling
- private SpillSet spillSet;
+ private final SpillSet spillSet;
HashJoinPOP popConfig;
- private int originalPartition = -1; // the partition a secondary reads from
+ private final int originalPartition = -1; // the partition a secondary reads from
IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
- private int maxBatchesInMemory;
- private List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
+ private final int maxBatchesInMemory;
+ private final List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
private boolean enableRuntimeFilter;
private RuntimeFilterReporter runtimeFilterReporter;
private ValueVectorHashHelper.Hash64 hash64;
- private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
- private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
- private List<BloomFilter> bloomFilters = new ArrayList<>();
- private boolean bloomFiltersGenerated = false;
+ private final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
+ private final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
+ private final List<BloomFilter> bloomFilters = new ArrayList<>();
+ private boolean bloomFiltersGenerated;
/**
* This holds information about the spilled partitions for the build and probe side.
@@ -223,13 +236,13 @@
private final String innerSpillFile;
private int outerSpilledBatches;
private String outerSpillFile;
- private boolean updatedOuter = false;
+ private boolean updatedOuter;
- public HashJoinSpilledPartition(final int cycle,
- final int originPartition,
- final int prevOriginPartition,
- final int innerSpilledBatches,
- final String innerSpillFile) {
+ public HashJoinSpilledPartition(int cycle,
+ int originPartition,
+ int prevOriginPartition,
+ int innerSpilledBatches,
+ String innerSpillFile) {
super(cycle, originPartition, prevOriginPartition);
this.innerSpilledBatches = innerSpilledBatches;
@@ -254,7 +267,7 @@
return outerSpillFile;
}
- public void updateOuter(final int outerSpilledBatches, final String outerSpillFile) {
+ public void updateOuter(int outerSpilledBatches, String outerSpillFile) {
Preconditions.checkState(!updatedOuter);
updatedOuter = true;
@@ -294,8 +307,8 @@
/**
* Queue of spilled partitions to process.
*/
- private SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
- private HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
+ private final SpilledState<HashJoinSpilledPartition> spilledState = new SpilledState<>();
+ private final HashJoinUpdater spilledStateUpdater = new HashJoinUpdater();
private HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
public enum Metric implements MetricDef {
@@ -349,9 +362,11 @@
if (rightUpstream == OK_NEW_SCHEMA) {
buildSchema = right.getSchema();
- // position of the new "column" for keeping the hash values (after the real columns)
+ // position of the new "column" for keeping the hash values
+ // (after the real columns)
rightHVColPosition = right.getContainer().getNumberOfColumns();
- // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table
+ // In special cases, when the probe side is empty, and
+ // inner/left join - no need for Hash Table
skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
// We only need the hash tables if we have data on the build side.
setupHashTable();
@@ -364,10 +379,12 @@
}
}
- // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
+ // If we have a valid schema, this will build a valid container.
+ // If we were unable to obtain a valid schema,
// we still need to build a dummy schema. This code handles both cases for us.
setupOutputContainerSchema();
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.setEmpty();
}
/**
@@ -381,7 +398,9 @@
buildBatch,
() -> {
batchMemoryManager.update(RIGHT_INDEX, 0, true);
- RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+ batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX),
+ getRecordBatchStatsContext());
});
}
@@ -396,7 +415,9 @@
probeBatch,
() -> {
batchMemoryManager.update(LEFT_INDEX, 0);
- RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+ batchMemoryManager.getRecordBatchSizer(LEFT_INDEX),
+ getRecordBatchStatsContext());
});
}
@@ -411,11 +432,11 @@
* @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
*/
private IterOutcome prefetchFirstBatch(IterOutcome outcome,
- final MutableBoolean prefetched,
- final MutableBoolean isEmpty,
- final int index,
- final RecordBatch batch,
- final Runnable memoryManagerUpdate) {
+ MutableBoolean prefetched,
+ MutableBoolean isEmpty,
+ int index,
+ RecordBatch batch,
+ Runnable memoryManagerUpdate) {
if (prefetched.booleanValue()) {
// We have already prefetch the first data holding batch
return outcome;
@@ -429,7 +450,7 @@
outcome = sniffNonEmptyBatch(outcome, index, batch);
}
- isEmpty.setValue(outcome == IterOutcome.NONE); // If we recieved NONE there is no data.
+ isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
if (outcome == IterOutcome.OUT_OF_MEMORY) {
// We reached a termination state
@@ -440,7 +461,7 @@
} else {
// Got our first batch(es)
if (spilledState.isFirstCycle()) {
- // Only collect stats for the first cylce
+ // Only collect stats for the first cycle
memoryManagerUpdate.run();
}
state = BatchState.FIRST;
@@ -488,10 +509,10 @@
*/
public HashJoinMemoryCalculator getCalculatorImpl() {
if (maxBatchesInMemory == 0) {
- final double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
- final double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
- final double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
- final String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
+ double safetyFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_SAFETY_FACTOR_KEY);
+ double fragmentationFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_FRAGMENTATION_FACTOR_KEY);
+ double hashTableDoublingFactor = context.getOptions().getDouble(ExecConstants.HASHJOIN_HASH_DOUBLE_FACTOR_KEY);
+ String hashTableCalculatorType = context.getOptions().getString(ExecConstants.HASHJOIN_HASHTABLE_CALC_TYPE_KEY);
return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType, semiJoin);
} else {
@@ -502,7 +523,7 @@
@Override
public IterOutcome innerNext() {
if (wasKilled) {
- // We have recieved a kill signal. We need to stop processing.
+ // We have received a kill signal. We need to stop processing.
this.cleanup();
super.close();
return IterOutcome.NONE;
@@ -522,7 +543,7 @@
*/
if (state == BatchState.FIRST) {
// Build the hash table, using the build side record batches.
- final IterOutcome buildExecuteTermination = executeBuildPhase();
+ IterOutcome buildExecuteTermination = executeBuildPhase();
if (buildExecuteTermination != null) {
// A termination condition was reached while executing the build phase.
@@ -591,10 +612,7 @@
outputRecords = hashJoinProbe.probeAndProject();
- for (final VectorWrapper<?> v : container) {
- v.getValueVector().getMutator().setValueCount(outputRecords);
- }
- container.setRecordCount(outputRecords);
+ container.setValueCount(outputRecords);
batchMemoryManager.updateOutgoingStats(outputRecords);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
@@ -685,7 +703,7 @@
private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
batch.kill(true);
while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
- for (final VectorWrapper<?> wrapper : batch) {
+ for (VectorWrapper<?> wrapper : batch) {
wrapper.getValueVector().clear();
}
upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
@@ -695,7 +713,7 @@
private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
private void setupHashTable() throws SchemaChangeException {
- final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
+ List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
if ( skipHashTableBuild ) { return; }
@@ -713,13 +731,13 @@
leftExpr = null;
} else {
if (probeBatch.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
- final String errorMsg = new StringBuilder().append("Hash join does not support probe batch with selection vectors. ").append("Probe batch has selection mode = ").append
+ String errorMsg = new StringBuilder().append("Hash join does not support probe batch with selection vectors. ").append("Probe batch has selection mode = ").append
(probeBatch.getSchema().getSelectionVectorMode()).toString();
throw new SchemaChangeException(errorMsg);
}
}
- final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
+ HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt());
// Create the chained hash table
@@ -735,7 +753,7 @@
ErrorCollector collector = new ErrorCollectorImpl();
int i = 0;
for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, context.getFunctionRegistry());
if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
}
@@ -868,7 +886,7 @@
// TODO dirty hack to prevent regressions. Remove this once batch sizing is implemented.
// We don't have enough memory to do partitioning, we have to do everything in memory
- final String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " +
+ String message = String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. " +
"Forcing legacy behavoir of using unbounded memory in order to prevent regressions.",
numPartitions,
FileUtils.byteCountToDisplaySize(buildCalc.getMaxReservedMemory()),
@@ -876,7 +894,7 @@
logger.warn(message);
// create a Noop memory calculator
- final HashJoinMemoryCalculator calc = getCalculatorImpl();
+ HashJoinMemoryCalculator calc = getCalculatorImpl();
calc.initialize(false);
buildCalc = calc.next();
@@ -908,7 +926,7 @@
private void disableSpilling(String reason) {
// Fail, or just issue a warning if a reason was given, or a fallback option is enabled
if ( reason == null ) {
- final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+ boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
if (fallbackEnabled) {
logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" +
" to use unbounded memory");
@@ -986,7 +1004,7 @@
initializeRuntimeFilter();
// Make the calculator aware of our partitions
- final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
+ HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
buildCalc.setPartitionStatSet(partitionStatSet);
boolean moreData = true;
@@ -1007,7 +1025,7 @@
// Fall through
case OK:
batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
- final int currentRecordCount = buildBatch.getRecordCount();
+ int currentRecordCount = buildBatch.getRecordCount();
//create runtime filter
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
//create runtime filter and send out async
@@ -1040,7 +1058,7 @@
}
// Append the new inner row to the appropriate partition; spill (that partition) if needed
- partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
+ partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc);
}
if ( read_right_HV_vector != null ) {
@@ -1048,6 +1066,8 @@
read_right_HV_vector = null;
}
break;
+ default:
+ throw new IllegalStateException(rightUpstream.name());
}
// Get the next incoming record batch
rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
@@ -1080,12 +1100,10 @@
HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
- //
// Traverse all the in-memory partitions' incoming batches, and build their hash tables
- //
for (int index = 0; index < partitions.length; index++) {
- final HashPartition partn = partitions[index];
+ HashPartition partn = partitions[index];
if (partn.isSpilled()) {
// Don't build hash tables for spilled partitions
@@ -1101,7 +1119,7 @@
partn.buildContainersHashTableAndHelper();
}
} catch (OutOfMemoryException e) {
- final String message = "Failed building hash table on partition " + index + ":\n"
+ String message = "Failed building hash table on partition " + index + ":\n"
+ makeDebugString() + "\n"
+ postBuildCalc.makeDebugString();
// Include debug info
@@ -1135,9 +1153,9 @@
private void setupOutputContainerSchema() {
if (buildSchema != null && ! semiJoin ) {
- for (final MaterializedField field : buildSchema) {
- final MajorType inputType = field.getType();
- final MajorType outputType;
+ for (MaterializedField field : buildSchema) {
+ MajorType inputType = field.getType();
+ MajorType outputType;
// If left or full outer join, then the output type must be nullable. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2197).
if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
@@ -1148,16 +1166,16 @@
}
// make sure to project field with children for children to show up in the schema
- final MaterializedField projected = field.withType(outputType);
+ MaterializedField projected = field.withType(outputType);
// Add the vector to our output container
container.addOrGet(projected);
}
}
if (probeSchema != null) { // a probe schema was seen (even though the probe may had no rows)
- for (final VectorWrapper<?> vv : probeBatch) {
- final MajorType inputType = vv.getField().getType();
- final MajorType outputType;
+ for (VectorWrapper<?> vv : probeBatch) {
+ MajorType inputType = vv.getField().getType();
+ MajorType outputType;
// If right or full outer join then the output type should be optional. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
@@ -1168,7 +1186,7 @@
outputType = inputType;
}
- final ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getName(), outputType));
+ ValueVector v = container.addOrGet(MaterializedField.create(vv.getField().getName(), outputType));
if (v instanceof AbstractContainerVector) {
vv.getValueVector().makeTransferPair(v);
v.clear();
@@ -1213,15 +1231,15 @@
buildJoinColumns = Sets.newHashSet();
List<SchemaPath> rightConditionPaths = new ArrayList<>();
for (int i = 0; i < conditions.size(); i++) {
- final SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
+ SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
rightConditionPaths.add(rightPath);
}
for (int i = 0; i < conditions.size(); i++) {
- final SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
- final PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment();
+ SchemaPath rightPath = (SchemaPath) conditions.get(i).getRight();
+ PathSegment.NameSegment nameSegment = (PathSegment.NameSegment)rightPath.getLastSegment();
buildJoinColumns.add(nameSegment.getPath());
- final String refName = "build_side_" + i;
+ String refName = "build_side_" + i;
rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
}
@@ -1234,7 +1252,7 @@
numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
- final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
+ long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
if (memLimit != 0) {
allocator.setLimit(memLimit);
@@ -1250,8 +1268,8 @@
partitions = new HashPartition[0];
// get the output batch size from config.
- final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
- final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
+ int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ double avail_mem_factor = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
@@ -1305,11 +1323,11 @@
* @return A memory dump string.
*/
public String makeDebugString() {
- final StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder();
for (int partitionIndex = 0; partitionIndex < partitions.length; partitionIndex++) {
- final String partitionPrefix = "Partition " + partitionIndex + ": ";
- final HashPartition hashPartition = partitions[partitionIndex];
+ String partitionPrefix = "Partition " + partitionIndex + ": ";
+ HashPartition hashPartition = partitions[partitionIndex];
sb.append(partitionPrefix).append(hashPartition.makeDebugString()).append("\n");
}
@@ -1326,7 +1344,7 @@
if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
if (!spilledState.isFirstCycle()) { return; } // These stats are only for before processing spilled files
- final HashTableStats htStats = new HashTableStats();
+ HashTableStats htStats = new HashTableStats();
long numSpilled = 0;
HashTableStats newStats = new HashTableStats();
// sum the stats from all the partitions
@@ -1446,18 +1464,15 @@
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
- this.cleanup();
+ cleanup();
super.close();
}
public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
- final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
- cg.plainJavaCapable(true);
- // cg.saveCodeForDebugging(true);
// No real code generation !!
- return context.getImplementationClass(cg);
+ return new HashJoinProbeTemplate();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index e862c0e..d40f6a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -35,7 +35,7 @@
import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-public abstract class HashJoinProbeTemplate implements HashJoinProbe {
+public class HashJoinProbeTemplate implements HashJoinProbe {
VectorContainer container; // the outgoing container
@@ -50,13 +50,13 @@
// joinControl determines how to handle INTERSECT_DISTINCT vs. INTERSECT_ALL
private JoinControl joinControl;
- private HashJoinBatch outgoingJoinBatch = null;
+ private HashJoinBatch outgoingJoinBatch;
// Number of records to process on the probe side
- private int recordsToProcess = 0;
+ private int recordsToProcess;
// Number of records processed on the probe side
- private int recordsProcessed = 0;
+ private int recordsProcessed;
// Number of records in the output container
private int outputRecords;
@@ -71,7 +71,7 @@
private ProbeState probeState = ProbeState.PROBE_PROJECT;
// For outer or right joins, this is a list of unmatched records that needs to be projected
- private IntArrayList unmatchedBuildIndexes = null;
+ private IntArrayList unmatchedBuildIndexes;
private HashPartition partitions[];
@@ -79,14 +79,14 @@
// probing later on the same chain of duplicates
private HashPartition currPartition;
- private int currRightPartition = 0; // for returning RIGHT/FULL
+ private int currRightPartition; // for returning RIGHT/FULL
IntVector read_left_HV_vector; // HV vector that was read from the spilled batch
- private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
+ private int cycleNum; // 1-primary, 2-secondary, 3-tertiary, etc.
private HashJoinBatch.HashJoinSpilledPartition spilledInners[]; // for the outer to find the partition
private boolean buildSideIsEmpty = true;
private int numPartitions = 1; // must be 2 to the power of bitsInMask
- private int partitionMask = 0; // numPartitions - 1
- private int bitsInMask = 0; // number of bits in the MASK
+ private int partitionMask; // numPartitions - 1
+ private int bitsInMask; // number of bits in the MASK
private int numberOfBuildSideColumns;
private int targetOutputRecords;
private boolean semiJoin;
@@ -96,6 +96,7 @@
this.targetOutputRecords = targetOutputRecords;
}
+ @Override
public int getOutputCount() {
return outputRecords;
}
@@ -143,7 +144,7 @@
this.recordsProcessed = 0;
// A special case - if the left was an empty file
- if ( leftStartState == IterOutcome.NONE ){
+ if (leftStartState == IterOutcome.NONE){
changeToFinalProbeState();
} else {
this.recordsToProcess = probeBatch.getRecordCount();
@@ -151,16 +152,16 @@
// for those outer partitions that need spilling (cause their matching inners spilled)
// initialize those partitions' current batches and hash-value vectors
- for ( HashPartition partn : this.partitions) {
+ for (HashPartition partn : this.partitions) {
partn.allocateNewCurrentBatchAndHV();
}
currRightPartition = 0; // In case it's a Right/Full outer join
// Initialize the HV vector for the first (already read) left batch
- if ( this.cycleNum > 0 ) {
- if ( read_left_HV_vector != null ) { read_left_HV_vector.clear();}
- if ( leftStartState != IterOutcome.NONE ) { // Skip when outer spill was empty
+ if (this.cycleNum > 0) {
+ if (read_left_HV_vector != null) { read_left_HV_vector.clear();}
+ if (leftStartState != IterOutcome.NONE) { // Skip when outer spill was empty
read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast();
}
}
@@ -178,6 +179,7 @@
destVector.copyEntry(container.getRecordCount(), srcVector, buildSrcIndex);
}
}
+
/**
* Append the given probe side row into the outgoing container, following the build side part
* @param probeSrcContainer The container for the left/outer side
@@ -190,6 +192,7 @@
destVector.copyEntry(container.getRecordCount(), srcVector, probeSrcIndex);
}
}
+
/**
* A special version of the VectorContainer's appendRow for the HashJoin; (following a probe) it
* copies the build and probe sides into the outgoing container. (It uses a composite
@@ -204,12 +207,14 @@
private int outputRow(ArrayList<VectorContainer> buildSrcContainers, int compositeBuildSrcIndex,
VectorContainer probeSrcContainer, int probeSrcIndex) {
- if ( buildSrcContainers != null ) {
+ if (buildSrcContainers != null) {
int buildBatchIndex = compositeBuildSrcIndex >>> 16;
int buildOffset = compositeBuildSrcIndex & 65535;
appendBuild(buildSrcContainers.get(buildBatchIndex), buildOffset);
}
- if ( probeSrcContainer != null ) { appendProbe(probeSrcContainer, probeSrcIndex); }
+ if (probeSrcContainer != null) {
+ appendProbe(probeSrcContainer, probeSrcIndex);
+ }
return container.incRecordCount();
}
@@ -221,7 +226,7 @@
while (outputRecords < targetOutputRecords && recordsProcessed < recordsToProcess) {
outputRecords =
outputRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
- null /* no probeBatch */, 0 /* no probe index */ );
+ null /* no probeBatch */, 0 /* no probe index */);
recordsProcessed++;
}
}
@@ -248,8 +253,8 @@
recordsToProcess = 0;
changeToFinalProbeState();
// in case some outer partitions were spilled, need to spill their last batches
- for ( HashPartition partn : partitions ) {
- if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
+ for (HashPartition partn : partitions) {
+ if (! partn.isSpilled()) { continue; } // skip non-spilled
partn.completeAnOuterBatch(false);
// update the partition's spill record with the outer side
HashJoinBatch.HashJoinSpilledPartition sp = spilledInners[partn.getPartitionNum()];
@@ -262,7 +267,7 @@
case OK_NEW_SCHEMA:
if (probeBatch.getSchema().equals(probeSchema)) {
- for ( HashPartition partn : partitions ) { partn.updateBatches(); }
+ for (HashPartition partn : partitions) { partn.updateBatches(); }
} else {
throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
@@ -278,7 +283,7 @@
if (recordsToProcess == 0) {
continue;
}
- if ( cycleNum > 0 ) {
+ if (cycleNum > 0) {
read_left_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
}
}
@@ -287,8 +292,8 @@
int probeIndex = -1;
// Check if we need to drain the next row in the probe side
if (getNextRecord) {
- if ( !buildSideIsEmpty ) {
- int hashCode = ( cycleNum == 0 ) ?
+ if (!buildSideIsEmpty) {
+ int hashCode = (cycleNum == 0) ?
partitions[0].getProbeHashCode(recordsProcessed)
: read_left_HV_vector.getAccessor().get(recordsProcessed);
int currBuildPart = hashCode & partitionMask;
@@ -299,7 +304,7 @@
currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
// If the matching inner partition was spilled
- if ( outgoingJoinBatch.isSpilledInner(currBuildPart) ) {
+ if (outgoingJoinBatch.isSpilledInner(currBuildPart)) {
// add this row to its outer partition (may cause a spill, when the batch is full)
currPartition.appendOuterRow(hashCode, recordsProcessed);
@@ -312,8 +317,8 @@
}
- if ( semiJoin ) {
- if ( probeIndex != -1 ) {
+ if (semiJoin) {
+ if (probeIndex != -1) {
// output the probe side only
outputRecords =
outputRow(null, 0, probeBatch.getContainer(), recordsProcessed);
@@ -392,7 +397,6 @@
}
}
}
-
}
/**
@@ -408,7 +412,7 @@
outputRecords = 0;
// When handling spilled partitions, the state becomes DONE at the end of each partition
- if ( probeState == ProbeState.DONE ) {
+ if (probeState == ProbeState.DONE) {
return outputRecords; // that is zero
}
@@ -422,7 +426,7 @@
do {
if (unmatchedBuildIndexes == null) { // first time for this partition ?
- if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right
+ if (buildSideIsEmpty) { return outputRecords; } // in case of an empty right
// Get this partition's list of build indexes that didn't match any record on the probe side
unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
recordsProcessed = 0;
@@ -432,14 +436,14 @@
// Project the list of unmatched records on the build side
executeProjectRightPhase(currRightPartition);
- if ( recordsProcessed < recordsToProcess ) { // more records in this partition?
+ if (recordsProcessed < recordsToProcess) { // more records in this partition?
return outputRecords; // outgoing is full; report and come back later
} else {
currRightPartition++; // on to the next right partition
unmatchedBuildIndexes = null;
}
- } while ( currRightPartition < numPartitions );
+ } while (currRightPartition < numPartitions);
probeState = ProbeState.DONE; // last right partition was handled; we are done now
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index ae04579..3f90a3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -147,10 +147,7 @@
incoming.getContainer().zeroVectors();
outgoingSv.setRecordCount(0);
outgoingSv.setBatchActualRecordCount(0);
- // Must allocate vectors to allow for offset vectors which
- // require a zero in the 0th position.
- container.allocateNew();
- container.setRecordCount(0);
+ container.setEmpty();
// Release buffer for sv2 (if any)
if (incomingSv != null) {
incomingSv.clear();
@@ -162,16 +159,9 @@
tp.transfer();
}
- // Must report the actual value count as the record
- // count. This is NOT the input record count when the input
- // is an SV2.
- int inputValueCount = incoming.getContainer().getRecordCount();
- container.setRecordCount(inputValueCount);
-
// Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
// batch to output record batch and later an SV2Remover copies the needed records.
outgoingSv.allocateNew(inputRecordCount);
- outgoingSv.setBatchActualRecordCount(inputValueCount);
limit(inputRecordCount);
// Release memory for incoming sv (if any)
@@ -224,7 +214,7 @@
}
}
- outgoingSv.setRecordCount(svIndex);
+ setOutgoingRecordCount(inputRecordCount, svIndex);
}
private void updateOutputSV2(int svIndex, int incomingIndex) {
@@ -243,6 +233,16 @@
}
}
+ private void setOutgoingRecordCount(int inputRecordCount, int outputCount) {
+ outgoingSv.setRecordCount(outputCount);
+ // Must report the actual value count as the record
+ // count. This is NOT the input record count when the input
+ // is an SV2.
+ int inputValueCount = incoming.getContainer().getRecordCount();
+ container.setRecordCount(inputValueCount);
+ outgoingSv.setBatchActualRecordCount(inputValueCount);
+ }
+
/**
* Reset the states for recordStartOffset, numberOfRecords and based on the
* {@link PartitionLimit} passed to the operator. It also resets the
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 8f16b51..420b61a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -18,19 +18,17 @@
package org.apache.drill.exec.physical.impl.mergereceiver;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
-import java.util.ArrayList;
import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
@@ -63,13 +61,8 @@
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.SchemaBuilder;
-import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.testing.ControlsInjector;
@@ -78,20 +71,21 @@
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
-
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
-
import io.netty.buffer.ByteBuf;
/**
* The MergingRecordBatch merges pre-sorted record batches from remote senders.
*/
public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(MergingRecordBatch.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(MergingRecordBatch.class);
private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
@@ -99,21 +93,20 @@
private RecordBatchLoader[] batchLoaders;
private final RawFragmentBatchProvider[] fragProviders;
private final ExchangeFragmentContext context;
- private VectorContainer outgoingContainer;
private MergingReceiverGeneratorBase merger;
private final MergingReceiverPOP config;
- private boolean hasRun = false;
+ private boolean hasRun;
private boolean outgoingBatchHasSpace = true;
private boolean hasMoreIncoming = true;
- private int outgoingPosition = 0;
- private int senderCount = 0;
+ private int outgoingPosition;
+ private int senderCount;
private RawFragmentBatch[] incomingBatches;
private int[] batchOffsets;
private PriorityQueue <Node> pqueue;
private RawFragmentBatch[] tempBatchHolder;
- private long[] inputCounts;
- private long[] outputCounts;
+ private final long[] inputCounts;
+ private final long[] outputCounts;
public enum Metric implements MetricDef {
BYTES_RECEIVED,
@@ -132,7 +125,6 @@
super(config, context, true, context.newOperatorContext(config));
this.fragProviders = fragProviders;
this.context = context;
- this.outgoingContainer = new VectorContainer(oContext);
this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
this.config = config;
this.inputCounts = new long[config.getNumSenders()];
@@ -343,11 +335,11 @@
bldr.addField(v.getField());
// allocate a new value vector
- outgoingContainer.addOrGet(v.getField());
+ container.addOrGet(v.getField());
}
allocateOutgoing();
- outgoingContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
// generate code for merge operations (copy and compare)
try {
@@ -479,9 +471,7 @@
}
// set the value counts in the outgoing vectors
- for (final VectorWrapper<?> vw : outgoingContainer) {
- vw.getValueVector().getMutator().setValueCount(outgoingPosition);
- }
+ container.setValueCount(outgoingPosition);
if (pqueue.isEmpty()) {
state = BatchState.DONE;
@@ -535,8 +525,8 @@
@Override
public BatchSchema getSchema() {
- if (outgoingContainer.hasSchema()) {
- return outgoingContainer.getSchema();
+ if (container.hasSchema()) {
+ return container.getSchema();
}
return null;
}
@@ -567,7 +557,7 @@
}
tempBatchHolder[i] = batch;
for (final SerializedField field : batch.getHeader().getDef().getFieldList()) {
- final ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field));
+ final ValueVector v = container.addOrGet(MaterializedField.create(field));
v.allocateNew();
}
break;
@@ -575,7 +565,8 @@
} catch (final IOException e) {
throw new DrillRuntimeException(e);
}
- outgoingContainer.buildSchema(SelectionVectorMode.NONE);
+ container.buildSchema(SelectionVectorMode.NONE);
+ container.setEmpty();
}
@Override
@@ -644,36 +635,6 @@
//No op
}
- @Override
- public Iterator<VectorWrapper<?>> iterator() {
- return outgoingContainer.iterator();
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TypedFieldId getValueVectorId(final SchemaPath path) {
- return outgoingContainer.getValueVectorId(path);
- }
-
- @Override
- public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
- return outgoingContainer.getValueAccessorById(clazz, ids);
- }
-
- @Override
- public WritableBatch getWritableBatch() {
- return WritableBatch.get(this);
- }
-
private boolean isSameSchemaAmongBatches(final RecordBatchLoader[] batchLoaders) {
Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not allowed!");
@@ -689,7 +650,7 @@
}
private void allocateOutgoing() {
- for (final VectorWrapper<?> w : outgoingContainer) {
+ for (final VectorWrapper<?> w : container) {
final ValueVector v = w.getValueVector();
if (v instanceof FixedWidthVector) {
AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1);
@@ -732,7 +693,7 @@
g.setMappingSet(MAIN_MAPPING);
final MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
- merger.doSetup(context, batch, outgoingContainer);
+ merger.doSetup(context, batch, container);
return merger;
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException(e);
@@ -742,7 +703,7 @@
private final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
private final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
private final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
- private GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
+ private final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
private final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
private void generateComparisons(final ClassGenerator<?> g, final VectorAccessible batch) throws SchemaChangeException {
@@ -817,7 +778,7 @@
@Override
public void close() {
- outgoingContainer.clear();
+ container.clear();
if (batchLoaders != null) {
for (final RecordBatchLoader rbl : batchLoaders) {
if (rbl != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index e8b522c..c309e8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -23,6 +23,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference;
@@ -69,43 +70,45 @@
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
/**
- * The purpose of this operator is to generate an ordered partition, rather than a random hash partition. This could be
- * used to do a total order sort, for example. This operator reads in a few incoming record batches, samples these
- * batches, and stores them in the distributed cache. The samples from all the parallel-running fragments are merged,
- * and a partition-table is built and stored in the distributed cache for use by all fragments. A new column is added to
- * the outgoing batch, whose value is determined by where each record falls in the partition table. This column is used
- * by PartitionSenderRootExec to determine which bucket to assign each record to.
+ * Generates an ordered partition, rather than a random hash partition. This
+ * could be used to do a total order sort, for example. This operator reads in a
+ * few incoming record batches, samples these batches, and stores them in the
+ * distributed cache. The samples from all the parallel-running fragments are
+ * merged, and a partition-table is built and stored in the distributed cache
+ * for use by all fragments. A new column is added to the outgoing batch, whose
+ * value is determined by where each record falls in the partition table. This
+ * column is used by PartitionSenderRootExec to determine which bucket to assign
+ * each record to.
*/
public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
+ static final Logger logger = LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
-// private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-// private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
- public static final CacheConfig<String, CachedVectorContainer> SINGLE_CACHE_CONFIG = CacheConfig //
- .newBuilder(CachedVectorContainer.class) //
- .name("SINGLE-" + CachedVectorContainer.class.getSimpleName()) //
- .mode(SerializationMode.DRILL_SERIALIZIABLE) //
+ public static final CacheConfig<String, CachedVectorContainer> SINGLE_CACHE_CONFIG = CacheConfig
+ .newBuilder(CachedVectorContainer.class)
+ .name("SINGLE-" + CachedVectorContainer.class.getSimpleName())
+ .mode(SerializationMode.DRILL_SERIALIZIABLE)
.build();
- public static final CacheConfig<String, CachedVectorContainer> MULTI_CACHE_CONFIG = CacheConfig //
- .newBuilder(CachedVectorContainer.class) //
- .name("MULTI-" + CachedVectorContainer.class.getSimpleName()) //
- .mode(SerializationMode.DRILL_SERIALIZIABLE) //
+ public static final CacheConfig<String, CachedVectorContainer> MULTI_CACHE_CONFIG = CacheConfig
+ .newBuilder(CachedVectorContainer.class)
+ .name("MULTI-" + CachedVectorContainer.class.getSimpleName())
+ .mode(SerializationMode.DRILL_SERIALIZIABLE)
.build();
- private final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP,
- ClassGenerator.DEFAULT_SCALAR_MAP);
+ private final MappingSet mainMapping = new MappingSet( (String) null, null,
+ ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
private final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null,
ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
- private final MappingSet partitionMapping = new MappingSet("partitionIndex", null, "partitionVectors", null,
+ private final MappingSet partitionMapping = new MappingSet("partitionIndex",
+ null, "partitionVectors", null,
ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
private final int recordsToSample; // How many records must be received before analyzing
@@ -115,13 +118,13 @@
protected final RecordBatch incoming;
private boolean first = true;
private OrderedPartitionProjector projector;
- private VectorContainer partitionVectors = new VectorContainer();
- private int partitions;
+ private final VectorContainer partitionVectors = new VectorContainer();
+ private final int partitions;
private Queue<VectorContainer> batchQueue;
private int recordsSampled;
- private int sendingMajorFragmentWidth;
- private boolean startedUnsampledBatches = false;
- private boolean upstreamNone = false;
+ private final int sendingMajorFragmentWidth;
+ private boolean startedUnsampledBatches;
+ private boolean upstreamNone;
private int recordCount;
private final IntVector partitionKeyVector;
@@ -142,6 +145,7 @@
DistributedCache cache = null;
// Clearly, this code is not used!
+ // cache can only be null here!
this.mmap = cache.getMultiMap(MULTI_CACHE_CONFIG);
this.tableMap = cache.getMap(SINGLE_CACHE_CONFIG);
Preconditions.checkNotNull(tableMap);
@@ -161,7 +165,6 @@
partitionKeyVector.clear();
}
-
private boolean saveSamples() throws SchemaChangeException, ClassTransformationException, IOException {
recordsSampled = 0;
IterOutcome upstream;
@@ -223,10 +226,7 @@
allocationSize *= 2;
}
}
- for (VectorWrapper<?> vw : containerToCache) {
- vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
- }
- containerToCache.setRecordCount(copier.getOutputRecords());
+ containerToCache.setValueCount(copier.getOutputRecords());
// Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container
// into a serializable wrapper object, and then add to distributed map
@@ -251,9 +251,13 @@
}
/**
- * Wait until the at least the given timeout is expired or interrupted and the fragment status is not runnable.
- * @param timeout Timeout in milliseconds.
- * @return True if the given timeout is expired. False when interrupted and the fragment status is not runnable.
+ * Wait until the at least the given timeout is expired or interrupted and the
+ * fragment status is not runnable.
+ *
+ * @param timeout
+ * Timeout in milliseconds.
+ * @return True if the given timeout is expired. False when interrupted and
+ * the fragment status is not runnable.
*/
private boolean waitUntilTimeOut(final long timeout) {
while(true) {
@@ -269,11 +273,14 @@
}
/**
- * This method is called when the first batch comes in. Incoming batches are collected until a threshold is met. At
- * that point, the records in the batches are sorted and sampled, and the sampled records are stored in the
- * distributed cache. Once a sufficient fraction of the fragments have shared their samples, each fragment grabs all
- * the samples, sorts all the records, builds a partition table, and attempts to push the partition table to the
- * distributed cache. Whichever table gets pushed first becomes the table used by all fragments for partitioning.
+ * Called when the first batch comes in. Incoming batches are collected until
+ * a threshold is met. At that point, the records in the batches are sorted
+ * and sampled, and the sampled records are stored in the distributed cache.
+ * Once a sufficient fraction of the fragments have shared their samples, each
+ * fragment grabs all the samples, sorts all the records, builds a partition
+ * table, and attempts to push the partition table to the distributed cache.
+ * Whichever table gets pushed first becomes the table used by all fragments
+ * for partitioning.
*
* @return True is successful. False if failed.
*/
@@ -288,8 +295,9 @@
long val = minorFragmentSampleCount.incrementAndGet();
logger.debug("Incremented mfsc, got {}", val);
- final long fragmentsBeforeProceed = (long) Math.ceil(sendingMajorFragmentWidth * completionFactor);
- final String finalTableKey = mapKey + "final";
+ long fragmentsBeforeProceed =
+ (long) Math.ceil(sendingMajorFragmentWidth * completionFactor);
+ String finalTableKey = mapKey + "final";
if (val == fragmentsBeforeProceed) { // we crossed the barrier, build table and get data.
buildTable();
@@ -326,7 +334,7 @@
partitionVectors.add(w.getValueVector());
}
- } catch (final ClassTransformationException | IOException | SchemaChangeException ex) {
+ } catch (ClassTransformationException | IOException | SchemaChangeException ex) {
kill(false);
context.getExecutorState().fail(ex);
return false;
@@ -340,8 +348,8 @@
// Get all samples from distributed map
SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator());
- final VectorContainer allSamplesContainer = new VectorContainer();
- final VectorContainer candidatePartitionTable = new VectorContainer();
+ VectorContainer allSamplesContainer = new VectorContainer();
+ VectorContainer candidatePartitionTable = new VectorContainer();
CachedVectorContainer wrap = null;
try {
for (CachedVectorContainer w : mmap.get(mapKey)) {
@@ -375,9 +383,7 @@
int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) {
assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
- for (VectorWrapper<?> vw : candidatePartitionTable) {
- vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
- }
+ candidatePartitionTable.setValueCount(copier.getOutputRecords());
break;
} else {
candidatePartitionTable.zeroVectors();
@@ -397,7 +403,6 @@
wrap.clear();
}
}
-
}
/**
@@ -415,8 +420,8 @@
*/
private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
List<Ordering> orderings, List<ValueVector> localAllocationVectors) throws SchemaChangeException {
- final ErrorCollector collector = new ErrorCollectorImpl();
- final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getOptions());
+ ErrorCollector collector = new ErrorCollectorImpl();
+ ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getOptions());
// Note: disabled for now. This may require some debugging:
// no tests are available for this operator.
// cg.getCodeGenerator().plainOldJavaCapable(true);
@@ -425,7 +430,7 @@
int i = 0;
for (Ordering od : orderings) {
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType())
.clearMode().setMode(TypeProtos.DataMode.REQUIRED);
TypeProtos.MajorType newType = builder.build();
@@ -567,31 +572,29 @@
int recordCount = batch.getRecordCount();
AllocationHelper.allocate(partitionKeyVector, recordCount, 50);
projector.projectRecords(recordCount, 0);
- for (VectorWrapper<?> v : container) {
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(recordCount);
- }
+ container.setValueCount(recordCount);
}
/**
- * Sets up projection that will transfer all of the columns in batch, and also populate the partition column based on
- * which partition a record falls into in the partition table
+ * Sets up projection that will transfer all of the columns in batch, and also
+ * populate the partition column based on which partition a record falls into
+ * in the partition table
*
* @param batch
* @throws SchemaChangeException
*/
protected void setupNewSchema(VectorAccessible batch) throws SchemaChangeException {
container.clear();
- final ErrorCollector collector = new ErrorCollectorImpl();
- final List<TransferPair> transfers = Lists.newArrayList();
+ ErrorCollector collector = new ErrorCollectorImpl();
+ List<TransferPair> transfers = Lists.newArrayList();
- final ClassGenerator<OrderedPartitionProjector> cg = CodeGenerator.getRoot(
+ ClassGenerator<OrderedPartitionProjector> cg = CodeGenerator.getRoot(
OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getOptions());
// Note: disabled for now. This may require some debugging:
// no tests are available for this operator.
-// cg.getCodeGenerator().plainOldJavaCapable(true);
+ // cg.getCodeGenerator().plainOldJavaCapable(true);
// Uncomment out this line to debug the generated code.
-// cg.getCodeGenerator().saveCodeForDebugging(true);
+ // cg.getCodeGenerator().saveCodeForDebugging(true);
for (VectorWrapper<?> vw : batch) {
TransferPair tp = vw.getValueVector().getTransferPair(oContext.getAllocator());
@@ -603,7 +606,7 @@
int count = 0;
for (Ordering od : popConfig.getOrderings()) {
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index edbbfe2..a3149b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -51,13 +51,15 @@
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
-
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class PartitionerTemplate implements Partitioner {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerTemplate.class);
+ static final Logger logger = LoggerFactory.getLogger(PartitionerTemplate.class);
- // Always keep the recordCount as (2^x) - 1 to better utilize the memory allocation in ValueVectors
+ // Always keep the recordCount as (2^x) - 1 to better utilize the memory
+ // allocation in ValueVectors
private static final int DEFAULT_RECORD_BATCH_SIZE = (1 << 10) - 1;
private SelectionVector2 sv2;
@@ -68,7 +70,7 @@
protected FragmentContext context;
private int start;
private int end;
- private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
+ private final List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE;
@@ -306,10 +308,14 @@
public void flush(boolean schemaChanged) throws IOException {
if (dropAll) {
- // If we are in dropAll mode, we still want to copy the data, because we can't stop copying a single outgoing
- // batch with out stopping all outgoing batches. Other option is check for status of dropAll before copying
- // every single record in copy method which has the overhead for every record all the time. Resetting the output
- // count, reusing the same buffers and copying has overhead only for outgoing batches whose receiver has
+ // If we are in dropAll mode, we still want to copy the data, because we
+ // can't stop copying a single outgoing
+ // batch with out stopping all outgoing batches. Other option is check
+ // for status of dropAll before copying
+ // every single record in copy method which has the overhead for every
+ // record all the time. Resetting the output
+ // count, reusing the same buffers and copying has overhead only for
+ // outgoing batches whose receiver has
// terminated.
// Reset the count to 0 and use existing buffers for exhausting input where receiver of this batch is terminated
@@ -332,11 +338,7 @@
return;
}
- if (recordCount != 0) {
- for (VectorWrapper<?> w : vectorContainer) {
- w.getValueVector().getMutator().setValueCount(recordCount);
- }
- }
+ vectorContainer.setValueCount(recordCount);
FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLastBatch,
handle.getQueryId(),
@@ -354,8 +356,10 @@
stats.stopWait();
}
- // If the current batch is the last batch, then set a flag to ignore any requests to flush the data
- // This is possible when the receiver is terminated, but we still get data from input operator
+ // If the current batch is the last batch, then set a flag to ignore any
+ // requests to flush the data
+ // This is possible when the receiver is terminated, but we still get data
+ // from input operator
if (isLastBatch) {
dropAll = true;
}
@@ -421,7 +425,6 @@
return recordCount;
}
-
@Override
public long getTotalRecords() {
return totalRecords;
@@ -459,6 +462,5 @@
public void clear(){
vectorContainer.clear();
}
-
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 8215fde..674b3ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -886,7 +886,7 @@
doAlloc(0);
container.buildSchema(SelectionVectorMode.NONE);
- container.setValueCount(0);
+ container.setEmpty();
wasNone = true;
return IterOutcome.OK_NEW_SCHEMA;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 0c11893..dd30c44 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -34,6 +34,7 @@
private ImmutableList<TransferPair> transfers;
private SelectionVector2 vector2;
+ @SuppressWarnings("unused")
private SelectionVector4 vector4;
private SelectionVectorMode svMode;
@@ -97,8 +98,10 @@
case TWO_BYTE:
vector2 = incoming.getSelectionVector2();
break;
- default:
+ case NONE:
break;
+ default:
+ throw new UnsupportedOperationException();
}
this.transfers = ImmutableList.copyOf(transfers);
doSetup(context, incoming, outgoing);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index 6a54828..11d307b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -35,34 +35,33 @@
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.ValueVector;
-
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * A RangePartitionRecordBatch is a run-time operator that provides the ability to divide up the input rows
- * into a fixed number of separate ranges or 'buckets' based on the values of a set of columns (the range
- * partitioning columns).
+ * Provides the ability to divide up the input rows into a fixed number of
+ * separate ranges or 'buckets' based on the values of a set of columns (the
+ * range partitioning columns).
*/
public class RangePartitionRecordBatch extends AbstractSingleRecordBatch<RangePartitionSender> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangePartitionRecordBatch.class);
+ static final Logger logger = LoggerFactory.getLogger(RangePartitionRecordBatch.class);
- private int numPartitions;
+ private final int numPartitions;
private int recordCount;
private final IntVector partitionIdVector;
private final List<TransferPair> transfers;
public RangePartitionRecordBatch(RangePartitionSender popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(popConfig, context, incoming);
- this.numPartitions = popConfig.getDestinations().size();
+ numPartitions = popConfig.getDestinations().size();
SchemaPath outputPath = popConfig.getPartitionFunction().getPartitionFieldRef();
MaterializedField outputField = MaterializedField.create(outputPath.getAsNamePart().getName(), Types.required(TypeProtos.MinorType.INT));
- this.partitionIdVector = (IntVector) TypeHelper.getNewVector(outputField, oContext.getAllocator());
- this.transfers = Lists.newArrayList();
+ partitionIdVector = (IntVector) TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ transfers = Lists.newArrayList();
}
-
@Override
public void close() {
super.close();
@@ -91,10 +90,7 @@
partitionIdVector.allocateNew(num);
recordCount = projectRecords(num, 0);
- for (VectorWrapper<?> v : container) {
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(recordCount);
- }
+ container.setValueCount(recordCount);
}
// returning OK here is fine because the base class AbstractSingleRecordBatch
// is handling the actual return status; thus if there was a new schema, the
@@ -169,8 +165,8 @@
* @param firstOutputIndex
* @return the number of records projected
*/
- private final int projectRecords(final int recordCount, int firstOutputIndex) {
- final int countN = recordCount;
+ private final int projectRecords(int recordCount, int firstOutputIndex) {
+ int countN = recordCount;
int counter = 0;
for (int i = 0; i < countN; i++, firstOutputIndex++) {
int partition = getPartition(i);
@@ -183,7 +179,6 @@
return counter;
}
-
@Override
public void dump() {
logger.error("RangePartitionRecordBatch[container={}, numPartitions={}, recordCount={}, partitionIdVector={}]",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index 970f970..1394938 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -23,8 +23,10 @@
import org.apache.drill.exec.record.selection.SelectionVector4;
public abstract class AbstractSV4Copier extends AbstractCopier {
- // Storing VectorWrapper reference instead of ValueVector[]. With EMIT outcome support underlying operator
- // operator can generate multiple output batches with no schema changes which will change the ValueVector[]
+ // Storing VectorWrapper reference instead of ValueVector[]. With EMIT outcome
+ // support underlying operator
+ // operator can generate multiple output batches with no schema changes which
+ // will change the ValueVector[]
// reference but not VectorWrapper reference.
protected VectorWrapper<?>[] vvIn;
private SelectionVector4 sv4;
@@ -32,21 +34,18 @@
@Override
public void setup(VectorAccessible incoming, VectorContainer outgoing) {
super.setup(incoming, outgoing);
- this.sv4 = incoming.getSelectionVector4();
+ sv4 = incoming.getSelectionVector4();
- final int count = outgoing.getNumberOfColumns();
+ int count = outgoing.getNumberOfColumns();
vvIn = new VectorWrapper[count];
- {
- int index = 0;
-
- for (VectorWrapper vectorWrapper: incoming) {
- vvIn[index] = vectorWrapper;
- index++;
- }
+ int index = 0;
+ for (VectorWrapper<?> vectorWrapper: incoming) {
+ vvIn[index++] = vectorWrapper;
}
}
+ @Override
public void copyEntryIndirect(int inIndex, int outIndex) {
copyEntry(sv4.get(inIndex), outIndex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
index f64a11e..0bb7186 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
@@ -42,7 +42,7 @@
{
int index = 0;
- for (VectorWrapper vectorWrapper: incoming) {
+ for (VectorWrapper<?> vectorWrapper: incoming) {
vvIn[index] = vectorWrapper.getValueVector();
index++;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
index e81eb43..1117ab3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
@@ -27,7 +27,7 @@
public GenericSV4Copier(RecordBatch incomingBatch, VectorContainer outputContainer,
SchemaChangeCallBack callBack) {
- for(VectorWrapper<?> vv : incomingBatch){
+ for (VectorWrapper<?> vv : incomingBatch) {
ValueVector v = vv.getValueVectors()[0];
v.makeTransferPair(outputContainer.addOrGet(v.getField(), callBack));
}
@@ -37,7 +37,7 @@
public void copyEntry(int inIndex, int outIndex) {
int inOffset = inIndex & 0xFFFF;
int inVector = inIndex >>> 16;
- for ( int i = 0; i < vvIn.length; i++ ) {
+ for (int i = 0; i < vvIn.length; i++) {
ValueVector[] vectorsFromIncoming = vvIn[i].getValueVectors();
vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index d45157a..4471248 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -43,14 +43,18 @@
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
- // Don't clear off container just because an OK_NEW_SCHEMA was received from upstream. For cases when there is just
- // change in container type but no actual schema change, RemovingRecordBatch should consume OK_NEW_SCHEMA and
- // send OK to downstream instead. Since the output of RemovingRecordBatch is always going to be a regular container
+ // Don't clear off container just because an OK_NEW_SCHEMA was received from
+ // upstream. For cases when there is just
+ // change in container type but no actual schema change, RemovingRecordBatch
+ // should consume OK_NEW_SCHEMA and
+ // send OK to downstream instead. Since the output of RemovingRecordBatch is
+ // always going to be a regular container
// change in incoming container type is not actual schema change.
container.zeroVectors();
copier = GenericCopierFactory.createAndSetupCopier(incoming, container, callBack);
- // If there is an actual schema change then below condition will be true and it will send OK_NEW_SCHEMA
+ // If there is an actual schema change then below condition will be true and
+ // it will send OK_NEW_SCHEMA
// downstream too
if (container.isSchemaChanged()) {
container.buildSchema(SelectionVectorMode.NONE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index d56f848..dca23cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -23,7 +23,6 @@
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.VectorAccessibleSerializable;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Trace;
@@ -38,24 +37,27 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/** TraceRecordBatch contains value vectors which are exactly the same
+/**
+ * Contains value vectors which are exactly the same
* as the incoming record batch's value vectors. If the incoming
* record batch has a selection vector (type 2) then TraceRecordBatch
* will also contain a selection vector.
- *
+ * <p>
* Purpose of this record batch is to dump the data associated with all
* the value vectors and selection vector to disk.
- *
+ * <p>
* This record batch does not modify any data or schema, it simply
* consumes the incoming record batch's data, dump to disk and pass the
* same set of value vectors (and selection vectors) to its parent record
- * batch
+ * batch.
*/
public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
+ static final Logger logger = LoggerFactory.getLogger(TraceRecordBatch.class);
- private SelectionVector2 sv = null;
+ private SelectionVector2 sv;
private final BufferAllocator localAllocator;
@@ -75,13 +77,14 @@
localAllocator = context.getNewChildAllocator("trace", 200, 0, Long.MAX_VALUE);
String fileName = getFileName();
- /* Create the log file we will dump to and initialize the file descriptors */
+ // Create the log file we will dump to and initialize the file descriptors
try {
Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TRACE_DUMP_FILESYSTEM));
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY,
+ context.getConfig().getString(ExecConstants.TRACE_DUMP_FILESYSTEM));
FileSystem fs = FileSystem.get(conf);
- /* create the file */
+ // create the file
fos = fs.create(new Path(fileName));
} catch (IOException e) {
throw new ExecutionSetupException("Unable to create file: " + fileName + " check permissions or if directory exists", e);
@@ -91,15 +94,15 @@
@Override
public int getRecordCount() {
if (sv == null) {
- return incoming.getRecordCount();
+ return container.getRecordCount();
} else {
return sv.getCount();
}
}
/**
- * Function is invoked for every record batch and it simply dumps the buffers associated with all the value vectors in
- * this record batch to a log file.
+ * Invoked for every record batch and it simply dumps the buffers
+ * associated with all the value vectors in this record batch to a log file.
*/
@Override
protected IterOutcome doWork() {
@@ -110,7 +113,7 @@
} else {
sv = null;
}
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2);
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getContainer().getRecordCount(), incoming, incomingHasSv2);
VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, oContext.getAllocator());
try {
@@ -121,28 +124,31 @@
batch.reconstructContainer(localAllocator, container);
if (incomingHasSv2) {
sv = wrap.getSv2();
+ container.setRecordCount(sv.getBatchActualRecordCount());
+ } else {
+ container.setRecordCount(batch.getDef().getRecordCount());
}
return getFinalOutcome(false);
}
@Override
- protected boolean setupNewSchema() throws SchemaChangeException {
- /* Trace operator does not deal with hyper vectors yet */
+ protected boolean setupNewSchema() {
+ // Trace operator does not deal with hyper vectors yet
if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
- throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+ throw new UnsupportedOperationException("Trace operator does not work with hyper vectors");
}
- /*
- * we have a new schema, clear our existing container to load the new value vectors
- */
+ // we have a new schema, clear our existing container to load the new value vectors
+
container.clear();
- /* Add all the value vectors in the container */
+ // Add all the value vectors in the container
for (VectorWrapper<?> vv : incoming) {
TransferPair tp = vv.getValueVector().getTransferPair(oContext.getAllocator());
container.add(tp.getTo());
}
container.buildSchema(incoming.getSchema().getSelectionVectorMode());
+ container.setRecordCount(incoming.getRecordCount());
return true;
}
@@ -157,12 +163,12 @@
@Override
public void close() {
- /* Release the selection vector */
+ // Release the selection vector
if (sv != null) {
sv.clear();
}
- /* Close the file descriptors */
+ // Close the file descriptors
try {
fos.close();
} catch (IOException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 8c9bb47..25dae80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -338,11 +338,10 @@
public int getRemainingRecords() {
return (totalRecordsToProcess - recordsProcessed);
}
-
}
private class UnionInputIterator implements Iterator<Pair<IterOutcome, BatchStatusWrappper>> {
- private final Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
+ private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) {
if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index fa8c954..1715c99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -374,8 +374,7 @@
TransferPair tp = resetUnnestTransferPair();
container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
container.buildSchema(SelectionVectorMode.NONE);
- container.allocateNew();
- container.setValueCount(0);
+ container.setEmpty();
return true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index a6ebef0..d40bd6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -51,9 +51,11 @@
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class UnorderedReceiverBatch implements CloseableRecordBatch {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(UnorderedReceiverBatch.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(UnorderedReceiverBatch.class);
private final RecordBatchLoader batchLoader;
@@ -78,11 +80,15 @@
}
}
- public UnorderedReceiverBatch(final ExchangeFragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException {
+ public UnorderedReceiverBatch(ExchangeFragmentContext context,
+ RawFragmentBatchProvider fragProvider, UnorderedReceiver config)
+ throws OutOfMemoryException {
this.fragProvider = fragProvider;
this.context = context;
- // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
- // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
+ // In normal case, batchLoader does not require an allocator. However, in
+ // case of splitAndTransfer of a value vector,
+ // we may need an allocator for the new offset vector. Therefore, here we
+ // pass the context's allocator to batchLoader.
oContext = context.newOperatorContext(config);
this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
@@ -90,8 +96,10 @@
this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
this.config = config;
- // Register this operator's buffer allocator so that incoming buffers are owned by this allocator
- context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
+ // Register this operator's buffer allocator so that incoming buffers are
+ // owned by this allocator
+ context.getBuffers().getCollector(config.getOppositeMajorFragmentId())
+ .setAllocator(oContext.getAllocator());
}
@Override
@@ -110,7 +118,7 @@
}
@Override
- public void kill(final boolean sendUpstream) {
+ public void kill(boolean sendUpstream) {
if (sendUpstream) {
informSenders();
}
@@ -133,12 +141,12 @@
}
@Override
- public TypedFieldId getValueVectorId(final SchemaPath path) {
+ public TypedFieldId getValueVectorId(SchemaPath path) {
return batchLoader.getValueVectorId(path);
}
@Override
- public VectorWrapper<?> getValueAccessorById(final Class<?> clazz, final int... ids) {
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
return batchLoader.getValueAccessorById(clazz, ids);
}
@@ -146,8 +154,9 @@
try {
injector.injectInterruptiblePause(context.getExecutionControls(), "waiting-for-data", logger);
return fragProvider.getNext();
- } catch(final InterruptedException e) {
- // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ } catch(InterruptedException e) {
+ // Preserve evidence that the interruption occurred so that code higher up
+ // on the call stack can learn of the
// interruption and respond to it if it wants to.
Thread.currentThread().interrupt();
@@ -190,8 +199,8 @@
return lastOutcome;
}
- final RecordBatchDef rbd = batch.getHeader().getDef();
- final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+ RecordBatchDef rbd = batch.getHeader().getDef();
+ boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean catch clause below.
stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
@@ -230,7 +239,9 @@
@Override
public VectorContainer getOutgoingContainer() {
- throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ throw new UnsupportedOperationException(
+ String.format("You should not call getOutgoingContainer() for class %s",
+ getClass().getCanonicalName()));
}
@Override
@@ -240,15 +251,15 @@
private void informSenders() {
logger.info("Informing senders of request to terminate sending.");
- final FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+ FragmentHandle handlePrototype = FragmentHandle.newBuilder()
.setMajorFragmentId(config.getOppositeMajorFragmentId())
.setQueryId(context.getHandle().getQueryId())
.build();
- for (final MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
- final FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+ for (MinorFragmentEndpoint providingEndpoint : config.getProvidingEndpoints()) {
+ FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
.setMinorFragmentId(providingEndpoint.getId())
.build();
- final FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+ FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
.setReceiver(context.getHandle())
.setSender(sender)
.build();
@@ -262,19 +273,19 @@
private class OutcomeListener implements RpcOutcomeListener<Ack> {
@Override
- public void failed(final RpcException ex) {
+ public void failed(RpcException ex) {
logger.warn("Failed to inform upstream that receiver is finished");
}
@Override
- public void success(final Ack value, final ByteBuf buffer) {
+ public void success(Ack value, ByteBuf buffer) {
// Do nothing
}
@Override
- public void interrupted(final InterruptedException e) {
+ public void interrupted(InterruptedException e) {
if (context.getExecutorState().shouldContinue()) {
- final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
+ String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
logger.error(errMsg, e);
context.getExecutorState().fail(new RpcException(errMsg, e));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
index 0f3517f..99bd6d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java
@@ -31,12 +31,15 @@
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Unpivot maps. Assumptions are:
@@ -44,7 +47,7 @@
* 2) Each map contains the same number of fields and field names are also same (types could be different).
*
* Example input and output:
- * Schema of input:
+ * Schema of input: <pre>
* "schema" : BIGINT - Schema number. For each schema change this number is incremented.
* "computed" : BIGINT - What time is it computed?
* "columns" : MAP - Column names
@@ -69,22 +72,23 @@
* "statscount" : BIGINT
* "nonnullstatcount" : BIGINT
* .... one column for each map type ...
+ * </pre>
*/
public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMaps> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnpivotMapsRecordBatch.class);
+ private static final Logger logger = LoggerFactory.getLogger(UnpivotMapsRecordBatch.class);
private final List<String> mapFieldsNames;
private boolean first = true;
- private int keyIndex = 0;
- private List<String> keyList = null;
+ private int keyIndex;
+ private List<String> keyList;
- private Map<MaterializedField, Map<String, ValueVector>> dataSrcVecMap = null;
+ private Map<MaterializedField, Map<String, ValueVector>> dataSrcVecMap;
// Map of non-map fields to VV in the incoming schema
- private Map<MaterializedField, ValueVector> copySrcVecMap = null;
+ private Map<MaterializedField, ValueVector> copySrcVecMap;
private List<TransferPair> transferList;
- private int recordCount = 0;
+ private int recordCount;
public UnpivotMapsRecordBatch(UnpivotMaps pop, RecordBatch incoming, FragmentContext context)
throws OutOfMemoryException {
@@ -150,6 +154,7 @@
}
}
+ @Override
public VectorContainer getOutgoingContainer() {
return this.container;
}
@@ -170,11 +175,10 @@
keyIndex = (keyIndex + 1) % keyList.size();
recordCount = outRecordCount;
+ container.setRecordCount(recordCount);
if (keyIndex == 0) {
- for (VectorWrapper w : incoming) {
- w.clear();
- }
+ VectorAccessibleUtilities.clear(incoming.getContainer());
}
return IterOutcome.OK;
}
@@ -270,6 +274,7 @@
container.clear();
buildKeyList();
buildOutputContainer();
+ container.setEmpty();
return true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 5c4f8b4..39189b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -22,19 +22,31 @@
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch;
import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch;
+import org.apache.drill.exec.physical.impl.join.HashJoinBatch;
import org.apache.drill.exec.physical.impl.join.MergeJoinBatch;
import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch;
import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch;
import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
+import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
+import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch;
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
+import org.apache.drill.exec.physical.impl.rangepartitioner.RangePartitionRecordBatch;
import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.physical.impl.trace.TraceRecordBatch;
+import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch;
+import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
+import org.apache.drill.exec.physical.impl.unpivot.UnpivotMapsRecordBatch;
+import org.apache.drill.exec.physical.impl.window.WindowFrameRecordBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SimpleVectorWrapper;
@@ -223,6 +235,18 @@
rules.put(MergeJoinBatch.class, CheckMode.VECTORS);
rules.put(NestedLoopJoinBatch.class, CheckMode.VECTORS);
rules.put(LimitRecordBatch.class, CheckMode.VECTORS);
+ rules.put(MergingRecordBatch.class, CheckMode.VECTORS);
+ rules.put(OrderedPartitionRecordBatch.class, CheckMode.VECTORS);
+ rules.put(RangePartitionRecordBatch.class, CheckMode.VECTORS);
+ rules.put(TraceRecordBatch.class, CheckMode.VECTORS);
+ rules.put(UnionAllRecordBatch.class, CheckMode.VECTORS);
+ rules.put(UnorderedReceiverBatch.class, CheckMode.VECTORS);
+ rules.put(UnpivotMapsRecordBatch.class, CheckMode.VECTORS);
+ rules.put(WindowFrameRecordBatch.class, CheckMode.VECTORS);
+ rules.put(TopNBatch.class, CheckMode.VECTORS);
+ rules.put(HashJoinBatch.class, CheckMode.VECTORS);
+ rules.put(ExternalSortBatch.class, CheckMode.VECTORS);
+ rules.put(WriterRecordBatch.class, CheckMode.VECTORS);
return rules;
}
@@ -518,9 +542,13 @@
// empty map vector that causes validation to fail.
ValueVector child = internalMap.getChild(type.name());
if (child == null) {
- error(name, vector, String.format(
- "Union vector includes type %s, but the internal map has no matching member",
- type.name()));
+ // Disabling this check for now. TopNBatch, SortBatch
+ // and perhaps others will create vectors with a set of
+ // types, but won't actually populate some of the types.
+ //
+ // error(name, vector, String.format(
+ // "Union vector includes type %s, but the internal map has no matching member",
+ // type.name()));
} else {
validateVector(name + "-type-" + type.name(),
valueCount, child);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
index cc7a04d..a759399 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java
@@ -26,18 +26,22 @@
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.inject.Named;
import java.util.List;
/**
- * WindowFramer implementation that doesn't support the FRAME clause (will assume the default frame).
- * <br>According to the SQL standard, LEAD, LAG, ROW_NUMBER, NTILE and all ranking functions don't support the FRAME clause.
- * This class will handle such functions.
+ * WindowFramer implementation that doesn't support the FRAME clause (will
+ * assume the default frame). <p>
+ * According to the SQL standard, LEAD, LAG, ROW_NUMBER, NTILE and all ranking
+ * functions don't support the FRAME clause. This class will handle such
+ * functions.
*/
public abstract class NoFrameSupportTemplate implements WindowFramer {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NoFrameSupportTemplate.class);
+ private static final Logger logger = LoggerFactory.getLogger(NoFrameSupportTemplate.class);
private VectorContainer container;
private VectorContainer internal;
@@ -53,8 +57,8 @@
private Partition partition; // current partition being processed
@Override
- public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
- final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException {
+ public void setup(List<WindowDataBatch> batches, VectorContainer container, OperatorContext oContext,
+ boolean requireFullPartition, WindowPOP popConfig) throws SchemaChangeException {
this.container = container;
this.batches = batches;
@@ -81,9 +85,7 @@
@Override
public void doWork() throws DrillException {
int currentRow = 0;
-
- this.current = batches.get(0);
-
+ current = batches.get(0);
outputCount = current.getRecordCount();
while (currentRow < outputCount) {
@@ -108,10 +110,9 @@
}
}
- private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
+ private void newPartition(WindowDataBatch current, int currentRow) throws SchemaChangeException {
partition = new Partition();
updatePartitionSize(partition, currentRow);
-
setupPartition(current, container);
}
@@ -131,16 +132,19 @@
}
/**
- * process all rows (computes and writes function values) of current batch that are part of current partition.
- * @param currentRow first unprocessed row
+ * Process all rows (computes and writes function values) of current batch
+ * that are part of current partition.
+ *
+ * @param currentRow
+ * first unprocessed row
* @return index of next unprocessed row
- * @throws DrillException if it can't write into the container
+ * @throws DrillException
+ * if it can't write into the container
*/
- private int processPartition(final int currentRow) throws DrillException {
+ private int processPartition(int currentRow) throws DrillException {
logger.trace("process partition {}, currentRow: {}, outputCount: {}", partition, currentRow, outputCount);
setupCopyNext(current, container);
-
copyPrevFromInternal();
// copy remaining from current
@@ -198,15 +202,14 @@
}
}
- private void processRow(final int row) throws DrillException {
+ private void processRow(int row) throws DrillException {
if (partition.isFrameDone()) {
// because all peer rows share the same frame, we only need to compute and aggregate the frame once
- final long peers = countPeers(row);
+ long peers = countPeers(row);
partition.newFrame(peers);
}
outputRow(row, partition);
-
partition.rowAggregated();
}
@@ -214,7 +217,7 @@
* updates partition's length after computing the number of rows for the current the partition starting at the specified
* row of the first batch. If !requiresFullPartition, this method will only count the rows in the current batch
*/
- private void updatePartitionSize(final Partition partition, final int start) {
+ private void updatePartitionSize(Partition partition, int start) {
logger.trace("compute partition size starting from {} on {} batches", start, batches.size());
long length = 0;
@@ -226,7 +229,7 @@
outer:
for (WindowDataBatch batch : batches) {
- final int recordCount = batch.getRecordCount();
+ int recordCount = batch.getRecordCount();
// check first container from start row, and subsequent containers from first row
for (; row < recordCount; row++, length++) {
@@ -262,18 +265,18 @@
}
/**
- * count number of peer rows for current row
+ * Count number of peer rows for current row
* @param start starting row of the current frame
* @return num peer rows for current row
* @throws SchemaChangeException
*/
- private long countPeers(final int start) throws SchemaChangeException {
+ private long countPeers(int start) throws SchemaChangeException {
long length = 0;
// a single frame can include rows from multiple batches
// start processing first batch and, if necessary, move to next batches
for (WindowDataBatch batch : batches) {
- final int recordCount = batch.getRecordCount();
+ int recordCount = batch.getRecordCount();
// for every remaining row in the partition, count it if it's a peer row
for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) {
@@ -308,12 +311,14 @@
+ "]";
}
-
/**
- * called once for each row after we evaluate all peer rows. Used to write a value in the row
+ * Called once for each row after we evaluate all peer rows. Used to write a
+ * value in the row
*
- * @param outIndex index of row
- * @param partition object used by "computed" window functions
+ * @param outIndex
+ * index of row
+ * @param partition
+ * object used by "computed" window functions
*/
public abstract void outputRow(@Named("outIndex") int outIndex,
@Named("partition") Partition partition)
@@ -331,10 +336,13 @@
throws SchemaChangeException;
/**
- * copies value(s) from inIndex row to outIndex row. Mostly used by LEAD. inIndex always points to the row next to
- * outIndex
- * @param inIndex source row of the copy
- * @param outIndex destination row of the copy.
+ * Copies value(s) from inIndex row to outIndex row. Mostly used by LEAD.
+ * inIndex always points to the row next to outIndex
+ *
+ * @param inIndex
+ * source row of the copy
+ * @param outIndex
+ * destination row of the copy.
*/
public abstract void copyNext(@Named("inIndex") int inIndex,
@Named("outIndex") int outIndex)
@@ -344,10 +352,13 @@
throws SchemaChangeException;
/**
- * copies value(s) from inIndex row to outIndex row. Mostly used by LAG. inIndex always points to the previous row
+ * Copies value(s) from inIndex row to outIndex row. Mostly used by LAG.
+ * inIndex always points to the previous row
*
- * @param inIndex source row of the copy
- * @param outIndex destination row of the copy.
+ * @param inIndex
+ * source row of the copy
+ * @param outIndex
+ * destination row of the copy.
*/
public abstract void copyPrev(@Named("inIndex") int inIndex,
@Named("outIndex") int outIndex)
@@ -364,12 +375,12 @@
throws SchemaChangeException;
/**
- * reset all window functions
+ * Reset all window functions
*/
public abstract boolean resetValues() throws SchemaChangeException;
/**
- * compares two rows from different batches (can be the same), if they have the same value for the partition by
+ * Compares two rows from different batches (can be the same), if they have the same value for the partition by
* expression
* @param b1Index index of first row
* @param b1 batch for first row
@@ -385,7 +396,7 @@
throws SchemaChangeException;
/**
- * compares two rows from different batches (can be the same), if they have the same value for the order by
+ * Compares two rows from different batches (can be the same), if they have the same value for the order by
* expression
* @param b1Index index of first row
* @param b1 batch for first row
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
index 55fa647..a7d9a8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/Partition.java
@@ -17,13 +17,17 @@
*/
package org.apache.drill.exec.physical.impl.window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * Used internally to keep track of partitions and frames.<br>
- * A partition can be partial, which means we don't know "yet" the total number of records that are part of this partition.
- * Even for partial partitions, we know the number of rows that are part of current frame
+ * Used internally to keep track of partitions and frames.<p>
+ * A partition can be partial, which means we don't know "yet" the total number
+ * of records that are part of this partition. Even for partial partitions, we
+ * know the number of rows that are part of current frame
*/
public class Partition {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Partition.class);
+ private static final Logger logger = LoggerFactory.getLogger(Partition.class);
private boolean partial; // true if we don't know yet the full length of this partition
private long length; // size of this partition (if partial is true, then this is a partial length of the partition)
@@ -48,10 +52,13 @@
public long getLength() {
return length;
}
+
/**
- * @param length number of rows in this partition
- * @param partial if true, then length is not the full length of the partition but just the number of rows in the
- * current batch
+ * @param length
+ * number of rows in this partition
+ * @param partial
+ * if true, then length is not the full length of the partition but
+ * just the number of rows in the current batch
*/
public void updateLength(long length, boolean partial) {
this.length += length;
@@ -62,7 +69,6 @@
public void rowAggregated() {
remaining--;
peers--;
-
row_number++;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 59e84ef..6ed004f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -21,8 +21,6 @@
import java.util.Arrays;
import java.util.List;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
-
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FunctionCall;
@@ -47,23 +45,25 @@
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.sun.codemodel.JExpr;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.codemodel.JExpr;
/**
* support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...])
*
*/
public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WindowFrameRecordBatch.class);
+ static final Logger logger = LoggerFactory.getLogger(WindowFrameRecordBatch.class);
private final RecordBatch incoming;
private List<WindowDataBatch> batches;
private WindowFramer[] framers;
- private boolean hasOrderBy; // true if window definition contains an order-by clause
private final List<WindowFunction> functions = Lists.newArrayList();
private boolean noMoreBatches; // true when downstream returns NONE
@@ -71,14 +71,16 @@
private boolean shouldStop; // true if we received an early termination request
- public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+ public WindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context,
+ RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
batches = Lists.newArrayList();
}
/**
- * Hold incoming batches in memory until all window functions are ready to process the batch on top of the queue
+ * Hold incoming batches in memory until all window functions are ready to
+ * process the batch on top of the queue
*/
@Override
public IterOutcome innerNext() {
@@ -104,7 +106,8 @@
return IterOutcome.NONE;
}
- // keep saving incoming batches until the first unprocessed batch can be processed, or upstream == NONE
+ // keep saving incoming batches until the first unprocessed batch can be
+ // processed, or upstream == NONE
while (!noMoreBatches && !canDoWork()) {
IterOutcome upstream = next(incoming);
logger.trace("next(incoming) returned {}", upstream);
@@ -124,7 +127,7 @@
if (schema != null) {
throw new UnsupportedOperationException("OVER clause doesn't currently support changing schemas.");
}
- this.schema = incoming.getSchema();
+ schema = incoming.getSchema();
}
case OK:
if (incoming.getRecordCount() > 0) {
@@ -160,15 +163,13 @@
private void doWork() throws DrillException {
- final WindowDataBatch current = batches.get(0);
- final int recordCount = current.getRecordCount();
+ WindowDataBatch current = batches.get(0);
+ int recordCount = current.getRecordCount();
logger.trace("WindowFramer.doWork() START, num batches {}, current batch has {} rows", batches.size(), recordCount);
// allocate outgoing vectors
- for (VectorWrapper<?> w : container) {
- w.getValueVector().allocateNew();
- }
+ container.allocateNew();
for (WindowFramer framer : framers) {
framer.doWork();
@@ -181,10 +182,7 @@
tp.transfer();
}
- container.setRecordCount(recordCount);
- for (VectorWrapper<?> v : container) {
- v.getValueVector().getMutator().setValueCount(recordCount);
- }
+ container.setValueCount(recordCount);
// we can safely free the current batch
current.clear();
@@ -194,8 +192,8 @@
}
/**
- * @return true when all window functions are ready to process the current batch (it's the first batch currently
- * held in memory)
+ * @return true when all window functions are ready to process the current
+ * batch (it's the first batch currently held in memory)
*/
private boolean canDoWork() {
if (batches.size() < 2) {
@@ -204,10 +202,10 @@
return false;
}
- final VectorAccessible current = batches.get(0);
- final int currentSize = current.getRecordCount();
- final VectorAccessible last = batches.get(batches.size() - 1);
- final int lastSize = last.getRecordCount();
+ VectorAccessible current = batches.get(0);
+ int currentSize = current.getRecordCount();
+ VectorAccessible last = batches.get(batches.size() - 1);
+ int lastSize = last.getRecordCount();
boolean partitionEndReached;
boolean frameEndReached;
@@ -215,7 +213,7 @@
partitionEndReached = !framers[0].isSamePartition(currentSize - 1, current, lastSize - 1, last);
frameEndReached = partitionEndReached || !framers[0].isPeer(currentSize - 1, current, lastSize - 1, last);
- for (final WindowFunction function : functions) {
+ for (WindowFunction function : functions) {
if (!function.canDoWork(batches.size(), popConfig, frameEndReached, partitionEndReached)) {
return false;
}
@@ -230,18 +228,20 @@
@Override
protected void buildSchema() throws SchemaChangeException {
logger.trace("buildSchema()");
- final IterOutcome outcome = next(incoming);
+ IterOutcome outcome = next(incoming);
switch (outcome) {
- case NONE:
- state = BatchState.DONE;
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- return;
- case STOP:
- state = BatchState.STOP;
- return;
- case OUT_OF_MEMORY:
- state = BatchState.OUT_OF_MEMORY;
- return;
+ case NONE:
+ state = BatchState.DONE;
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ return;
+ case STOP:
+ state = BatchState.STOP;
+ return;
+ case OUT_OF_MEMORY:
+ state = BatchState.OUT_OF_MEMORY;
+ return;
+ default:
+ break;
}
try {
@@ -260,30 +260,28 @@
logger.trace("creating framer(s)");
- final List<LogicalExpression> keyExprs = Lists.newArrayList();
- final List<LogicalExpression> orderExprs = Lists.newArrayList();
+ List<LogicalExpression> keyExprs = Lists.newArrayList();
+ List<LogicalExpression> orderExprs = Lists.newArrayList();
boolean requireFullPartition = false;
boolean useDefaultFrame = false; // at least one window function uses the DefaultFrameTemplate
boolean useCustomFrame = false; // at least one window function uses the CustomFrameTemplate
- hasOrderBy = popConfig.getOrderings().size() > 0;
-
// all existing vectors will be transferred to the outgoing container in framer.doWork()
- for (final VectorWrapper<?> wrapper : batch) {
+ for (VectorWrapper<?> wrapper : batch) {
container.addOrGet(wrapper.getField());
}
// add aggregation vectors to the container, and materialize corresponding expressions
- for (final NamedExpression ne : popConfig.getAggregations()) {
+ for (NamedExpression ne : popConfig.getAggregations()) {
if (!(ne.getExpr() instanceof FunctionCall)) {
throw UserException.functionError()
.message("Unsupported window function '%s'", ne.getExpr())
.build(logger);
}
- final FunctionCall call = (FunctionCall) ne.getExpr();
- final WindowFunction winfun = WindowFunction.fromExpression(call);
+ FunctionCall call = (FunctionCall) ne.getExpr();
+ WindowFunction winfun = WindowFunction.fromExpression(call);
if (winfun.materialize(ne, container, context.getFunctionRegistry())) {
functions.add(winfun);
requireFullPartition |= winfun.requiresFullPartition(popConfig);
@@ -300,12 +298,12 @@
container.setRecordCount(0);
// materialize partition by expressions
- for (final NamedExpression ne : popConfig.getWithins()) {
+ for (NamedExpression ne : popConfig.getWithins()) {
keyExprs.add(ExpressionTreeMaterializer.materializeAndCheckErrors(ne.getExpr(), batch, context.getFunctionRegistry()));
}
// materialize order by expressions
- for (final Order.Ordering oe : popConfig.getOrderings()) {
+ for (Order.Ordering oe : popConfig.getOrderings()) {
orderExprs.add(ExpressionTreeMaterializer.materializeAndCheckErrors(oe.getExpr(), batch, context.getFunctionRegistry()));
}
@@ -328,31 +326,31 @@
}
}
- private WindowFramer generateFramer(final List<LogicalExpression> keyExprs, final List<LogicalExpression> orderExprs,
- final List<WindowFunction> functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
+ private WindowFramer generateFramer(List<LogicalExpression> keyExprs, List<LogicalExpression> orderExprs,
+ List<WindowFunction> functions, boolean useCustomFrame) throws IOException, ClassTransformationException {
TemplateClassDefinition<WindowFramer> definition = useCustomFrame ?
WindowFramer.FRAME_TEMPLATE_DEFINITION : WindowFramer.NOFRAME_TEMPLATE_DEFINITION;
- final ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(definition, context.getOptions());
+ ClassGenerator<WindowFramer> cg = CodeGenerator.getRoot(definition, context.getOptions());
{
// generating framer.isSamePartition()
- final GeneratorMapping IS_SAME_PARTITION_READ = GeneratorMapping.create("isSamePartition", "isSamePartition", null, null);
- final MappingSet isaB1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
- final MappingSet isaB2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
+ GeneratorMapping IS_SAME_PARTITION_READ = GeneratorMapping.create("isSamePartition", "isSamePartition", null, null);
+ MappingSet isaB1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
+ MappingSet isaB2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PARTITION_READ, IS_SAME_PARTITION_READ);
setupIsFunction(cg, keyExprs, isaB1, isaB2);
}
{
// generating framer.isPeer()
- final GeneratorMapping IS_SAME_PEER_READ = GeneratorMapping.create("isPeer", "isPeer", null, null);
- final MappingSet isaP1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
- final MappingSet isaP2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
+ GeneratorMapping IS_SAME_PEER_READ = GeneratorMapping.create("isPeer", "isPeer", null, null);
+ MappingSet isaP1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
+ MappingSet isaP2 = new MappingSet("b2Index", null, "b2", null, IS_SAME_PEER_READ, IS_SAME_PEER_READ);
// isPeer also checks if it's the same partition
setupIsFunction(cg, Iterables.concat(keyExprs, orderExprs), isaP1, isaP2);
}
- for (final WindowFunction function : functions) {
+ for (WindowFunction function : functions) {
// only generate code for the proper window functions
if (function.supportsCustomFrames() == useCustomFrame) {
function.generateCode(cg);
@@ -363,7 +361,7 @@
CodeGenerator<WindowFramer> codeGen = cg.getCodeGenerator();
codeGen.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
-// codeGen.saveCodeForDebugging(true);
+ codeGen.saveCodeForDebugging(true);
return context.getImplementationClass(codeGen);
}
@@ -371,8 +369,8 @@
/**
* setup comparison functions isSamePartition and isPeer
*/
- private void setupIsFunction(final ClassGenerator<WindowFramer> cg, final Iterable<LogicalExpression> exprs,
- final MappingSet leftMapping, final MappingSet rightMapping) {
+ private void setupIsFunction(ClassGenerator<WindowFramer> cg, Iterable<LogicalExpression> exprs,
+ MappingSet leftMapping, MappingSet rightMapping) {
cg.setMappingSet(leftMapping);
for (LogicalExpression expr : exprs) {
if (expr == null) {
@@ -384,10 +382,10 @@
cg.setMappingSet(rightMapping);
ClassGenerator.HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
- final LogicalExpression fh =
+ LogicalExpression fh =
FunctionGenerationHelper
.getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry());
- final ClassGenerator.HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
+ ClassGenerator.HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
cg.getEvalBlock()._return(JExpr.TRUE);
@@ -404,7 +402,7 @@
}
if (batches != null) {
- for (final WindowDataBatch bd : batches) {
+ for (WindowDataBatch bd : batches) {
bd.clear();
}
batches = null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
index a7964d6..4bb3d38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java
@@ -29,11 +29,14 @@
import java.util.List;
public interface WindowFramer {
- TemplateClassDefinition<WindowFramer> NOFRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, NoFrameSupportTemplate.class);
- TemplateClassDefinition<WindowFramer> FRAME_TEMPLATE_DEFINITION = new TemplateClassDefinition<>(WindowFramer.class, FrameSupportTemplate.class);
+ TemplateClassDefinition<WindowFramer> NOFRAME_TEMPLATE_DEFINITION =
+ new TemplateClassDefinition<>(WindowFramer.class, NoFrameSupportTemplate.class);
+ TemplateClassDefinition<WindowFramer> FRAME_TEMPLATE_DEFINITION =
+ new TemplateClassDefinition<>(WindowFramer.class, FrameSupportTemplate.class);
- void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext operatorContext,
- final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException;
+ void setup(final List<WindowDataBatch> batches, final VectorContainer container,
+ final OperatorContext operatorContext, final boolean requireFullPartition,
+ final WindowPOP popConfig) throws SchemaChangeException;
/**
* process the inner batch and write the aggregated values in the container
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index 9587325..3006529 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -28,23 +28,23 @@
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.AllocationHelper;
+
public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
private SelectionVector4 vector4;
private List<BatchGroup> batchGroups;
private VectorAccessible hyperBatch;
private VectorAccessible outgoing;
private int size;
- private int queueSize = 0;
+ private int queueSize;
@Override
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
- VectorAccessible outgoing) throws SchemaChangeException {
+ public void setup(FragmentContext context, BufferAllocator allocator,
+ VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
+ VectorAccessible outgoing) throws SchemaChangeException {
this.hyperBatch = hyperBatch;
this.batchGroups = batchGroups;
this.outgoing = outgoing;
@@ -90,20 +90,14 @@
}
private void setValueCount(int count) {
- for (VectorWrapper w: outgoing) {
- w.getValueVector().getMutator().setValueCount(count);
- }
+ VectorAccessibleUtilities.setValueCount(outgoing, count);
}
@Override
public void close() throws IOException {
vector4.clear();
- for (final VectorWrapper<?> w: outgoing) {
- w.getValueVector().clear();
- }
- for (final VectorWrapper<?> w : hyperBatch) {
- w.clear();
- }
+ VectorAccessibleUtilities.clear(outgoing);
+ VectorAccessibleUtilities.clear(hyperBatch);
for (BatchGroup batchGroup : batchGroups) {
batchGroup.close();
@@ -123,9 +117,7 @@
}
private void allocateVectors(int targetRecordCount) {
- for (VectorWrapper w: outgoing) {
- AllocationHelper.allocateNew(w.getValueVector(), targetRecordCount);
- }
+ VectorAccessibleUtilities.allocateVectors(outgoing, targetRecordCount);
}
private void siftDown() {
@@ -162,8 +154,8 @@
return doEval(sv1, sv2);
}
- public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
+ public abstract void doSetup(@Named("context") FragmentContext context,
+ @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 96b9cee..cb835e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -22,14 +22,17 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
/**
- * Implements an AbstractUnaryRecordBatch where the incoming record batch is known at the time of creation
+ * Implements an AbstractUnaryRecordBatch where the incoming record batch is
+ * known at the time of creation
+ *
* @param <T>
*/
public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractUnaryRecordBatch<T> {
protected final RecordBatch incoming;
- public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+ public AbstractSingleRecordBatch(T popConfig, FragmentContext context,
+ RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
index a7cb01f..808f6ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractUnaryRecordBatch.java
@@ -23,6 +23,8 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The base class for operators that have a single input. The concrete implementations provide the
@@ -34,9 +36,9 @@
* @param <T>
*/
public abstract class AbstractUnaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
+ private static final Logger logger = LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
- protected boolean outOfMemory = false;
+ protected boolean outOfMemory;
protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private IterOutcome lastKnownOutcome;
@@ -140,18 +142,17 @@
protected abstract IterOutcome doWork();
/**
- * Default behavior to handle NULL input (aka FAST NONE): incoming return NONE before return a OK_NEW_SCHEMA:
- * This could happen when the underneath Scan operators do not produce any batch with schema.
- *
+ * Default behavior to handle NULL input (aka FAST NONE): incoming return NONE
+ * before return a OK_NEW_SCHEMA: This could happen when the underneath Scan
+ * operators do not produce any batch with schema.
* <p>
- * Notice that NULL input is different from input with an empty batch. In the later case, input provides
- * at least a batch, thought it's empty.
- *</p>
- *
+ * Notice that NULL input is different from input with an empty batch. In the
+ * later case, input provides at least a batch, thought it's empty.
+ * </p>
* <p>
- * This behavior could be override in each individual operator, if the operator's semantics is to
- * inject a batch with schema.
- *</p>
+ * This behavior could be override in each individual operator, if the
+ * operator's semantics is to inject a batch with schema.
+ * </p>
*
* @return IterOutcome.NONE.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 8361dbe..1f06710 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -89,18 +89,18 @@
// the schema has changed since the previous call.
// Set up to recognize previous fields that no longer exist.
- final Map<String, ValueVector> oldFields = CaseInsensitiveMap.newHashMap();
- for (final VectorWrapper<?> wrapper : container) {
- final ValueVector vector = wrapper.getValueVector();
+ Map<String, ValueVector> oldFields = CaseInsensitiveMap.newHashMap();
+ for (VectorWrapper<?> wrapper : container) {
+ ValueVector vector = wrapper.getValueVector();
oldFields.put(vector.getField().getName(), vector);
}
- final VectorContainer newVectors = new VectorContainer();
+ VectorContainer newVectors = new VectorContainer();
try {
- final List<SerializedField> fields = def.getFieldList();
+ List<SerializedField> fields = def.getFieldList();
int bufOffset = 0;
- for (final SerializedField field : fields) {
- final MaterializedField fieldDef = MaterializedField.create(field);
+ for (SerializedField field : fields) {
+ MaterializedField fieldDef = MaterializedField.create(field);
ValueVector vector = oldFields.remove(fieldDef.getName());
if (vector == null) {
@@ -144,8 +144,8 @@
}
// rebuild the schema.
- final SchemaBuilder builder = BatchSchema.newBuilder();
- for (final VectorWrapper<?> v : newVectors) {
+ SchemaBuilder builder = BatchSchema.newBuilder();
+ for (VectorWrapper<?> v : newVectors) {
builder.addField(v.getField());
}
builder.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
@@ -156,14 +156,12 @@
} catch (final Throwable cause) {
// We have to clean up new vectors created here and pass over the actual cause. It is upper layer who should
// adjudicate to call upper layer specific clean up logic.
- for (final VectorWrapper<?> wrapper:newVectors) {
- wrapper.getValueVector().clear();
- }
+ VectorAccessibleUtilities.clear(newVectors);
throw cause;
} finally {
if (! oldFields.isEmpty()) {
schemaChanged = true;
- for (final ValueVector vector : oldFields.values()) {
+ for (ValueVector vector : oldFields.values()) {
vector.clear();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 04cf32e..e03b17c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -36,7 +36,6 @@
* RecordIterator will hold onto multiple record batches in order to support resetting beyond record batch boundary.
*/
public class RecordIterator implements VectorAccessible {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordIterator.class);
private final RecordBatch incoming;
private final AbstractRecordBatch<?> outgoing;
@@ -48,10 +47,10 @@
private int markedInnerPosition;
private long markedOuterPosition;
private IterOutcome lastOutcome;
- private int inputIndex; // For two way merge join 0:left, 1:right
+ private final int inputIndex; // For two way merge join 0:left, 1:right
private boolean lastBatchRead; // True if all batches are consumed.
private boolean initialized;
- private OperatorContext oContext;
+ private final OperatorContext oContext;
private final boolean enableMarkAndReset;
private final VectorContainer container; // Holds VectorContainer of current record batch
@@ -217,6 +216,7 @@
container.addOrGet(w.getField());
}
container.buildSchema(rbd.getContainer().getSchema().getSelectionVectorMode());
+ container.setEmpty();
initialized = true;
}
if (innerRecordCount > 0) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 6d4e057..1cfc61d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -32,6 +32,7 @@
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -124,9 +125,15 @@
* Transfer vectors from containerIn to this.
*/
public void transferIn(VectorContainer containerIn) {
- Preconditions.checkArgument(this.wrappers.size() == containerIn.wrappers.size());
- for (int i = 0; i < this.wrappers.size(); ++i) {
- containerIn.wrappers.get(i).transfer(this.wrappers.get(i));
+ rawTransferIn(containerIn);
+ setRecordCount(containerIn.getRecordCount());
+ }
+
+ @VisibleForTesting
+ public void rawTransferIn(VectorContainer containerIn) {
+ Preconditions.checkArgument(wrappers.size() == containerIn.wrappers.size());
+ for (int i = 0; i < wrappers.size(); ++i) {
+ containerIn.wrappers.get(i).transfer(wrappers.get(i));
}
}
@@ -237,14 +244,14 @@
* @param srcIndex The index of the row to copy from the source {@link VectorContainer}.
* @return Position one above where the row was appended
*/
- public int appendRow(VectorContainer srcContainer, int srcIndex) {
- for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
- ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
- ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector();
- destVector.copyEntry(recordCount, srcVector, srcIndex);
- }
- return incRecordCount();
+ public int appendRow(VectorContainer srcContainer, int srcIndex) {
+ for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
+ ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
+ ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector();
+ destVector.copyEntry(recordCount, srcVector, srcIndex);
}
+ return incRecordCount();
+ }
public TypedFieldId add(ValueVector vv) {
schemaChanged();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 9e31ff8..577517d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -31,10 +31,10 @@
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
/**
- * A specialized version of record batch that can moves out buffers and preps them for writing.
+ * A specialized version of record batch that can moves out buffers and preps
+ * them for writing.
*/
public class WritableBatch implements AutoCloseable {
- //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WritableBatch.class);
private final RecordBatchDef def;
private final DrillBuf[] buffers;
@@ -74,7 +74,8 @@
public void reconstructContainer(BufferAllocator allocator, VectorContainer container) {
Preconditions.checkState(!cleared,
"Attempted to reconstruct a container from a WritableBatch after it had been cleared");
- if (buffers.length > 0) { /* If we have DrillBuf's associated with value vectors */
+ // If we have DrillBuf's associated with value vectors
+ if (buffers.length > 0) {
int len = 0;
for (DrillBuf b : buffers) {
len += b.capacity();
@@ -82,7 +83,7 @@
DrillBuf newBuf = allocator.buffer(len);
try {
- /* Copy data from each buffer into the compound buffer */
+ // Copy data from each buffer into the compound buffer
int offset = 0;
for (DrillBuf buf : buffers) {
newBuf.setBytes(offset, buf);
@@ -94,9 +95,9 @@
int bufferOffset = 0;
- /*
- * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
- */
+ // For each value vector slice up the appropriate size from the
+ // compound buffer and load it into the value vector
+
int vectorIndex = 0;
for (VectorWrapper<?> vv : container) {
@@ -120,16 +121,11 @@
svMode = SelectionVectorMode.NONE;
}
container.buildSchema(svMode);
-
- /* Set the record count in the value vector */
- for (VectorWrapper<?> v : container) {
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(def.getRecordCount());
- }
+ container.setValueCount(def.getRecordCount());
}
public void clear() {
- if(cleared) {
+ if (cleared) {
return;
}
for (DrillBuf buf : buffers) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 69500c0..99b0723 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -43,6 +43,8 @@
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Encapsulates the future management of query submissions. This entails a
@@ -56,8 +58,8 @@
* handle this case and then do a switcheroo.
*/
public class QueryResultHandler {
- private static final org.slf4j.Logger logger =
- org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(QueryResultHandler.class);
/**
* Current listener for results, for each active query.
@@ -74,7 +76,7 @@
}
public RpcConnectionHandler<UserToBitConnection> getWrappedConnectionHandler(
- final RpcConnectionHandler<UserToBitConnection> handler) {
+ RpcConnectionHandler<UserToBitConnection> handler) {
return new ChannelClosedHandler(handler);
}
@@ -82,11 +84,11 @@
* Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
* handles data result messages
*/
- public void resultArrived( ByteBuf pBody ) throws RpcException {
- final QueryResult queryResult = RpcBus.get( pBody, QueryResult.PARSER );
+ public void resultArrived(ByteBuf pBody) throws RpcException {
+ QueryResult queryResult = RpcBus.get(pBody, QueryResult.PARSER);
- final QueryId queryId = queryResult.getQueryId();
- final QueryState queryState = queryResult.getQueryState();
+ QueryId queryId = queryResult.getQueryId();
+ QueryState queryState = queryResult.getQueryState();
if (logger.isDebugEnabled()) {
logger.debug("resultArrived: queryState: {}, queryId = {}", queryState,
@@ -95,18 +97,18 @@
assert queryResult.hasQueryState() : "received query result without QueryState";
- final boolean isFailureResult = QueryState.FAILED == queryState;
+ boolean isFailureResult = QueryState.FAILED == queryState;
// CANCELED queries are handled the same way as COMPLETED
- final boolean isTerminalResult;
- switch ( queryState ) {
+ boolean isTerminalResult;
+ switch (queryState) {
case FAILED:
case CANCELED:
case COMPLETED:
isTerminalResult = true;
break;
default:
- logger.error( "Unexpected/unhandled QueryState " + queryState
- + " (for query " + queryId + ")" );
+ logger.error("Unexpected/unhandled QueryState " + queryState
+ + " (for query " + queryId + ")");
isTerminalResult = false;
break;
}
@@ -127,19 +129,19 @@
try {
resultsListener.queryCompleted(queryState);
- } catch ( Exception e ) {
+ } catch (Exception e) {
resultsListener.submissionFailed(UserException.systemError(e).build(logger));
}
} else {
logger.warn("queryState {} was ignored", queryState);
}
} finally {
- if ( isTerminalResult ) {
+ if (isTerminalResult) {
// TODO: What exactly are we checking for? How should we really check
// for it?
- if ( (! ( resultsListener instanceof BufferingResultsListener )
- || ((BufferingResultsListener) resultsListener).output != null ) ) {
- queryIdToResultsListenersMap.remove( queryId, resultsListener );
+ if ((! (resultsListener instanceof BufferingResultsListener)
+ || ((BufferingResultsListener) resultsListener).output != null)) {
+ queryIdToResultsListenersMap.remove(queryId, resultsListener);
}
}
}
@@ -149,50 +151,52 @@
* Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
* handles query data messages
*/
- public void batchArrived( ConnectionThrottle throttle,
- ByteBuf pBody, ByteBuf dBody ) throws RpcException {
- final QueryData queryData = RpcBus.get( pBody, QueryData.PARSER );
+ public void batchArrived(ConnectionThrottle throttle,
+ ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ QueryData queryData = RpcBus.get(pBody, QueryData.PARSER);
// Current batch coming in.
- final DrillBuf drillBuf = (DrillBuf) dBody;
- final QueryDataBatch batch = new QueryDataBatch( queryData, drillBuf );
+ DrillBuf drillBuf = (DrillBuf) dBody;
+ QueryDataBatch batch = new QueryDataBatch(queryData, drillBuf);
- final QueryId queryId = queryData.getQueryId();
+ QueryId queryId = queryData.getQueryId();
if (logger.isDebugEnabled()) {
logger.debug("batchArrived: queryId = {}", QueryIdHelper.getQueryId(queryId));
}
- logger.trace( "batchArrived: batch = {}", batch );
+ logger.trace("batchArrived: batch = {}", batch);
- final UserResultsListener resultsListener = newUserResultsListener(queryId);
+ UserResultsListener resultsListener = newUserResultsListener(queryId);
// A data case--pass on via dataArrived
try {
resultsListener.dataArrived(batch, throttle);
// That releases batch if successful.
- } catch ( Exception e ) {
+ } catch (Exception e) {
batch.release();
resultsListener.submissionFailed(UserException.systemError(e).build(logger));
}
}
/**
- * Return {@link UserResultsListener} associated with queryId. Will create a new {@link BufferingResultsListener}
- * if no listener found.
- * @param queryId queryId we are getting the listener for
+ * Return {@link UserResultsListener} associated with queryId. Will create a
+ * new {@link BufferingResultsListener} if no listener found.
+ *
+ * @param queryId
+ * queryId we are getting the listener for
* @return {@link UserResultsListener} associated with queryId
*/
private UserResultsListener newUserResultsListener(QueryId queryId) {
- UserResultsListener resultsListener = queryIdToResultsListenersMap.get( queryId );
- logger.trace( "For QueryId [{}], retrieved results listener {}", queryId, resultsListener );
- if ( null == resultsListener ) {
+ UserResultsListener resultsListener = queryIdToResultsListenersMap.get(queryId);
+ logger.trace("For QueryId [{}], retrieved results listener {}", queryId, resultsListener);
+ if (null == resultsListener) {
// WHO?? didn't get query ID response and set submission listener yet,
// so install a buffering listener for now
BufferingResultsListener bl = new BufferingResultsListener();
- resultsListener = queryIdToResultsListenersMap.putIfAbsent( queryId, bl );
+ resultsListener = queryIdToResultsListenersMap.putIfAbsent(queryId, bl);
// If we had a successful insertion, use that reference. Otherwise, just
// throw away the new buffering listener.
- if ( null == resultsListener ) {
+ if (null == resultsListener) {
resultsListener = bl;
}
}
@@ -201,7 +205,7 @@
private static class BufferingResultsListener implements UserResultsListener {
- private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
+ private final ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
private volatile UserException ex;
private volatile QueryState queryState;
private volatile UserResultsListener output;
@@ -252,8 +256,10 @@
@Override
public void submissionFailed(UserException ex) {
assert queryState == null;
- // there is one case when submissionFailed() is called even though the query didn't fail on the server side
- // it happens when UserResultsListener.batchArrived() throws an exception that will be passed to
+ // there is one case when submissionFailed() is called even though the
+ // query didn't fail on the server side
+ // it happens when UserResultsListener.batchArrived() throws an exception
+ // that will be passed to
// submissionFailed() by QueryResultHandler.dataArrived()
queryState = QueryState.FAILED;
synchronized (this) {
@@ -335,7 +341,7 @@
}
@Override
- public void interrupted(final InterruptedException ex) {
+ public void interrupted(InterruptedException ex) {
if (!isTerminal.compareAndSet(false, true)) {
logger.warn("Received multiple responses to run query request.");
return;
@@ -350,26 +356,27 @@
}
/**
- * When a {@link UserToBitConnection connection} to a server is successfully created, this handler adds a
- * listener to that connection that listens to connection closure. If the connection is closed, all active
+ * When a {@link UserToBitConnection connection} to a server is successfully
+ * created, this handler adds a listener to that connection that listens to
+ * connection closure. If the connection is closed, all active
* {@link UserResultsListener result listeners} are failed.
*/
private class ChannelClosedHandler implements RpcConnectionHandler<UserToBitConnection> {
private final RpcConnectionHandler<UserToBitConnection> parentHandler;
- public ChannelClosedHandler(final RpcConnectionHandler<UserToBitConnection> parentHandler) {
+ public ChannelClosedHandler(RpcConnectionHandler<UserToBitConnection> parentHandler) {
this.parentHandler = parentHandler;
}
@Override
- public void connectionSucceeded(final UserToBitConnection connection) {
+ public void connectionSucceeded(UserToBitConnection connection) {
connection.getChannel().closeFuture().addListener(
new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future)
throws Exception {
- for (final UserResultsListener listener : queryIdToResultsListenersMap.values()) {
+ for (UserResultsListener listener : queryIdToResultsListenersMap.values()) {
listener.submissionFailed(UserException.connectionError()
.message("Connection %s closed unexpectedly. Drillbit down?",
connection.getName())
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java b/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
index 865aae4..223fecf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestAggNullable.java
@@ -26,7 +26,6 @@
@Category(OperatorTest.class)
public class TestAggNullable extends BaseTestQuery {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestAggNullable.class);
private static void enableAggr(boolean ha, boolean sa) throws Exception {
@@ -72,5 +71,4 @@
assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s",
expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
}
-
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 3a02544..ddb1e5b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -37,6 +37,7 @@
@Category({SqlFunctionTest.class, OperatorTest.class, PlannerTest.class, UnlikelyTest.class})
public class TestExampleQueries extends BaseTestQuery {
+
@BeforeClass
public static void setupTestFiles() {
dirTestWatcher.copyResourceToRoot(Paths.get("tpchmulti"));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
index d3cf0d8..f79eb60 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
@@ -36,7 +36,6 @@
@Category({SqlTest.class, PlannerTest.class})
public class TestStarQueries extends BaseTestQuery {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStarQueries.class);
@BeforeClass
public static void setupTestFiles() {
@@ -298,7 +297,7 @@
try {
test("select x.n_nationkey, x.n_name, x.n_regionkey, x.r_name from (select * from cp.`tpch/nation.parquet` n, cp.`tpch/region.parquet` r where n.n_regionkey = r.r_regionkey) x " );
} catch (UserException e) {
- logger.info("***** Test resulted in expected failure: " + e.getMessage());
+ // Expected
throw e;
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 3523bdf..292602f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -347,7 +347,6 @@
.baselineColumns("n_nationkey", "n_regionkey", "n_name")
.build().run();
-
testBuilder()
.sqlQuery(query2)
.unOrdered()
@@ -355,7 +354,7 @@
.baselineTypes(TypeProtos.MinorType.INT, TypeProtos.MinorType.INT, TypeProtos.MinorType.VARCHAR)
.baselineColumns("n_nationkey", "n_regionkey", "n_name")
.build().run();
- }
+ }
@Test // see DRILL-1977, DRILL-2376, DRILL-2377, DRILL-2378, DRILL-2379
@Category(UnlikelyTest.class)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
index 0ffd04c..a4d10b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSeparatePlanningTest.java
@@ -51,6 +51,7 @@
*/
@Category({SlowTest.class, PlannerTest.class})
public class DrillSeparatePlanningTest extends ClusterTest {
+
@BeforeClass
public static void setupFiles() {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "json"));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
index de8a77e..fd43fa1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java
@@ -39,54 +39,67 @@
public void test2() throws Exception {
test("alter session set `planner.enable_hashjoin` = false");
test("alter session set `planner.slice_target` = 1");
- test("SELECT r.r_name, \n" +
- " t1.f \n" +
- "FROM cp.`tpch/region.parquet` r \n" +
- " JOIN (SELECT Flatten(x) AS f \n" +
- " FROM (SELECT Convert_from('[0, 1]', 'json') AS x \n" +
- " FROM cp.`tpch/region.parquet`)) t1 \n" +
- " ON t1.f = cast(r.r_regionkey as bigint) \n" +
- "ORDER BY r.r_name");
- test("alter session set `planner.enable_hashjoin` = true");
- test("alter session set `planner.slice_target` = 1000000");
+ try {
+ test("SELECT r.r_name, \n" +
+ " t1.f \n" +
+ "FROM cp.`tpch/region.parquet` r \n" +
+ " JOIN (SELECT Flatten(x) AS f \n" +
+ " FROM (SELECT Convert_from('[0, 1]', 'json') AS x \n" +
+ " FROM cp.`tpch/region.parquet`)) t1 \n" +
+ " ON t1.f = cast(r.r_regionkey as bigint) \n" +
+ "ORDER BY r.r_name");
+ } finally {
+ test("alter session reset `planner.enable_hashjoin`");
+ test("alter session reset `planner.slice_target`");
+ }
}
@Test
public void test3() throws Exception {
test("alter session set `planner.enable_hashjoin` = false");
test("alter session set `planner.slice_target` = 1");
- test("select f from\n" +
- "(select convert_from(nation, 'json') as f from\n" +
- "(select concat('{\"name\": \"', n.n_name, '\", ', '\"regionkey\": ', r.r_regionkey, '}') as nation\n" +
- " from cp.`tpch/nation.parquet` n,\n" +
- " cp.`tpch/region.parquet` r\n" +
- " where \n" +
- " n.n_regionkey = r.r_regionkey)) t\n" +
- "order by t.f.name");
+ try {
+ test("select f from\n" +
+ "(select convert_from(nation, 'json') as f from\n" +
+ "(select concat('{\"name\": \"', n.n_name, '\", ', '\"regionkey\": ', r.r_regionkey, '}') as nation\n" +
+ " from cp.`tpch/nation.parquet` n,\n" +
+ " cp.`tpch/region.parquet` r\n" +
+ " where \n" +
+ " n.n_regionkey = r.r_regionkey)) t\n" +
+ "order by t.f.name");
+ } finally {
+ test("alter session reset `planner.enable_hashjoin`");
+ test("alter session reset `planner.slice_target`");
+ }
}
@Test
public void test4() throws Exception {
test("alter session set `planner.enable_hashjoin` = false");
test("alter session set `planner.slice_target` = 1");
- test("SELECT f \n" +
- "FROM (SELECT Convert_from(nation, 'json') AS f \n" +
- " FROM (SELECT Concat('{\"name\": \"', n.n_name, '\", ', '\"regionkey\": ', \n" +
- " r.r_regionkey, \n" +
- " '}') AS \n" +
- " nation \n" +
- " FROM cp.`tpch/nation.parquet` n, \n" +
- " cp.`tpch/region.parquet` r \n" +
- " WHERE n.n_regionkey = r.r_regionkey \n" +
- " AND r.r_regionkey = 4)) t \n" +
- "ORDER BY t.f.name");
+ try {
+ test("SELECT f \n" +
+ "FROM (SELECT Convert_from(nation, 'json') AS f \n" +
+ " FROM (SELECT Concat('{\"name\": \"', n.n_name, '\", ', '\"regionkey\": ', \n" +
+ " r.r_regionkey, \n" +
+ " '}') AS \n" +
+ " nation \n" +
+ " FROM cp.`tpch/nation.parquet` n, \n" +
+ " cp.`tpch/region.parquet` r \n" +
+ " WHERE n.n_regionkey = r.r_regionkey \n" +
+ " AND r.r_regionkey = 4)) t \n" +
+ "ORDER BY t.f.name");
+ } finally {
+ test("alter session reset `planner.enable_hashjoin`");
+ test("alter session reset `planner.slice_target`");
+ }
}
@Test //DRILL-4783 when resultset is empty, don't throw exception.
@Category(UnlikelyTest.class)
public void test5() throws Exception {
- //when there is no incoming record, flatten won't throw exception
+ // when there is no incoming record, flatten won't throw exception
testBuilder().sqlQuery("select flatten(j) from \n" +
" (select convert_from(names, 'json') j \n" +
" from (select concat('[\"', first_name, '\", ', '\"', last_name, '\"]') names \n" +
@@ -94,7 +107,7 @@
.expectsEmptyResultSet()
.build().run();
- //result is not empty and is list type,
+ // result is not empty and is list type,
testBuilder().sqlQuery("select flatten(j) n from \n" +
" (select convert_from(names, 'json') j \n" +
" from (select concat('[\"', first_name, '\", ', '\"', last_name, '\"]') names \n" +
@@ -105,7 +118,7 @@
.baselineValues("Nowmer")
.build().run();
- //result is not empty, and flatten got incompatible (non-list) incoming records. got exception thrown
+ // result is not empty, and flatten got incompatible (non-list) incoming records. got exception thrown
errorMsgTestHelper("select flatten(first_name) from \n" +
"(select first_name from cp.`employee.json` where first_name='Sheri')",
"Flatten does not support inputs of non-list values");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 51e47ba..2d1a1ba 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -77,9 +77,6 @@
this.allocator = context.getAllocator();
this.container = new VectorContainer(allocator, schema);
this.allOutcomes = iterOutcomes;
- this.currentContainerIndex = 0;
- this.currentOutcomeIndex = 0;
- this.isDone = false;
}
@Deprecated
@@ -193,7 +190,7 @@
@Override
public IterOutcome next() {
- if(isDone) {
+ if (isDone) {
return IterOutcome.NONE;
}
@@ -213,16 +210,18 @@
switch (rowSet.indirectionType()) {
case NONE:
case TWO_BYTE:
- container.transferIn(input);
- if ( input.hasRecordCount() ) { // in case special test of uninitialized input container
- container.setRecordCount(input.getRecordCount());
+ if (input.hasRecordCount()) { // in case special test of uninitialized input container
+ container.transferIn(input);
+ } else {
+ // Not normally a valid condition, supported here just for testing
+ container.rawTransferIn(input);
}
SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
if (sv2 != null) {
// Operators assume that new values for an Sv2 are transferred in.
sv2.allocateNewSafe(inputSv2.getCount());
- for (int i=0; i<inputSv2.getCount(); ++i) {
+ for (int i=0; i < inputSv2.getCount(); ++i) {
sv2.setIndex(i, inputSv2.getIndex(i));
}
sv2.setRecordCount(inputSv2.getCount());
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index 376ac03..87a324f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -57,8 +57,6 @@
/**
* A template for Hash Aggr spilling tests
- *
- * @throws Exception
*/
private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel, boolean fallback, boolean predict,
String sql, long expectedRows, int cycle, int fromPart, int toPart) throws Exception {
@@ -84,8 +82,6 @@
/**
* Test "normal" spilling: Only 2 (or 3) partitions (out of 4) would require spilling
* ("normal spill" means spill-cycle = 1 )
- *
- * @throws Exception
*/
@Test
public void testSimpleHashAggrSpill() throws Exception {
@@ -96,8 +92,6 @@
/**
* Test with "needed memory" prediction turned off
* (i.e., exercise code paths that catch OOMs from the Hash Table and recover)
- *
- * @throws Exception
*/
@Test
@Ignore("DRILL-7301")
@@ -126,9 +120,8 @@
}
/**
- * Test Secondary and Tertiary spill cycles - Happens when some of the spilled partitions cause more spilling as they are read back
- *
- * @throws Exception
+ * Test Secondary and Tertiary spill cycles - Happens when some of the spilled
+ * partitions cause more spilling as they are read back
*/
@Test
public void testHashAggrSecondaryTertiarySpill() throws Exception {
@@ -139,9 +132,8 @@
}
/**
- * Test with the "fallback" option disabled: When not enough memory available to allow spilling, then fail (Resource error) !!
- *
- * @throws Exception
+ * Test with the "fallback" option disabled: When not enough memory available
+ * to allow spilling, then fail (Resource error) !!
*/
@Test
public void testHashAggrFailWithFallbackDisabed() throws Exception {
@@ -158,10 +150,10 @@
}
/**
- * Test with the "fallback" option ON: When not enough memory is available to allow spilling (internally need enough memory to
- * create multiple partitions), then behave like the pre-1.11 Hash Aggregate: Allocate unlimited memory, no spill.
- *
- * @throws Exception
+ * Test with the "fallback" option ON: When not enough memory is available to
+ * allow spilling (internally need enough memory to create multiple
+ * partitions), then behave like the pre-1.11 Hash Aggregate: Allocate
+ * unlimited memory, no spill.
*/
@Test
public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 7610e71..d2485f1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -276,7 +276,7 @@
@Test
@Category(UnlikelyTest.class)
public void drill1652() throws Exception {
- if(RUN_ADVANCED_TESTS){
+ if (RUN_ADVANCED_TESTS) {
test("select uid, flatten(transactions) from dfs.`tmp/bigfile.json`");
}
}
@@ -330,7 +330,7 @@
public void testKVGenFlatten2() throws Exception {
// currently runs
// TODO - re-verify results by hand
- if(RUN_ADVANCED_TESTS){
+ if (RUN_ADVANCED_TESTS) {
test("select flatten(kvgen(visited_cellid_counts)) as mytb from dfs.`tmp/mapkv.json`");
}
}
@@ -351,7 +351,7 @@
// FIXED BY RETURNING PROPER SCHEMA DURING FAST SCHEMA STEP
// these types of problems are being solved more generally as we develp better support for chaning schema
- if(RUN_ADVANCED_TESTS){
+ if (RUN_ADVANCED_TESTS) {
test("select celltbl.catl from (\n" +
" select flatten(categories) catl from dfs.`tmp/yelp_academic_dataset_business.json` b limit 100\n" +
" ) celltbl where celltbl.catl = 'Doctors'");
@@ -360,7 +360,7 @@
@Test
public void countAggFlattened() throws Exception {
- if(RUN_ADVANCED_TESTS){
+ if (RUN_ADVANCED_TESTS) {
test("select celltbl.catl, count(celltbl.catl) from ( " +
"select business_id, flatten(categories) catl from dfs.`tmp/yelp_academic_dataset_business.json` b limit 100 " +
") celltbl group by celltbl.catl limit 10 ");
@@ -369,35 +369,33 @@
@Test
public void flattenAndAdditionalColumn() throws Exception {
- if(RUN_ADVANCED_TESTS){
+ if (RUN_ADVANCED_TESTS) {
test("select business_id, flatten(categories) from dfs.`tmp/yelp_academic_dataset_business.json` b");
}
}
@Test
public void testFailingFlattenAlone() throws Exception {
- if(RUN_ADVANCED_TESTS){
+ if (RUN_ADVANCED_TESTS) {
test("select flatten(categories) from dfs.`tmp/yelp_academic_dataset_business.json` b ");
}
}
@Test
public void testDistinctAggrFlattened() throws Exception {
- if(RUN_ADVANCED_TESTS){
+ if (RUN_ADVANCED_TESTS) {
test(" select distinct(celltbl.catl) from (\n" +
" select flatten(categories) catl from dfs.`tmp/yelp_academic_dataset_business.json` b\n" +
" ) celltbl");
}
-
}
@Test
@Category(UnlikelyTest.class)
public void testDrill1665() throws Exception {
- if(RUN_ADVANCED_TESTS){
+ if (RUN_ADVANCED_TESTS) {
test("select id, flatten(evnts) as rpt from dfs.`tmp/drill1665.json`");
}
-
}
@Test
@@ -479,7 +477,6 @@
.unOrdered()
.jsonBaselineFile("flatten/drill-2106-result.json")
.go();
-
}
@Test // see DRILL-2146
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index a015ff7..5530176 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -36,6 +36,7 @@
import org.junit.experimental.categories.Category;
public class TestWindowFrame extends BaseTestQuery {
+
@BeforeClass
public static void setupMSortBatchSize() throws IOException {
// make sure memory sorter outputs 20 rows per batch
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
index f828cf7..01110c4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.drill.PlanTestBase;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
@@ -69,7 +68,8 @@
.baselineValues("`sales_district_id`", 110.0, 110.0, 23L, 8.0)
.go();
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
}
}
@@ -96,7 +96,8 @@
.baselineValues("`birth_date`", 1155.0, 1155.0, 52L, 10.0)
.go();
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
}
}
@@ -124,8 +125,9 @@
.baselineValues("`birth_date`", 1138.0, 1138.0, 38L, 10.001597699312988)
.go();
} finally {
- test("ALTER SESSION SET `exec.statistics.deterministic_sampling` = false");
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("exec.statistics.deterministic_sampling");
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
}
}
@@ -143,7 +145,9 @@
test("ALTER SESSION SET `planner.statistics.use` = true");
test("SELECT * FROM dfs.tmp.`lineitem` l JOIN dfs.tmp.`orders` o ON l.l_orderkey = o.o_orderkey");
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
+ resetSessionOption("planner.statistics.use");
}
}
@@ -166,7 +170,8 @@
verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.employee_basic4 COMPUTE STATISTICS",
"16");
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
}
}
@@ -204,7 +209,8 @@
.baselineValues("`dir1`", 120.0, 120.0, 4L, 2.0)
.go();
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
}
}
@@ -214,21 +220,26 @@
final String tmpLocation = "/multilevel/parquet";
test("ALTER SESSION SET `planner.slice_target` = 1");
test("ALTER SESSION SET `store.format` = 'parquet'");
- test("CREATE TABLE dfs.tmp.parquetStale AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
- "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", tmpLocation);
- verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
- verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS",
- "Table parquetStale has not changed since last ANALYZE!");
- // Verify we recompute statistics once a new file/directory is added. Update the directory some
- // time after ANALYZE so that the timestamps are different.
- Thread.sleep(1000);
- final String Q4 = "/multilevel/parquet/1996/Q4";
- test("CREATE TABLE dfs.tmp.`parquetStale/1996/Q5` AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
- "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", Q4);
- verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
- Thread.sleep(1000);
- test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
- verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+ try {
+ test("CREATE TABLE dfs.tmp.parquetStale AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
+ "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", tmpLocation);
+ verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+ verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS",
+ "Table parquetStale has not changed since last ANALYZE!");
+ // Verify we recompute statistics once a new file/directory is added. Update the directory some
+ // time after ANALYZE so that the timestamps are different.
+ Thread.sleep(1000);
+ final String Q4 = "/multilevel/parquet/1996/Q4";
+ test("CREATE TABLE dfs.tmp.`parquetStale/1996/Q5` AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
+ "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", Q4);
+ verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+ Thread.sleep(1000);
+ test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
+ verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+ } finally {
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
+ }
}
@Test
@@ -236,99 +247,104 @@
//Test ndv/rowcount for scan
test("ALTER SESSION SET `planner.slice_target` = 1");
test("ALTER SESSION SET `store.format` = 'parquet'");
- test("CREATE TABLE dfs.tmp.employeeUseStat AS SELECT * from cp.`employee.json`");
- test("CREATE TABLE dfs.tmp.departmentUseStat AS SELECT * from cp.`department.json`");
- test("ANALYZE TABLE dfs.tmp.employeeUseStat COMPUTE STATISTICS");
- test("ANALYZE TABLE dfs.tmp.departmentUseStat COMPUTE STATISTICS");
- test("ALTER SESSION SET `planner.statistics.use` = true");
- String query = " select employee_id from dfs.tmp.employeeUseStat where department_id = 2";
- String[] expectedPlan1 = {"Filter\\(condition.*\\).*rowcount = 96.25,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
+ try {
+ test("CREATE TABLE dfs.tmp.employeeUseStat AS SELECT * from cp.`employee.json`");
+ test("CREATE TABLE dfs.tmp.departmentUseStat AS SELECT * from cp.`department.json`");
+ test("ANALYZE TABLE dfs.tmp.employeeUseStat COMPUTE STATISTICS");
+ test("ANALYZE TABLE dfs.tmp.departmentUseStat COMPUTE STATISTICS");
+ test("ALTER SESSION SET `planner.statistics.use` = true");
+ String query = " select employee_id from dfs.tmp.employeeUseStat where department_id = 2";
+ String[] expectedPlan1 = {"Filter\\(condition.*\\).*rowcount = 96.25,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
- query = " select employee_id from dfs.tmp.employeeUseStat where department_id IN (2, 5)";
- String[] expectedPlan2 = {"Filter\\(condition.*\\).*rowcount = 192.5,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan2, new String[]{});
+ query = " select employee_id from dfs.tmp.employeeUseStat where department_id IN (2, 5)";
+ String[] expectedPlan2 = {"Filter\\(condition.*\\).*rowcount = 192.5,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan2, new String[]{});
- query = "select employee_id from dfs.tmp.employeeUseStat where department_id IN (2, 5) and employee_id = 5";
- String[] expectedPlan3 = {"Filter\\(condition.*\\).*rowcount = 1.0,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan3, new String[]{});
+ query = "select employee_id from dfs.tmp.employeeUseStat where department_id IN (2, 5) and employee_id = 5";
+ String[] expectedPlan3 = {"Filter\\(condition.*\\).*rowcount = 1.0,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan3, new String[]{});
- query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
- + " on emp.department_id = dept.department_id";
- String[] expectedPlan4 = {"HashJoin\\(condition.*\\).*rowcount = 1155.0,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
- "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan4, new String[]{});
+ query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+ + " on emp.department_id = dept.department_id";
+ String[] expectedPlan4 = {"HashJoin\\(condition.*\\).*rowcount = 1155.0,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
+ "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan4, new String[]{});
- query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
- + " on emp.department_id = dept.department_id where dept.department_id = 5";
- String[] expectedPlan5 = {"HashJoin\\(condition.*\\).*rowcount = 96.25,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
- "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan5, new String[]{});
+ query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+ + " on emp.department_id = dept.department_id where dept.department_id = 5";
+ String[] expectedPlan5 = {"HashJoin\\(condition.*\\).*rowcount = 96.25,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
+ "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan5, new String[]{});
- query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
- + " on emp.department_id = dept.department_id"
- + " where dept.department_id = 5 and emp.employee_id = 10";
- String[] expectedPlan6 = {"MergeJoin\\(condition.*\\).*rowcount = 1.0,.*",
- "Filter\\(condition=\\[AND\\(=\\(\\$1, 10\\), =\\(\\$0, 5\\)\\)\\]\\).*rowcount = 1.0,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
- "Filter\\(condition=\\[=\\(\\$0, 5\\)\\]\\).*rowcount = 1.0,.*",
- "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan6, new String[]{});
+ query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+ + " on emp.department_id = dept.department_id"
+ + " where dept.department_id = 5 and emp.employee_id = 10";
+ String[] expectedPlan6 = {"MergeJoin\\(condition.*\\).*rowcount = 1.0,.*",
+ "Filter\\(condition=\\[AND\\(=\\(\\$1, 10\\), =\\(\\$0, 5\\)\\)\\]\\).*rowcount = 1.0,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
+ "Filter\\(condition=\\[=\\(\\$0, 5\\)\\]\\).*rowcount = 1.0,.*",
+ "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan6, new String[]{});
- query = " select emp.employee_id, count(*)"
- + " from dfs.tmp.employeeUseStat emp"
- + " group by emp.employee_id";
- String[] expectedPlan7 = {"HashAgg\\(group=\\[\\{0\\}\\], EXPR\\$1=\\[COUNT\\(\\)\\]\\).*rowcount = 1155.0,.*",
- "Scan.*columns=\\[`employee_id`\\].*rowcount = 1155.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan7, new String[]{});
+ query = " select emp.employee_id, count(*)"
+ + " from dfs.tmp.employeeUseStat emp"
+ + " group by emp.employee_id";
+ String[] expectedPlan7 = {"HashAgg\\(group=\\[\\{0\\}\\], EXPR\\$1=\\[COUNT\\(\\)\\]\\).*rowcount = 1155.0,.*",
+ "Scan.*columns=\\[`employee_id`\\].*rowcount = 1155.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan7, new String[]{});
- query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
- + " on emp.department_id = dept.department_id "
- + " group by emp.employee_id";
- String[] expectedPlan8 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 730.0992454469841,.*",
- "HashJoin\\(condition.*\\).*rowcount = 1155.0,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
- "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan8, new String[]{});
+ query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+ + " on emp.department_id = dept.department_id "
+ + " group by emp.employee_id";
+ String[] expectedPlan8 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 730.0992454469841,.*",
+ "HashJoin\\(condition.*\\).*rowcount = 1155.0,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*",
+ "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan8, new String[]{});
- query = "select emp.employee_id, dept.department_description"
- + " from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
- + " on emp.department_id = dept.department_id "
- + " group by emp.employee_id, emp.store_id, dept.department_description "
- + " having dept.department_description = 'FINANCE'";
- String[] expectedPlan9 = {"HashAgg\\(group=\\[\\{0, 1, 2\\}\\]\\).*rowcount = 60.84160378724867.*",
- "HashJoin\\(condition.*\\).*rowcount = 96.25,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*",
- "Filter\\(condition=\\[=\\(\\$1, 'FINANCE'\\)\\]\\).*rowcount = 1.0,.*",
- "Scan.*columns=\\[`department_id`, `department_description`\\].*rowcount = 12.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan9, new String[]{});
+ query = "select emp.employee_id, dept.department_description"
+ + " from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept"
+ + " on emp.department_id = dept.department_id "
+ + " group by emp.employee_id, emp.store_id, dept.department_description "
+ + " having dept.department_description = 'FINANCE'";
+ String[] expectedPlan9 = {"HashAgg\\(group=\\[\\{0, 1, 2\\}\\]\\).*rowcount = 60.84160378724867.*",
+ "HashJoin\\(condition.*\\).*rowcount = 96.25,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*",
+ "Filter\\(condition=\\[=\\(\\$1, 'FINANCE'\\)\\]\\).*rowcount = 1.0,.*",
+ "Scan.*columns=\\[`department_id`, `department_description`\\].*rowcount = 12.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan9, new String[]{});
- query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept\n"
- + " on emp.department_id = dept.department_id "
- + " group by emp.employee_id, emp.store_id "
- + " having emp.store_id = 7";
- String[] expectedPlan10 = {"HashAgg\\(group=\\[\\{0, 1\\}\\]\\).*rowcount = 29.203969817879365.*",
- "HashJoin\\(condition.*\\).*rowcount = 46.2,.*",
- "Filter\\(condition=\\[=\\(\\$2, 7\\)\\]\\).*rowcount = 46.2,.*",
- "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*",
- "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan10, new String[]{});
+ query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept\n"
+ + " on emp.department_id = dept.department_id "
+ + " group by emp.employee_id, emp.store_id "
+ + " having emp.store_id = 7";
+ String[] expectedPlan10 = {"HashAgg\\(group=\\[\\{0, 1\\}\\]\\).*rowcount = 29.203969817879365.*",
+ "HashJoin\\(condition.*\\).*rowcount = 46.2,.*",
+ "Filter\\(condition=\\[=\\(\\$2, 7\\)\\]\\).*rowcount = 46.2,.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`, `store_id`\\].*rowcount = 1155.0.*",
+ "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan10, new String[]{});
- query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept\n"
- + " on emp.department_id = dept.department_id "
- + " group by emp.employee_id "
- + " having emp.employee_id = 7";
- String[] expectedPlan11 = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 1.0.*",
- "HashJoin\\(condition.*\\).*rowcount = 1.0,.*",
- "Filter\\(condition=\\[=\\(\\$1, 7\\)\\]\\).*rowcount = 1.0.*",
- "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*",
- "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan11, new String[]{});
+ query = " select emp.employee_id from dfs.tmp.employeeUseStat emp join dfs.tmp.departmentUseStat dept\n"
+ + " on emp.department_id = dept.department_id "
+ + " group by emp.employee_id "
+ + " having emp.employee_id = 7";
+ String[] expectedPlan11 = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 1.0.*",
+ "HashJoin\\(condition.*\\).*rowcount = 1.0,.*",
+ "Filter\\(condition=\\[=\\(\\$1, 7\\)\\]\\).*rowcount = 1.0.*",
+ "Scan.*columns=\\[`department_id`\\].*rowcount = 12.0.*",
+ "Scan.*columns=\\[`department_id`, `employee_id`\\].*rowcount = 1155.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan11, new String[]{});
+ } finally {
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
+ }
}
@Test
@@ -337,35 +353,41 @@
test("ALTER SESSION SET `store.format` = 'parquet'");
test("ALTER SESSION SET `planner.statistics.use` = true");
final String tmpLocation = "/multilevel/parquet";
- // copy the data into the temporary location
- test("DROP TABLE dfs.tmp.parquetStale");
- test("CREATE TABLE dfs.tmp.parquetStale AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
- "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", tmpLocation);
- String query = "select count(distinct o_orderkey) from dfs.tmp.parquetStale";
- verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
- test("REFRESH TABLE METADATA dfs.tmp.parquetStale");
- // Verify we recompute statistics once a new file/directory is added. Update the directory some
- // time after ANALYZE so that the timestamps are different.
- Thread.sleep(1000);
- final String Q4 = "/multilevel/parquet/1996/Q4";
- test("CREATE TABLE dfs.tmp.`parquetStale/1996/Q5` AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
- "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", Q4);
- // query should use STALE statistics
- String[] expectedStalePlan = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 119.0.*",
- "Scan.*rowcount = 130.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedStalePlan, new String[]{});
- // Query should use Parquet Metadata, since statistics not available. In this case, NDV is computed as
- // 1/10*rowcount (Calcite default). Hence, NDV is 13.0 instead of the correct 119.0
- test("DROP TABLE dfs.tmp.`parquetStale/.stats.drill`");
- String[] expectedPlan1 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 13.0.*",
- "Scan.*rowcount = 130.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
- // query should use the new statistics. NDV remains unaffected since we copy the Q4 into Q5
- verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
- String[] expectedPlan2 = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 119.0.*",
- "Scan.*rowcount = 130.0.*"};
- PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan2, new String[]{});
- test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
+ try {
+ // copy the data into the temporary location
+ test("DROP TABLE dfs.tmp.parquetStale");
+ test("CREATE TABLE dfs.tmp.parquetStale AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
+ "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", tmpLocation);
+ String query = "select count(distinct o_orderkey) from dfs.tmp.parquetStale";
+ verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+ test("REFRESH TABLE METADATA dfs.tmp.parquetStale");
+ // Verify we recompute statistics once a new file/directory is added. Update the directory some
+ // time after ANALYZE so that the timestamps are different.
+ Thread.sleep(1000);
+ final String Q4 = "/multilevel/parquet/1996/Q4";
+ test("CREATE TABLE dfs.tmp.`parquetStale/1996/Q5` AS SELECT o_orderkey, o_custkey, o_orderstatus, " +
+ "o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment from dfs.`%s`", Q4);
+ // query should use STALE statistics
+ String[] expectedStalePlan = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 119.0.*",
+ "Scan.*rowcount = 130.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedStalePlan, new String[]{});
+ // Query should use Parquet Metadata, since statistics not available. In this case, NDV is computed as
+ // 1/10*rowcount (Calcite default). Hence, NDV is 13.0 instead of the correct 119.0
+ test("DROP TABLE dfs.tmp.`parquetStale/.stats.drill`");
+ String[] expectedPlan1 = {"HashAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 13.0.*",
+ "Scan.*rowcount = 130.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
+ // query should use the new statistics. NDV remains unaffected since we copy the Q4 into Q5
+ verifyAnalyzeOutput("ANALYZE TABLE dfs.tmp.parquetStale COMPUTE STATISTICS", "9");
+ String[] expectedPlan2 = {"StreamAgg\\(group=\\[\\{0\\}\\]\\).*rowcount = 119.0.*",
+ "Scan.*rowcount = 130.0.*"};
+ PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan2, new String[]{});
+ test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
+ } finally {
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
+ resetSessionOption("planner.statistics.use");
+ }
}
// Test basic histogram creation functionality for int, bigint, double, date, timestamp and boolean data types.
@@ -436,9 +458,10 @@
String[] expectedPlan6 = {"Filter\\(condition.*\\).*rowcount = 1.0,.*",
"Scan.*columns=\\[`store_id`\\].*rowcount = 1128.0.*"};
PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan6, new String[]{});
-
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
+ resetSessionOption("planner.statistics.use");
}
}
@@ -462,7 +485,8 @@
.baselineValues("`c_acctbal`", 11)
.go();
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
}
}
@@ -498,11 +522,11 @@
.go();
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
}
}
-
@Test
public void testHistogramWithIntervalPredicate() throws Exception {
try {
@@ -516,7 +540,8 @@
String[] expectedPlan1 = {"Filter\\(condition.*\\).*rowcount = 59?.*,.*", "Scan.*columns=\\[`o_orderdate`\\].*rowcount = 15000.0.*"};
PlanTestBase.testPlanWithAttributesMatchingPatterns(query, expectedPlan1, new String[]{});
} finally {
- test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+ resetSessionOption("planner.slice_target");
+ resetSessionOption("store.format");
}
}
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 29cbe39..e32ecc9 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -579,21 +579,21 @@
/**
* Fill in missing values up to, but not including, the given
* index.
- *
+ *
* @param index the index about to be written, or the total
* vector length about to be set
*/
-
+
@VisibleForTesting
protected void fillEmpties(int index) {
values.getMutator().fillEmpties(lastSet, index);
while (index > bits.getValueCapacity()) {
bits.reAlloc();
}
-
+
// Set last set to the given index; which the caller
// will write to
-
+
lastSet = index;
}
@@ -753,7 +753,7 @@
values.getMutator().setValueCount(valueCount);
bits.getMutator().setValueCount(valueCount);
}
-
+
<#if type.major == "VarLen">
/** Enables this wrapper container class to participate in bulk mutator logic */
private final class VarLenBulkInputCallbackImpl implements VarLenBulkInput.BulkInputCallback<VarLenBulkEntry> {
@@ -853,7 +853,7 @@
public void setSetCount(int n) {
<#if type.major = "VarLen">lastSet = n - 1;</#if>
}
-
+
// For nullable vectors, exchanging buffers (done elsewhere)
// requires also exchanging mutator state (done here.)
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
index e4e8cbc..099f113 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
@@ -25,7 +25,11 @@
public interface Mutator extends ValueVector.Mutator {
- // Used by the vector accessors to force the last set value.
+ /**
+ * Used by the vector accessors to force the last set value.
+ * @param n the value of the last set field used to
+ * fill empties
+ */
void setSetCount(int n);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
index 564e784..5830b30 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
@@ -34,7 +34,7 @@
if (capacity == 0) {
return;
}
- int length = Math.min(maxPrint, capacity);
+ int length = Math.max(maxPrint, vector.getAccessor().getValueCount());
printOffsets(vector, 0, length);
}