blob: a17380c245fdcf12e86a9e5cb0faff84edde576a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "common/daemon.h"
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <gflags/gflags.h>
#include "runtime/memory/jemalloc_control.h"
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
!defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
#include <gperftools/malloc_extension.h> // IWYU pragma: keep
#endif
// IWYU pragma: no_include <bits/std_abs.h>
#include <butil/iobuf.h>
#include <math.h>
#include <stdint.h>
#include <stdlib.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <map>
#include <ostream>
#include <string>
#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/be_proc_monitor.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/memory_reclamation.h"
#include "runtime/process_profile.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/algorithm_util.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/perf_counters.h"
#include "util/system_metrics.h"
#include "util/time.h"
namespace doris {
namespace {
std::atomic<int64_t> last_print_proc_mem = 0;
std::atomic<int32_t> refresh_cache_capacity_sleep_time_ms = 0;
std::atomic<int32_t> memory_gc_sleep_time = 0;
#ifdef USE_JEMALLOC
std::atomic<int32_t> je_reset_dirty_decay_sleep_time_ms = 0;
#endif
void update_rowsets_and_segments_num_metrics() {
if (config::is_cloud_mode()) {
// TODO(plat1ko): CloudStorageEngine
} else {
StorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_local();
auto* metrics = DorisMetrics::instance();
metrics->all_rowsets_num->set_value(engine.tablet_manager()->get_rowset_nums());
metrics->all_segments_num->set_value(engine.tablet_manager()->get_segment_nums());
}
}
} // namespace
void Daemon::tcmalloc_gc_thread() {
// TODO All cache GC wish to be supported
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \
!defined(USE_JEMALLOC)
// Limit size of tcmalloc cache via release_rate and max_cache_percent.
// We adjust release_rate according to memory_pressure, which is usage percent of memory.
int64_t max_cache_percent = 60;
double release_rates[10] = {1.0, 1.0, 1.0, 5.0, 5.0, 20.0, 50.0, 100.0, 500.0, 2000.0};
int64_t pressure_limit = 90;
bool is_performance_mode = false;
int64_t physical_limit_bytes =
std::min(MemInfo::physical_mem() - MemInfo::sys_mem_available_low_water_mark(),
MemInfo::mem_limit());
if (config::memory_mode == std::string("performance")) {
max_cache_percent = 100;
pressure_limit = 90;
is_performance_mode = true;
physical_limit_bytes = std::min(MemInfo::mem_limit(), MemInfo::physical_mem());
} else if (config::memory_mode == std::string("compact")) {
max_cache_percent = 20;
pressure_limit = 80;
}
int last_ms = 0;
const int kMaxLastMs = 30000;
const int kIntervalMs = 10;
size_t init_aggressive_decommit = 0;
size_t current_aggressive_decommit = 0;
size_t expected_aggressive_decommit = 0;
int64_t last_memory_pressure = 0;
MallocExtension::instance()->GetNumericProperty("tcmalloc.aggressive_memory_decommit",
&init_aggressive_decommit);
current_aggressive_decommit = init_aggressive_decommit;
while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(kIntervalMs))) {
size_t tc_used_bytes = 0;
size_t tc_alloc_bytes = 0;
size_t rss = PerfCounters::get_vm_rss();
MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
&tc_alloc_bytes);
MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes",
&tc_used_bytes);
int64_t tc_cached_bytes = (int64_t)tc_alloc_bytes - (int64_t)tc_used_bytes;
int64_t to_free_bytes =
(int64_t)tc_cached_bytes - ((int64_t)tc_used_bytes * max_cache_percent / 100);
to_free_bytes = std::max(to_free_bytes, (int64_t)0);
int64_t memory_pressure = 0;
int64_t rss_pressure = 0;
int64_t alloc_bytes = std::max(rss, tc_alloc_bytes);
memory_pressure = alloc_bytes * 100 / physical_limit_bytes;
rss_pressure = rss * 100 / physical_limit_bytes;
expected_aggressive_decommit = init_aggressive_decommit;
if (memory_pressure > pressure_limit) {
// We are reaching oom, so release cache aggressively.
// Ideally, we should reuse cache and not allocate from system any more,
// however, it is hard to set limit on cache of tcmalloc and doris
// use mmap in vectorized mode.
// Limit cache capactiy is enough.
if (rss_pressure > pressure_limit) {
int64_t min_free_bytes = alloc_bytes - physical_limit_bytes * 9 / 10;
to_free_bytes = std::max(to_free_bytes, min_free_bytes);
to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 30 / 100);
// We assure that we have at least 500M bytes in cache.
to_free_bytes = std::min(to_free_bytes, tc_cached_bytes - 500 * 1024 * 1024);
expected_aggressive_decommit = 1;
}
last_ms = kMaxLastMs;
} else if (memory_pressure > (pressure_limit - 10)) {
// In most cases, adjusting release rate is enough, if memory are consumed quickly
// we should release manually.
if (last_memory_pressure <= (pressure_limit - 10)) {
to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 10 / 100);
}
}
int release_rate_index = int(memory_pressure) / 10;
double release_rate = 1.0;
if (release_rate_index >= sizeof(release_rates) / sizeof(release_rates[0])) {
release_rate = 2000.0;
} else {
release_rate = release_rates[release_rate_index];
}
MallocExtension::instance()->SetMemoryReleaseRate(release_rate);
if ((current_aggressive_decommit != expected_aggressive_decommit) && !is_performance_mode) {
MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit",
expected_aggressive_decommit);
current_aggressive_decommit = expected_aggressive_decommit;
}
last_memory_pressure = memory_pressure;
// We release at least 2% bytes once, frequent releasing hurts performance.
if (to_free_bytes > (physical_limit_bytes * 2 / 100)) {
last_ms += kIntervalMs;
if (last_ms >= kMaxLastMs) {
LOG(INFO) << "generic.current_allocated_bytes " << tc_used_bytes
<< ", generic.total_physical_bytes " << tc_alloc_bytes << ", rss " << rss
<< ", max_cache_percent " << max_cache_percent << ", release_rate "
<< release_rate << ", memory_pressure " << memory_pressure
<< ", physical_limit_bytes " << physical_limit_bytes << ", to_free_bytes "
<< to_free_bytes << ", current_aggressive_decommit "
<< current_aggressive_decommit;
MallocExtension::instance()->ReleaseToSystem(to_free_bytes);
last_ms = 0;
}
} else {
last_ms = 0;
}
}
#endif
}
void refresh_process_memory_metrics() {
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
doris::GlobalMemoryArbitrator::reset_refresh_interval_memory_growth();
ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
butil::IOBuf::block_memory());
}
void refresh_common_allocator_metrics() {
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::JemallocControl::refresh_allocator_mem();
if (config::enable_system_metrics) {
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
}
#endif
doris::GlobalMemoryArbitrator::refresh_memory_bvar();
}
void refresh_memory_state_after_memory_change() {
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
last_print_proc_mem = PerfCounters::get_vm_rss();
doris::MemTrackerLimiter::clean_tracker_limiter_group();
doris::ProcessProfile::instance()->memory_profile()->enable_print_log_process_usage();
doris::ProcessProfile::instance()->memory_profile()->refresh_memory_overview_profile();
doris::JemallocControl::notify_je_purge_dirty_pages();
LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when memory state by 256M
}
}
void refresh_cache_capacity() {
if (doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
std::memory_order_relaxed)) {
// the last cache capacity adjustment has not been completed.
// if not return, last_periodic_refreshed_cache_capacity_adjust_weighted may be modified, but notify is ignored.
return;
}
if (refresh_cache_capacity_sleep_time_ms <= 0) {
auto cache_capacity_reduce_mem_limit = int64_t(
doris::MemInfo::soft_mem_limit() * config::cache_capacity_reduce_mem_limit_frac);
int64_t process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
// the rule is like this:
// 1. if the process mem usage < soft memlimit * 0.6, then do not need adjust cache capacity.
// 2. if the process mem usage > soft memlimit * 0.6 and process mem usage < soft memlimit, then it will be adjusted to a lower value.
// 3. if the process mem usage > soft memlimit, then the capacity is adjusted to 0.
double new_cache_capacity_adjust_weighted =
AlgoUtil::descent_by_step(10, cache_capacity_reduce_mem_limit,
doris::MemInfo::soft_mem_limit(), process_memory_usage);
if (new_cache_capacity_adjust_weighted !=
doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted) {
doris::GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted =
new_cache_capacity_adjust_weighted;
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
refresh_cache_capacity_sleep_time_ms = config::memory_gc_sleep_time_ms;
} else {
refresh_cache_capacity_sleep_time_ms = 0;
}
}
refresh_cache_capacity_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
}
void je_reset_dirty_decay() {
#ifdef USE_JEMALLOC
if (doris::JemallocControl::je_reset_dirty_decay_notify.load(std::memory_order_relaxed)) {
// if not return, je_enable_dirty_page may be modified, but notify is ignored.
return;
}
if (je_reset_dirty_decay_sleep_time_ms <= 0) {
bool new_je_enable_dirty_page = true;
if (doris::JemallocControl::je_enable_dirty_page) {
// if Jemalloc dirty page is enabled and process memory exceed soft mem limit,
// disable Jemalloc dirty page.
new_je_enable_dirty_page = !GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
} else {
// if Jemalloc dirty page is disabled and 10% free memory left to exceed soft mem limit,
// enable Jemalloc dirty page, this is to avoid frequent changes
// between enabling and disabling Jemalloc dirty pages, if the process memory does
// not exceed the soft mem limit after turning off Jemalloc dirty pages,
// but it will exceed soft mem limit after turning it on.
new_je_enable_dirty_page = !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
int64_t(doris::MemInfo::soft_mem_limit() * 0.1));
}
if (doris::JemallocControl::je_enable_dirty_page != new_je_enable_dirty_page) {
// `notify_je_reset_dirty_decay` only if `je_enable_dirty_page` changes.
doris::JemallocControl::je_enable_dirty_page = new_je_enable_dirty_page;
doris::JemallocControl::notify_je_reset_dirty_decay();
je_reset_dirty_decay_sleep_time_ms = config::memory_gc_sleep_time_ms;
} else {
je_reset_dirty_decay_sleep_time_ms = 0;
}
}
je_reset_dirty_decay_sleep_time_ms -= config::memory_maintenance_sleep_time_ms;
#endif
}
void memory_gc() {
if (config::disable_memory_gc) {
return;
}
if (memory_gc_sleep_time <= 0) {
auto gc_func = [](const std::string& revoke_reason) {
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
if (doris::MemoryReclamation::revoke_process_memory(revoke_reason)) {
// If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
doris::ProcessProfile::instance()
->memory_profile()
->enable_print_log_process_usage();
}
};
if (doris::GlobalMemoryArbitrator::sys_mem_available() <
doris::MemInfo::sys_mem_available_low_water_mark()) {
gc_func("sys available memory less than low water mark");
} else if (doris::GlobalMemoryArbitrator::process_memory_usage() >
doris::MemInfo::mem_limit()) {
gc_func("process memory used exceed limit");
}
memory_gc_sleep_time = config::memory_gc_sleep_time_ms;
}
memory_gc_sleep_time -= config::memory_maintenance_sleep_time_ms;
}
void Daemon::memory_maintenance_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) {
// step 1. Refresh process memory metrics.
refresh_process_memory_metrics();
// step 2. Refresh jemalloc/tcmalloc metrics.
refresh_common_allocator_metrics();
// step 3. Update and print memory stat when the memory changes by 256M.
refresh_memory_state_after_memory_change();
// step 4. Asyn Refresh cache capacity
// TODO adjust cache capacity based on smoothstep (smooth gradient).
refresh_cache_capacity();
// step 5. Cancel top memory task when process memory exceed hard limit.
memory_gc();
// step 6. Refresh weighted memory ratio of workload groups.
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_memory_state();
// step 7: handle paused queries(caused by memory insufficient)
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();
// step 8. Flush memtable
doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
// TODO notify flush memtable
// step 9. Reset Jemalloc dirty page decay.
je_reset_dirty_decay();
}
}
void Daemon::memtable_memory_refresh_thread() {
// Refresh the memory statistics of the load channel tracker more frequently,
// which helps to accurately control the memory of LoadChannelMgr.
do {
std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::memtable_memory_refresh_lock);
while (_stop_background_threads_latch.count() != 0 &&
!doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.load(
std::memory_order_relaxed)) {
doris::GlobalMemoryArbitrator::memtable_memory_refresh_cv.wait_for(
l, std::chrono::milliseconds(100));
}
if (_stop_background_threads_latch.count() == 0) {
break;
}
Defer defer {[&]() {
doris::GlobalMemoryArbitrator::memtable_memory_refresh_notify.store(
false, std::memory_order_relaxed);
}};
doris::ExecEnv::GetInstance()->memtable_memory_limiter()->refresh_mem_tracker();
} while (true);
}
/*
* this thread will calculate some metrics at a fix interval(15 sec)
* 1. push bytes per second
* 2. scan bytes per second
* 3. max io util of all disks
* 4. max network send bytes rate
* 5. max network receive bytes rate
*/
void Daemon::calculate_metrics_thread() {
int64_t last_ts = -1L;
int64_t lst_query_bytes = -1;
std::map<std::string, int64_t> lst_disks_io_time;
std::map<std::string, int64_t> lst_net_send_bytes;
std::map<std::string, int64_t> lst_net_receive_bytes;
do {
DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true);
if (last_ts == -1L) {
last_ts = GetMonoTimeMicros() / 1000;
lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
if (config::enable_system_metrics) {
DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
DorisMetrics::instance()->system_metrics()->get_network_traffic(
&lst_net_send_bytes, &lst_net_receive_bytes);
}
} else {
int64_t current_ts = GetMonoTimeMicros() / 1000;
long interval = (current_ts - last_ts) / 1000;
last_ts = current_ts;
// 1. query bytes per second
int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
int64_t qps = (current_query_bytes - lst_query_bytes) / (interval + 1);
DorisMetrics::instance()->query_scan_bytes_per_second->set_value(qps < 0 ? 0 : qps);
lst_query_bytes = current_query_bytes;
if (config::enable_system_metrics) {
// 2. max disk io util
DorisMetrics::instance()->system_metrics()->update_max_disk_io_util_percent(
lst_disks_io_time, 15);
// update lst map
DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
// 3. max network traffic
int64_t max_send = 0;
int64_t max_receive = 0;
DorisMetrics::instance()->system_metrics()->get_max_net_traffic(
lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive);
DorisMetrics::instance()->system_metrics()->update_max_network_send_bytes_rate(
max_send);
DorisMetrics::instance()->system_metrics()->update_max_network_receive_bytes_rate(
max_receive);
// update lst map
DorisMetrics::instance()->system_metrics()->get_network_traffic(
&lst_net_send_bytes, &lst_net_receive_bytes);
DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num();
}
update_rowsets_and_segments_num_metrics();
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
}
void Daemon::report_runtime_query_statistics_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::report_query_statistics_interval_ms))) {
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics();
}
}
void Daemon::je_reset_dirty_decay_thread() const {
do {
std::unique_lock<std::mutex> l(doris::JemallocControl::je_reset_dirty_decay_lock);
while (_stop_background_threads_latch.count() != 0 &&
!doris::JemallocControl::je_reset_dirty_decay_notify.load(
std::memory_order_relaxed)) {
doris::JemallocControl::je_reset_dirty_decay_cv.wait_for(
l, std::chrono::milliseconds(100));
}
if (_stop_background_threads_latch.count() == 0) {
break;
}
Defer defer {[&]() {
doris::JemallocControl::je_reset_dirty_decay_notify.store(false,
std::memory_order_relaxed);
}};
#ifdef USE_JEMALLOC
if (config::disable_memory_gc || !config::enable_je_purge_dirty_pages) {
continue;
}
// There is a significant difference only when dirty_decay_ms is equal to 0 or not.
//
// 1. When dirty_decay_ms is not equal to 0, the free memory will be cached in the Jemalloc
// dirty page first. even if dirty_decay_ms is equal to 1, the Jemalloc dirty page will not
// be released to the system exactly after 1ms, it will be released according to the decay rule.
// The Jemalloc document specifies that dirty_decay_ms is an approximate time.
//
// 2. It has been observed in an actual cluster that even if dirty_decay_ms is changed
// from th default 5000 to 1, Jemalloc dirty page will still cache a large amount of memory, everything
// seems to be the same as `dirty_decay_ms:5000`. only when dirty_decay_ms is changed to 0,
// jemalloc dirty page will stop caching and free memory will be released to the system immediately.
// of course, performance will be affected.
//
// 3. After reducing dirty_decay_ms, manually calling `decay_all_arena_dirty_pages` may release dirty pages
// as soon as possible, but no relevant experimental data can be found, so it is simple and safe
// to adjust dirty_decay_ms only between zero and non-zero.
if (doris::JemallocControl::je_enable_dirty_page) {
doris::JemallocControl::je_reset_all_arena_dirty_decay_ms(config::je_dirty_decay_ms);
} else {
doris::JemallocControl::je_reset_all_arena_dirty_decay_ms(0);
}
#endif
} while (true);
}
void Daemon::cache_adjust_capacity_thread() {
do {
std::unique_lock<std::mutex> l(doris::GlobalMemoryArbitrator::cache_adjust_capacity_lock);
while (_stop_background_threads_latch.count() != 0 &&
!doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.load(
std::memory_order_relaxed)) {
doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
l, std::chrono::milliseconds(100));
}
double adjust_weighted = std::min<double>(
GlobalMemoryArbitrator::last_periodic_refreshed_cache_capacity_adjust_weighted,
GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted);
if (_stop_background_threads_latch.count() == 0) {
break;
}
Defer defer {[&]() {
doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store(
false, std::memory_order_relaxed);
}};
if (config::disable_memory_gc) {
continue;
}
if (GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted ==
adjust_weighted) {
LOG(INFO) << fmt::format(
"[MemoryGC] adjust cache capacity end, adjust_weighted {} has not been "
"modified.",
adjust_weighted);
continue;
}
std::unique_ptr<RuntimeProfile> profile = std::make_unique<RuntimeProfile>("");
auto freed_mem = CacheManager::instance()->for_each_cache_refresh_capacity(adjust_weighted,
profile.get());
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
"[MemoryGC] adjust cache capacity end, free memory {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted;
} while (true);
}
void Daemon::cache_prune_stale_thread() {
int32_t interval = config::cache_periodic_prune_stale_sweep_sec;
while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) {
if (config::cache_periodic_prune_stale_sweep_sec <= 0) {
LOG(WARNING) << "config of cache clean interval is: [" << interval
<< "], it means the cache prune stale thread is disabled, will wait 3s "
"and check again.";
interval = 3;
continue;
}
if (config::disable_memory_gc) {
continue;
}
CacheManager::instance()->for_each_cache_prune_stale();
interval = config::cache_periodic_prune_stale_sweep_sec;
}
}
void Daemon::be_proc_monitor_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::be_proc_monitor_interval_ms))) {
LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info();
}
}
void Daemon::calculate_workload_group_metrics_thread() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) {
ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics();
}
}
void Daemon::start() {
Status st;
st = Thread::create(
"Daemon", "tcmalloc_gc_thread", [this]() { this->tcmalloc_gc_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "memory_maintenance_thread", [this]() { this->memory_maintenance_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "memtable_memory_refresh_thread",
[this]() { this->memtable_memory_refresh_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
if (config::enable_metric_calculator) {
st = Thread::create(
"Daemon", "calculate_metrics_thread",
[this]() { this->calculate_metrics_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
}
st = Thread::create(
"Daemon", "je_reset_dirty_decay_thread",
[this]() { this->je_reset_dirty_decay_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "cache_adjust_capacity_thread",
[this]() { this->cache_adjust_capacity_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "cache_prune_stale_thread", [this]() { this->cache_prune_stale_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "query_runtime_statistics_thread",
[this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
if (config::enable_be_proc_monitor) {
st = Thread::create(
"Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); },
&_threads.emplace_back());
}
CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "workload_group_metrics",
[this]() { this->calculate_workload_group_metrics_thread(); },
&_threads.emplace_back());
CHECK(st.ok()) << st;
}
void Daemon::stop() {
LOG(INFO) << "Doris daemon is stopping.";
if (_stop_background_threads_latch.count() == 0) {
LOG(INFO) << "Doris daemon stop returned since no bg threads latch.";
return;
}
_stop_background_threads_latch.count_down();
for (auto&& t : _threads) {
if (t) {
t->join();
}
}
LOG(INFO) << "Doris daemon stopped after background threads are joined.";
}
} // namespace doris