/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.physical.impl.scan;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.util.List;

import org.apache.drill.categories.RowSetTest;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.MockScanBuilder;
import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
import org.apache.drill.exec.physical.impl.scan.file.ImplicitColumnManager;
import org.apache.drill.exec.physical.impl.scan.file.ImplicitColumnManager.ImplicitColumnOptions;
import org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ScanOrchestratorBuilder;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

/**
* Test the "columns" array mechanism integrated with the scan schema
* orchestrator including simulating reading data.
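* <p>
* In Drill, a text reader without headers exposes each row as a single
* {@code columns} VARCHAR array, so data is queried as, for example,
* {@code SELECT columns[0], columns[1] FROM `z.csv`}.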
*/
@Category(RowSetTest.class)
public class TestColumnsArray extends SubOperatorTest {
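
/**
* Holder for the scan orchestrator, the per-reader orchestrator and the
* result set loader that each positive test drives.
*/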
private static class MockScanner {
ScanSchemaOrchestrator scanner;
ReaderSchemaOrchestrator reader;
ResultSetLoader loader;
}
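
/**
* Implicit-column options shared by the tests: no wildcard expansion of
* partition columns, a fixed selection root, and a single file.
*/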
private ImplicitColumnOptions standardOptions(Path filePath) {
ImplicitColumnOptions options = new ImplicitColumnOptions();
options.useLegacyWildcardExpansion(false); // Don't expand partition columns for wildcard
options.setSelectionRoot(new Path("hdfs:///w"));
options.setFiles(Lists.newArrayList(filePath));
return options;
}
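
/**
* Build a mock scanner that combines the implicit (file metadata) column
* manager with the columns array manager for the given projection list,
* then start a reader on a single CSV file with a columns-array schema.
*/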
private MockScanner buildScanner(List<SchemaPath> projList) {
MockScanner mock = new MockScanner();
// Set up the file metadata manager
Path filePath = new Path("hdfs:///w/x/y/z.csv");
ImplicitColumnManager metadataManager = new ImplicitColumnManager(
fixture.getOptionManager(),
standardOptions(filePath));
// ...and the columns array manager
ColumnsArrayManager colsManager = new ColumnsArrayManager(false);
// Configure the schema orchestrator
ScanOrchestratorBuilder builder = new MockScanBuilder();
builder.withImplicitColumns(metadataManager);
builder.addParser(colsManager.projectionParser());
builder.addResolver(colsManager.resolver());
// SELECT <proj list> ...
builder.projection(projList);
mock.scanner = new ScanSchemaOrchestrator(fixture.allocator(), builder);
// FROM z.csv
metadataManager.startFile(filePath);
mock.reader = mock.scanner.startReader();
// Table schema (columns: VARCHAR[])
TupleMetadata tableSchema = new SchemaBuilder()
.addArray(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema();
mock.loader = mock.reader.makeTableLoader(tableSchema);
// First empty batch
mock.reader.defineSchema();
return mock;
}

/**
* Test the columns array. The table must support it by providing a
* matching VARCHAR array column.
*/
@Test
public void testColumnsArray() {
MockScanner mock = buildScanner(RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL,
ColumnsScanFramework.COLUMNS_COL,
ScanTestUtils.partitionColName(0)));
// Verify empty batch.
TupleMetadata expectedSchema = new SchemaBuilder()
.add("filename", MinorType.VARCHAR)
.addArray("columns", MinorType.VARCHAR)
.addNullable("dir0", MinorType.VARCHAR)
.buildSchema();
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.build();
assertNotNull(mock.scanner.output());
RowSetUtilities.verify(expected,
fixture.wrap(mock.scanner.output()));
}
// Create a batch of data.
mock.reader.startBatch();
mock.loader.writer()
.addRow(new Object[] {new String[] {"fred", "flintstone"}})
.addRow(new Object[] {new String[] {"barney", "rubble"}});
mock.reader.endBatch();
// Verify
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow("z.csv", new String[] {"fred", "flintstone"}, "x")
.addRow("z.csv", new String[] {"barney", "rubble"}, "x")
.build();
RowSetUtilities.verify(expected,
fixture.wrap(mock.scanner.output()));
}
mock.scanner.close();
}
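
/**
* Test the columns array projected via the wildcard. Since legacy
* wildcard expansion is disabled, only the `columns` column itself
* appears in the output schema.
*/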
@Test
public void testWildcard() {
MockScanner mock = buildScanner(RowSetTestUtils.projectAll());
// Verify empty batch.
TupleMetadata expectedSchema = new SchemaBuilder()
.addArray("columns", MinorType.VARCHAR)
.buildSchema();
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.build();
assertNotNull(mock.scanner.output());
RowSetUtilities.verify(expected,
fixture.wrap(mock.scanner.output()));
}
// Create a batch of data.
mock.reader.startBatch();
mock.loader.writer()
.addRow(new Object[] {new String[] {"fred", "flintstone"}})
.addRow(new Object[] {new String[] {"barney", "rubble"}});
mock.reader.endBatch();
// Verify
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addSingleCol(new String[] {"fred", "flintstone"})
.addSingleCol(new String[] {"barney", "rubble"})
.build();
RowSetUtilities.verify(expected,
fixture.wrap(mock.scanner.output()));
}
mock.scanner.close();
}
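
/**
* Test the columns array with the wildcard combined with explicitly
* projected implicit columns: the wildcard resolves to `columns`, while
* `filename` and `dir0` are resolved as file metadata columns.
*/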
@Test
public void testWildcardAndFileMetadata() {
MockScanner mock = buildScanner(RowSetTestUtils.projectList(
ScanTestUtils.FILE_NAME_COL,
SchemaPath.DYNAMIC_STAR,
ScanTestUtils.partitionColName(0)));
// Verify empty batch.
TupleMetadata expectedSchema = new SchemaBuilder()
.add("filename", MinorType.VARCHAR)
.addArray("columns", MinorType.VARCHAR)
.addNullable("dir0", MinorType.VARCHAR)
.buildSchema();
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.build();
assertNotNull(mock.scanner.output());
RowSetUtilities.verify(expected,
fixture.wrap(mock.scanner.output()));
}
// Create a batch of data.
mock.reader.startBatch();
mock.loader.writer()
.addRow(new Object[] {new String[] {"fred", "flintstone"}})
.addRow(new Object[] {new String[] {"barney", "rubble"}});
mock.reader.endBatch();
// Verify
{
SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow("z.csv", new String[] {"fred", "flintstone"}, "x")
.addRow("z.csv", new String[] {"barney", "rubble"}, "x")
.build();
RowSetUtilities.verify(expected,
fixture.wrap(mock.scanner.output()));
}
mock.scanner.close();
}
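
/**
* Build a bare-bones columns scan (no file metadata manager), optionally
* requiring that the reader schema provide the `columns` column.
*/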
private ScanSchemaOrchestrator buildScan(boolean requireColumns, List<SchemaPath> cols) {
// Set up the columns array manager
ColumnsArrayManager colsManager = new ColumnsArrayManager(requireColumns);
// Configure the schema orchestrator
ScanOrchestratorBuilder builder = new ColumnsScanBuilder();
builder.addParser(colsManager.projectionParser());
builder.addResolver(colsManager.resolver());
builder.projection(cols);
return new ScanSchemaOrchestrator(fixture.allocator(), builder);
}

/**
* Test attempting to use the columns array with an early schema that
* does not provide the required `columns` column at all.
*/
@Test
public void testMissingColumnsColumn() {
ScanSchemaOrchestrator scanner = buildScan(true,
RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL));
TupleMetadata tableSchema = new SchemaBuilder()
.add("a", MinorType.VARCHAR)
.buildSchema();
try {
ReaderSchemaOrchestrator reader = scanner.startReader();
reader.makeTableLoader(tableSchema);
reader.defineSchema();
fail();
} catch (IllegalStateException e) {
// Expected
}
scanner.close();
}
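
/**
* Test attempting to use the columns array with an early schema in which
* `columns` is a simple VARCHAR rather than the required VARCHAR array.
*/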
@Test
public void testNotRepeated() {
ScanSchemaOrchestrator scanner = buildScan(true,
RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL));
TupleMetadata tableSchema = new SchemaBuilder()
.add(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema();
try {
ReaderSchemaOrchestrator reader = scanner.startReader();
reader.makeTableLoader(tableSchema);
reader.defineSchema();
fail();
} catch (IllegalStateException e) {
// Expected
}
scanner.close();
}

/**
* Verify that, when the columns column is not required, `columns` is
* treated like any other column.
*/
@Test
public void testRegularCol() {
ScanSchemaOrchestrator scanner = buildScan(false,
RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL));
TupleMetadata tableSchema = new SchemaBuilder()
.add(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema();
ReaderSchemaOrchestrator reader = scanner.startReader();
ResultSetLoader rsLoader = reader.makeTableLoader(tableSchema);
reader.defineSchema();
reader.startBatch();
rsLoader.writer()
.addRow("fred");
reader.endBatch();
SingleRowSet expected = fixture.rowSetBuilder(tableSchema)
.addRow("fred")
.build();
RowSetUtilities.verify(expected,
fixture.wrap(scanner.output()));
scanner.close();
}
}