// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/runtime-filter-bank.h"
#include <chrono>
#include <boost/algorithm/string/join.hpp>
#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/data_stream_service.proxy.h"
#include "gutil/strings/substitute.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "runtime/backend-client.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/client-cache.h"
#include "runtime/exec-env.h"
#include "runtime/initial-reservations.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "service/data-stream-service.h"
#include "service/impala-server.h"
#include "util/bit-util.h"
#include "util/bloom-filter.h"
#include "util/debug-util.h"
#include "util/min-max-filter.h"
#include "util/pretty-printer.h"
#include "util/uid-util.h"
#include "common/names.h"
using kudu::rpc::RpcContext;
using kudu::rpc::RpcController;
using kudu::rpc::RpcSidecar;
using namespace impala;
using namespace strings;
DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The target false positive "
"probability used to determine the ideal size for each bloom filter. This value "
"can be overridden by the RUNTIME_FILTER_ERROR_RATE query option.");
const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
RuntimeFilterBank::RuntimeFilterBank(QueryState* query_state,
const unordered_map<int32_t, FilterRegistration>& filters,
long total_filter_mem_required)
: filters_(BuildFilterMap(filters, &obj_pool_)),
query_state_(query_state),
filter_mem_tracker_(query_state->obj_pool()->Add(new MemTracker(
-1, "Runtime Filter Bank", query_state->query_mem_tracker(), false))),
bloom_memory_allocated_(
query_state->host_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES)),
total_bloom_filter_mem_required_(total_filter_mem_required) {}
RuntimeFilterBank::~RuntimeFilterBank() {}
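// Builds the filter_id -> PerFilterState map from the registrations passed to the
// constructor. A consumer-side RuntimeFilter is allocated up front for every
// registration with a consumer, and a producer-side result filter for every
// registration with at least one producer; both are owned by 'obj_pool'.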
unordered_map<int32_t, unique_ptr<RuntimeFilterBank::PerFilterState>>
RuntimeFilterBank::BuildFilterMap(
const unordered_map<int32_t, FilterRegistration>& filters, ObjectPool* obj_pool) {
unordered_map<int32_t, unique_ptr<PerFilterState>> result;
for (auto& entry : filters) {
const FilterRegistration reg = entry.second;
RuntimeFilter* result_filter = nullptr;
RuntimeFilter* consumed_filter = nullptr;
if (reg.has_consumer) {
VLOG(3) << "registered consumer filter " << reg.desc.filter_id;
consumed_filter =
obj_pool->Add(new RuntimeFilter(reg.desc, reg.desc.filter_size_bytes));
}
if (reg.num_producers > 0) {
result_filter =
obj_pool->Add(new RuntimeFilter(reg.desc, reg.desc.filter_size_bytes));
}
result.emplace(entry.first,
make_unique<PerFilterState>(reg.num_producers, result_filter, consumed_filter));
}
return result;
}
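// Registers a buffer pool client for this filter bank against the query's reservation
// and claims 'total_bloom_filter_mem_required_' from the query's initial reservations,
// so that later bloom filter allocations can rely on the reservation being available.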
Status RuntimeFilterBank::ClaimBufferReservation() {
DCHECK(!buffer_pool_client_.is_registered());
string filter_bank_name =
Substitute("RuntimeFilterBank (Query Id: $0)", PrintId(query_state_->query_id()));
RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(filter_bank_name,
query_state_->file_group(), query_state_->buffer_reservation(),
filter_mem_tracker_, total_bloom_filter_mem_required_,
query_state_->host_profile(), &buffer_pool_client_));
VLOG_FILE << filter_bank_name << " claiming reservation "
<< total_bloom_filter_mem_required_;
query_state_->initial_reservations()->Claim(
&buffer_pool_client_, total_bloom_filter_mem_required_);
return Status::OK();
}
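// Returns the producer-side result filter for 'filter_desc'. The filter object was
// already created in BuildFilterMap(); this is a lookup plus sanity checks.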
RuntimeFilter* RuntimeFilterBank::RegisterProducer(
const TRuntimeFilterDesc& filter_desc) {
auto it = filters_.find(filter_desc.filter_id);
DCHECK(it != filters_.end()) << "Filter ID " << filter_desc.filter_id
<< " not registered";
PerFilterState* fs = it->second.get();
DCHECK(fs->produced_filter.result_filter != nullptr);
RuntimeFilter* ret = fs->produced_filter.result_filter;
DCHECK_EQ(filter_desc.filter_size_bytes, ret->filter_size());
return ret;
}
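// Returns the consumer-side filter for 'filter_desc', created up front in
// BuildFilterMap().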
RuntimeFilter* RuntimeFilterBank::RegisterConsumer(
const TRuntimeFilterDesc& filter_desc) {
auto it = filters_.find(filter_desc.filter_id);
DCHECK(it != filters_.end()) << "Filter ID " << filter_desc.filter_id
<< " not registered";
PerFilterState* fs = it->second.get();
DCHECK(fs->consumed_filter != nullptr)
<< "Consumed filters must be created in constructor";
VLOG(3) << "Consumer registered for filter " << filter_desc.filter_id;
DCHECK_EQ(filter_desc.filter_size_bytes, fs->consumed_filter->filter_size());
return fs->consumed_filter;
}
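// Completion callback for UpdateFilterAsync(). RPC failures are logged but not treated
// as query-wide errors. Decrements 'num_inflight_rpcs_' and notifies Close(), which
// waits for all outstanding filter update RPCs before tearing down the bank.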
void RuntimeFilterBank::UpdateFilterCompleteCb(
const RpcController* rpc_controller, const UpdateFilterResultPB* res) {
const kudu::Status controller_status = rpc_controller->status();
// In the case of an unsuccessful KRPC call, e.g., request dropped due to
// backpressure, we only log this event w/o retrying. Failing to send a
// filter is not a query-wide error - the remote fragment will continue
// regardless.
if (!controller_status.ok()) {
LOG(ERROR) << "UpdateFilter() failed: " << controller_status.message().ToString();
}
// DataStreamService::UpdateFilter() should never set an error status
DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
{
std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
DCHECK_GT(num_inflight_rpcs_, 0);
--num_inflight_rpcs_;
}
krpcs_done_cv_.notify_one();
}
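// Handles a filter produced by a local fragment instance. Broadcast join filters are
// taken as-is from the first producer and later duplicates are dropped; partitioned
// join filters are OR'd together across producers, with the merge performed outside
// the per-filter lock so multiple producers can merge in parallel. Once the filter is
// complete on this backend, it is short-circuited to the local consumer-side filter
// (if any) and, in GLOBAL runtime filter mode, sent to the coordinator via an
// asynchronous UpdateFilter() KRPC.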
void RuntimeFilterBank::UpdateFilterFromLocal(
int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
DCHECK_NE(query_state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
<< "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
// This function is only called from ExecNode::Open() or more specifically
// PartitionedHashJoinNode::Open().
DCHECK(!closed_);
// A runtime filter may have both local and remote targets.
bool has_local_target = false;
bool has_remote_target = false;
RuntimeFilter* complete_filter = nullptr; // Set if the filter should be sent out.
auto it = filters_.find(filter_id);
DCHECK(it != filters_.end()) << "Tried to update unregistered filter: " << filter_id;
PerFilterState* fs = it->second.get();
{
unique_lock<SpinLock> l(fs->lock);
ProducedFilter& produced_filter = fs->produced_filter;
RuntimeFilter* result_filter = produced_filter.result_filter;
DCHECK(result_filter != nullptr)
<< "Tried to update unregistered filter: " << filter_id;
DCHECK_GT(produced_filter.pending_producers, 0);
if (result_filter->filter_desc().is_broadcast_join) {
// For broadcast joins, the first filter to arrive is used, and the rest are
// ignored (because they should have identical contents).
if (result_filter->HasFilter()) {
// Don't need to merge, the previous broadcast filter contained all values.
VLOG(3) << "Dropping redundant broadcast filter " << filter_id;
return;
}
VLOG(3) << "Setting broadcast filter " << filter_id;
result_filter->SetFilter(bloom_filter, min_max_filter);
complete_filter = result_filter;
} else {
// Merge partitioned join filters in parallel - each thread setting the filter will
// try to merge its filter with a previously merged filter, looping until either
// it has produced the final filter or it runs out of other filters to merge.
unique_ptr<RuntimeFilter> tmp_filter = make_unique<RuntimeFilter>(
result_filter->filter_desc(), result_filter->filter_size());
tmp_filter->SetFilter(bloom_filter, min_max_filter);
while (produced_filter.pending_merge_filter != nullptr) {
unique_ptr<RuntimeFilter> pending_merge =
std::move(produced_filter.pending_merge_filter);
// Drop the lock while doing the merge so that other merges can proceed in
// parallel.
l.unlock();
VLOG(3) << "Merging partitioned join filter " << filter_id;
tmp_filter->Or(pending_merge.get());
l.lock();
}
// At this point, either we've merged all the filters or we're waiting for more
// filters.
if (produced_filter.pending_producers > 1) {
// A subsequent caller of UpdateFilterFromLocal() is responsible for merging this
// filter into the final one.
produced_filter.pending_merge_filter = std::move(tmp_filter);
} else {
// Everything was merged into 'tmp_filter'. It is therefore the result filter.
result_filter->SetFilter(tmp_filter.get());
complete_filter = result_filter;
VLOG(3) << "Partitioned join filter " << filter_id << " is locally complete.";
}
}
int remaining_producers = --produced_filter.pending_producers;
VLOG(3) << "Filter " << filter_id << " updated. " << remaining_producers
<< " producers left on the backend.";
DCHECK(remaining_producers > 0 || complete_filter != nullptr);
has_local_target = result_filter->filter_desc().has_local_targets;
has_remote_target = result_filter->filter_desc().has_remote_targets;
}
if (complete_filter != nullptr && has_local_target) {
// Do a short-circuit publication by pushing the same filter to the consumer side.
RuntimeFilter* consumed_filter;
{
lock_guard<SpinLock> l(fs->lock);
if (fs->consumed_filter == nullptr) return;
consumed_filter = fs->consumed_filter;
// Update the filter while still holding the lock to avoid racing with the
// SetFilter() call in PublishGlobalFilter().
if (consumed_filter->HasFilter()) {
// Multiple instances may produce the same filter for broadcast joins.
// TODO: we would ideally update the coordinator logic to avoid creating duplicates
// on the same node, but sending out a few duplicate filters is relatively
// inconsequential for performance.
DCHECK(consumed_filter->filter_desc().is_broadcast_join)
<< consumed_filter->filter_desc();
} else {
consumed_filter->SetFilter(complete_filter);
query_state_->host_profile()->AddInfoString(
Substitute("Filter $0 arrival", filter_id),
PrettyPrinter::Print(consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
}
}
}
if (complete_filter != nullptr && has_remote_target &&
query_state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
UpdateFilterParamsPB params;
// The memory associated with the following 2 objects needs to live until
// the asynchronous KRPC call proxy->UpdateFilterAsync() is completed.
// Hence, we allocate these 2 objects in 'obj_pool_'.
UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB);
RpcController* controller = obj_pool_.Add(new RpcController);
TUniqueIdToUniqueIdPB(query_state_->query_id(), params.mutable_query_id());
params.set_filter_id(filter_id);
TRuntimeFilterType::type type = complete_filter->filter_desc().type;
if (type == TRuntimeFilterType::BLOOM) {
BloomFilter::ToProtobuf(bloom_filter, controller, params.mutable_bloom_filter());
} else {
DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
min_max_filter->ToProtobuf(params.mutable_min_max_filter());
}
const TNetworkAddress& krpc_address = query_state_->query_ctx().coord_krpc_address;
const TNetworkAddress& host_address = query_state_->query_ctx().coord_address;
// Use 'proxy' to send the filter to the coordinator.
unique_ptr<DataStreamServiceProxy> proxy;
Status get_proxy_status =
DataStreamService::GetProxy(krpc_address, host_address.hostname, &proxy);
if (!get_proxy_status.ok()) {
// Failing to send a filter is not a query-wide error - the remote fragment will
// continue regardless.
LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1",
host_address.hostname, get_proxy_status.msg().msg());
return;
}
// Increment 'num_inflight_rpcs_' to make sure that the filter will not be deallocated
// in Close() until all in-flight RPCs complete.
{
unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
DCHECK_GE(num_inflight_rpcs_, 0);
++num_inflight_rpcs_;
}
proxy->UpdateFilterAsync(params, res, controller,
boost::bind(&RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res));
}
}
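// Applies a globally aggregated filter received from the coordinator to the
// consumer-side filter. Bloom filter contents arrive as an RPC sidecar and are copied
// into memory backed by 'buffer_pool_client_'; failure to fetch the sidecar or to
// allocate memory downgrades the filter to ALWAYS_TRUE_FILTER instead of failing the
// query.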
void RuntimeFilterBank::PublishGlobalFilter(
const PublishFilterParamsPB& params, RpcContext* context) {
VLOG(3) << "PublishGlobalFilter(filter_id=" << params.filter_id() << ")";
auto it = filters_.find(params.filter_id());
DCHECK(it != filters_.end()) << "Filter ID " << params.filter_id() << " not registered";
PerFilterState* fs = it->second.get();
lock_guard<SpinLock> l(fs->lock);
if (closed_) return;
if (fs->consumed_filter->HasFilter()) {
// The filter routing in the coordinator can sometimes redundantly send broadcast
// filters that were already produced on this backend and consumed locally.
// It is safe to drop the filter because we already have a filter with the same
// contents.
DCHECK(fs->consumed_filter->filter_desc().is_broadcast_join)
<< "Got duplicate partitioned join filter";
return;
}
BloomFilter* bloom_filter = nullptr;
MinMaxFilter* min_max_filter = nullptr;
if (fs->consumed_filter->is_bloom_filter()) {
DCHECK(params.has_bloom_filter());
if (params.bloom_filter().always_true()) {
bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
} else {
int64_t required_space = BloomFilter::GetExpectedMemoryUsed(
params.bloom_filter().log_bufferpool_space());
DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
<< "BufferPool Client should have enough reservation to fulfill bloom filter "
"allocation";
bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
kudu::Slice sidecar_slice;
if (params.bloom_filter().has_directory_sidecar_idx()) {
kudu::Status status = context->GetInboundSidecar(
params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
if (!status.ok()) {
LOG(ERROR) << "Failed to get Bloom filter sidecar: "
<< status.message().ToString();
bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
}
} else {
DCHECK(params.bloom_filter().always_false());
}
if (bloom_filter != BloomFilter::ALWAYS_TRUE_FILTER) {
Status status = bloom_filter->Init(params.bloom_filter(), sidecar_slice.data(),
sidecar_slice.size(), DefaultHashSeed());
if (!status.ok()) {
LOG(ERROR) << "Unable to allocate memory for bloom filter: "
<< status.GetDetail();
bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
} else {
fs->bloom_filters.push_back(bloom_filter);
DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
}
}
}
} else {
DCHECK(fs->consumed_filter->is_min_max_filter());
DCHECK(params.has_min_max_filter());
min_max_filter = MinMaxFilter::Create(params.min_max_filter(),
fs->consumed_filter->type(), &obj_pool_, filter_mem_tracker_);
fs->min_max_filters.push_back(min_max_filter);
}
fs->consumed_filter->SetFilter(bloom_filter, min_max_filter);
query_state_->host_profile()->AddInfoString(
Substitute("Filter $0 arrival", params.filter_id()),
PrettyPrinter::Print(fs->consumed_filter->arrival_delay_ms(), TUnit::TIME_MS));
}
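// Allocates a bloom filter sized to match the registered filter, backed by
// 'buffer_pool_client_'. Returns nullptr if the bank is already closed or if the
// allocation fails.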
BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
auto it = filters_.find(filter_id);
DCHECK(it != filters_.end()) << "Filter ID " << filter_id << " not registered";
PerFilterState* fs = it->second.get();
lock_guard<SpinLock> l(fs->lock);
if (closed_) return nullptr;
// Track required space
int64_t log_filter_size =
BitUtil::Log2Ceiling64(fs->produced_filter.result_filter->filter_size());
int64_t required_space = BloomFilter::GetExpectedMemoryUsed(log_filter_size);
DCHECK_GE(buffer_pool_client_.GetUnusedReservation(), required_space)
<< "BufferPool Client should have enough reservation to fulfill bloom filter "
"allocation";
BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(&buffer_pool_client_));
Status status = bloom_filter->Init(log_filter_size, DefaultHashSeed());
if (!status.ok()) {
LOG(ERROR) << "Unable to allocate memory for bloom filter: " << status.GetDetail();
return nullptr;
}
fs->bloom_filters.push_back(bloom_filter);
DCHECK_EQ(required_space, bloom_filter->GetBufferPoolSpaceUsed());
bloom_memory_allocated_->Add(bloom_filter->GetBufferPoolSpaceUsed());
return bloom_filter;
}
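// Allocates a min-max filter of the given column type from 'obj_pool_', tracked by
// 'filter_mem_tracker_'. Returns nullptr if the bank is already closed.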
MinMaxFilter* RuntimeFilterBank::AllocateScratchMinMaxFilter(
int32_t filter_id, ColumnType type) {
auto it = filters_.find(filter_id);
DCHECK(it != filters_.end()) << "Filter ID " << filter_id << " not registered";
PerFilterState* fs = it->second.get();
lock_guard<SpinLock> l(fs->lock);
if (closed_) return nullptr;
MinMaxFilter* min_max_filter =
MinMaxFilter::Create(type, &obj_pool_, filter_mem_tracker_);
fs->min_max_filters.push_back(min_max_filter);
return min_max_filter;
}
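// Acquires the per-filter lock of every registered filter. Cancel() and Close() hold
// all of them so they cannot race with concurrent filter updates.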
vector<unique_lock<SpinLock>> RuntimeFilterBank::LockAllFilters() {
vector<unique_lock<SpinLock>> locks;
for (auto& entry : filters_) locks.emplace_back(entry.second->lock);
return locks;
}
void RuntimeFilterBank::Cancel() {
auto all_locks = LockAllFilters();
CancelLocked();
}
void RuntimeFilterBank::CancelLocked() {
// IMPALA-9730: if no filters are present, we did not acquire any filter locks. Avoid a
// TSAN data race on 'cancelled_' by exiting early.
if (filters_.empty() || cancelled_) return;
// Cancel all filters that a thread might be waiting on.
for (auto& entry : filters_) {
if (entry.second->consumed_filter != nullptr) entry.second->consumed_filter->Cancel();
}
cancelled_ = true;
}
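// Releases all resources held by the filter bank: waits for in-flight UpdateFilter()
// RPCs, cancels any filters a consumer might still be waiting on, closes all bloom and
// min-max filters, and returns the claimed buffer pool reservation.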
void RuntimeFilterBank::Close() {
// Wait for all in-flight RPCs to complete before closing the filters.
{
unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
while (num_inflight_rpcs_ > 0) {
krpcs_done_cv_.wait(l1);
}
}
auto all_locks = LockAllFilters();
CancelLocked();
// We do not have to set 'closed_' to true before waiting for all in-flight RPCs to
// drain because the async build thread in
// BlockingJoinNode::ProcessBuildInputAndOpenProbe() should have exited by the time
// Close() is called, so no new RPCs should be issued once this function runs.
if (closed_) return;
closed_ = true;
for (auto& entry : filters_) {
for (BloomFilter* filter : entry.second->bloom_filters) filter->Close();
for (MinMaxFilter* filter : entry.second->min_max_filters) filter->Close();
}
obj_pool_.Clear();
if (buffer_pool_client_.is_registered()) {
VLOG_FILE << "RuntimeFilterBank (Query Id: " << PrintId(query_state_->query_id())
<< ") returning reservation " << total_bloom_filter_mem_required_;
query_state_->initial_reservations()->Return(
&buffer_pool_client_, total_bloom_filter_mem_required_);
ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
}
DCHECK_EQ(filter_mem_tracker_->consumption(), 0);
filter_mem_tracker_->Close();
}
RuntimeFilterBank::ProducedFilter::ProducedFilter(
int pending_producers, RuntimeFilter* result_filter)
: result_filter(result_filter), pending_producers(pending_producers) {}
RuntimeFilterBank::PerFilterState::PerFilterState(
int pending_producers, RuntimeFilter* result_filter, RuntimeFilter* consumed_filter)
: produced_filter(pending_producers, result_filter), consumed_filter(consumed_filter) {}