// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/diskrowset.h"
#include <algorithm>
#include <map>
#include <optional>
#include <ostream>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include "kudu/cfile/bloomfile.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/generic_iterators.h"
#include "kudu/common/iterator.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/tablet/cfile_set.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/delta_compaction.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/multi_column_writer.h"
#include "kudu/tablet/mutation.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/util/compression/compression.pb.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"

DEFINE_int32(tablet_delta_store_minor_compact_max, 1000,
"How many delta stores are required before forcing a minor delta compaction "
"(Advanced option)");
TAG_FLAG(tablet_delta_store_minor_compact_max, experimental);

DEFINE_double(tablet_delta_store_major_compact_min_ratio, 0.1f,
"Minimum ratio of sizeof(deltas) to sizeof(base data) before a major compaction "
"can run (Advanced option)");
TAG_FLAG(tablet_delta_store_major_compact_min_ratio, experimental);

DEFINE_int32(default_composite_key_index_block_size_bytes, 4096,
"Block size used for composite key indexes.");
TAG_FLAG(default_composite_key_index_block_size_bytes, experimental);

DEFINE_bool(rowset_metadata_store_keys, false,
"Whether to store the min/max encoded keys in the rowset "
"metadata. If false, keys will be read from the data blocks.");
TAG_FLAG(rowset_metadata_store_keys, experimental);

using kudu::cfile::BloomFileWriter;
using kudu::fs::BlockManager;
using kudu::fs::BlockCreationTransaction;
using kudu::fs::CreateBlockOptions;
using kudu::fs::IOContext;
using kudu::fs::WritableBlock;
using kudu::log::LogAnchorRegistry;
using std::optional;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;

namespace kudu {

class Mutex;

namespace consensus {
class OpId;
}

namespace tablet {

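// Names of the metadata entries in the key-index CFile under which the
// min and max encoded keys of the rowset are stored.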
const char *DiskRowSet::kMinKeyMetaEntryName = "min_key";
const char *DiskRowSet::kMaxKeyMetaEntryName = "max_key";
DiskRowSetWriter::DiskRowSetWriter(RowSetMetadata* rowset_metadata,
const Schema* schema,
BloomFilterSizing bloom_sizing)
: rowset_metadata_(rowset_metadata),
schema_(schema),
bloom_sizing_(bloom_sizing),
finished_(false),
written_count_(0) {
CHECK(schema->has_column_ids());
}
Status DiskRowSetWriter::Open() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::Open");
FsManager* fs = rowset_metadata_->fs_manager();
const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id();
col_writer_.reset(new MultiColumnWriter(fs, schema_, tablet_id));
RETURN_NOT_OK(col_writer_->Open());
// Open bloom filter.
RETURN_NOT_OK(InitBloomFileWriter());
if (schema_->num_key_columns() > 1) {
// Open ad-hoc index writer
RETURN_NOT_OK(InitAdHocIndexWriter());
}
return Status::OK();
}
Status DiskRowSetWriter::InitBloomFileWriter() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::InitBloomFileWriter");
unique_ptr<WritableBlock> block;
FsManager* fs = rowset_metadata_->fs_manager();
const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id();
RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(CreateBlockOptions({ tablet_id }),
&block),
"Couldn't allocate a block for bloom filter");
rowset_metadata_->set_bloom_block(block->id());
bloom_writer_.reset(new cfile::BloomFileWriter(std::move(block), bloom_sizing_));
RETURN_NOT_OK(bloom_writer_->Start());
return Status::OK();
}
Status DiskRowSetWriter::InitAdHocIndexWriter() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::InitAdHocIndexWriter");
unique_ptr<WritableBlock> block;
FsManager* fs = rowset_metadata_->fs_manager();
const string& tablet_id = rowset_metadata_->tablet_metadata()->tablet_id();
RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(CreateBlockOptions({ tablet_id }),
&block),
"Couldn't allocate a block for compoound index");
rowset_metadata_->set_adhoc_index_block(block->id());
cfile::WriterOptions opts;
// Index the composite key by value
opts.write_validx = true;
// no need to index positions
opts.write_posidx = false;
opts.storage_attributes.encoding = PREFIX_ENCODING;
opts.storage_attributes.compression = LZ4;
opts.storage_attributes.cfile_block_size = FLAGS_default_composite_key_index_block_size_bytes;
// Create the CFile writer for the ad-hoc index.
ad_hoc_index_writer_.reset(new cfile::CFileWriter(
std::move(opts),
GetTypeInfo(BINARY),
/*is_nullable=*/false,
std::move(block)));
return ad_hoc_index_writer_->Start();
}
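// Appends 'block' to the column writers and adds each row's encoded key to
// the bloom filter and, for composite keys, to the ad-hoc index. Keys must
// arrive in strictly ascending order; debug builds verify this below.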
Status DiskRowSetWriter::AppendBlock(const RowBlock &block, int live_row_count) {
DCHECK_EQ(block.schema()->num_columns(), schema_->num_columns());
CHECK(!finished_);
// If this is the very first block, encode the first key and save it as metadata
// in the index column.
if (written_count_ == 0) {
Slice enc_key = schema_->EncodeComparableKey(block.row(0), &last_encoded_key_);
key_index_writer()->AddMetadataPair(DiskRowSet::kMinKeyMetaEntryName, enc_key);
if (FLAGS_rowset_metadata_store_keys) {
rowset_metadata_->set_min_encoded_key(enc_key.ToString());
}
last_encoded_key_.clear();
}
// Write the batch to each of the columns
RETURN_NOT_OK(col_writer_->AppendBlock(block));
// Increase the live row count if necessary.
rowset_metadata_->IncrementLiveRows(live_row_count);
#ifndef NDEBUG
faststring prev_key;
#endif
// Write the batch to the bloom and optionally the ad-hoc index
for (size_t i = 0; i < block.nrows(); i++) {
#ifndef NDEBUG
prev_key.assign_copy(last_encoded_key_.data(), last_encoded_key_.size());
#endif
// TODO: performance might be better if we actually batch this -
// encode a bunch of key slices, then pass them all in one go.
RowBlockRow row = block.row(i);
// Insert the encoded key into the bloom.
Slice enc_key = schema_->EncodeComparableKey(row, &last_encoded_key_);
RETURN_NOT_OK(bloom_writer_->AppendKeys(&enc_key, 1));
// Write the batch to the ad hoc index if we're using one
if (ad_hoc_index_writer_ != nullptr) {
RETURN_NOT_OK(ad_hoc_index_writer_->AppendEntries(&enc_key, 1));
}
#ifndef NDEBUG
CHECK(prev_key.size() == 0 || Slice(prev_key) < enc_key)
<< KUDU_REDACT(enc_key.ToDebugString()) << " appended to file not > previous key "
<< KUDU_REDACT(Slice(prev_key).ToDebugString());
#endif
}
written_count_ += block.nrows();
return Status::OK();
}
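// Finishes the rowset and commits all blocks created by this writer in a
// single block-creation transaction.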
Status DiskRowSetWriter::Finish() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::Finish");
BlockManager* bm = rowset_metadata_->fs_manager()->block_manager();
unique_ptr<BlockCreationTransaction> transaction = bm->NewCreationTransaction();
RETURN_NOT_OK(FinishAndReleaseBlocks(transaction.get()));
return transaction->CommitCreatedBlocks();
}
Status DiskRowSetWriter::FinishAndReleaseBlocks(BlockCreationTransaction* transaction) {
TRACE_EVENT0("tablet", "DiskRowSetWriter::FinishAndReleaseBlocks");
CHECK(!finished_);
if (written_count_ == 0) {
finished_ = true;
return Status::Aborted("no data written");
}
// Save the last encoded (max) key
Slice last_enc_slice(last_encoded_key_);
std::string first_encoded_key =
key_index_writer()->GetMetaValueOrDie(DiskRowSet::kMinKeyMetaEntryName);
Slice first_enc_slice(first_encoded_key);
CHECK(first_enc_slice <= last_enc_slice)
<< "First key not <= last key: first_key="
<< KUDU_REDACT(first_enc_slice.ToDebugString())
<< " last_key=" << KUDU_REDACT(last_enc_slice.ToDebugString());
key_index_writer()->AddMetadataPair(DiskRowSet::kMaxKeyMetaEntryName, last_enc_slice);
if (FLAGS_rowset_metadata_store_keys) {
rowset_metadata_->set_max_encoded_key(last_enc_slice.ToString());
}
// Finish writing the columns themselves.
RETURN_NOT_OK(col_writer_->FinishAndReleaseBlocks(transaction));
// Put the column data blocks in the metadata.
std::map<ColumnId, BlockId> flushed_blocks;
col_writer_->GetFlushedBlocksByColumnId(&flushed_blocks);
rowset_metadata_->SetColumnDataBlocks(flushed_blocks);
if (ad_hoc_index_writer_ != nullptr) {
Status s = ad_hoc_index_writer_->FinishAndReleaseBlock(transaction);
if (!s.ok()) {
LOG(WARNING) << "Unable to Finish ad hoc index writer: " << s.ToString();
return s;
}
}
// Finish bloom.
Status s = bloom_writer_->FinishAndReleaseBlock(transaction);
if (!s.ok()) {
LOG(WARNING) << "Unable to Finish bloom filter writer: " << s.ToString();
return s;
}
finished_ = true;
return Status::OK();
}
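// Returns the CFile writer that holds the key index: the ad-hoc index writer
// for composite keys, otherwise the writer of the single key column.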
cfile::CFileWriter *DiskRowSetWriter::key_index_writer() {
return ad_hoc_index_writer_ ? ad_hoc_index_writer_.get() : col_writer_->writer_for_col_idx(0);
}
size_t DiskRowSetWriter::written_size() const {
size_t size = 0;
if (col_writer_) {
size += col_writer_->written_size();
}
if (bloom_writer_) {
size += bloom_writer_->written_size();
}
if (ad_hoc_index_writer_) {
size += ad_hoc_index_writer_->written_size();
}
return size;
}
DiskRowSetWriter::~DiskRowSetWriter() {
}
RollingDiskRowSetWriter::RollingDiskRowSetWriter(
TabletMetadata* tablet_metadata, const Schema& schema,
BloomFilterSizing bloom_sizing, size_t target_rowset_size)
: state_(kInitialized),
tablet_metadata_(DCHECK_NOTNULL(tablet_metadata)),
schema_(schema),
bloom_sizing_(bloom_sizing),
target_rowset_size_(target_rowset_size),
row_idx_in_cur_drs_(0),
can_roll_(false),
written_count_(0),
written_size_(0) {
BlockManager* bm = tablet_metadata->fs_manager()->block_manager();
block_transaction_ = bm->NewCreationTransaction();
CHECK(schema.has_column_ids());
}
Status RollingDiskRowSetWriter::Open() {
TRACE_EVENT0("tablet", "RollingDiskRowSetWriter::Open");
CHECK_EQ(state_, kInitialized);
RETURN_NOT_OK(RollWriter());
state_ = kStarted;
return Status::OK();
}
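// Finishes the current writer (if any) and starts a fresh DiskRowSet:
// new rowset metadata, a new DiskRowSetWriter, and empty UNDO/REDO delta
// file writers.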
Status RollingDiskRowSetWriter::RollWriter() {
TRACE_EVENT0("tablet", "RollingDiskRowSetWriter::RollWriter");
// Close current writer if it is open
RETURN_NOT_OK(FinishCurrentWriter());
RETURN_NOT_OK(tablet_metadata_->CreateRowSet(&cur_drs_metadata_));
cur_writer_.reset(new DiskRowSetWriter(cur_drs_metadata_.get(), &schema_, bloom_sizing_));
RETURN_NOT_OK(cur_writer_->Open());
FsManager* fs = tablet_metadata_->fs_manager();
unique_ptr<WritableBlock> undo_data_block;
unique_ptr<WritableBlock> redo_data_block;
RETURN_NOT_OK(fs->CreateNewBlock(CreateBlockOptions({ tablet_metadata_->tablet_id() }),
&undo_data_block));
RETURN_NOT_OK(fs->CreateNewBlock(CreateBlockOptions({ tablet_metadata_->tablet_id() }),
&redo_data_block));
cur_undo_ds_block_id_ = undo_data_block->id();
cur_redo_ds_block_id_ = redo_data_block->id();
cur_undo_writer_.reset(new DeltaFileWriter(std::move(undo_data_block)));
cur_redo_writer_.reset(new DeltaFileWriter(std::move(redo_data_block)));
cur_undo_delta_stats_.reset(new DeltaStats());
cur_redo_delta_stats_.reset(new DeltaStats());
row_idx_in_cur_drs_ = 0;
can_roll_ = false;
RETURN_NOT_OK(cur_undo_writer_->Start());
return cur_redo_writer_->Start();
}
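// Rolls over to a new DiskRowSet if the current one has exceeded the target
// size. Only possible right after AppendBlock(): AppendDeltas() clears
// can_roll_ so the writer never rolls between appending a block's deltas
// and appending the block itself.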
Status RollingDiskRowSetWriter::RollIfNecessary() {
DCHECK_EQ(state_, kStarted);
if (can_roll_ && cur_writer_->written_size() > target_rowset_size_) {
RETURN_NOT_OK(RollWriter());
}
return Status::OK();
}
Status RollingDiskRowSetWriter::AppendBlock(const RowBlock &block, int live_row_count) {
DCHECK_EQ(state_, kStarted);
RETURN_NOT_OK(cur_writer_->AppendBlock(block, live_row_count));
written_count_ += block.nrows();
row_idx_in_cur_drs_ += block.nrows();
can_roll_ = true;
return Status::OK();
}
Status RollingDiskRowSetWriter::AppendUndoDeltas(rowid_t row_idx_in_block,
Mutation* undo_delta_head,
rowid_t* row_idx) {
return AppendDeltas<UNDO>(row_idx_in_block, undo_delta_head,
row_idx,
cur_undo_writer_.get(),
cur_undo_delta_stats_.get());
}
Status RollingDiskRowSetWriter::AppendRedoDeltas(rowid_t row_idx_in_block,
Mutation* redo_delta_head,
rowid_t* row_idx) {
return AppendDeltas<REDO>(row_idx_in_block, redo_delta_head,
row_idx,
cur_redo_writer_.get(),
cur_redo_delta_stats_.get());
}
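// Appends the chain of mutations starting at 'delta_head' as deltas of the
// given type (UNDO or REDO), converting the block-relative row index into a
// rowset-relative one (returned via 'row_idx') and folding each mutation
// into the running delta stats.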
template<DeltaType Type>
Status RollingDiskRowSetWriter::AppendDeltas(rowid_t row_idx_in_block,
Mutation* delta_head,
rowid_t* row_idx,
DeltaFileWriter* writer,
DeltaStats* delta_stats) {
can_roll_ = false;
*row_idx = row_idx_in_cur_drs_ + row_idx_in_block;
for (const Mutation *mut = delta_head; mut != nullptr; mut = mut->next()) {
DeltaKey key(*row_idx, mut->timestamp());
RETURN_NOT_OK(writer->AppendDelta<Type>(key, mut->changelist()));
delta_stats->UpdateStats(mut->timestamp(), mut->changelist());
}
return Status::OK();
}
Status RollingDiskRowSetWriter::FinishCurrentWriter() {
TRACE_EVENT0("tablet", "RollingDiskRowSetWriter::FinishCurrentWriter");
if (!cur_writer_) {
return Status::OK();
}
CHECK_EQ(state_, kStarted);
Status writer_status = cur_writer_->FinishAndReleaseBlocks(block_transaction_.get());
// If no rows were written (e.g. due to an empty flush or a compaction with all rows
// deleted), FinishAndReleaseBlocks(...) returns Aborted. In that case, we don't
// generate a RowSetMetadata.
if (writer_status.IsAborted()) {
CHECK_EQ(cur_writer_->written_count(), 0);
} else {
RETURN_NOT_OK(writer_status);
CHECK_GT(cur_writer_->written_count(), 0);
cur_undo_writer_->WriteDeltaStats(std::move(cur_undo_delta_stats_));
cur_redo_writer_->WriteDeltaStats(std::move(cur_redo_delta_stats_));
// Commit the UNDO block. Status::Aborted() indicates that there
// were no UNDOs written.
Status s = cur_undo_writer_->FinishAndReleaseBlock(block_transaction_.get());
if (!s.IsAborted()) {
RETURN_NOT_OK(s);
cur_drs_metadata_->CommitUndoDeltaDataBlock(cur_undo_ds_block_id_);
}
// Same for the REDO block.
s = cur_redo_writer_->FinishAndReleaseBlock(block_transaction_.get());
if (!s.IsAborted()) {
RETURN_NOT_OK(s);
cur_drs_metadata_->CommitRedoDeltaDataBlock(0, 0, cur_redo_ds_block_id_);
}
written_size_ += cur_writer_->written_size();
written_drs_metas_.push_back(cur_drs_metadata_);
}
cur_writer_.reset(nullptr);
cur_undo_writer_.reset(nullptr);
cur_redo_writer_.reset(nullptr);
cur_drs_metadata_.reset();
return Status::OK();
}
Status RollingDiskRowSetWriter::Finish() {
TRACE_EVENT0("tablet", "RollingDiskRowSetWriter::Finish");
DCHECK_EQ(state_, kStarted);
RETURN_NOT_OK(FinishCurrentWriter());
RETURN_NOT_OK(block_transaction_->CommitCreatedBlocks());
state_ = kFinished;
return Status::OK();
}
void RollingDiskRowSetWriter::GetWrittenRowSetMetadata(RowSetMetadataVector* metas) const {
CHECK_EQ(state_, kFinished);
metas->assign(written_drs_metas_.begin(), written_drs_metas_.end());
}
RollingDiskRowSetWriter::~RollingDiskRowSetWriter() {
}
////////////////////////////////////////////////////////////
// Reader
////////////////////////////////////////////////////////////
Status DiskRowSet::Open(const shared_ptr<RowSetMetadata>& rowset_metadata,
log::LogAnchorRegistry* log_anchor_registry,
const TabletMemTrackers& mem_trackers,
const IOContext* io_context,
shared_ptr<DiskRowSet> *rowset) {
auto rs(DiskRowSet::make_shared(
rowset_metadata, log_anchor_registry, mem_trackers));
RETURN_NOT_OK(rs->Open(io_context));
*rowset = std::move(rs);
return Status::OK();
}
DiskRowSet::DiskRowSet(shared_ptr<RowSetMetadata> rowset_metadata,
LogAnchorRegistry* log_anchor_registry,
TabletMemTrackers mem_trackers)
: rowset_metadata_(std::move(rowset_metadata)),
open_(false),
log_anchor_registry_(log_anchor_registry),
mem_trackers_(std::move(mem_trackers)),
num_rows_(-1),
has_been_compacted_(false) {}
Status DiskRowSet::Open(const IOContext* io_context) {
TRACE_EVENT0("tablet", "DiskRowSet::Open");
RETURN_NOT_OK(CFileSet::Open(rowset_metadata_,
mem_trackers_.bloomfile_tracker,
mem_trackers_.cfile_reader_tracker,
io_context,
&base_data_));
RETURN_NOT_OK(DeltaTracker::Open(rowset_metadata_,
log_anchor_registry_,
mem_trackers_,
io_context,
&delta_tracker_));
open_ = true;
return Status::OK();
}
Status DiskRowSet::FlushDeltas(const IOContext* io_context) {
TRACE_EVENT0("tablet", "DiskRowSet::FlushDeltas");
return delta_tracker_->Flush(io_context, DeltaTracker::FLUSH_METADATA);
}
Status DiskRowSet::MinorCompactDeltaStores(const IOContext* io_context) {
TRACE_EVENT0("tablet", "DiskRowSet::MinorCompactDeltaStores");
return delta_tracker_->Compact(io_context);
}
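// Major-compacts the REDO delta stores for all columns that have updates;
// a no-op if no column has any.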
Status DiskRowSet::MajorCompactDeltaStores(const IOContext* io_context,
HistoryGcOpts history_gc_opts) {
vector<ColumnId> col_ids;
delta_tracker_->GetColumnIdsWithUpdates(&col_ids);
if (col_ids.empty()) {
VLOG_WITH_PREFIX(2) << "There are no column ids with updates";
return Status::OK();
}
return MajorCompactDeltaStoresWithColumnIds(col_ids, io_context, std::move(history_gc_opts));
}
Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>& col_ids,
const IOContext* io_context,
HistoryGcOpts history_gc_opts) {
VLOG_WITH_PREFIX(1) << "Major compacting REDO delta stores (cols: " << col_ids << ")";
TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds");
std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock());
RETURN_NOT_OK(delta_tracker()->CheckWritableUnlocked());
// TODO(todd): do we need to lock schema or anything here?
unique_ptr<MajorDeltaCompaction> compaction;
RETURN_NOT_OK(NewMajorDeltaCompaction(col_ids, std::move(history_gc_opts),
io_context, &compaction));
RETURN_NOT_OK(compaction->Compact(io_context));
// Before updating anything, create a copy of the rowset metadata so we can
// revert changes in case of error.
RowSetDataPB original_pb;
rowset_metadata_->ToProtobuf(&original_pb);
auto revert_metadata_update = MakeScopedCleanup([&] {
LOG_WITH_PREFIX(WARNING) << "Error during major delta compaction! Rolling back rowset metadata";
rowset_metadata_->LoadFromPB(original_pb);
});
// Prepare the changes to the metadata.
RowSetMetadataUpdate update;
compaction->CreateMetadataUpdate(&update);
BlockIdContainer removed_blocks;
rowset_metadata_->CommitUpdate(update, &removed_blocks);
// Now that the metadata has been updated, open a new cfile set with the
// appropriate blocks to match the update.
shared_ptr<CFileSet> new_base;
RETURN_NOT_OK(CFileSet::Open(rowset_metadata_,
mem_trackers_.bloomfile_tracker,
mem_trackers_.cfile_reader_tracker,
io_context,
&new_base));
{
// Update the delta tracker and the base data with the changes.
std::lock_guard<rw_spinlock> lock(component_lock_);
RETURN_NOT_OK(compaction->UpdateDeltaTracker(delta_tracker_.get(), io_context));
base_data_.swap(new_base);
}
// Now that we've successfully compacted, add the removed blocks to the
// orphaned blocks list and cancel cleanup.
rowset_metadata_->AddOrphanedBlocks(removed_blocks);
revert_metadata_update.cancel();
// Even if the flush doesn't succeed, there are no consistency problems in
// the case of major delta compaction -- we are not adding any mutations
// that weren't already present.
return rowset_metadata_->Flush();
}
Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
HistoryGcOpts history_gc_opts,
const IOContext* io_context,
unique_ptr<MajorDeltaCompaction>* out) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_);
const SchemaPtr schema_ptr = rowset_metadata_->tablet_schema();
RowIteratorOptions opts;
opts.projection = schema_ptr.get();
opts.io_context = io_context;
vector<shared_ptr<DeltaStore>> included_stores;
unique_ptr<DeltaIterator> delta_iter;
RETURN_NOT_OK(delta_tracker_->NewDeltaFileIterator(
opts, REDO, &included_stores, &delta_iter));
out->reset(new MajorDeltaCompaction(rowset_metadata_->fs_manager(),
*schema_ptr,
base_data_.get(),
std::move(delta_iter),
std::move(included_stores),
col_ids,
std::move(history_gc_opts),
rowset_metadata_->tablet_metadata()->tablet_id()));
return Status::OK();
}
Status DiskRowSet::NewRowIterator(const RowIteratorOptions& opts,
unique_ptr<RowwiseIterator>* out) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_);
shared_ptr<CFileSet::Iterator> base_iter(base_data_->NewIterator(opts.projection,
opts.io_context));
unique_ptr<ColumnwiseIterator> col_iter;
RETURN_NOT_OK(delta_tracker_->WrapIterator(base_iter, opts, &col_iter));
*out = NewMaterializingIterator(std::move(col_iter));
return Status::OK();
}
Status DiskRowSet::NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
const IOContext* io_context,
unique_ptr<CompactionInput>* out) const {
return CompactionInput::Create(*this, projection, snap, io_context, out);
}
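// Applies 'update' to the row matching 'probe' at the given timestamp.
// Returns NotFound if the key is absent from the base data or the row has
// already been deleted.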
Status DiskRowSet::MutateRow(Timestamp timestamp,
const RowSetKeyProbe &probe,
const RowChangeList &update,
const consensus::OpId& op_id,
const IOContext* io_context,
ProbeStats* stats,
OperationResultPB* result) {
DCHECK(open_);
#ifndef NDEBUG
rowid_t num_rows;
RETURN_NOT_OK(CountRows(io_context, &num_rows));
#endif
shared_lock<rw_spinlock> l(component_lock_);
optional<rowid_t> row_idx;
RETURN_NOT_OK(base_data_->FindRow(probe, io_context, &row_idx, stats));
if (PREDICT_FALSE(!row_idx)) {
return Status::NotFound("row not found");
}
#ifndef NDEBUG
CHECK_LT(*row_idx, num_rows);
#endif
// It's possible that the row key exists in this DiskRowSet but has in fact
// already been deleted. Check with the delta tracker to be sure.
bool deleted;
RETURN_NOT_OK(delta_tracker_->CheckRowDeleted(*row_idx, io_context, &deleted, stats));
if (deleted) {
return Status::NotFound("row not found");
}
RETURN_NOT_OK(delta_tracker_->Update(timestamp, *row_idx, update, op_id, result));
return Status::OK();
}
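// Sets *present = true iff the probe key exists in the base data and its
// latest delta state is not a deletion.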
Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
const IOContext* io_context,
bool* present,
ProbeStats* stats) const {
DCHECK(open_);
#ifndef NDEBUG
rowid_t num_rows;
RETURN_NOT_OK(CountRows(io_context, &num_rows));
#endif
shared_lock<rw_spinlock> l(component_lock_);
rowid_t row_idx;
RETURN_NOT_OK(base_data_->CheckRowPresent(probe, io_context, present, &row_idx, stats));
if (!*present) {
// If it wasn't in the base data, then it's definitely not in the rowset.
return Status::OK();
}
#ifndef NDEBUG
CHECK_LT(row_idx, num_rows);
#endif
// Otherwise it might be in the base data but deleted.
bool deleted = false;
RETURN_NOT_OK(delta_tracker_->CheckRowDeleted(row_idx, io_context, &deleted, stats));
*present = !deleted;
return Status::OK();
}
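// Returns the total number of rows in the base data. The count is cached in
// num_rows_ on first use, so subsequent calls avoid taking component_lock_
// and touching the base data.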
Status DiskRowSet::CountRows(const IOContext* io_context, rowid_t *count) const {
DCHECK(open_);
rowid_t num_rows = num_rows_.load();
if (PREDICT_TRUE(num_rows != -1)) {
*count = num_rows;
} else {
shared_lock<rw_spinlock> l(component_lock_);
RETURN_NOT_OK(base_data_->CountRows(io_context, count));
num_rows_.store(*count);
}
return Status::OK();
}
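// The live row count is the number of rows written to this rowset (tracked
// in its metadata) minus the rows deleted since (tracked by the delta
// tracker).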
Status DiskRowSet::CountLiveRows(uint64_t* count) const {
DCHECK_GE(rowset_metadata_->live_row_count(), delta_tracker_->CountDeletedRows());
*count = rowset_metadata_->live_row_count() - delta_tracker_->CountDeletedRows();
return Status::OK();
}
Status DiskRowSet::GetBounds(std::string* min_encoded_key,
std::string* max_encoded_key) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_);
return base_data_->GetBounds(min_encoded_key, max_encoded_key);
}
void DiskRowSet::GetDiskRowSetSpaceUsage(DiskRowSetSpace* drss) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_);
drss->base_data_size = base_data_->OnDiskDataSize();
drss->bloom_size = base_data_->BloomFileOnDiskSize();
drss->ad_hoc_index_size = base_data_->AdhocIndexOnDiskSize();
drss->redo_deltas_size = delta_tracker_->RedoDeltaOnDiskSize();
drss->undo_deltas_size = delta_tracker_->UndoDeltaOnDiskSize();
}
uint64_t DiskRowSet::OnDiskSize() const {
DiskRowSetSpace drss;
GetDiskRowSetSpaceUsage(&drss);
return drss.CFileSetOnDiskSize() + drss.redo_deltas_size + drss.undo_deltas_size;
}
uint64_t DiskRowSet::OnDiskBaseDataSize() const {
DiskRowSetSpace drss;
GetDiskRowSetSpaceUsage(&drss);
return drss.base_data_size;
}
uint64_t DiskRowSet::OnDiskBaseDataColumnSize(const ColumnId& col_id) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_);
if (base_data_->has_data_for_column_id(col_id)) {
return base_data_->OnDiskColumnDataSize(col_id);
}
return 0;
}
uint64_t DiskRowSet::OnDiskBaseDataSizeWithRedos() const {
DiskRowSetSpace drss;
GetDiskRowSetSpaceUsage(&drss);
return drss.base_data_size + drss.redo_deltas_size;
}
size_t DiskRowSet::DeltaMemStoreSize() const {
DCHECK(open_);
return delta_tracker_->DeltaMemStoreSize();
}
bool DiskRowSet::DeltaMemStoreEmpty() const {
DCHECK(open_);
return delta_tracker_->DeltaMemStoreEmpty();
}
bool DiskRowSet::DeltaMemStoreInfo(size_t* size_bytes, MonoTime* creation_time) const {
DCHECK(open_);
return delta_tracker_->GetDeltaMemStoreInfo(size_bytes, creation_time);
}
int64_t DiskRowSet::MinUnflushedLogIndex() const {
DCHECK(open_);
return delta_tracker_->MinUnflushedLogIndex();
}
size_t DiskRowSet::CountDeltaStores() const {
DCHECK(open_);
return delta_tracker_->CountRedoDeltaStores();
}
// In this implementation, the returned improvement score is 0 if there aren't any redo files to
// compact or if the base data is empty. Otherwise, with the score capped at 1:
// - Major compactions: the score is the ratio sizeof(deltas) / sizeof(base data), unless it is
// smaller than tablet_delta_store_major_compact_min_ratio or the delta files
// consist only of deletes, in which case the score is brought down to zero.
// - Minor compactions: the score is zero if there's only 1 redo file; otherwise it is
// redo_files_count / tablet_delta_store_minor_compact_max. The latter is meant
// to be high since minor compactions don't gain us much, so we only consider
// them worthwhile if they get rid of many tiny files.
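//
// For example, with the default flag values: 50MB of redo deltas against
// 200MB of base data yields a major compaction score of 50/200 = 0.25
// (>= 0.1, so it stands), while 40 redo files yield a minor compaction
// score of 40/1000 = 0.04.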
double DiskRowSet::DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const {
DCHECK(open_);
double perf_improv = 0;
size_t store_count = CountDeltaStores();
if (store_count == 0) {
return perf_improv;
}
if (type == RowSet::MAJOR_DELTA_COMPACTION) {
vector<ColumnId> col_ids_with_updates;
delta_tracker_->GetColumnIdsWithUpdates(&col_ids_with_updates);
// If we have files but no updates, we don't want to major compact.
if (!col_ids_with_updates.empty()) {
DiskRowSetSpace drss;
GetDiskRowSetSpaceUsage(&drss);
double ratio = static_cast<double>(drss.redo_deltas_size) / drss.base_data_size;
if (ratio >= FLAGS_tablet_delta_store_major_compact_min_ratio) {
perf_improv = ratio;
}
}
} else if (type == RowSet::MINOR_DELTA_COMPACTION) {
if (store_count > 1) {
perf_improv = static_cast<double>(store_count) / FLAGS_tablet_delta_store_minor_compact_max;
}
} else {
LOG_WITH_PREFIX(FATAL) << "Unknown delta compaction type " << type;
}
return std::min(1.0, perf_improv);
}
Status DiskRowSet::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
int64_t* bytes) {
return delta_tracker_->EstimateBytesInPotentiallyAncientUndoDeltas(ancient_history_mark, bytes);
}
Status DiskRowSet::IsDeletedAndFullyAncient(Timestamp ancient_history_mark,
bool* deleted_and_ancient) {
uint64_t live_row_count = 0;
RETURN_NOT_OK(CountLiveRows(&live_row_count));
if (live_row_count > 0) {
*deleted_and_ancient = false;
return Status::OK();
}
// NOTE: this estimate might not read from disk and may thus return false
// despite having ancient on-disk data. That's sufficient because false
// negatives are OK for the purposes of GCing deleted rowsets -- we just
// won't delete them.
*deleted_and_ancient = delta_tracker_->EstimateAllRedosAreAncient(ancient_history_mark);
return Status::OK();
}
Status DiskRowSet::InitUndoDeltas(Timestamp ancient_history_mark,
MonoTime deadline,
const IOContext* io_context,
int64_t* delta_blocks_initialized,
int64_t* bytes_in_ancient_undos) {
TRACE_EVENT0("tablet", "DiskRowSet::InitUndoDeltas");
return delta_tracker_->InitUndoDeltas(ancient_history_mark, deadline, io_context,
delta_blocks_initialized, bytes_in_ancient_undos);
}
Status DiskRowSet::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
const IOContext* io_context,
int64_t* blocks_deleted, int64_t* bytes_deleted) {
TRACE_EVENT0("tablet", "DiskRowSet::DeleteAncientUndoDeltas");
return delta_tracker_->DeleteAncientUndoDeltas(ancient_history_mark, io_context,
blocks_deleted, bytes_deleted);
}
Status DiskRowSet::DebugDump(vector<string> *lines) {
// Using CompactionInput to dump our data is an easy way of seeing all the
// rows and deltas.
unique_ptr<CompactionInput> input;
RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(),
MvccSnapshot::CreateSnapshotIncludingAllOps(),
nullptr, &input));
return DebugDumpCompactionInput(input.get(), lines);
}
} // namespace tablet
} // namespace kudu