blob: 1cb6c0c8e2de046098fed95de9ebb65a57e2bb19 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memtable_memory_limiter.h"
#include <bvar/bvar.h>
#include <iterator>
#include <utility>
#include "common/config.h"
#include "olap/memtable.h"
#include "olap/memtable_writer.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
namespace doris {
// Gauge (in bytes) for the memory consumed by all memtables tracked by the limiter.
// Its value is supplied by the hook registered in MemTableMemoryLimiter::init() and
// removed in the destructor.
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, MetricUnit::BYTES, "",
                                   memtable_memory_limiter_mem_consumption,
                                   Labels({{"type", "load"}}));
// Time (ms) a load thread spent blocked in handle_memtable_flush().
bvar::LatencyRecorder g_memtable_memory_limit_latency_ms("mm_limiter_limit_time_ms");
// Number of threads currently blocked in handle_memtable_flush().
bvar::Adder<int> g_memtable_memory_limit_waiting_threads("mm_limiter_waiting_threads");
// Snapshots published by _refresh_mem_tracker():
// active memtables, memtables queued for flush (WRITE_FINISHED), memtables being
// flushed, and the sum of the three.
bvar::Status<int64_t> g_memtable_active_memory("mm_limiter_mem_active", 0);
bvar::Status<int64_t> g_memtable_write_memory("mm_limiter_mem_write", 0);
bvar::Status<int64_t> g_memtable_flush_memory("mm_limiter_mem_flush", 0);
bvar::Status<int64_t> g_memtable_load_memory("mm_limiter_mem_load", 0);
// Load memory limits computed in init().
bvar::Status<int64_t> g_load_hard_mem_limit("mm_limiter_limit_hard", 0);
bvar::Status<int64_t> g_load_soft_mem_limit("mm_limiter_limit_soft", 0);
// Computes the total memory budget shared by all load tasks on this BE:
// config::load_process_max_memory_limit_percent percent of process_mem_limit.
// A process_mem_limit of -1 means "no limit" and yields -1 as well.
static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
    if (process_mem_limit != -1) {
        const int32_t load_percent = config::load_process_max_memory_limit_percent;
        return process_mem_limit * load_percent / 100;
    }
    // No process limit, hence no load limit either.
    return -1;
}
// Nothing to do here; limits and the memory tracker are set up later by init().
MemTableMemoryLimiter::MemTableMemoryLimiter() = default;
// Unregisters the consumption hook that init() registered. The hook captures `this`,
// so it must not outlive the limiter.
MemTableMemoryLimiter::~MemTableMemoryLimiter() {
    DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption);
}
// Derives the load memory limits from process_mem_limit and publishes them to bvar.
// It then creates the tracker that aggregates all memtable memory and registers the
// gauge hook that reports the tracker's consumption. Always returns Status::OK().
Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
    // Hard limit: load_process_max_memory_limit_percent of the process limit (-1 if unlimited).
    _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
    // The soft limit and the safe permit are both percentages of the hard limit.
    // NOTE(review): with process_mem_limit == -1 the hard limit is -1, so both values end
    // up 0 or negative. That would fail DCHECK(_load_soft_mem_limit > 0) in
    // handle_memtable_flush(); confirm callers never pass -1.
    _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
    _load_safe_mem_permit =
            _load_hard_mem_limit * config::load_process_safe_mem_permit_percent / 100;
    g_load_hard_mem_limit.set_value(_load_hard_mem_limit);
    g_load_soft_mem_limit.set_value(_load_soft_mem_limit);
    // Must be created before the hook below, which dereferences _mem_tracker when sampled.
    _mem_tracker = std::make_unique<MemTracker>("AllMemTableMemory");
    REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption,
                         [this]() { return _mem_tracker->consumption(); });
    // Timer used by refresh_mem_tracker() to rate-limit its log lines.
    _log_timer.start();
    return Status::OK();
}
// Starts tracking `writer`'s memtable memory. Only a weak reference is kept, and
// writers that have since been destroyed are pruned by _refresh_mem_tracker().
void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer) {
    std::lock_guard<std::mutex> l(_lock);
    // `writer` is a by-value sink: move it in instead of copying, which avoids an extra
    // atomic weak-count increment/decrement pair.
    _writers.push_back(std::move(writer));
}
// Returns how far the system's available memory is below its warning water mark,
// padded by config::memtable_limiter_reserved_memory_bytes. A positive result means
// the padded water mark has been crossed.
int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() {
    const auto water_mark = doris::MemInfo::sys_mem_available_warning_water_mark();
    const auto available = doris::GlobalMemoryArbitrator::sys_mem_available();
    // Extra headroom so the limiter reacts early enough not to trigger MinorGC.
    const auto reserve = config::memtable_limiter_reserved_memory_bytes;
    return water_mark - available + reserve;
}
// Returns how far process memory usage exceeds the process soft limit, padded by
// config::memtable_limiter_reserved_memory_bytes. A positive result means the padded
// soft limit has been exceeded.
int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() {
    const auto process_usage = GlobalMemoryArbitrator::process_memory_usage();
    const auto soft_limit = MemInfo::soft_mem_limit();
    // Extra headroom so the limiter reacts early enough not to trigger MinorGC.
    const auto reserve = config::memtable_limiter_reserved_memory_bytes;
    return process_usage - soft_limit + reserve;
}
// True when tracked load memory exceeds the soft limit, or when any hard-limit
// condition holds (a hard limit implies the soft limit).
bool MemTableMemoryLimiter::_soft_limit_reached() {
    if (_mem_tracker->consumption() > _load_soft_mem_limit) {
        return true;
    }
    return _hard_limit_reached();
}
// True when any of the following holds: load memory is above the load hard limit,
// system available memory is below the padded warning water mark, or process usage
// is above the padded process soft limit.
bool MemTableMemoryLimiter::_hard_limit_reached() {
    if (_mem_tracker->consumption() > _load_hard_mem_limit) {
        return true;
    }
    if (_sys_avail_mem_less_than_warning_water_mark() > 0) {
        return true;
    }
    return _process_used_mem_more_than_soft_mem_limit() > 0;
}
// True when tracked load memory is at or below the safe permit computed in init().
bool MemTableMemoryLimiter::_load_usage_low() {
    const auto load_mem = _mem_tracker->consumption();
    return load_mem <= _load_safe_mem_permit;
}
// Returns how much memtable memory should be flushed now. This is the largest overshoot
// among three measures: load memory over the soft limit, the system water-mark deficit,
// and process usage over its soft limit. Memory already queued for flush is subtracted.
// A non-positive result means nothing needs flushing.
int64_t MemTableMemoryLimiter::_need_flush() {
    const int64_t over_load_soft = _mem_tracker->consumption() - _load_soft_mem_limit;
    const int64_t below_water_mark = _sys_avail_mem_less_than_warning_water_mark();
    const int64_t over_process_soft = _process_used_mem_more_than_soft_mem_limit();
    const int64_t worst = std::max(over_load_soft, std::max(below_water_mark, over_process_soft));
    // Memtables in the WRITE_FINISHED queue are already scheduled; don't count them again.
    return worst - _queue_mem_usage;
}
// Back-pressure for load threads. It blocks the caller while memtable memory is above
// its limits.
//
// Fast path: returns at once when the soft limit is not reached, or when load memory is
// already at or below the safe permit.
//
// Otherwise, under _lock, it repeatedly asks the writers with the largest active
// memtables to flush. Between rounds it waits (up to 1s per round) on
// _hard_limit_end_cond. It stops once no hard-limit condition holds, or once load memory
// has dropped to the safe permit. The time spent blocked is recorded in
// g_memtable_memory_limit_latency_ms.
void MemTableMemoryLimiter::handle_memtable_flush() {
    // Check the soft limit.
    DCHECK(_load_soft_mem_limit > 0);
    if (!_soft_limit_reached() || _load_usage_low()) {
        return;
    }
    MonotonicStopWatch timer;
    timer.start();
    std::unique_lock<std::mutex> l(_lock);
    g_memtable_memory_limit_waiting_threads << 1;
    bool first = true;
    do {
        if (!first) {
            auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000));
            if (st == std::cv_status::timeout) {
                LOG(INFO) << "timeout when waiting for memory hard limit end, try again";
            }
        }
        first = false;
        int64_t need_flush = _need_flush();
        if (need_flush > 0) {
            auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
            LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft")
                      << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
                      << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
                      << ", memtable writers num: " << _writers.size()
                      << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
                      << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
                      << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
            _flush_active_memtables(need_flush);
        }
        // _hard_limit_reached() is also true under process/system memory pressure that
        // flushing memtables cannot relieve. Once load memory is at or below the safe
        // permit there is nothing worth flushing, so release the writer instead of
        // blocking it until unrelated memory goes away.
    } while (_hard_limit_reached() && !_load_usage_low());
    g_memtable_memory_limit_waiting_threads << -1;
    timer.stop();
    int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
    g_memtable_memory_limit_latency_ms << time_ms;
    LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
}
void MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) {
if (need_flush <= 0) {
return;
}
_refresh_mem_tracker();
if (_active_writers.size() == 0) {
return;
}
using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>;
auto cmp = [](WriterMem left, WriterMem right) { return left.second < right.second; };
std::priority_queue<WriterMem, std::vector<WriterMem>, decltype(cmp)> heap(cmp);
for (auto writer : _active_writers) {
auto w = writer.lock();
if (w == nullptr) {
continue;
}
heap.emplace(w, w->active_memtable_mem_consumption());
}
int64_t mem_flushed = 0;
int64_t num_flushed = 0;
while (mem_flushed < need_flush && !heap.empty()) {
auto [writer, sort_mem] = heap.top();
heap.pop();
auto w = writer.lock();
if (w == nullptr) {
continue;
}
int64_t mem = w->active_memtable_mem_consumption();
if (mem < sort_mem * 0.9) {
// if the memtable writer just got flushed, don't flush it again
continue;
}
Status st = w->flush_async();
if (!st.ok()) {
auto err_msg = fmt::format(
"tablet writer failed to reduce mem consumption by flushing memtable, "
"tablet_id={}, err={}",
w->tablet_id(), st.to_string());
LOG(WARNING) << err_msg;
static_cast<void>(w->cancel_with_status(st));
}
mem_flushed += mem;
num_flushed += (mem > 0);
}
LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size()
<< " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed);
}
void MemTableMemoryLimiter::refresh_mem_tracker() {
std::lock_guard<std::mutex> l(_lock);
_refresh_mem_tracker();
std::stringstream ss;
Limit limit = Limit::NONE;
if (_soft_limit_reached()) {
limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
ss << "reached " << (limit == Limit::HARD ? "hard" : "soft") << " limit";
} else if (_last_limit == Limit::NONE) {
return;
} else {
ss << "ended " << (_last_limit == Limit::HARD ? "hard" : "soft") << " limit";
}
if (_last_limit == limit && _log_timer.elapsed_time() < LOG_INTERVAL) {
return;
}
_last_limit = limit;
_log_timer.reset();
LOG(INFO) << ss.str() << ", " << GlobalMemoryArbitrator::process_memory_used_details_str()
<< ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
<< ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
}
// Recomputes memtable memory across all registered writers and updates the limiter:
//  - accumulates active, queued (WRITE_FINISHED) and flushing memory;
//  - prunes writers that have been destroyed;
//  - rebuilds _active_writers with writers that have a non-empty active memtable;
//  - publishes the totals to bvar and to _mem_tracker;
//  - wakes threads waiting in handle_memtable_flush() once no hard-limit condition holds.
// Both callers in this file hold _lock.
void MemTableMemoryLimiter::_refresh_mem_tracker() {
    _flush_mem_usage = 0;
    _queue_mem_usage = 0;
    _active_mem_usage = 0;
    _active_writers.clear();
    for (auto it = _writers.begin(); it != _writers.end();) {
        if (auto writer = it->lock()) {
            // The memtable is currently used by writer to insert blocks.
            auto active_usage = writer->active_memtable_mem_consumption();
            _active_mem_usage += active_usage;
            if (active_usage > 0) {
                _active_writers.push_back(writer);
            }
            _flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
            _queue_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
            ++it;
        } else if (std::next(it) == _writers.end()) {
            // Expired writer in the last slot. Popping it invalidates `it` (and end()), so
            // stop here instead of comparing the dangling iterator against end().
            _writers.pop_back();
            break;
        } else {
            // Expired writer elsewhere: swap-and-pop in O(1); order is not preserved.
            // `it` stays valid and is re-examined, since it now holds the former last entry.
            *it = std::move(_writers.back());
            _writers.pop_back();
        }
    }
    _mem_usage = _active_mem_usage + _queue_mem_usage + _flush_mem_usage;
    g_memtable_active_memory.set_value(_active_mem_usage);
    g_memtable_write_memory.set_value(_queue_mem_usage);
    g_memtable_flush_memory.set_value(_flush_mem_usage);
    g_memtable_load_memory.set_value(_mem_usage);
    VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
    _mem_tracker->set_consumption(_mem_usage);
    if (!_hard_limit_reached()) {
        _hard_limit_end_cond.notify_all();
    }
}
} // namespace doris