/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.v3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import org.apache.drill.categories.EvfTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test vector overflow in the context of the scan operator.
*/
@Category(EvfTest.class)
public class TestScanOverflow extends BaseScanTest {
/**
* Mock reader that produces "jumbo" batches that cause a vector to
* fill and a row to overflow from one batch to the next.
*/
private static class OverflowReader extends BaseMockBatchReader {
private final String value;
public int rowCount;
/**
* If true, the reader will report EOF after filling a batch
* to overflow. This simulates the corner case in which a reader
* has, say, 1000 rows, hits overflow on row 1000, then declares
* it has nothing more to read.
* <p>
* If false, reports EOF on a call to next() without reading more
* rows. The overflow row from the prior batch still exists in
* the result set loader.
*/
public boolean reportEofWithOverflow;
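// For example, with batchLimit = 2: if reportEofWithOverflow is true, the
// second call to next() returns false even though one overflow row is still
// buffered in the result set loader, and the scan operator must surface that
// row as a final one-row batch. If false, the second call returns true and a
// third call reports EOF without reading any more rows.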
public OverflowReader(SchemaNegotiator schemaNegotiator) {
char[] buf = new char[512];
Arrays.fill(buf, 'x');
value = new String(buf);
TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.VARCHAR)
.buildSchema();
schemaNegotiator.tableSchema(schema, true);
tableLoader = schemaNegotiator.build();
}
@Override
public boolean next() {
batchCount++;
if (batchCount > batchLimit) {
return false;
}
RowSetLoader writer = tableLoader.writer();
while (! writer.isFull()) {
writer.start();
writer.scalar(0).setString(value);
writer.save();
rowCount++;
}
// The vector overflowed on the last row, but that row still had to be
// written. It is tucked away in the loader and will appear as the first
// row of the next batch.
//
// Depending on the flag set by the test routine, either report the EOF
// during this read, or report it next time around.
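// Note that when reportEofWithOverflow is false and batchCount has reached
// batchLimit, this call still returns true; EOF comes on the next call,
// which reads no additional rows.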
return reportEofWithOverflow
? batchCount < batchLimit
: true;
}
}
/**
* Test multiple readers, with one of them creating "jumbo" batches
* that overflow. Specifically, test the corner case in which the final
* batch ends right at file EOF, but that batch also overflowed.
*/
@Test
public void testMultipleReadersWithOverflowEofWithData() {
runOverflowTest(true);
}
@Test
public void testMultipleReadersWithOverflowEofWithoutData() {
runOverflowTest(false);
}
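// The two test variants exercise both settings of reportEofWithOverflow on
// the first reader: EOF reported along with the overflowing batch, or on a
// separate follow-up call that reads no more rows.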
private void runOverflowTest(boolean eofWithData) {
ObservableCreator creator1 = new ObservableCreator() {
@Override
public ManagedReader create(SchemaNegotiator negotiator) {
OverflowReader reader = new OverflowReader(negotiator);
reader.batchLimit = 2;
reader.reportEofWithOverflow = eofWithData;
return reader;
}
};
ObservableCreator creator2 = new ObservableCreator() {
@Override
public ManagedReader create(SchemaNegotiator negotiator) {
OverflowReader reader = new OverflowReader(negotiator);
reader.batchLimit = 2;
return reader;
}
};
BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder(fixture);
builder.projectAll();
builder.addReader(creator1);
builder.addReader(creator2);
// We want overflow, so set the batch size and row-count limits to their maximums.
builder.builder.batchByteLimit(ScanLifecycleBuilder.MAX_BATCH_BYTE_SIZE);
builder.builder.batchRecordLimit(ScanLifecycleBuilder.MAX_BATCH_ROW_COUNT);
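// With 512-character values, the VARCHAR vector reaches its size limit well
// before the row-count limit, so each reader batch fills to overflow and one
// row is held back by the result set loader.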
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
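// The first batch delivers only the schema declared by the reader; the data
// rows arrive in the batches that follow.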
assertTrue(scan.buildSchema());
assertEquals(1, scan.batchAccessor().schemaVersion());
scan.batchAccessor().release();
// Second batch. Should be 1 less than the reader's row
// count because the loader has its own one-row lookahead batch.
assertTrue(scan.next());
OverflowReader reader1 = creator1.reader();
assertEquals(1, reader1.batchCount);
assertEquals(1, scan.batchAccessor().schemaVersion());
int prevRowCount = scan.batchAccessor().rowCount();
assertEquals(reader1.rowCount - 1, prevRowCount);
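// (If the reader wrote, say, N rows before the writer reported isFull(), the
// delivered batch holds N - 1 rows and the final row is buffered as the start
// of the next batch.)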
scan.batchAccessor().release();
// Third batch. Adds more data after the lookahead row and again overflows,
// so the returned record count is one less than the total produced so far
// minus the rows returned earlier.
assertTrue(scan.next());
assertEquals(2, reader1.batchCount);
assertEquals(1, scan.batchAccessor().schemaVersion());
assertEquals(reader1.rowCount - prevRowCount - 1, scan.batchAccessor().rowCount());
scan.batchAccessor().release();
int prevReaderRowCount = reader1.rowCount;
// Fourth batch. Returns the overflow row left over from the second batch
// of the first reader.
assertTrue(scan.next());
assertEquals(eofWithData ? 2 : 3, reader1.batchCount);
assertEquals(1, scan.batchAccessor().schemaVersion());
assertEquals(1, scan.batchAccessor().rowCount());
assertEquals(prevReaderRowCount, reader1.rowCount);
scan.batchAccessor().release();
// Second reader.
assertTrue(scan.next());
assertEquals(1, scan.batchAccessor().schemaVersion());
scan.batchAccessor().release();
// Second batch from second reader.
assertTrue(scan.next());
OverflowReader reader2 = creator2.reader();
assertEquals(2, reader2.batchCount);
assertEquals(1, scan.batchAccessor().schemaVersion());
scan.batchAccessor().release();
// Third batch. Returns the overflow row from the second batch of
// the second reader.
assertTrue(scan.next());
assertEquals(3, reader2.batchCount);
assertEquals(1, scan.batchAccessor().schemaVersion());
assertEquals(1, scan.batchAccessor().rowCount());
assertEquals(prevReaderRowCount, reader2.rowCount);
scan.batchAccessor().release();
// EOF
assertFalse(scan.next());
assertEquals(0, scan.batchAccessor().rowCount());
scanFixture.close();
}
}