blob: bf471e0bedd3742a5345475f7edc0899f9328869 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.drill.exec.physical.resultSet.impl;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.accessor.WriterPosition;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
import org.apache.drill.exec.vector.accessor.writer.WriterEvents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Base class for a single vector. Handles the bulk of work for that vector.
* Subclasses are specialized for offset vectors or values vectors.
* (The "single vector" name contrasts with classes that manage compound
* vectors, such as a data and offsets vector.)
* * <p>
* During overflow it is critical to update the various stored vector
* lengths so that serialization/deserialization works correctly.
public abstract class SingleVectorState implements VectorState {
public abstract static class SimpleVectorState extends SingleVectorState {
private static final Logger logger = LoggerFactory.getLogger(SimpleVectorState.class);
public SimpleVectorState(WriterEvents writer,
ValueVector mainVector) {
super(writer, mainVector);
protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
int newIndex = 0;
logger.trace("Vector {} of type {}: copy {} values from {} to {}",
Math.max(0, sourceEndIndex - sourceStartIndex + 1),
sourceStartIndex, newIndex);
// Copy overflow values from the full vector to the new
// look-ahead vector. Uses vector-level operations for convenience.
// These aren't very efficient, but overflow does not happen very
// often.
for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) {
mainVector.copyEntry(newIndex, backupVector, src);
* State for a scalar value vector. The vector might be for a simple (non-array)
* vector, or might be the payload part of a scalar array (repeated scalar)
* vector.
public static class FixedWidthVectorState extends SimpleVectorState {
public FixedWidthVectorState(WriterEvents writer, ValueVector mainVector) {
super(writer, mainVector);
public int allocateVector(ValueVector vector, int cardinality) {
((FixedWidthVector) vector).allocateNew(cardinality);
return vector.getAllocatedSize();
public static class IsSetVectorState extends FixedWidthVectorState {
public IsSetVectorState(WriterEvents writer, ValueVector mainVector) {
super(writer, mainVector);
public int allocateVector(ValueVector vector, int cardinality) {
int size = super.allocateVector(vector, cardinality);
// IsSet ("bit") vectors rely on values being initialized to zero (unset.)
((FixedWidthVector) vector).zeroVector();
return size;
* State for a scalar value vector. The vector might be for a simple (non-array)
* vector, or might be the payload part of a scalar array (repeated scalar)
* vector.
public static class VariableWidthVectorState extends SimpleVectorState {
private final ColumnMetadata schema;
public VariableWidthVectorState(ColumnMetadata schema, WriterEvents writer, ValueVector mainVector) {
super(writer, mainVector);
this.schema = schema;
public int allocateVector(ValueVector vector, int cardinality) {
// Cap the allocated size to the maximum.
int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth());
((VariableWidthVector) vector).allocateNew(size, cardinality);
return vector.getAllocatedSize();
public void rollover(int cardinality) {
// Adjust offset vector length
int offsetLength = writer.rowStartIndex() + 1;
VariableWidthVector varWidthVector = ((VariableWidthVector) backupVector);
UInt4Vector offsetVector = varWidthVector.getOffsetVector();
offsetVector.getMutator().setValueCount(offsetLength );
// Adjust data vector length.
((BaseDataValueVector) backupVector).getBuffer().writerIndex(
offsetVector.getAccessor().get(offsetLength - 1));
* Special case for an offset vector. Offset vectors are managed like any other
* vector with respect to overflow and allocation. This means that the loader
* classes avoid the use of the RepeatedVector class methods, instead working
* with the offsets vector (here) or the values vector to allow the needed
* fine control over overflow operations.
public static class OffsetVectorState extends SingleVectorState {
private static final Logger logger = LoggerFactory.getLogger(OffsetVectorState.class);
* The child writer used to determine positions on overflow.
* The repeated list vector defers creating the child until the
* child type is know so this field cannot be final. It will,
* however, change value only once: from null to a valid writer.
private WriterPosition childWriter;
public OffsetVectorState(WriterEvents writer, ValueVector mainVector,
WriterPosition childWriter) {
super(writer, mainVector);
this.childWriter = childWriter;
public void setChildWriter(WriterEvents childWriter) {
this.childWriter = childWriter;
public int allocateVector(ValueVector toAlloc, int cardinality) {
((UInt4Vector) toAlloc).allocateNew(cardinality);
return toAlloc.getBufferSize();
public int rowStartOffset() {
return ((OffsetVectorWriter) writer).rowStartOffset();
protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) {
if (sourceStartIndex > sourceEndIndex) {
assert childWriter != null;
// This is an offset vector. The data to copy is one greater
// than the row index.
// Copy overflow values from the full vector to the new
// look-ahead vector. Since this is an offset vector, values must
// be adjusted as they move across.
// Indexing can be confusing. Offset vectors have values offset
// from their row by one position. The offset vector position for
// row i has the start value for row i. The offset vector position for
// i+1 has the start of the next value. The difference between the
// two is the element length. As a result, the offset vector always has
// one more value than the number of rows, and position 0 is always 0.
// The index passed in here is that of the row that overflowed. That
// offset vector position contains the offset of the start of the data
// for the current row. We must subtract that offset from each copied
// value to adjust the offset for the destination.
UInt4Vector sourceVector = ((UInt4Vector) backupVector);
final UInt4Vector.Accessor sourceAccessor = sourceVector.getAccessor();
final UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator();
final int offset = childWriter.rowStartIndex();
int newIndex = 1;
logger.trace("Offset vector: copy {} values from {} to {} with offset {}",
Math.max(0, sourceEndIndex - sourceStartIndex + 1),
sourceStartIndex, newIndex, offset);
assert offset == sourceAccessor.get(sourceStartIndex - 1);
// Position zero is special and will be filled in by the writer
// later.
for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) {
destMutator.set(newIndex, sourceAccessor.get(src) - offset);
// Adjust offset vector length
int offsetLength = writer.rowStartIndex() + 1;
sourceVector.getMutator().setValueCount(offsetLength );
// Getting offsets right was a pain. If you modify this code,
// you'll likely relive that experience. Enabling the three two
// lines will help reveal some of the mystery around offsets and their
// confusing off-by-one design.
// VectorChecker.verifyOffsets("nested", sourceVector);
// VectorPrinter.printOffsets(sourceVector, sourceStartIndex - 1, sourceEndIndex - sourceStartIndex + 3);
// VectorPrinter.printOffsets((UInt4Vector) mainVector, 0, newIndex);
protected final WriterEvents writer;
protected final ValueVector mainVector;
protected ValueVector backupVector;
public SingleVectorState(WriterEvents writer, ValueVector mainVector) {
this.writer = writer;
this.mainVector = mainVector;
public <T extends ValueVector> T vector() { return (T) mainVector; }
public int allocate(int cardinality) {
return allocateVector(mainVector, cardinality);
protected abstract int allocateVector(ValueVector vector, int cardinality);
* A column within the row batch overflowed. Prepare to absorb the rest of
* the in-flight row by rolling values over to a new vector, saving the
* complete vector for later. This column could have a value for the overflow
* row, or for some previous row, depending on exactly when and where the
* overflow occurs.
* sourceStartIndex: the index of the row that caused the overflow, the
* values of which should be copied to a new "look-ahead" vector. If the
* vector is an array, then the overflowIndex is the position of the first
* element to be moved, and multiple elements may need to move
* @param cardinality the number of unique columns in the row
public void rollover(int cardinality) {
int sourceStartIndex = writer.rowStartIndex();
// Remember the last write index for the original vector.
// This tells us the end of the set of values to move, while the
// sourceStartIndex above tells us the start.
int sourceEndIndex = writer.lastWriteIndex();
// Switch buffers between the backup vector and the writer's output
// vector. Done this way because writers are bound to vectors and
// we wish to keep the binding.
if (backupVector == null) {
backupVector = TypeHelper.getNewVector(mainVector.getField(),
parseVectorType(mainVector), mainVector.getAllocator(), null);
assert cardinality > 0;
allocateVector(backupVector, cardinality);;
// Copy overflow values from the full vector to the new
// look-ahead vector.
copyOverflow(sourceStartIndex, sourceEndIndex);
// At this point, the writer is positioned to write to the look-ahead
// vector at the position after the copied values. The original vector
// is saved along with a last write position that is no greater than
// the retained values.
* The vector mechanism here relies on the vector metadata. However, if the
* main vector is nullable, it will contain a <code>values</code> vector which
* is required. But the <code>values</code> vector will carry metadata that
* declares it to be nullable. While this is clearly a bug, it is a bug that has
* become a "feature" and cannot be changed. This code works around this feature
* by parsing out the actual type of the vector.
* @param vector the vector to clone, the type of which may not match the
* metadata declared within that vector
* @return the actual major type of the vector
protected static MajorType parseVectorType(ValueVector vector) {
MajorType purportedType = vector.getField().getType();
if (purportedType.getMode() != DataMode.OPTIONAL) {
return purportedType;
// For nullable vectors, the purported type can be wrong. The "outer"
// vector is nullable, but the internal "values" vector is required, though
// it carries a nullable type -- that is, the metadata lies.
if (vector instanceof NullableVector) {
return purportedType;
return purportedType.toBuilder()
protected abstract void copyOverflow(int sourceStartIndex, int sourceEndIndex);
* Exchange the data from the backup vector and the main vector, putting
* the completed buffers back into the main vectors, and stashing the
* overflow buffers away in the backup vector.
* Restore the main vector's last write position.
public void harvestWithLookAhead() {;
* The previous full batch has been sent downstream and the client is
* now ready to start writing to the next batch. Initialize that new batch
* with the look-ahead values saved during overflow of the previous batch.
public void startBatchWithLookAhead() {;
public void close() {
if (backupVector != null) {
public boolean isProjected() { return true; }
public static SimpleVectorState vectorState(ColumnMetadata schema, WriterEvents writer, ValueVector mainVector) {
if (schema.isVariableWidth()) {
return new VariableWidthVectorState(schema, writer, mainVector);
} else {
return new FixedWidthVectorState(writer, mainVector);
public void dump(HierarchicalFormatter format) {
.attributeIdentity("writer", writer)
.attributeIdentity("mainVector", mainVector)
.attributeIdentity("backupVector", backupVector)