blob: 1687fd64975da39e93f6acda8417633eb725a8b0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/delta_store.h"

#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <ostream>
#include <type_traits>

#include <glog/logging.h>

#include "kudu/common/columnblock.h"
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/delta_relevancy.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/faststring.h"
#include "kudu/util/memory/arena.h"
using std::nullopt;
using std::optional;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace tablet {
// Renders this delta as "(<TYPE> delta key=<row>@ts<timestamp>, change_list=<...>)".
//
// When 'pad_key' is true, the row index and timestamp are zero-padded to at
// least six digits so that dumps line up in columns; otherwise they are
// printed compactly.
string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool pad_key) const {
  string key_str;
  if (pad_key) {
    // Parse the timestamp back as an unsigned 64-bit integer. The previous
    // atoi() call parsed into an 'int', which is undefined behavior for any
    // value >= 2^31 (i.e. any realistic hybrid-time timestamp), and then fed
    // that signed value to a "%u" conversion.
    // NOTE(review): assumes Timestamp::ToString() yields a plain decimal
    // integer, exactly as the original atoi() call did.
    const unsigned long long ts =
        strtoull(key.timestamp().ToString().c_str(), nullptr, 10);
    key_str = StringPrintf("%06u@ts%06llu", key.row_idx(), ts);
  } else {
    key_str = Substitute("$0@ts$1", key.row_idx(), key.timestamp().ToString());
  }
  return Substitute("($0 delta key=$2, change_list=$1)",
                    DeltaType_Name(type),
                    RowChangeList(cell).ToString(schema),
                    key_str);
}
// Creates a tracker sized for 'nrows' rows, none of which has a selected
// delta yet.
SelectedDeltas::SelectedDeltas(size_t nrows) {
  this->Reset(nrows);
}
void SelectedDeltas::MergeFrom(const SelectedDeltas& other) {
DCHECK_EQ(rows_.size(), other.rows_.size());
for (rowid_t idx = 0; idx < rows_.size(); idx++) {
const auto& src = other.rows_[idx];
if (!src) {
continue;
}
if (src->same_delta) {
ProcessDelta(idx, src->oldest);
} else {
ProcessDelta(idx, src->oldest);
ProcessDelta(idx, src->newest);
}
}
}
// Translates the recorded delta pairs into 'sel_vec', which must have exactly
// as many rows as this object.
//
// A row is selected when it had at least one relevant delta, unless it had
// several and was dead at both the beginning and the end of the time range.
// For the oldest and newest captured deltas, the table below gives the row's
// liveness (L = live, D = dead) at that endpoint:
//
//    delta type   | oldest | newest
//  --------------+--------+-------
//  REDO DELETE   |   L    |   D
//  REDO REINSERT |   D    |   L
//  UNDO DELETE   |   D    |   L
//  UNDO REINSERT |   L    |   D
void SelectedDeltas::ToSelectionVector(SelectionVector* sel_vec) const {
  DCHECK_EQ(rows_.size(), sel_vec->nrows());

  // True if 'd', as the oldest delta, means the row was dead at the start.
  const auto dead_at_start = [](const Delta& d) {
    return (d.dtype == REDO && d.ctype == RowChangeList::kReinsert) ||
           (d.dtype == UNDO && d.ctype == RowChangeList::kDelete);
  };
  // True if 'd', as the newest delta, means the row was dead at the end.
  const auto dead_at_end = [](const Delta& d) {
    return (d.dtype == REDO && d.ctype == RowChangeList::kDelete) ||
           (d.dtype == UNDO && d.ctype == RowChangeList::kReinsert);
  };

  for (rowid_t idx = 0; idx < rows_.size(); ++idx) {
    const auto& entry = rows_[idx];
    bool keep;
    if (!entry) {
      // No relevant deltas at all.
      keep = false;
    } else if (entry->same_delta) {
      // Exactly one relevant delta.
      keep = true;
    } else {
      // Several relevant deltas: drop rows that were dead at both endpoints.
      keep = !(dead_at_start(entry->oldest) && dead_at_end(entry->newest));
    }
    if (keep) {
      sel_vec->SetRowSelected(idx);
    } else {
      sel_vec->SetRowUnselected(idx);
    }
  }
}
// Records 'new_delta' as relevant to row 'row_idx'. The first delta seen for a
// row becomes both its oldest and newest. Later deltas widen the row's
// [oldest, newest] window and clear 'same_delta'.
void SelectedDeltas::ProcessDelta(rowid_t row_idx, Delta new_delta) {
  DCHECK_LT(row_idx, rows_.size());
  auto& slot = rows_[row_idx];
  if (slot) {
    // Keep the extremes under DeltaLessThanFunctor ordering. On ties,
    // std::min/std::max return their first argument, so the existing delta wins.
    slot->oldest = std::min(slot->oldest, new_delta, DeltaLessThanFunctor());
    slot->newest = std::max(slot->newest, new_delta, DeltaLessThanFunctor());
    slot->same_delta = false;
    return;
  }
  slot = DeltaPair();
  slot->same_delta = true;
  slot->oldest = new_delta;
  slot->newest = new_delta;
}
// Returns one newline-separated line per row. Each line is either
// "<row>: UNSELECTED" or a description of the row's oldest and newest deltas.
string SelectedDeltas::ToString() const {
  rowid_t next_row = 0;
  const auto describe = [&next_row](const optional<DeltaPair>& dp) -> string {
    const rowid_t row = next_row++;
    if (dp) {
      return Substitute("$0: @ts$1 $2 dis=$3 ($4) @ts$5 $6 dis=$7 ($8)$9", row,
                        dp->oldest.ts.ToString(),
                        DeltaType_Name(dp->oldest.dtype),
                        dp->oldest.disambiguator,
                        RowChangeList::ChangeType_Name(dp->oldest.ctype),
                        dp->newest.ts.ToString(),
                        DeltaType_Name(dp->newest.dtype),
                        dp->newest.disambiguator,
                        RowChangeList::ChangeType_Name(dp->newest.ctype),
                        dp->same_delta ? " (same delta)" : "");
    }
    return Substitute("$0: UNSELECTED", row);
  };
  return JoinMapped(rows_, describe, "\n");
}
void SelectedDeltas::Reset(size_t nrows) {
rows_.clear();
rows_.resize(nrows);
}
// Builds a preparer that reads deltas according to 'opts'.
//
// No batch is prepared yet. Both batch boundaries start at row 0, no
// preparation flags are set, and the deletion state is UNKNOWN. No deltas have
// been selected, and nothing has been recorded that Start() would need to
// clear ('may_have_deltas_' is false).
template<class Traits>
DeltaPreparer<Traits>::DeltaPreparer(RowIteratorOptions opts)
    : opts_(std::move(opts)),
      cur_prepared_idx_(0),
      prev_prepared_idx_(0),
      prepared_flags_(DeltaIterator::PREPARE_NONE),
      deletion_state_(UNKNOWN),
      deltas_selected_(0),
      may_have_deltas_(false) {
}
// Repositions the preparer at 'row_idx'. Any previously prepared batch is
// invalidated: the batch window collapses to an empty range starting at
// 'row_idx', and no preparation flags remain set.
template<class Traits>
void DeltaPreparer<Traits>::Seek(rowid_t row_idx) {
  prev_prepared_idx_ = cur_prepared_idx_ = row_idx;
  prepared_flags_ = DeltaIterator::PREPARE_NONE;
}
// Begins preparing a batch of 'nrows' rows for the preparations in
// 'prepare_flags' (which must not be PREPARE_NONE). Clears whatever state the
// previous batch left behind.
template<class Traits>
void DeltaPreparer<Traits>::Start(size_t nrows, int prepare_flags) {
  DCHECK_NE(prepare_flags, DeltaIterator::PREPARE_NONE);
  if ((prepare_flags & DeltaIterator::PREPARE_FOR_SELECT) != 0) {
    // AddDelta() dereferences opts_.snap_to_exclude when selecting.
    DCHECK(opts_.snap_to_exclude);
    selected_.Reset(nrows);
  }
  prepared_flags_ = prepare_flags;

  // Lazily allocate one list of column updates per projected column.
  if (updates_by_col_.empty()) {
    updates_by_col_.resize(opts_.projection->num_columns());
  }

  if (!may_have_deltas_) {
    // The previous batch recorded no column updates, DELETEs or REINSERTs, so
    // there is nothing to clear. Debug builds verify that.
#ifndef NDEBUG
    CHECK(deleted_.empty());
    CHECK(reinserted_.empty());
    for (const UpdatesForColumn& col_updates : updates_by_col_) {
      CHECK(col_updates.empty());
    }
#endif
  } else {
    for (UpdatesForColumn& col_updates : updates_by_col_) {
      col_updates.clear();
    }
    deleted_.clear();
    reinserted_.clear();
  }

  // These are reset unconditionally.
  prepared_deltas_.clear();
  deletion_state_ = UNKNOWN;
  may_have_deltas_ = false;

  if (VLOG_IS_ON(3)) {
    const string lower_bound = opts_.snap_to_exclude ?
        opts_.snap_to_exclude->ToString() : "INF";
    VLOG(3) << "Starting batch for [" << lower_bound << ","
            << opts_.snap_to_include.ToString() << ")";
  }
}
// Completes the batch started by Start(): flushes the final row's deletion
// state and advances the batch window by 'nrows'.
template<class Traits>
void DeltaPreparer<Traits>::Finish(size_t nrows) {
  // No later AddDelta() call will flush the last row seen, so do it here.
  MaybeProcessPreviousRowChange(nullopt);

  // After this, [prev_prepared_idx_, cur_prepared_idx_) spans the batch just
  // prepared.
  prev_prepared_idx_ = cur_prepared_idx_;
  cur_prepared_idx_ += nrows;

  if (VLOG_IS_ON(3)) {
    const string lower_bound = opts_.snap_to_exclude ?
        opts_.snap_to_exclude->ToString() : "INF";
    VLOG(3) << "Finishing batch for [" << lower_bound << ","
            << opts_.snap_to_include.ToString() << ")";
  }
}
// Feeds one delta into the batch currently being prepared.
//
// 'key' carries the delta's row index and timestamp. 'val' is its encoded
// RowChangeList. What happens to the delta depends on 'prepared_flags_':
//  - PREPARE_FOR_SELECT: if IsDeltaRelevantForSelect() accepts it for the
//    [snap_to_exclude, snap_to_include) range, it is recorded in 'selected_'.
//  - PREPARE_FOR_APPLY: if IsDeltaRelevantForApply() accepts it for
//    snap_to_include, it updates the row's deletion state. Unless it is a
//    DELETE, it also updates the projected columns' entries in
//    'updates_by_col_'.
//  - PREPARE_FOR_COLLECT: under the same relevancy check as APPLY, the raw
//    key/value pair is appended to 'prepared_deltas_'.
//
// '*finished_row' is only ever set to true here, never cleared. That happens
// when a relevancy check reports the current row as finished and no other
// requested preparation uses a different relevancy criterion.
//
// Returns the first non-OK status from decoding or validating the change list.
template<class Traits>
Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* finished_row) {
  // If this delta belongs to a different row than the previous one, flush the
  // previous row's deletion state first.
  MaybeProcessPreviousRowChange(key.row_idx());
  VLOG(4) << "Considering delta " << key.ToString() << ": "
          << RowChangeList(val).ToString(*opts_.projection);
  // Different preparations may use different criteria for delta relevancy. Each
  // criteria offers a short-circuit when processing of the current row is known
  // to be finished, but that short-circuit can only be used if we're not also
  // handling a preparation with a different criteria.
  // The decoder is only initialized (via InitDecoderIfNecessary()) once some
  // preparation actually needs it.
  RowChangeListDecoder decoder((RowChangeList(val)));
  if (prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT) {
    // NOTE(review): left uninitialized; this relies on
    // IsDeltaRelevantForSelect() always assigning it. Confirm in
    // delta_relevancy.h.
    bool finished_row_for_select;
    if (IsDeltaRelevantForSelect<Traits::kType>(*opts_.snap_to_exclude,
                                                opts_.snap_to_include,
                                                key.timestamp(),
                                                &finished_row_for_select)) {
      RETURN_NOT_OK(InitDecoderIfNecessary(&decoder));
      // The logical ordering of UNDOs is the opposite of their counting order.
      int64_t disambiguator = Traits::kType == REDO ?
                              deltas_selected_ : -deltas_selected_;
      SelectedDeltas::Delta new_delta = { key.timestamp(),
                                          Traits::kType,
                                          disambiguator,
                                          decoder.get_type() };
      // 'selected_' is indexed relative to the first row of the batch being
      // prepared, which is 'cur_prepared_idx_' until Finish() advances it.
      selected_.ProcessDelta(key.row_idx() - cur_prepared_idx_, new_delta);
      deltas_selected_++;
      VLOG(4) << "Selected deltas after AddDelta:\n" << selected_.ToString();
    }
    // Only short-circuit if SELECT is the sole requested preparation.
    if (finished_row_for_select &&
        !(prepared_flags_ & ~DeltaIterator::PREPARE_FOR_SELECT)) {
      *finished_row = true;
    }
  }
  // Apply and collect use the same relevancy criteria.
  bool relevant_for_apply_or_collect = false;
  bool finished_row_for_apply_or_collect = false;
  if (prepared_flags_ & (DeltaIterator::PREPARE_FOR_APPLY |
                         DeltaIterator::PREPARE_FOR_COLLECT)) {
    relevant_for_apply_or_collect = IsDeltaRelevantForApply<Traits::kType>(
        opts_.snap_to_include, key.timestamp(), &finished_row_for_apply_or_collect);
  }
  // Note: '&' binds tighter than '&&', so this reads as
  // (flags & APPLY) && relevant.
  if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY &&
      relevant_for_apply_or_collect) {
    RETURN_NOT_OK(InitDecoderIfNecessary(&decoder));
    UpdateDeletionState(decoder.get_type());
    // A DELETE carries no column values; only its deletion state matters.
    if (!decoder.is_delete()) {
      while (decoder.HasNext()) {
        RowChangeListDecoder::DecodedUpdate dec;
        RETURN_NOT_OK(decoder.DecodeNext(&dec));
        int col_idx;
        const void* col_val;
        RETURN_NOT_OK(dec.Validate(*opts_.projection, &col_idx, &col_val));
        if (col_idx == -1) {
          // This column isn't being projected.
          continue;
        }
        int col_size = opts_.projection->column(col_idx).type_info()->size();
        // If we already have an earlier update for the same column, we can
        // just overwrite that one.
        if (updates_by_col_[col_idx].empty() ||
            updates_by_col_[col_idx].back().row_id != key.row_idx()) {
          updates_by_col_[col_idx].emplace_back();
        }
        ColumnUpdate& cu = updates_by_col_[col_idx].back();
        cu.row_id = key.row_idx();
        // A null 'col_val' is recorded as a null new value for the cell.
        if (col_val == nullptr) {
          cu.new_val_ptr = nullptr;
        } else {
          memcpy(cu.new_val_buf, col_val, col_size);
          // NOTE: we're constructing a pointer here to an element inside the deque.
          // This is safe because deques never invalidate pointers to their elements.
          cu.new_val_ptr = cu.new_val_buf;
        }
        // Tells the next Start() that 'updates_by_col_' must be cleared.
        may_have_deltas_ = true;
      }
    }
  }
  if (prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT &&
      relevant_for_apply_or_collect) {
    // Collection keeps the raw key and encoded change list for later
    // CollectMutations() / FilterColumnIdsAndCollectDeltas() calls.
    PreparedDelta d;
    d.key = key;
    d.val = val;
    prepared_deltas_.emplace_back(d);
  }
  // Only short-circuit if APPLY and/or COLLECT are the sole requested
  // preparations (i.e. SELECT is not also requested).
  if (finished_row_for_apply_or_collect &&
      !(prepared_flags_ & ~(DeltaIterator::PREPARE_FOR_APPLY |
                            DeltaIterator::PREPARE_FOR_COLLECT))) {
    *finished_row = true;
  }
  // Remembered so the next AddDelta()/Finish() can detect a row change.
  last_added_idx_ = key.row_idx();
  return Status::OK();
}
// Writes the prepared updates for projected column 'col_to_apply' into 'dst',
// skipping rows not selected in 'filter'. Row positions in 'dst' are relative
// to the start of the most recently finished batch.
//
// The IS_DELETED virtual column is special. Its cells are derived from
// 'deleted_' (true) and 'reinserted_' (false). Both sets are written so that,
// when several preparers are applied in turn, the last one to touch a row
// determines its value (the same reasoning as ApplyDeletes()).
template<class Traits>
Status DeltaPreparer<Traits>::ApplyUpdates(size_t col_to_apply, ColumnBlock* dst,
                                           const SelectionVector& filter) {
  DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY);
  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->nrows());

  if (col_to_apply == opts_.projection->first_is_deleted_virtual_column_idx()) {
    const auto store_is_deleted = [&](const auto& row_ids, bool is_deleted) {
      for (const auto& row_id : row_ids) {
        const uint32_t pos = row_id - prev_prepared_idx_;
        if (!filter.IsRowSelected(pos)) {
          continue;
        }
        ColumnBlock::Cell cell = dst->cell(pos);
        UnalignedStore(cell.mutable_ptr(), is_deleted);
      }
    };
    store_is_deleted(deleted_, true);
    store_is_deleted(reinserted_, false);
    return Status::OK();
  }

  const ColumnSchema& col_schema = opts_.projection->column(col_to_apply);
  for (const ColumnUpdate& cu : updates_by_col_[col_to_apply]) {
    const int32_t idx_in_block = cu.row_id - prev_prepared_idx_;
    DCHECK_GE(idx_in_block, 0);
    if (filter.IsRowSelected(idx_in_block)) {
      SimpleConstCell src(&col_schema, cu.new_val_ptr);
      ColumnBlock::Cell dst_cell = dst->cell(idx_in_block);
      RETURN_NOT_OK(CopyCell(src, &dst_cell, dst->arena()));
    }
  }
  return Status::OK();
}
// Adjusts 'sel_vec' for the DELETEs and REINSERTs found in the most recently
// finished batch. Deleted rows are deselected and reinserted rows are
// reselected.
//
// Both directions are needed because DeltaIterators are often combined (e.g.
// via DeltaIteratorMerger). One preparer may delete a row that the next one
// reinserts. Since ApplyDeletes() runs on each preparer in order, each must be
// able to flip the row's bit either way so that the final state is correct.
template<class Traits>
Status DeltaPreparer<Traits>::ApplyDeletes(SelectionVector* sel_vec) {
  DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY);
  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, sel_vec->nrows());
  const auto position_in_block = [this](rowid_t row_id) -> uint32_t {
    return row_id - prev_prepared_idx_;
  };
  for (const auto& row_id : deleted_) {
    sel_vec->SetRowUnselected(position_in_block(row_id));
  }
  for (const auto& row_id : reinserted_) {
    sel_vec->SetRowSelected(position_in_block(row_id));
  }
  return Status::OK();
}
// Merges the deltas selected during the last prepared batch into 'deltas',
// which must cover at least as many rows as that batch.
template<class Traits>
Status DeltaPreparer<Traits>::SelectDeltas(SelectedDeltas* deltas) {
  DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT);
  SelectedDeltas& merged = *deltas;
  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, merged.rows_.size());
  VLOG(4) << "Selected deltas before SelectDeltas:\n" << selected_.ToString();
  VLOG(4) << "Pre-merge deltas:\n" << merged.ToString();
  merged.MergeFrom(selected_);
  VLOG(4) << "Post-merge deltas:\n" << merged.ToString();
  return Status::OK();
}
// For every delta collected in the last prepared batch, allocates a Mutation
// in 'arena' and links it into the list for its row in 'dst'. 'dst' is indexed
// relative to the start of that batch.
// NOTE(review): PrependToList() presumably pushes onto the list head, so each
// row's list would end up in reverse order of addition. Confirm in mutation.h.
template<class Traits>
Status DeltaPreparer<Traits>::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
  DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT);
  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->size());
  vector<Mutation*>& heads = *dst;
  for (const PreparedDelta& prepared : prepared_deltas_) {
    const DeltaKey& key = prepared.key;
    RowChangeList changes(prepared.val);
    const uint32_t offset = key.row_idx() - prev_prepared_idx_;
    Mutation* mutation = Mutation::CreateInArena(arena, key.timestamp(), changes);
    mutation->PrependToList(&heads.at(offset));
  }
  return Status::OK();
}
// Re-encodes every delta collected in the last prepared batch without the
// columns listed in 'col_ids'. Each non-empty result is appended to 'out', with
// its change list copied into 'arena'.
//
// Preparers whose traits disallow this (the error message names the DMS)
// report DFATAL and return InvalidArgument. It may only be used with a fully
// inclusive snapshot.
template<class Traits>
Status DeltaPreparer<Traits>::FilterColumnIdsAndCollectDeltas(
    const vector<ColumnId>& col_ids,
    vector<DeltaKeyAndUpdate>* out,
    Arena* arena) {
  if (!Traits::kAllowFilterColumnIdsAndCollectDeltas) {
    LOG(DFATAL) << "Attempted to call FilterColumnIdsAndCollectDeltas on DMS"
                << GetStackTrace();
    return Status::InvalidArgument(
        "FilterColumnIdsAndCollectDeltas is not supported");
  }
  DCHECK(opts_.snap_to_include.Equals(Traits::kType == REDO ?
                                      MvccSnapshot::CreateSnapshotIncludingAllOps() :
                                      MvccSnapshot::CreateSnapshotIncludingNoOps()));

  faststring scratch;
  RowChangeListEncoder filtered_encoder(&scratch);
  for (const auto& prepared : prepared_deltas_) {
    filtered_encoder.Reset();
    RowChangeList original_changes(prepared.val);
    RETURN_NOT_OK(RowChangeListDecoder::RemoveColumnIdsFromChangeList(
        original_changes, col_ids, &filtered_encoder));
    if (!filtered_encoder.is_initialized()) {
      // Nothing was re-encoded for this delta, so there is nothing to emit.
      continue;
    }
    RowChangeList remaining = filtered_encoder.as_changelist();
    DeltaKeyAndUpdate update;
    update.key = prepared.key;
    // 'scratch' is reused on the next iteration, so copy the bytes into 'arena'.
    CHECK(arena->RelocateSlice(remaining.slice(), &update.cell));
    out->emplace_back(update);
  }
  return Status::OK();
}
// Reports whether the last batch prepared for APPLY recorded any column update,
// DELETE or REINSERT. When false, there is nothing for ApplyUpdates() or
// ApplyDeletes() to apply.
template<class Traits>
bool DeltaPreparer<Traits>::MayHaveDeltas() const {
  DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY);
  const bool recorded_something = may_have_deltas_;
  return recorded_something;
}
// Initializes 'decoder' on first use, doing nothing if it is already
// initialized. Safety checks are applied only if the traits request them.
// REINSERTs are rejected (DFATAL + InvalidArgument) for preparers whose traits
// disallow them.
template<class Traits>
Status DeltaPreparer<Traits>::InitDecoderIfNecessary(RowChangeListDecoder* decoder) {
  if (!decoder->IsInitialized()) {
    if (!Traits::kInitializeDecodersWithSafetyChecks) {
      decoder->InitNoSafetyChecks();
    } else {
      RETURN_NOT_OK(decoder->Init());
    }
    const bool forbidden_reinsert = !Traits::kAllowReinserts && decoder->is_reinsert();
    if (forbidden_reinsert) {
      LOG(DFATAL) << "Attempted to reinsert but not supported" << GetStackTrace();
      return Status::InvalidArgument("Reinserts are not supported");
    }
  }
  return Status::OK();
}
// When preparing for APPLY, this flushes the net deletion state of the
// previously added row once processing has moved past it. That is the case
// when 'cur_row_idx' is a different row, or is nullopt (end of batch). A row
// whose net state is DELETED or REINSERTED is appended to 'deleted_' or
// 'reinserted_', and the state goes back to UNKNOWN.
template<class Traits>
void DeltaPreparer<Traits>::MaybeProcessPreviousRowChange(optional<rowid_t> cur_row_idx) {
  if (!(prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY) || !last_added_idx_) {
    return;
  }
  if (cur_row_idx && *cur_row_idx == *last_added_idx_) {
    // Still on the same row; its state may change further.
    return;
  }
  const rowid_t prev_row = *last_added_idx_;
  if (deletion_state_ == DELETED) {
    deleted_.emplace_back(prev_row);
  } else if (deletion_state_ == REINSERTED) {
    reinserted_.emplace_back(prev_row);
  } else {
    return;
  }
  deletion_state_ = UNKNOWN;
  may_have_deltas_ = true;
}
// Folds one change of type 'op' into the current row's deletion state.
//
// RowChangeListDecoder.TwiddleDeleteStatus can't be reused here for two
// reasons:
//  1. This state machine has an extra UNKNOWN state.
//  2. A row's chain of DELETEs and REINSERTs may span several DeltaPreparers
//     (e.g. deleted in one delta file, reinserted in the next). Preparers
//     cannot share that information during batch preparation, so any
//     transition out of UNKNOWN must be allowed.
//
// A DELETE followed by a REINSERT (or vice versa) within the same batch returns
// the state to UNKNOWN, since the row's liveness hasn't actually changed.
// Plain updates leave the state untouched.
template<class Traits>
void DeltaPreparer<Traits>::UpdateDeletionState(RowChangeList::ChangeType op) {
  switch (op) {
    case RowChangeList::kDelete:
      DCHECK_NE(deletion_state_, DELETED);
      if (deletion_state_ != UNKNOWN) {
        DCHECK_EQ(deletion_state_, REINSERTED);
        deletion_state_ = UNKNOWN;
      } else {
        deletion_state_ = DELETED;
      }
      break;
    case RowChangeList::kReinsert:
      DCHECK_NE(deletion_state_, REINSERTED);
      if (deletion_state_ != UNKNOWN) {
        DCHECK_EQ(deletion_state_, DELETED);
        deletion_state_ = UNKNOWN;
      } else {
        deletion_state_ = REINSERTED;
      }
      break;
    default:
      DCHECK(op == RowChangeList::kUpdate || op == RowChangeList::kReinsert);
      break;
  }
}
// Explicit instantiations (not specializations) for callers outside this
// compilation unit. The DeltaPreparer member definitions live in this file, so
// each Traits type used elsewhere is instantiated here.
template class DeltaPreparer<DMSPreparerTraits>;
template class DeltaPreparer<DeltaFilePreparerTraits<REDO>>;
template class DeltaPreparer<DeltaFilePreparerTraits<UNDO>>;
// Dumps the deltas of 'iter' for up to 'nrows' rows (all rows when 'nrows' is
// 0), reading batches of at most 100 rows. Each delta is rendered with
// DeltaKeyAndUpdate::Stringify() using padded keys and emitted via
// LOG_STRING(INFO, out).
Status DebugDumpDeltaIterator(DeltaType type,
                              DeltaIterator* iter,
                              const Schema& schema,
                              size_t nrows,
                              vector<std::string>* out) {
  ScanSpec spec;
  spec.set_cache_blocks(false);
  RETURN_NOT_OK(iter->Init(&spec));
  RETURN_NOT_OK(iter->SeekToOrdinal(0));

  const size_t kRowsPerBlock = 100;
  const bool unbounded = (nrows == 0);
  Arena arena(32 * 1024);
  size_t rows_seen = 0;
  while (iter->HasNext()) {
    if (!unbounded && rows_seen >= nrows) {
      break;
    }
    const size_t batch_size =
        unbounded ? kRowsPerBlock : std::min(kRowsPerBlock, nrows - rows_seen);
    arena.Reset();
    RETURN_NOT_OK(iter->PrepareBatch(batch_size, DeltaIterator::PREPARE_FOR_COLLECT));
    vector<DeltaKeyAndUpdate> cells;
    RETURN_NOT_OK(iter->FilterColumnIdsAndCollectDeltas(vector<ColumnId>(),
                                                        &cells,
                                                        &arena));
    for (const DeltaKeyAndUpdate& cell : cells) {
      LOG_STRING(INFO, out) << cell.Stringify(type, schema, true /*pad_key*/ );
    }
    rows_seen += batch_size;
  }
  return Status::OK();
}
// Writes the deltas of 'iter' for up to 'nrows' rows (all rows when 'nrows' is
// 0) to 'out' as deltas of type 'Type', reading batches of at most 100 rows.
// DeltaStats are accumulated over every appended delta and handed to 'out'
// after iteration finishes.
//
// Returns the first non-OK status from initializing, seeking or reading
// 'iter', from appending to 'out', or from updating the stats.
template<DeltaType Type>
Status WriteDeltaIteratorToFile(DeltaIterator* iter,
                                size_t nrows,
                                DeltaFileWriter* out) {
  ScanSpec spec;
  spec.set_cache_blocks(false);
  RETURN_NOT_OK(iter->Init(&spec));
  RETURN_NOT_OK(iter->SeekToOrdinal(0));
  const size_t kRowsPerBlock = 100;
  // Owned here until ownership is handed to 'out' at the end.
  auto stats = std::make_unique<DeltaStats>();
  Arena arena(32 * 1024);
  for (size_t i = 0; iter->HasNext(); ) {
    size_t n;
    if (nrows > 0) {
      if (i >= nrows) {
        break;
      }
      n = std::min(kRowsPerBlock, nrows - i);
    } else {
      n = kRowsPerBlock;
    }
    // Deltas from the previous batch were already copied into 'out', so the
    // arena can be recycled.
    arena.Reset();
    RETURN_NOT_OK(iter->PrepareBatch(n, DeltaIterator::PREPARE_FOR_COLLECT));
    vector<DeltaKeyAndUpdate> cells;
    RETURN_NOT_OK(iter->FilterColumnIdsAndCollectDeltas(vector<ColumnId>(),
                                                        &cells,
                                                        &arena));
    for (const DeltaKeyAndUpdate& cell : cells) {
      RowChangeList rcl(cell.cell);
      RETURN_NOT_OK(out->AppendDelta<Type>(cell.key, rcl));
      RETURN_NOT_OK(stats->UpdateStats(cell.key.timestamp(), rcl));
    }
    i += n;
  }
  out->WriteDeltaStats(std::move(stats));
  return Status::OK();
}
// Explicit instantiations of WriteDeltaIteratorToFile() for both delta types.
// The template's definition lives in this file, so callers elsewhere rely on
// these.
template
Status WriteDeltaIteratorToFile<REDO>(DeltaIterator* iter,
                                      size_t nrows,
                                      DeltaFileWriter* out);
template
Status WriteDeltaIteratorToFile<UNDO>(DeltaIterator* iter,
                                      size_t nrows,
                                      DeltaFileWriter* out);
} // namespace tablet
} // namespace kudu