blob: 24a083d258f183fff91b28fb9b2745380baeeb7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.v3.lifecycle;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Basic tests of the scan lifecycle with a single reader: no readers,
 * early EOF, empty results, early- and late-schema readers, and
 * wildcard, explicit, reordered, subset and empty projection lists.
 */
@Category(EvfTest.class)
public class TestScanLifecycleBasics extends BaseTestScanLifecycle {

  /**
   * Simplest possible do-nothing, default config case.
   * Scan produces no valid schema.
   */
  @Test
  public void testNoReaders() {
    ScanLifecycle scan = buildScan(new ScanLifecycleBuilder());
    assertNull(scan.nextReader());
    assertFalse(scan.hasOutputSchema());
    scan.close();
  }

  /**
   * Single reader, late schema, early EOF.
   * The scan never has a valid schema in this case.
   */
  @Test
  public void testEarlyEOF() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator)
          throws EarlyEofException {
        return new NoDataReader(negotiator);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    RowBatchReader reader = scan.nextReader();
    // The reader reports EOF during open, so open() fails.
    assertFalse(reader.open());
    reader.close();
    assertFalse(scan.hasOutputSchema());
    scan.close();
  }

  /**
   * Single late-schema reader that produces no batches, so EOF occurs on
   * the first next(). Because there is neither a schema nor any rows,
   * the result is treated as a null (not empty) result set, so there is
   * no output schema.
   */
  @Test
  public void testNullReader() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockLateSchemaReader(negotiator, 0);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());
    assertFalse(reader.next());
    assertFalse(scan.hasOutputSchema());
    reader.close();
    scan.close();
  }

  /**
   * Single early-schema reader, EOF on first next().
   * Because this is an early-schema reader, there is an output
   * schema, and one empty batch, even without any rows.
   */
  @Test
  public void testNullReaderWithSchema() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEarlySchemaReader(negotiator, 0);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.ALL, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    assertTrue(scan.hasOutputSchema());
    assertEquals(SCHEMA, scan.outputSchema());

    // A single, empty batch carries the schema downstream.
    assertTrue(reader.next());
    VectorContainer result = reader.output();
    assertEquals(0, result.getRecordCount());
    result.zeroVectors();

    // Early schema with no additional columns discovered
    assertEquals(SCHEMA, scan.outputSchema());

    // But, no second batch.
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT * from an early-schema table of (a, b) that returns
   * one batch.
   */
  @Test
  public void testEarlySchemaOneBatch() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEarlySchemaReader(negotiator, 1);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.ALL, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    assertEquals(SCHEMA, scan.outputSchema());
    assertTrue(reader.next());
    RowSetUtilities.verify(simpleExpected(0), fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT * from an early-schema table of (a, b) that returns
   * two batches.
   */
  @Test
  public void testEarlySchemaTwoBatches() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEarlySchemaReader(negotiator, 2);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.ALL, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    assertEquals(SCHEMA, scan.outputSchema());
    assertTrue(reader.next());
    RowSetUtilities.verify(simpleExpected(0), fixture.wrap(reader.output()));
    assertTrue(reader.next());
    RowSetUtilities.verify(simpleExpected(1), fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Single reader, late schema, one batch.
   */
  @Test
  public void testLateSchemaOneBatch() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockLateSchemaReader(negotiator, 1);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.ALL, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Late schema: so no output schema is available after open
    assertFalse(scan.hasOutputSchema());
    assertTrue(reader.next());

    // Late schema, output schema available after first batch
    assertTrue(scan.hasOutputSchema());
    assertEquals(SCHEMA, scan.outputSchema());
    RowSetUtilities.verify(simpleExpected(0), fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Single reader, late schema, two batches.
   */
  @Test
  public void testLateSchemaTwoBatches() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockLateSchemaReader(negotiator, 2);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.ALL, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Late schema: so no output schema is available after open
    assertFalse(scan.hasOutputSchema());
    assertTrue(reader.next());

    // Late schema, output schema available after first batch
    assertEquals(SCHEMA, scan.outputSchema());
    RowSetUtilities.verify(simpleExpected(0), fixture.wrap(reader.output()));
    assertTrue(reader.next());
    RowSetUtilities.verify(simpleExpected(1), fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT a, c FROM table(a, b) with an early-schema reader.
   * Column c is missing from the table, so it is filled with nulls.
   */
  @Test
  public void testEarlySchemaWithProject() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.projection(RowSetTestUtils.projectList("a", "c"));
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEarlySchemaReader(negotiator, 1);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.SOME, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    TupleMetadata expectedSchema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addNullable("c", MinorType.INT)
        .build();
    assertEquals(expectedSchema, scan.outputSchema());
    assertTrue(reader.next());
    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addRow(10, null)
        .addRow(20, null)
        .build();
    RowSetUtilities.verify(expected, fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT a, c FROM table(a, b) with a late-schema reader.
   * Column c is missing from the table, so it is filled with nulls.
   */
  @Test
  public void testLateSchemaWithProject() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.projection(RowSetTestUtils.projectList("a", "c"));
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockLateSchemaReader(negotiator, 1);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.SOME, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());
    assertTrue(reader.next());

    // Late schema: so output schema is available after next()
    TupleMetadata expectedSchema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addNullable("c", MinorType.INT)
        .build();
    assertEquals(expectedSchema, scan.outputSchema());
    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addRow(10, null)
        .addRow(20, null)
        .build();
    RowSetUtilities.verify(expected, fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT b, a FROM table(a, b): output columns follow the
   * projection order, not the table order.
   */
  @Test
  public void testEarlySchemaReorder() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.projection(RowSetTestUtils.projectList("b", "a"));
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEarlySchemaReader(negotiator, 1);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.SOME, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("b", MinorType.VARCHAR)
        .add("a", MinorType.INT)
        .build();
    assertEquals(expectedSchema, scan.outputSchema());
    assertTrue(reader.next());
    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addRow("fred", 10)
        .addRow("wilma", 20)
        .build();
    RowSetUtilities.verify(expected, fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test an empty projection list against an early-schema table of
   * (a, b). The output has no columns, but each row the reader produces
   * still yields an (empty) output row.
   */
  @Test
  public void testEarlySchemaWithProjectNone() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.projection(RowSetTestUtils.projectNone());
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEarlySchemaReader(negotiator, 1);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.NONE, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    TupleMetadata expectedSchema = new SchemaBuilder()
        .build();
    assertEquals(expectedSchema, scan.outputSchema());
    assertTrue(reader.next());
    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addRow()
        .addRow()
        .build();
    RowSetUtilities.verify(expected, fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT a, c FROM table(a, b) with a late-schema reader and a
   * custom null column type of nullable VARCHAR. The missing column c
   * takes the configured type rather than the nullable INT seen in
   * {@link #testLateSchemaWithProject()}.
   */
  @Test
  public void testLateSchemaWithCustomType() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.projection(RowSetTestUtils.projectList("a", "c"));
    builder.nullType(Types.optional(MinorType.VARCHAR));
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockLateSchemaReader(negotiator, 1);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.SOME, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());
    assertTrue(reader.next());

    // Late schema: so output schema is available after next()
    TupleMetadata expectedSchema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .addNullable("c", MinorType.VARCHAR)
        .build();
    assertEquals(expectedSchema, scan.outputSchema());
    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addRow(10, null)
        .addRow(20, null)
        .build();
    RowSetUtilities.verify(expected, fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT a FROM table(a, b): unprojected column b is dropped.
   */
  @Test
  public void testEarlySchemaSubset() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.projection(RowSetTestUtils.projectList("a"));
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEarlySchemaReader(negotiator, 1);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.SOME, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    TupleMetadata expectedSchema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .build();
    assertEquals(expectedSchema, scan.outputSchema());
    assertTrue(reader.next());
    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addRow(10)
        .addRow(20)
        .build();
    RowSetUtilities.verify(expected, fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT * from an early-schema table of () (that is,
   * a schema that consists of zero columns). Rows are still produced,
   * each with no columns.
   */
  @Test
  public void testEarlySchemaEmpty() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEmptySchemaReader(negotiator);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.ALL, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    TupleMetadata expectedSchema = new SchemaBuilder()
        .build();
    assertEquals(expectedSchema, scan.outputSchema());
    assertTrue(reader.next());
    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addRow()
        .addRow()
        .build();
    RowSetUtilities.verify(expected, fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }

  /**
   * Test SELECT a from an early-schema table of () (that is,
   * a schema that consists of zero columns). Column a is missing,
   * so it is filled with nulls of the default nullable INT type.
   */
  @Test
  public void testEarlySchemaEmptyWithProject() {
    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
    builder.projection(RowSetTestUtils.projectList("a"));
    builder.readerFactory(new SingleReaderFactory() {
      @Override
      public ManagedReader next(SchemaNegotiator negotiator) {
        return new MockEmptySchemaReader(negotiator);
      }
    });
    ScanLifecycle scan = buildScan(builder);
    assertSame(ProjectionType.SOME, scan.schemaTracker().projectionType());
    RowBatchReader reader = scan.nextReader();
    assertTrue(reader.open());

    // Early schema: so output schema is available after open
    TupleMetadata expectedSchema = new SchemaBuilder()
        .addNullable("a", MinorType.INT)
        .build();
    assertEquals(expectedSchema, scan.outputSchema());
    assertTrue(reader.next());
    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addSingleCol(null)
        .addSingleCol(null)
        .build();
    RowSetUtilities.verify(expected, fixture.wrap(reader.output()));
    assertFalse(reader.next());
    reader.close();
    scan.close();
  }
}