blob: 5cf07d01a4e921b49fe62685da390cc0f14bde66 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/deltafile.h"
#include <arpa/inet.h>
#include <memory>
#include <string>
#include "kudu/common/wire_protocol.h"
#include "kudu/cfile/binary_plain_block.h"
#include "kudu/cfile/block_encodings.h"
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/pb_util.h"
// Declared here, defined in another translation unit. When false,
// DeltaFileReader::OpenNoInit() calls Init() immediately instead of leaving
// initialization to the first use of the reader.
DECLARE_bool(cfile_lazy_open);
// Used as storage_attributes.cfile_block_size for every CFile written by
// DeltaFileWriter. The default is 32*1024 (presumably bytes -- confirm
// against the CFile writer). Tagged experimental.
DEFINE_int32(deltafile_default_block_size, 32*1024,
"Block size for delta files. In the future, this may become configurable "
"on a per-table basis.");
TAG_FLAG(deltafile_default_block_size, experimental);
using std::shared_ptr;
namespace kudu {
using cfile::BlockHandle;
using cfile::BlockPointer;
using cfile::IndexTreeIterator;
using cfile::BinaryPlainBlockDecoder;
using cfile::CFileReader;
using fs::ReadableBlock;
using fs::ScopedWritableBlockCloser;
using fs::WritableBlock;
namespace tablet {
// Metadata key under which DeltaFileWriter::WriteDeltaStats() stores the
// serialized DeltaStatsPB, and from which DeltaFileReader::ReadDeltaStats()
// reads it back. Changing it would make existing delta files unreadable.
const char * const DeltaFileReader::kDeltaStatsEntryName = "deltafilestats";
// Builds the CFileWriter that backs a delta file. Every entry is a BINARY
// cell (an encoded DeltaKey followed by the RowChangeList bytes, see
// DoAppendDelta()), stored with PLAIN encoding. The writer also emits a value
// index and uses --deltafile_default_block_size as its block size.
DeltaFileWriter::DeltaFileWriter(gscoped_ptr<WritableBlock> block)
#ifndef NDEBUG
    : has_appended_(false)
#endif
{ // NOLINT(*)
  cfile::WriterOptions writer_opts;
  writer_opts.storage_attributes.encoding = PLAIN_ENCODING;
  writer_opts.storage_attributes.cfile_block_size = FLAGS_deltafile_default_block_size;
  // The value index is mandatory: DeltaFileReader::InitOnce() rejects files
  // without one, and DeltaFileIterator seeks through it.
  writer_opts.write_validx = true;

  // The third argument is presumably 'is_nullable' -- confirm in cfile_writer.h.
  writer_.reset(new cfile::CFileWriter(writer_opts, GetTypeInfo(BINARY), false,
                                       std::move(block)));
}
// Delegates to the wrapped CFileWriter's Start().
Status DeltaFileWriter::Start() {
  Status start_status = writer_->Start();
  return start_status;
}
// Finishes the file, then closes the released block before returning.
// Use FinishAndReleaseBlock() directly to control when the block is closed.
Status DeltaFileWriter::Finish() {
  ScopedWritableBlockCloser block_closer;
  RETURN_NOT_OK(FinishAndReleaseBlock(&block_closer));
  Status close_status = block_closer.CloseBlocks();
  return close_status;
}
// Finalizes the CFile and hands its block to 'closer'; the caller decides
// when the block is actually closed (Finish() closes it immediately).
Status DeltaFileWriter::FinishAndReleaseBlock(ScopedWritableBlockCloser* closer) {
  Status finish_status = writer_->FinishAndReleaseBlock(closer);
  return finish_status;
}
// Appends one CFile entry consisting of the encoded 'key' immediately followed
// by the raw bytes of 'delta'. Ordering is the caller's responsibility (see
// AppendDelta<REDO>/<UNDO>). REINSERT deltas trigger a CHECK failure.
Status DeltaFileWriter::DoAppendDelta(const DeltaKey &key,
                                      const RowChangeList &delta) {
  // See TODO in RowChangeListEncoder::SetToReinsert
  CHECK(!delta.is_reinsert())
      << "TODO: REINSERT deltas cannot currently be written to disk "
      << "since they don't have a standalone encoded form.";

  const Slice delta_bytes(delta.slice());

  // tmp_buf_ is a member scratch buffer, reset and refilled on every call.
  tmp_buf_.clear();
  key.EncodeTo(&tmp_buf_);
  tmp_buf_.append(delta_bytes.data(), delta_bytes.size());

  Slice entry(tmp_buf_);
  return writer_->AppendEntries(&entry, 1);
}
// REDO deltas must arrive in ascending key order: ascending row index, then
// ascending timestamp. Debug builds check each key against the previous one.
// Release builds append without checking.
template<>
Status DeltaFileWriter::AppendDelta<REDO>(
    const DeltaKey &key, const RowChangeList &delta) {
#ifndef NDEBUG
  DCHECK(!has_appended_ || last_key_.CompareTo<REDO>(key) <= 0)
      << "must insert redo deltas in sorted order (ascending key, then ascending ts): "
      << "got key " << key.ToString() << " after "
      << last_key_.ToString();
  last_key_ = key;
  has_appended_ = true;
#endif
  return DoAppendDelta(key, delta);
}
// UNDO deltas must arrive in ascending row index order and, within a row, in
// descending timestamp order. Debug builds check each key against the
// previous one. Release builds append without checking.
template<>
Status DeltaFileWriter::AppendDelta<UNDO>(
    const DeltaKey &key, const RowChangeList &delta) {
#ifndef NDEBUG
  DCHECK(!has_appended_ || last_key_.CompareTo<UNDO>(key) <= 0)
      << "must insert undo deltas in sorted order (ascending key, then descending ts): "
      << "got key " << key.ToString() << " after "
      << last_key_.ToString();
  last_key_ = key;
  has_appended_ = true;
#endif
  return DoAppendDelta(key, delta);
}
// Serializes 'stats' as a DeltaStatsPB and records it in the CFile metadata
// under DeltaFileReader::kDeltaStatsEntryName, where
// DeltaFileReader::ReadDeltaStats() expects it. Returns IOError if the
// protobuf cannot be serialized.
Status DeltaFileWriter::WriteDeltaStats(const DeltaStats& stats) {
  DeltaStatsPB pb;
  stats.ToPB(&pb);

  faststring serialized;
  const bool serialized_ok = pb_util::SerializeToString(pb, &serialized);
  if (!serialized_ok) {
    return Status::IOError("Unable to serialize DeltaStatsPB", pb.DebugString());
  }
  writer_->AddMetadataPair(DeltaFileReader::kDeltaStatsEntryName, serialized.ToString());
  return Status::OK();
}
////////////////////////////////////////////////////////////
// Reader
////////////////////////////////////////////////////////////
// Opens a delta file and fully initializes it, whatever the value of
// --cfile_lazy_open: it finishes CFileReader initialization, verifies the
// value index and loads the delta stats. On success '*reader_out' holds the
// new reader; on failure '*reader_out' is left untouched.
Status DeltaFileReader::Open(gscoped_ptr<ReadableBlock> block,
                             const BlockId& block_id,
                             shared_ptr<DeltaFileReader>* reader_out,
                             DeltaType delta_type) {
  shared_ptr<DeltaFileReader> df_reader;
  RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(block),
                                            block_id, &df_reader, delta_type));
  // Safe to call even if OpenNoInit() already initialized the reader:
  // Init() funnels the work through init_once_.
  RETURN_NOT_OK(df_reader->Init());
  // Move rather than copy the shared_ptr. This avoids a redundant atomic
  // refcount increment/decrement pair.
  *reader_out = std::move(df_reader);
  return Status::OK();
}
// Opens the CFile backing a delta file and wraps it in a DeltaFileReader.
// When --cfile_lazy_open is set, Init() is deferred: until it runs, the
// reader has no delta stats and cannot cull snapshots.
Status DeltaFileReader::OpenNoInit(gscoped_ptr<ReadableBlock> block,
                                   const BlockId& block_id,
                                   shared_ptr<DeltaFileReader>* reader_out,
                                   DeltaType delta_type) {
  gscoped_ptr<CFileReader> cfile_reader;
  RETURN_NOT_OK(CFileReader::OpenNoInit(std::move(block), cfile::ReaderOptions(),
                                        &cfile_reader));

  // The released raw pointer is stored by the DeltaFileReader constructor in reader_.
  gscoped_ptr<DeltaFileReader> delta_reader(
      new DeltaFileReader(block_id, cfile_reader.release(), delta_type));

  // Eager mode: finish initialization now rather than on first use.
  if (!FLAGS_cfile_lazy_open) {
    RETURN_NOT_OK(delta_reader->Init());
  }

  // Ownership must end up in a shared_ptr: NewDeltaIterator() relies on
  // shared_from_this().
  reader_out->reset(delta_reader.release());
  return Status::OK();
}
// Only initializes members: 'cf_reader' is stored in reader_, and 'block_id'
// is moved into block_id_. All remaining setup (CFile init, value-index check,
// delta stats) happens in Init()/InitOnce().
DeltaFileReader::DeltaFileReader(BlockId block_id, CFileReader *cf_reader,
DeltaType delta_type)
: reader_(cf_reader),
block_id_(std::move(block_id)),
delta_type_(delta_type) {}
// Runs InitOnce() through init_once_. Repeated calls share that single
// initialization (presumably returning its cached Status -- see the
// init_once_ type).
Status DeltaFileReader::Init() {
  Status init_status = init_once_.Init(&DeltaFileReader::InitOnce, this);
  return init_status;
}
// One-time initialization body invoked via Init(). It completes the
// CFileReader, checks for the value index and loads the delta stats.
Status DeltaFileReader::InitOnce() {
  // Finishes a lazily-opened CFileReader; a no-op if it is already initialized.
  RETURN_NOT_OK(reader_->Init());

  // DeltaFileIterator seeks through the value index, so a file without one
  // is unusable.
  if (!reader_->has_validx()) {
    return Status::Corruption("file does not have a value index!");
  }

  // The stats loaded here enable snapshot culling in IsRelevantForSnapshot().
  return ReadDeltaStats();
}
// Loads the DeltaStatsPB stored under kDeltaStatsEntryName in the CFile
// metadata into delta_stats_. A missing or unparseable entry is reported as
// Corruption; InitFromPB() errors are propagated unchanged.
Status DeltaFileReader::ReadDeltaStats() {
  string serialized_stats;
  if (!reader_->GetMetadataEntry(kDeltaStatsEntryName, &serialized_stats)) {
    return Status::Corruption("missing delta stats from the delta file metadata");
  }

  DeltaStatsPB stats_pb;
  if (!stats_pb.ParseFromString(serialized_stats)) {
    return Status::Corruption("unable to parse the delta stats protobuf");
  }

  // Build fully before publishing, so delta_stats_ is untouched on failure.
  gscoped_ptr<DeltaStats> parsed_stats(new DeltaStats());
  RETURN_NOT_OK(parsed_stats->InitFromPB(stats_pb));
  delta_stats_.swap(parsed_stats);
  return Status::OK();
}
// Returns false only when the delta stats prove that no delta in this file can
// matter for 'snap'. An uninitialized reader has no stats, so it
// conservatively reports every snapshot as relevant.
bool DeltaFileReader::IsRelevantForSnapshot(const MvccSnapshot& snap) const {
  if (!init_once_.initted()) {
    return true;
  }
  switch (delta_type_) {
    case REDO:
      // REDOs matter if the snapshot may include commits at/after the oldest one.
      return snap.MayHaveCommittedTransactionsAtOrAfter(delta_stats_->min_timestamp());
    case UNDO:
      // UNDOs matter if the snapshot may exclude commits at/before the newest one.
      return snap.MayHaveUncommittedTransactionsAtOrBefore(delta_stats_->max_timestamp());
    default:
      break;
  }
  LOG(DFATAL) << "Cannot reach here";
  return false;
}
// Creates a DeltaFileIterator over this file for 'projection' and 'snap' and
// returns it through '*iterator'. The caller owns the result (CheckRowDeleted()
// wraps it in a gscoped_ptr). Returns NotFound, leaving '*iterator' unset,
// when the delta stats show that this file cannot affect 'snap'.
Status DeltaFileReader::NewDeltaIterator(const Schema *projection,
                                         const MvccSnapshot &snap,
                                         DeltaIterator** iterator) const {
  if (!IsRelevantForSnapshot(snap)) {
    VLOG(2) << "Culling "
            << ((delta_type_ == REDO) ? "REDO":"UNDO")
            << " delta " << ToString() << " for " << snap.ToString();
    return Status::NotFound("MvccSnapshot outside the range of this delta.");
  }

  if (VLOG_IS_ON(2)) {
    if (!init_once_.initted()) {
      // Keep the leading space so ToString() output is not glued to the text.
      VLOG(2) << (delta_type_ == REDO ? "REDO" : "UNDO") << " delta " << ToString()
              << " has no delta stats"
              << ": can't cull for " << snap.ToString();
    } else if (delta_type_ == REDO) {
      VLOG(2) << "REDO delta " << ToString()
              << " has min ts " << delta_stats_->min_timestamp().ToString()
              << ": can't cull for " << snap.ToString();
    } else {
      VLOG(2) << "UNDO delta " << ToString()
              << " has max ts " << delta_stats_->max_timestamp().ToString()
              << ": can't cull for " << snap.ToString();
    }
  }

  // Ugly cast, but it lets the iterator fully initialize the reader
  // during its first seek.
  *iterator = new DeltaFileIterator(
      const_cast<DeltaFileReader*>(this)->shared_from_this(), projection, snap, delta_type_);
  return Status::OK();
}
// Sets '*deleted' to whether ApplyDeletes() unselects row 'row_idx' under a
// snapshot that includes all transactions. A file culled by NewDeltaIterator()
// (NotFound) reports the row as not deleted.
Status DeltaFileReader::CheckRowDeleted(rowid_t row_idx, bool *deleted) const {
  MvccSnapshot all_txns(MvccSnapshot::CreateSnapshotIncludingAllTransactions());

  // TODO: would be nice to avoid allocation here, but we don't want to
  // duplicate all the logic from NewDeltaIterator. So, we'll heap-allocate
  // for now.
  // No columns are projected: only deletes are inspected. The schema must
  // outlive the iterator, which is why it is declared first.
  Schema no_columns;
  DeltaIterator* raw_iter;
  Status open_status = NewDeltaIterator(&no_columns, all_txns, &raw_iter);
  if (open_status.IsNotFound()) {
    *deleted = false;
    return Status::OK();
  }
  RETURN_NOT_OK(open_status);
  gscoped_ptr<DeltaIterator> iter(raw_iter);

  ScanSpec spec;
  RETURN_NOT_OK(iter->Init(&spec));
  RETURN_NOT_OK(iter->SeekToOrdinal(row_idx));
  RETURN_NOT_OK(iter->PrepareBatch(1, DeltaIterator::PREPARE_FOR_APPLY));

  // TODO: this does an allocation - can we stack-allocate the bitmap
  // and make SelectionVector able to "release" its buffer?
  // Start with the single row selected; a DELETE unselects it.
  SelectionVector single_row(1);
  single_row.SetAllTrue();
  RETURN_NOT_OK(iter->ApplyDeletes(&single_row));
  *deleted = !single_row.IsRowSelected(0);
  return Status::OK();
}
// Size estimate for this delta file: the backing CFile's file_size().
uint64_t DeltaFileReader::EstimateSize() const {
  const uint64_t cfile_bytes = reader_->file_size();
  return cfile_bytes;
}
////////////////////////////////////////////////////////////
// DeltaFileIterator
////////////////////////////////////////////////////////////
// Holds a shared reference to the reader ('dfr') so the file stays open while
// the iterator lives. prepared_idx_ starts at the sentinel 0xdeadbeef until
// SeekToOrdinal() sets the real position. Blocks are cached by default;
// Init() may override that from the ScanSpec.
DeltaFileIterator::DeltaFileIterator(shared_ptr<DeltaFileReader> dfr,
const Schema *projection,
MvccSnapshot snap, DeltaType delta_type)
: dfr_(std::move(dfr)),
projection_(projection),
mvcc_snap_(std::move(snap)),
prepared_idx_(0xdeadbeef),
prepared_count_(0),
prepared_(false),
exhausted_(false),
initted_(false),
delta_type_(delta_type),
cache_blocks_(CFileReader::CACHE_BLOCK) {}
// Marks the iterator initialized. When 'spec' is given, its cache_blocks()
// preference selects the block-cache mode used by later block reads;
// otherwise the constructor default (CACHE_BLOCK) is kept. Must be called
// exactly once.
Status DeltaFileIterator::Init(ScanSpec *spec) {
  DCHECK(!initted_) << "Already initted";
  if (spec != nullptr) {
    if (spec->cache_blocks()) {
      cache_blocks_ = CFileReader::CACHE_BLOCK;
    } else {
      cache_blocks_ = CFileReader::DONT_CACHE_BLOCK;
    }
  }
  initted_ = true;
  return Status::OK();
}
// Positions the value-index iterator at the delta block that may hold deltas
// for row 'idx' and discards all previously prepared state. The next
// PrepareBatch() then starts at 'idx'. Also completes any lazy
// initialization of the reader.
Status DeltaFileIterator::SeekToOrdinal(rowid_t idx) {
  DCHECK(initted_) << "Must call Init()";

  // Finish the initialization of any lazily-initialized state.
  RETURN_NOT_OK(dfr_->Init());

  if (!index_iter_) {
    index_iter_.reset(IndexTreeIterator::Create(dfr_->cfile_reader().get(),
                                                dfr_->cfile_reader()->validx_root()));
  }

  // Encode a seek key for row 'idx' with Timestamp(0).
  tmp_buf_.clear();
  DeltaKey(idx, Timestamp(0)).EncodeTo(&tmp_buf_);
  Slice seek_key(tmp_buf_);

  Status seek_status = index_iter_->SeekAtOrBefore(seek_key);
  if (PREDICT_FALSE(seek_status.IsNotFound())) {
    // Seeking to a value before the first value in the file
    // will return NotFound, due to the way the index seek
    // works. We need to special-case this and have the
    // iterator seek all the way down its leftmost branches
    // to get the correct result.
    seek_status = index_iter_->SeekToFirst();
  }
  RETURN_NOT_OK(seek_status);

  // Forget everything prepared before the seek.
  exhausted_ = false;
  delta_blocks_.clear();
  prepared_ = false;
  prepared_count_ = 0;
  prepared_idx_ = idx;
  return Status::OK();
}
// Reads the delta block the index iterator currently points at and parses its
// header. It records the first and last row the block updates and appends the
// block to delta_blocks_. Does not advance the index iterator.
Status DeltaFileIterator::ReadCurrentBlockOntoQueue() {
  DCHECK(initted_) << "Must call Init()";
  DCHECK(index_iter_) << "Must call SeekToOrdinal()";

  gscoped_ptr<PreparedDeltaBlock> prepared(new PreparedDeltaBlock());
  BlockPointer current_ptr = index_iter_->GetCurrentBlockPointer();
  RETURN_NOT_OK(dfr_->cfile_reader()->ReadBlock(current_ptr, cache_blocks_,
                                                &prepared->block_));

  // The data has been successfully read. Finish creating the decoder.
  prepared->block_ptr_ = current_ptr;
  prepared->prepared_block_start_idx_ = 0;
  prepared->decoder_.reset(new BinaryPlainBlockDecoder(prepared->block_.data()));
  RETURN_NOT_OK(prepared->decoder_->ParseHeader());

  // The index key yields the first updated row; the last entry yields the last.
  RETURN_NOT_OK(GetFirstRowIndexInCurrentBlock(&prepared->first_updated_idx_));
  RETURN_NOT_OK(GetLastRowIndexInDecodedBlock(*prepared->decoder_,
                                              &prepared->last_updated_idx_));

#ifndef NDEBUG
  VLOG(2) << "Read delta block which updates "
          << prepared->first_updated_idx_ << " through "
          << prepared->last_updated_idx_;
#endif

  delta_blocks_.push_back(prepared.release());
  return Status::OK();
}
// Decodes the index key of the block the index iterator points at and stores
// its row index in '*idx'. Callers treat this as the block's first updated row.
Status DeltaFileIterator::GetFirstRowIndexInCurrentBlock(rowid_t *idx) {
  DCHECK(index_iter_) << "Must call SeekToOrdinal()";
  Slice encoded_key = index_iter_->GetCurrentKey();
  DeltaKey first_key;
  RETURN_NOT_OK(first_key.DecodeFrom(&encoded_key));
  *idx = first_key.row_idx();
  return Status::OK();
}
// Decodes the DeltaKey of the last entry in 'dec' and stores its row index in
// '*idx'. Returns Corruption for a block with no entries (presumably never
// produced by a well-formed writer) instead of reading an invalid index.
Status DeltaFileIterator::GetLastRowIndexInDecodedBlock(const BinaryPlainBlockDecoder &dec,
                                                        rowid_t *idx) {
  // A DCHECK alone would leave release builds evaluating string_at_index()
  // with 'Count() - 1' on an empty block, i.e. an out-of-range entry.
  if (dec.Count() == 0) {
    return Status::Corruption("delta block contains no entries");
  }
  Slice s(dec.string_at_index(dec.Count() - 1));
  DeltaKey k;
  RETURN_NOT_OK(k.DecodeFrom(&s));
  *idx = k.row_idx();
  return Status::OK();
}
// Formats the block as "<first row>-<last row> (<block pointer>)" for logging.
string DeltaFileIterator::PreparedDeltaBlock::ToString() const {
  // rowid_t is unsigned, so print with %u. %d mismatches the argument type
  // and renders row ids above INT32_MAX as negative numbers.
  return StringPrintf("%u-%u (%s)", first_updated_idx_, last_updated_idx_,
                      block_ptr_.ToString().c_str());
}
// Prepares the next 'nrows' rows, starting where the previous batch ended
// (prepared_idx_ + prepared_count_). It does three things:
//  - drops queued delta blocks whose last updated row precedes the batch;
//  - reads and queues every delta block whose first row is <= the batch's
//    last row;
//  - advances the front block's prepared_block_start_idx_ past entries for
//    rows before the batch.
// 'flag' is not consulted by this implementation. Requires Init() and
// SeekToOrdinal(); 'nrows' must be > 0 (CHECKed).
Status DeltaFileIterator::PrepareBatch(size_t nrows, PrepareFlag flag) {
DCHECK(initted_) << "Must call Init()";
DCHECK(index_iter_) << "Must call SeekToOrdinal()";
CHECK_GT(nrows, 0);
// The batch covers the inclusive row range [start_row, stop_row].
rowid_t start_row = prepared_idx_ + prepared_count_;
rowid_t stop_row = start_row + nrows - 1;
// Remove blocks from our list which are no longer relevant to the range
// being prepared.
while (!delta_blocks_.empty() &&
delta_blocks_.front().last_updated_idx_ < start_row) {
delta_blocks_.pop_front();
}
// Queue every not-yet-read block that starts at or before stop_row. On exit
// the index iterator points at the first block starting after stop_row
// (left unread for a later batch), unless the index is exhausted.
while (!exhausted_) {
rowid_t next_block_rowidx;
RETURN_NOT_OK(GetFirstRowIndexInCurrentBlock(&next_block_rowidx));
VLOG(2) << "Current delta block starting at row " << next_block_rowidx;
if (next_block_rowidx > stop_row) {
break;
}
RETURN_NOT_OK(ReadCurrentBlockOntoQueue());
// NotFound from Next() means the block just queued was the last one.
Status s = index_iter_->Next();
if (s.IsNotFound()) {
exhausted_ = true;
break;
}
RETURN_NOT_OK(s);
}
// Skip leading entries of the front block whose rows precede this batch, so
// VisitMutations() starts scanning at prepared_block_start_idx_.
if (!delta_blocks_.empty()) {
PreparedDeltaBlock &block = delta_blocks_.front();
int i = 0;
for (i = block.prepared_block_start_idx_;
i < block.decoder_->Count();
i++) {
Slice s(block.decoder_->string_at_index(i));
DeltaKey key;
RETURN_NOT_OK(key.DecodeFrom(&s));
if (key.row_idx() >= start_row) break;
}
block.prepared_block_start_idx_ = i;
}
#ifndef NDEBUG
VLOG(2) << "Done preparing deltas for " << start_row << "-" << stop_row
<< ": row block spans " << delta_blocks_.size() << " delta blocks";
#endif
prepared_idx_ = start_row;
prepared_count_ = nrows;
prepared_ = true;
return Status::OK();
}
// Walks the entries of every queued delta block, starting at each block's
// prepared_block_start_idx_. It calls visitor->Visit() for each entry whose
// row lies in the prepared range [prepared_idx_, prepared_idx_ + prepared_count_).
// Returns as soon as an entry beyond that range is seen; entries are written
// in ascending row order (see AppendDelta). When Visit() sets
// *continue_visit to false, later entries for the same row within the same
// block are skipped. That state resets at each new block.
template<class Visitor>
Status DeltaFileIterator::VisitMutations(Visitor *visitor) {
DCHECK(prepared_) << "must Prepare";
rowid_t start_row = prepared_idx_;
for (PreparedDeltaBlock &block : delta_blocks_) {
BinaryPlainBlockDecoder &bpd = *block.decoder_;
DVLOG(2) << "Visiting delta block " << block.first_updated_idx_ << "-"
<< block.last_updated_idx_ << " for row block starting at " << start_row;
if (PREDICT_FALSE(start_row > block.last_updated_idx_)) {
// The block to be updated completely falls after this delta block:
// <-- delta block --> <-- delta block -->
// <-- block to update -->
// This can happen because we don't know the block's last entry until after
// we queued it in PrepareBatch(). We could potentially remove it at that
// point during the prepare step, but for now just skip it here.
continue;
}
// Sentinel meaning "no row seen yet in this block".
rowid_t previous_rowidx = MathLimits<rowid_t>::kMax;
bool continue_visit = true;
for (int i = block.prepared_block_start_idx_; i < bpd.Count(); i++) {
Slice slice = bpd.string_at_index(i);
// Decode and check the ID of the row we're going to update.
// DecodeFrom() takes the slice by pointer; presumably it consumes the key
// bytes, leaving 'slice' at the RowChangeList passed to Visit().
DeltaKey key;
RETURN_NOT_OK(key.DecodeFrom(&slice));
rowid_t row_idx = key.row_idx();
// Check if the previous visitor notified us we don't need to apply more
// mutations to this row and skip if we don't.
if (row_idx == previous_rowidx && !continue_visit) {
continue;
} else {
previous_rowidx = row_idx;
continue_visit = true;
}
// Check that the delta is within the block we're currently processing.
if (row_idx >= start_row + prepared_count_) {
// Delta is for a row which comes after the block we're processing.
return Status::OK();
} else if (row_idx < start_row) {
// Delta is for a row which comes before the block we're processing.
continue;
}
RETURN_NOT_OK(visitor->Visit(key, slice, &continue_visit));
if (VLOG_IS_ON(3)) {
RowChangeList rcl(slice);
DVLOG(3) << "Visited delta for key: " << key.ToString() << " Mut: "
<< rcl.ToString(*projection_) << " Continue?: "
<< (continue_visit ? "TRUE" : "FALSE");
}
}
}
return Status::OK();
}
// Returns whether a REDO mutation with 'timestamp' is relevant under 'snap',
// i.e. committed in it. '*continue_visit' is set to false only for an
// irrelevant mutation when 'snap' cannot contain any committed transaction
// at or after 'timestamp'; otherwise it is set to true.
inline bool IsRedoRelevant(const MvccSnapshot& snap,
                           const Timestamp& timestamp,
                           bool* continue_visit) {
  const bool committed = snap.IsCommitted(timestamp);
  *continue_visit = committed || snap.MayHaveCommittedTransactionsAtOrAfter(timestamp);
  return committed;
}
// Returns whether an UNDO mutation with 'timestamp' is relevant under 'snap',
// i.e. NOT committed in it. '*continue_visit' is set to false only for an
// irrelevant (committed) mutation when 'snap' cannot contain any uncommitted
// transaction at or before 'timestamp'; otherwise it is set to true.
inline bool IsUndoRelevant(const MvccSnapshot& snap,
                           const Timestamp& timestamp,
                           bool* continue_visit) {
  const bool committed = snap.IsCommitted(timestamp);
  *continue_visit = !committed || snap.MayHaveUncommittedTransactionsAtOrBefore(timestamp);
  return !committed;
}
// Visitor which applies UPDATE mutations for column 'col_to_apply' to the rows
// of 'dst'. DELETEs are ignored here (DeletingVisitor handles them); any other
// mutation type is fatal.
template<DeltaType Type>
struct ApplyingVisitor {
  // Specialized per DeltaType: decides snapshot relevance, then calls
  // ApplyMutation() for relevant mutations.
  Status Visit(const DeltaKey &key, const Slice &deltas, bool* continue_visit);

  inline Status ApplyMutation(const DeltaKey &key, const Slice &deltas) {
    // Offset of the mutated row within the prepared batch. Subtract in signed
    // 64-bit arithmetic: rowid_t is unsigned, so subtracting in that type
    // would wrap instead of going negative, and the DCHECK could never fire.
    int64_t rel_idx = static_cast<int64_t>(key.row_idx()) -
                      static_cast<int64_t>(dfi->prepared_idx_);
    DCHECK_GE(rel_idx, 0);

    // TODO: this code looks eerily similar to DMSIterator::ApplyUpdates!
    // I bet it can be combined.
    const Schema* schema = dfi->projection_;
    RowChangeListDecoder decoder((RowChangeList(deltas)));
    RETURN_NOT_OK(decoder.Init());
    if (decoder.is_update()) {
      return decoder.ApplyToOneColumn(rel_idx, dst, *schema, col_to_apply, dst->arena());
    } else if (decoder.is_delete()) {
      // If it's a DELETE, then it will be processed by DeletingVisitor.
      return Status::OK();
    } else {
      dfi->FatalUnexpectedDelta(key, deltas, "Expect only UPDATE or DELETE deltas on disk");
    }
    return Status::OK();
  }

  DeltaFileIterator *dfi;
  size_t col_to_apply;
  ColumnBlock *dst;
};
// REDO: apply only mutations committed in the iterator's snapshot.
template<>
inline Status ApplyingVisitor<REDO>::Visit(const DeltaKey& key,
                                           const Slice& deltas,
                                           bool* continue_visit) {
  if (!IsRedoRelevant(dfi->mvcc_snap_, key.timestamp(), continue_visit)) {
    DVLOG(3) << "Redo delta uncommitted, skipped applying.";
    return Status::OK();
  }
  DVLOG(3) << "Applied redo delta";
  return ApplyMutation(key, deltas);
}

// UNDO: apply only mutations NOT committed in the iterator's snapshot.
template<>
inline Status ApplyingVisitor<UNDO>::Visit(const DeltaKey& key,
                                           const Slice& deltas,
                                           bool* continue_visit) {
  if (!IsUndoRelevant(dfi->mvcc_snap_, key.timestamp(), continue_visit)) {
    DVLOG(3) << "Undo delta committed, skipped applying.";
    return Status::OK();
  }
  DVLOG(3) << "Applied undo delta";
  return ApplyMutation(key, deltas);
}
// Applies this file's relevant UPDATEs for column 'col_to_apply' to the
// prepared rows in 'dst', which must hold at least prepared_count_ rows.
Status DeltaFileIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) {
  DCHECK_LE(prepared_count_, dst->nrows());
  if (delta_type_ != REDO) {
    DVLOG(3) << "Applying UNDO mutations to " << col_to_apply;
    ApplyingVisitor<UNDO> undo_applier = {this, col_to_apply, dst};
    return VisitMutations(&undo_applier);
  }
  DVLOG(3) << "Applying REDO mutations to " << col_to_apply;
  ApplyingVisitor<REDO> redo_applier = {this, col_to_apply, dst};
  return VisitMutations(&redo_applier);
}
// Visitor which applies deletes to the selection vector: a relevant DELETE
// unselects its row, UPDATEs leave the selection untouched, and any other
// mutation type is fatal.
template<DeltaType Type>
struct DeletingVisitor {
  // Specialized per DeltaType: decides snapshot relevance, then calls
  // ApplyDelete() for relevant mutations.
  Status Visit(const DeltaKey &key, const Slice &deltas, bool* continue_visit);

  inline Status ApplyDelete(const DeltaKey &key, const Slice &deltas) {
    // Subtract in signed 64-bit arithmetic: rowid_t is unsigned, so
    // subtracting in that type would wrap and the DCHECK could never fire.
    int64_t rel_idx = static_cast<int64_t>(key.row_idx()) -
                      static_cast<int64_t>(dfi->prepared_idx_);
    DCHECK_GE(rel_idx, 0);
    RowChangeListDecoder decoder((RowChangeList(deltas)));
    RETURN_NOT_OK(decoder.Init());
    if (decoder.is_update()) {
      DVLOG(3) << "Didn't delete row (update)";
      // If this is an update the row must be selected.
      DCHECK(sel_vec->IsRowSelected(rel_idx));
      return Status::OK();
    } else if (decoder.is_delete()) {
      DVLOG(3) << "Row deleted";
      sel_vec->SetRowUnselected(rel_idx);
    } else {
      dfi->FatalUnexpectedDelta(key, deltas, "Expect only UPDATE or DELETE deltas on disk");
    }
    return Status::OK();
  }

  DeltaFileIterator *dfi;
  SelectionVector *sel_vec;
};
// REDO: only DELETEs committed in the iterator's snapshot take effect.
template<>
inline Status DeletingVisitor<REDO>::Visit(const DeltaKey& key,
                                           const Slice& deltas,
                                           bool* continue_visit) {
  const bool relevant = IsRedoRelevant(dfi->mvcc_snap_, key.timestamp(), continue_visit);
  return relevant ? ApplyDelete(key, deltas) : Status::OK();
}

// UNDO: only mutations NOT committed in the iterator's snapshot take effect.
template<>
inline Status DeletingVisitor<UNDO>::Visit(const DeltaKey& key,
                                           const Slice& deltas,
                                           bool* continue_visit) {
  const bool relevant = IsUndoRelevant(dfi->mvcc_snap_, key.timestamp(), continue_visit);
  return relevant ? ApplyDelete(key, deltas) : Status::OK();
}
// Unselects in 'sel_vec' every prepared row with a relevant DELETE in this
// file. 'sel_vec' must cover at least prepared_count_ rows.
Status DeltaFileIterator::ApplyDeletes(SelectionVector *sel_vec) {
  DCHECK_LE(prepared_count_, sel_vec->nrows());
  if (delta_type_ != REDO) {
    DVLOG(3) << "Applying UNDO deletes";
    DeletingVisitor<UNDO> undo_deleter = {this, sel_vec};
    return VisitMutations(&undo_deleter);
  }
  DVLOG(3) << "Applying REDO deletes";
  DeletingVisitor<REDO> redo_deleter = {this, sel_vec};
  return VisitMutations(&redo_deleter);
}
// Visitor which wraps each relevant mutation in a Mutation allocated from
// 'dst_arena' and appends it to the Mutation list for its row, whose head is
// stored in (*dst)[row offset within the prepared batch]. See
// CollectMutations(). The change list bytes are stored as read; no schema
// projection is applied here.
template<DeltaType Type>
struct CollectingVisitor {
  // Specialized per DeltaType: decides snapshot relevance, then calls
  // Collect() for relevant mutations.
  Status Visit(const DeltaKey &key, const Slice &deltas, bool* continue_visit);

  Status Collect(const DeltaKey &key, const Slice &deltas) {
    // Subtract in signed 64-bit arithmetic: rowid_t is unsigned, so
    // subtracting in that type would wrap and the DCHECK could never fire.
    int64_t rel_idx = static_cast<int64_t>(key.row_idx()) -
                      static_cast<int64_t>(dfi->prepared_idx_);
    DCHECK_GE(rel_idx, 0);
    RowChangeList changelist(deltas);
    Mutation *mutation = Mutation::CreateInArena(dst_arena, key.timestamp(), changelist);
    mutation->AppendToList(&dst->at(rel_idx));
    return Status::OK();
  }

  DeltaFileIterator *dfi;
  vector<Mutation *> *dst;
  Arena *dst_arena;
};
// REDO: collect only mutations committed in the iterator's snapshot.
template<>
inline Status CollectingVisitor<REDO>::Visit(const DeltaKey& key,
                                             const Slice& deltas,
                                             bool* continue_visit) {
  const bool relevant = IsRedoRelevant(dfi->mvcc_snap_, key.timestamp(), continue_visit);
  return relevant ? Collect(key, deltas) : Status::OK();
}

// UNDO: collect only mutations NOT committed in the iterator's snapshot.
template<>
inline Status CollectingVisitor<UNDO>::Visit(const DeltaKey& key,
                                             const Slice& deltas,
                                             bool* continue_visit) {
  const bool relevant = IsUndoRelevant(dfi->mvcc_snap_, key.timestamp(), continue_visit);
  return relevant ? Collect(key, deltas) : Status::OK();
}
// Appends every relevant prepared mutation to the per-row lists in 'dst'
// (indexed by offset within the batch), allocating from 'dst_arena'. 'dst'
// must have at least prepared_count_ entries.
Status DeltaFileIterator::CollectMutations(vector<Mutation *> *dst, Arena *dst_arena) {
  DCHECK_LE(prepared_count_, dst->size());
  if (delta_type_ != REDO) {
    CollectingVisitor<UNDO> undo_collector = {this, dst, dst_arena};
    return VisitMutations(&undo_collector);
  }
  CollectingVisitor<REDO> redo_collector = {this, dst, dst_arena};
  return VisitMutations(&redo_collector);
}
// More deltas may remain unless the index is exhausted AND no blocks are queued.
bool DeltaFileIterator::HasNext() {
  return !(exhausted_ && delta_blocks_.empty());
}
// Human-readable description: "DeltaFileIterator(<reader description>)".
string DeltaFileIterator::ToString() const {
  string description = "DeltaFileIterator(";
  description += dfr_->ToString();
  description += ")";
  return description;
}
// Visitor which strips the columns in 'col_ids' from every visited mutation.
// It appends whatever remains (key plus reduced change list, copied into
// 'arena') to 'out'. Mutations that only touched the stripped columns are
// dropped. Member order matters: instances are aggregate-initialized.
struct FilterAndAppendVisitor {
  Status Visit(const DeltaKey& key, const Slice& deltas, bool* continue_visit) {
    // This visitor never stops early: every mutation is inspected.
    *continue_visit = true;

    faststring filtered_buf;
    RowChangeListEncoder encoder(&filtered_buf);
    RETURN_NOT_OK(RowChangeListDecoder::RemoveColumnIdsFromChangeList(RowChangeList(deltas),
                                                                      col_ids,
                                                                      &encoder));
    // If enc.is_initialized() returns false, the deltas contained only the
    // specified columns, so there is nothing left to keep.
    if (!encoder.is_initialized()) {
      return Status::OK();
    }

    RowChangeList remaining = encoder.as_changelist();
    DeltaKeyAndUpdate kept;
    kept.key = key;
    CHECK(arena->RelocateSlice(remaining.slice(), &kept.cell));
    out->push_back(kept);
    return Status::OK();
  }

  const DeltaFileIterator* dfi;
  const vector<ColumnId>& col_ids;
  vector<DeltaKeyAndUpdate>* out;
  Arena* arena;
};
// Appends to 'out' every prepared delta with the columns in 'col_ids' removed.
// Deltas left empty are skipped, and the remaining change lists are copied
// into 'arena'.
Status DeltaFileIterator::FilterColumnIdsAndCollectDeltas(
    const vector<ColumnId>& col_ids,
    vector<DeltaKeyAndUpdate>* out,
    Arena* arena) {
  FilterAndAppendVisitor column_filter = {this, col_ids, out, arena};
  return VisitMutations(&column_filter);
}
// Crashes the process (LOG(FATAL)) with a description of a delta whose type
// the caller did not expect. The message includes the file, the decoded
// change list (rendered against projection_), the key and 'msg'.
void DeltaFileIterator::FatalUnexpectedDelta(const DeltaKey &key, const Slice &deltas,
                                             const string &msg) {
  LOG(FATAL) << "Saw unexpected delta type in deltafile " << dfr_->ToString()
             << ": rcl=" << RowChangeList(deltas).ToString(*projection_)
             << " key=" << key.ToString() << " (" << msg << ")";
}
} // namespace tablet
} // namespace kudu