#include <algorithm>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/cfile/bloomfile.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/common/scan_spec.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/cfile_set.h"
#include "kudu/util/flag_tags.h"
// Runtime switch read by CFileSet::FindRow(): when false, the bloom filter
// check is skipped and the lookup goes directly to the key index seek.
DEFINE_bool(consult_bloom_filters, true, "Whether to consult bloom filters on row presence checks");
// Tag the flag as 'hidden' (see kudu/util/flag_tags.h for the tag semantics).
TAG_FLAG(consult_bloom_filters, hidden);
namespace kudu {
namespace tablet {
using cfile::ReaderOptions;
using cfile::DefaultColumnValueIterator;
using fs::ReadableBlock;
using std::shared_ptr;
using strings::Substitute;
// Utilities
// Opens the data block backing column 'col_id' of the given rowset and wraps
// it in a CFileReader without initializing it (CFileReader::OpenNoInit).
//
// On success the reader is stored in '*new_reader'. Any error from opening
// the block or from OpenNoInit() is returned unchanged.
static Status OpenReader(const shared_ptr<RowSetMetadata>& rowset_metadata,
                         ColumnId col_id,
                         gscoped_ptr<CFileReader> *new_reader) {
  FsManager* fs_manager = rowset_metadata->fs_manager();

  // Resolve the column's block and open it for reading.
  gscoped_ptr<ReadableBlock> data_block;
  BlockId data_block_id = rowset_metadata->column_data_block_for_col_id(col_id);
  RETURN_NOT_OK(fs_manager->OpenBlock(data_block_id, &data_block));

  // TODO: somehow pass reader options in schema
  ReaderOptions reader_opts;
  return CFileReader::OpenNoInit(std::move(data_block), reader_opts, new_reader);
}
// CFile Base
// Constructs a CFileSet for the given rowset metadata. The shared_ptr is
// taken by value and moved into rowset_metadata_; nothing is opened here.
CFileSet::CFileSet(shared_ptr<RowSetMetadata> rowset_metadata)
: rowset_metadata_(std::move(rowset_metadata)) {}
// Destructor.
// NOTE(review): no body statements and no closing brace are visible for this
// definition in this view.
CFileSet::~CFileSet() {
// Opens the readers that make up this CFileSet and loads its key bounds.
//
// Steps:
//   1. Open (without initializing) one CFileReader per column block listed in
//      the rowset metadata and register it in readers_by_col_id_.
//   2. Fully open the key reader: the ad hoc index reader when the rowset has
//      an ad hoc index block, otherwise the reader returned by
//      key_index_reader().
//   3. Load the min/max encoded keys via LoadMinMaxKeys().
//
// Returns the first error encountered, or OK.
//
// NOTE(review): OpenBloomReader() is not invoked from here; confirm where the
// bloom reader is expected to be opened.
Status CFileSet::Open() {
  // Lazily open the column data cfiles. Each one will be fully opened
  // later, when the first iterator seeks for the first time.
  RowSetMetadata::ColumnIdToBlockIdMap block_map = rowset_metadata_->GetColumnBlocksById();
  for (const RowSetMetadata::ColumnIdToBlockIdMap::value_type& e : block_map) {
    ColumnId col_id = e.first;
    DCHECK(!ContainsKey(readers_by_col_id_, col_id)) << "already open";

    gscoped_ptr<CFileReader> reader;
    RETURN_NOT_OK(OpenReader(rowset_metadata_, col_id, &reader));
    readers_by_col_id_[col_id] = shared_ptr<CFileReader>(reader.release());
    VLOG(1) << "Successfully opened cfile for column id " << col_id
            << " in " << rowset_metadata_->ToString();
  }

  // However, the key reader should always be fully opened, so that we
  // can figure out where in the rowset tree we belong.
  if (rowset_metadata_->has_adhoc_index_block()) {
    // OpenAdHocIndexReader() uses CFileReader::Open(), which yields a fully
    // opened reader.
    RETURN_NOT_OK(OpenAdHocIndexReader());
  } else {
    // The key column reader was opened with OpenNoInit() above, so it still
    // has to be initialized explicitly.
    RETURN_NOT_OK(key_index_reader()->Init());
  }

  // Determine the upper and lower key bounds for this CFileSet.
  RETURN_NOT_OK(LoadMinMaxKeys());

  return Status::OK();
}
// Opens the ad hoc index block of this rowset into ad_hoc_idx_reader_ using
// CFileReader::Open(). Does nothing and returns OK when the reader is already
// set. Errors from opening the block or the reader are returned unchanged.
Status CFileSet::OpenAdHocIndexReader() {
  const bool already_open = (ad_hoc_idx_reader_ != nullptr);
  if (already_open) {
    return Status::OK();
  }

  FsManager* fs_manager = rowset_metadata_->fs_manager();
  gscoped_ptr<ReadableBlock> index_block;
  RETURN_NOT_OK(fs_manager->OpenBlock(rowset_metadata_->adhoc_index_block(), &index_block));

  ReaderOptions reader_opts;
  return CFileReader::Open(std::move(index_block), reader_opts, &ad_hoc_idx_reader_);
}
// Opens the bloom block of this rowset into bloom_reader_ without
// initializing it (BloomFileReader::OpenNoInit).
//
// Does nothing when bloom_reader_ is already set. A failure to open the
// underlying block is returned to the caller; a failure of OpenNoInit() is
// only logged and OK is returned.
Status CFileSet::OpenBloomReader() {
  if (bloom_reader_ != nullptr) {
    return Status::OK();
  }

  FsManager* fs_manager = rowset_metadata_->fs_manager();
  gscoped_ptr<ReadableBlock> bloom_block;
  RETURN_NOT_OK(fs_manager->OpenBlock(rowset_metadata_->bloom_block(), &bloom_block));

  ReaderOptions reader_opts;
  Status open_status =
      BloomFileReader::OpenNoInit(std::move(bloom_block), reader_opts, &bloom_reader_);
  if (open_status.ok()) {
    return Status::OK();
  }

  // Continue without bloom.
  LOG(WARNING) << "Unable to open bloom file in " << rowset_metadata_->ToString() << ": "
               << open_status.ToString();
  return Status::OK();
}
// Reads the min and max encoded keys from the key index reader's metadata
// entries (DiskRowSet::kMinKeyMetaEntryName / kMaxKeyMetaEntryName) into
// min_encoded_key_ and max_encoded_key_.
//
// Returns Corruption when either entry is missing or when the min key
// compares greater than the max key; OK otherwise.
Status CFileSet::LoadMinMaxKeys() {
CFileReader *key_reader = key_index_reader();
if (!key_reader->GetMetadataEntry(DiskRowSet::kMinKeyMetaEntryName, &min_encoded_key_)) {
return Status::Corruption("No min key found", ToString());
if (!key_reader->GetMetadataEntry(DiskRowSet::kMaxKeyMetaEntryName, &max_encoded_key_)) {
return Status::Corruption("No max key found", ToString());
// Sanity check: the stored bounds must be ordered (min <= max).
if (Slice(min_encoded_key_).compare(max_encoded_key_) > 0) {
// NOTE(review): the StringPrintf argument list is cut off in this view.
return Status::Corruption(StringPrintf("Min key %s > max key %s",
return Status::OK();
// Returns the reader used for key lookups: the ad hoc index reader when one
// is set, otherwise the reader of the first column in the tablet schema.
// The returned pointer is not owned by the caller.
CFileReader* CFileSet::key_index_reader() const {
  if (ad_hoc_idx_reader_ != nullptr) {
    return ad_hoc_idx_reader_.get();
  }

  // If there is no special index cfile, then we have a non-compound key
  // and we can just use the key column.
  // This is always the first column listed in the tablet schema.
  int first_col_id = tablet_schema().column_id(0);
  const shared_ptr<CFileReader>& key_col_reader = FindOrDie(readers_by_col_id_, first_col_id);
  return key_col_reader.get();
}
// Creates an iterator over the cfile of column 'col_id' and stores it in
// '*iter'. 'cache_blocks' is forwarded to the reader unchanged.
// The reader is looked up with FindOrDie(), so a reader for 'col_id' is
// expected to be present in readers_by_col_id_.
Status CFileSet::NewColumnIterator(ColumnId col_id, CFileReader::CacheControl cache_blocks,
CFileIterator **iter) const {
return FindOrDie(readers_by_col_id_, col_id)->NewIterator(iter, cache_blocks);
// Allocates a new CFileSet::Iterator over this set for the given projection.
// The iterator is created with 'new' and returned as a raw pointer; it is
// handed shared_from_this() as its base data.
CFileSet::Iterator *CFileSet::NewIterator(const Schema *projection) const {
return new CFileSet::Iterator(shared_from_this(), projection);
// Stores the number of rows in '*count' by delegating to the key index
// reader's CountRows().
Status CFileSet::CountRows(rowid_t *count) const {
return key_index_reader()->CountRows(count);
// Copies the cached min/max encoded keys (min_encoded_key_ and
// max_encoded_key_) into the output strings. Always returns OK.
Status CFileSet::GetBounds(string* min_encoded_key,
string* max_encoded_key) const {
*min_encoded_key = min_encoded_key_;
*max_encoded_key = max_encoded_key_;
return Status::OK();
uint64_t CFileSet::EstimateOnDiskSize() const {
uint64_t ret = 0;
for (const ReaderMap::value_type& e : readers_by_col_id_) {
const shared_ptr<CFileReader> &reader = e.second;
ret += reader->file_size();
return ret;
// Looks up the row whose encoded key matches 'probe' and stores its ordinal
// in '*idx'.
//
// When a bloom reader is available and --consult_bloom_filters is set, the
// bloom filter is checked first and a negative answer short-circuits with
// NotFound. If querying the bloom filter fails, the failure is logged, the
// bloom reader is dropped for this rowset, and the lookup falls back to the
// key index seek.
//
// Returns:
//   OK        - exact key match; '*idx' is set.
//   NotFound  - key rejected by the bloom filter or no exact match on seek.
//   other     - error from initializing the bloom reader, creating the key
//               iterator, or seeking.
//
// NOTE(review): 'stats' is accepted but not updated by this function as
// written; confirm whether probe statistics should be recorded here.
Status CFileSet::FindRow(const RowSetKeyProbe &probe, rowid_t *idx,
                         ProbeStats* stats) const {
  if (bloom_reader_ != nullptr && FLAGS_consult_bloom_filters) {
    // Fully open the BloomFileReader if it was lazily opened earlier.
    // If it's already initialized, this is a no-op.
    RETURN_NOT_OK(bloom_reader_->Init());

    bool present = false;
    Status s = bloom_reader_->CheckKeyPresent(probe.bloom_probe(), &present);
    if (s.ok() && !present) {
      return Status::NotFound("not present in bloom filter");
    } else if (!s.ok()) {
      LOG(WARNING) << "Unable to query bloom: " << s.ToString()
                   << " (disabling bloom for this rowset from this point forward)";
      const_cast<CFileSet *>(this)->bloom_reader_.reset(nullptr);
      // Continue with the slow path
    }
  }

  // Create the key iterator before using it; seeking through a null iterator
  // would crash.
  CFileIterator *key_iter = nullptr;
  RETURN_NOT_OK(NewKeyIterator(&key_iter));
  gscoped_ptr<CFileIterator> key_iter_scoped(key_iter); // free on return

  bool exact = false;
  RETURN_NOT_OK(key_iter->SeekAtOrAfter(probe.encoded_key(), &exact));
  if (!exact) {
    return Status::NotFound("not present in storefile (failed seek)");
  }

  *idx = key_iter->GetCurrentOrdinal();
  return Status::OK();
}
// Reports whether the key in 'probe' exists in this CFileSet.
//
// Delegates to FindRow(). A NotFound result is translated into
// '*present = false' with an OK status; for any other result '*present' is
// set to true and FindRow()'s status is returned as-is. '*rowid' is whatever
// FindRow() stored.
Status CFileSet::CheckRowPresent(const RowSetKeyProbe &probe, bool *present,
                                 rowid_t *rowid, ProbeStats* stats) const {
  Status find_status = FindRow(probe, rowid, stats);

  // In the case that the key comes past the end of the file, Seek
  // will return NotFound. In that case, it is OK from this function's
  // point of view - just a non-present key.
  const bool key_missing = find_status.IsNotFound();
  *present = !key_missing;
  if (key_missing) {
    return Status::OK();
  }
  return find_status;
}
// Creates an iterator over the key index reader and stores it in '*key_iter'.
// The iterator is requested with CFileReader::CACHE_BLOCK.
Status CFileSet::NewKeyIterator(CFileIterator **key_iter) const {
return key_index_reader()->NewIterator(key_iter, CFileReader::CACHE_BLOCK);
// Iterator
// Destructor.
// NOTE(review): no body statements are visible in this view. col_iters_ holds
// raw ColumnIterator pointers, so confirm that they are released here.
CFileSet::Iterator::~Iterator() {
// Builds one column iterator per column of the projection.
//
// 'spec' may be null. When it is non-null and spec->cache_blocks() is false,
// the iterators are requested with DONT_CACHE_BLOCK; otherwise CACHE_BLOCK.
//
// Returns Corruption when a projected column has no data in this rowset and
// is neither nullable nor has a read default. Errors from creating a column
// iterator are returned with a "could not create iterator" prefix.
Status CFileSet::Iterator::CreateColumnIterators(const ScanSpec* spec) {
// Must only run once per iterator: no column iterators may exist yet.
DCHECK_EQ(0, col_iters_.size());
vector<ColumnIterator*> ret_iters;
// Presumably deletes the elements still in ret_iters when 'del' goes out of
// scope, covering the early-return paths below -- confirm in stl_util.h.
ElementDeleter del(&ret_iters);
CFileReader::CacheControl cache_blocks = CFileReader::CACHE_BLOCK;
if (spec && !spec->cache_blocks()) {
cache_blocks = CFileReader::DONT_CACHE_BLOCK;
for (int proj_col_idx = 0;
proj_col_idx < projection_->num_columns();
proj_col_idx++) {
ColumnId col_id = projection_->column_id(proj_col_idx);
if (!base_data_->has_data_for_column_id(col_id)) {
// If we have no data for a column, most likely it was added via an ALTER
// operation after this CFileSet was flushed. In that case, we're guaranteed
// that it is either NULLable, or has a "read-default". Otherwise, consider it a corruption.
const ColumnSchema& col_schema = projection_->column(proj_col_idx);
if (PREDICT_FALSE(!col_schema.is_nullable() && !col_schema.has_read_default())) {
return Status::Corruption(Substitute("column $0 has no data in rowset $1",
col_schema.ToString(), base_data_->ToString()));
// NOTE(review): the DefaultColumnValueIterator constructor call is cut off in
// this view.
ret_iters.push_back(new DefaultColumnValueIterator(col_schema.type_info(),
CFileIterator *iter;
// NOTE(review): the Substitute() argument list is cut off in this view, and no
// statement that stores 'iter' or fills col_iters_ is visible.
RETURN_NOT_OK_PREPEND(base_data_->NewColumnIterator(col_id, cache_blocks, &iter),
Substitute("could not create iterator for column $0",
return Status::OK();
// Initializes the iterator for scanning.
//
// Steps:
//   1. Fetch the row count of the underlying CFileSet.
//      PushdownRangeScanPredicate() CHECKs that it is > 0 and uses it as the
//      initial upper bound.
//   2. Create the key iterator used to translate key bounds into ordinals.
//   3. Create the per-column iterators for the projection.
//   4. Push any key range predicate in 'spec' down into
//      [lower_bound_idx_, upper_bound_idx_).
//   5. Position the cursor at lower_bound_idx_ and reset per-batch state.
//
// 'spec' may be null (no predicate). Returns the first error encountered.
Status CFileSet::Iterator::Init(ScanSpec *spec) {
  RETURN_NOT_OK(base_data_->CountRows(&row_count_));

  // Setup Key Iterator
  CFileIterator *tmp = nullptr;
  RETURN_NOT_OK(base_data_->NewKeyIterator(&tmp));
  key_iter_.reset(tmp);

  // Setup column iterators.
  RETURN_NOT_OK(CreateColumnIterators(spec));

  // If there is a range predicate on the key column, push that down into an
  // ordinal range.
  RETURN_NOT_OK(PushdownRangeScanPredicate(spec));

  initted_ = true;

  // Don't actually seek -- we'll seek when we first actually read the
  // data.
  cur_idx_ = lower_bound_idx_;
  Unprepare(); // Reset state.
  return Status::OK();
}
// Translates the key bounds carried by 'spec' into the ordinal range
// [lower_bound_idx_, upper_bound_idx_) by seeking key_iter_.
//
// With a null 'spec' the range stays at the whole rowset, [0, row_count_).
// Requires row_count_ > 0 (enforced with CHECK_GT). Returns OK on every path
// visible here.
Status CFileSet::Iterator::PushdownRangeScanPredicate(ScanSpec *spec) {
CHECK_GT(row_count_, 0);
// Start from the full ordinal range and narrow it below.
lower_bound_idx_ = 0;
upper_bound_idx_ = row_count_;
if (spec == nullptr) {
// No predicate.
return Status::OK();
// The key projection is only built when VLOG(1) is on; it is used solely to
// stringify the bound keys in the log messages below.
Schema key_schema_for_vlog;
if (VLOG_IS_ON(1)) {
key_schema_for_vlog = base_data_->tablet_schema().CreateKeyProjection();
// Lower bound: only seek when the bound sorts after this rowset's min key;
// otherwise the lower bound stays at ordinal 0.
if (spec->lower_bound_key() &&
spec->lower_bound_key()->encoded_key().compare(base_data_->min_encoded_key_) > 0) {
bool exact;
Status s = key_iter_->SeekAtOrAfter(*spec->lower_bound_key(), &exact);
if (s.IsNotFound()) {
// The lower bound is after the end of the key range.
// Thus, no rows will pass the predicate, so we set the lower bound
// to the end of the file.
lower_bound_idx_ = row_count_;
return Status::OK();
// NOTE(review): statuses other than NotFound are not checked in the lines
// visible here before the ordinal is read.
lower_bound_idx_ = std::max(lower_bound_idx_, key_iter_->GetCurrentOrdinal());
VLOG(1) << "Pushed lower bound value "
<< spec->lower_bound_key()->Stringify(key_schema_for_vlog)
<< " as row_idx >= " << lower_bound_idx_;
// Exclusive upper bound.
// NOTE(review): the left-hand side of the comparison against
// max_encoded_key_ is cut off in this view.
if (spec->exclusive_upper_bound_key() &&
base_data_->max_encoded_key_) <= 0) {
bool exact;
Status s = key_iter_->SeekAtOrAfter(*spec->exclusive_upper_bound_key(), &exact);
// The range check above means the seek is expected to land inside the file;
// a NotFound here is logged at DFATAL and upper_bound_idx_ is left unchanged.
if (PREDICT_FALSE(s.IsNotFound())) {
LOG(DFATAL) << "CFileSet indicated upper bound was within range, but "
<< "key iterator could not seek. "
<< "CFileSet upper_bound = "
<< Slice(base_data_->max_encoded_key_).ToDebugString()
<< ", enc_key = "
<< spec->exclusive_upper_bound_key()->encoded_key().ToDebugString();
} else {
// The ordinal the seek landed on becomes the exclusive upper bound.
rowid_t cur = key_iter_->GetCurrentOrdinal();
upper_bound_idx_ = std::min(upper_bound_idx_, cur);
VLOG(1) << "Pushed upper bound value "
<< spec->exclusive_upper_bound_key()->Stringify(key_schema_for_vlog)
<< " as row_idx < " << upper_bound_idx_;
return Status::OK();
// Resets per-batch state: zeroes prepared_count_ and marks every column in
// cols_prepared_ as not prepared (resizing it to match col_iters_).
void CFileSet::Iterator::Unprepare() {
prepared_count_ = 0;
cols_prepared_.assign(col_iters_.size(), false);
// Prepares the next batch of up to '*n' rows.
//
// '*n' is clamped to the number of rows left before upper_bound_idx_, and the
// resulting value is recorded in prepared_count_. No column is touched here.
// Must not be called while a batch is already prepared (DCHECK-enforced).
// Always returns OK.
Status CFileSet::Iterator::PrepareBatch(size_t *n) {
  DCHECK_EQ(prepared_count_, 0) << "Already prepared";

  const size_t rows_left = upper_bound_idx_ - cur_idx_;
  *n = std::min(*n, rows_left);
  prepared_count_ = *n;

  // Lazily prepare the first column when it is materialized.
  return Status::OK();
}
// Prepares column 'idx' for the current batch unless it was already prepared.
//
// Asks the column iterator to prepare prepared_count_ rows and marks the
// column in cols_prepared_ on success. Returns the iterator's error if
// PrepareBatch() fails, or Corruption if it yields a different number of rows
// than prepared_count_.
Status CFileSet::Iterator::PrepareColumn(size_t idx) {
if (cols_prepared_[idx]) {
// Already prepared in this batch.
return Status::OK();
ColumnIterator* col_iter = col_iters_[idx];
size_t n = prepared_count_;
if (!col_iter->seeked() || col_iter->GetCurrentOrdinal() != cur_idx_) {
// Either this column has not yet been accessed, or it was accessed
// but then skipped in a prior block (e.g because predicates on other
// columns completely eliminated the block).
// Either way, we need to seek it to the correct offset.
// NOTE(review): no seek call follows the comment above in this view; confirm
// that the iterator is repositioned to cur_idx_ before PrepareBatch() runs.
Status s = col_iter->PrepareBatch(&n);
if (!s.ok()) {
LOG(WARNING) << "Unable to prepare column " << idx << ": " << s.ToString();
return s;
// The column must supply exactly as many rows as the batch was prepared for.
if (n != prepared_count_) {
return Status::Corruption(
StringPrintf("Column %zd (%s) didn't yield enough rows at offset %zd: expected "
"%zd but only got %zd", idx, projection_->column(idx).ToString().c_str(),
cur_idx_, prepared_count_, n));
cols_prepared_[idx] = true;
return Status::OK();
// Initializes the selection vector for the current batch. Always returns OK.
// NOTE(review): 'sel_vec' is not touched by any statement visible in this
// view; confirm whether it is meant to be filled in here.
Status CFileSet::Iterator::InitializeSelectionVector(SelectionVector *sel_vec) {
return Status::OK();
// Materializes column 'col_idx' of the current batch into 'dst'.
//
// 'dst' must have exactly prepared_count_ rows (CHECK-enforced) and 'col_idx'
// must index an existing column iterator (DCHECK-enforced).
//
// Columns are prepared lazily: PrepareBatch() only records the batch size, so
// the column is prepared here, on first use, before it is scanned. This also
// marks it in cols_prepared_ so that FinishBatch() finishes it.
//
// Returns any error from PrepareColumn() or from the column iterator's Scan().
Status CFileSet::Iterator::MaterializeColumn(size_t col_idx, ColumnBlock *dst) {
  CHECK_EQ(prepared_count_, dst->nrows());
  DCHECK_LT(col_idx, col_iters_.size());

  RETURN_NOT_OK(PrepareColumn(col_idx));

  ColumnIterator* iter = col_iters_[col_idx];
  return iter->Scan(dst);
}
// Finishes the current batch.
//
// Calls FinishBatch() on every column iterator that was prepared during this
// batch, advances cur_idx_ by the batch size, and resets the per-batch state
// so that the next PrepareBatch() starts clean (it DCHECKs that
// prepared_count_ is 0).
//
// Requires a prepared batch (prepared_count_ > 0, CHECK-enforced). If a
// column fails to finish, a warning is logged and that error is returned
// immediately, leaving cur_idx_ and the batch state untouched.
Status CFileSet::Iterator::FinishBatch() {
  CHECK_GT(prepared_count_, 0);

  for (size_t i = 0; i < col_iters_.size(); i++) {
    if (cols_prepared_[i]) {
      Status s = col_iters_[i]->FinishBatch();
      if (!s.ok()) {
        LOG(WARNING) << "Unable to FinishBatch() on column " << i;
        return s;
      }
    }
  }

  cur_idx_ += prepared_count_;
  Unprepare();

  return Status::OK();
}
// Collects iterator statistics into '*stats' by walking col_iters_.
// NOTE(review): the loop body is not visible in this view, so what is
// written to 'stats' for each column iterator cannot be confirmed from here.
void CFileSet::Iterator::GetIteratorStats(vector<IteratorStats>* stats) const {
for (const ColumnIterator* iter : col_iters_) {
} // namespace tablet
} // namespace kudu