| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "util/bloom-filter.h" |
| #ifdef __aarch64__ |
| #include "sse2neon.h" |
| #else |
| #include <emmintrin.h> |
| #endif |
| #include <math.h> |
| #include <string.h> |
| |
| #include <cmath> |
| #include <cstdint> |
| #include <memory> |
| #include <ostream> |
| |
| #include "gen-cpp/data_stream_service.pb.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/rpc/rpc_sidecar.h" |
| #include "kudu/util/block_bloom_filter.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| #include "runtime/exec-env.h" |
| #include "util/kudu-status-util.h" |
| |
| using namespace std; |
| |
| namespace impala { |
| |
| constexpr BloomFilter* const BloomFilter::ALWAYS_TRUE_FILTER; |
| |
| BloomFilter::BloomFilter(BufferPool::ClientHandle* client) |
| : buffer_allocator_(client), block_bloom_filter_(&buffer_allocator_) {} |
| |
| BloomFilter::~BloomFilter() {} |
| |
| Status BloomFilter::Init(const int log_bufferpool_space, uint32_t hash_seed) { |
| KUDU_RETURN_IF_ERROR( |
| block_bloom_filter_.Init(log_bufferpool_space, kudu::FAST_HASH, hash_seed), |
| "Failed to init Block Bloom Filter"); |
| return Status::OK(); |
| } |
| |
| Status BloomFilter::Init(const BloomFilterPB& protobuf, const uint8_t* directory_in, |
| size_t directory_in_size, uint32_t hash_seed) { |
| if (protobuf.always_false() || directory_in_size == 0) { |
| // Directory size equal 0 only when it's always false. |
| KUDU_RETURN_IF_ERROR(block_bloom_filter_.Init( |
| protobuf.log_bufferpool_space(), kudu::FAST_HASH, hash_seed), |
| "Failed to init Block Bloom Filter"); |
| } else { |
| kudu::Slice slice(directory_in, directory_in_size); |
| KUDU_RETURN_IF_ERROR( |
| block_bloom_filter_.InitFromDirectory( |
| protobuf.log_bufferpool_space(), slice, false, kudu::FAST_HASH, hash_seed), |
| "Failed to init Block Bloom Filter"); |
| } |
| return Status::OK(); |
| } |
| |
| void BloomFilter::Close() { |
| block_bloom_filter_.Close(); |
| } |
| |
| void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params, |
| kudu::rpc::RpcController* controller, const char* directory, |
| unsigned long directory_size) { |
| DCHECK(rpc_params != nullptr); |
| DCHECK(!rpc_params->always_false()); |
| DCHECK(!rpc_params->always_true()); |
| kudu::Slice dir_slice(directory, directory_size); |
| unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar = |
| kudu::rpc::RpcSidecar::FromSlice(dir_slice); |
| |
| int sidecar_idx = -1; |
| kudu::Status sidecar_status = |
| controller->AddOutboundSidecar(std::move(rpc_sidecar), &sidecar_idx); |
| if (!sidecar_status.ok()) { |
| LOG(ERROR) << "Cannot add outbound sidecar: " << sidecar_status.message().ToString(); |
| // If AddOutboundSidecar() fails, we 'disable' the BloomFilterPB by setting it to |
| // an always true filter. |
| rpc_params->set_always_false(false); |
| rpc_params->set_always_true(true); |
| return; |
| } |
| rpc_params->set_directory_sidecar_idx(sidecar_idx); |
| rpc_params->set_always_false(false); |
| rpc_params->set_always_true(false); |
| } |
| |
| void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params, |
| kudu::rpc::RpcController* controller, const string& directory) { |
| AddDirectorySidecar(rpc_params, controller, |
| reinterpret_cast<const char*>(&(directory[0])), |
| static_cast<unsigned long>(directory.size())); |
| } |
| |
| void BloomFilter::ToProtobuf( |
| BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const { |
| protobuf->set_log_bufferpool_space(block_bloom_filter_.log_space_bytes()); |
| if (block_bloom_filter_.always_false()) { |
| protobuf->set_always_false(true); |
| protobuf->set_always_true(false); |
| return; |
| } |
| kudu::Slice directory = block_bloom_filter_.directory(); |
| BloomFilter::AddDirectorySidecar(protobuf, controller, |
| reinterpret_cast<const char*>(directory.data()), |
| static_cast<unsigned long>(directory.size())); |
| } |
| |
| void BloomFilter::ToProtobuf(const BloomFilter* filter, |
| kudu::rpc::RpcController* controller, BloomFilterPB* protobuf) { |
| DCHECK(protobuf != nullptr); |
| // If filter == nullptr, then this BloomFilter is an always true filter. |
| if (filter == nullptr) { |
| protobuf->set_always_true(true); |
| DCHECK(!protobuf->always_false()); |
| return; |
| } |
| filter->ToProtobuf(protobuf, controller); |
| } |
| |
| int64_t BloomFilter::GetBufferPoolSpaceUsed() { |
| return buffer_allocator_.IsAllocated() ? block_bloom_filter_.GetSpaceUsed() : -1; |
| } |
| |
| void BloomFilter::Or(const BloomFilter& other) { |
| DCHECK_NE(this, &other); |
| DCHECK_NE(&other, ALWAYS_TRUE_FILTER); |
| if (other.AlwaysFalse()) return; |
| DCHECK_EQ( |
| block_bloom_filter_.log_space_bytes(), other.block_bloom_filter_.log_space_bytes()); |
| block_bloom_filter_.Or(other.block_bloom_filter_); |
| } |
| |
| void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in, |
| BloomFilterPB* out, uint8_t* directory_out, size_t directory_size) { |
| DCHECK(out != nullptr); |
| DCHECK_NE(&in, out); |
| // These cases are impossible in current code. If they become possible in the future, |
| // memory usage should be tracked accordingly. |
| DCHECK(!out->always_false()); |
| DCHECK(!out->always_true()); |
| DCHECK(!in.always_true()); |
| if (in.always_false()) return; |
| DCHECK_EQ(in.log_bufferpool_space(), out->log_bufferpool_space()); |
| kudu::BlockBloomFilter::OrEqualArray(directory_size, directory_in, directory_out); |
| } |
| |
| ImpalaBloomFilterBufferAllocator::ImpalaBloomFilterBufferAllocator() |
| : buffer_pool_client_(nullptr), is_allocated_(false) { |
| // Default constructor, which is defined to support the virtual function Clone(). |
| // Impala code should not hit this function. |
| LOG(DFATAL) << "Unsupported code path."; |
| } |
| |
| ImpalaBloomFilterBufferAllocator::ImpalaBloomFilterBufferAllocator( |
| BufferPool::ClientHandle* client) |
| : buffer_pool_client_(DCHECK_NOTNULL(client)), is_allocated_(false) {} |
| |
| ImpalaBloomFilterBufferAllocator::~ImpalaBloomFilterBufferAllocator() { |
| if (is_allocated_) { |
| LOG(DFATAL) << "Close() should have been called before the object is destroyed."; |
| Close(); |
| } |
| } |
| |
| void ImpalaBloomFilterBufferAllocator::Close() { |
| if (!is_allocated_) return; |
| DCHECK(buffer_pool_client_ != nullptr); |
| BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool(); |
| buffer_pool->FreeBuffer(buffer_pool_client_, &buffer_handle_); |
| is_allocated_ = false; |
| } |
| |
| kudu::Status ImpalaBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) { |
| Close(); // Ensure that any previously allocated memory is released. |
| |
| BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool(); |
| DCHECK(buffer_pool_client_ != nullptr); |
| impala::Status status = |
| buffer_pool->AllocateBuffer(buffer_pool_client_, bytes, &buffer_handle_); |
| if (!status.ok()) { |
| return kudu::Status::RuntimeError( |
| strings::Substitute("BufferPool bad_alloc, bytes: $0", bytes)); |
| } |
| *ptr = reinterpret_cast<void*>(buffer_handle_.data()); |
| is_allocated_ = true; |
| return kudu::Status::OK(); |
| } |
| |
| void ImpalaBloomFilterBufferAllocator::FreeBuffer(void* ptr) { |
| if (ptr == nullptr) return; |
| DCHECK_EQ(ptr, buffer_handle_.data()); |
| Close(); |
| } |
| |
| } // namespace impala |