/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.resultSet.impl;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
import org.apache.drill.exec.physical.resultSet.ResultSetCopier;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
import org.apache.drill.exec.physical.rowSet.RowSets;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector2Builder;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
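
/**
 * Exercises {@link ResultSetCopierImpl}, which copies rows from an
 * upstream batch source into output batches. The tests cover the basic
 * lifecycle, batch merging, row-by-row copies, schema changes, SV2
 * inputs, nullable columns, arrays and maps. The canonical usage
 * pattern, sketched here from the tests below (a summary, not
 * normative documentation), is:
 * <pre>{@code
 * copier.startOutputBatch();
 * while (!copier.isOutputFull() && copier.nextInputBatch()) {
 *   copier.copyAllRows();
 * }
 * VectorContainer out = copier.harvest();
 * }</pre>
 */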
public class TestResultSetCopier extends SubOperatorTest {
private static final TupleMetadata TEST_SCHEMA =
new SchemaBuilder()
.add("id", MinorType.INT)
.add("name", MinorType.VARCHAR)
.build();
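/**
 * Base upstream source: builds batches with a {@link ResultSetLoader}
 * against a supplied schema. Subclasses implement {@link #makeBatch()}
 * to fill each batch; {@link #next()} produces batches until
 * {@code batchLimit} is reached.
 */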
private abstract static class BaseDataGen implements UpstreamSource {
protected int schemaVersion = 1;
protected final ResultSetLoader rsLoader;
protected VectorContainer batch;
protected int batchCount;
protected int rowCount;
protected int batchSize;
protected int batchLimit;
public BaseDataGen(TupleMetadata schema, int batchSize, int batchLimit) {
ResultSetOptions options = new ResultSetOptionBuilder()
.readerSchema(schema)
.vectorCache(new ResultVectorCacheImpl(fixture.allocator()))
.build();
rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
this.batchSize = batchSize;
this.batchLimit = batchLimit;
}
@Override
public int schemaVersion() { return schemaVersion; }
@Override
public VectorContainer batch() { return batch; }
@Override
public boolean next() {
if (batchCount >= batchLimit) {
return false;
}
makeBatch();
return true;
}
protected abstract void makeBatch();
@Override
public SelectionVector2 sv2() { return null; }
@Override
public void release() {
if (batch != null) {
batch.zeroVectors();
}
SelectionVector2 sv2 = sv2();
if (sv2 != null) {
sv2.clear();
}
}
}
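/**
 * Generates (id, name) rows against the two-column {@code TEST_SCHEMA},
 * numbering rows consecutively across batches.
 */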
private static class DataGen extends BaseDataGen {
public DataGen() {
this(3, 1);
}
public DataGen(int batchSize, int batchLimit) {
super(TEST_SCHEMA, batchSize, batchLimit);
}
@Override
protected void makeBatch() {
rsLoader.startBatch();
for (int i = 0; i < batchSize; i++) {
rowCount++;
rsLoader.writer().addRow(rowCount, "Row " + rowCount);
}
batch = rsLoader.harvest();
batchCount++;
}
}
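/**
 * Generates batches with the two-column schema until
 * {@code schema1Limit} batches have been produced, then adds a
 * required "amount" column and bumps the schema version for the
 * remaining batches.
 */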
public static class SchemaChangeGen extends DataGen {
int schema1Limit;
public SchemaChangeGen(int batchSize, int batchLimit, int schema1Limit) {
super(batchSize, batchLimit);
this.schema1Limit = schema1Limit;
}
public SchemaChangeGen(int schema1Limit) {
super(3, 3);
this.schema1Limit = schema1Limit;
}
public SchemaChangeGen() {
this(2);
}
public TupleMetadata schema2() {
return new SchemaBuilder()
.add("id", MinorType.INT)
.add("name", MinorType.VARCHAR)
.add("amount", MinorType.INT)
.build();
}
@Override
protected void makeBatch() {
if (batchCount < schema1Limit) {
super.makeBatch();
} else if (batchCount == schema1Limit) {
evolveSchema();
makeBatch2();
} else {
makeBatch2();
}
}
public void makeBatch2() {
rsLoader.startBatch();
for (int i = 0; i < batchSize; i++) {
rowCount++;
rsLoader.writer().addRow(rowCount, "Row " + rowCount, rowCount * 10);
}
batch = rsLoader.harvest();
batchCount++;
}
public void evolveSchema() {
rsLoader.writer().addColumn(MetadataUtils.newScalar("amount", MinorType.INT, DataMode.REQUIRED));
schemaVersion = 2;
}
}
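/**
 * Generates rows with nullable columns: "name" is set only on every
 * second row, "amount" only on every third.
 */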
private static class NullableGen extends BaseDataGen {
public NullableGen() {
super(new SchemaBuilder()
.add("id", MinorType.INT)
.addNullable("name", MinorType.VARCHAR)
.addNullable("amount", MinorType.INT)
.build(),
10, 1);
}
@Override
protected void makeBatch() {
rsLoader.startBatch();
RowSetLoader writer = rsLoader.writer();
for (int i = 0; i < batchSize; i++) {
rowCount++;
writer.start();
writer.scalar(0).setInt(rowCount);
if (i % 2 == 0) {
writer.scalar(1).setString("Row " + rowCount);
}
if (i % 3 == 0) {
writer.scalar(2).setInt(rowCount * 10);
}
writer.save();
}
batch = rsLoader.harvest();
batchCount++;
}
}
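/**
 * Generates rows with a repeated VARCHAR column whose element count
 * cycles through 0, 1 and 2 per row.
 */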
private static class ArrayGen extends BaseDataGen {
public ArrayGen() {
super(new SchemaBuilder()
.add("id", MinorType.INT)
.addArray("name", MinorType.VARCHAR)
.build(),
3, 1);
}
@Override
protected void makeBatch() {
rsLoader.startBatch();
RowSetLoader writer = rsLoader.writer();
ArrayWriter aw = writer.array(1);
for (int i = 0; i < batchSize; i++) {
rowCount++;
writer.start();
writer.scalar(0).setInt(rowCount);
int n = i % 3;
for (int j = 0; j < n; j++) {
aw.scalar().setString("Row " + rowCount + "." + j);
}
writer.save();
}
batch = rsLoader.harvest();
batchCount++;
}
}
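/**
 * Generates rows with a repeated map column of (name, amount)
 * entries, 0 to 2 entries per row.
 */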
private static class MapGen extends BaseDataGen {
public MapGen() {
super(new SchemaBuilder()
.add("id", MinorType.INT)
.addMapArray("map")
.add("name", MinorType.VARCHAR)
.add("amount", MinorType.INT)
.resumeSchema()
.build(),
3, 1);
}
@Override
protected void makeBatch() {
rsLoader.startBatch();
RowSetLoader writer = rsLoader.writer();
ArrayWriter aw = writer.array(1);
TupleWriter mw = aw.entry().tuple();
for (int i = 0; i < batchSize; i++) {
rowCount++;
writer.start();
writer.scalar(0).setInt(rowCount);
int n = i % 3;
for (int j = 0; j < n; j++) {
mw.scalar(0).setString("Row " + rowCount + "." + j);
mw.scalar(1).setInt(rowCount * 100 + j);
aw.save();
}
writer.save();
}
batch = rsLoader.harvest();
batchCount++;
}
}
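/**
 * Generates one 10-row batch plus an SV2 that selects every other
 * row in descending order.
 */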
public static class FilteredGen extends DataGen {
SelectionVector2 sv2;
public FilteredGen() {
super(10, 1);
}
@Override
protected void makeBatch() {
super.makeBatch();
makeSv2();
}
// Pick out every other record, in descending order.
private void makeSv2() {
SelectionVector2Builder sv2Builder =
new SelectionVector2Builder(fixture.allocator(), batch.getRecordCount());
for (int i = 0; i < 5; i++) {
sv2Builder.setNext(10 - 2 * i - 1);
}
sv2 = sv2Builder.harvest(batch);
batch.buildSchema(SelectionVectorMode.TWO_BYTE);
}
@Override
public SelectionVector2 sv2() { return sv2; }
}
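// Wraps the upstream source in a pull reader, then builds the copier
// over that reader, optionally with custom output batch options.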
private ResultSetCopierImpl newCopier(UpstreamSource source) {
PullResultSetReader reader = new PullResultSetReaderImpl(source);
return new ResultSetCopierImpl(fixture.allocator(), reader);
}
private ResultSetCopierImpl newCopier(UpstreamSource source, ResultSetOptionBuilder outputOptions) {
PullResultSetReader reader = new PullResultSetReaderImpl(source);
return new ResultSetCopierImpl(fixture.allocator(), reader, outputOptions);
}
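/**
 * Walks the copier through its lifecycle: copy and harvest fail
 * before an output batch is started, the state predicates report
 * correctly at each step, and a single input batch copies through
 * unchanged.
 */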
@Test
public void testBasics() {
DataGen dataGen = new DataGen();
ResultSetCopier copier = newCopier(dataGen);
// Nothing should work yet
try {
copier.copyAllRows();
fail();
} catch (IllegalStateException e) {
// Expected
}
try {
copier.harvest();
fail();
} catch (IllegalStateException e) {
// Expected
}
// Predicates should work
assertFalse(copier.isCopyPending());
assertFalse(copier.hasOutputRows());
assertFalse(copier.isOutputFull());
// Define a schema and start an output batch.
copier.startOutputBatch();
assertFalse(copier.isCopyPending());
assertFalse(copier.hasOutputRows());
assertFalse(copier.isOutputFull());
// Provide an input batch
assertTrue(copier.nextInputBatch());
assertFalse(copier.isCopyPending());
assertFalse(copier.hasOutputRows());
assertFalse(copier.isOutputFull());
// Now can do some actual copying
while (copier.copyNextRow()) {
// empty
}
assertFalse(copier.isCopyPending());
assertTrue(copier.hasOutputRows());
assertFalse(copier.isOutputFull());
// Get and verify the output batch.
// (Does not free the input batch; we reuse it
// in the verify step below.)
RowSet result = fixture.wrap(copier.harvest());
new RowSetComparison(fixture.wrap(dataGen.batch()))
.verifyAndClear(result);
// No more input
copier.startOutputBatch();
assertFalse(copier.nextInputBatch());
// OK to try multiple times
assertFalse(copier.nextInputBatch());
// Copier will release the input batch
copier.close();
}
@Test
public void testImmediateClose() {
ResultSetCopier copier = newCopier(new DataGen());
// Close OK before things got started
copier.close();
// Second close is benign
copier.close();
}
@Test
public void testCloseBeforeSchema() {
ResultSetCopier copier = newCopier(new DataGen());
// Start batch, no data yet.
copier.startOutputBatch();
// Close OK before any data arrives
copier.close();
// Second close is benign
copier.close();
}
@Test
public void testCloseWithData() {
ResultSetCopier copier = newCopier(new DataGen());
// Start batch, with data.
copier.startOutputBatch();
copier.nextInputBatch();
copier.copyNextRow();
// Close OK with input and output batch allocated.
copier.close();
// Second close is benign
copier.close();
}
/**
* Test merging multiple batches from the same input
* source; all batches share the same vectors, hence
* implicitly the same schema.
* <p>
* This copier does not support merging from multiple
* streams.
*/
@Test
public void testMerge() {
ResultSetCopier copier = newCopier(new DataGen(3, 5));
copier.startOutputBatch();
for (int i = 0; i < 5; i++) {
assertTrue(copier.nextInputBatch());
assertFalse(copier.isOutputFull());
copier.copyAllRows();
assertFalse(copier.isOutputFull());
assertFalse(copier.isCopyPending());
}
assertFalse(copier.nextInputBatch());
RowSet result = fixture.wrap(copier.harvest());
// Verify against a single batch holding all the rows
DataGen dataGen = new DataGen(15, 1);
dataGen.next();
RowSet expected = RowSets.wrap(dataGen.batch());
RowSetUtilities.verify(expected, result);
copier.close();
}
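/**
 * Simulates a complete operator run in which the 12-row output limit
 * forces the 30 input rows to be split across multiple output batches.
 */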
@Test
public void testMultiOutput() {
// Equivalent of operator start() method.
DataGen dataGen = new DataGen(15, 2);
ResultSetOptionBuilder options = new ResultSetOptionBuilder()
.rowCountLimit(12);
ResultSetCopier copier = newCopier(dataGen, options);
// Equivalent of an entire operator run
DataGen validatorGen = new DataGen(12, 2);
int outputCount = 0;
while (true) {
// Equivalent of operator next() method
copier.startOutputBatch();
while (! copier.isOutputFull()) {
if (!copier.nextInputBatch()) {
break;
}
copier.copyAllRows();
}
if (!copier.hasOutputRows()) {
break;
}
// Equivalent of sending downstream
RowSet result = fixture.wrap(copier.harvest());
validatorGen.next();
RowSet expected = RowSets.wrap(validatorGen.batch());
RowSetUtilities.verify(expected, result, result.rowCount());
outputCount++;
}
// Ensure more than one output batch.
assertTrue(outputCount > 1);
// Ensure all rows generated.
assertEquals(30, dataGen.rowCount);
// Simulate the operator close() call.
copier.close();
}
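/**
 * Copies individual rows by position, out of order, across two input
 * batches.
 */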
@Test
public void testCopyRecord() {
ResultSetCopier copier = newCopier(new DataGen(3, 2));
copier.startOutputBatch();
copier.nextInputBatch();
copier.copyRow(2);
copier.copyRow(0);
copier.copyRow(1);
copier.nextInputBatch();
copier.copyRow(1);
copier.copyRow(0);
copier.copyRow(2);
assertFalse(copier.nextInputBatch());
RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA)
.addRow(3, "Row 3")
.addRow(1, "Row 1")
.addRow(2, "Row 2")
.addRow(5, "Row 5")
.addRow(4, "Row 4")
.addRow(6, "Row 6")
.build();
RowSetUtilities.verify(expected, fixture.wrap(copier.harvest()));
copier.close();
}
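/**
 * Verifies that a schema change on the input marks the output batch
 * as full, forcing the caller to harvest the rows of the old schema
 * before the copy resumes with the new schema.
 */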
@Test
public void testSchemaChange() {
ResultSetCopier copier = newCopier(new SchemaChangeGen(3, 4, 2));
// Copy first batch with first schema
copier.startOutputBatch();
assertTrue(copier.nextInputBatch());
copier.copyAllRows();
assertFalse(copier.isOutputFull());
// Second, same schema
assertTrue(copier.nextInputBatch());
copier.copyAllRows();
assertFalse(copier.isOutputFull());
// Plenty of room. But the schema changes.
assertTrue(copier.nextInputBatch());
assertTrue(copier.isOutputFull());
// Must harvest partial output
RowSet result = fixture.wrap(copier.harvest());
SchemaChangeGen verifierGen = new SchemaChangeGen(6, 2, 1);
verifierGen.next();
RowSet expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
// Start a new batch; this implicitly completes the pending copy
copier.startOutputBatch();
copier.copyAllRows();
// Add one more of second schema
assertTrue(copier.nextInputBatch());
copier.copyAllRows();
assertFalse(copier.isOutputFull());
result = fixture.wrap(copier.harvest());
verifierGen.next();
expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
assertFalse(copier.isCopyPending());
copier.close();
}
// TODO: Test with two consecutive schema changes in
// same input batch: once with rows pending, another without.
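/**
 * Verifies that the copier honors an input SV2: only the selected
 * rows appear in the output, in selection order.
 */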
@Test
public void testSV2() {
ResultSetCopier copier = newCopier(new FilteredGen());
copier.startOutputBatch();
assertTrue(copier.nextInputBatch());
copier.copyAllRows();
RowSet expected = new RowSetBuilder(fixture.allocator(), TEST_SCHEMA)
.addRow(10, "Row 10")
.addRow(8, "Row 8")
.addRow(6, "Row 6")
.addRow(4, "Row 4")
.addRow(2, "Row 2")
.build();
RowSet result = fixture.wrap(copier.harvest());
RowSetUtilities.verify(expected, result);
copier.close();
}
@Test
public void testSV4() {
// TODO
}
@Test
public void testNullable() {
ResultSetCopier copier = newCopier(new NullableGen());
copier.startOutputBatch();
copier.nextInputBatch();
copier.copyAllRows();
RowSet result = fixture.wrap(copier.harvest());
NullableGen verifierGen = new NullableGen();
verifierGen.next();
RowSet expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
copier.close();
}
@Test
public void testArrays() {
ResultSetCopier copier = newCopier(new ArrayGen());
copier.startOutputBatch();
copier.nextInputBatch();
copier.copyAllRows();
RowSet result = fixture.wrap(copier.harvest());
ArrayGen verifierGen = new ArrayGen();
verifierGen.next();
RowSet expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
copier.close();
}
@Test
public void testMaps() {
ResultSetCopier copier = newCopier(new MapGen());
copier.startOutputBatch();
copier.nextInputBatch();
copier.copyAllRows();
RowSet result = fixture.wrap(copier.harvest());
MapGen verifierGen = new MapGen();
verifierGen.next();
RowSet expected = RowSets.wrap(verifierGen.batch());
RowSetUtilities.verify(expected, result);
copier.close();
}
@Test
public void testUnions() {
// TODO
}
@Test
public void testOverflow() {
// TODO
}
}