IMPALA-9154: Make runtime filter propagation asynchronous

This patch fixes a bug introduced by IMPALA-7984 that ports the
functions implementing the aggregation and propagation of runtime
filters from Thrift RPC to KRPC.

Specifically, in IMPALA-7984, the propagation of an aggregated
runtime filter was implemented using the synchronous KRPC. Hence, when
there is a very limited number of KRPC threads for Impala's data stream
service, e.g., 1, there will be a deadlock if the node running the
Coordinator is trying to propagate the aggregated filter to the same
node running the Coordinator since there is no available thread to
receive the aggregated filter.

This patch makes the propagation of an aggregated runtime filter
asynchronous to address the issue described above. To prevent the
memory consumed by the aggregated filter from being reclaimed when the
aggregated filter is still referenced by some inflight KRPC's, we add an
additional field in the class Coordinator::FilterState to keep track of
the number of inflight KRPC's for the propagation of this aggregated
filter to make sure that we will reclaim the memory only when all the
associated KRPC's have completed. Moreover, when ReleaseExecResources()
is invoked by the Coordinator to release all the resources associated
with query execution, including the memory consumed by the aggregated
runtime filters, we make sure the consumed memory by the aggregated
filters is released only when the inflight KRPC's associated with each
aggregated filter have finished.

Testing:
- Passed primitive_many_fragments.test with the database tpch30 in an
  Impala minicluster started with the parameter
  --impalad_args=--datastream_service_num_svc_threads=1.
- Passed the exhaustive tests in the DEBUG build.
- Passed the core tests in the ASAN build.

Change-Id: Ifb6726d349be701f3a0602b2ad5a934082f188a0
Reviewed-on: http://gerrit.cloudera.org:8080/14975
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 72e245e..20d7b6f 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -551,8 +551,9 @@
   return true;
 }
 
-void Coordinator::BackendState::PublishFilter(
-    const PublishFilterParamsPB& rpc_params, RpcController& controller) {
+void Coordinator::BackendState::PublishFilter(FilterState* state,
+    MemTracker* mem_tracker, const PublishFilterParamsPB& rpc_params,
+    RpcController& controller, PublishFilterResultPB& res) {
   DCHECK_EQ(ProtoToQueryId(rpc_params.dst_query_id()), query_id_);
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
@@ -571,15 +572,42 @@
     return;
   }
 
-  PublishFilterResultPB res;
-  kudu::Status rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
-  if (!rpc_status.ok()) {
-    LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
-    return;
+  state->IncrementNumInflightRpcs(1);
+
+  proxy->PublishFilterAsync(rpc_params, &res, &controller,
+      boost::bind(&Coordinator::BackendState::PublishFilterCompleteCb, this, &controller,
+                                state, mem_tracker));
+}
+
+void Coordinator::BackendState::PublishFilterCompleteCb(
+    const kudu::rpc::RpcController* rpc_controller, FilterState* state,
+    MemTracker* mem_tracker) {
+  const kudu::Status controller_status = rpc_controller->status();
+
+  // In the case of an unsuccessful KRPC call, we only log this event w/o retrying.
+  // Failing to send a filter is not a query-wide error - the remote fragment will
+  // continue regardless.
+  if (!controller_status.ok()) {
+    LOG(ERROR) << "PublishFilter() failed: " << controller_status.message().ToString();
   }
-  if (res.status().status_code() != TErrorCode::OK) {
-    LOG(ERROR) << "PublishFilter() operation failed: "
-               << Status(res.status()).GetDetail();
+
+  {
+    lock_guard<SpinLock> l(state->lock());
+
+    state->IncrementNumInflightRpcs(-1);
+
+    if (state->num_inflight_rpcs() == 0) {
+      // Since we disabled the filter once complete and held FilterState::lock_ while
+      // issuing all PublishFilter() rpcs, at this point there can't be any more
+      // PublishFilter() rpcs issued.
+      DCHECK(state->disabled());
+      if (state->is_bloom_filter() && state->bloom_filter_directory().size() > 0) {
+        mem_tracker->Release(state->bloom_filter_directory().size());
+        state->bloom_filter_directory().clear();
+        state->bloom_filter_directory().shrink_to_fit();
+      }
+      state->get_publish_filter_done_cv().notify_one();
+    }
   }
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index fd80b86..432857a 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -106,8 +106,11 @@
 
   /// Make a PublishFilter rpc with given params if this backend has instances of the
   /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
-  void PublishFilter(
-      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller);
+  void PublishFilter(FilterState* state, MemTracker* mem_tracker,
+      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller,
+      PublishFilterResultPB& res);
+  void PublishFilterCompleteCb(const kudu::rpc::RpcController* rpc_controller,
+      FilterState* state, MemTracker* mem_tracker);
 
   /// Cancel execution at this backend if anything is running. Returns true
   /// if cancellation was attempted, false otherwise.
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index 55c6b3d..7e92883 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
+#include <condition_variable>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -56,6 +56,9 @@
 /// A filter is disabled if an always_true filter update is received, an OOM is hit,
 /// filter aggregation is complete or if the query is complete.
 /// Once a filter is disabled, subsequent updates for that filter are ignored.
+///
+/// This class is not thread safe. Callers must always take 'lock()' themselves when
+/// calling any FilterState functions if thread safety is needed.
 class Coordinator::FilterState {
  public:
   FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
@@ -88,14 +91,30 @@
       return min_max_filter_.always_true();
     }
   }
+  int num_inflight_rpcs() const { return num_inflight_publish_filter_rpcs_; }
+  SpinLock& lock() { return lock_; }
+  std::condition_variable_any& get_publish_filter_done_cv() {
+    return publish_filter_done_cv_;
+  }
 
   /// Aggregates partitioned join filters and updates memory consumption.
   /// Disables filter if always_true filter is received or OOM is hit.
   void ApplyUpdate(const UpdateFilterParamsPB& params, Coordinator* coord,
       kudu::rpc::RpcContext* context);
 
-  /// Disables a filter. A disabled filter consumes no memory.
-  void Disable(MemTracker* tracker);
+  /// Disables the filter and releases the consumed memory if the filter is a Bloom
+  /// filter.
+  void DisableAndRelease(MemTracker* tracker);
+  /// Disables the filter but does not release the consumed memory.
+  void Disable();
+
+  void IncrementNumInflightRpcs(int i) {
+    num_inflight_publish_filter_rpcs_ += i;
+    DCHECK_GE(num_inflight_publish_filter_rpcs_, 0);
+  }
+
+  /// Waits until any inflight PublishFilter rpcs have completed.
+  void WaitForPublishFilter();
 
  private:
   /// Contains the specification of the runtime filter.
@@ -129,8 +148,15 @@
   /// Time at which all local filters arrived.
   int64_t completion_time_ = 0L;
 
-  /// TODO: Add a per-object lock so that we can avoid holding the global routing table
+  /// Per-object lock so that we can avoid holding the global routing table
   /// lock for every filter update.
+  SpinLock lock_;
+
+  /// Keeps track of the number of inflight PublishFilter rpcs.
+  int num_inflight_publish_filter_rpcs_ = 0;
+
+  /// Signaled when 'num_inflight_rpcs' reaches 0.
+  std::condition_variable_any publish_filter_done_cv_;
 };
 
 /// Struct to contain all of the data structures for filter routing. Coordinator
@@ -146,9 +172,6 @@
   // The value is source plan node id and the filter ID.
   boost::unordered_map<int, std::vector<TRuntimeFilterSource>> finstance_filters_produced;
 
-  /// Synchronizes updates to the state of this routing table.
-  SpinLock update_lock;
-
   /// Protects this routing table.
   /// Usage pattern:
   /// 1. To update the routing table: Acquire shared access on 'lock' and
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 48d81f9..9458578 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -348,10 +348,13 @@
       if (!plan_node.__isset.runtime_filters) continue;
       for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
         DCHECK(filter_mode_ == TRuntimeFilterMode::GLOBAL || filter.has_local_targets);
-        auto i = filter_routing_table_->id_to_filter.emplace(
-            filter.filter_id, FilterState(filter, plan_node.node_id)).first;
-        FilterState* f = &(i->second);
+        auto i = filter_routing_table_->id_to_filter
+                     .emplace(std::piecewise_construct,
+                         std::forward_as_tuple(filter.filter_id),
+                         std::forward_as_tuple(filter, plan_node.node_id))
+                     .first;
 
+        FilterState* f = &(i->second);
         // source plan node of filter
         if (plan_node.__isset.hash_join_node) {
           // Set the 'pending_count_' to zero to indicate that for a filter with
@@ -1045,9 +1048,11 @@
   }
 
   for (auto& filter : filter_routing_table_->id_to_filter) {
-    FilterState* state = &filter.second;
-    state->Disable(filter_mem_tracker_);
+    unique_lock<SpinLock> l(filter.second.lock());
+    filter.second.WaitForPublishFilter();
+    filter.second.DisableAndRelease(filter_mem_tracker_);
   }
+
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
   // At this point some tracked memory may still be used in the coordinator for result
@@ -1124,21 +1129,22 @@
 
   PublishFilterParamsPB rpc_params;
   unordered_set<int> target_fragment_idxs;
-  string bloom_filter_directory;
+  if (!IsExecuting()) {
+    LOG(INFO) << "Filter update received for non-executing query with id: "
+        << query_id();
+    return;
+  }
+  auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
+  if (it == filter_routing_table_.get()->id_to_filter.end()) {
+    // This should not be possible since 'id_to_filter' is never changed after
+    // InitFilterRoutingTable().
+    DCHECK(false);
+    LOG(INFO) << "Could not find filter with id: " << rpc_params.filter_id();
+    return;
+  }
+  FilterState* state = &it->second;
   {
-    lock_guard<SpinLock> l(filter_routing_table_->update_lock);
-    if (!IsExecuting()) {
-      LOG(INFO) << "Filter update received for non-executing query with id: "
-                << query_id();
-      return;
-    }
-    auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
-    if (it == filter_routing_table_->id_to_filter.end()) {
-      LOG(INFO) << "Could not find filter with id: " << params.filter_id();
-      return;
-    }
-    FilterState* state = &it->second;
-
+    lock_guard<SpinLock> l(state->lock());
     DCHECK(state->desc().has_remote_targets)
         << "Coordinator received filter that has only local targets";
 
@@ -1165,7 +1171,7 @@
 
     // No more updates are pending on this filter ID. Create a distribution payload and
     // offer it to the queue.
-    for (const FilterTarget& target: *state->targets()) {
+    for (const FilterTarget& target : *state->targets()) {
       // Don't publish the filter to targets that are in the same fragment as the join
       // that produced it.
       if (target.is_local) continue;
@@ -1175,44 +1181,47 @@
     if (state->is_bloom_filter()) {
       // Assign an outgoing bloom filter.
       *rpc_params.mutable_bloom_filter() = state->bloom_filter();
-      bloom_filter_directory.swap(state->bloom_filter_directory());
+
       DCHECK(rpc_params.bloom_filter().always_false()
-          || rpc_params.bloom_filter().always_true() || !bloom_filter_directory.empty());
+          || rpc_params.bloom_filter().always_true()
+          || !state->bloom_filter_directory().empty());
+
     } else {
       DCHECK(state->is_min_max_filter());
       MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
     }
 
-    // Filter is complete, and can be released.
-    state->Disable(filter_mem_tracker_);
-  }
+    // Filter is complete. We disable it so future UpdateFilter rpcs will be ignored,
+    // e.g., if it was a broadcast join.
+    state->Disable();
 
-  TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
-  rpc_params.set_filter_id(params.filter_id());
+    TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
+    rpc_params.set_filter_id(params.filter_id());
 
-  // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
-  for (BackendState* bs: backend_states_) {
-    for (int fragment_idx: target_fragment_idxs) {
-      if (!IsExecuting()) goto cleanup;
-      rpc_params.set_dst_fragment_idx(fragment_idx);
-      RpcController controller;
-      if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
-          && !rpc_params.bloom_filter().always_true()) {
-        BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), &controller,
-            bloom_filter_directory);
+    // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
+    for (BackendState* bs : backend_states_) {
+      for (int fragment_idx : target_fragment_idxs) {
+        if (!IsExecuting()) {
+          if (rpc_params.has_bloom_filter()) {
+            filter_mem_tracker_->Release(state->bloom_filter_directory().size());
+            state->bloom_filter_directory().clear();
+            state->bloom_filter_directory().shrink_to_fit();
+          }
+          return;
+        }
+
+        rpc_params.set_dst_fragment_idx(fragment_idx);
+        RpcController* controller = obj_pool()->Add(new RpcController);
+        PublishFilterResultPB* res = obj_pool()->Add(new PublishFilterResultPB);
+        if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
+            && !rpc_params.bloom_filter().always_true()) {
+          BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), controller,
+              state->bloom_filter_directory());
+        }
+        bs->PublishFilter(state, filter_mem_tracker_, rpc_params, *controller, *res);
       }
-      // TODO: make this asynchronous.
-      bs->PublishFilter(rpc_params, controller);
     }
   }
-
-cleanup:
-  // For bloom filters, the memory used in the filter_routing_table_ is transfered to
-  // rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
-  // here to ensure that the MemTracker is updated after the memory is actually freed.
-  if (rpc_params.has_bloom_filter()) {
-    filter_mem_tracker_->Release(bloom_filter_directory.size());
-  }
 }
 
 void Coordinator::FilterState::ApplyUpdate(
@@ -1228,7 +1237,7 @@
   if (is_bloom_filter()) {
     DCHECK(params.has_bloom_filter());
     if (params.bloom_filter().always_true()) {
-      Disable(coord->filter_mem_tracker_);
+      DisableAndRelease(coord->filter_mem_tracker_);
     } else if (params.bloom_filter().always_false()) {
       if (!bloom_filter_.has_log_bufferpool_space()) {
         bloom_filter_ = BloomFilterPB(params.bloom_filter());
@@ -1243,7 +1252,7 @@
           params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
       if (!status.ok()) {
         LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
-        Disable(coord->filter_mem_tracker_);
+        DisableAndRelease(coord->filter_mem_tracker_);
       } else if (bloom_filter_.always_false()) {
         int64_t heap_space = sidecar_slice.size();
         if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
@@ -1251,7 +1260,7 @@
                      << PrettyPrinter::Print(heap_space, TUnit::BYTES)
                      << " (query_id=" << PrintId(coord->query_id()) << ")";
           // Disable, as one missing update means a correct filter cannot be produced.
-          Disable(coord->filter_mem_tracker_);
+          DisableAndRelease(coord->filter_mem_tracker_);
         } else {
           bloom_filter_ = params.bloom_filter();
           bloom_filter_directory_ = sidecar_slice.ToString();
@@ -1267,7 +1276,7 @@
     DCHECK(is_min_max_filter());
     DCHECK(params.has_min_max_filter());
     if (params.min_max_filter().always_true()) {
-      Disable(coord->filter_mem_tracker_);
+      DisableAndRelease(coord->filter_mem_tracker_);
     } else if (min_max_filter_.always_false()) {
       MinMaxFilter::Copy(params.min_max_filter(), &min_max_filter_);
     } else {
@@ -1281,13 +1290,19 @@
   }
 }
 
-void Coordinator::FilterState::Disable(MemTracker* tracker) {
+void Coordinator::FilterState::DisableAndRelease(MemTracker* tracker) {
+  Disable();
   if (is_bloom_filter()) {
-    bloom_filter_.set_always_true(true);
-    bloom_filter_.set_always_false(false);
     tracker->Release(bloom_filter_directory_.size());
     bloom_filter_directory_.clear();
     bloom_filter_directory_.shrink_to_fit();
+  }
+}
+
+void Coordinator::FilterState::Disable() {
+  if (is_bloom_filter()) {
+    bloom_filter_.set_always_true(true);
+    bloom_filter_.set_always_false(false);
   } else {
     DCHECK(is_min_max_filter());
     min_max_filter_.set_always_true(true);
@@ -1295,6 +1310,12 @@
   }
 }
 
+void Coordinator::FilterState::WaitForPublishFilter() {
+  while (num_inflight_publish_filter_rpcs_ > 0) {
+    publish_filter_done_cv_.wait(lock_);
+  }
+}
+
 void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
   lock_guard<SpinLock> l(exec_summary_.lock);
   *exec_summary = exec_summary_.thrift_exec_summary;
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 7b05fde..aa03f30 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -100,8 +100,8 @@
 ///
 /// Lock ordering: (lower-numbered acquired before higher-numbered)
 /// 1. wait_lock_
-/// 2. filter_lock_
-/// 3. exec_state_lock_, backend_states_init_lock_, filter_update_lock_, ExecSummary::lock
+/// 2. Coordinator::FilterRoutingTable::lock
+/// 3. exec_state_lock_, backend_states_init_lock_, FilterState::lock_, ExecSummary::lock
 /// 4. Coordinator::BackendState::lock_ (leafs)
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files