blob: d116e25e38368c0b50774e89b3e1f25ab458e527 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.protocol;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Iterator;
import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.proto.UserBitShared.NamePart;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.BatchSchemaBuilder;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.test.SubOperatorTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the implementation of the Drill Volcano iterator protocol that
* wraps the modular operator implementation.
*/
@Category(RowSetTests.class)
public class TestOperatorRecordBatch extends SubOperatorTest {
private static final Logger logger = LoggerFactory.getLogger(TestOperatorRecordBatch.class);
/**
* Mock operator executor that simply tracks each method call
* and provides a light-weight vector container. Returns a
* defined number of (batches) with an optional schema change.
*/
private class MockOperatorExec implements OperatorExec {
public boolean bindCalled;
public boolean buildSchemaCalled;
public int nextCalls = 1;
public int nextCount;
public int schemaChangeAt = -1;
public boolean cancelCalled;
public boolean closeCalled;
public boolean schemaEOF;
private final VectorContainerAccessor batchAccessor;
public MockOperatorExec() {
this(mockBatch());
}
public MockOperatorExec(VectorContainer container) {
batchAccessor = new VectorContainerAccessor();
batchAccessor.addBatch(container);
}
public MockOperatorExec(VectorContainerAccessor accessor) {
batchAccessor = accessor;
}
@Override
public void bind(OperatorContext context) { bindCalled = true; }
@Override
public BatchAccessor batchAccessor() {
return batchAccessor;
}
@Override
public boolean buildSchema() {
buildSchemaCalled = true;
return !schemaEOF;
}
@Override
public boolean next() {
nextCount++;
if (nextCount > nextCalls) {
return false;
}
if (nextCount == schemaChangeAt) {
BatchSchemaBuilder newSchema = new BatchSchemaBuilder(batchAccessor.schema());
newSchema.schemaBuilder()
.add("b", MinorType.VARCHAR);
VectorContainer newContainer = new VectorContainer(fixture.allocator(), newSchema.build());
batchAccessor.addBatch(newContainer);
}
return true;
}
@Override
public void cancel() { cancelCalled = true; }
@Override
public void close() {
batchAccessor().container().clear();
closeCalled = true;
}
}
private static VectorContainer mockBatch() {
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT);
VectorContainer container = new VectorContainer(fixture.allocator(), new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build());
container.buildSchema(SelectionVectorMode.NONE);
return container;
}
private OperatorRecordBatch makeOpBatch(MockOperatorExec opExec) {
// Dummy operator definition
PhysicalOperator popConfig = new Limit(null, 0, 100);
return new OperatorRecordBatch(fixture.getFragmentContext(), popConfig, opExec, true);
}
/**
* Simulate a normal run: return some batches, encounter a schema change.
*/
@Test
public void testNormalLifeCycle() {
MockOperatorExec opExec = new MockOperatorExec();
opExec.nextCalls = 2;
opExec.schemaChangeAt = 2;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertSame(fixture.getFragmentContext(), opBatch.fragmentContext());
assertNotNull(opBatch.getContext());
// First call to next() builds schema
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertTrue(opExec.bindCalled);
assertTrue(opExec.buildSchemaCalled);
assertEquals(0, opExec.nextCount);
// Second call returns the first batch
assertEquals(IterOutcome.OK, opBatch.next());
assertEquals(1, opExec.nextCount);
// Third call causes a schema change
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertEquals(2, opExec.nextCount);
// Fourth call reaches EOF
assertEquals(IterOutcome.NONE, opBatch.next());
assertEquals(3, opExec.nextCount);
// Close
} catch (Exception e) {
fail();
}
assertTrue(opExec.closeCalled);
assertFalse(opExec.cancelCalled);
}
/**
* Simulate a truncated life cycle: next() is never called. Not a valid part
* of the protocol; but should be ready anyway.
*/
@Test
public void testTruncatedLifeCycle() {
MockOperatorExec opExec = new MockOperatorExec();
opExec.schemaEOF = true;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
} catch (Exception e) {
fail();
}
assertTrue(opExec.bindCalled);
assertTrue(opExec.closeCalled);
}
/**
* Simulate reaching EOF when trying to create the schema.
*/
@Test
public void testSchemaEOF() {
MockOperatorExec opExec = new MockOperatorExec();
opExec.schemaEOF = true;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.NONE, opBatch.next());
assertTrue(opExec.buildSchemaCalled);
} catch (Exception e) {
fail();
}
assertTrue(opExec.closeCalled);
}
/**
* Simulate reaching EOF on the first batch. This simulated data source
* discovered a schema, but had no data.
*/
@Test
public void testFirstBatchEOF() {
MockOperatorExec opExec = new MockOperatorExec();
opExec.nextCalls = 0;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertTrue(opExec.buildSchemaCalled);
assertEquals(IterOutcome.NONE, opBatch.next());
assertEquals(1, opExec.nextCount);
} catch (Exception e) {
fail();
}
assertTrue(opExec.closeCalled);
}
/**
* Simulate the caller failing the operator before getting the schema.
*/
@Test
public void testFailEarly() {
MockOperatorExec opExec = new MockOperatorExec();
opExec.nextCalls = 2;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
opBatch.kill(false);
assertFalse(opExec.buildSchemaCalled);
assertEquals(0, opExec.nextCount);
assertFalse(opExec.cancelCalled);
} catch (Exception e) {
fail();
}
assertTrue(opExec.closeCalled);
}
/**
* Simulate the caller failing the operator before EOF.
*/
@Test
public void testFailWhileReading() {
MockOperatorExec opExec = new MockOperatorExec();
opExec.nextCalls = 2;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertEquals(IterOutcome.OK, opBatch.next());
opBatch.kill(false);
assertTrue(opExec.cancelCalled);
} catch (Exception e) {
fail();
}
assertTrue(opExec.closeCalled);
}
/**
* Simulate the caller failing the operator after EOF but before close.
* This is a silly time to fail, but have to handle it anyway.
*/
@Test
public void testFailBeforeClose() {
MockOperatorExec opExec = new MockOperatorExec();
opExec.nextCalls = 2;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertEquals(IterOutcome.OK, opBatch.next());
assertEquals(IterOutcome.OK, opBatch.next());
assertEquals(IterOutcome.NONE, opBatch.next());
opBatch.kill(false);
// Already hit EOF, so fail won't be passed along.
assertFalse(opExec.cancelCalled);
} catch (Exception e) {
fail();
}
assertTrue(opExec.closeCalled);
}
/**
* Simulate the caller failing the operator after close.
* This is violates the operator protocol, but have to handle it anyway.
*/
@Test
public void testFailAfterClose() {
MockOperatorExec opExec = new MockOperatorExec();
opExec.nextCalls = 2;
OperatorRecordBatch opBatch = makeOpBatch(opExec);
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertEquals(IterOutcome.OK, opBatch.next());
assertEquals(IterOutcome.OK, opBatch.next());
assertEquals(IterOutcome.NONE, opBatch.next());
try {
opBatch.close();
} catch (Exception e) {
fail();
}
assertTrue(opExec.closeCalled);
opBatch.kill(false);
assertFalse(opExec.cancelCalled);
}
/**
* The record batch abstraction has a bunch of methods to work with a vector container.
* Rather than simply exposing the container itself, the batch instead exposes various
* container operations. Probably an artifact of its history. In any event, make
* sure those methods are passed through to the container accessor.
*/
@Test
public void testBatchAccessor() {
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR);
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
SingleRowSet rs = fixture.rowSetBuilder(schema)
.addRow(10, "fred")
.addRow(20, "wilma")
.build();
MockOperatorExec opExec = new MockOperatorExec(rs.container());
opExec.nextCalls = 1;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertEquals(schema, opBatch.getSchema());
assertEquals(2, opBatch.getRecordCount());
assertSame(rs.container(), opBatch.getOutgoingContainer());
Iterator<VectorWrapper<?>> iter = opBatch.iterator();
assertEquals("a", iter.next().getValueVector().getField().getName());
assertEquals("b", iter.next().getValueVector().getField().getName());
// Not a full test of the schema path; just make sure that the
// pass-through to the Vector Container works.
SchemaPath path = SchemaPath.create(NamePart.newBuilder().setName("a").build());
TypedFieldId id = opBatch.getValueVectorId(path);
assertEquals(MinorType.INT, id.getFinalType().getMinorType());
assertEquals(1, id.getFieldIds().length);
assertEquals(0, id.getFieldIds()[0]);
path = SchemaPath.create(NamePart.newBuilder().setName("b").build());
id = opBatch.getValueVectorId(path);
assertEquals(MinorType.VARCHAR, id.getFinalType().getMinorType());
assertEquals(1, id.getFieldIds().length);
assertEquals(1, id.getFieldIds()[0]);
// Sanity check of getValueAccessorById()
VectorWrapper<?> w = opBatch.getValueAccessorById(IntVector.class, 0);
assertNotNull(w);
assertEquals("a", w.getValueVector().getField().getName());
w = opBatch.getValueAccessorById(VarCharVector.class, 1);
assertNotNull(w);
assertEquals("b", w.getValueVector().getField().getName());
// getWritableBatch() ?
// No selection vectors
try {
opBatch.getSelectionVector2();
fail();
} catch (UnsupportedOperationException e) {
// Expected
}
try {
opBatch.getSelectionVector4();
fail();
} catch (UnsupportedOperationException e) {
// Expected
}
} catch (Exception e) {
fail(e.getMessage());
}
assertTrue(opExec.closeCalled);
}
@Test
public void testSchemaChange() {
TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
SingleRowSet rs = fixture.rowSetBuilder(schema)
.addRow(10, "fred")
.addRow(20, "wilma")
.build();
VectorContainer container = rs.container();
MockOperatorExec opExec = new MockOperatorExec(container);
int schemaVersion = opExec.batchAccessor().schemaVersion();
// Be tidy: start at 1.
assertEquals(1, schemaVersion);
// Changing data does not trigger schema change
container.zeroVectors();
opExec.batchAccessor.addBatch(container);
assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
// Different container, same vectors, does not trigger a change
VectorContainer c2 = new VectorContainer(fixture.allocator());
for (VectorWrapper<?> vw : container) {
c2.add(vw.getValueVector());
}
c2.buildSchema(SelectionVectorMode.NONE);
opExec.batchAccessor.addBatch(c2);
assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
opExec.batchAccessor.addBatch(container);
assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
// Replacing a vector with another of the same type does trigger
// a change.
VectorContainer c3 = new VectorContainer(fixture.allocator());
c3.add(container.getValueVector(0).getValueVector());
c3.add(TypeHelper.getNewVector(
container.getValueVector(1).getValueVector().getField(),
fixture.allocator(), null));
c3.buildSchema(SelectionVectorMode.NONE);
opExec.batchAccessor.addBatch(c3);
assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
schemaVersion = opExec.batchAccessor().schemaVersion();
// No change if same schema again
opExec.batchAccessor.addBatch(c3);
assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
// Adding a vector triggers a change
MaterializedField c = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL);
c3.add(TypeHelper.getNewVector(c, fixture.allocator(), null));
c3.buildSchema(SelectionVectorMode.NONE);
opExec.batchAccessor.addBatch(c3);
assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
schemaVersion = opExec.batchAccessor().schemaVersion();
// No change if same schema again
opExec.batchAccessor.addBatch(c3);
assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());
// Removing a vector triggers a change
c3.remove(c3.getValueVector(2).getValueVector());
c3.buildSchema(SelectionVectorMode.NONE);
assertEquals(2, c3.getNumberOfColumns());
opExec.batchAccessor.addBatch(c3);
assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
schemaVersion = opExec.batchAccessor().schemaVersion();
// Clean up
opExec.close();
c2.clear();
c3.clear();
}
/**
* Test that an SV2 is properly handled by the proper container accessor.
*/
@Test
public void testSv2() {
TupleMetadata schema = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.buildSchema();
SingleRowSet rs = fixture.rowSetBuilder(schema)
.addRow(10, "fred")
.addRow(20, "wilma")
.withSv2()
.build();
IndirectContainerAccessor accessor = new IndirectContainerAccessor();
accessor.addBatch(rs.container());
accessor.setSelectionVector(rs.getSv2());
MockOperatorExec opExec = new MockOperatorExec(accessor);
opExec.nextCalls = 1;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertSame(rs.getSv2(), opBatch.getSelectionVector2());
} catch (Exception e) {
fail();
}
assertTrue(opExec.closeCalled);
// Must release SV2
rs.clear();
}
//-----------------------------------------------------------------------
// Exception error cases
//
// Assumes that any of the operator executor methods could throw an
// exception. A wise implementation will throw a user exception that the
// operator just passes along. A lazy implementation will throw any old
// unchecked exception. Validate both cases.
public static final String ERROR_MSG = "My Bad!";
/**
* Failure on the bind method.
*/
@Test
public void testWrappedExceptionOnBind() {
MockOperatorExec opExec = new MockOperatorExec() {
@Override
public void bind(OperatorContext context) {
throw new IllegalStateException(ERROR_MSG);
}
};
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
fail();
} catch (UserException e) {
assertTrue(e.getMessage().contains(ERROR_MSG));
assertTrue(e.getCause() instanceof IllegalStateException);
} catch (Throwable t) {
fail();
}
assertFalse(opExec.cancelCalled); // Cancel not called: too early in life
assertFalse(opExec.closeCalled); // Same with close
}
@Test
public void testUserExceptionOnBind() {
MockOperatorExec opExec = new MockOperatorExec() {
@Override
public void bind(OperatorContext context) {
throw UserException.connectionError()
.message(ERROR_MSG)
.build(logger);
}
};
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
opBatch.next();
fail();
} catch (UserException e) {
assertTrue(e.getMessage().contains(ERROR_MSG));
assertNull(e.getCause());
} catch (Throwable t) {
fail();
}
assertFalse(opExec.cancelCalled); // Cancel not called: too early in life
assertFalse(opExec.closeCalled); // Same with close
}
/**
* Failure when building the schema (first call to next()).
*/
@Test
public void testWrappedExceptionOnBuildSchema() {
MockOperatorExec opExec = new MockOperatorExec() {
@Override
public boolean buildSchema() {
throw new IllegalStateException(ERROR_MSG);
}
};
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
opBatch.next();
fail();
} catch (UserException e) {
assertTrue(e.getMessage().contains(ERROR_MSG));
assertTrue(e.getCause() instanceof IllegalStateException);
} catch (Throwable t) {
fail();
}
assertTrue(opExec.cancelCalled);
assertTrue(opExec.closeCalled);
}
@Test
public void testUserExceptionOnBuildSchema() {
MockOperatorExec opExec = new MockOperatorExec() {
@Override
public boolean buildSchema() {
throw UserException.dataReadError()
.message(ERROR_MSG)
.build(logger);
}
};
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
opBatch.next();
fail();
} catch (UserException e) {
assertTrue(e.getMessage().contains(ERROR_MSG));
assertNull(e.getCause());
} catch (Throwable t) {
fail();
}
assertTrue(opExec.cancelCalled);
assertTrue(opExec.closeCalled);
}
/**
* Failure on the second or subsequent calls to next(), when actually
* fetching a record batch.
*/
@Test
public void testWrappedExceptionOnNext() {
MockOperatorExec opExec = new MockOperatorExec() {
@Override
public boolean next() {
throw new IllegalStateException(ERROR_MSG);
}
};
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
opBatch.next();
fail();
} catch (UserException e) {
assertTrue(e.getMessage().contains(ERROR_MSG));
assertTrue(e.getCause() instanceof IllegalStateException);
} catch (Throwable t) {
fail();
}
assertTrue(opExec.cancelCalled);
assertTrue(opExec.closeCalled);
}
@Test
public void testUserExceptionOnNext() {
MockOperatorExec opExec = new MockOperatorExec() {
@Override
public boolean next() {
throw UserException.dataReadError()
.message(ERROR_MSG)
.build(logger);
}
};
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
opBatch.next();
fail();
} catch (UserException e) {
assertTrue(e.getMessage().contains(ERROR_MSG));
assertNull(e.getCause());
} catch (Throwable t) {
fail();
}
assertTrue(opExec.cancelCalled);
assertTrue(opExec.closeCalled);
}
/**
* Failure when closing the operator implementation.
*/
@Test
public void testWrappedExceptionOnClose() {
MockOperatorExec opExec = new MockOperatorExec() {
@Override
public void close() {
// Release memory
super.close();
// Then fail
throw new IllegalStateException(ERROR_MSG);
}
};
opExec.nextCalls = 1;
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertEquals(IterOutcome.OK, opBatch.next());
assertEquals(IterOutcome.NONE, opBatch.next());
} catch (UserException e) {
assertTrue(e.getMessage().contains(ERROR_MSG));
assertTrue(e.getCause() instanceof IllegalStateException);
} catch (Throwable t) {
fail();
}
assertFalse(opExec.cancelCalled);
assertTrue(opExec.closeCalled);
}
@Test
public void testUserExceptionOnClose() {
MockOperatorExec opExec = new MockOperatorExec() {
@Override
public void close() {
// Release memory
super.close();
// Then fail
throw UserException.dataReadError()
.message(ERROR_MSG)
.build(logger);
}
};
try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
assertEquals(IterOutcome.OK, opBatch.next());
assertEquals(IterOutcome.NONE, opBatch.next());
} catch (UserException e) {
assertTrue(e.getMessage().contains(ERROR_MSG));
assertNull(e.getCause());
} catch (Throwable t) {
fail();
}
assertFalse(opExec.cancelCalled);
assertTrue(opExec.closeCalled);
}
}