/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.physical.impl.scan.v3;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleNameSpace;
import org.apache.drill.exec.vector.accessor.ValueWriter;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test the addition of an output ("provided") schema to a reader. The
 * provided schema defines the schema that the scan operator is to produce,
 * and forces conversions between reader and output data types where they
 * differ.
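 * <p>
 * A minimal sketch of the pattern under test, assuming an EVF v3 scan
 * builder is in hand (the {@code scanBuilder} name is illustrative):
 * <pre>{@code
 * // Ask for column "a" as INT even when the reader writes VARCHAR;
 * // the framework inserts the string-to-int conversion.
 * TupleMetadata provided = new SchemaBuilder()
 *     .add("a", MinorType.INT)
 *     .buildSchema();
 * scanBuilder.providedSchema(provided);
 * }</pre>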
*/
@Category(EvfTest.class)
public class TestScanOuputSchema extends BaseScanTest {
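
  /**
   * Mock reader which negotiates the provided schema as its table schema,
   * then writes VARCHAR values for columns (a, b, c), converting any column
   * that also appears in the provided schema to its provided type.
   */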
private static class MockSimpleReader implements ManagedReader {
private final ResultSetLoader tableLoader;
    private final TupleNameSpace<ValueWriter> writers = new TupleNameSpace<>();

    public MockSimpleReader(SchemaNegotiator schemaNegotiator) {
      TupleMetadata providedSchema = schemaNegotiator.providedSchema();
      // Declare the provided schema as the table schema so that provided
      // columns are created with their output types up front.
      schemaNegotiator.tableSchema(providedSchema);
      tableLoader = schemaNegotiator.build();
TupleMetadata schema = new SchemaBuilder()
// Schema provided in test
.add("a", MinorType.VARCHAR)
// No schema provided
.add("b", MinorType.VARCHAR)
// No schema and not projected in test
.add("c", MinorType.VARCHAR)
.buildSchema();
buildWriters(providedSchema, schema);
    }

/**
 * Simple mock of conversion from an imaginary input type to the
 * provided schema. The reader schema is expressed in types which the
 * standard conversions can handle. Columns not in the provided schema
 * are added as-is; those in the provided schema get a converting writer.
* <p>
* A real reader would likely have its own column adapters and its
* own type conversion rules.
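 * <p>
 * A hedged sketch of the lookup this method performs (the
 * {@code intColWriter} and {@code varcharCol} names are illustrative):
 * <pre>{@code
 * StandardConversions conversions = StandardConversions.builder().build();
 * // Wrap the target column writer (say INT) in a converter that accepts
 * // the reader's input type (here described by varcharCol).
 * ValueWriter writer = conversions.converterFor(intColWriter, varcharCol);
 * writer.setString("10"); // stored as the integer 10
 * }</pre>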
*/
private void buildWriters(TupleMetadata providedSchema,
TupleMetadata schema) {
RowSetLoader rowWriter = tableLoader.writer();
StandardConversions conversions = StandardConversions.builder().build();
      for (int i = 0; i < schema.size(); i++) {
        ColumnMetadata colSchema = schema.metadata(i);
        String colName = colSchema.name();
        // Look up the reader column by name; metadata() returns null when
        // the provided schema does not mention the column.
        ColumnMetadata providedCol = providedSchema == null
            ? null : providedSchema.metadata(colName);
        if (providedCol == null) {
          // Not in the provided schema: add the column with the reader's type.
          int colIndex = rowWriter.addColumn(colSchema);
          writers.add(colName, rowWriter.scalar(colIndex));
        } else {
          // In the provided schema: the column already exists in the loader
          // with its output type; wrap its writer in a conversion from the
          // reader's type.
          writers.add(colName,
              conversions.converterFor(rowWriter.scalar(colName), colSchema));
        }
      }
}
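
    /**
     * Emits a single batch containing one row ("10", "foo", "bar"),
     * then signals end of data.
     */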
@Override
public boolean next() {
if (tableLoader.batchCount() >= 1) {
return false;
}
RowSetLoader writer = tableLoader.writer();
writer.start();
      writers.get(0).setString("10");  // "a": converted to the provided type
      writers.get(1).setString("foo"); // "b": no provided type, stays VARCHAR
      writers.get(2).setString("bar"); // "c": dropped unless projected
writer.save();
return true;
    }

@Override
public void close() { }
  }

  /**
   * Test an output schema.
   * <ul>
   * <li>Column a has a reader type of VARCHAR and an output type of INT;
   * the framework inserts an implicit conversion.</li>
   * <li>Column b is projected and provided by the reader, but is not in
   * the output schema, so it keeps the reader's VARCHAR type.</li>
   * <li>Column d has an output type of BIGINT and is projected, but is
   * not provided by the reader, so it takes its default value of 20L.</li>
   * <li>Column f is projected, but is neither in the output schema nor
   * provided by the reader, so it uses the configured null type of
   * nullable VARCHAR, with a null value.</li>
   * <li>Column c (reader only) and column e (output schema only) are not
   * projected and do not appear in the output.</li>
   * </ul>
   */
@Test
public void testProvidedSchema() {
TupleMetadata providedSchema = new SchemaBuilder()
.add("a", MinorType.INT) // Projected, in reader
.add("d", MinorType.BIGINT) // Projected, not in reader
.add("e", MinorType.BIGINT) // Not projected, not in reader
.buildSchema();
providedSchema.metadata("d").setDefaultValue("20");
providedSchema.metadata("e").setDefaultValue("30");
BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder(fixture);
builder.setProjection(new String[]{"a", "b", "d", "f"});
builder.addReader(negotiator -> new MockSimpleReader(negotiator));
builder.builder.providedSchema(providedSchema);
builder.builder.nullType(Types.optional(MinorType.VARCHAR));
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.add("d", MinorType.BIGINT)
.addNullable("f", MinorType.VARCHAR)
.buildSchema();
// Initial schema
assertTrue(scan.buildSchema());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
// Batch with defaults and null types
assertTrue(scan.next());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, "foo", 20L, null)
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
assertFalse(scan.next());
scanFixture.close();
  }

  /**
   * Test a non-strict provided schema with a wildcard projection and
   * extra reader columns. With a non-strict schema, reader-only columns
   * are included in the output after the provided columns.
   */
@Test
public void testProvidedSchemaWithWildcard() {
TupleMetadata providedSchema = new SchemaBuilder()
.add("a", MinorType.INT) // Projected, in reader
.add("d", MinorType.BIGINT) // Projected, not in reader
.add("e", MinorType.BIGINT) // Not projected, not in reader
.buildSchema();
providedSchema.metadata("d").setDefaultValue("20");
providedSchema.metadata("e").setDefaultValue("30");
BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder(fixture);
builder.setProjection(RowSetTestUtils.projectAll());
builder.addReader(negotiator -> new MockSimpleReader(negotiator));
builder.builder.providedSchema(providedSchema);
builder.builder.nullType(Types.optional(MinorType.VARCHAR));
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("d", MinorType.BIGINT)
.add("e", MinorType.BIGINT)
.add("b", MinorType.VARCHAR)
.add("c", MinorType.VARCHAR)
.buildSchema();
// Initial schema
assertTrue(scan.buildSchema());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
    // Batch with converted, default, and reader values
assertTrue(scan.next());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, 20L, 30L, "foo", "bar")
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
assertFalse(scan.next());
scanFixture.close();
}
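
  /**
   * Test a strict provided schema with a wildcard projection. When
   * {@code TupleMetadata.IS_STRICT_SCHEMA_PROP} is set, the wildcard
   * expands to only the provided columns: reader-only columns b and c
   * are dropped from the output.
   */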
@Test
public void testStrictProvidedSchemaWithWildcard() {
TupleMetadata providedSchema = new SchemaBuilder()
.add("a", MinorType.INT) // Projected, in reader
.add("d", MinorType.BIGINT) // Projected, not in reader
.add("e", MinorType.BIGINT) // Not projected, not in reader
.buildSchema();
providedSchema.metadata("d").setDefaultValue("20");
providedSchema.metadata("e").setDefaultValue("30");
providedSchema.setProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, Boolean.TRUE.toString());
BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder(fixture);
// Project schema only
builder.setProjection(RowSetTestUtils.projectAll());
builder.addReader(negotiator -> new MockSimpleReader(negotiator));
builder.builder.providedSchema(providedSchema);
builder.builder.nullType(Types.optional(MinorType.VARCHAR));
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
TupleMetadata expectedSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("d", MinorType.BIGINT)
.add("e", MinorType.BIGINT)
.buildSchema();
// Initial schema
assertTrue(scan.buildSchema());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
    // Batch with converted and default values
assertTrue(scan.next());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(10, 20L, 30L)
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
assertFalse(scan.next());
scanFixture.close();
}
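
  /**
   * As the strict wildcard case above, but with column a marked with
   * {@code ColumnMetadata.EXCLUDE_FROM_WILDCARD}, so the wildcard expands
   * to only the remaining provided columns d and e.
   */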
@Test
public void testStrictProvidedSchemaWithWildcardAndSpecialCols() {
TupleMetadata providedSchema = new SchemaBuilder()
.add("a", MinorType.INT) // Projected, in reader
.add("d", MinorType.BIGINT) // Projected, not in reader
.add("e", MinorType.BIGINT) // Not projected, not in reader
.buildSchema();
providedSchema.metadata("d").setDefaultValue("20");
providedSchema.metadata("e").setDefaultValue("30");
providedSchema.setProperty(TupleMetadata.IS_STRICT_SCHEMA_PROP, Boolean.TRUE.toString());
providedSchema.metadata("a").setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder(fixture);
// Project schema only
builder.setProjection(RowSetTestUtils.projectAll());
builder.addReader(negotiator -> new MockSimpleReader(negotiator));
builder.builder.providedSchema(providedSchema);
builder.builder.nullType(Types.optional(MinorType.VARCHAR));
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
TupleMetadata expectedSchema = new SchemaBuilder()
.add("d", MinorType.BIGINT)
.add("e", MinorType.BIGINT)
.buildSchema();
// Initial schema
assertTrue(scan.buildSchema());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
    // Batch with default values only
assertTrue(scan.next());
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(20L, 30L)
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
}
assertFalse(scan.next());
scanFixture.close();
}
}