blob: a4884a14550adc5f5f75af1ac04cad6e6ebaf961 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.rowSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import org.apache.drill.categories.RowSetTest;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.rowSet.TestFixedWidthWriter.TestIndex;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.ValueType;
import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriterImpl;
import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener;
import org.apache.drill.test.SubOperatorTest;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* The offset vector writer is unique: it follows the same API as
* the other writers, but has a unique twist because offsets are written
* into the slot one after the other vectors. That is, if we are writing
* row 5, the offset vector writer writes to position 6. This is done to
* write the end offset of row 5 as the start offset of row 6. (It does,
* however, waste space as we need twice the number of elements in the
* offset vector as other vectors when writing power-of-two record
* counts.)
*/
@Category(RowSetTest.class)
public class TestOffsetVectorWriter extends SubOperatorTest {
/**
* Party over enough memory that the uninitialized nature of
* vectors under the new writers will cause test to fail if
* the writer's don't correctly fill in all values.
*/
@BeforeClass
public static void setup() {
fixture.dirtyMemory(100);
}
/**
* Basic test to write a contiguous set of offsets, enough to cause
* the vector to double in size twice, then read back the values.
*/
@Test
public void testWrite() {
try (UInt4Vector vector = allocVector(1000)) {
TestIndex index = new TestIndex();
OffsetVectorWriterImpl writer = makeWriter(vector, index);
// Start write sets initial position to 0.
writer.startWrite();
assertEquals(0, vector.getAccessor().get(0));
// Pretend to write offsets for values of width 10. We write
// the end position of each field.
// Write enough that the vector is resized.
long origAddr = vector.getBuffer().addr();
for (int i = 0; i < 3000; i++) {
index.index = i;
writer.startRow();
assertEquals(i * 10, writer.nextOffset());
writer.setNextOffset((i+1) * 10);
assertEquals((i+1) * 10, writer.nextOffset());
writer.saveRow();
}
writer.endWrite();
// Should have been reallocated.
assertNotEquals(origAddr, vector.getBuffer().addr());
// Verify values
for (int i = 0; i < 3001; i++) {
assertEquals(i * 10, vector.getAccessor().get(i));
}
}
}
@Test
public void testRestartRow() {
try (UInt4Vector vector = allocVector(1000)) {
TestIndex index = new TestIndex();
OffsetVectorWriterImpl writer = makeWriter(vector, index);
writer.startWrite();
// Write rows, rewriting every other row.
writer.startRow();
index.index = 0;
for (int i = 0; i < 50; i++) {
if (i % 2 == 0) {
assertEquals(i == 0 ? 0 : (i - 1) * 10, writer.nextOffset());
writer.setNextOffset((i + 1) * 10);
writer.saveRow();
writer.startRow();
index.index++;
} else {
writer.setNextOffset((i + 1) * 10);
writer.restartRow();
}
}
writer.endWrite();
// Verify values
assertEquals(0, vector.getAccessor().get(0));
for (int i = 1; i < 25; i++) {
assertEquals((2 * i - 1) * 10, vector.getAccessor().get(i));
}
}
}
/**
* Offset vectors have specific behavior when back-filling missing values:
* the last offset must be carried forward into the missing slots. The
* slots cannot be zero-filled, or entries will end up with a negative
* length.
*/
@Test
public void testFillEmpties() {
try (UInt4Vector vector = allocVector(1000)) {
TestIndex index = new TestIndex();
OffsetVectorWriterImpl writer = makeWriter(vector, index);
writer.startWrite();
// Pretend to write offsets for values of width 10, but
// skip four out of five values, forcing backfill.
// The loop will cause the vector to double in size.
// The number of values is odd, forcing the writer to
// back-fill at the end as well as between values.
long origAddr = vector.getBuffer().addr();
for (int i = 5; i < 3001; i += 5) {
index.index = i;
writer.startRow();
int startOffset = writer.nextOffset();
assertEquals((i/5 - 1) * 10, startOffset);
writer.setNextOffset(startOffset + 10);
writer.saveRow();
}
index.index = 3003;
writer.endWrite();
// Should have been reallocated.
assertNotEquals(origAddr, vector.getBuffer().addr());
// Verify values
for (int i = 0; i < 3004; i++) {
assertEquals(((i-1)/5) * 10, vector.getAccessor().get(i));
}
}
}
/**
* The rollover method is used during vector overflow.
*/
@Test
public void testRollover() {
try (UInt4Vector vector = allocVector(1000)) {
TestIndex index = new TestIndex();
OffsetVectorWriterImpl writer = makeWriter(vector, index);
writer.startWrite();
// Simulate doing an overflow of ten values.
for (int i = 0; i < 10; i++) {
index.index = i;
writer.startRow();
writer.setNextOffset((i+1) * 10);
writer.saveRow();
}
// Overflow occurs after writing the 11th row
index.index = 10;
writer.startRow();
writer.setNextOffset(110);
// Overflow occurs
writer.preRollover();
// Simulate rollover
for (int i = 0; i < 15; i++) {
vector.getMutator().set(i, 0xdeadbeef);
}
// Simulate shifting the last value down (which changes
// the offset.)
vector.getMutator().set(1, 10);
// Post rollover, slot 0 should be initialized
writer.postRollover();
index.index = 0;
writer.saveRow();
// Simulate resuming with a few more values.
for (int i = 1; i < 5; i++) {
index.index = i;
writer.startRow();
writer.setNextOffset((i + 1) * 10);
writer.saveRow();
}
writer.endWrite();
// Verify the results
for (int i = 0; i < 6; i++) {
assertEquals(i * 10, vector.getAccessor().get(i));
}
}
}
/**
* Simulate the case in which the tail end of an overflow
* batch has empties. <tt>preRollover()</tt> should back-fill
* them with the next offset prior to rollover.
*/
@Test
public void testRolloverWithEmpties() {
try (UInt4Vector vector = allocVector(1000)) {
TestIndex index = new TestIndex();
OffsetVectorWriterImpl writer = makeWriter(vector, index);
writer.startWrite();
// Simulate doing an overflow of 15 values,
// of which 5 are empty.
for (int i = 0; i < 10; i++) {
index.index = i;
writer.startRow();
writer.setNextOffset((i+1) * 10);
writer.saveRow();
}
for (int i = 10; i < 15; i++) {
index.index = i;
writer.startRow();
writer.saveRow();
}
// Overflow occurs before writing the 16th row
index.index = 15;
writer.startRow();
// Overflow occurs. This should fill empty offsets.
writer.preRollover();
// Verify the first "batch" results
for (int i = 0; i < 11; i++) {
assertEquals("i = " + i, i * 10, vector.getAccessor().get(i));
}
for (int i = 11; i < 16; i++) {
assertEquals("i = " + i, 100, vector.getAccessor().get(i));
}
// Simulate rollover
for (int i = 0; i < 20; i++) {
vector.getMutator().set(i, 0xdeadbeef);
}
// Simulate finishing the overflow row.
index.index++;
// Post rollover, slot 0 should be initialized.
// This is a rollover. This row must set the value
// for the new row 0 (which was presumably set/filled
// after the overflow.)
writer.postRollover();
index.index = 0;
writer.setNextOffset(0);
writer.saveRow();
// Skip more values.
for (int i = 1; i < 5; i++) {
index.index = i;
writer.startRow();
writer.saveRow();
}
// Simulate resuming with a few more values.
for (int i = 5; i < 10; i++) {
index.index = i;
writer.startRow();
writer.setNextOffset((i - 4) * 10);
writer.saveRow();
}
writer.endWrite();
// Verify the results
for (int i = 0; i < 6; i++) {
assertEquals("Index + " + i, 0, vector.getAccessor().get(i));
}
for (int i = 6; i < 11; i++) {
assertEquals("Index + " + i, (i - 5) * 10, vector.getAccessor().get(i));
}
}
}
/**
* Test resize monitoring. Add a listener to an offsets writer,
* capture each resize, and refuse a resize when the number
* of ints exceeds 8K values. This will trigger an overflow,
* which will throw an exception which we then check for.
*/
@Test
public void testSizeLimit() {
try (UInt4Vector vector = allocVector(1000)) {
TestIndex index = new TestIndex();
OffsetVectorWriterImpl writer = makeWriter(vector, index);
writer.bindListener(new ColumnWriterListener() {
int totalAlloc = 4096;
@Override
public void overflowed(ScalarWriter writer) {
throw new IllegalStateException("overflow called");
}
@Override
public boolean canExpand(ScalarWriter writer, int delta) {
totalAlloc += delta;
return totalAlloc < 16_384 * 4;
}
});
writer.startWrite();
try {
for (int i = 0;; i++ ) {
index.index = i;
writer.startRow();
writer.setNextOffset(i);
writer.saveRow();
}
}
catch(IllegalStateException e) {
assertTrue(e.getMessage().contains("overflow called"));
}
// Should have failed on 8191, which doubled vector
// to 16K, which was rejected. Note the 8191 value,
// because offsets are one ahead of the index.
assertEquals(8191, index.index);
}
}
private UInt4Vector allocVector(int size) {
MaterializedField field = SchemaBuilder.columnSchema("x", MinorType.UINT4,
DataMode.REQUIRED);
UInt4Vector vector = new UInt4Vector(field, fixture.allocator());
vector.allocateNew(size);
// Party on the bytes of the vector so we start dirty
for (int i = 0; i < size; i++) {
vector.getMutator().set(i, 0xdeadbeef);
}
assertNotEquals(0, vector.getAccessor().get(0));
return vector;
}
private OffsetVectorWriterImpl makeWriter(UInt4Vector vector, TestIndex index) {
OffsetVectorWriterImpl writer = new OffsetVectorWriterImpl(vector);
writer.bindIndex(index);
assertEquals(ValueType.INTEGER, writer.valueType());
return writer;
}
}