// blob: a67a79c3f907769aec7314729a228b767d6f7872 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.drill.categories.RowSetTest;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.record.VectorContainer;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Tests the basics of the scan operator protocol, with a focus on error
 * conditions: failures raised by a reader during open, next and close,
 * plus the degenerate cases of no readers and of readers that report
 * EOF on their first batch.
 * <p>
 * Each failure scenario comes in two flavors: one in which the reader
 * throws a plain {@link IllegalStateException} (the test expects it to
 * surface as the cause of a {@link UserException}), and one in which the
 * reader throws a {@link UserException} directly (the test expects it to
 * surface with no cause).
 */
@Category(RowSetTest.class)
public class TestScanOperExecBasics extends BaseScanOperatorExecTest {

  /** Message carried by every exception that the mock readers throw. */
  public static final String ERROR_MSG = "My Bad!";

  /**
   * Verifies a failure that originated as a non-user exception: the
   * message must mention {@link #ERROR_MSG} and the cause must be the
   * {@link IllegalStateException} thrown by the mock reader.
   *
   * @param e the exception caught from the scan operator
   */
  private static void assertWrappedError(UserException e) {
    assertTrue(e.getMessage().contains(ERROR_MSG));
    assertTrue(e.getCause() instanceof IllegalStateException);
  }

  /**
   * Verifies a failure that originated as a {@link UserException}: the
   * message must mention {@link #ERROR_MSG} and there must be no cause.
   *
   * @param e the exception caught from the scan operator
   */
  private static void assertUserError(UserException e) {
    assertTrue(e.getMessage().contains(ERROR_MSG));
    assertNull(e.getCause());
  }

  /**
   * Pathological case that a scan operator is provided no readers.
   * It will throw a user exception because the downstream operators
   * can't handle this case so we choose to stop the show early to
   * avoid getting into a strange state.
   */
  @Test
  public void testNoReader() {

    // Create the scan operator with no readers at all

    ScanFixture scanFixture = simpleFixture();
    ScanOperatorExec scan = scanFixture.scanOp;

    try {
      scan.buildSchema();

      // Without this, the test would pass silently if no exception
      // were thrown.

      fail("Expected a UserException for a scan with no readers");
    } catch (UserException e) {

      // Expected

      assertTrue(e.getCause() instanceof ExecutionSetupException);
    } finally {

      // Must close the DAG (context and scan operator) even on failures

      scanFixture.close();
    }
  }

  /**
   * Reader fails on open with a known error message, using an
   * exception other than {@link UserException}.
   */
  @Test
  public void testExceptionOnOpen() {
    MockEarlySchemaReader reader = new MockEarlySchemaReader() {
      @Override
      public boolean open(SchemaNegotiator schemaNegotiator) {
        openCalled = true;
        throw new IllegalStateException(ERROR_MSG);
      }
    };
    reader.batchLimit = 0;

    ScanFixture scanFixture = simpleFixture(reader);
    ScanOperatorExec scan = scanFixture.scanOp;

    try {
      scan.buildSchema();
      fail();
    } catch (UserException e) {
      assertWrappedError(e);
    }
    assertTrue(reader.openCalled);

    assertEquals(0, scan.batchAccessor().rowCount());
    scanFixture.close();
    assertTrue(reader.closeCalled);
  }

  /**
   * Reader fails on open with a known error message, using a
   * {@link UserException}.
   */
  @Test
  public void testUserExceptionOnOpen() {
    MockEarlySchemaReader reader = new MockEarlySchemaReader() {
      @Override
      public boolean open(SchemaNegotiator schemaNegotiator) {
        openCalled = true;
        throw UserException.dataReadError()
            .addContext(ERROR_MSG)
            .build(logger);
      }
    };
    reader.batchLimit = 2;

    ScanFixture scanFixture = simpleFixture(reader);
    ScanOperatorExec scan = scanFixture.scanOp;

    try {
      scan.buildSchema();
      fail();
    } catch (UserException e) {
      assertUserError(e);
    }
    assertTrue(reader.openCalled);

    assertEquals(0, scan.batchAccessor().rowCount());
    scanFixture.close();
    assertTrue(reader.closeCalled);
  }

  /**
   * Reader throws a non-user exception from its first {@code next()}
   * call; the failure is expected on the scan operator's first
   * {@code next()} after a successful {@code buildSchema()}.
   */
  @Test
  public void testExceptionOnFirstNext() {
    MockEarlySchemaReader reader = new MockEarlySchemaReader() {
      @Override
      public boolean next() {
        super.next(); // Load some data
        throw new IllegalStateException(ERROR_MSG);
      }
    };
    reader.batchLimit = 2;

    ScanFixture scanFixture = simpleFixture(reader);
    ScanOperatorExec scan = scanFixture.scanOp;

    assertTrue(scan.buildSchema());

    try {
      scan.next();
      fail();
    } catch (UserException e) {
      assertWrappedError(e);
    }
    assertTrue(reader.openCalled);

    assertEquals(0, scan.batchAccessor().rowCount());
    scanFixture.close();
    assertTrue(reader.closeCalled);
  }

  /**
   * Reader throws a {@link UserException} from its first {@code next()}
   * call; the failure is expected on the scan operator's first
   * {@code next()} after a successful {@code buildSchema()}.
   */
  @Test
  public void testUserExceptionOnFirstNext() {
    MockEarlySchemaReader reader = new MockEarlySchemaReader() {
      @Override
      public boolean next() {
        super.next(); // Load some data
        throw UserException.dataReadError()
            .addContext(ERROR_MSG)
            .build(logger);
      }
    };
    reader.batchLimit = 2;

    ScanFixture scanFixture = simpleFixture(reader);
    ScanOperatorExec scan = scanFixture.scanOp;

    assertTrue(scan.buildSchema());

    // Fail on the first batch

    try {
      scan.next();
      fail();
    } catch (UserException e) {
      assertUserError(e);
    }
    assertTrue(reader.openCalled);

    assertEquals(0, scan.batchAccessor().rowCount());
    scanFixture.close();
    assertTrue(reader.closeCalled);
  }

  /**
   * Test throwing an exception after the first batch, but while
   * "reading" the second. The test calls {@code buildSchema()}, then
   * {@code next()} once to obtain the first batch, and expects the
   * failure on the following {@code next()} call.
   */
  @Test
  public void testExceptionOnSecondNext() {
    MockEarlySchemaReader reader = new MockEarlySchemaReader() {
      @Override
      public boolean next() {
        if (batchCount == 1) {
          super.next(); // Load some data
          throw new IllegalStateException(ERROR_MSG);
        }
        return super.next();
      }
    };
    reader.batchLimit = 2;

    ScanFixture scanFixture = simpleFixture(reader);
    ScanOperatorExec scan = scanFixture.scanOp;

    // Schema

    assertTrue(scan.buildSchema());

    // First batch

    assertTrue(scan.next());
    scan.batchAccessor().release();

    // Fail

    try {
      scan.next();
      fail();
    } catch (UserException e) {
      assertWrappedError(e);
    }

    scanFixture.close();
    assertTrue(reader.closeCalled);
  }

  /**
   * Same as {@link #testExceptionOnSecondNext()}, except that the
   * reader throws a {@link UserException} rather than an
   * {@link IllegalStateException}.
   */
  @Test
  public void testUserExceptionOnSecondNext() {
    MockEarlySchemaReader reader = new MockEarlySchemaReader() {
      @Override
      public boolean next() {
        if (batchCount == 1) {
          super.next(); // Load some data
          throw UserException.dataReadError()
              .addContext(ERROR_MSG)
              .build(logger);
        }
        return super.next();
      }
    };
    reader.batchLimit = 2;

    ScanFixture scanFixture = simpleFixture(reader);
    ScanOperatorExec scan = scanFixture.scanOp;

    // Schema

    assertTrue(scan.buildSchema());

    // First batch

    assertTrue(scan.next());
    scan.batchAccessor().release();

    // Fail

    try {
      scan.next();
      fail();
    } catch (UserException e) {
      assertUserError(e);
    }

    scanFixture.close();
    assertTrue(reader.closeCalled);
  }

  /**
   * First of two readers throws a non-user exception from
   * {@code close()}. The test expects the failure to surface from the
   * scan operator's {@code next()} and the second reader never to be
   * opened.
   */
  @Test
  public void testExceptionOnClose() {
    MockEarlySchemaReader reader1 = new MockEarlySchemaReader() {
      @Override
      public void close() {
        super.close();
        throw new IllegalStateException(ERROR_MSG);
      }
    };
    reader1.batchLimit = 2;

    MockEarlySchemaReader reader2 = new MockEarlySchemaReader();
    reader2.batchLimit = 2;

    ScanFixture scanFixture = simpleFixture(reader1, reader2);
    ScanOperatorExec scan = scanFixture.scanOp;

    assertTrue(scan.buildSchema());

    assertTrue(scan.next());
    scan.batchAccessor().release();

    assertTrue(scan.next());
    scan.batchAccessor().release();

    // Fail on close of first reader

    try {
      scan.next();
      fail();
    } catch (UserException e) {
      assertWrappedError(e);
    }
    assertTrue(reader1.closeCalled);
    assertFalse(reader2.openCalled);

    scanFixture.close();
  }

  /**
   * Same as {@link #testExceptionOnClose()}, except that the first
   * reader throws a {@link UserException} from {@code close()}.
   */
  @Test
  public void testUserExceptionOnClose() {
    MockEarlySchemaReader reader1 = new MockEarlySchemaReader() {
      @Override
      public void close() {
        super.close();
        throw UserException.dataReadError()
            .addContext(ERROR_MSG)
            .build(logger);
      }
    };
    reader1.batchLimit = 2;

    MockEarlySchemaReader reader2 = new MockEarlySchemaReader();
    reader2.batchLimit = 2;

    ScanFixture scanFixture = simpleFixture(reader1, reader2);
    ScanOperatorExec scan = scanFixture.scanOp;

    assertTrue(scan.buildSchema());

    assertTrue(scan.next());
    scan.batchAccessor().release();

    assertTrue(scan.next());
    scan.batchAccessor().release();

    // Fail on close of first reader

    try {
      scan.next();
      fail();
    } catch (UserException e) {
      assertUserError(e);
    }
    assertTrue(reader1.closeCalled);
    assertFalse(reader2.openCalled);

    scanFixture.close();
  }

  /**
   * Test multiple readers, all EOF on first batch.
   * The scan will return one empty batch to declare the
   * early schema. Results in an empty (rather than null)
   * result set.
   */
  @Test
  public void testMultiEOFOnFirstBatch() {
    MockEarlySchemaReader reader1 = new MockEarlySchemaReader();
    reader1.batchLimit = 0;
    MockEarlySchemaReader reader2 = new MockEarlySchemaReader();
    reader2.batchLimit = 0;

    ScanFixture scanFixture = simpleFixture(reader1, reader2);
    ScanOperatorExec scan = scanFixture.scanOp;

    // Schema is still available, followed by a single empty batch

    assertTrue(scan.buildSchema());
    assertTrue(scan.next());
    VectorContainer container = scan.batchAccessor().container();
    assertEquals(0, container.getRecordCount());
    assertEquals(2, container.getNumberOfColumns());

    // Both readers were consumed and closed

    assertTrue(reader1.closeCalled);
    assertTrue(reader2.closeCalled);
    assertEquals(0, scan.batchAccessor().rowCount());

    // Nothing more to read

    assertFalse(scan.next());
    scanFixture.close();
  }
}