// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/deltafile.h"
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <gflags/gflags.h>
#include "kudu/cfile/binary_plain_block.h"
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/block_pointer.h"
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/cfile/index_btree.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_relevancy.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/util/compression/compression_codec.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/trace.h"
// Command-line flags that control how delta files are written. Both are
// tagged 'experimental'.

// Value assigned to the CFile writer's 'cfile_block_size' storage attribute in
// the DeltaFileWriter constructor. Defaults to 32*1024.
DEFINE_int32(deltafile_default_block_size, 32*1024,
"Block size for delta files. In the future, this may become configurable "
"on a per-table basis.");
TAG_FLAG(deltafile_default_block_size, experimental);
// Name of the compression codec for newly written delta files. Defaults to "lz4".
DEFINE_string(deltafile_default_compression_codec, "lz4",
"The compression codec used when writing deltafiles.");
TAG_FLAG(deltafile_default_compression_codec, experimental);
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
class MemTracker;
using cfile::BinaryPlainBlockDecoder;
using cfile::BlockPointer;
using cfile::CFileReader;
using cfile::IndexTreeIterator;
using cfile::ReaderOptions;
using fs::BlockCreationTransaction;
using fs::BlockManager;
using fs::IOContext;
using fs::ReadableBlock;
using fs::WritableBlock;
namespace tablet {
// CFile metadata key for the serialized delta stats protobuf: written by
// DeltaFileWriter::WriteDeltaStats() and looked up by
// DeltaFileReader::ReadDeltaStats().
const char * const DeltaFileReader::kDeltaStatsEntryName = "deltafilestats";
// Builds the CFile writer options used for delta files and creates the
// underlying cfile::CFileWriter. The visible setup enables the value index,
// selects PLAIN_ENCODING, takes the block size from
// --deltafile_default_block_size, disables generic index-key optimization and
// installs a custom value-index key encoder.
DeltaFileWriter::DeltaFileWriter(unique_ptr<WritableBlock> block)
#ifndef NDEBUG
// has_appended_ is only initialized when NDEBUG is not defined; it backs the
// insertion-order sanity checks in AppendDelta().
: has_appended_(false)
{ // NOLINT(*)
cfile::WriterOptions opts;
opts.write_validx = true;
opts.storage_attributes.cfile_block_size = FLAGS_deltafile_default_block_size;
opts.storage_attributes.encoding = PLAIN_ENCODING;
// NOTE(review): the right-hand side of this assignment is not visible in this
// view; presumably it is derived from --deltafile_default_compression_codec.
// Confirm against the full file.
opts.storage_attributes.compression =
// The CFile value index is 'compressed' by truncating delta values to only
// contain the delta key. The entire deltakey is required in order to support
// efficient seeks without deserializing the entire block. The generic value
// index optimization is disabled, since it could truncate portions of the
// deltakey.
// Note: The deltafile usage of the CFile value index is irregular, since it
// inserts values in non-sorted order (the timestamp portion of the deltakey
// in UNDO files is sorted in descending order). This doesn't appear to cause
// problems in practice.
opts.optimize_index_keys = false;
// Value-index key encoder: 'value' is interpreted as a pointer to a Slice.
// The encoder works on a copy (s2) of that Slice, presumably because decoding
// the DeltaKey consumes the slice it is given — confirm.
opts.validx_key_encoder = [] (const void* value, faststring* buffer) {
const Slice* s1 = static_cast<const Slice*>(value);
Slice s2(*s1);
DeltaKey key;
writer_.reset(new cfile::CFileWriter(std::move(opts),
// Starts the underlying CFile writer and returns its status.
Status DeltaFileWriter::Start() {
return writer_->Start();
// Finishes the delta file using a fresh block-creation transaction obtained
// from the block manager of the writer's block, and returns the status of
// committing that transaction.
// NOTE(review): no statement handing the written block to 'transaction' is
// visible here (presumably a FinishAndReleaseBlock(transaction.get()) call) —
// confirm against the full file.
Status DeltaFileWriter::Finish() {
BlockManager* bm = writer_->block()->block_manager();
unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();
return transaction->CommitCreatedBlocks();
// Finishes the CFile in the context of 'transaction'.
//
// Returns Status::Aborted("no deltas written") when the CFile writer reports a
// written value count of zero; otherwise returns whatever the CFile writer's
// FinishAndReleaseBlock(transaction) returns.
Status DeltaFileWriter::FinishAndReleaseBlock(BlockCreationTransaction* transaction) {
if (writer_->written_value_count() == 0) {
return Status::Aborted("no deltas written");
return writer_->FinishAndReleaseBlock(transaction);
// Serializes a single delta as "<encoded DeltaKey><RowChangeList bytes>" and
// appends it to the CFile as one entry.
//
// 'key'   : the delta key, encoded at the front of the entry. The value-index
//           key encoder installed in the constructor reads a DeltaKey from
//           each appended value, so the key must come first.
// 'delta' : the changelist whose raw slice follows the encoded key.
//
// Returns the status of the underlying CFileWriter::AppendEntries() call.
Status DeltaFileWriter::DoAppendDelta(const DeltaKey &key,
                                      const RowChangeList &delta) {
  Slice delta_slice(delta.slice());

  // tmp_buf_ is a member scratch buffer reused across calls; start from an
  // empty buffer so that entries do not accumulate.
  tmp_buf_.clear();

  // Write the encoded form of the key to the file.
  key.EncodeTo(&tmp_buf_);

  // Followed by the serialized changelist itself.
  tmp_buf_.append(delta_slice.data(), delta_slice.size());
  Slice tmp_buf_slice(tmp_buf_);

  return writer_->AppendEntries(&tmp_buf_slice, 1);
}
// Appends a REDO delta by forwarding 'key' and 'delta' to DoAppendDelta().
//
// When NDEBUG is not defined, the code first DCHECKs that the previously
// appended key compares <= 'key' under REDO ordering (skipped for the very
// first append), then records 'key' as the last appended key.
Status DeltaFileWriter::AppendDelta<REDO>(
const DeltaKey &key, const RowChangeList &delta) {
#ifndef NDEBUG
// Sanity check insertion order in debug mode.
if (has_appended_) {
DCHECK(last_key_.CompareTo<REDO>(key) <= 0)
<< "must insert redo deltas in sorted order (ascending key, then ascending ts): "
<< "got key " << key.ToString() << " after "
<< last_key_.ToString();
has_appended_ = true;
last_key_ = key;
return DoAppendDelta(key, delta);
// Appends an UNDO delta by forwarding 'key' and 'delta' to DoAppendDelta().
//
// When NDEBUG is not defined, the code first DCHECKs that the previously
// appended key compares <= 'key' under UNDO ordering (skipped for the very
// first append), then records 'key' as the last appended key.
Status DeltaFileWriter::AppendDelta<UNDO>(
const DeltaKey &key, const RowChangeList &delta) {
#ifndef NDEBUG
// Sanity check insertion order in debug mode.
if (has_appended_) {
DCHECK(last_key_.CompareTo<UNDO>(key) <= 0)
<< "must insert undo deltas in sorted order (ascending key, then descending ts): "
<< "got key " << key.ToString() << " after "
<< last_key_.ToString();
has_appended_ = true;
last_key_ = key;
return DoAppendDelta(key, delta);
// Stores a serialized DeltaStatsPB in the CFile metadata under
// DeltaFileReader::kDeltaStatsEntryName, then takes ownership of 'stats' by
// moving it into delta_stats_.
// NOTE(review): the statement that fills 'delta_stats_pb' from 'stats' is not
// visible in this view — confirm against the full file.
void DeltaFileWriter::WriteDeltaStats(std::unique_ptr<DeltaStats> stats) {
DeltaStatsPB delta_stats_pb;
faststring buf;
pb_util::SerializeToString(delta_stats_pb, &buf);
writer_->AddMetadataPair(DeltaFileReader::kDeltaStatsEntryName, buf.ToString());
delta_stats_ = std::move(stats);
// Reader
// Opens the delta file stored in 'block' and fully initializes the reader.
//
// 'block'      : the block to read; handed to OpenNoInit().
// 'delta_type' : whether the file holds REDO or UNDO deltas.
// 'options'    : reader options; 'options.io_context' is also used for Init().
// 'reader_out' : receives the initialized reader on success.
//
// Returns the first error from OpenNoInit() or Init(); '*reader_out' is only
// assigned when both succeed. No delta stats are supplied up front.
Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block,
                             DeltaType delta_type,
                             ReaderOptions options,
                             shared_ptr<DeltaFileReader>* reader_out) {
  // Capture the IO context before 'options' is moved from.
  const IOContext* io_context = options.io_context;

  shared_ptr<DeltaFileReader> df_reader;
  RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(block),
                                            delta_type,
                                            std::move(options),
                                            /*delta_stats*/nullptr,
                                            &df_reader));
  // Init() goes through init_once_, so calling it here is fine even if
  // OpenNoInit() already initialized the reader.
  RETURN_NOT_OK(df_reader->Init(io_context));

  *reader_out = df_reader;
  return Status::OK();
}
// Creates a DeltaFileReader for 'block' and consults FLAGS_cfile_lazy_open
// before returning.
//
// 'delta_stats' is forwarded to the DeltaFileReader constructor; callers in
// this file pass nullptr when no stats are known up front.
// NOTE(review): the statements that open 'cf_reader' from 'block'/'options',
// the body of the lazy-open check, and the assignment to '*reader_out' are not
// visible in this view — confirm against the full file.
Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
DeltaType delta_type,
ReaderOptions options,
unique_ptr<DeltaStats> delta_stats,
shared_ptr<DeltaFileReader>* reader_out) {
unique_ptr<CFileReader> cf_reader;
// Saved up front; 'options' may be consumed before the IO context is needed.
const IOContext* io_context = options.io_context;
unique_ptr<DeltaFileReader> df_reader(
new DeltaFileReader(std::move(cf_reader), std::move(delta_stats), delta_type));
if (!FLAGS_cfile_lazy_open) {
return Status::OK();
// Wraps an already-created CFileReader.
//
// 'cf_reader'   : released into reader_, which takes over ownership.
// 'delta_stats' : stats known up front, or null when they still have to be
//                 read from the file (see InitOnce() / ReadDeltaStats()).
//                 Stored in delta_stats_ so that has_delta_stats() reflects
//                 what the caller supplied.
// 'delta_type'  : whether the file holds REDO or UNDO deltas.
DeltaFileReader::DeltaFileReader(unique_ptr<CFileReader> cf_reader,
                                 unique_ptr<DeltaStats> delta_stats,
                                 DeltaType delta_type)
    : reader_(cf_reader.release()),
      delta_stats_(std::move(delta_stats)),
      delta_type_(delta_type) {}
// Runs InitOnce(io_context) through init_once_ and returns the resulting
// status.
Status DeltaFileReader::Init(const IOContext* io_context) {
return init_once_.Init([this, io_context] { return InitOnce(io_context); });
// Initialization body invoked via Init().
//
// Returns Status::NotSupported when the underlying CFile has no value index.
// NOTE(review): the call that fully opens reader_ and the body of the
// has_delta_stats() check are not visible in this view — confirm against the
// full file.
Status DeltaFileReader::InitOnce(const IOContext* io_context) {
// Fully open the CFileReader if it was lazily opened earlier.
// If it's already initialized, this is a no-op.
if (!reader_->has_validx()) {
return Status::NotSupported("file does not have a value index!");
// Initialize delta file stats
if (!has_delta_stats()) {
return Status::OK();
// Loads the delta stats stored in the CFile metadata under
// kDeltaStatsEntryName and installs them in delta_stats_.
//
// Returns Status::NotSupported when the metadata entry is absent and
// Status::Corruption when its contents do not parse as a DeltaStatsPB.
// NOTE(review): the statement that populates 'stats' from 'deltastats_pb' is
// not visible in this view — confirm against the full file.
Status DeltaFileReader::ReadDeltaStats() {
string filestats_pb_buf;
if (!reader_->GetMetadataEntry(kDeltaStatsEntryName, &filestats_pb_buf)) {
return Status::NotSupported("missing delta stats from the delta file metadata");
DeltaStatsPB deltastats_pb;
if (!deltastats_pb.ParseFromString(filestats_pb_buf)) {
return Status::Corruption("unable to parse the delta stats protobuf");
unique_ptr<DeltaStats> stats(new DeltaStats());
// delta_stats_ is replaced while holding stats_lock_.
std::lock_guard<simple_spinlock> l(stats_lock_);
delta_stats_ = std::move(stats);
return Status::OK();
// Returns whether this delta file may hold deltas that matter for the given
// snapshots. 'snap_to_exclude' is optional; the select check only runs when it
// is set.
//
// Returns true unconditionally while the reader has not been successfully
// initialized.
bool DeltaFileReader::IsRelevantForSnapshots(
const std::optional<MvccSnapshot>& snap_to_exclude,
const MvccSnapshot& snap_to_include) const {
if (!init_once_.init_succeeded()) {
// If we're not initted, it means we have no delta stats and must
// assume that this file is relevant for every snapshot.
return true;
// We don't know whether the caller's intent is to apply deltas, to select
// them, or both. As such, we must be conservative and assume 'both', which
// means the file is relevant if any relevancy criterion is true.
// NOTE(review): the apply-relevancy expression below is only partially
// visible in this view.
bool relevant = delta_type_ == REDO ?
delta_stats_->min_timestamp()) :
if (snap_to_exclude) {
// The select criteria are the same regardless of delta_type_.
relevant |= IsDeltaRelevantForSelect(*snap_to_exclude, snap_to_include,
return relevant;
// Creates a second reader over the same block for debugging purposes.
//
// Reopens the block identified by reader_->block_id() through 'fs_manager'
// and passes it to OpenNoInit() with this reader's delta type, default reader
// options whose parent mem tracker is 'parent_mem_tracker', and no delta
// stats. Returns any error from opening the block or from OpenNoInit().
Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
const shared_ptr<MemTracker>& parent_mem_tracker,
shared_ptr<DeltaFileReader>* out) const {
unique_ptr<ReadableBlock> block;
RETURN_NOT_OK(fs_manager->OpenBlock(reader_->block_id(), &block));
ReaderOptions options;
options.parent_mem_tracker = parent_mem_tracker;
return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options,
/*delta_stats*/nullptr, out);
// Creates an iterator over this file's deltas for the snapshots in 'opts'.
//
// When IsRelevantForSnapshots() reports the file as relevant, '*iterator' is
// reset to a DeltaFileIterator<REDO> or DeltaFileIterator<UNDO> according to
// delta_type_ and OK is returned. Otherwise the file is culled: a VLOG(2)
// message is emitted and Status::NotFound is returned.
Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts,
unique_ptr<DeltaIterator>* iterator) const {
if (IsRelevantForSnapshots(opts.snap_to_exclude, opts.snap_to_include)) {
// Verbose diagnostics explaining why the file could not be culled.
if (VLOG_IS_ON(2)) {
if (!init_once_.init_succeeded()) {
TRACE_COUNTER_INCREMENT("delta_iterators_lazy_initted", 1);
VLOG(2) << (delta_type_ == REDO ? "REDO" : "UNDO") << " delta " << ToString()
<< " has no delta stats"
<< ": can't cull for " << opts.snap_to_include.ToString();
} else if (delta_type_ == REDO) {
VLOG(2) << "REDO delta " << ToString()
<< " has min ts " << delta_stats_->min_timestamp().ToString()
<< ": can't cull for " << opts.snap_to_include.ToString();
} else {
VLOG(2) << "UNDO delta " << ToString()
<< " has max ts " << delta_stats_->max_timestamp().ToString()
<< ": can't cull for " << opts.snap_to_include.ToString();
TRACE_COUNTER_INCREMENT("delta_iterators_relevant", 1);
// Ugly cast, but it lets the iterator fully initialize the reader
// during its first seek.
auto s_this = const_cast<DeltaFileReader*>(this)->shared_from_this();
if (delta_type_ == REDO) {
iterator->reset(new DeltaFileIterator<REDO>(std::move(s_this), opts));
} else {
iterator->reset(new DeltaFileIterator<UNDO>(std::move(s_this), opts));
return Status::OK();
VLOG(2) << "Culling "
<< ((delta_type_ == REDO) ? "REDO":"UNDO")
<< " delta " << ToString() << " for " << opts.snap_to_include.ToString();
return Status::NotFound("MvccSnapshot outside the range of this delta.");
// Reports through '*deleted' whether row 'row_idx' is deleted according to the
// deltas in this file.
//
// '*deleted' is set to false without seeking when the delta stats record zero
// deletes, and also when NewDeltaIterator() returns NotFound. Otherwise a
// one-row batch is prepared for apply and '*deleted' becomes true when the row
// is no longer selected in a one-row SelectionVector.
// NOTE(review): the iterator Init/seek steps and the call that applies deletes
// to 'sel_vec' are not visible in this view — confirm against the full file.
Status DeltaFileReader::CheckRowDeleted(rowid_t row_idx, const IOContext* io_context,
bool* deleted) const {
// If there are no deletes in the delta file at all, we can short-circuit
// the seek.
if (delta_stats_->delete_count() == 0) {
*deleted = false;
return Status::OK();
// TODO(todd): would be nice to avoid allocation here, but we don't want to
// duplicate all the logic from NewDeltaIterator. So, we'll heap-allocate
// for now.
// An empty projection is used for the iterator options.
Schema empty_schema;
RowIteratorOptions opts;
opts.projection = &empty_schema;
opts.io_context = io_context;
unique_ptr<DeltaIterator> iter;
Status s = NewDeltaIterator(opts, &iter);
if (s.IsNotFound()) {
*deleted = false;
return Status::OK();
ScanSpec spec;
RETURN_NOT_OK(iter->PrepareBatch(1, DeltaIterator::PREPARE_FOR_APPLY));
// TODO: this does an allocation - can we stack-allocate the bitmap
// and make SelectionVector able to "release" its buffer?
SelectionVector sel_vec(1);
*deleted = !sel_vec.IsRowSelected(0);
return Status::OK();
// Returns the file size reported by the underlying CFile reader.
uint64_t DeltaFileReader::EstimateSize() const {
return reader_->file_size();
// DeltaFileIterator
// Constructs an iterator over the deltas of 'dfr'. Block caching defaults to
// CFileReader::CACHE_BLOCK and may be overridden in Init().
// NOTE(review): 'opts' is not consumed by the initializers visible here;
// presumably it initializes preparer_ — confirm against the full file.
template<DeltaType Type>
DeltaFileIterator<Type>::DeltaFileIterator(shared_ptr<DeltaFileReader> dfr,
RowIteratorOptions opts)
: dfr_(std::move(dfr)),
cache_blocks_(CFileReader::CACHE_BLOCK) {}
// Marks the iterator as initialized. Must not be called twice (DCHECKed).
//
// 'spec' may be null; when it is provided, cache_blocks_ is chosen according
// to spec->cache_blocks().
template<DeltaType Type>
Status DeltaFileIterator<Type>::Init(ScanSpec* spec) {
DCHECK(!initted_) << "Already initted";
if (spec) {
cache_blocks_ = spec->cache_blocks() ? CFileReader::CACHE_BLOCK :
initted_ = true;
return Status::OK();
// Positions the iterator for reading deltas starting at row 'idx'.
//
// If the file turns out not to be relevant for the snapshots being queried,
// the iterator is marked exhausted and OK is returned. Otherwise the index
// iterator is sought to the entry at or before the key (idx, Timestamp(0)),
// and the prepared_/exhausted_ flags are cleared.
template<DeltaType Type>
Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
DCHECK(initted_) << "Must call Init()";
// Finish the initialization of any lazily-initialized state.
// Check again whether this delta file is relevant given the snapshots
// that we are querying. We did this already before creating the
// DeltaFileIterator, but due to lazy initialization, it's possible
// that we weren't able to check at that time.
if (!dfr_->IsRelevantForSnapshots(preparer_.opts().snap_to_exclude,
preparer_.opts().snap_to_include)) {
exhausted_ = true;
return Status::OK();
if (!index_iter_) {
// Build the seek key for row 'idx' with timestamp 0.
DeltaKey(idx, Timestamp(0)).EncodeTo(&tmp_buf_);
Slice key_slice(tmp_buf_);
Status s = index_iter_->SeekAtOrBefore(key_slice);
if (PREDICT_FALSE(s.IsNotFound())) {
// Seeking to a value before the first value in the file
// will return NotFound, due to the way the index seek
// works. We need to special-case this and have the
// iterator seek all the way down its leftmost branches
// to get the correct result.
s = index_iter_->SeekToFirst();
prepared_ = false;
exhausted_ = false;
return Status::OK();
// Bookkeeping for one delta block that has been read and given a decoder by
// DeltaFileIterator::ReadCurrentBlockOntoQueue().
struct PreparedDeltaBlock {
// The pointer from which this block was read. This is only used for
// logging, etc.
cfile::BlockPointer block_ptr_;
// Handle to the block, so it doesn't get freed from underneath us.
scoped_refptr<cfile::BlockHandle> block_;
// The block decoder, to avoid having to re-parse the block header
// on every ApplyUpdates() call
std::unique_ptr<cfile::BinaryPlainBlockDecoder> decoder_;
// The first row index for which there is an update in this delta block.
rowid_t first_updated_idx_;
// The last row index for which there is an update in this delta block.
rowid_t last_updated_idx_;
// Within this block, the index of the update which is the first one that
// needs to be consulted. This allows deltas to be skipped at the beginning
// of the block when the row block starts towards the end of the delta block.
// For example:
// <-- delta block ---->
// <--- prepared row block --->
// Here, we can skip a bunch of deltas at the beginning of the delta block
// which we know don't apply to the prepared row block.
rowid_t prepared_block_start_idx_;
// Return a string description of this prepared block, for logging.
// The description begins with "<first_updated_idx_>-<last_updated_idx_>".
std::string ToString() const {
return StringPrintf("%d-%d (%s)", first_updated_idx_, last_updated_idx_,
// Reads the data block that the index iterator currently points at and
// prepares a PreparedDeltaBlock for it: the block handle, its block pointer, a
// BinaryPlainBlockDecoder over the block, and the last updated row index.
// Requires Init() and SeekToOrdinal() to have been called (DCHECKed).
// NOTE(review): the statement that enqueues 'pdb' (presumably onto
// delta_blocks_) and parts of the read/decode calls are not visible in this
// view — confirm against the full file.
template<DeltaType Type>
Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
DCHECK(initted_) << "Must call Init()";
DCHECK(index_iter_) << "Must call SeekToOrdinal()";
unique_ptr<PreparedDeltaBlock> pdb(new PreparedDeltaBlock());
BlockPointer dblk_ptr = index_iter_->GetCurrentBlockPointer();
shared_ptr<CFileReader> reader = dfr_->cfile_reader();
dblk_ptr, cache_blocks_, &pdb->block_));
// The data has been successfully read. Finish creating the decoder.
pdb->prepared_block_start_idx_ = 0;
pdb->block_ptr_ = dblk_ptr;
// Decode the block.
pdb->decoder_.reset(new BinaryPlainBlockDecoder(pdb->block_));
Substitute("unable to decode data block header in delta block $0 ($1)",
RETURN_NOT_OK(GetLastRowIndexInDecodedBlock(*pdb->decoder_, &pdb->last_updated_idx_));
#ifndef NDEBUG
VLOG(2) << "Read delta block which updates " <<
pdb->first_updated_idx_ << " through " <<
return Status::OK();
// Stores in '*idx' the row index of a DeltaKey associated with the index
// iterator's current key. Requires SeekToOrdinal() to have been called
// (DCHECKed).
// NOTE(review): the step that decodes 'index_entry' into 'k' is not visible in
// this view — confirm against the full file.
template<DeltaType Type>
Status DeltaFileIterator<Type>::GetFirstRowIndexInCurrentBlock(rowid_t *idx) {
DCHECK(index_iter_) << "Must call SeekToOrdinal()";
Slice index_entry = index_iter_->GetCurrentKey();
DeltaKey k;
*idx = k.row_idx();
return Status::OK();
// Stores in '*idx' the row index of a DeltaKey associated with the last entry
// of the decoded block 'dec'. The block must contain at least one entry
// (DCHECKed).
// NOTE(review): the step that decodes 's' into 'k' is not visible in this
// view — confirm against the full file.
template<DeltaType Type>
Status DeltaFileIterator<Type>::GetLastRowIndexInDecodedBlock(const BinaryPlainBlockDecoder &dec,
rowid_t *idx) {
DCHECK_GT(dec.Count(), 0);
// Last entry in the block.
Slice s(dec.string_at_index(dec.Count() - 1));
DeltaKey k;
*idx = k.row_idx();
return Status::OK();
// Prepares the deltas for the next 'nrows' rows, starting at the preparer's
// current prepared index.
//
// The row range is [start_row, stop_row] with stop_row = start_row + nrows - 1.
// The code discards queued delta blocks that end before start_row, walks the
// index iterator while blocks may still overlap the range, advances the front
// block's prepared_block_start_idx_ to its first delta at or after start_row,
// then starts the preparer and feeds it via AddDeltas().
// 'nrows' must be greater than zero (CHECKed). 'prepare_flags' is forwarded to
// preparer_.Start().
// NOTE(review): several loop bodies (popping blocks, reading blocks onto the
// queue, decoding keys) are not visible in this view.
template<DeltaType Type>
Status DeltaFileIterator<Type>::PrepareBatch(size_t nrows, int prepare_flags) {
DCHECK(initted_) << "Must call Init()";
DCHECK(exhausted_ || index_iter_) << "Must call SeekToOrdinal()";
CHECK_GT(nrows, 0);
rowid_t start_row = preparer_.cur_prepared_idx();
rowid_t stop_row = start_row + nrows - 1;
// Remove blocks from our list which are no longer relevant to the range
// being prepared.
while (!delta_blocks_.empty() &&
delta_blocks_.front()->last_updated_idx_ < start_row) {
while (!exhausted_) {
rowid_t next_block_rowidx = 0;
VLOG(2) << "Current delta block starting at row " << next_block_rowidx;
if (next_block_rowidx > stop_row) {
Status s = index_iter_->Next();
// NotFound from Next() means there are no further index entries.
if (s.IsNotFound()) {
exhausted_ = true;
if (!delta_blocks_.empty()) {
// Skip deltas at the start of the front block that precede start_row.
PreparedDeltaBlock& block = *delta_blocks_.front();
int i = 0;
for (i = block.prepared_block_start_idx_;
i < block.decoder_->Count();
i++) {
Slice s(block.decoder_->string_at_index(i));
DeltaKey key;
if (key.row_idx() >= start_row) break;
block.prepared_block_start_idx_ = i;
#ifndef NDEBUG
VLOG(2) << "Done preparing deltas for " << start_row << "-" << stop_row
<< ": row block spans " << delta_blocks_.size() << " delta blocks";
prepared_ = true;
preparer_.Start(nrows, prepare_flags);
RETURN_NOT_OK(AddDeltas(start_row, stop_row));
return Status::OK();
// Feeds the deltas for rows in [start_row, stop_row] from the queued delta
// blocks into preparer_.
//
// Each block is scanned from its prepared_block_start_idx_. Scanning stops
// with OK at the first delta whose row index is past stop_row. Errors from
// preparer_.AddDelta() are propagated. Requires PrepareBatch() to have set
// prepared_ (DCHECKed).
// NOTE(review): the key-decoding step and the 'continue' statements of the
// skip branches are not visible in this view.
template<DeltaType Type>
Status DeltaFileIterator<Type>::AddDeltas(rowid_t start_row, rowid_t stop_row) {
DCHECK(prepared_) << "must Prepare";
for (auto& block : delta_blocks_) {
const BinaryPlainBlockDecoder& bpd = *block->decoder_;
DVLOG(2) << "Adding deltas from delta block " << block->first_updated_idx_ << "-"
<< block->last_updated_idx_ << " for row block starting at " << start_row;
if (PREDICT_FALSE(start_row > block->last_updated_idx_)) {
// The block to be updated completely falls after this delta block:
// <-- delta block --> <-- delta block -->
// <-- block to update -->
// This can happen because we don't know the block's last entry until after
// we queued it in PrepareBatch(). We could potentially remove it at that
// point during the prepare step, but for now just skip it here.
// Set by preparer_.AddDelta() when the remaining deltas for the current row
// can be skipped.
bool finished_row = false;
for (int i = block->prepared_block_start_idx_; i < bpd.Count(); i++) {
Slice slice = bpd.string_at_index(i);
// Decode and check the ID of the row we're going to update.
DeltaKey key;
// If this delta is for the same row as before, skip it if the previous
// AddDelta() call told us that we're done with this row.
if (preparer_.last_added_idx() &&
preparer_.last_added_idx() == key.row_idx() &&
finished_row) {
finished_row = false;
// Check that the delta is within the block we're currently processing.
if (key.row_idx() > stop_row) {
// Delta is for a row which comes after the block we're processing.
return Status::OK();
if (key.row_idx() < start_row) {
// Delta is for a row which comes before the block we're processing.
// Note: if AddDelta sets 'finished_row' to true, we could skip the
// remaining deltas for this row by seeking the block decoder. This trades
// off the cost of a seek against the cost of decoding some irrelevant delta keys.
// Given that updates are expected to be uncommon and that most scans are
// _not_ historical, the current implementation eschews seeking in favor of
// skipping irrelevant deltas one by one.
RETURN_NOT_OK(preparer_.AddDelta(key, slice, &finished_row));
// Very verbose tracing of every visited delta.
if (VLOG_IS_ON(3)) {
RowChangeList rcl(slice);
DVLOG(3) << "Visited " << DeltaType_Name(DeltaTypeSelector<Type>::kTag)
<< " delta for key: " << key.ToString() << " Mut: "
<< rcl.ToString(*preparer_.opts().projection)
<< " Continue?: " << (!finished_row ? "TRUE" : "FALSE");
return Status::OK();
// Forwards to preparer_.ApplyUpdates() with the same arguments and returns its
// status.
template<DeltaType Type>
Status DeltaFileIterator<Type>::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
const SelectionVector& filter) {
return preparer_.ApplyUpdates(col_to_apply, dst, filter);
// Forwards to preparer_.ApplyDeletes(sel_vec) and returns its status.
template<DeltaType Type>
Status DeltaFileIterator<Type>::ApplyDeletes(SelectionVector* sel_vec) {
return preparer_.ApplyDeletes(sel_vec);
// Forwards to preparer_.SelectDeltas(deltas) and returns its status.
template<DeltaType Type>
Status DeltaFileIterator<Type>::SelectDeltas(SelectedDeltas* deltas) {
return preparer_.SelectDeltas(deltas);
// Forwards to preparer_.CollectMutations(dst, arena) and returns its status.
template<DeltaType Type>
Status DeltaFileIterator<Type>::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
return preparer_.CollectMutations(dst, arena);
// Returns true while the iterator is not marked exhausted or queued delta
// blocks remain in delta_blocks_.
template<DeltaType Type>
bool DeltaFileIterator<Type>::HasNext() const {
return !exhausted_ || !delta_blocks_.empty();
// Forwards to preparer_.MayHaveDeltas().
template<DeltaType Type>
bool DeltaFileIterator<Type>::MayHaveDeltas() const {
return preparer_.MayHaveDeltas();
// Returns "DeltaFileIterator(<description of the underlying reader>)".
template<DeltaType Type>
string DeltaFileIterator<Type>::ToString() const {
return "DeltaFileIterator(" + dfr_->ToString() + ")";
// Forwards to preparer_.FilterColumnIdsAndCollectDeltas() with the same
// arguments and returns its status.
template<DeltaType Type>
Status DeltaFileIterator<Type>::FilterColumnIdsAndCollectDeltas(
const vector<ColumnId>& col_ids,
vector<DeltaKeyAndUpdate>* out,
Arena* arena) {
return preparer_.FilterColumnIdsAndCollectDeltas(col_ids, out, arena);
// Logs at FATAL severity that an unexpected delta type was encountered. The
// message includes the delta file description, the changelist in 'deltas'
// rendered against the iterator's projection, the delta key and 'msg'.
template<DeltaType Type>
void DeltaFileIterator<Type>::FatalUnexpectedDelta(const DeltaKey &key, const Slice &deltas,
const string &msg) {
LOG(FATAL) << "Saw unexpected delta type in deltafile " << dfr_->ToString() << ": "
<< " rcl=" << RowChangeList(deltas).ToString(*preparer_.opts().projection)
<< " key=" << key.ToString() << " (" << msg << ")";
} // namespace tablet
} // namespace kudu