IMPALA-9146: Add a configurable limit for the size of broadcast input.

Impala's DistributedPlanner may sometimes accidentally choose broadcast
distribution for inputs that are larger than the destination executor's
total memory. This could potentially happen if the cluster membership is
not accurately known and the planner's cost computation of the
broadcastCost vs partitionCost happens to favor the broadcast
distribution. This causes spilling and severely affects performance.
Although the DistributedPlanner does a mem_limit check before picking
broadcast, the mem_limit is not an accurate reflection since it is
assigned during admission control.

As a safety measure, we introduce an explicit configurable limit,
broadcast_bytes_limit, for the size of the broadcast input and set it
to a default of 32GB. The default is chosen based on analysis of
existing benchmark queries and representative workloads such that in
the vast majority of cases the parameter value does not need to be
changed. If the estimated input size on the build side is greater than
this threshold, the DistributedPlanner will fall back to a partition
distribution. Setting this parameter to 0 causes it to be ignored.

Testing:
 - Ran all regression tests on Jenkins successfully
 - Added a few unit tests in PlannerTest that (a) set the
broadcast_bytes_limit to a small value and check whether the
distributed plan does hash partitioning on the build side instead
of broadcast, (b) pass a broadcast hint to override the config
setting, (c) verify the standard case where the broadcast threshold
is larger than the build input size.

Change-Id: Ibe5639ca38acb72e0194aa80bc6ebb6cafb2acd9
Reviewed-on: http://gerrit.cloudera.org:8080/14690
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index e587103..87bb3d5 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -151,6 +151,7 @@
       {MAKE_OPTIONDEF(scan_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(topn_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(mem_limit_executors), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(broadcast_bytes_limit), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 5d1ab07..a787865 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -907,6 +907,15 @@
         query_options->__set_mem_limit_executors(bytes_limit);
         break;
       }
+      case TImpalaQueryOptions::BROADCAST_BYTES_LIMIT: {
+        // Parse the broadcast_bytes limit and validate it
+        int64_t broadcast_bytes_limit;
+        RETURN_IF_ERROR(
+            ParseMemValue(value, "broadcast bytes limit for join operations",
+                &broadcast_bytes_limit));
+        query_options->__set_broadcast_bytes_limit(broadcast_bytes_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 01112ad..032bfb5 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MEM_LIMIT_EXECUTORS + 1);\
+      TImpalaQueryOptions::BROADCAST_BYTES_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -192,7 +192,8 @@
   QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,\
       TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)
+  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)\
+  QUERY_OPT_FN(broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 62b396e..3ba97c9 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -402,7 +402,11 @@
   96: optional i64 parquet_object_store_split_size = 268435456;
 
   // See comment in ImpalaService.thrift
-  97: optional i64 mem_limit_executors = 0
+  97: optional i64 mem_limit_executors = 0;
+
+  // See comment in ImpalaService.thrift
+  // The default value is set to 32 GB
+  98: optional i64 broadcast_bytes_limit = 34359738368;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index a80eb45..33d50ba 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -484,6 +484,12 @@
   // a) an int (= number of bytes);
   // b) a float followed by "M" (MB) or "G" (GB)
   MEM_LIMIT_EXECUTORS = 96
+
+  // The max number of estimated bytes eligible for a Broadcast operation during a join.
+  // If the planner thinks the total bytes sent to all destinations of a broadcast
+  // exchange will exceed this limit, it will not consider a broadcast and instead
+  // fall back on a hash partition exchange. 0 or -1 means this has no effect.
+  BROADCAST_BYTES_LIMIT = 97
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 4297c87..cfd238b 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -556,14 +556,21 @@
          ctx_.getQueryOptions().getDefault_join_distribution_mode());
    }
 
-   // Decide the distribution mode based on the estimated costs and the mem limit.
    int mt_dop = ctx_.getQueryOptions().mt_dop;
+
+   // Decide the distribution mode based on the estimated costs, the mem limit and
+   // the broadcast bytes limit. The last value is a safety check to ensure we
+   // don't broadcast very large inputs (for example in case the broadcast cost was
+   // not computed correctly and the query mem limit has not been set or set too high)
    long htSize = Math.round(rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
    // TODO: IMPALA-4224: update this once we can share the broadcast join data between
    // finstances.
    if (mt_dop > 1) htSize *= mt_dop;
    long memLimit = ctx_.getQueryOptions().mem_limit;
-   if (broadcastCost <= partitionCost && (memLimit == 0 || htSize <= memLimit)) {
+   long broadcast_bytes_limit = ctx_.getQueryOptions().getBroadcast_bytes_limit();
+
+   if (broadcastCost <= partitionCost && (memLimit == 0 || htSize <= memLimit) &&
+           (broadcast_bytes_limit == 0 || htSize <= broadcast_bytes_limit)) {
      return DistributionMode.BROADCAST;
    }
    // Partitioned was cheaper or the broadcast HT would not fit within the mem limit.
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index c123ad8..fa81c84 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1003,4 +1003,19 @@
                                         PlannerTestOption.INCLUDE_RESOURCE_HEADER,
                                         PlannerTestOption.VALIDATE_RESOURCES));
   }
+
+  @Test
+  public void testBroadcastBytesLimit() {
+    TQueryOptions options = new TQueryOptions();
+    // broadcast limit is smaller than the build side of hash join, so we should
+    // NOT pick broadcast unless it is overridden through a join hint
+    options.setBroadcast_bytes_limit(100);
+    runPlannerTestFile("broadcast-bytes-limit", "tpch_parquet", options);
+
+    // broadcast limit is larger than the build side of hash join, so we SHOULD
+    // pick broadcast (i.e verify the standard case)
+    options.setBroadcast_bytes_limit(1000000);
+    runPlannerTestFile("broadcast-bytes-limit-large", "tpch_parquet", options);
+  }
+
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 58692a6..7a7e8ca 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -855,6 +855,12 @@
     runPlannerTestFile(testFile, dbName, defaultQueryOptions(), testOptions);
   }
 
+  protected void runPlannerTestFile(
+      String testFile, String dbName, TQueryOptions options) {
+    runPlannerTestFile(testFile, dbName, options,
+        Collections.<PlannerTestOption>emptySet());
+  }
+
   private void runPlannerTestFile(String testFile, String dbName, TQueryOptions options,
         Set<PlannerTestOption> testOptions) {
     String fileName = testDir_.resolve(testFile + ".test").toString();
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit-large.test b/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit-large.test
new file mode 100644
index 0000000..bd27412
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit-large.test
@@ -0,0 +1,24 @@
+# Check that broadcast distribution IS chosen if size of
+# hash join build side input is smaller than broadcast_bytes_limit
+select c_name from customer inner join nation on c_nationkey = n_nationkey
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  row-size=34B cardinality=150.00K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch_parquet.nation]
+|     HDFS partitions=1/1 files=1 size=3.04KB
+|     row-size=2B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer]
+   HDFS partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   row-size=32B cardinality=150.00K
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit.test b/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit.test
new file mode 100644
index 0000000..b187890
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/broadcast-bytes-limit.test
@@ -0,0 +1,53 @@
+# check that broadcast distribution is NOT chosen if estimated size of hash join
+# build side input is greater than broadcast_bytes_limit
+select c_name from customer inner join nation on c_nationkey = n_nationkey
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+05:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  row-size=34B cardinality=150.00K
+|
+|--04:EXCHANGE [HASH(n_nationkey)]
+|  |
+|  01:SCAN HDFS [tpch_parquet.nation]
+|     HDFS partitions=1/1 files=1 size=3.04KB
+|     row-size=2B cardinality=25
+|
+03:EXCHANGE [HASH(c_nationkey)]
+|
+00:SCAN HDFS [tpch_parquet.customer]
+   HDFS partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   row-size=32B cardinality=150.00K
+====
+# negative test: check that broadcast distribution IS chosen
+# even if estimated size of hash join build side input is
+# greater than broadcast_bytes_limit because the query has a
+# join hint forcing broadcast distribution
+select c_name from customer inner join /* +broadcast */ nation
+    on c_nationkey = n_nationkey
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: c_nationkey = n_nationkey
+|  runtime filters: RF000 <- n_nationkey
+|  row-size=34B cardinality=150.00K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [tpch_parquet.nation]
+|     HDFS partitions=1/1 files=1 size=3.04KB
+|     row-size=2B cardinality=25
+|
+00:SCAN HDFS [tpch_parquet.customer]
+   HDFS partitions=1/1 files=1 size=12.34MB
+   runtime filters: RF000 -> c_nationkey
+   row-size=32B cardinality=150.00K
+====