blob: fb4a52ac21ab58d86e4a2f1876f06f71873a2321 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/memory-metrics.h"
#include <boost/algorithm/string.hpp>
#include <gutil/strings/substitute.h>
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/mem-tracker.h"
#include "util/jni-util.h"
#include "util/mem-info.h"
#include "util/process-state-info.h"
#include "util/time.h"
using boost::algorithm::to_lower;
using boost::lock_guard;
using namespace impala;
using namespace strings;
// Flag declared here and defined in another translation unit. In this file it decides
// whether the JVM heap max-usage and non-heap committed gauges are added to the inputs
// of "memory.total-used" (see RegisterMemoryMetrics()).
DECLARE_bool(mem_limit_includes_jvm);
// Flag declared here and defined in another translation unit. In this file it decides
// whether the buffer pool's system-allocated gauge is added to the inputs of
// "memory.total-used" (only when a global ReservationTracker was supplied).
DECLARE_bool(mmap_buffers);
// Hidden flag defined by this file. When true (and MemInfo::HaveSmaps() holds) the
// smaps-derived gauges "memory.num-maps" and "memory.anon-huge-page-bytes" are
// registered and later refreshed by AggregateMemoryMetrics::Refresh().
DEFINE_bool_hidden(enable_extended_memory_metrics, false,
"(Experimental) enable extended memory metrics, including those that can be "
"expensive to compute. This was introduced as a workaround for poor /proc/*/smaps "
"performance in certain Linux kernel versions - see IMPALA-7239.");
// Out-of-class definitions of the static metric pointers. All start as nullptr and are
// assigned by the registration functions further down in this file.

// Aggregate process-level metrics, assigned in RegisterMemoryMetrics(). NUM_MAPS and
// ANON_HUGE_PAGE_BYTES are only assigned when extended memory metrics are enabled and
// smaps is available, so they may legitimately stay nullptr.
SumGauge* AggregateMemoryMetrics::TOTAL_USED = nullptr;
IntGauge* AggregateMemoryMetrics::NUM_MAPS = nullptr;
IntGauge* AggregateMemoryMetrics::MAPPED_BYTES = nullptr;
IntGauge* AggregateMemoryMetrics::RSS = nullptr;
IntGauge* AggregateMemoryMetrics::ANON_HUGE_PAGE_BYTES = nullptr;
StringProperty* AggregateMemoryMetrics::THP_ENABLED = nullptr;
StringProperty* AggregateMemoryMetrics::THP_DEFRAG = nullptr;
StringProperty* AggregateMemoryMetrics::THP_KHUGEPAGED_DEFRAG = nullptr;
// TCMalloc metrics, assigned in RegisterMemoryMetrics() only in builds without
// ADDRESS_SANITIZER / THREAD_SANITIZER defined.
TcmallocMetric* TcmallocMetric::BYTES_IN_USE = nullptr;
TcmallocMetric* TcmallocMetric::PAGEHEAP_FREE_BYTES = nullptr;
TcmallocMetric* TcmallocMetric::TOTAL_BYTES_RESERVED = nullptr;
TcmallocMetric* TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = nullptr;
TcmallocMetric::PhysicalBytesMetric* TcmallocMetric::PHYSICAL_BYTES_RESERVED = nullptr;
// Sanitizer allocator metric, assigned in RegisterMemoryMetrics() only in builds with
// ADDRESS_SANITIZER or THREAD_SANITIZER defined.
SanitizerMallocMetric* SanitizerMallocMetric::BYTES_ALLOCATED = nullptr;
// JVM metrics state. 'initialized_' makes JvmMemoryMetric::InitMetrics() a one-shot.
// HEAP_MAX_USAGE / NON_HEAP_COMMITTED are only assigned if the JVM reports pools named
// "heap" / "non-heap" respectively; otherwise they stay nullptr.
bool JvmMemoryMetric::initialized_ = false;
JvmMemoryMetric* JvmMemoryMetric::HEAP_MAX_USAGE = nullptr;
JvmMemoryMetric* JvmMemoryMetric::NON_HEAP_COMMITTED = nullptr;
// Buffer pool metrics, all assigned in BufferPoolMetric::InitMetrics().
BufferPoolMetric* BufferPoolMetric::LIMIT = nullptr;
BufferPoolMetric* BufferPoolMetric::SYSTEM_ALLOCATED = nullptr;
BufferPoolMetric* BufferPoolMetric::RESERVED = nullptr;
BufferPoolMetric* BufferPoolMetric::UNUSED_RESERVATION_BYTES = nullptr;
BufferPoolMetric* BufferPoolMetric::NUM_FREE_BUFFERS = nullptr;
BufferPoolMetric* BufferPoolMetric::FREE_BUFFER_BYTES = nullptr;
BufferPoolMetric* BufferPoolMetric::CLEAN_PAGES_LIMIT = nullptr;
BufferPoolMetric* BufferPoolMetric::NUM_CLEAN_PAGES = nullptr;
BufferPoolMetric* BufferPoolMetric::CLEAN_PAGE_BYTES = nullptr;
/// Builds a TcmallocMetric for the metric definition named 'key', bound to the tcmalloc
/// property name 'tcmalloc_var', registers it with 'metrics' and returns whatever
/// MetricGroup::RegisterMetric() hands back.
TcmallocMetric* TcmallocMetric::CreateAndRegister(
    MetricGroup* metrics, const string& key, const string& tcmalloc_var) {
  TcmallocMetric* new_metric = new TcmallocMetric(MetricDefs::Get(key), tcmalloc_var);
  return metrics->RegisterMetric(new_metric);
}
/// Registers the process-wide memory metrics under 'metrics' and performs an initial
/// refresh of the aggregate metrics.
///
/// 'metrics': root group; child groups "buffer-pool", "tcmalloc" (non-sanitizer builds)
///     and "memory" are created beneath it as needed.
/// 'register_jvm_metrics': if true, JVM metrics are initialised via
///     JvmMemoryMetric::InitMetrics().
/// 'global_reservations' / 'buffer_pool': if 'global_reservations' is non-null, buffer
///     pool metrics are registered; 'buffer_pool' must then be non-null as well.
///
/// Returns the error from BufferPoolMetric::InitMetrics() if it fails, an error if the
/// JVM gauges needed for "memory.total-used" could not be resolved, and OK otherwise.
Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_metrics,
    ReservationTracker* global_reservations, BufferPool* buffer_pool) {
  if (global_reservations != nullptr) {
    DCHECK(buffer_pool != nullptr);
    RETURN_IF_ERROR(BufferPoolMetric::InitMetrics(
        metrics->GetOrCreateChildGroup("buffer-pool"), global_reservations, buffer_pool));
  }

  // Add compound metrics that track totals across malloc and the buffer pool.
  // total-used should track the total physical memory in use.
  vector<IntGauge*> used_metrics;
  if (FLAGS_mmap_buffers && global_reservations != nullptr) {
    // If we mmap() buffers, the buffers are not allocated via malloc. Ensure they are
    // properly tracked.
    used_metrics.push_back(BufferPoolMetric::SYSTEM_ALLOCATED);
  }
  if (register_jvm_metrics) {
    JvmMemoryMetric::InitMetrics(metrics);
    if (FLAGS_mem_limit_includes_jvm) {
      // InitMetrics() only assigns these two gauges when the JVM reported pools named
      // "heap" and "non-heap". If the pool list was empty or incomplete (e.g. the JVM
      // metrics fetch failed) they are still nullptr, and a null gauge must not be
      // handed to the SumGauge below. Fail registration with a clear error instead.
      if (JvmMemoryMetric::HEAP_MAX_USAGE == nullptr
          || JvmMemoryMetric::NON_HEAP_COMMITTED == nullptr) {
        return Status("Could not register memory metrics: the JVM did not report the "
                      "'heap' and 'non-heap' memory pools required when "
                      "--mem_limit_includes_jvm is set.");
      }
      used_metrics.push_back(JvmMemoryMetric::HEAP_MAX_USAGE);
      used_metrics.push_back(JvmMemoryMetric::NON_HEAP_COMMITTED);
    }
  }

#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
  // Sanitizer builds: use the sanitizer allocator's byte count in place of the TCMalloc
  // metrics.
  SanitizerMallocMetric::BYTES_ALLOCATED = metrics->RegisterMetric(
      new SanitizerMallocMetric(MetricDefs::Get("sanitizer-total-bytes-allocated")));
  used_metrics.push_back(SanitizerMallocMetric::BYTES_ALLOCATED);
#else
  MetricGroup* tcmalloc_metrics = metrics->GetOrCreateChildGroup("tcmalloc");
  // We rely on TCMalloc for our global memory metrics, so skip setting them up
  // if we're not using TCMalloc.
  TcmallocMetric::BYTES_IN_USE = TcmallocMetric::CreateAndRegister(
      tcmalloc_metrics, "tcmalloc.bytes-in-use", "generic.current_allocated_bytes");
  TcmallocMetric::TOTAL_BYTES_RESERVED = TcmallocMetric::CreateAndRegister(
      tcmalloc_metrics, "tcmalloc.total-bytes-reserved", "generic.heap_size");
  TcmallocMetric::PAGEHEAP_FREE_BYTES = TcmallocMetric::CreateAndRegister(
      tcmalloc_metrics, "tcmalloc.pageheap-free-bytes", "tcmalloc.pageheap_free_bytes");
  TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES =
      TcmallocMetric::CreateAndRegister(tcmalloc_metrics,
          "tcmalloc.pageheap-unmapped-bytes", "tcmalloc.pageheap_unmapped_bytes");
  TcmallocMetric::PHYSICAL_BYTES_RESERVED =
      tcmalloc_metrics->RegisterMetric(new TcmallocMetric::PhysicalBytesMetric(
          MetricDefs::Get("tcmalloc.physical-bytes-reserved")));
  used_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
#endif

  MetricGroup* aggregate_metrics = metrics->GetOrCreateChildGroup("memory");
  AggregateMemoryMetrics::TOTAL_USED = aggregate_metrics->RegisterMetric(
      new SumGauge(MetricDefs::Get("memory.total-used"), used_metrics));
  // The smaps-derived gauges are optional; Refresh() keys off NUM_MAPS being non-null.
  if (FLAGS_enable_extended_memory_metrics && MemInfo::HaveSmaps()) {
    AggregateMemoryMetrics::NUM_MAPS = aggregate_metrics->AddGauge("memory.num-maps", 0U);
    AggregateMemoryMetrics::ANON_HUGE_PAGE_BYTES =
        aggregate_metrics->AddGauge("memory.anon-huge-page-bytes", 0U);
  }
  AggregateMemoryMetrics::MAPPED_BYTES =
      aggregate_metrics->AddGauge("memory.mapped-bytes", 0U);
  AggregateMemoryMetrics::RSS = aggregate_metrics->AddGauge("memory.rss", 0U);

  // Transparent huge page settings are exposed as string properties, seeded with the
  // currently parsed configuration.
  ThpConfig thp_config = MemInfo::ParseThpConfig();
  AggregateMemoryMetrics::THP_ENABLED =
      aggregate_metrics->AddProperty("memory.thp.enabled", thp_config.enabled);
  AggregateMemoryMetrics::THP_DEFRAG =
      aggregate_metrics->AddProperty("memory.thp.defrag", thp_config.defrag);
  AggregateMemoryMetrics::THP_KHUGEPAGED_DEFRAG = aggregate_metrics->AddProperty(
      "memory.thp.khugepaged-defrag", thp_config.khugepaged_defrag);

  // Populate the gauges once so they do not report their placeholder values.
  AggregateMemoryMetrics::Refresh();
  return Status::OK();
}
void AggregateMemoryMetrics::Refresh() {
if (NUM_MAPS != nullptr) {
// Only call ParseSmaps() if the metrics were created.
MappedMemInfo map_info = MemInfo::ParseSmaps();
NUM_MAPS->SetValue(map_info.num_maps);
ANON_HUGE_PAGE_BYTES->SetValue(map_info.anon_huge_pages_kb * 1024);
}
ProcessStateInfo proc_state(false);
MAPPED_BYTES->SetValue(proc_state.GetVmSize());
RSS->SetValue(proc_state.GetRss());
ThpConfig thp_config = MemInfo::ParseThpConfig();
THP_ENABLED->SetValue(thp_config.enabled);
THP_DEFRAG->SetValue(thp_config.defrag);
THP_KHUGEPAGED_DEFRAG->SetValue(thp_config.khugepaged_defrag);
}
/// Creates a JvmMemoryMetric of kind 'type' for the JVM pool 'pool_name' and registers
/// it with 'metrics'. The metric definition is looked up with 'key' and a key-friendly
/// form of the pool name: lower-cased, with spaces replaced by dashes. The metric itself
/// keeps the unmodified 'pool_name'.
JvmMemoryMetric* JvmMemoryMetric::CreateAndRegister(
    MetricGroup* metrics, const string& key, const string& pool_name,
    JvmMemoryMetricType type) {
  string key_fragment = boost::algorithm::to_lower_copy(pool_name);
  replace(key_fragment.begin(), key_fragment.end(), ' ', '-');
  JvmMemoryMetric* new_metric =
      new JvmMemoryMetric(MetricDefs::Get(key, key_fragment), pool_name, type);
  return metrics->RegisterMetric(new_metric);
}
/// Constructs a gauge for one value ('type') of the JVM memory pool 'mempool_name'.
/// 0 is passed as the second IntGauge argument; the reported value comes from
/// GetValue(), which queries JvmMetricCache.
JvmMemoryMetric::JvmMemoryMetric(const TMetricDef& def, const string& mempool_name,
    JvmMemoryMetricType type)
  : IntGauge(def, 0) {
  // Remember which value of which pool this gauge reports.
  metric_type_ = type;
  mempool_name_ = mempool_name;
}
int64_t JvmMemoryMetric::GetValue() {
return JvmMetricCache::GetInstance()->GetPoolMetric(mempool_name_, metric_type_);
}
/// Registers all JVM memory metrics under a "jvm" child group of 'parent'. Does nothing
/// if it already ran once. For each pool name reported by JvmMetricCache, eight gauges
/// are registered; the max-usage gauge of the "heap" pool and the committed gauge of the
/// "non-heap" pool are additionally remembered in HEAP_MAX_USAGE / NON_HEAP_COMMITTED.
/// Finally the GC-related counters are registered.
void JvmMemoryMetric::InitMetrics(MetricGroup* parent) {
  if (initialized_) return;
  MetricGroup* jvm_group = parent->GetOrCreateChildGroup("jvm");

  // One entry per gauge registered for every pool, in registration order.
  struct PoolGaugeSpec {
    const char* key_template;
    JvmMemoryMetricType type;
  };
  const PoolGaugeSpec pool_gauges[] = {
      {"jvm.$0.max-usage-bytes", MAX},
      {"jvm.$0.current-usage-bytes", CURRENT},
      {"jvm.$0.committed-usage-bytes", COMMITTED},
      {"jvm.$0.init-usage-bytes", INIT},
      {"jvm.$0.peak-max-usage-bytes", PEAK_MAX},
      {"jvm.$0.peak-current-usage-bytes", PEAK_CURRENT},
      {"jvm.$0.peak-committed-usage-bytes", PEAK_COMMITTED},
      {"jvm.$0.peak-init-usage-bytes", PEAK_INIT},
  };

  vector<string> pool_names = JvmMetricCache::GetInstance()->GetPoolNames();
  for (const string& pool : pool_names) {
    for (const PoolGaugeSpec& spec : pool_gauges) {
      JvmMemoryMetric* gauge =
          JvmMemoryMetric::CreateAndRegister(jvm_group, spec.key_template, pool, spec.type);
      // These two gauges are referenced by name from outside this function.
      if (spec.type == MAX && pool == "heap") HEAP_MAX_USAGE = gauge;
      if (spec.type == COMMITTED && pool == "non-heap") NON_HEAP_COMMITTED = gauge;
    }
  }

  // GC counters: each one reads a single field of the cached JVM metrics response.
  using Response = TGetJvmMemoryMetricsResponse;
  JvmMemoryCounterMetric::CreateAndRegister(
      jvm_group, "jvm.gc_time_millis", [](const Response& r) { return r.gc_time_millis; });
  JvmMemoryCounterMetric::CreateAndRegister(jvm_group,
      "jvm.gc_num_info_threshold_exceeded",
      [](const Response& r) { return r.gc_num_info_threshold_exceeded; });
  JvmMemoryCounterMetric::CreateAndRegister(jvm_group,
      "jvm.gc_num_warn_threshold_exceeded",
      [](const Response& r) { return r.gc_num_warn_threshold_exceeded; });
  JvmMemoryCounterMetric::CreateAndRegister(
      jvm_group, "jvm.gc_count", [](const Response& r) { return r.gc_count; });
  JvmMemoryCounterMetric::CreateAndRegister(jvm_group,
      "jvm.gc_total_extra_sleep_time_millis",
      [](const Response& r) { return r.gc_total_extra_sleep_time_millis; });

  initialized_ = true;
}
/// Creates a JvmMemoryCounterMetric for the metric definition named 'key' that reads its
/// value through 'accessor', registers it with 'metrics' and returns the result of the
/// registration.
JvmMemoryCounterMetric* JvmMemoryCounterMetric::CreateAndRegister(
    MetricGroup* metrics, const string& key,
    int64_t(*accessor)(const TGetJvmMemoryMetricsResponse&)) {
  JvmMemoryCounterMetric* new_counter =
      new JvmMemoryCounterMetric(MetricDefs::Get(key), accessor);
  return metrics->RegisterMetric(new_counter);
}
/// Constructs a counter whose value is obtained by applying 'accessor' to the cached
/// JVM metrics response (see GetValue()). 0 is passed as the second IntCounter argument.
JvmMemoryCounterMetric::JvmMemoryCounterMetric(const TMetricDef& def,
    int64_t(*accessor)(const TGetJvmMemoryMetricsResponse&))
  : IntCounter(def, 0), accessor_(accessor) {}
int64_t JvmMemoryCounterMetric::GetValue() {
return JvmMetricCache::GetInstance()->GetCounterMetric(accessor_);
}
/// Returns the single process-wide JvmMetricCache. The object is a function-local
/// static, so it is constructed the first time this function is called.
JvmMetricCache* JvmMetricCache::GetInstance() {
  static JvmMetricCache singleton;
  return &singleton;
}
void JvmMetricCache::GrabMetricsIfNecessary() {
int64_t now = MonotonicMillis();
if (now - last_fetch_ < CACHE_PERIOD_MILLIS) return;
Status status = JniUtil::GetJvmMemoryMetrics(&last_response_);
if (!status.ok()) {
VLOG_QUERY << "Couldn't retrieve JVM Memory Metrics: " << status.GetDetail();
return;
}
last_fetch_ = now;
return;
}
int64_t JvmMetricCache::GetCounterMetric(
int64_t(*accessor)(const TGetJvmMemoryMetricsResponse&)) {
lock_guard<boost::mutex> lock_guard(lock_);
GrabMetricsIfNecessary();
return accessor(last_response_);
}
int64_t JvmMetricCache::GetPoolMetric(const std::string& mempool_name,
JvmMemoryMetricType type) {
lock_guard<boost::mutex> lock_guard(lock_);
GrabMetricsIfNecessary();
for (const TJvmMemoryPool& pool : last_response_.memory_pools) {
if (pool.name == mempool_name) {
switch (type) {
case MAX:
return pool.max;
case INIT:
return pool.init;
case CURRENT:
return pool.used;
case COMMITTED:
return pool.committed;
case PEAK_MAX:
return pool.peak_max;
case PEAK_INIT:
return pool.peak_init;
case PEAK_CURRENT:
return pool.peak_used;
case PEAK_COMMITTED:
return pool.peak_committed;
default:
DCHECK(false) << "Unknown JvmMemoryMetricType: " << type;
}
}
}
DCHECK(false) << "Could not find pool: " << mempool_name;
return 0;
}
/// Under 'lock_', refreshes the cached JVM response if needed and returns the names of
/// all memory pools it contains, in response order.
vector<string> JvmMetricCache::GetPoolNames() {
  lock_guard<boost::mutex> l(lock_);
  GrabMetricsIfNecessary();

  vector<string> pool_names;
  pool_names.reserve(last_response_.memory_pools.size());
  for (const TJvmMemoryPool& pool : last_response_.memory_pools) {
    pool_names.push_back(pool.name);
  }
  return pool_names;
}
/// Creates one BufferPoolMetric per BufferPoolMetricType, registers each with 'metrics'
/// and stores the registered pointer in the matching static member. Every metric reads
/// from the same 'global_reservations' and 'buffer_pool'. Always returns OK.
Status BufferPoolMetric::InitMetrics(MetricGroup* metrics,
    ReservationTracker* global_reservations, BufferPool* buffer_pool) {
  // Builds and registers the metric for one (definition key, type) pair.
  auto register_metric = [&](const char* key, BufferPoolMetricType type) {
    return metrics->RegisterMetric(new BufferPoolMetric(
        MetricDefs::Get(key), type, global_reservations, buffer_pool));
  };

  LIMIT = register_metric("buffer-pool.limit", BufferPoolMetricType::LIMIT);
  SYSTEM_ALLOCATED = register_metric(
      "buffer-pool.system-allocated", BufferPoolMetricType::SYSTEM_ALLOCATED);
  RESERVED = register_metric("buffer-pool.reserved", BufferPoolMetricType::RESERVED);
  UNUSED_RESERVATION_BYTES = register_metric("buffer-pool.unused-reservation-bytes",
      BufferPoolMetricType::UNUSED_RESERVATION_BYTES);
  NUM_FREE_BUFFERS = register_metric(
      "buffer-pool.free-buffers", BufferPoolMetricType::NUM_FREE_BUFFERS);
  FREE_BUFFER_BYTES = register_metric(
      "buffer-pool.free-buffer-bytes", BufferPoolMetricType::FREE_BUFFER_BYTES);
  CLEAN_PAGES_LIMIT = register_metric(
      "buffer-pool.clean-pages-limit", BufferPoolMetricType::CLEAN_PAGES_LIMIT);
  NUM_CLEAN_PAGES = register_metric(
      "buffer-pool.clean-pages", BufferPoolMetricType::NUM_CLEAN_PAGES);
  CLEAN_PAGE_BYTES = register_metric(
      "buffer-pool.clean-page-bytes", BufferPoolMetricType::CLEAN_PAGE_BYTES);
  return Status::OK();
}
/// Constructs a gauge reporting the buffer pool value selected by 'type'. The
/// 'global_reservations' and 'buffer_pool' pointers are stored as given and are
/// dereferenced later by GetValue(). 0 is passed as the second IntGauge argument
/// (presumably the initial value - confirm against IntGauge).
BufferPoolMetric::BufferPoolMetric(const TMetricDef& def, BufferPoolMetricType type,
ReservationTracker* global_reservations, BufferPool* buffer_pool)
: IntGauge(def, 0),
type_(type),
global_reservations_(global_reservations),
buffer_pool_(buffer_pool) {}
/// Returns the current value of the buffer pool statistic selected by 'type_'. An
/// unknown type trips a DCHECK; where DCHECKs are compiled out, 0 is returned.
int64_t BufferPoolMetric::GetValue() {
  // IMPALA-6362: we have to be careful that none of the below calls to ReservationTracker
  // methods acquire ReservationTracker::lock_ to avoid a potential circular dependency
  // with MemTracker::child_trackers_lock_, which may be held when refreshing MemTracker
  // consumption.
  switch (type_) {
    case BufferPoolMetricType::LIMIT:
      return buffer_pool_->GetSystemBytesLimit();
    case BufferPoolMetricType::SYSTEM_ALLOCATED:
      return buffer_pool_->GetSystemBytesAllocated();
    case BufferPoolMetricType::RESERVED:
      return global_reservations_->GetReservation();
    case BufferPoolMetricType::UNUSED_RESERVATION_BYTES: {
      // This is an estimate derived from other aggregate values: reservation for which
      // no buffer is currently in use by a client. Every allocated buffer is either in
      // use, free, or attached to a clean page, so subtracting the latter two from the
      // allocated bytes leaves the bytes in use by clients.
      const int64_t allocated_bytes = buffer_pool_->GetSystemBytesAllocated();
      const int64_t free_bytes = buffer_pool_->GetFreeBufferBytes();
      const int64_t clean_bytes = buffer_pool_->GetCleanPageBytes();
      const int64_t bytes_in_use = allocated_bytes - free_bytes - clean_bytes;
      return global_reservations_->GetReservation() - bytes_in_use;
    }
    case BufferPoolMetricType::NUM_FREE_BUFFERS:
      return buffer_pool_->GetNumFreeBuffers();
    case BufferPoolMetricType::FREE_BUFFER_BYTES:
      return buffer_pool_->GetFreeBufferBytes();
    case BufferPoolMetricType::CLEAN_PAGES_LIMIT:
      return buffer_pool_->GetCleanPageBytesLimit();
    case BufferPoolMetricType::NUM_CLEAN_PAGES:
      return buffer_pool_->GetNumCleanPages();
    case BufferPoolMetricType::CLEAN_PAGE_BYTES:
      return buffer_pool_->GetCleanPageBytes();
    default:
      DCHECK(false) << "Unknown BufferPoolMetricType: " << static_cast<int>(type_);
  }
  return 0;
}
/// Registers two gauges with 'metrics' for 'mem_tracker': one for its current
/// consumption and one for its peak consumption. 'name' is substituted into the metric
/// definition keys.
void MemTrackerMetric::CreateMetrics(MetricGroup* metrics, MemTracker* mem_tracker,
    const string& name) {
  MemTrackerMetric* current_usage = new MemTrackerMetric(
      MetricDefs::Get("mem-tracker.$0.current_usage_bytes", name),
      MemTrackerMetricType::CURRENT, mem_tracker);
  metrics->RegisterMetric(current_usage);

  MemTrackerMetric* peak_usage = new MemTrackerMetric(
      MetricDefs::Get("mem-tracker.$0.peak_usage_bytes", name),
      MemTrackerMetricType::PEAK, mem_tracker);
  metrics->RegisterMetric(peak_usage);
}
/// Constructs a gauge reporting the value selected by 'type' from 'mem_tracker'. The
/// pointer is stored as given and dereferenced later by GetValue(). 0 is passed as the
/// second IntGauge argument (presumably the initial value - confirm against IntGauge).
MemTrackerMetric::MemTrackerMetric(const TMetricDef& def, MemTrackerMetricType type,
MemTracker* mem_tracker)
: IntGauge(def, 0),
type_(type),
mem_tracker_(mem_tracker) {}
/// Returns the tracker's current or peak consumption, depending on 'type_'. An unknown
/// type trips a DCHECK; where DCHECKs are compiled out, 0 is returned.
int64_t MemTrackerMetric::GetValue() {
  if (type_ == MemTrackerMetricType::CURRENT) return mem_tracker_->consumption();
  if (type_ == MemTrackerMetricType::PEAK) return mem_tracker_->peak_consumption();
  DCHECK(false) << "Unknown MemTrackerMetricType: " << static_cast<int>(type_);
  return 0;
}