// blob: a1cffcd460fa451ba839504f1983a37d38f8c2d0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/cfile/bloomfile.h"
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/block_pointer.h"
#include "kudu/cfile/cfile.pb.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/cfile/index_btree.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/fs/block_manager.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/util/coding.h"
#include "kudu/util/compression/compression.pb.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/logging.h"
#include "kudu/util/malloc.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/threadlocal_cache.h"
DECLARE_bool(cfile_lazy_open);
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace cfile {
using fs::BlockCreationTransaction;
using fs::BlockManager;
using fs::IOContext;
using fs::ReadableBlock;
using fs::WritableBlock;
// Generator for BloomFileReader::instance_nonce_.
//
// The BloomFileReader constructor atomically increments this counter and keeps
// the result as the reader's nonce; CheckKeyPresent() then uses that nonce as
// the key into the thread-local bloom cache.
static Atomic64 g_next_nonce = 0;
namespace {
// Frequently, a thread processing a batch of operations will consult the same BloomFile
// many times in a row. So, we keep a thread-local cache of the state for recently-accessed
// BloomFileReaders so that we can avoid doing repetitive work.
// Per-thread cached lookup state for a single BloomFileReader.
//
// A thread working through a batch of operations tends to probe the same
// BloomFile many times in a row, so the most recent seek position and the
// filter parsed from it are retained here to avoid repeating that work.
class BloomCacheItem {
 public:
  // Builds an index iterator rooted at the reader's value index and starts
  // out with a (0, 0) block pointer.
  explicit BloomCacheItem(const IOContext* io_context, CFileReader* reader)
      : index_iter(io_context, reader, reader->validx_root()),
        cur_block_pointer(0, 0) {}

  // Iterator that performed the seek the last time the reader was consulted.
  IndexTreeIterator index_iter;

  // Location of the specific bloom block that was read on the last access.
  BlockPointer cur_block_pointer;

  // Handle for the block at cur_block_pointer, together with the BloomFilter
  // that was parsed out of it.
  scoped_refptr<BlockHandle> cur_block_handle;
  BloomFilter cur_bloom;

 private:
  DISALLOW_COPY_AND_ASSIGN(BloomCacheItem);
};
using BloomCacheTLC = ThreadLocalCache<uint64_t, BloomCacheItem>;
} // anonymous namespace
////////////////////////////////////////////////////////////
// Writer
////////////////////////////////////////////////////////////
// Creates a writer that emits bloom blocks into `block`, with each filter
// built according to `sizing`.
BloomFileWriter::BloomFileWriter(unique_ptr<WritableBlock> block,
                                 const BloomFilterSizing &sizing)
    : bloom_builder_(sizing) {
  cfile::WriterOptions cfile_opts;

  // Bloom filters are high-entropy data structures by their nature, so
  // compression is never used here, regardless of the default settings.
  cfile_opts.storage_attributes.compression = NO_COMPRESSION;
  cfile_opts.storage_attributes.encoding = PLAIN_ENCODING;

  // Only the value index is written; the positional index is switched off.
  cfile_opts.write_validx = true;
  cfile_opts.write_posidx = false;

  writer_.reset(new cfile::CFileWriter(std::move(cfile_opts),
                                       GetTypeInfo(BINARY),
                                       false,
                                       std::move(block)));
}
// Starts the wrapped CFileWriter and hands back the status it reports.
Status BloomFileWriter::Start() {
  Status start_status = writer_->Start();
  return start_status;
}
// Finishes the bloom file inside a creation transaction of its own: the
// transaction is obtained from the block manager of the block being written,
// the block is finished and released into it, and the transaction is then
// committed.
Status BloomFileWriter::Finish() {
  auto txn = writer_->block()->block_manager()->NewCreationTransaction();
  RETURN_NOT_OK(FinishAndReleaseBlock(txn.get()));
  return txn->CommitCreatedBlocks();
}
// Flushes any partially filled bloom block, then finishes the underlying
// CFile and releases its block into `transaction`.
Status BloomFileWriter::FinishAndReleaseBlock(BlockCreationTransaction* transaction) {
  // Keys added since the last flush still need to be written out as a block.
  const bool has_unflushed_keys = bloom_builder_.count() > 0;
  if (has_unflushed_keys) {
    RETURN_NOT_OK(FinishCurrentBloomBlock());
  }
  return writer_->FinishAndReleaseBlock(transaction);
}
// Reports the written size as tracked by the wrapped CFileWriter.
size_t BloomFileWriter::written_size() const {
  const auto bytes_written = writer_->written_size();
  return bytes_written;
}
// Adds the `n_keys` keys in `keys` to the bloom filter under construction,
// flushing a bloom block to the file each time the builder reaches its
// expected key count.
//
// Returns the first non-OK status produced while flushing a block, or OK.
Status BloomFileWriter::AppendKeys(
    const Slice *keys, size_t n_keys) {
  // Nothing to add: neither the key bookkeeping nor the builder is touched.
  if (n_keys == 0) {
    return Status::OK();
  }

  // An empty builder means this batch opens a new bloom, so remember the
  // batch's first key as that bloom's first key.
  if (bloom_builder_.count() == 0) {
    const Slice& head = keys[0];
    first_key_.assign_copy(head.data(), head.size());
  }

  for (size_t idx = 0; idx < n_keys; ++idx) {
    const Slice& key = keys[idx];
    bloom_builder_.AddKey(BloomKeyProbe(key));

    // Once the bloom has reached optimal occupancy it is flushed to the file.
    const bool bloom_is_full =
        bloom_builder_.count() >= bloom_builder_.expected_count();
    if (PREDICT_FALSE(bloom_is_full)) {
      RETURN_NOT_OK(FinishCurrentBloomBlock());

      // Record the key that closed the block just written, and prime the
      // first key of the following block here so the common path through the
      // loop does not have to.
      last_key_.assign_copy(key.data(), key.size());
      const size_t next_idx = idx + 1;
      if (next_idx < n_keys) {
        const Slice& upcoming = keys[next_idx];
        first_key_.assign_copy(upcoming.data(), upcoming.size());
      }
    }
  }
  return Status::OK();
}
// Writes the bloom filter accumulated so far to the file as one raw block and
// resets the builder for the next one.
Status BloomFileWriter::FinishCurrentBloomBlock() {
  VLOG(1) << "Appending a new bloom block, first_key="
          << KUDU_REDACT(Slice(first_key_).ToDebugString());

  // Serialize the header: a fixed 32-bit length followed by the protobuf.
  BloomBlockHeaderPB header_pb;
  header_pb.set_num_hash_functions(bloom_builder_.n_hashes());
  faststring header_buf;
  const uint32_t header_len = static_cast<uint32_t>(header_pb.ByteSizeLong());
  PutFixed32(&header_buf, header_len);
  pb_util::AppendToString(header_pb, &header_buf);

  // The block payload is the header bytes followed by the bloom itself.
  vector<Slice> block_parts;
  block_parts.emplace_back(header_buf);
  block_parts.push_back(bloom_builder_.slice());

  // Hand the block to the CFile writer along with the stored first/last keys.
  Slice block_first_key(first_key_);
  Slice stored_last_key(last_key_);
  RETURN_NOT_OK(writer_->AppendRawBlock(block_parts, 0, &block_first_key,
                                        stored_last_key, "bloom block"));

  bloom_builder_.Clear();
#ifndef NDEBUG
  // Debug builds overwrite the first key with a recognizable marker value.
  first_key_.assign_copy("POST_RESET");
#endif
  return Status::OK();
}
////////////////////////////////////////////////////////////
// Reader
////////////////////////////////////////////////////////////
// Opens a bloom file on `block` and initializes it before returning.
//
// `*reader` is assigned only when both the open and the initialization
// succeed; otherwise the failing status is returned.
Status BloomFileReader::Open(unique_ptr<ReadableBlock> block,
                             ReaderOptions options,
                             unique_ptr<BloomFileReader> *reader) {
  // Read the IO context up front, because `options` is moved from below.
  const IOContext* const ctx = options.io_context;

  unique_ptr<BloomFileReader> opened;
  RETURN_NOT_OK(OpenNoInit(std::move(block), std::move(options), &opened));
  RETURN_NOT_OK(opened->Init(ctx));

  *reader = std::move(opened);
  return Status::OK();
}
// Opens a bloom file on `block` without necessarily initializing it:
// initialization happens here only when FLAGS_cfile_lazy_open is off.
//
// `*reader` is assigned only on success.
Status BloomFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
                                   ReaderOptions options,
                                   unique_ptr<BloomFileReader> *reader) {
  // `options` is passed as an lvalue here (not moved) since it is used again
  // below.
  unique_ptr<CFileReader> cfile_reader;
  RETURN_NOT_OK(CFileReader::OpenNoInit(std::move(block),
                                        options, &cfile_reader));

  // Read the IO context before `options` is moved into the new reader.
  const IOContext* const ctx = options.io_context;
  unique_ptr<BloomFileReader> result(
      new BloomFileReader(std::move(cfile_reader), std::move(options)));

  const bool lazy_open = FLAGS_cfile_lazy_open;
  if (!lazy_open) {
    RETURN_NOT_OK(result->Init(ctx));
  }

  *reader = std::move(result);
  return Status::OK();
}
// Constructs a reader that takes ownership of an already-opened CFileReader.
//
// This constructor performs no initialization itself; the Open()/OpenNoInit()
// factories in this file call Init() on the new object separately.
BloomFileReader::BloomFileReader(unique_ptr<CFileReader> reader,
ReaderOptions options)
// Take the next value of the file-level g_next_nonce counter as this
// instance's nonce; CheckKeyPresent() keys its thread-local cache on it.
: instance_nonce_(base::subtle::NoBarrier_AtomicIncrement(&g_next_nonce, 1)),
reader_(std::move(reader)),
// Register this object's own footprint (as computed by
// memory_footprint_excluding_reader()) against the parent memory tracker
// supplied in `options`.
mem_consumption_(std::move(options.parent_mem_tracker),
memory_footprint_excluding_reader()) {
}
// Initializes the reader by routing InitOnce() through `init_once_`.
Status BloomFileReader::Init(const IOContext* io_context) {
  auto do_init = [this, io_context] { return InitOnce(io_context); };
  return init_once_.Init(std::move(do_init));
}
// Performs the actual initialization: fully opens the wrapped CFileReader and
// validates that the file has the shape a bloom file must have.
//
// Returns NotSupported if the file is compressed or has no value index.
Status BloomFileReader::InitOnce(const IOContext* io_context) {
  CFileReader* const cfile = reader_.get();

  // Fully open the CFileReader in case it was lazily opened earlier. This is
  // a no-op when it is already initialized.
  RETURN_NOT_OK(cfile->Init(io_context));

  // Compressed bloom files cannot be read.
  if (cfile->is_compressed()) {
    return Status::NotSupported("bloom file is compressed (compression not supported)",
                                cfile->ToString());
  }

  // The value index is required for locating bloom blocks by key.
  if (!cfile->has_validx()) {
    return Status::NotSupported("bloom file missing value index",
                                cfile->ToString());
  }

  return Status::OK();
}
// Splits a raw bloom block into its protobuf header and the filter payload.
//
// Expected layout of `block`: a fixed 32-bit header length, then that many
// bytes of serialized BloomBlockHeaderPB, then the bloom filter data.
//
// On success `hdr` holds the parsed header and `bloom_data` is set to the
// bytes of `block` that follow the header. Returns Corruption when the block
// is too short to hold the length prefix, when the declared header length
// exceeds the remaining bytes, or when the header fails to parse.
Status BloomFileReader::ParseBlockHeader(const Slice& block,
                                         BloomBlockHeaderPB* hdr,
                                         Slice* bloom_data) const {
  Slice data(block);
  if (PREDICT_FALSE(data.size() < sizeof(uint32_t))) {
    return Status::Corruption("Invalid bloom block header: not enough bytes");
  }

  uint32_t header_len = DecodeFixed32(data.data());
  data.remove_prefix(sizeof(header_len));

  if (header_len > data.size()) {
    // header_len is unsigned and data.size() is a size_t, so they are printed
    // with %u and %zu: a corrupt length above INT_MAX would otherwise be
    // reported as a negative number.
    return Status::Corruption(
        StringPrintf("Header length %u doesn't fit in buffer of size %zu",
                     header_len, data.size()));
  }

  if (!hdr->ParseFromArray(data.data(), header_len)) {
    return Status::Corruption(
        string("Invalid bloom block header: ") +
        hdr->InitializationErrorString() +
        "\nHeader:" + HexDump(Slice(data.data(), header_len)));
  }

  // Whatever follows the header is the bloom filter itself.
  data.remove_prefix(header_len);
  *bloom_data = data;
  return Status::OK();
}
// Probes the bloom file for `probe`.
//
// `*maybe_present` is set to false when the key sorts before the first entry
// in the file, and otherwise to the answer of the bloom filter covering the
// key. Errors from seeking, reading or parsing the bloom block are returned
// as-is, in which case `*maybe_present` is not written.
Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
                                        const IOContext* io_context,
                                        bool *maybe_present) {
  DCHECK(init_once_.init_succeeded());

  // The same BloomFile is frequently consulted many times in a row while a
  // batch of operations is processed, so per-reader state lives in a small
  // thread-local cache. The cache is keyed by this reader's nonce rather than
  // by the BlockID: a BloomFile may be closed and re-opened, and a previous
  // cache entry would then point to a destructed CFileReader.
  auto* cache = BloomCacheTLC::GetInstance();
  BloomCacheItem* entry = cache->Lookup(instance_nonce_);
  if (entry == nullptr) {
    // Cache miss: create a fresh entry with its own index iterator.
    entry = cache->EmplaceNew(instance_nonce_, io_context, reader_.get());
  }
  DCHECK_EQ(reader_.get(), entry->index_iter.cfile_reader())
      << "Cached index reader does not match expected instance";

  IndexTreeIterator& iter = entry->index_iter;
  Status seek_status = iter.SeekAtOrBefore(probe.key());
  if (PREDICT_FALSE(seek_status.IsNotFound())) {
    // The seek landed before the first entry in the file.
    *maybe_present = false;
    return Status::OK();
  }
  RETURN_NOT_OK(seek_status);

  // The iterator now points at the bloom block covering the key.
  BlockPointer wanted_block = iter.GetCurrentBlockPointer();

  // When the previous lookup from this bloom on this thread landed on a
  // different block, the correct block has to be read and the cached
  // BloomFilter re-hydrated from it.
  const bool cached_block_matches = entry->cur_block_pointer.Equals(wanted_block);
  if (!cached_block_matches) {
    scoped_refptr<BlockHandle> block_handle;
    RETURN_NOT_OK(reader_->ReadBlock(io_context, wanted_block,
                                     CFileReader::CACHE_BLOCK, &block_handle));

    // Separate the header from the filter bytes.
    BloomBlockHeaderPB header_pb;
    Slice filter_bytes;
    RETURN_NOT_OK(ParseBlockHeader(block_handle->data(), &header_pb, &filter_bytes));

    // Store the freshly parsed state back into the thread-local entry.
    entry->cur_bloom = BloomFilter(filter_bytes, header_pb.num_hash_functions());
    entry->cur_block_pointer = wanted_block;
    entry->cur_block_handle = std::move(block_handle);
  }

  // Finally, ask the bloom filter itself.
  *maybe_present = entry->cur_bloom.MayContainKey(probe);
  return Status::OK();
}
size_t BloomFileReader::memory_footprint_excluding_reader() const {
return kudu_malloc_usable_size(this) + init_once_.memory_footprint_excluding_this();
}
} // namespace cfile
} // namespace kudu