/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.join;

import com.google.common.collect.Lists;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.util.ArrayList;
import java.util.List;

import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@Category(OperatorTest.class)
public class TestLateralJoinCorrectness extends SubOperatorTest {

  // Operator Context for mock batch
  private static OperatorContext operatorContext;

  // Left Batch Schema
  private static TupleMetadata leftSchema;

  // Right Batch Schema
  private static TupleMetadata rightSchema;

  // Empty left RowSet
  private static RowSet.SingleRowSet emptyLeftRowSet;

  // Non-Empty left RowSet
  private static RowSet.SingleRowSet nonEmptyLeftRowSet;

  // List of left incoming containers
  private static final List<VectorContainer> leftContainer = new ArrayList<>(5);

  // List of left IterOutcomes
  private static final List<RecordBatch.IterOutcome> leftOutcomes = new ArrayList<>(5);

  // Empty right RowSet
  private static RowSet.SingleRowSet emptyRightRowSet;

  // Non-Empty right RowSet
  private static RowSet.SingleRowSet nonEmptyRightRowSet;

  // List of right incoming containers
  private static final List<VectorContainer> rightContainer = new ArrayList<>(5);

  // List of right IterOutcomes
  private static final List<RecordBatch.IterOutcome> rightOutcomes = new ArrayList<>(5);

  // Lateral Join POP Config
  private static LateralJoinPOP ljPopConfig;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    PhysicalOperator mockPopConfig = new MockStorePOP(null);
    operatorContext = fixture.newOperatorContext(mockPopConfig);

    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    leftSchema = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();
    emptyLeftRowSet = fixture.rowSetBuilder(leftSchema).build();

    rightSchema = new SchemaBuilder()
      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right", TypeProtos.MinorType.INT)
      .add("cost_right", TypeProtos.MinorType.INT)
      .add("name_right", TypeProtos.MinorType.VARCHAR)
      .buildSchema();
    emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    operatorContext.close();
    emptyLeftRowSet.clear();
    emptyRightRowSet.clear();
  }


  @Before
  public void beforeTest() throws Exception {
    nonEmptyLeftRowSet = fixture.rowSetBuilder(leftSchema)
      .addRow(1, 10, "item1")
      .build();

    nonEmptyRightRowSet = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 1, 11, "item11")
      .addRow(1, 2, 21, "item21")
      .addRow(1, 3, 31, "item31")
      .build();
  }

  @After
  public void afterTest() throws Exception {
    nonEmptyLeftRowSet.clear();
    nonEmptyRightRowSet.clear();
    leftContainer.clear();
    leftOutcomes.clear();
    rightContainer.clear();
    rightOutcomes.clear();
  }

  /**
   * Helper method to check if input outcome is one of terminal state or not
   *
   * @param outcome - input IterOutcome state to check for
   * @return
   */
  private boolean isTerminal(RecordBatch.IterOutcome outcome) {
    return (outcome == RecordBatch.IterOutcome.NONE);
  }

  /**
   * With both left and right batch being empty, the {@link LateralJoinBatch#buildSchema()} phase
   * should still build the output container schema and return empty batch
   *
   * @throws Exception
   */
  @Test
  public void testBuildSchemaEmptyLRBatch() throws Exception {

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(emptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, emptyLeftRowSet.container().getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, emptyRightRowSet.container().getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == 0);

      while (!isTerminal(ljBatch.next())) {
        // do nothing
      }
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * With both left and right batch being non-empty, the {@link LateralJoinBatch#buildSchema()} phase
   * will fail with {@link IllegalStateException} since it expects first batch from right branch of LATERAL
   * to be always empty
   *
   * @throws Exception
   */
  @Test
  public void testBuildSchemaNonEmptyLRBatch() throws Exception {

    // Get the left container with dummy data
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(nonEmptyRightRowSet.container());
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      fail();
    } catch (AssertionError | Exception error) {
      // Expected since first right batch is supposed to be empty
      assertTrue(error instanceof IllegalStateException);
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * With non-empty left and empty right batch, the {@link LateralJoinBatch#buildSchema()} phase
   * should still build the output container schema and return empty batch
   *
   * @throws Exception
   */
  @Test
  public void testBuildSchemaNonEmptyLEmptyRBatch() throws Exception {

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == 0);

      // Since Right batch is empty it should drain left and return NONE
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());

    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * This case should never happen since With empty left there cannot be non-empty right batch, the
   * {@link LateralJoinBatch#buildSchema()} phase should fail() with {@link IllegalStateException}
   *
   * @throws Exception
   */
  @Test
  public void testBuildSchemaEmptyLNonEmptyRBatch() throws Exception {

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(emptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(nonEmptyRightRowSet.container());
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      fail();

    } catch (AssertionError | Exception error) {
      // Expected since first right batch is supposed to be empty
      assertTrue(error instanceof IllegalStateException);
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * Test for receiving unexpected EMIT outcome during build phase on left side.
   * @throws Exception
   */
  @Test
  public void testBuildSchemaWithEMITOutcome() throws Exception {
    // Get the left container with dummy data for Lateral Join
    leftContainer.add(emptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(nonEmptyRightRowSet.container());
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      ljBatch.next();
      fail();
    } catch (AssertionError | Exception error) {
      // Expected since first right batch is supposed to be empty
      assertTrue(error instanceof IllegalStateException);
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * Test to show correct IterOutcome produced by LATERAL when one record in left batch produces only 1 right batch
   * with EMIT outcome. Then output of LATERAL should be produced by OK outcome after the join. It verifies the number
   * of records in the output batch based on left and right input batches.
   *
   * @throws Exception
   */
  @Test
  public void test1RecordLeftBatchTo1RightRecordBatch() throws Exception {
    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * Test to show correct IterOutcome & output produced by LATERAL when one record in left batch produces 2 right
   * batches with OK and EMIT outcome respectively. Then output of LATERAL should be produced with OK outcome after the
   * join is done using both right batch with the left batch. It verifies the number of records in the output batch
   * based on left and right input batches.
   *
   * @throws Exception
   */
  @Test
  public void test1RecordLeftBatchTo2RightRecordBatch() throws Exception {
    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyLeftRowSet.rowCount() * (nonEmptyRightRowSet.rowCount() + nonEmptyRightRowSet2.rowCount())));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * Test to show that if the result of cross join between left batch and empty right batch is an empty
   * batch and next batch on left side result in NONE outcome, then there is no output produced and result
   * returned to downstream is NONE.
   *
   * @throws Exception
   */
  @Test
  public void test1RecordLeftBatchToEmptyRightBatch() throws Exception {

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * Test to show LATERAL tries to pack the output batch until it's full or all the data is consumed from left and
   * right side. We have multiple left and right batches which fits after join inside the same output batch, hence
   * LATERAL only generates one output batch.
   *
   * @throws Exception
   */
  @Test
  public void testFillingUpOutputBatch() throws Exception {
    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(2, 20, "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * When multiple left batches are received with different schema, then LATERAL produces output for each schema type
   * separately (even though output batch is not filled completely) and handles the schema change in left batch.
   * Moreover in this case the schema change was only for columns which are not produced by the UNNEST or right branch.
   *
   * @throws Exception
   */
  @Test
  public void testHandlingSchemaChangeForNonUnnestField() throws Exception {

    // Create left input schema 2
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.VARCHAR)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(2, "20", "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      // This means 2 output record batches were received because of Schema change
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * When multiple left and right batches are received with different schema, then LATERAL produces output for each
   * schema type separately (even though output batch is not filled completely) and handles the schema change correctly.
   * Moreover in this case the schema change was for both left and right branch (produced by UNNEST with empty batch).
   *
   * @throws Exception
   */
  @Test
  public void testHandlingSchemaChangeForUnnestField() throws Exception {
    // Create left input schema 2
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.VARCHAR)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    // Create right input schema
    TupleMetadata rightSchema2 = new SchemaBuilder()
      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right", TypeProtos.MinorType.INT)
      .add("cost_right", TypeProtos.MinorType.VARCHAR)
      .add("name_right", TypeProtos.MinorType.VARCHAR)
      .buildSchema();


    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(2, "20", "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
      .addRow(1, 4, "41", "item41")
      .addRow(1, 5, "51", "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    // first OK_NEW_SCHEMA batch
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    // second OK_NEW_SCHEMA batch. Right side batch for OK_New_Schema is always empty
    rightContainer.add(emptyRightRowSet2.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      // This means 2 output record batches were received because of Schema change
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      //fail();
      throw error;
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      emptyRightRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * Verify if there is no schema change on left side and LATERAL still sees an unexpected schema change on right side
   * then it handles it correctly.
   * handle it corr
   * @throws Exception
   */
  @Test
  public void testHandlingUnexpectedSchemaChangeForUnnestField() throws Exception {
    // Create left input schema 2
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.VARCHAR)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    // Create right input schema
    TupleMetadata rightSchema2 = new SchemaBuilder()
      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right", TypeProtos.MinorType.INT)
      .add("cost_right", TypeProtos.MinorType.VARCHAR)
      .add("name_right", TypeProtos.MinorType.VARCHAR)
      .buildSchema();


    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(2, "20", "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
      .addRow(1, 4, "41", "item41")
      .addRow(1, 5, "51", "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    // first OK_NEW_SCHEMA batch
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    // second OK_NEW_SCHEMA batch. Right side batch for OK_New_Schema is always empty
    rightContainer.add(emptyRightRowSet2.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      ljBatch.next();
      fail();
    } catch (AssertionError | Exception error) {
      // Expected since first right batch is supposed to be empty
      assertTrue(error instanceof IllegalStateException);
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      emptyRightRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL rebuilds the
   * schema each time and sends output in multiple output batches
   * The schema change was only for columns which are not produced by the UNNEST or right branch.
   *
   * @throws Exception
   */
  @Test
  public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForNonUnnestField() throws Exception {

    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(2, 20, "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
      // This means only 1 output record batch was received.
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL correctly
   * handles it by re-creating the schema and producing multiple batches of final output
   * The schema change is for columns common on both left and right side.
   *
   * @throws Exception
   */
  @Test
  public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForUnnestField() throws Exception {
    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(2, 20, "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
      // This means only 1 output record batch was received.
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }


  /**
   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
   * correctly and suppresses schema change operation by producing output in same batch created with initial schema.
   * The schema change was only for columns which are not produced by the UNNEST or right branch.
   *
   * @throws Exception
   */
  @Test
  public void testHandlingEMITFromLeft() throws Exception {

    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(3, 30, "item30")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(emptyLeftRowSet.container());
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());

      // 1st output batch is received for first EMIT from LEFT side
      assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();

      // 2nd output batch is received for second EMIT from LEFT side
      assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();

      // Compare the total records generated in 2 output batches with expected count.
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * Test for the case where LATERAL received a left batch with OK outcome and then populate the Join output in the
   * outgoing batch. There is still some space left in output batch so LATERAL call's next() on left side and receive
   * NONE outcome from left side. Then in this case LATERAL should produce the previous output batch with OK outcome
   * and then handle NONE outcome in future next() call.
   *
   * @throws Exception
   */
  @Test
  public void testHandlingNoneAfterOK() throws Exception {

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());

      // 1st output batch is received for first EMIT from LEFT side
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();

      // Compare the total records generated in 2 output batches with expected count.
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * Test for the case when LATERAL received a left batch with OK outcome and then populate the Join output in the
   * outgoing batch. There is still some space left in output batch so LATERAL call's next() on left side and receive
   * EMIT outcome from left side with empty batch. Then in this case LATERAL should produce the previous output batch
   * with EMIT outcome.
   *
   * @throws Exception
   */
  @Test
  public void testHandlingEmptyEMITAfterOK() throws Exception {

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(emptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());

      // 1st output batch is received for first EMIT from LEFT side
      assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();

      // Compare the total records generated in 2 output batches with expected count.
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * Test for the case when LATERAL received a left batch with OK outcome and then populate the Join output in the
   * outgoing batch. There is still some space left in output batch so LATERAL call's next() on left side and receive
   * EMIT outcome from left side with non-empty batch. Then in this case LATERAL should produce the output batch with
   * EMIT outcome if second left and right batches are also consumed entirely in same output batch.
   *
   * @throws Exception
   */
  @Test
  public void testHandlingNonEmptyEMITAfterOK() throws Exception {

    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(2, 20, "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());

      // 1st output batch is received for first EMIT from LEFT side
      assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();

      // Compare the total records generated in 2 output batches with expected count.
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()) +
          (leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error){
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * Temporary test to validate LATERAL handling output batch getting filled without consuming full output from left
   * and right batch join.
   * <p>
   * For this test we are updating {@link LateralJoinBatch#MAX_BATCH_ROW_COUNT} by making it public, which might not expected
   * after including the BatchSizing logic
   * TODO: Update the test after incorporating the BatchSizing change.
   *
   * @throws Exception
   */
  @Test
  public void testHandlingNonEmpty_EMITAfterOK_WithMultipleOutput() throws Exception {

    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(2, 20, "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    final int maxBatchSize = 2;
    ljBatch.setUseMemoryManager(false);
    ljBatch.setMaxOutputRowCount(maxBatchSize);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());

      // 1st output batch
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(ljBatch.getRecordCount() == maxBatchSize);

      // 2nd output batch
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == maxBatchSize);
      totalRecordCount += ljBatch.getRecordCount();

      // 3rd output batch
      assertTrue(RecordBatch.IterOutcome.EMIT == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();

      // Compare the total records generated in 2 output batches with expected count.
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()) +
          (leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * Test to check basic left lateral join is working correctly or not. We create a left batch with one and
   * corresponding right batch with zero rows and check if output still get's populated with left side of data or not.
   * Expectation is since it's a left join and even though right batch is empty the left row will be pushed to output
   * batch.
   *
   * @throws Exception
   */
  @Test
  public void testBasicLeftLateralJoin() throws Exception {
    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == nonEmptyLeftRowSet.container().getRecordCount());
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * Test to see if there are multiple rows in left batch and for some rows right side produces multiple batches such
   * that some are with records and some are empty then we are not duplicating those left side records based on left
   * join type. In this case all the output will be produces only in 1 record batch.
   *
   * @throws Exception
   */
  @Test
  public void testLeftLateralJoin_WithMatchingAndEmptyBatch() throws Exception {
    // Get the left container with dummy data for Lateral Join
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(1, 10, "item10")
      .addRow(2, 20, "item20")
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(2, 6, 60, "item61")
      .addRow(2, 7, 70, "item71")
      .addRow(2, 8, 80, "item81")
      .build();

    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      final int expectedOutputRecordCount = 6; // 3 for first left row and 1 for second left row
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == expectedOutputRecordCount);
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * Test to see if there are multiple rows in left batch and for some rows right side produces batch with records
   * and for other rows right side produces empty batches then based on left join type we are populating the output
   * batch correctly. Expectation is that for left rows if we find corresponding right rows then we will output both
   * using cross-join but for left rows for which there is empty right side we will produce only left row in output
   * batch. In this case all the output will be produces only in 1 record batch.
   *
   * @throws Exception
   */
  @Test
  public void testLeftLateralJoin_WithAndWithoutMatching() throws Exception {
    // Get the left container with dummy data for Lateral Join
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(1, 10, "item10")
      .addRow(2, 20, "item20")
      .addRow(3, 30, "item30")
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(3, 6, 60, "item61")
      .addRow(3, 7, 70, "item71")
      .addRow(3, 8, 80, "item81")
      .build();

    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      final int expectedOutputRecordCount = 7; // 3 for first left row and 1 for second left row
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == expectedOutputRecordCount);
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      //fail();
      throw error;
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * Test to see if there are multiple rows in left batch and for some rows right side produces batch with records
   * and for other rows right side produces empty batches then based on left join type we are populating the output
   * batch correctly. Expectation is that for left rows if we find corresponding right rows then we will output both
   * using cross-join but for left rows for which there is empty right side we will produce only left row in output
   * batch. But in this test we have made the Batch size very small so that output will be returned in multiple
   * output batches. This test verifies even in this case indexes are manipulated correctly and outputs are produced
   * correctly.
   * TODO: Update the test case based on Batch Sizing project since then the variable might not be available.
   *
   * @throws Exception
   */
  @Test
  public void testLeftLateralJoin_WithAndWithoutMatching_MultipleBatch() throws Exception {
    // Get the left container with dummy data for Lateral Join
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(1, 10, "item10")
      .addRow(2, 20, "item20")
      .addRow(3, 30, "item30")
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(3, 6, 60, "item61")
      .addRow(3, 7, 70, "item71")
      .addRow(3, 8, 80, "item81")
      .build();

    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    int originalMaxBatchSize = 2;
    ljBatch.setUseMemoryManager(false);
    ljBatch.setMaxOutputRowCount(originalMaxBatchSize);

    try {
      final int expectedOutputRecordCount = 7; // 3 for first left row and 1 for second left row
      int actualOutputRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      actualOutputRecordCount += ljBatch.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      actualOutputRecordCount += ljBatch.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      actualOutputRecordCount += ljBatch.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      actualOutputRecordCount += ljBatch.getRecordCount();
      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      //fail();
      throw error;
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      //leftRowSet2.clear();
      //nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * This test generates an operator tree for multiple UNNEST at same level by stacking 2 LATERAL and UNNEST pair on
   * top of each other. Then we call next() on top level LATERAL to simulate the operator tree and compare the
   * outcome and record count generated with expected values.
   * @throws Exception
   */
  @Test
  public void testMultipleUnnestAtSameLevel() throws Exception {

    // ** Prepare first pair of left batch and right batch for Lateral_1 **
    leftContainer.add(nonEmptyLeftRowSet.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    final LateralJoinBatch ljBatch_1 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_1, rightMockBatch_1);

    // ** Prepare second pair of left and right batch for Lateral_2 **

    // Get the right container with dummy data for Lateral Join_2

    // Create right input schema
    TupleMetadata rightSchema2 = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right_1", TypeProtos.MinorType.INT)
      .add("cost_right_1", TypeProtos.MinorType.INT)
      .add("name_right_1", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2).build();

    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
      .addRow(1, 6, 60, "item61")
      .addRow(1, 7, 70, "item71")
      .addRow(1, 8, 80, "item81")
      .build();

    final List<VectorContainer> rightContainer2 = new ArrayList<>(5);
    // Get the right container with dummy data
    rightContainer2.add(emptyRightRowSet2.container());
    rightContainer2.add(nonEmptyRightRowSet2.container());
    rightContainer2.add(emptyRightRowSet2.container());
    rightContainer2.add(emptyRightRowSet2.container());

    final List<RecordBatch.IterOutcome> rightOutcomes2 = new ArrayList<>(5);
    rightOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes2.add(RecordBatch.IterOutcome.OK);
    rightOutcomes2.add(RecordBatch.IterOutcome.OK);
    rightOutcomes2.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer2, rightOutcomes2, rightContainer2.get(0).getSchema());

    final LateralJoinBatch ljBatch_2 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      ljBatch_1, rightMockBatch_2);

    try {
      final int expectedOutputRecordCount = 3; // 3 from the lower level lateral and then finally 3 for 1st row in
      // second lateral and 0 for other 2 rows.

      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch_2.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch_2.next());
      int actualOutputRecordCount = ljBatch_2.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch_2.next());
      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch_2.close();
      rightMockBatch_2.close();
      ljBatch_1.close();
      leftMockBatch_1.close();
      rightMockBatch_1.close();
      rightContainer2.clear();
      rightOutcomes2.clear();
    }
  }

  /**
   * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair
   * (using MockRecord Batch) as left and right child of lower level LATERAL. Then we call next() on top level
   * LATERAL to simulate the operator tree and compare the outcome and record count generated with expected values.
   * @throws Exception
   */
  @Test
  public void testMultiLevelLateral() throws Exception {

    // ** Prepare first pair of left batch and right batch for Lateral_1 **
    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER,
      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    // Create a left batch with implicit column for lower lateral left unnest
    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 1, 10, "item1")
      .build();

    final RowSet.SingleRowSet nonEmptyLeftRowSet_2 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 2, 20, "item2")
      .build();

    leftContainer.add(emptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet_2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 5, 51, "item51")
      .addRow(1, 6, 61, "item61")
      .addRow(1, 7, 71, "item71")
      .build();
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet_1.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_1, rightMockBatch_1);

    // ** Prepare second pair of left and right batch for Lateral_2 **

    // Create left input schema
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add("id_left_1", TypeProtos.MinorType.INT)
      .add("cost_left_1", TypeProtos.MinorType.INT)
      .add("name_left_1", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema2).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(6, 60, "item6")
      .build();

    // Get the left container with dummy data
    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);
    leftContainer2.add(emptyLeftRowSet2.container());
    leftContainer2.add(nonEmptyLeftRowSet2.container());

    // Get the left outcomes with dummy data
    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK);

    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());

    final LateralJoinBatch upperLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_2, lowerLateral);

    try {
      final int expectedOutputRecordCount = 6;

      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLateral.next());
      assertTrue(RecordBatch.IterOutcome.OK == upperLateral.next());
      int actualOutputRecordCount = upperLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.NONE == upperLateral.next());
      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      upperLateral.close();
      leftMockBatch_2.close();
      lowerLateral.close();
      leftMockBatch_1.close();
      rightMockBatch_1.close();
      leftContainer2.clear();
      leftOutcomes2.clear();
    }
  }

  /**
   * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair
   * (using MockRecord Batch) as left and right child of lower level LATERAL. Then we call next() on top level
   * LATERAL to simulate the operator tree and compare the outcome and record count generated with expected values.
   * This test also changes the MAX_BATCH_ROW_COUNT to simulate the output being produced in multiple batches.
   * @throws Exception
   */
  @Test
  public void testMultiLevelLateral_MultipleOutput() throws Exception {

    // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 1, 10, "item1")
      .build();

    final RowSet.SingleRowSet nonEmptyLeftRowSet_2 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 2, 20, "item2")
      .build();

    leftContainer.add(emptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet_2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 5, 51, "item51")
      .addRow(1, 6, 61, "item61")
      .addRow(1, 7, 71, "item71")
      .build();
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet_1.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_1, rightMockBatch_1);

    // Use below api to enforce static output batch limit
    lowerLateral.setUseMemoryManager(false);
    lowerLateral.setMaxOutputRowCount(2);

    // ** Prepare second pair of left and right batch for upper LATERAL Lateral_2 **

    // Create left input schema
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add("id_left_1", TypeProtos.MinorType.INT)
      .add("cost_left_1", TypeProtos.MinorType.INT)
      .add("name_left_1", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema2).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(6, 60, "item6")
      .build();

    // Get the left container with dummy data
    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);
    leftContainer2.add(emptyLeftRowSet2.container());
    leftContainer2.add(nonEmptyLeftRowSet2.container());

    // Get the left incoming batch outcomes
    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK);

    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());

    final LateralJoinBatch upperLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_2, lowerLateral);

    // Use below api to enforce static output batch limit
    upperLateral.setUseMemoryManager(false);
    upperLateral.setMaxOutputRowCount(2);

    try {
      final int expectedOutputRecordCount = 6;

      int actualOutputRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLateral.next());
      assertTrue(RecordBatch.IterOutcome.OK == upperLateral.next());
      actualOutputRecordCount += upperLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == upperLateral.next());
      actualOutputRecordCount += upperLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == upperLateral.next());
      actualOutputRecordCount += upperLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.NONE == upperLateral.next());
      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      upperLateral.close();
      leftMockBatch_2.close();
      lowerLateral.close();
      leftMockBatch_1.close();
      rightMockBatch_1.close();
      leftContainer2.clear();
      leftOutcomes2.clear();
    }
  }

  /**
   * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair
   * (using MockRecord Batch) as left and right child of lower level LATERAL. In this setup the test try to simulate
   * the SchemaChange happening at upper level LATERAL left incoming second batch, which also results into the
   * SchemaChange of left UNNEST of lower level LATERAL. This test validates that the schema change is handled
   * correctly by both upper and lower level LATERAL.
   *
   * @throws Exception
   */
  @Test
  public void testMultiLevelLateral_SchemaChange_LeftUnnest() throws Exception {

    // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 1, 10, "item1")
      .build();

    leftContainer.add(emptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet_1.container());

    // Create left input schema2 for schema change batch
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("new_id_left", TypeProtos.MinorType.INT)
      .add("new_cost_left", TypeProtos.MinorType.INT)
      .add("new_name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_Schema2 = fixture.rowSetBuilder(leftSchema2).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_Schema2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(1, 1111, 10001, "NewRecord")
      .build();

    leftContainer.add(emptyLeftRowSet_Schema2.container());
    leftContainer.add(nonEmptyLeftRowSet_Schema2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 5, 51, "item51")
      .addRow(1, 6, 61, "item61")
      .addRow(1, 7, 71, "item71")
      .build();
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet_1.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_1, rightMockBatch_1);

    // ** Prepare second pair of left and right batch for upper level Lateral_2 **

    // Create left input schema for first batch
    TupleMetadata leftSchema3 = new SchemaBuilder()
      .add("id_left_new", TypeProtos.MinorType.INT)
      .add("cost_left_new", TypeProtos.MinorType.INT)
      .add("name_left_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
      .addRow(6, 60, "item6")
      .build();

    // Get left input schema for second left batch
    TupleMetadata leftSchema4 = new SchemaBuilder()
      .add("id_left_new_new", TypeProtos.MinorType.INT)
      .add("cost_left_new_new", TypeProtos.MinorType.VARCHAR)
      .add("name_left_new_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4)
      .addRow(100, "100", "item100")
      .build();

    // Build Left container for upper level LATERAL operator
    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);

    // Get the left container with dummy data
    leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
    leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
    leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());

    // Get the left container outcomes for upper level LATERAL operator
    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());

    final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_2, lowerLevelLateral);

    try {
      // 3 for first batch on left side and another 3 for next left batch
      final int expectedOutputRecordCount = 6;
      int actualOutputRecordCount = 0;

      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      upperLevelLateral.close();
      leftMockBatch_2.close();
      lowerLevelLateral.close();
      leftMockBatch_1.close();
      rightMockBatch_1.close();
      leftContainer2.clear();
      leftOutcomes2.clear();
    }
  }

  /**
   * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair
   * (using MockRecord Batch) as left and right child of lower level LATERAL. In this setup the test try to simulate
   * the SchemaChange happening at upper level LATERAL left incoming second batch, which also results into the
   * SchemaChange of right UNNEST of lower level LATERAL. This test validates that the schema change is handled
   * correctly by both upper and lower level LATERAL.
   *
   * @throws Exception
   */
  @Test
  public void testMultiLevelLateral_SchemaChange_RightUnnest() throws Exception {
    // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 1, 10, "item1")
      .build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 1111, 10001, "NewRecord")
      .build();

    leftContainer.add(emptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    TupleMetadata rightSchema2 = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right_new", TypeProtos.MinorType.INT)
      .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
      .add("name_right_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
    final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
      .addRow(1, 5, "51", "item51")
      .addRow(1, 6, "61", "item61")
      .addRow(1, 7, "71", "item71")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet_rightSchema2.container());
    rightContainer.add(nonEmptyRightRowSet_rightSchema2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_1, rightMockBatch_1);

    // ** Prepare second pair of left and right batch for upper level Lateral_2 **

    // Create left input schema for first batch
    TupleMetadata leftSchema3 = new SchemaBuilder()
      .add("id_left_new", TypeProtos.MinorType.INT)
      .add("cost_left_new", TypeProtos.MinorType.INT)
      .add("name_left_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
      .addRow(6, 60, "item6")
      .build();

    // Get left input schema for second left batch
    TupleMetadata leftSchema4 = new SchemaBuilder()
      .add("id_left_new_new", TypeProtos.MinorType.INT)
      .add("cost_left_new_new", TypeProtos.MinorType.VARCHAR)
      .add("name_left_new_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4)
      .addRow(100, "100", "item100")
      .build();

    // Build Left container for upper level LATERAL operator
    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);

    // Get the left container with dummy data
    leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
    leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
    leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());

    // Get the left container outcomes for upper level LATERAL operator
    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());

    final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_2, lowerLevelLateral);

    try {
      // 3 for first batch on left side and another 3 for next left batch
      final int expectedOutputRecordCount = 6;
      int actualOutputRecordCount = 0;

      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      upperLevelLateral.close();
      leftMockBatch_2.close();
      lowerLevelLateral.close();
      leftMockBatch_1.close();
      rightMockBatch_1.close();
      leftContainer2.clear();
      leftOutcomes2.clear();
    }
  }

  /**
   * This test generates an operator tree for multi level LATERAL by stacking 2 LATERAL and finally an UNNEST pair
   * (using MockRecord Batch) as left and right child of lower level LATERAL. In this setup the test try to simulate
   * the SchemaChange happening at upper level LATERAL left incoming second batch, which also results into the
   * SchemaChange of both left&right UNNEST of lower level LATERAL. This test validates that the schema change is
   * handled correctly by both upper and lower level LATERAL.
   *
   * @throws Exception
   */
  @Test
  public void testMultiLevelLateral_SchemaChange_LeftRightUnnest() throws Exception {
    // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 1, 10, "item1")
      .build();

    // Create left input schema for first batch
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_left_new", TypeProtos.MinorType.INT)
      .add("cost_left_new", TypeProtos.MinorType.INT)
      .add("name_left_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(1, 6, 60, "item6")
      .build();

    leftContainer.add(emptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet_1.container());
    leftContainer.add(emptyLeftRowSet_leftSchema2.container());
    leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    TupleMetadata rightSchema2 = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right_new", TypeProtos.MinorType.INT)
      .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
      .add("name_right_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
    final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
      .addRow(1, 5, "51", "item51")
      .addRow(1, 6, "61", "item61")
      .addRow(1, 7, "71", "item71")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet_rightSchema2.container());
    rightContainer.add(nonEmptyRightRowSet_rightSchema2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_1, rightMockBatch_1);

    // ** Prepare second pair of left and right batch for upper level Lateral_2 **

    // Create left input schema for first batch
    TupleMetadata leftSchema3 = new SchemaBuilder()
      .add("id_left_left", TypeProtos.MinorType.INT)
      .add("cost_left_left", TypeProtos.MinorType.INT)
      .add("name_left_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
      .addRow(6, 60, "item6")
      .build();

    // Get left input schema for second left batch
    TupleMetadata leftSchema4 = new SchemaBuilder()
      .add("id_left_left_new", TypeProtos.MinorType.INT)
      .add("cost_left_left_new", TypeProtos.MinorType.VARCHAR)
      .add("name_left_left_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4)
      .addRow(100, "100", "item100")
      .build();

    // Build Left container for upper level LATERAL operator
    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);

    // Get the left container with dummy data
    leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
    leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
    leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());

    // Get the left container outcomes for upper level LATERAL operator
    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());

    final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_2, lowerLevelLateral);

    try {
      // 3 for first batch on left side and another 3 for next left batch
      final int expectedOutputRecordCount = 6;
      int actualOutputRecordCount = 0;

      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      upperLevelLateral.close();
      leftMockBatch_2.close();
      lowerLevelLateral.close();
      leftMockBatch_1.close();
      rightMockBatch_1.close();
      leftContainer2.clear();
      leftOutcomes2.clear();
    }
  }

  /**
   * Test unsupported incoming batch to LATERAL with SelectionVector
   * @throws Exception
   */
  @Test
  public void testUnsupportedSelectionVector() throws Exception {
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(2, 20, "item20")
      .withSv2()
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      ljBatch.next();
      fail();
    } catch (UserException e) {
      assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
    }
  }

  /**
   * Test to verify if OK_NEW_SCHEMA is received from left side of LATERAL post build schema phase and EMIT is
   * received from right side of LATERAL for each row on left side, then Lateral sends OK_NEW_SCHEMA downstream with
   * the output batch. LATERAL shouldn't send any batch with EMIT outcome to the downstream operator as it is the
   * consumer of all the EMIT outcomes. It will work fine in case of Multilevel LATERAL too since there the lower
   * LATERAL only sends EMIT after it receives it from left UNNEST.
   * @throws Exception
   */
  @Test
  public void test_OK_NEW_SCHEMAFromLeft_EmitFromRight_PostBuildSchema() throws Exception {
    // Get the left container with dummy data for Lateral Join
    TupleMetadata leftSchema3 = new SchemaBuilder()
      .add("id_left_left", TypeProtos.MinorType.INT)
      .add("cost_left_left", TypeProtos.MinorType.VARCHAR)
      .add("name_left_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
      .addRow(6, "60", "item6")
      .addRow(7, "70", "item7")
      .build();

    leftContainer.add(emptyLeftRowSet.container());
    leftContainer.add(nonEmptyLeftRowSet_leftSchema3.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(2, 10, 100, "list10")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.OK);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(ljBatch.getRecordCount() == 0);

      // Since Right batch is empty it should drain left and return NONE
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      final int expectedOutputCount = nonEmptyRightRowSet.rowCount() + nonEmptyRightRowSet2.rowCount();
      assertTrue(ljBatch.getRecordCount() == expectedOutputCount);
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
    }
  }

  /**
   * Verifies that if a non-empty batch with OK_NEW_SCHEMA is received from right side post buildSchema phase then it
   * is handled correctly by sending an empty batch with OK_NEW_SCHEMA and later consuming it to produce actual
   * output batch with some data
   */
  @Test
  public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Exception {
    // Create left input schema 2
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.VARCHAR)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    // Create right input schema
    TupleMetadata rightSchema2 = new SchemaBuilder()
      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right", TypeProtos.MinorType.INT)
      .add("cost_right", TypeProtos.MinorType.VARCHAR)
      .add("name_right", TypeProtos.MinorType.VARCHAR)
      .buildSchema();


    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(2, "20", "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
      .build();

    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
      .addRow(1, 4, "41", "item41")
      .addRow(1, 5, "51", "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    // first OK_NEW_SCHEMA batch
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container()); // non-empty OK_NEW_SCHEMA batch
    rightContainer.add(emptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      int totalRecordCount = 0;
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      // This means 2 output record batches were received because of Schema change
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertEquals(0, ljBatch.getRecordCount());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      totalRecordCount += ljBatch.getRecordCount();
      assertTrue(totalRecordCount ==
        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));

      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
      emptyRightRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  /**
   * Test to verify in case of Multilevel lateral when a non-empty OK_NEW_SCHEMA batch post build schema phase is
   * received from right most UNNEST of lower LATERAL then pipeline works fine.
   * @throws Exception
   */
  @Test
  public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() throws Exception {
    // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());

    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
      .addRow(1, 1, 10, "item1")
      .build();

    // Create left input schema for first batch
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_left_new", TypeProtos.MinorType.INT)
      .add("cost_left_new", TypeProtos.MinorType.INT)
      .add("name_left_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(1, 6, 60, "item6")
      .build();

    leftContainer.add(emptyLeftRowSet_1.container());
    leftContainer.add(nonEmptyLeftRowSet_1.container());
    leftContainer.add(emptyLeftRowSet_leftSchema2.container());
    leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    TupleMetadata rightSchema2 = new SchemaBuilder()
      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
      .add("id_right_new", TypeProtos.MinorType.INT)
      .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
      .add("name_right_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
    final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
      .addRow(1, 5, "51", "item51")
      .addRow(1, 6, "61", "item61")
      .addRow(1, 7, "71", "item71")
      .build();

    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet_rightSchema2.container()); // non-empty batch with Ok_new_schema
    rightContainer.add(emptyRightRowSet_rightSchema2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_1, rightMockBatch_1);

    // ** Prepare second pair of left and right batch for upper level Lateral_2 **

    // Create left input schema for first batch
    TupleMetadata leftSchema3 = new SchemaBuilder()
      .add("id_left_left", TypeProtos.MinorType.INT)
      .add("cost_left_left", TypeProtos.MinorType.INT)
      .add("name_left_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build();
    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
      .addRow(6, 60, "item6")
      .build();

    // Get left input schema for second left batch
    TupleMetadata leftSchema4 = new SchemaBuilder()
      .add("id_left_left_new", TypeProtos.MinorType.INT)
      .add("cost_left_left_new", TypeProtos.MinorType.VARCHAR)
      .add("name_left_left_new", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4)
      .addRow(100, "100", "item100")
      .build();

    // Build Left container for upper level LATERAL operator
    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);

    // Get the left container with dummy data
    leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
    leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
    leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());

    // Get the left container outcomes for upper level LATERAL operator
    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK);
    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());

    final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
      leftMockBatch_2, lowerLevelLateral);

    try {
      // 3 for first batch on left side and another 3 for next left batch
      final int expectedOutputRecordCount = 6;
      int actualOutputRecordCount = 0;

      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
      actualOutputRecordCount += upperLevelLateral.getRecordCount();
      assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      upperLevelLateral.close();
      leftMockBatch_2.close();
      lowerLevelLateral.close();
      leftMockBatch_1.close();
      rightMockBatch_1.close();
      leftContainer2.clear();
      leftOutcomes2.clear();
    }
  }

  /**
   * Test to verify that for first left incoming if there is no right side incoming batch and then second left
   * incoming comes with schema change, then the schema change with empty output batch for first incoming is handled
   * properly.
   * @throws Exception
   */
  @Test
  public void testLateral_SchemaChange_Left_EmptyRightBatchForFirst() throws Exception {
    // Create left input schema 2
    TupleMetadata leftSchema2 = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("cost_left", TypeProtos.MinorType.VARCHAR)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .buildSchema();


    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
      .addRow(2, "20", "item20")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    // first OK_NEW_SCHEMA batch
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container()); // non-empty OK_NEW_SCHEMA batch

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
      leftMockBatch, rightMockBatch);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      // This means 2 output record batches were received because of Schema change
      assertEquals(3, ljBatch.getRecordCount());
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } catch (AssertionError | Exception error) {
      fail();
    } finally {
      // Close all the resources for this test case
      ljBatch.close();
      leftMockBatch.close();
      rightMockBatch.close();
      leftRowSet2.clear();
    }
  }

  private void testExcludedColumns(List<SchemaPath> excludedCols, CloseableRecordBatch left,
                                   CloseableRecordBatch right, RowSet expectedRowSet) throws Exception {
    LateralJoinPOP lateralPop = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, excludedCols);
    final LateralJoinBatch ljBatch = new LateralJoinBatch(lateralPop, fixture.getFragmentContext(), left, right);

    try {
      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
      new RowSetComparison(expectedRowSet).verify(actualRowSet);
      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
    } finally {
      ljBatch.close();
      left.close();
      right.close();
      expectedRowSet.clear();
    }
  }

  @Test
  public void testFillingUpOutputBatch_WithExcludedColumns() throws Exception {
    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(2, 20, "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    TupleMetadata expectedSchema = new SchemaBuilder()
      .add("id_left", TypeProtos.MinorType.INT)
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .add("id_right", TypeProtos.MinorType.INT)
      .add("cost_right", TypeProtos.MinorType.INT)
      .add("name_right", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow(1, "item1", 1, 11, "item11")
      .addRow(1, "item1", 2, 21, "item21")
      .addRow(1, "item1", 3, 31, "item31")
      .addRow(2, "item20", 4, 41, "item41")
      .addRow(2, "item20", 5, 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    List<SchemaPath> excludedCols = new ArrayList<>();
    excludedCols.add(SchemaPath.getSimplePath("cost_left"));

    try {
      testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
    } finally {
      // Close all the resources for this test case
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }

  @Test
  public void testFillingUpOutputBatch_With2ExcludedColumns() throws Exception {
    // Create data for left input
    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
      .addRow(2, 20, "item20")
      .build();

    // Create data for right input
    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
      .addRow(1, 4, 41, "item41")
      .addRow(1, 5, 51, "item51")
      .build();

    TupleMetadata expectedSchema = new SchemaBuilder()
      .add("name_left", TypeProtos.MinorType.VARCHAR)
      .add("cost_right", TypeProtos.MinorType.INT)
      .add("name_right", TypeProtos.MinorType.VARCHAR)
      .buildSchema();

    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
      .addRow("item1", 11, "item11")
      .addRow("item1", 21, "item21")
      .addRow("item1", 31, "item31")
      .addRow("item20", 41, "item41")
      .addRow("item20", 51, "item51")
      .build();

    // Get the left container with dummy data for Lateral Join
    leftContainer.add(nonEmptyLeftRowSet.container());
    leftContainer.add(leftRowSet2.container());

    // Get the left IterOutcomes for Lateral Join
    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    leftOutcomes.add(RecordBatch.IterOutcome.OK);

    // Create Left MockRecordBatch
    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());

    // Get the right container with dummy data
    rightContainer.add(emptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet.container());
    rightContainer.add(nonEmptyRightRowSet2.container());

    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);

    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());

    List<SchemaPath> excludedCols = new ArrayList<>();
    excludedCols.add(SchemaPath.getSimplePath("cost_left"));
    excludedCols.add(SchemaPath.getSimplePath("id_left"));
    excludedCols.add(SchemaPath.getSimplePath("id_right"));

    try {
      testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
    } finally {
      // Close all the resources for this test case
      leftRowSet2.clear();
      nonEmptyRightRowSet2.clear();
    }
  }
}
