blob: 761f0e69d4da8cd3d722b90fdc062e9843e9f4f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.join;
import com.google.common.collect.Lists;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertTrue;
public class TestLateralJoinCorrectnessBatchProcessing extends SubOperatorTest {
// Operator context passed to every MockRecordBatch created by the tests
private static OperatorContext operatorContext;
// Left batch schema: id_left, cost_left, name_left
private static TupleMetadata leftSchema;
// Right batch schema: the implicit row-id column followed by id_right, cost_right, name_right
private static TupleMetadata rightSchema;
// Expected output schema for tests using the shared INNER-join config:
// left columns followed by right columns, without the implicit row-id column
private static TupleMetadata expectedSchema;
// Expected output schema for the LEFT-join tests: same columns as expectedSchema,
// but the right-side columns are declared OPTIONAL
private static TupleMetadata expectedSchemaLeftJoin;
// Empty left RowSet (built once in setUpBeforeClass)
private static RowSet.SingleRowSet emptyLeftRowSet;
// Non-empty left RowSet (rebuilt before every test, cleared after it)
private static RowSet.SingleRowSet nonEmptyLeftRowSet;
// List of left incoming containers (reset after every test)
private static final List<VectorContainer> leftContainer = new ArrayList<>(5);
// List of left IterOutcomes, one per entry added to leftContainer by the tests
private static final List<RecordBatch.IterOutcome> leftOutcomes = new ArrayList<>(5);
// Empty right RowSet (built once in setUpBeforeClass)
private static RowSet.SingleRowSet emptyRightRowSet;
// Non-empty right RowSet (rebuilt before every test, cleared after it)
private static RowSet.SingleRowSet nonEmptyRightRowSet;
// List of right incoming containers (reset after every test)
private static final List<VectorContainer> rightContainer = new ArrayList<>(5);
// List of right IterOutcomes, one per entry added to rightContainer by the tests
private static final List<RecordBatch.IterOutcome> rightOutcomes = new ArrayList<>(5);
// Lateral Join POP config shared by the INNER-join tests
private static LateralJoinPOP ljPopConfig;
/**
 * One-time setup shared by all tests: creates the operator context used by the
 * mock batches, the default INNER lateral-join config, the input/expected
 * schemas and the two empty row sets.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Short aliases for the type constants used by every schema below
  final TypeProtos.MinorType intType = TypeProtos.MinorType.INT;
  final TypeProtos.MinorType varcharType = TypeProtos.MinorType.VARCHAR;
  final TypeProtos.DataMode optionalMode = TypeProtos.DataMode.OPTIONAL;

  final PhysicalOperator mockStoreConfig = new MockStorePOP(null);
  operatorContext = fixture.newOperatorContext(mockStoreConfig);

  ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

  // Left input: three columns
  leftSchema = new SchemaBuilder()
      .add("id_left", intType)
      .add("cost_left", intType)
      .add("name_left", varcharType)
      .buildSchema();
  emptyLeftRowSet = fixture.rowSetBuilder(leftSchema).build();

  // Right input: the implicit row-id column comes first, then three data columns
  rightSchema = new SchemaBuilder()
      .add(ljPopConfig.getImplicitRIDColumn(), intType)
      .add("id_right", intType)
      .add("cost_right", intType)
      .add("name_right", varcharType)
      .buildSchema();
  emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();

  // Expected output for the INNER-join tests: left columns + right data columns
  expectedSchema = new SchemaBuilder()
      .add("id_left", intType)
      .add("cost_left", intType)
      .add("name_left", varcharType)
      .add("id_right", intType)
      .add("cost_right", intType)
      .add("name_right", varcharType)
      .buildSchema();

  // Expected output for the LEFT-join tests: right data columns are OPTIONAL
  expectedSchemaLeftJoin = new SchemaBuilder()
      .add("id_left", intType)
      .add("cost_left", intType)
      .add("name_left", varcharType)
      .add("id_right", intType, optionalMode)
      .add("cost_right", intType, optionalMode)
      .add("name_right", varcharType, optionalMode)
      .buildSchema();
}
/**
 * One-time teardown: closes the shared operator context and releases the two
 * empty row sets that were built in {@code setUpBeforeClass}.
 */
@AfterClass
public static void tearDownAfterClass() throws Exception {
operatorContext.close();
emptyLeftRowSet.clear();
emptyRightRowSet.clear();
}
/**
 * Per-test setup: rebuilds the four-row left and right row sets. The first
 * value of every right row fills the implicit row-id column of the right schema.
 */
@Before
public void beforeTest() throws Exception {
  final RowSet.SingleRowSet freshLeftRows = fixture.rowSetBuilder(leftSchema)
      .addRow(1, 10, "item1")
      .addRow(2, 20, "item2")
      .addRow(3, 30, "item3")
      .addRow(4, 40, "item4")
      .build();
  nonEmptyLeftRowSet = freshLeftRows;

  final RowSet.SingleRowSet freshRightRows = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .addRow(3, 33, 330, "item33")
      .addRow(4, 44, 440, "item44")
      .build();
  nonEmptyRightRowSet = freshRightRows;
}
/**
 * Per-test teardown: releases the row sets built in {@code beforeTest} and
 * empties the shared static container/outcome lists so the next test starts
 * with empty lists.
 */
@After
public void afterTest() throws Exception {
nonEmptyLeftRowSet.clear();
leftContainer.clear();
leftOutcomes.clear();
nonEmptyRightRowSet.clear();
rightContainer.clear();
rightOutcomes.clear();
}
/**
 * INNER join where the right side delivers one data batch (EMIT) holding one
 * row per left row id. Expects a single output batch with four joined rows.
 */
@Test
public void testLeftAndRightAllMatchingRows_SingleBatch() throws Exception {
  // Left input: the four-row batch, announced with OK_NEW_SCHEMA
  leftContainer.add(nonEmptyLeftRowSet.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  final CloseableRecordBatch leftInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  final RowSet.SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

  // Right input: empty schema batch first, then the data batch with EMIT
  rightContainer.add(emptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightContainer.add(nonEmptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  final CloseableRecordBatch rightInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  final LateralJoinBatch lateralJoin = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftInput, rightInput);

  try {
    RecordBatch.IterOutcome outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK);
    final int expectedCount = nonEmptyLeftRowSet.rowCount();
    assertTrue(lateralJoin.getRecordCount() == expectedCount);

    // Compare the produced rows with the expected ones
    final RowSet actual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(expected).verify(actual);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.NONE);
  } finally {
    // Release everything this test created
    lateralJoin.close();
    leftInput.close();
    rightInput.close();
    expected.clear();
  }
}
/**
 * INNER join where the right side delivers two data batches (OK, then EMIT);
 * the second batch carries one more row for row id 4. Expects a single output
 * batch with five joined rows.
 */
@Test
public void testLeftAndRightAllMatchingRows_MultipleBatch() throws Exception {
  // Left input: the four-row batch, announced with OK_NEW_SCHEMA
  leftContainer.add(nonEmptyLeftRowSet.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  final CloseableRecordBatch leftInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // Second right data batch: one additional row for row id 4
  final RowSet.SingleRowSet secondRightRows = fixture.rowSetBuilder(rightSchema)
      .addRow(4, 44, 440, "item44")
      .build();

  // Right input: empty schema batch, first data batch (OK), second data batch (EMIT)
  rightContainer.add(emptyRightRowSet.container());
  rightContainer.add(nonEmptyRightRowSet.container());
  rightContainer.add(secondRightRows.container());

  final RowSet.SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightOutcomes.add(RecordBatch.IterOutcome.OK);
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  final CloseableRecordBatch rightInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  final LateralJoinBatch lateralJoin = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftInput, rightInput);

  try {
    RecordBatch.IterOutcome outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK);
    final int expectedCount = nonEmptyLeftRowSet.rowCount() + secondRightRows.rowCount();
    assertTrue(lateralJoin.getRecordCount() == expectedCount);

    // Compare the produced rows with the expected ones
    final RowSet actual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(expected).verify(actual);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.NONE);
  } finally {
    // Release everything this test created
    lateralJoin.close();
    leftInput.close();
    rightInput.close();
    secondRightRows.clear();
    expected.clear();
  }
}
/**
 * INNER join where the right side delivers a full data batch (OK) followed by
 * an empty batch carrying EMIT. Expects a single output batch with four rows.
 */
@Test
public void testLeftAndRightAllMatchingRows_SecondBatch_Empty() throws Exception {
  // Left input: the four-row batch, announced with OK_NEW_SCHEMA
  leftContainer.add(nonEmptyLeftRowSet.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  final CloseableRecordBatch leftInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  final RowSet.SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

  // Right input: empty schema batch, data batch (OK), then the empty container again (EMIT)
  rightContainer.add(emptyRightRowSet.container());
  rightContainer.add(nonEmptyRightRowSet.container());
  rightContainer.add(emptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightOutcomes.add(RecordBatch.IterOutcome.OK);
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  final CloseableRecordBatch rightInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  final LateralJoinBatch lateralJoin = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftInput, rightInput);

  try {
    RecordBatch.IterOutcome outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK);
    final int expectedCount = nonEmptyLeftRowSet.rowCount();
    assertTrue(lateralJoin.getRecordCount() == expectedCount);

    // Compare the produced rows with the expected ones
    final RowSet actual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(expected).verify(actual);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.NONE);
  } finally {
    // Release everything this test created
    lateralJoin.close();
    leftInput.close();
    rightInput.close();
    expected.clear();
  }
}
/**
 * INNER join where the single right data batch only has rows for row ids 1
 * and 4. Expects only those two left rows to appear in the output.
 */
@Test
public void testLeftAndRightWithMissingRows_SingleBatch() throws Exception {
  // Left input: the four-row batch, announced with OK_NEW_SCHEMA
  leftContainer.add(nonEmptyLeftRowSet.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  final CloseableRecordBatch leftInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // Right data: row ids 2 and 3 are absent
  final RowSet.SingleRowSet sparseRightRows = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(4, 44, 440, "item44")
      .build();

  final RowSet.SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

  // Right input: empty schema batch first, then the data batch with EMIT
  rightContainer.add(emptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightContainer.add(sparseRightRows.container());
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  final CloseableRecordBatch rightInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  final LateralJoinBatch lateralJoin = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftInput, rightInput);

  try {
    RecordBatch.IterOutcome outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK);
    final int expectedCount = sparseRightRows.rowCount();
    assertTrue(lateralJoin.getRecordCount() == expectedCount);

    // Compare the produced rows with the expected ones
    final RowSet actual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(expected).verify(actual);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.NONE);
  } finally {
    // Release everything this test created
    lateralJoin.close();
    leftInput.close();
    rightInput.close();
    sparseRightRows.clear();
    expected.clear();
  }
}
/**
 * LEFT join where the single right data batch only has rows for row ids 1
 * and 4. Expects all four left rows, with nulls on the right side for the
 * left rows 2 and 3.
 */
@Test
public void testLeftAndRightWithMissingRows_LeftJoin_SingleBatch() throws Exception {
  // Left input: the four-row batch, announced with OK_NEW_SCHEMA
  leftContainer.add(nonEmptyLeftRowSet.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  final CloseableRecordBatch leftInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // Right data: row ids 2 and 3 are absent
  final RowSet.SingleRowSet sparseRightRows = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(4, 44, 440, "item44")
      .build();

  final RowSet.SingleRowSet expected = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", null, null, null)
      .addRow(3, 30, "item3", null, null, null)
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

  // Right input: empty schema batch first, then the data batch with EMIT
  rightContainer.add(emptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightContainer.add(sparseRightRows.container());
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  final CloseableRecordBatch rightInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  // This test needs a LEFT join, so it builds its own config instead of using the shared one
  final LateralJoinPOP leftJoinConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
  final LateralJoinBatch lateralJoin = new LateralJoinBatch(leftJoinConfig, fixture.getFragmentContext(),
      leftInput, rightInput);

  try {
    RecordBatch.IterOutcome outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK);
    final int expectedCount = nonEmptyLeftRowSet.rowCount();
    assertTrue(lateralJoin.getRecordCount() == expectedCount);

    // Compare the produced rows with the expected ones
    final RowSet actual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(expected).verify(actual);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.NONE);
  } finally {
    // Release everything this test created
    lateralJoin.close();
    leftInput.close();
    rightInput.close();
    sparseRightRows.clear();
    expected.clear();
  }
}
/**
 * INNER join where the right side delivers two data batches (OK, then EMIT)
 * and has no row for row id 1. Expects a single output batch with four rows
 * for the left rows 2, 3 and 4 (two matches for row 4).
 */
@Test
public void testLeftAndRightWithInitialMissingRows_MultipleBatch() throws Exception {
  leftContainer.add(nonEmptyLeftRowSet.container());

  // Get the left IterOutcomes for Lateral Join
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

  // Create Left MockRecordBatch
  final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // First right data batch: row ids 2 and 3 (row id 1 is absent)
  final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(2, 22, 220, "item22")
      .addRow(3, 33, 330, "item33")
      .build();

  // Second right data batch: two rows for row id 4
  final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(4, 44, 440, "item44_1")
      .addRow(4, 44, 440, "item44_2")
      .build();

  final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44_1")
      .addRow(4, 40, "item4", 44, 440, "item44_2")
      .build();

  rightContainer.add(emptyRightRowSet.container());
  rightContainer.add(nonEmptyRightRowSet2.container());
  rightContainer.add(nonEmptyRightRowSet3.container());

  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightOutcomes.add(RecordBatch.IterOutcome.OK);
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

  final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

  try {
    assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
    assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
    assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));

    // verify results
    RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);

    assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
  } finally {
    // Close all the resources for this test case
    ljBatch.close();
    leftMockBatch.close();
    rightMockBatch.close();
    nonEmptyRightRowSet2.clear();
    // Also release the second right batch, as the sibling multi-batch test does;
    // otherwise its buffers may stay allocated when an assertion fails early
    nonEmptyRightRowSet3.clear();
    expectedRowSet.clear();
  }
}
/**
 * LEFT join where the right side delivers two data batches (OK, then EMIT)
 * and has no row for row id 1. Expects five output rows: left row 1 with
 * nulls on the right side, then the matches for rows 2, 3 and 4.
 */
@Test
public void testLeftAndRightWithInitialMissingRows_LeftJoin_MultipleBatch() throws Exception {
  leftContainer.add(nonEmptyLeftRowSet.container());

  // Get the left IterOutcomes for Lateral Join
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

  // Create Left MockRecordBatch
  final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // First right data batch: row ids 2 and 3 (row id 1 is absent)
  final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(2, 22, 220, "item22")
      .addRow(3, 33, 330, "item33")
      .build();

  // Second right data batch: two rows for row id 4
  final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(4, 44, 440, "item44_1")
      .addRow(4, 44, 440, "item44_2")
      .build();

  final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", null, null, null)
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44_1")
      .addRow(4, 40, "item4", 44, 440, "item44_2")
      .build();

  rightContainer.add(emptyRightRowSet.container());
  rightContainer.add(nonEmptyRightRowSet2.container());
  rightContainer.add(nonEmptyRightRowSet3.container());

  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightOutcomes.add(RecordBatch.IterOutcome.OK);
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

  final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  // LEFT-join config local to this test; named so it does not shadow the shared static ljPopConfig
  final LateralJoinPOP leftJoinPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

  final LateralJoinBatch ljBatch = new LateralJoinBatch(leftJoinPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

  try {
    assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
    assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
    // One null-padded row for left row 1 plus every right row
    assertTrue(ljBatch.getRecordCount() ==
        (1 + nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));

    // verify results
    RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);

    assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
  } finally {
    // Close all the resources for this test case
    ljBatch.close();
    leftMockBatch.close();
    rightMockBatch.close();
    nonEmptyRightRowSet2.clear();
    // Also release the second right batch, as the sibling multi-batch test does;
    // otherwise its buffers may stay allocated when an assertion fails early
    nonEmptyRightRowSet3.clear();
    expectedRowSet.clear();
  }
}
/**
 * INNER join where the right side delivers two data batches (OK, then EMIT)
 * and has no row for row id 4. Expects a single output batch with four rows
 * for the left rows 1, 2 and 3 (two matches for row 3).
 */
@Test
public void testLeftAndRightWithLastMissingRows_MultipleBatch() throws Exception {
  leftContainer.add(nonEmptyLeftRowSet.container());

  // Get the left IterOutcomes for Lateral Join
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

  // Create Left MockRecordBatch
  final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // First right data batch: row ids 1 and 2
  final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .build();

  // Second right data batch: two rows for row id 3 (row id 4 is absent)
  final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(3, 33, 330, "item33_1")
      .addRow(3, 33, 330, "item33_2")
      .build();

  final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33_1")
      .addRow(3, 30, "item3", 33, 330, "item33_2")
      .build();

  rightContainer.add(emptyRightRowSet.container());
  rightContainer.add(nonEmptyRightRowSet2.container());
  rightContainer.add(nonEmptyRightRowSet3.container());

  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightOutcomes.add(RecordBatch.IterOutcome.OK);
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

  final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

  try {
    assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
    assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
    assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));

    // verify results
    RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);

    assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
  } finally {
    // Close all the resources for this test case
    ljBatch.close();
    leftMockBatch.close();
    rightMockBatch.close();
    nonEmptyRightRowSet2.clear();
    // Also release the second right batch, as the sibling multi-batch test does;
    // otherwise its buffers may stay allocated when an assertion fails early
    nonEmptyRightRowSet3.clear();
    expectedRowSet.clear();
  }
}
/**
 * LEFT join where the right side delivers two data batches (OK, then EMIT)
 * and has no row for row id 4. Expects five output rows: the matches for
 * rows 1, 2 and 3, then left row 4 with nulls on the right side.
 */
@Test
public void testLeftAndRightWithLastMissingRows_LeftJoin_MultipleBatch() throws Exception {
  leftContainer.add(nonEmptyLeftRowSet.container());

  // Get the left IterOutcomes for Lateral Join
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

  // Create Left MockRecordBatch
  final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // First right data batch: row ids 1 and 2
  final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .build();

  // Second right data batch: two rows for row id 3 (row id 4 is absent)
  final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(3, 33, 330, "item33_1")
      .addRow(3, 33, 330, "item33_2")
      .build();

  final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33_1")
      .addRow(3, 30, "item3", 33, 330, "item33_2")
      .addRow(4, 40, "item4", null, null, null)
      .build();

  rightContainer.add(emptyRightRowSet.container());
  rightContainer.add(nonEmptyRightRowSet2.container());
  rightContainer.add(nonEmptyRightRowSet3.container());

  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightOutcomes.add(RecordBatch.IterOutcome.OK);
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

  final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  // LEFT-join config local to this test; named so it does not shadow the shared static ljPopConfig
  final LateralJoinPOP leftJoinPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

  final LateralJoinBatch ljBatch = new LateralJoinBatch(leftJoinPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

  try {
    assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
    assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
    // Every right row plus one null-padded row for left row 4
    assertTrue(ljBatch.getRecordCount() ==
        (1 + nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));

    // verify results
    RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);

    assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
  } finally {
    // Close all the resources for this test case
    ljBatch.close();
    leftMockBatch.close();
    rightMockBatch.close();
    nonEmptyRightRowSet2.clear();
    // Also release the second right batch, as the sibling multi-batch test does;
    // otherwise its buffers may stay allocated when an assertion fails early
    nonEmptyRightRowSet3.clear();
    expectedRowSet.clear();
  }
}
/**
 * INNER join with the output batch capped at six rows and the memory manager
 * disabled. The right side delivers eight rows over two data batches, so the
 * test expects two output batches: six rows first, then the remaining two.
 */
@Test
public void testLeftAndRight_OutputFull_InRightBatchMiddle() throws Exception {
  leftContainer.add(nonEmptyLeftRowSet.container());

  // Get the left IterOutcomes for Lateral Join
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

  // Create Left MockRecordBatch
  final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // Second right data batch: four more rows for row id 4
  final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(4, 44, 440, "item44_2_1")
      .addRow(4, 44, 440, "item44_2_2")
      .addRow(4, 44, 440, "item44_2_3")
      .addRow(4, 44, 440, "item44_2_4")
      .build();

  // Rows expected in the first (full) output batch
  final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .addRow(4, 40, "item4", 44, 440, "item44_2_1")
      .addRow(4, 40, "item4", 44, 440, "item44_2_2")
      .build();

  // Rows expected in the second output batch
  final RowSet.SingleRowSet expectedRowSet1 = fixture.rowSetBuilder(expectedSchema)
      .addRow(4, 40, "item4", 44, 440, "item44_2_3")
      .addRow(4, 40, "item4", 44, 440, "item44_2_4")
      .build();

  rightContainer.add(emptyRightRowSet.container());
  rightContainer.add(nonEmptyRightRowSet.container());
  rightContainer.add(nonEmptyRightRowSet3.container());

  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightOutcomes.add(RecordBatch.IterOutcome.OK);
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

  final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);
  ljBatch.setMaxOutputRowCount(6);
  ljBatch.setUseMemoryManager(false);

  try {
    assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
    assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
    assertTrue(ljBatch.getRecordCount() == 6);

    // verify results
    RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
    new RowSetComparison(expectedRowSet).verify(actualRowSet);

    // Release container memory for this output batch since other operators will do the same
    VectorAccessibleUtilities.clear(ljBatch);

    assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
    assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyRightRowSet.rowCount() + nonEmptyRightRowSet3.rowCount() - 6));

    // verify results
    RowSet actualRowSet2 = DirectRowSet.fromContainer(ljBatch.getContainer());
    new RowSetComparison(expectedRowSet1).verify(actualRowSet2);

    assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
  } finally {
    // Close all the resources for this test case
    ljBatch.close();
    leftMockBatch.close();
    rightMockBatch.close();
    // Release the locally built right batch too, as the sibling multi-batch test does;
    // otherwise its buffers may stay allocated when an assertion fails early
    nonEmptyRightRowSet3.clear();
    expectedRowSet.clear();
    expectedRowSet1.clear();
  }
}
/**
 * INNER join with the output batch capped at three rows and the memory
 * manager disabled. The right side has rows for row ids 1, 2 and 3 only, so
 * the test expects one output batch of three rows followed by NONE.
 */
@Test
public void testLeftAndRight_OutputFull_WithPendingLeftRow() throws Exception {
  // Left input: the four-row batch, announced with OK_NEW_SCHEMA
  leftContainer.add(nonEmptyLeftRowSet.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  final CloseableRecordBatch leftInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // Right data: no row for row id 4
  final RowSet.SingleRowSet threeRightRows = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .addRow(3, 33, 330, "item33")
      .build();

  final RowSet.SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .build();

  // Right input: empty schema batch first, then the data batch with EMIT
  rightContainer.add(emptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightContainer.add(threeRightRows.container());
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  final CloseableRecordBatch rightInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  final LateralJoinBatch lateralJoin = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftInput, rightInput);
  lateralJoin.setMaxOutputRowCount(3);
  lateralJoin.setUseMemoryManager(false);

  try {
    RecordBatch.IterOutcome outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK);
    final int expectedCount = threeRightRows.rowCount();
    assertTrue(lateralJoin.getRecordCount() == expectedCount);

    // Compare the produced rows with the expected ones
    final RowSet actual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(expected).verify(actual);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.NONE);
  } finally {
    // Release everything this test created
    lateralJoin.close();
    leftInput.close();
    rightInput.close();
    threeRightRows.clear();
    expected.clear();
  }
}
/**
 * LEFT join with the output batch capped at three rows and the memory manager
 * disabled. The right side has rows for row ids 1 and 2 only, so the test
 * expects a first batch of three rows (left row 3 null-padded) and a second
 * batch holding the null-padded left row 4.
 */
@Test
public void testLeftAndRight_OutputFull_WithPendingLeftRow_LeftJoin() throws Exception {
  // Left input: the four-row batch, announced with OK_NEW_SCHEMA
  leftContainer.add(nonEmptyLeftRowSet.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  final CloseableRecordBatch leftInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // Right data: no rows for row ids 3 and 4
  final RowSet.SingleRowSet twoRightRows = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .build();

  // Rows expected in the first (full) output batch
  final RowSet.SingleRowSet expectedFirst = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", null, null, null)
      .build();

  // Rows expected in the second output batch
  final RowSet.SingleRowSet expectedSecond = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(4, 40, "item4", null, null, null)
      .build();

  // Right input: empty schema batch first, then the data batch with EMIT
  rightContainer.add(emptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightContainer.add(twoRightRows.container());
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  final CloseableRecordBatch rightInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  // This test needs a LEFT join, so it builds its own config instead of using the shared one
  final LateralJoinPOP leftJoinConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
  final LateralJoinBatch lateralJoin = new LateralJoinBatch(leftJoinConfig, fixture.getFragmentContext(),
      leftInput, rightInput);
  lateralJoin.setMaxOutputRowCount(3);
  lateralJoin.setUseMemoryManager(false);

  try {
    RecordBatch.IterOutcome outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK);
    assertTrue(lateralJoin.getRecordCount() == 3);

    // Compare the first output batch with the expected rows
    final RowSet firstActual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(expectedFirst).verify(firstActual);

    // Release output container memory for this batch as other operators will do
    VectorAccessibleUtilities.clear(lateralJoin);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.OK);
    final int remainingCount = nonEmptyLeftRowSet.rowCount() - 3;
    assertTrue(lateralJoin.getRecordCount() == remainingCount);

    // Compare the second output batch with the expected rows
    final RowSet secondActual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(expectedSecond).verify(secondActual);

    outcome = lateralJoin.next();
    assertTrue(outcome == RecordBatch.IterOutcome.NONE);
  } finally {
    // Release everything this test created
    lateralJoin.close();
    leftInput.close();
    rightInput.close();
    twoRightRows.clear();
    expectedFirst.clear();
    expectedSecond.clear();
  }
}
/**
 * LEFT lateral join fed with two left batches and three right batches, using an
 * output batch limit of 3 rows. The test expects a full first batch of 3 rows,
 * then a second batch carrying all remaining left rows from both left batches,
 * and finally {@code NONE}.
 */
@Test
public void testMultipleLeftAndRight_OutputFull_WithPendingLeftRow_LeftJoin() throws Exception {
  // Cap on rows per output batch; small enough to force the output to be split.
  final int outputLimit = 3;

  // Additional left-side rows, delivered as a second left batch.
  final RowSet.SingleRowSet extraLeftRows = fixture.rowSetBuilder(leftSchema)
    .addRow(5, 50, "item5")
    .addRow(6, 60, "item6")
    .build();

  // Left side: first batch with OK_NEW_SCHEMA, second batch with OK.
  leftContainer.add(nonEmptyLeftRowSet.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  leftContainer.add(extraLeftRows.container());
  leftOutcomes.add(RecordBatch.IterOutcome.OK);
  final CloseableRecordBatch leftInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
    leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

  // Right side data.
  // NOTE(review): the first column presumably ties each right row to a left row
  // (implicit column) - confirm against rightSchema.
  final RowSet.SingleRowSet rightRows = fixture.rowSetBuilder(rightSchema)
    .addRow(1, 11, 110, "item11")
    .addRow(2, 22, 220, "item22")
    .build();

  // Right side: empty schema batch, then the data batch and another empty batch, both with EMIT.
  rightContainer.add(emptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
  rightContainer.add(rightRows.container());
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  rightContainer.add(emptyRightRowSet.container());
  rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
  final CloseableRecordBatch rightInput = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
    rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

  // Expected content of the first (full) output batch.
  final RowSet.SingleRowSet firstExpected = fixture.rowSetBuilder(expectedSchemaLeftJoin)
    .addRow(1, 10, "item1", 11, 110, "item11")
    .addRow(2, 20, "item2", 22, 220, "item22")
    .addRow(3, 30, "item3", null, null, null)
    .build();

  // Expected content of the second output batch: every left row still pending,
  // each padded with nulls on the right side.
  final RowSet.SingleRowSet secondExpected = fixture.rowSetBuilder(expectedSchemaLeftJoin)
    .addRow(4, 40, "item4", null, null, null)
    .addRow(5, 50, "item5", null, null, null)
    .addRow(6, 60, "item6", null, null, null)
    .build();

  final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
    DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
  final LateralJoinBatch lateralJoin = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
    leftInput, rightInput);
  lateralJoin.setMaxOutputRowCount(outputLimit);
  lateralJoin.setUseMemoryManager(false);

  try {
    // Schema batch first, then a data batch filled up to the output limit.
    assertTrue(lateralJoin.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    assertTrue(lateralJoin.next() == RecordBatch.IterOutcome.OK);
    assertTrue(outputLimit == lateralJoin.getRecordCount());

    // Compare the first output batch with the expected rows.
    final RowSet firstActual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(firstExpected).verify(firstActual);

    // Free the output vectors, as a downstream operator would, before asking for more.
    VectorAccessibleUtilities.clear(lateralJoin);

    // The next batch must hold all left rows (from both left batches) that did not fit earlier.
    assertTrue(lateralJoin.next() == RecordBatch.IterOutcome.OK);
    final int pendingRows = nonEmptyLeftRowSet.rowCount() + extraLeftRows.rowCount() - outputLimit;
    assertTrue(pendingRows == lateralJoin.getRecordCount());

    // Compare the second output batch with the expected rows.
    final RowSet secondActual = DirectRowSet.fromContainer(lateralJoin.getContainer());
    new RowSetComparison(secondExpected).verify(secondActual);

    // Nothing left to produce.
    assertTrue(lateralJoin.next() == RecordBatch.IterOutcome.NONE);
  } finally {
    // Release the operator, both mock inputs and every row set built by this test.
    lateralJoin.close();
    leftInput.close();
    rightInput.close();
    extraLeftRows.clear();
    rightRows.clear();
    firstExpected.clear();
    secondExpected.clear();
  }
}
}