blob: 0a9aa1fc191be8f8c055f83df791d90a28061ed9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.drill.categories.RowSetTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test the ability of the scan operator to "absorb" schema changes by
* "smoothing" out data types and modes across readers. Schema smoothing
* is helpful when there is no output schema, but only works within the
* context of a single scan operator: it cannot help when a query has
* multiple scans, each in its own fragment.
*/
@Category(RowSetTest.class)
public class TestScanOperExecSmoothing extends BaseScanOperatorExecTest {
private static class MockEarlySchemaReader2 extends MockEarlySchemaReader {
@Override
public boolean open(SchemaNegotiator schemaNegotiator) {
openCalled = true;
TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.VARCHAR)
.addNullable("b", MinorType.VARCHAR, 10)
.buildSchema();
schemaNegotiator.tableSchema(schema, true);
schemaNegotiator.build();
tableLoader = schemaNegotiator.build();
return true;
}
@Override
protected void writeRow(RowSetLoader writer, int col1, String col2) {
writer.start();
if (writer.column(0) != null) {
writer.scalar(0).setString(Integer.toString(col1));
}
if (writer.column(1) != null) {
writer.scalar(1).setString(col2);
}
writer.save();
}
}
private static class MockOneColEarlySchemaReader extends BaseMockBatchReader {
@Override
public boolean open(SchemaNegotiator schemaNegotiator) {
openCalled = true;
TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.buildSchema();
schemaNegotiator.tableSchema(schema, true);
tableLoader = schemaNegotiator.build();
return true;
}
@Override
public boolean next() {
batchCount++;
if (batchCount > batchLimit) {
return false;
}
makeBatch();
return true;
}
@Override
protected void writeRow(RowSetLoader writer, int col1, String col2) {
writer.start();
if (writer.column(0) != null) {
writer.scalar(0).setInt(col1 + 1);
}
writer.save();
}
}
/**
* Multiple readers with a schema change between them.
*/
@Test
public void testSchemaChange() {
MockEarlySchemaReader reader1 = new MockEarlySchemaReader();
reader1.batchLimit = 2;
MockEarlySchemaReader reader2 = new MockEarlySchemaReader2();
reader2.batchLimit = 2;
ScanFixture scanFixture = simpleFixture(reader1, reader2);
ScanOperatorExec scan = scanFixture.scanOp;
// Build schema
assertTrue(scan.buildSchema());
assertEquals(1, scan.batchAccessor().schemaVersion());
scan.batchAccessor().release();
readSchemaChangeBatches(scanFixture, reader2);
}
@Test
public void testSchemaChangeNoSchemaBatch() {
MockEarlySchemaReader reader1 = new MockEarlySchemaReader();
reader1.batchLimit = 2;
MockEarlySchemaReader reader2 = new MockEarlySchemaReader2();
reader2.batchLimit = 2;
BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
builder.enableSchemaBatch = false;
ScanFixture scanFixture = builder.build();
readSchemaChangeBatches(scanFixture, reader2);
}
private void readSchemaChangeBatches(ScanFixture scanFixture, MockEarlySchemaReader reader2) {
ScanOperatorExec scan = scanFixture.scanOp;
// First batch
assertTrue(scan.next());
assertEquals(1, scan.batchAccessor().schemaVersion());
scan.batchAccessor().release();
// Second batch
assertTrue(scan.next());
assertEquals(1, scan.batchAccessor().schemaVersion());
scan.batchAccessor().release();
// Second reader.
TupleMetadata expectedSchema2 = new SchemaBuilder()
.add("a", MinorType.VARCHAR)
.addNullable("b", MinorType.VARCHAR, 10)
.buildSchema();
assertTrue(scan.next());
assertEquals(2, scan.batchAccessor().schemaVersion());
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema2)
.addRow("10", "fred")
.addRow("20", "wilma")
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
// Second batch from second reader.
assertTrue(scan.next());
assertEquals(2, scan.batchAccessor().schemaVersion());
expected = fixture.rowSetBuilder(expectedSchema2)
.addRow("30", "fred")
.addRow("40", "wilma")
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
// EOF
assertFalse(scan.next());
assertTrue(reader2.closeCalled);
assertEquals(0, scan.batchAccessor().rowCount());
scanFixture.close();
}
/**
* Test the ability of the scan operator to "smooth" out schema changes
* by reusing the type from a previous reader, if known. That is,
* given three readers:<br>
* (a, b)<br>
* (b)<br>
* (a, b)<br>
* Then the type of column a should be preserved for the second reader that
* does not include a. This works if a is nullable. If so, a's type will
* be used for the empty column, rather than the usual nullable int.
* <p>
* Full testing of smoothing is done in
* {#link TestScanProjector}. Here we just make sure that the
* smoothing logic is available via the scan operator.
*/
@Test
public void testSchemaSmoothing() {
// Reader returns (a, b)
MockEarlySchemaReader reader1 = new MockEarlySchemaReader();
reader1.batchLimit = 1;
// Reader returns (a)
MockOneColEarlySchemaReader reader2 = new MockOneColEarlySchemaReader();
reader2.batchLimit = 1;
reader2.startIndex = 100;
// Reader returns (a, b)
MockEarlySchemaReader reader3 = new MockEarlySchemaReader();
reader3.batchLimit = 1;
reader3.startIndex = 200;
BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
builder.setProjection(new String[]{"a", "b"});
builder.addReader(reader1);
builder.addReader(reader2);
builder.addReader(reader3);
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
// Schema based on (a, b)
assertTrue(scan.buildSchema());
assertEquals(1, scan.batchAccessor().schemaVersion());
scan.batchAccessor().release();
// Batch from (a, b) reader 1
assertTrue(scan.next());
assertEquals(1, scan.batchAccessor().schemaVersion());
verifyBatch(0, scan.batchAccessor().container());
// Batch from (a) reader 2
// Due to schema smoothing, b vector type is left unchanged,
// but is null filled.
assertTrue(scan.next());
assertEquals(1, scan.batchAccessor().schemaVersion());
SingleRowSet expected = fixture.rowSetBuilder(scan.batchAccessor().schema())
.addRow(111, null)
.addRow(121, null)
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scan.batchAccessor().container()));
// Batch from (a, b) reader 3
// Recycles b again, back to being a table column.
assertTrue(scan.next());
assertEquals(1, scan.batchAccessor().schemaVersion());
verifyBatch(200, scan.batchAccessor().container());
assertFalse(scan.next());
scanFixture.close();
}
// TODO: Schema change in late reader
}