blob: b169f587cc7879dece733b2f4b8c7611088822fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.junit.Test;
/**
* Verifies that the V1 scan framework properly pushes the LIMIT
* into the scan by stopping the scan once any reader produces
* the rows up to the limit. Verifies that the result set loader
* enforces the limit at the batch level, that the reader lifecycle
* enforces limits at the reader level, and that the scan framework
* enforces limits at by skipping over unneeded readers.
*/
public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
/**
* Mock reader that returns two 50-row batches.
*/
protected static class Mock50RowReader implements ManagedReader<SchemaNegotiator> {
protected boolean openCalled;
protected ResultSetLoader tableLoader;
@Override
public boolean open(SchemaNegotiator negotiator) {
openCalled = true;
negotiator.tableSchema(new SchemaBuilder()
.add("a", MinorType.INT)
.build(), true);
tableLoader = negotiator.build();
return true;
}
@Override
public boolean next() {
if (tableLoader.batchCount() > 2) {
return false;
}
RowSetLoader rowSet = tableLoader.writer();
int base = tableLoader.batchCount() * 50 + 1;
for (int i = 0; i < 50; i++) {
if (rowSet.isFull()) {
break;
}
rowSet.addSingleCol(base + i);
}
return true;
}
@Override
public void close() { }
}
/**
* LIMIT 0, to obtain only the schema.
*/
@Test
public void testLimit0() {
Mock50RowReader reader1 = new Mock50RowReader();
Mock50RowReader reader2 = new Mock50RowReader();
BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
builder.builder.limit(0);
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
assertTrue(scan.buildSchema());
assertTrue(scan.next());
BatchAccessor batch = scan.batchAccessor();
assertEquals(0, batch.rowCount());
assertEquals(1, batch.schema().getFieldCount());
batch.release();
// No second batch or second reader
assertFalse(scan.next());
scanFixture.close();
assertTrue(reader1.openCalled);
assertFalse(reader2.openCalled);
}
/**
* LIMIT 1, simplest case
*/
@Test
public void testLimit1() {
Mock50RowReader reader1 = new Mock50RowReader();
Mock50RowReader reader2 = new Mock50RowReader();
BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
builder.builder.limit(1);
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
assertTrue(scan.buildSchema());
assertTrue(scan.next());
BatchAccessor batch = scan.batchAccessor();
assertEquals(1, batch.rowCount());
batch.release();
// No second batch or second reader
assertFalse(scan.next());
scanFixture.close();
assertTrue(reader1.openCalled);
assertFalse(reader2.openCalled);
}
/**
* LIMIT 50, same as batch size, to check boundary conditions.
*/
@Test
public void testLimitOnBatchEnd() {
Mock50RowReader reader1 = new Mock50RowReader();
Mock50RowReader reader2 = new Mock50RowReader();
BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
builder.builder.limit(50);
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
assertTrue(scan.buildSchema());
assertTrue(scan.next());
BatchAccessor batch = scan.batchAccessor();
assertEquals(50, batch.rowCount());
batch.release();
// No second batch or second reader
assertFalse(scan.next());
scanFixture.close();
assertTrue(reader1.openCalled);
assertFalse(reader2.openCalled);
}
/**
* LIMIT 75, halfway through second batch.
*/
@Test
public void testLimitOnScondBatch() {
Mock50RowReader reader1 = new Mock50RowReader();
Mock50RowReader reader2 = new Mock50RowReader();
BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
builder.builder.limit(75);
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
assertTrue(scan.buildSchema());
assertTrue(scan.next());
BatchAccessor batch = scan.batchAccessor();
assertEquals(50, batch.rowCount());
batch.release();
assertTrue(scan.next());
batch = scan.batchAccessor();
assertEquals(25, batch.rowCount());
batch.release();
// No second reader
assertFalse(scan.next());
scanFixture.close();
assertTrue(reader1.openCalled);
assertFalse(reader2.openCalled);
}
/**
* LIMIT 100, at EOF of the first reader.
*/
@Test
public void testLimitOnEOF() {
Mock50RowReader reader1 = new Mock50RowReader();
Mock50RowReader reader2 = new Mock50RowReader();
BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
builder.builder.limit(100);
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
assertTrue(scan.buildSchema());
assertTrue(scan.next());
BatchAccessor batch = scan.batchAccessor();
assertEquals(50, batch.rowCount());
batch.release();
assertTrue(scan.next());
batch = scan.batchAccessor();
assertEquals(50, batch.rowCount());
batch.release();
// No second reader
assertFalse(scan.next());
scanFixture.close();
assertTrue(reader1.openCalled);
assertFalse(reader2.openCalled);
}
/**
* LIMIT 125: full first reader, limit on second
*/
@Test
public void testLimitOnSecondReader() {
Mock50RowReader reader1 = new Mock50RowReader();
Mock50RowReader reader2 = new Mock50RowReader();
BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
builder.builder.limit(125);
ScanFixture scanFixture = builder.build();
ScanOperatorExec scan = scanFixture.scanOp;
assertTrue(scan.buildSchema());
assertTrue(scan.next());
BatchAccessor batch = scan.batchAccessor();
assertEquals(50, batch.rowCount());
batch.release();
assertTrue(scan.next());
batch = scan.batchAccessor();
assertEquals(50, batch.rowCount());
batch.release();
// First batch, second reader
assertTrue(scan.next());
batch = scan.batchAccessor();
assertEquals(25, batch.rowCount());
batch.release();
// No second batch
assertFalse(scan.next());
scanFixture.close();
assertTrue(reader1.openCalled);
assertTrue(reader2.openCalled);
}
}