| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.physical.unit; |
| |
| import org.apache.commons.lang3.StringUtils; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import org.apache.calcite.rel.core.JoinRelType; |
| import org.apache.drill.common.exceptions.ExecutionSetupException; |
| import org.apache.drill.common.expression.SchemaPath; |
| import org.apache.drill.common.expression.LogicalExpression; |
| import org.apache.drill.common.expression.FunctionCall; |
| import org.apache.drill.common.expression.FieldReference; |
| import org.apache.drill.common.expression.ExpressionPosition; |
| import org.apache.drill.exec.physical.base.AbstractBase; |
| import org.apache.drill.exec.physical.base.PhysicalOperator; |
| import org.apache.drill.exec.physical.config.FlattenPOP; |
| import org.apache.drill.exec.physical.config.HashAggregate; |
| import org.apache.drill.exec.physical.config.HashJoinPOP; |
| import org.apache.drill.exec.physical.config.MergeJoinPOP; |
| import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; |
| import org.apache.drill.exec.physical.config.Project; |
| import org.apache.drill.exec.physical.config.UnionAll; |
| import org.apache.drill.exec.physical.impl.ScanBatch; |
| import org.apache.drill.exec.planner.physical.AggPrelBase; |
| import org.apache.drill.exec.record.RecordBatchSizer; |
| import org.apache.drill.exec.record.RecordBatch; |
| import org.apache.drill.exec.record.VectorAccessible; |
| import org.apache.drill.exec.record.VectorWrapper; |
| import org.apache.drill.exec.util.JsonStringArrayList; |
| import org.apache.drill.exec.util.JsonStringHashMap; |
| import org.apache.drill.exec.util.Text; |
| import org.apache.drill.exec.vector.UInt4Vector; |
| import org.apache.drill.exec.vector.ValueVector; |
| import org.apache.drill.exec.vector.complex.RepeatedListVector; |
| import org.apache.drill.exec.vector.complex.RepeatedValueVector; |
| import org.apache.drill.test.LegacyOperatorTestBuilder; |
| import org.apache.drill.test.PhysicalOpUnitTestBase; |
| import org.junit.Test; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| |
| public class TestOutputBatchSize extends PhysicalOpUnitTestBase { |
| private static final long initReservation = AbstractBase.INIT_ALLOCATION; |
| private static final long maxAllocation = AbstractBase.MAX_ALLOCATION; |
| // Keeping row count below 4096 so we do not produce more than one batch. |
| // scanBatch with json reader produces batches of 4k. |
| private int numRows = 4000; |
| private static final String wideString = |
| "b00dUrA0oa2i4ZEHg6zvPXPXlVQYB2BXe8T5gIEtvUDzcN6yUkIqyS07gaAy8k4ac6Bn1cxblsXFnkp8g8hiQkUMJPyl6" + |
| "l0jTdsIzQ4PkVCURGGyF0aduGqCXUaKp91gqkRMvLhHhmrHdEb22QN20dXEHSygR7vrb2zZhhfWeJbXRsesuYDqdGig801IAS6VWRIdQtJ6gaRhCdNz"; |
| |
| /** |
| * Figures out what will be total size of the batches for a given Json input batch. |
| */ |
| private long getExpectedSize(List<String> expectedJsonBatches) throws ExecutionSetupException { |
| // Create a dummy scanBatch to figure out the size. |
| RecordBatch scanBatch = new ScanBatch(new MockPhysicalOperator(), fragContext, getReaderListForJsonBatches(expectedJsonBatches, fragContext)); |
| Iterable<VectorAccessible> batches = new BatchIterator(scanBatch); |
| |
| long totalSize = 0; |
| for (VectorAccessible batch : batches) { |
| RecordBatchSizer sizer = new RecordBatchSizer(batch); |
| totalSize += sizer.getNetBatchSize(); |
| } |
| return totalSize; |
| } |
| |
| @Test |
| public void testProjectMap() throws Exception { |
| // create input rows like this. |
| // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, |
| // {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] |
| StringBuilder batchString = new StringBuilder("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + "abc" + "\"," + |
| " \"c\" : { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + |
| " \"d\": { \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}"); |
| batchString.append(i != numRows - 1 ? "}," : "}]"); |
| } |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| inputJsonBatches.add(batchString.toString()); |
| |
| StringBuilder expectedString = new StringBuilder("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedString.append("{\"aplusamount\": 105"); |
| expectedString.append(i != numRows - 1 ? "}," : "}]"); |
| } |
| |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| expectedJsonBatches.add(expectedString.toString()); |
| |
| String[] baselineColumns = new String[1]; |
| baselineColumns[0] = "aplusamount"; |
| |
| String[] expr = {"a + c.amount ", baselineColumns[0]}; |
| |
| Project projectConf = new Project(parseExprs(expr), null); |
| mockOpContext(projectConf, initReservation, maxAllocation); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(projectConf) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns(baselineColumns) |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| Long[] baseLineValues = {(5l + 100l)}; // a + c.amount |
| for (int i = 0; i < numRows; i++) { |
| opTestBuilder.baselineValues((Object[]) baseLineValues); |
| } |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testProjectVariableWidthFunctions() throws Exception { |
| //size calculators |
| StringBuilder batchString = new StringBuilder("["); |
| String strValue = "abcde"; |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\" : " + "\"" + strValue + "\""); |
| batchString.append(i != numRows - 1 ? "}," : "}]"); |
| } |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // inputSize, as calculated below will be numRows * (inputRowsize), |
| // inputRowSize = metadata cols + sizeof("abcde"), numRows = 4000 |
| // So, inputSize = 4000 * ( 4 + 1 + 5 ) = 40000 |
| // inputSize is used as the batch memory limit for the tests. |
| // Depending on the function being evaluated, different output batch counts will be expected |
| long inputSize = getExpectedSize(inputJsonBatches); |
| String inputSizeStr = inputSize + ""; |
| |
| String [][] functions = |
| { //{ OP name, OP result, OP SQL str, Memory Limit, Num Expected Batches } |
| |
| // concat() o/p size will be 2 x input size, so at least 2 batches expected |
| {"concat", strValue + strValue, "concat(a,a)", inputSizeStr, 2 + ""}, |
| // upper() o/p size will same as input size, so at least 1 batch is expected |
| {"upper", strValue.toUpperCase(),"upper(a)", inputSizeStr, 1 + ""}, |
| // repeat() is assumed to produce a row-size of 50. |
| // input row size is 10 (null vector + offset vector + abcde) |
| // so at least 5 batches are expected |
| {"repeat", strValue + strValue, "repeatstr(a, 2)", inputSizeStr, 5 + ""}, |
| // substr() is assumed to produce a row size which is same as input |
| // so at least 1 batch is expected |
| {"substr", strValue.substring(0, 4), "substr(a, 1, 4)", inputSizeStr, 1 + "" } |
| }; |
| |
| for (String[] fn : functions) { |
| String outputColumnName = fn[0] + "_result"; |
| String operationResult = fn[1]; |
| String exprStr = fn[2]; |
| long memoryLimit = Long.valueOf(fn[3]); |
| int expectedNumBatches = Integer.valueOf(fn[4]); |
| |
| StringBuilder expectedString = new StringBuilder("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedString.append("{\"" + outputColumnName + "\":" + operationResult); |
| expectedString.append(i != numRows - 1 ? "}," : "}]"); |
| } |
| |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| expectedJsonBatches.add(expectedString.toString()); |
| |
| String[] baselineColumns = new String[1]; |
| baselineColumns[0] = outputColumnName; |
| |
| String[] expr = {exprStr, baselineColumns[0]}; |
| |
| Project projectConf = new Project(parseExprs(expr), null); |
| mockOpContext(projectConf, initReservation, maxAllocation); |
| |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", memoryLimit); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(projectConf) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns(baselineColumns) |
| .expectedNumBatches(expectedNumBatches) // verify number of batches |
| .expectedBatchSize(memoryLimit); // verify batch size. |
| |
| String[] baseLineValues = {operationResult}; //operation(a, a) |
| for (int i = 0; i < numRows; i++) { |
| opTestBuilder.baselineValues((Object[]) baseLineValues); |
| } |
| opTestBuilder.go(); |
| } |
| } |
| |
| |
| @Test |
| public void testProjectFixedWidthTransfer() throws Exception { |
| testProjectFixedWidthImpl(true, 100); |
| } |
| |
| @Test |
| public void testProjectFixedWidthNewColumn() throws Exception { |
| testProjectFixedWidthImpl(false, 100); |
| } |
| |
| /** |
| * Tests BatchSizing of fixed-width transfers and new column creations in Project. |
| * Transfer: Evaluates 'select *' |
| * New Columns: Evalutes 'select C0 + 5 as C0 ... C[columnCount] + 5 as C[columnCount] |
| * @param transfer |
| * @throws Exception |
| */ |
| |
| public void testProjectFixedWidthImpl(boolean transfer, int columnCount) throws Exception { |
| |
| //generate a row with N columns C0..C[columnCount], value in a column is same as column id |
| StringBuilder jsonRow = new StringBuilder("{"); |
| String[] baselineColumns = new String [columnCount]; |
| Object[] baselineValues = new Long[columnCount]; |
| |
| int exprSize = (transfer ? 2 : 2 * columnCount); |
| String[] expr = new String[exprSize]; |
| |
| // Expr for a 'select *' as expected by parseExprs() |
| if (transfer) { |
| expr[0] = "`**`"; |
| expr[1] = "`**`"; |
| } |
| |
| for (int i = 0; i < columnCount; i++) { |
| jsonRow.append("\"" + "C" + i + "\": " + i + ((i == columnCount - 1) ? "" : ",")); |
| baselineColumns[i] = "C" + i; |
| if (!transfer) { |
| expr[i * 2] = baselineColumns[i] + " + 5"; |
| expr[i * 2 + 1] = baselineColumns[i]; |
| } |
| baselineValues[i] = (long)(transfer ? i : i + 5); |
| } |
| jsonRow.append("}"); |
| StringBuilder batchString = new StringBuilder("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append(jsonRow + ((i == numRows - 1) ? "" : ",")); |
| } |
| batchString.append("]"); |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| inputJsonBatches.add(batchString.toString()); |
| |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| expectedJsonBatches.add(batchString.toString()); |
| |
| Project projectConf = new Project(parseExprs(expr), null); |
| mockOpContext(projectConf, initReservation, maxAllocation); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(projectConf) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns(baselineColumns) |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| for (int i = 0; i < numRows; i++) { |
| opTestBuilder.baselineValues(baselineValues); |
| } |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testProjectVariableWidthTransfer() throws Exception { |
| testProjectVariableWidthImpl(true, 50, "ABCDEFGHIJ"); |
| } |
| |
| @Test |
| public void testProjectVariableWidthNewColumn() throws Exception { |
| testProjectVariableWidthImpl(false, 50, "ABCDEFGHIJ"); |
| } |
| |
| @Test |
| public void testProjectZeroWidth() throws Exception { |
| testProjectVariableWidthImpl(true, 50, ""); |
| } |
| |
| |
| public void testProjectVariableWidthImpl(boolean transfer, int columnCount, String testString) throws Exception { |
| |
| StringBuilder jsonRow = new StringBuilder("{"); |
| String[] baselineColumns = new String [columnCount]; |
| Object[] baselineValues = new String[columnCount]; |
| int exprSize = (transfer ? 2 : 2 * columnCount); |
| String[] expr = new String[exprSize]; |
| |
| // Expr for a 'select *' as expected by parseExprs() |
| if (transfer) { |
| expr[0] = "`**`"; |
| expr[1] = "`**`"; |
| } |
| |
| for (int i = 0; i < columnCount; i++) { |
| jsonRow.append("\"" + "C" + i + "\": " + "\"" + testString + "\"" + ((i == columnCount - 1) ? "" : ",")); |
| baselineColumns[i] = "C" + i; |
| if (!transfer) { |
| expr[i * 2] = "lower(" + baselineColumns[i] + ")"; |
| expr[i * 2 + 1] = baselineColumns[i]; |
| } |
| baselineValues[i] = (transfer ? testString : StringUtils.lowerCase(testString)); |
| } |
| jsonRow.append("}"); |
| StringBuilder batchString = new StringBuilder("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append(jsonRow + ((i == numRows - 1) ? "" : ",")); |
| } |
| batchString.append("]"); |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| inputJsonBatches.add(batchString.toString()); |
| |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| expectedJsonBatches.add(batchString.toString()); |
| |
| Project projectConf = new Project(parseExprs(expr), null); |
| mockOpContext(projectConf, initReservation, maxAllocation); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(projectConf) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns(baselineColumns) |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| for (int i = 0; i < numRows; i++) { |
| opTestBuilder.baselineValues(baselineValues); |
| } |
| opTestBuilder.go(); |
| } |
| |
| /** |
| * Test expression with transfer and new columns |
| * @throws Exception |
| */ |
| @Test |
| public void testProjectVariableWidthMixed() throws Exception { |
| String testString = "ABCDEFGHIJ"; |
| StringBuilder jsonRow = new StringBuilder("{"); |
| // 50 new columns and 1 transfer |
| final int colCount = 50 + 1; |
| String[] baselineColumns = new String [colCount]; |
| Object[] baselineValues = new String[colCount]; |
| int exprSize = 2 * colCount; |
| String[] expr = new String[exprSize]; |
| |
| // columns C1 ... C50 |
| for (int i = 1; i < colCount; i++) { |
| jsonRow.append("\"" + "C" + i + "\": " + "\"" + testString + "\"" + ((i == colCount - 1) ? "" : ",")); |
| baselineColumns[i] = "C" + i; |
| // New columns lower(C1) as C1, ... lower(C50) as C50 |
| expr[i * 2] = "lower(" + baselineColumns[i] + ")"; |
| expr[i * 2 + 1] = baselineColumns[i]; |
| |
| baselineValues[i] = StringUtils.lowerCase(testString); |
| } |
| |
| |
| //Transfer: C1 as COL1TR |
| expr[0] = "C1"; |
| expr[1] = "COL1TR"; |
| baselineColumns[0] = "COL1TR"; |
| baselineValues[0] = testString; |
| String expectedJsonRow = jsonRow.toString() + ", \"COL1TR\": \"" + testString + "\"}"; |
| jsonRow.append("}"); |
| |
| StringBuilder batchString = new StringBuilder("["); |
| StringBuilder expectedString = new StringBuilder("["); |
| |
| for (int i = 0; i < numRows; i++) { |
| batchString.append(jsonRow + ((i == numRows - 1) ? "" : ",")); |
| expectedString.append(expectedJsonRow + ((i == numRows - 1) ? "" : ",")); |
| } |
| batchString.append("]"); |
| expectedString.append("]"); |
| |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| inputJsonBatches.add(batchString.toString()); |
| |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| expectedJsonBatches.add(expectedString.toString()); |
| |
| Project projectConf = new Project(parseExprs(expr), null); |
| mockOpContext(projectConf, initReservation, maxAllocation); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(projectConf) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns(baselineColumns) |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| for (int i = 0; i < numRows; i++) { |
| opTestBuilder.baselineValues(baselineValues); |
| } |
| opTestBuilder.go(); |
| } |
| |
| |
| |
| @Test |
| public void testFlattenFixedWidth() throws Exception { |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| |
| // create input rows like this. |
| // "a" : 5, "b" : wideString, "c" : [6,7,8,9] |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [6, 7, 8, 9]},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [6, 7, 8, 9]}"); |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 5, "b" : wideString, "c" : 6 |
| // "a" : 5, "b" : wideString, "c" : 7 |
| // "a" : 5, "b" : wideString, "c" : 8 |
| // "a" : 5, "b" : wideString, "c" : 9 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 6},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 7},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 8},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 9},"); |
| } |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 6},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 7},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 8},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 9}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, 6l); |
| opTestBuilder.baselineValues(5l, wideString, 7l); |
| opTestBuilder.baselineValues(5l, wideString, 8l); |
| opTestBuilder.baselineValues(5l, wideString, 9l); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenVariableWidth() throws Exception { |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| |
| // create input rows like this. |
| // "a" : 5, "b" : wideString, "c" : ["parrot", "hummingbird", "owl", "woodpecker", "peacock"] |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\",\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\", \"peacock\"]},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\",\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\", \"peacock\"]}"); |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 5, "b" : wideString, "c" : parrot |
| // "a" : 5, "b" : wideString, "c" : hummingbird |
| // "a" : 5, "b" : wideString, "c" : owl |
| // "a" : 5, "b" : wideString, "c" : woodpecker |
| // "a" : 5, "b" : wideString, "c" : peacock |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"parrot\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"hummingbird\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"owl\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"woodpecker\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"peacock\"},"); |
| } |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"parrot\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"hummingbird\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"owl\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"woodpecker\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"peacock\"}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, "parrot"); |
| opTestBuilder.baselineValues(5l, wideString, "hummingbird"); |
| opTestBuilder.baselineValues(5l, wideString, "owl"); |
| opTestBuilder.baselineValues(5l, wideString, "woodpecker"); |
| opTestBuilder.baselineValues(5l, wideString, "peacock"); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenFixedWidthList() throws Exception { |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| |
| // create input rows like this. |
| // "a" : 5, "b" : wideString, "c" : [[1,2,3,4], [5,6,7,8]] |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + "[1,2,3,4]," + "[5,6,7,8]" + "]"); |
| batchString.append("},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + "[1,2,3,4]," + "[5,6,7,8]" + "]"); |
| batchString.append("}]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 5, "b" : wideString, "c" : [1,2,3,4] |
| // "a" : 5, "b" : wideString, "c" : [5,6,7,8] |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[1,2,3,4]\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[5,6,7,8]\"},"); |
| } |
| |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[1,2,3,4]\"},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[5,6,7,8]\"}"); |
| |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize); // verify batch size. |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, new ArrayList<Long>(Arrays.asList(1L, 2L, 3L, 4L))); |
| opTestBuilder.baselineValues(5l, wideString, new ArrayList<Long>(Arrays.asList(5L, 6L, 7L, 8L))); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenVariableWidthList() throws Exception { |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| |
| // create input rows like this. |
| // "a" : 5, "b" : wideString, "c" : [["parrot", "hummingbird", "owl", "woodpecker"], ["hawk", "nightingale", "swallow", "peacock"]] |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + |
| "\"c\" : [" + "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]," + "[\"hawk\",\"nightingale\",\"swallow\",\"peacock\"]" + "]"); |
| batchString.append("},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + |
| "\"c\" : [" + "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]," + "[\"hawk\",\"nightingale\",\"swallow\",\"peacock\"]" + "]"); |
| batchString.append("}]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 5, "b" : wideString, "c" : ["parrot", "hummingbird", "owl", "woodpecker"] |
| // "a" : 5, "b" : wideString, "c" : ["hawk", "nightingale", "swallow", "peacock"] |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"hawk\", \"nightingale\", \"swallow\", \"peacock\"]},"); |
| } |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"hawk\", \"nightingale\", \"swallow\", \"peacock\"]}"); |
| |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize); // verify batch size. |
| |
| final JsonStringArrayList<Text> birds1 = new JsonStringArrayList<Text>() {{ |
| add(new Text("parrot")); |
| add(new Text("hummingbird")); |
| add(new Text("owl")); |
| add(new Text("woodpecker")); |
| }}; |
| |
| final JsonStringArrayList<Text> birds2 = new JsonStringArrayList<Text>() {{ |
| add(new Text("hawk")); |
| add(new Text("nightingale")); |
| add(new Text("swallow")); |
| add(new Text("peacock")); |
| }}; |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, birds1); |
| opTestBuilder.baselineValues(5l, wideString, birds2); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenMap() throws Exception { |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| |
| // create input rows like this. |
| // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] |
| |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + |
| "\"c\" : [" + " { \"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}," + |
| " { \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}"); |
| batchString.append("]},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + |
| "\"c\" : [" + " { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777," + |
| " \"type\":\"sports\"}," + |
| " { \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}"); |
| batchString.append("]}]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 5, "b" : wideString, "c" : {"trans_id":"t1", amount:100, trans_time:7777777, type:sports} |
| // "a" : 5, "b" : wideString, "c" : {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries} |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "{\"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "{\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}},"); |
| } |
| |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "{\"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "{\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}}"); |
| |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>(); |
| resultExpected1.put("trans_id", new Text("t1")); |
| resultExpected1.put("amount", new Long(100)); |
| resultExpected1.put("trans_time", new Long(7777777)); |
| resultExpected1.put("type", new Text("sports")); |
| |
| JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>(); |
| resultExpected2.put("trans_id", new Text("t2")); |
| resultExpected2.put("amount", new Long(1000)); |
| resultExpected2.put("trans_time", new Long(8888888)); |
| resultExpected2.put("type", new Text("groceries")); |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, resultExpected1); |
| opTestBuilder.baselineValues(5l, wideString, resultExpected2); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenListOfMaps() throws Exception { |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| |
| // create input rows like this. |
| // "a" : 5, "b" : wideString, |
| // "c" : [ [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}], |
| // [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}], |
| // [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] ] |
| |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]"); |
| batchString.append("]},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]"); |
| batchString.append("]}]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] |
| // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}] |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},"); |
| } |
| |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},"); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| "[ { \"trans_id\":\"t1\", \"amount\":100, " + |
| "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " + |
| "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]}"); |
| |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| final JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>(); |
| resultExpected1.put("trans_id", new Text("t1")); |
| resultExpected1.put("amount", new Long(100)); |
| resultExpected1.put("trans_time", new Long(7777777)); |
| resultExpected1.put("type", new Text("sports")); |
| |
| final JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>(); |
| resultExpected2.put("trans_id", new Text("t2")); |
| resultExpected2.put("amount", new Long(1000)); |
| resultExpected2.put("trans_time", new Long(8888888)); |
| resultExpected2.put("type", new Text("groceries")); |
| |
| final JsonStringArrayList<JsonStringHashMap<String, Object>> results = new JsonStringArrayList<JsonStringHashMap<String, Object>>() {{ |
| add(resultExpected1); |
| add(resultExpected2); |
| }}; |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, results); |
| opTestBuilder.baselineValues(5l, wideString, results); |
| opTestBuilder.baselineValues(5l, wideString, results); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenNestedMap() throws Exception { |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| |
| // create input rows like this. |
| // "a" : 5, "b" : wideString, |
| // "c" : [ {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries}, |
| // {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries} ] |
| |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| |
| StringBuilder innerMap = new StringBuilder(); |
| innerMap.append("{ \"trans_id\":\"inner_trans_t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}"); |
| |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + |
| " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + |
| "\"type\":\"sports\"}," + " { \"innerMap\": " + innerMap + |
| ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]"); |
| batchString.append("},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + |
| " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + |
| "\"type\":\"sports\"}," + " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}"); |
| batchString.append("]}]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 5, "b" : wideString, "c" : {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries} |
| // "a" : 5, "b" : wideString, "c" : {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries} |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + |
| "\"type\":\"sports\"} }, "); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + |
| "\"type\":\"sports\"} }, "); |
| } |
| |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + |
| "\"type\":\"sports\"} }, "); |
| expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " + |
| " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " + |
| "\"type\":\"sports\"} }"); |
| |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| JsonStringHashMap<String, Object> innerMapResult = new JsonStringHashMap<>(); |
| innerMapResult.put("trans_id", new Text("inner_trans_t1")); |
| innerMapResult.put("amount", new Long(100)); |
| innerMapResult.put("trans_time", new Long(7777777)); |
| innerMapResult.put("type", new Text("sports")); |
| |
| JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>(); |
| resultExpected1.put("trans_id", new Text("t1")); |
| resultExpected1.put("amount", new Long(100)); |
| resultExpected1.put("trans_time", new Long(7777777)); |
| resultExpected1.put("type", new Text("sports")); |
| resultExpected1.put("innerMap", innerMapResult); |
| |
| JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>(); |
| resultExpected2.put("trans_id", new Text("t2")); |
| resultExpected2.put("amount", new Long(1000)); |
| resultExpected2.put("trans_time", new Long(8888888)); |
| resultExpected2.put("type", new Text("groceries")); |
| resultExpected2.put("innerMap", innerMapResult); |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, resultExpected1); |
| opTestBuilder.baselineValues(5l, wideString, resultExpected2); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenUpperLimit() throws Exception { |
| // test the upper limit of 65535 records per batch. |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| |
| StringBuilder flattenElement = new StringBuilder(); |
| |
| // Create list of 1000 elements |
| flattenElement.append("["); |
| for (int i = 0; i < 1000; i++) { |
| flattenElement.append(i); |
| flattenElement.append(","); |
| } |
| flattenElement.append(1000); |
| flattenElement.append("]"); |
| |
| batchString.append("["); |
| |
| numRows = 100; |
| |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"c\":" + flattenElement + "},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"c\":" + flattenElement + "}"); |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| for (int j = 0; j < 1000; j++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"c\" :"); |
| expectedBatchString.append(j); |
| expectedBatchString.append("},"); |
| } |
| } |
| for (int j = 0; j < 999; j++) { |
| expectedBatchString.append("{\"a\": 5, " + "\"c\" :"); |
| expectedBatchString.append(j); |
| expectedBatchString.append("},"); |
| } |
| |
| expectedBatchString.append("{\"a\": 5, " + "\"c\" :"); |
| expectedBatchString.append(1000); |
| expectedBatchString.append("}"); |
| |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| // set the output batch size to 1/2 of total size expected. |
| // We will get 16 batches because of upper bound of 65535 rows. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| // Here we expect 16 batches because each batch will be limited by upper limit of 65535 records. |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| for (long j = 0; j < 1001; j++) { |
| opTestBuilder.baselineValues(5l, j); |
| } |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenLowerLimit() throws Exception { |
| // test the lower limit of at least one batch |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| StringBuilder flattenElement = new StringBuilder(); |
| |
| // Create list of 10 elements |
| flattenElement.append("["); |
| for (int i = 0; i < 10; i++) { |
| flattenElement.append(i); |
| flattenElement.append(","); |
| } |
| flattenElement.append(10); |
| flattenElement.append("]"); |
| |
| // create list of wideStrings |
| final StringBuilder wideStrings = new StringBuilder(); |
| wideStrings.append("["); |
| for (int i = 0; i < 10; i++) { |
| wideStrings.append("\"" + wideString + "\","); |
| } |
| wideStrings.append("\"" + wideString + "\""); |
| wideStrings.append("]"); |
| |
| batchString.append("["); |
| batchString.append("{\"a\": " + wideStrings + "," + "\"c\":" + flattenElement); |
| batchString.append("}]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // set very low value of batch size for a large record size. |
| // This is to test we atleast get one record per batch. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 1024); |
| |
| // Here we expect 10 batches because each batch will be bounded by lower limit of at least 1 record. |
| // do not check the output batch size as it will be more than configured value of 1024, so we get |
| // at least one record out. |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "c") |
| .expectedNumBatches(10); // verify number of batches |
| |
| final JsonStringArrayList<Text> results = new JsonStringArrayList<Text>() {{ |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| add(new Text(wideString)); |
| }}; |
| |
| for (long j = 0; j < 11; j++) { |
| opTestBuilder.baselineValues(results, j); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenEmptyList() throws Exception { |
| final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("b")); |
| |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| |
| StringBuilder flattenElement = new StringBuilder(); |
| |
| flattenElement.append("["); |
| flattenElement.append("]"); |
| |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": 5, " + "\"b\" : " + flattenElement + "},"); |
| } |
| batchString.append("{\"a\": 5, " + "\"b\" : " + flattenElement + "}"); |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b") |
| .expectZeroRows(); |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testFlattenLargeRecords() throws Exception { |
| PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c")); |
| mockOpContext(flatten, initReservation, maxAllocation); |
| |
| // create input rows like this. |
| // "a" : <id1>, "b" : wideString, "c" : [ 10 wideStrings ] |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| int arrayLength = 10; |
| StringBuilder test = new StringBuilder(); |
| test.append("[ \""); |
| for (int i = 0; i < arrayLength; i++) { |
| test.append(wideString); |
| test.append("\",\""); |
| } |
| test.append(wideString); |
| test.append("\"]"); |
| |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{" + "\"a\" :" + (new StringBuilder().append(i)) + ",\"b\": \"" + wideString + "\"," + |
| "\"c\": " + test + "},"); |
| } |
| batchString.append("{" + "\"a\" :" + (new StringBuilder().append(numRows)) + ",\"b\": \"" + wideString + "\"," + |
| "\"c\": " + test + "}"); |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // output rows will be like this. |
| // "a" : <id1>, "b" : wideString, "c" : wideString |
| |
| // Figure out what will be approximate total output size out of flatten for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int k = 0; k < (numRows) * 11; k++) { |
| expectedBatchString.append("{" + "\"a\" :" + (new StringBuilder().append(k)) + ",\"b\": \"" + wideString + "\","); |
| expectedBatchString.append("\"c\": \"" + wideString + "\"},"); |
| } |
| expectedBatchString.append("{" + "\"a\" :" + (new StringBuilder().append(numRows)) + ",\"b\": \"" + wideString + "\","); |
| expectedBatchString.append("\"c\": \"" + wideString + "\"}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(flatten) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b", "c") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize / 2); // verify batch size. |
| |
| for (long k = 0; k < ((numRows + 1)); k++) { |
| for (int j = 0; j < arrayLength + 1; j++) { |
| opTestBuilder.baselineValues(k, wideString, wideString); |
| } |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testMergeJoinMultipleOutputBatches() throws Exception { |
| MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); |
| mockOpContext(mergeJoin, initReservation, maxAllocation); |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(mergeJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize / 2) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testMergeJoinSingleOutputBatch() throws Exception { |
| MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); |
| mockOpContext(mergeJoin, initReservation, maxAllocation); |
| |
| // create multiple batches from both sides. |
| numRows = 4096 * 2; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to twice of total size expected. |
| // We should get 1 batch. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(mergeJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(1) // verify number of batches |
| .expectedBatchSize(totalSize) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testMergeJoinUpperLimit() throws Exception { |
| // test the upper limit of 65535 records per batch. |
| MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.LEFT); |
| mockOpContext(mergeJoin, initReservation, maxAllocation); |
| |
| numRows = 100000; |
| |
| // create left input rows like this. |
| // "a1" : 5, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "c1" : 1, "a2":6, "c2": 1 |
| // "a1" : 5, "c1" : 2, "a2":6, "c2": 2 |
| // "a1" : 5, "c1" : 3, "a2":6, "c2": 3 |
| |
| // expect two batches, batch limited by 65535 records |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(mergeJoin) |
| .baselineColumns("a1", "c1", "a2", "c2") |
| .expectedNumBatches(2) // verify number of batches |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, i, 6l, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testMergeJoinLowerLimit() throws Exception { |
| // test the lower limit of at least one batch |
| MergeJoinPOP mergeJoin = new MergeJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.RIGHT); |
| mockOpContext(mergeJoin, initReservation, maxAllocation); |
| |
| numRows = 10; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| |
| // set very low value of output batch size so we can do only one row per batch. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(mergeJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(10) // verify number of batches |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testUnionOutputBatch() throws Exception { |
| UnionAll unionAll = new UnionAll(Collections.<PhysicalOperator> emptyList()); |
| mockOpContext(unionAll, initReservation, maxAllocation); |
| |
| // create batches from both sides. |
| numRows = 4000; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to twice of total size expected. |
| // We should get 2 batches, one for the left and one for the right. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(unionAll) |
| .baselineColumns("a1", "b1", "c1") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i); |
| } |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testUnionMultipleOutputBatches() throws Exception { |
| UnionAll unionAll = new UnionAll(Collections.<PhysicalOperator> emptyList()); |
| mockOpContext(unionAll, initReservation, maxAllocation); |
| |
| // create multiple batches from both sides. |
| numRows = 8000; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows*2; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to half of total size expected. |
| // We should get 4 batches, 2 for the left and 2 for the right. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(unionAll) |
| .baselineColumns("a1", "b1", "c1") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i); |
| } |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testUnionLowerLimit() throws Exception { |
| UnionAll unionAll = new UnionAll(Collections.<PhysicalOperator> emptyList()); |
| mockOpContext(unionAll, initReservation, maxAllocation); |
| |
| // create multiple batches from both sides. |
| numRows = 10; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows*2; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size very low so we get only one row per batch. |
| // We should get 22 batches for 22 rows. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(unionAll) |
| .baselineColumns("a1","b1", "c1") |
| .expectedNumBatches(22) // verify number of batches |
| .expectedBatchSize(totalSize) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i); |
| } |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testHashJoinMultipleOutputBatches() throws Exception { |
| HashJoinPOP hashJoin = new HashJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); |
| mockOpContext(hashJoin, initReservation, maxAllocation); |
| |
| numRows = 4000 * 2; |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize / 2) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows+1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testHashJoinSingleOutputBatch() throws Exception { |
| HashJoinPOP hashJoin = new HashJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); |
| mockOpContext(hashJoin, initReservation, maxAllocation); |
| |
| // create multiple batches from both sides. |
| numRows = 4096 * 2; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to twice of total size expected. |
| // We should get 1 batch. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(1) // verify number of batches |
| .expectedBatchSize(totalSize) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testHashJoinUpperLimit() throws Exception { |
| // test the upper limit of 65535 records per batch. |
| HashJoinPOP hashJoin = new HashJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); |
| mockOpContext(hashJoin, initReservation, maxAllocation); |
| |
| numRows = 100000; |
| |
| // create left input rows like this. |
| // "a1" : 5, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "c1" : 1, "a2":6, "c2": 1 |
| // "a1" : 5, "c1" : 2, "a2":6, "c2": 2 |
| // "a1" : 5, "c1" : 3, "a2":6, "c2": 3 |
| |
| // expect two batches, batch limited by 65535 records |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashJoin) |
| .baselineColumns("a1", "c1", "a2", "c2") |
| .expectedNumBatches(2) // verify number of batches |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, i, 6l, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testHashJoinLowerLimit() throws Exception { |
| // test the lower limit of at least one batch |
| HashJoinPOP hashJoin = new HashJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.INNER); |
| mockOpContext(hashJoin, initReservation, maxAllocation); |
| |
| numRows = 10; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| |
| // set very low value of output batch size so we can do only one row per batch. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(10) // verify number of batches |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testRightOuterHashJoin() throws Exception { |
| |
| HashJoinPOP hashJoin = new HashJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.RIGHT); |
| mockOpContext(hashJoin, initReservation, maxAllocation); |
| |
| numRows = 4000 * 2; |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize / 2) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testLeftOuterHashJoin() throws Exception { |
| |
| HashJoinPOP hashJoin = new HashJoinPOP(null, null, |
| Lists.newArrayList(joinCond("c1", "EQUALS", "c2")), JoinRelType.LEFT); |
| mockOpContext(hashJoin, initReservation, maxAllocation); |
| |
| numRows = 4000 * 2; |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in merge join. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize / 2) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows+1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| |
| } |
| |
| @Test |
| public void testSimpleHashAgg() { |
| HashAggregate aggConf = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f); |
| List<String> inputJsonBatches = Lists.newArrayList( |
| "[{\"a\": 5, \"b\" : 1 }]", |
| "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]"); |
| |
| legacyOpTestBuilder() |
| .physicalOperator(aggConf) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("b_sum", "a") |
| .baselineValues(6l, 5l) |
| .baselineValues(8l, 3l) |
| .go(); |
| } |
| |
| @Test |
| public void testHashAggSum() throws ExecutionSetupException { |
| HashAggregate hashAgg = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f); |
| |
| // create input rows like this. |
| // "a" : 1, "b" : 1 |
| // "a" : 1, "b" : 1 |
| // "a" : 1, "b" : 1 |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": " + i + ", \"b\": " + i + "},"); |
| batchString.append("{\"a\": " + i + ", \"b\": " + i + "},"); |
| batchString.append("{\"a\": " + i + ", \"b\": " + i + "},"); |
| } |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}," ); |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}," ); |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}" ); |
| |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of hash agg for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 1, "b" : 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": " + i + ", \"b\": " + (3*i) + "},"); |
| } |
| expectedBatchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}" ); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashAgg) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b_sum") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize/2); // verify batch size. |
| |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues((long)i, (long)3*i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testHashAggAvg() throws ExecutionSetupException { |
| HashAggregate hashAgg = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("avg(b)", "b_avg"), 1.0f); |
| |
| // create input rows like this. |
| // "a" : 1, "b" : 1 |
| // "a" : 1, "b" : 1 |
| // "a" : 1, "b" : 1 |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": " + i + ", \"b\": " + i + "},"); |
| batchString.append("{\"a\": " + i + ", \"b\": " + i + "},"); |
| batchString.append("{\"a\": " + i + ", \"b\": " + i + "},"); |
| } |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}," ); |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}," ); |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}" ); |
| |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of hash agg for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 1, "b" : 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": " + i + ", \"b\": " + (3*i) + "},"); |
| } |
| expectedBatchString.append("{\"a\": " + numRows + ", \"b\": " + numRows + "}" ); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashAgg) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b_avg") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize/2); // verify batch size. |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues((long)i, (double)i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testHashAggMax() throws ExecutionSetupException { |
| HashAggregate hashAgg = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("max(b)", "b_max"), 1.0f); |
| |
| // create input rows like this. |
| // "a" : 1, "b" : "a" |
| // "a" : 2, "b" : "aa" |
| // "a" : 3, "b" : "aaa" |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"a\": " + i + ", \"b\": " + "\"a\"" + "},"); |
| batchString.append("{\"a\": " + i + ", \"b\": " + "\"aa\"" + "},"); |
| batchString.append("{\"a\": " + i + ", \"b\": " + "\"aaa\"" + "},"); |
| } |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + "\"a\"" + "}," ); |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + "\"aa\"" + "}," ); |
| batchString.append("{\"a\": " + numRows + ", \"b\": " + "\"aaa\"" + "}" ); |
| |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Figure out what will be approximate total output size out of hash agg for input above |
| // We will use this sizing information to set output batch size so we can produce desired |
| // number of batches that can be verified. |
| |
| // output rows will be like this. |
| // "a" : 1, "b" : "aaa" |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a\": " + i + ", \"b\": " + "\"aaa\"" + "},"); |
| } |
| expectedBatchString.append("{\"a\": " + numRows + ", \"b\": " + "\"aaa\"" + "}" ); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately get 2 batches and max of 4. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(hashAgg) |
| .inputDataStreamJson(inputJsonBatches) |
| .baselineColumns("a", "b_max") |
| .expectedNumBatches(2) // verify number of batches |
| .expectedBatchSize(totalSize); // verify batch size. |
| |
| for (int i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues((long)i, "aaa"); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testNestedLoopJoinMultipleOutputBatches() throws Exception { |
| LogicalExpression functionCallExpr = new FunctionCall("equal", |
| ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), |
| (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), |
| ExpressionPosition.UNKNOWN); |
| |
| NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr); |
| mockOpContext(nestedLoopJoin, initReservation, maxAllocation); |
| |
| numRows = 4000 * 2; |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately 4 batches. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(nestedLoopJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize / 2) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows+1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| |
| } |
| |
| @Test |
| public void testNestedLoopJoinSingleOutputBatch() throws Exception { |
| LogicalExpression functionCallExpr = new FunctionCall("equal", |
| ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), |
| (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), |
| ExpressionPosition.UNKNOWN); |
| |
| NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr); |
| |
| // create multiple batches from both sides. |
| numRows = 4096 * 2; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to twice of total size expected. |
| // We should get 1 batch. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(nestedLoopJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(1) // verify number of batches |
| .expectedBatchSize(totalSize) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testNestedLoopJoinUpperLimit() throws Exception { |
| // test the upper limit of 65535 records per batch. |
| LogicalExpression functionCallExpr = new FunctionCall("<", |
| ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), |
| (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), |
| ExpressionPosition.UNKNOWN); |
| |
| NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr); |
| |
| numRows = 500; |
| |
| // create left input rows like this. |
| // "a1" : 5, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "c1" : 1, "a2":6, "c2": 1 |
| // "a1" : 5, "c1" : 2, "a2":6, "c2": 2 |
| // "a1" : 5, "c1" : 3, "a2":6, "c2": 3 |
| |
| // we expect n(n+1)/2 number of records i.e. (500 * 501)/2 = 125250 |
| // expect two batches, batch limited by 65535 records |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(nestedLoopJoin) |
| .baselineColumns("a1", "c1", "a2", "c2") |
| .expectedNumBatches(2) // verify number of batches |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows+1; i++) { |
| for (long j = i+1; j < numRows+1; j++) { |
| opTestBuilder.baselineValues(5l, i, 6l, j); |
| } |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testNestedLoopJoinLowerLimit() throws Exception { |
| // test the lower limit of at least one batch |
| LogicalExpression functionCallExpr = new FunctionCall("equal", |
| ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), |
| (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), |
| ExpressionPosition.UNKNOWN); |
| |
| NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr); |
| |
| numRows = 10; |
| |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| |
| // set very low value of output batch size so we can do only one row per batch. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(nestedLoopJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(10) // verify number of batches |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows + 1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| } |
| |
| @Test |
| public void testLeftNestedLoopJoin() throws Exception { |
| LogicalExpression functionCallExpr = new FunctionCall("equal", |
| ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN), |
| (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)), |
| ExpressionPosition.UNKNOWN); |
| |
| NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.LEFT, functionCallExpr); |
| |
| numRows = 4000 * 2; |
| // create left input rows like this. |
| // "a1" : 5, "b1" : wideString, "c1" : <id> |
| List<String> leftJsonBatches = Lists.newArrayList(); |
| StringBuilder leftBatchString = new StringBuilder(); |
| leftBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},"); |
| } |
| leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}"); |
| leftBatchString.append("]"); |
| |
| leftJsonBatches.add(leftBatchString.toString()); |
| |
| // create right input rows like this. |
| // "a2" : 6, "b2" : wideString, "c2" : <id> |
| List<String> rightJsonBatches = Lists.newArrayList(); |
| StringBuilder rightBatchString = new StringBuilder(); |
| rightBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| rightBatchString.append("]"); |
| rightJsonBatches.add(rightBatchString.toString()); |
| |
| // output rows will be like this. |
| // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1 |
| // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2 |
| // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3 |
| List<String> expectedJsonBatches = Lists.newArrayList(); |
| StringBuilder expectedBatchString = new StringBuilder(); |
| expectedBatchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},"); |
| } |
| expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows); |
| expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}"); |
| expectedBatchString.append("]"); |
| expectedJsonBatches.add(expectedBatchString.toString()); |
| |
| long totalSize = getExpectedSize(expectedJsonBatches); |
| |
| // set the output batch size to 1/2 of total size expected. |
| // We will get approximately 4 batches. |
| fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2); |
| |
| LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder() |
| .physicalOperator(nestedLoopJoin) |
| .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2") |
| .expectedNumBatches(4) // verify number of batches |
| .expectedBatchSize(totalSize / 2) // verify batch size |
| .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches)); |
| |
| for (long i = 0; i < numRows+1; i++) { |
| opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i); |
| } |
| |
| opTestBuilder.go(); |
| |
| } |
| |
| @Test |
| public void testSizerRepeatedList() throws Exception { |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| |
| StringBuilder newString = new StringBuilder(); |
| newString.append("[ [1,2,3,4], [5,6,7,8] ]"); |
| |
| numRows = 9; |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"c\" : " + newString); |
| batchString.append("},"); |
| } |
| batchString.append("{\"c\" : " + newString); |
| batchString.append("}"); |
| |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Create a dummy scanBatch to figure out the size. |
| RecordBatch scanBatch = new ScanBatch(new MockPhysicalOperator(), |
| fragContext, getReaderListForJsonBatches(inputJsonBatches, fragContext)); |
| |
| VectorAccessible va = new BatchIterator(scanBatch).iterator().next(); |
| RecordBatchSizer sizer = new RecordBatchSizer(va); |
| |
| assertEquals(1, sizer.columns().size()); |
| RecordBatchSizer.ColumnSize column = sizer.columns().get("c"); |
| assertNotNull(column); |
| |
| /** |
| * stdDataSize:8*5*5, stdNetSize:8*5*5 + 4*5 + 4*5 + 4, |
| * dataSizePerEntry:8*8, netSizePerEntry:8*8 + 4*2 + 4, |
| * totalDataSize:8*8*10, totalNetSize:netSizePerEntry*10, valueCount:10, |
| * elementCount:10, estElementCountPerArray:1, isVariableWidth:false |
| */ |
| assertEquals(200, column.getStdDataSizePerEntry()); |
| assertEquals(244, column.getStdNetSizePerEntry()); |
| assertEquals(64, column.getDataSizePerEntry()); |
| assertEquals(76, column.getNetSizePerEntry()); |
| assertEquals(640, column.getTotalDataSize()); |
| assertEquals(760, column.getTotalNetSize()); |
| assertEquals(10, column.getValueCount()); |
| assertEquals(20, column.getElementCount()); |
| assertEquals(2, column.getCardinality(), 0.01); |
| assertEquals(false, column.isVariableWidth()); |
| |
| final int testRowCount = 1000; |
| final int testRowCountPowerTwo = 2048; |
| |
| for (VectorWrapper<?> vw : va) { |
| ValueVector v = vw.getValueVector(); |
| v.clear(); |
| |
| RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName()); |
| |
| // Allocates to nearest power of two |
| colSize.allocateVector(v, testRowCount); |
| |
| // offset vector of delegate vector i.e. outer array should have row count number of values. |
| UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector(); |
| assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity()); |
| |
| // Get inner vector of delegate vector. |
| ValueVector vector = ((RepeatedValueVector) v).getDataVector(); |
| |
| // Data vector of inner vector should |
| // have 2 (outer array cardinality) * 4 (inner array cardinality) * row count number of values. |
| ValueVector dataVector = ((RepeatedValueVector) vector).getDataVector(); |
| assertEquals(Integer.highestOneBit((testRowCount*8) << 1), dataVector.getValueCapacity()); |
| |
| // offset vector of inner vector should have |
| // 2 (outer array cardinality) * row count number of values. |
| offsetVector = ((RepeatedValueVector) vector).getOffsetVector(); |
| assertEquals((Integer.highestOneBit(testRowCount*2) << 1), offsetVector.getValueCapacity()); |
| v.clear(); |
| |
| // Allocates the same as value passed since it is already power of two. |
| // -1 is done for adjustment needed for offset vector. |
| colSize.allocateVector(v, testRowCountPowerTwo - 1); |
| |
| // offset vector of delegate vector i.e. outer array should have row count number of values. |
| offsetVector = ((RepeatedListVector) v).getOffsetVector(); |
| assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity()); |
| |
| // Get inner vector of delegate vector. |
| vector = ((RepeatedValueVector) v).getDataVector(); |
| |
| // Data vector of inner vector should |
| // have 2 (outer array cardinality) * 4 (inner array cardinality) * row count number of values. |
| dataVector = ((RepeatedValueVector) vector).getDataVector(); |
| assertEquals(testRowCountPowerTwo * 8, dataVector.getValueCapacity()); |
| |
| // offset vector of inner vector should have |
| // 2 (outer array cardinality) * row count number of values. |
| offsetVector = ((RepeatedValueVector) vector).getOffsetVector(); |
| assertEquals(testRowCountPowerTwo * 2, offsetVector.getValueCapacity()); |
| v.clear(); |
| |
| // MAX ROW COUNT |
| colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT - 1); |
| |
| // offset vector of delegate vector i.e. outer array should have row count number of values. |
| offsetVector = ((RepeatedListVector) v).getOffsetVector(); |
| assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity()); |
| |
| // Get inner vector of delegate vector. |
| vector = ((RepeatedValueVector) v).getDataVector(); |
| |
| // Data vector of inner vector should |
| // have 2 (outer array cardinality) * 4 (inner array cardinality) * row count number of values. |
| dataVector = ((RepeatedValueVector) vector).getDataVector(); |
| assertEquals(ValueVector.MAX_ROW_COUNT*8, dataVector.getValueCapacity()); |
| |
| // offset vector of inner vector should have |
| // 2 (outer array cardinality) * row count number of values. |
| offsetVector = ((RepeatedValueVector) vector).getOffsetVector(); |
| assertEquals(ValueVector.MAX_ROW_COUNT*2, offsetVector.getValueCapacity()); |
| v.clear(); |
| |
| // MIN ROW COUNT |
| colSize.allocateVector(v, 0); |
| |
| // offset vector of delegate vector i.e. outer array should have 1 value. |
| offsetVector = ((RepeatedListVector) v).getOffsetVector(); |
| assertEquals(ValueVector.MIN_ROW_COUNT, offsetVector.getValueCapacity()); |
| |
| // Get inner vector of delegate vector. |
| vector = ((RepeatedValueVector) v).getDataVector(); |
| |
| // Data vector of inner vector should have 1 value |
| dataVector = ((RepeatedValueVector) vector).getDataVector(); |
| assertEquals(ValueVector.MIN_ROW_COUNT, dataVector.getValueCapacity()); |
| |
| // offset vector of inner vector should have |
| // 2 (outer array cardinality) * 1. |
| offsetVector = ((RepeatedValueVector) vector).getOffsetVector(); |
| assertEquals(ValueVector.MIN_ROW_COUNT*2, offsetVector.getValueCapacity()); |
| v.clear(); |
| } |
| } |
| |
| @Test |
| public void testSizerRepeatedRepeatedList() throws Exception { |
| List<String> inputJsonBatches = Lists.newArrayList(); |
| StringBuilder batchString = new StringBuilder(); |
| |
| StringBuilder newString = new StringBuilder(); |
| newString.append("[ [[1,2,3,4], [5,6,7,8]], [[1,2,3,4], [5,6,7,8]] ]"); |
| |
| numRows = 9; |
| batchString.append("["); |
| for (int i = 0; i < numRows; i++) { |
| batchString.append("{\"c\" : " + newString); |
| batchString.append("},"); |
| } |
| batchString.append("{\"c\" : " + newString); |
| batchString.append("}"); |
| |
| batchString.append("]"); |
| inputJsonBatches.add(batchString.toString()); |
| |
| // Create a dummy scanBatch to figure out the size. |
| RecordBatch scanBatch = new ScanBatch(new MockPhysicalOperator(), |
| fragContext, getReaderListForJsonBatches(inputJsonBatches, fragContext)); |
| |
| VectorAccessible va = new BatchIterator(scanBatch).iterator().next(); |
| RecordBatchSizer sizer = new RecordBatchSizer(va); |
| |
| assertEquals(1, sizer.columns().size()); |
| RecordBatchSizer.ColumnSize column = sizer.columns().get("c"); |
| assertNotNull(column); |
| |
| /** |
| * stdDataSize:8*5*5*5, stdNetSize:8*5*5*5 + 8*5*5 + 8*5 + 4, |
| * dataSizePerEntry:16*8, netSizePerEntry:16*8 + 16*4 + 4*2 + 4*2, |
| * totalDataSize:16*8*10, totalNetSize:netSizePerEntry*10, valueCount:10, |
| * elementCount:10, estElementCountPerArray:1, isVariableWidth:false |
| */ |
| assertEquals(1000, column.getStdDataSizePerEntry()); |
| assertEquals(1244, column.getStdNetSizePerEntry()); |
| assertEquals(128, column.getDataSizePerEntry()); |
| assertEquals(156, column.getNetSizePerEntry()); |
| assertEquals(1280, column.getTotalDataSize()); |
| assertEquals(1560, column.getTotalNetSize()); |
| assertEquals(10, column.getValueCount()); |
| assertEquals(20, column.getElementCount()); |
| assertEquals(2, column.getCardinality(), 0.01); |
| assertEquals(false, column.isVariableWidth()); |
| |
| final int testRowCount = 1000; |
| final int testRowCountPowerTwo = 2048; |
| |
| for (VectorWrapper<?> vw : va) { |
| ValueVector v = vw.getValueVector(); |
| v.clear(); |
| |
| RecordBatchSizer.ColumnSize colSize = sizer.getColumn(v.getField().getName()); |
| |
| // Allocates to nearest power of two |
| colSize.allocateVector(v, testRowCount); |
| |
| // offset vector of delegate vector i.e. outer array should have row count number of values. |
| UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector(); |
| assertEquals((Integer.highestOneBit(testRowCount) << 1), offsetVector.getValueCapacity()); |
| |
| // Get data vector of delegate vector. This is repeated list again |
| ValueVector dataVector = ((RepeatedListVector) v).getDataVector(); |
| |
| // offset vector of delegate vector of the inner repeated list |
| // This should have row count * 2 number of values. |
| offsetVector = ((RepeatedListVector) dataVector).getOffsetVector(); |
| assertEquals((Integer.highestOneBit(testRowCount*2) << 1), offsetVector.getValueCapacity()); |
| |
| // Data vector of inner vector should have row count * 2 number of values - 1 (for offset vector adjustment). |
| ValueVector innerDataVector = ((RepeatedValueVector) dataVector).getDataVector(); |
| assertEquals((Integer.highestOneBit((testRowCount*2) << 1) - 1), dataVector.getValueCapacity()); |
| |
| // offset vector of inner vector should have |
| // 2 (outer array cardinality) * 2 (inner array cardinality) * row count number of values. |
| offsetVector = ((RepeatedValueVector) innerDataVector).getOffsetVector(); |
| assertEquals((Integer.highestOneBit(testRowCount*4) << 1), offsetVector.getValueCapacity()); |
| |
| // Data vector of inner vector should |
| // have 2 (outer array cardinality) * 2 (inner array cardinality) * row count number of values. |
| dataVector = ((RepeatedValueVector) innerDataVector).getDataVector(); |
| assertEquals(Integer.highestOneBit(testRowCount << 1) * 16, dataVector.getValueCapacity()); |
| |
| v.clear(); |
| |
| // Allocates the same as value passed since it is already power of two. |
| // -1 is done for adjustment needed for offset vector. |
| colSize.allocateVector(v, testRowCountPowerTwo - 1); |
| |
| // offset vector of delegate vector i.e. outer array should have row count number of values. |
| offsetVector = ((RepeatedListVector) v).getOffsetVector(); |
| assertEquals(testRowCountPowerTwo, offsetVector.getValueCapacity()); |
| |
| // Get data vector of delegate vector. This is repeated list again |
| dataVector = ((RepeatedListVector) v).getDataVector(); |
| |
| // offset vector of delegate vector of the inner repeated list |
| // This should have row count * 2 number of values. |
| offsetVector = ((RepeatedListVector) dataVector).getOffsetVector(); |
| assertEquals(testRowCountPowerTwo*2, offsetVector.getValueCapacity()); |
| |
| // Data vector of inner vector should have row count * 2 number of values - 1 (for offset vector adjustment). |
| innerDataVector = ((RepeatedValueVector) dataVector).getDataVector(); |
| assertEquals(testRowCountPowerTwo*2 - 1, dataVector.getValueCapacity()); |
| |
| // offset vector of inner vector should have |
| // 2 (outer array cardinality) * 2 (inner array cardinality) * row count number of values. |
| offsetVector = ((RepeatedValueVector) innerDataVector).getOffsetVector(); |
| assertEquals(testRowCountPowerTwo*4, offsetVector.getValueCapacity()); |
| |
| // Data vector of inner vector should |
| // have 2 (outer array cardinality) * 2 (inner array cardinality) * row count number of values. |
| dataVector = ((RepeatedValueVector) innerDataVector).getDataVector(); |
| assertEquals(testRowCountPowerTwo * 16, dataVector.getValueCapacity()); |
| |
| v.clear(); |
| |
| // MAX ROW COUNT |
| colSize.allocateVector(v, ValueVector.MAX_ROW_COUNT - 1); |
| |
| // offset vector of delegate vector i.e. outer array should have row count number of values. |
| offsetVector = ((RepeatedListVector) v).getOffsetVector(); |
| assertEquals(ValueVector.MAX_ROW_COUNT, offsetVector.getValueCapacity()); |
| |
| // Get data vector of delegate vector. This is repeated list again |
| dataVector = ((RepeatedListVector) v).getDataVector(); |
| |
| // offset vector of delegate vector of the inner repeated list |
| // This should have row count * 2 number of values. |
| offsetVector = ((RepeatedListVector) dataVector).getOffsetVector(); |
| assertEquals(ValueVector.MAX_ROW_COUNT*2, offsetVector.getValueCapacity()); |
| |
| // Data vector of inner vector should have row count * 2 number of values - 1 (for offset vector adjustment). |
| innerDataVector = ((RepeatedValueVector) dataVector).getDataVector(); |
| assertEquals(ValueVector.MAX_ROW_COUNT*2 - 1, dataVector.getValueCapacity()); |
| |
| // offset vector of inner vector should have |
| // 2 (outer array cardinality) * 2 (inner array cardinality) * row count number of values. |
| offsetVector = ((RepeatedValueVector) innerDataVector).getOffsetVector(); |
| assertEquals(ValueVector.MAX_ROW_COUNT*4, offsetVector.getValueCapacity()); |
| |
| // Data vector of inner vector should |
| // have 2 (outer array cardinality) * 2 (inner array cardinality) * row count number of values. |
| dataVector = ((RepeatedValueVector) innerDataVector).getDataVector(); |
| assertEquals(ValueVector.MAX_ROW_COUNT*16, dataVector.getValueCapacity()); |
| |
| v.clear(); |
| |
| // MIN ROW COUNT |
| colSize.allocateVector(v, 0); |
| |
| // offset vector of delegate vector i.e. outer array should have 1 value. |
| offsetVector = ((RepeatedListVector) v).getOffsetVector(); |
| assertEquals(ValueVector.MIN_ROW_COUNT, offsetVector.getValueCapacity()); |
| |
| // Get data vector of delegate vector. This is repeated list again |
| dataVector = ((RepeatedListVector) v).getDataVector(); |
| |
| // offset vector of delegate vector of the inner repeated list |
| offsetVector = ((RepeatedListVector) dataVector).getOffsetVector(); |
| assertEquals(ValueVector.MIN_ROW_COUNT, offsetVector.getValueCapacity()); |
| |
| // offset vector of inner vector should have |
| // 2 (outer array cardinality) * 1. |
| offsetVector = ((RepeatedValueVector) innerDataVector).getOffsetVector(); |
| assertEquals(ValueVector.MIN_ROW_COUNT*2, offsetVector.getValueCapacity()); |
| |
| // Data vector of inner vector should 1 value. |
| dataVector = ((RepeatedValueVector) innerDataVector).getDataVector(); |
| assertEquals(ValueVector.MIN_ROW_COUNT, dataVector.getValueCapacity()); |
| |
| v.clear(); |
| |
| } |
| } |
| |
| } |