/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.physical.resultSet.impl;

import static org.apache.drill.test.rowSet.RowSetUtilities.intArray;
import static org.apache.drill.test.rowSet.RowSetUtilities.map;
import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;

import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.validate.BatchValidator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.DictColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.DictWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
* Test (non-array) dict support in the result set loader and related classes.
*/
@Category(RowSetTests.class)
public class TestResultSetLoaderDicts extends SubOperatorTest {

@Test
public void testBasics() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addDict("d", MinorType.INT)
.value(MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertFalse(rsLoader.isProjectionEmpty());
final RowSetLoader rootWriter = rsLoader.writer();
// Verify structure and schema
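// Note: the schema version appears to count every column created at any
// nesting level: a, the dict d, and the dict's key and value children give
// version 4. (An inference from this file's assertions, not documented behavior.)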
assertEquals(4, rsLoader.schemaVersion());
final TupleMetadata actualSchema = rootWriter.tupleSchema();
assertEquals(2, actualSchema.size());
assertTrue(actualSchema.metadata(1).isDict());
assertEquals(2, actualSchema.metadata("d").tupleSchema().size());
assertEquals(2, actualSchema.column("d").getChildren().size());
rsLoader.startBatch();
// Write a row the way that clients will do.
final ScalarWriter aWriter = rootWriter.scalar("a");
final DictWriter dictWriter = rootWriter.dict("d");
final ScalarWriter keyWriter = dictWriter.keyWriter();
final ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
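// Dict entries follow a fixed protocol: write the key, then the value,
// then dictWriter.save() to close the entry. rootWriter.save() later
// closes the whole row.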
rootWriter.start();
aWriter.setInt(10);
keyWriter.setInt(110);
valueWriter.setString("fred");
dictWriter.save();
keyWriter.setInt(111);
valueWriter.setString("george");
dictWriter.save();
rootWriter.save();
// Write another row using the test-time conveniences
rootWriter.addRow(20, map(210, "barney", 211, "bart", 212, "jerry"));
// Harvest the batch
final RowSet actual = fixture.wrap(rsLoader.harvest());
assertEquals(4, rsLoader.schemaVersion());
assertEquals(2, actual.rowCount());
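// One dict per row, so the dict vector's top-level value count should
// equal the row count.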
final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
assertEquals(2, dictVector.getAccessor().getValueCount());
// Validate data
final SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, map(110, "fred", 111, "george"))
.addRow(20, map(210, "barney", 211, "bart", 212, "jerry"))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}

/**
* Test adding a dict to a loader after writing the first row.
*/
@Test
public void testDictAddition() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertEquals(1, rsLoader.schemaVersion());
final RowSetLoader rootWriter = rsLoader.writer();
// Start without the dict, then add a dict after the first row.
rsLoader.startBatch();
rootWriter.addRow(10);
MaterializedField dictField = SchemaBuilder.columnSchema("d", MinorType.DICT, DataMode.REQUIRED);
dictField.addChild(SchemaBuilder.columnSchema(DictVector.FIELD_KEY_NAME, MinorType.VARCHAR, DataMode.REQUIRED));
dictField.addChild(SchemaBuilder.columnSchema(DictVector.FIELD_VALUE_NAME, MinorType.VARCHAR, DataMode.REQUIRED));
DictColumnMetadata dictMetadata = MetadataUtils.newDict(dictField);
final int dictIndex = rootWriter.addColumn(dictMetadata);
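// The row already written (a = 10) is back-filled with an empty dict
// for the new column, as the expected batch below shows.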
final DictWriter dictWriter = rootWriter.dict(dictIndex);
// Ensure metadata was added
final TupleMetadata actualSchema = rootWriter.tupleSchema();
assertTrue(actualSchema.metadata(1).isDict());
assertEquals(2, actualSchema.metadata("d").tupleSchema().size());
assertEquals(2, actualSchema.column("d").getChildren().size());
assertEquals(2, actualSchema.size());
assertEquals(2, dictWriter.schema().tupleSchema().size());
rootWriter.addRow(20, map("name", "fred", "lastname", "smith"))
.addRow(30, map("name", "barney", "lastname", "johnson"));
final RowSet actual = fixture.wrap(rsLoader.harvest());
assertEquals(4, rsLoader.schemaVersion());
assertEquals(3, actual.rowCount());
final DictVector dictVector = (DictVector) actual.container().getValueVector(1).getValueVector();
assertEquals(2, dictVector.getField().getChildren().size());
// Validate first batch
final TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addDict("d", MinorType.VARCHAR)
.value(MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, map())
.addRow(20, map("name", "fred", "lastname", "smith"))
.addRow(30, map("name", "barney", "lastname", "johnson"))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}

/**
 * Create a dict with a map value, then add columns to the map
 * on the fly. Use required, variable-width columns since
 * those require the most processing and are the most likely to
 * fail if anything is out of place.
 */
@Test
public void testMapValueRequiredFields() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addDict("d", MinorType.VARCHAR)
.mapValue()
.add("b", MinorType.VARCHAR)
.resumeDict()
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertEquals(5, rsLoader.schemaVersion());
final RowSetLoader rootWriter = rsLoader.writer();
rsLoader.startBatch();
rootWriter.addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")));
// Validate first batch
RowSet actual = fixture.wrap(rsLoader.harvest());
assertEquals(5, rsLoader.schemaVersion());
SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")))
.build();
RowSetUtilities.verify(expected, actual);
// Now add columns in the second batch.
rsLoader.startBatch();
rootWriter.addRow(20, map("a2", mapValue("c11"), "b2", mapValue("c12"), "c2", mapValue("c13")));
final DictWriter dictWriter = rootWriter.dict("d");
final TupleWriter nestedMapWriter = dictWriter.valueWriter().tuple();
nestedMapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.REQUIRED));
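// Row 20, already written in this batch, is back-filled with empty
// (not null) Varchar values for the new required column; see the
// expected data below.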
rootWriter.addRow(30, map("a3", mapValue("c21", "d21")));
// And another set while the write proceeds.
nestedMapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.REQUIRED));
rootWriter.addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")));
// Validate second batch
actual = fixture.wrap(rsLoader.harvest());
assertEquals(7, rsLoader.schemaVersion());
final TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addDict("d", MinorType.VARCHAR)
.mapValue()
.add("b", MinorType.VARCHAR)
.add("c", MinorType.VARCHAR)
.add("d", MinorType.VARCHAR)
.resumeDict()
.resumeSchema()
.buildSchema();
expected = fixture.rowSetBuilder(expectedSchema)
.addRow(20, map("a2", mapValue("c11", "", ""), "b2", mapValue("c12", "", ""), "c2", mapValue("c13", "", "")))
.addRow(30, map("a3", mapValue("c21", "d21", "")))
.addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}

/**
 * Create a dict with a map value, then add columns to the map
 * on the fly. Use nullable, variable-width columns: newly added
 * nullable columns must be back-filled with nulls for rows
 * already written in the batch.
 */
@Test
public void testMapValueNullableFields() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addDict("d", MinorType.VARCHAR)
.mapValue()
.addNullable("b", MinorType.VARCHAR)
.resumeDict()
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
assertEquals(5, rsLoader.schemaVersion());
final RowSetLoader rootWriter = rsLoader.writer();
rsLoader.startBatch();
rootWriter.addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")));
// Validate first batch
RowSet actual = fixture.wrap(rsLoader.harvest());
assertEquals(5, rsLoader.schemaVersion());
SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, map("a", mapValue("c1"), "b", mapValue("c2")))
.build();
RowSetUtilities.verify(expected, actual);
// Now add columns in the second batch.
rsLoader.startBatch();
rootWriter.addRow(20, map("a2", mapValue("c11"), "b2", mapValue("c12"), "c2", mapValue("c13")));
final DictWriter dictWriter = rootWriter.dict("d");
final TupleWriter nestedMapWriter = dictWriter.valueWriter().tuple();
nestedMapWriter.addColumn(SchemaBuilder.columnSchema("c", MinorType.VARCHAR, DataMode.OPTIONAL));
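// Row 20, already written in this batch, is back-filled with nulls
// for the new nullable column; see the expected data below.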
rootWriter.addRow(30, map("a3", mapValue("c21", "d21")));
// And another set while the write proceeds.
nestedMapWriter.addColumn(SchemaBuilder.columnSchema("d", MinorType.VARCHAR, DataMode.OPTIONAL));
rootWriter.addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")));
// Validate second batch
actual = fixture.wrap(rsLoader.harvest());
assertEquals(7, rsLoader.schemaVersion());
final TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addDict("d", MinorType.VARCHAR)
.mapValue()
.addNullable("b", MinorType.VARCHAR)
.addNullable("c", MinorType.VARCHAR)
.addNullable("d", MinorType.VARCHAR)
.resumeDict()
.resumeSchema()
.buildSchema();
expected = fixture.rowSetBuilder(expectedSchema)
.addRow(20, map("a2", mapValue("c11", null, null), "b2", mapValue("c12", null, null), "c2", mapValue("c13", null, null)))
.addRow(30, map("a3", mapValue("c21", "d21", null)))
.addRow(40, map("a4", mapValue("c31", "d31", "e31"), "b4", mapValue("c32", "d32", "e32")))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}

@Test
public void testArrayValue() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addDict("d", MinorType.INT)
.repeatedValue(MinorType.INT)
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
final RowSetLoader rootWriter = rsLoader.writer();
// Write some rows
rsLoader.startBatch();
rootWriter
.addRow(10, map(
1, intArray(110, 120, 130),
2, intArray(111, 121)
))
.addRow(20, map())
.addRow(30, map(
1, intArray(),
2, intArray(310, 320),
3, intArray(311, 321, 331, 341),
4, intArray(312, 322, 332)
));
// Validate first batch
RowSet actual = fixture.wrap(rsLoader.harvest());
SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, map(
1, intArray(110, 120, 130),
2, intArray(111, 121)
))
.addRow(20, map())
.addRow(30, map(
1, intArray(),
2, intArray(310, 320),
3, intArray(311, 321, 331, 341),
4, intArray(312, 322, 332)
))
.build();
RowSetUtilities.verify(expected, actual);
// Add more rows in the second batch.
rsLoader.startBatch();
rootWriter
.addRow(40, map(1, intArray(410, 420)))
.addRow(50, map(1, intArray(510), 2, intArray(511, 531)));
// Validate the second batch.
actual = fixture.wrap(rsLoader.harvest());
expected = fixture.rowSetBuilder(actual.schema())
.addRow(40, map(1, intArray(410, 420)))
.addRow(50, map(1, intArray(510), 2, intArray(511, 531)))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}

@Test
public void testDictValue() {
final TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.addDict("d", MinorType.INT)
.dictValue()
.key(MinorType.INT)
.nullableValue(MinorType.VARCHAR)
.resumeDict()
.resumeSchema()
.buildSchema();
final ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setSchema(schema)
.build();
final ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
final RowSetLoader rootWriter = rsLoader.writer();
// Write some rows
rsLoader.startBatch();
rootWriter
.addRow(10, map(
1, map(1, "a", 2, "b", 4, "c"),
2, map(2, "a2", 1, "c2")
))
.addRow(20, map())
.addRow(30, map(
1, map(),
2, map(1, "a3"),
3, map(2, "b4", 4, "n4", 1, null),
4, map(3, "m5", 1, "a5", 2, "c5", 8, "m5", 21, "h5")
));
// Validate first batch
RowSet actual = fixture.wrap(rsLoader.harvest());
SingleRowSet expected = fixture.rowSetBuilder(schema)
.addRow(10, map(
1, map(1, "a", 2, "b", 4, "c"),
2, map(2, "a2", 1, "c2")
))
.addRow(20, map())
.addRow(30, map(
1, map(),
2, map(1, "a3"),
3, map(2, "b4", 4, "n4", 1, null),
4, map(3, "m5", 1, "a5", 2, "c5", 8, "m5", 21, "h5")
))
.build();
RowSetUtilities.verify(expected, actual);
// Add more rows in the second batch.
rsLoader.startBatch();
rootWriter
.addRow(40, map(
1, map(1, "j6", 0, "k6"))
)
.addRow(50, map(
1, map(2, "l7"),
2, map(1, "o8", 5, "p8", 7, "u8")
));
// Validate the second batch.
actual = fixture.wrap(rsLoader.harvest());
expected = fixture.rowSetBuilder(actual.schema())
.addRow(40, map(
1, map(1, "j6", 0, "k6"))
)
.addRow(50, map(
1, map(2, "l7"),
2, map(1, "o8", 5, "p8", 7, "u8")
))
.build();
RowSetUtilities.verify(expected, actual);
rsLoader.close();
}

@Test
public void testKeyOverflow() {
TupleMetadata schema = new SchemaBuilder()
.addDict("d", MinorType.VARCHAR)
.value(MinorType.INT)
.resumeSchema()
.buildSchema();
ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setRowCountLimit(ValueVector.MAX_ROW_COUNT)
.setSchema(schema)
.build();
ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
RowSetLoader rootWriter = rsLoader.writer();
rsLoader.startBatch();
byte[] key = new byte[523];
Arrays.fill(key, (byte) 'X');
int dictSize = 4; // number of entries in each dict
// Number of rows should be driven by vector size.
// Our row count should include the overflow row
DictWriter dictWriter = rootWriter.dict(0);
ScalarWriter keyWriter = dictWriter.keyWriter();
ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
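// Rough sizing sketch: each row writes dictSize keys of key.length bytes
// into the key vector, so about MAX_BUFFER_SIZE / (key.length * dictSize)
// rows fit before the key vector overflows.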
int expectedCount = ValueVector.MAX_BUFFER_SIZE / (key.length * dictSize);
{
int count = 0;
while (! rootWriter.isFull()) {
rootWriter.start();
for (int i = 0; i < dictSize; i++) {
keyWriter.setBytes(key, key.length);
valueWriter.setInt(0); // acts as a placeholder, the actual value is not important
dictWriter.save(); // not necessary for scalars, just for completeness
}
rootWriter.save();
count++;
}
assertEquals(expectedCount + 1, count);
// Loader's row count should include only "visible" rows
assertEquals(expectedCount, rootWriter.rowCount());
// Total count should include invisible and look-ahead rows.
assertEquals(expectedCount + 1, rsLoader.totalRowCount());
// Result should exclude the overflow row
VectorContainer container = rsLoader.harvest();
BatchValidator.validate(container);
RowSet result = fixture.wrap(container);
assertEquals(expectedCount, result.rowCount());
result.clear();
}
// Next batch should start with the overflow row
{
rsLoader.startBatch();
assertEquals(1, rootWriter.rowCount());
assertEquals(expectedCount + 1, rsLoader.totalRowCount());
VectorContainer container = rsLoader.harvest();
BatchValidator.validate(container);
RowSet result = fixture.wrap(container);
assertEquals(1, result.rowCount());
result.clear();
}
rsLoader.close();
}

@Test
public void testValueOverflow() {
TupleMetadata schema = new SchemaBuilder()
.addDict("d", MinorType.INT)
.value(MinorType.VARCHAR)
.resumeSchema()
.buildSchema();
ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
.setRowCountLimit(ValueVector.MAX_ROW_COUNT)
.setSchema(schema)
.build();
ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
RowSetLoader rootWriter = rsLoader.writer();
rsLoader.startBatch();
byte[] value = new byte[523];
Arrays.fill(value, (byte) 'X');
int dictSize = 4; // number of entries in each dict
// Number of rows should be driven by vector size.
// Our row count should include the overflow row
DictWriter dictWriter = rootWriter.dict(0);
ScalarWriter keyWriter = dictWriter.keyWriter();
ScalarWriter valueWriter = dictWriter.valueWriter().scalar();
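// Same sizing arithmetic as testKeyOverflow, but here the Varchar value
// vector is the limiting factor.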
int expectedCount = ValueVector.MAX_BUFFER_SIZE / (value.length * dictSize);
{
int count = 0;
while (! rootWriter.isFull()) {
rootWriter.start();
for (int i = 0; i < dictSize; i++) {
keyWriter.setInt(0); // acts as a placeholder, the actual value is not important
valueWriter.setBytes(value, value.length);
dictWriter.save(); // not necessary for scalars, just for completeness
}
rootWriter.save();
count++;
}
assertEquals(expectedCount + 1, count);
// Loader's row count should include only "visible" rows
assertEquals(expectedCount, rootWriter.rowCount());
// Total count should include invisible and look-ahead rows.
assertEquals(expectedCount + 1, rsLoader.totalRowCount());
// Result should exclude the overflow row
VectorContainer container = rsLoader.harvest();
BatchValidator.validate(container);
RowSet result = fixture.wrap(container);
assertEquals(expectedCount, result.rowCount());
result.clear();
}
// Next batch should start with the overflow row
{
rsLoader.startBatch();
assertEquals(1, rootWriter.rowCount());
assertEquals(expectedCount + 1, rsLoader.totalRowCount());
VectorContainer container = rsLoader.harvest();
BatchValidator.validate(container);
RowSet result = fixture.wrap(container);
assertEquals(1, result.rowCount());
result.clear();
}
rsLoader.close();
}
}