blob: c93f5cbd304c74748b2a4af99999b6de653a6e6f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.limit;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category(OperatorTest.class)
public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
/**
* Test to show empty batch with both OK_NEW_SCHEMA and EMIT outcome is not
* ignored by Limit and is pass through to the downstream operator.
*/
@Test
public void testLimitEmptyBatchEmitOutcome() throws Throwable {
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
// Only set for this Test class
mockInputBatch.useUnnestKillHandlingForLimit(true);
final Limit limitConf = new Limit(null, 0, 1);
@SuppressWarnings("resource")
final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
mockInputBatch);
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
outputRecordCount += limitBatch.getRecordCount();
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
outputRecordCount += limitBatch.getRecordCount();
assertEquals(0, outputRecordCount);
}
/**
* Test to validate limit considers all the data until it sees EMIT outcome
* and return output batch with data that meets the limit criteria.
*/
@Test
public void testLimitNonEmptyBatchEmitOutcome() throws Throwable {
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
// Only set for this Test class
mockInputBatch.useUnnestKillHandlingForLimit(true);
final Limit limitConf = new Limit(null, 0, 1);
@SuppressWarnings("resource")
final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
mockInputBatch);
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
outputRecordCount += limitBatch.getRecordCount();
assertEquals(0, outputRecordCount);
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
outputRecordCount += limitBatch.getRecordCount();
assertEquals(1, outputRecordCount);
}
/**
* Test to show that once a limit number of records is produced using first
* set of batches then on getting a batch with EMIT outcome, the limit state
* is again refreshed and applied to next set of batches with data.
*/
@Test
public void testLimitResetsAfterFirstEmitOutcome() throws Throwable {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.addRow(3, 30, "item3")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
// Only set for this Test class
mockInputBatch.useUnnestKillHandlingForLimit(true);
final Limit limitConf = new Limit(null, 0, 1);
@SuppressWarnings("resource")
final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
mockInputBatch);
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(1, limitBatch.getRecordCount());
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
// State refresh happens and limit again works on new data batches
assertEquals(0, limitBatch.getRecordCount());
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK);
assertEquals(1, limitBatch.getRecordCount());
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
}
/**
* Test to show that when the limit number of records is found with first
* incoming batch, then next empty incoming batch with OK outcome is ignored,
* but the empty EMIT outcome batch is not ignored. Empty incoming batch with
* EMIT outcome produces empty output batch with EMIT outcome.
*/
@Test
public void testLimitNonEmptyFirst_EmptyOKEmitOutcome() throws Throwable {
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.NONE);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
// Only set for this Test class
mockInputBatch.useUnnestKillHandlingForLimit(true);
final Limit limitConf = new Limit(null, 0, 1);
@SuppressWarnings("resource")
final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
mockInputBatch);
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(1, limitBatch.getRecordCount());
// OK will not be received since it's was accompanied with empty batch
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, limitBatch.getRecordCount());
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
}
/**
* Test to show that limit refreshes it's state after seeing first EMIT
* outcome and works on data batches following it as new set's of incoming
* batch and apply the limits rule from fresh on those. So for first set of
* batches with OK_NEW_SCHEMA and EMIT outcome but total number of records
* received being less than limit condition, it still produces an output with
* that many records (in this case 1 even though limit number of records is
* 2).
* <p>
* After seeing EMIT, it refreshes it's state and operate on next input
* batches to again return limit number of records. So for 3rd batch with 2
* records but with EMIT outcome it produces an output batch with 2 records
* not with 1 since state is refreshed.
*/
@Test
public void testMultipleLimitWithEMITOutcome() throws Throwable {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.addRow(3, 30, "item3")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
// Only set for this Test class
mockInputBatch.useUnnestKillHandlingForLimit(true);
final Limit limitConf = new Limit(null, 0, 2);
@SuppressWarnings("resource")
final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
mockInputBatch);
// first limit evaluation
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(1, limitBatch.getRecordCount());
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, limitBatch.getRecordCount());
// After seeing EMIT limit will refresh it's state and again evaluate limit on next set of input batches
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(2, limitBatch.getRecordCount());
// Since limit is hit it will return NONE
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.NONE);
}
/**
* Test shows that limit operates on multiple input batches until it finds
* limit number of records or it sees an EMIT outcome to refresh it's state.
*/
@Test
public void testLimitNonEmptyFirst_NonEmptyOK_EmptyBatchEmitOutcome() throws Throwable {
final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
.addRow(2, 20, "item2")
.build();
inputContainer.add(nonEmptyInputRowSet.container());
inputContainer.add(emptyInputRowSet.container());
inputContainer.add(nonEmptyInputRowSet2.container());
inputContainer.add(emptyInputRowSet.container());
inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.OK);
inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
// Only set for this Test class
mockInputBatch.useUnnestKillHandlingForLimit(true);
final Limit limitConf = new Limit(null, 0, 2);
@SuppressWarnings("resource")
final LimitRecordBatch limitBatch = new LimitRecordBatch(limitConf, operatorFixture.getFragmentContext(),
mockInputBatch);
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertEquals(1, limitBatch.getRecordCount());
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.OK);
assertEquals(1, limitBatch.getRecordCount());
assertTrue(limitBatch.next() == RecordBatch.IterOutcome.EMIT);
assertEquals(0, limitBatch.getRecordCount());
nonEmptyInputRowSet2.clear();
}
}