IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously, the aggregation and propagation of a runtime filter in Impala were
implemented using Thrift RPC, which suffers from the disadvantage that the number
of connections in a cluster grows with both the number of queries and cluster
size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respectively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter
is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized to be transmitted and
hence reduces the serialization overhead. Due to the incorporation of KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.

Two related BE tests, bloom-filter-test.cc and bloom-filter-benchmark.cc, are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8
Reviewed-on: http://gerrit.cloudera.org:8080/13882
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/14974
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index f216911..1e4938d 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -21,6 +21,7 @@
 #include <iostream>
 #include <vector>
 
+#include "kudu/rpc/rpc_controller.h"
 #include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/bufferpool/buffer-pool.h"
@@ -36,6 +37,8 @@
 using namespace std;
 using namespace impala;
 
+using kudu::rpc::RpcController;
+
 // Tests Bloom filter performance on:
 //
 // 1. Construct/destruct pairs
@@ -283,18 +286,44 @@
   explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client) {
     BloomFilter bf(client);
     CHECK(bf.Init(log_bufferpool_size).ok());
-    BloomFilter::ToThrift(&bf, &tbf1);
-    BloomFilter::ToThrift(&bf, &tbf2);
+
+    RpcController controller1;
+    RpcController controller2;
+    BloomFilter::ToProtobuf(&bf, &controller1, &pbf1);
+    BloomFilter::ToProtobuf(&bf, &controller2, &pbf2);
+
+    // Need to set 'always_false_' of pbf2 to false because
+    // (i) 'always_false_' of a BloomFilter is set to true when the Bloom filter
+    // hasn't had any elements inserted (since nothing is inserted to the
+    /// BloomFilter bf),
+    // (ii) ToProtobuf() will set 'always_false_' of a BloomFilterPB
+    // to true, and
+    // (iii) Or() will check 'always_false_' of the output BloomFilterPB is not true
+    /// before performing the corresponding bit operations.
+    /// The field 'always_false_' was added by IMPALA-5789, which aims to allow
+    /// an HdfsScanner to early terminate the scan at file and split granularities.
+    pbf2.set_always_false(false);
+
+    int64_t directory_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_size);
+    string d1(reinterpret_cast<const char*>(bf.directory_), directory_size);
+    string d2(reinterpret_cast<const char*>(bf.directory_), directory_size);
+
+    directory1 = d1;
+    directory2 = d2;
+
     bf.Close();
   }
 
-  TBloomFilter tbf1, tbf2;
+  BloomFilterPB pbf1, pbf2;
+  string directory1, directory2;
 };
 
 void Benchmark(int batch_size, void* data) {
   TestData* d = reinterpret_cast<TestData*>(data);
   for (int i = 0; i < batch_size; ++i) {
-    BloomFilter::Or(d->tbf1, &d->tbf2);
+    BloomFilter::Or(d->pbf1, reinterpret_cast<const uint8_t*>((d->directory1).data()),
+        &(d->pbf2), reinterpret_cast<uint8_t*>(const_cast<char*>((d->directory2).data())),
+        d->directory1.size());
   }
 }
 
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index fee4d39..656bbc3 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -39,29 +39,6 @@
     : ImpalaInternalServiceClient(iprot, oprot) {
   }
 
-/// We intentionally disable this clang warning as we intend to hide the
-/// the same-named functions defined in the base class.
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-
-  void UpdateFilter(TUpdateFilterResult& _return, const TUpdateFilterParams& params,
-      bool* send_done) {
-    DCHECK(!*send_done);
-    ImpalaInternalServiceClient::send_UpdateFilter(params);
-    *send_done = true;
-    ImpalaInternalServiceClient::recv_UpdateFilter(_return);
-  }
-
-  void PublishFilter(TPublishFilterResult& _return, const TPublishFilterParams& params,
-      bool* send_done) {
-    DCHECK(!*send_done);
-    ImpalaInternalServiceClient::send_PublishFilter(params);
-    *send_done = true;
-    ImpalaInternalServiceClient::recv_PublishFilter(_return);
-  }
-
-#pragma clang diagnostic pop
-
 };
 
 }
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 26d0de3..fdf44c1 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -29,7 +29,6 @@
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "rpc/thrift-util.h"
-#include "gen-cpp/ImpalaInternalService.h"
 
 #include "common/names.h"
 using namespace apache::thrift;
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 062a97d..72e245e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -23,6 +23,7 @@
 #include "exec/exec-node.h"
 #include "exec/kudu-util.h"
 #include "exec/scan-node.h"
+#include "gen-cpp/data_stream_service.proxy.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
@@ -37,6 +38,7 @@
 #include "runtime/fragment-instance-state.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "service/control-service.h"
+#include "service/data-stream-service.h"
 #include "util/counting-barrier.h"
 #include "util/error-util-internal.h"
 #include "util/network-util.h"
@@ -549,21 +551,35 @@
   return true;
 }
 
-void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
-  DCHECK(rpc_params.dst_query_id == query_id_);
+void Coordinator::BackendState::PublishFilter(
+    const PublishFilterParamsPB& rpc_params, RpcController& controller) {
+  DCHECK_EQ(ProtoToQueryId(rpc_params.dst_query_id()), query_id_);
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
   if (IsDone()) return;
 
-  if (fragments_.count(rpc_params.dst_fragment_idx) == 0) return;
+  if (fragments_.count(rpc_params.dst_fragment_idx()) == 0) return;
   Status status;
-  ImpalaBackendConnection backend_client(
-      ExecEnv::GetInstance()->impalad_client_cache(), host_, &status);
-  if (!status.ok()) return;
-  TPublishFilterResult res;
-  status = backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, rpc_params, &res);
-  if (!status.ok()) {
-    LOG(WARNING) << "Error publishing filter, continuing..." << status.GetDetail();
+
+  std::unique_ptr<DataStreamServiceProxy> proxy;
+  Status get_proxy_status =
+      DataStreamService::GetProxy(krpc_host_, host_.hostname, &proxy);
+  if (!get_proxy_status.ok()) {
+    // Failing to send a filter is not a query-wide error - the remote fragment will
+    // continue regardless.
+    LOG(ERROR) << "Couldn't get proxy: " << get_proxy_status.msg().msg();
+    return;
+  }
+
+  PublishFilterResultPB res;
+  kudu::Status rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
+  if (!rpc_status.ok()) {
+    LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
+    return;
+  }
+  if (res.status().status_code() != TErrorCode::OK) {
+    LOG(ERROR) << "PublishFilter() operation failed: "
+               << Status(res.status()).GetDetail();
   }
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 9d390e2..fd80b86 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -106,7 +106,8 @@
 
   /// Make a PublishFilter rpc with given params if this backend has instances of the
   /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
-  void PublishFilter(const TPublishFilterParams& rpc_params);
+  void PublishFilter(
+      const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller);
 
   /// Cancel execution at this backend if anything is running. Returns true
   /// if cancellation was attempted, false otherwise.
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index e16abea..55c6b3d 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -61,12 +61,13 @@
   FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
     : desc_(desc), src_(src) {
     // bloom_filter_ is a disjunction so the unit value is always_false.
-    bloom_filter_.always_false = true;
-    min_max_filter_.always_false = true;
+    bloom_filter_.set_always_false(true);
+    min_max_filter_.set_always_false(true);
   }
 
-  TBloomFilter& bloom_filter() { return bloom_filter_; }
-  TMinMaxFilter& min_max_filter() { return min_max_filter_; }
+  BloomFilterPB& bloom_filter() { return bloom_filter_; }
+  string& bloom_filter_directory() { return bloom_filter_directory_; }
+  MinMaxFilterPB& min_max_filter() { return min_max_filter_; }
   std::vector<FilterTarget>* targets() { return &targets_; }
   const std::vector<FilterTarget>& targets() const { return targets_; }
   int64_t first_arrival_time() const { return first_arrival_time_; }
@@ -81,16 +82,17 @@
   void set_num_producers(int num_producers) { num_producers_ = num_producers; }
   bool disabled() const {
     if (is_bloom_filter()) {
-      return bloom_filter_.always_true;
+      return bloom_filter_.always_true();
     } else {
       DCHECK(is_min_max_filter());
-      return min_max_filter_.always_true;
+      return min_max_filter_.always_true();
     }
   }
 
   /// Aggregates partitioned join filters and updates memory consumption.
   /// Disables filter if always_true filter is received or OOM is hit.
-  void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord);
+  void ApplyUpdate(const UpdateFilterParamsPB& params, Coordinator* coord,
+      kudu::rpc::RpcContext* context);
 
   /// Disables a filter. A disabled filter consumes no memory.
   void Disable(MemTracker* tracker);
@@ -110,13 +112,16 @@
   int num_producers_ = 0;
 
   /// Filters aggregated from all source plan nodes, to be broadcast to all
-  /// destination plan fragment instances. Only set for partitioned joins (broadcast joins
-  /// need no aggregation).
+  /// destination plan fragment instances. Only set for partitioned joins (broadcast
+  /// joins need no aggregation).
   /// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the
   /// output structure in the case of a broadcast join. Similarly, for partitioned joins,
   /// the filter is moved from the following member to the output structure.
-  TBloomFilter bloom_filter_;
-  TMinMaxFilter min_max_filter_;
+  BloomFilterPB bloom_filter_;
+  /// When the filter is a Bloom filter, we use this string to store the contents of the
+  /// aggregated Bloom filter.
+  string bloom_filter_directory_;
+  MinMaxFilterPB min_max_filter_;
 
   /// Time at which first local filter arrived.
   int64_t first_arrival_time_ = 0L;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 9ac5ee6..48d81f9 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -31,8 +31,9 @@
 #include "common/hdfs.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/hdfs-fs-cache.h"
@@ -55,6 +56,9 @@
 
 #include "common/names.h"
 
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
 using namespace apache::thrift;
 using namespace rapidjson;
 using boost::algorithm::iequals;
@@ -1107,7 +1111,7 @@
   return result;
 }
 
-void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
+void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* context) {
   shared_lock<shared_mutex> lock(filter_routing_table_->lock);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
@@ -1118,8 +1122,9 @@
   DCHECK(filter_routing_table_->is_complete)
       << "Filter received before routing table complete";
 
-  TPublishFilterParams rpc_params;
+  PublishFilterParamsPB rpc_params;
   unordered_set<int> target_fragment_idxs;
+  string bloom_filter_directory;
   {
     lock_guard<SpinLock> l(filter_routing_table_->update_lock);
     if (!IsExecuting()) {
@@ -1127,17 +1132,17 @@
                 << query_id();
       return;
     }
-    auto it = filter_routing_table_->id_to_filter.find(params.filter_id);
+    auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
     if (it == filter_routing_table_->id_to_filter.end()) {
-      LOG(INFO) << "Could not find filter with id: " << params.filter_id;
+      LOG(INFO) << "Could not find filter with id: " << params.filter_id();
       return;
     }
     FilterState* state = &it->second;
 
     DCHECK(state->desc().has_remote_targets)
-          << "Coordinator received filter that has only local targets";
+        << "Coordinator received filter that has only local targets";
 
-    // Check if the filter has already been sent, which could happen in four cases:
+    // Check if the filter has already been sent, which could happen in five cases:
     //   * if one local filter had always_true set - no point waiting for other local
     //     filters that can't affect the aggregated global filter
     //   * if this is a broadcast join, and another local filter was already received
@@ -1145,6 +1150,7 @@
     //     immediately.
     //   * query execution finished and resources were released: filters do not need
     //     to be processed.
+    //   * if the inbound sidecar for Bloom filter cannot be successfully retrieved.
     if (state->disabled()) return;
 
     if (filter_updates_received_->value() == 0) {
@@ -1152,7 +1158,7 @@
     }
     filter_updates_received_->Add(1);
 
-    state->ApplyUpdate(params, this);
+    state->ApplyUpdate(params, this, context);
 
     if (state->pending_count() > 0 && !state->disabled()) return;
     // At this point, we either disabled this filter or aggregation is complete.
@@ -1167,33 +1173,36 @@
     }
 
     if (state->is_bloom_filter()) {
-      // Assign outgoing bloom filter.
-      TBloomFilter& aggregated_filter = state->bloom_filter();
-
-      swap(rpc_params.bloom_filter, aggregated_filter);
-      DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true
-          || !rpc_params.bloom_filter.directory.empty());
-      DCHECK(aggregated_filter.directory.empty());
-      rpc_params.__isset.bloom_filter = true;
+      // Assign an outgoing bloom filter.
+      *rpc_params.mutable_bloom_filter() = state->bloom_filter();
+      bloom_filter_directory.swap(state->bloom_filter_directory());
+      DCHECK(rpc_params.bloom_filter().always_false()
+          || rpc_params.bloom_filter().always_true() || !bloom_filter_directory.empty());
     } else {
       DCHECK(state->is_min_max_filter());
-      MinMaxFilter::Copy(state->min_max_filter(), &rpc_params.min_max_filter);
-      rpc_params.__isset.min_max_filter = true;
+      MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
     }
 
     // Filter is complete, and can be released.
     state->Disable(filter_mem_tracker_);
   }
 
-  rpc_params.__set_dst_query_id(query_id());
-  rpc_params.__set_filter_id(params.filter_id);
+  TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
+  rpc_params.set_filter_id(params.filter_id());
 
   // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
       if (!IsExecuting()) goto cleanup;
-      rpc_params.__set_dst_fragment_idx(fragment_idx);
-      bs->PublishFilter(rpc_params);
+      rpc_params.set_dst_fragment_idx(fragment_idx);
+      RpcController controller;
+      if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
+          && !rpc_params.bloom_filter().always_true()) {
+        BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), &controller,
+            bloom_filter_directory);
+      }
+      // TODO: make this asynchronous.
+      bs->PublishFilter(rpc_params, controller);
     }
   }
 
@@ -1201,13 +1210,13 @@
   // For bloom filters, the memory used in the filter_routing_table_ is transfered to
   // rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
   // here to ensure that the MemTracker is updated after the memory is actually freed.
-  if (rpc_params.__isset.bloom_filter) {
-    filter_mem_tracker_->Release(rpc_params.bloom_filter.directory.size());
+  if (rpc_params.has_bloom_filter()) {
+    filter_mem_tracker_->Release(bloom_filter_directory.size());
   }
 }
 
-void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
-    Coordinator* coord) {
+void Coordinator::FilterState::ApplyUpdate(
+    const UpdateFilterParamsPB& params, Coordinator* coord, RpcContext* context) {
   DCHECK(!disabled());
   DCHECK_GT(pending_count_, 0);
   DCHECK_EQ(completion_time_, 0L);
@@ -1217,38 +1226,52 @@
 
   --pending_count_;
   if (is_bloom_filter()) {
-    DCHECK(params.__isset.bloom_filter);
-    if (params.bloom_filter.always_true) {
+    DCHECK(params.has_bloom_filter());
+    if (params.bloom_filter().always_true()) {
       Disable(coord->filter_mem_tracker_);
-    } else if (bloom_filter_.always_false) {
-      int64_t heap_space = params.bloom_filter.directory.size();
-      if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
-        VLOG_QUERY << "Not enough memory to allocate filter: "
-                   << PrettyPrinter::Print(heap_space, TUnit::BYTES)
-                   << " (query_id=" << PrintId(coord->query_id()) << ")";
-        // Disable, as one missing update means a correct filter cannot be produced.
-        Disable(coord->filter_mem_tracker_);
-      } else {
-        // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
-        // move the payload from the request rather than copy it and take double the
-        // memory cost. After this point, params.bloom_filter is an empty filter and
-        // should not be read.
-        TBloomFilter* non_const_filter = &const_cast<TBloomFilter&>(params.bloom_filter);
-        swap(bloom_filter_, *non_const_filter);
-        DCHECK_EQ(non_const_filter->directory.size(), 0);
+    } else if (params.bloom_filter().always_false()) {
+      if (!bloom_filter_.has_log_bufferpool_space()) {
+        bloom_filter_ = BloomFilterPB(params.bloom_filter());
       }
     } else {
-      BloomFilter::Or(params.bloom_filter, &bloom_filter_);
+      // If the incoming Bloom filter is neither an always true filter nor an
+      // always false filter, then it must be the case that a non-empty sidecar slice
+      // has been received. Refer to BloomFilter::ToProtobuf() for further details.
+      DCHECK(params.bloom_filter().has_directory_sidecar_idx());
+      kudu::Slice sidecar_slice;
+      kudu::Status status = context->GetInboundSidecar(
+          params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
+      if (!status.ok()) {
+        LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
+        Disable(coord->filter_mem_tracker_);
+      } else if (bloom_filter_.always_false()) {
+        int64_t heap_space = sidecar_slice.size();
+        if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
+          VLOG_QUERY << "Not enough memory to allocate filter: "
+                     << PrettyPrinter::Print(heap_space, TUnit::BYTES)
+                     << " (query_id=" << PrintId(coord->query_id()) << ")";
+          // Disable, as one missing update means a correct filter cannot be produced.
+          Disable(coord->filter_mem_tracker_);
+        } else {
+          bloom_filter_ = params.bloom_filter();
+          bloom_filter_directory_ = sidecar_slice.ToString();
+        }
+      } else {
+        DCHECK_EQ(bloom_filter_directory_.size(), sidecar_slice.size());
+        BloomFilter::Or(params.bloom_filter(), sidecar_slice.data(), &bloom_filter_,
+            reinterpret_cast<uint8_t*>(const_cast<char*>(bloom_filter_directory_.data())),
+            sidecar_slice.size());
+      }
     }
   } else {
     DCHECK(is_min_max_filter());
-    DCHECK(params.__isset.min_max_filter);
-    if (params.min_max_filter.always_true) {
+    DCHECK(params.has_min_max_filter());
+    if (params.min_max_filter().always_true()) {
       Disable(coord->filter_mem_tracker_);
-    } else if (min_max_filter_.always_false) {
-      MinMaxFilter::Copy(params.min_max_filter, &min_max_filter_);
+    } else if (min_max_filter_.always_false()) {
+      MinMaxFilter::Copy(params.min_max_filter(), &min_max_filter_);
     } else {
-      MinMaxFilter::Or(params.min_max_filter, &min_max_filter_,
+      MinMaxFilter::Or(params.min_max_filter(), &min_max_filter_,
           ColumnType::FromThrift(desc_.src_expr.nodes[0].type));
     }
   }
@@ -1260,15 +1283,15 @@
 
 void Coordinator::FilterState::Disable(MemTracker* tracker) {
   if (is_bloom_filter()) {
-    bloom_filter_.always_true = true;
-    bloom_filter_.always_false = false;
-    tracker->Release(bloom_filter_.directory.size());
-    bloom_filter_.directory.clear();
-    bloom_filter_.directory.shrink_to_fit();
+    bloom_filter_.set_always_true(true);
+    bloom_filter_.set_always_false(false);
+    tracker->Release(bloom_filter_directory_.size());
+    bloom_filter_directory_.clear();
+    bloom_filter_directory_.shrink_to_fit();
   } else {
     DCHECK(is_min_max_filter());
-    min_max_filter_.always_true = true;
-    min_max_filter_.always_false = false;
+    min_max_filter_.set_always_true(true);
+    min_max_filter_.set_always_false(false);
   }
 }
 
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9ed3324..7b05fde 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -31,12 +31,19 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/dml-exec-state.h"
 #include "util/counting-barrier.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
 
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 
 class AuxErrorInfoPB;
@@ -172,7 +179,7 @@
   /// with others for the same filter ID into a global filter. If all updates for that
   /// filter ID have been received (may be 1 or more per filter), broadcast the global
   /// filter to fragment instances.
-  void UpdateFilter(const TUpdateFilterParams& params);
+  void UpdateFilter(const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Adds to 'document' a serialized array of all backends in a member named
   /// 'backend_states'.
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 90bbf48..4ffdb1d 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -122,6 +122,12 @@
     stream_mgr_->CloseSender(request, response, rpc_context);
   }
 
+  virtual void UpdateFilter(
+      const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {}
+
+  virtual void PublishFilter(const PublishFilterParamsPB* req,
+      PublishFilterResultPB* resp, RpcContext* context) {}
+
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
 
  private:
diff --git a/be/src/runtime/decimal-value.h b/be/src/runtime/decimal-value.h
index b34cde0..ea2d126 100644
--- a/be/src/runtime/decimal-value.h
+++ b/be/src/runtime/decimal-value.h
@@ -22,6 +22,7 @@
 #include <ostream>
 
 #include "gen-cpp/Data_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/multi-precision.h"
 #include "runtime/types.h"
 
@@ -59,8 +60,8 @@
     return FromDouble(t.precision, t.scale, d, round, overflow);
   }
 
-  /// Returns a new DecimalValue created from the value in 'tvalue'.
-  static inline DecimalValue FromTColumnValue(const TColumnValue& tvalue);
+  /// Returns a new DecimalValue created from the value in 'value_pb'.
+  static inline DecimalValue FromColumnValuePB(const ColumnValuePB& value_pb);
 
   static inline DecimalValue FromDouble(int precision, int scale, double d,
       bool round, bool* overflow);
@@ -196,11 +197,10 @@
 
   inline DecimalValue<T> Abs() const;
 
-  /// Store the binary representation of this DecimalValue in 'tvalue'.
-  void ToTColumnValue(TColumnValue* tvalue) const {
+  /// Store the binary representation of this DecimalValue in 'value_pb'.
+  void ToColumnValuePB(ColumnValuePB* value_pb) const {
     const uint8_t* data = reinterpret_cast<const uint8_t*>(&value_);
-    tvalue->decimal_val.assign(data, data + sizeof(T));
-    tvalue->__isset.decimal_val = true;
+    value_pb->mutable_decimal_val()->assign(data, data + sizeof(T));
   }
 
  private:
diff --git a/be/src/runtime/decimal-value.inline.h b/be/src/runtime/decimal-value.inline.h
index 6480099..b2d5fc0 100644
--- a/be/src/runtime/decimal-value.inline.h
+++ b/be/src/runtime/decimal-value.inline.h
@@ -61,9 +61,9 @@
 }
 
 template <typename T>
-inline DecimalValue<T> DecimalValue<T>::FromTColumnValue(const TColumnValue& tvalue) {
+inline DecimalValue<T> DecimalValue<T>::FromColumnValuePB(const ColumnValuePB& value_pb) {
   T value = 0;
-  memcpy(&value, tvalue.decimal_val.c_str(), tvalue.decimal_val.length());
+  memcpy(&value, value_pb.decimal_val().c_str(), value_pb.decimal_val().length());
   return DecimalValue<T>(value);
 }
 
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 7d7fea4..d68c469 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -27,7 +27,6 @@
 #include "common/logging.h"
 #include "common/object-pool.h"
 #include "exec/kudu-util.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "kudu/rpc/service_if.h"
 #include "rpc/rpc-mgr.h"
 #include "runtime/backend-client.h"
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index b74bbe3..9236852 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -32,6 +32,7 @@
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/exchange-node.h"
 #include "exec/scan-node.h"
+#include "kudu/rpc/rpc_context.h"
 #include "runtime/exec-env.h"
 #include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
@@ -49,6 +50,7 @@
 #include "util/periodic-counter-updater.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
+using kudu::rpc::RpcContext;
 using namespace impala;
 using namespace apache::thrift;
 
@@ -528,10 +530,11 @@
   return opened_promise_.Get();
 }
 
-void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
+void FragmentInstanceState::PublishFilter(
+    const PublishFilterParamsPB& params, RpcContext* context) {
   VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
-            << " filter_id=" << params.filter_id;
-  runtime_state_->filter_bank()->PublishGlobalFilter(params);
+            << " filter_id=" << params.filter_id();
+  runtime_state_->filter_bank()->PublishGlobalFilter(params, context);
 }
 
 const string& FragmentInstanceState::ExecStateToString(FInstanceExecStatePB state) {
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 69e081b..c151457 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -29,6 +29,7 @@
 #include "util/promise.h"
 
 #include "gen-cpp/control_service.pb.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/row-batch.h"
@@ -36,6 +37,12 @@
 #include "util/promise.h"
 #include "util/runtime-profile.h"
 
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 
 class TPlanFragmentCtx;
@@ -90,7 +97,7 @@
   Status WaitForOpen();
 
   /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
-  void PublishFilter(const TPublishFilterParams& params);
+  void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Called periodically by query state thread to get the current status of this fragment
   /// instance. The fragment instance's status is stored in 'instance_status' and its
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index e5a804c..678aecd 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -23,6 +23,7 @@
 #include "common/thread-debug-info.h"
 #include "exec/kudu-util.h"
 #include "exprs/expr.h"
+#include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
@@ -40,6 +41,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/scanner-mem-limiter.h"
 #include "service/control-service.h"
+#include "service/data-stream-service.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
@@ -50,7 +52,6 @@
 #include "gen-cpp/control_service.proxy.h"
 
 using kudu::MonoDelta;
-using kudu::rpc::RpcController;
 using kudu::rpc::RpcSidecar;
 
 #include "common/names.h"
@@ -675,11 +676,11 @@
   for (auto entry: fis_map_) entry.second->Cancel();
 }
 
-void QueryState::PublishFilter(const TPublishFilterParams& params) {
+void QueryState::PublishFilter(const PublishFilterParamsPB& params, RpcContext* context) {
   if (!WaitForPrepare().ok()) return;
-  DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx), 1);
-  for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx]) {
-    fis->PublishFilter(params);
+  DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx()), 1);
+  for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx()]) {
+    fis->PublishFilter(params, context);
   }
 }
 
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 86156f4..f359f1a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -27,12 +27,19 @@
 #include "common/object-pool.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/tmp-file-mgr.h"
 #include "util/container-util.h"
 #include "util/counting-barrier.h"
 #include "util/uid-util.h"
 
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 
 class ControlServiceProxy;
@@ -192,7 +199,7 @@
       const TUniqueId& instance_id, FragmentInstanceState** fi_state);
 
   /// Blocks until all fragment instances have finished their Prepare phase.
-  void PublishFilter(const TPublishFilterParams& params);
+  void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Cancels all actively executing fragment instances. Blocks until all fragment
   /// instances have finished their Prepare phase. Idempotent.
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 56f63aa..21a0bfd 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -17,19 +17,26 @@
 
 #include "runtime/runtime-filter-bank.h"
 
+#include <chrono>
+
 #include <boost/algorithm/string/join.hpp>
 
 #include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/data_stream_service.proxy.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/client-cache.h"
-#include "runtime/exec-env.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "runtime/backend-client.h"
 #include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/client-cache.h"
+#include "runtime/exec-env.h"
 #include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
+#include "service/data-stream-service.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
 #include "util/bloom-filter.h"
@@ -38,6 +45,9 @@
 
 #include "common/names.h"
 
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
 using namespace impala;
 using namespace boost;
 using namespace strings;
@@ -102,32 +112,35 @@
   return ret;
 }
 
-namespace {
+void RuntimeFilterBank::UpdateFilterCompleteCb(
+    const RpcController* rpc_controller, const UpdateFilterResultPB* res) {
+  const kudu::Status controller_status = rpc_controller->status();
 
-/// Sends a filter to the coordinator. Executed asynchronously in the context of
-/// ExecEnv::rpc_pool().
-void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
-    ImpalaBackendClientCache* client_cache) {
-  Status status;
-  ImpalaBackendConnection coord(client_cache, address, &status);
-  if (!status.ok()) {
-    // Failing to send a filter is not a query-wide error - the remote fragment will
-    // continue regardless.
-    // TODO: Retry.
-    LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
-    return;
+  // In the case of an unsuccessful KRPC call, e.g., a request dropped due to
+  // backpressure, we only log the failure and do not retry. Failing to send a
+  // filter is not a query-wide error - the remote fragment will continue
+  // regardless.
+  if (!controller_status.ok()) {
+    LOG(ERROR) << "UpdateFilter() failed: " << controller_status.message().ToString();
   }
-  TUpdateFilterResult res;
-  status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
-}
+  // DataStreamService::UpdateFilter() should never set an error status in 'res'.
+  DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
 
+  {
+    std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
+    DCHECK_GT(num_inflight_rpcs_, 0);
+    --num_inflight_rpcs_;
+  }
+  krpcs_done_cv_.notify_one();
 }
 
 void RuntimeFilterBank::UpdateFilterFromLocal(
     int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
   DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
       << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
-  TUpdateFilterParams params;
+  // This function is only called from ExecNode::Open() or more specifically
+  // PartitionedHashJoinNode::Open().
+  DCHECK(!closed_);
   // A runtime filter may have both local and remote targets.
   bool has_local_target = false;
   bool has_remote_target = false;
@@ -159,64 +172,108 @@
 
   if (has_remote_target
       && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
-    params.__set_filter_id(filter_id);
-    params.__set_query_id(state_->query_id());
+    UpdateFilterParamsPB params;
+    // The memory associated with the following 2 objects needs to live until
+    // the asynchronous KRPC call proxy->UpdateFilterAsync() is completed.
+    // Hence, we allocate these 2 objects in 'obj_pool_'.
+    UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
+    RpcController* controller = obj_pool_.Add(new RpcController);
+
+    TUniqueIdToUniqueIdPB(state_->query_id(), params.mutable_query_id());
+    params.set_filter_id(filter_id);
     if (type == TRuntimeFilterType::BLOOM) {
-      BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
-      params.__isset.bloom_filter = true;
+      BloomFilter::ToProtobuf(bloom_filter, controller, params.mutable_bloom_filter());
     } else {
-      DCHECK(type == TRuntimeFilterType::MIN_MAX);
-      min_max_filter->ToThrift(&params.min_max_filter);
-      params.__isset.min_max_filter = true;
+      DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
+      min_max_filter->ToProtobuf(params.mutable_min_max_filter());
+    }
+    const TNetworkAddress& krpc_address = state_->query_ctx().coord_krpc_address;
+    const TNetworkAddress& host_address = state_->query_ctx().coord_address;
+
+    // Use 'proxy' to send the filter to the coordinator.
+    unique_ptr<DataStreamServiceProxy> proxy;
+    Status get_proxy_status =
+        DataStreamService::GetProxy(krpc_address, host_address.hostname, &proxy);
+    if (!get_proxy_status.ok()) {
+      // Failing to send a filter is not a query-wide error - the remote fragment will
+      // continue regardless.
+      LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1",
+          host_address.hostname, get_proxy_status.msg().msg());
+      return;
     }
 
-    ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
-        SendFilterToCoordinator, state_->query_ctx().coord_address, params,
-        ExecEnv::GetInstance()->impalad_client_cache()));
+    // Increment 'num_inflight_rpcs_' to make sure that the filter will not be deallocated
+    // in Close() until all in-flight RPCs complete.
+    {
+      unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
+      DCHECK_GE(num_inflight_rpcs_, 0);
+      ++num_inflight_rpcs_;
+    }
+
+    proxy->UpdateFilterAsync(params, res, controller,
+        boost::bind(&RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res));
   }
 }
 
-void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params) {
+void RuntimeFilterBank::PublishGlobalFilter(
+    const PublishFilterParamsPB& params, RpcContext* context) {
   lock_guard<mutex> l(runtime_filter_lock_);
   if (closed_) return;
-  RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id);
+  RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id());
   DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
-                                        << params.filter_id;
+                                        << params.filter_id();
 
   BloomFilter* bloom_filter = nullptr;
   MinMaxFilter* min_max_filter = nullptr;
   if (it->second->is_bloom_filter()) {
-    DCHECK(params.__isset.bloom_filter);
-    if (params.bloom_filter.always_true) {
+    DCHECK(params.has_bloom_filter());
+    if (params.bloom_filter().always_true()) {
       bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
     } else {
-      int64_t required_space =
-          BloomFilter::GetExpectedMemoryUsed(params.bloom_filter.log_bufferpool_space);
+      int64_t required_space = BloomFilter::GetExpectedMemoryUsed(
+          params.bloom_filter().log_bufferpool_space());
       DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
           << "BufferPool Client should have enough reservation to fulfill bloom filter "
              "allocation";
       bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
-      Status status = bloom_filter->Init(params.bloom_filter);
-      if (!status.ok()) {
-        LOG(ERROR) << "Unable to allocate memory for bloom filter: "
-                   << status.GetDetail();
-        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+
+      kudu::Slice sidecar_slice;
+      if (params.bloom_filter().has_directory_sidecar_idx()) {
+        kudu::Status status = context->GetInboundSidecar(
+            params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
+        if (!status.ok()) {
+          LOG(ERROR) << "Failed to get Bloom filter sidecar: "
+                     << status.message().ToString();
+          bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+        }
       } else {
-        bloom_filters_.push_back(bloom_filter);
-        DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
-        bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
+        DCHECK(params.bloom_filter().always_false());
+      }
+
+      if (bloom_filter != BloomFilter::ALWAYS_TRUE_FILTER) {
+        Status status = bloom_filter->Init(
+            params.bloom_filter(), sidecar_slice.data(), sidecar_slice.size());
+        if (!status.ok()) {
+          LOG(ERROR) << "Unable to allocate memory for bloom filter: "
+                     << status.GetDetail();
+          bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+        } else {
+          bloom_filters_.push_back(bloom_filter);
+          DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
+          bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
+        }
       }
     }
   } else {
     DCHECK(it->second->is_min_max_filter());
-    DCHECK(params.__isset.min_max_filter);
-    min_max_filter = MinMaxFilter::Create(
-        params.min_max_filter, it->second->type(), &obj_pool_, filter_mem_tracker_.get());
+    DCHECK(params.has_min_max_filter());
+    min_max_filter = MinMaxFilter::Create(params.min_max_filter(), it->second->type(),
+        &obj_pool_, filter_mem_tracker_.get());
     min_max_filters_.push_back(min_max_filter);
   }
   it->second->SetFilter(bloom_filter, min_max_filter);
   state_->runtime_profile()->AddInfoString(
-      Substitute("Filter $0 arrival", params.filter_id),
+      Substitute("Filter $0 arrival", params.filter_id()),
       PrettyPrinter::Print(it->second->arrival_delay_ms(), TUnit::TIME_MS));
 }
 
@@ -278,8 +335,21 @@
 }
 
 void RuntimeFilterBank::Close() {
-  lock_guard<mutex> l(runtime_filter_lock_);
+  // Wait for all in-flight RPCs to complete before closing the filters.
+  {
+    unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
+    while (num_inflight_rpcs_ > 0) {
+      krpcs_done_cv_.wait(l1);
+    }
+  }
+
+  lock_guard<mutex> l2(runtime_filter_lock_);
   CancelLocked();
+  // We do not have to set 'closed_' to true before waiting for all in-flight RPCs to
+  // drain because the async build thread in
+  // BlockingJoinNode::ProcessBuildInputAndOpenProbe() should have exited by the time
+  // Close() is called so there shouldn't be any new RPCs being issued when this function
+  // is called.
   closed_ = true;
   for (BloomFilter* filter : bloom_filters_) filter->Close();
   for (MinMaxFilter* filter : min_max_filters_) filter->Close();
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 1208f03..78a95cf 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -20,6 +20,7 @@
 
 #include "codegen/impala-ir.h"
 #include "common/object-pool.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/mem-pool.h"
 #include "runtime/types.h"
@@ -29,6 +30,15 @@
 #include <boost/thread/lock_guard.hpp>
 #include <boost/unordered_map.hpp>
 
+#include <condition_variable>
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+class RpcController;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 
 class BloomFilter;
@@ -36,7 +46,6 @@
 class MinMaxFilter;
 class RuntimeFilter;
 class RuntimeState;
-class TBloomFilter;
 class TRuntimeFilterDesc;
 class TQueryCtx;
 
@@ -94,7 +103,8 @@
 
   /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
   /// consumption by operators that wish to use it for filtering.
-  void PublishGlobalFilter(const TPublishFilterParams& params);
+  void PublishGlobalFilter(
+      const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
   /// 'filter_size' would have an expected false-positive rate which would exceed
@@ -150,6 +160,16 @@
   /// All filters expected to be consumed by the local plan fragment instance.
   RuntimeFilterMap consumed_filters_;
 
+  /// Lock protecting 'num_inflight_rpcs_'. It should not be held at the same
+  /// time as 'runtime_filter_lock_'.
+  SpinLock num_inflight_rpcs_lock_;
+  /// Number of UpdateFilter KRPC calls currently in flight. Close() waits on
+  /// 'krpcs_done_cv_' until this reaches zero, which prevents the memory pointed to
+  /// by a BloomFilter* from being deallocated in RuntimeFilterBank::Close() before
+  /// all KRPC calls have completed.
+  int32_t num_inflight_rpcs_ = 0;
+  std::condition_variable_any krpcs_done_cv_;
+
   /// Fragment instance's runtime state.
   RuntimeState* state_;
 
@@ -184,6 +204,11 @@
   /// in ClaimBufferReservation(). Reservations are returned to the initial reservations
   /// pool in Close().
   BufferPool::ClientHandle buffer_pool_client_;
+
+  /// Completion callback for the asynchronous UpdateFilterAsync() RPC issued by
+  /// UpdateFilterFromLocal(). Logs failures, decrements 'num_inflight_rpcs_', notifies cv.
+  void UpdateFilterCompleteCb(
+      const kudu::rpc::RpcController* rpc_controller, const UpdateFilterResultPB* res);
 };
 
 }
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 5b65f7a..618788c 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -67,7 +67,8 @@
 
   MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
 
-  /// Sets the internal filter bloom_filter to 'bloom_filter'. Can only legally be called
+  /// Sets the internal filter to 'bloom_filter' or 'min_max_filter', depending
+  /// on the type of this RuntimeFilter. Can only legally be called
   /// once per filter. Does not acquire the memory associated with 'bloom_filter'.
   void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
 
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 2b2ce79..4eb5c37 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -27,6 +27,7 @@
 
 #include "common/global-types.h"
 #include "gen-cpp/Data_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "udf/udf.h"
 #include "util/hash-util.h"
 
@@ -166,17 +167,16 @@
     *ptp = boost::posix_time::ptime(date_, time_);
   }
 
-  // Store the binary representation of this TimestampValue in 'tvalue'.
-  void ToTColumnValue(TColumnValue* tvalue) const {
+  // Store the binary representation of this TimestampValue in 'pvalue'.
+  void ToColumnValuePB(ColumnValuePB* pvalue) const {
     const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
-    tvalue->timestamp_val.assign(data, data + Size());
-    tvalue->__isset.timestamp_val = true;
+    pvalue->mutable_timestamp_val()->assign(data, data + Size());
   }
 
-  // Returns a new TimestampValue created from the value in 'tvalue'.
-  static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
+  // Returns a new TimestampValue created from the value in 'value_pb'.
+  static TimestampValue FromColumnValuePB(const ColumnValuePB& value_pb) {
     TimestampValue value;
-    memcpy(&value, tvalue.timestamp_val.c_str(), Size());
+    memcpy(&value, value_pb.timestamp_val().c_str(), Size());
     value.Validate();
     return value;
   }
diff --git a/be/src/scheduling/request-pool-service.h b/be/src/scheduling/request-pool-service.h
index 02642c4..ad38900 100644
--- a/be/src/scheduling/request-pool-service.h
+++ b/be/src/scheduling/request-pool-service.h
@@ -20,7 +20,6 @@
 
 #include <jni.h>
 
-#include "gen-cpp/ImpalaInternalService.h"
 #include "common/status.h"
 #include "util/metrics-fwd.h"
 
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 1bdcfcf..6197d1c 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1409,9 +1409,10 @@
   return coord_->UpdateBackendExecStatus(request, thrift_profiles);
 }
 
-void ClientRequestState::UpdateFilter(const TUpdateFilterParams& params) {
+void ClientRequestState::UpdateFilter(
+    const UpdateFilterParamsPB& params, RpcContext* context) {
   DCHECK(coord_.get());
-  coord_->UpdateFilter(params);
+  coord_->UpdateFilter(params, context);
 }
 
 bool ClientRequestState::GetDmlStats(TDmlResult* dml_result, Status* query_status) {
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index ac901a4..1ef3b1d 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -190,7 +190,7 @@
   /// object.
   Status UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
       const TRuntimeProfileForest& thrift_profiles) WARN_UNUSED_RESULT;
-  void UpdateFilter(const TUpdateFilterParams& params);
+  void UpdateFilter(const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
   /// Populate DML stats in 'dml_result' if this request succeeded.
   /// Sets 'query_status' to the overall query status.
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 42b14f9..71ec8cc 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -29,7 +29,9 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
+#include "service/impala-server.h"
 #include "util/memory-metrics.h"
 #include "util/parse-util.h"
 
@@ -106,6 +108,38 @@
   ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
 }
 
+void DataStreamService::UpdateFilter(
+    const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {
+  // This failpoint is to allow jitter to be injected.
+  DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
+  DCHECK(req->has_filter_id());
+  DCHECK(req->has_query_id());
+  DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
+  ExecEnv::GetInstance()->impala_server()->UpdateFilter(resp, *req, context);
+  RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
+}
+
+void DataStreamService::PublishFilter(
+    const PublishFilterParamsPB* req, PublishFilterResultPB* resp, RpcContext* context) {
+  // This failpoint is to allow jitter to be injected.
+  DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
+  DCHECK(req->has_filter_id());
+  DCHECK(req->has_dst_query_id());
+  DCHECK(req->has_dst_fragment_idx());
+  DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
+  QueryState::ScopedRef qs(ProtoToQueryId(req->dst_query_id()));
+
+  if (qs.get() != nullptr) {
+    qs->PublishFilter(*req, context);
+    RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
+  } else {
+    string err_msg = Substitute("Query State not found for query_id=$0",
+        PrintId(ProtoToQueryId(req->dst_query_id())));
+    LOG(INFO) << err_msg;
+    RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
+  }
+}
+
 template<typename ResponsePBType>
 void DataStreamService::RespondRpc(const Status& status,
     ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 539974c..24a2249 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -63,6 +63,16 @@
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, kudu::rpc::RpcContext* context);
 
+  /// Called by fragment instances that produce local runtime filters to deliver them to
+  /// the coordinator for aggregation and broadcast.
+  virtual void UpdateFilter(const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp,
+      kudu::rpc::RpcContext* context);
+
+  /// Called by the coordinator to deliver global runtime filters to fragments for
+  /// application at plan nodes.
+  virtual void PublishFilter(const PublishFilterParamsPB* req,
+      PublishFilterResultPB* resp, kudu::rpc::RpcContext* context);
+
   /// Respond to a RPC passed in 'response'/'ctx' with 'status' and release
   /// the payload memory from 'mem_tracker'. Takes ownership of 'ctx'.
   template<typename ResponsePBType>
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index d703406..69a5736 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -22,7 +22,6 @@
 
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/LineageGraph_types.h"
 #include "common/status.h"
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 22a8243..3b7af8a 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -43,24 +43,3 @@
       Substitute("Unknown $0 id: $1", id_type, PrintId(id))));
   status.SetTStatus(status_container);
 }
-
-void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
-    const TUpdateFilterParams& params) {
-  DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
-  DCHECK(params.__isset.filter_id);
-  DCHECK(params.__isset.query_id);
-  DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
-  impala_server_->UpdateFilter(return_val, params);
-}
-
-void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
-    const TPublishFilterParams& params) {
-  DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
-  DCHECK(params.__isset.filter_id);
-  DCHECK(params.__isset.dst_query_id);
-  DCHECK(params.__isset.dst_fragment_idx);
-  DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
-  QueryState::ScopedRef qs(params.dst_query_id);
-  if (qs.get() == nullptr) return;
-  qs->PublishFilter(params);
-}
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index c75122b..425678b 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -30,10 +30,6 @@
 class ImpalaInternalService : public ImpalaInternalServiceIf {
  public:
   ImpalaInternalService();
-  virtual void UpdateFilter(TUpdateFilterResult& return_val,
-      const TUpdateFilterParams& params);
-  virtual void PublishFilter(TPublishFilterResult& return_val,
-      const TPublishFilterParams& params);
 
  private:
   ImpalaServer* impala_server_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 11ea058..76184f8 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -53,6 +53,7 @@
 #include "exec/external-data-source-executor.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/CatalogService_constants.h"
+#include "kudu/rpc/rpc_context.h"
 #include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
 #include "rpc/rpc-trace.h"
@@ -95,7 +96,6 @@
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/DataSinks_types.h"
 #include "gen-cpp/ImpalaService_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/LineageGraph_types.h"
 #include "gen-cpp/Frontend_types.h"
 
@@ -113,6 +113,7 @@
 using boost::uuids::random_generator;
 using boost::uuids::uuid;
 using kudu::GetRandomSeed32;
+using kudu::rpc::RpcContext;
 using namespace apache::hive::service::cli::thrift;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
@@ -2537,17 +2538,18 @@
   return Status::OK();
 }
 
-void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
-    const TUpdateFilterParams& params) {
-  DCHECK(params.__isset.query_id);
-  DCHECK(params.__isset.filter_id);
+void ImpalaServer::UpdateFilter(UpdateFilterResultPB* result,
+    const UpdateFilterParamsPB& params, RpcContext* context) {
+  DCHECK(params.has_query_id());
+  DCHECK(params.has_filter_id());
   shared_ptr<ClientRequestState> client_request_state =
-      GetClientRequestState(params.query_id);
+      GetClientRequestState(ProtoToQueryId(params.query_id()));
   if (client_request_state.get() == nullptr) {
-    LOG(INFO) << "Could not find client request state: " << PrintId(params.query_id);
+    LOG(INFO) << "Could not find client request state: "
+              << PrintId(ProtoToQueryId(params.query_id()));
     return;
   }
-  client_request_state->UpdateFilter(params);
+  client_request_state->UpdateFilter(params, context);
 }
 
 Status ImpalaServer::CheckNotShuttingDown() const {
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 7e78532..0746784 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -49,6 +49,12 @@
 #include "util/thread-pool.h"
 #include "util/time.h"
 
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
 namespace impala {
 using kudu::ThreadSafeRandom;
 
@@ -57,12 +63,7 @@
 class CancellationWork;
 class ImpalaHttpHandler;
 class RowDescriptor;
-class TCatalogUpdate;
-class TPlanExecRequest;
-class TPlanExecParams;
 class TDmlResult;
-class TReportExecStatusArgs;
-class TReportExecStatusResult;
 class TNetworkAddress;
 class TClientRequest;
 class TExecRequest;
@@ -345,8 +346,8 @@
   virtual void CloseImpalaOperation(
       TCloseImpalaOperationResp& return_val, const TCloseImpalaOperationReq& request);
 
-  /// ImpalaInternalService rpcs
-  void UpdateFilter(TUpdateFilterResult& return_val, const TUpdateFilterParams& params);
+  void UpdateFilter(UpdateFilterResultPB* return_val, const UpdateFilterParamsPB& params,
+      kudu::rpc::RpcContext* context);
 
   /// Generates a unique id for this query and sets it in the given query context.
   /// Prepares the given query context by populating fields required for evaluating
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index e8e7e2e..7a7d27f 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -21,6 +21,7 @@
 #include <unordered_set>
 #include <vector>
 
+#include "kudu/rpc/rpc_controller.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/mem-tracker.h"
@@ -28,11 +29,14 @@
 #include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 
+#include "gen-cpp/data_stream_service.pb.h"
+
 using namespace std;
 
-namespace {
-
 using namespace impala;
+using kudu::rpc::RpcController;
+
+namespace bloom_filter_test_util {
 
 // Make a random uint64_t, avoiding the absent high bit and the low-entropy low bits
 // produced by rand().
@@ -68,24 +72,47 @@
 // Computes union of 'x' and 'y'. Computes twice with AVX enabled and disabled and
 // verifies both produce the same result. 'success' is set to true if both union
 // computations returned the same result and set to false otherwise.
-TBloomFilter BfUnion(const BloomFilter& x, const BloomFilter& y, bool* success) {
-  TBloomFilter thrift_x, thrift_y;
-  BloomFilter::ToThrift(&x, &thrift_x);
-  BloomFilter::ToThrift(&y, &thrift_y);
-  BloomFilter::Or(thrift_x, &thrift_y);
+void BfUnion(const BloomFilter& x, const BloomFilter& y, int64_t directory_size,
+    bool* success, BloomFilterPB* protobuf, std::string* directory) {
+  BloomFilterPB protobuf_x, protobuf_y;
+  RpcController controller_x;
+  RpcController controller_y;
+  BloomFilter::ToProtobuf(&x, &controller_x, &protobuf_x);
+  BloomFilter::ToProtobuf(&y, &controller_y, &protobuf_y);
+
+  string directory_x(reinterpret_cast<const char*>(x.directory_), directory_size);
+  string directory_y(reinterpret_cast<const char*>(y.directory_), directory_size);
+
+  BloomFilter::Or(protobuf_x, reinterpret_cast<const uint8_t*>(directory_x.data()),
+      &protobuf_y, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y.data())),
+      directory_size);
+
   {
     CpuInfo::TempDisable t1(CpuInfo::AVX);
     CpuInfo::TempDisable t2(CpuInfo::AVX2);
-    TBloomFilter thrift_x2, thrift_y2;
-    BloomFilter::ToThrift(&x, &thrift_x2);
-    BloomFilter::ToThrift(&y, &thrift_y2);
-    BloomFilter::Or(thrift_x2, &thrift_y2);
-    *success = thrift_y.directory == thrift_y2.directory;
+    BloomFilterPB protobuf_x2, protobuf_y2;
+    RpcController controller_x2;
+    RpcController controller_y2;
+    BloomFilter::ToProtobuf(&x, &controller_x2, &protobuf_x2);
+    BloomFilter::ToProtobuf(&y, &controller_y2, &protobuf_y2);
+
+    string directory_x2(reinterpret_cast<const char*>(x.directory_), directory_size);
+    string directory_y2(reinterpret_cast<const char*>(y.directory_), directory_size);
+
+    BloomFilter::Or(protobuf_x2, reinterpret_cast<const uint8_t*>(directory_x2.data()),
+        &protobuf_y2, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y2.data())),
+        directory_size);
+
+    *success = directory_y.compare(directory_y2) == 0;
   }
-  return thrift_y;
+
+  *protobuf = protobuf_y;
+  *directory = directory_y;
 }
 
-}  // namespace
+} // namespace bloom_filter_test_util
+
+using namespace bloom_filter_test_util;
 
 namespace impala {
 
@@ -204,12 +231,15 @@
     return bloom_filter;
   }
 
-  BloomFilter* CreateBloomFilter(TBloomFilter t_filter) {
+  BloomFilter* CreateBloomFilter(BloomFilterPB filter_pb, const std::string& directory) {
     int64_t filter_size =
-        BloomFilter::GetExpectedMemoryUsed(t_filter.log_bufferpool_space);
+        BloomFilter::GetExpectedMemoryUsed(filter_pb.log_bufferpool_space());
     EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
     BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
-    EXPECT_OK(bloom_filter->Init(t_filter));
+
+    EXPECT_OK(bloom_filter->Init(
+        filter_pb, reinterpret_cast<const uint8_t*>(directory.data()), directory.size()));
+
     bloom_filters_.push_back(bloom_filter);
     EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
     return bloom_filter;
@@ -311,7 +341,7 @@
   }
 }
 
-TEST_F(BloomFilterTest, Thrift) {
+TEST_F(BloomFilterTest, Protobuf) {
   BloomFilter* bf = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
   for (int i = 0; i < 10; ++i) BfInsert(*bf, i);
   // Check no unexpected new false positives.
@@ -320,19 +350,27 @@
     if (!BfFind(*bf, i)) missing_ints.insert(i);
   }
 
-  TBloomFilter to_thrift;
-  BloomFilter::ToThrift(bf, &to_thrift);
-  EXPECT_EQ(to_thrift.always_true, false);
+  BloomFilterPB to_protobuf;
 
-  BloomFilter* from_thrift = CreateBloomFilter(to_thrift);
-  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_thrift, i));
-  for (int missing: missing_ints) ASSERT_FALSE(BfFind(*from_thrift, missing));
+  RpcController controller;
+  BloomFilter::ToProtobuf(bf, &controller, &to_protobuf);
 
-  BloomFilter::ToThrift(NULL, &to_thrift);
-  EXPECT_EQ(to_thrift.always_true, true);
+  EXPECT_EQ(to_protobuf.always_true(), false);
+
+  std::string directory(reinterpret_cast<const char*>(bf->directory_),
+      BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01)));
+
+  BloomFilter* from_protobuf = CreateBloomFilter(to_protobuf, directory);
+
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_protobuf, i));
+  for (int missing : missing_ints) ASSERT_FALSE(BfFind(*from_protobuf, missing));
+
+  RpcController controller_2;
+  BloomFilter::ToProtobuf(nullptr, &controller_2, &to_protobuf);
+  EXPECT_EQ(to_protobuf.always_true(), true);
 }
 
-TEST_F(BloomFilterTest, ThriftOr) {
+TEST_F(BloomFilterTest, ProtobufOr) {
   BloomFilter* bf1 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
   BloomFilter* bf2 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
 
@@ -340,7 +378,15 @@
   for (int i = 0; i < 10; ++i) BfInsert(*bf1, i);
 
   bool success;
-  BloomFilter *bf3 = CreateBloomFilter(BfUnion(*bf1, *bf2, &success));
+  BloomFilterPB protobuf;
+  std::string directory;
+  int64_t directory_size =
+      BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01));
+
+  BfUnion(*bf1, *bf2, directory_size, &success, &protobuf, &directory);
+
+  BloomFilter* bf3 = CreateBloomFilter(protobuf, directory);
+
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
   for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
   for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
@@ -348,8 +394,10 @@
   // Insert another value to aggregated BloomFilter.
   for (int i = 11; i < 50; ++i) BfInsert(*bf3, i);
 
-  // Apply TBloomFilter back to BloomFilter and verify if aggregation was correct.
-  BloomFilter *bf4 = CreateBloomFilter(BfUnion(*bf1, *bf3, &success));
+  // Apply BloomFilterPB back to BloomFilter and verify if aggregation was correct.
+  BfUnion(*bf1, *bf3, directory_size, &success, &protobuf, &directory);
+  BloomFilter* bf4 = CreateBloomFilter(protobuf, directory);
+
   ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
   for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
   for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index 0a2f8fc..c41c81b 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -17,6 +17,8 @@
 
 #include "util/bloom-filter.h"
 
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 
@@ -55,12 +57,13 @@
   return Status::OK();
 }
 
-Status BloomFilter::Init(const TBloomFilter& thrift) {
-  RETURN_IF_ERROR(Init(thrift.log_bufferpool_space));
-  if (directory_ != nullptr && !thrift.always_false) {
+Status BloomFilter::Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
+    size_t directory_in_size) {
+  RETURN_IF_ERROR(Init(protobuf.log_bufferpool_space()));
+  if (directory_ != nullptr && !protobuf.always_false()) {
     always_false_ = false;
-    DCHECK_EQ(thrift.directory.size(), directory_size());
-    memcpy(directory_, &thrift.directory[0], thrift.directory.size());
+    DCHECK_EQ(directory_in_size, directory_size());
+    memcpy(directory_, directory_in, directory_in_size);
   }
   return Status::OK();
 }
@@ -73,32 +76,66 @@
   }
 }
 
-void BloomFilter::ToThrift(TBloomFilter* thrift) const {
-  thrift->log_bufferpool_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
-  if (always_false_) {
-    thrift->always_false = true;
-    thrift->always_true = false;
+void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
+    kudu::rpc::RpcController* controller, const char* directory,
+    unsigned long directory_size) {
+  DCHECK(rpc_params != nullptr);
+  DCHECK(!rpc_params->always_false());
+  DCHECK(!rpc_params->always_true());
+  kudu::Slice dir_slice(directory, directory_size);
+  unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
+      kudu::rpc::RpcSidecar::FromSlice(dir_slice);
+
+  int sidecar_idx = -1;
+  kudu::Status sidecar_status =
+      controller->AddOutboundSidecar(std::move(rpc_sidecar), &sidecar_idx);
+  if (!sidecar_status.ok()) {
+    LOG(ERROR) << "Cannot add outbound sidecar: " << sidecar_status.message().ToString();
+    // If AddOutboundSidecar() fails, we 'disable' the BloomFilterPB by setting it to
+    // an always true filter.
+    rpc_params->set_always_false(false);
+    rpc_params->set_always_true(true);
     return;
   }
-  thrift->directory.assign(reinterpret_cast<const char*>(directory_),
-      static_cast<unsigned long>(directory_size()));
-  thrift->always_false = false;
-  thrift->always_true = false;
+  rpc_params->set_directory_sidecar_idx(sidecar_idx);
+  rpc_params->set_always_false(false);
+  rpc_params->set_always_true(false);
 }
 
-void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) {
-  DCHECK(thrift != nullptr);
-  if (filter == nullptr) {
-    thrift->always_true = true;
-    DCHECK_EQ(thrift->always_false, false);
+void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
+    kudu::rpc::RpcController* controller, const string& directory) {
+      AddDirectorySidecar(rpc_params, controller,
+      reinterpret_cast<const char*>(&(directory[0])),
+      static_cast<unsigned long>(directory.size()));
+}
+
+void BloomFilter::ToProtobuf(
+    BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const {
+  protobuf->set_log_bufferpool_space(log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
+  if (always_false_) {
+    protobuf->set_always_false(true);
+    protobuf->set_always_true(false);
     return;
   }
-  filter->ToThrift(thrift);
+  BloomFilter::AddDirectorySidecar(protobuf, controller,
+      reinterpret_cast<const char*>(directory_),
+      static_cast<unsigned long>(directory_size()));
+}
+
+void BloomFilter::ToProtobuf(const BloomFilter* filter,
+    kudu::rpc::RpcController* controller, BloomFilterPB* protobuf) {
+  DCHECK(protobuf != nullptr);
+  // If filter == nullptr, then this BloomFilter is an always true filter.
+  if (filter == nullptr) {
+    protobuf->set_always_true(true);
+    DCHECK(!protobuf->always_false());
+    return;
+  }
+  filter->ToProtobuf(protobuf, controller);
 }
 
 // The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we
 // compile with -fno-strict-aliasing.
-
 void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept {
   // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is
   // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second
@@ -184,20 +221,17 @@
 }
 } //namespace
 
-void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
+void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in,
+    BloomFilterPB* out, uint8_t* directory_out, size_t directory_size) {
   DCHECK(out != nullptr);
   DCHECK(&in != out);
   // These cases are impossible in current code. If they become possible in the future,
   // memory usage should be tracked accordingly.
-  DCHECK(!out->always_false);
-  DCHECK(!out->always_true);
-  DCHECK(!in.always_true);
-  if (in.always_false) return;
-  DCHECK_EQ(in.log_bufferpool_space, out->log_bufferpool_space);
-  DCHECK_EQ(in.directory.size(), out->directory.size())
-      << "Equal log heap space " << in.log_bufferpool_space
-      << ", but different directory sizes: " << in.directory.size() << ", "
-      << out->directory.size();
+  DCHECK(!out->always_false());
+  DCHECK(!out->always_true());
+  DCHECK(!in.always_true());
+  if (in.always_false()) return;
+  DCHECK_EQ(in.log_bufferpool_space(), out->log_bufferpool_space());
   // The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
   // written in a way that is very friendly to auto-vectorization. Instead, we manually
   // vectorize, increasing the speed by up to 56x.
@@ -205,13 +239,14 @@
   // TODO: Tune gcc flags to auto-vectorize the trivial loop instead of hand-vectorizing
   // it. This might not be possible.
   if (CpuInfo::IsSupported(CpuInfo::AVX)) {
-    OrEqualArrayAvx(in.directory.size(), &in.directory[0], &out->directory[0]);
+    OrEqualArrayAvx(directory_size, reinterpret_cast<const char*>(directory_in),
+        reinterpret_cast<char*>(directory_out));
   } else {
-    const __m128i* simd_in = reinterpret_cast<const __m128i*>(&in.directory[0]);
+    const __m128i* simd_in = reinterpret_cast<const __m128i*>(directory_in);
     const __m128i* const simd_in_end =
-        reinterpret_cast<const __m128i*>(&in.directory[0] + in.directory.size());
-    __m128i* simd_out = reinterpret_cast<__m128i*>(&out->directory[0]);
-    // in.directory has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
+        reinterpret_cast<const __m128i*>(directory_in + directory_size);
+    __m128i* simd_out = reinterpret_cast<__m128i*>(directory_out);
+    // directory_in has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
     // == 16, we can do two _mm_or_si128's in each iteration without checking array
     // bounds.
     while (simd_in != simd_in_end) {
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 73cb01e..9c22120 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -27,11 +27,35 @@
 
 #include "common/compiler-util.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
 #include "gutil/macros.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "util/cpu-info.h"
 #include "util/hash-util.h"
 
+namespace kudu {
+namespace rpc {
+class RpcController;
+} // namespace rpc
+} // namespace kudu
+
+namespace impala {
+class BloomFilter;
+} // namespace impala
+
+// Need this forward declaration since we make bloom_filter_test_util::BfUnion() a friend
+// function.
+namespace bloom_filter_test_util {
+void BfUnion(const impala::BloomFilter& x, const impala::BloomFilter& y,
+    int64_t directory_size, bool* success, impala::BloomFilterPB* protobuf,
+    std::string* directory);
+} // namespace bloom_filter_test_util
+
+// Need this forward declaration since we make either::TestData a friend struct.
+namespace either {
+struct TestData;
+} // namespace either
+
 namespace impala {
 
 /// A BloomFilter stores sets of items and offers a query operation indicating whether or
@@ -66,17 +90,21 @@
 
   /// Reset the filter state, allocate/reallocate and initialize the 'directory_'. All
   /// calls to Insert() and Find() should only be done between the calls to Init() and
-  /// Close().Init and Close are safe to call multiple times.
+  /// Close(). Init and Close are safe to call multiple times.
   Status Init(const int log_bufferpool_space);
-  Status Init(const TBloomFilter& thrift);
+  Status Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
+      size_t directory_in_size);
   void Close();
 
   /// Representation of a filter which allows all elements to pass.
   static constexpr BloomFilter* const ALWAYS_TRUE_FILTER = NULL;
 
-  /// Converts 'filter' to its corresponding Thrift representation. If the first argument
-  /// is NULL, it is interpreted as a complete filter which contains all elements.
-  static void ToThrift(const BloomFilter* filter, TBloomFilter* thrift);
+  /// Converts 'filter' to its corresponding Protobuf representation. If 'filter' is
+  /// NULL, it is interpreted as a complete filter which contains all elements.
+  /// Unless 'filter' is NULL or always false, also adds a sidecar to 'controller'
+  /// containing the Bloom filter's directory.
+  static void ToProtobuf(const BloomFilter* filter, kudu::rpc::RpcController* controller,
+      BloomFilterPB* protobuf);
 
   bool AlwaysFalse() const { return always_false_; }
 
@@ -91,8 +119,12 @@
   /// high probabilty) if it is not.
   bool Find(const uint32_t hash) const noexcept;
 
-  /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
-  static void Or(const TBloomFilter& in, TBloomFilter* out);
+  /// Computes the bitwise OR of the 'directory_size' bytes at 'directory_in' with
+  /// those at 'directory_out' and stores the result in 'directory_out'. Does nothing
+  /// if 'in' is an always-false filter. DCHECKs are also performed on the flags and
+  /// log_bufferpool_space of 'in' and 'out' to make sure they are compatible.
+  static void Or(const BloomFilterPB& in, const uint8_t* directory_in, BloomFilterPB* out,
+      uint8_t* directory_out, size_t directory_size);
 
   /// As more distinct items are inserted into a BloomFilter, the false positive rate
   /// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
@@ -119,6 +151,20 @@
     return sizeof(Bucket) * (1LL << std::max(1, log_heap_size - LOG_BUCKET_WORD_BITS));
   }
 
+  /// The following two functions set a sidecar on 'controller' containing the Bloom
+  /// filter's directory. Two overloads are provided because they may be called in
+  /// different contexts depending on whether or not the caller has access to an
+  /// instantiated BloomFilter. 'rpc_params' must be neither an always-false nor an
+  /// always-true Bloom filter when either overload is called. Moreover, because the
+  /// RpcSidecar is created from a slice that references 'directory' directly (no copy
+  /// is made), the caller must make sure that 'directory' stays alive until the RPC is
+  /// done.
+  static void AddDirectorySidecar(BloomFilterPB* rpc_params,
+      kudu::rpc::RpcController* controller, const char* directory,
+      unsigned long directory_size);
+  static void AddDirectorySidecar(BloomFilterPB* rpc_params,
+      kudu::rpc::RpcController* controller, const string& directory);
+
  private:
   // always_false_ is true when the bloom filter hasn't had any elements inserted.
   bool always_false_ = true;
@@ -182,8 +228,8 @@
     return 1uLL << (log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
   }
 
-  /// Serializes this filter as Thrift.
-  void ToThrift(TBloomFilter* thrift) const;
+  /// Serializes this filter as Protobuf.
+  void ToProtobuf(BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const;
 
 /// Some constants used in hashing. #defined for efficiency reasons.
 #define IMPALA_BLOOM_HASH_CONSTANTS                                             \
@@ -196,6 +242,23 @@
       __attribute__((aligned(32))) = {IMPALA_BLOOM_HASH_CONSTANTS};
 
   DISALLOW_COPY_AND_ASSIGN(BloomFilter);
+
+  /// List 'BloomFilterTest_Protobuf_Test' as a friend class to run the backend
+  /// test in 'bloom-filter-test.cc' since it has to access the private field of
+  /// 'directory_' in BloomFilter.
+  friend class BloomFilterTest_Protobuf_Test;
+
+  /// List 'bloom_filter_test_util::BfUnion()' as a friend function to run the backend
+  /// test in 'bloom-filter-test.cc' since it has to access the private field of
+  /// 'directory_' in BloomFilter.
+  friend void bloom_filter_test_util::BfUnion(const impala::BloomFilter& x,
+      const impala::BloomFilter& y, int64_t directory_size, bool* success,
+      impala::BloomFilterPB* protobuf, std::string* directory);
+
+  /// List 'either::TestData' as a friend struct to run the benchmark in
+  /// 'bloom-filter-benchmark.cc' since it has to access the private field
+  /// 'directory_' in BloomFilter.
+  friend struct either::TestData;
 };
 
 // To set 8 bits in an 32-byte Bloom filter, we set one bit in each 32-bit uint32_t. This
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
index a75ca1a..a5c0667 100644
--- a/be/src/util/min-max-filter-test.cc
+++ b/be/src/util/min-max-filter-test.cc
@@ -18,6 +18,7 @@
 #include "testutil/gtest-util.h"
 #include "util/min-max-filter.h"
 
+#include "gen-cpp/data_stream_service.pb.h"
 #include "runtime/decimal-value.h"
 #include "runtime/decimal-value.inline.h"
 #include "runtime/string-value.inline.h"
@@ -50,15 +51,15 @@
   EXPECT_EQ(*reinterpret_cast<bool*>(filter->GetMax()), b1);
 
   // Check the behavior of Or.
-  TMinMaxFilter tFilter1;
-  tFilter1.min.__set_bool_val(false);
-  tFilter1.max.__set_bool_val(true);
-  TMinMaxFilter tFilter2;
-  tFilter2.min.__set_bool_val(false);
-  tFilter2.max.__set_bool_val(false);
-  MinMaxFilter::Or(tFilter1, &tFilter2, ColumnType(PrimitiveType::TYPE_BOOLEAN));
-  EXPECT_FALSE(tFilter2.min.bool_val);
-  EXPECT_TRUE(tFilter2.max.bool_val);
+  MinMaxFilterPB pFilter1;
+  pFilter1.mutable_min()->set_bool_val(false);
+  pFilter1.mutable_max()->set_bool_val(true);
+  MinMaxFilterPB pFilter2;
+  pFilter2.mutable_min()->set_bool_val(false);
+  pFilter2.mutable_max()->set_bool_val(false);
+  MinMaxFilter::Or(pFilter1, &pFilter2, ColumnType(PrimitiveType::TYPE_BOOLEAN));
+  EXPECT_FALSE(pFilter2.min().bool_val());
+  EXPECT_TRUE(pFilter2.max().bool_val());
 
   filter->Close();
 }
@@ -84,14 +85,14 @@
   // Test the behavior of an empty filter.
   EXPECT_TRUE(int_filter->AlwaysFalse());
   EXPECT_FALSE(int_filter->AlwaysTrue());
-  TMinMaxFilter tFilter;
-  int_filter->ToThrift(&tFilter);
-  EXPECT_TRUE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_FALSE(tFilter.min.__isset.int_val);
-  EXPECT_FALSE(tFilter.max.__isset.int_val);
+  MinMaxFilterPB pFilter;
+  int_filter->ToProtobuf(&pFilter);
+  EXPECT_TRUE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_FALSE(pFilter.min().has_int_val());
+  EXPECT_FALSE(pFilter.max().has_int_val());
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, int_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -109,25 +110,25 @@
   int_filter->Insert(&i4);
   CheckIntVals(int_filter, i4, i2);
 
-  int_filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_EQ(tFilter.min.int_val, i4);
-  EXPECT_EQ(tFilter.max.int_val, i2);
+  int_filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_EQ(pFilter.min().int_val(), i4);
+  EXPECT_EQ(pFilter.max().int_val(), i2);
   MinMaxFilter* int_filter2 =
-      MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, int_type, &obj_pool, &mem_tracker);
   CheckIntVals(int_filter2, i4, i2);
 
   // Check the behavior of Or.
-  TMinMaxFilter tFilter1;
-  tFilter1.min.__set_int_val(4);
-  tFilter1.max.__set_int_val(8);
-  TMinMaxFilter tFilter2;
-  tFilter2.min.__set_int_val(2);
-  tFilter2.max.__set_int_val(7);
-  MinMaxFilter::Or(tFilter1, &tFilter2, int_type);
-  EXPECT_EQ(tFilter2.min.int_val, 2);
-  EXPECT_EQ(tFilter2.max.int_val, 8);
+  MinMaxFilterPB pFilter1;
+  pFilter1.mutable_min()->set_int_val(4);
+  pFilter1.mutable_max()->set_int_val(8);
+  MinMaxFilterPB pFilter2;
+  pFilter2.mutable_min()->set_int_val(2);
+  pFilter2.mutable_max()->set_int_val(7);
+  MinMaxFilter::Or(pFilter1, &pFilter2, int_type);
+  EXPECT_EQ(pFilter2.min().int_val(), 2);
+  EXPECT_EQ(pFilter2.max().int_val(), 8);
 
   int_filter->Close();
   empty_filter->Close();
@@ -162,13 +163,13 @@
   filter->MaterializeValues();
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  TMinMaxFilter tFilter;
-  filter->ToThrift(&tFilter);
-  EXPECT_TRUE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
+  MinMaxFilterPB pFilter;
+  filter->ToProtobuf(&pFilter);
+  EXPECT_TRUE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
 
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -191,11 +192,11 @@
   filter->MaterializeValues();
   CheckStringVals(filter, c, d);
 
-  filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_EQ(tFilter.min.string_val, c);
-  EXPECT_EQ(tFilter.max.string_val, d);
+  filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_EQ(pFilter.min().string_val(), c);
+  EXPECT_EQ(pFilter.max().string_val(), d);
 
   // Test that strings longer than 1024 are truncated.
   string b1030(1030, 'b');
@@ -227,14 +228,14 @@
   for (int i = trailIndex; i < 1024; ++i) truncTrailMaxChar[i] = 0;
   CheckStringVals(filter, b1024, truncTrailMaxChar);
 
-  filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_EQ(tFilter.min.string_val, b1024);
-  EXPECT_EQ(tFilter.max.string_val, truncTrailMaxChar);
+  filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_EQ(pFilter.min().string_val(), b1024);
+  EXPECT_EQ(pFilter.max().string_val(), truncTrailMaxChar);
 
   MinMaxFilter* filter2 =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
   CheckStringVals(filter2, b1024, truncTrailMaxChar);
 
   // Check that if the entire string is the max char and therefore after truncating for
@@ -249,12 +250,12 @@
   filter->Insert(&cVal);
   EXPECT_TRUE(filter->AlwaysTrue());
 
-  filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_TRUE(tFilter.always_true);
+  filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_TRUE(pFilter.always_true());
 
   MinMaxFilter* always_true_filter =
-      MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
   EXPECT_FALSE(always_true_filter->AlwaysFalse());
   EXPECT_TRUE(always_true_filter->AlwaysTrue());
 
@@ -276,20 +277,20 @@
   limit_filter->MaterializeValues();
   EXPECT_TRUE(limit_filter->AlwaysTrue());
 
-  limit_filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_TRUE(tFilter.always_true);
+  limit_filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_TRUE(pFilter.always_true());
 
   // Check the behavior of Or.
-  TMinMaxFilter tFilter1;
-  tFilter1.min.__set_string_val("a");
-  tFilter1.max.__set_string_val("d");
-  TMinMaxFilter tFilter2;
-  tFilter2.min.__set_string_val("b");
-  tFilter2.max.__set_string_val("e");
-  MinMaxFilter::Or(tFilter1, &tFilter2, string_type);
-  EXPECT_EQ(tFilter2.min.string_val, "a");
-  EXPECT_EQ(tFilter2.max.string_val, "e");
+  MinMaxFilterPB pFilter1;
+  pFilter1.mutable_min()->set_string_val("a");
+  pFilter1.mutable_max()->set_string_val("d");
+  MinMaxFilterPB pFilter2;
+  pFilter2.mutable_min()->set_string_val("b");
+  pFilter2.mutable_max()->set_string_val("e");
+  MinMaxFilter::Or(pFilter1, &pFilter2, string_type);
+  EXPECT_EQ(pFilter2.min().string_val(), "a");
+  EXPECT_EQ(pFilter2.max().string_val(), "e");
 
   filter->Close();
   empty_filter->Close();
@@ -317,14 +318,14 @@
   // Test the behavior of an empty filter.
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  TMinMaxFilter tFilter;
-  filter->ToThrift(&tFilter);
-  EXPECT_TRUE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_FALSE(tFilter.min.__isset.timestamp_val);
-  EXPECT_FALSE(tFilter.max.__isset.timestamp_val);
+  MinMaxFilterPB pFilter;
+  filter->ToProtobuf(&pFilter);
+  EXPECT_TRUE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_FALSE(pFilter.min().has_timestamp_val());
+  EXPECT_FALSE(pFilter.max().has_timestamp_val());
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
 
@@ -342,25 +343,25 @@
   filter->Insert(&t4);
   CheckTimestampVals(filter, t2, t3);
 
-  filter->ToThrift(&tFilter);
-  EXPECT_FALSE(tFilter.always_false);
-  EXPECT_FALSE(tFilter.always_true);
-  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.min), t2);
-  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.max), t3);
+  filter->ToProtobuf(&pFilter);
+  EXPECT_FALSE(pFilter.always_false());
+  EXPECT_FALSE(pFilter.always_true());
+  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.min()), t2);
+  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.max()), t3);
   MinMaxFilter* filter2 =
-      MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
+      MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
   CheckTimestampVals(filter2, t2, t3);
 
   // Check the behavior of Or.
-  TMinMaxFilter tFilter1;
-  t2.ToTColumnValue(&tFilter1.min);
-  t4.ToTColumnValue(&tFilter1.max);
-  TMinMaxFilter tFilter2;
-  t1.ToTColumnValue(&tFilter2.min);
-  t3.ToTColumnValue(&tFilter2.max);
-  MinMaxFilter::Or(tFilter1, &tFilter2, timestamp_type);
-  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.min), t2);
-  EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.max), t3);
+  MinMaxFilterPB pFilter1;
+  t2.ToColumnValuePB(pFilter1.mutable_min());
+  t4.ToColumnValuePB(pFilter1.mutable_max());
+  MinMaxFilterPB pFilter2;
+  t1.ToColumnValuePB(pFilter2.mutable_min());
+  t3.ToColumnValuePB(pFilter2.mutable_max());
+  MinMaxFilter::Or(pFilter1, &pFilter2, timestamp_type);
+  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.min()), t2);
+  EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.max()), t3);
 
   filter->Close();
   empty_filter->Close();
@@ -391,16 +392,16 @@
 }
 
 void CheckDecimalEmptyFilter(MinMaxFilter* filter, const ColumnType& column_type,
-    TMinMaxFilter* tFilter, ObjectPool* obj_pool, MemTracker* mem_tracker) {
+    MinMaxFilterPB* pFilter, ObjectPool* obj_pool, MemTracker* mem_tracker) {
   EXPECT_TRUE(filter->AlwaysFalse());
   EXPECT_FALSE(filter->AlwaysTrue());
-  filter->ToThrift(tFilter);
-  EXPECT_TRUE(tFilter->always_false);
-  EXPECT_FALSE(tFilter->always_true);
-  EXPECT_FALSE(tFilter->min.__isset.decimal_val);
-  EXPECT_FALSE(tFilter->max.__isset.decimal_val);
+  filter->ToProtobuf(pFilter);
+  EXPECT_TRUE(pFilter->always_false());
+  EXPECT_FALSE(pFilter->always_true());
+  EXPECT_FALSE(pFilter->min().has_decimal_val());
+  EXPECT_FALSE(pFilter->max().has_decimal_val());
   MinMaxFilter* empty_filter =
-      MinMaxFilter::Create(*tFilter, column_type, obj_pool, mem_tracker);
+      MinMaxFilter::Create(*pFilter, column_type, obj_pool, mem_tracker);
   EXPECT_TRUE(empty_filter->AlwaysFalse());
   EXPECT_FALSE(empty_filter->AlwaysTrue());
   empty_filter->Close();
@@ -427,30 +428,30 @@
     CheckDecimalVals(filter##SIZE, d3##SIZE, d2##SIZE);                              \
   } while (false)
 
-#define DECIMAL_CHECK_THRIFT(SIZE)                                                  \
-  do {                                                                              \
-    filter##SIZE->ToThrift(&tFilter##SIZE);                                         \
-    EXPECT_FALSE(tFilter##SIZE.always_false);                                       \
-    EXPECT_FALSE(tFilter##SIZE.always_true);                                        \
-    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter##SIZE.min), d3##SIZE); \
-    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter##SIZE.max), d2##SIZE); \
-    MinMaxFilter* filter##SIZE##2 = MinMaxFilter::Create(                           \
-        tFilter##SIZE, decimal##SIZE##_type, &obj_pool, &mem_tracker);              \
-    CheckDecimalVals(filter##SIZE##2, d3##SIZE, d2##SIZE);                          \
-    filter##SIZE##2->Close();                                                       \
+#define DECIMAL_CHECK_PROTOBUF(SIZE)                                                   \
+  do {                                                                                 \
+    filter##SIZE->ToProtobuf(&pFilter##SIZE);                                          \
+    EXPECT_FALSE(pFilter##SIZE.always_false());                                        \
+    EXPECT_FALSE(pFilter##SIZE.always_true());                                         \
+    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter##SIZE.min()), d3##SIZE); \
+    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter##SIZE.max()), d2##SIZE); \
+    MinMaxFilter* filter##SIZE##2 = MinMaxFilter::Create(                              \
+        pFilter##SIZE, decimal##SIZE##_type, &obj_pool, &mem_tracker);                 \
+    CheckDecimalVals(filter##SIZE##2, d3##SIZE, d2##SIZE);                             \
+    filter##SIZE##2->Close();                                                          \
   } while (false)
 
-#define DECIMAL_CHECK_OR(SIZE)                                                       \
-  do {                                                                               \
-    TMinMaxFilter tFilter1##SIZE;                                                    \
-    d3##SIZE.ToTColumnValue(&tFilter1##SIZE.min);                                    \
-    d2##SIZE.ToTColumnValue(&tFilter1##SIZE.max);                                    \
-    TMinMaxFilter tFilter2##SIZE;                                                    \
-    d1##SIZE.ToTColumnValue(&tFilter2##SIZE.min);                                    \
-    d1##SIZE.ToTColumnValue(&tFilter2##SIZE.max);                                    \
-    MinMaxFilter::Or(tFilter1##SIZE, &tFilter2##SIZE, decimal##SIZE##_type);         \
-    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter2##SIZE.min), d3##SIZE); \
-    EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter2##SIZE.max), d2##SIZE); \
+#define DECIMAL_CHECK_OR(SIZE)                                                          \
+  do {                                                                                  \
+    MinMaxFilterPB pFilter1##SIZE;                                                      \
+    d3##SIZE.ToColumnValuePB(pFilter1##SIZE.mutable_min());                             \
+    d2##SIZE.ToColumnValuePB(pFilter1##SIZE.mutable_max());                             \
+    MinMaxFilterPB pFilter2##SIZE;                                                      \
+    d1##SIZE.ToColumnValuePB(pFilter2##SIZE.mutable_min());                             \
+    d1##SIZE.ToColumnValuePB(pFilter2##SIZE.mutable_max());                             \
+    MinMaxFilter::Or(pFilter1##SIZE, &pFilter2##SIZE, decimal##SIZE##_type);            \
+    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.min()), d3##SIZE); \
+    EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.max()), d2##SIZE); \
   } while (false)
 
 // Tests that a DecimalMinMaxFilter returns the expected min/max after having values
@@ -475,13 +476,13 @@
   MinMaxFilter* filter8 = MinMaxFilter::Create(decimal8_type, &obj_pool, &mem_tracker);
   MinMaxFilter* filter16 = MinMaxFilter::Create(decimal16_type, &obj_pool, &mem_tracker);
 
-  // Create thrift minmax filters
-  TMinMaxFilter tFilter4, tFilter8, tFilter16;
+  // Create protobuf minmax filters
+  MinMaxFilterPB pFilter4, pFilter8, pFilter16;
 
   // Test the behavior of an empty filter.
-  CheckDecimalEmptyFilter(filter4, decimal4_type, &tFilter4, &obj_pool, &mem_tracker);
-  CheckDecimalEmptyFilter(filter8, decimal8_type, &tFilter8, &obj_pool, &mem_tracker);
-  CheckDecimalEmptyFilter(filter16, decimal16_type, &tFilter16, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter4, decimal4_type, &pFilter4, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter8, decimal8_type, &pFilter8, &obj_pool, &mem_tracker);
+  CheckDecimalEmptyFilter(filter16, decimal16_type, &pFilter16, &obj_pool, &mem_tracker);
 
   // Insert and check
   DECIMAL_INSERT_AND_CHECK(4, 9, 5, 2345.67891, 3456.78912, 1234.56789);
@@ -490,10 +491,10 @@
   DECIMAL_INSERT_AND_CHECK(16, 38, 19, 2345678912345678912.2345678912345678912,
       3456789123456789123.3456789123456789123, 1234567891234567891.1234567891234567891);
 
-  // Thrift check
-  DECIMAL_CHECK_THRIFT(4);
-  DECIMAL_CHECK_THRIFT(8);
-  DECIMAL_CHECK_THRIFT(16);
+  // Protobuf check
+  DECIMAL_CHECK_PROTOBUF(4);
+  DECIMAL_CHECK_PROTOBUF(8);
+  DECIMAL_CHECK_PROTOBUF(16);
 
   // Check the behavior of Or.
   DECIMAL_CHECK_OR(4);
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
index c6da060..dd9b351 100644
--- a/be/src/util/min-max-filter.cc
+++ b/be/src/util/min-max-filter.cc
@@ -82,61 +82,55 @@
   }
 }
 
-#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, THRIFT_TYPE, PRIMITIVE_TYPE)  \
-  const char* NAME##MinMaxFilter::LLVM_CLASS_NAME =                            \
-      "class.impala::" #NAME "MinMaxFilter";                                   \
-  NAME##MinMaxFilter::NAME##MinMaxFilter(const TMinMaxFilter& thrift) {        \
-    DCHECK(!thrift.always_true);                                               \
-    if (thrift.always_false) {                                                 \
-      min_ = numeric_limits<TYPE>::max();                                      \
-      max_ = numeric_limits<TYPE>::lowest();                                   \
-    } else {                                                                   \
-      DCHECK(thrift.__isset.min);                                              \
-      DCHECK(thrift.__isset.max);                                              \
-      DCHECK(thrift.min.__isset.THRIFT_TYPE##_val);                            \
-      DCHECK(thrift.max.__isset.THRIFT_TYPE##_val);                            \
-      min_ = thrift.min.THRIFT_TYPE##_val;                                     \
-      max_ = thrift.max.THRIFT_TYPE##_val;                                     \
-    }                                                                          \
-  }                                                                            \
-  PrimitiveType NAME##MinMaxFilter::type() {                                   \
-    return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                               \
-  }                                                                            \
-  void NAME##MinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {             \
-    if (!AlwaysFalse()) {                                                      \
-      thrift->min.__set_##THRIFT_TYPE##_val(min_);                             \
-      thrift->__isset.min = true;                                              \
-      thrift->max.__set_##THRIFT_TYPE##_val(max_);                             \
-      thrift->__isset.max = true;                                              \
-    }                                                                          \
-    thrift->__set_always_false(AlwaysFalse());                                 \
-    thrift->__set_always_true(false);                                          \
-  }                                                                            \
-  string NAME##MinMaxFilter::DebugString() const {                             \
-    stringstream out;                                                          \
-    out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_            \
-        << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")";     \
-    return out.str();                                                          \
-  }                                                                            \
-  void NAME##MinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {   \
-    if (out->always_false) {                                                   \
-      out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val);            \
-      out->__isset.min = true;                                                 \
-      out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val);            \
-      out->__isset.max = true;                                                 \
-      out->__set_always_false(false);                                          \
-    } else {                                                                   \
-      out->min.__set_##THRIFT_TYPE##_val(                                      \
-          std::min(in.min.THRIFT_TYPE##_val, out->min.THRIFT_TYPE##_val));     \
-      out->max.__set_##THRIFT_TYPE##_val(                                      \
-          std::max(in.max.THRIFT_TYPE##_val, out->max.THRIFT_TYPE##_val));     \
-    }                                                                          \
-  }                                                                            \
-  void NAME##MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) { \
-    out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val);              \
-    out->__isset.min = true;                                                   \
-    out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val);              \
-    out->__isset.max = true;                                                   \
+#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, PROTOBUF_TYPE, PRIMITIVE_TYPE)        \
+  const char* NAME##MinMaxFilter::LLVM_CLASS_NAME =                                    \
+      "class.impala::" #NAME "MinMaxFilter";                                           \
+  NAME##MinMaxFilter::NAME##MinMaxFilter(const MinMaxFilterPB& protobuf) {             \
+    DCHECK(!protobuf.always_true());                                                   \
+    if (protobuf.always_false()) {                                                     \
+      min_ = numeric_limits<TYPE>::max();                                              \
+      max_ = numeric_limits<TYPE>::lowest();                                           \
+    } else {                                                                           \
+      DCHECK(protobuf.has_min());                                                      \
+      DCHECK(protobuf.has_max());                                                      \
+      DCHECK(protobuf.min().has_##PROTOBUF_TYPE##_val());                              \
+      DCHECK(protobuf.max().has_##PROTOBUF_TYPE##_val());                              \
+      min_ = protobuf.min().PROTOBUF_TYPE##_val();                                     \
+      max_ = protobuf.max().PROTOBUF_TYPE##_val();                                     \
+    }                                                                                  \
+  }                                                                                    \
+  PrimitiveType NAME##MinMaxFilter::type() {                                           \
+    return PrimitiveType::TYPE_##PRIMITIVE_TYPE;                                       \
+  }                                                                                    \
+  void NAME##MinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {                \
+    if (!AlwaysFalse()) {                                                              \
+      protobuf->mutable_min()->set_##PROTOBUF_TYPE##_val(min_);                        \
+      protobuf->mutable_max()->set_##PROTOBUF_TYPE##_val(max_);                        \
+    }                                                                                  \
+    protobuf->set_always_false(AlwaysFalse());                                         \
+    protobuf->set_always_true(false);                                                  \
+  }                                                                                    \
+  string NAME##MinMaxFilter::DebugString() const {                                     \
+    stringstream out;                                                                  \
+    out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_                    \
+        << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")";             \
+    return out.str();                                                                  \
+  }                                                                                    \
+  void NAME##MinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {         \
+    if (out->always_false()) { /* 'out' carries no bounds yet: take those of 'in'. */  \
+      out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val());   \
+      out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val());   \
+      out->set_always_false(false);                                                    \
+    } else {                                                                           \
+      out->mutable_min()->set_##PROTOBUF_TYPE##_val(                                   \
+          std::min(in.min().PROTOBUF_TYPE##_val(), out->min().PROTOBUF_TYPE##_val())); \
+      out->mutable_max()->set_##PROTOBUF_TYPE##_val(                                   \
+          std::max(in.max().PROTOBUF_TYPE##_val(), out->max().PROTOBUF_TYPE##_val())); \
+    }                                                                                  \
+  }                                                                                    \
+  void NAME##MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {       \
+    out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val());     \
+    out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val());     \
   }
 
 NUMERIC_MIN_MAX_FILTER_FUNCS(Bool, bool, bool, BOOLEAN);
@@ -222,19 +216,17 @@
 const int StringMinMaxFilter::MAX_BOUND_LENGTH = 1024;
 
 StringMinMaxFilter::StringMinMaxFilter(
-    const TMinMaxFilter& thrift, MemTracker* mem_tracker)
-  : mem_pool_(mem_tracker),
-    min_buffer_(&mem_pool_),
-    max_buffer_(&mem_pool_) {
-  always_false_ = thrift.always_false;
-  always_true_ = thrift.always_true;
+    const MinMaxFilterPB& protobuf, MemTracker* mem_tracker)
+  : mem_pool_(mem_tracker), min_buffer_(&mem_pool_), max_buffer_(&mem_pool_) {
+  always_false_ = protobuf.always_false();
+  always_true_ = protobuf.always_true();
   if (!always_true_ && !always_false_) {
-    DCHECK(thrift.__isset.min);
-    DCHECK(thrift.__isset.max);
-    DCHECK(thrift.min.__isset.string_val);
-    DCHECK(thrift.max.__isset.string_val);
-    min_ = StringValue(thrift.min.string_val);
-    max_ = StringValue(thrift.max.string_val);
+    DCHECK(protobuf.has_min());
+    DCHECK(protobuf.has_max());
+    DCHECK(protobuf.min().has_string_val());
+    DCHECK(protobuf.max().has_string_val());
+    min_ = StringValue(protobuf.min().string_val());
+    max_ = StringValue(protobuf.max().string_val());
     CopyToBuffer(&min_buffer_, &min_, min_.len);
     CopyToBuffer(&max_buffer_, &max_, max_.len);
   }
@@ -277,17 +269,13 @@
   }
 }
 
-void StringMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void StringMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
   if (!always_true_ && !always_false_) {
-    thrift->min.string_val.assign(static_cast<char*>(min_.ptr), min_.len);
-    thrift->min.__isset.string_val = true;
-    thrift->__isset.min = true;
-    thrift->max.string_val.assign(static_cast<char*>(max_.ptr), max_.len);
-    thrift->max.__isset.string_val = true;
-    thrift->__isset.max = true;
+    protobuf->mutable_min()->set_string_val(static_cast<char*>(min_.ptr), min_.len);
+    protobuf->mutable_max()->set_string_val(static_cast<char*>(max_.ptr), max_.len);
   }
-  thrift->__set_always_false(always_false_);
-  thrift->__set_always_true(always_true_);
+  protobuf->set_always_false(always_false_);
+  protobuf->set_always_true(always_true_);
 }
 
 string StringMinMaxFilter::DebugString() const {
@@ -298,28 +286,26 @@
   return out.str();
 }
 
-void StringMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  if (out->always_false) {
-    out->min.__set_string_val(in.min.string_val);
-    out->__isset.min = true;
-    out->max.__set_string_val(in.max.string_val);
-    out->__isset.max = true;
-    out->__set_always_false(false);
+void StringMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  if (out->always_false()) {
+    out->mutable_min()->set_string_val(in.min().string_val());
+    out->mutable_max()->set_string_val(in.max().string_val());
+    out->set_always_false(false);
   } else {
-    StringValue in_min_val = StringValue(in.min.string_val);
-    StringValue out_min_val = StringValue(out->min.string_val);
-    if (in_min_val < out_min_val) out->min.__set_string_val(in.min.string_val);
-    StringValue in_max_val = StringValue(in.max.string_val);
-    StringValue out_max_val = StringValue(out->max.string_val);
-    if (in_max_val > out_max_val) out->max.__set_string_val(in.max.string_val);
+    StringValue in_min_val = StringValue(in.min().string_val());
+    StringValue out_min_val = StringValue(out->min().string_val());
+    if (in_min_val < out_min_val)
+      out->mutable_min()->set_string_val(in.min().string_val());
+    StringValue in_max_val = StringValue(in.max().string_val());
+    StringValue out_max_val = StringValue(out->max().string_val());
+    if (in_max_val > out_max_val)
+      out->mutable_max()->set_string_val(in.max().string_val());
   }
 }
 
-void StringMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  out->min.__set_string_val(in.min.string_val);
-  out->__isset.min = true;
-  out->max.__set_string_val(in.max.string_val);
-  out->__isset.max = true;
+void StringMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  out->mutable_min()->set_string_val(in.min().string_val());
+  out->mutable_max()->set_string_val(in.max().string_val());
 }
 
 void StringMinMaxFilter::CopyToBuffer(
@@ -349,13 +335,13 @@
 const char* TimestampMinMaxFilter::LLVM_CLASS_NAME =
     "class.impala::TimestampMinMaxFilter";
 
-TimestampMinMaxFilter::TimestampMinMaxFilter(const TMinMaxFilter& thrift) {
-  always_false_ = thrift.always_false;
+TimestampMinMaxFilter::TimestampMinMaxFilter(const MinMaxFilterPB& protobuf) {
+  always_false_ = protobuf.always_false();
   if (!always_false_) {
-    DCHECK(thrift.min.__isset.timestamp_val);
-    DCHECK(thrift.max.__isset.timestamp_val);
-    min_ = TimestampValue::FromTColumnValue(thrift.min);
-    max_ = TimestampValue::FromTColumnValue(thrift.max);
+    DCHECK(protobuf.min().has_timestamp_val());
+    DCHECK(protobuf.max().has_timestamp_val());
+    min_ = TimestampValue::FromColumnValuePB(protobuf.min());
+    max_ = TimestampValue::FromColumnValuePB(protobuf.max());
   }
 }
 
@@ -363,15 +349,13 @@
   return PrimitiveType::TYPE_TIMESTAMP;
 }
 
-void TimestampMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void TimestampMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
   if (!always_false_) {
-    min_.ToTColumnValue(&thrift->min);
-    thrift->__isset.min = true;
-    max_.ToTColumnValue(&thrift->max);
-    thrift->__isset.max = true;
+    min_.ToColumnValuePB(protobuf->mutable_min());
+    max_.ToColumnValuePB(protobuf->mutable_max());
   }
-  thrift->__set_always_false(always_false_);
-  thrift->__set_always_true(false);
+  protobuf->set_always_false(always_false_);
+  protobuf->set_always_true(false);
 }
 
 string TimestampMinMaxFilter::DebugString() const {
@@ -381,45 +365,46 @@
   return out.str();
 }
 
-void TimestampMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  if (out->always_false) {
-    out->min.__set_timestamp_val(in.min.timestamp_val);
-    out->__isset.min = true;
-    out->max.__set_timestamp_val(in.max.timestamp_val);
-    out->__isset.max = true;
-    out->__set_always_false(false);
+void TimestampMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  if (out->always_false()) {
+    out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+    out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
+    out->set_always_false(false);
   } else {
-    TimestampValue in_min_val = TimestampValue::FromTColumnValue(in.min);
-    TimestampValue out_min_val = TimestampValue::FromTColumnValue(out->min);
-    if (in_min_val < out_min_val) out->min.__set_timestamp_val(in.min.timestamp_val);
-    TimestampValue in_max_val = TimestampValue::FromTColumnValue(in.max);
-    TimestampValue out_max_val = TimestampValue::FromTColumnValue(out->max);
-    if (in_max_val > out_max_val) out->max.__set_timestamp_val(in.max.timestamp_val);
+    TimestampValue in_min_val = TimestampValue::FromColumnValuePB(in.min());
+    TimestampValue out_min_val = TimestampValue::FromColumnValuePB(out->min());
+    if (in_min_val < out_min_val) {
+      out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+    }
+    TimestampValue in_max_val = TimestampValue::FromColumnValuePB(in.max());
+    TimestampValue out_max_val = TimestampValue::FromColumnValuePB(out->max());
+    if (in_max_val > out_max_val) {
+      out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
+    }
   }
 }
 
-void TimestampMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  out->min.__set_timestamp_val(in.min.timestamp_val);
-  out->__isset.min = true;
-  out->max.__set_timestamp_val(in.max.timestamp_val);
-  out->__isset.max = true;
+void TimestampMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+  out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
 }
 
 // DECIMAL
 const char* DecimalMinMaxFilter::LLVM_CLASS_NAME = "class.impala::DecimalMinMaxFilter";
-#define DECIMAL_SET_MINMAX(SIZE)                                       \
-  do {                                                                 \
-    DCHECK(thrift.min.__isset.decimal_val);                            \
-    DCHECK(thrift.max.__isset.decimal_val);                            \
-    min##SIZE##_ = Decimal##SIZE##Value::FromTColumnValue(thrift.min); \
-    max##SIZE##_ = Decimal##SIZE##Value::FromTColumnValue(thrift.max); \
+#define DECIMAL_SET_MINMAX(SIZE)                                            \
+  do {                                                                      \
+    DCHECK(protobuf.min().has_decimal_val());                               \
+    DCHECK(protobuf.max().has_decimal_val());                               \
+    min##SIZE##_ = Decimal##SIZE##Value::FromColumnValuePB(protobuf.min()); \
+    max##SIZE##_ = Decimal##SIZE##Value::FromColumnValuePB(protobuf.max()); \
   } while (false)
 
 // Construct the Decimal min-max filter when the min-max filter information
-// comes in through thrift.  This can get called in coordinator, after the filter
+// comes in through protobuf.  This can get called in coordinator, after the filter
 // is sent by executor
-DecimalMinMaxFilter::DecimalMinMaxFilter(const TMinMaxFilter& thrift, int precision)
-  : size_(ColumnType::GetDecimalByteSize(precision)), always_false_(thrift.always_false) {
+DecimalMinMaxFilter::DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision)
+  : size_(ColumnType::GetDecimalByteSize(precision)),
+    always_false_(protobuf.always_false()) {
   if (!always_false_) {
     switch (size_) {
       case DECIMAL_SIZE_4BYTE:
@@ -441,34 +426,32 @@
   return PrimitiveType::TYPE_DECIMAL;
 }
 
-#define DECIMAL_TO_THRIFT(SIZE)                \
-  do {                                         \
-    min##SIZE##_.ToTColumnValue(&thrift->min); \
-    max##SIZE##_.ToTColumnValue(&thrift->max); \
+#define DECIMAL_TO_PROTOBUF(SIZE)                          \
+  do {                                                     \
+    min##SIZE##_.ToColumnValuePB(protobuf->mutable_min()); \
+    max##SIZE##_.ToColumnValuePB(protobuf->mutable_max()); \
   } while (false)
 
-// Construct a thrift min-max filter.  Will be called by the executor
+// Construct a protobuf min-max filter.  Will be called by the executor
 // to be sent to the coordinator
-void DecimalMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void DecimalMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
   if (!always_false_) {
     switch (size_) {
       case DECIMAL_SIZE_4BYTE:
-        DECIMAL_TO_THRIFT(4);
+        DECIMAL_TO_PROTOBUF(4);
         break;
       case DECIMAL_SIZE_8BYTE:
-        DECIMAL_TO_THRIFT(8);
+        DECIMAL_TO_PROTOBUF(8);
         break;
       case DECIMAL_SIZE_16BYTE:
-        DECIMAL_TO_THRIFT(16);
+        DECIMAL_TO_PROTOBUF(16);
         break;
       default:
         DCHECK(false) << "DecimalMinMaxFilter: Unknown decimal byte size: " << size_;
     }
-    thrift->__isset.min = true;
-    thrift->__isset.max = true;
   }
-  thrift->__set_always_false(always_false_);
-  thrift->__set_always_true(false);
+  protobuf->set_always_false(always_false_);
+  protobuf->set_always_true(false);
 }
 
 void DecimalMinMaxFilter::Insert(void* val) {
@@ -514,25 +497,24 @@
   return out.str();
 }
 
-#define DECIMAL_OR(SIZE)                                    \
-  do {                                                      \
-    if (Decimal##SIZE##Value::FromTColumnValue(in.min)      \
-        < Decimal##SIZE##Value::FromTColumnValue(out->min)) \
-      out->min.__set_decimal_val(in.min.decimal_val);       \
-    if (Decimal##SIZE##Value::FromTColumnValue(in.max)      \
-        > Decimal##SIZE##Value::FromTColumnValue(out->max)) \
-      out->max.__set_decimal_val(in.max.decimal_val);       \
+#define DECIMAL_OR(SIZE)                                           \
+  do {                                                             \
+    if (Decimal##SIZE##Value::FromColumnValuePB(in.min())          \
+        < Decimal##SIZE##Value::FromColumnValuePB(out->min()))     \
+      out->mutable_min()->set_decimal_val(in.min().decimal_val()); \
+    if (Decimal##SIZE##Value::FromColumnValuePB(in.max())          \
+        > Decimal##SIZE##Value::FromColumnValuePB(out->max()))     \
+      out->mutable_max()->set_decimal_val(in.max().decimal_val()); \
   } while (false)
 
-void DecimalMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out, int precision) {
-  if (in.always_false) {
+void DecimalMinMaxFilter::Or(
+    const MinMaxFilterPB& in, MinMaxFilterPB* out, int precision) {
+  if (in.always_false()) {
     return;
-  } else if (out->always_false) {
-    out->min.__set_decimal_val(in.min.decimal_val);
-    out->__isset.min = true;
-    out->max.__set_decimal_val(in.max.decimal_val);
-    out->__isset.max = true;
-    out->__set_always_false(false);
+  } else if (out->always_false()) {
+    out->mutable_min()->set_decimal_val(in.min().decimal_val());
+    out->mutable_max()->set_decimal_val(in.max().decimal_val());
+    out->set_always_false(false);
   } else {
     int size = ColumnType::GetDecimalByteSize(precision);
     switch (size) {
@@ -551,11 +533,9 @@
   }
 }
 
-void DecimalMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  out->min.__set_decimal_val(in.min.decimal_val);
-  out->__isset.min = true;
-  out->max.__set_decimal_val(in.max.decimal_val);
-  out->__isset.max = true;
+void DecimalMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  out->mutable_min()->set_decimal_val(in.min().decimal_val());
+  out->mutable_max()->set_decimal_val(in.max().decimal_val());
 }
 
 // MinMaxFilter
@@ -595,29 +575,29 @@
   return nullptr;
 }
 
-MinMaxFilter* MinMaxFilter::Create(const TMinMaxFilter& thrift, ColumnType type,
+MinMaxFilter* MinMaxFilter::Create(const MinMaxFilterPB& protobuf, ColumnType type,
     ObjectPool* pool, MemTracker* mem_tracker) {
   switch (type.type) {
     case PrimitiveType::TYPE_BOOLEAN:
-      return pool->Add(new BoolMinMaxFilter(thrift));
+      return pool->Add(new BoolMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_TINYINT:
-      return pool->Add(new TinyIntMinMaxFilter(thrift));
+      return pool->Add(new TinyIntMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_SMALLINT:
-      return pool->Add(new SmallIntMinMaxFilter(thrift));
+      return pool->Add(new SmallIntMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_INT:
-      return pool->Add(new IntMinMaxFilter(thrift));
+      return pool->Add(new IntMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_BIGINT:
-      return pool->Add(new BigIntMinMaxFilter(thrift));
+      return pool->Add(new BigIntMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_FLOAT:
-      return pool->Add(new FloatMinMaxFilter(thrift));
+      return pool->Add(new FloatMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_DOUBLE:
-      return pool->Add(new DoubleMinMaxFilter(thrift));
+      return pool->Add(new DoubleMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_STRING:
-      return pool->Add(new StringMinMaxFilter(thrift, mem_tracker));
+      return pool->Add(new StringMinMaxFilter(protobuf, mem_tracker));
     case PrimitiveType::TYPE_TIMESTAMP:
-      return pool->Add(new TimestampMinMaxFilter(thrift));
+      return pool->Add(new TimestampMinMaxFilter(protobuf));
     case PrimitiveType::TYPE_DECIMAL:
-      return pool->Add(new DecimalMinMaxFilter(thrift, type.precision));
+      return pool->Add(new DecimalMinMaxFilter(protobuf, type.precision));
     default:
       DCHECK(false) << "Unsupported MinMaxFilter type: " << type;
   }
@@ -625,93 +605,93 @@
 }
 
 void MinMaxFilter::Or(
-    const TMinMaxFilter& in, TMinMaxFilter* out, const ColumnType& columnType) {
-  if (in.always_false || out->always_true) return;
-  if (in.always_true) {
-    out->__set_always_true(true);
+    const MinMaxFilterPB& in, MinMaxFilterPB* out, const ColumnType& columnType) {
+  if (in.always_false() || out->always_true()) return;
+  if (in.always_true()) {
+    out->set_always_true(true);
     return;
   }
-  if (in.min.__isset.bool_val) {
-    DCHECK(out->min.__isset.bool_val);
+  if (in.min().has_bool_val()) {
+    DCHECK(out->min().has_bool_val());
     BoolMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.byte_val) {
-    DCHECK(out->min.__isset.byte_val);
+  } else if (in.min().has_byte_val()) {
+    DCHECK(out->min().has_byte_val());
     TinyIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.short_val) {
-    DCHECK(out->min.__isset.short_val);
+  } else if (in.min().has_short_val()) {
+    DCHECK(out->min().has_short_val());
     SmallIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.int_val) {
-    DCHECK(out->min.__isset.int_val);
+  } else if (in.min().has_int_val()) {
+    DCHECK(out->min().has_int_val());
     IntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.long_val) {
-    DCHECK(out->min.__isset.long_val);
+  } else if (in.min().has_long_val()) {
+    DCHECK(out->min().has_long_val());
     BigIntMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.double_val) {
+  } else if (in.min().has_double_val()) {
-    // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
+    // Handles FloatMinMaxFilter also as ColumnValuePB doesn't have a float type.
-    DCHECK(out->min.__isset.double_val);
+    DCHECK(out->min().has_double_val());
     DoubleMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.string_val) {
-    DCHECK(out->min.__isset.string_val);
+  } else if (in.min().has_string_val()) {
+    DCHECK(out->min().has_string_val());
     StringMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.timestamp_val) {
-    DCHECK(out->min.__isset.timestamp_val);
+  } else if (in.min().has_timestamp_val()) {
+    DCHECK(out->min().has_timestamp_val());
     TimestampMinMaxFilter::Or(in, out);
     return;
-  } else if (in.min.__isset.decimal_val) {
-    DCHECK(out->min.__isset.decimal_val);
+  } else if (in.min().has_decimal_val()) {
+    DCHECK(out->min().has_decimal_val());
     DecimalMinMaxFilter::Or(in, out, columnType.precision);
     return;
   }
   DCHECK(false) << "Unsupported MinMaxFilter type.";
 }
 
-void MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
-  out->__set_always_false(in.always_false);
-  out->__set_always_true(in.always_true);
-  if (in.always_false || in.always_true) return;
-  if (in.min.__isset.bool_val) {
-    DCHECK(!out->min.__isset.bool_val);
+void MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+  out->set_always_false(in.always_false());
+  out->set_always_true(in.always_true());
+  if (in.always_false() || in.always_true()) return;
+  if (in.min().has_bool_val()) {
+    DCHECK(!out->min().has_bool_val());
     BoolMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.byte_val) {
-    DCHECK(!out->min.__isset.byte_val);
+  } else if (in.min().has_byte_val()) {
+    DCHECK(!out->min().has_byte_val());
     TinyIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.short_val) {
-    DCHECK(!out->min.__isset.short_val);
+  } else if (in.min().has_short_val()) {
+    DCHECK(!out->min().has_short_val());
     SmallIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.int_val) {
-    DCHECK(!out->min.__isset.int_val);
+  } else if (in.min().has_int_val()) {
+    DCHECK(!out->min().has_int_val());
     IntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.long_val) {
-    // Handles TimestampMinMaxFilter also as TColumnValue doesn't have a timestamp type.
-    DCHECK(!out->min.__isset.long_val);
+  } else if (in.min().has_long_val()) {
+    // BIGINT only: timestamps use 'timestamp_val' and are handled in a later branch.
+    DCHECK(!out->min().has_long_val());
     BigIntMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.double_val) {
-    // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
-    DCHECK(!out->min.__isset.double_val);
+  } else if (in.min().has_double_val()) {
+    // Handles FloatMinMaxFilter also as ColumnValuePB doesn't have a float type.
+    DCHECK(!out->min().has_double_val());
     DoubleMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.string_val) {
-    DCHECK(!out->min.__isset.string_val);
+  } else if (in.min().has_string_val()) {
+    DCHECK(!out->min().has_string_val());
     StringMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.timestamp_val) {
-    DCHECK(!out->min.__isset.timestamp_val);
+  } else if (in.min().has_timestamp_val()) {
+    DCHECK(!out->min().has_timestamp_val());
     TimestampMinMaxFilter::Copy(in, out);
     return;
-  } else if (in.min.__isset.decimal_val) {
-    DCHECK(!out->min.__isset.decimal_val);
+  } else if (in.min().has_decimal_val()) {
+    DCHECK(!out->min().has_decimal_val());
     DecimalMinMaxFilter::Copy(in, out);
     return;
   }
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
index 82cf54c..ee08894 100644
--- a/be/src/util/min-max-filter.h
+++ b/be/src/util/min-max-filter.h
@@ -74,25 +74,25 @@
   /// until this is called.
   virtual void MaterializeValues() {}
 
-  /// Convert this filter to a thrift representation.
-  virtual void ToThrift(TMinMaxFilter* thrift) const = 0;
+  /// Convert this filter to a protobuf representation.
+  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const = 0;
 
   virtual std::string DebugString() const = 0;
 
   /// Returns a new MinMaxFilter with the given type, allocated from 'mem_tracker'.
   static MinMaxFilter* Create(ColumnType type, ObjectPool* pool, MemTracker* mem_tracker);
 
-  /// Returns a new MinMaxFilter created from the thrift representation, allocated from
+  /// Returns a new MinMaxFilter created from the protobuf representation, allocated from
   /// 'mem_tracker'.
-  static MinMaxFilter* Create(const TMinMaxFilter& thrift, ColumnType type,
+  static MinMaxFilter* Create(const MinMaxFilterPB& protobuf, ColumnType type,
       ObjectPool* pool, MemTracker* mem_tracker);
 
   /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
   static void Or(
-      const TMinMaxFilter& in, TMinMaxFilter* out, const ColumnType& columnType);
+      const MinMaxFilterPB& in, MinMaxFilterPB* out, const ColumnType& columnType);
 
   /// Copies the contents of 'in' into 'out'.
-  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
 
   /// Returns the LLVM_CLASS_NAME for the given type.
   static std::string GetLlvmClassName(PrimitiveType type);
@@ -108,7 +108,7 @@
       min_ = std::numeric_limits<TYPE>::max();                                \
       max_ = std::numeric_limits<TYPE>::lowest();                             \
     }                                                                         \
-    NAME##MinMaxFilter(const TMinMaxFilter& thrift);                          \
+    NAME##MinMaxFilter(const MinMaxFilterPB& protobuf);                       \
     virtual ~NAME##MinMaxFilter() {}                                          \
     virtual void* GetMin() override { return &min_; }                         \
     virtual void* GetMax() override { return &max_; }                         \
@@ -121,10 +121,10 @@
       return min_ == std::numeric_limits<TYPE>::max()                         \
           && max_ == std::numeric_limits<TYPE>::lowest();                     \
     }                                                                         \
-    virtual void ToThrift(TMinMaxFilter* thrift) const override;              \
+    virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;         \
     virtual std::string DebugString() const override;                         \
-    static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);              \
-    static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);            \
+    static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);            \
+    static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);          \
     static const char* LLVM_CLASS_NAME;                                       \
                                                                               \
    private:                                                                   \
@@ -148,7 +148,7 @@
       max_buffer_(&mem_pool_),
       always_false_(true),
       always_true_(false) {}
-  StringMinMaxFilter(const TMinMaxFilter& thrift, MemTracker* mem_tracker);
+  StringMinMaxFilter(const MinMaxFilterPB& protobuf, MemTracker* mem_tracker);
   virtual ~StringMinMaxFilter() {}
   virtual void Close() override { mem_pool_.FreeAll(); }
 
@@ -164,11 +164,11 @@
   /// truncating them if necessary.
   virtual void MaterializeValues() override;
 
-  virtual void ToThrift(TMinMaxFilter* thrift) const override;
+  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
-  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
 
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
@@ -208,7 +208,7 @@
 class TimestampMinMaxFilter : public MinMaxFilter {
  public:
   TimestampMinMaxFilter() { always_false_ = true; }
-  TimestampMinMaxFilter(const TMinMaxFilter& thrift);
+  TimestampMinMaxFilter(const MinMaxFilterPB& protobuf);
   virtual ~TimestampMinMaxFilter() {}
 
   virtual void* GetMin() override { return &min_; }
@@ -218,11 +218,11 @@
   virtual void Insert(void* val) override;
   virtual bool AlwaysTrue() const override { return false; }
   virtual bool AlwaysFalse() const override { return always_false_; }
-  virtual void ToThrift(TMinMaxFilter* thrift) const override;
+  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
-  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
 
   /// Struct name in LLVM IR.
   static const char* LLVM_CLASS_NAME;
@@ -269,7 +269,7 @@
   DecimalMinMaxFilter(int precision)
     : size_(ColumnType::GetDecimalByteSize(precision)), always_false_(true) {}
 
-  DecimalMinMaxFilter(const TMinMaxFilter& thrift, int precision);
+  DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision);
   virtual ~DecimalMinMaxFilter() {}
 
   virtual void* GetMin() override {
@@ -286,11 +286,11 @@
   virtual PrimitiveType type() override;
   virtual bool AlwaysTrue() const override { return false; }
   virtual bool AlwaysFalse() const override { return always_false_; }
-  virtual void ToThrift(TMinMaxFilter* thrift) const override;
+  virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
   virtual std::string DebugString() const override;
 
-  static void Or(const TMinMaxFilter& in, TMinMaxFilter* out, int precision);
-  static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+  static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out, int precision);
+  static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
 
   void Insert4(void* val);
   void Insert8(void* val);
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 0bf2a97..6ec84b4 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -45,3 +45,20 @@
   NONE = 0; // No compression.
   LZ4 = 1;
 }
+
+// This is a union over all possible return types.
+// TODO: only one of the fields below is set at a time, so wrapping them in a 'oneof'
+// would save some memory. 'oneof' is also accepted in proto2 syntax (protoc >= 2.6),
+// so confirm whether an upgrade to proto3 is really needed for this.
+message ColumnValuePB {
+  optional bool bool_val = 1;
+  optional int32 byte_val = 6;
+  optional int32 short_val = 7;
+  optional int32 int_val = 2;
+  optional int64 long_val = 3;
+  optional double double_val = 4;
+  optional bytes string_val = 5; // 'bytes', not 'string': value may not be valid UTF-8.
+  optional bytes binary_val = 8; // 'bytes', not 'string': arbitrary binary data.
+  optional string timestamp_val = 9; // NOTE(review): use 'bytes' if this is raw binary.
+  optional bytes decimal_val = 10;
+}
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index b0e2b5d..708b814 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -78,6 +78,77 @@
   optional int64 receiver_latency_ns = 2;
 }
 
+message BloomFilterPB {
+  // Log_2 of the bufferpool space required for this filter.
+  // See BloomFilter::BloomFilter() for details.
+  optional int32 log_bufferpool_space = 1;
+
+  // If always_true or always_false is true, the directory (referenced through
+  // 'directory_sidecar_idx') and 'log_bufferpool_space' are not meaningful.
+  optional bool always_true = 2;
+  optional bool always_false = 3;
+
+  // Index of the RPC sidecar that carries the directory of this Bloom filter.
+  // A directory is a list of buckets representing the Bloom filter contents,
+  // laid out contiguously in one string for efficiency of (de)serialisation.
+  // See BloomFilter::Bucket and BloomFilter::directory_.
+  optional int32 directory_sidecar_idx = 4;
+}
+
+message MinMaxFilterPB {
+  // If true, the filter lets every element pass and 'min'/'max' are left unset.
+  optional bool always_true = 1;
+
+  // If true, the filter rejects every element and 'min'/'max' are left unset.
+  optional bool always_false = 2;
+
+  optional ColumnValuePB min = 3; // Unset when always_true or always_false is true.
+  optional ColumnValuePB max = 4; // Unset when always_true or always_false is true.
+}
+
+message UpdateFilterParamsPB {
+  // Filter ID, unique within a query.
+  optional int32 filter_id = 1;
+
+  // Query that this filter is for.
+  optional UniqueIdPB query_id = 2;
+
+  optional BloomFilterPB bloom_filter = 3; // Actual bloom_filter payload, if any.
+
+  optional MinMaxFilterPB min_max_filter = 4; // Actual min_max_filter payload, if any.
+}
+
+message UpdateFilterResultPB {
+  optional StatusPB status = 1; // Status carried in the UpdateFilter() response.
+
+  // Latency of the response in the receiving daemon, in nanoseconds.
+  optional int64 receiver_latency_ns = 2;
+}
+
+message PublishFilterParamsPB {
+  // Filter ID, unique within a query.
+  optional int32 filter_id = 1;
+
+  // Query that this filter is for.
+  optional UniqueIdPB dst_query_id = 2;
+
+  // Index of the fragment that is to receive this filter.
+  optional int32 dst_fragment_idx = 3;
+
+  // Actual bloom_filter payload, if any.
+  optional BloomFilterPB bloom_filter = 4;
+
+  // Actual min_max_filter payload, if any.
+  optional MinMaxFilterPB min_max_filter = 5;
+}
+
+message PublishFilterResultPB {
+  optional StatusPB status = 1; // Status carried in the PublishFilter() response.
+
+  // Latency of the response in the receiving daemon, in nanoseconds.
+  optional int64 receiver_latency_ns = 2;
+}
+
 // Handles data transmission between fragment instances.
 service DataStreamService {
   // Override the default authorization method.
@@ -90,4 +161,12 @@
 
   // Called by a sender to close the channel between fragment instances.
   rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB);
+
+  // Called by fragment instances that produce local runtime filters to deliver them to
+  // the coordinator for aggregation and broadcast.
+  rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
+
+  // Called by the coordinator to deliver global runtime filters to fragments for
+  // application at plan nodes.
+  rpc PublishFilter(PublishFilterParamsPB) returns (PublishFilterResultPB);
 }
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 55eb0cf..cbe3f50 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -766,82 +766,6 @@
   11: required i64 max_memory_multiple = 0;
 }
 
-struct TBloomFilter {
-  // Log_2 of the bufferpool space required for this filter.
-  // See BloomFilter::BloomFilter() for details.
-  1: required i32 log_bufferpool_space
-
-  // List of buckets representing the Bloom Filter contents, laid out contiguously in one
-  // string for efficiency of (de)serialisation. See BloomFilter::Bucket and
-  // BloomFilter::directory_.
-  2: binary directory
-
-  // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
-  // not meaningful.
-  3: required bool always_true
-  4: required bool always_false
-}
-
-struct TMinMaxFilter {
-  // If true, filter allows all elements to pass and 'min'/'max' will not be set.
-  1: required bool always_true
-
-  // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
-  2: required bool always_false
-
-  3: optional Data.TColumnValue min
-  4: optional Data.TColumnValue max
-}
-
-// UpdateFilter
-
-struct TUpdateFilterParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // Filter ID, unique within a query.
-  // required in V1
-  2: optional i32 filter_id
-
-  // Query that this filter is for.
-  // required in V1
-  3: optional Types.TUniqueId query_id
-
-  // required in V1
-  4: optional TBloomFilter bloom_filter
-
-  5: optional TMinMaxFilter min_max_filter
-}
-
-struct TUpdateFilterResult {
-}
-
-
-// PublishFilter
-
-struct TPublishFilterParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // Filter ID to update
-  // required in V1
-  2: optional i32 filter_id
-
-  // required in V1
-  3: optional Types.TUniqueId dst_query_id
-
-  // Index of fragment to receive this filter
-  // required in V1
-  4: optional Types.TFragmentIdx dst_fragment_idx
-
-  // Actual bloom_filter payload
-  // required in V1
-  5: optional TBloomFilter bloom_filter
-
-  6: optional TMinMaxFilter min_max_filter
-}
-
-struct TPublishFilterResult {
-}
-
 struct TParseDateStringResult {
   // True iff date string was successfully parsed
   1: required bool valid
@@ -854,11 +778,4 @@
 
 service ImpalaInternalService {
 
-  // Called by fragment instances that produce local runtime filters to deliver them to
-  // the coordinator for aggregation and broadcast.
-  TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);
-
-  // Called by the coordinator to deliver global runtime filters to fragments for
-  // application at plan nodes.
-  TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
 }