IMPALA-9081: fix mt_dop validation tests

This undoes the hack of pretending that it's not a test environment for
that single test. That had side effects, e.g. for the metadata loading
path.

Instead we have a special flag to enable the validation code in
frontend tests.

Note that the plans change to include join build sinks as an
expected result of undoing the hack.

Change-Id: I2e8823c562395e13f318d1ad6eed883d2d9d771f
Reviewed-on: http://gerrit.cloudera.org:8080/14707
Reviewed-by: Anurag Mantripragada <anurag@cloudera.com>
Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
index 6299dd4..c2425a1 100644
--- a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
+++ b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
@@ -27,13 +27,13 @@
 
   private int numCores_;
 
-  // The minimum size of buffer spilled to disk by spilling nodes. Used in
-  // PlanNode.computeResourceProfile(). Currently the backend only support a single
-  // spillable buffer size, so this is equal to PlanNode.DEFAULT_SPILLABLE_BUFFER_BYTES,
-  // except in planner tests.
   // Indicates whether this is an environment for testing.
   private boolean isTestEnv_;
 
+  // Whether we should do the same mt_dop validation in frontend tests as in the Impala
+  // service.
+  private boolean enableMtDopValidation_;
+
   public RuntimeEnv() {
     reset();
   }
@@ -44,6 +44,7 @@
   public void reset() {
     numCores_ = Runtime.getRuntime().availableProcessors();
     isTestEnv_ = false;
+    enableMtDopValidation_ = false;
   }
 
   public int getNumCores() { return numCores_; }
@@ -53,5 +54,7 @@
   public boolean isKuduSupported() {
     return "true".equals(System.getenv("KUDU_IS_SUPPORTED"));
   }
+  public boolean isMtDopValidationEnabled() { return enableMtDopValidation_; }
+  public void setEnableMtDopValidation(boolean v) { enableMtDopValidation_ = v; }
 
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index b8a3c4a..a0e4fda 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -129,12 +129,12 @@
     // sinks: we only allow MT_DOP > 0 with such plans if --unlock_mt_dop=true is
     // specified. We allow single node plans with mt_dop since there is no actual
     // parallelism.
-    if (!ctx_.isSingleNodeExec()
-        && ctx_.getQueryOptions().mt_dop > 0
-        && !RuntimeEnv.INSTANCE.isTestEnv()
+    if (!ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop > 0
+        && (!RuntimeEnv.INSTANCE.isTestEnv()
+               || RuntimeEnv.INSTANCE.isMtDopValidationEnabled())
         && !BackendConfig.INSTANCE.isMtDopUnlocked()
-        && (ctx_.hasTableSink() ||
-            singleNodePlanner.hasUnsupportedMtDopJoin(singleNodePlan))) {
+        && (ctx_.hasTableSink()
+               || singleNodePlanner.hasUnsupportedMtDopJoin(singleNodePlan))) {
       if (BackendConfig.INSTANCE.mtDopAutoFallback()) {
         // Fall back to non-dop mode. This assumes that the mt_dop value is only used
         // in the distributed planning process, which should be generally true as long
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index ab4bfb4..c123ad8 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -634,13 +634,14 @@
     TQueryOptions options = defaultQueryOptions();
     options.setMt_dop(3);
     options.setDisable_hdfs_num_rows_estimate(false);
+    options.setExplain_level(TExplainLevel.EXTENDED);
     try {
-      // Temporarily unset the test env such that unsupported queries with mt_dop > 0
-      // throw an exception. Those are otherwise allowed for testing parallel plans.
+      // Enable mt_dop validation such that unsupported queries with mt_dop > 0 throw
+      // an exception. Those are otherwise allowed for testing parallel plans.
-      RuntimeEnv.INSTANCE.setTestEnv(false);
+      RuntimeEnv.INSTANCE.setEnableMtDopValidation(true);
       runPlannerTestFile("mt-dop-validation-hdfs-num-rows-est-enabled", options);
     } finally {
-      RuntimeEnv.INSTANCE.setTestEnv(true);
+      RuntimeEnv.INSTANCE.setEnableMtDopValidation(false);
     }
   }
 
@@ -651,13 +652,14 @@
     TQueryOptions options = defaultQueryOptions();
     options.setMt_dop(3);
     options.setDisable_hdfs_num_rows_estimate(true);
+    options.setExplain_level(TExplainLevel.EXTENDED);
     try {
-      // Temporarily unset the test env such that unsupported queries with mt_dop > 0
-      // throw an exception. Those are otherwise allowed for testing parallel plans.
+      // Enable mt_dop validation such that unsupported queries with mt_dop > 0 throw
+      // an exception. Those are otherwise allowed for testing parallel plans.
-      RuntimeEnv.INSTANCE.setTestEnv(false);
+      RuntimeEnv.INSTANCE.setEnableMtDopValidation(true);
       runPlannerTestFile("mt-dop-validation", options);
     } finally {
-      RuntimeEnv.INSTANCE.setTestEnv(true);
+      RuntimeEnv.INSTANCE.setEnableMtDopValidation(false);
     }
   }
 
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
index 4a3361e..4fc23b5 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
@@ -21,11 +21,18 @@
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:NESTED LOOP JOIN [CROSS JOIN]
+|  join table id: 00
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=0B cardinality=550.56K
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
-|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|--F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  |  Per-Host Resources: included in parent fragment
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |
+|  01:SCAN HDFS [functional_parquet.alltypestiny b]
 |     HDFS partitions=4/4 files=4 size=11.67KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
@@ -68,6 +75,7 @@
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:HASH JOIN [INNER JOIN]
+|  hash-table-id=00
 |  hash predicates: a.id = b.id
 |  fk/pk conjuncts: assumed fk/pk
 |  runtime filters: RF000[bloom] <- b.id
@@ -75,7 +83,14 @@
 |  tuple-ids=0,1 row-size=8B cardinality=742
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
-|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|--F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  |  Per-Host Resources: included in parent fragment
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: b.id
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |
+|  01:SCAN HDFS [functional_parquet.alltypestiny b]
 |     HDFS partitions=4/4 files=4 size=11.67KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
@@ -123,11 +138,11 @@
 |  output: count(int_col)
 |  group by: bigint_col
 |  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=1 row-size=16B cardinality=1.27K
+|  tuple-ids=1 row-size=16B cardinality=1.28K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=200.33KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -137,7 +152,7 @@
    parquet statistics predicates: id < CAST(10 AS INT)
    parquet dictionary predicates: id < CAST(10 AS INT)
    mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0
-   tuple-ids=0 row-size=16B cardinality=1.27K
+   tuple-ids=0 row-size=16B cardinality=1.28K
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -165,12 +180,12 @@
 |  output: count:merge(int_col)
 |  group by: bigint_col
 |  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=1 row-size=16B cardinality=1.27K
+|  tuple-ids=1 row-size=16B cardinality=1.28K
 |  in pipelines: 04(GETNEXT), 00(OPEN)
 |
 03:EXCHANGE [HASH(bigint_col)]
-|  mem-estimate=186.63KB mem-reservation=0B thread-reservation=0
-|  tuple-ids=1 row-size=16B cardinality=1.27K
+|  mem-estimate=186.69KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=16B cardinality=1.28K
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
@@ -179,11 +194,11 @@
 |  output: count(int_col)
 |  group by: bigint_col
 |  mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=1 row-size=16B cardinality=1.27K
+|  tuple-ids=1 row-size=16B cardinality=1.28K
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   HDFS partitions=24/24 files=24 size=200.33KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -193,7 +208,7 @@
    parquet statistics predicates: id < CAST(10 AS INT)
    parquet dictionary predicates: id < CAST(10 AS INT)
    mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0
-   tuple-ids=0 row-size=16B cardinality=1.27K
+   tuple-ids=0 row-size=16B cardinality=1.28K
    in pipelines: 00(GETNEXT)
 ====
 # Single-table scan/filter/analytic should work.
@@ -213,17 +228,17 @@
 |  order by: id ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=4,3 row-size=16B cardinality=1.27K
+|  tuple-ids=4,3 row-size=16B cardinality=1.28K
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST, id ASC
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=4 row-size=8B cardinality=1.27K
+|  tuple-ids=4 row-size=8B cardinality=1.28K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=200.33KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -233,18 +248,18 @@
    parquet statistics predicates: id < CAST(10 AS INT)
    parquet dictionary predicates: id < CAST(10 AS INT)
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
-   tuple-ids=0 row-size=8B cardinality=1.27K
+   tuple-ids=0 row-size=8B cardinality=1.28K
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=222.63KB mem-reservation=0B thread-reservation=1
+|  Per-Host Resources: mem-estimate=222.69KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: row_number()
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 04:EXCHANGE [UNPARTITIONED]
-|  mem-estimate=222.63KB mem-reservation=0B thread-reservation=0
-|  tuple-ids=4,3 row-size=16B cardinality=1.27K
+|  mem-estimate=222.69KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=4,3 row-size=16B cardinality=1.28K
 |  in pipelines: 01(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=9
@@ -255,24 +270,24 @@
 |  order by: id ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=4,3 row-size=16B cardinality=1.27K
+|  tuple-ids=4,3 row-size=16B cardinality=1.28K
 |  in pipelines: 01(GETNEXT)
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST, id ASC
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=4 row-size=8B cardinality=1.27K
+|  tuple-ids=4 row-size=8B cardinality=1.28K
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 03:EXCHANGE [HASH(int_col)]
-|  mem-estimate=111.32KB mem-reservation=0B thread-reservation=0
-|  tuple-ids=0 row-size=8B cardinality=1.27K
+|  mem-estimate=111.34KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=8B cardinality=1.28K
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 Per-Host Resources: mem-estimate=48.00MB mem-reservation=48.00KB thread-reservation=3
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   HDFS partitions=24/24 files=24 size=200.33KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -282,6 +297,6 @@
    parquet statistics predicates: id < CAST(10 AS INT)
    parquet dictionary predicates: id < CAST(10 AS INT)
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
-   tuple-ids=0 row-size=8B cardinality=1.27K
+   tuple-ids=0 row-size=8B cardinality=1.28K
    in pipelines: 00(GETNEXT)
 ====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 3c4e032..10e95a1 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -16,11 +16,18 @@
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:NESTED LOOP JOIN [CROSS JOIN]
+|  join table id: 00
 |  mem-estimate=2.00GB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=0B cardinality=unavailable
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
-|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|--F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  |  Per-Host Resources: included in parent fragment
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |
+|  01:SCAN HDFS [functional_parquet.alltypestiny b]
 |     HDFS partitions=4/4 files=4 size=11.67KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
@@ -63,6 +70,7 @@
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:HASH JOIN [INNER JOIN]
+|  hash-table-id=00
 |  hash predicates: a.id = b.id
 |  fk/pk conjuncts: assumed fk/pk
 |  runtime filters: RF000[bloom] <- b.id
@@ -70,7 +78,14 @@
 |  tuple-ids=0,1 row-size=8B cardinality=unavailable
 |  in pipelines: 00(GETNEXT), 01(OPEN)
 |
-|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|--F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  |  Per-Host Resources: included in parent fragment
+|  JOIN BUILD
+|  |  join-table-id=00 plan-id=01 cohort-id=01
+|  |  build expressions: b.id
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |
+|  01:SCAN HDFS [functional_parquet.alltypestiny b]
 |     HDFS partitions=4/4 files=4 size=11.67KB
 |     stored statistics:
 |       table: rows=unavailable size=unavailable
@@ -137,7 +152,7 @@
 |  mem-estimate=100.00KB mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=200.25KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -176,7 +191,7 @@
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=200.25KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -232,7 +247,7 @@
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   HDFS partitions=24/24 files=24 size=200.25KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -272,7 +287,7 @@
 |  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   HDFS partitions=24/24 files=24 size=200.25KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -321,7 +336,7 @@
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
 Per-Host Resources: mem-estimate=48.00MB mem-reservation=48.00KB thread-reservation=3
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   HDFS partitions=24/24 files=24 size=200.25KB
+   HDFS partitions=24/24 files=24 size=201.80KB
    predicates: id < CAST(10 AS INT)
    stored statistics:
      table: rows=unavailable size=unavailable
@@ -390,12 +405,12 @@
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=288.98MB
+   HDFS partitions=1/1 files=4 size=288.99MB
    predicates: c_custkey < CAST(10 AS BIGINT), !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < CAST(5 AS BIGINT)
    predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
    stored statistics:
-     table: rows=150.00K size=288.98MB
+     table: rows=150.00K size=288.99MB
      columns missing stats: c_orders
    extrapolated-rows=disabled max-scan-range-rows=50.12K
    parquet statistics predicates: c_custkey < CAST(10 AS BIGINT)
@@ -466,12 +481,12 @@
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
-   HDFS partitions=1/1 files=4 size=288.98MB
+   HDFS partitions=1/1 files=4 size=288.99MB
    predicates: c_custkey < CAST(10 AS BIGINT), !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < CAST(5 AS BIGINT)
    predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
    stored statistics:
-     table: rows=150.00K size=288.98MB
+     table: rows=150.00K size=288.99MB
      columns missing stats: c_orders
    extrapolated-rows=disabled max-scan-range-rows=50.12K
    parquet statistics predicates: c_custkey < CAST(10 AS BIGINT)
@@ -531,11 +546,11 @@
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   HDFS partitions=1/1 files=4 size=288.98MB
+   HDFS partitions=1/1 files=4 size=288.99MB
    predicates: !empty(c.c_orders), !empty(c.c_orders)
    predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    stored statistics:
-     table: rows=150.00K size=288.98MB
+     table: rows=150.00K size=288.99MB
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled max-scan-range-rows=50.12K
    parquet statistics predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
@@ -593,11 +608,11 @@
 |     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
-   HDFS partitions=1/1 files=4 size=288.98MB
+   HDFS partitions=1/1 files=4 size=288.99MB
    predicates: !empty(c.c_orders), !empty(c.c_orders)
    predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
    stored statistics:
-     table: rows=150.00K size=288.98MB
+     table: rows=150.00K size=288.99MB
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled max-scan-range-rows=50.12K
    parquet statistics predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)