| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "cloud/cloud_throttle_state_machine.h" |
| |
| #include <glog/logging.h> |
| |
| #include <algorithm> |
| |
| namespace doris::cloud { |
| |
| // Display names for LoadRelatedRpc types |
| static constexpr std::string_view LOAD_RELATED_RPC_NAMES[] = { |
| "prepare_rowset", "commit_rowset", "update_tmp_rowset", "update_packed_file_info", |
| "update_delete_bitmap"}; |
| |
| std::string_view load_related_rpc_name(LoadRelatedRpc rpc) { |
| size_t idx = static_cast<size_t>(rpc); |
| if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) { |
| return LOAD_RELATED_RPC_NAMES[idx]; |
| } |
| return "unknown"; |
| } |
| |
| // ============== RpcThrottleStateMachine ============== |
| |
| RpcThrottleStateMachine::RpcThrottleStateMachine(RpcThrottleParams params) : _params(params) { |
| LOG(INFO) << "[ms-throttle] state machine initialized: top_k=" << params.top_k |
| << ", ratio=" << params.ratio << ", floor_qps=" << params.floor_qps; |
| } |
| |
| void RpcThrottleStateMachine::update_params(RpcThrottleParams params) { |
| std::lock_guard lock(_mtx); |
| _params = params; |
| LOG(INFO) << "[ms-throttle] state machine params updated: top_k=" << params.top_k |
| << ", ratio=" << params.ratio << ", floor_qps=" << params.floor_qps; |
| } |
| |
| std::vector<RpcThrottleAction> RpcThrottleStateMachine::on_upgrade( |
| const std::vector<RpcQpsSnapshot>& qps_snapshot) { |
| std::lock_guard lock(_mtx); |
| |
| UpgradeRecord record; |
| std::vector<RpcThrottleAction> actions; |
| |
| double ratio = _params.ratio; |
| double floor_qps = _params.floor_qps; |
| |
| // Caller is responsible for providing top-k snapshot per RPC type. |
| // State machine simply applies throttling to every entry in the snapshot. |
| for (const auto& snapshot : qps_snapshot) { |
| auto key = std::make_pair(snapshot.rpc_type, snapshot.table_id); |
| |
| double old_limit = 0.0; |
| auto limit_it = _current_limits.find(key); |
| if (limit_it != _current_limits.end()) { |
| old_limit = limit_it->second; |
| } |
| |
| double new_limit; |
| if (old_limit > 0) { |
| // Already has a limit, reduce it further |
| new_limit = old_limit * ratio; |
| } else { |
| // No limit yet, set based on current QPS |
| new_limit = snapshot.current_qps * ratio; |
| } |
| |
| // Apply floor |
| new_limit = std::max(new_limit, floor_qps); |
| |
| // Only apply if it's actually limiting |
| if (new_limit < snapshot.current_qps || old_limit > 0) { |
| RpcThrottleAction action { |
| .type = RpcThrottleAction::Type::SET_LIMIT, |
| .rpc_type = snapshot.rpc_type, |
| .table_id = snapshot.table_id, |
| .qps_limit = new_limit, |
| }; |
| actions.push_back(action); |
| record.changes[key] = {old_limit, new_limit}; |
| _current_limits[key] = new_limit; |
| |
| LOG(INFO) << "[ms-throttle] upgrade: rpc=" << load_related_rpc_name(snapshot.rpc_type) |
| << ", table_id=" << snapshot.table_id |
| << ", current_qps=" << snapshot.current_qps << ", old_limit=" << old_limit |
| << ", new_limit=" << new_limit; |
| } |
| } |
| |
| if (!record.changes.empty()) { |
| _upgrade_history.push_back(std::move(record)); |
| } |
| |
| LOG(INFO) << "[ms-throttle] on_upgrade done: actions=" << actions.size() |
| << ", upgrade_level=" << _upgrade_history.size() |
| << ", snapshot_size=" << qps_snapshot.size(); |
| |
| return actions; |
| } |
| |
| std::vector<RpcThrottleAction> RpcThrottleStateMachine::on_downgrade() { |
| std::lock_guard lock(_mtx); |
| |
| std::vector<RpcThrottleAction> actions; |
| |
| if (_upgrade_history.empty()) { |
| LOG(INFO) << "[ms-throttle] on_downgrade skipped: no upgrade history"; |
| return actions; |
| } |
| |
| // Undo the most recent upgrade |
| const auto& record = _upgrade_history.back(); |
| |
| for (const auto& [key, limits] : record.changes) { |
| const auto& [rpc_type, table_id] = key; |
| double old_limit = limits.first; |
| |
| if (old_limit > 0) { |
| // Restore the previous limit |
| RpcThrottleAction action { |
| .type = RpcThrottleAction::Type::SET_LIMIT, |
| .rpc_type = rpc_type, |
| .table_id = table_id, |
| .qps_limit = old_limit, |
| }; |
| |
| actions.push_back(action); |
| _current_limits[key] = old_limit; |
| |
| LOG(INFO) << "[ms-throttle] downgrade: rpc=" << load_related_rpc_name(rpc_type) |
| << ", table_id=" << table_id << ", restored_limit=" << old_limit; |
| } else { |
| // No previous limit, remove it entirely |
| RpcThrottleAction action { |
| .type = RpcThrottleAction::Type::REMOVE_LIMIT, |
| .rpc_type = rpc_type, |
| .table_id = table_id, |
| }; |
| |
| actions.push_back(action); |
| _current_limits.erase(key); |
| |
| LOG(INFO) << "[ms-throttle] downgrade: rpc=" << load_related_rpc_name(rpc_type) |
| << ", table_id=" << table_id << ", removed limit"; |
| } |
| } |
| |
| _upgrade_history.pop_back(); |
| |
| LOG(INFO) << "[ms-throttle] on_downgrade done: actions=" << actions.size() |
| << ", upgrade_level=" << _upgrade_history.size(); |
| |
| return actions; |
| } |
| |
| size_t RpcThrottleStateMachine::upgrade_level() const { |
| std::lock_guard lock(_mtx); |
| return _upgrade_history.size(); |
| } |
| |
| double RpcThrottleStateMachine::get_current_limit(LoadRelatedRpc rpc_type, int64_t table_id) const { |
| std::lock_guard lock(_mtx); |
| auto it = _current_limits.find({rpc_type, table_id}); |
| if (it != _current_limits.end()) { |
| return it->second; |
| } |
| return 0.0; |
| } |
| |
| RpcThrottleParams RpcThrottleStateMachine::get_params() const { |
| std::lock_guard lock(_mtx); |
| return _params; |
| } |
| |
| // ============== RpcThrottleCoordinator ============== |
| |
| RpcThrottleCoordinator::RpcThrottleCoordinator(ThrottleCoordinatorParams params) : _params(params) { |
| LOG(INFO) << "[ms-throttle] coordinator initialized: upgrade_cooldown_ticks=" |
| << params.upgrade_cooldown_ticks |
| << ", downgrade_after_ticks=" << params.downgrade_after_ticks; |
| } |
| |
| void RpcThrottleCoordinator::update_params(ThrottleCoordinatorParams params) { |
| std::lock_guard lock(_mtx); |
| _params = params; |
| LOG(INFO) << "[ms-throttle] coordinator params updated: upgrade_cooldown_ticks=" |
| << params.upgrade_cooldown_ticks |
| << ", downgrade_after_ticks=" << params.downgrade_after_ticks; |
| } |
| |
| bool RpcThrottleCoordinator::report_ms_busy() { |
| std::lock_guard lock(_mtx); |
| |
| // Reset tick counter since last MS_BUSY |
| _ticks_since_last_ms_busy = 0; |
| |
| // Check if cooldown has passed |
| if (_ticks_since_last_upgrade == -1 || |
| _ticks_since_last_upgrade >= _params.upgrade_cooldown_ticks) { |
| // Reset upgrade counter |
| auto actual_ticks = _ticks_since_last_upgrade; |
| _ticks_since_last_upgrade = 0; |
| _has_pending_upgrades = true; |
| |
| LOG(INFO) << "[ms-throttle] upgrade triggered: ticks_since_last_upgrade=" << actual_ticks |
| << ", cooldown=" << _params.upgrade_cooldown_ticks; |
| return true; // Should trigger upgrade |
| } |
| return false; // Cooling down |
| } |
| |
| bool RpcThrottleCoordinator::tick(int ticks) { |
| std::lock_guard lock(_mtx); |
| |
| // Increment tick counters |
| if (_ticks_since_last_ms_busy >= 0) { |
| _ticks_since_last_ms_busy += ticks; |
| } |
| if (_ticks_since_last_upgrade >= 0) { |
| _ticks_since_last_upgrade += ticks; |
| } |
| |
| // Check if downgrade should be triggered |
| if (_has_pending_upgrades && _ticks_since_last_ms_busy >= _params.downgrade_after_ticks) { |
| // Reset for next downgrade cycle |
| auto actual_ticks = _ticks_since_last_ms_busy; |
| _ticks_since_last_ms_busy = 0; |
| |
| LOG(INFO) << "[ms-throttle] downgrade triggered: ticks_since_last_ms_busy=" << actual_ticks |
| << ", threshold=" << _params.downgrade_after_ticks; |
| return true; // Should trigger downgrade |
| } |
| |
| return false; |
| } |
| |
| void RpcThrottleCoordinator::set_has_pending_upgrades(bool has) { |
| std::lock_guard lock(_mtx); |
| _has_pending_upgrades = has; |
| } |
| |
| int RpcThrottleCoordinator::ticks_since_last_ms_busy() const { |
| std::lock_guard lock(_mtx); |
| return _ticks_since_last_ms_busy; |
| } |
| |
| int RpcThrottleCoordinator::ticks_since_last_upgrade() const { |
| std::lock_guard lock(_mtx); |
| return _ticks_since_last_upgrade; |
| } |
| |
| ThrottleCoordinatorParams RpcThrottleCoordinator::get_params() const { |
| std::lock_guard lock(_mtx); |
| return _params; |
| } |
| |
| } // namespace doris::cloud |