/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.project;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.List;
import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.MockScanBuilder;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
import org.apache.drill.exec.physical.impl.scan.file.ImplicitColumnManager;
import org.apache.drill.exec.physical.impl.scan.file.ImplicitColumnManager.ImplicitColumnOptions;
import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder.NullBuilderBuilder;
import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ScanOrchestratorBuilder;
import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Tests schema smoothing at the schema projection level.
* This level handles reusing prior types when filling null
* values. But, because no actual vectors are involved, it
* does not handle the schema chosen for a table ahead of
* time; it deals only with the table schema as it is merged
* with the prior schema to detect missing columns.
* <p>
* Focuses on the <tt>SmoothingProjection</tt> class itself.
* <p>
* Note that, at present, schema smoothing does not work for entire
* maps. That is, if file 1 has, say <tt>{a: {b: 10, c: "foo"}}</tt>
* and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does
* not currently know how to recreate the map. The same is true of
* lists and unions. Handling such cases is complex and is probably
* better handled via a system that allows the user to specify their
* intent by providing a schema to apply to the two files.
* <p>
* Note that schema smoothing itself is an experimental work-around
* to a fundamental limitation in Drill:
* <ul>
* <li>Drill cannot predict the future: each file (or batch)
* may have a different schema.</li>
* <li>Drill does not know about these differences until they
* occur.</li>
* <li>The scan operator is obligated to return the same schema
* (and same vectors) from one file to the next, else a "hard"
* schema change is sent downstream.</li>
* </ul>
*
* The general problem is intractable. The schema smoother handles the
* cases that can be handled, such as a required column becoming
* nullable, a column disappearing, and so on. This whole mechanism
* should be scrapped if/when Drill can work with declared schemas. Or,
* keep it to handle odd schemas as best we can, but insist on a
* user-provided schema to resolve issues that this mechanism cannot
* handle (and that, indeed, no algorithm could handle, because such an
* algorithm would require time travel: looking into the future to know
* what data will be scanned).
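* <p>
* A minimal sketch of the lifecycle these tests exercise (a condensed,
* hypothetical form of the test bodies below):
* <pre>{@code
* SchemaSmoother smoother = new SchemaSmoother(scanProj,
*     ScanTestUtils.resolvers());
* // For each file or batch:
* ResolvedRow rootTuple = new ResolvedRow(new NullBuilderBuilder().build());
* smoother.resolve(tableSchema, rootTuple);
* // schemaVersion() increments only when the prior schema cannot be
* // preserved (a "hard" schema change).
* }</pre>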
*/
@Category(RowSetTests.class)
public class TestSchemaSmoothing extends SubOperatorTest {
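/**
* Builds the implicit-column options used by these tests: a fixed
* selection root plus the given list of files to scan.
*/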
private ImplicitColumnOptions standardOptions(List<Path> files) {
ImplicitColumnOptions options = new ImplicitColumnOptions();
options.setSelectionRoot(new Path("hdfs:///w"));
options.setFiles(files);
return options;
}
/**
* Sanity test for the simple, discrete case. The purpose of the
* discrete case is just to run the basic lifecycle in a way that
* is compatible with the schema-persistence version.
*/
@Test
public void testDiscrete() {
// Set up the file metadata manager
Path filePathA = new Path("hdfs:///w/x/y/a.csv");
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
fixture.getOptionManager(),
standardOptions(Lists.newArrayList(filePathA, filePathB)));
// Set up the scan level projection
ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL, "a", "b"),
ScanTestUtils.parsers(metadataManager.projectionParser()));
{
// Define file a.csv
metadataManager.startFile(filePathA);
// Build the output schema from the (a, b) table schema
TupleMetadata twoColSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR, 10)
.buildSchema();
final NullColumnBuilder builder = new NullBuilderBuilder().build();
ResolvedRow rootTuple = new ResolvedRow(builder);
new ExplicitSchemaProjection(
scanProj, twoColSchema, rootTuple,
ScanTestUtils.resolvers(metadataManager));
// Verify the full output schema
TupleMetadata expectedSchema = new SchemaBuilder()
.add("filename", MinorType.VARCHAR)
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR, 10)
.buildSchema();
// Verify
List<ResolvedColumn> columns = rootTuple.columns();
assertEquals(3, columns.size());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
assertEquals("a.csv", ((FileMetadataColumn) columns.get(0)).value());
assertTrue(columns.get(1) instanceof ResolvedTableColumn);
}
{
// Define file b.csv
metadataManager.startFile(filePathB);
// Build the output schema from the (a) table schema
TupleMetadata oneColSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
final NullColumnBuilder builder = new NullBuilderBuilder().build();
ResolvedRow rootTuple = new ResolvedRow(builder);
new ExplicitSchemaProjection(
scanProj, oneColSchema, rootTuple,
ScanTestUtils.resolvers(metadataManager));
// Verify the full output schema
// Since this mode is "discrete", we don't remember the type
// of the missing column. (Instead, it is filled in at the
// vector level as part of vector persistence.) During projection, it is
// marked with type NULL so that the null column builder will fill in
// the proper type.
TupleMetadata expectedSchema = new SchemaBuilder()
.add("filename", MinorType.VARCHAR)
.add("a", MinorType.INT)
.addNullable("b", MinorType.NULL)
.buildSchema();
// Verify
List<ResolvedColumn> columns = rootTuple.columns();
assertEquals(3, columns.size());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
assertEquals(ScanTestUtils.FILE_NAME_COL, columns.get(0).name());
assertEquals("b.csv", ((FileMetadataColumn) columns.get(0)).value());
assertTrue(columns.get(1) instanceof ResolvedTableColumn);
assertTrue(columns.get(2) instanceof ResolvedNullColumn);
}
}
/**
* Low-level test of the smoothing projection, including the exceptions
* it throws when things are not going its way.
*/
@Test
public void testSmoothingProjection() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
// Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
final TupleMetadata schema1 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
.add("c", MinorType.FLOAT8)
.buildSchema();
ResolvedRow priorSchema;
{
final NullColumnBuilder builder = new NullBuilderBuilder().build();
final ResolvedRow rootTuple = new ResolvedRow(builder);
new WildcardProjection(
scanProj, schema1, rootTuple,
ScanTestUtils.resolvers());
priorSchema = rootTuple;
}
// Table 2: (a: nullable bigint, c: float8); column b omitted, original schema preserved
final TupleMetadata schema2 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.add("c", MinorType.FLOAT8)
.buildSchema();
try {
final NullColumnBuilder builder = new NullBuilderBuilder().build();
final ResolvedRow rootTuple = new ResolvedRow(builder);
new SmoothingProjection(
scanProj, schema2, priorSchema, rootTuple,
ScanTestUtils.resolvers());
assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
priorSchema = rootTuple;
} catch (final IncompatibleSchemaException e) {
fail();
}
// Table 3: (a, b, c, d); column d added, must replan schema
final TupleMetadata schema3 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
.add("c", MinorType.FLOAT8)
.add("d", MinorType.INT)
.buildSchema();
try {
final NullColumnBuilder builder = new NullBuilderBuilder().build();
final ResolvedRow rootTuple = new ResolvedRow(builder);
new SmoothingProjection(
scanProj, schema3, priorSchema, rootTuple,
ScanTestUtils.resolvers());
fail();
} catch (final IncompatibleSchemaException e) {
// Expected
}
// Table 4: column a changes type to double; must replan schema
final TupleMetadata schema4 = new SchemaBuilder()
.addNullable("a", MinorType.FLOAT8)
.addNullable("b", MinorType.VARCHAR)
.add("c", MinorType.FLOAT8)
.buildSchema();
try {
final NullColumnBuilder builder = new NullBuilderBuilder().build();
final ResolvedRow rootTuple = new ResolvedRow(builder);
new SmoothingProjection(
scanProj, schema4, priorSchema, rootTuple,
ScanTestUtils.resolvers());
fail();
} catch (final IncompatibleSchemaException e) {
// Expected
}
// Table 5: non-nullable column c dropped; must replan
final TupleMetadata schema5 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
.buildSchema();
try {
final NullColumnBuilder builder = new NullBuilderBuilder().build();
final ResolvedRow rootTuple = new ResolvedRow(builder);
new SmoothingProjection(
scanProj, schema5, priorSchema, rootTuple,
ScanTestUtils.resolvers());
fail();
} catch (final IncompatibleSchemaException e) {
// Expected
}
}
/**
* Case in which the table schema is a superset of the prior
* schema. Discard the prior schema. Turn off auto expansion of
* metadata for a simpler test.
*/
@Test
public void testSmaller() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
final TupleMetadata priorSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
final TupleMetadata tableSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
{
final NullColumnBuilder builder = new NullBuilderBuilder().build();
final ResolvedRow rootTuple = new ResolvedRow(builder);
smoother.resolve(priorSchema, rootTuple);
assertEquals(1, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
}
{
final NullColumnBuilder builder = new NullBuilderBuilder().build();
final ResolvedRow rootTuple = new ResolvedRow(builder);
smoother.resolve(tableSchema, rootTuple);
assertEquals(2, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
}
}
/**
* Case in which the table schema and prior are disjoint
* sets. Discard the prior schema.
*/
@Test
public void testDisjoint() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
final TupleMetadata priorSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
final TupleMetadata tableSchema = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.buildSchema();
{
doResolve(smoother, priorSchema);
}
{
final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(2, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
}
}
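/**
* Helper that resolves the given table schema against the smoother's
* prior schema and returns the resolved row tuple.
*/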
private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) {
final NullColumnBuilder builder = new NullBuilderBuilder().build();
final ResolvedRow rootTuple = new ResolvedRow(builder);
smoother.resolve(schema, rootTuple);
return rootTuple;
}
/**
* Column names match, but types differ. Discard the prior schema.
*/
@Test
public void testDifferentTypes() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
final TupleMetadata priorSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
final TupleMetadata tableSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR)
.buildSchema();
{
doResolve(smoother, priorSchema);
}
{
final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(2, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
}
}
/**
* The prior and table schemas are identical. Preserve the prior
* schema (though the output is no different than if we had discarded
* the prior schema).
*/
@Test
public void testSameSchemas() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
final TupleMetadata priorSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
final TupleMetadata tableSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
{
doResolve(smoother, priorSchema);
}
{
final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(1, smoother.schemaVersion());
final TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple);
assertTrue(actualSchema.isEquivalent(tableSchema));
assertTrue(actualSchema.isEquivalent(priorSchema));
}
}
/**
* The prior and table schemas are identical, but the case of the
* names differs. Preserve the case of the first schema.
*/
@Test
public void testDifferentCase() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
final TupleMetadata priorSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
final TupleMetadata tableSchema = new SchemaBuilder()
.add("A", MinorType.INT)
.add("B", MinorType.VARCHAR)
.buildSchema();
{
doResolve(smoother, priorSchema);
}
{
final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(1, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
final List<ResolvedColumn> columns = rootTuple.columns();
assertEquals("a", columns.get(0).name());
}
}
/**
* Can't preserve the prior schema if it has required columns
* that are missing from the new table schema.
*/
@Test
public void testRequired() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
final TupleMetadata priorSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR)
.buildSchema();
final TupleMetadata tableSchema = new SchemaBuilder()
.addNullable("b", MinorType.VARCHAR)
.buildSchema();
{
doResolve(smoother, priorSchema);
}
{
final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(2, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
}
}
/**
* Preserve the prior schema if the table schema is a subset and the
* missing columns are nullable or repeated.
*/
@Test
public void testMissingNullableColumns() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
final TupleMetadata priorSchema = new SchemaBuilder()
.addNullable("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.addArray("c", MinorType.BIGINT)
.buildSchema();
final TupleMetadata tableSchema = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.buildSchema();
{
doResolve(smoother, priorSchema);
}
{
final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(1, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
}
}
/**
* Preserve the prior schema if the table schema is a subset. Map the
* table columns to the output using the prior schema ordering.
*/
@Test
public void testReordering() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
final TupleMetadata priorSchema = new SchemaBuilder()
.addNullable("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.addArray("c", MinorType.BIGINT)
.buildSchema();
final TupleMetadata tableSchema = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.addNullable("a", MinorType.INT)
.buildSchema();
{
doResolve(smoother, priorSchema);
}
{
final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(1, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
}
}
/**
* If using the legacy wildcard expansion, reuse schema if partition paths
* are the same length.
*/
@Test
public void testSamePartitionLength() {
// Set up the file metadata manager
Path filePathA = new Path("hdfs:///w/x/y/a.csv");
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
fixture.getOptionManager(),
standardOptions(Lists.newArrayList(filePathA, filePathB)));
// Set up the scan level projection
ScanLevelProjection scanProj = ScanLevelProjection.build(
ScanTestUtils.projectAllWithAllImplicit(2),
ScanTestUtils.parsers(metadataManager.projectionParser()));
// Define the schema smoother
SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers(metadataManager));
TupleMetadata tableSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
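// Expected output: the table columns expanded with the implicit file
// columns and two partition directory columns.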
TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
{
metadataManager.startFile(filePathA);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
}
{
metadataManager.startFile(filePathB);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(1, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
}
}
/**
* If using the legacy wildcard expansion, reuse schema if the new partition path
* is shorter than the previous. (Unneeded partitions will be set to null by the
* scan projector.)
*/
@Test
public void testShorterPartitionLength() {
// Set up the file metadata manager
Path filePathA = new Path("hdfs:///w/x/y/a.csv");
Path filePathB = new Path("hdfs:///w/x/b.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
fixture.getOptionManager(),
standardOptions(Lists.newArrayList(filePathA, filePathB)));
// Set up the scan level projection
ScanLevelProjection scanProj = ScanLevelProjection.build(
ScanTestUtils.projectAllWithAllImplicit(2),
ScanTestUtils.parsers(metadataManager.projectionParser()));
// Define the schema smoother
SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers(metadataManager));
TupleMetadata tableSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
{
metadataManager.startFile(filePathA);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
}
{
metadataManager.startFile(filePathB);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(1, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
}
}
/**
* If using the legacy wildcard expansion, we are able to use the same
* schema even if the new partition path is longer than the previous,
* because all file names are provided up front.
*/
@Test
public void testLongerPartitionLength() {
// Set up the file metadata manager
Path filePathA = new Path("hdfs:///w/x/a.csv");
Path filePathB = new Path("hdfs:///w/x/y/b.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
fixture.getOptionManager(),
standardOptions(Lists.newArrayList(filePathA, filePathB)));
// Set up the scan level projection
ScanLevelProjection scanProj = ScanLevelProjection.build(
ScanTestUtils.projectAllWithAllImplicit(2),
ScanTestUtils.parsers(metadataManager.projectionParser()));
// Define the schema smoother
SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers(metadataManager));
TupleMetadata tableSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
TupleMetadata expectedSchema = ScanTestUtils.expandImplicit(tableSchema, metadataManager, 2);
{
metadataManager.startFile(filePathA);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
}
{
metadataManager.startFile(filePathB);
ResolvedRow rootTuple = doResolve(smoother, tableSchema);
assertEquals(1, smoother.schemaVersion());
assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(expectedSchema));
}
}
/**
* Integrated test across multiple schemas at the batch level.
*/
@Test
public void testSmoothableSchemaBatches() {
final ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectAll(),
ScanTestUtils.parsers());
final SchemaSmoother smoother = new SchemaSmoother(scanProj,
ScanTestUtils.resolvers());
// Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
final TupleMetadata schema1 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
.add("c", MinorType.FLOAT8)
.buildSchema();
{
final ResolvedRow rootTuple = doResolve(smoother, schema1);
// Just use the original schema.
assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
assertEquals(1, smoother.schemaVersion());
}
// Table 2: (a: nullable bigint, c: float8); column b omitted, original schema preserved
final TupleMetadata schema2 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.add("c", MinorType.FLOAT8)
.buildSchema();
{
final ResolvedRow rootTuple = doResolve(smoother, schema2);
assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
assertEquals(1, smoother.schemaVersion());
}
// Table 3: (a, b, c, d); column d added, must replan schema
final TupleMetadata schema3 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
.add("c", MinorType.FLOAT8)
.add("d", MinorType.INT)
.buildSchema();
{
final ResolvedRow rootTuple = doResolve(smoother, schema3);
assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple)));
assertEquals(2, smoother.schemaVersion());
}
// Table 4: non-nullable columns c and d dropped; must replan
final TupleMetadata schema4 = new SchemaBuilder()
.addNullable("a", MinorType.BIGINT)
.addNullable("b", MinorType.VARCHAR)
.buildSchema();
{
final ResolvedRow rootTuple = doResolve(smoother, schema4);
assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple)));
assertEquals(3, smoother.schemaVersion());
}
// Table 5: column a changes type to double; must replan schema
final TupleMetadata schema5 = new SchemaBuilder()
.addNullable("a", MinorType.FLOAT8)
.addNullable("b", MinorType.VARCHAR)
.buildSchema();
{
final ResolvedRow rootTuple = doResolve(smoother, schema5);
assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple)));
assertEquals(4, smoother.schemaVersion());
}
}
/**
* A SELECT * query uses the schema of the table as the output schema.
* This is trivial when the scanner has one table. But, if two or more
* tables occur, then things get interesting. The first table sets the
* schema. The second table then has:
* <ul>
* <li>The same schema, trivial case.</li>
* <li>A subset of the first table. The type of the "missing" column
* from the first table is used for a null column in the second table.</li>
* <li>A superset or disjoint set of the first schema. This triggers a hard schema
* change.</li>
* </ul>
* <p>
* It is an open question whether previous columns should be preserved on
* a hard reset. For now, the code implements, and this test verifies, that a
* hard reset clears the "memory" of prior schemas.
*/
@Test
public void testWildcardSmoothing() {
ScanOrchestratorBuilder builder = new MockScanBuilder();
builder.enableSchemaSmoothing(true);
builder.projection(RowSetTestUtils.projectAll());
final ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator(), builder);
final TupleMetadata firstSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR, 10)
.addNullable("c", MinorType.BIGINT)
.buildSchema();
final TupleMetadata subsetSchema = new SchemaBuilder()
.addNullable("b", MinorType.VARCHAR, 10)
.add("a", MinorType.INT)
.buildSchema();
final TupleMetadata disjointSchema = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR, 10)
.add("d", MinorType.VARCHAR)
.buildSchema();
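// Track the scan output schema across readers; the tracked version
// changes only on a hard schema change.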
final SchemaTracker tracker = new SchemaTracker();
int schemaVersion;
{
// First table, establishes the baseline
// ... FROM table 1
final ReaderSchemaOrchestrator reader = projector.startReader();
final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
reader.startBatch();
loader.writer()
.addRow(10, "fred", 110L)
.addRow(20, "wilma", 110L);
reader.endBatch();
tracker.trackSchema(projector.output());
schemaVersion = tracker.schemaVersion();
final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
.addRow(10, "fred", 110L)
.addRow(20, "wilma", 110L)
.build();
new RowSetComparison(expected)
.verifyAndClearAll(fixture.wrap(projector.output()));
}
{
// Second table, same schema, the trivial case
// ... FROM table 2
final ReaderSchemaOrchestrator reader = projector.startReader();
final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
reader.startBatch();
loader.writer()
.addRow(70, "pebbles", 770L)
.addRow(80, "hoppy", 880L);
reader.endBatch();
tracker.trackSchema(projector.output());
assertEquals(schemaVersion, tracker.schemaVersion());
final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
.addRow(70, "pebbles", 770L)
.addRow(80, "hoppy", 880L)
.build();
new RowSetComparison(expected)
.verifyAndClearAll(fixture.wrap(projector.output()));
}
{
// Third table: subset schema of first two
// ... FROM table 3
final ReaderSchemaOrchestrator reader = projector.startReader();
final ResultSetLoader loader = reader.makeTableLoader(subsetSchema);
reader.startBatch();
loader.writer()
.addRow("bambam", 30)
.addRow("betty", 40);
reader.endBatch();
tracker.trackSchema(projector.output());
assertEquals(schemaVersion, tracker.schemaVersion());
final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
.addRow(30, "bambam", null)
.addRow(40, "betty", null)
.build();
new RowSetComparison(expected)
.verifyAndClearAll(fixture.wrap(projector.output()));
}
{
// Fourth table: disjoint schema, causes a schema reset
// ... FROM table 4
final ReaderSchemaOrchestrator reader = projector.startReader();
final ResultSetLoader loader = reader.makeTableLoader(disjointSchema);
reader.startBatch();
loader.writer()
.addRow(50, "dino", "supporting")
.addRow(60, "barney", "main");
reader.endBatch();
tracker.trackSchema(projector.output());
assertNotEquals(schemaVersion, tracker.schemaVersion());
final SingleRowSet expected = fixture.rowSetBuilder(disjointSchema)
.addRow(50, "dino", "supporting")
.addRow(60, "barney", "main")
.build();
new RowSetComparison(expected)
.verifyAndClearAll(fixture.wrap(projector.output()));
}
projector.close();
}
// TODO: Test schema smoothing with repeated
// TODO: Test hard schema change
// TODO: Typed null column tests (resurrect)
// TODO: Test maps and arrays of maps
}