blob: 5feeab4b8f3a4f569694f0bfc1b50c884c1f7451 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <glog/logging.h>
#include <mutex>
#include <vector>
#include "kudu/common/generic_iterators.h"
#include "kudu/common/iterator.h"
#include "kudu/common/schema.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/cfile/bloomfile.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/cfile/type_encodings.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/tablet/cfile_set.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/delta_compaction.h"
#include "kudu/tablet/multi_column_writer.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/status.h"
DEFINE_int32(tablet_delta_store_minor_compact_max, 1000,
"How many delta stores are required before forcing a minor delta compaction "
"(Advanced option)");
TAG_FLAG(tablet_delta_store_minor_compact_max, experimental);
DEFINE_double(tablet_delta_store_major_compact_min_ratio, 0.1f,
"Minimum ratio of sizeof(deltas) to sizeof(base data) before a major compaction "
"can run (Advanced option)");
TAG_FLAG(tablet_delta_store_major_compact_min_ratio, experimental);
DEFINE_int32(default_composite_key_index_block_size_bytes, 4096,
"Block size used for composite key indexes.");
TAG_FLAG(default_composite_key_index_block_size_bytes, experimental);
namespace kudu {
namespace tablet {
using cfile::BloomFileWriter;
using fs::ScopedWritableBlockCloser;
using fs::WritableBlock;
using log::LogAnchorRegistry;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
const char *DiskRowSet::kMinKeyMetaEntryName = "min_key";
const char *DiskRowSet::kMaxKeyMetaEntryName = "max_key";
DiskRowSetWriter::DiskRowSetWriter(RowSetMetadata* rowset_metadata,
const Schema* schema,
BloomFilterSizing bloom_sizing)
: rowset_metadata_(rowset_metadata),
schema_(schema),
bloom_sizing_(std::move(bloom_sizing)),
finished_(false),
written_count_(0) {
CHECK(schema->has_column_ids());
}
Status DiskRowSetWriter::Open() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::Open");
FsManager* fs = rowset_metadata_->fs_manager();
col_writer_.reset(new MultiColumnWriter(fs, schema_));
RETURN_NOT_OK(col_writer_->Open());
// Open bloom filter.
RETURN_NOT_OK(InitBloomFileWriter());
if (schema_->num_key_columns() > 1) {
// Open ad-hoc index writer
RETURN_NOT_OK(InitAdHocIndexWriter());
}
return Status::OK();
}
Status DiskRowSetWriter::InitBloomFileWriter() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::InitBloomFileWriter");
gscoped_ptr<WritableBlock> block;
FsManager* fs = rowset_metadata_->fs_manager();
RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(&block),
"Couldn't allocate a block for bloom filter");
rowset_metadata_->set_bloom_block(block->id());
bloom_writer_.reset(new cfile::BloomFileWriter(std::move(block), bloom_sizing_));
RETURN_NOT_OK(bloom_writer_->Start());
return Status::OK();
}
Status DiskRowSetWriter::InitAdHocIndexWriter() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::InitAdHocIndexWriter");
gscoped_ptr<WritableBlock> block;
FsManager* fs = rowset_metadata_->fs_manager();
RETURN_NOT_OK_PREPEND(fs->CreateNewBlock(&block),
"Couldn't allocate a block for compoound index");
rowset_metadata_->set_adhoc_index_block(block->id());
// TODO: allow options to be configured, perhaps on a per-column
// basis as part of the schema. For now use defaults.
//
// Also would be able to set encoding here, or do something smart
// to figure out the encoding on the fly.
cfile::WriterOptions opts;
// Index the composite key by value
opts.write_validx = true;
// no need to index positions
opts.write_posidx = false;
opts.storage_attributes.encoding = PREFIX_ENCODING;
opts.storage_attributes.compression = LZ4;
opts.storage_attributes.cfile_block_size = FLAGS_default_composite_key_index_block_size_bytes;
// Create the CFile writer for the ad-hoc index.
ad_hoc_index_writer_.reset(new cfile::CFileWriter(
opts,
GetTypeInfo(BINARY),
false,
std::move(block)));
return ad_hoc_index_writer_->Start();
}
Status DiskRowSetWriter::AppendBlock(const RowBlock &block) {
DCHECK_EQ(block.schema().num_columns(), schema_->num_columns());
CHECK(!finished_);
// If this is the very first block, encode the first key and save it as metadata
// in the index column.
if (written_count_ == 0) {
Slice enc_key = schema_->EncodeComparableKey(block.row(0), &last_encoded_key_);
key_index_writer()->AddMetadataPair(DiskRowSet::kMinKeyMetaEntryName, enc_key);
last_encoded_key_.clear();
}
// Write the batch to each of the columns
RETURN_NOT_OK(col_writer_->AppendBlock(block));
#ifndef NDEBUG
faststring prev_key;
#endif
// Write the batch to the bloom and optionally the ad-hoc index
for (size_t i = 0; i < block.nrows(); i++) {
#ifndef NDEBUG
prev_key.assign_copy(last_encoded_key_.data(), last_encoded_key_.size());
#endif
// TODO: performance might be better if we actually batch this -
// encode a bunch of key slices, then pass them all in one go.
RowBlockRow row = block.row(i);
// Insert the encoded key into the bloom.
Slice enc_key = schema_->EncodeComparableKey(row, &last_encoded_key_);
RETURN_NOT_OK(bloom_writer_->AppendKeys(&enc_key, 1));
// Write the batch to the ad hoc index if we're using one
if (ad_hoc_index_writer_ != nullptr) {
RETURN_NOT_OK(ad_hoc_index_writer_->AppendEntries(&enc_key, 1));
}
#ifndef NDEBUG
CHECK_LT(Slice(prev_key).compare(enc_key), 0)
<< enc_key.ToDebugString() << " appended to file not > previous key "
<< Slice(prev_key).ToDebugString();
#endif
}
written_count_ += block.nrows();
return Status::OK();
}
Status DiskRowSetWriter::Finish() {
TRACE_EVENT0("tablet", "DiskRowSetWriter::Finish");
ScopedWritableBlockCloser closer;
RETURN_NOT_OK(FinishAndReleaseBlocks(&closer));
return closer.CloseBlocks();
}
Status DiskRowSetWriter::FinishAndReleaseBlocks(ScopedWritableBlockCloser* closer) {
TRACE_EVENT0("tablet", "DiskRowSetWriter::FinishAndReleaseBlocks");
CHECK(!finished_);
if (written_count_ == 0) {
finished_ = true;
return Status::Aborted("no data written");
}
// Save the last encoded (max) key
CHECK_GT(last_encoded_key_.size(), 0);
Slice last_enc_slice(last_encoded_key_);
std::string first_encoded_key =
key_index_writer()->GetMetaValueOrDie(DiskRowSet::kMinKeyMetaEntryName);
Slice first_enc_slice(first_encoded_key);
CHECK_LE(first_enc_slice.compare(last_enc_slice), 0)
<< "First Key not <= Last key: first_key=" << first_enc_slice.ToDebugString()
<< " last_key=" << last_enc_slice.ToDebugString();
key_index_writer()->AddMetadataPair(DiskRowSet::kMaxKeyMetaEntryName, last_enc_slice);
// Finish writing the columns themselves.
RETURN_NOT_OK(col_writer_->FinishAndReleaseBlocks(closer));
// Put the column data blocks in the metadata.
RowSetMetadata::ColumnIdToBlockIdMap flushed_blocks;
col_writer_->GetFlushedBlocksByColumnId(&flushed_blocks);
rowset_metadata_->SetColumnDataBlocks(flushed_blocks);
if (ad_hoc_index_writer_ != nullptr) {
Status s = ad_hoc_index_writer_->FinishAndReleaseBlock(closer);
if (!s.ok()) {
LOG(WARNING) << "Unable to Finish ad hoc index writer: " << s.ToString();
return s;
}
}
// Finish bloom.
Status s = bloom_writer_->FinishAndReleaseBlock(closer);
if (!s.ok()) {
LOG(WARNING) << "Unable to Finish bloom filter writer: " << s.ToString();
return s;
}
finished_ = true;
return Status::OK();
}
cfile::CFileWriter *DiskRowSetWriter::key_index_writer() {
return ad_hoc_index_writer_ ? ad_hoc_index_writer_.get() : col_writer_->writer_for_col_idx(0);
}
size_t DiskRowSetWriter::written_size() const {
size_t size = 0;
if (col_writer_) {
size += col_writer_->written_size();
}
if (bloom_writer_) {
size += bloom_writer_->written_size();
}
if (ad_hoc_index_writer_) {
size += ad_hoc_index_writer_->written_size();
}
return size;
}
DiskRowSetWriter::~DiskRowSetWriter() {
}
RollingDiskRowSetWriter::RollingDiskRowSetWriter(
TabletMetadata* tablet_metadata, const Schema& schema,
BloomFilterSizing bloom_sizing, size_t target_rowset_size)
: state_(kInitialized),
tablet_metadata_(DCHECK_NOTNULL(tablet_metadata)),
schema_(schema),
bloom_sizing_(std::move(bloom_sizing)),
target_rowset_size_(target_rowset_size),
row_idx_in_cur_drs_(0),
can_roll_(false),
written_count_(0),
written_size_(0) {
CHECK(schema.has_column_ids());
}
Status RollingDiskRowSetWriter::Open() {
TRACE_EVENT0("tablet", "RollingDiskRowSetWriter::Open");
CHECK_EQ(state_, kInitialized);
RETURN_NOT_OK(RollWriter());
state_ = kStarted;
return Status::OK();
}
Status RollingDiskRowSetWriter::RollWriter() {
TRACE_EVENT0("tablet", "RollingDiskRowSetWriter::RollWriter");
// Close current writer if it is open
RETURN_NOT_OK(FinishCurrentWriter());
RETURN_NOT_OK(tablet_metadata_->CreateRowSet(&cur_drs_metadata_, schema_));
cur_writer_.reset(new DiskRowSetWriter(cur_drs_metadata_.get(), &schema_, bloom_sizing_));
RETURN_NOT_OK(cur_writer_->Open());
FsManager* fs = tablet_metadata_->fs_manager();
gscoped_ptr<WritableBlock> undo_data_block;
gscoped_ptr<WritableBlock> redo_data_block;
RETURN_NOT_OK(fs->CreateNewBlock(&undo_data_block));
RETURN_NOT_OK(fs->CreateNewBlock(&redo_data_block));
cur_undo_ds_block_id_ = undo_data_block->id();
cur_redo_ds_block_id_ = redo_data_block->id();
cur_undo_writer_.reset(new DeltaFileWriter(std::move(undo_data_block)));
cur_redo_writer_.reset(new DeltaFileWriter(std::move(redo_data_block)));
cur_undo_delta_stats.reset(new DeltaStats());
cur_redo_delta_stats.reset(new DeltaStats());
row_idx_in_cur_drs_ = 0;
can_roll_ = false;
RETURN_NOT_OK(cur_undo_writer_->Start());
return cur_redo_writer_->Start();
}
Status RollingDiskRowSetWriter::RollIfNecessary() {
DCHECK_EQ(state_, kStarted);
if (can_roll_ && cur_writer_->written_size() > target_rowset_size_) {
RETURN_NOT_OK(RollWriter());
}
return Status::OK();
}
Status RollingDiskRowSetWriter::AppendBlock(const RowBlock &block) {
DCHECK_EQ(state_, kStarted);
RETURN_NOT_OK(cur_writer_->AppendBlock(block));
written_count_ += block.nrows();
row_idx_in_cur_drs_ += block.nrows();
can_roll_ = true;
return Status::OK();
}
Status RollingDiskRowSetWriter::AppendUndoDeltas(rowid_t row_idx_in_block,
Mutation* undo_delta_head,
rowid_t* row_idx) {
return AppendDeltas<UNDO>(row_idx_in_block, undo_delta_head,
row_idx,
cur_undo_writer_.get(),
cur_undo_delta_stats.get());
}
Status RollingDiskRowSetWriter::AppendRedoDeltas(rowid_t row_idx_in_block,
Mutation* redo_delta_head,
rowid_t* row_idx) {
return AppendDeltas<REDO>(row_idx_in_block, redo_delta_head,
row_idx,
cur_redo_writer_.get(),
cur_redo_delta_stats.get());
}
template<DeltaType Type>
Status RollingDiskRowSetWriter::AppendDeltas(rowid_t row_idx_in_block,
Mutation* delta_head,
rowid_t* row_idx,
DeltaFileWriter* writer,
DeltaStats* delta_stats) {
can_roll_ = false;
*row_idx = row_idx_in_cur_drs_ + row_idx_in_block;
for (const Mutation *mut = delta_head; mut != nullptr; mut = mut->next()) {
DeltaKey undo_key(*row_idx, mut->timestamp());
RETURN_NOT_OK(writer->AppendDelta<Type>(undo_key, mut->changelist()));
delta_stats->UpdateStats(mut->timestamp(), mut->changelist());
}
return Status::OK();
}
Status RollingDiskRowSetWriter::FinishCurrentWriter() {
TRACE_EVENT0("tablet", "RollingDiskRowSetWriter::FinishCurrentWriter");
if (!cur_writer_) {
return Status::OK();
}
CHECK_EQ(state_, kStarted);
Status writer_status = cur_writer_->FinishAndReleaseBlocks(&block_closer_);
// If no rows were written (e.g. due to an empty flush or a compaction with all rows
// deleted), FinishAndReleaseBlocks(...) returns Aborted. In that case, we don't
// generate a RowSetMetadata.
if (writer_status.IsAborted()) {
CHECK_EQ(cur_writer_->written_count(), 0);
} else {
RETURN_NOT_OK(writer_status);
CHECK_GT(cur_writer_->written_count(), 0);
cur_undo_writer_->WriteDeltaStats(*cur_undo_delta_stats);
cur_redo_writer_->WriteDeltaStats(*cur_redo_delta_stats);
// Commit the UNDO block. Status::Aborted() indicates that there
// were no UNDOs written.
Status s = cur_undo_writer_->FinishAndReleaseBlock(&block_closer_);
if (!s.IsAborted()) {
RETURN_NOT_OK(s);
cur_drs_metadata_->CommitUndoDeltaDataBlock(cur_undo_ds_block_id_);
} else {
DCHECK_EQ(cur_undo_delta_stats->min_timestamp(), Timestamp::kMax);
}
// Same for the REDO block.
s = cur_redo_writer_->FinishAndReleaseBlock(&block_closer_);
if (!s.IsAborted()) {
RETURN_NOT_OK(s);
cur_drs_metadata_->CommitRedoDeltaDataBlock(0, cur_redo_ds_block_id_);
} else {
DCHECK_EQ(cur_redo_delta_stats->min_timestamp(), Timestamp::kMax);
}
written_size_ += cur_writer_->written_size();
written_drs_metas_.push_back(cur_drs_metadata_);
}
cur_writer_.reset(nullptr);
cur_undo_writer_.reset(nullptr);
cur_redo_writer_.reset(nullptr);
cur_drs_metadata_.reset();
return Status::OK();
}
Status RollingDiskRowSetWriter::Finish() {
TRACE_EVENT0("tablet", "RollingDiskRowSetWriter::Finish");
DCHECK_EQ(state_, kStarted);
RETURN_NOT_OK(FinishCurrentWriter());
RETURN_NOT_OK(block_closer_.CloseBlocks());
state_ = kFinished;
return Status::OK();
}
void RollingDiskRowSetWriter::GetWrittenRowSetMetadata(RowSetMetadataVector* metas) const {
CHECK_EQ(state_, kFinished);
metas->assign(written_drs_metas_.begin(), written_drs_metas_.end());
}
RollingDiskRowSetWriter::~RollingDiskRowSetWriter() {
}
////////////////////////////////////////////////////////////
// Reader
////////////////////////////////////////////////////////////
Status DiskRowSet::Open(const shared_ptr<RowSetMetadata>& rowset_metadata,
log::LogAnchorRegistry* log_anchor_registry,
shared_ptr<DiskRowSet> *rowset,
const shared_ptr<MemTracker>& parent_tracker) {
shared_ptr<DiskRowSet> rs(new DiskRowSet(rowset_metadata, log_anchor_registry, parent_tracker));
RETURN_NOT_OK(rs->Open());
rowset->swap(rs);
return Status::OK();
}
DiskRowSet::DiskRowSet(shared_ptr<RowSetMetadata> rowset_metadata,
LogAnchorRegistry* log_anchor_registry,
shared_ptr<MemTracker> parent_tracker)
: rowset_metadata_(std::move(rowset_metadata)),
open_(false),
log_anchor_registry_(log_anchor_registry),
parent_tracker_(std::move(parent_tracker)) {}
Status DiskRowSet::Open() {
TRACE_EVENT0("tablet", "DiskRowSet::Open");
gscoped_ptr<CFileSet> new_base(new CFileSet(rowset_metadata_));
RETURN_NOT_OK(new_base->Open());
base_data_.reset(new_base.release());
rowid_t num_rows;
RETURN_NOT_OK(base_data_->CountRows(&num_rows));
delta_tracker_.reset(new DeltaTracker(rowset_metadata_, num_rows,
log_anchor_registry_,
parent_tracker_));
RETURN_NOT_OK(delta_tracker_->Open());
open_ = true;
return Status::OK();
}
Status DiskRowSet::FlushDeltas() {
TRACE_EVENT0("tablet", "DiskRowSet::FlushDeltas");
return delta_tracker_->Flush(DeltaTracker::FLUSH_METADATA);
}
Status DiskRowSet::MinorCompactDeltaStores() {
TRACE_EVENT0("tablet", "DiskRowSet::MinorCompactDeltaStores");
return delta_tracker_->Compact();
}
Status DiskRowSet::MajorCompactDeltaStores() {
vector<ColumnId> col_ids;
delta_tracker_->GetColumnIdsWithUpdates(&col_ids);
if (col_ids.empty()) {
return Status::OK();
}
return MajorCompactDeltaStoresWithColumnIds(col_ids);
}
Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>& col_ids) {
TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStores");
std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock());
// TODO: do we need to lock schema or anything here?
gscoped_ptr<MajorDeltaCompaction> compaction;
RETURN_NOT_OK(NewMajorDeltaCompaction(col_ids, &compaction));
RETURN_NOT_OK(compaction->Compact());
// Update and flush the metadata. This needs to happen before we make the new files visible to
// prevent inconsistencies after a server crash.
RowSetMetadataUpdate update;
RETURN_NOT_OK(compaction->CreateMetadataUpdate(&update));
RETURN_NOT_OK(rowset_metadata_->CommitUpdate(update));
RETURN_NOT_OK(rowset_metadata_->Flush());
// Make the new base data and delta files visible.
gscoped_ptr<CFileSet> new_base(new CFileSet(rowset_metadata_));
RETURN_NOT_OK(new_base->Open());
{
std::lock_guard<percpu_rwlock> lock(component_lock_);
RETURN_NOT_OK(compaction->UpdateDeltaTracker(delta_tracker_.get()));
base_data_.reset(new_base.release());
}
return Status::OK();
}
Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
gscoped_ptr<MajorDeltaCompaction>* out) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
const Schema* schema = &rowset_metadata_->tablet_schema();
vector<shared_ptr<DeltaStore> > included_stores;
unique_ptr<DeltaIterator> delta_iter;
RETURN_NOT_OK(delta_tracker_->NewDeltaFileIterator(
schema,
MvccSnapshot::CreateSnapshotIncludingAllTransactions(),
REDO,
&included_stores,
&delta_iter));
out->reset(new MajorDeltaCompaction(rowset_metadata_->fs_manager(),
*schema,
base_data_.get(),
std::move(delta_iter),
included_stores,
col_ids));
return Status::OK();
}
Status DiskRowSet::NewRowIterator(const Schema *projection,
const MvccSnapshot &mvcc_snap,
gscoped_ptr<RowwiseIterator>* out) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
shared_ptr<CFileSet::Iterator> base_iter(base_data_->NewIterator(projection));
gscoped_ptr<ColumnwiseIterator> col_iter;
RETURN_NOT_OK(delta_tracker_->WrapIterator(base_iter, mvcc_snap, &col_iter));
out->reset(new MaterializingIterator(
shared_ptr<ColumnwiseIterator>(col_iter.release())));
return Status::OK();
}
Status DiskRowSet::NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
gscoped_ptr<CompactionInput>* out) const {
return CompactionInput::Create(*this, projection, snap, out);
}
Status DiskRowSet::MutateRow(Timestamp timestamp,
const RowSetKeyProbe &probe,
const RowChangeList &update,
const consensus::OpId& op_id,
ProbeStats* stats,
OperationResultPB* result) {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
rowid_t row_idx;
RETURN_NOT_OK(base_data_->FindRow(probe, &row_idx, stats));
// It's possible that the row key exists in this DiskRowSet, but it has
// in fact been Deleted already. Check with the delta tracker to be sure.
bool deleted;
RETURN_NOT_OK(delta_tracker_->CheckRowDeleted(row_idx, &deleted, stats));
if (deleted) {
return Status::NotFound("row not found");
}
RETURN_NOT_OK(delta_tracker_->Update(timestamp, row_idx, update, op_id, result));
return Status::OK();
}
Status DiskRowSet::CheckRowPresent(const RowSetKeyProbe &probe,
bool* present,
ProbeStats* stats) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
rowid_t row_idx;
RETURN_NOT_OK(base_data_->CheckRowPresent(probe, present, &row_idx, stats));
if (!*present) {
// If it wasn't in the base data, then it's definitely not in the rowset.
return Status::OK();
}
// Otherwise it might be in the base data but deleted.
bool deleted = false;
RETURN_NOT_OK(delta_tracker_->CheckRowDeleted(row_idx, &deleted, stats));
*present = !deleted;
return Status::OK();
}
Status DiskRowSet::CountRows(rowid_t *count) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
return base_data_->CountRows(count);
}
Status DiskRowSet::GetBounds(std::string* min_encoded_key,
std::string* max_encoded_key) const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
return base_data_->GetBounds(min_encoded_key, max_encoded_key);
}
uint64_t DiskRowSet::EstimateBaseDataDiskSize() const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
return base_data_->EstimateOnDiskSize();
}
uint64_t DiskRowSet::EstimateDeltaDiskSize() const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
return delta_tracker_->EstimateOnDiskSize();
}
uint64_t DiskRowSet::EstimateOnDiskSize() const {
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_.get_lock());
return base_data_->EstimateOnDiskSize() + delta_tracker_->EstimateOnDiskSize();
}
size_t DiskRowSet::DeltaMemStoreSize() const {
DCHECK(open_);
return delta_tracker_->DeltaMemStoreSize();
}
bool DiskRowSet::DeltaMemStoreEmpty() const {
DCHECK(open_);
return delta_tracker_->DeltaMemStoreEmpty();
}
int64_t DiskRowSet::MinUnflushedLogIndex() const {
DCHECK(open_);
return delta_tracker_->MinUnflushedLogIndex();
}
size_t DiskRowSet::CountDeltaStores() const {
DCHECK(open_);
return delta_tracker_->CountRedoDeltaStores();
}
// In this implementation, the returned improvement score is 0 if there aren't any redo files to
// compact or if the base data is empty. After this, with a max score of 1:
// - Major compactions: the score will be the result of sizeof(deltas)/sizeof(base data), unless
// it is smaller than tablet_delta_store_major_compact_min_ratio or if the
// delta files are only composed of deletes, in which case the score is
// brought down to zero.
// - Minor compactions: the score will be zero if there's only 1 redo file, else it will be the
// result of redo_files_count/tablet_delta_store_minor_compact_max. The
// latter is meant to be high since minor compactions don't give us much, so
// we only consider it a gain if it gets rid of many tiny files.
double DiskRowSet::DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const {
DCHECK(open_);
double perf_improv = 0;
size_t store_count = CountDeltaStores();
uint64_t base_data_size = EstimateBaseDataDiskSize();
if (store_count == 0) {
return perf_improv;
}
if (type == RowSet::MAJOR_DELTA_COMPACTION) {
vector<ColumnId> col_ids_with_updates;
delta_tracker_->GetColumnIdsWithUpdates(&col_ids_with_updates);
// If we have files but no updates, we don't want to major compact.
if (!col_ids_with_updates.empty()) {
double ratio = static_cast<double>(EstimateDeltaDiskSize()) / base_data_size;
if (ratio >= FLAGS_tablet_delta_store_major_compact_min_ratio) {
perf_improv = ratio;
}
}
} else if (type == RowSet::MINOR_DELTA_COMPACTION) {
if (store_count > 1) {
perf_improv = static_cast<double>(store_count) / FLAGS_tablet_delta_store_minor_compact_max;
}
} else {
LOG(FATAL) << "Unknown delta compaction type " << type;
}
return std::min(1.0, perf_improv);
}
Status DiskRowSet::DebugDump(vector<string> *lines) {
// Using CompactionInput to dump our data is an easy way of seeing all the
// rows and deltas.
gscoped_ptr<CompactionInput> input;
RETURN_NOT_OK(NewCompactionInput(&rowset_metadata_->tablet_schema(),
MvccSnapshot::CreateSnapshotIncludingAllTransactions(),
&input));
return DebugDumpCompactionInput(input.get(), lines);
}
} // namespace tablet
} // namespace kudu