/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.record.vector;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.drill.categories.VectorTest;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.resultSet.impl.ResultSetOptionBuilder;
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.BatchSchemaBuilder;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
@Category(VectorTest.class)
public class TestLoad extends ExecTest {
private final DrillConfig drillConfig = DrillConfig.create();
@Test
public void testLoadValueVector() throws Exception {
final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig);
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("ints", MinorType.INT)
.add("chars", MinorType.VARCHAR)
.addNullable("chars2", MinorType.VARCHAR);
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
// Create vectors
final List<ValueVector> vectors = createVectors(allocator, schema, 100);
// Writeable batch now owns vector buffers
final WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false);
// Serialize the vectors
final DrillBuf byteBuf = serializeBatch(allocator, writableBatch);
// Batch loader does NOT take ownership of the serialized buffer
final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
batchLoader.load(writableBatch.getDef(), byteBuf);
// Release the serialized buffer.
byteBuf.release();
// TODO: Do actual validation
assertEquals(100, batchLoader.getRecordCount());
// Free the original vectors
writableBatch.clear();
// Free the deserialized vectors
batchLoader.clear();
// The allocator will verify that the frees were done correctly.
allocator.close();
}
@Test
public void testLoadValueVectorEmptyVarCharArray() throws Exception {
try (BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig)) {
TupleMetadata schema = new SchemaBuilder()
.addArray("chars", MinorType.VARCHAR)
.build();
ResultSetLoaderImpl.ResultSetOptions options = new ResultSetOptionBuilder()
.readerSchema(schema)
.build();
ResultSetLoader resultSetLoader = new ResultSetLoaderImpl(allocator, options);
resultSetLoader.startBatch();
RowSetLoader rowWriter = resultSetLoader.writer();
rowWriter.addRow(new Object[]{null});
VectorContainer harvest = resultSetLoader.harvest();
// Create vectors
List<ValueVector> vectors = StreamSupport.stream(harvest.spliterator(), false)
.map(VectorWrapper::getValueVector)
.collect(Collectors.toList());
// Writeable batch now owns vector buffers
WritableBatch writableBatch = WritableBatch.getBatchNoHV(1, vectors, false);
// Serialize the vectors
DrillBuf byteBuf = serializeBatch(allocator, writableBatch);
// Batch loader does NOT take ownership of the serialized buffer
RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
batchLoader.load(writableBatch.getDef(), byteBuf);
// Release the serialized buffer.
byteBuf.release();
assertEquals(1, batchLoader.getRecordCount());
// Free the original vectors
writableBatch.clear();
// Free the deserialized vectors
batchLoader.clear();
}
}
// TODO: Replace this low-level code with RowSet usage once
// DRILL-5657 is committed to master.
private static List<ValueVector> createVectors(BufferAllocator allocator, BatchSchema schema, int i) {
final List<ValueVector> vectors = new ArrayList<>();
for (MaterializedField field : schema) {
ValueVector v = TypeHelper.getNewVector(field, allocator);
AllocationHelper.allocate(v, 100, 50);
v.getMutator().generateTestData(100);
vectors.add(v);
}
return vectors;
}
static DrillBuf serializeBatch(BufferAllocator allocator, WritableBatch writableBatch) {
final ByteBuf[] byteBufs = writableBatch.getBuffers();
int bytes = 0;
for (ByteBuf buf : byteBufs) {
bytes += buf.writerIndex();
}
final DrillBuf byteBuf = allocator.buffer(bytes);
int index = 0;
for (ByteBuf buf : byteBufs) {
buf.readBytes(byteBuf, index, buf.writerIndex());
index += buf.writerIndex();
}
byteBuf.writerIndex(bytes);
return byteBuf;
}
/**
* Test function to simulate loading a batch.
*
* @param allocator a memory allocator
* @param batchLoader the batch loader under test
* @param schema the schema of the new batch
* @return false if the same schema, true if schema changed;
* that is, whether the schema changed
* @throws SchemaChangeException should not occur
*/
private boolean loadBatch(BufferAllocator allocator,
final RecordBatchLoader batchLoader,
BatchSchema schema) throws SchemaChangeException {
final List<ValueVector> vectors = createVectors(allocator, schema, 100);
final WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false);
final DrillBuf byteBuf = serializeBatch(allocator, writableBatch);
boolean result = batchLoader.load(writableBatch.getDef(), byteBuf);
byteBuf.release();
writableBatch.clear();
return result;
}
@Test
public void testSchemaChange() throws SchemaChangeException {
final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig);
final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
// Initial schema: a: INT, b: VARCHAR
// Schema change: N/A
SchemaBuilder schemaBuilder1 = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR);
BatchSchema schema1 = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder1)
.build();
{
assertTrue(loadBatch(allocator, batchLoader, schema1));
assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Same schema
// Schema change: No
{
assertFalse(loadBatch(allocator, batchLoader, schema1));
assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Reverse columns: b: VARCHAR, a: INT
// Schema change: No
{
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("b", MinorType.VARCHAR)
.add("a", MinorType.INT);
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
assertFalse(loadBatch(allocator, batchLoader, schema));
// Potential bug: see DRILL-5828
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Drop a column: a: INT
// Schema change: Yes
{
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT);
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
assertTrue(loadBatch(allocator, batchLoader, schema));
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Add a column: a: INT, b: VARCHAR, c: INT
// Schema change: Yes
{
assertTrue(loadBatch(allocator, batchLoader, schema1));
assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.add("c", MinorType.INT);
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
assertTrue(loadBatch(allocator, batchLoader, schema));
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Change a column type: a: INT, b: VARCHAR, c: VARCHAR
// Schema change: Yes
{
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT)
.add("b", MinorType.VARCHAR)
.add("c", MinorType.VARCHAR);
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
assertTrue(loadBatch(allocator, batchLoader, schema));
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Empty schema
// Schema change: Yes
{
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(new SchemaBuilder())
.build();
assertTrue(loadBatch(allocator, batchLoader, schema));
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
batchLoader.clear();
allocator.close();
}
@Test
public void testMapSchemaChange() throws SchemaChangeException {
final BufferAllocator allocator = RootAllocatorFactory.newRoot(drillConfig);
final RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
// Initial schema: a: INT, m: MAP{}
SchemaBuilder schemaBuilder1 = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.resumeSchema();
BatchSchema schema1 = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder1)
.build();
{
assertTrue(loadBatch(allocator, batchLoader, schema1));
assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Same schema
// Schema change: No
{
assertFalse(loadBatch(allocator, batchLoader, schema1));
assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Add column to map: a: INT, m: MAP{b: VARCHAR}
// Schema change: Yes
SchemaBuilder schemaBuilder2 = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("b", MinorType.VARCHAR)
.resumeSchema();
BatchSchema schema2 = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder2)
.build();
{
assertTrue(loadBatch(allocator, batchLoader, schema2));
assertTrue(schema2.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Same schema
// Schema change: No
{
assertFalse(loadBatch(allocator, batchLoader, schema2));
assertTrue(schema2.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Add column: a: INT, m: MAP{b: VARCHAR, c: INT}
// Schema change: Yes
{
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("b", MinorType.VARCHAR)
.add("c", MinorType.INT)
.resumeSchema();
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
assertTrue(loadBatch(allocator, batchLoader, schema));
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Drop a column: a: INT, m: MAP{b: VARCHAR}
// Schema change: Yes
{
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("b", MinorType.VARCHAR)
.resumeSchema();
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
assertTrue(loadBatch(allocator, batchLoader, schema));
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Change type: a: INT, m: MAP{b: INT}
// Schema change: Yes
{
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT)
.addMap("m")
.add("b", MinorType.INT)
.resumeSchema();
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
assertTrue(loadBatch(allocator, batchLoader, schema));
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Empty map: a: INT, m: MAP{}
{
assertTrue(loadBatch(allocator, batchLoader, schema1));
assertTrue(schema1.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
// Drop map: a: INT
{
SchemaBuilder schemaBuilder = new SchemaBuilder()
.add("a", MinorType.INT);
BatchSchema schema = new BatchSchemaBuilder()
.withSchemaBuilder(schemaBuilder)
.build();
assertTrue(loadBatch(allocator, batchLoader, schema));
assertTrue(schema.isEquivalent(batchLoader.getSchema()));
batchLoader.getContainer().zeroVectors();
}
batchLoader.clear();
allocator.close();
}
}