// blob: 68bf748307f089b8b137fd1cbb6fc095927d384d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstddef>
#include <memory>
#include <ostream>
#include <gflags/gflags.h>
#include <glog/logging.h>
#ifdef TCMALLOC_ENABLED
#include <gperftools/malloc_extension.h> // IWYU pragma: keep
#endif
#include "kudu/util/process_memory.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/once.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h" // IWYU pragma: keep
#include "kudu/util/debug/trace_event.h" // IWYU pragma: keep
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/mem_tracker.h" // IWYU pragma: keep
#include "kudu/util/random.h"
#include "kudu/util/status.h"
// Command-line flags controlling process-wide memory limiting.
//
// The hard limit is expressed in bytes; the soft limit and the memory
// pressure threshold are derived from it as percentages (see DoInitLimits()).

// Hard limit, in bytes. DoInitLimits() replaces a value of 0 with 80% of the
// total system RAM; any other value is used as provided.
DEFINE_int64(memory_limit_hard_bytes, 0,
"Maximum amount of memory this daemon should use, in bytes. "
"A value of 0 autosizes based on the total system memory. "
"A value of -1 disables all memory limiting.");
TAG_FLAG(memory_limit_hard_bytes, stable);
// Percentage of the hard limit used to compute the memory pressure threshold
// that UnderMemoryPressure() compares consumption against.
DEFINE_int32(memory_pressure_percentage, 60,
"Percentage of the hard memory limit that this daemon may "
"consume before flushing of in-memory data becomes prioritized.");
TAG_FLAG(memory_pressure_percentage, advanced);
// Percentage of the hard limit used to compute the soft limit that
// SoftLimitExceeded() compares consumption against.
DEFINE_int32(memory_limit_soft_percentage, 80,
"Percentage of the hard memory limit that this daemon may "
"consume before memory throttling of writes begins. The greater "
"the excess, the higher the chance of throttling. In general, a "
"lower soft limit leads to smoother write latencies but "
"decreased throughput, and vice versa for a higher soft limit.");
TAG_FLAG(memory_limit_soft_percentage, advanced);
// Defined and validated in this file but not read by any code shown here;
// presumably consumed elsewhere -- TODO confirm.
DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
"Percentage of the hard memory limit that this daemon may "
"consume before WARNING level messages are periodically logged.");
TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);
#ifdef TCMALLOC_ENABLED
// When true, MaybeGCAfterRelease() never calls GcTcmalloc().
DEFINE_bool(disable_tcmalloc_gc_by_memory_tracker_for_testing, false,
"For testing only! Whether to disable tcmalloc GC by memory tracker.");
TAG_FLAG(disable_tcmalloc_gc_by_memory_tracker_for_testing, hidden);
// Used by GcTcmalloc() to bound tcmalloc's free page heap bytes relative to
// the bytes currently allocated by the application.
DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10,
"Maximum percentage of the RSS that tcmalloc is allowed to use for "
"reserved but unallocated memory.");
TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced);
#endif
using strings::Substitute;
namespace kudu {
namespace process_memory {
namespace {
// Limits computed once by DoInitLimits(). Readers call InitLimits() first so
// that these are populated before use.
//
// Hard limit in bytes, taken from --memory_limit_hard_bytes (or autosized).
int64_t g_hard_limit;
// Soft limit in bytes: --memory_limit_soft_percentage percent of the hard limit.
int64_t g_soft_limit;
// Memory pressure threshold in bytes: --memory_pressure_percentage percent of
// the hard limit.
int64_t g_pressure_threshold;
// Random source used by SoftLimitExceeded() for its probabilistic decision.
// Allocated in DoInitLimits() and never freed in this file.
ThreadSafeRandom* g_rand = nullptr;
#ifdef TCMALLOC_ENABLED
// Total amount of memory released since the last GC. If this
// is greater than kGcReleaseSize, this will trigger a tcmalloc gc.
Atomic64 g_released_memory_since_gc;
// Size, in bytes, that is considered a large value for Release() (or Consume() with
// a negative value). If tcmalloc is used, this can trigger it to GC.
// A higher value will make us call into tcmalloc less often (and therefore be
// more efficient). A lower value will mean our memory overhead is lower.
// TODO(todd): this is a stopgap.
const int64_t kGcReleaseSize = 128 * 1024L * 1024L;
#endif // TCMALLOC_ENABLED
} // anonymous namespace
// Flag validation
// ------------------------------------------------------------
// gflags validator for flags that must hold a percentage.
//
// Accepts 'value' when it lies in the inclusive range [0, 100]. Otherwise logs
// an error naming 'flagname' and the offending value, and rejects it.
static bool ValidatePercentage(const char* flagname, int value) {
  const bool out_of_range = value < 0 || value > 100;
  if (out_of_range) {
    LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
                             flagname, value);
  }
  return !out_of_range;
}
// Register ValidatePercentage() for the percentage-valued flags.
// NOTE(review): --memory_pressure_percentage is also documented as a
// percentage but has no validator registered here -- confirm whether that is
// intentional.
DEFINE_validator(memory_limit_soft_percentage, &ValidatePercentage);
DEFINE_validator(memory_limit_warn_threshold_percentage, &ValidatePercentage);
#ifdef TCMALLOC_ENABLED
DEFINE_validator(tcmalloc_max_free_bytes_percentage, &ValidatePercentage);
#endif
// Wrappers around tcmalloc functionality
// ------------------------------------------------------------
#ifdef TCMALLOC_ENABLED
// Returns the value of the tcmalloc numeric property named 'prop'.
//
// If tcmalloc cannot provide the property, the failure is logged at DFATAL
// severity and 0 is returned. DFATAL does not terminate the process in
// release builds, so the result must be well-defined on that path rather
// than an uninitialized stack value.
static int64_t GetTCMallocProperty(const char* prop) {
  size_t value = 0;
  if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
    LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
    return 0;
  }
  return static_cast<int64_t>(value);
}
// Returns tcmalloc's "generic.current_allocated_bytes" property.
int64_t GetTCMallocCurrentAllocatedBytes() {
  static const char* const kAllocatedBytesProperty = "generic.current_allocated_bytes";
  return GetTCMallocProperty(kAllocatedBytesProperty);
}
// Asks tcmalloc to return free page heap memory to the system when the free
// bytes exceed --tcmalloc_max_free_bytes_percentage percent of the bytes
// currently allocated by the application.
void GcTcmalloc() {
  TRACE_EVENT0("process", "GcTcmalloc");

  // Memory is handed back in 1MB pieces so that tcmalloc releases its page
  // heap lock between calls, allowing other threads to make progress. This
  // still disrupts the current thread, but is better than disrupting all.
  const int64_t kReleaseChunkBytes = 1024 * 1024;

  // Bytes in the 'NORMAL' free list (i.e. reserved by tcmalloc but not in use).
  const int64_t free_bytes = GetTCMallocProperty("tcmalloc.pageheap_free_bytes");
  // Bytes allocated by the application.
  const int64_t allocated_bytes = GetTCMallocCurrentAllocatedBytes();
  const int64_t allowed_free_bytes =
      allocated_bytes * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0;
  if (free_bytes <= allowed_free_bytes) {
    return;
  }

  for (int64_t remaining = free_bytes - allowed_free_bytes;
       remaining > 0;
       remaining -= kReleaseChunkBytes) {
    MallocExtension::instance()->ReleaseToSystem(kReleaseChunkBytes);
  }
}
#endif // TCMALLOC_ENABLED
// Consumption and soft memory limit behavior
// ------------------------------------------------------------
namespace {
void DoInitLimits() {
int64_t limit = FLAGS_memory_limit_hard_bytes;
if (limit == 0) {
// If no limit is provided, we'll use 80% of system RAM.
int64_t total_ram;
CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram));
limit = total_ram * 4;
limit /= 5;
}
g_hard_limit = limit;
g_soft_limit = FLAGS_memory_limit_soft_percentage * g_hard_limit / 100;
g_pressure_threshold = FLAGS_memory_pressure_percentage * g_hard_limit / 100;
g_rand = new ThreadSafeRandom(1);
}
void InitLimits() {
static GoogleOnceType once;
GoogleOnceInit(&once, &DoInitLimits);
}
} // anonymous namespace
// Returns the current memory consumption of the process, in bytes.
//
// With tcmalloc, the value is tcmalloc's count of allocated bytes, cached and
// refreshed when more than 50ms have elapsed since the last refresh. Without
// tcmalloc, it is the consumption reported by the root MemTracker.
int64_t CurrentConsumption() {
#ifdef TCMALLOC_ENABLED
  const int64_t kRefreshIntervalMicros = 50000;
  static Atomic64 last_refresh_micros = 0;
  static simple_spinlock refresh_lock;
  static Atomic64 cached_consumption = 0;

  uint64_t now_micros = GetMonoTimeMicros();
  const bool is_stale = now_micros > last_refresh_micros + kRefreshIntervalMicros;
  // Only the caller that wins try_lock() refreshes; everyone else returns the
  // cached value without waiting.
  if (is_stale && refresh_lock.try_lock()) {
    base::subtle::NoBarrier_Store(&cached_consumption,
                                  GetTCMallocCurrentAllocatedBytes());
    // Take the timestamp again after reading the consumption. If that read
    // was extremely slow (e.g. due to lots of contention in tcmalloc), this
    // ensures that at least another full interval passes before the next one.
    now_micros = GetMonoTimeMicros();
    base::subtle::NoBarrier_Store(&last_refresh_micros, now_micros);
    refresh_lock.unlock();
  }
  return base::subtle::NoBarrier_Load(&cached_consumption);
#else
  // Without tcmalloc, we have no reliable way of determining our own heap
  // size (e.g. mallinfo doesn't work in ASAN builds). So, fall back to the
  // sum of our tracked memory.
  return MemTracker::GetRootTracker()->consumption();
#endif
}
// Returns the process hard memory limit, in bytes, initializing the limits
// first if that has not happened yet.
int64_t HardLimit() {
  InitLimits();
  const int64_t hard_limit_bytes = g_hard_limit;
  return hard_limit_bytes;
}
// Returns the process soft memory limit, in bytes, initializing the limits
// first if that has not happened yet.
int64_t SoftLimit() {
  InitLimits();
  const int64_t soft_limit_bytes = g_soft_limit;
  return soft_limit_bytes;
}
int64_t MemoryPressureThreshold() {
InitLimits();
return g_pressure_threshold;
}
// Returns true if the current consumption has reached the memory pressure
// threshold.
//
// 'current_capacity_pct' is optional. When it is non-null and the function
// returns true, it is set to the current consumption as a percentage of the
// hard limit. It is left untouched when the function returns false.
//
// A negative hard limit (--memory_limit_hard_bytes=-1 is documented to
// disable all memory limiting) never reports memory pressure.
bool UnderMemoryPressure(double* current_capacity_pct) {
  InitLimits();

  // With a negative hard limit the derived threshold is not positive, so the
  // comparison below would report pressure for any non-negative consumption.
  // Memory limiting is disabled in that configuration, so report no pressure.
  if (g_hard_limit < 0) {
    return false;
  }

  const int64_t consumption = CurrentConsumption();
  if (consumption < g_pressure_threshold) {
    return false;
  }
  if (current_capacity_pct) {
    *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
  }
  return true;
}
// Returns true if the soft memory limit is considered exceeded.
//
// Consumption above the hard limit always counts as exceeded. Consumption
// between the soft and the hard limit counts as exceeded with a probability
// that grows as consumption approaches the hard limit. Consumption below the
// soft limit never counts as exceeded.
//
// 'current_capacity_pct' is optional. When it is non-null and the function
// returns true, it is set to the current consumption as a percentage of the
// hard limit. It is left untouched when the function returns false.
//
// A negative hard limit (--memory_limit_hard_bytes=-1 is documented to
// disable all memory limiting) never reports the limit as exceeded.
bool SoftLimitExceeded(double* current_capacity_pct) {
  InitLimits();

  // With a negative hard limit, 'consumption > g_hard_limit' holds for any
  // non-negative consumption, which would report the limit as exceeded even
  // though memory limiting is disabled. Bail out before any comparison.
  if (g_hard_limit < 0) {
    return false;
  }

  const int64_t consumption = CurrentConsumption();

  bool exceeded;
  if (consumption > g_hard_limit) {
    // Over the actual (hard) limit.
    exceeded = true;
  } else if (g_hard_limit == g_soft_limit) {
    // No soft limit defined.
    exceeded = false;
  } else if (consumption < g_soft_limit) {
    // Under the soft limit threshold.
    exceeded = false;
  } else {
    // Over the threshold: randomly decide whether this call counts as over
    // the soft limit.
    exceeded =
        consumption + g_rand->Uniform64(g_hard_limit - g_soft_limit) > g_hard_limit;
  }

  if (exceeded && current_capacity_pct) {
    *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
  }
  return exceeded;
}
// Records that 'released_bytes' bytes were released and, once the running
// total since the last GC exceeds kGcReleaseSize, resets the total and runs
// GcTcmalloc(). The GC is skipped while
// --disable_tcmalloc_gc_by_memory_tracker_for_testing is set.
//
// Does nothing when built without tcmalloc.
void MaybeGCAfterRelease(int64_t released_bytes) {
#ifdef TCMALLOC_ENABLED
  const int64_t total_released = base::subtle::NoBarrier_AtomicIncrement(
      &g_released_memory_since_gc, released_bytes);
  const bool should_gc =
      total_released > kGcReleaseSize &&
      !FLAGS_disable_tcmalloc_gc_by_memory_tracker_for_testing;
  if (PREDICT_FALSE(should_gc)) {
    base::subtle::NoBarrier_Store(&g_released_memory_since_gc, 0);
    GcTcmalloc();
  }
#endif
}
} // namespace process_memory
} // namespace kudu