blob: 27c198ce26fd091f0770238b93498ca4f6097254 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.TopN;
import com.google.common.collect.Lists;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.physical.rowSet.HyperRowSetImpl;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category(OperatorTest.class)
public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTopNEmitOutcome.class);
/**
* Verifies that if TopNBatch receives empty batches with OK_NEW_SCHEMA and EMIT outcome then it correctly produces
* empty batches as output. First empty batch will be with OK_NEW_SCHEMA and second will be with EMIT outcome.
*/
@Test
public void testTopNEmptyBatchEmitOutcome() {
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(OK_NEW_SCHEMA);
inputOutcomes.add(EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.ASCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 10);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == OK_NEW_SCHEMA);
outputRecordCount += topNBatch.getRecordCount();
assertTrue(topNBatch.next() == OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == EMIT);
outputRecordCount += topNBatch.getRecordCount();
assertEquals(0, outputRecordCount);
assertTrue(topNBatch.next() == NONE);
}
/**
* Verifies that if TopNBatch receives a RecordBatch with EMIT outcome post build schema phase then it produces
* output for those input batch correctly. The first output batch will always be returned with OK_NEW_SCHEMA
* outcome followed by EMIT with empty batch. The test verifies the output order with the expected baseline.
*/
@Test
public void testTopNNonEmptyBatchEmitOutcome() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.addRow(13, 130, "item13")
.addRow(4, 40, "item4")
.build();
final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
.addRow(13, 130, "item13")
.addRow(4, 40, "item4")
.addRow(2, 20, "item2")
.build();
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 10);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
outputRecordCount += topNBatch.getRecordCount();
assertEquals(0, outputRecordCount);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
outputRecordCount += topNBatch.getRecordCount();
assertEquals(3, outputRecordCount);
// verify results
RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet).verify(actualRowSet);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
outputRecordCount += topNBatch.getRecordCount();
assertEquals(3, outputRecordCount);
// Release memory for row sets
nonEmptyInputRowSet2.clear();
expectedRowSet.clear();
}
@Test
public void testTopNEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.addRow(13, 130, "item13")
.addRow(4, 40, "item4")
.build();
final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
.addRow(13, 130, "item13")
.addRow(4, 40, "item4")
.addRow(2, 20, "item2")
.build();
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 10);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
outputRecordCount += topNBatch.getRecordCount();
assertEquals(0, outputRecordCount);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
outputRecordCount += topNBatch.getRecordCount();
assertEquals(3, outputRecordCount);
// verify results
RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet).verify(actualRowSet);
// Release memory for row sets
nonEmptyInputRowSet2.clear();
expectedRowSet.clear();
}
@Test
public void testTopNMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.addRow(13, 130, "item13")
.addRow(4, 40, "item4")
.build();
final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
.addRow(13, 130, "item13")
.addRow(4, 40, "item4")
.addRow(2, 20, "item2")
.build();
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 10);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
outputRecordCount += topNBatch.getRecordCount();
assertEquals(0, outputRecordCount);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
outputRecordCount += topNBatch.getRecordCount();
assertEquals(3, outputRecordCount);
// verify results
RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet).verify(actualRowSet);
// Release memory for row sets
nonEmptyInputRowSet2.clear();
expectedRowSet.clear();
}
/**
* Verifies that if TopNBatch receives multiple non-empty record batch with EMIT outcome in between then it produces
* output for those input batch correctly. In this case it receives first non-empty batch with OK_NEW_SCHEMA in
* buildSchema phase followed by an empty batch with EMIT outcome. For this combination it produces output for the
* record received so far along with EMIT outcome. Then it receives second non-empty batch with OK outcome and
* produces output for it differently. The test validates that for each output received the order of the records are
* correct
*/
@Test
public void testTopNResetsAfterFirstEmitOutcome() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.addRow(3, 30, "item3")
.build();
final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(1, 10, "item1")
.build();
final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(3, 30, "item3")
.addRow(2, 20, "item2")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 10);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(1, topNBatch.getRecordCount());
// verify results with baseline
RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
// State refresh happens and limit again works on new data batches
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(2, topNBatch.getRecordCount());
// verify results with baseline
RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
// Release memory for row sets
nonEmptyInputRowSet2.clear();
expectedRowSet2.clear();
expectedRowSet1.clear();
}
/**
* Verifies TopNBatch correctness for the case where it receives non-empty batch in build schema phase followed by
* empty batchs with OK and EMIT outcomes.
*/
@Test
public void testTopN_NonEmptyFirst_EmptyOKEmitOutcome() {
final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(1, 10, "item1")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.NONE);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 10);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(0, topNBatch.getRecordCount());
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(1, topNBatch.getRecordCount());
// verify results with baseline
RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
// Release memory for row set
expectedRowSet1.clear();
}
/**
* Verifies that if TopNBatch receives multiple non-empty record batch with EMIT outcome in between then it produces
* output for those input batch correctly. In this case it receives first non-empty batch with OK_NEW_SCHEMA in
* buildSchema phase followed by an empty batch with EMIT outcome. For this combination it produces output for the
* record received so far along with EMIT outcome. Then it receives second non-empty batch with OK outcome and
* produces output for it differently. The test validates that for each output received the order of the records are
* correct
*/
@Test
public void testTopNMultipleOutputBatchWithLowerLimits() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(4, 40, "item4")
.addRow(2, 20, "item2")
.addRow(5, 50, "item5")
.addRow(3, 30, "item3")
.build();
final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(1, 10, "item1")
.build();
final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(5, 50, "item5")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 1);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(1, topNBatch.getRecordCount());
// verify results with baseline
RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
// State refresh happens and limit again works on new data batches
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK);
assertEquals(1, topNBatch.getRecordCount());
// verify results with baseline
RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
// Release memory for row sets
nonEmptyInputRowSet2.clear();
expectedRowSet2.clear();
expectedRowSet1.clear();
}
@Test
public void testTopNMultipleEMITOutcome() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.addRow(3, 30, "item3")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 10);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
// first limit evaluation
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(1, topNBatch.getRecordCount());
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
// After seeing EMIT limit will refresh it's state and again evaluate limit on next set of input batches
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(2, topNBatch.getRecordCount());
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
nonEmptyInputRowSet2.clear();
}
@Test
public void testTopNMultipleInputToSingleOutputBatch() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.build();
final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.addRow(1, 10, "item1")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 10);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(2, topNBatch.getRecordCount());
// Verify results
RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet).verify(actualRowSet);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
nonEmptyInputRowSet2.clear();
}
@Test
public void testTopNMultipleInputToMultipleOutputBatch_LowerLimits() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(7, 70, "item7")
.addRow(3, 30, "item3")
.addRow(13, 130, "item13")
.build();
final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(17, 170, "item17")
.addRow(3, 30, "item3")
.addRow(13, 130, "item13")
.build();
final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(13, 130, "item13")
.addRow(7, 70, "item7")
.build();
final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(17, 170, "item17")
.addRow(13, 130, "item13")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet3.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 2);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(2, topNBatch.getRecordCount());
RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, topNBatch.getRecordCount());
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(2, topNBatch.getRecordCount());
RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
nonEmptyInputRowSet2.clear();
nonEmptyInputRowSet3.clear();
expectedRowSet1.clear();
expectedRowSet2.clear();
}
/*****************************************************************************************
Tests for validating regular TopN behavior with no EMIT outcome
******************************************************************************************/
@Test
public void testTopN_WithEmptyNonEmptyBatchesAndOKOutcome() {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(7, 70, "item7")
.addRow(3, 30, "item3")
.addRow(13, 130, "item13")
.build();
final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(17, 170, "item17")
.addRow(23, 230, "item23")
.addRow(130, 1300, "item130")
.build();
final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
.addRow(130, 1300, "item130")
.addRow(23, 230, "item23")
.addRow(17, 170, "item17")
.addRow(13, 130, "item13")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet3.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 4);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(4, topNBatch.getRecordCount());
RowSet actualRowSet = HyperRowSetImpl.fromContainer(topNBatch.getContainer(), topNBatch.getSelectionVector4());
new RowSetComparison(expectedRowSet).verify(actualRowSet);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
nonEmptyInputRowSet2.clear();
nonEmptyInputRowSet3.clear();
expectedRowSet.clear();
}
@Test
public void testRegularTopNWithEmptyDataSet() {
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 4);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
}
@Test
public void testRegularTopNWithEmptyDataSetAndNoneOutcome() {
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.NONE);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
final TopN topNConfig = new TopN(null,
Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
RelFieldCollation.NullDirection.FIRST)), false, 4);
final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
}
}