// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/memrowset.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <boost/none_t.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/codegen/compilation_manager.h"
#include "kudu/codegen/row_projector.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/scan_spec.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/txn_metadata.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/memory.h"
// Runtime flag consulted by GenerateAppropriateProjector(): when true (the
// default), MemRowSet iterators first try to obtain a code-generated row
// projector. The flag is tagged 'hidden'.
DEFINE_bool(mrs_use_codegen, true, "whether the memrowset should use code "
"generation for iteration");
TAG_FLAG(mrs_use_codegen, hidden);
using kudu::consensus::OpId;
using kudu::fs::IOContext;
using kudu::log::LogAnchorRegistry;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace tablet {
// Initial size handed to the ThreadSafeMemoryTrackingArena created by the
// MemRowSet constructor.
// NOTE(review): presumably a byte count -- confirm against the arena's
// constructor.
static const int kInitialArenaSize = 16;
// Returns true if this row is a "ghost", i.e. the last mutation in its REDO
// list (header_->redo_tail) is a delete.
//
// A row with no mutations is never a ghost. If the tail changelist cannot be
// decoded, the failure is reported through LOG(FATAL).
bool MRSRow::IsGhost() const {
const Mutation *mut_tail = header_->redo_tail;
if (mut_tail == nullptr) {
// No mutations at all, so nothing can have deleted the row.
return false;
RowChangeListDecoder decoder(mut_tail->changelist());
Status s = decoder.Init();
if (!PREDICT_TRUE(s.ok())) {
LOG(FATAL) << Substitute("Failed to decode: $0 ($1)",
return decoder.is_delete();
namespace {
// Creates the MemTracker for the MemRowSet with the given 'id'.
//
// The tracker is named "MemRowSet-<id>" and is created as a child of
// 'parent_tracker' (ownership of which is moved into the call).
// NOTE(review): the -1 first argument presumably means "no limit" -- confirm
// against MemTracker::CreateTracker().
shared_ptr<MemTracker> CreateMemTrackerForMemRowSet(
int64_t id, shared_ptr<MemTracker> parent_tracker) {
string mem_tracker_id = Substitute("MemRowSet-$0", id);
return MemTracker::CreateTracker(-1, mem_tracker_id, std::move(parent_tracker));
} // anonymous namespace
// Creates a non-transactional MemRowSet and stores it in '*mrs'.
//
// The instance is built through MemRowSet::make_shared() with no transaction
// ID (boost::none) and no transaction metadata (nullptr). As written, this
// always returns Status::OK().
Status MemRowSet::Create(int64_t id,
const Schema &schema,
LogAnchorRegistry* log_anchor_registry,
shared_ptr<MemTracker> parent_tracker,
shared_ptr<MemRowSet>* mrs) {
auto local_mrs(MemRowSet::make_shared(
id, schema, /*txn_id*/boost::none, /*txn_metadata*/nullptr, log_anchor_registry,
// Publish the new instance to the caller only once it is fully constructed.
*mrs = std::move(local_mrs);
return Status::OK();
// Creates a transactional MemRowSet for transaction 'txn_id' and stores it in
// '*mrs'.
//
// 'txn_id' is wrapped in a boost::optional and 'txn_metadata' is moved into
// the new instance. As written, this always returns Status::OK().
Status MemRowSet::Create(int64_t id,
const Schema &schema,
int64_t txn_id,
scoped_refptr<TxnMetadata> txn_metadata,
LogAnchorRegistry* log_anchor_registry,
shared_ptr<MemTracker> parent_tracker,
shared_ptr<MemRowSet>* mrs) {
auto local_mrs(MemRowSet::make_shared(
id, schema, boost::make_optional(txn_id), std::move(txn_metadata), log_anchor_registry,
// Publish the new instance to the caller only once it is fully constructed.
*mrs = std::move(local_mrs);
return Status::OK();
// Constructs a MemRowSet identified by 'id'.
//
// Memory is obtained through a MemoryTrackingBufferAllocator whose tracker is
// a child of 'parent_tracker', and rows/mutations live in an arena that
// starts at kInitialArenaSize. The log anchorer is labelled
// "MemRowSet-<id>", with a "(txn_id=<n>)" suffix for transactional
// MemRowSets.
MemRowSet::MemRowSet(int64_t id,
const Schema& schema,
boost::optional<int64_t> txn_id,
scoped_refptr<TxnMetadata> txn_metadata,
LogAnchorRegistry* log_anchor_registry,
shared_ptr<MemTracker> parent_tracker)
: id_(id),
allocator_(new MemoryTrackingBufferAllocator(
CreateMemTrackerForMemRowSet(id, std::move(parent_tracker)))),
arena_(new ThreadSafeMemoryTrackingArena(kInitialArenaSize, allocator_)),
// Test and dereference the same optional (the 'txn_id_' member) rather than
// testing the member and dereferencing the constructor parameter.
anchorer_(log_anchor_registry, Substitute("MemRowSet-$0$1", id_, txn_id_ ?
Substitute("(txn_id=$0)", *txn_id_) : "")),
live_row_count_(0) {
// The debug counters are updated without synchronization on purpose; tell
// race detectors that the resulting inaccuracy is acceptable.
ANNOTATE_BENIGN_RACE(&debug_insert_count_, "insert count isnt accurate");
ANNOTATE_BENIGN_RACE(&debug_update_count_, "update count isnt accurate");
// Destroys the MemRowSet.
MemRowSet::~MemRowSet() {
// Emits a human-readable description of every row, in iteration order.
//
// Each entry contains the row's insertion timestamp, the transaction ID (and
// commit timestamp, if one is set) for transactional MemRowSets, the row
// contents, and the stringified REDO mutation list.
// NOTE(review): output is presumably appended to 'lines' when it is non-null
// -- confirm against the logging macro used for the stream.
// As written, this always returns Status::OK().
Status MemRowSet::DebugDump(vector<string> *lines) {
unique_ptr<Iterator> iter(NewIterator());
while (iter->HasNext()) {
MRSRow row = iter->GetCurrentRow();
<< "@" << row.insertion_timestamp()
<< string(txn_id_ ?
Substitute("(txn_id=$0$1)", *txn_id_,
DCHECK_NOTNULL(txn_metadata_)->commit_timestamp() ?
Substitute("@$0", txn_metadata_->commit_timestamp()->value()) : "") :
<< ": row " << schema_.DebugRow(row)
<< " mutations=" << Mutation::StringifyMutationList(schema_, row.header_->redo_head)
<< std::endl;
return Status::OK();
// Inserts 'row' into this MemRowSet at 'timestamp'.
//
// Returns AlreadyPresent if a live (non-ghost) row with the same key already
// exists. If the key exists only as a ghost (deleted) row, the insert is
// recorded on that row through Reinsert(). Otherwise a new row with an empty
// REDO list is built and copied into the arena.
//
// 'row' must use this MemRowSet's schema (checked in debug builds).
Status MemRowSet::Insert(Timestamp timestamp,
const ConstContiguousRow& row,
const OpId& op_id) {
DCHECK_SCHEMA_EQ(schema_, *row.schema());
// Encode the row's key into its comparable form; the encoded key is what the
// prepared mutation is keyed on.
faststring enc_key_buf;
schema_.EncodeComparableKey(row, &enc_key_buf);
Slice enc_key(enc_key_buf);
btree::PreparedMutation<MSBTreeTraits> mutation(enc_key);
// TODO: for now, the key ends up stored doubly --
// once encoded in the btree key, and again in the value
// (unencoded).
// That's not very memory-efficient!
if (mutation.exists()) {
// It's OK for it to exist if it's just a "ghost" row -- i.e the
// row is deleted.
MRSRow ms_row(this, mutation.current_mutable_value());
if (!ms_row.IsGhost()) {
return Status::AlreadyPresent("key already present");
// Insert a "reinsert" mutation.
return Reinsert(timestamp, row, &ms_row);
// Copy the non-encoded key onto the stack since we need
// to mutate it when we relocate its Slices into our arena.
DEFINE_MRSROW_ON_STACK(this, mrsrow, mrsrow_slice);
// A brand-new row starts with no mutations.
mrsrow.header_->insertion_timestamp = timestamp;
mrsrow.header_->redo_head = nullptr;
mrsrow.header_->redo_tail = nullptr;
RETURN_NOT_OK(mrsrow.CopyRow(row, arena_.get()));
<< "Expected to be able to insert, since the prepared mutation "
<< "succeeded!";
return Status::OK();
// Records a REINSERT of 'row' at 'timestamp' on the existing row 'ms_row'.
//
// The mutation is allocated in this MemRowSet's arena and atomically appended
// to the row's REDO list. 'row' must use this MemRowSet's schema (checked in
// debug builds). As written, this always returns Status::OK().
Status MemRowSet::Reinsert(Timestamp timestamp, const ConstContiguousRow& row, MRSRow *ms_row) {
DCHECK_SCHEMA_EQ(schema_, *row.schema());
// Encode the REINSERT mutation
faststring buf;
RowChangeListEncoder encoder(&buf);
// Move the REINSERT mutation itself into our Arena.
Mutation *mut = Mutation::CreateInArena(arena_.get(), timestamp, encoder.as_changelist());
// Append the mutation into the row's mutation list.
// This function has "release" semantics which ensures that the memory writes
// for the mutation are fully published before any concurrent reader sees
// the appended mutation.
mut->AppendToListAtomic(&ms_row->header_->redo_head, &ms_row->header_->redo_tail);
return Status::OK();
// Applies the change list 'delta' at 'timestamp' to the row identified by
// 'probe'.
//
// Returns NotFound if the key is absent from this MemRowSet or is present
// only as a ghost (deleted) row. On success the mutation is allocated in the
// arena, atomically appended to the row's REDO list, and a mutated-store
// entry is added to 'result'.
//
// The IOContext argument is unused.
Status MemRowSet::MutateRow(Timestamp timestamp,
const RowSetKeyProbe &probe,
const RowChangeList &delta,
const consensus::OpId& op_id,
const IOContext* /*io_context*/,
ProbeStats* stats,
OperationResultPB *result) {
btree::PreparedMutation<MSBTreeTraits> mutation(probe.encoded_key_slice());
if (!mutation.exists()) {
return Status::NotFound("not in memrowset");
MRSRow row(this, mutation.current_mutable_value());
// If the row exists, it may still be a "ghost" row -- i.e a row
// that's been deleted. If that's the case, we should treat it as
// NotFound.
if (row.IsGhost()) {
return Status::NotFound("not in memrowset (ghost)");
// Append to the linked list of mutations for this row.
Mutation *mut = Mutation::CreateInArena(arena_.get(), timestamp, delta);
// This function has "release" semantics which ensures that the memory writes
// for the mutation are fully published before any concurrent reader sees
// the appended mutation.
mut->AppendToListAtomic(&row.header_->redo_head, &row.header_->redo_tail);
// Report to the caller which store absorbed the mutation.
MemStoreTargetPB* target = result->add_mutated_stores();
if (txn_id_) {
if (delta.is_delete()) {
return Status::OK();
// Sets '*present' to whether a live row with the probed key exists in this
// MemRowSet.
//
// A missing key and a ghost (deleted) row both yield '*present == false'.
// As written, this always returns Status::OK(). The IOContext argument is
// unused.
Status MemRowSet::CheckRowPresent(const RowSetKeyProbe &probe, const IOContext* /*io_context*/,
bool* present, ProbeStats* stats) const {
// Use a PreparedMutation here even though we don't plan to mutate. Even though
// this takes a lock rather than an optimistic copy, it should be a very short
// critical section, and this call is only made on updates, which are rare.
btree::PreparedMutation<MSBTreeTraits> mutation(probe.encoded_key_slice());
// Prepare() takes a non-const tree, hence the const_cast in this const
// method.
mutation.Prepare(const_cast<MSBTree *>(&tree_));
if (!mutation.exists()) {
*present = false;
return Status::OK();
// TODO(perf): using current_mutable_value() will actually change the data's
// version number, even though we're not going to do any mutation. This would
// make concurrent readers retry, even though they don't have to (we aren't
// actually mutating anything here!)
MRSRow row(this, mutation.current_mutable_value());
// If the row exists, it may still be a "ghost" row -- i.e a row
// that's been deleted. If that's the case, we should treat it as
// NotFound.
*present = !row.IsGhost();
return Status::OK();
// Returns a newly allocated iterator over this MemRowSet configured with
// 'opts'. The iterator holds a shared reference to this MemRowSet (via
// shared_from_this()); the returned raw pointer is owned by the caller.
MemRowSet::Iterator *MemRowSet::NewIterator(const RowIteratorOptions& opts) const {
return new MemRowSet::Iterator(shared_from_this(), tree_.NewIterator(), opts);
// Returns a newly allocated iterator that uses default RowIteratorOptions,
// except that the projection is this MemRowSet's own schema. The returned
// raw pointer is owned by the caller.
MemRowSet::Iterator *MemRowSet::NewIterator() const {
// TODO(todd): can we kill this function? should be only used by tests?
RowIteratorOptions opts;
opts.projection = &schema();
return NewIterator(opts);
// RowwiseIterator-returning entry point for creating an iterator with 'opts'.
// NOTE(review): '*out' is presumably set to an iterator built from 'opts' --
// confirm. As written, this always returns Status::OK().
Status MemRowSet::NewRowIterator(const RowIteratorOptions& opts,
unique_ptr<RowwiseIterator>* out) const {
return Status::OK();
// Stores in '*out' a CompactionInput over this MemRowSet, created from
// 'projection' and the MVCC snapshot 'snap'.
//
// The IOContext argument is unused. As written, this always returns
// Status::OK().
Status MemRowSet::NewCompactionInput(const Schema* projection,
const MvccSnapshot& snap,
const IOContext* /*io_context*/,
unique_ptr<CompactionInput>* out) const {
out->reset(CompactionInput::Create(*this, projection, snap));
return Status::OK();
// Key bounds are not available for a MemRowSet: this always returns
// NotSupported and leaves both output strings untouched.
Status MemRowSet::GetBounds(string *min_encoded_key,
string *max_encoded_key) const {
return Status::NotSupported("");
// Virtual interface allows two possible row projector implementations
// (a code-generated one and the regular RowProjector; see
// GenerateAppropriateProjector()).
class MemRowSet::Iterator::MRSRowProjector {
typedef RowProjector::ProjectionIdxMapping ProjectionIdxMapping;
virtual ~MRSRowProjector() {}
// Projects 'src_row' into 'dst_row'; 'arena' is passed through to the
// underlying projector.
virtual Status ProjectRowForRead(const MRSRow& src_row,
RowBlockRow* dst_row,
Arena* arena) = 0;
// Same as above, for a source row given as a ConstContiguousRow.
virtual Status ProjectRowForRead(const ConstContiguousRow& src_row,
RowBlockRow* dst_row,
Arena* arena) = 0;
// Returns the underlying projector's base-column index mappings.
virtual const vector<ProjectionIdxMapping>& base_cols_mapping() const = 0;
// Initializes the projector; expected to be called before projecting.
virtual Status Init() = 0;
namespace {
typedef MemRowSet::Iterator::MRSRowProjector MRSRowProjector;
// Adapter that implements the MRSRowProjector interface by forwarding every
// call to an owned 'ActualProjector' instance. 'ActualProjector' must provide
// Init(), both ProjectRowForRead() overloads and base_cols_mapping().
template<class ActualProjector>
class MRSRowProjectorImpl : public MRSRowProjector {
// Takes ownership of 'actual'.
explicit MRSRowProjectorImpl(unique_ptr<ActualProjector> actual)
: actual_(std::move(actual)) {}
Status Init() override { return actual_->Init(); }
Status ProjectRowForRead(const MRSRow& src_row, RowBlockRow* dst_row,
Arena* arena) override {
return actual_->ProjectRowForRead(src_row, dst_row, arena);
Status ProjectRowForRead(const ConstContiguousRow& src_row,
RowBlockRow* dst_row,
Arena* arena) override {
return actual_->ProjectRowForRead(src_row, dst_row, arena);
const vector<ProjectionIdxMapping>& base_cols_mapping() const override {
return actual_->base_cols_mapping();
// The wrapped projector that does the actual work.
unique_ptr<ActualProjector> actual_;
// If codegen is enabled, then generates a codegen::RowProjector;
// otherwise makes a regular one.
//
// The regular RowProjector is also used when the compilation manager's
// RequestRowProjector() call returns false. Either way the result is wrapped
// in an MRSRowProjectorImpl and projects from 'base' to 'projection'.
unique_ptr<MRSRowProjector> GenerateAppropriateProjector(
const Schema* base, const Schema* projection) {
// Attempt code-generated implementation
if (FLAGS_mrs_use_codegen) {
unique_ptr<codegen::RowProjector> actual;
if (codegen::CompilationManager::GetSingleton()->RequestRowProjector(
base, projection, &actual)) {
return unique_ptr<MRSRowProjector>(
new MRSRowProjectorImpl<codegen::RowProjector>(std::move(actual)));
// Proceed with default implementation
unique_ptr<RowProjector> actual(new RowProjector(base, projection));
return unique_ptr<MRSRowProjector>(
new MRSRowProjectorImpl<RowProjector>(std::move(actual)));
} // anonymous namespace
// Constructs an iterator over 'mrs'.
//
// The iterator keeps a shared reference to the MemRowSet, builds a row
// projector and a delta projector from the MemRowSet's schema to the
// projection in the iterator options, and starts in the kUninitialized
// state; Init() must be called before rows are fetched.
MemRowSet::Iterator::Iterator(const std::shared_ptr<const MemRowSet>& mrs,
MemRowSet::MSBTIter* iter,
RowIteratorOptions opts)
: memrowset_(mrs),
GenerateAppropriateProjector(&mrs->schema_nonvirtual(), opts_.projection)),
delta_projector_(&mrs->schema_nonvirtual(), opts_.projection),
state_(kUninitialized) {
// TODO(todd): various code assumes that a newly constructed iterator
// is pointed at the beginning of the dataset. This causes a redundant
// seek. Could make this lazy instead, or change the semantics so that
// a seek is required (probably the latter)
// Destroys the iterator; the destructor body itself is empty.
MemRowSet::Iterator::~Iterator() {}
// Initializes the iterator, optionally constrained by the key bounds in
// 'spec' (which may be null).
//
// If 'spec' has a lower bound key, the iterator seeks to it; when no row is
// at or after that bound the iterator goes straight to kFinished. Otherwise
// the iterator ends up in kScanning. Must only be called on an uninitialized
// iterator (checked in debug builds). As written, this always returns
// Status::OK().
Status MemRowSet::Iterator::Init(ScanSpec *spec) {
DCHECK_EQ(state_, kUninitialized);
if (spec && spec->lower_bound_key()) {
bool exact;
const Slice &lower_bound = spec->lower_bound_key()->encoded_key();
if (!iter_->SeekAtOrAfter(lower_bound, &exact)) {
// Lower bound is after the end of the key range, no rows will
// pass the predicate so we can stop the scan right away.
state_ = kFinished;
return Status::OK();
if (spec && spec->exclusive_upper_bound_key()) {
const Slice &upper_bound = spec->exclusive_upper_bound_key()->encoded_key();
state_ = kScanning;
return Status::OK();
// Seeks the underlying tree iterator to the first entry at or after 'key'.
//
// A non-empty 'key' is interpreted as a contiguous row in the MemRowSet's
// schema and is encoded into comparable form before seeking. '*exact' is
// filled in by the underlying tree iterator.
//
// Returns OK if the underlying seek succeeds or if 'key' is empty; otherwise
// returns NotFound. The iterator must already be initialized (checked in
// debug builds).
Status MemRowSet::Iterator::SeekAtOrAfter(const Slice &key, bool *exact) {
DCHECK_NE(state_, kUninitialized) << "not initted";
if (key.size() > 0) {
ConstContiguousRow row_slice(&memrowset_->schema(), key);
memrowset_->schema().EncodeComparableKey(row_slice, &tmp_buf);
} else {
// Seeking to empty key shouldn't try to run any encoding.
if (iter_->SeekAtOrAfter(Slice(tmp_buf), exact) ||
key.size() == 0) {
return Status::OK();
} else {
return Status::NotFound("no match in memrowset");
// Fetches the next batch of rows into 'dst'.
//
// Returns NotFound when the underlying tree iterator is no longer valid.
// Returns OK without fetching when the iterator is not in the kScanning
// state or when 'dst' has zero row capacity. Otherwise delegates to
// FetchRows() and propagates its status. The iterator must already be
// initialized (checked in debug builds).
Status MemRowSet::Iterator::NextBlock(RowBlock *dst) {
// TODO: add dcheck that dst->schema() matches our schema
// also above TODO applies to a lot of other CopyNextRows cases
DCHECK_NE(state_, kUninitialized) << "not initted";
if (PREDICT_FALSE(!iter_->IsValid())) {
return Status::NotFound("end of iter");
if (PREDICT_FALSE(state_ != kScanning)) {
return Status::OK();
if (PREDICT_FALSE(dst->row_capacity() == 0)) {
return Status::OK();
// Reset rowblock arena to eventually reach appropriate buffer size.
// Always allocating the full capacity is only a problem for the last block.
if (dst->arena()) {
// Fill
size_t fetched;
RETURN_NOT_OK(FetchRows(dst, &fetched));
// NOTE(review): 'fetched' is a size_t, so the lower-bound check below can
// never fail.
DCHECK_LE(0, fetched);
DCHECK_LE(fetched, dst->nrows());
// Clear unreached bits by resizing
return Status::OK();
// Copies rows from the iterator's current position into 'dst', applying the
// projection and the MVCC snapshots in 'opts_'.
//
// '*fetched' starts at 0 and is used as the index of the next destination
// row; the loop runs until the tree iterator is exhausted or '*fetched'
// reaches dst->nrows(). Reaching the scan's upper bound moves the iterator to
// kFinished. Errors from projection are propagated.
Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) {
*fetched = 0;
do {
RowBlockRow dst_row = dst->row(*fetched);
// Copy the row into the destination, including projection
// and relocating slices.
// TODO(todd): can we share some code here with CopyRowToArena() from row.h
// or otherwise put this elsewhere?
Slice v = iter_->GetCurrentValue();
MRSRow row(memrowset_.get(), v);
// Short-circuit if we've exceeded the iteration's upper bound.
if (has_upper_bound() && out_of_bounds(iter_->GetCurrentKey())) {
state_ = kFinished;
// The snapshots in 'opts_' represent a time range that this iterator must
// respect. There are two possible cases:
// 1. 'snap_to_exclude' is unset but 'snap_to_include' is set. The time
// range is [-INF, snap_to_include).
// 2. Both 'snap_to_exclude' and 'snap_to_include' are set. The time range
// is [snap_to_exclude, snap_to_include).
// If a non-transactional row's insertion timestamp is applied in
// 'snap_to_exclude' or a transactional row's commit timestamp is committed
// in 'snap_to_exclude', the insertion was outside this iterator's time
// range (i.e. the insert was "excluded"). However, subsequent mutations
// may be inside the time range, so we must still project the row and walk
// its mutation list.
const auto& txn_meta = memrowset_->txn_metadata();
bool insert_excluded = opts_.snap_to_exclude &&
// TODO(awong): if we find this to be too slow, we should be able to
// compute IsCommitted() once per iterator at construction time.
(txn_meta ? opts_.snap_to_exclude->IsCommitted(*txn_meta.get()) :
// Whether this row must be cleared from the destination's selection vector.
bool unset_in_sel_vector;
ApplyStatus apply_status;
// The row is worth projecting if its insertion was excluded (later mutations
// may still be in range) or if the insertion is visible in 'snap_to_include'.
if (insert_excluded ||
(txn_meta ? opts_.snap_to_include.IsCommitted(*txn_meta.get()) :
opts_.snap_to_include.IsApplied(row.insertion_timestamp()))) {
RETURN_NOT_OK(projector_->ProjectRowForRead(row, &dst_row, dst->arena()));
// Roll-forward MVCC for committed updates.
Mutation* redo_head = reinterpret_cast<Mutation*>(
redo_head, &dst_row, dst->arena(), insert_excluded, &apply_status));
unset_in_sel_vector = (apply_status == APPLIED_AND_DELETED && !opts_.include_deleted_rows) ||
(apply_status == NONE_APPLIED && insert_excluded) ||
} else {
// The insertion is too new; the entire row should be omitted.
unset_in_sel_vector = true;
if (unset_in_sel_vector) {
// In debug mode, fill the row data for easy debugging
#ifndef NDEBUG
if (state_ != kFinished) {
} else if (projection_vc_is_deleted_idx_ != Schema::kColumnNotFound) {
apply_status == APPLIED_AND_DELETED);
} while (iter_->Next() && *fetched < dst->nrows());
return Status::OK();
// Walks the mutation list starting at 'mutation_head' and applies the
// mutations visible in 'opts_.snap_to_include' to the already-projected
// 'dst_row', using 'dst_arena' for column data.
//
// 'insert_excluded' says whether the row's insertion fell outside the
// iterator's time range. On return '*apply_status' summarizes the outcome:
//   NONE_APPLIED             - no mutation inside the time range was applied
//   APPLIED_AND_PRESENT      - at least one was applied and the row is live
//   APPLIED_AND_DELETED      - at least one was applied and the row ends deleted
//   APPLIED_AND_UNOBSERVABLE - the row was dead at both ends of a closed
//                              time range
// Decoder errors while applying a column are propagated.
Status MemRowSet::Iterator::ApplyMutationsToProjectedRow(
const Mutation* mutation_head, RowBlockRow* dst_row, Arena* dst_arena,
bool insert_excluded, ApplyStatus* apply_status) {
ApplyStatus local_apply_status = NONE_APPLIED;
// Fast short-circuit the likely case of a row which was inserted and never
// updated.
if (PREDICT_TRUE(mutation_head == nullptr)) {
*apply_status = local_apply_status;
return Status::OK();
// In order to find unobservable rows, we need to track row liveness at the
// start and end of the time range. If a row was dead at both ends, its
// lifespan must have been a subset of the time range and it should be
// excluded from the results.
// Finding 'is_deleted_end' is relatively straight-forward: we use each
// relevant mutation to drive a liveness state machine, and after we're done
// applying, 'is_deleted_end' is just the final value of that state machine.
// Finding 'is_deleted_start' is trickier. If the insertion was inside the
// time range, we know the value is true because the row was dead prior to the
// insertion and the insertion happened after the start of the time range.
// However, if the insertion was excluded from the time range, the value is
// going to be whatever the value of the liveness state machine was at the
// start of the time range.
bool is_deleted_start = !insert_excluded;
bool is_deleted_end = false;
for (const Mutation *mut = mutation_head;
mut != nullptr;
mut = mut->acquire_next()) {
if (!opts_.snap_to_include.IsApplied(mut->timestamp_)) {
// This mutation is too new and should be omitted.
// All subsequent mutations are also too new because their timestamps are
// guaranteed to be equal to or greater than this mutation's timestamp.
// If the mutation is too old, we still need to apply it (so that the column
// values are correct if we see a relevant mutation later), but it doesn't
// count towards the overall "application status".
if (!opts_.snap_to_exclude ||
!opts_.snap_to_exclude->IsApplied(mut->timestamp_)) {
// This is the first mutation within the time range, so we may use it to
// initialize 'is_deleted_start'.
if (local_apply_status == NONE_APPLIED && insert_excluded) {
is_deleted_start = is_deleted_end;
local_apply_status = APPLIED_AND_PRESENT;
// Apply the mutation.
// Check if it's a deletion.
RowChangeListDecoder decoder(mut->changelist());
if (decoder.is_delete()) {
} else {
DCHECK(decoder.is_update() || decoder.is_reinsert());
if (decoder.is_reinsert()) {
// TODO(todd): this is slow, since it makes multiple passes through the rowchangelist.
// Instead, we should keep the backwards mapping of columns.
for (const RowProjector::ProjectionIdxMapping& mapping : projector_->base_cols_mapping()) {
// A fresh decoder is created for every projected column.
RowChangeListDecoder decoder(mut->changelist());
ColumnBlock dst_col = dst_row->column_block(mapping.first);
RETURN_NOT_OK(decoder.ApplyToOneColumn(dst_row->row_index(), &dst_col,
mapping.second, dst_arena));
if (opts_.snap_to_exclude && is_deleted_start && is_deleted_end) {
// The row's lifespan was a subset of the time range. It can't be observed,
// so it should definitely not show up in the results.
// Note: we condition on 'snap_to_exclude' because although insert_excluded
// is false on some closed time range scans, it's also false in all open
// time range scans, and we don't want this heuristic to fire in the latter case.
local_apply_status = APPLIED_AND_UNOBSERVABLE;
} else if (is_deleted_end && local_apply_status == APPLIED_AND_PRESENT) {
// The row was selected and deleted. It may be omitted from the results,
// depending on whether the results should include deleted rows or not.
local_apply_status = APPLIED_AND_DELETED;
*apply_status = local_apply_status;
return Status::OK();
// Copy the current MRSRow to the 'dst_row' provided using the iterator projection schema.
//
// Also returns the row's REDO mutation list through '*redo_head' and its
// insertion timestamp through '*insertion_timestamp'. For a transactional
// MemRowSet the insertion timestamp is the transaction's commit timestamp,
// which must be set. When the delta projection is not the identity, the
// mutations are re-encoded against the projection into 'mutation_arena'
// (which must then be non-null) and mutations that project to nothing are
// dropped.
//
// Not supported for iterators with 'snap_to_exclude' set (checked in debug
// builds). Returns the status of projecting the row.
Status MemRowSet::Iterator::GetCurrentRow(RowBlockRow* dst_row,
Arena* row_arena,
Mutation** redo_head,
Arena* mutation_arena,
Timestamp* insertion_timestamp) {
DCHECK(boost::none == opts_.snap_to_exclude);
DCHECK(redo_head != nullptr);
// Get the row from the MemRowSet. It may have a different schema from the iterator projection.
MRSRow src_row = GetCurrentRow();
const auto& mrs_txn_id = memrowset_->txn_id();
if (mrs_txn_id) {
// NOTE: we currently only support flushing committed MRSs.
const auto& txn_meta = memrowset_->txn_metadata();
CHECK(boost::none != txn_meta->commit_timestamp());
*insertion_timestamp = *txn_meta->commit_timestamp();
} else {
*insertion_timestamp = src_row.insertion_timestamp();
// Project the RowChangeList if required
*redo_head = src_row.acquire_redo_head();
if (!delta_projector_.is_identity()) {
DCHECK(mutation_arena != nullptr);
// Rebuild the list from scratch: 'prev_redo' tracks the last projected
// mutation so that the next one can be linked after it.
Mutation *prev_redo = nullptr;
*redo_head = nullptr;
// NOTE(review): the list head is read here with redo_head() while the
// assignment above uses acquire_redo_head() -- confirm this is intended.
for (const Mutation *mut = src_row.redo_head();
mut != nullptr;
mut = mut->acquire_next()) {
RowChangeListEncoder enc(&delta_buf_);
// The projection resulted in an empty mutation (e.g. update of a removed column)
if (enc.is_empty()) continue;
Mutation *mutation = Mutation::CreateInArena(mutation_arena,
if (prev_redo != nullptr) {
} else {
*redo_head = mutation;
prev_redo = mutation;
// Project the Row
return projector_->ProjectRowForRead(src_row, dst_row, row_arena);
} // namespace tablet
} // namespace kudu