/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.resultSet.impl;
import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Arrays;
import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleReader;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test (non-array) map support in the result set loader and related classes.
*/
@Category(RowSetTests.class)
public class TestResultSetLoaderMaps extends SubOperatorTest {
@Test
public void testBasics() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("c", MinorType.INT)
.add("d", MinorType.VARCHAR)
.resumeSchema()
.add("e", MinorType.VARCHAR)
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertFalse(rsLoader.isProjectionEmpty());
final RowSetLoader rootWriter = rsLoader.writer();
// Verify structure and schema
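// Schema version 5: one increment per column added so far (a, m, m.c, m.d, e), including map members.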
assertEquals(5, rsLoader.schemaVersion());
final TupleMetadata actualSchema = rootWriter.tupleSchema();
assertEquals(3, actualSchema.size());
assertTrue(actualSchema.metadata(1).isMap());
assertEquals(2, actualSchema.metadata("m").tupleSchema().size());
assertEquals(2, actualSchema.column("m").getChildren().size());
rsLoader.startBatch();
// Write a row the way that clients will, using the individual column writers.
final ScalarWriter aWriter = rootWriter.scalar("a");
final TupleWriter mWriter = rootWriter.tuple("m");
final ScalarWriter cWriter = mWriter.scalar("c");
final ScalarWriter dWriter = mWriter.scalar("d");
final ScalarWriter eWriter = rootWriter.scalar("e");
rootWriter.start();
aWriter.setInt(10);
cWriter.setInt(110);
dWriter.setString("fred");
eWriter.setString("pebbles");
rootWriter.save();
// Try adding a duplicate column.
try {
mWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL));
fail();
} catch (final UserException e) {
// Expected
}
// Write another using the test-time conveniences
rootWriter.addRow(20, mapValue(210, "barney"), "bam-bam");
// Harvest the batch
final RowSet actual = fixture.wrap(rsLoader.harvest());
assertEquals(5, rsLoader.schemaVersion());
assertEquals(2, actual.rowCount());
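// Verify that harvest set the map vector's value count to match the row count.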
final MapVector mapVector = (MapVector) actual.container().getValueVector(1).getValueVector();
assertEquals(2, mapVector.getAccessor().getValueCount());
// Validate data
final SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, mapValue(110, "fred"), "pebbles")
.addRow(20, mapValue(210, "barney"), "bam-bam")
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}
/**
* Create a schema with a map, then add columns to the map
* after delivering the first batch. The new columns should appear
* in the second-batch output.
*/
@Test
public void testMapEvolution() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("b", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertEquals(3, rsLoader.schemaVersion());
final RowSetLoader rootWriter = rsLoader.writer();
rsLoader.startBatch();
rootWriter
.addRow(10, mapValue("fred"))
.addRow(20, mapValue("barney"));
RowSet actual = fixture.wrap(rsLoader.harvest());
assertEquals(3, rsLoader.schemaVersion());
assertEquals(2, actual.rowCount());
// Validate first batch
SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, mapValue("fred"))
.addRow(20, mapValue("barney"))
.build();
RowSetUtilities.verify(expected, actual);
// Add three columns in the second batch. One before
// the batch starts, one before the first row, and one after
// the first row.
final TupleWriter mapWriter = rootWriter.tuple("m");
mapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.REQUIRED));
rsLoader.startBatch();
mapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.BIGINT, DataMode.REQUIRED));
rootWriter.addRow(30, mapValue("wilma", 130, 130_000L));
mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED));
rootWriter.addRow(40, mapValue("betty", 140, 140_000L, "bam-bam"));
actual = fixture.wrap(rsLoader.harvest());
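// Schema version rose from 3 (a, m, m.b) to 6: one bump per column added above (c, d, e).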
assertEquals(6, rsLoader.schemaVersion());
assertEquals(2, actual.rowCount());
// Validate second batch
final TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("b", MinorType.VARCHAR)
.add("c", MinorType.INT)
.add("d", MinorType.BIGINT)
.add("e", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
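// Column e was added after row 30 was written, so that row's e value is back-filled with an empty string.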
expected = fixture.rowSetBuilder(expectedSchema)
.addRow(30, mapValue("wilma", 130, 130_000L, ""))
.addRow(40, mapValue("betty", 140, 140_000L, "bam-bam"))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}
/**
* Test adding a map to a loader after writing the first row.
*/
@Test
public void testMapAddition() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertEquals(1, rsLoader.schemaVersion());
final RowSetLoader rootWriter = rsLoader.writer();
// Start without the map. Add a map after the first row.
rsLoader.startBatch();
rootWriter.addRow(10);
final int mapIndex = rootWriter.addColumn(SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED));
final TupleWriter mapWriter = rootWriter.tuple(mapIndex);
// Add a column to the map with the same name as the top-level column.
// Verifies that the name spaces are independent.
final int colIndex = mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED));
assertEquals(0, colIndex);
// Ensure metadata was added
assertEquals(1, mapWriter.tupleSchema().size());
assertSame(mapWriter.tupleSchema(), mapWriter.schema().tupleSchema());
assertSame(mapWriter.tupleSchema().metadata(colIndex), mapWriter.scalar(colIndex).schema());
rootWriter
.addRow(20, mapValue("fred"))
.addRow(30, mapValue("barney"));
final RowSet actual = fixture.wrap(rsLoader.harvest());
assertEquals(3, rsLoader.schemaVersion());
assertEquals(3, actual.rowCount());
final MapVector mapVector = (MapVector) actual.container().getValueVector(1).getValueVector();
final MaterializedField mapField = mapVector.getField();
assertEquals(1, mapField.getChildren().size());
assertTrue(mapWriter.scalar(colIndex).schema().schema().isEquivalent(
mapField.getChildren().iterator().next()));
// Validate the batch
final TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("a", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
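// Row 10 was written before the map and its column existed, so its map entry is back-filled with an empty string.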
final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, mapValue(""))
.addRow(20, mapValue("fred"))
.addRow(30, mapValue("barney"))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}
/**
* Test adding an empty map to a loader after writing the first row.
* Then add columns in another batch. Yes, this is a bizarre condition,
* but we must check it anyway for robustness.
*/
@Test
public void testEmptyMapAddition() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertEquals(1, rsLoader.schemaVersion());
final RowSetLoader rootWriter = rsLoader.writer();
// Start without the map. Add a map after the first row.
rsLoader.startBatch();
rootWriter.addRow(10);
final int mapIndex = rootWriter.addColumn(SchemaBuilder.columnSchema("m", MinorType.MAP, DataMode.REQUIRED));
final TupleWriter mapWriter = rootWriter.tuple(mapIndex);
rootWriter
.addRow(20, mapValue())
.addRow(30, mapValue());
RowSet actual = fixture.wrap(rsLoader.harvest());
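// Adding the (still empty) map bumped the schema version from 1 to 2.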
assertEquals(2, rsLoader.schemaVersion());
assertEquals(3, actual.rowCount());
// Validate first batch
TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.resumeSchema()
.buildSchema();
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, mapValue())
.addRow(20, mapValue())
.addRow(30, mapValue())
.build();
RowSetUtilities.verify(expected, actual);
// Now add another column to the map
rsLoader.startBatch();
mapWriter.addColumn(SchemaBuilder.columnSchema("a", MinorType.VARCHAR, DataMode.REQUIRED));
rootWriter
.addRow(40, mapValue("fred"))
.addRow(50, mapValue("barney"));
actual = fixture.wrap(rsLoader.harvest());
assertEquals(3, rsLoader.schemaVersion());
assertEquals(2, actual.rowCount());
// Validate second batch
expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("a", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
expected = fixture.rowSetBuilder(expectedSchema)
.addRow(40, mapValue("fred"))
.addRow(50, mapValue("barney"))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}
/**
* Create nested maps. Then, add columns to each map
* on the fly. Use required, variable-width columns since
* those require the most processing and are most likely to
* fail if anything is out of place.
*/
@Test
public void testNestedMapsRequired() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m1")
.add("b", MinorType.VARCHAR)
.addMap("m2")
.add("c", MinorType.VARCHAR)
.resumeMap()
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertEquals(5, rsLoader.schemaVersion());
final RowSetLoader rootWriter = rsLoader.writer();
rsLoader.startBatch();
rootWriter.addRow(10, mapValue("b1", mapValue("c1")));
// Validate first batch
RowSet actual = fixture.wrap(rsLoader.harvest());
assertEquals(5, rsLoader.schemaVersion());
SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, mapValue("b1", mapValue("c1")))
.build();
RowSetUtilities.verify(expected, actual);
// Now add columns in the second batch.
rsLoader.startBatch();
rootWriter.addRow(20, mapValue("b2", mapValue("c2")));
final TupleWriter m1Writer = rootWriter.tuple("m1");
m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED));
final TupleWriter m2Writer = m1Writer.tuple("m2");
m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REQUIRED));
rootWriter.addRow(30, mapValue("b3", mapValue("c3", "e3"), "d3"));
// And another set while the write proceeds.
m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.REQUIRED));
m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.REQUIRED));
rootWriter.addRow(40, mapValue("b4", mapValue("c4", "e4", "g4"), "d4", "e4"));
// Validate second batch
actual = fixture.wrap(rsLoader.harvest());
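// Schema version rose from 5 to 9: one bump for each of the four columns added in this batch (d, e, f, g).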
assertEquals(9, rsLoader.schemaVersion());
final TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m1")
.add("b", MinorType.VARCHAR)
.addMap("m2")
.add("c", MinorType.VARCHAR)
.add("e", MinorType.VARCHAR)
.add("g", MinorType.VARCHAR)
.resumeMap()
.add("d", MinorType.VARCHAR)
.add("f", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
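// Rows written before a column was added get empty strings for that column: required VARCHARs are back-filled with empty values.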
expected = fixture.rowSetBuilder(expectedSchema)
.addRow(20, mapValue("b2", mapValue("c2", "", "" ), "", "" ))
.addRow(30, mapValue("b3", mapValue("c3", "e3", "" ), "d3", "" ))
.addRow(40, mapValue("b4", mapValue("c4", "e4", "g4"), "d4", "e4"))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}
/**
* Create nested maps. Then, add columns to each map
* on the fly. This time, with nullable types.
*/
@Test
public void testNestedMapsNullable() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m1")
.addNullable("b", MinorType.VARCHAR)
.addMap("m2")
.addNullable("c", MinorType.VARCHAR)
.resumeMap()
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
final RowSetLoader rootWriter = rsLoader.writer();
rsLoader.startBatch();
rootWriter.addRow(10, mapValue("b1", mapValue("c1")));
// Validate first batch
RowSet actual = fixture.wrap(rsLoader.harvest());
SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, mapValue("b1", mapValue("c1")))
.build();
RowSetUtilities.verify(expected, actual);
// Now add columns in the second batch.
rsLoader.startBatch();
rootWriter.addRow(20, mapValue("b2", mapValue("c2")));
final TupleWriter m1Writer = rootWriter.tuple("m1");
m1Writer.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL));
final TupleWriter m2Writer = m1Writer.tuple("m2");
m2Writer.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.OPTIONAL));
rootWriter.addRow(30, mapValue("b3", mapValue("c3", "e3"), "d3"));
// And another set while the write proceeds.
m1Writer.addColumn(SchemaBuilder.columnSchema("f", MinorType.VARCHAR, DataMode.OPTIONAL));
m2Writer.addColumn(SchemaBuilder.columnSchema("g", MinorType.VARCHAR, DataMode.OPTIONAL));
rootWriter.addRow(40, mapValue("b4", mapValue("c4", "e4", "g4"), "d4", "e4"));
// Validate second batch
actual = fixture.wrap(rsLoader.harvest());
final TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m1")
.addNullable("b", MinorType.VARCHAR)
.addMap("m2")
.addNullable("c", MinorType.VARCHAR)
.addNullable("e", MinorType.VARCHAR)
.addNullable("g", MinorType.VARCHAR)
.resumeMap()
.addNullable("d", MinorType.VARCHAR)
.addNullable("f", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
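// Here the new columns are nullable, so rows written before each column was added are back-filled with nulls.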
expected = fixture.rowSetBuilder(expectedSchema)
.addRow(20, mapValue("b2", mapValue("c2", null, null), null, null))
.addRow(30, mapValue("b3", mapValue("c3", "e3", null), "d3", null))
.addRow(40, mapValue("b4", mapValue("c4", "e4", "g4"), "d4", "e4"))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}
/**
* Test a map that contains a scalar array. No reason to suspect that this
* will be a problem, as the array writer is fully tested in the accessor
* subsystem. Still, we need to test the cardinality methods of the loader
* layer.
*/
@Test
public void testMapWithArray() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.addArray("c", MinorType.INT)
.addArray("d", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
final RowSetLoader rootWriter = rsLoader.writer();
// Write some rows
rsLoader.startBatch();
rootWriter
.addRow(10, mapValue(intArray(110, 120, 130),
strArray("d1.1", "d1.2", "d1.3", "d1.4")))
.addRow(20, mapValue(intArray(210), strArray()))
.addRow(30, mapValue(intArray(), strArray("d3.1")));
// Validate first batch
RowSet actual = fixture.wrap(rsLoader.harvest());
SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, mapValue(intArray(110, 120, 130),
strArray("d1.1", "d1.2", "d1.3", "d1.4")))
.addRow(20, mapValue(intArray(210), strArray()))
.addRow(30, mapValue(intArray(), strArray("d3.1")))
.build();
RowSetUtilities.verify(expected, actual);
// Add another array after the first row in the second batch.
rsLoader.startBatch();
rootWriter
.addRow(40, mapValue(intArray(410, 420), strArray("d4.1", "d4.2")))
.addRow(50, mapValue(intArray(510), strArray("d5.1")));
final TupleWriter mapWriter = rootWriter.tuple("m");
mapWriter.addColumn(SchemaBuilder.columnSchema("e", MinorType.VARCHAR, DataMode.REPEATED));
rootWriter
.addRow(60, mapValue(intArray(610, 620), strArray("d6.1", "d6.2"), strArray("e6.1", "e6.2")))
.addRow(70, mapValue(intArray(710), strArray(), strArray("e7.1", "e7.2")));
// Validate the second batch. The new array should have been back-filled with
// empty (zero-length) entries for the rows written before it was added.
actual = fixture.wrap(rsLoader.harvest());
expected = fixture.rowSetBuilder(actual.schema())
.addRow(40, mapValue(intArray(410, 420), strArray("d4.1", "d4.2"), strArray()))
.addRow(50, mapValue(intArray(510), strArray("d5.1"), strArray()))
.addRow(60, mapValue(intArray(610, 620), strArray("d6.1", "d6.2"), strArray("e6.1", "e6.2")))
.addRow(70, mapValue(intArray(710), strArray(), strArray("e7.1", "e7.2")))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}
/**
* Create a schema with a map, then trigger an overflow on one of the columns
* in the map. Proper overflow handling should occur regardless of nesting
* depth.
*/
@Test
public void testMapWithOverflow() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m1")
.add("b", MinorType.INT)
.addMap("m2")
.add("c", MinorType.INT) // Before overflow, written
.add("d", MinorType.VARCHAR)
.add("e", MinorType.INT) // After overflow, not yet written
.resumeMap()
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.setRowCountLimit(ValueVector.MAX_ROW_COUNT)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
final RowSetLoader rootWriter = rsLoader.writer();
final byte value[] = new byte[512];
Arrays.fill(value, (byte) 'X');
int count = 0;
rsLoader.startBatch();
while (! rootWriter.isFull()) {
rootWriter.addRow(count, mapValue(count * 10, mapValue(count * 100, value, count * 1000)));
count++;
}
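// Each row writes 512 bytes into the VARCHAR column d, so the batch fills after
// MAX_BUFFER_SIZE / 512 rows; the row that triggers overflow is held as a look-ahead row for the next batch.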
// Our row count should include the overflow row
final int expectedCount = ValueVector.MAX_BUFFER_SIZE / value.length;
assertEquals(expectedCount + 1, count);
// Loader's row count should include only "visible" rows
assertEquals(expectedCount, rootWriter.rowCount());
// Total count should include invisible and look-ahead rows.
assertEquals(expectedCount + 1, rsLoader.totalRowCount());
// Result should exclude the overflow row
RowSet result = fixture.wrap(rsLoader.harvest());
assertEquals(expectedCount, result.rowCount());
// Ensure the odd map vector value count variable is set correctly.
final MapVector m1Vector = (MapVector) result.container().getValueVector(1).getValueVector();
assertEquals(expectedCount, m1Vector.getAccessor().getValueCount());
final MapVector m2Vector = (MapVector) m1Vector.getChildByOrdinal(1);
assertEquals(expectedCount, m2Vector.getAccessor().getValueCount());
result.clear();
// Next batch should start with the overflow row
rsLoader.startBatch();
assertEquals(1, rootWriter.rowCount());
assertEquals(expectedCount + 1, rsLoader.totalRowCount());
result = fixture.wrap(rsLoader.harvest());
assertEquals(1, result.rowCount());
result.clear();
rsLoader.close();
}
/**
* Test the case in which a new column is added during the overflow row. Unlike
* the top-level schema case, internally we must create a copy of the map, and
* move vectors across only when the result is to include the schema version
* of the target column. For overflow, the new column is added after the
* first batch; it is added in the second batch that contains the overflow
* row in which the column was added.
*/
@Test
public void testMapOverflowWithNewColumn() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("b", MinorType.INT)
.add("c", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.setRowCountLimit(ValueVector.MAX_ROW_COUNT)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertEquals(4, rsLoader.schemaVersion());
final RowSetLoader rootWriter = rsLoader.writer();
// Can't use the shortcut to populate rows when doing a schema
// change.
final ScalarWriter aWriter = rootWriter.scalar("a");
final TupleWriter mWriter = rootWriter.tuple("m");
final ScalarWriter bWriter = mWriter.scalar("b");
final ScalarWriter cWriter = mWriter.scalar("c");
final byte value[] = new byte[512];
Arrays.fill(value, (byte) 'X');
int count = 0;
rsLoader.startBatch();
while (! rootWriter.isFull()) {
rootWriter.start();
aWriter.setInt(count);
bWriter.setInt(count * 10);
cWriter.setBytes(value, value.length);
if (rootWriter.isFull()) {
// Overflow just occurred. Add another column.
mWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.INT, DataMode.OPTIONAL));
mWriter.scalar("d").setInt(count * 100);
}
rootWriter.save();
count++;
}
// Result set should include the original columns, but not d.
RowSet result = fixture.wrap(rsLoader.harvest());
assertEquals(4, rsLoader.schemaVersion());
assertTrue(schema.isEquivalent(result.schema()));
final BatchSchema expectedSchema = new BatchSchema(SelectionVectorMode.NONE, schema.toFieldList());
assertTrue(expectedSchema.isEquivalent(result.batchSchema()));
// Use a reader to validate row-by-row. Too large to create an expected
// result set.
RowSetReader reader = result.reader();
TupleReader mapReader = reader.tuple("m");
int rowId = 0;
while (reader.next()) {
assertEquals(rowId, reader.scalar("a").getInt());
assertEquals(rowId * 10, mapReader.scalar("b").getInt());
assertTrue(Arrays.equals(value, mapReader.scalar("c").getBytes()));
rowId++;
}
result.clear();
// Next batch should start with the overflow row
rsLoader.startBatch();
assertEquals(1, rootWriter.rowCount());
result = fixture.wrap(rsLoader.harvest());
assertEquals(1, result.rowCount());
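// The second batch includes column d along with the value written to it during the overflow row.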
reader = result.reader();
mapReader = reader.tuple("m");
while (reader.next()) {
assertEquals(rowId, reader.scalar("a").getInt());
assertEquals(rowId * 10, mapReader.scalar("b").getInt());
assertTrue(Arrays.equals(value, mapReader.scalar("c").getBytes()));
assertEquals(rowId * 100, mapReader.scalar("d").getInt());
}
result.clear();
rsLoader.close();
}
/**
* Version of the {@link TestResultSetLoaderProtocol#testOverwriteRow()} test
* that uses nested columns.
*/
@Test
public void testOverwriteRow() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("b", MinorType.INT)
.add("c", MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.setRowCountLimit(ValueVector.MAX_ROW_COUNT)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
final RowSetLoader rootWriter = rsLoader.writer();
// Can't use the shortcut to populate rows when doing overwrites.
final ScalarWriter aWriter = rootWriter.scalar("a");
final TupleWriter mWriter = rootWriter.tuple("m");
final ScalarWriter bWriter = mWriter.scalar("b");
final ScalarWriter cWriter = mWriter.scalar("c");
// Write 100,000 rows, overwriting 99% of them. This will cause vector
// overflow and data corruption if overwrite does not work; but will happily
// produce the correct result if everything works as it should.
final byte value[] = new byte[512];
Arrays.fill(value, (byte) 'X');
int count = 0;
rsLoader.startBatch();
while (count < 100_000) {
rootWriter.start();
count++;
aWriter.setInt(count);
bWriter.setInt(count * 10);
cWriter.setBytes(value, value.length);
if (count % 100 == 0) {
rootWriter.save();
}
}
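// Only every 100th row was saved; the rest were overwritten in place, leaving 1,000 rows in the batch.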
// Verify using a reader.
final RowSet result = fixture.wrap(rsLoader.harvest());
assertEquals(count / 100, result.rowCount());
final RowSetReader reader = result.reader();
final TupleReader mReader = reader.tuple("m");
int rowId = 1;
while (reader.next()) {
assertEquals(rowId * 100, reader.scalar("a").getInt());
assertEquals(rowId * 1000, mReader.scalar("b").getInt());
assertTrue(Arrays.equals(value, mReader.scalar("c").getBytes()));
rowId++;
}
result.clear();
rsLoader.close();
}
/**
* Verify that map name spaces (and implementations) are
* independent.
*/
@Test
public void testNameSpace() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("a", MinorType.INT)
.addMap("m")
.add("a", MinorType.INT)
.resumeMap()
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertFalse(rsLoader.isProjectionEmpty());
final RowSetLoader rootWriter = rsLoader.writer();
rsLoader.startBatch();
// Write a row the way that clients will, using the individual column writers.
final ScalarWriter a1Writer = rootWriter.scalar("a");
final TupleWriter m1Writer = rootWriter.tuple("m");
final ScalarWriter a2Writer = m1Writer.scalar("a");
final TupleWriter m2Writer = m1Writer.tuple("m");
final ScalarWriter a3Writer = m2Writer.scalar("a");
rootWriter.start();
a1Writer.setInt(11);
a2Writer.setInt(12);
a3Writer.setInt(13);
rootWriter.save();
rootWriter.start();
a1Writer.setInt(21);
a2Writer.setInt(22);
a3Writer.setInt(23);
rootWriter.save();
// Try simplified test format
rootWriter.addRow(31,
mapValue(32,
mapValue(33)));
// Verify
final RowSet actual = fixture.wrap(rsLoader.harvest());
final SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(11, mapValue(12, mapValue(13)))
.addRow(21, mapValue(22, mapValue(23)))
.addRow(31, mapValue(32, mapValue(33)))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}
}