blob: ac7d3d2a2139f091f70131d104ea9408119a3436 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.unit;
import static org.apache.drill.test.TestBuilder.mapOf;
import java.util.List;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.ComplexToJson;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.test.LegacyOperatorTestBuilder;
import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
 * Unit tests that exercise individual Drill physical operators (project, hash/merge join,
 * hash/streaming aggregate, complex-to-JSON, filter, flatten, external sort, top-N, and
 * merging receiver) in isolation. Each test builds an operator config, feeds it one or more
 * streams of JSON-encoded record batches, and verifies the emitted records against an
 * explicit baseline via the legacy operator test builder supplied by the base class.
 *
 * <p>All numeric literals use the uppercase {@code L} long suffix; the lowercase form is
 * easily confused with the digit {@code 1}.
 */
public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {

  /** Project "x+5" back onto column "x"; two input batches, five rows total. */
  @Test
  public void testSimpleProject() {
    Project projectConf = new Project(parseExprs("x+5", "x"), null);
    List<String> jsonBatches = Lists.newArrayList(
        "[{\"x\": 5 },{\"x\": 10 }]",
        "[{\"x\": 20 },{\"x\": 30 },{\"x\": 40 }]");
    legacyOpTestBuilder()
        .physicalOperator(projectConf)
        .inputDataStreamJson(jsonBatches)
        .baselineColumns("x")
        .baselineValues(10L)
        .baselineValues(15L)
        .baselineValues(25L)
        .baselineValues(35L)
        .baselineValues(45L)
        .go();
  }

  /** Project producing a complex (map) output column via convert_from(..., 'JSON'). */
  @Test
  public void testProjectComplexOutput() {
    Project projectConf = new Project(parseExprs("convert_from(json_col, 'JSON')", "complex_col"), null);
    List<String> jsonBatches = Lists.newArrayList(
        "[{\"json_col\": \"{ \\\"a\\\" : 1 }\"}]",
        "[{\"json_col\": \"{ \\\"a\\\" : 5 }\"}]");
    legacyOpTestBuilder()
        .physicalOperator(projectConf)
        .inputDataStreamJson(jsonBatches)
        .baselineColumns("complex_col")
        .baselineValues(mapOf("a", 1L))
        .baselineValues(mapOf("a", 5L))
        .go();
  }

  /** Left hash join on x = x1 over two multi-batch input streams. */
  @SuppressWarnings("unchecked")
  @Test
  public void testSimpleHashJoin() {
    HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT, null);
    // TODO - figure out where to add validation, column names must be unique, even between the two batches,
    // for all columns, not just the one in the join condition
    // TODO - if any are common between the two, it is failing in the generated setup method in HashJoinProbeGen
    List<String> leftJsonBatches = Lists.newArrayList(
        "[{\"x\": 5, \"a\" : \"a string\"}]",
        "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
    List<String> rightJsonBatches = Lists.newArrayList(
        "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
        "[{\"x1\": 6, \"a2\" : \"qwerty\"},{\"x1\": 5, \"a2\" : \"12345\"}]");
    legacyOpTestBuilder()
        .physicalOperator(joinConf)
        .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
        .baselineColumns("x", "a", "a2", "x1")
        .baselineValues(5L, "a string", "asdf", 5L)
        .baselineValues(5L, "a string", "12345", 5L)
        .baselineValues(5L, "a different string", "asdf", 5L)
        .baselineValues(5L, "a different string", "12345", 5L)
        .baselineValues(5L, "meh", "asdf", 5L)
        .baselineValues(5L, "meh", "12345", 5L)
        .go();
  }

  /** Left merge join on x = x1; right side supplied in sorted order as merge join requires. */
  @SuppressWarnings("unchecked")
  @Test
  public void testSimpleMergeJoin() {
    MergeJoinPOP joinConf = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT);
    // TODO - figure out where to add validation, column names must be unique, even between the two batches,
    // for all columns, not just the one in the join condition
    List<String> leftJsonBatches = Lists.newArrayList(
        "[{\"x\": 5, \"a\" : \"a string\"}]",
        "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
    List<String> rightJsonBatches = Lists.newArrayList(
        "[{\"x1\": 5, \"a2\" : \"asdf\"}]",
        "[{\"x1\": 5, \"a2\" : \"12345\"}, {\"x1\": 6, \"a2\" : \"qwerty\"}]");
    legacyOpTestBuilder()
        .physicalOperator(joinConf)
        .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
        .baselineColumns("x", "a", "a2", "x1")
        .baselineValues(5L, "a string", "asdf", 5L)
        .baselineValues(5L, "a string", "12345", 5L)
        .baselineValues(5L, "a different string", "asdf", 5L)
        .baselineValues(5L, "a different string", "12345", 5L)
        .baselineValues(5L, "meh", "asdf", 5L)
        .baselineValues(5L, "meh", "12345", 5L)
        .go();
  }

  /** Single-phase hash aggregate: sum(b) grouped by a. */
  @Test
  public void testSimpleHashAgg() {
    HashAggregate aggConf = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
    List<String> inputJsonBatches = Lists.newArrayList(
        "[{\"a\": 5, \"b\" : 1 }]",
        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
    legacyOpTestBuilder()
        .physicalOperator(aggConf)
        .inputDataStreamJson(inputJsonBatches)
        .baselineColumns("b_sum", "a")
        .baselineValues(6L, 5L)
        .baselineValues(8L, 3L)
        .go();
  }

  /** Streaming aggregate: sum(b) grouped by a (input already ordered by the group key). */
  @Test
  public void testSimpleStreamAgg() {
    StreamingAggregate aggConf = new StreamingAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"));
    List<String> inputJsonBatches = Lists.newArrayList(
        "[{\"a\": 5, \"b\" : 1 }]",
        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
    legacyOpTestBuilder()
        .physicalOperator(aggConf)
        .inputDataStreamJson(inputJsonBatches)
        .baselineColumns("b_sum", "a")
        .baselineValues(6L, 5L)
        .baselineValues(8L, 3L)
        .go();
  }

  /** ComplexToJson converts a map column into its pretty-printed JSON string form. */
  @Test
  public void testComplexToJson() {
    ComplexToJson complexToJson = new ComplexToJson(null);
    List<String> inputJsonBatches = Lists.newArrayList(
        "[{\"a\": {\"b\" : 1 }}]",
        "[{\"a\": {\"b\" : 5}},{\"a\": {\"b\" : 8}}]");
    legacyOpTestBuilder()
        .physicalOperator(complexToJson)
        .inputDataStreamJson(inputJsonBatches)
        .baselineColumns("a")
        .baselineValues("{\n  \"b\" : 1\n}")
        .baselineValues("{\n  \"b\" : 5\n}")
        .baselineValues("{\n  \"b\" : 8\n}")
        .go();
  }

  /** Filter a = 5 keeps two of the five input rows. */
  @Test
  public void testFilter() {
    Filter filterConf = new Filter(null, parseExpr("a=5"), 1.0f);
    List<String> inputJsonBatches = Lists.newArrayList(
        "[{\"a\": 5, \"b\" : 1 }]",
        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
        "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
    legacyOpTestBuilder()
        .physicalOperator(filterConf)
        .inputDataStreamJson(inputJsonBatches)
        .baselineColumns("a", "b")
        .baselineValues(5L, 1L)
        .baselineValues(5L, 5L)
        .go();
  }

  /** Flatten of repeated column b: one input row fans out to three output rows. */
  @Test
  public void testFlatten() {
    final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("b"));
    List<String> inputJsonBatches = Lists.newArrayList();
    StringBuilder batchString = new StringBuilder();
    // Loops kept at a single iteration; bump the bounds to scale the test up.
    for (int j = 0; j < 1; j++) {
      batchString.append("[");
      for (int i = 0; i < 1; i++) {
        batchString.append("{\"a\": 5, \"b\" : [5, 6, 7]}");
      }
      batchString.append("]");
      inputJsonBatches.add(batchString.toString());
    }
    LegacyOperatorTestBuilder opTestBuilder = legacyOpTestBuilder()
        .physicalOperator(flatten)
        .inputDataStreamJson(inputJsonBatches)
        .baselineColumns("a", "b")
        .baselineValues(5L, 5L)
        .baselineValues(5L, 6L)
        .baselineValues(5L, 7L);
    opTestBuilder.go();
  }

  /** External sort on b ascending, nulls first, across three input batches. */
  @Test
  public void testExternalSort() {
    ExternalSort sortConf = new ExternalSort(null,
        Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
    List<String> inputJsonBatches = Lists.newArrayList(
        "[{\"a\": 5, \"b\" : 1 }]",
        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
        "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
    legacyOpTestBuilder()
        .physicalOperator(sortConf)
        .maxAllocation(15_000_000L)
        .inputDataStreamJson(inputJsonBatches)
        .baselineColumns("a", "b")
        .baselineValues(5L, 1L)
        .baselineValues(40L, 3L)
        .baselineValues(5L, 5L)
        .baselineValues(3L, 8L)
        .baselineValues(13L, 100L)
        .go();
  }

  /**
   * Drives an external sort under a constrained allocator. Each generated batch holds
   * {@code batchSize} copies of the {b:5} and {b:8} rows plus one trailing {b:1} row,
   * so the expected sorted output is {@code numberOfBatches} rows of (5, 1), then
   * {@code batchSize * numberOfBatches} rows each of (5, 5) and (3, 8).
   *
   * @param batchSize       number of row pairs generated per batch
   * @param numberOfBatches number of input batches to generate
   * @param initReservation initial memory reservation for the operator allocator
   * @param maxAllocation   memory ceiling for the operator allocator
   */
  private void externalSortLowMemoryHelper(int batchSize, int numberOfBatches, long initReservation, long maxAllocation) {
    ExternalSort sortConf = new ExternalSort(null,
        Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
    List<String> inputJsonBatches = Lists.newArrayList();
    StringBuilder batchString = new StringBuilder();
    for (int j = 0; j < numberOfBatches; j++) {
      batchString.append("[");
      for (int i = 0; i < batchSize; i++) {
        batchString.append("{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8},");
      }
      batchString.append("{\"a\": 5, \"b\" : 1 }");
      batchString.append("]");
      inputJsonBatches.add(batchString.toString());
    }
    LegacyOperatorTestBuilder opTestBuilder =
        legacyOpTestBuilder()
            .initReservation(initReservation)
            .maxAllocation(maxAllocation)
            .physicalOperator(sortConf)
            .inputDataStreamJson(inputJsonBatches)
            .baselineColumns("a", "b");
    for (int i = 0; i < numberOfBatches; i++) {
      opTestBuilder.baselineValues(5L, 1L);
    }
    for (int i = 0; i < batchSize * numberOfBatches; i++) {
      opTestBuilder.baselineValues(5L, 5L);
    }
    for (int i = 0; i < batchSize * numberOfBatches; i++) {
      opTestBuilder.baselineValues(3L, 8L);
    }
    opTestBuilder.go();
  }

  // TODO - Failing with - org.apache.drill.exec.exception.OutOfMemoryException: Unable to allocate buffer of size 262144 (rounded from 147456) due to memory limit. Current allocation: 16422656
  // look in ExternalSortBatch for this JIRA number, changing this percentage of the allocator limit that is
  // the threshold for spilling (it worked with 0.65 for me) "fixed" the problem but hurt perf, will want
  // to find a better solutions to this problem. When it is fixed this threshold will likely become unnecessary
  @Test
  @Ignore("DRILL-4438")
  public void testExternalSortLowMemory1() {
    externalSortLowMemoryHelper(4960, 100, 10000000, 16500000);
  }

  // TODO- believe this was failing in the scan not the sort, may not require a fix
  @Test
  @Ignore("DRILL-4438")
  public void testExternalSortLowMemory2() {
    externalSortLowMemoryHelper(4960, 100, 10000000, 15000000);
  }

  // TODO - believe this was failing in the scan not the sort, may not require a fix
  @Test
  @Ignore("DRILL-4438")
  public void testExternalSortLowMemory3() {
    externalSortLowMemoryHelper(40960, 10, 10000000, 10000000);
  }

  // TODO - Failing with - org.apache.drill.exec.exception.OutOfMemoryException: Unable to allocate sv2 buffer after repeated attempts
  // see comment above testExternalSortLowMemory1 about TODO left in ExternalSortBatch
  @Test
  @Ignore("DRILL-4438")
  public void testExternalSortLowMemory4() {
    externalSortLowMemoryHelper(15960, 30, 10000000, 14500000);
  }

  /** TopN on b ascending with limit 3: only the three smallest b values survive. */
  @Test
  public void testTopN() {
    TopN sortConf = new TopN(null,
        Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false, 3);
    List<String> inputJsonBatches = Lists.newArrayList(
        "[{\"a\": 5, \"b\" : 1 }]",
        "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]",
        "[{\"a\": 40, \"b\" : 3},{\"a\": 13, \"b\" : 100}]");
    legacyOpTestBuilder()
        .physicalOperator(sortConf)
        .inputDataStreamJson(inputJsonBatches)
        .baselineColumns("a", "b")
        .baselineValues(5L, 1L)
        .baselineValues(40L, 3L)
        .baselineValues(5L, 5L)
        .go();
  }

  // TODO(DRILL-4439) - doesn't expect incoming batches, uses instead RawFragmentBatch
  // need to figure out how to mock these
  @SuppressWarnings("unchecked")
  @Ignore
  @Test
  public void testSimpleMergingReceiver() {
    MergingReceiverPOP mergeConf = new MergingReceiverPOP(-1, Lists.<MinorFragmentEndpoint>newArrayList(),
        Lists.newArrayList(ordering("x", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);
    List<String> leftJsonBatches = Lists.newArrayList(
        "[{\"x\": 5, \"a\" : \"a string\"}]",
        "[{\"x\": 5, \"a\" : \"a different string\"},{\"x\": 5, \"a\" : \"meh\"}]");
    List<String> rightJsonBatches = Lists.newArrayList(
        "[{\"x\": 5, \"a\" : \"asdf\"}]",
        "[{\"x\": 5, \"a\" : \"12345\"}, {\"x\": 6, \"a\" : \"qwerty\"}]");
    legacyOpTestBuilder()
        .physicalOperator(mergeConf)
        .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches))
        .baselineColumns("x", "a")
        .baselineValues(5L, "a string")
        .baselineValues(5L, "a different string")
        .baselineValues(5L, "meh")
        .baselineValues(5L, "asdf")
        .baselineValues(5L, "12345")
        .baselineValues(6L, "qwerty")
        .go();
  }
}