IMPALA-9081: fix mt_dop validation tests
This undoes the hack of pretending that it's not a test environment for
that single test. That had side effects, e.g. for the metadata loading
path.
Instead we have a special flag to enable the validation code in
frontend tests.
Note that the plans change to include join build sinks as an
expected result of undoing the hack.
Change-Id: I2e8823c562395e13f318d1ad6eed883d2d9d771f
Reviewed-on: http://gerrit.cloudera.org:8080/14707
Reviewed-by: Anurag Mantripragada <anurag@cloudera.com>
Reviewed-by: Thomas Tauber-Marshall <tmarshall@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
index 6299dd4..c2425a1 100644
--- a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
+++ b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
@@ -27,13 +27,13 @@
private int numCores_;
- // The minimum size of buffer spilled to disk by spilling nodes. Used in
- // PlanNode.computeResourceProfile(). Currently the backend only support a single
- // spillable buffer size, so this is equal to PlanNode.DEFAULT_SPILLABLE_BUFFER_BYTES,
- // except in planner tests.
// Indicates whether this is an environment for testing.
private boolean isTestEnv_;
+ // Whether we should do the same mt_dop validation in frontend tests as in the Impala
+ // service.
+ private boolean enableMtDopValidation_;
+
public RuntimeEnv() {
reset();
}
@@ -44,6 +44,7 @@
public void reset() {
numCores_ = Runtime.getRuntime().availableProcessors();
isTestEnv_ = false;
+ enableMtDopValidation_ = false;
}
public int getNumCores() { return numCores_; }
@@ -53,5 +54,7 @@
public boolean isKuduSupported() {
return "true".equals(System.getenv("KUDU_IS_SUPPORTED"));
}
+ public boolean isMtDopValidationEnabled() { return enableMtDopValidation_; }
+ public void setEnableMtDopValidation(boolean v) { enableMtDopValidation_ = v; }
}
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index b8a3c4a..a0e4fda 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -129,12 +129,12 @@
// sinks: we only allow MT_DOP > 0 with such plans if --unlock_mt_dop=true is
// specified. We allow single node plans with mt_dop since there is no actual
// parallelism.
- if (!ctx_.isSingleNodeExec()
- && ctx_.getQueryOptions().mt_dop > 0
- && !RuntimeEnv.INSTANCE.isTestEnv()
+ if (!ctx_.isSingleNodeExec() && ctx_.getQueryOptions().mt_dop > 0
+ && (!RuntimeEnv.INSTANCE.isTestEnv()
+ || RuntimeEnv.INSTANCE.isMtDopValidationEnabled())
&& !BackendConfig.INSTANCE.isMtDopUnlocked()
- && (ctx_.hasTableSink() ||
- singleNodePlanner.hasUnsupportedMtDopJoin(singleNodePlan))) {
+ && (ctx_.hasTableSink()
+ || singleNodePlanner.hasUnsupportedMtDopJoin(singleNodePlan))) {
if (BackendConfig.INSTANCE.mtDopAutoFallback()) {
// Fall back to non-dop mode. This assumes that the mt_dop value is only used
// in the distributed planning process, which should be generally true as long
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index ab4bfb4..c123ad8 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -634,13 +634,14 @@
TQueryOptions options = defaultQueryOptions();
options.setMt_dop(3);
options.setDisable_hdfs_num_rows_estimate(false);
+ options.setExplain_level(TExplainLevel.EXTENDED);
try {
- // Temporarily unset the test env such that unsupported queries with mt_dop > 0
- // throw an exception. Those are otherwise allowed for testing parallel plans.
+ // Temporarily enable mt_dop validation so that unsupported queries with mt_dop > 0
+ // throw an exception. Such queries are otherwise allowed in frontend tests.
- RuntimeEnv.INSTANCE.setTestEnv(false);
+ RuntimeEnv.INSTANCE.setEnableMtDopValidation(true);
runPlannerTestFile("mt-dop-validation-hdfs-num-rows-est-enabled", options);
} finally {
- RuntimeEnv.INSTANCE.setTestEnv(true);
+ RuntimeEnv.INSTANCE.setEnableMtDopValidation(false);
}
}
@@ -651,13 +652,14 @@
TQueryOptions options = defaultQueryOptions();
options.setMt_dop(3);
options.setDisable_hdfs_num_rows_estimate(true);
+ options.setExplain_level(TExplainLevel.EXTENDED);
try {
- // Temporarily unset the test env such that unsupported queries with mt_dop > 0
- // throw an exception. Those are otherwise allowed for testing parallel plans.
+ // Temporarily enable mt_dop validation so that unsupported queries with mt_dop > 0
+ // throw an exception. Such queries are otherwise allowed in frontend tests.
- RuntimeEnv.INSTANCE.setTestEnv(false);
+ RuntimeEnv.INSTANCE.setEnableMtDopValidation(true);
runPlannerTestFile("mt-dop-validation", options);
} finally {
- RuntimeEnv.INSTANCE.setTestEnv(true);
+ RuntimeEnv.INSTANCE.setEnableMtDopValidation(false);
}
}
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
index 4a3361e..4fc23b5 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation-hdfs-num-rows-est-enabled.test
@@ -21,11 +21,18 @@
| in pipelines: 03(GETNEXT), 00(OPEN)
|
02:NESTED LOOP JOIN [CROSS JOIN]
+| join table id: 00
| mem-estimate=0B mem-reservation=0B thread-reservation=0
| tuple-ids=0,1 row-size=0B cardinality=550.56K
| in pipelines: 00(GETNEXT), 01(OPEN)
|
-|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|--F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| | Per-Host Resources: included in parent fragment
+| JOIN BUILD
+| | join-table-id=00 plan-id=01 cohort-id=01
+| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| |
+| 01:SCAN HDFS [functional_parquet.alltypestiny b]
| HDFS partitions=4/4 files=4 size=11.67KB
| stored statistics:
| table: rows=unavailable size=unavailable
@@ -68,6 +75,7 @@
| in pipelines: 03(GETNEXT), 00(OPEN)
|
02:HASH JOIN [INNER JOIN]
+| hash-table-id=00
| hash predicates: a.id = b.id
| fk/pk conjuncts: assumed fk/pk
| runtime filters: RF000[bloom] <- b.id
@@ -75,7 +83,14 @@
| tuple-ids=0,1 row-size=8B cardinality=742
| in pipelines: 00(GETNEXT), 01(OPEN)
|
-|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|--F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| | Per-Host Resources: included in parent fragment
+| JOIN BUILD
+| | join-table-id=00 plan-id=01 cohort-id=01
+| | build expressions: b.id
+| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| |
+| 01:SCAN HDFS [functional_parquet.alltypestiny b]
| HDFS partitions=4/4 files=4 size=11.67KB
| stored statistics:
| table: rows=unavailable size=unavailable
@@ -123,11 +138,11 @@
| output: count(int_col)
| group by: bigint_col
| mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
-| tuple-ids=1 row-size=16B cardinality=1.27K
+| tuple-ids=1 row-size=16B cardinality=1.28K
| in pipelines: 01(GETNEXT), 00(OPEN)
|
00:SCAN HDFS [functional_parquet.alltypes]
- HDFS partitions=24/24 files=24 size=200.33KB
+ HDFS partitions=24/24 files=24 size=201.80KB
predicates: id < CAST(10 AS INT)
stored statistics:
table: rows=unavailable size=unavailable
@@ -137,7 +152,7 @@
parquet statistics predicates: id < CAST(10 AS INT)
parquet dictionary predicates: id < CAST(10 AS INT)
mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0
- tuple-ids=0 row-size=16B cardinality=1.27K
+ tuple-ids=0 row-size=16B cardinality=1.28K
in pipelines: 00(GETNEXT)
---- PARALLELPLANS
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -165,12 +180,12 @@
| output: count:merge(int_col)
| group by: bigint_col
| mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
-| tuple-ids=1 row-size=16B cardinality=1.27K
+| tuple-ids=1 row-size=16B cardinality=1.28K
| in pipelines: 04(GETNEXT), 00(OPEN)
|
03:EXCHANGE [HASH(bigint_col)]
-| mem-estimate=186.63KB mem-reservation=0B thread-reservation=0
-| tuple-ids=1 row-size=16B cardinality=1.27K
+| mem-estimate=186.69KB mem-reservation=0B thread-reservation=0
+| tuple-ids=1 row-size=16B cardinality=1.28K
| in pipelines: 00(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
@@ -179,11 +194,11 @@
| output: count(int_col)
| group by: bigint_col
| mem-estimate=128.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
-| tuple-ids=1 row-size=16B cardinality=1.27K
+| tuple-ids=1 row-size=16B cardinality=1.28K
| in pipelines: 00(GETNEXT)
|
00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
- HDFS partitions=24/24 files=24 size=200.33KB
+ HDFS partitions=24/24 files=24 size=201.80KB
predicates: id < CAST(10 AS INT)
stored statistics:
table: rows=unavailable size=unavailable
@@ -193,7 +208,7 @@
parquet statistics predicates: id < CAST(10 AS INT)
parquet dictionary predicates: id < CAST(10 AS INT)
mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0
- tuple-ids=0 row-size=16B cardinality=1.27K
+ tuple-ids=0 row-size=16B cardinality=1.28K
in pipelines: 00(GETNEXT)
====
# Single-table scan/filter/analytic should work.
@@ -213,17 +228,17 @@
| order by: id ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
-| tuple-ids=4,3 row-size=16B cardinality=1.27K
+| tuple-ids=4,3 row-size=16B cardinality=1.28K
| in pipelines: 01(GETNEXT)
|
01:SORT
| order by: int_col ASC NULLS FIRST, id ASC
| mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
-| tuple-ids=4 row-size=8B cardinality=1.27K
+| tuple-ids=4 row-size=8B cardinality=1.28K
| in pipelines: 01(GETNEXT), 00(OPEN)
|
00:SCAN HDFS [functional_parquet.alltypes]
- HDFS partitions=24/24 files=24 size=200.33KB
+ HDFS partitions=24/24 files=24 size=201.80KB
predicates: id < CAST(10 AS INT)
stored statistics:
table: rows=unavailable size=unavailable
@@ -233,18 +248,18 @@
parquet statistics predicates: id < CAST(10 AS INT)
parquet dictionary predicates: id < CAST(10 AS INT)
mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
- tuple-ids=0 row-size=8B cardinality=1.27K
+ tuple-ids=0 row-size=8B cardinality=1.28K
in pipelines: 00(GETNEXT)
---- PARALLELPLANS
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-| Per-Host Resources: mem-estimate=222.63KB mem-reservation=0B thread-reservation=1
+| Per-Host Resources: mem-estimate=222.69KB mem-reservation=0B thread-reservation=1
PLAN-ROOT SINK
| output exprs: row_number()
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
04:EXCHANGE [UNPARTITIONED]
-| mem-estimate=222.63KB mem-reservation=0B thread-reservation=0
-| tuple-ids=4,3 row-size=16B cardinality=1.27K
+| mem-estimate=222.69KB mem-reservation=0B thread-reservation=0
+| tuple-ids=4,3 row-size=16B cardinality=1.28K
| in pipelines: 01(GETNEXT)
|
F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=9
@@ -255,24 +270,24 @@
| order by: id ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
-| tuple-ids=4,3 row-size=16B cardinality=1.27K
+| tuple-ids=4,3 row-size=16B cardinality=1.28K
| in pipelines: 01(GETNEXT)
|
01:SORT
| order by: int_col ASC NULLS FIRST, id ASC
| mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
-| tuple-ids=4 row-size=8B cardinality=1.27K
+| tuple-ids=4 row-size=8B cardinality=1.28K
| in pipelines: 01(GETNEXT), 00(OPEN)
|
03:EXCHANGE [HASH(int_col)]
-| mem-estimate=111.32KB mem-reservation=0B thread-reservation=0
-| tuple-ids=0 row-size=8B cardinality=1.27K
+| mem-estimate=111.34KB mem-reservation=0B thread-reservation=0
+| tuple-ids=0 row-size=8B cardinality=1.28K
| in pipelines: 00(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
Per-Host Resources: mem-estimate=48.00MB mem-reservation=48.00KB thread-reservation=3
00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
- HDFS partitions=24/24 files=24 size=200.33KB
+ HDFS partitions=24/24 files=24 size=201.80KB
predicates: id < CAST(10 AS INT)
stored statistics:
table: rows=unavailable size=unavailable
@@ -282,6 +297,6 @@
parquet statistics predicates: id < CAST(10 AS INT)
parquet dictionary predicates: id < CAST(10 AS INT)
mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
- tuple-ids=0 row-size=8B cardinality=1.27K
+ tuple-ids=0 row-size=8B cardinality=1.28K
in pipelines: 00(GETNEXT)
====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 3c4e032..10e95a1 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -16,11 +16,18 @@
| in pipelines: 03(GETNEXT), 00(OPEN)
|
02:NESTED LOOP JOIN [CROSS JOIN]
+| join table id: 00
| mem-estimate=2.00GB mem-reservation=0B thread-reservation=0
| tuple-ids=0,1 row-size=0B cardinality=unavailable
| in pipelines: 00(GETNEXT), 01(OPEN)
|
-|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|--F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| | Per-Host Resources: included in parent fragment
+| JOIN BUILD
+| | join-table-id=00 plan-id=01 cohort-id=01
+| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| |
+| 01:SCAN HDFS [functional_parquet.alltypestiny b]
| HDFS partitions=4/4 files=4 size=11.67KB
| stored statistics:
| table: rows=unavailable size=unavailable
@@ -63,6 +70,7 @@
| in pipelines: 03(GETNEXT), 00(OPEN)
|
02:HASH JOIN [INNER JOIN]
+| hash-table-id=00
| hash predicates: a.id = b.id
| fk/pk conjuncts: assumed fk/pk
| runtime filters: RF000[bloom] <- b.id
@@ -70,7 +78,14 @@
| tuple-ids=0,1 row-size=8B cardinality=unavailable
| in pipelines: 00(GETNEXT), 01(OPEN)
|
-|--01:SCAN HDFS [functional_parquet.alltypestiny b]
+|--F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+| | Per-Host Resources: included in parent fragment
+| JOIN BUILD
+| | join-table-id=00 plan-id=01 cohort-id=01
+| | build expressions: b.id
+| | mem-estimate=0B mem-reservation=0B thread-reservation=0
+| |
+| 01:SCAN HDFS [functional_parquet.alltypestiny b]
| HDFS partitions=4/4 files=4 size=11.67KB
| stored statistics:
| table: rows=unavailable size=unavailable
@@ -137,7 +152,7 @@
| mem-estimate=100.00KB mem-reservation=0B thread-reservation=0
|
00:SCAN HDFS [functional_parquet.alltypes]
- HDFS partitions=24/24 files=24 size=200.25KB
+ HDFS partitions=24/24 files=24 size=201.80KB
stored statistics:
table: rows=unavailable size=unavailable
partitions: 0/24 rows=unavailable
@@ -176,7 +191,7 @@
| in pipelines: 01(GETNEXT), 00(OPEN)
|
00:SCAN HDFS [functional_parquet.alltypes]
- HDFS partitions=24/24 files=24 size=200.25KB
+ HDFS partitions=24/24 files=24 size=201.80KB
predicates: id < CAST(10 AS INT)
stored statistics:
table: rows=unavailable size=unavailable
@@ -232,7 +247,7 @@
| in pipelines: 00(GETNEXT)
|
00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
- HDFS partitions=24/24 files=24 size=200.25KB
+ HDFS partitions=24/24 files=24 size=201.80KB
predicates: id < CAST(10 AS INT)
stored statistics:
table: rows=unavailable size=unavailable
@@ -272,7 +287,7 @@
| in pipelines: 01(GETNEXT), 00(OPEN)
|
00:SCAN HDFS [functional_parquet.alltypes]
- HDFS partitions=24/24 files=24 size=200.25KB
+ HDFS partitions=24/24 files=24 size=201.80KB
predicates: id < CAST(10 AS INT)
stored statistics:
table: rows=unavailable size=unavailable
@@ -321,7 +336,7 @@
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=9
Per-Host Resources: mem-estimate=48.00MB mem-reservation=48.00KB thread-reservation=3
00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
- HDFS partitions=24/24 files=24 size=200.25KB
+ HDFS partitions=24/24 files=24 size=201.80KB
predicates: id < CAST(10 AS INT)
stored statistics:
table: rows=unavailable size=unavailable
@@ -390,12 +405,12 @@
| in pipelines: 00(GETNEXT)
|
00:SCAN HDFS [tpch_nested_parquet.customer c]
- HDFS partitions=1/1 files=4 size=288.98MB
+ HDFS partitions=1/1 files=4 size=288.99MB
predicates: c_custkey < CAST(10 AS BIGINT), !empty(c.c_orders)
predicates on o: !empty(o.o_lineitems), o_orderkey < CAST(5 AS BIGINT)
predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
stored statistics:
- table: rows=150.00K size=288.98MB
+ table: rows=150.00K size=288.99MB
columns missing stats: c_orders
extrapolated-rows=disabled max-scan-range-rows=50.12K
parquet statistics predicates: c_custkey < CAST(10 AS BIGINT)
@@ -466,12 +481,12 @@
| in pipelines: 00(GETNEXT)
|
00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
- HDFS partitions=1/1 files=4 size=288.98MB
+ HDFS partitions=1/1 files=4 size=288.99MB
predicates: c_custkey < CAST(10 AS BIGINT), !empty(c.c_orders)
predicates on o: !empty(o.o_lineitems), o_orderkey < CAST(5 AS BIGINT)
predicates on o_lineitems: l_linenumber < CAST(3 AS INT)
stored statistics:
- table: rows=150.00K size=288.98MB
+ table: rows=150.00K size=288.99MB
columns missing stats: c_orders
extrapolated-rows=disabled max-scan-range-rows=50.12K
parquet statistics predicates: c_custkey < CAST(10 AS BIGINT)
@@ -531,11 +546,11 @@
| in pipelines: 00(GETNEXT)
|
00:SCAN HDFS [tpch_nested_parquet.customer c]
- HDFS partitions=1/1 files=4 size=288.98MB
+ HDFS partitions=1/1 files=4 size=288.99MB
predicates: !empty(c.c_orders), !empty(c.c_orders)
predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
stored statistics:
- table: rows=150.00K size=288.98MB
+ table: rows=150.00K size=288.99MB
columns missing stats: c_orders, c_orders
extrapolated-rows=disabled max-scan-range-rows=50.12K
parquet statistics predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
@@ -593,11 +608,11 @@
| in pipelines: 00(GETNEXT)
|
00:SCAN HDFS [tpch_nested_parquet.customer c, RANDOM]
- HDFS partitions=1/1 files=4 size=288.98MB
+ HDFS partitions=1/1 files=4 size=288.99MB
predicates: !empty(c.c_orders), !empty(c.c_orders)
predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)
stored statistics:
- table: rows=150.00K size=288.98MB
+ table: rows=150.00K size=288.99MB
columns missing stats: c_orders, c_orders
extrapolated-rows=disabled max-scan-range-rows=50.12K
parquet statistics predicates on o1: o1.o_orderkey < CAST(5 AS BIGINT)