IMPALA-3335: Allow single-node optimization with joins

Enable the single-node optimization for queries with joins.

Testing:
* Ran exhaustive tests
* Looped TPC-DS overnight with NUM_NODES=1 against an impalad
  mini-cluster with a single dedicated coordinator

Change-Id: I6b189271630214960ed482cb2b552fba9f246770
Reviewed-on: http://gerrit.cloudera.org:8080/16521
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index e87ad0a..5c3ac01 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -661,8 +661,7 @@
   private void checkForSmallQueryOptimization(PlanNode singleNodePlan) {
     MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
     singleNodePlan.accept(visitor);
-    // TODO: IMPALA-3335: support the optimization for plans with joins.
-    if (!visitor.valid() || visitor.foundJoinNode()) return;
+    if (!visitor.valid()) return;
     // This optimization executes the plan on a single node so the threshold must
     // be based on the total number of rows processed.
     long maxRowsProcessed = visitor.getMaxRowsProcessed();
diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
index 7f51f8c..fb10a74 100644
--- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
+++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.util;
 
-import org.apache.impala.planner.JoinNode;
 import org.apache.impala.planner.PlanFragment;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.planner.ScanNode;
@@ -32,7 +31,6 @@
   // True if we should abort because we don't have valid estimates
   // for a plan node.
   private boolean valid_ = true;
-  private boolean foundJoinNode_ ;
 
   // Max number of rows processed across all instances of a plan node.
   private long maxRowsProcessed_ ;
@@ -43,7 +41,6 @@
   @Override
   public void visit(PlanNode caller) {
     if (!valid_) return;
-    if (caller instanceof JoinNode) foundJoinNode_ = true;
 
     PlanFragment fragment = caller.getFragment();
     int numNodes = fragment == null ? 1 : fragment.getNumNodes();
@@ -88,9 +85,4 @@
     return maxRowsProcessedPerNode_;
   }
 
-  public boolean foundJoinNode() {
-    Preconditions.checkState(valid_);
-    return foundJoinNode_;
-  }
-
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
index c2e88fb..3817a82 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
@@ -230,7 +230,7 @@
    limit: 5
    row-size=80B cardinality=5
 ====
-# Two scans cannot run in the same fragment. IMPALA-561
+# testbl does not have stats computed, so the small query optimization will be disabled
 select * from
   functional.testtbl a join functional.testtbl b on a.id = b.id
 ---- DISTRIBUTEDPLAN
@@ -274,9 +274,28 @@
    HDFS partitions=1/1 files=0 size=0B
    row-size=24B cardinality=0
 ====
+# IMPALA-3335: test that queries with joins can be run as small queries
 select * from
   functional.alltypestiny a
 where a.id in (select id from functional.alltypestiny limit 5) limit 5
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: a.id = id
+|  runtime filters: RF000 <- id
+|  limit: 5
+|  row-size=89B cardinality=5
+|
+|--01:SCAN HDFS [functional.alltypestiny]
+|     HDFS partitions=4/4 files=4 size=460B
+|     limit: 5
+|     row-size=4B cardinality=5
+|
+00:SCAN HDFS [functional.alltypestiny a]
+   HDFS partitions=4/4 files=4 size=460B
+   runtime filters: RF000 -> a.id
+   row-size=89B cardinality=8
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |