// Copyright 2014 Cloudera, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "kudu/util/mem_tracker.h"
#include <algorithm>
#include <boost/foreach.hpp>
#include <deque>
#include <gperftools/malloc_extension.h>
#include <limits>
#include <list>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/once.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/mutex.h"
#include "kudu/util/random_util.h"
#include "kudu/util/status.h"
DEFINE_int64(memory_limit_hard_bytes, 0,
"Maximum amount of memory this daemon should use, in bytes. "
"A value of 0 autosizes based on the total system memory. "
"A value of -1 disables all memory limiting.");
TAG_FLAG(memory_limit_hard_bytes, stable);
DEFINE_int32(memory_limit_soft_percentage, 60,
"Percentage of the hard memory limit that this daemon may "
"consume before memory throttling of writes begins. The greater "
"the excess, the higher the chance of throttling. In general, a "
"lower soft limit leads to smoother write latencies but "
"decreased throughput, and vice versa for a higher soft limit.");
TAG_FLAG(memory_limit_soft_percentage, advanced);
namespace kudu {
// NOTE: this class has been adapted from Impala, so the code style varies
// somewhat from the rest of Kudu.
using std::deque;
using std::list;
using std::string;
using std::stringstream;
using std::tr1::shared_ptr;
using std::vector;
using strings::Substitute;
// The ancestor for all trackers. Every tracker is visible from the root down.
static shared_ptr<MemTracker> root_tracker;
static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT;
// Total amount of memory released via Release() since the last GC. Once this
// exceeds GC_RELEASE_SIZE, a tcmalloc GC (GcTcmalloc()) is triggered.
static Atomic64 released_memory_since_gc;
// Validate that soft limit is a percentage.
static bool ValidateSoftLimit(const char* flagname, int value) {
if (value >= 0 && value <= 100) {
return true;
}
LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
flagname, value);
return false;
}
static bool dummy = google::RegisterFlagValidator(
&FLAGS_memory_limit_soft_percentage, &ValidateSoftLimit);
#ifdef TCMALLOC_ENABLED
static uint64_t GetTCMallocCurrentAllocatedBytes() {
size_t value;
if (!MallocExtension::instance()->GetNumericProperty(
"generic.current_allocated_bytes", &value)) {
LOG(DFATAL) << "Failed to get tcmalloc current allocated bytes";
}
return value;
}
#endif
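// Creates the singleton root tracker. The hard limit comes from
// --memory_limit_hard_bytes (0 means 80% of total system RAM, -1 means
// unlimited); when built with tcmalloc, the root's consumption is read from
// tcmalloc's "generic.current_allocated_bytes" property rather than
// accumulated via Consume()/Release().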
void MemTracker::CreateRootTracker() {
int64_t limit = FLAGS_memory_limit_hard_bytes;
if (limit == 0) {
// If no limit is provided, we'll use 80% of system RAM.
int64_t total_ram;
CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram));
limit = total_ram * 4;
limit /= 5;
}
ConsumptionFunction f;
#ifdef TCMALLOC_ENABLED
f = &GetTCMallocCurrentAllocatedBytes;
#endif
root_tracker.reset(new MemTracker(f, limit, "root",
shared_ptr<MemTracker>()));
root_tracker->Init();
LOG(INFO) << StringPrintf("MemTracker: hard memory limit is %.6f GB",
(static_cast<float>(limit) / (1024.0 * 1024.0 * 1024.0)));
LOG(INFO) << StringPrintf("MemTracker: soft memory limit is %.6f GB",
(static_cast<float>(root_tracker->soft_limit_) /
(1024.0 * 1024.0 * 1024.0)));
}
shared_ptr<MemTracker> MemTracker::CreateTracker(int64_t byte_limit,
const string& id,
const shared_ptr<MemTracker>& parent) {
shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker();
MutexLock l(real_parent->child_trackers_lock_);
return CreateTrackerUnlocked(byte_limit, id, real_parent);
}
shared_ptr<MemTracker> MemTracker::CreateTrackerUnlocked(int64_t byte_limit,
const string& id,
const shared_ptr<MemTracker>& parent) {
DCHECK(parent);
shared_ptr<MemTracker> tracker(new MemTracker(ConsumptionFunction(), byte_limit, id, parent));
parent->AddChildTrackerUnlocked(tracker.get());
tracker->Init();
return tracker;
}
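// Illustrative usage sketch. The id "my_op" and the 16KB size are
// hypothetical; 'parent' may be any existing tracker, or an empty shared_ptr
// to attach directly under the root:
//
//   shared_ptr<MemTracker> t = MemTracker::CreateTracker(
//       -1 /* no limit */, "my_op", parent);
//   if (t->TryConsume(16 * 1024)) {
//     // ... perform the 16KB allocation ...
//     t->Release(16 * 1024);
//   }
//
// Both calls propagate up the chain, so the parent and ultimately the root
// tracker see the same 16KB come and go.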
MemTracker::MemTracker(const ConsumptionFunction& consumption_func,
int64_t byte_limit,
const string& id,
const shared_ptr<MemTracker>& parent)
: limit_(byte_limit),
id_(id),
descr_(Substitute("memory consumption for $0", id)),
parent_(parent),
consumption_(0),
consumption_func_(consumption_func),
rand_(GetRandomSeed32()),
enable_logging_(false),
log_stack_(false) {
VLOG(1) << "Creating tracker " << ToString();
if (consumption_func_) {
UpdateConsumption();
}
soft_limit_ = (limit_ == -1)
? -1 : (limit_ * FLAGS_memory_limit_soft_percentage) / 100;
}
MemTracker::~MemTracker() {
VLOG(1) << "Destroying tracker " << ToString();
if (parent_) {
DCHECK(consumption() == 0) << "Memory tracker " << ToString()
<< " has unreleased consumption " << consumption();
parent_->Release(consumption());
UnregisterFromParent();
}
}
void MemTracker::UnregisterFromParent() {
DCHECK(parent_);
MutexLock l(parent_->child_trackers_lock_);
if (child_tracker_it_ != parent_->child_trackers_.end()) {
parent_->child_trackers_.erase(child_tracker_it_);
child_tracker_it_ = parent_->child_trackers_.end();
}
}
string MemTracker::ToString() const {
string s;
const MemTracker* tracker = this;
while (tracker) {
if (!s.empty()) {
s += "->";
}
s += tracker->id();
tracker = tracker->parent_.get();
}
return s;
}
bool MemTracker::FindTracker(const string& id,
shared_ptr<MemTracker>* tracker,
const shared_ptr<MemTracker>& parent) {
shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker();
MutexLock l(real_parent->child_trackers_lock_);
return FindTrackerUnlocked(id, tracker, real_parent);
}
bool MemTracker::FindTrackerUnlocked(const string& id,
shared_ptr<MemTracker>* tracker,
const shared_ptr<MemTracker>& parent) {
DCHECK(parent);
parent->child_trackers_lock_.AssertAcquired();
BOOST_FOREACH(MemTracker* child, parent->child_trackers_) {
if (child->id() == id) {
*tracker = child->shared_from_this();
return true;
}
}
return false;
}
shared_ptr<MemTracker> MemTracker::FindOrCreateTracker(int64_t byte_limit,
const string& id,
const shared_ptr<MemTracker>& parent) {
shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker();
MutexLock l(real_parent->child_trackers_lock_);
shared_ptr<MemTracker> found;
if (FindTrackerUnlocked(id, &found, real_parent)) {
return found;
}
return CreateTrackerUnlocked(byte_limit, id, real_parent);
}
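// Walks the entire tracker tree starting at the root and appends every live
// tracker to 'trackers'. Each tracker's children are copied while holding
// that tracker's child_trackers_lock_.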
void MemTracker::ListTrackers(vector<shared_ptr<MemTracker> >* trackers) {
trackers->clear();
deque<shared_ptr<MemTracker> > to_process;
to_process.push_front(GetRootTracker());
while (!to_process.empty()) {
shared_ptr<MemTracker> t = to_process.back();
to_process.pop_back();
trackers->push_back(t);
{
MutexLock l(t->child_trackers_lock_);
BOOST_FOREACH(MemTracker* child, t->child_trackers_) {
to_process.push_back(child->shared_from_this());
}
}
}
}
void MemTracker::UpdateConsumption() {
DCHECK(!consumption_func_.empty());
DCHECK(parent_.get() == NULL);
consumption_.set_value(consumption_func_());
}
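// Increases consumption of this tracker and all of its ancestors by 'bytes'.
// Negative values are forwarded to Release(). If this tracker's consumption
// is supplied by a consumption function (the tcmalloc-backed root), the
// function is simply re-polled instead of applying the delta.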
void MemTracker::Consume(int64_t bytes) {
if (bytes < 0) {
Release(-bytes);
return;
}
if (!consumption_func_.empty()) {
UpdateConsumption();
return;
}
if (bytes == 0) {
return;
}
if (PREDICT_FALSE(enable_logging_)) {
LogUpdate(true, bytes);
}
for (vector<MemTracker*>::iterator tracker = all_trackers_.begin();
tracker != all_trackers_.end(); ++tracker) {
(*tracker)->consumption_.IncrementBy(bytes);
if ((*tracker)->consumption_func_.empty()) {
DCHECK_GE((*tracker)->consumption_.current_value(), 0);
}
}
}
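// Attempts to increase consumption by 'bytes', enforcing limits from the
// root of the chain downwards. Returns true only if every limited ancestor
// can accommodate the increase (possibly after GC or limit expansion);
// otherwise the trackers that were already incremented are rolled back and
// false is returned.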
bool MemTracker::TryConsume(int64_t bytes) {
if (!consumption_func_.empty()) {
UpdateConsumption();
}
if (bytes <= 0) {
return true;
}
if (PREDICT_FALSE(enable_logging_)) {
LogUpdate(true, bytes);
}
int i = 0;
// Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent
// won't accommodate the change.
for (i = all_trackers_.size() - 1; i >= 0; --i) {
MemTracker *tracker = all_trackers_[i];
if (tracker->limit_ < 0) {
tracker->consumption_.IncrementBy(bytes);
} else {
if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) {
// One of the trackers failed; attempt to GC memory or expand our limit. If
// either succeeds, retry the increment. Bail if both fail.
if (!tracker->GcMemory(tracker->limit_ - bytes) ||
tracker->ExpandLimit(bytes)) {
if (!tracker->consumption_.TryIncrementBy(
bytes, tracker->limit_)) {
break;
}
} else {
break;
}
}
}
}
// Everyone succeeded, return.
if (i == -1) {
return true;
}
// Someone failed, roll back the ones that succeeded.
// TODO: this doesn't roll it back completely since the max values for
// the updated trackers aren't decremented. The max values are only used
// for error reporting so this is probably okay. Rolling those back is
// pretty hard; we'd need something like 2PC.
//
// TODO: This might leave us with an allocated resource that we can't use. Do we need
// to adjust the consumption of the query tracker to stop the resource from never
// getting used by a subsequent TryConsume()?
for (int j = all_trackers_.size() - 1; j > i; --j) {
all_trackers_[j]->consumption_.IncrementBy(-bytes);
}
return false;
}
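// Decreases consumption of this tracker and all of its ancestors by 'bytes'.
// Released bytes are also accumulated into released_memory_since_gc; once
// that total crosses GC_RELEASE_SIZE, GcTcmalloc() asks tcmalloc to hand
// free memory back to the OS.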
void MemTracker::Release(int64_t bytes) {
if (bytes < 0) {
Consume(-bytes);
return;
}
if (PREDICT_FALSE(base::subtle::Barrier_AtomicIncrement(&released_memory_since_gc, bytes) >
GC_RELEASE_SIZE)) {
GcTcmalloc();
}
if (!consumption_func_.empty()) {
UpdateConsumption();
return;
}
if (bytes == 0) {
return;
}
if (PREDICT_FALSE(enable_logging_)) {
LogUpdate(false, bytes);
}
for (vector<MemTracker*>::iterator tracker = all_trackers_.begin();
tracker != all_trackers_.end(); ++tracker) {
(*tracker)->consumption_.IncrementBy(-bytes);
// If a UDF calls FunctionContext::TrackAllocation() but allocates less than the
// reported amount, the subsequent call to FunctionContext::Free() may cause the
// process mem tracker to go negative until it is synced back to the tcmalloc
// metric. Don't blow up in this case. (Note that this doesn't affect non-process
// trackers since we can enforce that the reported memory usage is internally
// consistent.)
if ((*tracker)->consumption_func_.empty()) {
DCHECK_GE((*tracker)->consumption_.current_value(), 0);
}
}
}
bool MemTracker::AnyLimitExceeded() {
for (vector<MemTracker*>::iterator tracker = limit_trackers_.begin();
tracker != limit_trackers_.end(); ++tracker) {
if ((*tracker)->LimitExceeded()) {
return true;
}
}
return false;
}
bool MemTracker::LimitExceeded() {
if (PREDICT_FALSE(CheckLimitExceeded())) {
return GcMemory(limit_);
}
return false;
}
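// Returns true if writes should be throttled because this tracker is over
// its soft limit. Between the soft and hard limits the decision is
// randomized: with consumption c, the probability of throttling is roughly
// (c - soft_limit_) / (limit_ - soft_limit_), growing linearly from 0 at the
// soft limit to 1 at the hard limit, and throttling is only reported if GC
// cannot bring consumption back under the soft limit.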
bool MemTracker::SoftLimitExceeded(double* current_capacity_pct) {
// Did we exceed the actual limit?
if (LimitExceeded()) {
if (current_capacity_pct) {
*current_capacity_pct =
static_cast<double>(consumption()) / limit() * 100;
}
return true;
}
// No soft limit defined.
if (!has_limit() || limit_ == soft_limit_) {
return false;
}
// Are we under the soft limit threshold?
int64_t usage = consumption();
if (usage < soft_limit_) {
return false;
}
// We're over the threshold; were we randomly chosen to be over the soft limit?
if (usage + rand_.Uniform64(limit_ - soft_limit_) > limit_) {
bool exceeded = GcMemory(soft_limit_);
if (exceeded && current_capacity_pct) {
*current_capacity_pct =
static_cast<double>(consumption()) / limit() * 100;
}
return exceeded;
}
return false;
}
bool MemTracker::AnySoftLimitExceeded(double* current_capacity_pct) {
BOOST_FOREACH(MemTracker* t, limit_trackers_) {
if (t->SoftLimitExceeded(current_capacity_pct)) {
return true;
}
}
return false;
}
int64_t MemTracker::SpareCapacity() const {
int64_t result = std::numeric_limits<int64_t>::max();
for (vector<MemTracker*>::const_iterator tracker = limit_trackers_.begin();
tracker != limit_trackers_.end(); ++tracker) {
int64_t mem_left = (*tracker)->limit() - (*tracker)->consumption();
result = std::min(result, mem_left);
}
return result;
}
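// Runs the registered GC functions in order until consumption drops to
// 'max_consumption' or none are left. Returns true if consumption still
// exceeds 'max_consumption' afterwards (i.e. the limit remains exceeded),
// false otherwise.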
bool MemTracker::GcMemory(int64_t max_consumption) {
if (max_consumption < 0) {
// Impossible to GC enough memory to reach the goal.
return true;
}
lock_guard<simple_spinlock> l(&gc_lock_);
if (!consumption_func_.empty()) {
UpdateConsumption();
}
int64_t pre_gc_consumption = consumption();
// Check whether someone else already GC'd before us.
if (pre_gc_consumption < max_consumption) {
return false;
}
// Try to free up some memory
for (int i = 0; i < gc_functions_.size(); ++i) {
gc_functions_[i]();
if (!consumption_func_.empty()) {
UpdateConsumption();
}
if (consumption() <= max_consumption) {
break;
}
}
return consumption() > max_consumption;
}
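// When built against tcmalloc, resets the release counter and asks
// MallocExtension::ReleaseFreeMemory() to return freed memory to the
// operating system; otherwise a no-op.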
void MemTracker::GcTcmalloc() {
#ifdef TCMALLOC_ENABLED
released_memory_since_gc = 0;
MallocExtension::instance()->ReleaseFreeMemory();
#else
// Nothing to do if not using tcmalloc.
#endif
}
string MemTracker::LogUsage(const string& prefix) const {
stringstream ss;
ss << prefix << id_ << ":";
if (CheckLimitExceeded()) {
ss << " memory limit exceeded.";
}
if (limit_ > 0) {
ss << " Limit=" << HumanReadableNumBytes::ToString(limit_);
}
ss << " Consumption=" << HumanReadableNumBytes::ToString(consumption());
stringstream prefix_ss;
prefix_ss << prefix << " ";
string new_prefix = prefix_ss.str();
MutexLock l(child_trackers_lock_);
if (!child_trackers_.empty()) {
ss << "\n" << LogUsage(new_prefix, child_trackers_);
}
return ss.str();
}
void MemTracker::Init() {
// Populate all_trackers_ and limit_trackers_ by walking up to the root.
MemTracker* tracker = this;
while (tracker) {
all_trackers_.push_back(tracker);
if (tracker->has_limit()) limit_trackers_.push_back(tracker);
tracker = tracker->parent_.get();
}
DCHECK_GT(all_trackers_.size(), 0);
DCHECK_EQ(all_trackers_[0], this);
}
void MemTracker::AddChildTrackerUnlocked(MemTracker* tracker) {
child_trackers_lock_.AssertAcquired();
#ifndef NDEBUG
shared_ptr<MemTracker> found;
CHECK(!FindTrackerUnlocked(tracker->id(), &found, shared_from_this()))
<< Substitute("Duplicate memory tracker (id $0) on parent $1",
tracker->id(), ToString());
#endif
tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker);
}
void MemTracker::LogUpdate(bool is_consume, int64_t bytes) const {
stringstream ss;
ss << this << " " << (is_consume ? "Consume: " : "Release: ") << bytes
<< " Consumption: " << consumption() << " Limit: " << limit_;
if (log_stack_) {
ss << std::endl << GetStackTrace();
}
LOG(ERROR) << ss.str();
}
string MemTracker::LogUsage(const string& prefix,
const list<MemTracker*>& trackers) {
vector<string> usage_strings;
BOOST_FOREACH(const MemTracker* child, trackers) {
usage_strings.push_back(child->LogUsage(prefix));
}
return JoinStrings(usage_strings, "\n");
}
shared_ptr<MemTracker> MemTracker::GetRootTracker() {
GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker);
return root_tracker;
}
} // namespace kudu