blob: fd0418f48266c5b077476634f187e88ea0bb4eea [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <atomic>
#include <cstdint>
// IWYU pragma: no_include <bits/std_abs.h>
#include <cmath> // IWYU pragma: keep
#include <list>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "runtime/memory/mem_counter.h"
#include "runtime/memory/mem_tracker.h"
#include "util/string_util.h"
#include "util/uid_util.h"
namespace doris {
#include "common/compile_check_begin.h"
class RuntimeProfile;
class MemTrackerLimiter;
constexpr size_t MEM_TRACKER_GROUP_NUM = 1000;
// One bucket of MemTrackerLimiter instances: a list of weak references plus the mutex that
// guards that list.
//
// Note! std::mutex can be neither copied nor moved, so the copy/move special members below are
// user-provided and intentionally transfer nothing: the destination always ends up with an empty
// `trackers` list and its own fresh `group_lock`. Per the original note they exist only so that
// ExecEnv::mem_tracker_limiter_pool can support resize. Never rely on them to duplicate a group;
// always access an existing TrackerLimiterGroup through a reference.
struct TrackerLimiterGroup {
    // Weak references to the trackers that belong to this group.
    std::list<std::weak_ptr<MemTrackerLimiter>> trackers;
    // Lock associated with `trackers`.
    std::mutex group_lock;

    TrackerLimiterGroup() = default;
    // "Copy": yields an empty group, nothing is taken from the source.
    TrackerLimiterGroup(const TrackerLimiterGroup&) {}
    // "Move": yields an empty group, the source is left untouched.
    TrackerLimiterGroup(TrackerLimiterGroup&&) noexcept {}
    // Assignment keeps the current contents of *this unchanged.
    TrackerLimiterGroup& operator=(const TrackerLimiterGroup&) { return *this; }
};
/*
 * Track and limit the memory usage of process and query.
 *
 * Usually, put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts,
 * all memory used by this thread will be recorded on this Query.
 *
 * This class is thread-safe.
 * NOTE(review): the counters and `_limit` are atomic/MemCounter based, but the two enable flags
 * (`_enable_check_limit`, `_enable_print_log_usage`) are plain bools - confirm they are only
 * written before the tracker is shared between threads.
 */
class MemTrackerLimiter final {
public:
    /*
     * Part 1, Type definition
     */
    enum class Type {
        GLOBAL = 0,        // Life cycle is the same as the process, except cache and metadata.
        QUERY = 1,         // Count the memory consumption of all Query tasks.
        LOAD = 2,          // Count the memory consumption of all Load tasks.
        COMPACTION = 3,    // Count the memory consumption of all Base and Cumulative tasks.
        SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange tasks.
        METADATA = 5,      // Count the memory consumption of all Metadata.
        CACHE = 6,         // Count the memory consumption of all Cache.
        OTHER = 7, // Count the memory consumption of all other tasks, such as Clone, Snapshot, etc..
    };

    // Returns the lowercase display name of `type`.
    // A value outside the enumerators above is reported through LOG(FATAL).
    static std::string type_string(Type type) {
        switch (type) {
        case Type::GLOBAL:
            return "global";
        case Type::QUERY:
            return "query";
        case Type::LOAD:
            return "load";
        case Type::COMPACTION:
            return "compaction";
        case Type::SCHEMA_CHANGE:
            return "schema_change";
        case Type::METADATA:
            return "metadata";
        case Type::CACHE:
            return "cache";
        case Type::OTHER:
            return "other_task";
        default:
            LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type);
        }
        LOG(FATAL) << "__builtin_unreachable";
        __builtin_unreachable();
    }

    /*
     * Part 2, Constructors and property methods
     */
    static std::shared_ptr<MemTrackerLimiter> create_shared(MemTrackerLimiter::Type type,
                                                            const std::string& label,
                                                            int64_t byte_limit = -1);
    // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics.
    MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit);
    ~MemTrackerLimiter();

    Type type() const { return _type; }
    const std::string& label() const { return _label; }
    int64_t group_num() const { return _group_num; }
    // Current limit in bytes (atomic read).
    int64_t limit() const { return _limit; }
    void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
    // When disabled, check_limit() always returns OK.
    void set_enable_check_limit(bool enable_check_limit) {
        _enable_check_limit = enable_check_limit;
    }
    // Returns MemoryLimitExceeded when consuming `bytes` more would exceed a positive limit,
    // OK otherwise (including for bytes <= 0).
    Status check_limit(int64_t bytes = 0);
    // Log the memory usage when memory limit is exceeded.
    std::string tracker_limit_exceeded_str();
    static void clean_tracker_limiter_group();

    /*
     * Part 3, Memory tracking method (use carefully!)
     *
     * Note: Only memory not allocated by Doris Allocator can be tracked by manually calling consume() and release().
     *       Memory allocated by Doris Allocator needs to be tracked using SCOPED_ATTACH_TASK or
     *       SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER.
     */
    int64_t consumption() const { return _mem_counter.current_value(); }
    int64_t peak_consumption() const { return _mem_counter.peak_value(); }
    // Use carefully! only memory that cannot be allocated using Doris Allocator needs to be consumed manually.
    // Ideally, all memory should use Doris Allocator.
    void consume(int64_t bytes) { _mem_counter.add(bytes); }
    void consume_no_update_peak(int64_t bytes) { _mem_counter.add_no_update_peak(bytes); }
    void release(int64_t bytes) { _mem_counter.sub(bytes); }

    // Consumes `bytes` if the limit allows it.
    // - bytes == 0: nothing is recorded, returns true.
    // - limit >= 0: delegates to MemCounter::try_add() with the limit as upper bound and
    //   returns its result.
    // - limit < 0 (unlimited): always consumes and returns true.
    bool try_consume(int64_t bytes) {
        if (UNLIKELY(bytes == 0)) {
            return true;
        }
        // Read the atomic limit exactly once, so the "is there a limit" decision and the bound
        // handed to try_add() are the same value even if set_limit() runs concurrently.
        const int64_t current_limit = limit();
        if (current_limit >= 0) {
            return _mem_counter.try_add(bytes, current_limit);
        }
        _mem_counter.add(bytes);
        return true;
    }

    void set_consumption(int64_t bytes) { _mem_counter.set(bytes); }

    // Transfer 'size' bytes of consumption from this tracker to 'dst'.
    // Nothing happens when both trackers carry the same label. 'dst' must not be null.
    void transfer_to(int64_t size, MemTrackerLimiter* dst) {
        if (label() == dst->label()) {
            return;
        }
        cache_consume(-size);
        dst->cache_consume(size);
    }

    // If need to consume the tracker frequently, use it
    void cache_consume(int64_t bytes);

    /*
     * Part 4, Reserved memory tracking method
     */
    int64_t reserved_consumption() const { return _reserved_counter.current_value(); }
    int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); }

    // Adds `bytes` to both the consumption counter and the reserved counter, without any
    // limit check. A zero-byte reservation is ignored.
    void reserve(int64_t bytes) {
        if (UNLIKELY(bytes == 0)) {
            return;
        }
        _mem_counter.add(bytes);
        _reserved_counter.add(bytes);
    }

    // Limit-aware variant of reserve(): the reserved counter is only increased when
    // try_consume(bytes) succeeds. Returns whether the reservation was made.
    bool try_reserve(int64_t bytes) {
        if (try_consume(bytes)) {
            _reserved_counter.add(bytes);
            return true;
        } else {
            return false;
        }
    }

    // Subtracts `bytes` from the reserved counter only; the consumption counter is not touched.
    void shrink_reserved(int64_t bytes) {
        _reserved_counter.sub(bytes);
        DCHECK(reserved_consumption() >= 0);
    }

    /*
     * Part 5, Memory profile and log method
     */
    RuntimeProfile* make_profile(RuntimeProfile* profile) const;
    std::string make_profile_str() const;
    static void make_type_trackers_profile(RuntimeProfile* profile, MemTrackerLimiter::Type type);
    static std::string make_type_trackers_profile_str(MemTrackerLimiter::Type type);
    static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num);
    static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
    std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; }
    void print_log_usage(const std::string& msg);
    void enable_print_log_usage() { _enable_print_log_usage = true; }

    /*
     * Part 6, Memory debug method
     */
    void add_address_sanitizers(void* buf, size_t size);
    void remove_address_sanitizers(void* buf, size_t size);

    bool is_group_commit_load {false};

private:
    // When the accumulated untracked memory value exceeds the upper limit,
    // the current value is returned and set to 0.
    // Thread safety.
    int64_t add_untracked_mem(int64_t bytes);

    /*
     * Part 7, Property definition
     */
    Type _type;
    // label used in the make snapshot, not guaranteed unique.
    std::string _label;
    // For generate runtime profile, profile name must be unique.
    UniqueId _uid;
    // Current/peak consumption of this tracker.
    MemCounter _mem_counter;
    // Current/peak amount of reserved memory (see Part 4).
    MemCounter _reserved_counter;
    // Limit on memory consumption, in bytes.
    std::atomic<int64_t> _limit;
    bool _enable_check_limit = true;
    // Group number in mem_tracker_limiter_pool and mem_tracker_pool, generated by the timestamp.
    int64_t _group_num;
    // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate
    // to avoid frequent calls to consume/release of MemTracker.
    std::atomic<int64_t> _untracked_mem = 0;
    // Avoid frequent printing.
    bool _enable_print_log_usage = false;
    std::shared_ptr<MemTrackerLimiter> _write_tracker;

    // Bookkeeping record for one buffer registered through add_address_sanitizers().
    struct AddressSanitizer {
        size_t size;
        std::string stack_trace;
    };
    std::string print_address_sanitizers();
    bool open_memory_tracker_inaccurate_detect();
    std::mutex _address_sanitizers_mtx;
    std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
    std::vector<std::string> _error_address_sanitizers;
};
// Accumulates `bytes` into the pending (not yet reported) amount.
// Once the absolute pending amount reaches config::mem_tracker_consume_min_size_bytes, the whole
// pending amount is atomically swapped out for 0 and returned so the caller can report it;
// otherwise 0 is returned and the amount keeps accumulating.
inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
    _untracked_mem.fetch_add(bytes);
    const int64_t pending = _untracked_mem.load();
    if (std::abs(pending) < config::mem_tracker_consume_min_size_bytes) {
        // Still below the flush threshold.
        return 0;
    }
    return _untracked_mem.exchange(0);
}
// Buffered variant of consume(): `bytes` first goes through add_untracked_mem(), and whatever
// that call hands back (0 while still accumulating) is forwarded to consume().
// A zero-byte request is ignored entirely.
inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
    if (bytes != 0) {
        consume(add_untracked_mem(bytes));
    }
}
// Checks whether consuming `bytes` more would exceed this tracker's limit.
//
// Returns Status::OK() when `bytes` <= 0, when limit checking is disabled, when the limit is
// not positive, or when consumption() + bytes still fits in the limit.
// Returns Status::MemoryLimitExceeded (with the requested size and
// tracker_limit_exceeded_str()) otherwise. Nothing is consumed by this call.
inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
    if (bytes <= 0 || !_enable_check_limit) {
        return Status::OK();
    }
    // Snapshot the atomic limit once. Loading it separately for the "is limited" test and for
    // the comparison below would let a concurrent set_limit(-1) slip in between and turn every
    // positive request into a spurious MemoryLimitExceeded.
    const int64_t current_limit = _limit.load();
    if (current_limit <= 0) {
        return Status::OK();
    }
    // If reserve not enabled, then should check limit here to kill the query when limit exceed.
    // For insert into select or pure load job, its memtable is accounted in a separate memtracker limiter,
    // and its reserve is set to true. So that it will not reach this logic.
    // Only query and load job has exec_mem_limit and the _limit > 0, other memtracker limiter's _limit is -1 so
    // it will not take effect.
    if (consumption() + bytes > current_limit) {
        return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, {}",
                                                       PrettyPrinter::print_bytes(bytes),
                                                       tracker_limit_exceeded_str()));
    }
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris