blob: 920a7ff9df1c945754c546f622d48097c9eeba3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.xsort;
import com.google.common.collect.Lists;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.exec.physical.rowSet.HyperRowSetImpl;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static junit.framework.TestCase.assertTrue;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
import static org.junit.Assert.assertEquals;
/**
 * Tests {@link ExternalSortBatch} handling of the {@code EMIT} outcome, plus a few regular
 * {@code OK_NEW_SCHEMA}/{@code OK}/{@code NONE} flows for comparison.
 *
 * <p>Each test feeds a {@link MockRecordBatch} with a scripted sequence of input containers and outcomes. It then
 * asserts the outcomes returned by {@code next()}, the output record counts and, where checked, the sorted rows.
 */
@Category(OperatorTest.class)
public class TestSortEmitOutcome extends BaseTestOpBatchEmitOutcome {
  /** Sort operator under test; closed after each test by {@link #closeOperator()} when it was created. */
  private ExternalSortBatch sortBatch;
  /** Sort configuration shared by all tests: ascending order on the first input column, nulls first. */
  private static ExternalSort sortPopConfig;

  /**
   * Builds the shared sort configuration once per class. It orders by the first column of {@code inputSchema},
   * ascending with nulls first.
   */
  @BeforeClass
  public static void defineOrdering() {
    String columnToSort = inputSchema.column(0).getName();
    FieldReference expr = FieldReference.getWithQuotedRef(columnToSort);
    Order.Ordering ordering = new Order.Ordering(Order.Ordering.ORDER_ASC, expr, Order.Ordering.NULLS_FIRST);
    sortPopConfig = new ExternalSort(null, Lists.newArrayList(ordering), false);
  }

  /** Closes the sort operator created by the test, if any. */
  @After
  public void closeOperator() {
    if (sortBatch != null) {
      sortBatch.close();
    }
  }

  /**
   * Verifies that ExternalSortBatch produces empty output batches when it receives empty batches with
   * OK_NEW_SCHEMA and EMIT outcomes. After the build-schema batch it returns an empty batch with OK_NEW_SCHEMA,
   * then an empty batch with EMIT, and finally NONE.
   */
  @Test
  public void testSortEmptyBatchEmitOutcome() {
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(EMIT);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase output
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    outputRecordCount += sortBatch.getRecordCount();
    // Output for first empty EMIT batch
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertTrue(sortBatch.next() == EMIT);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(0, outputRecordCount);
    assertTrue(sortBatch.next() == NONE);
  }

  /**
   * Verifies ExternalSortBatch handling of the first non-empty batch with EMIT outcome after the buildSchema phase.
   * Sort is expected to return two output batches for this first EMIT incoming: an output batch with
   * OK_NEW_SCHEMA carrying the sorted rows, followed by an output batch with EMIT outcome.
   */
  @Test
  public void testSortNonEmptyBatchEmitOutcome() {
    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(13, 130, "item13")
      .addRow(4, 40, "item4")
      .build();
    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(4, 40, "item4")
      .addRow(13, 130, "item13")
      .build();
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet2.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(EMIT);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase output
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(0, outputRecordCount);
    // Output batch 1 for first non-empty EMIT batch
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(3, outputRecordCount);
    // verify results
    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);
    // Output batch 2 for first non-empty EMIT batch
    assertTrue(sortBatch.next() == EMIT);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(3, outputRecordCount);
    // Release memory for row sets
    nonEmptyInputRowSet2.clear();
    expectedRowSet.clear();
  }

  /**
   * Verifies ExternalSortBatch behavior when the first incoming batch after the buildSchema phase is an empty batch
   * with EMIT outcome, followed by a non-empty batch with EMIT outcome. Sort is expected to handle the EMIT
   * boundary correctly: two empty output batches for the first EMIT outcome and one non-empty output batch for
   * the second EMIT outcome.
   */
  @Test
  public void testSortEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(13, 130, "item13")
      .addRow(4, 40, "item4")
      .build();
    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(4, 40, "item4")
      .addRow(13, 130, "item13")
      .build();
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet2.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(EMIT);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(0, outputRecordCount);
    // Output for first empty EMIT batch
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertTrue(sortBatch.next() == EMIT);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(0, outputRecordCount);
    // Output for second non-empty EMIT batch
    assertTrue(sortBatch.next() == EMIT);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(3, outputRecordCount);
    // verify results
    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);
    // Release memory for row sets
    nonEmptyInputRowSet2.clear();
    expectedRowSet.clear();
  }

  /**
   * Verifies ExternalSortBatch behavior with a run of empty batches with EMIT outcome followed by a non-empty batch
   * with EMIT outcome.
   */
  @Test
  public void testSortMultipleEmptyBatchWithANonEmptyBatchEmitOutcome() {
    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(13, 130, "item13")
      .addRow(4, 40, "item4")
      .build();
    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(4, 40, "item4")
      .addRow(13, 130, "item13")
      .build();
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet2.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(EMIT);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase output
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(0, outputRecordCount);
    // Output for first empty EMIT batch
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertTrue(sortBatch.next() == EMIT);
    // Output for 2nd empty EMIT batch
    assertTrue(sortBatch.next() == EMIT);
    // Output for 3rd empty EMIT batch
    assertTrue(sortBatch.next() == EMIT);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(0, outputRecordCount);
    // Output for 4th non-empty EMIT batch
    assertTrue(sortBatch.next() == EMIT);
    outputRecordCount += sortBatch.getRecordCount();
    assertEquals(3, outputRecordCount);
    // verify results
    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);
    // Release memory for row sets
    nonEmptyInputRowSet2.clear();
    expectedRowSet.clear();
  }

  /**
   * Verifies ExternalSortBatch behavior when it receives a non-empty batch in the BuildSchema phase followed by an
   * empty EMIT batch. The second record boundary has a non-empty batch with OK outcome followed by an empty EMIT
   * batch.
   *
   * <p>The data from the first non-empty batch is treated as part of the first record boundary. It is returned as
   * an OK_NEW_SCHEMA batch with the rows, followed by an empty EMIT batch. The OK/EMIT pair of the second boundary
   * yields a single EMIT batch with its sorted rows. Despite the name, this test exercises ExternalSortBatch.
   */
  @Test
  public void testTopNResetsAfterFirstEmitOutcome() {
    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(3, 30, "item3")
      .build();
    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(1, 10, "item1")
      .build();
    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(3, 30, "item3")
      .build();
    inputContainer.add(nonEmptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet2.container());
    inputContainer.add(emptyInputRowSet.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(OK);
    inputOutcomes.add(EMIT);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase output
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    // Output batch 1 for non-empty batch in BuildSchema phase and empty EMIT batch following it
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertEquals(1, sortBatch.getRecordCount());
    // verify results
    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
    assertTrue(sortBatch.next() == EMIT);
    assertEquals(0, sortBatch.getRecordCount());
    // Output batch 2 for non-empty input batch with OK followed by empty EMIT batch
    assertTrue(sortBatch.next() == EMIT);
    assertEquals(2, sortBatch.getRecordCount());
    // verify results
    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
    // Release memory for row sets
    nonEmptyInputRowSet2.clear();
    expectedRowSet2.clear();
    expectedRowSet1.clear();
  }

  /**
   * Verifies ExternalSortBatch behavior when it receives incoming batches with different IterOutcomes like
   * OK_NEW_SCHEMA / OK / EMIT / NONE.
   */
  @Test
  public void testSort_NonEmptyFirst_EmptyOKEmitOutcome() {
    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(1, 10, "item1")
      .build();
    inputContainer.add(nonEmptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(OK);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(NONE);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase output
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertEquals(0, sortBatch.getRecordCount());
    // Output batch 1 for first 3 input batches with OK_NEW_SCHEMA/OK/EMIT outcome
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertEquals(1, sortBatch.getRecordCount());
    // verify results
    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);
    // Output batch 2 for first 3 input batches with OK_NEW_SCHEMA/OK/EMIT outcome
    assertTrue(sortBatch.next() == EMIT);
    assertEquals(0, sortBatch.getRecordCount());
    // Output batch for NONE outcome
    assertTrue(sortBatch.next() == NONE);
    // Release memory for row set
    expectedRowSet.clear();
  }

  /**
   * Verifies ExternalSortBatch behavior when an EMIT boundary is followed by data arriving with a plain OK outcome.
   * The first boundary yields an OK_NEW_SCHEMA batch with its rows plus an empty EMIT batch. The trailing OK data is
   * returned sorted in a single OK batch, and then NONE. Despite the name, this test exercises ExternalSortBatch.
   */
  @Test
  public void testTopNMultipleOutputBatch() {
    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(4, 40, "item4")
      .addRow(2, 20, "item2")
      .addRow(5, 50, "item5")
      .addRow(3, 30, "item3")
      .build();
    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(1, 10, "item1")
      .build();
    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(3, 30, "item3")
      .addRow(4, 40, "item4")
      .addRow(5, 50, "item5")
      .build();
    inputContainer.add(nonEmptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet2.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(OK);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase output
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    // Output batch 1 for first EMIT outcome
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertEquals(1, sortBatch.getRecordCount());
    // verify results
    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
    // Output batch 2 for first EMIT outcome
    assertTrue(sortBatch.next() == EMIT);
    assertEquals(0, sortBatch.getRecordCount());
    // Output batch for OK outcome
    assertTrue(sortBatch.next() == OK);
    assertEquals(4, sortBatch.getRecordCount());
    // verify results
    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
    // Output batch for NONE outcome
    assertTrue(sortBatch.next() == NONE);
    // Release memory for row sets
    nonEmptyInputRowSet2.clear();
    expectedRowSet2.clear();
    expectedRowSet1.clear();
  }

  /**
   * Verifies ExternalSortBatch behavior across three consecutive EMIT boundaries: non-empty, non-empty, then empty.
   * Each boundary's rows are checked for both count and sorted content.
   */
  @Test
  public void testSortMultipleEMITOutcome() {
    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(3, 30, "item3")
      .build();
    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(1, 10, "item1")
      .build();
    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .addRow(3, 30, "item3")
      .build();
    inputContainer.add(nonEmptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet2.container());
    inputContainer.add(emptyInputRowSet.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(EMIT);
    inputOutcomes.add(EMIT);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase output
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    // Output batch 1 for first EMIT outcome
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertEquals(1, sortBatch.getRecordCount());
    // verify results
    RowSet actualRowSet1 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet1).verify(actualRowSet1);
    // Output batch 2 for first EMIT outcome
    assertTrue(sortBatch.next() == EMIT);
    assertEquals(0, sortBatch.getRecordCount());
    // Output batch for second EMIT outcome
    assertTrue(sortBatch.next() == EMIT);
    assertEquals(2, sortBatch.getRecordCount());
    // verify results
    RowSet actualRowSet2 = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet2).verify(actualRowSet2);
    // Output batch for third EMIT outcome
    assertTrue(sortBatch.next() == EMIT);
    assertEquals(0, sortBatch.getRecordCount());
    // Release memory for row sets
    nonEmptyInputRowSet2.clear();
    expectedRowSet2.clear();
    expectedRowSet1.clear();
  }

  /**
   * Verifies ExternalSortBatch behavior when it receives multiple non-empty batches within the same EMIT boundary,
   * such that all the output records fit within a single output batch. Sort waits for the EMIT outcome before
   * producing the output batches for all the buffered incoming batches with data.
   */
  @Test
  public void testSortMultipleInputToSingleOutputBatch() {
    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(2, 20, "item2")
      .build();
    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(1, 10, "item1")
      .addRow(2, 20, "item2")
      .build();
    inputContainer.add(nonEmptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet2.container());
    inputContainer.add(emptyInputRowSet.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(OK);
    inputOutcomes.add(OK);
    inputOutcomes.add(EMIT);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    // BuildSchema phase output
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    // Output batch 1 for the EMIT boundary
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertEquals(2, sortBatch.getRecordCount());
    // Verify Results
    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);
    // Output batch 2 for the EMIT boundary
    assertTrue(sortBatch.next() == EMIT);
    assertEquals(0, sortBatch.getRecordCount());
    // Release memory for row sets (expectedRowSet was previously leaked)
    nonEmptyInputRowSet2.clear();
    expectedRowSet.clear();
  }

  /**
   * Verifies ExternalSortBatch behavior when it sees batches with EMIT outcome but has to spill to disk because of
   * memory pressure. Spilling is currently not supported with EMIT outcome. This is detected while preparing the
   * output batch, and Sort throws UnsupportedOperationException.
   *
   * <p>Cleanup runs in try-with-resources/finally blocks. The expected exception would otherwise skip it and leak
   * the local fixture, operator and row sets.
   *
   * @throws Exception if building or closing the local fixture fails
   */
  @Test(expected = UnsupportedOperationException.class)
  public void testSpillNotSupportedWithEmitOutcome() throws Exception {
    final OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
    // Configuration that forces Sort to spill after buffering 2 incoming batches with data
    builder.configBuilder().put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
    // try-with-resources: a failure while closing is recorded as suppressed rather than masking the expected exception
    try (OperatorFixture fixture_local = builder.build()) {
      final RowSet.SingleRowSet local_EmptyInputRowSet = fixture_local.rowSetBuilder(inputSchema).build();
      final RowSet.SingleRowSet local_nonEmptyInputRowSet1 = fixture_local.rowSetBuilder(inputSchema)
        .addRow(3, 30, "item3")
        .addRow(2, 20, "item2")
        .build();
      final RowSet.SingleRowSet local_nonEmptyInputRowSet2 = fixture_local.rowSetBuilder(inputSchema)
        .addRow(1, 10, "item1")
        .build();
      final RowSet.SingleRowSet local_nonEmptyInputRowSet3 = fixture_local.rowSetBuilder(inputSchema)
        .addRow(4, 40, "item4")
        .build();
      inputContainer.add(local_EmptyInputRowSet.container());
      inputContainer.add(local_nonEmptyInputRowSet1.container());
      inputContainer.add(local_nonEmptyInputRowSet2.container());
      inputContainer.add(local_nonEmptyInputRowSet3.container());
      inputContainer.add(local_EmptyInputRowSet.container());
      inputOutcomes.add(OK_NEW_SCHEMA);
      inputOutcomes.add(OK);
      inputOutcomes.add(OK);
      inputOutcomes.add(OK);
      inputOutcomes.add(EMIT);
      final PhysicalOperator mockPopConfig_local = new MockStorePOP(null);
      final OperatorContext opContext_local =
        fixture_local.getFragmentContext().newOperatorContext(mockPopConfig_local);
      final MockRecordBatch mockInputBatch = new MockRecordBatch(fixture_local.getFragmentContext(), opContext_local,
        inputContainer, inputOutcomes, local_EmptyInputRowSet.container().getSchema());
      try (ExternalSortBatch sortBatch_local = new ExternalSortBatch(sortPopConfig,
          fixture_local.getFragmentContext(), mockInputBatch)) {
        assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
        // Should throw the exception
        sortBatch_local.next();
      } finally {
        // Release memory for row sets even though next() is expected to throw
        local_EmptyInputRowSet.clear();
        local_nonEmptyInputRowSet1.clear();
        local_nonEmptyInputRowSet2.clear();
        local_nonEmptyInputRowSet3.clear();
      }
    }
  }

  /***************************************************************************************************************
   * Tests validating ExternalSortBatch behavior without EMIT outcome
   ***************************************************************************************************************/

  /**
   * Verifies that without EMIT, ExternalSortBatch buffers all OK batches, including interleaved empty ones. It then
   * returns every row sorted in one OK_NEW_SCHEMA batch using a four-byte selection vector, followed by NONE.
   * Despite the name, this test exercises ExternalSortBatch.
   */
  @Test
  public void testTopN_WithEmptyNonEmptyBatchesAndOKOutcome() {
    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(7, 70, "item7")
      .addRow(3, 30, "item3")
      .addRow(13, 130, "item13")
      .build();
    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(17, 170, "item17")
      .addRow(23, 230, "item23")
      .addRow(130, 1300, "item130")
      .build();
    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(inputSchema)
      .addRow(1, 10, "item1")
      .addRow(3, 30, "item3")
      .addRow(7, 70, "item7")
      .addRow(13, 130, "item13")
      .addRow(17, 170, "item17")
      .addRow(23, 230, "item23")
      .addRow(130, 1300, "item130")
      .build();
    inputContainer.add(nonEmptyInputRowSet.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet2.container());
    inputContainer.add(emptyInputRowSet.container());
    inputContainer.add(nonEmptyInputRowSet3.container());
    inputContainer.add(emptyInputRowSet.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    inputOutcomes.add(OK);
    inputOutcomes.add(OK);
    inputOutcomes.add(OK);
    inputOutcomes.add(OK);
    inputOutcomes.add(OK);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertEquals(7, sortBatch.getRecordCount());
    assertTrue(sortBatch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE);
    RowSet actualRowSet = HyperRowSetImpl.fromContainer(sortBatch.getContainer(), sortBatch.getSelectionVector4());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);
    assertTrue(sortBatch.next() == NONE);
    nonEmptyInputRowSet2.clear();
    nonEmptyInputRowSet3.clear();
    expectedRowSet.clear();
  }

  /**
   * Verifies that a single empty OK_NEW_SCHEMA input produces only the schema batch followed by NONE. Despite the
   * name, this test exercises ExternalSortBatch.
   */
  @Test
  public void testRegularTopNWithEmptyDataSet() {
    inputContainer.add(emptyInputRowSet.container());
    inputOutcomes.add(OK_NEW_SCHEMA);
    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
    sortBatch = new ExternalSortBatch(sortPopConfig, operatorFixture.getFragmentContext(), mockInputBatch);
    assertTrue(sortBatch.next() == OK_NEW_SCHEMA);
    assertTrue(sortBatch.next() == NONE);
  }

  /**
   * Verifies successful spilling in the absence of EMIT outcome. The merged output contains all 4 rows and uses no
   * selection vector.
   *
   * @throws Exception if building or closing the local fixture fails
   */
  @Test
  public void testSpillWithNoEmitOutcome() throws Exception {
    final OperatorFixture.Builder builder = OperatorFixture.builder(dirTestWatcher);
    // Configuration that forces Sort to spill after buffering 2 incoming batches with data
    builder.configBuilder().put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 2);
    // Resources are released even when an assertion below fails
    try (OperatorFixture fixture_local = builder.build()) {
      final RowSet.SingleRowSet local_nonEmptyInputRowSet1 = fixture_local.rowSetBuilder(inputSchema)
        .addRow(3, 30, "item3")
        .addRow(2, 20, "item2")
        .build();
      final RowSet.SingleRowSet local_nonEmptyInputRowSet2 = fixture_local.rowSetBuilder(inputSchema)
        .addRow(1, 10, "item1")
        .build();
      final RowSet.SingleRowSet local_nonEmptyInputRowSet3 = fixture_local.rowSetBuilder(inputSchema)
        .addRow(4, 40, "item4")
        .build();
      inputContainer.add(local_nonEmptyInputRowSet1.container());
      inputContainer.add(local_nonEmptyInputRowSet2.container());
      inputContainer.add(local_nonEmptyInputRowSet3.container());
      inputOutcomes.add(OK_NEW_SCHEMA);
      inputOutcomes.add(OK);
      inputOutcomes.add(OK);
      final PhysicalOperator mockPopConfig_local = new MockStorePOP(null);
      final OperatorContext opContext_local =
        fixture_local.getFragmentContext().newOperatorContext(mockPopConfig_local);
      final MockRecordBatch mockInputBatch = new MockRecordBatch(fixture_local.getFragmentContext(), opContext_local,
        inputContainer, inputOutcomes, local_nonEmptyInputRowSet1.container().getSchema());
      try (ExternalSortBatch sortBatch_local = new ExternalSortBatch(sortPopConfig,
          fixture_local.getFragmentContext(), mockInputBatch)) {
        assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
        assertTrue(sortBatch_local.next() == OK_NEW_SCHEMA);
        assertEquals(4, sortBatch_local.getRecordCount());
        assertTrue(sortBatch_local.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE);
        assertTrue(sortBatch_local.next() == NONE);
      } finally {
        // Release memory for row sets
        local_nonEmptyInputRowSet1.clear();
        local_nonEmptyInputRowSet2.clear();
        local_nonEmptyInputRowSet3.clear();
      }
    }
  }
}