/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.join;

import com.google.common.collect.Lists;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertTrue;

public class TestLateralJoinCorrectnessBatchProcessing extends SubOperatorTest {

  // Operator Context for mock batch
  private static OperatorContext operatorContext;

  // Left Batch Schema
  private static TupleMetadata leftSchema;

  // Right Batch Schema
  private static TupleMetadata rightSchema;

  // Right Batch Schema
  private static TupleMetadata expectedSchema;

  // Right Batch Schema
  private static TupleMetadata expectedSchemaLeftJoin;

  // Empty left RowSet
  private static RowSet.SingleRowSet emptyLeftRowSet;

  // Non-Empty left RowSet
  private static RowSet.SingleRowSet nonEmptyLeftRowSet;

  // List of left incoming containers
  private static final List<VectorContainer> leftContainer = new ArrayList<>(5);

  // List of left IterOutcomes
  private static final List<RecordBatch.IterOutcome> leftOutcomes = new ArrayList<>(5);

  // Empty right RowSet
  private static RowSet.SingleRowSet emptyRightRowSet;

  // Non-Empty right RowSet
  private static RowSet.SingleRowSet nonEmptyRightRowSet;

  // List of right incoming containers
  private static final List<VectorContainer> rightContainer = new ArrayList<>(5);

  // List of right IterOutcomes
  private static final List<RecordBatch.IterOutcome> rightOutcomes = new ArrayList<>(5);

  // Lateral Join POP Config
  private static LateralJoinPOP ljPopConfig;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    PhysicalOperator mockPopConfig = new MockStorePOP(null);
    operatorContext = fixture.newOperatorContext(mockPopConfig);
    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    leftSchema = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();
    emptyLeftRowSet = fixture.rowSetBuilder(leftSchema).build();

    rightSchema = new SchemaBuilder()
      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right", TypeProtos.MinorType.INT)
      .add("cost_right", TypeProtos.MinorType.INT)
      .add("name_right", TypeProtos.MinorType.VARCHAR)
      .buildSchema();
    emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();

    expectedSchema = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .add("id_right", TypeProtos.MinorType.INT)
      .add("cost_right", TypeProtos.MinorType.INT)
      .add("name_right", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    expectedSchemaLeftJoin = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .add("id_right", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
      .add("cost_right", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
      .add("name_right", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
      .buildSchema();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    operatorContext.close();
    emptyLeftRowSet.clear();
    emptyRightRowSet.clear();
  }


  @Before
  public void beforeTest() throws Exception {
    nonEmptyLeftRowSet = fixture.rowSetBuilder(leftSchema)
      .addRow(1, 10, "item1")
      .addRow(2, 20, "item2")
      .addRow(3, 30, "item3")
      .addRow(4, 40, "item4")
      .build();

    nonEmptyRightRowSet = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .addRow(3, 33, 330, "item33")
      .addRow(4, 44, 440, "item44")
      .build();
  }

  @After
  public void afterTest() throws Exception {
    nonEmptyLeftRowSet.clear();
    leftContainer.clear();
    leftOutcomes.clear();
    nonEmptyRightRowSet.clear();
    rightContainer.clear();
    rightOutcomes.clear();
  }

  @Test
  public void testLeftAndRightAllMatchingRows_SingleBatch() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRightAllMatchingRows_MultipleBatch() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(4, 44, 440, "item44")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet3.container());

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyLeftRowSet.rowCount() + nonEmptyRightRowSet3.rowCount()));

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet3.clear();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRightAllMatchingRows_SecondBatch_Empty() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRightWithMissingRows_SingleBatch() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(4, 44, 440, "item44")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == nonEmptyRightRowSet2.rowCount());

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRightWithMissingRows_LeftJoin_SingleBatch() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(4, 44, 440, "item44")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", null, null, null)
      .addRow(3, 30, "item3", null, null, null)
      .addRow(4, 40, "item4", 44, 440, "item44")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRightWithInitialMissingRows_MultipleBatch() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(2, 22, 220, "item22")
      .addRow(3, 33, 330, "item33")
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(4, 44, 440, "item44_1")
      .addRow(4, 44, 440, "item44_2")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44_1")
      .addRow(4, 40, "item4", 44, 440, "item44_2")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());
    rightContainer.add(nonEmptyRightRowSet3.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRightWithInitialMissingRows_LeftJoin_MultipleBatch() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(2, 22, 220, "item22")
      .addRow(3, 33, 330, "item33")
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(4, 44, 440, "item44_1")
      .addRow(4, 44, 440, "item44_2")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", null, null, null)
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44_1")
      .addRow(4, 40, "item4", 44, 440, "item44_2")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());
    rightContainer.add(nonEmptyRightRowSet3.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (1 + nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRightWithLastMissingRows_MultipleBatch() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(3, 33, 330, "item33_1")
      .addRow(3, 33, 330, "item33_2")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33_1")
      .addRow(3, 30, "item3", 33, 330, "item33_2")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());
    rightContainer.add(nonEmptyRightRowSet3.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRightWithLastMissingRows_LeftJoin_MultipleBatch() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(3, 33, 330, "item33_1")
      .addRow(3, 33, 330, "item33_2")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33_1")
      .addRow(3, 30, "item3", 33, 330, "item33_2")
      .addRow(4, 40, "item4", null, null, null)
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());
    rightContainer.add(nonEmptyRightRowSet3.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (1 + nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRight_OutputFull_InRightBatchMiddle() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
      .addRow(4, 44, 440, "item44_2_1")
      .addRow(4, 44, 440, "item44_2_2")
      .addRow(4, 44, 440, "item44_2_3")
      .addRow(4, 44, 440, "item44_2_4")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .addRow(4, 40, "item4", 44, 440, "item44")
      .addRow(4, 40, "item4", 44, 440, "item44_2_1")
      .addRow(4, 40, "item4", 44, 440, "item44_2_2")
      .build();

    final RowSet.SingleRowSet expectedRowSet1 = fixture.rowSetBuilder(expectedSchema)
      .addRow(4, 40, "item4", 44, 440, "item44_2_3")
      .addRow(4, 40, "item4", 44, 440, "item44_2_4")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet3.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);
    ljBatch.setMaxOutputRowCount(6);
    ljBatch.setUseMemoryManager(false);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == 6);

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      // Release container memory for this output batch since other operators will do the same
      VectorAccessibleUtilities.clear(ljBatch);

      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyRightRowSet.rowCount() + nonEmptyRightRowSet3.rowCount() - 6));


      // verify results
      RowSet actualRowSet2 = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet1).verify(actualRowSet2);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      expectedRowSet.clear();
      expectedRowSet1.clear();
    }
  }

  @Test
  public void testLeftAndRight_OutputFull_WithPendingLeftRow() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .addRow(3, 33, 330, "item33")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", 33, 330, "item33")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    ljBatch.setMaxOutputRowCount(3);
    ljBatch.setUseMemoryManager(false);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == nonEmptyRightRowSet2.rowCount());

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testLeftAndRight_OutputFull_WithPendingLeftRow_LeftJoin() throws Exception {
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", null, null, null)
      .build();

    final RowSet.SingleRowSet expectedRowSet1 = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(4, 40, "item4", null, null, null)
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    ljBatch.setMaxOutputRowCount(3);
    ljBatch.setUseMemoryManager(false);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == 3);

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      // Release output container memory for this batch as other operators will do
      VectorAccessibleUtilities.clear(ljBatch);

      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyLeftRowSet.rowCount() - 3));

      // verify results
      RowSet actualRowSet2 = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet1).verify(actualRowSet2);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
      expectedRowSet1.clear();
    }
  }

  @Test
  public void testMultipleLeftAndRight_OutputFull_WithPendingLeftRow_LeftJoin() throws Exception {
    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(5, 50, "item5")
      .addRow(6, 60, "item6")
      .build();

    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(nonEmptyLeftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 11, 110, "item11")
      .addRow(2, 22, 220, "item22")
      .build();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(1, 10, "item1", 11, 110, "item11")
      .addRow(2, 20, "item2", 22, 220, "item22")
      .addRow(3, 30, "item3", null, null, null)
      .build();

    final RowSet.SingleRowSet expectedRowSet1 = fixture.rowSetBuilder(expectedSchemaLeftJoin)
      .addRow(4, 40, "item4", null, null, null)
      .addRow(5, 50, "item5", null, null, null)
      .addRow(6, 60, "item6", null, null, null)
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());
    rightContainer.add(emptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    ljBatch.setMaxOutputRowCount(3);
    ljBatch.setUseMemoryManager(false);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == 3);

      // verify results
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);

      // Release output container memory for this batch as other operators will do
      VectorAccessibleUtilities.clear(ljBatch);

      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyLeftRowSet.rowCount() + nonEmptyLeftRowSet2.rowCount() - 3));

      // verify results
      RowSet actualRowSet2 = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet1).verify(actualRowSet2);

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyLeftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
      expectedRowSet.clear();
      expectedRowSet1.clear();
    }
  }
}
