blob: ac8c4f02d3abdc2471ca5b4f6639a3af064f2581 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.xsort.managed;
import static org.apache.drill.test.rowSet.RowSetUtilities.objArray;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.impl.xsort.managed.SortTestUtilities.CopierTester;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.exec.physical.rowSet.RowSet.ExtendableRowSet;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.apache.drill.exec.physical.rowSet.RowSetWriter;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Light-weight sanity test of the copier class. The implementation has
 * been used in production, so the tests here just check for the obvious
 * cases.
 * <p>
 * Note, however, that if significant changes are made to the copier,
 * then additional tests should be added to re-validate the code.
 * <p>
 * Most tests follow the same pattern: build one or more input row sets
 * (each carrying an SV2), register the expected output row sets with a
 * {@link CopierTester}, then call {@code run()} to drive the comparison.
 */
@Category(OperatorTest.class)
public class TestCopier extends SubOperatorTest {

  /**
   * Starting a merge with an empty list of batches is expected to be
   * rejected with an {@link AssertionError}.
   */
  @Test
  public void testEmptyInput() {
    TupleMetadata schema = SortTestUtilities.nonNullSchema();
    List<BatchGroup> batches = new ArrayList<>();
    Sort popConfig = SortTestUtilities.makeCopierConfig(Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED);
    OperatorContext opContext = fixture.newOperatorContext(popConfig);
    PriorityQueueCopierWrapper copier = new PriorityQueueCopierWrapper(opContext);
    VectorContainer dest = new VectorContainer();

    // Track the outcome with a flag instead of calling fail() inside the
    // try block: fail() itself throws AssertionError, so a catch of
    // AssertionError would swallow it and the test could never fail.
    boolean rejected = false;
    try {
      // TODO: Create a vector allocator to pass as last parameter so
      // that the test uses the same vector allocator as the production
      // code. Only nuisance is that we don't have the required metadata
      // readily at hand here...
      copier.startMerge(new BatchSchema(BatchSchema.SelectionVectorMode.NONE, schema.toFieldList()),
          batches, dest, 10, null);
    } catch (AssertionError e) {
      // Expected.
      // NOTE(review): presumably raised by a Java assert inside the copier,
      // in which case the test needs assertions enabled (-ea) -- confirm.
      rejected = true;
    } finally {
      opContext.close();
    }
    if (!rejected) {
      fail("Expected startMerge() to reject an empty batch list");
    }
  }

  /**
   * A single input batch with no rows and no registered output.
   */
  @Test
  public void testEmptyBatch() throws Exception {
    TupleMetadata schema = SortTestUtilities.nonNullSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.addInput(fixture.rowSetBuilder(schema)
        .withSv2()
        .build());
    tester.run();
  }

  /**
   * A single one-row input batch is expected to produce the same row.
   */
  @Test
  public void testSingleRow() throws Exception {
    TupleMetadata schema = SortTestUtilities.nonNullSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(10, "10")
        .withSv2()
        .build());
    tester.addOutput(fixture.rowSetBuilder(schema)
        .addRow(10, "10")
        .build());
    tester.run();
  }

  /**
   * Two one-row input batches are expected to merge into a single
   * two-row output batch.
   */
  @Test
  public void testTwoBatchesSingleRow() throws Exception {
    TupleMetadata schema = SortTestUtilities.nonNullSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(10, "10")
        .withSv2()
        .build());
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(20, "20")
        .withSv2()
        .build());
    tester.addOutput(fixture.rowSetBuilder(schema)
        .addRow(10, "10")
        .addRow(20, "20")
        .build());
    tester.run();
  }

  /**
   * Builds a two-column row set using the fixture inherited from
   * {@link SubOperatorTest}. Retained for existing callers; delegates to
   * {@link #makeDataSet(OperatorFixture, TupleMetadata, int, int, int)}.
   *
   * @param schema schema of the row set to create
   * @param first value written to the first row
   * @param step amount added to the value for each subsequent row
   * @param count number of rows to write
   * @return the populated row set
   */
  public static SingleRowSet makeDataSet(TupleMetadata schema, int first, int step, int count) {
    return makeDataSet(fixture, schema, first, step, count);
  }

  /**
   * Builds a two-column row set holding an arithmetic sequence. For each
   * row, column 0 is set from the int value via
   * {@link RowSetUtilities#setFromInt} and column 1 is set to the decimal
   * string form of the same value.
   *
   * @param fixture fixture used to create the row set
   * @param schema schema of the row set to create
   * @param first value written to the first row
   * @param step amount added to the value for each subsequent row
   * @param count number of rows to write
   * @return the populated row set
   */
  public static SingleRowSet makeDataSet(OperatorFixture fixture, TupleMetadata schema,
      int first, int step, int count) {
    ExtendableRowSet rowSet = fixture.rowSet(schema);
    RowSetWriter writer = rowSet.writer(count);
    int value = first;
    for (int i = 0; i < count; i++, value += step) {
      RowSetUtilities.setFromInt(writer, 0, value);
      writer.scalar(1).setString(Integer.toString(value));
      writer.save();
    }
    writer.done();
    return rowSet;
  }

  /**
   * Merges two 10-row inputs (0, 2, ... 18 and 1, 3, ... 19) and expects
   * the result as two output batches: 0..9 followed by 10..19.
   */
  @Test
  public void testMultipleOutput() throws Exception {
    TupleMetadata schema = SortTestUtilities.nonNullSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.addInput(makeDataSet(schema, 0, 2, 10).toIndirect());
    tester.addInput(makeDataSet(schema, 1, 2, 10).toIndirect());
    tester.addOutput(makeDataSet(schema, 0, 1, 10));
    tester.addOutput(makeDataSet(schema, 10, 1, 10));
    tester.run();
  }

  /**
   * Descending variant of {@link #testMultipleOutput()}. The inputs are
   * built in ascending order and then reversed through their SV2, so this
   * also verifies that SV2s work. Expected output is 19..10 followed by
   * 9..0.
   */
  @Test
  public void testMultipleOutputDesc() throws Exception {
    TupleMetadata schema = SortTestUtilities.nonNullSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.sortOrder = Ordering.ORDER_DESC;
    tester.nullOrder = Ordering.NULLS_UNSPECIFIED;

    SingleRowSet input = makeDataSet(schema, 0, 2, 10).toIndirect();
    RowSetUtilities.reverse(input.getSv2());
    tester.addInput(input);

    input = makeDataSet(schema, 1, 2, 10).toIndirect();
    RowSetUtilities.reverse(input.getSv2());
    tester.addInput(input);

    tester.addOutput(makeDataSet(schema, 19, -1, 10));
    tester.addOutput(makeDataSet(schema, 9, -1, 10));
    tester.run();
  }

  /**
   * Ascending order, nulls last: rows with a null first column are
   * expected after all non-null rows.
   */
  @Test
  public void testAscNullsLast() throws Exception {
    TupleMetadata schema = SortTestUtilities.nullableSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.sortOrder = Ordering.ORDER_ASC;
    tester.nullOrder = Ordering.NULLS_LAST;
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(1, "1")
        .addRow(4, "4")
        .addRow(null, "null")
        .withSv2()
        .build());
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(2, "2")
        .addRow(3, "3")
        .addRow(null, "null")
        .withSv2()
        .build());
    tester.addOutput(fixture.rowSetBuilder(schema)
        .addRow(1, "1")
        .addRow(2, "2")
        .addRow(3, "3")
        .addRow(4, "4")
        .addRow(null, "null")
        .addRow(null, "null")
        .build());
    tester.run();
  }

  /**
   * Ascending order, nulls first: rows with a null first column are
   * expected before all non-null rows.
   */
  @Test
  public void testAscNullsFirst() throws Exception {
    TupleMetadata schema = SortTestUtilities.nullableSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.sortOrder = Ordering.ORDER_ASC;
    tester.nullOrder = Ordering.NULLS_FIRST;
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(null, "null")
        .addRow(1, "1")
        .addRow(4, "4")
        .withSv2()
        .build());
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(null, "null")
        .addRow(2, "2")
        .addRow(3, "3")
        .withSv2()
        .build());
    tester.addOutput(fixture.rowSetBuilder(schema)
        .addRow(null, "null")
        .addRow(null, "null")
        .addRow(1, "1")
        .addRow(2, "2")
        .addRow(3, "3")
        .addRow(4, "4")
        .build());
    tester.run();
  }

  /**
   * Descending order, nulls last: rows with a null first column are
   * expected after all non-null rows.
   */
  @Test
  public void testDescNullsLast() throws Exception {
    TupleMetadata schema = SortTestUtilities.nullableSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.sortOrder = Ordering.ORDER_DESC;
    tester.nullOrder = Ordering.NULLS_LAST;
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(4, "4")
        .addRow(1, "1")
        .addRow(null, "null")
        .withSv2()
        .build());
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(3, "3")
        .addRow(2, "2")
        .addRow(null, "null")
        .withSv2()
        .build());
    tester.addOutput(fixture.rowSetBuilder(schema)
        .addRow(4, "4")
        .addRow(3, "3")
        .addRow(2, "2")
        .addRow(1, "1")
        .addRow(null, "null")
        .addRow(null, "null")
        .build());
    tester.run();
  }

  /**
   * Descending order, nulls first: rows with a null first column are
   * expected before all non-null rows.
   */
  @Test
  public void testDescNullsFirst() throws Exception {
    TupleMetadata schema = SortTestUtilities.nullableSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.sortOrder = Ordering.ORDER_DESC;
    tester.nullOrder = Ordering.NULLS_FIRST;
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(null, "null")
        .addRow(4, "4")
        .addRow(1, "1")
        .withSv2()
        .build());
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(null, "null")
        .addRow(3, "3")
        .addRow(2, "2")
        .withSv2()
        .build());
    tester.addOutput(fixture.rowSetBuilder(schema)
        .addRow(null, "null")
        .addRow(null, "null")
        .addRow(4, "4")
        .addRow(3, "3")
        .addRow(2, "2")
        .addRow(1, "1")
        .build());
    tester.run();
  }

  /**
   * Merges two 5-row inputs (0, 2, ... 8 and 1, 3, ... 9) built on a schema
   * created for the given type and expects a single 10-row output 0..9.
   * All row sets are created with the supplied fixture.
   *
   * @param fixture fixture used for the tester and for every row set
   * @param type minor type passed to {@code SortTestUtilities.makeSchema}
   */
  public static void runTypeTest(OperatorFixture fixture, MinorType type) throws Exception {
    TupleMetadata schema = SortTestUtilities.makeSchema(type, false);
    CopierTester tester = new CopierTester(fixture);
    // Use the caller's fixture for the data too, not the inherited static
    // one, so the tester and its row sets share a single fixture.
    tester.addInput(makeDataSet(fixture, schema, 0, 2, 5).toIndirect());
    tester.addInput(makeDataSet(fixture, schema, 1, 2, 5).toIndirect());
    tester.addOutput(makeDataSet(fixture, schema, 0, 1, 10));
    tester.run();
  }

  /**
   * Runs {@link #testAllTypes(OperatorFixture)} with this class's fixture.
   */
  @Test
  public void testTypes() throws Exception {
    testAllTypes(fixture);
  }

  /**
   * Runs {@link #runTypeTest(OperatorFixture, MinorType)} once for each
   * of the types listed below.
   *
   * @param fixture fixture handed to each per-type run
   */
  public static void testAllTypes(OperatorFixture fixture) throws Exception {
    runTypeTest(fixture, MinorType.INT);
    runTypeTest(fixture, MinorType.BIGINT);
    runTypeTest(fixture, MinorType.FLOAT4);
    runTypeTest(fixture, MinorType.FLOAT8);
    runTypeTest(fixture, MinorType.VARDECIMAL);
    runTypeTest(fixture, MinorType.VARCHAR);
    runTypeTest(fixture, MinorType.VARBINARY);
    runTypeTest(fixture, MinorType.DATE);
    runTypeTest(fixture, MinorType.TIME);
    runTypeTest(fixture, MinorType.TIMESTAMP);
    runTypeTest(fixture, MinorType.INTERVAL);
    runTypeTest(fixture, MinorType.INTERVALDAY);
    runTypeTest(fixture, MinorType.INTERVALYEAR);
    // Others not tested. See DRILL-5329
  }

  /**
   * Runs {@link #testMapType(OperatorFixture)} with this class's fixture.
   */
  @Test
  public void testMapType() throws Exception {
    testMapType(fixture);
  }

  /**
   * Merges two inputs whose rows carry an INT key plus a map {@code m1}
   * that itself contains a nested map {@code m2}, and expects the rows
   * interleaved by key (1, 2, 5, 6) with their map values intact.
   *
   * @param fixture fixture used for the tester and for every row set
   */
  public void testMapType(OperatorFixture fixture) throws Exception {
    TupleMetadata schema = new SchemaBuilder()
        .add("key", MinorType.INT)
        .addMap("m1")
          .add("b", MinorType.INT)
          .addMap("m2")
            .add("c", MinorType.INT)
            .resumeMap()
          .resumeSchema()
        .buildSchema();
    CopierTester tester = new CopierTester(fixture);
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(1, objArray(10, objArray(100)))
        .addRow(5, objArray(50, objArray(500)))
        .withSv2()
        .build());
    tester.addInput(fixture.rowSetBuilder(schema)
        .addRow(2, objArray(20, objArray(200)))
        .addRow(6, objArray(60, objArray(600)))
        .withSv2()
        .build());
    tester.addOutput(fixture.rowSetBuilder(schema)
        .addRow(1, objArray(10, objArray(100)))
        .addRow(2, objArray(20, objArray(200)))
        .addRow(5, objArray(50, objArray(500)))
        .addRow(6, objArray(60, objArray(600)))
        .build());
    tester.run();
  }
}