blob: e99cc281001f5567d84698137e88c7c4e0816480 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_ms_backpressure_handler.h"
#include <fmt/format.h>
#include <glog/logging.h>
#include <algorithm>
#include <cmath>
#include <queue>
#include <thread>
#include "cloud/config.h"
#include "common/status.h"
#include "util/thread.h"
namespace doris::cloud {
// Global bvar metrics
bvar::Adder<uint64_t> g_backpressure_upgrade_count("ms_rpc_backpressure_upgrade_count");
bvar::Window<bvar::Adder<uint64_t>> g_backpressure_upgrade_60s("ms_rpc_backpressure_upgrade_60s",
&g_backpressure_upgrade_count, 60);
bvar::Adder<uint64_t> g_backpressure_downgrade_count("ms_rpc_backpressure_downgrade_count");
bvar::Window<bvar::Adder<uint64_t>> g_backpressure_downgrade_60s(
"ms_rpc_backpressure_downgrade_60s", &g_backpressure_downgrade_count, 60);
bvar::LatencyRecorder g_throttle_wait_prepare_rowset(
"ms_rpc_backpressure_throttle_wait_prepare_rowset");
bvar::LatencyRecorder g_throttle_wait_commit_rowset(
"ms_rpc_backpressure_throttle_wait_commit_rowset");
bvar::LatencyRecorder g_throttle_wait_update_tmp_rowset(
"ms_rpc_backpressure_throttle_wait_update_tmp_rowset");
bvar::LatencyRecorder g_throttle_wait_update_packed_file_info(
"ms_rpc_backpressure_throttle_wait_update_packed_file_info");
bvar::LatencyRecorder g_throttle_wait_update_delete_bitmap(
"ms_rpc_backpressure_throttle_wait_update_delete_bitmap");
bvar::Adder<uint64_t> g_ms_busy_count("ms_rpc_backpressure_ms_busy_count");
bvar::Window<bvar::Adder<uint64_t>> g_ms_busy_60s("ms_rpc_backpressure_ms_busy_60s",
&g_ms_busy_count, 60);
static bvar::LatencyRecorder* s_throttle_wait_recorders[] = {
&g_throttle_wait_prepare_rowset, &g_throttle_wait_commit_rowset,
&g_throttle_wait_update_tmp_rowset, &g_throttle_wait_update_packed_file_info,
&g_throttle_wait_update_delete_bitmap,
};
bvar::LatencyRecorder* get_throttle_wait_recorder(LoadRelatedRpc rpc) {
size_t idx = static_cast<size_t>(rpc);
if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
return nullptr;
}
return s_throttle_wait_recorders[idx];
}
// ============== StrictQpsLimiter ==============
StrictQpsLimiter::StrictQpsLimiter(double qps) {
if (qps <= 0) {
qps = 1.0;
}
_interval_ns = static_cast<int64_t>(1e9 / qps);
_next_allowed_time = Clock::now();
}
StrictQpsLimiter::Clock::time_point StrictQpsLimiter::reserve() {
std::lock_guard lock(_mtx);
auto now = Clock::now();
if (_next_allowed_time <= now) {
_next_allowed_time = now + std::chrono::nanoseconds(_interval_ns);
return now;
}
auto result = _next_allowed_time;
_next_allowed_time += std::chrono::nanoseconds(_interval_ns);
return result;
}
void StrictQpsLimiter::update_qps(double new_qps) {
if (new_qps <= 0) {
new_qps = 1.0;
}
std::lock_guard lock(_mtx);
_interval_ns = static_cast<int64_t>(1e9 / new_qps);
}
double StrictQpsLimiter::get_qps() const {
std::lock_guard lock(_mtx);
if (_interval_ns <= 0) {
return 0;
}
return 1e9 / _interval_ns;
}
// ============== TableRpcQpsCounter ==============
TableRpcQpsCounter::TableRpcQpsCounter(int64_t table_id, LoadRelatedRpc rpc_type, int window_sec)
: _table_id(table_id), _rpc_type(rpc_type) {
_counter = std::make_unique<bvar::Adder<int64_t>>();
_counter->hide();
_qps = std::make_unique<bvar::PerSecond<bvar::Adder<int64_t>>>(_counter.get(), window_sec);
_qps->hide();
}
void TableRpcQpsCounter::increment() {
(*_counter) << 1;
}
double TableRpcQpsCounter::get_qps() const {
return _qps->get_value();
}
// ============== TableRpcQpsRegistry ==============
TableRpcQpsRegistry::TableRpcQpsRegistry() = default;
void TableRpcQpsRegistry::record(LoadRelatedRpc rpc_type, int64_t table_id) {
auto* counter = get_or_create_counter(rpc_type, table_id);
if (counter) {
counter->increment();
}
}
TableRpcQpsCounter* TableRpcQpsRegistry::get_or_create_counter(LoadRelatedRpc rpc_type,
int64_t table_id) {
size_t idx = static_cast<size_t>(rpc_type);
if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
return nullptr;
}
{
std::shared_lock lock(_mutex);
auto it = _counters[idx].find(table_id);
if (it != _counters[idx].end()) {
return it->second.get();
}
}
std::unique_lock lock(_mutex);
// Double check after acquiring exclusive lock
auto it = _counters[idx].find(table_id);
if (it != _counters[idx].end()) {
return it->second.get();
}
auto counter = std::make_unique<TableRpcQpsCounter>(table_id, rpc_type,
config::ms_rpc_table_qps_window_sec);
auto* ptr = counter.get();
_counters[idx][table_id] = std::move(counter);
return ptr;
}
std::vector<std::pair<int64_t, double>> TableRpcQpsRegistry::get_top_k_tables(
LoadRelatedRpc rpc_type, int k) const {
size_t idx = static_cast<size_t>(rpc_type);
if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT) || k <= 0) {
return {};
}
// Use a min-heap of size k to find top-k without allocating a vector for all tables.
// The heap top is the smallest among the k largest elements seen so far.
using Entry = std::pair<int64_t, double>; // (table_id, qps)
auto min_cmp = [](const Entry& a, const Entry& b) { return a.second > b.second; };
std::priority_queue<Entry, std::vector<Entry>, decltype(min_cmp)> min_heap(min_cmp);
{
std::shared_lock lock(_mutex);
for (const auto& [table_id, counter] : _counters[idx]) {
double qps = counter->get_qps();
if (qps > 0) {
if (static_cast<int>(min_heap.size()) < k) {
min_heap.push({table_id, qps});
} else if (qps > min_heap.top().second) {
min_heap.pop();
min_heap.push({table_id, qps});
}
}
}
}
// Extract results from heap (comes out in ascending order, reverse to descending)
std::vector<Entry> result;
result.reserve(min_heap.size());
while (!min_heap.empty()) {
result.push_back(min_heap.top());
min_heap.pop();
}
std::reverse(result.begin(), result.end());
return result;
}
double TableRpcQpsRegistry::get_qps(LoadRelatedRpc rpc_type, int64_t table_id) const {
size_t idx = static_cast<size_t>(rpc_type);
if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
return 0;
}
std::shared_lock lock(_mutex);
auto it = _counters[idx].find(table_id);
if (it != _counters[idx].end()) {
return it->second->get_qps();
}
return 0;
}
void TableRpcQpsRegistry::cleanup_inactive_tables() {
std::unique_lock lock(_mutex);
for (size_t idx = 0; idx < static_cast<size_t>(LoadRelatedRpc::COUNT); ++idx) {
auto& counter_map = _counters[idx];
for (auto it = counter_map.begin(); it != counter_map.end();) {
// Remove counters with zero QPS for a long time
if (it->second->get_qps() < 0.01) {
it = counter_map.erase(it);
} else {
++it;
}
}
}
}
// ============== TableRpcThrottler ==============
TableRpcThrottler::TableRpcThrottler() {
// Initialize bvar for throttled table counts
for (size_t i = 0; i < static_cast<size_t>(LoadRelatedRpc::COUNT); ++i) {
std::string bvar_name = fmt::format("ms_rpc_backpressure_throttled_tables_{}",
load_related_rpc_name(static_cast<LoadRelatedRpc>(i)));
_throttled_table_counts[i] = std::make_unique<bvar::Status<size_t>>(bvar_name, 0);
}
}
std::chrono::steady_clock::time_point TableRpcThrottler::throttle(LoadRelatedRpc rpc_type,
int64_t table_id) {
std::shared_lock lock(_mutex);
auto it = _limiters.find({rpc_type, table_id});
if (it == _limiters.end()) {
return std::chrono::steady_clock::now();
}
return it->second->reserve();
}
void TableRpcThrottler::set_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id, double qps_limit) {
if (qps_limit <= 0) {
return;
}
std::unique_lock lock(_mutex);
auto key = std::make_pair(rpc_type, table_id);
auto it = _limiters.find(key);
if (it != _limiters.end()) {
it->second->update_qps(qps_limit);
} else {
_limiters[key] = std::make_unique<StrictQpsLimiter>(qps_limit);
// Update bvar count
size_t idx = static_cast<size_t>(rpc_type);
if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) {
size_t count = 0;
for (const auto& [k, _] : _limiters) {
if (k.first == rpc_type) {
++count;
}
}
_throttled_table_counts[idx]->set_value(count);
}
}
LOG(INFO) << "[ms-throttle] set table QPS limit: rpc=" << load_related_rpc_name(rpc_type)
<< ", table_id=" << table_id << ", qps_limit=" << qps_limit;
}
void TableRpcThrottler::remove_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id) {
std::unique_lock lock(_mutex);
auto key = std::make_pair(rpc_type, table_id);
auto it = _limiters.find(key);
if (it != _limiters.end()) {
_limiters.erase(it);
// Update bvar count
size_t idx = static_cast<size_t>(rpc_type);
if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) {
size_t count = 0;
for (const auto& [k, _] : _limiters) {
if (k.first == rpc_type) {
++count;
}
}
_throttled_table_counts[idx]->set_value(count);
}
LOG(INFO) << "[ms-throttle] removed table QPS limit: rpc="
<< load_related_rpc_name(rpc_type) << ", table_id=" << table_id;
}
}
double TableRpcThrottler::get_qps_limit(LoadRelatedRpc rpc_type, int64_t table_id) const {
std::shared_lock lock(_mutex);
auto it = _limiters.find({rpc_type, table_id});
if (it != _limiters.end()) {
return it->second->get_qps();
}
return 0;
}
bool TableRpcThrottler::has_limit(LoadRelatedRpc rpc_type, int64_t table_id) const {
std::shared_lock lock(_mutex);
return _limiters.find({rpc_type, table_id}) != _limiters.end();
}
size_t TableRpcThrottler::get_throttled_table_count(LoadRelatedRpc rpc_type) const {
size_t idx = static_cast<size_t>(rpc_type);
if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
return 0;
}
return _throttled_table_counts[idx]->get_value();
}
std::vector<TableRpcThrottler::ThrottleEntry> TableRpcThrottler::get_all_throttled_entries() const {
std::shared_lock lock(_mutex);
std::vector<ThrottleEntry> entries;
entries.reserve(_limiters.size());
for (const auto& [key, limiter] : _limiters) {
entries.push_back({key.first, key.second, limiter->get_qps()});
}
return entries;
}
// ============== MSBackpressureHandler ==============
MSBackpressureHandler::MSBackpressureHandler(TableRpcQpsRegistry* qps_registry,
TableRpcThrottler* throttler)
: _qps_registry(qps_registry),
_throttler(throttler),
_stop_latch(1),
_last_ms_busy_time(std::chrono::steady_clock::time_point::min()) {
// Initialize state machine with config values
RpcThrottleParams throttle_params {
.top_k = config::ms_backpressure_upgrade_top_k,
.ratio = config::ms_backpressure_throttle_ratio,
.floor_qps = config::ms_rpc_table_qps_limit_floor,
};
_state_machine = std::make_unique<RpcThrottleStateMachine>(throttle_params);
// Initialize coordinator with config values
// Coordinator uses ticks where 1 tick = 1 millisecond (fixed unit)
// This allows tick_interval_ms to change at runtime without affecting correctness
ThrottleCoordinatorParams coordinator_params {
.upgrade_cooldown_ticks = config::ms_backpressure_upgrade_interval_ms,
.downgrade_after_ticks = config::ms_backpressure_downgrade_interval_ms,
};
_coordinator = std::make_unique<RpcThrottleCoordinator>(coordinator_params);
auto st = Thread::create(
"MSBackpressureHandler", "tick_thread", [this]() { this->_tick_thread_callback(); },
&_tick_thread);
if (!st.ok()) {
LOG(WARNING) << "[ms-throttle] failed to create tick thread: " << st;
} else {
LOG(INFO) << "[ms-throttle] handler started: upgrade_cooldown="
<< config::ms_backpressure_upgrade_interval_ms
<< "ms, downgrade_interval=" << config::ms_backpressure_downgrade_interval_ms
<< "ms";
}
}
MSBackpressureHandler::~MSBackpressureHandler() {
_stop_latch.count_down();
if (_tick_thread) {
_tick_thread->join();
}
}
void MSBackpressureHandler::_tick_thread_callback() {
// Fixed tick interval: 1 second. Since 1 tick = 1 ms, advance by 1000 ticks each iteration.
constexpr int kTickIntervalMs = 1000;
while (!_stop_latch.wait_for(std::chrono::milliseconds(kTickIntervalMs))) {
_advance_time(kTickIntervalMs);
}
}
void MSBackpressureHandler::_advance_time(int ticks) {
if (!config::enable_ms_backpressure_handling) {
return;
}
std::lock_guard lock(_transition_mutex);
// Advance coordinator time; if downgrade is triggered, handle it
if (_coordinator->tick(ticks)) {
LOG(INFO) << "[ms-throttle] triggering downgrade, upgrade_level="
<< _state_machine->upgrade_level();
auto actions = _state_machine->on_downgrade();
_apply_actions(actions);
_coordinator->set_has_pending_upgrades(_state_machine->upgrade_level() > 0);
g_backpressure_downgrade_count << 1;
}
}
bool MSBackpressureHandler::on_ms_busy() {
g_ms_busy_count << 1;
if (!config::enable_ms_backpressure_handling) {
return false;
}
{
std::lock_guard lock(_mutex);
_last_ms_busy_time = std::chrono::steady_clock::now();
}
std::lock_guard lock(_transition_mutex);
// Check with coordinator if upgrade should be triggered
if (!_coordinator->report_ms_busy()) {
return false;
}
LOG(INFO) << "[ms-throttle] received MS_BUSY, triggering upgrade";
auto snapshot = _build_qps_snapshot();
auto actions = _state_machine->on_upgrade(snapshot);
_apply_actions(actions);
_coordinator->set_has_pending_upgrades(_state_machine->upgrade_level() > 0);
g_backpressure_upgrade_count << 1;
return true;
}
std::chrono::steady_clock::time_point MSBackpressureHandler::before_rpc(LoadRelatedRpc rpc_type,
int64_t table_id) {
if (!config::enable_ms_backpressure_handling) {
return std::chrono::steady_clock::now();
}
return _throttler->throttle(rpc_type, table_id);
}
void MSBackpressureHandler::after_rpc(LoadRelatedRpc rpc_type, int64_t table_id) {
if (!config::enable_ms_backpressure_handling) {
return;
}
_qps_registry->record(rpc_type, table_id);
}
void MSBackpressureHandler::update_throttle_params(RpcThrottleParams params) {
_state_machine->update_params(params);
}
void MSBackpressureHandler::update_coordinator_params(ThrottleCoordinatorParams params) {
_coordinator->update_params(params);
}
int64_t MSBackpressureHandler::seconds_since_last_ms_busy() const {
std::lock_guard lock(_mutex);
if (_last_ms_busy_time == std::chrono::steady_clock::time_point::min()) {
return -1; // Never received MS_BUSY
}
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() -
_last_ms_busy_time)
.count();
}
size_t MSBackpressureHandler::upgrade_level() const {
return _state_machine->upgrade_level();
}
int MSBackpressureHandler::ticks_since_last_ms_busy() const {
return _coordinator->ticks_since_last_ms_busy();
}
int MSBackpressureHandler::ticks_since_last_upgrade() const {
return _coordinator->ticks_since_last_upgrade();
}
void MSBackpressureHandler::_apply_actions(const std::vector<RpcThrottleAction>& actions) {
for (const auto& action : actions) {
switch (action.type) {
case RpcThrottleAction::Type::SET_LIMIT:
_throttler->set_qps_limit(action.rpc_type, action.table_id, action.qps_limit);
break;
case RpcThrottleAction::Type::REMOVE_LIMIT:
_throttler->remove_qps_limit(action.rpc_type, action.table_id);
break;
}
}
}
std::vector<RpcQpsSnapshot> MSBackpressureHandler::_build_qps_snapshot() const {
std::vector<RpcQpsSnapshot> snapshot;
// For each RPC type, get top-k tables
int top_k = _state_machine->get_params().top_k;
for (size_t i = 0; i < static_cast<size_t>(LoadRelatedRpc::COUNT); ++i) {
LoadRelatedRpc rpc_type = static_cast<LoadRelatedRpc>(i);
auto top_tables = _qps_registry->get_top_k_tables(rpc_type, top_k);
for (const auto& [table_id, qps] : top_tables) {
snapshot.push_back({rpc_type, table_id, qps});
}
}
return snapshot;
}
} // namespace doris::cloud