// blob: 3882cbd0498c20c9607217a1916fa01691bf1f3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.resultSet.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.resultSet.PullResultSetReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.selection.SelectionVector2;
import com.google.common.base.Preconditions;
import org.apache.drill.test.SubOperatorTest;
import org.junit.Test;
public class TestResultSetReader extends SubOperatorTest {
/**
 * Initial schema used by the tests: an {@code id} column of type INT and a
 * {@code name} column of type VARCHAR.
 */
private static final TupleMetadata SCHEMA1 = new SchemaBuilder()
.add("id", MinorType.INT)
.add("name", MinorType.VARCHAR)
.build();
/**
 * Evolved schema used by the tests: built from all of {@link #SCHEMA1}
 * plus an additional {@code amount} column of type INT.
 */
private static final TupleMetadata SCHEMA2 = new SchemaBuilder()
.addAll(SCHEMA1)
.add("amount", MinorType.INT)
.build();
/**
 * Test-only {@link UpstreamSource} that manufactures batches through a
 * {@link ResultSetLoader}. Driven via {@link #next()}, it first produces
 * {@code schema1Count} batches containing the {@code id} and {@code name}
 * columns, then {@code schema2Count} batches that also carry an
 * {@code amount} column. Row ids are numbered continuously across batches,
 * starting at 1, and each batch holds {@code batchSize} rows.
 */
public static class BatchGenerator implements UpstreamSource {

  /** Which of the two schemas the loader is currently writing. */
  private enum State { SCHEMA1, SCHEMA2 }

  private final ResultSetLoader rsLoader;
  private final int batchSize;
  private final int schema1Count;
  private final int schema2Count;

  /** Most recently harvested batch; {@code null} until a batch is produced. */
  private VectorContainer batch;
  private State state;
  private int schemaVersion;
  private int batchCount;
  private int rowCount;

  /**
   * Creates a generator whose loader starts out with {@code SCHEMA1}.
   *
   * @param batchSize number of rows written into each batch
   * @param schema1Count number of batches {@link #next()} produces with the
   * initial schema
   * @param schema2Count number of batches {@link #next()} produces with the
   * extended schema
   */
  public BatchGenerator(int batchSize, int schema1Count, int schema2Count) {
    this.batchSize = batchSize;
    this.schema1Count = schema1Count;
    this.schema2Count = schema2Count;
    this.state = State.SCHEMA1;
    this.schemaVersion = 1;

    ResultSetOptions loaderOptions = new ResultSetOptionBuilder()
        .readerSchema(SCHEMA1)
        .vectorCache(new ResultVectorCacheImpl(fixture.allocator()))
        .build();
    this.rsLoader = new ResultSetLoaderImpl(fixture.allocator(), loaderOptions);
  }

  /**
   * Advances the running row counter, starts a new row and fills the two
   * columns common to both schemas. The caller is responsible for writing
   * any further columns and for saving the row.
   *
   * @param rowWriter writer of the batch currently being loaded
   * @return the id assigned to the row just started
   */
  private int beginRow(RowSetLoader rowWriter) {
    int rowId = ++rowCount;
    rowWriter.start();
    rowWriter.scalar("id").setInt(rowId);
    rowWriter.scalar("name").setString("Row" + rowId);
    return rowId;
  }

  /**
   * Loads and harvests one batch with the initial ({@code id}, {@code name})
   * columns. Only legal while the generator is still in the first-schema
   * state; otherwise the precondition check fails.
   */
  public void batch1() {
    Preconditions.checkState(state == State.SCHEMA1);
    rsLoader.startBatch();
    RowSetLoader rowWriter = rsLoader.writer();
    int rowsLeft = batchSize;
    while (rowsLeft > 0) {
      beginRow(rowWriter);
      rowWriter.save();
      rowsLeft--;
    }
    batch = rsLoader.harvest();
    batchCount++;
  }

  /**
   * Loads and harvests one batch that includes the {@code amount} column.
   * On the first call the column is added to the writer (before the batch
   * is started), the state switches to the second schema and the schema
   * version is incremented.
   */
  public void batch2() {
    RowSetLoader rowWriter = rsLoader.writer();
    boolean firstExtendedBatch = (state == State.SCHEMA1);
    if (firstExtendedBatch) {
      rowWriter.addColumn(SCHEMA2.metadata("amount"));
      state = State.SCHEMA2;
      schemaVersion++;
    }
    rsLoader.startBatch();
    int rowsLeft = batchSize;
    while (rowsLeft > 0) {
      int rowId = beginRow(rowWriter);
      rowWriter.scalar("amount").setInt(rowId * 10);
      rowWriter.save();
      rowsLeft--;
    }
    batch = rsLoader.harvest();
    batchCount++;
  }

  /** Closes the underlying result set loader. */
  public void close() {
    rsLoader.close();
  }

  /**
   * Produces the next batch, if any remain.
   *
   * @return {@code false} once the number of batches produced equals
   * {@code schema1Count + schema2Count}; {@code true} after a new batch
   * has been generated
   */
  @Override
  public boolean next() {
    int totalBatches = schema1Count + schema2Count;
    if (batchCount == totalBatches) {
      return false;
    }
    if (batchCount >= schema1Count) {
      batch2();
    } else {
      batch1();
    }
    return true;
  }

  /** @return the current schema version: 1 initially, bumped when {@code amount} is added */
  @Override
  public int schemaVersion() {
    return schemaVersion;
  }

  /** @return the most recently harvested batch, or {@code null} if none yet */
  @Override
  public VectorContainer batch() {
    return batch;
  }

  /** @return always {@code null}; this source supplies no selection vector */
  @Override
  public SelectionVector2 sv2() {
    return null;
  }

  /** Calls {@code zeroVectors()} on the current batch, if one has been produced. */
  @Override
  public void release() {
    if (batch == null) {
      return;
    }
    batch.zeroVectors();
  }
}
/**
 * Walks a pull reader over three generated batches (two with the initial
 * schema, one with the extended schema) and verifies the schema version,
 * the first row of every batch, and when the row reader instance is
 * reused versus replaced.
 */
@Test
public void testBasics() {
  UpstreamSource source = new BatchGenerator(10, 2, 1);
  PullResultSetReader resultSet = new PullResultSetReaderImpl(source);

  // Before anything has been read, asking for a row reader must fail.
  try {
    resultSet.reader();
    fail();
  } catch (IllegalStateException expected) {
    // Expected: no batch is available in the start state.
  }

  // The schema can be requested before the first explicit next().
  // (Per the original note, this does an implicit next.)
  assertEquals(SCHEMA1, resultSet.schema());
  assertEquals(1, resultSet.schemaVersion());

  // First batch. Only the first row is checked; complete reader
  // coverage lives in other tests.
  assertTrue(resultSet.next());
  assertEquals(1, resultSet.schemaVersion());
  RowSetReader firstReader = resultSet.reader();
  assertTrue(firstReader.next());
  assertEquals(1, firstReader.scalar("id").getInt());
  assertEquals("Row1", firstReader.scalar("name").getString());

  // Second batch has the same schema, so the same row reader is returned.
  assertTrue(resultSet.next());
  assertEquals(1, resultSet.schemaVersion());
  RowSetReader secondReader = resultSet.reader();
  assertSame(firstReader, secondReader);
  assertTrue(secondReader.next());
  assertEquals(11, secondReader.scalar("id").getInt());
  assertEquals("Row11", secondReader.scalar("name").getString());

  // Third batch carries the new schema: version bumps and a different
  // row reader instance is handed out.
  assertTrue(resultSet.next());
  assertEquals(2, resultSet.schemaVersion());
  RowSetReader thirdReader = resultSet.reader();
  assertNotSame(secondReader, thirdReader);
  assertTrue(thirdReader.next());
  assertEquals(21, thirdReader.scalar("id").getInt());
  assertEquals("Row21", thirdReader.scalar("name").getString());
  assertEquals(210, thirdReader.scalar("amount").getInt());

  // No more batches.
  assertFalse(resultSet.next());
  resultSet.close();
}
/**
 * Verifies that a reader which has not read anything yet can be closed,
 * and that closing it a second time is also accepted.
 */
@Test
public void testCloseAtStart() {
  UpstreamSource source = new BatchGenerator(10, 2, 1);
  PullResultSetReader resultSet = new PullResultSetReaderImpl(source);

  // Closing in the start state, before any next(), must succeed.
  resultSet.close();

  // A repeated close must succeed as well.
  resultSet.close();
}
/**
 * Verifies that a reader positioned on its first batch can be closed
 * while further batches are still pending, and that closing it a second
 * time is also accepted.
 */
@Test
public void testCloseDuringRead() {
  UpstreamSource source = new BatchGenerator(10, 2, 1);
  PullResultSetReader resultSet = new PullResultSetReaderImpl(source);

  // Advance onto the first of the three batches.
  assertTrue(resultSet.next());

  // Closing in the middle of the read must succeed.
  resultSet.close();

  // A repeated close must succeed as well.
  resultSet.close();
}
/**
 * Verifies that the reader can be closed after a successful
 * {@code next()}, and that closing it a second time is also accepted.
 *
 * NOTE(review): this performs exactly the same call sequence as
 * testCloseDuringRead; confirm whether a different scenario was
 * intended here.
 */
@Test
public void testCloseAfterNext() {
  UpstreamSource source = new BatchGenerator(10, 2, 1);
  PullResultSetReader resultSet = new PullResultSetReaderImpl(source);

  // Advance onto the first batch.
  assertTrue(resultSet.next());

  // Closing after next() must succeed.
  resultSet.close();

  // A repeated close must succeed as well.
  resultSet.close();
}
}