blob: 4d1e43b174f05c7d6cab9e386325c8eb21331e05 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet_peer_mm_ops.h"
#include <algorithm>
#include <map>
#include <string>
#include <gflags/gflags.h>
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/metrics.h"
DEFINE_int32(flush_threshold_mb, 64,
"Size at which MemRowSet flushes are triggered. "
"A MRS can still flush below this threshold if it if hasn't flushed in a while");
TAG_FLAG(flush_threshold_mb, experimental);
METRIC_DEFINE_gauge_uint32(tablet, log_gc_running,
"Log GCs Running",
kudu::MetricUnit::kOperations,
"Number of log GC operations currently running.");
METRIC_DEFINE_histogram(tablet, log_gc_duration,
"Log GC Duration",
kudu::MetricUnit::kMilliseconds,
"Time spent garbage collecting the logs.", 60000LU, 1);
namespace kudu {
namespace tablet {
using std::map;
using strings::Substitute;
// How long we wait before considering a time-based flush.
const double kFlushDueToTimeMs = 2 * 60 * 1000;
// Upper bound for how long it takes to reach "full perf improvement" in time-based flushing.
const double kFlushUpperBoundMs = 60 * 60 * 1000;
//
// FlushOpPerfImprovementPolicy.
//
void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats* stats,
double elapsed_ms) {
if (stats->ram_anchored() > FLAGS_flush_threshold_mb * 1024 * 1024) {
// If we're over the user-specified flush threshold, then consider the perf
// improvement to be 1 for every extra MB. This produces perf_improvement results
// which are much higher than any compaction would produce, and means that, when
// there is an MRS over threshold, a flush will almost always be selected instead of
// a compaction. That's not necessarily a good thing, but in the absence of better
// heuristics, it will do for now.
double extra_mb =
static_cast<double>(FLAGS_flush_threshold_mb - (stats->ram_anchored()) / (1024 * 1024));
stats->set_perf_improvement(extra_mb);
} else if (elapsed_ms > kFlushDueToTimeMs) {
// Even if we aren't over the threshold, consider flushing if we haven't flushed
// in a long time. But, don't give it a large perf_improvement score. We should
// only do this if we really don't have much else to do, and if we've already waited a bit.
// The following will give an improvement that's between 0.0 and 1.0, gradually growing
// as 'elapsed_ms' approaches 'kFlushUpperBoundMs'.
double perf = elapsed_ms / kFlushUpperBoundMs;
if (perf > 1.0) {
perf = 1.0;
}
stats->set_perf_improvement(perf);
}
}
//
// FlushMRSOp.
//
void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) {
boost::lock_guard<simple_spinlock> l(lock_);
map<int64_t, int64_t> max_idx_to_segment_size;
if (tablet_peer_->tablet()->MemRowSetEmpty() ||
!tablet_peer_->GetMaxIndexesToSegmentSizeMap(&max_idx_to_segment_size).ok()) {
return;
}
{
boost::unique_lock<Semaphore> lock(tablet_peer_->tablet()->rowsets_flush_sem_,
boost::defer_lock);
stats->set_runnable(lock.try_lock());
}
stats->set_ram_anchored(tablet_peer_->tablet()->MemRowSetSize());
stats->set_logs_retained_bytes(
tablet_peer_->tablet()->MemRowSetLogRetentionSize(max_idx_to_segment_size));
// TODO: use workload statistics here to find out how "hot" the tablet has
// been in the last 5 minutes.
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
stats,
time_since_flush_.elapsed().wall_millis());
}
bool FlushMRSOp::Prepare() {
// Try to acquire the rowsets_flush_sem_. If we can't, the Prepare step
// fails. This also implies that only one instance of FlushMRSOp can be
// running at once.
return tablet_peer_->tablet()->rowsets_flush_sem_.try_lock();
}
void FlushMRSOp::Perform() {
CHECK(!tablet_peer_->tablet()->rowsets_flush_sem_.try_lock());
KUDU_CHECK_OK_PREPEND(tablet_peer_->tablet()->FlushUnlocked(),
Substitute("FlushMRS failed on $0", tablet_peer_->tablet_id()));
{
boost::lock_guard<simple_spinlock> l(lock_);
time_since_flush_.start();
}
tablet_peer_->tablet()->rowsets_flush_sem_.unlock();
}
scoped_refptr<Histogram> FlushMRSOp::DurationHistogram() const {
return tablet_peer_->tablet()->metrics()->flush_mrs_duration;
}
scoped_refptr<AtomicGauge<uint32_t> > FlushMRSOp::RunningGauge() const {
return tablet_peer_->tablet()->metrics()->flush_mrs_running;
}
//
// FlushDeltaMemStoresOp.
//
void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
boost::lock_guard<simple_spinlock> l(lock_);
int64_t dms_size;
int64_t retention_size;
map<int64_t, int64_t> max_idx_to_segment_size;
if (tablet_peer_->tablet()->DeltaMemRowSetEmpty() ||
!tablet_peer_->GetMaxIndexesToSegmentSizeMap(&max_idx_to_segment_size).ok()) {
return;
}
tablet_peer_->tablet()->GetInfoForBestDMSToFlush(max_idx_to_segment_size,
&dms_size, &retention_size);
stats->set_ram_anchored(dms_size);
stats->set_runnable(true);
stats->set_logs_retained_bytes(retention_size);
FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
stats,
time_since_flush_.elapsed().wall_millis());
}
void FlushDeltaMemStoresOp::Perform() {
map<int64_t, int64_t> max_idx_to_segment_size;
if (!tablet_peer_->GetMaxIndexesToSegmentSizeMap(&max_idx_to_segment_size).ok()) {
LOG(WARNING) << "Won't flush deltas since tablet shutting down: " << tablet_peer_->tablet_id();
return;
}
WARN_NOT_OK(tablet_peer_->tablet()->FlushDMSWithHighestRetention(max_idx_to_segment_size),
Substitute("Failed to flush DMS on $0",
tablet_peer_->tablet()->tablet_id()));
{
boost::lock_guard<simple_spinlock> l(lock_);
time_since_flush_.start();
}
}
scoped_refptr<Histogram> FlushDeltaMemStoresOp::DurationHistogram() const {
return tablet_peer_->tablet()->metrics()->flush_dms_duration;
}
scoped_refptr<AtomicGauge<uint32_t> > FlushDeltaMemStoresOp::RunningGauge() const {
return tablet_peer_->tablet()->metrics()->flush_dms_running;
}
//
// LogGCOp.
//
LogGCOp::LogGCOp(TabletPeer* tablet_peer)
: MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_peer->tablet()->tablet_id().c_str()),
MaintenanceOp::LOW_IO_USAGE),
tablet_peer_(tablet_peer),
log_gc_duration_(METRIC_log_gc_duration.Instantiate(
tablet_peer->tablet()->GetMetricEntity())),
log_gc_running_(METRIC_log_gc_running.Instantiate(
tablet_peer->tablet()->GetMetricEntity(), 0)),
sem_(1) {}
void LogGCOp::UpdateStats(MaintenanceOpStats* stats) {
int64_t retention_size;
if (!tablet_peer_->GetGCableDataSize(&retention_size).ok()) {
return;
}
stats->set_logs_retained_bytes(retention_size);
stats->set_runnable(sem_.GetValue() == 1);
}
bool LogGCOp::Prepare() {
return sem_.try_lock();
}
void LogGCOp::Perform() {
CHECK(!sem_.try_lock());
tablet_peer_->RunLogGC();
sem_.unlock();
}
scoped_refptr<Histogram> LogGCOp::DurationHistogram() const {
return log_gc_duration_;
}
scoped_refptr<AtomicGauge<uint32_t> > LogGCOp::RunningGauge() const {
return log_gc_running_;
}
} // namespace tablet
} // namespace kudu