IMPALA-10110: bloom filter target fpp query option

This adds a BLOOM_FILTER_ERROR_RATE option that takes a
value between 0 and 1 (exclusive) that can override
the default target false positive probability (fpp)
value of 0.75 for selecting the filter size.

It does not affect whether filters are disabled
at runtime.

Adds estimated FPP and bloom size to the routing
table so we have some observability. Here is an
example:

tpch_kudu> select count(*) from customer join nation on n_nationkey = c_nationkey;

 ID  Src. Node  Tgt. Node(s)  Target type  Partition filter  Pending (Expected)  First arrived  Completed  Enabled  Bloom Size    Est fpp
-----------------------------------------------------------------------------------------------------------------------------------------
  1          2             0        LOCAL             false               0 (3)            N/A        N/A     true     MIN_MAX
  0          2             0        LOCAL             false               0 (3)            N/A        N/A     true     1.00 MB   1.04e-37

Testing:
Added a test that shows the query option affecting filter size.

Ran core tests.

Change-Id: Ifb123a0ea1e0e95d95df9837c1f0222fd60361f3
Reviewed-on: http://gerrit.cloudera.org:8080/16377
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 119e40b..104a639 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -18,6 +18,8 @@
 #include "runtime/coordinator.h"
 
 #include <cerrno>
+#include <iomanip>
+#include <sstream>
 #include <unordered_set>
 
 #include <thrift/protocol/TDebugProtocol.h>
@@ -48,6 +50,7 @@
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
 #include "service/client-request-state.h"
+#include "util/bit-util.h"
 #include "util/bloom-filter.h"
 #include "util/hdfs-bulk-ops.h"
 #include "util/hdfs-util.h"
@@ -577,6 +580,8 @@
     table_printer.AddColumn("Completed", false);
   }
   table_printer.AddColumn("Enabled", false);
+  table_printer.AddColumn("Bloom Size", false);
+  table_printer.AddColumn("Est fpp", false);
   for (auto& v: filter_routing_table_->id_to_filter) {
     vector<string> row;
     const FilterState& state = v.second;
@@ -613,6 +618,20 @@
     // completion to prevent further update. In such case, we should check if all filter
     // updates have been successfully received.
     row.push_back(state.enabled() || state.received_all_updates() ? "true" : "false");
+
+    // Add size and fpp for bloom filters, the filter type otherwise.
+    if (state.is_bloom_filter()) {
+      int64_t filter_size = state.desc().filter_size_bytes;
+      row.push_back(PrettyPrinter::Print(filter_size, TUnit::BYTES));
+      double fpp = BloomFilter::FalsePositiveProb(
+          state.desc().ndv_estimate, BitUtil::Log2Ceiling64(filter_size));
+      stringstream ss;
+      ss << setprecision(3) << fpp;
+      row.push_back(ss.str());
+    } else {
+      row.push_back(PrintThriftEnum(state.desc().type));
+      row.push_back("");
+    }
     table_printer.AddRow(row);
   }
   // Add a line break, as in all contexts this is called we need to start a new line to
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index f199ffd..a1f9c85 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -54,7 +54,9 @@
 using namespace strings;
 
 DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
-    "positives in a runtime filter before it is disabled.");
+    "positives in a runtime bloom filter before it is disabled. Also, if not overridden "
+    "by the RUNTIME_FILTER_ERROR_RATE query option, the target false positive "
+    "probability used to determine the ideal size for each bloom filter size.");
 
 const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
 const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 180ca50..710412c 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -372,6 +372,26 @@
     TestError("8191"); // default value of FLAGS_min_buffer_size is 8KB
     TestOk("64KB", 64 * 1024);
   }
+  {
+    // RUNTIME_FILTER_ERROR_RATE is a double in range (0.0, 1.0)
+    OptionDef<double> key_def = MAKE_OPTIONDEF(runtime_filter_error_rate);
+    auto TestOk = MakeTestOkFn(options, key_def);
+    auto TestError = MakeTestErrFn(options, key_def);
+    TestOk("0.5", 0.5);
+    TestOk("0.01", 0.01);
+    TestOk("0.001", 0.001);
+    TestOk("0.0001", 0.0001);
+    TestOk("0.0000000001", 0.0000000001);
+    TestOk("0.999999999", 0.999999999);
+    TestOk(" 0.9", 0.9);
+    // Out of range values
+    TestError("1");
+    TestError("0");
+    TestError("-1");
+    TestError("-0.1");
+    TestError("1.1");
+    TestError("Not a number!");
+  }
 }
 
 TEST(QueryOptions, ParseQueryOptions) {
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 181a4dd..73c05ac 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -942,6 +942,17 @@
         query_options->__set_spool_all_results_for_retries(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::RUNTIME_FILTER_ERROR_RATE: {
+        StringParser::ParseResult result;
+        const double val =
+            StringParser::StringToFloat<double>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || val <= 0 || val >= 1) {
+          return Status(Substitute("Invalid runtime filter error rate: "
+                "'$0'. Only values between 0 to 1 (exclusive) are allowed.", value));
+        }
+        query_options->__set_runtime_filter_error_rate(val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 8147aff..06fd32f 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::SPOOL_ALL_RESULTS_FOR_RETRIES + 1);\
+      TImpalaQueryOptions::RUNTIME_FILTER_ERROR_RATE + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -211,6 +211,8 @@
       REFRESH_UPDATED_HMS_PARTITIONS, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(spool_all_results_for_retries, SPOOL_ALL_RESULTS_FOR_RETRIES,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(runtime_filter_error_rate, RUNTIME_FILTER_ERROR_RATE,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 40e0f28..f31f895 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -95,6 +95,7 @@
 PRINT_THRIFT_ENUM_IMPL(TPrefetchMode)
 PRINT_THRIFT_ENUM_IMPL(TReplicaPreference)
 PRINT_THRIFT_ENUM_IMPL(TRuntimeFilterMode)
+PRINT_THRIFT_ENUM_IMPL(TRuntimeFilterType)
 PRINT_THRIFT_ENUM_IMPL(TSessionType)
 PRINT_THRIFT_ENUM_IMPL(TStmtType)
 PRINT_THRIFT_ENUM_IMPL(TUnit)
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 98c61e0..5852c82 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -72,6 +72,7 @@
 std::string PrintThriftEnum(const TPrefetchMode::type& value);
 std::string PrintThriftEnum(const TReplicaPreference::type& value);
 std::string PrintThriftEnum(const TRuntimeFilterMode::type& value);
+std::string PrintThriftEnum(const TRuntimeFilterType::type& value);
 std::string PrintThriftEnum(const TSessionType::type& value);
 std::string PrintThriftEnum(const TStmtType::type& value);
 std::string PrintThriftEnum(const TUnit::type& value);
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index fe27b07..601a81b 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -444,6 +444,9 @@
 
   // See comment in ImpalaService.thrift
   110: optional bool spool_all_results_for_retries = true;
+
+  // See comment in ImpalaService.thrift
+  111: optional double runtime_filter_error_rate;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 1fbf5f9..2661b7c 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -567,6 +567,10 @@
   // when any of them are ready. Note that in this case, query retry will be skipped if
   // the client has fetched some results.
   SPOOL_ALL_RESULTS_FOR_RETRIES = 109
+
+  // A value (0.0, 1.0) that is the target false positive probability for runtime bloom
+  // filters. If not set, falls back to max_filter_error_rate.
+  RUNTIME_FILTER_ERROR_RATE = 110
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 29fef5f..8356d78 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -123,6 +123,9 @@
     // Pre-computed default filter size, in bytes, rounded up to a power of two.
     public final long defaultVal;
 
+    // Target false positive probability, between 0 and 1 exclusive.
+    public final double targetFpp;
+
     public FilterSizeLimits(TQueryOptions tQueryOptions) {
       // Round up all limits to a power of two and make sure filter size is more
       // than the min buffer size that can be allocated by the buffer pool.
@@ -138,6 +141,13 @@
       long defaultValue = tQueryOptions.getRuntime_bloom_filter_size();
       defaultValue = Math.max(defaultValue, minVal);
       defaultVal = BitUtil.roundUpToPowerOf2(Math.min(defaultValue, maxVal));
+
+      // Target FPP is determined by runtime_filter_error_rate query option, or if that
+      // is not set, --max_filter_error_rate, which was the legacy option controlling
+      // this that also had other effects.
+      targetFpp = tQueryOptions.isSetRuntime_filter_error_rate() ?
+          tQueryOptions.getRuntime_filter_error_rate() :
+          BackendConfig.INSTANCE.getMaxFilterErrorRate();
     }
   };
 
@@ -527,8 +537,8 @@
         filterSizeBytes_ = filterSizeLimits.defaultVal;
         return;
       }
-      double fpp = BackendConfig.INSTANCE.getMaxFilterErrorRate();
-      int logFilterSize = FeSupport.GetMinLogSpaceForBloomFilter(ndvEstimate_, fpp);
+      double targetFpp = filterSizeLimits.targetFpp;
+      int logFilterSize = FeSupport.GetMinLogSpaceForBloomFilter(ndvEstimate_, targetFpp);
       filterSizeBytes_ = 1L << logFilterSize;
       filterSizeBytes_ = Math.max(filterSizeBytes_, filterSizeLimits.minVal);
       filterSizeBytes_ = Math.min(filterSizeBytes_, filterSizeLimits.maxVal);
diff --git a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
index d4fec4b..b14d6e6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
@@ -138,3 +138,22 @@
 row_regex: .*1 of 1 Runtime Filter Published.*
 row_regex: .*Filter 0 \(8.00 KB\).*
 ====
+---- QUERY
+####################################################
+# Test case 5: RUNTIME_FILTER_ERROR_RATE affects filter size.
+# This uses the first query from test case 2 and modifies the fpp to
+# result in a different filter size.
+####################################################
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
+SET RUNTIME_FILTER_MIN_SIZE=64KB;
+SET RUNTIME_FILTER_ERROR_RATE=0.01;
+with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
+    join (select * from l LIMIT 125000) b on a.l_orderkey = -b.l_orderkey;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(256.00 KB\).*
+====