/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.project;

import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;

import org.apache.drill.categories.RowSetTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder.NullBuilderBuilder;
import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
import org.apache.drill.exec.physical.resultSet.ResultVectorCache;
import org.apache.drill.exec.physical.resultSet.impl.NullResultVectorCacheImpl;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import io.netty.buffer.DrillBuf;
/**
 * Tests the row batch merger by merging two batches. Exercises both the
 * "direct" and "exchange" cases. Direct means that the output container
 * holds the source vector itself: source and output share the same
 * vector. Exchange means there are two distinct vectors, and the
 * underlying DrillBufs are swapped to shift data from the source
 * vector to the destination vector.
 */
@Category(RowSetTest.class)
public class TestRowBatchMerger extends SubOperatorTest {
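/**
 * Wraps a {@link SingleRowSet} as a {@link VectorSource} so that the
 * merger can fetch source vectors by column position.
 */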
public static class RowSetSource implements VectorSource {
private SingleRowSet rowSet;
public RowSetSource(SingleRowSet rowSet) {
this.rowSet = rowSet;
}
public RowSet rowSet() { return rowSet; }
public void clear() {
rowSet.clear();
}
@Override
public ValueVector vector(int index) {
return rowSet.container().getValueVector(index).getValueVector();
}
}
public static final int SLAB_SIZE = 16 * 1024 * 1024;
@BeforeClass
public static void setup() {
// Party on 10 16MB blocks of memory to detect vector issues
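// Writing a recognizable garbage pattern and then releasing the buffers
// poisons the allocator's free memory: any vector that later reads bytes
// it never wrote will surface 0xDEADBEEF instead of silent zeros.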
DrillBuf[] bufs = new DrillBuf[10];
for (int i = 0; i < bufs.length; i++) {
bufs[i] = fixture.allocator().buffer(SLAB_SIZE);
for (int j = 0; j < SLAB_SIZE / 4; j++) {
bufs[i].setInt(j * 4, 0xDEADBEEF);
}
}
for (int i = 0; i < bufs.length; i++) {
bufs[i].release();
}
}
private RowSetSource makeFirst() {
TupleMetadata firstSchema = new SchemaBuilder()
.add("d", MinorType.VARCHAR)
.add("a", MinorType.INT)
.buildSchema();
return new RowSetSource(
fixture.rowSetBuilder(firstSchema)
.addRow("barney", 10)
.addRow("wilma", 20)
.build());
}
private RowSetSource makeSecond() {
TupleMetadata secondSchema = new SchemaBuilder()
.add("b", MinorType.INT)
.add("c", MinorType.VARCHAR)
.buildSchema();
return new RowSetSource(
fixture.rowSetBuilder(secondSchema)
.addRow(1, "foo.csv")
.addRow(2, "foo.csv")
.build());
}
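/**
 * Minimal {@link ResolvedColumn} for testing: it carries only the
 * (source, source index) pair that the merger uses; the column name and
 * schema are not needed here, so they are left null.
 */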
public static class TestProjection extends ResolvedColumn {
public TestProjection(VectorSource source, int sourceIndex) {
super(source, sourceIndex);
}
@Override
public String name() { return null; }
@Override
public MaterializedField schema() { return null; }
}
@Test
public void testSimpleFlat() {
// Create the first batch
RowSetSource first = makeFirst();
// Create the second batch
RowSetSource second = makeSecond();
ResolvedRow resolvedTuple = new ResolvedRow(null);
resolvedTuple.add(new TestProjection(first, 1));
resolvedTuple.add(new TestProjection(second, 0));
resolvedTuple.add(new TestProjection(second, 1));
resolvedTuple.add(new TestProjection(first, 0));
// Do the merge
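// The null argument is the input container; it goes unused here because
// every projection carries an explicit VectorSource.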
VectorContainer output = new VectorContainer(fixture.allocator());
resolvedTuple.project(null, output);
output.setRecordCount(first.rowSet().rowCount());
RowSet result = fixture.wrap(output);
// Verify
TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.INT)
.add("c", MinorType.VARCHAR)
.add("d", MinorType.VARCHAR)
.buildSchema();
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, 1, "foo.csv", "barney")
.addRow(20, 2, "foo.csv", "wilma")
.build();
new RowSetComparison(expected)
.verifyAndClearAll(result);
}
@Test
public void testImplicitFlat() {
// Create the first batch
RowSetSource first = makeFirst();
// Create the second batch
RowSetSource second = makeSecond();
ResolvedRow resolvedTuple = new ResolvedRow(null);
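// Passing resolvedTuple itself as the source makes the projection
// "implicit": the vector is taken from the input container handed to
// project() below, rather than from an explicit source batch.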
resolvedTuple.add(new TestProjection(resolvedTuple, 1));
resolvedTuple.add(new TestProjection(second, 0));
resolvedTuple.add(new TestProjection(second, 1));
resolvedTuple.add(new TestProjection(resolvedTuple, 0));
// Do the merge
VectorContainer output = new VectorContainer(fixture.allocator());
resolvedTuple.project(first.rowSet().container(), output);
output.setRecordCount(first.rowSet().rowCount());
RowSet result = fixture.wrap(output);
// Verify
TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.INT)
.add("c", MinorType.VARCHAR)
.add("d", MinorType.VARCHAR)
.buildSchema();
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, 1, "foo.csv", "barney")
.addRow(20, 2, "foo.csv", "wilma")
.build();
new RowSetComparison(expected)
.verifyAndClearAll(result);
}
@Test
public void testFlatWithNulls() {
// Create the first batch
RowSetSource first = makeFirst();
// Create null columns
NullColumnBuilder builder = new NullBuilderBuilder().build();
ResolvedRow resolvedTuple = new ResolvedRow(builder);
resolvedTuple.add(new TestProjection(resolvedTuple, 1));
resolvedTuple.add(resolvedTuple.nullBuilder().add("null1"));
resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
resolvedTuple.add(new TestProjection(resolvedTuple, 0));
// Build the null values
ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
builder.build(cache);
builder.load(first.rowSet().rowCount());
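// build() materializes the null vectors through the cache; load() fills
// them with one null value per input row.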
// Do the merge
VectorContainer output = new VectorContainer(fixture.allocator());
resolvedTuple.project(first.rowSet().container(), output);
output.setRecordCount(first.rowSet().rowCount());
RowSet result = fixture.wrap(output);
// Verify
TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("null1", MinorType.INT)
.addNullable("null2", MinorType.VARCHAR)
.add("d", MinorType.VARCHAR)
.buildSchema();
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, null, null, "barney")
.addRow(20, null, null, "wilma")
.build();
new RowSetComparison(expected)
.verifyAndClearAll(result);
builder.close();
}
/**
* Test the ability to create maps from whole cloth if requested in
* the projection list, and the map is not available from the data
* source.
*/
@Test
public void testNullMaps() {
// Create the first batch
RowSetSource first = makeFirst();
// Create null columns
NullColumnBuilder builder = new NullBuilderBuilder().build();
ResolvedRow resolvedTuple = new ResolvedRow(builder);
resolvedTuple.add(new TestProjection(resolvedTuple, 1));
ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1");
ResolvedTuple nullMap = nullMapCol.members();
nullMap.add(nullMap.nullBuilder().add("null1"));
nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2");
ResolvedTuple nullMap2 = nullMapCol2.members();
nullMap2.add(nullMap2.nullBuilder().add("null3"));
nullMap.add(nullMapCol2);
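// The nesting built above is map1 := { null1, null2, map2: { null3 } },
// created entirely from the projection list with no backing data source.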
resolvedTuple.add(nullMapCol);
resolvedTuple.add(new TestProjection(resolvedTuple, 0));
// Build the null values
ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
resolvedTuple.buildNulls(cache);
// Load the null columns with one null per row
resolvedTuple.loadNulls(first.rowSet().rowCount());
// Do the merge
VectorContainer output = new VectorContainer(fixture.allocator());
resolvedTuple.project(first.rowSet().container(), output);
resolvedTuple.setRowCount(first.rowSet().rowCount());
RowSet result = fixture.wrap(output);
// Verify
TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("map1")
.addNullable("null1", MinorType.INT)
.addNullable("null2", MinorType.VARCHAR)
.addMap("map2")
.addNullable("null3", MinorType.INT)
.resumeMap()
.resumeSchema()
.add("d", MinorType.VARCHAR)
.buildSchema();
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, mapValue(null, null, singleMap(null)), "barney")
.addRow(20, mapValue(null, null, singleMap(null)), "wilma")
.build();
new RowSetComparison(expected)
.verifyAndClearAll(result);
resolvedTuple.close();
}
/**
* Test that the merger mechanism can rewrite a map to include
* projected null columns.
*/
@Test
public void testMapRevision() {
// Create the first batch
TupleMetadata inputSchema = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.addMap("a")
.add("c", MinorType.INT)
.resumeSchema()
.buildSchema();
RowSetSource input = new RowSetSource(
fixture.rowSetBuilder(inputSchema)
.addRow("barney", singleMap(10))
.addRow("wilma", singleMap(20))
.build());
// Create mappings
NullColumnBuilder builder = new NullBuilderBuilder().build();
ResolvedRow resolvedTuple = new ResolvedRow(builder);
resolvedTuple.add(new TestProjection(resolvedTuple, 0));
ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
inputSchema.column(1), 1);
resolvedTuple.add(mapCol);
ResolvedTuple map = mapCol.members();
map.add(new TestProjection(map, 0));
map.add(map.nullBuilder().add("null1"));
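// The rewritten map keeps its original member "c" (index 0 within the
// map) and gains the projected null column "null1".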
// Build the null values
ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
resolvedTuple.buildNulls(cache);
// Load the null columns with one null per row
resolvedTuple.loadNulls(input.rowSet().rowCount());
// Do the merge
VectorContainer output = new VectorContainer(fixture.allocator());
resolvedTuple.project(input.rowSet().container(), output);
output.setRecordCount(input.rowSet().rowCount());
RowSet result = fixture.wrap(output);
// Verify
TupleMetadata expectedSchema = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.addMap("a")
.add("c", MinorType.INT)
.addNullable("null1", MinorType.INT)
.resumeSchema()
.buildSchema();
RowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow("barney", mapValue(10, null))
.addRow("wilma", mapValue(20, null))
.build();
new RowSetComparison(expected)
.verifyAndClearAll(result);
}
/**
* Test that the merger mechanism can rewrite a map array to include
* projected null columns.
*/
@Test
public void testMapArrayRevision() {
// Create the first batch
TupleMetadata inputSchema = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.addMapArray("a")
.add("c", MinorType.INT)
.resumeSchema()
.buildSchema();
RowSetSource input = new RowSetSource(
fixture.rowSetBuilder(inputSchema)
.addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12)))
.addRow("wilma", mapArray(singleMap(20), singleMap(21)))
.build());
// Create mappings
NullColumnBuilder builder = new NullBuilderBuilder().build();
ResolvedRow resolvedTuple = new ResolvedRow(builder);
resolvedTuple.add(new TestProjection(resolvedTuple, 0));
ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
inputSchema.column(1), 1);
resolvedTuple.add(mapCol);
ResolvedTuple map = mapCol.members();
map.add(new TestProjection(map, 0));
map.add(map.nullBuilder().add("null1"));
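// Same rewrite as the map case above, but "a" is a repeated map: the
// null column is replicated per map element (three entries in the first
// row, two in the second), not once per row.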
// Build the null values
ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
resolvedTuple.buildNulls(cache);
// Load the null columns with one null per row
resolvedTuple.loadNulls(input.rowSet().rowCount());
// Do the merge
VectorContainer output = new VectorContainer(fixture.allocator());
resolvedTuple.project(input.rowSet().container(), output);
output.setRecordCount(input.rowSet().rowCount());
RowSet result = fixture.wrap(output);
// Verify
TupleMetadata expectedSchema = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.addMapArray("a")
.add("c", MinorType.INT)
.addNullable("null1", MinorType.INT)
.resumeSchema()
.buildSchema();
RowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow("barney", mapArray(
mapValue(10, null), mapValue(11, null), mapValue(12, null)))
.addRow("wilma", mapArray(
mapValue(20, null), mapValue(21, null)))
.build();
new RowSetComparison(expected)
.verifyAndClearAll(result);
}
}