blob: 66f10fcdb96ea4b10c9fe8c05c6ed4bfc3a1bbd2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.v3.lifecycle;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserException.Builder;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.base.AbstractSubScan;
import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BaseTestScanLifecycle extends SubOperatorTest {
private static final Logger logger = LoggerFactory.getLogger(BaseTestScanLifecycle.class);
/**
 * Placeholder sub-scan definition for these tests. It is created with the
 * user name {@code "fake-user"} and reports {@code 0} as its operator type.
 * {@code buildScan()} passes an instance to the fixture to obtain an
 * operator context.
 */
public static class DummySubScan extends AbstractSubScan {

  public DummySubScan() {
    super("fake-user");
  }

  @Override
  public int getOperatorType() {
    return 0;
  }
}
/**
 * Reader factory for tests that scan with exactly one reader. Subclasses
 * provide the reader itself by implementing {@code next(SchemaNegotiator)}.
 * <p>
 * The counter is private to this class, so {@link #hasNext()} is the only
 * place that can advance it. Each call to {@link #hasNext()} therefore
 * consumes one "slot": only the very first call returns {@code true}.
 */
protected static abstract class SingleReaderFactory implements ReaderFactory<SchemaNegotiator> {

  /** Number of times {@link #hasNext()} has been called. */
  private int counter;

  /**
   * Reports whether the single reader is still available.
   *
   * @return {@code true} on the first call, {@code false} on every
   * subsequent call
   */
  @Override
  public boolean hasNext() {
    // Post-increment: without advancing the counter here this method would
    // return true forever, and the factory would never report exhaustion.
    return counter++ == 0;
  }
}
/**
 * Reader factory for tests that scan with two readers in sequence. The
 * first call to {@code next()} delegates to {@code firstReader()}, the second
 * to {@code secondReader()}; any later call returns {@code null}.
 */
protected static abstract class TwoReaderFactory implements ReaderFactory<SchemaNegotiator> {

  /** Number of times {@code next()} has been called. */
  private int counter;

  /** @return {@code true} while fewer than two readers have been requested */
  @Override
  public boolean hasNext() {
    return counter < 2;
  }

  /**
   * Hands out the next reader, counting the request before the reader is
   * created.
   *
   * @param negotiator schema negotiator forwarded to the reader factory method
   * @return the first or second reader, or {@code null} once both were requested
   */
  @Override
  public ManagedReader next(SchemaNegotiator negotiator) {
    final int request = ++counter;
    if (request == 1) {
      return firstReader(negotiator);
    }
    if (request == 2) {
      return secondReader(negotiator);
    }
    return null;
  }

  /** Creates the reader returned by the first {@code next()} call. */
  public abstract ManagedReader firstReader(SchemaNegotiator negotiator);

  /** Creates the reader returned by the second {@code next()} call. */
  public abstract ManagedReader secondReader(SchemaNegotiator negotiator);
}
/**
 * Two-column schema used by the mock readers and by {@code simpleExpected()}:
 * INT column {@code a} (declared with {@code add}) followed by VARCHAR column
 * {@code b} (declared with {@code addNullable}).
 */
public static final TupleMetadata SCHEMA = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("b", MinorType.VARCHAR)
.build();
/**
 * Common parent of the "mock" readers in this test. A mock reader implements
 * the regular (enhanced) reader API, but rather than reading a real data
 * source it fabricates rows with a known schema. Its counters are visible
 * to subclasses so tests can see how far the reader progressed.
 */
protected static abstract class BaseMockBatchReader implements ManagedReader {

  /** Value added to every generated {@code a} value; defaults to 0. */
  protected int startIndex;

  /** Number of {@link #next()} calls made so far. */
  protected int batchCount;

  /** Number of batches to produce before {@link #next()} returns false. */
  protected int batchLimit;

  /** Loader that receives the generated rows; assigned by subclasses. */
  protected ResultSetLoader tableLoader;

  public BaseMockBatchReader(int batchLimit) {
    this.batchLimit = batchLimit;
  }

  /**
   * Writes the two rows of the current batch. The {@code a} values are
   * derived from the batch number (20 per batch) plus {@code startIndex}.
   */
  protected void makeBatch() {
    final RowSetLoader rows = tableLoader.writer();
    final int base = startIndex + 20 * (batchCount - 1);
    rows.addRow(base + 10, "fred");
    rows.addRow(base + 20, "wilma");
  }

  /**
   * Advances the batch counter and, while within the limit, produces a batch.
   *
   * @return {@code true} if a batch was written, {@code false} once the
   * limit has been exceeded
   */
  @Override
  public boolean next() {
    final boolean withinLimit = ++batchCount <= batchLimit;
    if (withinLimit) {
      makeBatch();
    }
    return withinLimit;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }
}
/**
 * Mock reader that has neither data nor schema. It signals this by always
 * throwing an early EOF exception from its constructor, so no instance is
 * ever handed back to the caller.
 */
protected static class NoDataReader extends BaseMockBatchReader {

  /**
   * @param sn schema negotiator; not used
   * @throws EarlyEofException always
   */
  public NoDataReader(SchemaNegotiator sn) throws EarlyEofException {
    super(0);
    throw new EarlyEofException();
  }

  /** @return always {@code false} */
  @Override
  public boolean next() {
    return false;
  }
}
/**
 * Mock reader that declares an empty table schema (flagged as complete) and
 * writes two column-less rows in a single batch.
 */
protected static class MockEmptySchemaReader implements ManagedReader {

  private final ResultSetLoader tableLoader;

  public MockEmptySchemaReader(SchemaNegotiator negotiator) {
    negotiator.tableSchema(new TupleSchema(), true);
    this.tableLoader = negotiator.build();
  }

  /**
   * @return {@code true} after writing the two rows while the loader reports
   * no prior batches; {@code false} otherwise
   */
  @Override
  public boolean next() {
    if (tableLoader.batchCount() <= 0) {
      final RowSetLoader rows = tableLoader.writer();
      rows.addRow();
      rows.addRow();
      return true;
    }
    return false;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }
}
/**
 * Mock "early schema" reader: one that is able to state its schema at the
 * time it is opened. Examples: CSV, HBase, MapR-DB binary, JDBC. Here the
 * reader declares {@code SCHEMA}, marks it complete, and inherits batch
 * generation from the base mock reader.
 */
protected static class MockEarlySchemaReader extends BaseMockBatchReader {

  /**
   * @param negotiator negotiator used to declare the schema and build the loader
   * @param batchCount number of batches this reader should produce
   */
  public MockEarlySchemaReader(SchemaNegotiator negotiator, int batchCount) {
    super(batchCount);
    negotiator.tableSchema(SCHEMA);
    negotiator.schemaIsComplete(true);
    this.tableLoader = negotiator.build();
  }
}
/**
 * Mock "late schema" reader: no schema is given to the negotiator. Instead
 * the two {@code SCHEMA} columns are added to the writer just before the
 * first batch is generated.
 */
protected static class MockLateSchemaReader extends BaseMockBatchReader {

  /**
   * @param negotiator negotiator used only to build the loader
   * @param batchCount number of batches this reader should produce
   */
  public MockLateSchemaReader(SchemaNegotiator negotiator, int batchCount) {
    super(batchCount);
    this.tableLoader = negotiator.build();
  }

  /**
   * @return {@code false} immediately when the batch limit is zero (no
   * columns are added in that case); otherwise the result of the base
   * implementation
   */
  @Override
  public boolean next() {
    if (batchLimit != 0) {
      if (batchCount == 0) {
        // First call: discover the columns before writing any rows.
        final RowSetLoader writer = tableLoader.writer();
        writer.addColumn(SCHEMA.metadata(0));
        writer.addColumn(SCHEMA.metadata(1));
      }
      return super.next();
    }
    return false;
  }
}
/**
 * Mock reader that declares a single INT column {@code a} and writes the
 * values 101 and 102 in one batch.
 */
protected static class MockSingleColReader implements ManagedReader {

  private final ResultSetLoader tableLoader;

  public MockSingleColReader(SchemaNegotiator negotiator) {
    final TupleMetadata singleCol = new SchemaBuilder()
        .add("a", MinorType.INT)
        .build();
    negotiator.tableSchema(singleCol);
    this.tableLoader = negotiator.build();
  }

  /**
   * @return {@code true} after writing the two rows while the loader reports
   * no prior batches; {@code false} otherwise
   */
  @Override
  public boolean next() {
    if (tableLoader.batchCount() <= 0) {
      final RowSetLoader writer = tableLoader.writer();
      writer.addSingleCol(101);
      writer.addSingleCol(102);
      return true;
    }
    return false;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }
}
/**
 * Mock reader whose schema is {@code SCHEMA} plus a third BIGINT column
 * {@code c}. The schema is declared up front and marked complete; two rows
 * are written in one batch.
 */
protected static class MockThreeColReader implements ManagedReader {

  /** All columns of {@code SCHEMA} followed by BIGINT column {@code c}. */
  public static final TupleMetadata READER_SCHEMA = new SchemaBuilder()
      .addAll(SCHEMA)
      .add("c", MinorType.BIGINT)
      .build();

  private final ResultSetLoader tableLoader;

  public MockThreeColReader(SchemaNegotiator negotiator) {
    negotiator.tableSchema(READER_SCHEMA);
    negotiator.schemaIsComplete(true);
    this.tableLoader = negotiator.build();
  }

  /**
   * @return {@code true} after writing the two rows while the loader reports
   * no prior batches; {@code false} otherwise
   */
  @Override
  public boolean next() {
    if (tableLoader.batchCount() <= 0) {
      final RowSetLoader writer = tableLoader.writer();
      writer.addRow(101, "wilma", 1001);
      writer.addRow(102, "betty", 1002);
      return true;
    }
    return false;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }
}
/**
 * Schema used by the type-conflict mock readers. It has the same column names
 * as {@code SCHEMA}, but column {@code b} is a nullable BIGINT here rather
 * than a nullable VARCHAR.
 */
public static final TupleMetadata CONFLICT_SCHEMA = new SchemaBuilder()
.add("a", MinorType.INT)
.addNullable("b", MinorType.BIGINT)
.build();
/**
 * Mock reader that declares {@code CONFLICT_SCHEMA} through the negotiator
 * and writes two rows in one batch.
 */
protected static class MockEarlySchemaTypeConflictReader implements ManagedReader {

  private final ResultSetLoader tableLoader;

  public MockEarlySchemaTypeConflictReader(SchemaNegotiator negotiator) {
    negotiator.tableSchema(CONFLICT_SCHEMA);
    this.tableLoader = negotiator.build();
  }

  /**
   * @return {@code true} after writing the two rows while the loader reports
   * no prior batches; {@code false} otherwise
   */
  @Override
  public boolean next() {
    if (tableLoader.batchCount() <= 0) {
      final RowSetLoader writer = tableLoader.writer();
      writer.addRow(101, 1001);
      writer.addRow(102, 1002);
      return true;
    }
    return false;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }
}
/**
 * Mock reader that gives the negotiator no schema. Copies of the two
 * {@code CONFLICT_SCHEMA} columns are added to the writer just before the
 * rows of the single batch are written.
 */
protected static class MockLateSchemaTypeConflictReader implements ManagedReader {

  private final ResultSetLoader tableLoader;

  public MockLateSchemaTypeConflictReader(SchemaNegotiator negotiator) {
    this.tableLoader = negotiator.build();
  }

  /**
   * @return {@code true} after writing the two rows while the loader reports
   * no prior batches; {@code false} otherwise
   */
  @Override
  public boolean next() {
    if (tableLoader.batchCount() > 0) {
      return false;
    }
    final RowSetLoader writer = tableLoader.writer();
    if (tableLoader.batchCount() == 0) {
      // Late schema: define both columns, each from a copy of the
      // shared metadata.
      for (int col = 0; col < 2; col++) {
        writer.addColumn(CONFLICT_SCHEMA.metadata(col).copy());
      }
    }
    writer.addRow(101, 1001);
    writer.addRow(102, 1002);
    return true;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }
}
/**
 * Mock reader whose schema has the same column names and types as
 * {@code SCHEMA}, except that column {@code b} is declared with {@code add}
 * instead of {@code addNullable}.
 */
protected static class MockModeConflictReader implements ManagedReader {

  public static final TupleMetadata READER_SCHEMA = new SchemaBuilder()
      .add("a", MinorType.INT)
      .add("b", MinorType.VARCHAR) // SCHEMA declares this column nullable
      .build();

  private final ResultSetLoader tableLoader;

  public MockModeConflictReader(SchemaNegotiator negotiator) {
    negotiator.tableSchema(READER_SCHEMA);
    this.tableLoader = negotiator.build();
  }

  /**
   * @return {@code true} after writing the two rows while the loader reports
   * no prior batches; {@code false} otherwise
   */
  @Override
  public boolean next() {
    if (tableLoader.batchCount() <= 0) {
      final RowSetLoader writer = tableLoader.writer();
      writer.addRow(101, "wilma");
      writer.addRow(102, "betty");
      return true;
    }
    return false;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }
}
/**
 * Mock reader whose schema holds copies of the {@code SCHEMA} columns in
 * reverse order: {@code b} first, then {@code a}. Rows are therefore written
 * as (string, int).
 */
protected static class MockReorderedReader implements ManagedReader {

  public static final TupleMetadata READER_SCHEMA = new SchemaBuilder()
      .add(SCHEMA.metadata(1).copy())
      .add(SCHEMA.metadata(0).copy())
      .build();

  private final ResultSetLoader tableLoader;

  public MockReorderedReader(SchemaNegotiator negotiator) {
    negotiator.tableSchema(READER_SCHEMA);
    this.tableLoader = negotiator.build();
  }

  /**
   * @return {@code true} after writing the two rows while the loader reports
   * no prior batches; {@code false} otherwise
   */
  @Override
  public boolean next() {
    if (tableLoader.batchCount() <= 0) {
      final RowSetLoader rows = tableLoader.writer();
      rows.addRow("barney", 30);
      rows.addRow("betty", 40);
      return true;
    }
    return false;
  }

  /** Nothing to release. */
  @Override
  public void close() {
  }
}
/**
 * Mock reader that fails on demand. The {@code failType} string selects the
 * failure point and kind:
 * <ul>
 * <li>{@code "ctor"}, {@code "next"}, {@code "close"}: throw an
 * {@link IllegalStateException} from the constructor, {@code next()} or
 * {@code close()} respectively.</li>
 * <li>{@code "ctor-u"}, {@code "next-u"}, {@code "close-u"}: throw a
 * data-read {@link UserException} carrying this reader's error context
 * from the same places.</li>
 * </ul>
 * Any other value produces a reader that builds its loader normally and
 * returns no data.
 */
protected static class FailingReader implements ManagedReader {

  public final String failType;
  public final CustomErrorContext ec;

  /**
   * Installs a child error context on the negotiator, then either fails as
   * requested or builds the result set loader.
   *
   * @param sn schema negotiator for this reader
   * @param failType failure selector, see the class description
   */
  public FailingReader(SchemaNegotiator sn, String failType) {
    this.failType = failType;
    // Child context: adds the parent's context plus one custom entry.
    this.ec = new ChildErrorContext(sn.parentErrorContext()) {
      @Override
      public void addContext(Builder builder) {
        super.addContext(builder);
        builder.addContext("My custom context");
      }
    };
    sn.setErrorContext(ec);
    switch (failType) {
      case "ctor":
        throw new IllegalStateException("Oops ctor");
      case "ctor-u":
        throw UserException.dataReadError()
            .message("Oops ctor")
            .addContext(ec)
            .build(logger);
      default:
        sn.build();
    }
  }

  /** @return always {@code false} unless a failure was requested for this call */
  @Override
  public boolean next() {
    switch (failType) {
      case "next":
        throw new IllegalStateException("Oops next");
      case "next-u":
        throw UserException.dataReadError()
            .message("Oops next")
            .addContext(ec)
            .build(logger);
      default:
        return false;
    }
  }

  /** Does nothing unless a failure was requested for this call. */
  @Override
  public void close() {
    switch (failType) {
      case "close":
        throw new IllegalStateException("Oops close");
      case "close-u":
        throw UserException.dataReadError()
            .message("Oops close")
            .addContext(ec)
            .build(logger);
      default:
        // No failure requested for close.
    }
  }
}
/**
 * Builds a scan lifecycle from the given builder, using an operator context
 * that the test fixture creates for a fresh {@code DummySubScan}.
 *
 * @param builder configured scan lifecycle builder
 * @return the scan lifecycle produced by the builder
 */
protected ScanLifecycle buildScan(ScanLifecycleBuilder builder) {
  final DummySubScan subScan = new DummySubScan();
  return builder.build(fixture.operatorContext(subScan));
}
/**
 * Builds the two-row result set, in {@code SCHEMA} layout, that the tests
 * expect for one batch.
 *
 * @param startIndex batch index; each step shifts both {@code a} values by 20
 * @return a row set holding ({@code startIndex * 20 + 10}, "fred") and
 * ({@code startIndex * 20 + 20}, "wilma")
 */
protected RowSet simpleExpected(int startIndex) {
  final int firstValue = startIndex * 20 + 10;
  final int secondValue = firstValue + 10;
  return fixture.rowSetBuilder(SCHEMA)
      .addRow(firstValue, "fred")
      .addRow(secondValue, "wilma")
      .build();
}
/**
 * Takes the next reader from the scan and asserts that it opens
 * successfully, yields no batch, and leaves the scan without an output
 * schema. The reader is closed afterwards.
 *
 * @param scan scan lifecycle under test
 */
protected void verifyEmptyReader(ScanLifecycle scan) {
  final RowBatchReader emptyReader = scan.nextReader();

  // Opens fine, but there is nothing to read.
  assertTrue(emptyReader.open());
  assertFalse(emptyReader.next());

  // No batch was produced, so the scan has no output schema.
  assertFalse(scan.hasOutputSchema());
  emptyReader.close();
}
/**
 * Takes the next reader from the scan and asserts that it opens, produces
 * exactly one batch matching {@code simpleExpected(offset)}, and then
 * reports end of data. The reader is closed afterwards.
 *
 * @param scan scan lifecycle under test
 * @param offset batch index passed to {@code simpleExpected()} to compute
 * the expected column values
 */
protected void verifyStandardReader(ScanLifecycle scan, int offset) {
  // Fetch exactly one reader: requesting a second one here would silently
  // discard the first without ever opening or closing it.
  RowBatchReader reader = scan.nextReader();
  assertTrue(reader.open());
  assertTrue(reader.next());
  // Honor the caller's offset rather than always expecting the first batch.
  RowSetUtilities.verify(simpleExpected(offset), fixture.wrap(reader.output()));
  assertFalse(reader.next());
  reader.close();
}
}