|  | // Licensed to the Apache Software Foundation (ASF) under one | 
|  | // or more contributor license agreements.  See the NOTICE file | 
|  | // distributed with this work for additional information | 
|  | // regarding copyright ownership.  The ASF licenses this file | 
|  | // to you under the Apache License, Version 2.0 (the | 
|  | // "License"); you may not use this file except in compliance | 
|  | // with the License.  You may obtain a copy of the License at | 
|  | // | 
|  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, | 
|  | // software distributed under the License is distributed on an | 
|  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | // KIND, either express or implied.  See the License for the | 
|  | // specific language governing permissions and limitations | 
|  | // under the License. | 
|  |  | 
|  | #include "olap/memtable_memory_limiter.h" | 
|  |  | 
|  | #include <bvar/bvar.h> | 
|  |  | 
|  | #include "common/config.h" | 
|  | #include "olap/memtable.h" | 
|  | #include "olap/memtable_writer.h" | 
|  | #include "runtime/workload_group/workload_group_manager.h" | 
|  | #include "util/doris_metrics.h" | 
|  | #include "util/mem_info.h" | 
|  | #include "util/metrics.h" | 
|  |  | 
|  | namespace doris { | 
|  | DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(memtable_memory_limiter_mem_consumption, MetricUnit::BYTES, "", | 
|  | memtable_memory_limiter_mem_consumption, | 
|  | Labels({{"type", "load"}})); | 
|  |  | 
|  | bvar::LatencyRecorder g_memtable_memory_limit_latency_ms("mm_limiter_limit_time_ms"); | 
|  | bvar::Adder<int> g_memtable_memory_limit_waiting_threads("mm_limiter_waiting_threads"); | 
|  | bvar::Status<int64_t> g_memtable_active_memory("mm_limiter_mem_active", 0); | 
|  | bvar::Status<int64_t> g_memtable_write_memory("mm_limiter_mem_write", 0); | 
|  | bvar::Status<int64_t> g_memtable_flush_memory("mm_limiter_mem_flush", 0); | 
|  | bvar::Status<int64_t> g_memtable_load_memory("mm_limiter_mem_load", 0); | 
|  | bvar::Status<int64_t> g_load_hard_mem_limit("mm_limiter_limit_hard", 0); | 
|  | bvar::Status<int64_t> g_load_soft_mem_limit("mm_limiter_limit_soft", 0); | 
|  | bvar::Adder<uint64_t> g_flush_cuz_load_mem_exceed_hard_limit("flush_cuz_hard_limit"); | 
|  | bvar::Adder<uint64_t> g_flush_cuz_sys_mem_exceed_soft_limit("flush_cuz_soft_limit"); | 
|  | bvar::Adder<int> g_memtable_memory_limit_flush_memtable_count("mm_limiter_flush_memtable_count"); | 
|  | bvar::LatencyRecorder g_memtable_memory_limit_flush_size_bytes("mm_limiter_flush_size_bytes"); | 
|  |  | 
|  | // Calculate the total memory limit of all load tasks on this BE | 
|  | static int64_t calc_process_max_load_memory(int64_t process_mem_limit) { | 
|  | if (process_mem_limit == -1) { | 
|  | // no limit | 
|  | return -1; | 
|  | } | 
|  | int32_t max_load_memory_percent = config::load_process_max_memory_limit_percent; | 
|  | return process_mem_limit * max_load_memory_percent / 100; | 
|  | } | 
|  |  | 
|  | MemTableMemoryLimiter::MemTableMemoryLimiter() {} | 
|  |  | 
|  | MemTableMemoryLimiter::~MemTableMemoryLimiter() { | 
|  | DEREGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption); | 
|  | } | 
|  |  | 
|  | Status MemTableMemoryLimiter::init(int64_t process_mem_limit) { | 
|  | _load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit); | 
|  | _load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100; | 
|  | _load_safe_mem_permit = | 
|  | _load_hard_mem_limit * config::load_process_safe_mem_permit_percent / 100; | 
|  | g_load_hard_mem_limit.set_value(_load_hard_mem_limit); | 
|  | g_load_soft_mem_limit.set_value(_load_soft_mem_limit); | 
|  | _mem_tracker = std::make_unique<MemTracker>("AllMemTableMemory"); | 
|  | REGISTER_HOOK_METRIC(memtable_memory_limiter_mem_consumption, | 
|  | [this]() { return _mem_tracker->consumption(); }); | 
|  | _log_timer.start(); | 
|  | return Status::OK(); | 
|  | } | 
|  |  | 
|  | void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer) { | 
|  | std::lock_guard<std::mutex> l(_lock); | 
|  | _writers.push_back(writer); | 
|  | } | 
|  |  | 
|  | int64_t MemTableMemoryLimiter::_sys_avail_mem_less_than_warning_water_mark() { | 
|  | // reserve a small amount of memory so we do not trigger MinorGC | 
|  | return doris::MemInfo::sys_mem_available_warning_water_mark() - | 
|  | doris::GlobalMemoryArbitrator::sys_mem_available() + | 
|  | config::memtable_limiter_reserved_memory_bytes; | 
|  | } | 
|  |  | 
|  | int64_t MemTableMemoryLimiter::_process_used_mem_more_than_soft_mem_limit() { | 
|  | // reserve a small amount of memory so we do not trigger MinorGC | 
|  | return GlobalMemoryArbitrator::process_memory_usage() - MemInfo::soft_mem_limit() + | 
|  | config::memtable_limiter_reserved_memory_bytes; | 
|  | } | 
|  |  | 
|  | bool MemTableMemoryLimiter::_soft_limit_reached() { | 
|  | return _mem_tracker->consumption() > _load_soft_mem_limit || _hard_limit_reached(); | 
|  | } | 
|  |  | 
|  | bool MemTableMemoryLimiter::_hard_limit_reached() { | 
|  | return _mem_tracker->consumption() > _load_hard_mem_limit || | 
|  | _sys_avail_mem_less_than_warning_water_mark() > 0 || | 
|  | _process_used_mem_more_than_soft_mem_limit() > 0; | 
|  | } | 
|  |  | 
|  | bool MemTableMemoryLimiter::_load_usage_low() { | 
|  | return _mem_tracker->consumption() <= _load_safe_mem_permit; | 
|  | } | 
|  |  | 
|  | int64_t MemTableMemoryLimiter::_need_flush() { | 
|  | DBUG_EXECUTE_IF("MemTableMemoryLimiter._need_flush.random_flush", { | 
|  | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { | 
|  | LOG(INFO) << "debug memtable need flush return 1"; | 
|  | return 1; | 
|  | } | 
|  | }); | 
|  | int64_t limit1 = _mem_tracker->consumption() - _load_soft_mem_limit; | 
|  | int64_t limit2 = _sys_avail_mem_less_than_warning_water_mark(); | 
|  | int64_t limit3 = _process_used_mem_more_than_soft_mem_limit(); | 
|  | int64_t need_flush = std::max({limit1, limit2, limit3}); | 
|  | return need_flush - _queue_mem_usage - _flush_mem_usage; | 
|  | } | 
|  |  | 
|  | void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> cancel_check) { | 
|  | // Check the soft limit. | 
|  | DCHECK(_load_soft_mem_limit > 0); | 
|  | do { | 
|  | DBUG_EXECUTE_IF("MemTableMemoryLimiter.handle_memtable_flush.limit_reached", { | 
|  | LOG(INFO) << "debug memtable limit reached"; | 
|  | break; | 
|  | }); | 
|  | if (!_soft_limit_reached() || _load_usage_low()) { | 
|  | return; | 
|  | } | 
|  | } while (false); | 
|  | MonotonicStopWatch timer; | 
|  | timer.start(); | 
|  | std::unique_lock<std::mutex> l(_lock); | 
|  | g_memtable_memory_limit_waiting_threads << 1; | 
|  | bool first = true; | 
|  | do { | 
|  | if (!first) { | 
|  | auto st = _hard_limit_end_cond.wait_for(l, std::chrono::milliseconds(1000)); | 
|  | if (st == std::cv_status::timeout) { | 
|  | LOG(INFO) << "timeout when waiting for memory hard limit end, try again"; | 
|  | } | 
|  | } | 
|  | if (cancel_check && cancel_check()) { | 
|  | LOG(INFO) << "cancelled when waiting for memtable flush"; | 
|  | return; | 
|  | } | 
|  | first = false; | 
|  | int64_t need_flush = _need_flush(); | 
|  | if (need_flush > 0) { | 
|  | auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT; | 
|  | LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ? "hard" : "soft") | 
|  | << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", " | 
|  | << GlobalMemoryArbitrator::sys_mem_available_details_str() | 
|  | << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) | 
|  | << ", memtable writers num: " << _writers.size() | 
|  | << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) | 
|  | << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) | 
|  | << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) | 
|  | << ", need flush: " << PrettyPrinter::print_bytes(need_flush); | 
|  | if (VLOG_DEBUG_IS_ON) { | 
|  | auto log_str = doris::ProcessProfile::instance() | 
|  | ->memory_profile() | 
|  | ->process_memory_detail_str(); | 
|  | LOG_LONG_STRING(INFO, log_str); | 
|  | } | 
|  | if (limit == Limit::HARD) { | 
|  | g_flush_cuz_load_mem_exceed_hard_limit << 1; | 
|  | } else if (limit == Limit::SOFT) { | 
|  | g_flush_cuz_sys_mem_exceed_soft_limit << 1; | 
|  | } else { | 
|  | // will not reach here | 
|  | } | 
|  | _flush_active_memtables(need_flush); | 
|  | } | 
|  | } while (_hard_limit_reached() && !_load_usage_low()); | 
|  | g_memtable_memory_limit_waiting_threads << -1; | 
|  | timer.stop(); | 
|  | int64_t time_ms = timer.elapsed_time() / 1000 / 1000; | 
|  | g_memtable_memory_limit_latency_ms << time_ms; | 
|  | if (time_ms > 0) { | 
|  | LOG(INFO) << "waited " << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS) | 
|  | << " for memtable memory limit" | 
|  | << ", " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", " | 
|  | << GlobalMemoryArbitrator::sys_mem_available_details_str() | 
|  | << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) | 
|  | << ", memtable writers num: " << _writers.size() | 
|  | << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) | 
|  | << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) | 
|  | << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage); | 
|  | } | 
|  | } | 
|  |  | 
|  | int64_t MemTableMemoryLimiter::_flush_active_memtables(int64_t need_flush) { | 
|  | if (need_flush <= 0) { | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | _refresh_mem_tracker(); | 
|  | if (_active_writers.size() == 0) { | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>; | 
|  | auto cmp = [](WriterMem left, WriterMem right) { return left.second < right.second; }; | 
|  | std::priority_queue<WriterMem, std::vector<WriterMem>, decltype(cmp)> heap(cmp); | 
|  |  | 
|  | for (auto writer : _active_writers) { | 
|  | auto w = writer.lock(); | 
|  | if (w == nullptr) { | 
|  | continue; | 
|  | } | 
|  | heap.emplace(w, w->active_memtable_mem_consumption()); | 
|  | } | 
|  |  | 
|  | int64_t mem_flushed = 0; | 
|  | int64_t num_flushed = 0; | 
|  |  | 
|  | while (mem_flushed < need_flush && !heap.empty()) { | 
|  | auto [writer, sort_mem] = heap.top(); | 
|  | heap.pop(); | 
|  | auto w = writer.lock(); | 
|  | if (w == nullptr) { | 
|  | continue; | 
|  | } | 
|  |  | 
|  | int64_t mem = w->active_memtable_mem_consumption(); | 
|  | if (mem < sort_mem * 0.9) { | 
|  | // if the memtable writer just got flushed, don't flush it again | 
|  | continue; | 
|  | } | 
|  | Status st = w->flush_async(); | 
|  | if (!st.ok()) { | 
|  | auto err_msg = fmt::format( | 
|  | "tablet writer failed to reduce mem consumption by flushing memtable, " | 
|  | "tablet_id={}, err={}", | 
|  | w->tablet_id(), st.to_string()); | 
|  | LOG(WARNING) << err_msg; | 
|  | static_cast<void>(w->cancel_with_status(st)); | 
|  | } | 
|  | mem_flushed += mem; | 
|  | num_flushed += (mem > 0); | 
|  | g_memtable_memory_limit_flush_memtable_count << 1; | 
|  | g_memtable_memory_limit_flush_size_bytes << mem; | 
|  | } | 
|  | LOG(INFO) << "flushed " << num_flushed << " out of " << _active_writers.size() | 
|  | << " active writers, flushed size: " << PrettyPrinter::print_bytes(mem_flushed); | 
|  | return mem_flushed; | 
|  | } | 
|  |  | 
|  | void MemTableMemoryLimiter::refresh_mem_tracker() { | 
|  | std::lock_guard<std::mutex> l(_lock); | 
|  | _refresh_mem_tracker(); | 
|  | std::stringstream ss; | 
|  | Limit limit = Limit::NONE; | 
|  | if (_soft_limit_reached()) { | 
|  | limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT; | 
|  | ss << "reached " << (limit == Limit::HARD ? "hard" : "soft") << " limit"; | 
|  | } else if (_last_limit == Limit::NONE) { | 
|  | return; | 
|  | } else { | 
|  | ss << "ended " << (_last_limit == Limit::HARD ? "hard" : "soft") << " limit"; | 
|  | } | 
|  |  | 
|  | if (_last_limit == limit && _log_timer.elapsed_time() < LOG_INTERVAL) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | _last_limit = limit; | 
|  | _log_timer.reset(); | 
|  | LOG(INFO) << ss.str() | 
|  | << ", load mem: " << PrettyPrinter::print_bytes(_mem_tracker->consumption()) | 
|  | << ", memtable writers num: " << _writers.size() | 
|  | << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage) | 
|  | << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage) | 
|  | << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage); | 
|  | if (VLOG_DEBUG_IS_ON) { | 
|  | auto log_str = | 
|  | doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str(); | 
|  | LOG_LONG_STRING(INFO, log_str); | 
|  | } | 
|  | } | 
|  |  | 
|  | void MemTableMemoryLimiter::_refresh_mem_tracker() { | 
|  | _flush_mem_usage = 0; | 
|  | _queue_mem_usage = 0; | 
|  | _active_mem_usage = 0; | 
|  | _active_writers.clear(); | 
|  | for (auto it = _writers.begin(); it != _writers.end();) { | 
|  | if (auto writer = it->lock()) { | 
|  | // The memtable is currently used by writer to insert blocks. | 
|  | auto active_usage = writer->active_memtable_mem_consumption(); | 
|  | _active_mem_usage += active_usage; | 
|  | if (active_usage > 0) { | 
|  | _active_writers.push_back(writer); | 
|  | } | 
|  |  | 
|  | auto flush_usage = writer->mem_consumption(MemType::FLUSH); | 
|  | _flush_mem_usage += flush_usage; | 
|  |  | 
|  | auto write_usage = writer->mem_consumption(MemType::WRITE_FINISHED); | 
|  | _queue_mem_usage += write_usage; | 
|  | ++it; | 
|  | } else { | 
|  | *it = std::move(_writers.back()); | 
|  | _writers.pop_back(); | 
|  | } | 
|  | } | 
|  | _mem_usage = _active_mem_usage + _queue_mem_usage + _flush_mem_usage; | 
|  | g_memtable_active_memory.set_value(_active_mem_usage); | 
|  | g_memtable_write_memory.set_value(_queue_mem_usage); | 
|  | g_memtable_flush_memory.set_value(_flush_mem_usage); | 
|  | g_memtable_load_memory.set_value(_mem_usage); | 
|  | VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size(); | 
|  | _mem_tracker->set_consumption(_mem_usage); | 
|  | if (!_hard_limit_reached()) { | 
|  | _hard_limit_end_cond.notify_all(); | 
|  | } | 
|  | } | 
|  |  | 
|  | } // namespace doris |