blob: 345e8987e776a748dfd4e935257729d1a388904c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet_mm_ops.h"
#include <mutex>
#include "kudu/util/locks.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metrics.h"
using strings::Substitute;
namespace kudu {
namespace tablet {
////////////////////////////////////////////////////////////
// CompactRowSetsOp
////////////////////////////////////////////////////////////
CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
: MaintenanceOp(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_rs_compacted_(0),
tablet_(tablet) {
}
void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
std::lock_guard<simple_spinlock> l(lock_);
// Any operation that changes the on-disk row layout invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
}
}
tablet_->UpdateCompactionStats(&prev_stats_);
*stats = prev_stats_;
}
bool CompactRowSetsOp::Prepare() {
std::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another section of the tablet can
// be compacted concurrently.
//
// TODO: we should acquire the rowset compaction locks here. Otherwise, until
// Compact() acquires them, the maintenance manager may compute the same
// stats for this op and run it again, even though Perform() will end up
// performing a much less fruitful compaction. See KUDU-790 for more details.
prev_stats_.Clear();
return true;
}
void CompactRowSetsOp::Perform() {
WARN_NOT_OK(tablet_->Compact(Tablet::COMPACT_NO_FLAGS),
Substitute("Compaction failed on $0", tablet_->tablet_id()));
}
scoped_refptr<Histogram> CompactRowSetsOp::DurationHistogram() const {
return tablet_->metrics()->compact_rs_duration;
}
scoped_refptr<AtomicGauge<uint32_t> > CompactRowSetsOp::RunningGauge() const {
return tablet_->metrics()->compact_rs_running;
}
////////////////////////////////////////////////////////////
// MinorDeltaCompactionOp
////////////////////////////////////////////////////////////
MinorDeltaCompactionOp::MinorDeltaCompactionOp(Tablet* tablet)
: MaintenanceOp(Substitute("MinorDeltaCompactionOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_dms_flushed_(0),
last_num_rs_compacted_(0),
last_num_rs_minor_delta_compacted_(0),
tablet_(tablet) {
}
void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
std::lock_guard<simple_spinlock> l(lock_);
// Any operation that changes the number of REDO files invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
uint64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_dms_flushed == last_num_dms_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_ &&
new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_dms_flushed_ = new_num_dms_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
}
}
double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
RowSet::MINOR_DELTA_COMPACTION, nullptr);
prev_stats_.set_perf_improvement(perf_improv);
prev_stats_.set_runnable(perf_improv > 0);
*stats = prev_stats_;
}
bool MinorDeltaCompactionOp::Prepare() {
std::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another rowset in the tablet can
// be delta compacted concurrently.
//
// TODO: See CompactRowSetsOp::Prepare().
prev_stats_.Clear();
return true;
}
void MinorDeltaCompactionOp::Perform() {
WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION),
Substitute("Minor delta compaction failed on $0", tablet_->tablet_id()));
}
scoped_refptr<Histogram> MinorDeltaCompactionOp::DurationHistogram() const {
return tablet_->metrics()->delta_minor_compact_rs_duration;
}
scoped_refptr<AtomicGauge<uint32_t> > MinorDeltaCompactionOp::RunningGauge() const {
return tablet_->metrics()->delta_minor_compact_rs_running;
}
////////////////////////////////////////////////////////////
// MajorDeltaCompactionOp
////////////////////////////////////////////////////////////
MajorDeltaCompactionOp::MajorDeltaCompactionOp(Tablet* tablet)
: MaintenanceOp(Substitute("MajorDeltaCompactionOp($0)", tablet->tablet_id()),
MaintenanceOp::HIGH_IO_USAGE),
last_num_mrs_flushed_(0),
last_num_dms_flushed_(0),
last_num_rs_compacted_(0),
last_num_rs_minor_delta_compacted_(0),
last_num_rs_major_delta_compacted_(0),
tablet_(tablet) {
}
void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
std::lock_guard<simple_spinlock> l(lock_);
// Any operation that changes the size of the on-disk data invalidates the
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
int64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
int64_t new_num_rs_major_delta_compacted =
metrics->delta_major_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
new_num_dms_flushed == last_num_dms_flushed_ &&
new_num_rs_compacted == last_num_rs_compacted_ &&
new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
*stats = prev_stats_;
return;
} else {
last_num_mrs_flushed_ = new_num_mrs_flushed;
last_num_dms_flushed_ = new_num_dms_flushed;
last_num_rs_compacted_ = new_num_rs_compacted;
last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
}
}
double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
RowSet::MAJOR_DELTA_COMPACTION, nullptr);
prev_stats_.set_perf_improvement(perf_improv);
prev_stats_.set_runnable(perf_improv > 0);
*stats = prev_stats_;
}
bool MajorDeltaCompactionOp::Prepare() {
std::lock_guard<simple_spinlock> l(lock_);
// Invalidate the cached stats so that another rowset in the tablet can
// be delta compacted concurrently.
//
// TODO: See CompactRowSetsOp::Prepare().
prev_stats_.Clear();
return true;
}
void MajorDeltaCompactionOp::Perform() {
WARN_NOT_OK(tablet_->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION),
Substitute("Major delta compaction failed on $0", tablet_->tablet_id()));
}
scoped_refptr<Histogram> MajorDeltaCompactionOp::DurationHistogram() const {
return tablet_->metrics()->delta_major_compact_rs_duration;
}
scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() const {
return tablet_->metrics()->delta_major_compact_rs_running;
}
} // namespace tablet
} // namespace kudu