blob: e511fa03c3031d007277b2f20c0a110e24b99180 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.vector.accessor.writer;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.joda.time.Period;
import io.netty.buffer.DrillBuf;
/**
* Column writer implementation that acts as the basis for the
* generated, vector-specific implementations. All set methods
* throw an exception; subclasses simply override the supported
* method(s).
* <p>
* The only tricky part to this class is understanding the
* state of the write indexes as the write proceeds. There are
* two pointers to consider:
* <ul>
* <li>lastWriteIndex: The position in the vector at which the
* client last asked us to write data. This index is maintained
* in this class because it depends only on the actions of this
* class.</li>
* <li>vectorIndex: The position in the vector at which we will
* write if the client chooses to write a value at this time.
* The vector index is shared by all columns at the same repeat
* level. It is incremented as the client steps through the write
* and is observed in this class each time a write occurs.</li>
* </ul>
* A repeat level is defined as any of the following:
* <ul>
* <li>The set of top-level scalar columns, or those within a
* top-level, non-repeated map, or nested to any depth within
* non-repeated maps rooted at the top level.</li>
* <li>The values for a single scalar array.</li>
* <li>The set of scalar columns within a repeated map, or
* nested within non-repeated maps within a repeated map.</li>
* </ul>
* Items at a repeat level index together and share a vector
* index. However, the columns within a repeat level
* <i>do not</i> share a last write index: some can lag further
* behind than others.
* <p>
* Let's illustrate the states. Let's focus on one column and
* walk through the four states that can occur during a write:
* <ul>
* <li><b>Behind</b>: the last write index is more than one position behind
* the vector index. Zero-filling will be needed to catch up to
* the vector index.</li>
* <li><b>Written</b>: the last write index is the same as the vector
* index because the client wrote data at this position (and previous
* values were back-filled with nulls, empties or zeros.)</li>
* <li><b>Unwritten</b>: the last write index is one behind the vector
* index. This occurs when the column was written, then the client
* moved to the next row or array position.</li>
* <li><b>Restarted</b>: The current row is abandoned (perhaps filtered
* out) and is to be rewritten. The last write position moves
* back one position. Note that, the Restarted state is
* indistinguishable from the unwritten state: the only real
* difference is that the current slot (pointed to by the
* vector index) contains the previous written value that must
* be overwritten or back-filled. But, this is fine, because we
* assume that unwritten values are garbage anyway.</li>
* </ul>
* To illustrate:<pre><code>
*     Behind       Written     Unwritten    Restarted
*       |X|          |X|          |X|          |X|
*  lw > |X|          |X|          |X|          |X|
*       | |          |0|          |0|     lw > |0|
*   v > | |  lw, v > |X|     lw > |X|      v > |X|
*                             v > | |
* </code></pre>
* The illustrated state transitions are:
* <ul>
* <li>Suppose the state starts in Behind.<ul>
* <li>If the client writes a value, then the empty slot is
* back-filled and the state moves to Written.</li>
* <li>If the client does not write a value, the state stays
* at Behind, and the gap of unfilled values grows.</li></ul></li>
* <li>When in the Written state:<ul>
* <li>If the client saves the current row or array position,
* the vector index increments and we move to the Unwritten
* state.</li>
* <li>If the client abandons the row, the last write position
* moves back one to recreate the unwritten state. We've
* shown this state separately above just to illustrate
* the two transitions from Written.</li></ul></li>
* <li>When in the Unwritten (or Restarted) states:<ul>
* <li>If the client writes a value, then the writer moves back to the
* Written state.</li>
* <li>If the client skips the value, then the vector index increments
* again, leaving a gap, and the writer moves to the
* Behind state.</li></ul></li>
* </ul>
* <p>
* We've already noted that the Restarted state is identical to
* the Unwritten state (and was discussed just to make the flow a bit
* clearer.) The astute reader will have noticed that the Behind state is
* the same as the Unwritten state if we define the combined state as
* when the last write position is behind the vector index.
* <p>
* Further, if
* one simply treats the gap between last write and the vector indexes
* as the amount (which may be zero) to back-fill, then there is just
* one state. This is, in fact, how the code works: it always writes
* to the vector index (and can do so multiple times for a single row),
* back-filling as necessary.
* <p>
* The states, then, are more for our use in understanding the algorithm.
* They are also very useful when working through the logic of performing
* a roll-over when a vector overflows.
*/
public abstract class BaseScalarWriter extends AbstractScalarWriterImpl {

  public static final int MIN_BUFFER_SIZE = 256;

  /**
   * Callback target consulted before the vector grows and notified when it
   * overflows. May be {@code null}: in that case growth is always permitted
   * and overflow is reported as unsupported.
   */
  protected ColumnWriterListener listener;

  /**
   * Bytes used to fill empty positions. Must be at least as wide as a
   * single value.
   */
  protected byte[] emptyValue;

  protected DrillBuf drillBuf;

  /**
   * Capacity of the buffer currently backing the vector, expressed in
   * values rather than bytes so the per-write logic stays simple. Updated
   * whenever the buffer changes.
   */
  protected int capacity;

  @Override
  public void bindListener(ColumnWriterListener listener) {
    this.listener = listener;
  }

  /**
   * Binds the column schema and, when the schema declares a default value,
   * installs that default on this writer.
   *
   * @param schema metadata for the column being written
   */
  @Override
  public void bindSchema(ColumnMetadata schema) {
    super.bindSchema(schema);

    // A null result means the schema carries no default; nothing to install.
    final Object schemaDefault = schema.decodeDefaultValue();
    if (schemaDefault == null) {
      return;
    }
    setDefaultValue(schemaDefault);
  }

  /**
   * Single choke point for buffer changes, so that the buffer address and
   * capacity can be captured. The buffer changes either when a vector is
   * bound or when the vector is resized.
   */
  protected abstract void setBuffer();

  /**
   * Reallocates the underlying vector to the requested size, then
   * re-captures the buffer via {@link #setBuffer()}.
   *
   * @param size size passed through to the vector's raw reallocation
   */
  protected void realloc(int size) {
    vector().reallocRaw(size);
    setBuffer();
  }

  /**
   * Called when the vector is about to grow. The listener, if any, may veto
   * the growth so that an overflow is triggered instead.
   *
   * @param delta the new amount of memory to allocate
   * @return true if the vector can be grown (always the case when no
   * listener is bound), false if an overflow should be triggered
   */
  protected boolean canExpand(int delta) {
    return listener == null || listener.canExpand(this, delta);
  }

  /**
   * Reports a vector overflow to the listener.
   *
   * @throws UnsupportedOperationException if no listener is bound, since
   * overflow cannot be handled without one
   */
  protected void overflowed() {
    if (listener == null) {
      throw new UnsupportedOperationException("Overflow not supported");
    }
    listener.overflowed(this);
  }

  public abstract void skipNulls();

  @Override
  public boolean nullable() {
    return false;
  }

  // ---------------------------------------------------------------------
  // Default setters. Each one rejects the call; subclasses override only
  // the setters that their vector type supports.
  // ---------------------------------------------------------------------

  @Override
  public void setNull() {
    throw UnsupportedConversionError.nullError(schema());
  }

  @Override
  public void setBoolean(boolean value) {
    throw conversionError("boolean");
  }

  @Override
  public void setInt(int value) {
    throw conversionError("int");
  }

  @Override
  public void setLong(long value) {
    throw conversionError("long");
  }

  @Override
  public void setFloat(float value) {
    throw conversionError("float");
  }

  @Override
  public void setDouble(double value) {
    throw conversionError("double");
  }

  @Override
  public void setString(String value) {
    throw conversionError("String");
  }

  @Override
  public void setBytes(byte[] value, int len) {
    throw conversionError("bytes");
  }

  @Override
  public void appendBytes(byte[] value, int len) {
    throw conversionError("bytes");
  }

  @Override
  public void setDecimal(BigDecimal value) {
    throw conversionError("Decimal");
  }

  @Override
  public void setPeriod(Period value) {
    throw conversionError("Period");
  }

  @Override
  public void setDate(LocalDate value) {
    throw conversionError("LocalDate");
  }

  @Override
  public void setTime(LocalTime value) {
    throw conversionError("LocalTime");
  }

  @Override
  public void setTimestamp(Instant value) {
    throw conversionError("Instant");
  }

  /**
   * Writes this writer's state to the formatter: the superclass state
   * followed by the vector index, the listener identity and the capacity.
   *
   * @param format formatter that receives the dump
   */
  @Override
  public void dump(HierarchicalFormatter format) {
    format.extend();
    super.dump(format);
    format.attribute("vectorIndex", vectorIndex)
        .attributeIdentity("listener", listener)
        .attribute("capacity", capacity)
        .endObject();
  }
}