// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <boost/bind.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <iterator>
#include <limits>
#include <memory>
#include <ostream>
#include <unordered_set>
#include <utility>
#include <vector>
#include "kudu/cfile/cfile_writer.h"
#include "kudu/common/iterator.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/compaction.h"
#include "kudu/tablet/compaction_policy.h"
#include "kudu/tablet/delta_compaction.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/row_op.h"
#include "kudu/tablet/rowset_info.h"
#include "kudu/tablet/rowset_tree.h"
#include "kudu/tablet/svg_dump.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_mm_ops.h"
#include "kudu/tablet/transactions/alter_schema_transaction.h"
#include "kudu/tablet/transactions/write_transaction.h"
#include "kudu/util/bloom_filter.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/trace.h"
#include "kudu/util/url-coding.h"
DEFINE_bool(tablet_do_dup_key_checks, true,
"Whether to check primary keys for duplicate on insertion. "
"Use at your own risk!");
TAG_FLAG(tablet_do_dup_key_checks, unsafe);
DEFINE_int32(tablet_compaction_budget_mb, 128,
"Budget for a single compaction");
TAG_FLAG(tablet_compaction_budget_mb, experimental);
DEFINE_int32(tablet_bloom_block_size, 4096,
"Block size of the bloom filters used for tablet keys.");
TAG_FLAG(tablet_bloom_block_size, advanced);
DEFINE_double(tablet_bloom_target_fp_rate, 0.01f,
"Target false-positive rate (between 0 and 1) to size tablet key bloom filters. "
"A lower false positive rate may reduce the number of disk seeks required "
"in heavy insert workloads, at the expense of more space and RAM "
"required for bloom filters.");
TAG_FLAG(tablet_bloom_target_fp_rate, advanced);
DEFINE_double(fault_crash_before_flush_tablet_meta_after_compaction, 0.0,
"Fraction of the time, during compaction, to crash before flushing metadata");
TAG_FLAG(fault_crash_before_flush_tablet_meta_after_compaction, unsafe);
DEFINE_double(fault_crash_before_flush_tablet_meta_after_flush_mrs, 0.0,
"Fraction of the time, while flushing an MRS, to crash before flushing metadata");
TAG_FLAG(fault_crash_before_flush_tablet_meta_after_flush_mrs, unsafe);
DEFINE_int64(tablet_throttler_rpc_per_sec, 0,
"Maximum write RPC rate (op/s) allowed for a tablet, write RPC exceeding this "
"limit will be throttled. 0 means no limit.");
TAG_FLAG(tablet_throttler_rpc_per_sec, experimental);
DEFINE_int64(tablet_throttler_bytes_per_sec, 0,
"Maximum write RPC IO rate (byte/s) allowed for a tablet, write RPC exceeding "
"this limit will be throttled. 0 means no limit.");
TAG_FLAG(tablet_throttler_bytes_per_sec, experimental);
DEFINE_double(tablet_throttler_burst_factor, 1.0f,
"Burst factor for write RPC throttling. The maximum rate the throttler "
"allows within a token refill period (100ms) equals burst factor multiply "
"base rate.");
TAG_FLAG(tablet_throttler_burst_factor, experimental);
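// Note on the throttler flags above: as an illustrative example, with
// --tablet_throttler_rpc_per_sec=1000 and the default burst factor of 1.0,
// roughly 1000 op/s * 0.1 s * 1.0 = 100 write RPCs can be admitted within a
// single 100ms token refill period.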
METRIC_DEFINE_entity(tablet);
METRIC_DEFINE_gauge_size(tablet, memrowset_size, "MemRowSet Memory Usage",
kudu::MetricUnit::kBytes,
"Size of this tablet's memrowset");
METRIC_DEFINE_gauge_size(tablet, on_disk_size, "Tablet Size On Disk",
kudu::MetricUnit::kBytes,
"Size of this tablet on disk.");
using std::shared_ptr;
using std::string;
using std::unordered_set;
using std::vector;
namespace kudu {
namespace tablet {
using kudu::MaintenanceManager;
using consensus::OpId;
using consensus::MaximumOpId;
using log::LogAnchorRegistry;
using strings::Substitute;
using base::subtle::Barrier_AtomicIncrement;
static CompactionPolicy *CreateCompactionPolicy() {
return new BudgetedCompactionPolicy(FLAGS_tablet_compaction_budget_mb);
}
////////////////////////////////////////////////////////////
// TabletComponents
////////////////////////////////////////////////////////////
TabletComponents::TabletComponents(shared_ptr<MemRowSet> mrs,
shared_ptr<RowSetTree> rs_tree)
: memrowset(std::move(mrs)), rowsets(std::move(rs_tree)) {}
////////////////////////////////////////////////////////////
// Tablet
////////////////////////////////////////////////////////////
const char* Tablet::kDMSMemTrackerId = "DeltaMemStores";
Tablet::Tablet(const scoped_refptr<TabletMetadata>& metadata,
const scoped_refptr<server::Clock>& clock,
const shared_ptr<MemTracker>& parent_mem_tracker,
MetricRegistry* metric_registry,
const scoped_refptr<LogAnchorRegistry>& log_anchor_registry)
: key_schema_(metadata->schema().CreateKeyProjection()),
metadata_(metadata),
log_anchor_registry_(log_anchor_registry),
mem_tracker_(MemTracker::CreateTracker(
-1, Substitute("tablet-$0", tablet_id()),
parent_mem_tracker)),
dms_mem_tracker_(MemTracker::CreateTracker(
-1, kDMSMemTrackerId, mem_tracker_)),
next_mrs_id_(0),
clock_(clock),
mvcc_(clock),
rowsets_flush_sem_(1),
state_(kInitialized) {
CHECK(schema()->has_column_ids());
compaction_policy_.reset(CreateCompactionPolicy());
if (metric_registry) {
MetricEntity::AttributeMap attrs;
// TODO(KUDU-745): table_id is apparently not set in the metadata.
attrs["table_id"] = metadata_->table_id();
attrs["table_name"] = metadata_->table_name();
attrs["partition"] = metadata_->partition_schema().PartitionDebugString(metadata_->partition(),
*schema());
metric_entity_ = METRIC_ENTITY_tablet.Instantiate(metric_registry, tablet_id(), attrs);
metrics_.reset(new TabletMetrics(metric_entity_));
METRIC_memrowset_size.InstantiateFunctionGauge(
metric_entity_, Bind(&Tablet::MemRowSetSize, Unretained(this)))
->AutoDetach(&metric_detacher_);
METRIC_on_disk_size.InstantiateFunctionGauge(
metric_entity_, Bind(&Tablet::EstimateOnDiskSize, Unretained(this)))
->AutoDetach(&metric_detacher_);
}
if (FLAGS_tablet_throttler_rpc_per_sec > 0 || FLAGS_tablet_throttler_bytes_per_sec > 0) {
throttler_.reset(new Throttler(MonoTime::Now(MonoTime::FINE),
FLAGS_tablet_throttler_rpc_per_sec,
FLAGS_tablet_throttler_bytes_per_sec,
FLAGS_tablet_throttler_burst_factor));
}
}
Tablet::~Tablet() {
Shutdown();
dms_mem_tracker_->UnregisterFromParent();
mem_tracker_->UnregisterFromParent();
}
Status Tablet::Open() {
TRACE_EVENT0("tablet", "Tablet::Open");
boost::lock_guard<rw_spinlock> lock(component_lock_);
CHECK_EQ(state_, kInitialized) << "already open";
CHECK(schema()->has_column_ids());
next_mrs_id_ = metadata_->last_durable_mrs_id() + 1;
RowSetVector rowsets_opened;
// Open the tablet's rowsets.
for (const shared_ptr<RowSetMetadata>& rowset_meta : metadata_->rowsets()) {
shared_ptr<DiskRowSet> rowset;
Status s = DiskRowSet::Open(rowset_meta, log_anchor_registry_.get(), &rowset, mem_tracker_);
if (!s.ok()) {
LOG_WITH_PREFIX(ERROR) << "Failed to open rowset " << rowset_meta->ToString() << ": "
<< s.ToString();
return s;
}
rowsets_opened.push_back(rowset);
}
shared_ptr<RowSetTree> new_rowset_tree(new RowSetTree());
CHECK_OK(new_rowset_tree->Reset(rowsets_opened));
// now that the current state is loaded, create the new MemRowSet with the next id
shared_ptr<MemRowSet> new_mrs(new MemRowSet(next_mrs_id_++, *schema(),
log_anchor_registry_.get(),
mem_tracker_));
components_ = new TabletComponents(new_mrs, new_rowset_tree);
state_ = kBootstrapping;
return Status::OK();
}
void Tablet::MarkFinishedBootstrapping() {
CHECK_EQ(state_, kBootstrapping);
state_ = kOpen;
}
void Tablet::Shutdown() {
UnregisterMaintenanceOps();
boost::lock_guard<rw_spinlock> lock(component_lock_);
components_ = nullptr;
state_ = kShutdown;
// In the case of deleting a tablet, we still keep the metadata around after
// Shutdown(), and need to flush the metadata to indicate that the tablet is deleted.
// During that flush, we don't want metadata to call back into the Tablet, so we
// have to unregister the pre-flush callback.
metadata_->SetPreFlushCallback(Bind(DoNothingStatusClosure));
}
Status Tablet::GetMappedReadProjection(const Schema& projection,
Schema *mapped_projection) const {
const Schema* cur_schema = schema();
return cur_schema->GetMappedReadProjection(projection, mapped_projection);
}
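// Returns the bloom filter sizing derived from the --tablet_bloom_block_size
// and --tablet_bloom_target_fp_rate flags; with the defaults, each 4096-byte
// bloom block is sized for a 1% false-positive rate.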
BloomFilterSizing Tablet::bloom_sizing() const {
return BloomFilterSizing::BySizeAndFPRate(FLAGS_tablet_bloom_block_size,
FLAGS_tablet_bloom_target_fp_rate);
}
Status Tablet::NewRowIterator(const Schema &projection,
gscoped_ptr<RowwiseIterator> *iter) const {
// Yield current rows.
MvccSnapshot snap(mvcc_);
return NewRowIterator(projection, snap, Tablet::UNORDERED, iter);
}
Status Tablet::NewRowIterator(const Schema &projection,
const MvccSnapshot &snap,
const OrderMode order,
gscoped_ptr<RowwiseIterator> *iter) const {
CHECK_EQ(state_, kOpen);
if (metrics_) {
metrics_->scans_started->Increment();
}
VLOG_WITH_PREFIX(2) << "Created new Iterator under snap: " << snap.ToString();
iter->reset(new Iterator(this, projection, snap, order));
return Status::OK();
}
Status Tablet::DecodeWriteOperations(const Schema* client_schema,
WriteTransactionState* tx_state) {
TRACE_EVENT0("tablet", "Tablet::DecodeWriteOperations");
DCHECK_EQ(tx_state->row_ops().size(), 0);
// Acquire the schema lock in shared mode, so that the schema doesn't
// change while this transaction is in-flight.
tx_state->AcquireSchemaLock(&schema_lock_);
// The Schema needs to be held constant while any transactions are between
// PREPARE and APPLY stages
TRACE("PREPARE: Decoding operations");
vector<DecodedRowOperation> ops;
// Decode the ops
RowOperationsPBDecoder dec(&tx_state->request()->row_operations(),
client_schema,
schema(),
tx_state->arena());
RETURN_NOT_OK(dec.DecodeOperations(&ops));
// Create RowOp objects for each
vector<RowOp*> row_ops;
row_ops.reserve(ops.size());
for (const DecodedRowOperation& op : ops) {
row_ops.push_back(new RowOp(op));
}
// Important to set the schema before the ops -- we need the
// schema in order to stringify the ops.
tx_state->set_schema_at_decode_time(schema());
tx_state->swap_row_ops(&row_ops);
return Status::OK();
}
Status Tablet::AcquireRowLocks(WriteTransactionState* tx_state) {
TRACE_EVENT1("tablet", "Tablet::AcquireRowLocks",
"num_locks", tx_state->row_ops().size());
TRACE("PREPARE: Acquiring locks for $0 operations", tx_state->row_ops().size());
for (RowOp* op : tx_state->row_ops()) {
RETURN_NOT_OK(AcquireLockForOp(tx_state, op));
}
TRACE("PREPARE: locks acquired");
return Status::OK();
}
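// Verifies that 'row' falls within this tablet's partition; returns
// Status::NotFound (with the partition and row pretty-printed) otherwise.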
Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const {
bool contains_row;
RETURN_NOT_OK(metadata_->partition_schema().PartitionContainsRow(metadata_->partition(),
row,
&contains_row));
if (PREDICT_FALSE(!contains_row)) {
return Status::NotFound(
Substitute("Row not in tablet partition. Partition: '$0', row: '$1'.",
metadata_->partition_schema().PartitionDebugString(metadata_->partition(),
*schema()),
metadata_->partition_schema().RowDebugString(row)));
}
return Status::OK();
}
Status Tablet::AcquireLockForOp(WriteTransactionState* tx_state, RowOp* op) {
ConstContiguousRow row_key(&key_schema_, op->decoded_op.row_data);
op->key_probe.reset(new tablet::RowSetKeyProbe(row_key));
RETURN_NOT_OK(CheckRowInTablet(row_key));
op->row_lock = ScopedRowLock(&lock_manager_,
tx_state,
op->key_probe->encoded_key_slice(),
LockManager::LOCK_EXCLUSIVE);
return Status::OK();
}
void Tablet::StartTransaction(WriteTransactionState* tx_state) {
gscoped_ptr<ScopedTransaction> mvcc_tx;
// If the state already has a timestamp then we're replaying a transaction that occurred
// before a crash or at another node...
if (tx_state->has_timestamp()) {
mvcc_tx.reset(new ScopedTransaction(&mvcc_, tx_state->timestamp()));
// ... otherwise this is a new transaction and we must assign a new timestamp. We either
// assign a timestamp in the future, if the consistency mode is COMMIT_WAIT, or we assign
// one in the present if the consistency mode is any other one.
} else if (tx_state->external_consistency_mode() == COMMIT_WAIT) {
mvcc_tx.reset(new ScopedTransaction(&mvcc_, ScopedTransaction::NOW_LATEST));
} else {
mvcc_tx.reset(new ScopedTransaction(&mvcc_, ScopedTransaction::NOW));
}
tx_state->SetMvccTxAndTimestamp(std::move(mvcc_tx));
}
Status Tablet::InsertUnlocked(WriteTransactionState *tx_state,
RowOp* insert,
ProbeStats* stats) {
const TabletComponents* comps = DCHECK_NOTNULL(tx_state->tablet_components());
CHECK(state_ == kOpen || state_ == kBootstrapping);
// Make sure that the WriteTransactionState has the component lock and that
// the RowOp has the row lock.
DCHECK(insert->has_row_lock()) << "RowOp must hold the row lock.";
DCHECK_EQ(tx_state->schema_at_decode_time(), schema()) << "Raced against schema change";
DCHECK(tx_state->op_id().IsInitialized()) << "TransactionState OpId needed for anchoring";
// First, ensure that it is a unique key by checking all the open RowSets.
if (FLAGS_tablet_do_dup_key_checks) {
vector<RowSet *> to_check;
comps->rowsets->FindRowSetsWithKeyInRange(insert->key_probe->encoded_key_slice(),
&to_check);
for (const RowSet *rowset : to_check) {
bool present = false;
RETURN_NOT_OK(rowset->CheckRowPresent(*insert->key_probe, &present, stats));
if (PREDICT_FALSE(present)) {
Status s = Status::AlreadyPresent("key already present");
if (metrics_) {
metrics_->insertions_failed_dup_key->Increment();
}
insert->SetFailed(s);
return s;
}
}
}
Timestamp ts = tx_state->timestamp();
ConstContiguousRow row(schema(), insert->decoded_op.row_data);
// TODO: the Insert() call below will re-encode the key, which is a
// waste. Should pass through the KeyProbe structure perhaps.
// Now try to insert into memrowset. The memrowset itself will return
// AlreadyPresent if it has already been inserted there.
Status s = comps->memrowset->Insert(ts, row, tx_state->op_id());
if (PREDICT_TRUE(s.ok())) {
insert->SetInsertSucceeded(comps->memrowset->mrs_id());
} else {
if (s.IsAlreadyPresent() && metrics_) {
metrics_->insertions_failed_dup_key->Increment();
}
insert->SetFailed(s);
}
return s;
}
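// Returns the rowsets that may need to be consulted for the key being mutated.
// In the common case this is the set of rowsets whose key ranges contain the
// probe key; when replaying an operation from the log during bootstrap, it is
// instead the specific memory stores named in the COMMIT message.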
vector<RowSet*> Tablet::FindRowSetsToCheck(RowOp* mutate,
const TabletComponents* comps) {
vector<RowSet*> to_check;
if (PREDICT_TRUE(!mutate->orig_result_from_log_)) {
// TODO: could iterate the rowsets in a smart order
// based on recent statistics - eg if a rowset is getting
// updated frequently, pick that one first.
comps->rowsets->FindRowSetsWithKeyInRange(mutate->key_probe->encoded_key_slice(),
&to_check);
#ifndef NDEBUG
// The order in which the rowset tree returns its results doesn't have semantic
// relevance. We've had bugs in the past (eg KUDU-1341) which were obscured by
// relying on the order of rowsets here. So, in debug builds, we shuffle the
// order to encourage finding such bugs more easily.
std::random_shuffle(to_check.begin(), to_check.end());
#endif
return to_check;
}
// If we are replaying an operation during bootstrap, then we already have a
// COMMIT message which tells us specifically which memory store to apply it to.
for (const auto& store : mutate->orig_result_from_log_->mutated_stores()) {
if (store.has_mrs_id()) {
to_check.push_back(comps->memrowset.get());
} else {
DCHECK(store.has_rs_id());
RowSet* drs = comps->rowsets->drs_by_id(store.rs_id());
if (PREDICT_TRUE(drs)) {
to_check.push_back(drs);
}
// If for some reason we didn't find any stores that the COMMIT message indicated,
// then 'to_check' will be empty at this point. That will result in a NotFound()
// status below, which the bootstrap code catches and propagates as a tablet
// corruption.
}
}
return to_check;
}
Status Tablet::MutateRowUnlocked(WriteTransactionState *tx_state,
RowOp* mutate,
ProbeStats* stats) {
DCHECK(tx_state != nullptr) << "you must have a WriteTransactionState";
DCHECK(tx_state->op_id().IsInitialized()) << "TransactionState OpId needed for anchoring";
DCHECK_EQ(tx_state->schema_at_decode_time(), schema());
gscoped_ptr<OperationResultPB> result(new OperationResultPB());
const TabletComponents* comps = DCHECK_NOTNULL(tx_state->tablet_components());
// Validate the update.
RowChangeListDecoder rcl_decoder(mutate->decoded_op.changelist);
Status s = rcl_decoder.Init();
if (rcl_decoder.is_reinsert()) {
// REINSERT mutations are the byproduct of an INSERT on top of a ghost
// row, not something the user is allowed to specify on their own.
s = Status::InvalidArgument("User may not specify REINSERT mutations");
}
if (!s.ok()) {
mutate->SetFailed(s);
return s;
}
Timestamp ts = tx_state->timestamp();
// First try to update in memrowset.
s = comps->memrowset->MutateRow(ts,
*mutate->key_probe,
mutate->decoded_op.changelist,
tx_state->op_id(),
stats,
result.get());
if (s.ok()) {
mutate->SetMutateSucceeded(std::move(result));
return s;
}
if (!s.IsNotFound()) {
mutate->SetFailed(s);
return s;
}
// Next, check the disk rowsets.
vector<RowSet *> to_check = FindRowSetsToCheck(mutate, comps);
for (RowSet *rs : to_check) {
s = rs->MutateRow(ts,
*mutate->key_probe,
mutate->decoded_op.changelist,
tx_state->op_id(),
stats,
result.get());
if (s.ok()) {
mutate->SetMutateSucceeded(std::move(result));
return s;
}
if (!s.IsNotFound()) {
mutate->SetFailed(s);
return s;
}
}
s = Status::NotFound("key not found");
mutate->SetFailed(s);
return s;
}
void Tablet::StartApplying(WriteTransactionState* tx_state) {
boost::shared_lock<rw_spinlock> lock(component_lock_);
tx_state->StartApplying();
tx_state->set_tablet_components(components_);
}
void Tablet::ApplyRowOperations(WriteTransactionState* tx_state) {
// Allocate the ProbeStats objects from the transaction's arena, so
// they're all contiguous and we don't need to do any central allocation.
int num_ops = tx_state->row_ops().size();
ProbeStats* stats_array = static_cast<ProbeStats*>(
tx_state->arena()->AllocateBytesAligned(sizeof(ProbeStats) * num_ops,
alignof(ProbeStats)));
StartApplying(tx_state);
int i = 0;
for (RowOp* row_op : tx_state->row_ops()) {
ProbeStats* stats = &stats_array[i++];
// Manually run the constructor to clear the stats to 0 before collecting
// them.
new (stats) ProbeStats();
ApplyRowOperation(tx_state, row_op, stats);
}
if (metrics_) {
metrics_->AddProbeStats(stats_array, num_ops, tx_state->arena());
}
}
void Tablet::ApplyRowOperation(WriteTransactionState* tx_state,
RowOp* row_op,
ProbeStats* stats) {
switch (row_op->decoded_op.type) {
case RowOperationsPB::INSERT:
ignore_result(InsertUnlocked(tx_state, row_op, stats));
return;
case RowOperationsPB::UPDATE:
case RowOperationsPB::DELETE:
ignore_result(MutateRowUnlocked(tx_state, row_op, stats));
return;
default:
LOG_WITH_PREFIX(FATAL) << RowOperationsPB::Type_Name(row_op->decoded_op.type);
}
}
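// Builds 'new_tree' from 'old_tree' by dropping every rowset in
// 'rowsets_to_remove' and appending those in 'rowsets_to_add'. CHECK-fails if
// any rowset slated for removal was not present in the old tree.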
void Tablet::ModifyRowSetTree(const RowSetTree& old_tree,
const RowSetVector& rowsets_to_remove,
const RowSetVector& rowsets_to_add,
RowSetTree* new_tree) {
RowSetVector post_swap;
// O(n^2) diff algorithm to collect the set of rowsets excluding
// the rowsets that were included in the compaction
int num_removed = 0;
for (const shared_ptr<RowSet> &rs : old_tree.all_rowsets()) {
// Determine if it should be removed
bool should_remove = false;
for (const shared_ptr<RowSet> &to_remove : rowsets_to_remove) {
if (to_remove == rs) {
should_remove = true;
num_removed++;
break;
}
}
if (!should_remove) {
post_swap.push_back(rs);
}
}
CHECK_EQ(num_removed, rowsets_to_remove.size());
// Then push the new rowsets on the end of the new list
std::copy(rowsets_to_add.begin(),
rowsets_to_add.end(),
std::back_inserter(post_swap));
CHECK_OK(new_tree->Reset(post_swap));
}
void Tablet::AtomicSwapRowSets(const RowSetVector &old_rowsets,
const RowSetVector &new_rowsets) {
boost::lock_guard<rw_spinlock> lock(component_lock_);
AtomicSwapRowSetsUnlocked(old_rowsets, new_rowsets);
}
void Tablet::AtomicSwapRowSetsUnlocked(const RowSetVector &to_remove,
const RowSetVector &to_add) {
DCHECK(component_lock_.is_locked());
shared_ptr<RowSetTree> new_tree(new RowSetTree());
ModifyRowSetTree(*components_->rowsets,
to_remove, to_add, new_tree.get());
components_ = new TabletComponents(components_->memrowset, new_tree);
}
Status Tablet::DoMajorDeltaCompaction(const vector<ColumnId>& col_ids,
shared_ptr<RowSet> input_rs) {
CHECK_EQ(state_, kOpen);
Status s = down_cast<DiskRowSet*>(input_rs.get())
->MajorCompactDeltaStoresWithColumnIds(col_ids);
return s;
}
Status Tablet::Flush() {
TRACE_EVENT1("tablet", "Tablet::Flush", "id", tablet_id());
boost::lock_guard<Semaphore> lock(rowsets_flush_sem_);
return FlushUnlocked();
}
Status Tablet::FlushUnlocked() {
TRACE_EVENT0("tablet", "Tablet::FlushUnlocked");
RowSetsInCompaction input;
shared_ptr<MemRowSet> old_mrs;
{
// Create a new MRS with the latest schema.
boost::lock_guard<rw_spinlock> lock(component_lock_);
RETURN_NOT_OK(ReplaceMemRowSetUnlocked(&input, &old_mrs));
}
// Wait for any in-flight transactions to finish against the old MRS
// before we flush it.
mvcc_.WaitForApplyingTransactionsToCommit();
// Note: "input" should only contain old_mrs.
return FlushInternal(input, old_mrs);
}
Status Tablet::ReplaceMemRowSetUnlocked(RowSetsInCompaction *compaction,
shared_ptr<MemRowSet> *old_ms) {
*old_ms = components_->memrowset;
// Mark the old memrowset as locked, so that it won't be selected for
// inclusion in any concurrent compaction.
shared_ptr<boost::mutex::scoped_try_lock> ms_lock(
new boost::mutex::scoped_try_lock(*((*old_ms)->compact_flush_lock())));
CHECK(ms_lock->owns_lock());
// Add to compaction.
compaction->AddRowSet(*old_ms, ms_lock);
shared_ptr<MemRowSet> new_mrs(new MemRowSet(next_mrs_id_++, *schema(), log_anchor_registry_.get(),
mem_tracker_));
shared_ptr<RowSetTree> new_rst(new RowSetTree());
ModifyRowSetTree(*components_->rowsets,
RowSetVector(), // remove nothing
{ *old_ms }, // add the old MRS
new_rst.get());
// Swap it in
components_ = new TabletComponents(new_mrs, new_rst);
return Status::OK();
}
Status Tablet::FlushInternal(const RowSetsInCompaction& input,
const shared_ptr<MemRowSet>& old_ms) {
CHECK(state_ == kOpen || state_ == kBootstrapping);
// Step 1. Freeze the old memrowset by blocking readers and swapping
// it in as a new rowset, replacing it with an empty one.
//
// At this point, we have already swapped in a new empty rowset, and
// any new inserts are going into that one. 'old_ms' is effectively
// frozen -- no new inserts should arrive after this point.
//
// NOTE: updates and deletes may still arrive into 'old_ms' at this point.
//
// TODO(perf): there's a memrowset.Freeze() call which we might be able to
// use to improve iteration performance during the flush. The old design
// used this, but it's not certain whether it's still doable with the new design.
uint64_t start_insert_count = old_ms->debug_insert_count();
int64_t mrs_being_flushed = old_ms->mrs_id();
if (old_ms->empty()) {
// If we're flushing an empty RowSet, we can short circuit here rather than
// waiting until the check at the end of DoCompactionOrFlush(). This avoids
// the need to create cfiles and write their headers only to later delete
// them.
LOG(INFO) << "MemRowSet was empty: no flush needed.";
return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed);
}
if (flush_hooks_) {
RETURN_NOT_OK_PREPEND(flush_hooks_->PostSwapNewMemRowSet(),
"PostSwapNewMemRowSet hook failed");
}
LOG_WITH_PREFIX(INFO) << "Flush: entering stage 1 (old memrowset already frozen for inserts)";
input.DumpToLog();
LOG_WITH_PREFIX(INFO) << "Memstore in-memory size: " << old_ms->memory_footprint() << " bytes";
RETURN_NOT_OK(DoCompactionOrFlush(input, mrs_being_flushed));
// Sanity check that no insertions happened during our flush.
CHECK_EQ(start_insert_count, old_ms->debug_insert_count())
<< "Sanity check failed: insertions continued in memrowset "
<< "after flush was triggered! Aborting to prevent dataloss.";
return Status::OK();
}
Status Tablet::CreatePreparedAlterSchema(AlterSchemaTransactionState *tx_state,
const Schema* schema) {
if (!key_schema_.KeyEquals(*schema)) {
return Status::InvalidArgument("Schema keys cannot be altered",
schema->CreateKeyProjection().ToString());
}
if (!schema->has_column_ids()) {
// this probably means that the request is not from the Master
return Status::InvalidArgument("Missing Column IDs");
}
// Alter schema must run when no reads/writes are in progress.
// However, compactions and flushes can continue to run in parallel
// with the schema change.
tx_state->AcquireSchemaLock(&schema_lock_);
tx_state->set_schema(schema);
return Status::OK();
}
Status Tablet::AlterSchema(AlterSchemaTransactionState *tx_state) {
DCHECK(key_schema_.KeyEquals(*DCHECK_NOTNULL(tx_state->schema()))) <<
"Schema keys cannot be altered";
// Prevent any concurrent flushes. Otherwise, we run into issues where
// we have an MRS in the rowset tree, and we can't alter its schema
// in-place.
boost::lock_guard<Semaphore> lock(rowsets_flush_sem_);
// If the current version >= new version, there is nothing to do.
bool same_schema = schema()->Equals(*tx_state->schema());
if (metadata_->schema_version() >= tx_state->schema_version()) {
LOG_WITH_PREFIX(INFO) << "Already running schema version " << metadata_->schema_version()
<< " got alter request for version " << tx_state->schema_version();
return Status::OK();
}
LOG_WITH_PREFIX(INFO) << "Alter schema from " << schema()->ToString()
<< " version " << metadata_->schema_version()
<< " to " << tx_state->schema()->ToString()
<< " version " << tx_state->schema_version();
DCHECK(schema_lock_.is_locked());
metadata_->SetSchema(*tx_state->schema(), tx_state->schema_version());
if (tx_state->has_new_table_name()) {
metadata_->SetTableName(tx_state->new_table_name());
if (metric_entity_) {
metric_entity_->SetAttribute("table_name", tx_state->new_table_name());
}
}
// If the current schema and the new one are equal, there is nothing to do.
if (same_schema) {
return metadata_->Flush();
}
return FlushUnlocked();
}
Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema,
int64_t schema_version) {
CHECK_EQ(state_, kBootstrapping);
// We know that the MRS should be empty at this point, because we
// rewind the schema before replaying any operations. So, we just
// swap in a new one with the correct schema, rather than attempting
// to flush.
LOG_WITH_PREFIX(INFO) << "Rewinding schema during bootstrap to " << new_schema.ToString();
metadata_->SetSchema(new_schema, schema_version);
{
boost::lock_guard<rw_spinlock> lock(component_lock_);
shared_ptr<MemRowSet> old_mrs = components_->memrowset;
shared_ptr<RowSetTree> old_rowsets = components_->rowsets;
CHECK(old_mrs->empty());
int64_t old_mrs_id = old_mrs->mrs_id();
// We have to reset the components here before creating the new MemRowSet,
// or else the new MRS will end up trying to claim the same MemTracker ID
// as the old one.
components_.reset();
old_mrs.reset();
shared_ptr<MemRowSet> new_mrs(new MemRowSet(old_mrs_id, new_schema,
log_anchor_registry_.get(), mem_tracker_));
components_ = new TabletComponents(new_mrs, old_rowsets);
}
return Status::OK();
}
void Tablet::SetCompactionHooksForTests(
const shared_ptr<Tablet::CompactionFaultHooks> &hooks) {
compaction_hooks_ = hooks;
}
void Tablet::SetFlushHooksForTests(
const shared_ptr<Tablet::FlushFaultHooks> &hooks) {
flush_hooks_ = hooks;
}
void Tablet::SetFlushCompactCommonHooksForTests(
const shared_ptr<Tablet::FlushCompactCommonHooks> &hooks) {
common_hooks_ = hooks;
}
int32_t Tablet::CurrentMrsIdForTests() const {
boost::shared_lock<rw_spinlock> lock(component_lock_);
return components_->memrowset->mrs_id();
}
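// Returns true if a write of 'bytes' bytes should be admitted right now.
// Consumes one RPC token and 'bytes' byte tokens from the throttler; always
// returns true when no throttler is configured (throttling disabled).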
bool Tablet::ShouldThrottleAllow(int64_t bytes) {
if (!throttler_) {
return true;
}
return throttler_->Take(MonoTime::Now(MonoTime::FINE), 1, bytes);
}
////////////////////////////////////////////////////////////
// CompactRowSetsOp
////////////////////////////////////////////////////////////
CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
: MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_rs_compacted_(0),
tablet_(tablet) {
}
void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
boost::lock_guard<simple_spinlock> l(lock_);
// Any operation that changes the on-disk row layout invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
}
}
tablet_->UpdateCompactionStats(&prev_stats_);
*stats = prev_stats_;
}
bool CompactRowSetsOp::Prepare() {
boost::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another section of the tablet can
// be compacted concurrently.
//
// TODO: we should acquire the rowset compaction locks here. Otherwise, until
// Compact() acquires them, the maintenance manager may compute the same
// stats for this op and run it again, even though Perform() will end up
// performing a much less fruitful compaction. See KUDU-790 for more details.
prev_stats_.Clear();
return true;
}
void CompactRowSetsOp::Perform() {
WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
Substitute("Compaction failed on $0", tablet_->tablet_id()));
}
scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
return tablet_->metrics()->compact_rs_duration;
}
scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
return tablet_->metrics()->compact_rs_running;
}
////////////////////////////////////////////////////////////
// MinorDeltaCompactionOp
////////////////////////////////////////////////////////////
MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
: MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_dms_flushed_(0),
last_num_rs_compacted_(0),
last_num_rs_minor_delta_compacted_(0),
tablet_(tablet) {
}
void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
boost::lock_guard<simple_spinlock> l(lock_);
// Any operation that changes the number of REDO files invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
uint64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_dms_flushed == last_num_dms_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_ &&
new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_dms_flushed_ = new_num_dms_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
}
}
double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
RowSet::MINOR_DELTA_COMPACTION, nullptr);
prev_stats_.set_perf_improvement(perf_improv);
prev_stats_.set_runnable(perf_improv > 0);
*stats = prev_stats_;
}
bool MinorDeltaCompactionOp::Prepare() {
boost::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another rowset in the tablet can
// be delta compacted concurrently.
//
// TODO: See CompactRowSetsOp::Prepare().
prev_stats_.Clear();
return true;
}
void MinorDeltaCompactionOp::Perform() {
WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
}
scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
return tablet_->metrics()->delta_minor_compact_rs_duration;
}
scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
return tablet_->metrics()->delta_minor_compact_rs_running;
}
////////////////////////////////////////////////////////////
// MajorDeltaCompactionOp
////////////////////////////////////////////////////////////
MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
: MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_dms_flushed_(0),
last_num_rs_compacted_(0),
last_num_rs_minor_delta_compacted_(0),
last_num_rs_major_delta_compacted_(0),
tablet_(tablet) {
}
void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
boost::lock_guard<simple_spinlock> l(lock_);
// Any operation that changes the size of the on-disk data invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
int64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
int64_t new_num_rs_major_delta_compacted =
metrics->delta_major_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_dms_flushed == last_num_dms_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_ &&
new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_dms_flushed_ = new_num_dms_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
}
}
double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
RowSet::MAJOR_DELTA_COMPACTION, nullptr);
prev_stats_.set_perf_improvement(perf_improv);
prev_stats_.set_runnable(perf_improv > 0);
*stats = prev_stats_;
}
bool MajorDeltaCompactionOp::Prepare() {
boost::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another rowset in the tablet can
// be delta compacted concurrently.
//
// TODO: See CompactRowSetsOp::Prepare().
prev_stats_.Clear();
return true;
}
void MajorDeltaCompactionOp::Perform() {
WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
}
scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
return tablet_->metrics()->delta_major_compact_rs_duration;
}
scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
return tablet_->metrics()->delta_major_compact_rs_running;
}
////////////////////////////////////////////////////////////
// Tablet
////////////////////////////////////////////////////////////
Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
CompactFlags flags) const {
CHECK_EQ(state_, kOpen);
// Grab a local reference to the current RowSetTree. This is to avoid
// holding the component_lock_ for too long. See the comment on component_lock_
// in tablet.h for details on why that would be bad.
shared_ptr<RowSetTree> rowsets_copy;
{
boost::shared_lock<rw_spinlock> lock(component_lock_);
rowsets_copy = components_->rowsets;
}
boost::lock_guard<boost::mutex> compact_lock(compact_select_lock_);
CHECK_EQ(picked->num_rowsets(), 0);
unordered_set<RowSet*> picked_set;
if (flags & FORCE_COMPACT_ALL) {
// Compact all rowsets, regardless of policy.
for (const shared_ptr<RowSet>& rs : rowsets_copy->all_rowsets()) {
if (rs->IsAvailableForCompaction()) {
picked_set.insert(rs.get());
}
}
} else {
// Let the policy decide which rowsets to compact.
double quality = 0;
RETURN_NOT_OK(compaction_policy_->PickRowSets(*rowsets_copy, &picked_set, &quality, NULL));
VLOG_WITH_PREFIX(2) << "Compaction quality: " << quality;
}
boost::shared_lock<rw_spinlock> lock(component_lock_);
for (const shared_ptr<RowSet>& rs : components_->rowsets->all_rowsets()) {
if (picked_set.erase(rs.get()) == 0) {
// Not picked.
continue;
}
// Grab the compact_flush_lock: this prevents any other concurrent
// compaction from selecting this same rowset, and also ensures that
// we don't select a rowset which is currently in the middle of being
// flushed.
shared_ptr<boost::mutex::scoped_try_lock> lock(
new boost::mutex::scoped_try_lock(*rs->compact_flush_lock()));
CHECK(lock->owns_lock()) << rs->ToString() << " appeared available for "
"compaction when inputs were selected, but was unable to lock its "
"compact_flush_lock to prepare for compaction.";
// Push the lock on our scoped list, so we unlock when done.
picked->AddRowSet(rs, lock);
}
// When we iterated through the current rowsets, we should have found all of the
// rowsets that we picked. If we didn't, that implies that some other thread swapped
// them out while we were making our selection decision -- that's not possible
// since we only picked rowsets that were marked as available for compaction.
if (!picked_set.empty()) {
for (const RowSet* not_found : picked_set) {
LOG_WITH_PREFIX(ERROR) << "Rowset selected for compaction but not available anymore: "
<< not_found->ToString();
}
LOG_WITH_PREFIX(FATAL) << "Was unable to find all rowsets selected for compaction";
}
return Status::OK();
}
void Tablet::GetRowSetsForTests(RowSetVector* out) {
shared_ptr<RowSetTree> rowsets_copy;
{
boost::shared_lock<rw_spinlock> lock(component_lock_);
rowsets_copy = components_->rowsets;
}
for (const shared_ptr<RowSet>& rs : rowsets_copy->all_rowsets()) {
out->push_back(rs);
}
}
void Tablet::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
CHECK_EQ(state_, kOpen);
DCHECK(maintenance_ops_.empty());
gscoped_ptr<MaintenanceOp> rs_compact_op(new CompactRowSetsOp(this));
maint_mgr->RegisterOp(rs_compact_op.get());
maintenance_ops_.push_back(rs_compact_op.release());
gscoped_ptr<MaintenanceOp> minor_delta_compact_op(new MinorDeltaCompactionOp(this));
maint_mgr->RegisterOp(minor_delta_compact_op.get());
maintenance_ops_.push_back(minor_delta_compact_op.release());
gscoped_ptr<MaintenanceOp> major_delta_compact_op(new MajorDeltaCompactionOp(this));
maint_mgr->RegisterOp(major_delta_compact_op.get());
maintenance_ops_.push_back(major_delta_compact_op.release());
}
void Tablet::UnregisterMaintenanceOps() {
for (MaintenanceOp* op : maintenance_ops_) {
op->Unregister();
}
STLDeleteElements(&maintenance_ops_);
}
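// Persists a rowset swap to the tablet metadata: unregisters the metadata of
// the rowsets in 'to_remove' (in-memory rowsets without metadata are skipped),
// registers 'to_add', and flushes the metadata along with 'mrs_being_flushed'.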
Status Tablet::FlushMetadata(const RowSetVector& to_remove,
const RowSetMetadataVector& to_add,
int64_t mrs_being_flushed) {
RowSetMetadataIds to_remove_meta;
for (const shared_ptr<RowSet>& rowset : to_remove) {
// Skip MemRowSet & DuplicatingRowSets which don't have metadata.
if (rowset->metadata().get() == nullptr) {
continue;
}
to_remove_meta.insert(rowset->metadata()->id());
}
return metadata_->UpdateAndFlush(to_remove_meta, to_add, mrs_being_flushed);
}
Status Tablet::DoCompactionOrFlush(const RowSetsInCompaction &input, int64_t mrs_being_flushed) {
const char *op_name =
(mrs_being_flushed == TabletMetadata::kNoMrsFlushed) ? "Compaction" : "Flush";
TRACE_EVENT2("tablet", "Tablet::DoCompactionOrFlush",
"tablet_id", tablet_id(),
"op", op_name);
MvccSnapshot flush_snap(mvcc_);
LOG_WITH_PREFIX(INFO) << op_name << ": entering phase 1 (flushing snapshot). Phase 1 snapshot: "
<< flush_snap.ToString();
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostTakeMvccSnapshot(),
"PostTakeMvccSnapshot hook failed");
}
shared_ptr<CompactionInput> merge;
RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema(), &merge));
RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), bloom_sizing(),
compaction_policy_->target_rowset_size());
RETURN_NOT_OK_PREPEND(drsw.Open(), "Failed to open DiskRowSet for flush");
RETURN_NOT_OK_PREPEND(FlushCompactionInput(merge.get(), flush_snap, &drsw),
"Flush to disk failed");
RETURN_NOT_OK_PREPEND(drsw.Finish(), "Failed to finish DRS writer");
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostWriteSnapshot(),
"PostWriteSnapshot hook failed");
}
// Though unlikely, it's possible that all of the input rows were actually
// GCed in this compaction. In that case, we don't actually want to reopen.
bool gced_all_input = drsw.written_count() == 0;
if (gced_all_input) {
LOG_WITH_PREFIX(INFO) << op_name << " resulted in no output rows (all input rows "
<< "were GCed!) Removing all input rowsets.";
return HandleEmptyCompactionOrFlush(input.rowsets(), mrs_being_flushed);
}
// The RollingDiskRowSetWriter wrote out one or more RowSets as the
// output. Open these into 'new_disk_rowsets'.
vector<shared_ptr<RowSet> > new_disk_rowsets;
RowSetMetadataVector new_drs_metas;
drsw.GetWrittenRowSetMetadata(&new_drs_metas);
if (metrics_.get()) metrics_->bytes_flushed->IncrementBy(drsw.written_size());
CHECK(!new_drs_metas.empty());
{
TRACE_EVENT0("tablet", "Opening compaction results");
for (const shared_ptr<RowSetMetadata>& meta : new_drs_metas) {
shared_ptr<DiskRowSet> new_rowset;
Status s = DiskRowSet::Open(meta, log_anchor_registry_.get(), &new_rowset, mem_tracker_);
if (!s.ok()) {
LOG_WITH_PREFIX(WARNING) << "Unable to open snapshot " << op_name << " results "
<< meta->ToString() << ": " << s.ToString();
return s;
}
new_disk_rowsets.push_back(new_rowset);
}
}
// Setup for Phase 2: Start duplicating any new updates into the new on-disk
// rowsets.
//
// During Phase 1, we may have missed some updates which came into the input
// rowsets while we were writing. So, we can't immediately start reading from
// the on-disk rowsets alone. Starting here, we continue to read from the
// original rowset(s), but mirror updates to both the input and the output
// data.
//
// It's crucial that, during the rest of the compaction, we do not allow the
// output rowsets to flush their deltas to disk. This is to avoid the following
// bug:
// - during phase 1, timestamp 1 updates a flushed row. This is only reflected in the
// input rowset. (ie it is a "missed delta")
// - during phase 2, timestamp 2 updates the same row. This is reflected in both the
// input and output, because of the DuplicatingRowSet.
// - now suppose the output rowset were allowed to flush deltas. This would create the
// first DeltaFile for the output rowset, with only timestamp 2.
// - Now we run the "ReupdateMissedDeltas", and copy over the first transaction to the output
// DMS, which later flushes.
// The end result would be that redos[0] has timestamp 2, and redos[1] has timestamp 1.
// This breaks the invariant that the redo files are time-ordered, and we would probably
// reapply the deltas in the wrong order on the read path.
//
// The way that we avoid this case is that DuplicatingRowSet's FlushDeltas method is a
// no-op.
LOG_WITH_PREFIX(INFO) << op_name << ": entering phase 2 (starting to duplicate updates "
<< "in new rowsets)";
shared_ptr<DuplicatingRowSet> inprogress_rowset(
new DuplicatingRowSet(input.rowsets(), new_disk_rowsets));
// The next step is to swap in the DuplicatingRowSet, and at the same time, determine an
// MVCC snapshot which includes all of the transactions that saw a pre-DuplicatingRowSet
// version of components_.
MvccSnapshot non_duplicated_txns_snap;
vector<Timestamp> applying_during_swap;
{
TRACE_EVENT0("tablet", "Swapping DuplicatingRowSet");
// Taking component_lock_ in write mode ensures that no new transactions
// can StartApplying() (or snapshot components_) during this block.
boost::lock_guard<rw_spinlock> lock(component_lock_);
AtomicSwapRowSetsUnlocked(input.rowsets(), { inprogress_rowset });
// NOTE: transactions may *commit* in between these two lines.
// We need to make sure all such transactions end up in the
// 'applying_during_swap' list, the 'non_duplicated_txns_snap' snapshot,
// or both. Thus it's crucial that these next two lines are in this order!
mvcc_.GetApplyingTransactionsTimestamps(&applying_during_swap);
non_duplicated_txns_snap = MvccSnapshot(mvcc_);
}
// All transactions committed in 'non_duplicated_txns_snap' saw the pre-swap components_.
// Additionally, any transactions that were APPLYING during the above block by definition
// _started_ doing so before the swap. Hence those transactions also need to get included in
// non_duplicated_txns_snap. To do so, we wait for them to commit, and then
// manually include them into our snapshot.
if (VLOG_IS_ON(1) && !applying_during_swap.empty()) {
VLOG_WITH_PREFIX(1) << "Waiting for " << applying_during_swap.size()
<< " mid-APPLY txns to commit before finishing compaction...";
for (const Timestamp& ts : applying_during_swap) {
VLOG_WITH_PREFIX(1) << " " << ts.value();
}
}
// This wait is a little bit conservative - technically we only need to wait for
// those transactions in 'applying_during_swap', but MVCC doesn't implement the
// ability to wait for a specific set. So instead we wait for all currently applying --
// a bit more than we need, but still correct.
mvcc_.WaitForApplyingTransactionsToCommit();
// Then we want to consider all those transactions that were in-flight when we did the
// swap as committed in 'non_duplicated_txns_snap'.
non_duplicated_txns_snap.AddCommittedTimestamps(applying_during_swap);
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapInDuplicatingRowSet(),
"PostSwapInDuplicatingRowSet hook failed");
}
// Phase 2. Here we re-scan the compaction input, copying those missed updates into the
// new rowset's DeltaTracker.
LOG_WITH_PREFIX(INFO) << op_name
<< " Phase 2: carrying over any updates which arrived during Phase 1";
LOG_WITH_PREFIX(INFO) << "Phase 2 snapshot: " << non_duplicated_txns_snap.ToString();
RETURN_NOT_OK_PREPEND(
input.CreateCompactionInput(non_duplicated_txns_snap, schema(), &merge),
Substitute("Failed to create $0 inputs", op_name).c_str());
// Update the output rowsets with the deltas that came in in phase 1, before we swapped
// in the DuplicatingRowSets. This will perform a flush of the updated DeltaTrackers
// in the end so that the data that is reported in the log as belonging to the input
// rowsets is flushed.
RETURN_NOT_OK_PREPEND(ReupdateMissedDeltas(metadata_->tablet_id(),
merge.get(),
flush_snap,
non_duplicated_txns_snap,
new_disk_rowsets),
Substitute("Failed to re-update deltas missed during $0 phase 1",
op_name).c_str());
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostReupdateMissedDeltas(),
"PostReupdateMissedDeltas hook failed");
}
// ------------------------------
// Flush was successful.
// Run fault points used by some integration tests.
if (input.num_rowsets() > 1) {
MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_compaction);
} else if (input.num_rowsets() == 1 &&
input.rowsets()[0]->EstimateOnDiskSize() == 0) {
MAYBE_FAULT(FLAGS_fault_crash_before_flush_tablet_meta_after_flush_mrs);
}
// Write out the new Tablet Metadata and remove old rowsets.
RETURN_NOT_OK_PREPEND(FlushMetadata(input.rowsets(), new_drs_metas, mrs_being_flushed),
"Failed to flush new tablet metadata");
// Replace the compacted rowsets with the new on-disk rowsets, making them visible now that
// their metadata was written to disk.
AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets);
LOG_WITH_PREFIX(INFO) << op_name << " successful on " << drsw.written_count()
<< " rows " << "(" << drsw.written_size() << " bytes)";
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapNewRowSet(),
"PostSwapNewRowSet hook failed");
}
return Status::OK();
}
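// Handles a flush or compaction whose input produced no output rows: the input
// rowsets are dropped from the metadata and swapped out of the rowset tree
// without adding any replacement.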
Status Tablet::HandleEmptyCompactionOrFlush(const RowSetVector& rowsets,
int mrs_being_flushed) {
// Write out the new Tablet Metadata and remove old rowsets.
RETURN_NOT_OK_PREPEND(FlushMetadata(rowsets,
RowSetMetadataVector(),
mrs_being_flushed),
"Failed to flush new tablet metadata");
AtomicSwapRowSets(rowsets, RowSetVector());
return Status::OK();
}
Status Tablet::Compact(CompactFlags flags) {
CHECK_EQ(state_, kOpen);
RowSetsInCompaction input;
// Step 1. Capture the rowsets to be merged
RETURN_NOT_OK_PREPEND(PickRowSetsToCompact(&input, flags),
"Failed to pick rowsets to compact");
if (input.num_rowsets() < 2) {
VLOG_WITH_PREFIX(1) << "Not enough rowsets to run compaction! Aborting...";
return Status::OK();
}
LOG_WITH_PREFIX(INFO) << "Compaction: stage 1 complete, picked "
<< input.num_rowsets() << " rowsets to compact";
if (compaction_hooks_) {
RETURN_NOT_OK_PREPEND(compaction_hooks_->PostSelectIterators(),
"PostSelectIterators hook failed");
}
input.DumpToLog();
return DoCompactionOrFlush(input,
TabletMetadata::kNoMrsFlushed);
}
void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) {
// TODO: use workload statistics here to find out how "hot" the tablet has
// been in the last 5 minutes, and somehow scale the compaction quality
// based on that, so we favor hot tablets.
double quality = 0;
unordered_set<RowSet*> picked_set_ignored;
shared_ptr<RowSetTree> rowsets_copy;
{
boost::shared_lock<rw_spinlock> lock(component_lock_);
rowsets_copy = components_->rowsets;
}
{
boost::lock_guard<boost::mutex> compact_lock(compact_select_lock_);
WARN_NOT_OK(compaction_policy_->PickRowSets(*rowsets_copy, &picked_set_ignored, &quality, NULL),
Substitute("Couldn't determine compaction quality for $0", tablet_id()));
}
VLOG_WITH_PREFIX(1) << "Best compaction for " << tablet_id() << ": " << quality;
stats->set_runnable(quality >= 0);
stats->set_perf_improvement(quality);
}
Status Tablet::DebugDump(vector<string> *lines) {
boost::shared_lock<rw_spinlock> lock(component_lock_);
LOG_STRING(INFO, lines) << "Dumping tablet:";
LOG_STRING(INFO, lines) << "---------------------------";
LOG_STRING(INFO, lines) << "MRS " << components_->memrowset->ToString() << ":";
RETURN_NOT_OK(components_->memrowset->DebugDump(lines));
for (const shared_ptr<RowSet> &rs : components_->rowsets->all_rowsets()) {
LOG_STRING(INFO, lines) << "RowSet " << rs->ToString() << ":";
RETURN_NOT_OK(rs->DebugDump(lines));
}
return Status::OK();
}
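// Creates iterators over the MemRowSet and the relevant DiskRowSets under a
// single acquisition of component_lock_, so that all returned iterators see a
// consistent set of components. For key-range scans, only rowsets intersecting
// the key interval are included.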
Status Tablet::CaptureConsistentIterators(
const Schema *projection,
const MvccSnapshot &snap,
const ScanSpec *spec,
vector<shared_ptr<RowwiseIterator> > *iters) const {
boost::shared_lock<rw_spinlock> lock(component_lock_);
// Construct all the iterators locally first, so that if we fail
// in the middle, we don't modify the output arguments.
vector<shared_ptr<RowwiseIterator> > ret;
// Grab the memrowset iterator.
gscoped_ptr<RowwiseIterator> ms_iter;
RETURN_NOT_OK(components_->memrowset->NewRowIterator(projection, snap, &ms_iter));
ret.push_back(shared_ptr<RowwiseIterator>(ms_iter.release()));
// Cull row-sets in the case of key-range queries.
if (spec != nullptr && spec->lower_bound_key() && spec->exclusive_upper_bound_key()) {
// TODO: support open-ended intervals
// TODO: the upper bound key is exclusive, but the RowSetTree function takes
// an inclusive interval. So, we might end up fetching one more rowset than
// necessary.
vector<RowSet *> interval_sets;
components_->rowsets->FindRowSetsIntersectingInterval(
spec->lower_bound_key()->encoded_key(),
spec->exclusive_upper_bound_key()->encoded_key(),
&interval_sets);
for (const RowSet *rs : interval_sets) {
gscoped_ptr<RowwiseIterator> row_it;
RETURN_NOT_OK_PREPEND(rs->NewRowIterator(projection, snap, &row_it),
Substitute("Could not create iterator for rowset $0",
rs->ToString()));
ret.push_back(shared_ptr<RowwiseIterator>(row_it.release()));
}
ret.swap(*iters);
return Status::OK();
}
// If there are no encoded predicates or they represent an open-ended range, then
// fall back to grabbing all rowset iterators
for (const shared_ptr<RowSet> &rs : components_->rowsets->all_rowsets()) {
gscoped_ptr<RowwiseIterator> row_it;
RETURN_NOT_OK_PREPEND(rs->NewRowIterator(projection, snap, &row_it),
Substitute("Could not create iterator for rowset $0",
rs->ToString()));
ret.push_back(shared_ptr<RowwiseIterator>(row_it.release()));
}
// Swap results into the parameters.
ret.swap(*iters);
return Status::OK();
}
Status Tablet::CountRows(uint64_t *count) const {
// First grab a consistent view of the components of the tablet.
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
// Now sum up the counts.
*count = comps->memrowset->entry_count();
for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
rowid_t l_count;
RETURN_NOT_OK(rowset->CountRows(&l_count));
*count += l_count;
}
return Status::OK();
}
size_t Tablet::MemRowSetSize() const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
if (comps) {
return comps->memrowset->memory_footprint();
}
return 0;
}
bool Tablet::MemRowSetEmpty() const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
return comps->memrowset->empty();
}
size_t Tablet::MemRowSetLogRetentionSize(const MaxIdxToSegmentMap& max_idx_to_segment_size) const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
return GetLogRetentionSizeForIndex(comps->memrowset->MinUnflushedLogIndex(),
max_idx_to_segment_size);
}
size_t Tablet::EstimateOnDiskSize() const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
if (!comps) return 0;
size_t ret = 0;
for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
ret += rowset->EstimateOnDiskSize();
}
return ret;
}
size_t Tablet::DeltaMemStoresSize() const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
size_t ret = 0;
for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
ret += rowset->DeltaMemStoreSize();
}
return ret;
}
bool Tablet::DeltaMemRowSetEmpty() const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
if (!rowset->DeltaMemStoreEmpty()) {
return false;
}
}
return true;
}
void Tablet::GetInfoForBestDMSToFlush(const MaxIdxToSegmentMap& max_idx_to_segment_size,
int64_t* mem_size, int64_t* retention_size) const {
shared_ptr<RowSet> rowset = FindBestDMSToFlush(max_idx_to_segment_size);
if (rowset) {
*retention_size = GetLogRetentionSizeForIndex(rowset->MinUnflushedLogIndex(),
max_idx_to_segment_size);
*mem_size = rowset->DeltaMemStoreSize();
} else {
*retention_size = 0;
*mem_size = 0;
}
}
Status Tablet::FlushDMSWithHighestRetention(const MaxIdxToSegmentMap&
max_idx_to_segment_size) const {
shared_ptr<RowSet> rowset = FindBestDMSToFlush(max_idx_to_segment_size);
if (rowset) {
return rowset->FlushDeltas();
}
return Status::OK();
}
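// Picks the rowset whose DeltaMemStore anchors the most WAL data (ties broken
// by the larger DMS memory footprint). Returns null if every DMS is empty.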
shared_ptr<RowSet> Tablet::FindBestDMSToFlush(const MaxIdxToSegmentMap&
max_idx_to_segment_size) const {
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
int64_t mem_size = 0;
int64_t retention_size = 0;
shared_ptr<RowSet> best_dms;
for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
if (rowset->DeltaMemStoreEmpty()) {
continue;
}
int64_t size = GetLogRetentionSizeForIndex(rowset->MinUnflushedLogIndex(),
max_idx_to_segment_size);
if ((size > retention_size) ||
(size == retention_size &&
(rowset->DeltaMemStoreSize() > mem_size))) {
mem_size = rowset->DeltaMemStoreSize();
retention_size = size;
best_dms = rowset;
}
}
return best_dms;
}
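// Sums the sizes of the log segments that must be retained on behalf of
// 'min_log_index', i.e. every segment whose maximum replicated index is at or
// beyond it. As an illustrative example, given segments {max index 10: 4MB,
// max index 20: 8MB} and min_log_index = 15, only the 8MB segment is counted.
// Returns 0 when nothing is anchored (min_log_index == -1) or the map is empty.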
int64_t Tablet::GetLogRetentionSizeForIndex(int64_t min_log_index,
const MaxIdxToSegmentMap& max_idx_to_segment_size) {
if (max_idx_to_segment_size.size() == 0 || min_log_index == -1) {
return 0;
}
int64_t total_size = 0;
for (const MaxIdxToSegmentMap::value_type& entry : max_idx_to_segment_size) {
if (min_log_index > entry.first) {
continue; // We're not in this segment, probably someone else is retaining it.
}
total_size += entry.second;
}
return total_size;
}
Status Tablet::FlushBiggestDMS() {
CHECK_EQ(state_, kOpen);
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
int64_t max_size = -1;
shared_ptr<RowSet> biggest_drs;
for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
int64_t current = rowset->DeltaMemStoreSize();
if (current > max_size) {
max_size = current;
biggest_drs = rowset;
}
}
return max_size > 0 ? biggest_drs->FlushDeltas() : Status::OK();
}
Status Tablet::CompactWorstDeltas(RowSet::DeltaCompactionType type) {
CHECK_EQ(state_, kOpen);
shared_ptr<RowSet> rs;
// We're required to grab the rowset's compact_flush_lock under the compact_select_lock_.
shared_ptr<boost::mutex::scoped_try_lock> lock;
double perf_improv;
{
// We only want to keep the selection lock during the time we look at rowsets to compact.
// The returned rowset is guaranteed to be available to lock since locking must be done
// under this lock.
boost::lock_guard<boost::mutex> compact_lock(compact_select_lock_);
perf_improv = GetPerfImprovementForBestDeltaCompactUnlocked(type, &rs);
if (rs) {
lock.reset(new boost::mutex::scoped_try_lock(*rs->compact_flush_lock()));
CHECK(lock->owns_lock());
} else {
return Status::OK();
}
}
// We just released compact_select_lock_ so other compactions can select and run, but the
// rowset is ours.
DCHECK(perf_improv != 0);
if (type == RowSet::MINOR_DELTA_COMPACTION) {
RETURN_NOT_OK_PREPEND(rs->MinorCompactDeltaStores(),
"Failed minor delta compaction on " + rs->ToString());
} else if (type == RowSet::MAJOR_DELTA_COMPACTION) {
RETURN_NOT_OK_PREPEND(down_cast<DiskRowSet*>(rs.get())->MajorCompactDeltaStores(),
"Failed major delta compaction on " + rs->ToString());
}
return Status::OK();
}
double Tablet::GetPerfImprovementForBestDeltaCompact(RowSet::DeltaCompactionType type,
shared_ptr<RowSet>* rs) const {
boost::lock_guard<boost::mutex> compact_lock(compact_select_lock_);
return GetPerfImprovementForBestDeltaCompactUnlocked(type, rs);
}
double Tablet::GetPerfImprovementForBestDeltaCompactUnlocked(RowSet::DeltaCompactionType type,
shared_ptr<RowSet>* rs) const {
boost::mutex::scoped_try_lock cs_lock(compact_select_lock_);
DCHECK(!cs_lock.owns_lock());
scoped_refptr<TabletComponents> comps;
GetComponents(&comps);
double worst_delta_perf = 0;
shared_ptr<RowSet> worst_rs;
for (const shared_ptr<RowSet> &rowset : comps->rowsets->all_rowsets()) {
if (!rowset->IsAvailableForCompaction()) {
continue;
}
double perf_improv = rowset->DeltaStoresCompactionPerfImprovementScore(type);
if (perf_improv > worst_delta_perf) {
worst_rs = rowset;
worst_delta_perf = perf_improv;
}
}
if (rs && worst_delta_perf > 0) {
*rs = worst_rs;
}
return worst_delta_perf;
}
size_t Tablet::num_rowsets() const {
boost::shared_lock<rw_spinlock> lock(component_lock_);
return components_->rowsets->all_rowsets().size();
}
void Tablet::PrintRSLayout(ostream* o) {
shared_ptr<RowSetTree> rowsets_copy;
{
boost::shared_lock<rw_spinlock> lock(component_lock_);
rowsets_copy = components_->rowsets;
}
boost::lock_guard<boost::mutex> compact_lock(compact_select_lock_);
// Run the compaction policy in order to get its log and highlight those
// rowsets which would be compacted next.
vector<string> log;
unordered_set<RowSet*> picked;
double quality;
Status s = compaction_policy_->PickRowSets(*rowsets_copy, &picked, &quality, &log);
if (!s.ok()) {
*o << "<b>Error:</b> " << EscapeForHtmlToString(s.ToString());
return;
}
if (!picked.empty()) {
*o << "<p>";
*o << "Highlighted rowsets indicate those that would be compacted next if a "
<< "compaction were to run on this tablet.";
*o << "</p>";
}
vector<RowSetInfo> min, max;
RowSetInfo::CollectOrdered(*rowsets_copy, &min, &max);
DumpCompactionSVG(min, picked, o, false);
*o << "<h2>Compaction policy log</h2>" << std::endl;
*o << "<pre>" << std::endl;
for (const string& s : log) {
*o << EscapeForHtmlToString(s) << std::endl;
}
*o << "</pre>" << std::endl;
}
string Tablet::LogPrefix() const {
return Substitute("T $0 ", tablet_id());
}
////////////////////////////////////////////////////////////
// Tablet::Iterator
////////////////////////////////////////////////////////////
Tablet::Iterator::Iterator(const Tablet* tablet, const Schema& projection,
MvccSnapshot snap, const OrderMode order)
: tablet_(tablet),
projection_(projection),
snap_(std::move(snap)),
order_(order) {}
Tablet::Iterator::~Iterator() {}
Status Tablet::Iterator::Init(ScanSpec *spec) {
DCHECK(iter_.get() == nullptr);
RETURN_NOT_OK(tablet_->GetMappedReadProjection(projection_, &projection_));
vector<shared_ptr<RowwiseIterator>> iters;
RETURN_NOT_OK(tablet_->CaptureConsistentIterators(&projection_, snap_, spec, &iters));
switch (order_) {
case ORDERED:
iter_.reset(new MergeIterator(projection_, iters));
break;
case UNORDERED:
default:
iter_.reset(new UnionIterator(iters));
break;
}
RETURN_NOT_OK(iter_->Init(spec));
return Status::OK();
}
bool Tablet::Iterator::HasNext() const {
DCHECK(iter_.get() != nullptr) << "Not initialized!";
return iter_->HasNext();
}
Status Tablet::Iterator::NextBlock(RowBlock *dst) {
DCHECK(iter_.get() != nullptr) << "Not initialized!";
return iter_->NextBlock(dst);
}
string Tablet::Iterator::ToString() const {
string s;
s.append("tablet iterator: ");
if (iter_.get() == nullptr) {
s.append("NULL");
} else {
s.append(iter_->ToString());
}
return s;
}
void Tablet::Iterator::GetIteratorStats(vector<IteratorStats>* stats) const {
iter_->GetIteratorStats(stats);
}
} // namespace tablet
} // namespace kudu