IMPALA-3335: Allow single-node optimization with joins
Enable the single-node optimization for queries with joins.
Testing:
* Ran exhaustive tests
* Looped TPC-DS overnight with NUM_NODES=1 against an impalad
mini-cluster with a single dedicated coordinator
Change-Id: I6b189271630214960ed482cb2b552fba9f246770
Reviewed-on: http://gerrit.cloudera.org:8080/16521
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index e87ad0a..5c3ac01 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -661,8 +661,7 @@
private void checkForSmallQueryOptimization(PlanNode singleNodePlan) {
MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
singleNodePlan.accept(visitor);
- // TODO: IMPALA-3335: support the optimization for plans with joins.
- if (!visitor.valid() || visitor.foundJoinNode()) return;
+ if (!visitor.valid()) return;
// This optimization executes the plan on a single node so the threshold must
// be based on the total number of rows processed.
long maxRowsProcessed = visitor.getMaxRowsProcessed();
diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
index 7f51f8c..fb10a74 100644
--- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
+++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
@@ -17,7 +17,6 @@
package org.apache.impala.util;
-import org.apache.impala.planner.JoinNode;
import org.apache.impala.planner.PlanFragment;
import org.apache.impala.planner.PlanNode;
import org.apache.impala.planner.ScanNode;
@@ -32,7 +31,6 @@
// True if we should abort because we don't have valid estimates
// for a plan node.
private boolean valid_ = true;
- private boolean foundJoinNode_ ;
// Max number of rows processed across all instances of a plan node.
private long maxRowsProcessed_ ;
@@ -43,7 +41,6 @@
@Override
public void visit(PlanNode caller) {
if (!valid_) return;
- if (caller instanceof JoinNode) foundJoinNode_ = true;
PlanFragment fragment = caller.getFragment();
int numNodes = fragment == null ? 1 : fragment.getNumNodes();
@@ -88,9 +85,4 @@
return maxRowsProcessedPerNode_;
}
- public boolean foundJoinNode() {
- Preconditions.checkState(valid_);
- return foundJoinNode_;
- }
-
}
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
index c2e88fb..3817a82 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
@@ -230,7 +230,7 @@
limit: 5
row-size=80B cardinality=5
====
-# Two scans cannot run in the same fragment. IMPALA-561
+# testbl does not have stats computed, so the small query optimization will be disabled
select * from
functional.testtbl a join functional.testtbl b on a.id = b.id
---- DISTRIBUTEDPLAN
@@ -274,9 +274,28 @@
HDFS partitions=1/1 files=0 size=0B
row-size=24B cardinality=0
====
+# IMPALA-3335: test that queries with joins can be run as small queries
select * from
functional.alltypestiny a
where a.id in (select id from functional.alltypestiny limit 5) limit 5
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT SEMI JOIN]
+| hash predicates: a.id = id
+| runtime filters: RF000 <- id
+| limit: 5
+| row-size=89B cardinality=5
+|
+|--01:SCAN HDFS [functional.alltypestiny]
+| HDFS partitions=4/4 files=4 size=460B
+| limit: 5
+| row-size=4B cardinality=5
+|
+00:SCAN HDFS [functional.alltypestiny a]
+ HDFS partitions=4/4 files=4 size=460B
+ runtime filters: RF000 -> a.id
+ row-size=89B cardinality=8
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|