blob: 63b5ce602fea05baeabd97bcafbec4826605f114 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/cfile/index_btree.h"
#include <cstddef>
#include <memory>
#include <ostream>
#include <vector>
#include <glog/logging.h>
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/block_pointer.h"
#include "kudu/cfile/cfile.pb.h"
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/cfile/index_block.h"
#include "kudu/fs/block_id.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
using kudu::fs::IOContext;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace cfile {
IndexTreeBuilder::IndexTreeBuilder(
const WriterOptions *options,
CFileWriter *writer) :
options_(options),
writer_(writer) {
idx_blocks_.emplace_back(CreateBlockBuilder(true));
}
IndexBlockBuilder *IndexTreeBuilder::CreateBlockBuilder(bool is_leaf) {
return new IndexBlockBuilder(options_, is_leaf);
}
Status IndexTreeBuilder::Append(const Slice &key,
const BlockPointer &block) {
return Append(key, block, 0);
}
Status IndexTreeBuilder::Append(
const Slice &key, const BlockPointer &block_ptr,
size_t level) {
if (level >= idx_blocks_.size()) {
// Need to create a new level
CHECK(level == idx_blocks_.size()) <<
"trying to create level " << level << " but size is only "
<< idx_blocks_.size();
VLOG(1) << "Creating level-" << level << " in index b-tree";
idx_blocks_.emplace_back(CreateBlockBuilder(false));
}
IndexBlockBuilder* idx_block = idx_blocks_[level].get();
idx_block->Add(key, block_ptr);
// This index block is full, and there are at least two entries,
// flush it.
size_t est_size = idx_block->EstimateEncodedSize();
if (est_size > options_->index_block_size &&
idx_block->count() > 1) {
BlockPointer index_block_ptr;
RETURN_NOT_OK(FinishBlockAndPropagate(level));
}
return Status::OK();
}
Status IndexTreeBuilder::Finish(BTreeInfoPB *info) {
// Now do the same for the positional index blocks, starting
// with leaf
VLOG(1) << "flushing tree, b-tree has " <<
idx_blocks_.size() << " levels";
// Flush all but the root of the index.
for (size_t i = 0; i < idx_blocks_.size() - 1; i++) {
RETURN_NOT_OK(FinishBlockAndPropagate(i));
}
// Flush the root
int root_level = idx_blocks_.size() - 1;
BlockPointer ptr;
Status s = FinishAndWriteBlock(root_level, &ptr);
if (!s.ok()) {
return s.CloneAndPrepend("Unable to flush root index block");
}
VLOG(1) << "Flushed root index block: " << ptr.ToString();
ptr.CopyToPB(info->mutable_root_block());
return Status::OK();
}
Status IndexTreeBuilder::FinishBlockAndPropagate(size_t level) {
IndexBlockBuilder* idx_block = idx_blocks_[level].get();
// If the block doesn't have any data in it, we don't need to
// write it out.
// This happens if a lower-level block fills up exactly,
// and then the file completes.
//
// TODO: add a test case which exercises this explicitly.
if (idx_block->count() == 0) {
return Status::OK();
}
// Write to file.
BlockPointer idx_block_ptr;
RETURN_NOT_OK(FinishAndWriteBlock(level, &idx_block_ptr));
// Get the first key of the finished block.
Slice first_in_idx_block;
Status s = idx_block->GetFirstKey(&first_in_idx_block);
if (!s.ok()) {
LOG(ERROR) << "Unable to get first key of level-" << level
<< " index block: " << s.ToString() << std::endl
<< GetStackTrace();
return s;
}
// Add to higher-level index.
RETURN_NOT_OK(Append(first_in_idx_block, idx_block_ptr,
level + 1));
// Finally, reset the block we just wrote. It's important to wait until
// here to do this, since the first_in_idx_block data may point to internal
// storage of the index block.
idx_block->Reset();
return Status::OK();
}
// Finish the current block at the given level, writing it
// to the file. Return the location of the written block
// in 'written'.
Status IndexTreeBuilder::FinishAndWriteBlock(size_t level, BlockPointer *written) {
IndexBlockBuilder* idx_block = idx_blocks_[level].get();
Slice data = idx_block->Finish();
vector<Slice> v;
v.push_back(data);
Status s = writer_->AddBlock(v, written, "index block");
if (!s.ok()) {
LOG(ERROR) << "Unable to append level-" << level << " index "
<< "block to file";
return s;
}
return Status::OK();
}
////////////////////////////////////////////////////////////
struct IndexTreeIterator::SeekedIndex {
SeekedIndex() :
iter(&reader)
{}
// Hold a copy of the underlying block data, which would
// otherwise go out of scope. The reader and iter
// do not themselves retain the data.
BlockPointer block_ptr;
scoped_refptr<BlockHandle> data;
IndexBlockReader reader;
IndexBlockIterator iter;
};
IndexTreeIterator::IndexTreeIterator(const IOContext* io_context, const CFileReader *reader,
const BlockPointer &root_blockptr)
: reader_(reader),
root_block_(root_blockptr),
io_context_(io_context) {
}
IndexTreeIterator::~IndexTreeIterator() = default;
Status IndexTreeIterator::SeekAtOrBefore(const Slice &search_key) {
return SeekDownward(search_key, root_block_, 0);
}
Status IndexTreeIterator::SeekToFirst() {
return SeekToFirstDownward(root_block_, 0);
}
bool IndexTreeIterator::HasNext() {
for (int i = seeked_indexes_.size(); i > 0; i--) {
if (seeked_indexes_[i - 1]->iter.HasNext())
return true;
}
return false;
}
Status IndexTreeIterator::Next() {
CHECK(!seeked_indexes_.empty()) << "not seeked";
// Start at the bottom level of the BTree, calling Next(),
// until one succeeds. If any does not succeed, then
// that block is exhausted, and gets removed.
while (!seeked_indexes_.empty()) {
Status s = BottomIter()->Next();
if (s.IsNotFound()) {
seeked_indexes_.pop_back();
} else if (s.ok()) {
break;
} else {
// error
return s;
}
}
// If we're now empty, then the root block was exhausted,
// so we're entirely out of data.
if (seeked_indexes_.empty()) {
return Status::NotFound("end of iterator");
}
// Otherwise, the last layer points to the valid
// next block. Propagate downward if it is not a leaf.
while (!BottomReader()->IsLeaf()) {
RETURN_NOT_OK(
LoadBlock(BottomIter()->GetCurrentBlockPointer(),
seeked_indexes_.size()));
RETURN_NOT_OK(BottomIter()->SeekToIndex(0));
}
return Status::OK();
}
const Slice IndexTreeIterator::GetCurrentKey() const {
return seeked_indexes_.back()->iter.GetCurrentKey();
}
const BlockPointer &IndexTreeIterator::GetCurrentBlockPointer() const {
return seeked_indexes_.back()->iter.GetCurrentBlockPointer();
}
IndexBlockIterator *IndexTreeIterator::BottomIter() {
return &seeked_indexes_.back()->iter;
}
IndexBlockReader *IndexTreeIterator::BottomReader() {
return &seeked_indexes_.back()->reader;
}
IndexBlockIterator *IndexTreeIterator::seeked_iter(int depth) {
return &seeked_indexes_[depth]->iter;
}
IndexBlockReader *IndexTreeIterator::seeked_reader(int depth) {
return &seeked_indexes_[depth]->reader;
}
Status IndexTreeIterator::LoadBlock(const BlockPointer &block,
int depth) {
SeekedIndex *seeked;
if (depth < seeked_indexes_.size()) {
// We have a cached instance from previous seek.
seeked = seeked_indexes_[depth].get();
if (seeked->block_ptr.offset() == block.offset()) {
// We're already seeked to this block - no need to re-parse it.
// This is handy on the root block as well as for the case
// when a lot of requests are traversing down the same part of
// the tree.
return Status::OK();
}
// Seeked to a different block: reset the reader
seeked->reader.Reset();
seeked->iter.Reset();
} else {
// No cached instance, make a new one.
seeked_indexes_.emplace_back(new SeekedIndex());
seeked = seeked_indexes_.back().get();
}
RETURN_NOT_OK(reader_->ReadBlock(io_context_, block,
CFileReader::CACHE_BLOCK, &seeked->data));
seeked->block_ptr = block;
// Parse the new block.
RETURN_NOT_OK_PREPEND(seeked->reader.Parse(seeked->data->data()),
Substitute("failed to parse index block in block $0 at $1",
reader_->block_id().ToString(),
block.ToString()));
return Status::OK();
}
Status IndexTreeIterator::SeekDownward(const Slice &search_key, const BlockPointer &in_block,
int cur_depth) {
// Read the block.
RETURN_NOT_OK(LoadBlock(in_block, cur_depth));
IndexBlockIterator *iter = seeked_iter(cur_depth);
RETURN_NOT_OK(iter->SeekAtOrBefore(search_key));
// If the block is a leaf block, we're done,
// otherwise recurse downward into next layer
// of B-Tree
if (seeked_reader(cur_depth)->IsLeaf()) {
seeked_indexes_.resize(cur_depth + 1);
return Status::OK();
}
return SeekDownward(search_key, iter->GetCurrentBlockPointer(), cur_depth + 1);
}
Status IndexTreeIterator::SeekToFirstDownward(const BlockPointer &in_block, int cur_depth) {
// Read the block.
RETURN_NOT_OK(LoadBlock(in_block, cur_depth));
IndexBlockIterator *iter = seeked_iter(cur_depth);
RETURN_NOT_OK(iter->SeekToIndex(0));
// If the block is a leaf block, we're done,
// otherwise recurse downward into next layer
// of B-Tree
if (seeked_reader(cur_depth)->IsLeaf()) {
seeked_indexes_.resize(cur_depth + 1);
return Status::OK();
} else {
return SeekToFirstDownward(iter->GetCurrentBlockPointer(), cur_depth + 1);
}
}
IndexTreeIterator *IndexTreeIterator::IndexTreeIterator::Create(
const IOContext* io_context,
const CFileReader *reader,
const BlockPointer &root_blockptr) {
return new IndexTreeIterator(io_context, reader, root_blockptr);
}
} // namespace cfile
} // namespace kudu