// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "kudu/consensus/consensus.pb.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/deltamemstore.h"
#include "kudu/tablet/delta_tracker.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memcmpable_varint.h"
#include "kudu/util/status.h"
namespace kudu {
namespace tablet {

using log::LogAnchorRegistry;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;

////////////////////////////////////////////////////////////
// DeltaMemStore implementation
////////////////////////////////////////////////////////////

// Sizing for the memory-tracking arena that backs the delta tree:
// buffers start small and may grow up to 5 MB apiece.
static const int kInitialArenaSize = 16;
static const int kMaxArenaBufferSize = 5 * 1024 * 1024;

DeltaMemStore::DeltaMemStore(int64_t id,
int64_t rs_id,
LogAnchorRegistry* log_anchor_registry,
const shared_ptr<MemTracker>& parent_tracker)
: id_(id),
rs_id_(rs_id),
anchorer_(log_anchor_registry, Substitute("Rowset-$0/DeltaMemStore-$1", rs_id_, id_)),
disambiguator_sequence_number_(0) {
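  // Attach to the tablet's DMS memory tracker when a parent tracker is
  // supplied; otherwise fall back to the process-wide root tracker.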
if (parent_tracker) {
CHECK(MemTracker::FindTracker(Tablet::kDMSMemTrackerId,
&mem_tracker_,
parent_tracker));
} else {
mem_tracker_ = MemTracker::GetRootTracker();
}
allocator_.reset(new MemoryTrackingBufferAllocator(
HeapBufferAllocator::Get(), mem_tracker_));
arena_.reset(new ThreadSafeMemoryTrackingArena(
kInitialArenaSize, kMaxArenaBufferSize, allocator_));
tree_.reset(new DMSTree(arena_));
}

Status DeltaMemStore::Init() {
return Status::OK();
}
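
// Add a delta to the store, keyed by (row index, timestamp). The op id is
// anchored in the log anchor registry so the WAL entry that produced this
// delta is retained until the store is flushed.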
Status DeltaMemStore::Update(Timestamp timestamp,
rowid_t row_idx,
const RowChangeList &update,
const consensus::OpId& op_id) {
DeltaKey key(row_idx, timestamp);
faststring buf;
key.EncodeTo(&buf);
Slice key_slice(buf);
btree::PreparedMutation<DMSTreeTraits> mutation(key_slice);
mutation.Prepare(tree_.get());
if (PREDICT_FALSE(mutation.exists())) {
// We already have a delta for this row at the same timestamp.
// Try again with a disambiguating sequence number appended to the key.
    int64_t seq = disambiguator_sequence_number_.Increment();
PutMemcmpableVarint64(&buf, seq);
key_slice = Slice(buf);
mutation.Reset(key_slice);
mutation.Prepare(tree_.get());
CHECK(!mutation.exists())
<< "Appended a sequence number but still hit a duplicate "
<< "for rowid " << row_idx << " at timestamp " << timestamp;
}
if (PREDICT_FALSE(!mutation.Insert(update.slice()))) {
return Status::IOError("Unable to insert into tree");
}
anchorer_.AnchorIfMinimum(op_id.index());
return Status::OK();
}
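
// Flush the entire contents of the store to 'dfw' in sorted key order,
// accumulating delta stats which are written to the file alongside the deltas.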
Status DeltaMemStore::FlushToFile(DeltaFileWriter *dfw,
gscoped_ptr<DeltaStats>* stats_ret) {
gscoped_ptr<DeltaStats> stats(new DeltaStats());
gscoped_ptr<DMSTreeIter> iter(tree_->NewIterator());
iter->SeekToStart();
while (iter->IsValid()) {
Slice key_slice, val;
iter->GetCurrentEntry(&key_slice, &val);
DeltaKey key;
RETURN_NOT_OK(key.DecodeFrom(&key_slice));
RowChangeList rcl(val);
RETURN_NOT_OK_PREPEND(dfw->AppendDelta<REDO>(key, rcl), "Failed to append delta");
stats->UpdateStats(key.timestamp(), rcl);
iter->Next();
}
RETURN_NOT_OK(dfw->WriteDeltaStats(*stats));
stats_ret->swap(stats);
return Status::OK();
}
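
// The caller takes ownership of the returned iterator.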
Status DeltaMemStore::NewDeltaIterator(const Schema *projection,
const MvccSnapshot &snap,
DeltaIterator** iterator) const {
*iterator = new DMSIterator(shared_from_this(), projection, snap);
return Status::OK();
}
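
// Walk all of the deltas for 'row_idx' in timestamp order, toggling the
// deletion flag on each DELETE or REINSERT so that '*deleted' reflects the
// row's latest status in this store.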
Status DeltaMemStore::CheckRowDeleted(rowid_t row_idx, bool *deleted) const {
*deleted = false;
DeltaKey key(row_idx, Timestamp(0));
faststring buf;
key.EncodeTo(&buf);
Slice key_slice(buf);
bool exact;
// TODO: can we avoid the allocation here?
gscoped_ptr<DMSTreeIter> iter(tree_->NewIterator());
if (!iter->SeekAtOrAfter(key_slice, &exact)) {
return Status::OK();
}
while (iter->IsValid()) {
// Iterate forward until reaching an entry with a larger row idx.
Slice key_slice, v;
iter->GetCurrentEntry(&key_slice, &v);
RETURN_NOT_OK(key.DecodeFrom(&key_slice));
DCHECK_GE(key.row_idx(), row_idx);
if (key.row_idx() != row_idx) break;
    // The mutation is for the target row; check whether it is a deletion.
    RowChangeListDecoder decoder((RowChangeList(v)));
decoder.InitNoSafetyChecks();
decoder.TwiddleDeleteStatus(deleted);
iter->Next();
}
return Status::OK();
}

void DeltaMemStore::DebugPrint() const {
tree_->DebugPrint();
}

////////////////////////////////////////////////////////////
// DMSIterator
////////////////////////////////////////////////////////////
DMSIterator::DMSIterator(const shared_ptr<const DeltaMemStore>& dms,
const Schema* projection, MvccSnapshot snapshot)
: dms_(dms),
mvcc_snapshot_(std::move(snapshot)),
iter_(dms->tree_->NewIterator()),
initted_(false),
prepared_idx_(0),
prepared_count_(0),
prepared_for_(NOT_PREPARED),
seeked_(false),
projection_(projection) {}
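
// NOTE: the scan spec is not used by the DMS iterator.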
Status DMSIterator::Init(ScanSpec *spec) {
initted_ = true;
return Status::OK();
}
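
// Position the iterator at the first delta for 'row_idx', or at the nearest
// following entry if that row has no deltas.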
Status DMSIterator::SeekToOrdinal(rowid_t row_idx) {
faststring buf;
DeltaKey key(row_idx, Timestamp(0));
key.EncodeTo(&buf);
bool exact; /* unused */
iter_->SeekAtOrAfter(Slice(buf), &exact);
prepared_idx_ = row_idx;
prepared_count_ = 0;
prepared_for_ = NOT_PREPARED;
seeked_ = true;
return Status::OK();
}

Status DMSIterator::PrepareBatch(size_t nrows, PrepareFlag flag) {
  // The current implementation copies a whole batch's worth of deltas into a
  // buffer local to this iterator, after filtering out deltas which aren't
  // yet committed in the current MVCC snapshot. The theory behind this
  // approach is the following:
  //
  // Each batch needs to be processed once per column, meaning that unless we
  // make a local copy, we'd have to reset the CBTree iterator back to the
  // start of the batch and re-iterate for each column. CBTree iterators make
  // local copies as they progress in order to shield themselves from
  // concurrent mutations, so with N columns we'd end up making N copies of
  // the data. Making a local copy here instead requires only a single copy
  // of the data, so it is likely faster.
  CHECK(seeked_);
  DCHECK(initted_) << "must call Init() before PrepareBatch()";
rowid_t start_row = prepared_idx_ + prepared_count_;
rowid_t stop_row = start_row + nrows - 1;
if (updates_by_col_.empty()) {
updates_by_col_.resize(projection_->num_columns());
}
for (UpdatesForColumn& ufc : updates_by_col_) {
ufc.clear();
}
deletes_and_reinserts_.clear();
prepared_deltas_.clear();
while (iter_->IsValid()) {
Slice key_slice, val;
iter_->GetCurrentEntry(&key_slice, &val);
DeltaKey key;
RETURN_NOT_OK(key.DecodeFrom(&key_slice));
DCHECK_GE(key.row_idx(), start_row);
if (key.row_idx() > stop_row) break;
if (!mvcc_snapshot_.IsCommitted(key.timestamp())) {
// The transaction which applied this update is not yet committed
// in this iterator's MVCC snapshot. Hence, skip it.
iter_->Next();
continue;
}
if (flag == PREPARE_FOR_APPLY) {
RowChangeListDecoder decoder((RowChangeList(val)));
decoder.InitNoSafetyChecks();
if (decoder.is_delete() || decoder.is_reinsert()) {
DeleteOrReinsert dor;
dor.row_id = key.row_idx();
dor.exists = decoder.is_reinsert();
deletes_and_reinserts_.push_back(dor);
} else {
DCHECK(decoder.is_update());
while (decoder.HasNext()) {
RowChangeListDecoder::DecodedUpdate dec;
RETURN_NOT_OK(decoder.DecodeNext(&dec));
int col_idx;
const void* col_val;
RETURN_NOT_OK(dec.Validate(*projection_, &col_idx, &col_val));
if (col_idx == -1) {
// This column isn't being projected.
continue;
}
int col_size = projection_->column(col_idx).type_info()->size();
        // If we already have an earlier update to this column for the same
        // row, we can just overwrite that one.
if (updates_by_col_[col_idx].empty() ||
updates_by_col_[col_idx].back().row_id != key.row_idx()) {
updates_by_col_[col_idx].push_back(ColumnUpdate());
}
ColumnUpdate& cu = updates_by_col_[col_idx].back();
cu.row_id = key.row_idx();
if (col_val == nullptr) {
cu.new_val_ptr = nullptr;
} else {
memcpy(cu.new_val_buf, col_val, col_size);
          // NOTE: we're constructing a pointer here to an element inside the
          // deque. This is safe because appending to a deque never invalidates
          // pointers to its existing elements.
cu.new_val_ptr = cu.new_val_buf;
}
}
}
} else {
DCHECK_EQ(flag, PREPARE_FOR_COLLECT);
PreparedDelta d;
d.key = key;
d.val = val;
prepared_deltas_.push_back(d);
}
iter_->Next();
}
prepared_idx_ = start_row;
prepared_count_ = nrows;
prepared_for_ = flag == PREPARE_FOR_APPLY ? PREPARED_FOR_APPLY : PREPARED_FOR_COLLECT;
return Status::OK();
}
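
// Copy the prepared updates for 'col_to_apply' into the matching cells of
// 'dst', relocating any indirect data into the block's arena.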
Status DMSIterator::ApplyUpdates(size_t col_to_apply, ColumnBlock *dst) {
DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
DCHECK_EQ(prepared_count_, dst->nrows());
const ColumnSchema* col_schema = &projection_->column(col_to_apply);
for (const ColumnUpdate& cu : updates_by_col_[col_to_apply]) {
int32_t idx_in_block = cu.row_id - prepared_idx_;
DCHECK_GE(idx_in_block, 0);
SimpleConstCell src(col_schema, cu.new_val_ptr);
ColumnBlock::Cell dst_cell = dst->cell(idx_in_block);
RETURN_NOT_OK(CopyCell(src, &dst_cell, dst->arena()));
}
return Status::OK();
}
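
// Unselect any row for which a DELETE was prepared in this batch.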
Status DMSIterator::ApplyDeletes(SelectionVector *sel_vec) {
DCHECK_EQ(prepared_for_, PREPARED_FOR_APPLY);
DCHECK_EQ(prepared_count_, sel_vec->nrows());
for (const DeleteOrReinsert& dor : deletes_and_reinserts_) {
uint32_t idx_in_block = dor.row_id - prepared_idx_;
if (!dor.exists) {
sel_vec->SetRowUnselected(idx_in_block);
}
}
return Status::OK();
}
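
// Turn each prepared delta into a Mutation allocated from 'arena' and
// prepend it to the mutation list of the corresponding row in 'dst'.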
Status DMSIterator::CollectMutations(vector<Mutation *> *dst, Arena *arena) {
DCHECK_EQ(prepared_for_, PREPARED_FOR_COLLECT);
for (const PreparedDelta& src : prepared_deltas_) {
    DeltaKey key = src.key;
RowChangeList changelist(src.val);
uint32_t rel_idx = key.row_idx() - prepared_idx_;
Mutation *mutation = Mutation::CreateInArena(arena, key.timestamp(), changelist);
mutation->PrependToList(&dst->at(rel_idx));
}
return Status::OK();
}

Status DMSIterator::FilterColumnIdsAndCollectDeltas(const vector<ColumnId>& col_ids,
                                                    vector<DeltaKeyAndUpdate>* out,
                                                    Arena* arena) {
  LOG(DFATAL) << "Attempt to call FilterColumnIdsAndCollectDeltas on DMS: "
              << GetStackTrace();
  return Status::InvalidArgument(
      "FilterColumnIdsAndCollectDeltas() is not supported by DMSIterator");
}

bool DMSIterator::HasNext() {
  // TODO: implement this if we ever want to include the DeltaMemStore in
  // minor delta compactions.
LOG(FATAL) << "Unimplemented";
return false;
}

string DMSIterator::ToString() const {
return "DMSIterator";
}
} // namespace tablet
} // namespace kudu