blob: 1d7d0a10ac89dfab822354de90ea309a48c24f1b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "rate_limiter.h"
#include <bthread/mutex.h>
#include <butil/strings/string_split.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ranges>
#include <shared_mutex>
#include "common/bvars.h"
#include "common/config.h"
#include "common/configbase.h"
namespace doris::cloud {
std::unordered_map<std::string, int64_t> parse_specific_qps_limit(const std::string& list_str) {
std::unordered_map<std::string, int64_t> rpc_name_to_max_qps_limit;
std::vector<std::string> max_qps_limit_list;
butil::SplitString(list_str, ';', &max_qps_limit_list);
for (const auto& v : max_qps_limit_list) {
auto p = v.find(':');
if (p != std::string::npos && p != (v.size() - 1)) {
auto rpc_name = v.substr(0, p);
try {
int64_t max_qps_limit = std::stoll(v.substr(p + 1));
if (max_qps_limit > 0) {
rpc_name_to_max_qps_limit[rpc_name] = max_qps_limit;
}
} catch (...) {
LOG(WARNING) << "failed to parse max_qps_limit to rpc: " << rpc_name
<< " config: " << v;
}
}
}
return rpc_name_to_max_qps_limit;
}
template <typename Callable>
void for_each_rpc_name(google::protobuf::Service* service, Callable cb) {
auto method_size = service->GetDescriptor()->method_count();
for (auto i = 0; i < method_size; ++i) {
std::string rpc_name = service->GetDescriptor()->method(i)->name();
cb(rpc_name);
}
}
void RateLimiter::init(google::protobuf::Service* service) {
auto rpc_name_to_specific_limit = parse_specific_qps_limit(config::specific_max_qps_limit);
std::unique_lock write_lock(mutex_);
for_each_rpc_name(service, [&](const std::string& rpc_name) {
auto it = rpc_name_to_specific_limit.find(rpc_name);
int64_t max_qps_limit = config::default_max_qps_limit;
if (it != rpc_name_to_specific_limit.end()) {
max_qps_limit = it->second;
}
limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, max_qps_limit);
});
for (const auto& [k, _] : rpc_name_to_specific_limit) {
rpc_with_specific_limit_.insert(k);
}
}
std::shared_ptr<RpcRateLimiter> RateLimiter::get_rpc_rate_limiter(const std::string& rpc_name) {
auto it = limiters_.find(rpc_name);
if (it == limiters_.end()) {
return nullptr;
}
return it->second;
}
bool RateLimiter::set_rate_limit(int64_t qps_limit) {
std::lock_guard lock(mutex_);
auto filter = [this](const auto& kv) { return !rpc_with_specific_limit_.contains(kv.first); };
for (const auto& [_, v] : limiters_ | std::views::filter(std::move(filter))) {
v->set_max_qps_limit(qps_limit);
}
return true;
}
bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name) {
if (!limiters_.contains(rpc_name)) {
return false;
}
auto limiter = limiters_.at(rpc_name);
std::lock_guard lock(mutex_);
limiter->set_max_qps_limit(qps_limit);
rpc_with_specific_limit_.insert(rpc_name);
return true;
}
bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name,
const std::string& instance_id) {
if (!limiters_.contains(rpc_name)) {
return false;
}
auto limiter = limiters_.at(rpc_name);
return limiter->set_max_qps_limit(qps_limit, instance_id);
}
bool RateLimiter::set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id) {
return std::ranges::all_of(limiters_, [&](const auto& kv) {
return kv.second->set_max_qps_limit(qps_limit, instance_id);
});
return true;
}
void RateLimiter::for_each_rpc_limiter(
std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb) {
for (const auto& [rpc_name, rpc_limiter] : limiters_) {
cb(rpc_name, rpc_limiter);
}
}
bool RpcRateLimiter::get_qps_token(const std::string& instance_id,
std::function<int()>& get_bvar_qps) {
if (!config::use_detailed_metrics || instance_id.empty()) {
return true;
}
std::shared_ptr<QpsToken> qps_token = nullptr;
{
std::lock_guard<bthread::Mutex> l(mutex_);
auto it = qps_limiter_.find(instance_id);
// new instance always can get token
if (it == qps_limiter_.end()) {
qps_token = std::make_shared<QpsToken>(max_qps_limit_);
qps_limiter_[instance_id] = qps_token;
return true;
}
qps_token = it->second;
}
return qps_token->get_token(get_bvar_qps);
}
void RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit) {
std::lock_guard<bthread::Mutex> l(mutex_);
max_qps_limit_ = max_qps_limit;
auto filter = [this](const auto& kv) {
return !instance_with_specific_limit_.contains(kv.first);
};
for (auto& [k, v] : qps_limiter_ | std::views::filter(std::move(filter))) {
v->set_max_qps_limit(max_qps_limit);
}
}
bool RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit, const std::string& instance_id) {
std::lock_guard<bthread::Mutex> l(mutex_);
if (!qps_limiter_.contains(instance_id)) {
qps_limiter_[instance_id] = std::make_shared<QpsToken>(max_qps_limit);
} else {
qps_limiter_.at(instance_id)->set_max_qps_limit(max_qps_limit);
}
instance_with_specific_limit_.insert(instance_id);
return true;
}
void RpcRateLimiter::for_each_qps_token(
std::function<void(std::string_view, std::shared_ptr<QpsToken>)> cb) {
for (const auto& [instance_id, qps_token] : qps_limiter_) {
cb(instance_id, qps_token);
}
}
bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
using namespace std::chrono;
auto now = steady_clock::now();
std::lock_guard<bthread::Mutex> l(mutex_);
// Todo: if current_qps_ > max_qps_limit_, always return false until the bvar qps is updated,
// maybe need to reduce the bvar's update interval.
++access_count_;
auto duration_s = duration_cast<seconds>(now - last_update_time_).count();
if (duration_s > config::bvar_qps_update_second ||
(duration_s != 0 && (access_count_ / duration_s > max_qps_limit_ / 2))) {
access_count_ = 0;
last_update_time_ = now;
current_qps_ = get_bvar_qps();
}
return current_qps_ < max_qps_limit_;
}
void RpcRateLimiter::QpsToken::set_max_qps_limit(int64_t max_qps_limit) {
std::lock_guard<bthread::Mutex> l(mutex_);
max_qps_limit_ = max_qps_limit;
}
} // namespace doris::cloud