| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/util/mem_tracker.h" |
| |
| #include <algorithm> |
| #include <deque> |
| #include <gperftools/malloc_extension.h> |
| #include <limits> |
| #include <list> |
| #include <memory> |
| |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/once.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/human_readable.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/debug-util.h" |
| #include "kudu/util/debug/trace_event.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/mutex.h" |
| #include "kudu/util/random_util.h" |
| #include "kudu/util/status.h" |
| |
| DEFINE_int64(memory_limit_hard_bytes, 0, |
| "Maximum amount of memory this daemon should use, in bytes. " |
| "A value of 0 autosizes based on the total system memory. " |
| "A value of -1 disables all memory limiting."); |
| TAG_FLAG(memory_limit_hard_bytes, stable); |
| |
| DEFINE_int32(memory_limit_soft_percentage, 60, |
| "Percentage of the hard memory limit that this daemon may " |
| "consume before memory throttling of writes begins. The greater " |
| "the excess, the higher the chance of throttling. In general, a " |
| "lower soft limit leads to smoother write latencies but " |
| "decreased throughput, and vice versa for a higher soft limit."); |
| TAG_FLAG(memory_limit_soft_percentage, advanced); |
| |
| DEFINE_int32(memory_limit_warn_threshold_percentage, 98, |
| "Percentage of the hard memory limit that this daemon may " |
| "consume before WARNING level messages are periodically logged."); |
| TAG_FLAG(memory_limit_warn_threshold_percentage, advanced); |
| |
| #ifdef TCMALLOC_ENABLED |
| DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10, |
| "Maximum percentage of the RSS that tcmalloc is allowed to use for " |
| "reserved but unallocated memory."); |
| TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced); |
| #endif |
| |
| namespace kudu { |
| |
| // NOTE: this class has been adapted from Impala, so the code style varies |
| // somewhat from kudu. |
| |
| using std::deque; |
| using std::list; |
| using std::string; |
| using std::stringstream; |
| using std::shared_ptr; |
| using std::vector; |
| using std::weak_ptr; |
| |
| using strings::Substitute; |
| |
| // The ancestor for all trackers. Every tracker is visible from the root down. |
| static shared_ptr<MemTracker> root_tracker; |
| static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT; |
| |
| // Total amount of memory from calls to Release() since the last GC. If this |
| // is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc. |
| static Atomic64 released_memory_since_gc; |
| |
| // Validate that various flags are percentages. |
| static bool ValidatePercentage(const char* flagname, int value) { |
| if (value >= 0 && value <= 100) { |
| return true; |
| } |
| LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid", |
| flagname, value); |
| return false; |
| } |
| static bool dummy[] = { |
| google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage), |
| google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage) |
| #ifdef TCMALLOC_ENABLED |
| ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage) |
| #endif |
| }; |
| |
#ifdef TCMALLOC_ENABLED
// Reads the numeric tcmalloc property named 'prop'.
//
// If the lookup fails, a DFATAL message is logged. DFATAL is only fatal in
// debug builds, so 'value' is zero-initialized to guarantee that release
// builds return 0 rather than an indeterminate value.
static int64_t GetTCMallocProperty(const char* prop) {
  size_t value = 0;
  if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
    LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
  }
  return static_cast<int64_t>(value);
}

// Returns tcmalloc's "generic.current_allocated_bytes" property. Installed as
// the root tracker's consumption function by MemTracker::CreateRootTracker().
static int64_t GetTCMallocCurrentAllocatedBytes() {
  return GetTCMallocProperty("generic.current_allocated_bytes");
}
#endif
| |
| void MemTracker::CreateRootTracker() { |
| int64_t limit = FLAGS_memory_limit_hard_bytes; |
| if (limit == 0) { |
| // If no limit is provided, we'll use 80% of system RAM. |
| int64_t total_ram; |
| CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram)); |
| limit = total_ram * 4; |
| limit /= 5; |
| } |
| |
| ConsumptionFunction f; |
| #ifdef TCMALLOC_ENABLED |
| f = &GetTCMallocCurrentAllocatedBytes; |
| #endif |
| root_tracker.reset(new MemTracker(f, limit, "root", |
| shared_ptr<MemTracker>())); |
| root_tracker->Init(); |
| LOG(INFO) << StringPrintf("MemTracker: hard memory limit is %.6f GB", |
| (static_cast<float>(limit) / (1024.0 * 1024.0 * 1024.0))); |
| LOG(INFO) << StringPrintf("MemTracker: soft memory limit is %.6f GB", |
| (static_cast<float>(root_tracker->soft_limit_) / |
| (1024.0 * 1024.0 * 1024.0))); |
| } |
| |
| shared_ptr<MemTracker> MemTracker::CreateTracker(int64_t byte_limit, |
| const string& id, |
| const shared_ptr<MemTracker>& parent) { |
| shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker(); |
| MutexLock l(real_parent->child_trackers_lock_); |
| return CreateTrackerUnlocked(byte_limit, id, real_parent); |
| } |
| |
| shared_ptr<MemTracker> MemTracker::CreateTrackerUnlocked(int64_t byte_limit, |
| const string& id, |
| const shared_ptr<MemTracker>& parent) { |
| DCHECK(parent); |
| shared_ptr<MemTracker> tracker(new MemTracker(ConsumptionFunction(), byte_limit, id, parent)); |
| parent->AddChildTrackerUnlocked(tracker); |
| tracker->Init(); |
| |
| return tracker; |
| } |
| |
| MemTracker::MemTracker(ConsumptionFunction consumption_func, int64_t byte_limit, |
| const string& id, shared_ptr<MemTracker> parent) |
| : limit_(byte_limit), |
| id_(id), |
| descr_(Substitute("memory consumption for $0", id)), |
| parent_(std::move(parent)), |
| consumption_(0), |
| consumption_func_(std::move(consumption_func)), |
| rand_(GetRandomSeed32()), |
| enable_logging_(false), |
| log_stack_(false) { |
| VLOG(1) << "Creating tracker " << ToString(); |
| if (consumption_func_) { |
| UpdateConsumption(); |
| } |
| soft_limit_ = (limit_ == -1) |
| ? -1 : (limit_ * FLAGS_memory_limit_soft_percentage) / 100; |
| } |
| |
| MemTracker::~MemTracker() { |
| VLOG(1) << "Destroying tracker " << ToString(); |
| if (parent_) { |
| DCHECK(consumption() == 0) << "Memory tracker " << ToString() |
| << " has unreleased consumption " << consumption(); |
| parent_->Release(consumption()); |
| UnregisterFromParent(); |
| } |
| } |
| |
| void MemTracker::UnregisterFromParent() { |
| DCHECK(parent_); |
| MutexLock l(parent_->child_trackers_lock_); |
| if (child_tracker_it_ != parent_->child_trackers_.end()) { |
| parent_->child_trackers_.erase(child_tracker_it_); |
| child_tracker_it_ = parent_->child_trackers_.end(); |
| } |
| } |
| |
| string MemTracker::ToString() const { |
| string s; |
| const MemTracker* tracker = this; |
| while (tracker) { |
| if (s != "") { |
| s += "->"; |
| } |
| s += tracker->id(); |
| tracker = tracker->parent_.get(); |
| } |
| return s; |
| } |
| |
| bool MemTracker::FindTracker(const string& id, |
| shared_ptr<MemTracker>* tracker, |
| const shared_ptr<MemTracker>& parent) { |
| shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker(); |
| MutexLock l(real_parent->child_trackers_lock_); |
| return FindTrackerUnlocked(id, tracker, real_parent); |
| } |
| |
| bool MemTracker::FindTrackerUnlocked(const string& id, |
| shared_ptr<MemTracker>* tracker, |
| const shared_ptr<MemTracker>& parent) { |
| DCHECK(parent != NULL); |
| parent->child_trackers_lock_.AssertAcquired(); |
| for (const auto& child_weak : parent->child_trackers_) { |
| shared_ptr<MemTracker> child = child_weak.lock(); |
| if (child && child->id() == id) { |
| *tracker = std::move(child); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| shared_ptr<MemTracker> MemTracker::FindOrCreateTracker(int64_t byte_limit, |
| const string& id, |
| const shared_ptr<MemTracker>& parent) { |
| shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker(); |
| MutexLock l(real_parent->child_trackers_lock_); |
| shared_ptr<MemTracker> found; |
| if (FindTrackerUnlocked(id, &found, real_parent)) { |
| return found; |
| } |
| return CreateTrackerUnlocked(byte_limit, id, real_parent); |
| } |
| |
| void MemTracker::ListTrackers(vector<shared_ptr<MemTracker>>* trackers) { |
| trackers->clear(); |
| deque<shared_ptr<MemTracker> > to_process; |
| to_process.push_front(GetRootTracker()); |
| while (!to_process.empty()) { |
| shared_ptr<MemTracker> t = to_process.back(); |
| to_process.pop_back(); |
| |
| trackers->push_back(t); |
| { |
| MutexLock l(t->child_trackers_lock_); |
| for (const auto& child_weak : t->child_trackers_) { |
| shared_ptr<MemTracker> child = child_weak.lock(); |
| if (child) { |
| to_process.emplace_back(std::move(child)); |
| } |
| } |
| } |
| } |
| } |
| |
| void MemTracker::UpdateConsumption() { |
| DCHECK(!consumption_func_.empty()); |
| DCHECK(parent_.get() == NULL); |
| consumption_.set_value(consumption_func_()); |
| } |
| |
| void MemTracker::Consume(int64_t bytes) { |
| if (bytes < 0) { |
| Release(-bytes); |
| return; |
| } |
| |
| if (!consumption_func_.empty()) { |
| UpdateConsumption(); |
| return; |
| } |
| if (bytes == 0) { |
| return; |
| } |
| if (PREDICT_FALSE(enable_logging_)) { |
| LogUpdate(true, bytes); |
| } |
| for (auto& tracker : all_trackers_) { |
| tracker->consumption_.IncrementBy(bytes); |
| if (!tracker->consumption_func_.empty()) { |
| DCHECK_GE(tracker->consumption_.current_value(), 0); |
| } |
| } |
| } |
| |
| bool MemTracker::TryConsume(int64_t bytes) { |
| if (!consumption_func_.empty()) { |
| UpdateConsumption(); |
| } |
| if (bytes <= 0) { |
| return true; |
| } |
| if (PREDICT_FALSE(enable_logging_)) { |
| LogUpdate(true, bytes); |
| } |
| |
| int i = 0; |
| // Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent |
| // won't accommodate the change. |
| for (i = all_trackers_.size() - 1; i >= 0; --i) { |
| MemTracker *tracker = all_trackers_[i]; |
| if (tracker->limit_ < 0) { |
| tracker->consumption_.IncrementBy(bytes); |
| } else { |
| if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) { |
| // One of the trackers failed, attempt to GC memory or expand our limit. If that |
| // succeeds, TryUpdate() again. Bail if either fails. |
| if (!tracker->GcMemory(tracker->limit_ - bytes) || |
| tracker->ExpandLimit(bytes)) { |
| if (!tracker->consumption_.TryIncrementBy( |
| bytes, tracker->limit_)) { |
| break; |
| } |
| } else { |
| break; |
| } |
| } |
| } |
| } |
| // Everyone succeeded, return. |
| if (i == -1) { |
| return true; |
| } |
| |
| // Someone failed, roll back the ones that succeeded. |
| // TODO: this doesn't roll it back completely since the max values for |
| // the updated trackers aren't decremented. The max values are only used |
| // for error reporting so this is probably okay. Rolling those back is |
| // pretty hard; we'd need something like 2PC. |
| // |
| // TODO: This might leave us with an allocated resource that we can't use. Do we need |
| // to adjust the consumption of the query tracker to stop the resource from never |
| // getting used by a subsequent TryConsume()? |
| for (int j = all_trackers_.size() - 1; j > i; --j) { |
| all_trackers_[j]->consumption_.IncrementBy(-bytes); |
| } |
| return false; |
| } |
| |
| void MemTracker::Release(int64_t bytes) { |
| if (bytes < 0) { |
| Consume(-bytes); |
| return; |
| } |
| |
| if (PREDICT_FALSE(base::subtle::Barrier_AtomicIncrement(&released_memory_since_gc, bytes) > |
| GC_RELEASE_SIZE)) { |
| GcTcmalloc(); |
| } |
| |
| if (!consumption_func_.empty()) { |
| UpdateConsumption(); |
| return; |
| } |
| |
| if (bytes == 0) { |
| return; |
| } |
| if (PREDICT_FALSE(enable_logging_)) { |
| LogUpdate(false, bytes); |
| } |
| |
| for (auto& tracker : all_trackers_) { |
| tracker->consumption_.IncrementBy(-bytes); |
| // If a UDF calls FunctionContext::TrackAllocation() but allocates less than the |
| // reported amount, the subsequent call to FunctionContext::Free() may cause the |
| // process mem tracker to go negative until it is synced back to the tcmalloc |
| // metric. Don't blow up in this case. (Note that this doesn't affect non-process |
| // trackers since we can enforce that the reported memory usage is internally |
| // consistent.) |
| if (!tracker->consumption_func_.empty()) { |
| DCHECK_GE(tracker->consumption_.current_value(), 0); |
| } |
| } |
| } |
| |
| bool MemTracker::AnyLimitExceeded() { |
| for (const auto& tracker : limit_trackers_) { |
| if (tracker->LimitExceeded()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| bool MemTracker::LimitExceeded() { |
| if (PREDICT_FALSE(CheckLimitExceeded())) { |
| return GcMemory(limit_); |
| } |
| return false; |
| } |
| |
| bool MemTracker::SoftLimitExceeded(double* current_capacity_pct) { |
| // Did we exceed the actual limit? |
| if (LimitExceeded()) { |
| if (current_capacity_pct) { |
| *current_capacity_pct = |
| static_cast<double>(consumption()) / limit() * 100; |
| } |
| return true; |
| } |
| |
| // No soft limit defined. |
| if (!has_limit() || limit_ == soft_limit_) { |
| return false; |
| } |
| |
| // Are we under the soft limit threshold? |
| int64_t usage = consumption(); |
| if (usage < soft_limit_) { |
| return false; |
| } |
| |
| // We're over the threshold; were we randomly chosen to be over the soft limit? |
| if (usage + rand_.Uniform64(limit_ - soft_limit_) > limit_) { |
| bool exceeded = GcMemory(soft_limit_); |
| if (exceeded && current_capacity_pct) { |
| *current_capacity_pct = |
| static_cast<double>(consumption()) / limit() * 100; |
| } |
| return exceeded; |
| } |
| return false; |
| } |
| |
| bool MemTracker::AnySoftLimitExceeded(double* current_capacity_pct) { |
| for (MemTracker* t : limit_trackers_) { |
| if (t->SoftLimitExceeded(current_capacity_pct)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| int64_t MemTracker::SpareCapacity() const { |
| int64_t result = std::numeric_limits<int64_t>::max(); |
| for (const auto& tracker : limit_trackers_) { |
| int64_t mem_left = tracker->limit() - tracker->consumption(); |
| result = std::min(result, mem_left); |
| } |
| return result; |
| } |
| |
| bool MemTracker::GcMemory(int64_t max_consumption) { |
| if (max_consumption < 0) { |
| // Impossible to GC enough memory to reach the goal. |
| return true; |
| } |
| |
| lock_guard<simple_spinlock> l(&gc_lock_); |
| if (!consumption_func_.empty()) { |
| UpdateConsumption(); |
| } |
| uint64_t pre_gc_consumption = consumption(); |
| // Check if someone gc'd before us |
| if (pre_gc_consumption < max_consumption) { |
| return false; |
| } |
| |
| // Try to free up some memory |
| for (const auto& gc_function : gc_functions_) { |
| gc_function(); |
| if (!consumption_func_.empty()) { |
| UpdateConsumption(); |
| } |
| if (consumption() <= max_consumption) { |
| break; |
| } |
| } |
| |
| return consumption() > max_consumption; |
| } |
| |
| void MemTracker::GcTcmalloc() { |
| #ifdef TCMALLOC_ENABLED |
| released_memory_since_gc = 0; |
| TRACE_EVENT0("process", "MemTracker::GcTcmalloc"); |
| |
| // Number of bytes in the 'NORMAL' free list (i.e reserved by tcmalloc but |
| // not in use). |
| int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes"); |
| // Bytes allocated by the application. |
| int64_t bytes_used = GetTCMallocCurrentAllocatedBytes(); |
| |
| int64_t max_overhead = bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0; |
| if (bytes_overhead > max_overhead) { |
| int64_t extra = bytes_overhead - max_overhead; |
| while (extra > 0) { |
| // Release 1MB at a time, so that tcmalloc releases its page heap lock |
| // allowing other threads to make progress. This still disrupts the current |
| // thread, but is better than disrupting all. |
| MallocExtension::instance()->ReleaseToSystem(1024 * 1024); |
| extra -= 1024 * 1024; |
| } |
| } |
| |
| #else |
| // Nothing to do if not using tcmalloc. |
| #endif |
| } |
| |
| string MemTracker::LogUsage(const string& prefix) const { |
| stringstream ss; |
| ss << prefix << id_ << ":"; |
| if (CheckLimitExceeded()) { |
| ss << " memory limit exceeded."; |
| } |
| if (limit_ > 0) { |
| ss << " Limit=" << HumanReadableNumBytes::ToString(limit_); |
| } |
| ss << " Consumption=" << HumanReadableNumBytes::ToString(consumption()); |
| |
| stringstream prefix_ss; |
| prefix_ss << prefix << " "; |
| string new_prefix = prefix_ss.str(); |
| MutexLock l(child_trackers_lock_); |
| if (!child_trackers_.empty()) { |
| ss << "\n" << LogUsage(new_prefix, child_trackers_); |
| } |
| return ss.str(); |
| } |
| |
| void MemTracker::Init() { |
| // populate all_trackers_ and limit_trackers_ |
| MemTracker* tracker = this; |
| while (tracker) { |
| all_trackers_.push_back(tracker); |
| if (tracker->has_limit()) limit_trackers_.push_back(tracker); |
| tracker = tracker->parent_.get(); |
| } |
| DCHECK_GT(all_trackers_.size(), 0); |
| DCHECK_EQ(all_trackers_[0], this); |
| } |
| |
| void MemTracker::AddChildTrackerUnlocked(const shared_ptr<MemTracker>& tracker) { |
| child_trackers_lock_.AssertAcquired(); |
| #ifndef NDEBUG |
| shared_ptr<MemTracker> found; |
| CHECK(!FindTrackerUnlocked(tracker->id(), &found, shared_from_this())) |
| << Substitute("Duplicate memory tracker (id $0) on parent $1", |
| tracker->id(), ToString()); |
| #endif |
| tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker); |
| } |
| |
| void MemTracker::LogUpdate(bool is_consume, int64_t bytes) const { |
| stringstream ss; |
| ss << this << " " << (is_consume ? "Consume: " : "Release: ") << bytes |
| << " Consumption: " << consumption() << " Limit: " << limit_; |
| if (log_stack_) { |
| ss << std::endl << GetStackTrace(); |
| } |
| LOG(ERROR) << ss.str(); |
| } |
| |
| string MemTracker::LogUsage(const string& prefix, |
| const list<weak_ptr<MemTracker>>& trackers) { |
| vector<string> usage_strings; |
| for (const auto& child_weak : trackers) { |
| shared_ptr<MemTracker> child = child_weak.lock(); |
| if (child) { |
| usage_strings.push_back(child->LogUsage(prefix)); |
| } |
| } |
| return JoinStrings(usage_strings, "\n"); |
| } |
| |
| shared_ptr<MemTracker> MemTracker::GetRootTracker() { |
| GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker); |
| return root_tracker; |
| } |
| |
| } // namespace kudu |