blob: f6d961dc39e0200113598a3250d55ece5d64e1f9 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.drill.exec.physical.impl.unnest;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.LateralContract;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.UnnestPOP;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.join.LateralJoinBatch;
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
import org.apache.drill.exec.planner.logical.DrillLogicalTestUtils;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.List;
import static;
public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
// Operator Context for mock batch
public static OperatorContext operatorContext;
public static PhysicalOperator mockPopConfig;
public static LateralJoinPOP ljPopConfig;
@BeforeClass public static void setUpBeforeClass() throws Exception {
mockPopConfig = new MockStorePOP(null);
ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
operatorContext = fixture.newOperatorContext(mockPopConfig);
@AfterClass public static void tearDownAfterClass() throws Exception {
public void testUnnestFixedWidthColumn() {
Object[][] data = {
{ new int[] {1, 2},
new int[] {3, 4, 5}},
{ new int[] {6, 7, 8, 9},
new int[] {10, 11, 12, 13, 14}}
// Create input schema
TupleMetadata incomingSchema =
new SchemaBuilder()
.add("rowNumber", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.INT)
TupleMetadata[] incomingSchemas = { incomingSchema, incomingSchema };
Integer[][][] baseline = {
{1, 1, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4}, //rowNum
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14} // unnestColumn_flat
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
public void testUnnestVarWidthColumn() {
Object[][] data = {
{ new String[] {"", "zero"},
new String[] {"one", "two", "three"}},
{ new String[] {"four", "five", "six", "seven"},
new String[] {"eight", "nine", "ten", "eleven", "twelve"}}
// Create input schema
TupleMetadata incomingSchema = new SchemaBuilder()
.add("someColumn", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
Object[][][] baseline = {
{1, 1, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4}, // rowNum
{"", "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven",
"twelve"} // unnestColumn_flat
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
public void testUnnestMapColumn() {
Object[][] data = getMapData();
// Create input schema
TupleMetadata incomingSchema = getRepeatedMapSchema();
TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
Object[][][] baseline = getMapBaseline();
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
public void testUnnestEmptyList() {
Object[][] data = {
{ new String[] {},
new String[] {}
{ new String[] {},
new String[] {}
// Create input schema
TupleMetadata incomingSchema = new SchemaBuilder()
.add("someColumn", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
// All batches are empty
String[][][] baseline = {{{}}};
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
public void testUnnestMultipleNewSchemaIncoming() {
// Schema changes in incoming have no effect on unnest unless the type of the
// unnest column itself has changed
Object[][] data = {
new String[] {"0", "1"},
new String[] {"2", "3", "4"}
new String[] {"5", "6" },
new String[] {"9"}
// Create input schema
TupleMetadata incomingSchema = new SchemaBuilder()
.add("someColumn", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema, incomingSchema};
Object[][][] baseline = {
{1, 1, 2, 2, 2, 3, 3},
{"0", "1", "2", "3", "4", "5", "6"}
RecordBatch.IterOutcome[] iterOutcomes = {
try {
testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
public void testUnnestSchemaChange() {
Object[][] data = {
new String[] {"0", "1"},
new String[] {"2", "3", "4"}
new String[] {"5", "6" },
new int[] {9}
// Create input schema
TupleMetadata incomingSchema1 = new SchemaBuilder()
.add("someColumn", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.VARCHAR).buildSchema();
TupleMetadata incomingSchema2 = new SchemaBuilder()
.add("someColumn", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
TupleMetadata[] incomingSchemas = {incomingSchema1, incomingSchema1, incomingSchema2};
Object[][][] baseline = {
{1, 1, 2, 2, 2, 3, 3},
{"0", "1", "2", "3", "4", "5", "6"}
RecordBatch.IterOutcome[] iterOutcomes = {
try {
testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
private void testUnnestBatchSizing(int inputBatchSize, int limitOutputBatchSize,
int limitOutputBatchSizeBytes, boolean excludeUnnestColumn) {
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
for (int i = 0; i < data.length; i++) {
for (int j = 0; j < data[i].length; j++) {
data[i][j] = new int[inputBatchSize];
for (int k =0; k < inputBatchSize; k++) {
((int[])data[i][j])[k] = k;
Integer[][][] baseline = new Integer[2][2][];
baseline[0][0] = new Integer[limitOutputBatchSize];
baseline[0][1] = new Integer[limitOutputBatchSize];
baseline[1][0] = new Integer[1];
baseline[1][1] = new Integer[1];
for (int i = 0; i < limitOutputBatchSize; i++) {
baseline[0][0][i] = 1;
baseline[0][1][i] = i;
baseline[1][0][0] = 1; // row Num
baseline[1][1][0] = limitOutputBatchSize; // value
// Create input schema
TupleMetadata incomingSchema = new SchemaBuilder()
.add("rowNumber", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
TupleMetadata[] incomingSchemas = {incomingSchema};
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitOutputBatchSizeBytes);
try {
testUnnest(incomingSchemas, iterOutcomes, data, baseline, excludeUnnestColumn);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
} finally {
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
public void testUnnestLimitBatchSize_WithExcludedCols() {
LateralJoinPOP previoudPop = ljPopConfig;
List<SchemaPath> excludedCols = new ArrayList<>();
ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, excludedCols);
final int limitedOutputBatchSize = 127;
final int inputBatchSize = limitedOutputBatchSize + 1;
// Since we want 127 row count and because of nearest power of 2 adjustment output row count will be reduced to
// 64. So we should configure batch size for (N+1) rows if we want to output N rows where N is not power of 2
// size of lateral output batch = (N+1)*8 bytes, where N = output batch row count
// Lateral output batch size = (N+1) * (input row size without unnest field) + (N+1) * size of single unnest column
// = (N+1) * (size of row id) + (N+1) * (size of single array entry)
// = (N+1)*4 + (N+1) * 4
// = (N+1) * 8
// configure the output batch size to be one more record than that so that the batch sizer can round down
final int limitedOutputBatchSizeBytes = 8 * (limitedOutputBatchSize + 1);
testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, true);
ljPopConfig = previoudPop;
public void testUnnestLimitBatchSize() {
final int limitedOutputBatchSize = 127;
final int inputBatchSize = limitedOutputBatchSize + 1;
// size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
// Lateral output batch size = N * input row size + N * size of single unnest column
// = N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
// + N * 4
// = N * (4 + 2*4 + (N+1)*4 ) + N * 4
// = N * (16 + 4N) + N * 4
// = 4N * (N + 5)
// configure the output batch size to be one more record than that so that the batch sizer can round down
final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, false);
// Limit sends a kill. Unnest has more than one record batch for a record when
// the kill is sent.
public void testUnnestKillFromLimitSubquery1() {
// similar to previous test; we split a record across more than one batch.
// but we also set a limit less than the size of the batch so only one batch gets output.
final int limitedOutputBatchSize = 127;
final int inputBatchSize = limitedOutputBatchSize + 1;
final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
for (int i = 0; i < data.length; i++) {
for (int j = 0; j < data[i].length; j++) {
data[i][j] = new int[inputBatchSize];
for (int k =0; k < inputBatchSize; k++) {
((int[])data[i][j])[k] = k;
// because of kill we only get one batch back
Integer[][][] baseline = new Integer[1][2][];
baseline[0][0] = new Integer[limitedOutputBatchSize];
baseline[0][1] = new Integer[limitedOutputBatchSize];
for (int i = 0; i < limitedOutputBatchSize; i++) {
baseline[0][0][i] = 1;
baseline[0][1][i] = i;
// Create input schema
TupleMetadata incomingSchema = new SchemaBuilder()
.add("rowNumber", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
TupleMetadata[] incomingSchemas = {incomingSchema};
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
try {
testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
} finally {
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
// Limit sends a kill. Unnest has exactly one record batch for a record when
// the kill is sent. This test is actually useless since it tests the behaviour of
// lateral which doesn't send kill at all if it gets an EMIT. We expect limit
// to do so, so let's keep the test to demonstrate the expected behaviour.
public void testUnnestKillFromLimitSubquery2() {
// similar to previous test but the size of the array fits exactly into the record batch;
final int limitedOutputBatchSize = 127;
final int inputBatchSize = limitedOutputBatchSize + 1;
final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
// single record batch with single row. The unnest column has one
// more record than the batch size we want in the output
Object[][] data = new Object[1][1];
for (int i = 0; i < data.length; i++) {
for (int j = 0; j < data[i].length; j++) {
data[i][j] = new int[inputBatchSize];
for (int k =0; k < inputBatchSize; k++) {
((int[])data[i][j])[k] = k;
// because of kill we only get one batch back
Integer[][][] baseline = new Integer[1][2][];
baseline[0][0] = new Integer[limitedOutputBatchSize];
baseline[0][1] = new Integer[limitedOutputBatchSize];
for (int i = 0; i < limitedOutputBatchSize; i++) {
baseline[0][0][i] = 1;
baseline[0][1][i] = i;
// Create input schema
TupleMetadata incomingSchema = new SchemaBuilder()
.add("rowNumber", TypeProtos.MinorType.INT)
.addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
TupleMetadata[] incomingSchemas = {incomingSchema};
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
try {
testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
} finally {
fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
public void testUnnestNonArrayColumn() {
Object[][] data = {
{ new Integer (1),
new Integer (3)},
{ new Integer (6),
new Integer (10)}
// Create input schema
TupleMetadata incomingSchema =
new SchemaBuilder()
.add("rowNumber", TypeProtos.MinorType.INT)
.add("unnestColumn", TypeProtos.MinorType.INT)
TupleMetadata[] incomingSchemas = { incomingSchema, incomingSchema };
// We expect an Exception
Integer[][][] baseline = {};
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
} catch (UserException|UnsupportedOperationException e) {
return; // succeeded
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());
// test unnest for various input conditions without invoking kill
private <T> void testUnnest(
TupleMetadata[] incomingSchemas,
RecordBatch.IterOutcome[] iterOutcomes,
T[][] data,
T[][][] baseline,
boolean excludeUnnestColumn) throws Exception{
testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline, excludeUnnestColumn);
// test unnest for various input conditions optionally invoking kill. if the kill or killBatch
// parameter is greater than 0 then the record batch is sent a kill after that many batches have been processed
private <T> void testUnnest( TupleMetadata[] incomingSchemas,
RecordBatch.IterOutcome[] iterOutcomes,
int unnestLimit, // kill unnest after every 'unnestLimit' number of values in every record
int execKill, // number of batches after which to kill the execution (!)
T[][] data,
T[][][] baseline,
boolean excludeUnnestColumn) throws Exception {
// Get the incoming container with dummy data for LJ
final List<VectorContainer> incomingContainer = new ArrayList<>(data.length);
// Create data
ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
int rowNumber = 0;
int batchNum = 0;
for (Object[] recordBatch : data) {
RowSetBuilder rowSetBuilder = fixture.rowSetBuilder(incomingSchemas[batchNum]);
for ( Object rowData : recordBatch) {
rowSetBuilder.addRow(++rowNumber, rowData);
RowSet.SingleRowSet rowSet =;
// Get the unnest POPConfig
final UnnestPOP unnestPopConfig = new UnnestPOP(null, SchemaPath.getCompoundPath("unnestColumn"), DrillUnnestRelBase.IMPLICIT_COLUMN);
// Get the IterOutcomes for LJ
final List<RecordBatch.IterOutcome> outcomes = new ArrayList<>(iterOutcomes.length);
for(RecordBatch.IterOutcome o : iterOutcomes) {
// Create incoming MockRecordBatch
final MockRecordBatch incomingMockBatch =
new MockRecordBatch(fixture.getFragmentContext(), operatorContext, incomingContainer, outcomes,
// setup Unnest record batch
final UnnestRecordBatch unnestBatch =
new UnnestRecordBatch(unnestPopConfig, fixture.getFragmentContext());
// project is required to rename the columns so as to disambiguate the same column name from
// unnest operator and the regular scan.
final Project projectPopConfig = new Project(DrillLogicalTestUtils.parseExprs("unnestColumn", "unnestColumn1",
unnestPopConfig.getImplicitColumn(), unnestPopConfig.getImplicitColumn()), null);
final ProjectRecordBatch projectBatch =
new ProjectRecordBatch( projectPopConfig, unnestBatch, fixture.getFragmentContext());
final LateralJoinBatch lateralJoinBatch =
new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(), incomingMockBatch, projectBatch);
// set pointer to Lateral in unnest
unnestBatch.setIncoming((LateralContract) lateralJoinBatch);
// Simulate the pipeline by calling next on the incoming
// results is an array of batches, each batch being an array of output vectors.
List<List<ValueVector> > resultList = new ArrayList<>();
List<List<ValueVector> > results = null;
int batchesProcessed = 0;
try {
while (!isTerminal( {
if (lateralJoinBatch.getRecordCount() > 0) {
addBatchToResults(resultList, lateralJoinBatch);
if (batchesProcessed == execKill) {
// Errors are reported by throwing an exception.
// Simulate by skipping out of the loop
// else nothing to do
} catch (UserException e) {
throw e;
} catch (Exception e) {
// Check results against baseline
results = resultList;
int batchIndex = 0;
int vectorIndex = 0;
//int valueIndex = 0;
for (List<ValueVector> batch: results) {
int vectorCount= batch.size();
int expectedVectorCount = (excludeUnnestColumn) ? 0 : 1;
expectedVectorCount += baseline[batchIndex].length;
if (vectorCount!= expectedVectorCount) { // baseline does not include the original unnest column
fail("Test failed in validating unnest output. Batch column count mismatch.");
for (ValueVector vv : batch) {
if(vv.getField().getName().equals("unnestColumn")) {
continue; // skip the original input column
int valueCount = vv.getAccessor().getValueCount();
if (valueCount!= baseline[batchIndex][vectorIndex].length) {
fail("Test failed in validating unnest output. Value count mismatch in batch number " + (batchIndex+1) +""
+ ".");
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
if (vv instanceof MapVector) {
if (!compareMapBaseline(baseline[batchIndex][vectorIndex][valueIndex], vv
.getObject(valueIndex))) {
fail("Test failed in validating unnest(Map) output. Value mismatch");
} else if (vv instanceof VarCharVector) {
Object val = vv.getAccessor().getObject(valueIndex);
if (((String) baseline[batchIndex][vectorIndex][valueIndex]).compareTo(val.toString()) != 0) {
fail("Test failed in validating unnest output. Value mismatch. Baseline value[]" + vectorIndex + "][" + valueIndex
+ "]" + ": " + baseline[vectorIndex][valueIndex] + " VV.getObject(valueIndex): " + val);
} else {
Object val = vv.getAccessor().getObject(valueIndex);
if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
fail("Test failed in validating unnest output. Value mismatch. Baseline value[" + vectorIndex + "][" + valueIndex
+ "]" + ": "
+ baseline[batchIndex][vectorIndex][valueIndex] + " VV.getObject(valueIndex): " + val);
} catch (UserException e) {
throw e; // Valid exception
} catch (Exception e) {
fail("Test failed. Exception : " + e.getMessage());
} finally {
// Close all the resources for this test case
if (results != null) {
for (List<ValueVector> batch : results) {
for (ValueVector vv : batch) {
for(RowSet.SingleRowSet rowSet: rowSets) {
* Build a schema with a repeated map -
* {
* rowNum,
* mapColumn : [
* {
* colA,
* colB : [
* varcharCol
* ]
* }
* ]
* }
* @see org.apache.drill.exec.physical.resultSet.impl.TestResultSetLoaderMapArray TestResultSetLoaderMapArray for
* similar schema and data
* @return TupleMetadata corresponding to the schema
private TupleMetadata getRepeatedMapSchema() {
TupleMetadata schema = new SchemaBuilder()
.add("rowNum", TypeProtos.MinorType.INT)
.add("colA", TypeProtos.MinorType.INT)
.addArray("colB", TypeProtos.MinorType.VARCHAR)
return schema;
private Object[][] getMapData( ) {
Object[][] d = {
new Object[] {},
new Object[] {
new Object[] {11, new String[] {"1.1.1", "1.1.2" }},
new Object[] {12, new String[] {"1.2.1", "1.2.2" }}
new Object[] {
new Object[] {21, new String[] {"2.1.1", "2.1.2" }},
new Object[] {22, new String[] {}},
new Object[] {23, new String[] {"2.3.1", "2.3.2" }}
new Object[] {
new Object[] {31, new String[] {"3.1.1", "3.1.2" }},
new Object[] {32, new String[] {"3.2.1", "3.2.2" }}
return d;
private Object[][][] getMapBaseline() {
Object[][][] d = {
return d;
private Object[][][] getNestedMapBaseline() {
Object[][][] d = {
return d;
private boolean compareMapBaseline(Object baselineValue, Object vector) {
String vv = vector.toString();
String b = (String)baselineValue;
return vv.equalsIgnoreCase(b);
private int addBatchToResults(List<List<ValueVector> > resultList, RecordBatch inputBatch) {
int count = 0;
final RecordBatchData batchCopy = new RecordBatchData(inputBatch, operatorContext.getAllocator());
boolean success = false;
try {
count = batchCopy.getRecordCount();
success = true;
} finally {
if (!success) {
return count;
private boolean isTerminal(RecordBatch.IterOutcome outcome) {
return (outcome == RecordBatch.IterOutcome.NONE);
* Run a plan like the following for various input batches :
* Lateral1
* / \
* / Lateral2
* Scan / \
* / \
* Project1 Project2
* / \
* / \
* Unnest1 Unnest2
* @param incomingSchemas
* @param iterOutcomes
* @param execKill
* @param data
* @param baseline
* @param <T>
* @throws Exception
private <T> void testNestedUnnest( TupleMetadata[] incomingSchemas,
RecordBatch.IterOutcome[] iterOutcomes,
int execKill, // number of batches after which to kill the execution (!)
T[][] data,
T[][][] baseline) throws Exception {
// Get the incoming container with dummy data for LJ
final List<VectorContainer> incomingContainer = new ArrayList<>(data.length);
// Create data
ArrayList<RowSet.SingleRowSet> rowSets = new ArrayList<>();
int rowNumber = 0;
int batchNum = 0;
for ( Object[] recordBatch : data) {
RowSetBuilder rowSetBuilder = fixture.rowSetBuilder(incomingSchemas[batchNum]);
for ( Object rowData : recordBatch) {
rowSetBuilder.addRow(++rowNumber, rowData);
RowSet.SingleRowSet rowSet =;
// Get the unnest POPConfig
final UnnestPOP unnestPopConfig1 = new UnnestPOP(null, SchemaPath.getSimplePath("unnestColumn"), DrillUnnestRelBase.IMPLICIT_COLUMN);
final UnnestPOP unnestPopConfig2 = new UnnestPOP(null, SchemaPath.getSimplePath("colB"), DrillUnnestRelBase.IMPLICIT_COLUMN);
// Get the IterOutcomes for LJ
final List<RecordBatch.IterOutcome> outcomes = new ArrayList<>(iterOutcomes.length);
for(RecordBatch.IterOutcome o : iterOutcomes) {
// Create incoming MockRecordBatch
final MockRecordBatch incomingMockBatch =
new MockRecordBatch(fixture.getFragmentContext(), operatorContext, incomingContainer, outcomes,
// setup Unnest record batch
final UnnestRecordBatch unnestBatch1 =
new UnnestRecordBatch(unnestPopConfig1, fixture.getFragmentContext());
final UnnestRecordBatch unnestBatch2 =
new UnnestRecordBatch(unnestPopConfig2, fixture.getFragmentContext());
// Create intermediate Project
final Project projectPopConfig1 =
new Project(DrillLogicalTestUtils.parseExprs("unnestColumn.colB", "colB",
unnestPopConfig1.getImplicitColumn(), unnestPopConfig1.getImplicitColumn()), unnestPopConfig1);
final ProjectRecordBatch projectBatch1 =
new ProjectRecordBatch(projectPopConfig1, unnestBatch1, fixture.getFragmentContext());
final Project projectPopConfig2 =
new Project(DrillLogicalTestUtils.parseExprs("colB", "unnestColumn2",
unnestPopConfig2.getImplicitColumn(), unnestPopConfig2.getImplicitColumn()), unnestPopConfig2);
final ProjectRecordBatch projectBatch2 =
new ProjectRecordBatch(projectPopConfig2, unnestBatch2, fixture.getFragmentContext());
final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
final LateralJoinBatch lateralJoinBatch2 =
new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), projectBatch1, projectBatch2);
final LateralJoinBatch lateralJoinBatch1 =
new LateralJoinBatch(ljPopConfig1, fixture.getFragmentContext(), incomingMockBatch, lateralJoinBatch2);
// set pointer to Lateral in unnest
unnestBatch1.setIncoming((LateralContract) lateralJoinBatch1);
unnestBatch2.setIncoming((LateralContract) lateralJoinBatch2);
// Simulate the pipeline by calling next on the incoming
// results is an array ot batches, each batch being an array of output vectors.
List<List<ValueVector> > resultList = new ArrayList<>();
List<List<ValueVector> > results = null;
int batchesProcessed = 0;
try {
while (!isTerminal( {
if (lateralJoinBatch1.getRecordCount() > 0) {
addBatchToResults(resultList, lateralJoinBatch1);
if (batchesProcessed == execKill) {
lateralJoinBatch1.getContext().getExecutorState().fail(new DrillException("Testing failure of execution."));
// else nothing to do
} catch (UserException e) {
throw e;
} catch (Exception e) {
throw new Exception ("Test failed to execute because: " + e.getMessage());
// Check results against baseline
results = resultList;
int batchIndex = 0;
int vectorIndex = 0;
//int valueIndex = 0;
for ( List<ValueVector> batch: results) {
int vectorCount= batch.size();
if (vectorCount!= baseline[batchIndex].length+2) { // baseline does not include the original unnest column(s)
fail("Test failed in validating unnest output. Batch column count mismatch.");
for (ValueVector vv : batch) {
if(vv.getField().getName().equals("unnestColumn") || vv.getField().getName().equals("colB")) {
continue; // skip the original input column
int valueCount = vv.getAccessor().getValueCount();
if (valueCount!= baseline[batchIndex][vectorIndex].length) {
fail("Test failed in validating unnest output. Value count mismatch in batch number " + (batchIndex+1) +""
+ ".");
for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
if (vv instanceof MapVector) {
if (!compareMapBaseline(baseline[batchIndex][vectorIndex][valueIndex], vv
.getObject(valueIndex))) {
fail("Test failed in validating unnest(Map) output. Value mismatch");
} else if (vv instanceof VarCharVector) {
Object val = vv.getAccessor().getObject(valueIndex);
if (((String) baseline[batchIndex][vectorIndex][valueIndex]).compareTo(val.toString()) != 0) {
fail("Test failed in validating unnest output. Value mismatch. Baseline value[]" + vectorIndex + "][" + valueIndex
+ "]" + ": " + baseline[vectorIndex][valueIndex] + " VV.getObject(valueIndex): " + val);
} else {
Object val = vv.getAccessor().getObject(valueIndex);
if (!baseline[batchIndex][vectorIndex][valueIndex].equals(val)) {
fail("Test failed in validating unnest output. Value mismatch. Baseline value[" + vectorIndex + "][" + valueIndex
+ "]" + ": "
+ baseline[batchIndex][vectorIndex][valueIndex] + " VV.getObject(valueIndex): " + val);
} catch (UserException e) {
throw e; // Valid exception
} catch (Exception e) {
fail("Test failed. Exception : " + e.getMessage());
} finally {
// Close all the resources for this test case
if (results != null) {
for (List<ValueVector> batch : results) {
for (ValueVector vv : batch) {
for(RowSet.SingleRowSet rowSet: rowSets) {
public void testNestedUnnestMapColumn() {
Object[][] data = getMapData();
// Create input schema
TupleMetadata incomingSchema = getRepeatedMapSchema();
TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
Object[][][] baseline = getNestedMapBaseline();
RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
try {
testNestedUnnest(incomingSchemas, iterOutcomes, 0, data, baseline);
} catch (Exception e) {
fail("Failed due to exception: " + e.getMessage());