IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Previously, the aggregation and propagation of runtime filters in Impala were
implemented using Thrift RPC, which has the disadvantage that the number
of connections in a cluster grows with both the number of queries and the
cluster size. This patch ports the functions that implement the aggregation and
propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(),
respectively, to KRPC, which requires only one connection per direction between
every pair of hosts, thus reducing the number of connections in a cluster.
In addition, this patch uses a KRPC sidecar when the runtime filter
is a Bloom filter. The sidecar eliminates the need for an extra copy of the
Bloom filter contents when a Bloom filter is serialized for transmission and
hence reduces the serialization overhead. Because of the use of the KRPC
sidecar, a SpinLock is also added to prevent a BloomFilter from being
deallocated before its associated KRPC call finishes.
Two related BE tests, bloom-filter-test.cc and bloom-filter-benchmark.cc, are
also modified accordingly because of the changes to the signatures of some
functions in BloomFilter.
Testing:
This patch has passed the exhaustive tests.
Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8
Reviewed-on: http://gerrit.cloudera.org:8080/13882
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/14974
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
diff --git a/be/src/benchmarks/bloom-filter-benchmark.cc b/be/src/benchmarks/bloom-filter-benchmark.cc
index f216911..1e4938d 100644
--- a/be/src/benchmarks/bloom-filter-benchmark.cc
+++ b/be/src/benchmarks/bloom-filter-benchmark.cc
@@ -21,6 +21,7 @@
#include <iostream>
#include <vector>
+#include "kudu/rpc/rpc_controller.h"
#include "runtime/bufferpool/buffer-allocator.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/bufferpool/buffer-pool.h"
@@ -36,6 +37,8 @@
using namespace std;
using namespace impala;
+using kudu::rpc::RpcController;
+
// Tests Bloom filter performance on:
//
// 1. Construct/destruct pairs
@@ -283,18 +286,44 @@
explicit TestData(int log_bufferpool_size, BufferPool::ClientHandle* client) {
BloomFilter bf(client);
CHECK(bf.Init(log_bufferpool_size).ok());
- BloomFilter::ToThrift(&bf, &tbf1);
- BloomFilter::ToThrift(&bf, &tbf2);
+
+ RpcController controller1;
+ RpcController controller2;
+ BloomFilter::ToProtobuf(&bf, &controller1, &pbf1);
+ BloomFilter::ToProtobuf(&bf, &controller2, &pbf2);
+
+ // Need to set 'always_false_' of pbf2 to false because
+ // (i) 'always_false_' of a BloomFilter is set to true when the Bloom filter
+ // hasn't had any elements inserted (since nothing is inserted to the
+ // BloomFilter bf),
+ // (ii) ToProtobuf() will set 'always_false_' of a BloomFilterPB
+ // to true, and
+ // (iii) Or() will check 'always_false_' of the output BloomFilterPB is not true
+ // before performing the corresponding bit operations.
+ // The field 'always_false_' was added by IMPALA-5789, which aims to allow
+ // an HdfsScanner to early terminate the scan at file and split granularities.
+ pbf2.set_always_false(false);
+
+ int64_t directory_size = BloomFilter::GetExpectedMemoryUsed(log_bufferpool_size);
+ string d1(reinterpret_cast<const char*>(bf.directory_), directory_size);
+ string d2(reinterpret_cast<const char*>(bf.directory_), directory_size);
+
+ directory1 = d1;
+ directory2 = d2;
+
bf.Close();
}
- TBloomFilter tbf1, tbf2;
+ BloomFilterPB pbf1, pbf2;
+ string directory1, directory2;
};
void Benchmark(int batch_size, void* data) {
TestData* d = reinterpret_cast<TestData*>(data);
for (int i = 0; i < batch_size; ++i) {
- BloomFilter::Or(d->tbf1, &d->tbf2);
+ BloomFilter::Or(d->pbf1, reinterpret_cast<const uint8_t*>((d->directory1).data()),
+ &(d->pbf2), reinterpret_cast<uint8_t*>(const_cast<char*>((d->directory2).data())),
+ d->directory1.size());
}
}
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index fee4d39..656bbc3 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -39,29 +39,6 @@
: ImpalaInternalServiceClient(iprot, oprot) {
}
-/// We intentionally disable this clang warning as we intend to hide the
-/// the same-named functions defined in the base class.
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-
- void UpdateFilter(TUpdateFilterResult& _return, const TUpdateFilterParams& params,
- bool* send_done) {
- DCHECK(!*send_done);
- ImpalaInternalServiceClient::send_UpdateFilter(params);
- *send_done = true;
- ImpalaInternalServiceClient::recv_UpdateFilter(_return);
- }
-
- void PublishFilter(TPublishFilterResult& _return, const TPublishFilterParams& params,
- bool* send_done) {
- DCHECK(!*send_done);
- ImpalaInternalServiceClient::send_PublishFilter(params);
- *send_done = true;
- ImpalaInternalServiceClient::recv_PublishFilter(_return);
- }
-
-#pragma clang diagnostic pop
-
};
}
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 26d0de3..fdf44c1 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -29,7 +29,6 @@
#include "util/metrics.h"
#include "util/network-util.h"
#include "rpc/thrift-util.h"
-#include "gen-cpp/ImpalaInternalService.h"
#include "common/names.h"
using namespace apache::thrift;
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 062a97d..72e245e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -23,6 +23,7 @@
#include "exec/exec-node.h"
#include "exec/kudu-util.h"
#include "exec/scan-node.h"
+#include "gen-cpp/data_stream_service.proxy.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/util/monotime.h"
@@ -37,6 +38,7 @@
#include "runtime/fragment-instance-state.h"
#include "runtime/krpc-data-stream-sender.h"
#include "service/control-service.h"
+#include "service/data-stream-service.h"
#include "util/counting-barrier.h"
#include "util/error-util-internal.h"
#include "util/network-util.h"
@@ -549,21 +551,35 @@
return true;
}
-void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
- DCHECK(rpc_params.dst_query_id == query_id_);
+void Coordinator::BackendState::PublishFilter(
+ const PublishFilterParamsPB& rpc_params, RpcController& controller) {
+ DCHECK_EQ(ProtoToQueryId(rpc_params.dst_query_id()), query_id_);
// If the backend is already done, it's not waiting for this filter, so we skip
// sending it in this case.
if (IsDone()) return;
- if (fragments_.count(rpc_params.dst_fragment_idx) == 0) return;
+ if (fragments_.count(rpc_params.dst_fragment_idx()) == 0) return;
Status status;
- ImpalaBackendConnection backend_client(
- ExecEnv::GetInstance()->impalad_client_cache(), host_, &status);
- if (!status.ok()) return;
- TPublishFilterResult res;
- status = backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, rpc_params, &res);
- if (!status.ok()) {
- LOG(WARNING) << "Error publishing filter, continuing..." << status.GetDetail();
+
+ std::unique_ptr<DataStreamServiceProxy> proxy;
+ Status get_proxy_status =
+ DataStreamService::GetProxy(krpc_host_, host_.hostname, &proxy);
+ if (!get_proxy_status.ok()) {
+ // Failing to send a filter is not a query-wide error - the remote fragment will
+ // continue regardless.
+ LOG(ERROR) << "Couldn't get proxy: " << get_proxy_status.msg().msg();
+ return;
+ }
+
+ PublishFilterResultPB res;
+ kudu::Status rpc_status = proxy->PublishFilter(rpc_params, &res, &controller);
+ if (!rpc_status.ok()) {
+ LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
+ return;
+ }
+ if (res.status().status_code() != TErrorCode::OK) {
+ LOG(ERROR) << "PublishFilter() operation failed: "
+ << Status(res.status()).GetDetail();
}
}
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 9d390e2..fd80b86 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -106,7 +106,8 @@
/// Make a PublishFilter rpc with given params if this backend has instances of the
/// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
- void PublishFilter(const TPublishFilterParams& rpc_params);
+ void PublishFilter(
+ const PublishFilterParamsPB& rpc_params, kudu::rpc::RpcController& controller);
/// Cancel execution at this backend if anything is running. Returns true
/// if cancellation was attempted, false otherwise.
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
index e16abea..55c6b3d 100644
--- a/be/src/runtime/coordinator-filter-state.h
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -61,12 +61,13 @@
FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
: desc_(desc), src_(src) {
// bloom_filter_ is a disjunction so the unit value is always_false.
- bloom_filter_.always_false = true;
- min_max_filter_.always_false = true;
+ bloom_filter_.set_always_false(true);
+ min_max_filter_.set_always_false(true);
}
- TBloomFilter& bloom_filter() { return bloom_filter_; }
- TMinMaxFilter& min_max_filter() { return min_max_filter_; }
+ BloomFilterPB& bloom_filter() { return bloom_filter_; }
+ string& bloom_filter_directory() { return bloom_filter_directory_; }
+ MinMaxFilterPB& min_max_filter() { return min_max_filter_; }
std::vector<FilterTarget>* targets() { return &targets_; }
const std::vector<FilterTarget>& targets() const { return targets_; }
int64_t first_arrival_time() const { return first_arrival_time_; }
@@ -81,16 +82,17 @@
void set_num_producers(int num_producers) { num_producers_ = num_producers; }
bool disabled() const {
if (is_bloom_filter()) {
- return bloom_filter_.always_true;
+ return bloom_filter_.always_true();
} else {
DCHECK(is_min_max_filter());
- return min_max_filter_.always_true;
+ return min_max_filter_.always_true();
}
}
/// Aggregates partitioned join filters and updates memory consumption.
/// Disables filter if always_true filter is received or OOM is hit.
- void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord);
+ void ApplyUpdate(const UpdateFilterParamsPB& params, Coordinator* coord,
+ kudu::rpc::RpcContext* context);
/// Disables a filter. A disabled filter consumes no memory.
void Disable(MemTracker* tracker);
@@ -110,13 +112,16 @@
int num_producers_ = 0;
/// Filters aggregated from all source plan nodes, to be broadcast to all
- /// destination plan fragment instances. Only set for partitioned joins (broadcast joins
- /// need no aggregation).
+ /// destination plan fragment instances. Only set for partitioned joins (broadcast
+ /// joins need no aggregation).
/// In order to avoid memory spikes, an incoming filter is moved (vs. copied) to the
/// output structure in the case of a broadcast join. Similarly, for partitioned joins,
/// the filter is moved from the following member to the output structure.
- TBloomFilter bloom_filter_;
- TMinMaxFilter min_max_filter_;
+ BloomFilterPB bloom_filter_;
+ /// When the filter is a Bloom filter, we use this string to store the contents of the
+ /// aggregated Bloom filter.
+ string bloom_filter_directory_;
+ MinMaxFilterPB min_max_filter_;
/// Time at which first local filter arrived.
int64_t first_arrival_time_ = 0L;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 9ac5ee6..48d81f9 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -31,8 +31,9 @@
#include "common/hdfs.h"
#include "exec/data-sink.h"
#include "exec/plan-root-sink.h"
-#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_sidecar.h"
#include "runtime/exec-env.h"
#include "runtime/fragment-instance-state.h"
#include "runtime/hdfs-fs-cache.h"
@@ -55,6 +56,9 @@
#include "common/names.h"
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
using namespace apache::thrift;
using namespace rapidjson;
using boost::algorithm::iequals;
@@ -1107,7 +1111,7 @@
return result;
}
-void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
+void Coordinator::UpdateFilter(const UpdateFilterParamsPB& params, RpcContext* context) {
shared_lock<shared_mutex> lock(filter_routing_table_->lock);
DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
<< "UpdateFilter() called although runtime filters are disabled";
@@ -1118,8 +1122,9 @@
DCHECK(filter_routing_table_->is_complete)
<< "Filter received before routing table complete";
- TPublishFilterParams rpc_params;
+ PublishFilterParamsPB rpc_params;
unordered_set<int> target_fragment_idxs;
+ string bloom_filter_directory;
{
lock_guard<SpinLock> l(filter_routing_table_->update_lock);
if (!IsExecuting()) {
@@ -1127,17 +1132,17 @@
<< query_id();
return;
}
- auto it = filter_routing_table_->id_to_filter.find(params.filter_id);
+ auto it = filter_routing_table_->id_to_filter.find(params.filter_id());
if (it == filter_routing_table_->id_to_filter.end()) {
- LOG(INFO) << "Could not find filter with id: " << params.filter_id;
+ LOG(INFO) << "Could not find filter with id: " << params.filter_id();
return;
}
FilterState* state = &it->second;
DCHECK(state->desc().has_remote_targets)
- << "Coordinator received filter that has only local targets";
+ << "Coordinator received filter that has only local targets";
- // Check if the filter has already been sent, which could happen in four cases:
+ // Check if the filter has already been sent, which could happen in five cases:
// * if one local filter had always_true set - no point waiting for other local
// filters that can't affect the aggregated global filter
// * if this is a broadcast join, and another local filter was already received
@@ -1145,6 +1150,7 @@
// immediately.
// * query execution finished and resources were released: filters do not need
// to be processed.
+ // * if the inbound sidecar for Bloom filter cannot be successfully retrieved.
if (state->disabled()) return;
if (filter_updates_received_->value() == 0) {
@@ -1152,7 +1158,7 @@
}
filter_updates_received_->Add(1);
- state->ApplyUpdate(params, this);
+ state->ApplyUpdate(params, this, context);
if (state->pending_count() > 0 && !state->disabled()) return;
// At this point, we either disabled this filter or aggregation is complete.
@@ -1167,33 +1173,36 @@
}
if (state->is_bloom_filter()) {
- // Assign outgoing bloom filter.
- TBloomFilter& aggregated_filter = state->bloom_filter();
-
- swap(rpc_params.bloom_filter, aggregated_filter);
- DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true
- || !rpc_params.bloom_filter.directory.empty());
- DCHECK(aggregated_filter.directory.empty());
- rpc_params.__isset.bloom_filter = true;
+ // Assign an outgoing bloom filter.
+ *rpc_params.mutable_bloom_filter() = state->bloom_filter();
+ bloom_filter_directory.swap(state->bloom_filter_directory());
+ DCHECK(rpc_params.bloom_filter().always_false()
+ || rpc_params.bloom_filter().always_true() || !bloom_filter_directory.empty());
} else {
DCHECK(state->is_min_max_filter());
- MinMaxFilter::Copy(state->min_max_filter(), &rpc_params.min_max_filter);
- rpc_params.__isset.min_max_filter = true;
+ MinMaxFilter::Copy(state->min_max_filter(), rpc_params.mutable_min_max_filter());
}
// Filter is complete, and can be released.
state->Disable(filter_mem_tracker_);
}
- rpc_params.__set_dst_query_id(query_id());
- rpc_params.__set_filter_id(params.filter_id);
+ TUniqueIdToUniqueIdPB(query_id(), rpc_params.mutable_dst_query_id());
+ rpc_params.set_filter_id(params.filter_id());
// Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
for (BackendState* bs: backend_states_) {
for (int fragment_idx: target_fragment_idxs) {
if (!IsExecuting()) goto cleanup;
- rpc_params.__set_dst_fragment_idx(fragment_idx);
- bs->PublishFilter(rpc_params);
+ rpc_params.set_dst_fragment_idx(fragment_idx);
+ RpcController controller;
+ if (rpc_params.has_bloom_filter() && !rpc_params.bloom_filter().always_false()
+ && !rpc_params.bloom_filter().always_true()) {
+ BloomFilter::AddDirectorySidecar(rpc_params.mutable_bloom_filter(), &controller,
+ bloom_filter_directory);
+ }
+ // TODO: make this asynchronous.
+ bs->PublishFilter(rpc_params, controller);
}
}
@@ -1201,13 +1210,13 @@
// For bloom filters, the memory used in the filter_routing_table_ is transfered to
// rpc_params. Hence the Release() function on the filter_mem_tracker_ is called
// here to ensure that the MemTracker is updated after the memory is actually freed.
- if (rpc_params.__isset.bloom_filter) {
- filter_mem_tracker_->Release(rpc_params.bloom_filter.directory.size());
+ if (rpc_params.has_bloom_filter()) {
+ filter_mem_tracker_->Release(bloom_filter_directory.size());
}
}
-void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params,
- Coordinator* coord) {
+void Coordinator::FilterState::ApplyUpdate(
+ const UpdateFilterParamsPB& params, Coordinator* coord, RpcContext* context) {
DCHECK(!disabled());
DCHECK_GT(pending_count_, 0);
DCHECK_EQ(completion_time_, 0L);
@@ -1217,38 +1226,52 @@
--pending_count_;
if (is_bloom_filter()) {
- DCHECK(params.__isset.bloom_filter);
- if (params.bloom_filter.always_true) {
+ DCHECK(params.has_bloom_filter());
+ if (params.bloom_filter().always_true()) {
Disable(coord->filter_mem_tracker_);
- } else if (bloom_filter_.always_false) {
- int64_t heap_space = params.bloom_filter.directory.size();
- if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
- VLOG_QUERY << "Not enough memory to allocate filter: "
- << PrettyPrinter::Print(heap_space, TUnit::BYTES)
- << " (query_id=" << PrintId(coord->query_id()) << ")";
- // Disable, as one missing update means a correct filter cannot be produced.
- Disable(coord->filter_mem_tracker_);
- } else {
- // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to
- // move the payload from the request rather than copy it and take double the
- // memory cost. After this point, params.bloom_filter is an empty filter and
- // should not be read.
- TBloomFilter* non_const_filter = &const_cast<TBloomFilter&>(params.bloom_filter);
- swap(bloom_filter_, *non_const_filter);
- DCHECK_EQ(non_const_filter->directory.size(), 0);
+ } else if (params.bloom_filter().always_false()) {
+ if (!bloom_filter_.has_log_bufferpool_space()) {
+ bloom_filter_ = BloomFilterPB(params.bloom_filter());
}
} else {
- BloomFilter::Or(params.bloom_filter, &bloom_filter_);
+ // If the incoming Bloom filter is neither an always true filter nor an
+ // always false filter, then it must be the case that a non-empty sidecar slice
+ // has been received. Refer to BloomFilter::ToProtobuf() for further details.
+ DCHECK(params.bloom_filter().has_directory_sidecar_idx());
+ kudu::Slice sidecar_slice;
+ kudu::Status status = context->GetInboundSidecar(
+ params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
+ if (!status.ok()) {
+ LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
+ Disable(coord->filter_mem_tracker_);
+ } else if (bloom_filter_.always_false()) {
+ int64_t heap_space = sidecar_slice.size();
+ if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
+ VLOG_QUERY << "Not enough memory to allocate filter: "
+ << PrettyPrinter::Print(heap_space, TUnit::BYTES)
+ << " (query_id=" << PrintId(coord->query_id()) << ")";
+ // Disable, as one missing update means a correct filter cannot be produced.
+ Disable(coord->filter_mem_tracker_);
+ } else {
+ bloom_filter_ = params.bloom_filter();
+ bloom_filter_directory_ = sidecar_slice.ToString();
+ }
+ } else {
+ DCHECK_EQ(bloom_filter_directory_.size(), sidecar_slice.size());
+ BloomFilter::Or(params.bloom_filter(), sidecar_slice.data(), &bloom_filter_,
+ reinterpret_cast<uint8_t*>(const_cast<char*>(bloom_filter_directory_.data())),
+ sidecar_slice.size());
+ }
}
} else {
DCHECK(is_min_max_filter());
- DCHECK(params.__isset.min_max_filter);
- if (params.min_max_filter.always_true) {
+ DCHECK(params.has_min_max_filter());
+ if (params.min_max_filter().always_true()) {
Disable(coord->filter_mem_tracker_);
- } else if (min_max_filter_.always_false) {
- MinMaxFilter::Copy(params.min_max_filter, &min_max_filter_);
+ } else if (min_max_filter_.always_false()) {
+ MinMaxFilter::Copy(params.min_max_filter(), &min_max_filter_);
} else {
- MinMaxFilter::Or(params.min_max_filter, &min_max_filter_,
+ MinMaxFilter::Or(params.min_max_filter(), &min_max_filter_,
ColumnType::FromThrift(desc_.src_expr.nodes[0].type));
}
}
@@ -1260,15 +1283,15 @@
void Coordinator::FilterState::Disable(MemTracker* tracker) {
if (is_bloom_filter()) {
- bloom_filter_.always_true = true;
- bloom_filter_.always_false = false;
- tracker->Release(bloom_filter_.directory.size());
- bloom_filter_.directory.clear();
- bloom_filter_.directory.shrink_to_fit();
+ bloom_filter_.set_always_true(true);
+ bloom_filter_.set_always_false(false);
+ tracker->Release(bloom_filter_directory_.size());
+ bloom_filter_directory_.clear();
+ bloom_filter_directory_.shrink_to_fit();
} else {
DCHECK(is_min_max_filter());
- min_max_filter_.always_true = true;
- min_max_filter_.always_false = false;
+ min_max_filter_.set_always_true(true);
+ min_max_filter_.set_always_false(false);
}
}
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9ed3324..7b05fde 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -31,12 +31,19 @@
#include "common/status.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Types_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
#include "runtime/dml-exec-state.h"
#include "util/counting-barrier.h"
#include "util/progress-updater.h"
#include "util/runtime-profile-counters.h"
#include "util/spinlock.h"
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
namespace impala {
class AuxErrorInfoPB;
@@ -172,7 +179,7 @@
/// with others for the same filter ID into a global filter. If all updates for that
/// filter ID have been received (may be 1 or more per filter), broadcast the global
/// filter to fragment instances.
- void UpdateFilter(const TUpdateFilterParams& params);
+ void UpdateFilter(const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
/// Adds to 'document' a serialized array of all backends in a member named
/// 'backend_states'.
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 90bbf48..4ffdb1d 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -122,6 +122,12 @@
stream_mgr_->CloseSender(request, response, rpc_context);
}
+ virtual void UpdateFilter(
+ const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {}
+
+ virtual void PublishFilter(const PublishFilterParamsPB* req,
+ PublishFilterResultPB* resp, RpcContext* context) {}
+
MemTracker* mem_tracker() { return mem_tracker_.get(); }
private:
diff --git a/be/src/runtime/decimal-value.h b/be/src/runtime/decimal-value.h
index b34cde0..ea2d126 100644
--- a/be/src/runtime/decimal-value.h
+++ b/be/src/runtime/decimal-value.h
@@ -22,6 +22,7 @@
#include <ostream>
#include "gen-cpp/Data_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
#include "runtime/multi-precision.h"
#include "runtime/types.h"
@@ -59,8 +60,8 @@
return FromDouble(t.precision, t.scale, d, round, overflow);
}
- /// Returns a new DecimalValue created from the value in 'tvalue'.
- static inline DecimalValue FromTColumnValue(const TColumnValue& tvalue);
+ /// Returns a new DecimalValue created from the value in 'value_pb'.
+ static inline DecimalValue FromColumnValuePB(const ColumnValuePB& value_pb);
static inline DecimalValue FromDouble(int precision, int scale, double d,
bool round, bool* overflow);
@@ -196,11 +197,10 @@
inline DecimalValue<T> Abs() const;
- /// Store the binary representation of this DecimalValue in 'tvalue'.
- void ToTColumnValue(TColumnValue* tvalue) const {
+ /// Store the binary representation of this DecimalValue in 'value_pb'.
+ void ToColumnValuePB(ColumnValuePB* value_pb) const {
const uint8_t* data = reinterpret_cast<const uint8_t*>(&value_);
- tvalue->decimal_val.assign(data, data + sizeof(T));
- tvalue->__isset.decimal_val = true;
+ value_pb->mutable_decimal_val()->assign(data, data + sizeof(T));
}
private:
diff --git a/be/src/runtime/decimal-value.inline.h b/be/src/runtime/decimal-value.inline.h
index 6480099..b2d5fc0 100644
--- a/be/src/runtime/decimal-value.inline.h
+++ b/be/src/runtime/decimal-value.inline.h
@@ -61,9 +61,9 @@
}
template <typename T>
-inline DecimalValue<T> DecimalValue<T>::FromTColumnValue(const TColumnValue& tvalue) {
+inline DecimalValue<T> DecimalValue<T>::FromColumnValuePB(const ColumnValuePB& value_pb) {
T value = 0;
- memcpy(&value, tvalue.decimal_val.c_str(), tvalue.decimal_val.length());
+ memcpy(&value, value_pb.decimal_val().c_str(), value_pb.decimal_val().length());
return DecimalValue<T>(value);
}
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 7d7fea4..d68c469 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -27,7 +27,6 @@
#include "common/logging.h"
#include "common/object-pool.h"
#include "exec/kudu-util.h"
-#include "gen-cpp/ImpalaInternalService.h"
#include "kudu/rpc/service_if.h"
#include "rpc/rpc-mgr.h"
#include "runtime/backend-client.h"
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index b74bbe3..9236852 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -32,6 +32,7 @@
#include "exec/hdfs-scan-node-base.h"
#include "exec/exchange-node.h"
#include "exec/scan-node.h"
+#include "kudu/rpc/rpc_context.h"
#include "runtime/exec-env.h"
#include "runtime/backend-client.h"
#include "runtime/client-cache.h"
@@ -49,6 +50,7 @@
#include "util/periodic-counter-updater.h"
#include "gen-cpp/ImpalaInternalService_types.h"
+using kudu::rpc::RpcContext;
using namespace impala;
using namespace apache::thrift;
@@ -528,10 +530,11 @@
return opened_promise_.Get();
}
-void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
+void FragmentInstanceState::PublishFilter(
+ const PublishFilterParamsPB& params, RpcContext* context) {
VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
- << " filter_id=" << params.filter_id;
- runtime_state_->filter_bank()->PublishGlobalFilter(params);
+ << " filter_id=" << params.filter_id();
+ runtime_state_->filter_bank()->PublishGlobalFilter(params, context);
}
const string& FragmentInstanceState::ExecStateToString(FInstanceExecStatePB state) {
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 69e081b..c151457 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -29,6 +29,7 @@
#include "util/promise.h"
#include "gen-cpp/control_service.pb.h"
+#include "gen-cpp/data_stream_service.pb.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
#include "runtime/row-batch.h"
@@ -36,6 +37,12 @@
#include "util/promise.h"
#include "util/runtime-profile.h"
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
namespace impala {
class TPlanFragmentCtx;
@@ -90,7 +97,7 @@
Status WaitForOpen();
/// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
- void PublishFilter(const TPublishFilterParams& params);
+ void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
/// Called periodically by query state thread to get the current status of this fragment
/// instance. The fragment instance's status is stored in 'instance_status' and its
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index e5a804c..678aecd 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -23,6 +23,7 @@
#include "common/thread-debug-info.h"
#include "exec/kudu-util.h"
#include "exprs/expr.h"
+#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/util/monotime.h"
@@ -40,6 +41,7 @@
#include "runtime/runtime-state.h"
#include "runtime/scanner-mem-limiter.h"
#include "service/control-service.h"
+#include "service/data-stream-service.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
@@ -50,7 +52,6 @@
#include "gen-cpp/control_service.proxy.h"
using kudu::MonoDelta;
-using kudu::rpc::RpcController;
using kudu::rpc::RpcSidecar;
#include "common/names.h"
@@ -675,11 +676,11 @@
for (auto entry: fis_map_) entry.second->Cancel();
}
-void QueryState::PublishFilter(const TPublishFilterParams& params) {
+void QueryState::PublishFilter(const PublishFilterParamsPB& params, RpcContext* context) {
if (!WaitForPrepare().ok()) return;
- DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx), 1);
- for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx]) {
- fis->PublishFilter(params);
+ DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx()), 1);
+ for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx()]) {
+ fis->PublishFilter(params, context);
}
}
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 86156f4..f359f1a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -27,12 +27,19 @@
#include "common/object-pool.h"
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Types_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
#include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
#include "runtime/tmp-file-mgr.h"
#include "util/container-util.h"
#include "util/counting-barrier.h"
#include "util/uid-util.h"
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
namespace impala {
class ControlServiceProxy;
@@ -192,7 +199,7 @@
const TUniqueId& instance_id, FragmentInstanceState** fi_state);
/// Blocks until all fragment instances have finished their Prepare phase.
- void PublishFilter(const TPublishFilterParams& params);
+ void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
/// Cancels all actively executing fragment instances. Blocks until all fragment
/// instances have finished their Prepare phase. Idempotent.
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 56f63aa..21a0bfd 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -17,19 +17,26 @@
#include "runtime/runtime-filter-bank.h"
+#include <chrono>
+
#include <boost/algorithm/string/join.hpp>
#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/data_stream_service.proxy.h"
#include "gutil/strings/substitute.h"
-#include "runtime/client-cache.h"
-#include "runtime/exec-env.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
#include "runtime/backend-client.h"
#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/client-cache.h"
+#include "runtime/exec-env.h"
#include "runtime/initial-reservations.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
+#include "service/data-stream-service.h"
#include "service/impala-server.h"
#include "util/bit-util.h"
#include "util/bloom-filter.h"
@@ -38,6 +45,9 @@
#include "common/names.h"
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
using namespace impala;
using namespace boost;
using namespace strings;
@@ -102,32 +112,35 @@
return ret;
}
-namespace {
+void RuntimeFilterBank::UpdateFilterCompleteCb(
+ const RpcController* rpc_controller, const UpdateFilterResultPB* res) {
+ const kudu::Status controller_status = rpc_controller->status();
-/// Sends a filter to the coordinator. Executed asynchronously in the context of
-/// ExecEnv::rpc_pool().
-void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
- ImpalaBackendClientCache* client_cache) {
- Status status;
- ImpalaBackendConnection coord(client_cache, address, &status);
- if (!status.ok()) {
- // Failing to send a filter is not a query-wide error - the remote fragment will
- // continue regardless.
- // TODO: Retry.
- LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
- return;
+ // In the case of an unsuccessful KRPC call, e.g., request dropped due to
+ // backpressure, we only log this event without retrying. Failing to send a
+ // filter is not a query-wide error - the remote fragment will continue
+ // regardless.
+ if (!controller_status.ok()) {
+ LOG(ERROR) << "UpdateFilter() failed: " << controller_status.message().ToString();
}
- TUpdateFilterResult res;
- status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
-}
+ // DataStreamService::UpdateFilter() should never set an error status.
+ DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
+ {
+ std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
+ DCHECK_GT(num_inflight_rpcs_, 0);
+ --num_inflight_rpcs_;
+ }
+ krpcs_done_cv_.notify_one();
}
void RuntimeFilterBank::UpdateFilterFromLocal(
int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
<< "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
- TUpdateFilterParams params;
+ // This function is only called from ExecNode::Open() or more specifically
+ // PartitionedHashJoinNode::Open().
+ DCHECK(!closed_);
// A runtime filter may have both local and remote targets.
bool has_local_target = false;
bool has_remote_target = false;
@@ -159,64 +172,108 @@
if (has_remote_target
&& state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
- params.__set_filter_id(filter_id);
- params.__set_query_id(state_->query_id());
+ UpdateFilterParamsPB params;
+ // The memory associated with the following 2 objects needs to live until
+ // the asynchronous KRPC call proxy->UpdateFilterAsync() is completed.
+ // Hence, we allocate these 2 objects in 'obj_pool_'.
+ UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
+ RpcController* controller = obj_pool_.Add(new RpcController);
+
+ TUniqueIdToUniqueIdPB(state_->query_id(), params.mutable_query_id());
+ params.set_filter_id(filter_id);
if (type == TRuntimeFilterType::BLOOM) {
- BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
- params.__isset.bloom_filter = true;
+ BloomFilter::ToProtobuf(bloom_filter, controller, params.mutable_bloom_filter());
} else {
- DCHECK(type == TRuntimeFilterType::MIN_MAX);
- min_max_filter->ToThrift(&params.min_max_filter);
- params.__isset.min_max_filter = true;
+ DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
+ min_max_filter->ToProtobuf(params.mutable_min_max_filter());
+ }
+ const TNetworkAddress& krpc_address = state_->query_ctx().coord_krpc_address;
+ const TNetworkAddress& host_address = state_->query_ctx().coord_address;
+
+ // Use 'proxy' to send the filter to the coordinator.
+ unique_ptr<DataStreamServiceProxy> proxy;
+ Status get_proxy_status =
+ DataStreamService::GetProxy(krpc_address, host_address.hostname, &proxy);
+ if (!get_proxy_status.ok()) {
+ // Failing to send a filter is not a query-wide error - the remote fragment will
+ // continue regardless.
+ LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1",
+ host_address.hostname, get_proxy_status.msg().msg());
+ return;
}
- ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
- SendFilterToCoordinator, state_->query_ctx().coord_address, params,
- ExecEnv::GetInstance()->impalad_client_cache()));
+ // Increment 'num_inflight_rpcs_' to make sure that the filter will not be deallocated
+ // in Close() until all in-flight RPCs complete.
+ {
+ unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
+ DCHECK_GE(num_inflight_rpcs_, 0);
+ ++num_inflight_rpcs_;
+ }
+
+ proxy->UpdateFilterAsync(params, res, controller,
+ boost::bind(&RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res));
}
}
-void RuntimeFilterBank::PublishGlobalFilter(const TPublishFilterParams& params) {
+void RuntimeFilterBank::PublishGlobalFilter(
+ const PublishFilterParamsPB& params, RpcContext* context) {
lock_guard<mutex> l(runtime_filter_lock_);
if (closed_) return;
- RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id);
+ RuntimeFilterMap::iterator it = consumed_filters_.find(params.filter_id());
DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
- << params.filter_id;
+ << params.filter_id();
BloomFilter* bloom_filter = nullptr;
MinMaxFilter* min_max_filter = nullptr;
if (it->second->is_bloom_filter()) {
- DCHECK(params.__isset.bloom_filter);
- if (params.bloom_filter.always_true) {
+ DCHECK(params.has_bloom_filter());
+ if (params.bloom_filter().always_true()) {
bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
} else {
- int64_t required_space =
- BloomFilter::GetExpectedMemoryUsed(params.bloom_filter.log_bufferpool_space);
+ int64_t required_space = BloomFilter::GetExpectedMemoryUsed(
+ params.bloom_filter().log_bufferpool_space());
DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
<< "BufferPool Client should have enough reservation to fulfill bloom filter "
"allocation";
bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
- Status status = bloom_filter->Init(params.bloom_filter);
- if (!status.ok()) {
- LOG(ERROR) << "Unable to allocate memory for bloom filter: "
- << status.GetDetail();
- bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+
+ kudu::Slice sidecar_slice;
+ if (params.bloom_filter().has_directory_sidecar_idx()) {
+ kudu::Status status = context->GetInboundSidecar(
+ params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
+ if (!status.ok()) {
+ LOG(ERROR) << "Failed to get Bloom filter sidecar: "
+ << status.message().ToString();
+ bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+ }
} else {
- bloom_filters_.push_back(bloom_filter);
- DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
- bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
+ DCHECK(params.bloom_filter().always_false());
+ }
+
+ if (bloom_filter != BloomFilter::ALWAYS_TRUE_FILTER) {
+ Status status = bloom_filter->Init(
+ params.bloom_filter(), sidecar_slice.data(), sidecar_slice.size());
+ if (!status.ok()) {
+ LOG(ERROR) << "Unable to allocate memory for bloom filter: "
+ << status.GetDetail();
+ bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+ } else {
+ bloom_filters_.push_back(bloom_filter);
+ DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
+ bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
+ }
}
}
} else {
DCHECK(it->second->is_min_max_filter());
- DCHECK(params.__isset.min_max_filter);
- min_max_filter = MinMaxFilter::Create(
- params.min_max_filter, it->second->type(), &obj_pool_, filter_mem_tracker_.get());
+ DCHECK(params.has_min_max_filter());
+ min_max_filter = MinMaxFilter::Create(params.min_max_filter(), it->second->type(),
+ &obj_pool_, filter_mem_tracker_.get());
min_max_filters_.push_back(min_max_filter);
}
it->second->SetFilter(bloom_filter, min_max_filter);
state_->runtime_profile()->AddInfoString(
- Substitute("Filter $0 arrival", params.filter_id),
+ Substitute("Filter $0 arrival", params.filter_id()),
PrettyPrinter::Print(it->second->arrival_delay_ms(), TUnit::TIME_MS));
}
@@ -278,8 +335,21 @@
}
void RuntimeFilterBank::Close() {
- lock_guard<mutex> l(runtime_filter_lock_);
+ // Wait for all in-flight RPCs to complete before closing the filters.
+ {
+ unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
+ while (num_inflight_rpcs_ > 0) {
+ krpcs_done_cv_.wait(l1);
+ }
+ }
+
+ lock_guard<mutex> l2(runtime_filter_lock_);
CancelLocked();
+ // We do not have to set 'closed_' to true before waiting for all in-flight RPCs to
+ // drain because the async build thread in
+ // BlockingJoinNode::ProcessBuildInputAndOpenProbe() should have exited by the time
+ // Close() is called so there shouldn't be any new RPCs being issued when this function
+ // is called.
closed_ = true;
for (BloomFilter* filter : bloom_filters_) filter->Close();
for (MinMaxFilter* filter : min_max_filters_) filter->Close();
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index 1208f03..78a95cf 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -20,6 +20,7 @@
#include "codegen/impala-ir.h"
#include "common/object-pool.h"
+#include "gen-cpp/data_stream_service.pb.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/mem-pool.h"
#include "runtime/types.h"
@@ -29,6 +30,15 @@
#include <boost/thread/lock_guard.hpp>
#include <boost/unordered_map.hpp>
+#include <condition_variable>
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+class RpcController;
+} // namespace rpc
+} // namespace kudu
+
namespace impala {
class BloomFilter;
@@ -36,7 +46,6 @@
class MinMaxFilter;
class RuntimeFilter;
class RuntimeState;
-class TBloomFilter;
class TRuntimeFilterDesc;
class TQueryCtx;
@@ -94,7 +103,8 @@
/// Makes a bloom_filter (aggregated globally from all producer fragments) available for
/// consumption by operators that wish to use it for filtering.
- void PublishGlobalFilter(const TPublishFilterParams& params);
+ void PublishGlobalFilter(
+ const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
/// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
/// 'filter_size' would have an expected false-positive rate which would exceed
@@ -150,6 +160,16 @@
/// All filters expected to be consumed by the local plan fragment instance.
RuntimeFilterMap consumed_filters_;
+ /// Lock protecting 'num_inflight_rpcs_'. It should not be taken at the same
+ /// time as 'runtime_filter_lock_'.
+ SpinLock num_inflight_rpcs_lock_;
+ /// Tracks the number of currently in-flight KRPC calls so that the memory
+ /// pointed to by a BloomFilter* is not deallocated in
+ /// RuntimeFilterBank::Close() before all of those KRPC calls have
+ /// completed.
+ int32_t num_inflight_rpcs_ = 0;
+ std::condition_variable_any krpcs_done_cv_;
+
/// Fragment instance's runtime state.
RuntimeState* state_;
@@ -184,6 +204,11 @@
/// in ClaimBufferReservation(). Reservations are returned to the initial reservations
/// pool in Close().
BufferPool::ClientHandle buffer_pool_client_;
+
+ /// This is the callback for the asynchronous rpc UpdateFilterAsync() in
+ /// UpdateFilterFromLocal().
+ void UpdateFilterCompleteCb(
+ const kudu::rpc::RpcController* rpc_controller, const UpdateFilterResultPB* res);
};
}
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 5b65f7a..618788c 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -67,7 +67,8 @@
MinMaxFilter* get_min_max() const { return min_max_filter_.Load(); }
- /// Sets the internal filter bloom_filter to 'bloom_filter'. Can only legally be called
+ /// Sets the internal filter to 'bloom_filter' or 'min_max_filter',
+ /// depending on the type of this RuntimeFilter. Can only legally be called
/// once per filter. Does not acquire the memory associated with 'bloom_filter'.
void SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter);
diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h
index 2b2ce79..4eb5c37 100644
--- a/be/src/runtime/timestamp-value.h
+++ b/be/src/runtime/timestamp-value.h
@@ -27,6 +27,7 @@
#include "common/global-types.h"
#include "gen-cpp/Data_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
#include "udf/udf.h"
#include "util/hash-util.h"
@@ -166,17 +167,16 @@
*ptp = boost::posix_time::ptime(date_, time_);
}
- // Store the binary representation of this TimestampValue in 'tvalue'.
- void ToTColumnValue(TColumnValue* tvalue) const {
+ // Store the binary representation of this TimestampValue in 'pvalue'.
+ void ToColumnValuePB(ColumnValuePB* pvalue) const {
const uint8_t* data = reinterpret_cast<const uint8_t*>(this);
- tvalue->timestamp_val.assign(data, data + Size());
- tvalue->__isset.timestamp_val = true;
+ pvalue->mutable_timestamp_val()->assign(data, data + Size());
}
- // Returns a new TimestampValue created from the value in 'tvalue'.
- static TimestampValue FromTColumnValue(const TColumnValue& tvalue) {
+ // Returns a new TimestampValue created from the value in 'value_pb'.
+ static TimestampValue FromColumnValuePB(const ColumnValuePB& value_pb) {
TimestampValue value;
- memcpy(&value, tvalue.timestamp_val.c_str(), Size());
+ memcpy(&value, value_pb.timestamp_val().c_str(), Size());
value.Validate();
return value;
}
diff --git a/be/src/scheduling/request-pool-service.h b/be/src/scheduling/request-pool-service.h
index 02642c4..ad38900 100644
--- a/be/src/scheduling/request-pool-service.h
+++ b/be/src/scheduling/request-pool-service.h
@@ -20,7 +20,6 @@
#include <jni.h>
-#include "gen-cpp/ImpalaInternalService.h"
#include "common/status.h"
#include "util/metrics-fwd.h"
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 1bdcfcf..6197d1c 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1409,9 +1409,10 @@
return coord_->UpdateBackendExecStatus(request, thrift_profiles);
}
-void ClientRequestState::UpdateFilter(const TUpdateFilterParams& params) {
+void ClientRequestState::UpdateFilter(
+ const UpdateFilterParamsPB& params, RpcContext* context) {
DCHECK(coord_.get());
- coord_->UpdateFilter(params);
+ coord_->UpdateFilter(params, context);
}
bool ClientRequestState::GetDmlStats(TDmlResult* dml_result, Status* query_status) {
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index ac901a4..1ef3b1d 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -190,7 +190,7 @@
/// object.
Status UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
const TRuntimeProfileForest& thrift_profiles) WARN_UNUSED_RESULT;
- void UpdateFilter(const TUpdateFilterParams& params);
+ void UpdateFilter(const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
/// Populate DML stats in 'dml_result' if this request succeeded.
/// Sets 'query_status' to the overall query status.
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 42b14f9..71ec8cc 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -29,7 +29,9 @@
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
#include "runtime/row-batch.h"
+#include "service/impala-server.h"
#include "util/memory-metrics.h"
#include "util/parse-util.h"
@@ -106,6 +108,38 @@
ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
}
+void DataStreamService::UpdateFilter(
+ const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {
+ // This failpoint is to allow jitter to be injected.
+ DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
+ DCHECK(req->has_filter_id());
+ DCHECK(req->has_query_id());
+ DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
+ ExecEnv::GetInstance()->impala_server()->UpdateFilter(resp, *req, context);
+ RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
+}
+
+void DataStreamService::PublishFilter(
+ const PublishFilterParamsPB* req, PublishFilterResultPB* resp, RpcContext* context) {
+ // This failpoint is to allow jitter to be injected.
+ DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
+ DCHECK(req->has_filter_id());
+ DCHECK(req->has_dst_query_id());
+ DCHECK(req->has_dst_fragment_idx());
+ DCHECK(req->has_bloom_filter() || req->has_min_max_filter());
+ QueryState::ScopedRef qs(ProtoToQueryId(req->dst_query_id()));
+
+ if (qs.get() != nullptr) {
+ qs->PublishFilter(*req, context);
+ RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
+ } else {
+ string err_msg = Substitute("Query State not found for query_id=$0",
+ PrintId(ProtoToQueryId(req->dst_query_id())));
+ LOG(INFO) << err_msg;
+ RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
+ }
+}
+
template<typename ResponsePBType>
void DataStreamService::RespondRpc(const Status& status,
ResponsePBType* response, kudu::rpc::RpcContext* ctx) {
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 539974c..24a2249 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -63,6 +63,16 @@
virtual void TransmitData(const TransmitDataRequestPB* request,
TransmitDataResponsePB* response, kudu::rpc::RpcContext* context);
+ /// Called by fragment instances that produce local runtime filters to deliver them to
+ /// the coordinator for aggregation and broadcast.
+ virtual void UpdateFilter(const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp,
+ kudu::rpc::RpcContext* context);
+
+ /// Called by the coordinator to deliver global runtime filters to fragments for
+ /// application at plan nodes.
+ virtual void PublishFilter(const PublishFilterParamsPB* req,
+ PublishFilterResultPB* resp, kudu::rpc::RpcContext* context);
+
/// Respond to a RPC passed in 'response'/'ctx' with 'status' and release
/// the payload memory from 'mem_tracker'. Takes ownership of 'ctx'.
template<typename ResponsePBType>
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index d703406..69a5736 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -22,7 +22,6 @@
#include "gen-cpp/ImpalaService.h"
#include "gen-cpp/ImpalaHiveServer2Service.h"
-#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/LineageGraph_types.h"
#include "common/status.h"
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 22a8243..3b7af8a 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -43,24 +43,3 @@
Substitute("Unknown $0 id: $1", id_type, PrintId(id))));
status.SetTStatus(status_container);
}
-
-void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
- const TUpdateFilterParams& params) {
- DebugActionNoFail(FLAGS_debug_actions, "UPDATE_FILTER_DELAY");
- DCHECK(params.__isset.filter_id);
- DCHECK(params.__isset.query_id);
- DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
- impala_server_->UpdateFilter(return_val, params);
-}
-
-void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
- const TPublishFilterParams& params) {
- DebugActionNoFail(FLAGS_debug_actions, "PUBLISH_FILTER_DELAY");
- DCHECK(params.__isset.filter_id);
- DCHECK(params.__isset.dst_query_id);
- DCHECK(params.__isset.dst_fragment_idx);
- DCHECK(params.__isset.bloom_filter || params.__isset.min_max_filter);
- QueryState::ScopedRef qs(params.dst_query_id);
- if (qs.get() == nullptr) return;
- qs->PublishFilter(params);
-}
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index c75122b..425678b 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -30,10 +30,6 @@
class ImpalaInternalService : public ImpalaInternalServiceIf {
public:
ImpalaInternalService();
- virtual void UpdateFilter(TUpdateFilterResult& return_val,
- const TUpdateFilterParams& params);
- virtual void PublishFilter(TPublishFilterResult& return_val,
- const TPublishFilterParams& params);
private:
ImpalaServer* impala_server_;
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 11ea058..76184f8 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -53,6 +53,7 @@
#include "exec/external-data-source-executor.h"
#include "exprs/timezone_db.h"
#include "gen-cpp/CatalogService_constants.h"
+#include "kudu/rpc/rpc_context.h"
#include "kudu/util/random_util.h"
#include "rpc/authentication.h"
#include "rpc/rpc-trace.h"
@@ -95,7 +96,6 @@
#include "gen-cpp/ImpalaService.h"
#include "gen-cpp/DataSinks_types.h"
#include "gen-cpp/ImpalaService_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
#include "gen-cpp/LineageGraph_types.h"
#include "gen-cpp/Frontend_types.h"
@@ -113,6 +113,7 @@
using boost::uuids::random_generator;
using boost::uuids::uuid;
using kudu::GetRandomSeed32;
+using kudu::rpc::RpcContext;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace apache::thrift::transport;
@@ -2537,17 +2538,18 @@
return Status::OK();
}
-void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
- const TUpdateFilterParams& params) {
- DCHECK(params.__isset.query_id);
- DCHECK(params.__isset.filter_id);
+void ImpalaServer::UpdateFilter(UpdateFilterResultPB* result,
+ const UpdateFilterParamsPB& params, RpcContext* context) {
+ DCHECK(params.has_query_id());
+ DCHECK(params.has_filter_id());
shared_ptr<ClientRequestState> client_request_state =
- GetClientRequestState(params.query_id);
+ GetClientRequestState(ProtoToQueryId(params.query_id()));
if (client_request_state.get() == nullptr) {
- LOG(INFO) << "Could not find client request state: " << PrintId(params.query_id);
+ LOG(INFO) << "Could not find client request state: "
+ << PrintId(ProtoToQueryId(params.query_id()));
return;
}
- client_request_state->UpdateFilter(params);
+ client_request_state->UpdateFilter(params, context);
}
Status ImpalaServer::CheckNotShuttingDown() const {
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 7e78532..0746784 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -49,6 +49,12 @@
#include "util/thread-pool.h"
#include "util/time.h"
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
namespace impala {
using kudu::ThreadSafeRandom;
@@ -57,12 +63,7 @@
class CancellationWork;
class ImpalaHttpHandler;
class RowDescriptor;
-class TCatalogUpdate;
-class TPlanExecRequest;
-class TPlanExecParams;
class TDmlResult;
-class TReportExecStatusArgs;
-class TReportExecStatusResult;
class TNetworkAddress;
class TClientRequest;
class TExecRequest;
@@ -345,8 +346,8 @@
virtual void CloseImpalaOperation(
TCloseImpalaOperationResp& return_val, const TCloseImpalaOperationReq& request);
- /// ImpalaInternalService rpcs
- void UpdateFilter(TUpdateFilterResult& return_val, const TUpdateFilterParams& params);
+ void UpdateFilter(UpdateFilterResultPB* return_val, const UpdateFilterParamsPB& params,
+ kudu::rpc::RpcContext* context);
/// Generates a unique id for this query and sets it in the given query context.
/// Prepares the given query context by populating fields required for evaluating
diff --git a/be/src/util/bloom-filter-test.cc b/be/src/util/bloom-filter-test.cc
index e8e7e2e..7a7d27f 100644
--- a/be/src/util/bloom-filter-test.cc
+++ b/be/src/util/bloom-filter-test.cc
@@ -21,6 +21,7 @@
#include <unordered_set>
#include <vector>
+#include "kudu/rpc/rpc_controller.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/mem-tracker.h"
@@ -28,11 +29,14 @@
#include "service/fe-support.h"
#include "testutil/gtest-util.h"
+#include "gen-cpp/data_stream_service.pb.h"
+
using namespace std;
-namespace {
-
using namespace impala;
+using kudu::rpc::RpcController;
+
+namespace bloom_filter_test_util {
// Make a random uint64_t, avoiding the absent high bit and the low-entropy low bits
// produced by rand().
@@ -68,24 +72,47 @@
// Computes union of 'x' and 'y'. Computes twice with AVX enabled and disabled and
// verifies both produce the same result. 'success' is set to true if both union
// computations returned the same result and set to false otherwise.
-TBloomFilter BfUnion(const BloomFilter& x, const BloomFilter& y, bool* success) {
- TBloomFilter thrift_x, thrift_y;
- BloomFilter::ToThrift(&x, &thrift_x);
- BloomFilter::ToThrift(&y, &thrift_y);
- BloomFilter::Or(thrift_x, &thrift_y);
+void BfUnion(const BloomFilter& x, const BloomFilter& y, int64_t directory_size,
+ bool* success, BloomFilterPB* protobuf, std::string* directory) {
+ BloomFilterPB protobuf_x, protobuf_y;
+ RpcController controller_x;
+ RpcController controller_y;
+ BloomFilter::ToProtobuf(&x, &controller_x, &protobuf_x);
+ BloomFilter::ToProtobuf(&y, &controller_y, &protobuf_y);
+
+ string directory_x(reinterpret_cast<const char*>(x.directory_), directory_size);
+ string directory_y(reinterpret_cast<const char*>(y.directory_), directory_size);
+
+ BloomFilter::Or(protobuf_x, reinterpret_cast<const uint8_t*>(directory_x.data()),
+ &protobuf_y, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y.data())),
+ directory_size);
+
{
CpuInfo::TempDisable t1(CpuInfo::AVX);
CpuInfo::TempDisable t2(CpuInfo::AVX2);
- TBloomFilter thrift_x2, thrift_y2;
- BloomFilter::ToThrift(&x, &thrift_x2);
- BloomFilter::ToThrift(&y, &thrift_y2);
- BloomFilter::Or(thrift_x2, &thrift_y2);
- *success = thrift_y.directory == thrift_y2.directory;
+ BloomFilterPB protobuf_x2, protobuf_y2;
+ RpcController controller_x2;
+ RpcController controller_y2;
+ BloomFilter::ToProtobuf(&x, &controller_x2, &protobuf_x2);
+ BloomFilter::ToProtobuf(&y, &controller_y2, &protobuf_y2);
+
+ string directory_x2(reinterpret_cast<const char*>(x.directory_), directory_size);
+ string directory_y2(reinterpret_cast<const char*>(y.directory_), directory_size);
+
+ BloomFilter::Or(protobuf_x2, reinterpret_cast<const uint8_t*>(directory_x2.data()),
+ &protobuf_y2, reinterpret_cast<uint8_t*>(const_cast<char*>(directory_y2.data())),
+ directory_size);
+
+ *success = directory_y.compare(directory_y2) == 0;
}
- return thrift_y;
+
+ *protobuf = protobuf_y;
+ *directory = directory_y;
}
-} // namespace
+} // namespace bloom_filter_test_util
+
+using namespace bloom_filter_test_util;
namespace impala {
@@ -204,12 +231,15 @@
return bloom_filter;
}
- BloomFilter* CreateBloomFilter(TBloomFilter t_filter) {
+ BloomFilter* CreateBloomFilter(BloomFilterPB filter_pb, const std::string& directory) {
int64_t filter_size =
- BloomFilter::GetExpectedMemoryUsed(t_filter.log_bufferpool_space);
+ BloomFilter::GetExpectedMemoryUsed(filter_pb.log_bufferpool_space());
EXPECT_TRUE(buffer_pool_client_->IncreaseReservation(filter_size));
BloomFilter* bloom_filter = pool_.Add(new BloomFilter(buffer_pool_client_.get()));
- EXPECT_OK(bloom_filter->Init(t_filter));
+
+ EXPECT_OK(bloom_filter->Init(
+ filter_pb, reinterpret_cast<const uint8_t*>(directory.data()), directory.size()));
+
bloom_filters_.push_back(bloom_filter);
EXPECT_NE(bloom_filter->GetBufferPoolSpaceUsed(), -1);
return bloom_filter;
@@ -311,7 +341,7 @@
}
}
-TEST_F(BloomFilterTest, Thrift) {
+TEST_F(BloomFilterTest, Protobuf) {
BloomFilter* bf = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
for (int i = 0; i < 10; ++i) BfInsert(*bf, i);
// Check no unexpected new false positives.
@@ -320,19 +350,27 @@
if (!BfFind(*bf, i)) missing_ints.insert(i);
}
- TBloomFilter to_thrift;
- BloomFilter::ToThrift(bf, &to_thrift);
- EXPECT_EQ(to_thrift.always_true, false);
+ BloomFilterPB to_protobuf;
- BloomFilter* from_thrift = CreateBloomFilter(to_thrift);
- for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_thrift, i));
- for (int missing: missing_ints) ASSERT_FALSE(BfFind(*from_thrift, missing));
+ RpcController controller;
+ BloomFilter::ToProtobuf(bf, &controller, &to_protobuf);
- BloomFilter::ToThrift(NULL, &to_thrift);
- EXPECT_EQ(to_thrift.always_true, true);
+ EXPECT_EQ(to_protobuf.always_true(), false);
+
+ std::string directory(reinterpret_cast<const char*>(bf->directory_),
+ BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01)));
+
+ BloomFilter* from_protobuf = CreateBloomFilter(to_protobuf, directory);
+
+ for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*from_protobuf, i));
+ for (int missing : missing_ints) ASSERT_FALSE(BfFind(*from_protobuf, missing));
+
+ RpcController controller_2;
+ BloomFilter::ToProtobuf(nullptr, &controller_2, &to_protobuf);
+ EXPECT_EQ(to_protobuf.always_true(), true);
}
-TEST_F(BloomFilterTest, ThriftOr) {
+TEST_F(BloomFilterTest, ProtobufOr) {
BloomFilter* bf1 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
BloomFilter* bf2 = CreateBloomFilter(BloomFilter::MinLogSpace(100, 0.01));
@@ -340,7 +378,15 @@
for (int i = 0; i < 10; ++i) BfInsert(*bf1, i);
bool success;
- BloomFilter *bf3 = CreateBloomFilter(BfUnion(*bf1, *bf2, &success));
+ BloomFilterPB protobuf;
+ std::string directory;
+ int64_t directory_size =
+ BloomFilter::GetExpectedMemoryUsed(BloomFilter::MinLogSpace(100, 0.01));
+
+ BfUnion(*bf1, *bf2, directory_size, &success, &protobuf, &directory);
+
+ BloomFilter* bf3 = CreateBloomFilter(protobuf, directory);
+
ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
for (int i = 0; i < 10; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf3, i)) << i;
@@ -348,8 +394,10 @@
// Insert another value to aggregated BloomFilter.
for (int i = 11; i < 50; ++i) BfInsert(*bf3, i);
- // Apply TBloomFilter back to BloomFilter and verify if aggregation was correct.
- BloomFilter *bf4 = CreateBloomFilter(BfUnion(*bf1, *bf3, &success));
+ // Apply BloomFilterPB back to BloomFilter and verify if aggregation was correct.
+ BfUnion(*bf1, *bf3, directory_size, &success, &protobuf, &directory);
+ BloomFilter* bf4 = CreateBloomFilter(protobuf, directory);
+
ASSERT_TRUE(success) << "SIMD BloomFilter::Union error";
for (int i = 11; i < 50; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
for (int i = 60; i < 80; ++i) ASSERT_TRUE(BfFind(*bf4, i)) << i;
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index 0a2f8fc..c41c81b 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -17,6 +17,8 @@
#include "util/bloom-filter.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
@@ -55,12 +57,13 @@
return Status::OK();
}
-Status BloomFilter::Init(const TBloomFilter& thrift) {
- RETURN_IF_ERROR(Init(thrift.log_bufferpool_space));
- if (directory_ != nullptr && !thrift.always_false) {
+Status BloomFilter::Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
+ size_t directory_in_size) {
+ RETURN_IF_ERROR(Init(protobuf.log_bufferpool_space()));
+ if (directory_ != nullptr && !protobuf.always_false()) {
always_false_ = false;
- DCHECK_EQ(thrift.directory.size(), directory_size());
- memcpy(directory_, &thrift.directory[0], thrift.directory.size());
+ DCHECK_EQ(directory_in_size, directory_size());
+ memcpy(directory_, directory_in, directory_in_size);
}
return Status::OK();
}
@@ -73,32 +76,66 @@
}
}
-void BloomFilter::ToThrift(TBloomFilter* thrift) const {
- thrift->log_bufferpool_space = log_num_buckets_ + LOG_BUCKET_BYTE_SIZE;
- if (always_false_) {
- thrift->always_false = true;
- thrift->always_true = false;
+void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
+ kudu::rpc::RpcController* controller, const char* directory,
+ unsigned long directory_size) {
+ DCHECK(rpc_params != nullptr);
+ DCHECK(!rpc_params->always_false());
+ DCHECK(!rpc_params->always_true());
+ kudu::Slice dir_slice(directory, directory_size); // 'directory' must outlive the RPC.
+ unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar =
+ kudu::rpc::RpcSidecar::FromSlice(dir_slice);
+
+ int sidecar_idx = -1; // Passed to AddOutboundSidecar() as its index out-parameter.
+ kudu::Status sidecar_status =
+ controller->AddOutboundSidecar(std::move(rpc_sidecar), &sidecar_idx);
+ if (!sidecar_status.ok()) {
+ LOG(ERROR) << "Cannot add outbound sidecar: " << sidecar_status.message().ToString();
+ // If AddOutboundSidecar() fails, the directory cannot be attached, so we 'disable'
+ // the BloomFilterPB by turning it into an always true filter.
+ rpc_params->set_always_false(false);
+ rpc_params->set_always_true(true);
return;
}
- thrift->directory.assign(reinterpret_cast<const char*>(directory_),
- static_cast<unsigned long>(directory_size()));
- thrift->always_false = false;
- thrift->always_true = false;
+ rpc_params->set_directory_sidecar_idx(sidecar_idx); // Record which sidecar to read.
+ rpc_params->set_always_false(false);
+ rpc_params->set_always_true(false);
}
-void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) {
- DCHECK(thrift != nullptr);
- if (filter == nullptr) {
- thrift->always_true = true;
- DCHECK_EQ(thrift->always_false, false);
+void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
+ kudu::rpc::RpcController* controller, const string& directory) {
+ // Expose the string's bytes and length, then defer to the pointer/size overload.
+ const char* dir_bytes = directory.data();
+ AddDirectorySidecar(rpc_params, controller, dir_bytes, directory.size());
+}
+
+void BloomFilter::ToProtobuf(
+ BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const {
+ protobuf->set_log_bufferpool_space(log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
+ if (always_false_) { // Nothing inserted yet: send only the flags, no directory.
+ protobuf->set_always_false(true);
+ protobuf->set_always_true(false);
return;
}
- filter->ToThrift(thrift);
+ BloomFilter::AddDirectorySidecar(protobuf, controller, // Also sets always_* flags.
+ reinterpret_cast<const char*>(directory_),
+ static_cast<unsigned long>(directory_size()));
+}
+
+void BloomFilter::ToProtobuf(const BloomFilter* filter,
+ kudu::rpc::RpcController* controller, BloomFilterPB* protobuf) {
+ DCHECK(protobuf != nullptr);
+ if (filter != nullptr) {
+ filter->ToProtobuf(protobuf, controller);
+ return;
+ }
+ // A null 'filter' stands for the always true filter; no directory is attached.
+ protobuf->set_always_true(true);
+ DCHECK(!protobuf->always_false());
}
// The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we
// compile with -fno-strict-aliasing.
-
void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept {
// new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is
// 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second
@@ -184,20 +221,17 @@
}
} //namespace
-void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
+void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in,
+ BloomFilterPB* out, uint8_t* directory_out, size_t directory_size) {
DCHECK(out != nullptr);
DCHECK(&in != out);
// These cases are impossible in current code. If they become possible in the future,
// memory usage should be tracked accordingly.
- DCHECK(!out->always_false);
- DCHECK(!out->always_true);
- DCHECK(!in.always_true);
- if (in.always_false) return;
- DCHECK_EQ(in.log_bufferpool_space, out->log_bufferpool_space);
- DCHECK_EQ(in.directory.size(), out->directory.size())
- << "Equal log heap space " << in.log_bufferpool_space
- << ", but different directory sizes: " << in.directory.size() << ", "
- << out->directory.size();
+ DCHECK(!out->always_false());
+ DCHECK(!out->always_true());
+ DCHECK(!in.always_true());
+ if (in.always_false()) return;
+ DCHECK_EQ(in.log_bufferpool_space(), out->log_bufferpool_space());
// The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
// written in a way that is very friendly to auto-vectorization. Instead, we manually
// vectorize, increasing the speed by up to 56x.
@@ -205,13 +239,14 @@
// TODO: Tune gcc flags to auto-vectorize the trivial loop instead of hand-vectorizing
// it. This might not be possible.
if (CpuInfo::IsSupported(CpuInfo::AVX)) {
- OrEqualArrayAvx(in.directory.size(), &in.directory[0], &out->directory[0]);
+ OrEqualArrayAvx(directory_size, reinterpret_cast<const char*>(directory_in),
+ reinterpret_cast<char*>(directory_out));
} else {
- const __m128i* simd_in = reinterpret_cast<const __m128i*>(&in.directory[0]);
+ const __m128i* simd_in = reinterpret_cast<const __m128i*>(directory_in);
const __m128i* const simd_in_end =
- reinterpret_cast<const __m128i*>(&in.directory[0] + in.directory.size());
- __m128i* simd_out = reinterpret_cast<__m128i*>(&out->directory[0]);
- // in.directory has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
+ reinterpret_cast<const __m128i*>(directory_in + directory_size);
+ __m128i* simd_out = reinterpret_cast<__m128i*>(directory_out);
+ // directory_in has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
// == 16, we can do two _mm_or_si128's in each iteration without checking array
// bounds.
while (simd_in != simd_in_end) {
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 73cb01e..9c22120 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -27,11 +27,35 @@
#include "common/compiler-util.h"
#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/data_stream_service.pb.h"
#include "gutil/macros.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "util/cpu-info.h"
#include "util/hash-util.h"
+namespace kudu {
+namespace rpc {
+class RpcController; // Only used through pointers in this header.
+} // namespace rpc
+} // namespace kudu
+
+namespace impala {
+class BloomFilter; // Declared early for the BfUnion() prototype below.
+} // namespace impala
+
+// Forward declaration of bloom_filter_test_util::BfUnion(), which BloomFilter lists as
+// a friend function (see the friend declarations at the end of the class).
+namespace bloom_filter_test_util {
+void BfUnion(const impala::BloomFilter& x, const impala::BloomFilter& y,
+ int64_t directory_size, bool* success, impala::BloomFilterPB* protobuf,
+ std::string* directory);
+} // namespace bloom_filter_test_util
+
+// Forward declaration of either::TestData, which BloomFilter lists as a friend struct.
+namespace either {
+struct TestData;
+} // namespace either
+
namespace impala {
/// A BloomFilter stores sets of items and offers a query operation indicating whether or
@@ -66,17 +90,21 @@
/// Reset the filter state, allocate/reallocate and initialize the 'directory_'. All
/// calls to Insert() and Find() should only be done between the calls to Init() and
- /// Close().Init and Close are safe to call multiple times.
+ /// Close(). Init and Close are safe to call multiple times.
Status Init(const int log_bufferpool_space);
- Status Init(const TBloomFilter& thrift);
+ Status Init(const BloomFilterPB& protobuf, const uint8_t* directory_in,
+ size_t directory_in_size);
void Close();
/// Representation of a filter which allows all elements to pass.
static constexpr BloomFilter* const ALWAYS_TRUE_FILTER = NULL;
- /// Converts 'filter' to its corresponding Thrift representation. If the first argument
- /// is NULL, it is interpreted as a complete filter which contains all elements.
- static void ToThrift(const BloomFilter* filter, TBloomFilter* thrift);
+ /// Converts 'filter' to its corresponding Protobuf representation.
+ /// If the first argument is NULL, it is interpreted as a complete filter which
+ /// contains all elements.
+ /// Also sets a sidecar on 'controller' containing the Bloom filter's directory.
+ static void ToProtobuf(const BloomFilter* filter, kudu::rpc::RpcController* controller,
+ BloomFilterPB* protobuf);
bool AlwaysFalse() const { return always_false_; }
@@ -91,8 +119,12 @@
/// high probabilty) if it is not.
bool Find(const uint32_t hash) const noexcept;
- /// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
- static void Or(const TBloomFilter& in, TBloomFilter* out);
+ /// This function computes the logical OR of 'directory_in' with 'directory_out'
+ /// and stores the result in 'directory_out'.
+ /// Additional checks are also performed to make sure the related fields of
+ /// 'in' and 'out' are well-defined.
+ static void Or(const BloomFilterPB& in, const uint8_t* directory_in, BloomFilterPB* out,
+ uint8_t* directory_out, size_t directory_size);
/// As more distinct items are inserted into a BloomFilter, the false positive rate
/// rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter
@@ -119,6 +151,20 @@
return sizeof(Bucket) * (1LL << std::max(1, log_heap_size - LOG_BUCKET_WORD_BITS));
}
+ /// The following two functions set a sidecar on 'controller' containing the Bloom
+ /// filter's directory and record the sidecar index in 'rpc_params'. Two interfaces are
+ /// provided because callers may or may not have access to an instantiated
+ /// BloomFilter. 'rpc_params' must be neither an always false nor an always true Bloom
+ /// filter when calling these functions. If the sidecar cannot be added, 'rpc_params'
+ /// is turned into an always true filter instead. The RpcSidecar references
+ /// 'directory' directly rather than copying it, so the caller must keep 'directory'
+ /// alive until the RPC is done.
+ static void AddDirectorySidecar(BloomFilterPB* rpc_params,
+ kudu::rpc::RpcController* controller, const char* directory,
+ unsigned long directory_size);
+ static void AddDirectorySidecar(BloomFilterPB* rpc_params,
+ kudu::rpc::RpcController* controller, const std::string& directory);
+
private:
// always_false_ is true when the bloom filter hasn't had any elements inserted.
bool always_false_ = true;
@@ -182,8 +228,8 @@
return 1uLL << (log_num_buckets_ + LOG_BUCKET_BYTE_SIZE);
}
- /// Serializes this filter as Thrift.
- void ToThrift(TBloomFilter* thrift) const;
+ /// Serializes this filter as Protobuf.
+ void ToProtobuf(BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const;
/// Some constants used in hashing. #defined for efficiency reasons.
#define IMPALA_BLOOM_HASH_CONSTANTS \
@@ -196,6 +242,23 @@
__attribute__((aligned(32))) = {IMPALA_BLOOM_HASH_CONSTANTS};
DISALLOW_COPY_AND_ASSIGN(BloomFilter);
+
+ /// List 'BloomFilterTest_Protobuf_Test' as a friend class to run the backend
+ /// test in 'bloom-filter-test.cc' since it has to access the private field of
+ /// 'directory_' in BloomFilter.
+ friend class BloomFilterTest_Protobuf_Test;
+
+ /// List 'bloom_filter_test_util::BfUnion()' as a friend function to run the backend
+ /// test in 'bloom-filter-test.cc' since it has to access the private field of
+ /// 'directory_' in BloomFilter.
+ friend void bloom_filter_test_util::BfUnion(const impala::BloomFilter& x,
+ const impala::BloomFilter& y, int64_t directory_size, bool* success,
+ impala::BloomFilterPB* protobuf, std::string* directory);
+
+ /// List 'either::TestData' as a friend struct to run the benchmark in
+ /// 'bloom-filter-benchmark.cc' since it has to access the private field of
+ /// 'directory_' in BloomFilter.
+ friend struct either::TestData;
};
// To set 8 bits in an 32-byte Bloom filter, we set one bit in each 32-bit uint32_t. This
diff --git a/be/src/util/min-max-filter-test.cc b/be/src/util/min-max-filter-test.cc
index a75ca1a..a5c0667 100644
--- a/be/src/util/min-max-filter-test.cc
+++ b/be/src/util/min-max-filter-test.cc
@@ -18,6 +18,7 @@
#include "testutil/gtest-util.h"
#include "util/min-max-filter.h"
+#include "gen-cpp/data_stream_service.pb.h"
#include "runtime/decimal-value.h"
#include "runtime/decimal-value.inline.h"
#include "runtime/string-value.inline.h"
@@ -50,15 +51,15 @@
EXPECT_EQ(*reinterpret_cast<bool*>(filter->GetMax()), b1);
// Check the behavior of Or.
- TMinMaxFilter tFilter1;
- tFilter1.min.__set_bool_val(false);
- tFilter1.max.__set_bool_val(true);
- TMinMaxFilter tFilter2;
- tFilter2.min.__set_bool_val(false);
- tFilter2.max.__set_bool_val(false);
- MinMaxFilter::Or(tFilter1, &tFilter2, ColumnType(PrimitiveType::TYPE_BOOLEAN));
- EXPECT_FALSE(tFilter2.min.bool_val);
- EXPECT_TRUE(tFilter2.max.bool_val);
+ MinMaxFilterPB pFilter1;
+ pFilter1.mutable_min()->set_bool_val(false);
+ pFilter1.mutable_max()->set_bool_val(true);
+ MinMaxFilterPB pFilter2;
+ pFilter2.mutable_min()->set_bool_val(false);
+ pFilter2.mutable_max()->set_bool_val(false);
+ MinMaxFilter::Or(pFilter1, &pFilter2, ColumnType(PrimitiveType::TYPE_BOOLEAN));
+ EXPECT_FALSE(pFilter2.min().bool_val());
+ EXPECT_TRUE(pFilter2.max().bool_val());
filter->Close();
}
@@ -84,14 +85,14 @@
// Test the behavior of an empty filter.
EXPECT_TRUE(int_filter->AlwaysFalse());
EXPECT_FALSE(int_filter->AlwaysTrue());
- TMinMaxFilter tFilter;
- int_filter->ToThrift(&tFilter);
- EXPECT_TRUE(tFilter.always_false);
- EXPECT_FALSE(tFilter.always_true);
- EXPECT_FALSE(tFilter.min.__isset.int_val);
- EXPECT_FALSE(tFilter.max.__isset.int_val);
+ MinMaxFilterPB pFilter;
+ int_filter->ToProtobuf(&pFilter);
+ EXPECT_TRUE(pFilter.always_false());
+ EXPECT_FALSE(pFilter.always_true());
+ EXPECT_FALSE(pFilter.min().has_int_val());
+ EXPECT_FALSE(pFilter.max().has_int_val());
MinMaxFilter* empty_filter =
- MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
+ MinMaxFilter::Create(pFilter, int_type, &obj_pool, &mem_tracker);
EXPECT_TRUE(empty_filter->AlwaysFalse());
EXPECT_FALSE(empty_filter->AlwaysTrue());
@@ -109,25 +110,25 @@
int_filter->Insert(&i4);
CheckIntVals(int_filter, i4, i2);
- int_filter->ToThrift(&tFilter);
- EXPECT_FALSE(tFilter.always_false);
- EXPECT_FALSE(tFilter.always_true);
- EXPECT_EQ(tFilter.min.int_val, i4);
- EXPECT_EQ(tFilter.max.int_val, i2);
+ int_filter->ToProtobuf(&pFilter);
+ EXPECT_FALSE(pFilter.always_false());
+ EXPECT_FALSE(pFilter.always_true());
+ EXPECT_EQ(pFilter.min().int_val(), i4);
+ EXPECT_EQ(pFilter.max().int_val(), i2);
MinMaxFilter* int_filter2 =
- MinMaxFilter::Create(tFilter, int_type, &obj_pool, &mem_tracker);
+ MinMaxFilter::Create(pFilter, int_type, &obj_pool, &mem_tracker);
CheckIntVals(int_filter2, i4, i2);
// Check the behavior of Or.
- TMinMaxFilter tFilter1;
- tFilter1.min.__set_int_val(4);
- tFilter1.max.__set_int_val(8);
- TMinMaxFilter tFilter2;
- tFilter2.min.__set_int_val(2);
- tFilter2.max.__set_int_val(7);
- MinMaxFilter::Or(tFilter1, &tFilter2, int_type);
- EXPECT_EQ(tFilter2.min.int_val, 2);
- EXPECT_EQ(tFilter2.max.int_val, 8);
+ MinMaxFilterPB pFilter1;
+ pFilter1.mutable_min()->set_int_val(4);
+ pFilter1.mutable_max()->set_int_val(8);
+ MinMaxFilterPB pFilter2;
+ pFilter2.mutable_min()->set_int_val(2);
+ pFilter2.mutable_max()->set_int_val(7);
+ MinMaxFilter::Or(pFilter1, &pFilter2, int_type);
+ EXPECT_EQ(pFilter2.min().int_val(), 2);
+ EXPECT_EQ(pFilter2.max().int_val(), 8);
int_filter->Close();
empty_filter->Close();
@@ -162,13 +163,13 @@
filter->MaterializeValues();
EXPECT_TRUE(filter->AlwaysFalse());
EXPECT_FALSE(filter->AlwaysTrue());
- TMinMaxFilter tFilter;
- filter->ToThrift(&tFilter);
- EXPECT_TRUE(tFilter.always_false);
- EXPECT_FALSE(tFilter.always_true);
+ MinMaxFilterPB pFilter;
+ filter->ToProtobuf(&pFilter);
+ EXPECT_TRUE(pFilter.always_false());
+ EXPECT_FALSE(pFilter.always_true());
MinMaxFilter* empty_filter =
- MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+ MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
EXPECT_TRUE(empty_filter->AlwaysFalse());
EXPECT_FALSE(empty_filter->AlwaysTrue());
@@ -191,11 +192,11 @@
filter->MaterializeValues();
CheckStringVals(filter, c, d);
- filter->ToThrift(&tFilter);
- EXPECT_FALSE(tFilter.always_false);
- EXPECT_FALSE(tFilter.always_true);
- EXPECT_EQ(tFilter.min.string_val, c);
- EXPECT_EQ(tFilter.max.string_val, d);
+ filter->ToProtobuf(&pFilter);
+ EXPECT_FALSE(pFilter.always_false());
+ EXPECT_FALSE(pFilter.always_true());
+ EXPECT_EQ(pFilter.min().string_val(), c);
+ EXPECT_EQ(pFilter.max().string_val(), d);
// Test that strings longer than 1024 are truncated.
string b1030(1030, 'b');
@@ -227,14 +228,14 @@
for (int i = trailIndex; i < 1024; ++i) truncTrailMaxChar[i] = 0;
CheckStringVals(filter, b1024, truncTrailMaxChar);
- filter->ToThrift(&tFilter);
- EXPECT_FALSE(tFilter.always_false);
- EXPECT_FALSE(tFilter.always_true);
- EXPECT_EQ(tFilter.min.string_val, b1024);
- EXPECT_EQ(tFilter.max.string_val, truncTrailMaxChar);
+ filter->ToProtobuf(&pFilter);
+ EXPECT_FALSE(pFilter.always_false());
+ EXPECT_FALSE(pFilter.always_true());
+ EXPECT_EQ(pFilter.min().string_val(), b1024);
+ EXPECT_EQ(pFilter.max().string_val(), truncTrailMaxChar);
MinMaxFilter* filter2 =
- MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+ MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
CheckStringVals(filter2, b1024, truncTrailMaxChar);
// Check that if the entire string is the max char and therefore after truncating for
@@ -249,12 +250,12 @@
filter->Insert(&cVal);
EXPECT_TRUE(filter->AlwaysTrue());
- filter->ToThrift(&tFilter);
- EXPECT_FALSE(tFilter.always_false);
- EXPECT_TRUE(tFilter.always_true);
+ filter->ToProtobuf(&pFilter);
+ EXPECT_FALSE(pFilter.always_false());
+ EXPECT_TRUE(pFilter.always_true());
MinMaxFilter* always_true_filter =
- MinMaxFilter::Create(tFilter, string_type, &obj_pool, &mem_tracker);
+ MinMaxFilter::Create(pFilter, string_type, &obj_pool, &mem_tracker);
EXPECT_FALSE(always_true_filter->AlwaysFalse());
EXPECT_TRUE(always_true_filter->AlwaysTrue());
@@ -276,20 +277,20 @@
limit_filter->MaterializeValues();
EXPECT_TRUE(limit_filter->AlwaysTrue());
- limit_filter->ToThrift(&tFilter);
- EXPECT_FALSE(tFilter.always_false);
- EXPECT_TRUE(tFilter.always_true);
+ limit_filter->ToProtobuf(&pFilter);
+ EXPECT_FALSE(pFilter.always_false());
+ EXPECT_TRUE(pFilter.always_true());
// Check the behavior of Or.
- TMinMaxFilter tFilter1;
- tFilter1.min.__set_string_val("a");
- tFilter1.max.__set_string_val("d");
- TMinMaxFilter tFilter2;
- tFilter2.min.__set_string_val("b");
- tFilter2.max.__set_string_val("e");
- MinMaxFilter::Or(tFilter1, &tFilter2, string_type);
- EXPECT_EQ(tFilter2.min.string_val, "a");
- EXPECT_EQ(tFilter2.max.string_val, "e");
+ MinMaxFilterPB pFilter1;
+ pFilter1.mutable_min()->set_string_val("a");
+ pFilter1.mutable_max()->set_string_val("d");
+ MinMaxFilterPB pFilter2;
+ pFilter2.mutable_min()->set_string_val("b");
+ pFilter2.mutable_max()->set_string_val("e");
+ MinMaxFilter::Or(pFilter1, &pFilter2, string_type);
+ EXPECT_EQ(pFilter2.min().string_val(), "a");
+ EXPECT_EQ(pFilter2.max().string_val(), "e");
filter->Close();
empty_filter->Close();
@@ -317,14 +318,14 @@
// Test the behavior of an empty filter.
EXPECT_TRUE(filter->AlwaysFalse());
EXPECT_FALSE(filter->AlwaysTrue());
- TMinMaxFilter tFilter;
- filter->ToThrift(&tFilter);
- EXPECT_TRUE(tFilter.always_false);
- EXPECT_FALSE(tFilter.always_true);
- EXPECT_FALSE(tFilter.min.__isset.timestamp_val);
- EXPECT_FALSE(tFilter.max.__isset.timestamp_val);
+ MinMaxFilterPB pFilter;
+ filter->ToProtobuf(&pFilter);
+ EXPECT_TRUE(pFilter.always_false());
+ EXPECT_FALSE(pFilter.always_true());
+ EXPECT_FALSE(pFilter.min().has_timestamp_val());
+ EXPECT_FALSE(pFilter.max().has_timestamp_val());
MinMaxFilter* empty_filter =
- MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
+ MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
EXPECT_TRUE(empty_filter->AlwaysFalse());
EXPECT_FALSE(empty_filter->AlwaysTrue());
@@ -342,25 +343,25 @@
filter->Insert(&t4);
CheckTimestampVals(filter, t2, t3);
- filter->ToThrift(&tFilter);
- EXPECT_FALSE(tFilter.always_false);
- EXPECT_FALSE(tFilter.always_true);
- EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.min), t2);
- EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter.max), t3);
+ filter->ToProtobuf(&pFilter);
+ EXPECT_FALSE(pFilter.always_false());
+ EXPECT_FALSE(pFilter.always_true());
+ EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.min()), t2);
+ EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter.max()), t3);
MinMaxFilter* filter2 =
- MinMaxFilter::Create(tFilter, timestamp_type, &obj_pool, &mem_tracker);
+ MinMaxFilter::Create(pFilter, timestamp_type, &obj_pool, &mem_tracker);
CheckTimestampVals(filter2, t2, t3);
// Check the behavior of Or.
- TMinMaxFilter tFilter1;
- t2.ToTColumnValue(&tFilter1.min);
- t4.ToTColumnValue(&tFilter1.max);
- TMinMaxFilter tFilter2;
- t1.ToTColumnValue(&tFilter2.min);
- t3.ToTColumnValue(&tFilter2.max);
- MinMaxFilter::Or(tFilter1, &tFilter2, timestamp_type);
- EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.min), t2);
- EXPECT_EQ(TimestampValue::FromTColumnValue(tFilter2.max), t3);
+ MinMaxFilterPB pFilter1;
+ t2.ToColumnValuePB(pFilter1.mutable_min());
+ t4.ToColumnValuePB(pFilter1.mutable_max());
+ MinMaxFilterPB pFilter2;
+ t1.ToColumnValuePB(pFilter2.mutable_min());
+ t3.ToColumnValuePB(pFilter2.mutable_max());
+ MinMaxFilter::Or(pFilter1, &pFilter2, timestamp_type);
+ EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.min()), t2);
+ EXPECT_EQ(TimestampValue::FromColumnValuePB(pFilter2.max()), t3);
filter->Close();
empty_filter->Close();
@@ -391,16 +392,16 @@
}
void CheckDecimalEmptyFilter(MinMaxFilter* filter, const ColumnType& column_type,
- TMinMaxFilter* tFilter, ObjectPool* obj_pool, MemTracker* mem_tracker) {
+ MinMaxFilterPB* pFilter, ObjectPool* obj_pool, MemTracker* mem_tracker) {
EXPECT_TRUE(filter->AlwaysFalse());
EXPECT_FALSE(filter->AlwaysTrue());
- filter->ToThrift(tFilter);
- EXPECT_TRUE(tFilter->always_false);
- EXPECT_FALSE(tFilter->always_true);
- EXPECT_FALSE(tFilter->min.__isset.decimal_val);
- EXPECT_FALSE(tFilter->max.__isset.decimal_val);
+ filter->ToProtobuf(pFilter);
+ EXPECT_TRUE(pFilter->always_false());
+ EXPECT_FALSE(pFilter->always_true());
+ EXPECT_FALSE(pFilter->min().has_decimal_val());
+ EXPECT_FALSE(pFilter->max().has_decimal_val());
MinMaxFilter* empty_filter =
- MinMaxFilter::Create(*tFilter, column_type, obj_pool, mem_tracker);
+ MinMaxFilter::Create(*pFilter, column_type, obj_pool, mem_tracker);
EXPECT_TRUE(empty_filter->AlwaysFalse());
EXPECT_FALSE(empty_filter->AlwaysTrue());
empty_filter->Close();
@@ -427,30 +428,30 @@
CheckDecimalVals(filter##SIZE, d3##SIZE, d2##SIZE); \
} while (false)
-#define DECIMAL_CHECK_THRIFT(SIZE) \
- do { \
- filter##SIZE->ToThrift(&tFilter##SIZE); \
- EXPECT_FALSE(tFilter##SIZE.always_false); \
- EXPECT_FALSE(tFilter##SIZE.always_true); \
- EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter##SIZE.min), d3##SIZE); \
- EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter##SIZE.max), d2##SIZE); \
- MinMaxFilter* filter##SIZE##2 = MinMaxFilter::Create( \
- tFilter##SIZE, decimal##SIZE##_type, &obj_pool, &mem_tracker); \
- CheckDecimalVals(filter##SIZE##2, d3##SIZE, d2##SIZE); \
- filter##SIZE##2->Close(); \
+#define DECIMAL_CHECK_PROTOBUF(SIZE) /* Round-trips filter<SIZE> via MinMaxFilterPB. */ \
+ do { \
+ filter##SIZE->ToProtobuf(&pFilter##SIZE); /* Serialize into pFilter<SIZE>. */ \
+ EXPECT_FALSE(pFilter##SIZE.always_false()); \
+ EXPECT_FALSE(pFilter##SIZE.always_true()); \
+ EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter##SIZE.min()), d3##SIZE); \
+ EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter##SIZE.max()), d2##SIZE); \
+ MinMaxFilter* filter##SIZE##2 = MinMaxFilter::Create( /* Rebuild from protobuf. */ \
+ pFilter##SIZE, decimal##SIZE##_type, &obj_pool, &mem_tracker); \
+ CheckDecimalVals(filter##SIZE##2, d3##SIZE, d2##SIZE); \
+ filter##SIZE##2->Close(); \
} while (false)
-#define DECIMAL_CHECK_OR(SIZE) \
- do { \
- TMinMaxFilter tFilter1##SIZE; \
- d3##SIZE.ToTColumnValue(&tFilter1##SIZE.min); \
- d2##SIZE.ToTColumnValue(&tFilter1##SIZE.max); \
- TMinMaxFilter tFilter2##SIZE; \
- d1##SIZE.ToTColumnValue(&tFilter2##SIZE.min); \
- d1##SIZE.ToTColumnValue(&tFilter2##SIZE.max); \
- MinMaxFilter::Or(tFilter1##SIZE, &tFilter2##SIZE, decimal##SIZE##_type); \
- EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter2##SIZE.min), d3##SIZE); \
- EXPECT_EQ(Decimal##SIZE##Value::FromTColumnValue(tFilter2##SIZE.max), d2##SIZE); \
+#define DECIMAL_CHECK_OR(SIZE) /* Checks MinMaxFilter::Or() on decimal bounds. */ \
+ do { \
+ MinMaxFilterPB pFilter1##SIZE; /* 'in' argument: [d3, d2]. */ \
+ d3##SIZE.ToColumnValuePB(pFilter1##SIZE.mutable_min()); \
+ d2##SIZE.ToColumnValuePB(pFilter1##SIZE.mutable_max()); \
+ MinMaxFilterPB pFilter2##SIZE; /* 'out' argument: [d1, d1]. */ \
+ d1##SIZE.ToColumnValuePB(pFilter2##SIZE.mutable_min()); \
+ d1##SIZE.ToColumnValuePB(pFilter2##SIZE.mutable_max()); \
+ MinMaxFilter::Or(pFilter1##SIZE, &pFilter2##SIZE, decimal##SIZE##_type); \
+ EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.min()), d3##SIZE); \
+ EXPECT_EQ(Decimal##SIZE##Value::FromColumnValuePB(pFilter2##SIZE.max()), d2##SIZE); \
} while (false)
// Tests that a DecimalMinMaxFilter returns the expected min/max after having values
@@ -475,13 +476,13 @@
MinMaxFilter* filter8 = MinMaxFilter::Create(decimal8_type, &obj_pool, &mem_tracker);
MinMaxFilter* filter16 = MinMaxFilter::Create(decimal16_type, &obj_pool, &mem_tracker);
- // Create thrift minmax filters
- TMinMaxFilter tFilter4, tFilter8, tFilter16;
+ // Create protobuf minmax filters
+ MinMaxFilterPB pFilter4, pFilter8, pFilter16;
// Test the behavior of an empty filter.
- CheckDecimalEmptyFilter(filter4, decimal4_type, &tFilter4, &obj_pool, &mem_tracker);
- CheckDecimalEmptyFilter(filter8, decimal8_type, &tFilter8, &obj_pool, &mem_tracker);
- CheckDecimalEmptyFilter(filter16, decimal16_type, &tFilter16, &obj_pool, &mem_tracker);
+ CheckDecimalEmptyFilter(filter4, decimal4_type, &pFilter4, &obj_pool, &mem_tracker);
+ CheckDecimalEmptyFilter(filter8, decimal8_type, &pFilter8, &obj_pool, &mem_tracker);
+ CheckDecimalEmptyFilter(filter16, decimal16_type, &pFilter16, &obj_pool, &mem_tracker);
// Insert and check
DECIMAL_INSERT_AND_CHECK(4, 9, 5, 2345.67891, 3456.78912, 1234.56789);
@@ -490,10 +491,10 @@
DECIMAL_INSERT_AND_CHECK(16, 38, 19, 2345678912345678912.2345678912345678912,
3456789123456789123.3456789123456789123, 1234567891234567891.1234567891234567891);
- // Thrift check
- DECIMAL_CHECK_THRIFT(4);
- DECIMAL_CHECK_THRIFT(8);
- DECIMAL_CHECK_THRIFT(16);
+ // Protobuf check
+ DECIMAL_CHECK_PROTOBUF(4);
+ DECIMAL_CHECK_PROTOBUF(8);
+ DECIMAL_CHECK_PROTOBUF(16);
// Check the behavior of Or.
DECIMAL_CHECK_OR(4);
diff --git a/be/src/util/min-max-filter.cc b/be/src/util/min-max-filter.cc
index c6da060..dd9b351 100644
--- a/be/src/util/min-max-filter.cc
+++ b/be/src/util/min-max-filter.cc
@@ -82,61 +82,55 @@
}
}
-#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, THRIFT_TYPE, PRIMITIVE_TYPE) \
- const char* NAME##MinMaxFilter::LLVM_CLASS_NAME = \
- "class.impala::" #NAME "MinMaxFilter"; \
- NAME##MinMaxFilter::NAME##MinMaxFilter(const TMinMaxFilter& thrift) { \
- DCHECK(!thrift.always_true); \
- if (thrift.always_false) { \
- min_ = numeric_limits<TYPE>::max(); \
- max_ = numeric_limits<TYPE>::lowest(); \
- } else { \
- DCHECK(thrift.__isset.min); \
- DCHECK(thrift.__isset.max); \
- DCHECK(thrift.min.__isset.THRIFT_TYPE##_val); \
- DCHECK(thrift.max.__isset.THRIFT_TYPE##_val); \
- min_ = thrift.min.THRIFT_TYPE##_val; \
- max_ = thrift.max.THRIFT_TYPE##_val; \
- } \
- } \
- PrimitiveType NAME##MinMaxFilter::type() { \
- return PrimitiveType::TYPE_##PRIMITIVE_TYPE; \
- } \
- void NAME##MinMaxFilter::ToThrift(TMinMaxFilter* thrift) const { \
- if (!AlwaysFalse()) { \
- thrift->min.__set_##THRIFT_TYPE##_val(min_); \
- thrift->__isset.min = true; \
- thrift->max.__set_##THRIFT_TYPE##_val(max_); \
- thrift->__isset.max = true; \
- } \
- thrift->__set_always_false(AlwaysFalse()); \
- thrift->__set_always_true(false); \
- } \
- string NAME##MinMaxFilter::DebugString() const { \
- stringstream out; \
- out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_ \
- << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")"; \
- return out.str(); \
- } \
- void NAME##MinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) { \
- if (out->always_false) { \
- out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val); \
- out->__isset.min = true; \
- out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val); \
- out->__isset.max = true; \
- out->__set_always_false(false); \
- } else { \
- out->min.__set_##THRIFT_TYPE##_val( \
- std::min(in.min.THRIFT_TYPE##_val, out->min.THRIFT_TYPE##_val)); \
- out->max.__set_##THRIFT_TYPE##_val( \
- std::max(in.max.THRIFT_TYPE##_val, out->max.THRIFT_TYPE##_val)); \
- } \
- } \
- void NAME##MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) { \
- out->min.__set_##THRIFT_TYPE##_val(in.min.THRIFT_TYPE##_val); \
- out->__isset.min = true; \
- out->max.__set_##THRIFT_TYPE##_val(in.max.THRIFT_TYPE##_val); \
- out->__isset.max = true; \
+#define NUMERIC_MIN_MAX_FILTER_FUNCS(NAME, TYPE, PROTOBUF_TYPE, PRIMITIVE_TYPE) \
+ const char* NAME##MinMaxFilter::LLVM_CLASS_NAME = \
+ "class.impala::" #NAME "MinMaxFilter"; \
+ NAME##MinMaxFilter::NAME##MinMaxFilter(const MinMaxFilterPB& protobuf) { /* Deserializing ctor. */ \
+ DCHECK(!protobuf.always_true()); \
+ if (protobuf.always_false()) { /* Empty filter: inverted bounds. */ \
+ min_ = numeric_limits<TYPE>::max(); \
+ max_ = numeric_limits<TYPE>::lowest(); \
+ } else { \
+ DCHECK(protobuf.has_min()); \
+ DCHECK(protobuf.has_max()); \
+ DCHECK(protobuf.min().has_##PROTOBUF_TYPE##_val()); \
+ DCHECK(protobuf.max().has_##PROTOBUF_TYPE##_val()); \
+ min_ = protobuf.min().PROTOBUF_TYPE##_val(); \
+ max_ = protobuf.max().PROTOBUF_TYPE##_val(); \
+ } \
+ } \
+ PrimitiveType NAME##MinMaxFilter::type() { \
+ return PrimitiveType::TYPE_##PRIMITIVE_TYPE; \
+ } \
+ void NAME##MinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const { /* Bounds only if non-empty. */ \
+ if (!AlwaysFalse()) { \
+ protobuf->mutable_min()->set_##PROTOBUF_TYPE##_val(min_); \
+ protobuf->mutable_max()->set_##PROTOBUF_TYPE##_val(max_); \
+ } \
+ protobuf->set_always_false(AlwaysFalse()); \
+ protobuf->set_always_true(false); \
+ } \
+ string NAME##MinMaxFilter::DebugString() const { \
+ stringstream out; \
+ out << #NAME << "MinMaxFilter(min=" << min_ << ", max=" << max_ \
+ << ", always_false=" << (AlwaysFalse() ? "true" : "false") << ")"; \
+ return out.str(); \
+ } \
+ void NAME##MinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) { /* Widens 'out' to cover 'in'. */ \
+ if (out->always_false()) { /* 'out' is empty: adopt the bounds of 'in' in the typed field. */ \
+ out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val()); \
+ out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val()); \
+ out->set_always_false(false); \
+ } else { \
+ out->mutable_min()->set_##PROTOBUF_TYPE##_val( \
+ std::min(in.min().PROTOBUF_TYPE##_val(), out->min().PROTOBUF_TYPE##_val())); \
+ out->mutable_max()->set_##PROTOBUF_TYPE##_val( \
+ std::max(in.max().PROTOBUF_TYPE##_val(), out->max().PROTOBUF_TYPE##_val())); \
+ } \
+ } \
+ void NAME##MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) { /* Copies min/max only. */ \
+ out->mutable_min()->set_##PROTOBUF_TYPE##_val(in.min().PROTOBUF_TYPE##_val()); \
+ out->mutable_max()->set_##PROTOBUF_TYPE##_val(in.max().PROTOBUF_TYPE##_val()); \
}
NUMERIC_MIN_MAX_FILTER_FUNCS(Bool, bool, bool, BOOLEAN);
@@ -222,19 +216,17 @@
const int StringMinMaxFilter::MAX_BOUND_LENGTH = 1024;
StringMinMaxFilter::StringMinMaxFilter(
- const TMinMaxFilter& thrift, MemTracker* mem_tracker)
- : mem_pool_(mem_tracker),
- min_buffer_(&mem_pool_),
- max_buffer_(&mem_pool_) {
- always_false_ = thrift.always_false;
- always_true_ = thrift.always_true;
+ const MinMaxFilterPB& protobuf, MemTracker* mem_tracker)
+ : mem_pool_(mem_tracker), min_buffer_(&mem_pool_), max_buffer_(&mem_pool_) {
+ always_false_ = protobuf.always_false();
+ always_true_ = protobuf.always_true();
if (!always_true_ && !always_false_) {
- DCHECK(thrift.__isset.min);
- DCHECK(thrift.__isset.max);
- DCHECK(thrift.min.__isset.string_val);
- DCHECK(thrift.max.__isset.string_val);
- min_ = StringValue(thrift.min.string_val);
- max_ = StringValue(thrift.max.string_val);
+ DCHECK(protobuf.has_min());
+ DCHECK(protobuf.has_max());
+ DCHECK(protobuf.min().has_string_val());
+ DCHECK(protobuf.max().has_string_val());
+ min_ = StringValue(protobuf.min().string_val());
+ max_ = StringValue(protobuf.max().string_val());
CopyToBuffer(&min_buffer_, &min_, min_.len);
CopyToBuffer(&max_buffer_, &max_, max_.len);
}
@@ -277,17 +269,13 @@
}
}
-void StringMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void StringMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
if (!always_true_ && !always_false_) {
- thrift->min.string_val.assign(static_cast<char*>(min_.ptr), min_.len);
- thrift->min.__isset.string_val = true;
- thrift->__isset.min = true;
- thrift->max.string_val.assign(static_cast<char*>(max_.ptr), max_.len);
- thrift->max.__isset.string_val = true;
- thrift->__isset.max = true;
+ protobuf->mutable_min()->set_string_val(static_cast<char*>(min_.ptr), min_.len);
+ protobuf->mutable_max()->set_string_val(static_cast<char*>(max_.ptr), max_.len);
}
- thrift->__set_always_false(always_false_);
- thrift->__set_always_true(always_true_);
+ protobuf->set_always_false(always_false_);
+ protobuf->set_always_true(always_true_);
}
string StringMinMaxFilter::DebugString() const {
@@ -298,28 +286,26 @@
return out.str();
}
-void StringMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
- if (out->always_false) {
- out->min.__set_string_val(in.min.string_val);
- out->__isset.min = true;
- out->max.__set_string_val(in.max.string_val);
- out->__isset.max = true;
- out->__set_always_false(false);
+void StringMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+ if (out->always_false()) {
+ out->mutable_min()->set_string_val(in.min().string_val());
+ out->mutable_max()->set_string_val(in.max().string_val());
+ out->set_always_false(false);
} else {
- StringValue in_min_val = StringValue(in.min.string_val);
- StringValue out_min_val = StringValue(out->min.string_val);
- if (in_min_val < out_min_val) out->min.__set_string_val(in.min.string_val);
- StringValue in_max_val = StringValue(in.max.string_val);
- StringValue out_max_val = StringValue(out->max.string_val);
- if (in_max_val > out_max_val) out->max.__set_string_val(in.max.string_val);
+ StringValue in_min_val = StringValue(in.min().string_val());
+ StringValue out_min_val = StringValue(out->min().string_val());
+ if (in_min_val < out_min_val)
+ out->mutable_min()->set_string_val(in.min().string_val());
+ StringValue in_max_val = StringValue(in.max().string_val());
+ StringValue out_max_val = StringValue(out->max().string_val());
+ if (in_max_val > out_max_val)
+ out->mutable_max()->set_string_val(in.max().string_val());
}
}
-void StringMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
- out->min.__set_string_val(in.min.string_val);
- out->__isset.min = true;
- out->max.__set_string_val(in.max.string_val);
- out->__isset.max = true;
+void StringMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+ out->mutable_min()->set_string_val(in.min().string_val());
+ out->mutable_max()->set_string_val(in.max().string_val());
}
void StringMinMaxFilter::CopyToBuffer(
@@ -349,13 +335,13 @@
const char* TimestampMinMaxFilter::LLVM_CLASS_NAME =
"class.impala::TimestampMinMaxFilter";
-TimestampMinMaxFilter::TimestampMinMaxFilter(const TMinMaxFilter& thrift) {
- always_false_ = thrift.always_false;
+TimestampMinMaxFilter::TimestampMinMaxFilter(const MinMaxFilterPB& protobuf) {
+ always_false_ = protobuf.always_false();
if (!always_false_) {
- DCHECK(thrift.min.__isset.timestamp_val);
- DCHECK(thrift.max.__isset.timestamp_val);
- min_ = TimestampValue::FromTColumnValue(thrift.min);
- max_ = TimestampValue::FromTColumnValue(thrift.max);
+ DCHECK(protobuf.min().has_timestamp_val());
+ DCHECK(protobuf.max().has_timestamp_val());
+ min_ = TimestampValue::FromColumnValuePB(protobuf.min());
+ max_ = TimestampValue::FromColumnValuePB(protobuf.max());
}
}
@@ -363,15 +349,13 @@
return PrimitiveType::TYPE_TIMESTAMP;
}
-void TimestampMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void TimestampMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
if (!always_false_) {
- min_.ToTColumnValue(&thrift->min);
- thrift->__isset.min = true;
- max_.ToTColumnValue(&thrift->max);
- thrift->__isset.max = true;
+ min_.ToColumnValuePB(protobuf->mutable_min());
+ max_.ToColumnValuePB(protobuf->mutable_max());
}
- thrift->__set_always_false(always_false_);
- thrift->__set_always_true(false);
+ protobuf->set_always_false(always_false_);
+ protobuf->set_always_true(false);
}
string TimestampMinMaxFilter::DebugString() const {
@@ -381,45 +365,46 @@
return out.str();
}
-void TimestampMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out) {
- if (out->always_false) {
- out->min.__set_timestamp_val(in.min.timestamp_val);
- out->__isset.min = true;
- out->max.__set_timestamp_val(in.max.timestamp_val);
- out->__isset.max = true;
- out->__set_always_false(false);
+void TimestampMinMaxFilter::Or(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+ if (out->always_false()) {
+ out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+ out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
+ out->set_always_false(false);
} else {
- TimestampValue in_min_val = TimestampValue::FromTColumnValue(in.min);
- TimestampValue out_min_val = TimestampValue::FromTColumnValue(out->min);
- if (in_min_val < out_min_val) out->min.__set_timestamp_val(in.min.timestamp_val);
- TimestampValue in_max_val = TimestampValue::FromTColumnValue(in.max);
- TimestampValue out_max_val = TimestampValue::FromTColumnValue(out->max);
- if (in_max_val > out_max_val) out->max.__set_timestamp_val(in.max.timestamp_val);
+ TimestampValue in_min_val = TimestampValue::FromColumnValuePB(in.min());
+ TimestampValue out_min_val = TimestampValue::FromColumnValuePB(out->min());
+ if (in_min_val < out_min_val) {
+ out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+ }
+ TimestampValue in_max_val = TimestampValue::FromColumnValuePB(in.max());
+ TimestampValue out_max_val = TimestampValue::FromColumnValuePB(out->max());
+ if (in_max_val > out_max_val) {
+ out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
+ }
}
}
-void TimestampMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
- out->min.__set_timestamp_val(in.min.timestamp_val);
- out->__isset.min = true;
- out->max.__set_timestamp_val(in.max.timestamp_val);
- out->__isset.max = true;
+void TimestampMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+ out->mutable_min()->set_timestamp_val(in.min().timestamp_val());
+ out->mutable_max()->set_timestamp_val(in.max().timestamp_val());
}
// DECIMAL
const char* DecimalMinMaxFilter::LLVM_CLASS_NAME = "class.impala::DecimalMinMaxFilter";
-#define DECIMAL_SET_MINMAX(SIZE) \
- do { \
- DCHECK(thrift.min.__isset.decimal_val); \
- DCHECK(thrift.max.__isset.decimal_val); \
- min##SIZE##_ = Decimal##SIZE##Value::FromTColumnValue(thrift.min); \
- max##SIZE##_ = Decimal##SIZE##Value::FromTColumnValue(thrift.max); \
+#define DECIMAL_SET_MINMAX(SIZE) \
+ do { \
+ DCHECK(protobuf.min().has_decimal_val()); \
+ DCHECK(protobuf.max().has_decimal_val()); \
+ min##SIZE##_ = Decimal##SIZE##Value::FromColumnValuePB(protobuf.min()); \
+ max##SIZE##_ = Decimal##SIZE##Value::FromColumnValuePB(protobuf.max()); \
} while (false)
// Construct the Decimal min-max filter when the min-max filter information
// comes in through thrift. This can get called in coordinator, after the filter
// is sent by executor
-DecimalMinMaxFilter::DecimalMinMaxFilter(const TMinMaxFilter& thrift, int precision)
- : size_(ColumnType::GetDecimalByteSize(precision)), always_false_(thrift.always_false) {
+DecimalMinMaxFilter::DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision)
+ : size_(ColumnType::GetDecimalByteSize(precision)),
+ always_false_(protobuf.always_false()) {
if (!always_false_) {
switch (size_) {
case DECIMAL_SIZE_4BYTE:
@@ -441,34 +426,32 @@
return PrimitiveType::TYPE_DECIMAL;
}
-#define DECIMAL_TO_THRIFT(SIZE) \
- do { \
- min##SIZE##_.ToTColumnValue(&thrift->min); \
- max##SIZE##_.ToTColumnValue(&thrift->max); \
+#define DECIMAL_TO_PROTOBUF(SIZE) \
+ do { \
+ min##SIZE##_.ToColumnValuePB(protobuf->mutable_min()); \
+ max##SIZE##_.ToColumnValuePB(protobuf->mutable_max()); \
} while (false)
// Construct a thrift min-max filter. Will be called by the executor
// to be sent to the coordinator
-void DecimalMinMaxFilter::ToThrift(TMinMaxFilter* thrift) const {
+void DecimalMinMaxFilter::ToProtobuf(MinMaxFilterPB* protobuf) const {
if (!always_false_) {
switch (size_) {
case DECIMAL_SIZE_4BYTE:
- DECIMAL_TO_THRIFT(4);
+ DECIMAL_TO_PROTOBUF(4);
break;
case DECIMAL_SIZE_8BYTE:
- DECIMAL_TO_THRIFT(8);
+ DECIMAL_TO_PROTOBUF(8);
break;
case DECIMAL_SIZE_16BYTE:
- DECIMAL_TO_THRIFT(16);
+ DECIMAL_TO_PROTOBUF(16);
break;
default:
DCHECK(false) << "DecimalMinMaxFilter: Unknown decimal byte size: " << size_;
}
- thrift->__isset.min = true;
- thrift->__isset.max = true;
}
- thrift->__set_always_false(always_false_);
- thrift->__set_always_true(false);
+ protobuf->set_always_false(always_false_);
+ protobuf->set_always_true(false);
}
void DecimalMinMaxFilter::Insert(void* val) {
@@ -514,25 +497,24 @@
return out.str();
}
-#define DECIMAL_OR(SIZE) \
- do { \
- if (Decimal##SIZE##Value::FromTColumnValue(in.min) \
- < Decimal##SIZE##Value::FromTColumnValue(out->min)) \
- out->min.__set_decimal_val(in.min.decimal_val); \
- if (Decimal##SIZE##Value::FromTColumnValue(in.max) \
- > Decimal##SIZE##Value::FromTColumnValue(out->max)) \
- out->max.__set_decimal_val(in.max.decimal_val); \
+#define DECIMAL_OR(SIZE) \
+ do { \
+ if (Decimal##SIZE##Value::FromColumnValuePB(in.min()) \
+ < Decimal##SIZE##Value::FromColumnValuePB(out->min())) \
+ out->mutable_min()->set_decimal_val(in.min().decimal_val()); \
+ if (Decimal##SIZE##Value::FromColumnValuePB(in.max()) \
+ > Decimal##SIZE##Value::FromColumnValuePB(out->max())) \
+ out->mutable_max()->set_decimal_val(in.max().decimal_val()); \
} while (false)
-void DecimalMinMaxFilter::Or(const TMinMaxFilter& in, TMinMaxFilter* out, int precision) {
- if (in.always_false) {
+void DecimalMinMaxFilter::Or(
+ const MinMaxFilterPB& in, MinMaxFilterPB* out, int precision) {
+ if (in.always_false()) {
return;
- } else if (out->always_false) {
- out->min.__set_decimal_val(in.min.decimal_val);
- out->__isset.min = true;
- out->max.__set_decimal_val(in.max.decimal_val);
- out->__isset.max = true;
- out->__set_always_false(false);
+ } else if (out->always_false()) {
+ out->mutable_min()->set_decimal_val(in.min().decimal_val());
+ out->mutable_max()->set_decimal_val(in.max().decimal_val());
+ out->set_always_false(false);
} else {
int size = ColumnType::GetDecimalByteSize(precision);
switch (size) {
@@ -551,11 +533,9 @@
}
}
-void DecimalMinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
- out->min.__set_decimal_val(in.min.decimal_val);
- out->__isset.min = true;
- out->max.__set_decimal_val(in.max.decimal_val);
- out->__isset.max = true;
+void DecimalMinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+ out->mutable_min()->set_decimal_val(in.min().decimal_val());
+ out->mutable_max()->set_decimal_val(in.max().decimal_val());
}
// MinMaxFilter
@@ -595,29 +575,29 @@
return nullptr;
}
-MinMaxFilter* MinMaxFilter::Create(const TMinMaxFilter& thrift, ColumnType type,
+MinMaxFilter* MinMaxFilter::Create(const MinMaxFilterPB& protobuf, ColumnType type,
ObjectPool* pool, MemTracker* mem_tracker) {
switch (type.type) {
case PrimitiveType::TYPE_BOOLEAN:
- return pool->Add(new BoolMinMaxFilter(thrift));
+ return pool->Add(new BoolMinMaxFilter(protobuf));
case PrimitiveType::TYPE_TINYINT:
- return pool->Add(new TinyIntMinMaxFilter(thrift));
+ return pool->Add(new TinyIntMinMaxFilter(protobuf));
case PrimitiveType::TYPE_SMALLINT:
- return pool->Add(new SmallIntMinMaxFilter(thrift));
+ return pool->Add(new SmallIntMinMaxFilter(protobuf));
case PrimitiveType::TYPE_INT:
- return pool->Add(new IntMinMaxFilter(thrift));
+ return pool->Add(new IntMinMaxFilter(protobuf));
case PrimitiveType::TYPE_BIGINT:
- return pool->Add(new BigIntMinMaxFilter(thrift));
+ return pool->Add(new BigIntMinMaxFilter(protobuf));
case PrimitiveType::TYPE_FLOAT:
- return pool->Add(new FloatMinMaxFilter(thrift));
+ return pool->Add(new FloatMinMaxFilter(protobuf));
case PrimitiveType::TYPE_DOUBLE:
- return pool->Add(new DoubleMinMaxFilter(thrift));
+ return pool->Add(new DoubleMinMaxFilter(protobuf));
case PrimitiveType::TYPE_STRING:
- return pool->Add(new StringMinMaxFilter(thrift, mem_tracker));
+ return pool->Add(new StringMinMaxFilter(protobuf, mem_tracker));
case PrimitiveType::TYPE_TIMESTAMP:
- return pool->Add(new TimestampMinMaxFilter(thrift));
+ return pool->Add(new TimestampMinMaxFilter(protobuf));
case PrimitiveType::TYPE_DECIMAL:
- return pool->Add(new DecimalMinMaxFilter(thrift, type.precision));
+ return pool->Add(new DecimalMinMaxFilter(protobuf, type.precision));
default:
DCHECK(false) << "Unsupported MinMaxFilter type: " << type;
}
@@ -625,93 +605,93 @@
}
void MinMaxFilter::Or(
- const TMinMaxFilter& in, TMinMaxFilter* out, const ColumnType& columnType) {
- if (in.always_false || out->always_true) return;
- if (in.always_true) {
- out->__set_always_true(true);
+ const MinMaxFilterPB& in, MinMaxFilterPB* out, const ColumnType& columnType) {
+ if (in.always_false() || out->always_true()) return;
+ if (in.always_true()) {
+ out->set_always_true(true);
return;
}
- if (in.min.__isset.bool_val) {
- DCHECK(out->min.__isset.bool_val);
+ if (in.min().has_bool_val()) {
+ DCHECK(out->min().has_bool_val());
BoolMinMaxFilter::Or(in, out);
return;
- } else if (in.min.__isset.byte_val) {
- DCHECK(out->min.__isset.byte_val);
+ } else if (in.min().has_byte_val()) {
+ DCHECK(out->min().has_byte_val());
TinyIntMinMaxFilter::Or(in, out);
return;
- } else if (in.min.__isset.short_val) {
- DCHECK(out->min.__isset.short_val);
+ } else if (in.min().has_short_val()) {
+ DCHECK(out->min().has_short_val());
SmallIntMinMaxFilter::Or(in, out);
return;
- } else if (in.min.__isset.int_val) {
- DCHECK(out->min.__isset.int_val);
+ } else if (in.min().has_int_val()) {
+ DCHECK(out->min().has_int_val());
IntMinMaxFilter::Or(in, out);
return;
- } else if (in.min.__isset.long_val) {
- DCHECK(out->min.__isset.long_val);
+ } else if (in.min().has_long_val()) {
+ DCHECK(out->min().has_long_val());
BigIntMinMaxFilter::Or(in, out);
return;
- } else if (in.min.__isset.double_val) {
+ } else if (in.min().has_double_val()) {
// Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
- DCHECK(out->min.__isset.double_val);
+ DCHECK(out->min().has_double_val());
DoubleMinMaxFilter::Or(in, out);
return;
- } else if (in.min.__isset.string_val) {
- DCHECK(out->min.__isset.string_val);
+ } else if (in.min().has_string_val()) {
+ DCHECK(out->min().has_string_val());
StringMinMaxFilter::Or(in, out);
return;
- } else if (in.min.__isset.timestamp_val) {
- DCHECK(out->min.__isset.timestamp_val);
+ } else if (in.min().has_timestamp_val()) {
+ DCHECK(out->min().has_timestamp_val());
TimestampMinMaxFilter::Or(in, out);
return;
- } else if (in.min.__isset.decimal_val) {
- DCHECK(out->min.__isset.decimal_val);
+ } else if (in.min().has_decimal_val()) {
+ DCHECK(out->min().has_decimal_val());
DecimalMinMaxFilter::Or(in, out, columnType.precision);
return;
}
DCHECK(false) << "Unsupported MinMaxFilter type.";
}
-void MinMaxFilter::Copy(const TMinMaxFilter& in, TMinMaxFilter* out) {
- out->__set_always_false(in.always_false);
- out->__set_always_true(in.always_true);
- if (in.always_false || in.always_true) return;
- if (in.min.__isset.bool_val) {
- DCHECK(!out->min.__isset.bool_val);
+void MinMaxFilter::Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out) {
+ out->set_always_false(in.always_false());
+ out->set_always_true(in.always_true());
+ if (in.always_false() || in.always_true()) return;
+ if (in.min().has_bool_val()) {
+ DCHECK(!out->min().has_bool_val());
BoolMinMaxFilter::Copy(in, out);
return;
- } else if (in.min.__isset.byte_val) {
- DCHECK(!out->min.__isset.byte_val);
+ } else if (in.min().has_byte_val()) {
+ DCHECK(!out->min().has_byte_val());
TinyIntMinMaxFilter::Copy(in, out);
return;
- } else if (in.min.__isset.short_val) {
- DCHECK(!out->min.__isset.short_val);
+ } else if (in.min().has_short_val()) {
+ DCHECK(!out->min().has_short_val());
SmallIntMinMaxFilter::Copy(in, out);
return;
- } else if (in.min.__isset.int_val) {
- DCHECK(!out->min.__isset.int_val);
+ } else if (in.min().has_int_val()) {
+ DCHECK(!out->min().has_int_val());
IntMinMaxFilter::Copy(in, out);
return;
- } else if (in.min.__isset.long_val) {
- // Handles TimestampMinMaxFilter also as TColumnValue doesn't have a timestamp type.
- DCHECK(!out->min.__isset.long_val);
+ } else if (in.min().has_long_val()) {
+ // BIGINT filters only: timestamps use 'timestamp_val' and are copied further below.
+ DCHECK(!out->min().has_long_val());
BigIntMinMaxFilter::Copy(in, out);
return;
- } else if (in.min.__isset.double_val) {
- // Handles FloatMinMaxFilter also as TColumnValue doesn't have a float type.
- DCHECK(!out->min.__isset.double_val);
+ } else if (in.min().has_double_val()) {
+ // Handles FloatMinMaxFilter also as ColumnValuePB doesn't have a float type.
+ DCHECK(!out->min().has_double_val());
DoubleMinMaxFilter::Copy(in, out);
return;
- } else if (in.min.__isset.string_val) {
- DCHECK(!out->min.__isset.string_val);
+ } else if (in.min().has_string_val()) {
+ DCHECK(!out->min().has_string_val());
StringMinMaxFilter::Copy(in, out);
return;
- } else if (in.min.__isset.timestamp_val) {
- DCHECK(!out->min.__isset.timestamp_val);
+ } else if (in.min().has_timestamp_val()) {
+ DCHECK(!out->min().has_timestamp_val());
TimestampMinMaxFilter::Copy(in, out);
return;
- } else if (in.min.__isset.decimal_val) {
- DCHECK(!out->min.__isset.decimal_val);
+ } else if (in.min().has_decimal_val()) {
+ DCHECK(!out->min().has_decimal_val());
DecimalMinMaxFilter::Copy(in, out);
return;
}
diff --git a/be/src/util/min-max-filter.h b/be/src/util/min-max-filter.h
index 82cf54c..ee08894 100644
--- a/be/src/util/min-max-filter.h
+++ b/be/src/util/min-max-filter.h
@@ -74,25 +74,25 @@
/// until this is called.
virtual void MaterializeValues() {}
- /// Convert this filter to a thrift representation.
- virtual void ToThrift(TMinMaxFilter* thrift) const = 0;
+ /// Convert this filter to a protobuf representation.
+ virtual void ToProtobuf(MinMaxFilterPB* protobuf) const = 0;
virtual std::string DebugString() const = 0;
/// Returns a new MinMaxFilter with the given type, allocated from 'mem_tracker'.
static MinMaxFilter* Create(ColumnType type, ObjectPool* pool, MemTracker* mem_tracker);
- /// Returns a new MinMaxFilter created from the thrift representation, allocated from
+ /// Returns a new MinMaxFilter created from the protobuf representation, allocated from
/// 'mem_tracker'.
- static MinMaxFilter* Create(const TMinMaxFilter& thrift, ColumnType type,
+ static MinMaxFilter* Create(const MinMaxFilterPB& protobuf, ColumnType type,
ObjectPool* pool, MemTracker* mem_tracker);
/// Computes the logical OR of 'in' with 'out' and stores the result in 'out'.
static void Or(
- const TMinMaxFilter& in, TMinMaxFilter* out, const ColumnType& columnType);
+ const MinMaxFilterPB& in, MinMaxFilterPB* out, const ColumnType& columnType);
/// Copies the contents of 'in' into 'out'.
- static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+ static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
/// Returns the LLVM_CLASS_NAME for the given type.
static std::string GetLlvmClassName(PrimitiveType type);
@@ -108,7 +108,7 @@
min_ = std::numeric_limits<TYPE>::max(); \
max_ = std::numeric_limits<TYPE>::lowest(); \
} \
- NAME##MinMaxFilter(const TMinMaxFilter& thrift); \
+ NAME##MinMaxFilter(const MinMaxFilterPB& protobuf); \
virtual ~NAME##MinMaxFilter() {} \
virtual void* GetMin() override { return &min_; } \
virtual void* GetMax() override { return &max_; } \
@@ -121,10 +121,10 @@
return min_ == std::numeric_limits<TYPE>::max() \
&& max_ == std::numeric_limits<TYPE>::lowest(); \
} \
- virtual void ToThrift(TMinMaxFilter* thrift) const override; \
+ virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override; \
virtual std::string DebugString() const override; \
- static void Or(const TMinMaxFilter& in, TMinMaxFilter* out); \
- static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out); \
+ static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out); \
+ static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out); \
static const char* LLVM_CLASS_NAME; \
\
private: \
@@ -148,7 +148,7 @@
max_buffer_(&mem_pool_),
always_false_(true),
always_true_(false) {}
- StringMinMaxFilter(const TMinMaxFilter& thrift, MemTracker* mem_tracker);
+ StringMinMaxFilter(const MinMaxFilterPB& protobuf, MemTracker* mem_tracker);
virtual ~StringMinMaxFilter() {}
virtual void Close() override { mem_pool_.FreeAll(); }
@@ -164,11 +164,11 @@
/// truncating them if necessary.
virtual void MaterializeValues() override;
- virtual void ToThrift(TMinMaxFilter* thrift) const override;
+ virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
virtual std::string DebugString() const override;
- static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
- static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+ static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+ static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
/// Struct name in LLVM IR.
static const char* LLVM_CLASS_NAME;
@@ -208,7 +208,7 @@
class TimestampMinMaxFilter : public MinMaxFilter {
public:
TimestampMinMaxFilter() { always_false_ = true; }
- TimestampMinMaxFilter(const TMinMaxFilter& thrift);
+ TimestampMinMaxFilter(const MinMaxFilterPB& protobuf);
virtual ~TimestampMinMaxFilter() {}
virtual void* GetMin() override { return &min_; }
@@ -218,11 +218,11 @@
virtual void Insert(void* val) override;
virtual bool AlwaysTrue() const override { return false; }
virtual bool AlwaysFalse() const override { return always_false_; }
- virtual void ToThrift(TMinMaxFilter* thrift) const override;
+ virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
virtual std::string DebugString() const override;
- static void Or(const TMinMaxFilter& in, TMinMaxFilter* out);
- static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+ static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out);
+ static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
/// Struct name in LLVM IR.
static const char* LLVM_CLASS_NAME;
@@ -269,7 +269,7 @@
DecimalMinMaxFilter(int precision)
: size_(ColumnType::GetDecimalByteSize(precision)), always_false_(true) {}
- DecimalMinMaxFilter(const TMinMaxFilter& thrift, int precision);
+ DecimalMinMaxFilter(const MinMaxFilterPB& protobuf, int precision);
virtual ~DecimalMinMaxFilter() {}
virtual void* GetMin() override {
@@ -286,11 +286,11 @@
virtual PrimitiveType type() override;
virtual bool AlwaysTrue() const override { return false; }
virtual bool AlwaysFalse() const override { return always_false_; }
- virtual void ToThrift(TMinMaxFilter* thrift) const override;
+ virtual void ToProtobuf(MinMaxFilterPB* protobuf) const override;
virtual std::string DebugString() const override;
- static void Or(const TMinMaxFilter& in, TMinMaxFilter* out, int precision);
- static void Copy(const TMinMaxFilter& in, TMinMaxFilter* out);
+ static void Or(const MinMaxFilterPB& in, MinMaxFilterPB* out, int precision);
+ static void Copy(const MinMaxFilterPB& in, MinMaxFilterPB* out);
void Insert4(void* val);
void Insert8(void* val);
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 0bf2a97..6ec84b4 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -45,3 +45,20 @@
NONE = 0; // No compression.
LZ4 = 1;
}
+
+// This is a union over all possible return types.
+// TODO: if we upgrade to proto3, then we can use the oneof feature in Protobuf 3 in
+// the following to save some memory because only one of the fields below is set at a
+// time.
+message ColumnValuePB {
+ optional bool bool_val = 1;
+ optional int32 byte_val = 6;
+ optional int32 short_val = 7;
+ optional int32 int_val = 2;
+ optional int64 long_val = 3;
+ optional double double_val = 4;
+ optional string string_val = 5;
+ optional string binary_val = 8;
+ optional string timestamp_val = 9;
+ optional bytes decimal_val = 10;
+}
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index b0e2b5d..708b814 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -78,6 +78,77 @@
optional int64 receiver_latency_ns = 2;
}
+message BloomFilterPB {
+ // Log_2 of the bufferpool space required for this filter.
+ // See BloomFilter::BloomFilter() for details.
+ optional int32 log_bufferpool_space = 1;
+
+ // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
+ // not meaningful.
+ optional bool always_true = 2;
+ optional bool always_false = 3;
+
+ // The sidecar index associated with the directory of a Bloom filter.
+ // A directory is a list of buckets representing the Bloom Filter contents,
+ // laid out contiguously in one string for efficiency of (de)serialisation.
+ // See BloomFilter::Bucket and BloomFilter::directory_.
+ optional int32 directory_sidecar_idx = 4;
+}
+
+message MinMaxFilterPB {
+ // If true, filter allows all elements to pass and 'min'/'max' will not be set.
+ optional bool always_true = 1;
+
+ // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
+ optional bool always_false = 2;
+
+ optional ColumnValuePB min = 3;
+ optional ColumnValuePB max = 4;
+}
+
+message UpdateFilterParamsPB {
+ // Filter ID, unique within a query.
+ optional int32 filter_id = 1;
+
+ // Query that this filter is for.
+ optional UniqueIdPB query_id = 2;
+
+ optional BloomFilterPB bloom_filter = 3;
+
+ optional MinMaxFilterPB min_max_filter = 4;
+}
+
+message UpdateFilterResultPB {
+ optional StatusPB status = 1;
+
+ // Latency for response in the receiving daemon in nanoseconds.
+ optional int64 receiver_latency_ns = 2;
+}
+
+message PublishFilterParamsPB {
+ // Filter ID, unique within a query.
+ optional int32 filter_id = 1;
+
+ // Query that this filter is for.
+ optional UniqueIdPB dst_query_id = 2;
+
+ // Index of fragment to receive this filter
+ optional int32 dst_fragment_idx = 3;
+
+ // Actual bloom_filter payload
+ optional BloomFilterPB bloom_filter = 4;
+
+ // Actual min_max_filter payload
+ optional MinMaxFilterPB min_max_filter = 5;
+}
+
+message PublishFilterResultPB {
+ optional StatusPB status = 1;
+
+ // Latency for response in the receiving daemon in nanoseconds.
+ optional int64 receiver_latency_ns = 2;
+}
+
// Handles data transmission between fragment instances.
service DataStreamService {
// Override the default authorization method.
@@ -90,4 +161,12 @@
// Called by a sender to close the channel between fragment instances.
rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB);
+
+ // Called by fragment instances that produce local runtime filters to deliver them to
+ // the coordinator for aggregation and broadcast.
+ rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
+
+ // Called by the coordinator to deliver global runtime filters to fragments for
+ // application at plan nodes.
+ rpc PublishFilter(PublishFilterParamsPB) returns (PublishFilterResultPB);
}
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 55eb0cf..cbe3f50 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -766,82 +766,6 @@
11: required i64 max_memory_multiple = 0;
}
-struct TBloomFilter {
- // Log_2 of the bufferpool space required for this filter.
- // See BloomFilter::BloomFilter() for details.
- 1: required i32 log_bufferpool_space
-
- // List of buckets representing the Bloom Filter contents, laid out contiguously in one
- // string for efficiency of (de)serialisation. See BloomFilter::Bucket and
- // BloomFilter::directory_.
- 2: binary directory
-
- // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
- // not meaningful.
- 3: required bool always_true
- 4: required bool always_false
-}
-
-struct TMinMaxFilter {
- // If true, filter allows all elements to pass and 'min'/'max' will not be set.
- 1: required bool always_true
-
- // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
- 2: required bool always_false
-
- 3: optional Data.TColumnValue min
- 4: optional Data.TColumnValue max
-}
-
-// UpdateFilter
-
-struct TUpdateFilterParams {
- 1: required ImpalaInternalServiceVersion protocol_version
-
- // Filter ID, unique within a query.
- // required in V1
- 2: optional i32 filter_id
-
- // Query that this filter is for.
- // required in V1
- 3: optional Types.TUniqueId query_id
-
- // required in V1
- 4: optional TBloomFilter bloom_filter
-
- 5: optional TMinMaxFilter min_max_filter
-}
-
-struct TUpdateFilterResult {
-}
-
-
-// PublishFilter
-
-struct TPublishFilterParams {
- 1: required ImpalaInternalServiceVersion protocol_version
-
- // Filter ID to update
- // required in V1
- 2: optional i32 filter_id
-
- // required in V1
- 3: optional Types.TUniqueId dst_query_id
-
- // Index of fragment to receive this filter
- // required in V1
- 4: optional Types.TFragmentIdx dst_fragment_idx
-
- // Actual bloom_filter payload
- // required in V1
- 5: optional TBloomFilter bloom_filter
-
- 6: optional TMinMaxFilter min_max_filter
-}
-
-struct TPublishFilterResult {
-}
-
struct TParseDateStringResult {
// True iff date string was successfully parsed
1: required bool valid
@@ -854,11 +778,4 @@
service ImpalaInternalService {
- // Called by fragment instances that produce local runtime filters to deliver them to
- // the coordinator for aggregation and broadcast.
- TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);
-
- // Called by the coordinator to deliver global runtime filters to fragments for
- // application at plan nodes.
- TPublishFilterResult PublishFilter(1:TPublishFilterParams params);
}