blob: ed98fd1c2467dbb3bf4587071839bb49cbadcb88 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <algorithm>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
#include "common/be_mock_util.h"
#include "common/config.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group.h"
#include "util/stack_util.h"
namespace doris {
#include "common/compile_check_begin.h"
constexpr size_t SYNC_PROC_RESERVED_INTERVAL_BYTES = (1ULL << 20); // 1M
static std::string MEMORY_ORPHAN_CHECK_MSG =
"The ThreadContext of the current thread not attach a valid MemoryTracker. after the "
"thread is started, the ResourceContext in SCOPED_ATTACH_TASK macro should contain a valid "
"MemoryTracker, or a valid MemoryTracker should be passed in later using "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER macro.";
// Memory Hook is counted in the memory tracker of the current thread.
class ThreadMemTrackerMgr {
public:
ThreadMemTrackerMgr() = default;
MOCK_FUNCTION ~ThreadMemTrackerMgr() {
// if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
if (_init) {
DCHECK(_reserved_mem == 0);
flush_untracked_mem();
}
}
bool init();
// After attach, the current thread Memory Hook starts to consume/release task mem_tracker
void attach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& mem_tracker);
void attach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
const std::weak_ptr<WorkloadGroup>& wg_wptr);
void detach_limiter_tracker();
// Must be fast enough! Thread update_tracker may be called very frequently.
bool push_consumer_tracker(MemTracker* mem_tracker);
void pop_consumer_tracker();
std::string last_consumer_tracker_label() {
return _consumer_tracker_stack.empty() ? "" : _consumer_tracker_stack.back()->label();
}
// Note that, If call the memory allocation operation in Memory Hook,
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
// Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded.
void consume(int64_t size);
void flush_untracked_mem();
enum class TryReserveChecker {
NONE = 0,
CHECK_TASK = 1,
CHECK_WORKLOAD_GROUP = 2,
CHECK_TASK_AND_WORKLOAD_GROUP = 3,
CHECK_PROCESS = 4,
CHECK_TASK_AND_PROCESS = 5,
CHECK_WORKLOAD_GROUP_AND_PROCESS = 6,
CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS = 7,
};
// if only_check_process_memory == true, still reserve query, wg, process memory, only check process memory.
MOCK_FUNCTION doris::Status try_reserve(
int64_t size, TryReserveChecker checker =
TryReserveChecker::CHECK_TASK_AND_WORKLOAD_GROUP_AND_PROCESS);
void shrink_reserved();
MemTrackerLimiter* limiter_mem_tracker() {
CHECK(init());
return _limiter_tracker;
}
// Prefer use `limiter_mem_tracker`, which is faster than `limiter_mem_tracker_sptr`.
// when multiple threads hold the same `std::shared_ptr` at the same time,
// modifying the `std::shared_ptr` reference count will be expensive when there is high concurrency.
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker_sptr() {
CHECK(init());
return _limiter_tracker_sptr;
}
void enable_wait_gc() { _wait_gc = true; }
void disable_wait_gc() { _wait_gc = false; }
[[nodiscard]] bool wait_gc() const { return _wait_gc; }
std::string print_debug_string() {
fmt::memory_buffer consumer_tracker_buf;
for (const auto& v : _consumer_tracker_stack) {
fmt::format_to(consumer_tracker_buf, "{}, ", v->log_usage());
}
return fmt::format(
"ThreadMemTrackerMgr debug, _untracked_mem:{}, "
"_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>",
std::to_string(_untracked_mem), _limiter_tracker->make_profile_str(),
fmt::to_string(consumer_tracker_buf));
}
int64_t untracked_mem() const { return _untracked_mem; }
int64_t reserved_mem() const { return _reserved_mem; }
int skip_memory_check = 0;
int skip_large_memory_check = 0;
void memory_orphan_check() {
#ifndef BE_TEST
DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
limiter_mem_tracker()->label() != "Orphan")
<< doris::MEMORY_ORPHAN_CHECK_MSG;
#endif
}
private:
struct LastAttachSnapshot {
std::shared_ptr<MemTrackerLimiter> limiter_tracker {nullptr};
std::weak_ptr<WorkloadGroup> wg_wptr;
int64_t reserved_mem = 0;
std::vector<MemTracker*> consumer_tracker_stack;
};
// is false: ExecEnv::ready() = false when thread local is initialized
bool _init = false;
// Cache untracked mem.
int64_t _untracked_mem = 0;
int64_t _old_untracked_mem = 0;
int64_t _reserved_mem = 0;
// SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used,
// so `attach_limiter_tracker` may be nested.
std::vector<LastAttachSnapshot> _last_attach_snapshots_stack;
std::string _failed_consume_msg;
// If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit.
// A thread of query/load will only wait once during execution.
bool _wait_gc = false;
std::shared_ptr<MemTrackerLimiter> _limiter_tracker_sptr {nullptr};
MemTrackerLimiter* _limiter_tracker {nullptr};
std::vector<MemTracker*> _consumer_tracker_stack;
std::weak_ptr<WorkloadGroup> _wg_wptr;
// If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
bool _stop_consume = false;
};
inline bool ThreadMemTrackerMgr::init() {
// 1. Initialize in the thread context when the thread starts
// 2. ExecEnv not initialized when thread start, initialized in mem_tracker().
if (_init) {
return true;
}
if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) {
_limiter_tracker_sptr = ExecEnv::GetInstance()->orphan_mem_tracker();
_limiter_tracker = _limiter_tracker_sptr.get();
_wait_gc = true;
_init = true;
return true;
}
return false;
}
inline bool ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) {
DCHECK(tracker) << print_debug_string();
if (std::count(_consumer_tracker_stack.begin(), _consumer_tracker_stack.end(), tracker)) {
return false;
}
_consumer_tracker_stack.push_back(tracker);
return true;
}
inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
DCHECK(!_consumer_tracker_stack.empty());
_consumer_tracker_stack.pop_back();
}
inline void ThreadMemTrackerMgr::consume(int64_t size) {
memory_orphan_check();
// `consumer_tracker` not support reserve memory and not require use `_untracked_mem` to batch consume,
// because `consumer_tracker` will not be bound by many threads, so there is no performance problem.
for (auto* tracker : _consumer_tracker_stack) {
tracker->consume(size);
}
if (_reserved_mem != 0) {
if (_reserved_mem > size) {
// only need to subtract _reserved_mem, no need to consume MemTracker,
// every time _reserved_mem is minus the sum of size >= SYNC_PROC_RESERVED_INTERVAL_BYTES,
// subtract size from process global reserved memory,
// because this part of the reserved memory has already been used by BE process.
_reserved_mem -= size;
// temporary store bytes that not synchronized to process reserved memory.
_untracked_mem += size;
// If _untracked_mem > 0, reserved memory that has been used, if _untracked_mem greater than
// SYNC_PROC_RESERVED_INTERVAL_BYTES, release process reserved memory.
// If _untracked_mem < 0, used reserved memory is returned, will increase reserved memory,
// if _untracked_mem less than -SYNC_PROC_RESERVED_INTERVAL_BYTES, increase process reserved memory.
if (std::abs(_untracked_mem) >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
doris::GlobalMemoryArbitrator::shrink_process_reserved(_untracked_mem);
_limiter_tracker->shrink_reserved(_untracked_mem);
_untracked_mem = 0;
}
return;
} else {
// _reserved_mem <= size, reserved memory used done,
// the remaining _reserved_mem is subtracted from this memory consumed,
// and reset _reserved_mem to 0, and subtract the remaining _reserved_mem from
// process global reserved memory, this means that all reserved memory has been used by BE process.
size -= _reserved_mem;
doris::GlobalMemoryArbitrator::shrink_process_reserved(_reserved_mem + _untracked_mem);
_limiter_tracker->shrink_reserved(_reserved_mem + _untracked_mem);
_reserved_mem = 0;
_untracked_mem = 0;
}
}
// store bytes that not consumed by thread mem tracker.
_untracked_mem += size;
DCHECK(_reserved_mem == 0);
// When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
// and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(),
// it will cause tracker->consumption to be temporarily less than 0.
// After the jemalloc hook is loaded, before ExecEnv init, _limiter_tracker=nullptr.
if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes && !_stop_consume) {
if (!_init && !ExecEnv::ready()) {
return;
}
flush_untracked_mem();
// If size is large, then _untracked_mem must be larger than config::mem_tracker_consume_min_size_bytes.
if (skip_large_memory_check == 0) {
if (doris::config::stacktrace_in_alloc_large_memory_bytes > 0 &&
size > doris::config::stacktrace_in_alloc_large_memory_bytes) {
_stop_consume = true;
LOG(WARNING) << fmt::format(
"alloc large memory: {}, consume tracker: {}, this is just a warning, not "
"prevent memory alloc, "
"stacktrace:\n{}",
size, _limiter_tracker->label(), get_stack_trace());
_stop_consume = false;
}
if (doris::config::crash_in_alloc_large_memory_bytes > 0 &&
size > doris::config::crash_in_alloc_large_memory_bytes) {
throw Exception(
Status::FatalError("alloc large memory: {}, consume tracker: {}, crash "
"generate core dumpsto help analyze, "
"stacktrace:\n{}",
size, _limiter_tracker->label(), get_stack_trace()));
}
}
}
}
inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// if during reserve memory, _untracked_mem temporary store bytes that not synchronized
// to process reserved memory, but bytes have been subtracted from thread _reserved_mem.
// so not need flush untracked_mem to consume mem tracker.
if (_reserved_mem != 0) {
return;
}
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the Memory Hook again, so suspend consumption to avoid falling into an infinite loop.
if (_untracked_mem == 0 || !init()) {
return;
}
_stop_consume = true;
DCHECK(_limiter_tracker);
_old_untracked_mem = _untracked_mem;
_limiter_tracker->consume(_old_untracked_mem);
_untracked_mem -= _old_untracked_mem;
_stop_consume = false;
}
inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size, TryReserveChecker checker) {
DCHECK(size >= 0);
CHECK(init());
DCHECK(_limiter_tracker);
memory_orphan_check();
// if _reserved_mem not equal to 0, repeat reserve,
// _untracked_mem store bytes that not synchronized to process reserved memory.
flush_untracked_mem();
auto wg_ptr = _wg_wptr.lock();
bool task_limit_checker = static_cast<int>(checker) & 1;
bool workload_group_limit_checker = static_cast<int>(checker) & 2;
bool process_limit_checker = static_cast<int>(checker) & 4;
if (task_limit_checker) {
if (!_limiter_tracker->try_reserve(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because query memory exceeded, memory "
"tracker: {}, "
"consumption: {}, limit: {}, peak: {}",
PrettyPrinter::print_bytes(size), _limiter_tracker->label(),
PrettyPrinter::print_bytes(_limiter_tracker->consumption()),
PrettyPrinter::print_bytes(_limiter_tracker->limit()),
PrettyPrinter::print_bytes(_limiter_tracker->peak_consumption()));
return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
}
} else {
_limiter_tracker->reserve(size);
}
if (wg_ptr) {
if (workload_group_limit_checker) {
if (!wg_ptr->try_add_wg_refresh_interval_memory_growth(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because workload group memory exceeded, "
"workload group: {}",
PrettyPrinter::print_bytes(size), wg_ptr->memory_debug_string());
_limiter_tracker->release(size); // rollback
_limiter_tracker->shrink_reserved(size); // rollback
return doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
}
} else {
wg_ptr->add_wg_refresh_interval_memory_growth(size);
}
}
if (process_limit_checker) {
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because proccess memory exceeded, {}",
PrettyPrinter::print_bytes(size),
GlobalMemoryArbitrator::process_mem_log_str());
_limiter_tracker->release(size); // rollback
_limiter_tracker->shrink_reserved(size); // rollback
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
}
return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(err_msg);
}
} else {
doris::GlobalMemoryArbitrator::reserve_process_memory(size);
}
_reserved_mem += size;
DCHECK(_reserved_mem >= 0);
return doris::Status::OK();
}
inline void ThreadMemTrackerMgr::shrink_reserved() {
if (_reserved_mem != 0) {
memory_orphan_check();
doris::GlobalMemoryArbitrator::shrink_process_reserved(_reserved_mem + _untracked_mem);
_limiter_tracker->shrink_reserved(_reserved_mem + _untracked_mem);
_limiter_tracker->release(_reserved_mem);
auto wg_ptr = _wg_wptr.lock();
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem);
}
_untracked_mem = 0;
_reserved_mem = 0;
}
}
#include "common/compile_check_end.h"
} // namespace doris