IMPALA-10112: Remove FpRateTooHigh() check for bloom filter

FpRateTooHigh() check for bloom filter that can be disabled if the
observed false-positive probability (FPP) rate is higher than
FLAGS_max_filter_error_rate. This patch remove FpRateTooHigh() check for
several reasons:

1. Partition filters are probably still worth evaluating even if there
   are false positives because it is cheap and beneficial to eliminate a
   partition.

2. Runtime filters are dynamically disabled on the scan side if they are
   ineffective. Even an ALWAYS_TRUE_FILTER that substitutes a disabled
   filter is also still being evaluated and not entirely free.

3. The disabling is less likely to kick in for partitioned joins because
   it only applied to a small subset of the filters before the Or()
   operation.

4. FpRateTooHigh() use num_build_rows to approximate the FPP rate of the
   resulting filter. This can be inaccurate because it does not take
   account of duplicate values of the filter key on the build side.

This patch also removes some tests in test_runtime_filters.py that check
cancellation of filters having a high FPP rate.

Testing:
- Run and pass core tests.
- Manually test and verify in a real large cluster (TPC-DS 30TB scale)
  that there is only a little (< 2.1%) to no performance regression
  incurred from the removal of high FPP check. The test used TPC-DS
  queries Q14a, Q50, Q64, Q71, Q84, Q93, and a modified Q93 with the
  left outer join replaced by an inner join.
  The query time comparison between having an FPP check versus without
  having an FPP check are the following:

  | Query | With check (ms) | Without check (ms) |
  |-------+-----------------+--------------------|
  | Q14a  |          129801 |             125992 |
  | Q50   |           72519 |              72652 |
  | Q64   |          150684 |             145241 |
  | Q71   |           21009 |              20358 |
  | Q84   |           29334 |              29944 |
  | Q93   |           91782 |              91923 |
  | Q93m  |           92897 |              92135 |

Change-Id: Id9f8f40764b4f6664cc81b0da428afea8e3588d4
Reviewed-on: http://gerrit.cloudera.org:8080/16499
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 9043960..28d1fc5 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -924,25 +924,11 @@
   VLOG(3) << "Join builder (join_node_id_=" << join_node_id_ << ") publishing "
           << filter_ctxs_.size() << " filters.";
   int32_t num_enabled_filters = 0;
-  // Use 'num_build_rows' to estimate FP-rate of each Bloom filter, and publish
-  // 'always-true' filters if it's too high. Doing so saves CPU at the coordinator,
-  // serialisation time, and reduces the cost of applying the filter at the scan - most
-  // significantly for per-row filters. However, the number of build rows could be a very
-  // poor estimate of the NDV - particularly if the filter expression is a function of
-  // several columns.
-  // TODO: Better heuristic.
   for (const FilterContext& ctx : filter_ctxs_) {
-    // TODO: Consider checking actual number of bits set in filter to compute FP rate.
-    // TODO: Consider checking this every few batches or so.
     BloomFilter* bloom_filter = nullptr;
     if (ctx.local_bloom_filter != nullptr) {
-      if (runtime_state_->filter_bank()->FpRateTooHigh(
-              ctx.filter->filter_size(), num_build_rows)) {
-        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
-      } else {
-        bloom_filter = ctx.local_bloom_filter;
-        ++num_enabled_filters;
-      }
+      bloom_filter = ctx.local_bloom_filter;
+      ++num_enabled_filters;
     } else if (ctx.local_min_max_filter != nullptr
         && !ctx.local_min_max_filter->AlwaysTrue()) {
       ++num_enabled_filters;
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index a1f9c85..8ed1e38 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -53,10 +53,9 @@
 using namespace impala;
 using namespace strings;
 
-DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
-    "positives in a runtime bloom filter before it is disabled. Also, if not overridden "
-    "by the RUNTIME_FILTER_ERROR_RATE query option, the target false positive "
-    "probability used to determine the ideal size for each bloom filter size.");
+DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The target false positive "
+    "probability used to determine the ideal size for each bloom filter size. This value "
+    "can be overriden by the RUNTIME_FILTER_ERROR_RATE query option.");
 
 const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
 const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
@@ -410,12 +409,6 @@
   return min_max_filter;
 }
 
-bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
-  double fpp =
-      BloomFilter::FalsePositiveProb(observed_ndv, BitUtil::Log2Ceiling64(filter_size));
-  return fpp > FLAGS_max_filter_error_rate;
-}
-
 vector<unique_lock<SpinLock>> RuntimeFilterBank::LockAllFilters() {
   vector<unique_lock<SpinLock>> locks;
   for (auto& entry : filters_) locks.emplace_back(entry.second->lock);
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 81bd518..b44ea45 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -130,11 +130,6 @@
   void PublishGlobalFilter(
       const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
-  /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
-  /// 'filter_size' would have an expected false-positive rate which would exceed
-  /// FLAGS_max_filter_error_rate.
-  bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv);
-
   /// Returns a bloom_filter that can be used by an operator to produce a local filter,
   /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
   /// the RuntimeFilterBank and should not be deleted by the caller. The filter identified
diff --git a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
index b14d6e6..2510140 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
@@ -4,22 +4,10 @@
 # Test case 1: bloom filters with high expected FP rate get disabled.
 # To trigger this path, we limit the size of the filter to a much lower value than
 # required to achieve the desired FP rate.
+#
+# IMPALA-10112: This test is removed because high FP rate check has been removed
 ####################################################
-
-SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
-SET RUNTIME_FILTER_MODE=GLOBAL;
-SET RUNTIME_FILTER_MAX_SIZE=64K;
-with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
-select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
-    join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey;
----- RESULTS
-0
----- RUNTIME_PROFILE
-row_regex: .*0 of 1 Runtime Filter Published, 1 Disabled.*
-====
-
-
----- QUERY
+#
 ####################################################
 # Test case 2: Filter sizes change according to their NDV
 ####################################################
@@ -113,7 +101,6 @@
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
 ---- RUNTIME_PROFILE
-row_regex: .*0 of 1 Runtime Filter Published.*
 row_regex: .*Filter 0 \(64.00 KB\).*
 ====
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters_wait.test b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters_wait.test
deleted file mode 100644
index f766b8c..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters_wait.test
+++ /dev/null
@@ -1,19 +0,0 @@
-====
----- QUERY
-####################################################
-# Regression test for IMPALA-3141: Disabled filters should send dummy filters
-# to unblock waiters. We limit the size of the filter to a much lower value than required
-# to achieve the desired FP rate so that it gets rejected/disabled during FP rate check.
-####################################################
-
-SET RUNTIME_FILTER_WAIT_TIME_MS=600000;
-SET RUNTIME_FILTER_MODE=GLOBAL;
-SET RUNTIME_FILTER_MAX_SIZE=64K;
-with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
-select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
-    join (select * from l LIMIT 500000) b on a.l_orderkey = -b.l_orderkey;
----- RESULTS
-0
----- RUNTIME_PROFILE
-row_regex: .*0 of 1 Runtime Filter Published, 1 Disabled.*
-====
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index 0048c28..4184e50 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -192,16 +192,6 @@
       self.execute_query("SET ENABLED_RUNTIME_FILTER_TYPES=BLOOM")
     self.run_test_case('QueryTest/bloom_filters', vector)
 
-  def test_bloom_wait_time(self, vector):
-    """Test that a query that has global filters does not wait for them if run in LOCAL
-    mode"""
-    now = time.time()
-    self.run_test_case('QueryTest/bloom_filters_wait', vector)
-    duration_s = time.time() - now
-    assert duration_s < (WAIT_TIME_MS / 1000), \
-        "Query took too long (%ss, possibly waiting for missing filters?)" \
-        % str(duration_s)
-
 
 @SkipIfLocal.multiple_impalad
 class TestMinMaxFilters(ImpalaTestSuite):