blob: 202245939fe618947b2e21aa61d2eec3d6af0a32 [file] [log] [blame]
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "table/partitioned_filter_block.h"
#include <utility>
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
#include "rocksdb/filter_policy.h"
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "util/coding.h"
namespace rocksdb {
// Builds a partitioned filter on top of FullFilterBlockBuilder.
//
// prefix_extractor, whole_key_filtering and filter_bits_builder are forwarded
// unchanged to the FullFilterBlockBuilder base. index_block_restart_interval
// configures the builder of the index over the filter partitions, and
// p_index_builder is the partitioned index builder consulted when deciding
// where to cut a partition. partition_size is converted into a per-partition
// entry budget through FilterBitsBuilder::CalculateNumEntry().
PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder(
    const SliceTransform* prefix_extractor, bool whole_key_filtering,
    FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
    PartitionedIndexBuilder* const p_index_builder,
    const uint32_t partition_size)
    : FullFilterBlockBuilder(prefix_extractor, whole_key_filtering,
                             filter_bits_builder),
      index_on_filter_block_builder_(index_block_restart_interval),
      p_index_builder_(p_index_builder),
      filters_in_partition_(0) {
  // The entry budget is computed in the body because it goes through
  // filter_bits_builder_, which is only usable once the base is constructed.
  const auto entries_per_partition =
      filter_bits_builder_->CalculateNumEntry(partition_size);
  filters_per_partition_ = entries_per_partition;
}
// Nothing to release explicitly; members clean up after themselves.
PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() = default;
void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock() {
// Use == to send the request only once
if (filters_in_partition_ == filters_per_partition_) {
// Currently only index builder is in charge of cutting a partition. We keep
// requesting until it is granted.
p_index_builder_->RequestPartitionCut();
}
if (!p_index_builder_->ShouldCutFilterBlock()) {
return;
}
filter_gc.push_back(std::unique_ptr<const char[]>(nullptr));
Slice filter = filter_bits_builder_->Finish(&filter_gc.back());
std::string& index_key = p_index_builder_->GetPartitionKey();
filters.push_back({index_key, filter});
filters_in_partition_ = 0;
}
// Adds `key` to the filter partition under construction. A pending partition
// cut is attempted first, so the key lands in the new partition if one is cut.
void PartitionedFilterBlockBuilder::AddKey(const Slice& key) {
  MaybeCutAFilterBlock();
  filter_bits_builder_->AddKey(key);
  ++filters_in_partition_;
}
// Emits the filter partitions one call at a time, followed by the index over
// those partitions.
//
// While partitions remain queued, the front partition's filter is returned
// and *status is set to Incomplete() to signal that Finish must be called
// again. On each such follow-up call, last_partition_block_handle is recorded
// in the index as the handle of the partition returned by the previous call.
// Once the queue is empty, *status is set to OK() and the finished index block
// is returned, or an empty Slice if no partition was ever produced.
Slice PartitionedFilterBlockBuilder::Finish(
    const BlockHandle& last_partition_block_handle, Status* status) {
  if (!finishing_filters) {
    // First call: close the partition that is still being built, if allowed.
    MaybeCutAFilterBlock();
  } else {
    // Follow-up call: index the partition handed out by the previous call and
    // drop it from the queue.
    std::string encoded_handle;
    last_partition_block_handle.EncodeTo(&encoded_handle);
    index_on_filter_block_builder_.Add(filters.front().key, encoded_handle);
    filters.pop_front();
  }

  if (UNLIKELY(filters.empty())) {
    // No partition left to hand out.
    *status = Status::OK();
    if (!finishing_filters) {
      // Rare case: no filter partition was ever queued.
      return Slice();
    }
    return index_on_filter_block_builder_.Finish();
  }

  // Hand out the next partition; Incomplete() tells the caller to come back.
  *status = Status::Incomplete();
  finishing_filters = true;
  return filters.front().filter;
}
// Constructs a reader over the index-on-filter-partitions block.
//
// `contents` holds the raw index block and is moved into a Block owned by
// idx_on_fltr_blk_. Its size is passed to the FilterBlockReader base before
// the move. `filter_bits_reader` is not used by this constructor.
PartitionedFilterBlockReader::PartitionedFilterBlockReader(
    const SliceTransform* prefix_extractor, bool _whole_key_filtering,
    BlockContents&& contents, FilterBitsReader* filter_bits_reader,
    Statistics* stats, const Comparator& comparator,
    const BlockBasedTable* table)
    : FilterBlockReader(contents.data.size(), stats, _whole_key_filtering),
      prefix_extractor_(prefix_extractor),
      comparator_(comparator),
      table_(table) {
  Block* const index_block =
      new Block(std::move(contents), kDisableGlobalSequenceNumber,
                0 /* read_amp_bytes_per_bit */, stats);
  idx_on_fltr_blk_.reset(index_block);
}
// Erases the block-cache entry of every filter partition listed in the index.
// Does nothing when the table has no block cache configured.
PartitionedFilterBlockReader::~PartitionedFilterBlockReader() {
  // TODO(myabandeh): if instead of filter object we store only the blocks in
  // block cache, then we don't have to manually erase them from block cache
  // here.
  Cache* const block_cache = table_->rep_->table_options.block_cache.get();
  if (UNLIKELY(block_cache == nullptr)) {
    return;
  }

  // Scratch space into which GetCacheKey() writes each partition's cache key.
  char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
  BlockHandle partition_handle;
  BlockIter index_iter;
  idx_on_fltr_blk_->NewIterator(&comparator_, &index_iter, true);
  for (index_iter.SeekToFirst(); index_iter.Valid(); index_iter.Next()) {
    Slice encoded_handle = index_iter.value();
    const Status decode_status = partition_handle.DecodeFrom(&encoded_handle);
    assert(decode_status.ok());
    if (decode_status.ok()) {
      // Entries whose handle fails to decode are skipped in release builds.
      block_cache->Erase(BlockBasedTable::GetCacheKey(
          table_->rep_->cache_key_prefix, table_->rep_->cache_key_prefix_size,
          partition_handle, cache_key));
    }
  }
}
bool PartitionedFilterBlockReader::KeyMayMatch(
const Slice& key, uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr) {
assert(const_ikey_ptr != nullptr);
assert(block_offset == kNotValid);
if (!whole_key_filtering_) {
return true;
}
if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) {
return true;
}
auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr);
if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range
return false;
}
bool cached = false;
auto filter_partition = GetFilterPartition(nullptr /* prefetch_buffer */,
&filter_handle, no_io, &cached);
if (UNLIKELY(!filter_partition.value)) {
return true;
}
auto res = filter_partition.value->KeyMayMatch(key, block_offset, no_io);
if (cached) {
return res;
}
if (LIKELY(filter_partition.IsSet())) {
filter_partition.Release(table_->rep_->table_options.block_cache.get());
} else {
delete filter_partition.value;
}
return res;
}
bool PartitionedFilterBlockReader::PrefixMayMatch(
const Slice& prefix, uint64_t block_offset, const bool no_io,
const Slice* const const_ikey_ptr) {
assert(const_ikey_ptr != nullptr);
assert(block_offset == kNotValid);
if (!prefix_extractor_) {
return true;
}
if (UNLIKELY(idx_on_fltr_blk_->size() == 0)) {
return true;
}
auto filter_handle = GetFilterPartitionHandle(*const_ikey_ptr);
if (UNLIKELY(filter_handle.size() == 0)) { // prefix is out of range
return false;
}
bool cached = false;
auto filter_partition = GetFilterPartition(nullptr /* prefetch_buffer */,
&filter_handle, no_io, &cached);
if (UNLIKELY(!filter_partition.value)) {
return true;
}
auto res = filter_partition.value->PrefixMayMatch(prefix, kNotValid, no_io);
if (cached) {
return res;
}
if (LIKELY(filter_partition.IsSet())) {
filter_partition.Release(table_->rep_->table_options.block_cache.get());
} else {
delete filter_partition.value;
}
return res;
}
// Seeks `entry` in the index over filter partitions and returns the value of
// the entry the iterator lands on (the encoded handle of the partition).
// Returns an empty Slice when the seek runs off the end of the index.
Slice PartitionedFilterBlockReader::GetFilterPartitionHandle(
    const Slice& entry) {
  BlockIter index_iter;
  idx_on_fltr_blk_->NewIterator(&comparator_, &index_iter, true);
  index_iter.Seek(entry);
  return LIKELY(index_iter.Valid()) ? index_iter.value() : Slice();
}
// Resolves the encoded block handle in *handle_value to a filter partition.
//
// Without a block cache the partition is read through ReadFilter(), using
// prefetch_buffer, and returned with a null cache handle. With a block cache,
// filter_map_ is consulted first; on a hit *cached is set to true and the
// stored entry is returned. Otherwise the partition is fetched through
// BlockBasedTable::GetFilter(), honoring no_io. *cached is written only on a
// filter_map_ hit, so callers must initialize it to false.
BlockBasedTable::CachableEntry<FilterBlockReader>
PartitionedFilterBlockReader::GetFilterPartition(
    FilePrefetchBuffer* prefetch_buffer, Slice* handle_value, const bool no_io,
    bool* cached) {
  BlockHandle partition_handle;
  auto decode_status = partition_handle.DecodeFrom(handle_value);
  assert(decode_status.ok());

  const bool is_a_filter_partition = true;
  Cache* const block_cache = table_->rep_->table_options.block_cache.get();

  if (UNLIKELY(block_cache == nullptr)) {
    // No block cache: read the partition directly.
    auto filter = table_->ReadFilter(prefetch_buffer, partition_handle,
                                     is_a_filter_partition);
    return {filter, nullptr};
  }

  if (!filter_map_.empty()) {
    auto pinned = filter_map_.find(partition_handle.offset());
    // A miss here is possible, e.g. the block cache might not have had space
    // for this partition.
    if (pinned != filter_map_.end()) {
      PERF_COUNTER_ADD(block_cache_hit_count, 1);
      RecordTick(statistics(), BLOCK_CACHE_FILTER_HIT);
      RecordTick(statistics(), BLOCK_CACHE_HIT);
      RecordTick(statistics(), BLOCK_CACHE_BYTES_READ,
                 block_cache->GetUsage(pinned->second.cache_handle));
      *cached = true;
      return pinned->second;
    }
  }

  return table_->GetFilter(/*prefetch_buffer*/ nullptr, partition_handle,
                           is_a_filter_partition, no_io);
}
// Reports the size of the index-on-filter-partitions block. The filter
// partitions themselves are not included in this figure.
size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
  const size_t index_block_bytes = idx_on_fltr_blk_->size();
  return index_block_bytes;
}
// TODO(myabandeh): merge this with the same function in IndexReader
void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
// Before read partitions, prefetch them to avoid lots of IOs
auto rep = table_->rep_;
BlockIter biter;
BlockHandle handle;
idx_on_fltr_blk_->NewIterator(&comparator_, &biter, true);
// Index partitions are assumed to be consecuitive. Prefetch them all.
// Read the first block offset
biter.SeekToFirst();
Slice input = biter.value();
Status s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
"Could not read first index partition");
return;
}
uint64_t prefetch_off = handle.offset();
// Read the last block's offset
biter.SeekToLast();
input = biter.value();
s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log,
"Could not read last index partition");
return;
}
uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
auto& file = table_->rep_->file;
prefetch_buffer.reset(new FilePrefetchBuffer());
s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len);
// After prefetch, read the partitions one by one
biter.SeekToFirst();
Cache* block_cache = rep->table_options.block_cache.get();
for (; biter.Valid(); biter.Next()) {
input = biter.value();
s = handle.DecodeFrom(&input);
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_WARN(rep->ioptions.info_log, "Could not read index partition");
continue;
}
const bool no_io = true;
const bool is_a_filter_partition = true;
auto filter = table_->GetFilter(prefetch_buffer.get(), handle,
is_a_filter_partition, !no_io);
if (LIKELY(filter.IsSet())) {
if (pin) {
filter_map_[handle.offset()] = std::move(filter);
} else {
block_cache->Release(filter.cache_handle);
}
} else {
delete filter.value;
}
}
}
} // namespace rocksdb