// blob: 8ed1e386649f02698b681573bad7bb54311e8406 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/runtime-filter-bank.h"
#include <chrono>
#include <boost/algorithm/string/join.hpp>
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/data_stream_service.proxy.h"
#include "gutil/strings/substitute.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "runtime/backend-client.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/client-cache.h"
#include "runtime/exec-env.h"
#include "runtime/initial-reservations.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "service/data-stream-service.h"
#include "service/impala-server.h"
#include "util/bit-util.h"
#include "util/bloom-filter.h"
#include "util/debug-util.h"
#include "util/min-max-filter.h"
#include "util/pretty-printer.h"
#include "util/uid-util.h"
#include "common/names.h"
using kudu::rpc::RpcContext;
using kudu::rpc::RpcController;
using kudu::rpc::RpcSidecar;
using namespace impala;
using namespace strings;
// Default target false-positive probability used when sizing bloom filters. Per the help
// text, a query can override it with the RUNTIME_FILTER_ERROR_RATE query option.
DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The target false positive "
    "probability used to determine the ideal size for each bloom filter size. This value "
    "can be overridden by the RUNTIME_FILTER_ERROR_RATE query option.");

// Out-of-class definitions of the static bloom filter size bounds (their values are
// presumably given in the class declaration), so they can be ODR-used.
const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
// Creates the filter bank for 'query_state'. Per-filter state is built from 'filters'
// by BuildFilterMap(), with the RuntimeFilter objects owned by this bank's 'obj_pool_'.
// 'total_filter_mem_required' is stored as the number of bytes of buffer reservation
// that ClaimBufferReservation() claims and Close() returns.
// NOTE(review): 'query_state_' is read by other methods of this class but is not
// initialized in the list below, and the "BloomFilterBytes" counter expression is not
// attached to any member name (the initializer looks truncated) — confirm both against
// the header / upstream source.
RuntimeFilterBank::RuntimeFilterBank(QueryState* query_state,
const unordered_map<int32_t, FilterRegistration>& filters,
long total_filter_mem_required)
: filters_(BuildFilterMap(filters, &obj_pool_)),
// Filter memory is tracked by a MemTracker that is owned by the query's ObjectPool and
// parented to the query-wide MemTracker.
filter_mem_tracker_(query_state->obj_pool()->Add(new MemTracker(
-1, "Runtime Filter Bank", query_state->query_mem_tracker(), false))),
query_state->host_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES)),
total_bloom_filter_mem_required_(total_filter_mem_required) {}
// The bank frees nothing by hand: only the members' own destructors run.
RuntimeFilterBank::~RuntimeFilterBank() = default;
// Builds the filter-id -> PerFilterState map from the registrations in 'filters'.
// For each registration, a consumer-side RuntimeFilter is created when it has a
// consumer, and a producer-side result RuntimeFilter when it has at least one producer.
// Both are sized by the descriptor's 'filter_size_bytes' and owned by 'obj_pool'.
// Filters without a consumer keep a null 'consumed_filter'.
// NOTE(review): parts of this definition are missing from this copy:
// - the qualified name between the return type and the parameter list (the
//   constructor calls it as BuildFilterMap());
// - the closing braces;
// - the call that receives the make_unique<PerFilterState>(...) argument (presumably
//   an insert into 'result' keyed by entry.first).
// Confirm against upstream.
unordered_map<int32_t, unique_ptr<RuntimeFilterBank::PerFilterState>>
const unordered_map<int32_t, FilterRegistration>& filters, ObjectPool* obj_pool) {
unordered_map<int32_t, unique_ptr<PerFilterState>> result;
for (auto& entry : filters) {
// NOTE(review): this copies each FilterRegistration; binding
// 'const FilterRegistration&' would avoid the copy.
const FilterRegistration reg = entry.second;
RuntimeFilter* result_filter = nullptr;
RuntimeFilter* consumed_filter = nullptr;
if (reg.has_consumer) {
VLOG(3) << "registered consumer filter " << reg.desc.filter_id;
consumed_filter =
obj_pool->Add(new RuntimeFilter(reg.desc, reg.desc.filter_size_bytes));
// Presumably independent of the consumer branch above (its closing brace is not
// visible): a result filter is only created for filters that have producers.
if (reg.num_producers > 0) {
result_filter =
obj_pool->Add(new RuntimeFilter(reg.desc, reg.desc.filter_size_bytes));
make_unique<PerFilterState>(reg.num_producers, result_filter, consumed_filter));
return result;
// Sets up 'buffer_pool_client_' with 'total_bloom_filter_mem_required_' bytes of
// reservation (logged as "claiming reservation"). The bloom filter allocations in this
// file draw on that reservation. Returns Status::OK() at the end.
// NOTE(review): the opening lines of the two calls below are missing from this copy.
// Given the trailing '));' and the arguments passed, they are presumably a
// RETURN_IF_ERROR-wrapped buffer pool client registration and an initial-reservation
// claim. Error propagation depends on those missing lines.
Status RuntimeFilterBank::ClaimBufferReservation() {
// Identifies this bank (by query id) in log output.
string filter_bank_name =
Substitute("RuntimeFilterBank (Query Id: $0)", PrintId(query_state_->query_id()));
query_state_->file_group(), query_state_->buffer_reservation(),
filter_mem_tracker_, total_bloom_filter_mem_required_,
query_state_->host_profile(), &buffer_pool_client_));
VLOG_FILE << filter_bank_name << " claiming reservation "
<< total_bloom_filter_mem_required_;
&buffer_pool_client_, total_bloom_filter_mem_required_);
return Status::OK();
// Returns the producer-side (result) RuntimeFilter for 'filter_desc'.
// Requirements:
// - the filter was registered with at least one producer, so BuildFilterMap() created
//   its result filter in the bank's object pool;
// - its size matches 'filter_desc.filter_size_bytes'.
// The returned filter is owned by the bank.
RuntimeFilter* RuntimeFilterBank::RegisterProducer(
    const TRuntimeFilterDesc& filter_desc) {
  auto it = filters_.find(filter_desc.filter_id);
  DCHECK(it != filters_.end()) << "Filter ID " << filter_desc.filter_id
                               << " not registered";
  PerFilterState* fs = it->second.get();
  RuntimeFilter* ret = fs->produced_filter.result_filter;
  DCHECK(ret != nullptr);
  DCHECK_EQ(filter_desc.filter_size_bytes, ret->filter_size());
  return ret;
}
// Returns the consumer-side RuntimeFilter for 'filter_desc'. The consumer filter must
// already have been created when the bank was constructed (i.e. the registration had a
// consumer), and its size must match 'filter_desc.filter_size_bytes'. The returned
// filter is owned by the bank.
RuntimeFilter* RuntimeFilterBank::RegisterConsumer(
    const TRuntimeFilterDesc& filter_desc) {
  auto it = filters_.find(filter_desc.filter_id);
  DCHECK(it != filters_.end()) << "Filter ID " << filter_desc.filter_id
                               << " not registered";
  RuntimeFilter* consumed = it->second->consumed_filter;
  DCHECK(consumed != nullptr) << "Consumed filters must be created in constructor";
  VLOG(3) << "Consumer registered for filter " << filter_desc.filter_id;
  DCHECK_EQ(filter_desc.filter_size_bytes, consumed->filter_size());
  return consumed;
}
// Completion callback for the asynchronous UpdateFilter() RPC that
// UpdateFilterFromLocal() sends to the coordinator. A failed RPC is only logged (no
// retry). The in-flight RPC count that Close() waits on is then checked under
// 'num_inflight_rpcs_lock_'.
void RuntimeFilterBank::UpdateFilterCompleteCb(
const RpcController* rpc_controller, const UpdateFilterResultPB* res) {
const kudu::Status controller_status = rpc_controller->status();
// In the case of an unsuccessful KRPC call, e.g., request dropped due to
// backpressure, we only log this event w/o retrying. Failing to send a
// filter is not a query-wide error - the remote fragment will continue
// regardless.
if (!controller_status.ok()) {
LOG(ERROR) << "UpdateFilter() failed: " << controller_status.message().ToString();
// DataStreamService::UpdateFilter() should never set an error status
DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
// NOTE(review): this copy shows no decrement of 'num_inflight_rpcs_' and no wake-up of
// a waiter, yet Close() loops while num_inflight_rpcs_ > 0. Confirm the counter is
// decremented here after the check below; otherwise Close() keeps waiting.
std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
DCHECK_GT(num_inflight_rpcs_, 0);
// Accepts a filter produced locally for 'filter_id', with contents in 'bloom_filter' /
// 'min_max_filter'.
// - Broadcast joins: the first arrival becomes the result filter.
// - Partitioned joins: arrivals are merged, and the filter is complete once the last
//   pending producer has contributed.
// Once complete, the filter is:
// (a) pushed into the local consumer filter if it has local targets;
// (b) sent to the coordinator with an async UpdateFilter() RPC if it has remote targets
//     and the query runs in GLOBAL filter mode.
// NOTE(review): several lines of this definition are missing from this copy (closing
// braces, early returns and some statement openings). Comments below that depend on
// them are hedged.
void RuntimeFilterBank::UpdateFilterFromLocal(
int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
DCHECK_NE(query_state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
<< "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
// This function is only called from ExecNode::Open() or more specifically
// PartitionedHashJoinNode::Open().
// A runtime filter may have both local and remote targets.
bool has_local_target = false;
bool has_remote_target = false;
RuntimeFilter* complete_filter = nullptr; // Set if the filter should be sent out.
auto it = filters_.find(filter_id);
DCHECK(it != filters_.end()) << "Tried to update unregistered filter: " << filter_id;
PerFilterState* fs = it->second.get();
// Held while the producer-side state in 'produced_filter' is inspected and updated.
unique_lock<SpinLock> l(fs->lock);
ProducedFilter& produced_filter = fs->produced_filter;
RuntimeFilter* result_filter = produced_filter.result_filter;
DCHECK(result_filter != nullptr)
<< "Tried to update unregistered filter: " << filter_id;
DCHECK_GT(produced_filter.pending_producers, 0);
if (result_filter->filter_desc().is_broadcast_join) {
// For broadcast joins, the first filter to arrive is used, and the rest are
// ignored (because they should have identical contents).
if (result_filter->HasFilter()) {
// Don't need to merge, the previous broadcast filter contained all values.
VLOG(3) << "Dropping redundant broadcast filter " << filter_id;
// NOTE(review): an early return and closing brace are expected here, so a redundant
// broadcast filter is not set below. Neither is visible in this copy.
VLOG(3) << "Setting broadcast filter " << filter_id;
result_filter->SetFilter(bloom_filter, min_max_filter);
complete_filter = result_filter;
} else {
// Merge partitioned join filters in parallel - each thread setting the filter will
// try to merge its filter with a previously merged filter, looping until either
// it has produced the final filter or it runs out of other filters to merge.
unique_ptr<RuntimeFilter> tmp_filter = make_unique<RuntimeFilter>(
result_filter->filter_desc(), result_filter->filter_size());
tmp_filter->SetFilter(bloom_filter, min_max_filter);
while (produced_filter.pending_merge_filter != nullptr) {
// NOTE(review): these parts of the loop are missing from this copy:
// - the initializer of 'pending_merge' (presumably taking ownership of
//   produced_filter.pending_merge_filter);
// - the unlock / merge / re-lock statements described below;
// - the loop's closing brace.
unique_ptr<RuntimeFilter> pending_merge =
// Drop the lock while doing the merge so that other merges can proceed in
// parallel.
VLOG(3) << "Merging partitioned join filter " << filter_id;
// At this point, either we've merged all the filters or we're waiting for more
// filters.
if (produced_filter.pending_producers > 1) {
// A subsequent caller of UpdateFilterFromLocal() is responsible for merging this
// filter into the final one.
produced_filter.pending_merge_filter = std::move(tmp_filter);
} else {
// Everything was merged into 'tmp_filter'. It is therefore the result filter.
// NOTE(review): no statement copying 'tmp_filter' into 'result_filter' is visible
// before 'complete_filter' is set. Confirm it exists; otherwise the merged contents
// are dropped.
complete_filter = result_filter;
VLOG(3) << "Partitioned join filter " << filter_id << " is locally complete.";
int remaining_producers = --produced_filter.pending_producers;
VLOG(3) << "Filter " << filter_id << " updated. " << remaining_producers
<< " producers left on the backend.";
DCHECK(remaining_producers > 0 || result_filter != nullptr);
has_local_target = result_filter->filter_desc().has_local_targets;
has_remote_target = result_filter->filter_desc().has_remote_targets;
// NOTE(review): 'fs->lock' was acquired through 'l' near the top of this function, and
// the scope that releases it is not visible here. That scope must end before the
// lock_guard below re-acquires fs->lock (presumably SpinLock is not recursive) —
// confirm.
if (complete_filter != nullptr && has_local_target) {
// Do a short circuit publication by pushing the same filter to the consumer side.
RuntimeFilter* consumed_filter;
lock_guard<SpinLock> l(fs->lock);
if (fs->consumed_filter == nullptr) return;
consumed_filter = fs->consumed_filter;
// Update the filter while still holding the lock to avoid racing with the
// SetFilter() call in PublishGlobalFilter().
if (consumed_filter->HasFilter()) {
// Multiple instances may produce the same filter for broadcast joins.
// TODO: we would ideally update the coordinator logic to avoid creating duplicates
// on the same node, but sending out a few duplicate filters is relatively
// inconsequential for performance.
// NOTE(review): the start of the statement ending on the next line (apparently a
// log message) is missing from this copy.
<< consumed_filter->filter_desc();
} else {
// NOTE(review): only the tail of this branch is visible. It appears to record the
// filter's arrival delay; the statement that sets the consumer filter is not
// visible here.
Substitute("Filter $0 arrival", filter_id),
PrettyPrinter::Print(consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
// Send the complete filter to the coordinator only when it has remote targets and the
// query runs in GLOBAL runtime filter mode.
if (complete_filter != nullptr && has_remote_target &&
query_state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
UpdateFilterParamsPB params;
// The memory associated with the following 2 objects needs to live until
// the asynchronous KRPC call proxy->UpdateFilterAsync() is completed.
// Hence, we allocate these 2 objects in 'obj_pool_'.
UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
RpcController* controller = obj_pool_.Add(new RpcController);
TUniqueIdToUniqueIdPB(query_state_->query_id(), params.mutable_query_id());
TRuntimeFilterType::type type = complete_filter->filter_desc().type;
if (type == TRuntimeFilterType::BLOOM) {
// NOTE(review): this serializes the caller's 'bloom_filter' argument rather than
// reading from 'complete_filter'. That is only correct if partitioned merges
// accumulate into this object — confirm.
BloomFilter::ToProtobuf(bloom_filter, controller, params.mutable_bloom_filter());
} else {
DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
// NOTE(review): the min-max serialization into 'params' and this branch's closing
// brace are not visible in this copy.
const TNetworkAddress& krpc_address = query_state_->query_ctx().coord_krpc_address;
const TNetworkAddress& host_address = query_state_->query_ctx().coord_address;
// Use 'proxy' to send the filter to the coordinator.
unique_ptr<DataStreamServiceProxy> proxy;
Status get_proxy_status =
DataStreamService::GetProxy(krpc_address, host_address.hostname, &proxy);
if (!get_proxy_status.ok()) {
// Failing to send a filter is not a query-wide error - the remote fragment will
// continue regardless.
LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1",
host_address.hostname, get_proxy_status.msg().msg());
// NOTE(review): an early return is expected here so that 'proxy' is not used after a
// failed GetProxy(). It is not visible in this copy.
// Increment 'num_inflight_rpcs_' to make sure that the filter will not be deallocated
// in Close() until all in-flight RPCs complete.
unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
DCHECK_GE(num_inflight_rpcs_, 0);
// NOTE(review): the increment promised by the comment above is not visible in this
// copy. Without it, the DCHECK_GT(num_inflight_rpcs_, 0) in UpdateFilterCompleteCb()
// would fail.
proxy->UpdateFilterAsync(params, res, controller,
boost::bind(&RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res));
// Handles a filter published to this backend over RPC, as described by 'params'.
// - Ignored if the bank is closed.
// - Meant to be dropped (per the comment below) if the local consumer filter already
//   has contents.
// - Otherwise it is deserialized and set on the consumer filter:
//   - a bloom filter is read from the RPC sidecar into memory from
//     'buffer_pool_client_';
//   - a min-max filter is created in 'obj_pool_'.
// Failures to read or allocate a bloom filter fall back to
// BloomFilter::ALWAYS_TRUE_FILTER instead of failing.
// NOTE(review): several lines of this definition are missing from this copy (closing
// braces, the duplicate-filter check/return, and some statement openings).
void RuntimeFilterBank::PublishGlobalFilter(
const PublishFilterParamsPB& params, RpcContext* context) {
VLOG(3) << "PublishGlobalFilter(filter_id=" << params.filter_id() << ")";
auto it = filters_.find(params.filter_id());
DCHECK(it != filters_.end()) << "Filter ID " << params.filter_id() << " not registered";
PerFilterState* fs = it->second.get();
lock_guard<SpinLock> l(fs->lock);
if (closed_) return;
// NOTE(review): 'fs->consumed_filter' is dereferenced without a null check, although
// BuildFilterMap() leaves it null for filters without a consumer. Presumably filters
// are only published to backends that consume them — confirm.
if (fs->consumed_filter->HasFilter()) {
// The filter routing in the Coordinator sometimes can redundantly send broadcast
// filters that were already produced on this backend and consumed locally.
// It is safe to drop the filter because we already have a filter with the same
// contents.
<< "Got duplicate partitioned join filter";
BloomFilter* bloom_filter = nullptr;
MinMaxFilter* min_max_filter = nullptr;
if (fs->consumed_filter->is_bloom_filter()) {
if (params.bloom_filter().always_true()) {
bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
} else {
// NOTE(review): the argument of GetExpectedMemoryUsed() and the end of the DCHECK
// message below are missing from this copy.
int64_t required_space = BloomFilter::GetExpectedMemoryUsed(
DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
<< "BufferPool Client should have enough reservation to fulfill bloom filter "
// NOTE(review): this filter is not visibly added to 'fs->bloom_filters', which
// Close() iterates to close filters — confirm it is recorded on success.
bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
kudu::Slice sidecar_slice;
if (params.bloom_filter().has_directory_sidecar_idx()) {
kudu::Status status = context->GetInboundSidecar(
params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
if (!status.ok()) {
LOG(ERROR) << "Failed to get Bloom filter sidecar: "
<< status.message().ToString();
bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
} else {
if (bloom_filter != BloomFilter::ALWAYS_TRUE_FILTER) {
// NOTE(review): an argument (presumably the sidecar data pointer) is missing
// between the two commas on the next line.
Status status = bloom_filter->Init(params.bloom_filter(),,
sidecar_slice.size(), DefaultHashSeed());
if (!status.ok()) {
LOG(ERROR) << "Unable to allocate memory for bloom filter: "
<< status.GetDetail();
bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
} else {
DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
} else {
// NOTE(review): this filter is not visibly added to 'fs->min_max_filters', which
// Close() iterates to release filter memory — confirm.
min_max_filter = MinMaxFilter::Create(params.min_max_filter(),
fs->consumed_filter->type(), &obj_pool_, filter_mem_tracker_);
fs->consumed_filter->SetFilter(bloom_filter, min_max_filter);
// NOTE(review): the start of the statement below (apparently recording the filter's
// arrival delay) is missing from this copy.
Substitute("Filter $0 arrival", params.filter_id()),
PrettyPrinter::Print(fs->consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
// Allocates a scratch BloomFilter for 'filter_id'. It is backed by memory from
// 'buffer_pool_client_' (expected to be covered by the reservation claimed in
// ClaimBufferReservation()) and owned by 'obj_pool_'. Returns nullptr if the bank is
// already closed or the allocation fails.
// NOTE(review): the right-hand side of 'log_filter_size' and the end of the DCHECK
// message are missing from this copy. The new filter is also not visibly added to
// 'fs->bloom_filters', which Close() iterates to close filters — confirm.
BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
auto it = filters_.find(filter_id);
DCHECK(it != filters_.end()) << "Filter ID " << filter_id << " not registered";
PerFilterState* fs = it->second.get();
lock_guard<SpinLock> l(fs->lock);
if (closed_) return nullptr;
// Track required space
int64_t log_filter_size =
int64_t required_space = BloomFilter::GetExpectedMemoryUsed(log_filter_size);
DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
<< "BufferPool Client should have enough reservation to fulfill bloom filter "
BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
Status status = bloom_filter->Init(log_filter_size, DefaultHashSeed());
if (!status.ok()) {
LOG(ERROR) << "Unable to allocate memory for bloom filter: " << status.GetDetail();
return nullptr;
// The buffer pool space actually used must match the precomputed requirement.
DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
return bloom_filter;
// Allocates a scratch MinMaxFilter of column type 'type' for 'filter_id'. The filter is
// owned by 'obj_pool_' and is given 'filter_mem_tracker_'. It is recorded in the
// filter's 'min_max_filters' so that Close() closes it. Returns nullptr if the bank has
// already been closed.
MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
    int32_t filter_id, ColumnType type) {
  auto it = filters_.find(filter_id);
  DCHECK(it != filters_.end()) << "Filter ID " << filter_id << " not registered";
  PerFilterState* fs = it->second.get();
  lock_guard<SpinLock> l(fs->lock);
  if (closed_) return nullptr;

  MinMaxFilter* min_max_filter =
      MinMaxFilter::Create(type, &obj_pool_, filter_mem_tracker_);
  // Close() only closes filters listed in 'min_max_filters', and afterwards expects
  // 'filter_mem_tracker_' consumption to be zero, so the filter must be tracked here.
  fs->min_max_filters.push_back(min_max_filter);
  return min_max_filter;
}
// Acquires every per-filter lock, in 'filters_' iteration order, and returns the held
// locks. They are all released when the returned vector is destroyed.
vector<unique_lock<SpinLock>> RuntimeFilterBank::LockAllFilters() {
  vector<unique_lock<SpinLock>> locks;
  locks.reserve(filters_.size());
  for (auto& entry : filters_) locks.emplace_back(entry.second->lock);
  return locks;
}
void RuntimeFilterBank::Cancel() {
auto all_locks = LockAllFilters();
void RuntimeFilterBank::CancelLocked() {
// IMPALA-9730: if no filters are present, we did not acquire any filter locks. Avoid a
// TSAN data race on 'cancelled_' by exiting early.
if (filters_.empty() || cancelled_) return;
// Cancel all filters that a thread might be waiting on.
for (auto& entry : filters_) {
if (entry.second->consumed_filter != nullptr) entry.second->consumed_filter->Cancel();
cancelled_ = true;
// Closes the bank. It first waits for in-flight UpdateFilter() RPCs to drain. Then,
// with all filter locks held, it marks the bank closed, closes every tracked bloom and
// min-max filter, and returns the buffer reservation if a buffer pool client is
// registered. Later calls return early.
// NOTE(review): several lines of this definition are missing from this copy:
// - the wait inside the in-flight RPC loop;
// - closing braces;
// - the calls that hand back the reservation / deregister the buffer pool client.
void RuntimeFilterBank::Close() {
// Wait for all in-flight RPCs to complete before closing the filters.
unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
while (num_inflight_rpcs_ > 0) {
auto all_locks = LockAllFilters();
// We do not have to set 'closed_' to true before waiting for all in-flight RPCs to
// drain because the async build thread in
// BlockingJoinNode::ProcessBuildInputAndOpenProbe() should have exited by the time
// Close() is called so there shouldn't be any new RPCs being issued when this function
// is called.
if (closed_) return;
closed_ = true;
// Only filters recorded in 'bloom_filters' / 'min_max_filters' are closed here.
for (auto& entry : filters_) {
for (BloomFilter* filter : entry.second->bloom_filters) filter->Close();
for (MinMaxFilter* filter : entry.second->min_max_filters) filter->Close();
if (buffer_pool_client_.is_registered()) {
VLOG_FILE << "RuntimeFilterBank (Query Id: " << PrintId(query_state_->query_id())
<< ") returning reservation " << total_bloom_filter_mem_required_;
&buffer_pool_client_, total_bloom_filter_mem_required_);
// All memory charged to the filter MemTracker must have been released by now.
DCHECK_EQ(filter_mem_tracker_->consumption(), 0);
// Presumably the ProducedFilter constructor (its qualified-name line is missing from
// this copy). Stores the producer-side result filter and the number of producers that
// still have to contribute to it.
int pending_producers, RuntimeFilter* result_filter)
: result_filter(result_filter), pending_producers(pending_producers) {}
// Presumably the PerFilterState constructor (its qualified-name line is missing from
// this copy). Builds the producer-side state from 'pending_producers' and
// 'result_filter', and stores the consumer-side filter, which BuildFilterMap() leaves
// null when the filter has no consumer.
int pending_producers, RuntimeFilter* result_filter, RuntimeFilter* consumed_filter)
: produced_filter(pending_producers, result_filter), consumed_filter(consumed_filter) {}