blob: a406fbb4932dbd769d5844c911062df0f9c90bb2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "rate-limiter/rate_limiter.h"
#include <gen_cpp/cloud.pb.h>
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>
#include "common/config.h"
#include "common/util.h"
#include "meta-service/keys.h"
#include "meta-service/mem_txn_kv.h"
#include "meta-service/meta_service.h"
#include "meta-service/txn_kv_error.h"
#include "mock_resource_manager.h"
#include "resource-manager/resource_manager.h"
int main(int argc, char** argv) {
doris::cloud::config::init(nullptr, true);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
using namespace doris::cloud;
const std::string mock_instance_0 = "mock_instance_0";
const std::string mock_instance_1 = "mock_instance_1";
const std::string mock_cluster_0 = "mock_cluster_0";
const std::string mock_cluster_1 = "mock_cluster_1";
const std::string mock_cluster_id_0 = "mock_cluster_id_0";
const std::string mock_cluster_id_1 = "mock_cluster_id_1";
const std::string mock_cloud_unique_id_0 = "mock_cloud_unique_id_0";
const std::string mock_cloud_unique_id_1 = "mock_cloud_unique_id_1";
class MockMultiInstanceRsMgr : public MockResourceManager {
public:
using MockResourceManager::MockResourceManager;
std::string get_node(const std::string& cloud_unique_id,
std::vector<NodeInfo>* nodes) override {
if (cloud_unique_id == mock_cloud_unique_id_0) {
nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_0, mock_cluster_0,
mock_cluster_id_0);
} else if (cloud_unique_id == mock_cloud_unique_id_1) {
nodes->emplace_back(Role::COMPUTE_NODE, mock_instance_1, mock_cluster_1,
mock_cluster_id_1);
}
return {};
};
};
std::unique_ptr<MetaServiceProxy> get_meta_service() {
auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
[&] { ASSERT_NE(txn_kv.get(), nullptr); }();
auto rs = std::make_shared<MockMultiInstanceRsMgr>(txn_kv);
auto rl = std::make_shared<RateLimiter>();
auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl);
return std::make_unique<MetaServiceProxy>(std::move(meta_service));
}
void mock_add_cluster(MetaServiceProxy& meta_service, std::string instance_id) {
// add cluster first
InstanceKeyInfo key_info {instance_id};
std::string key;
std::string val;
instance_key(key_info, &key);
InstanceInfoPB instance;
instance.set_instance_id(instance_id);
ClusterPB c1;
c1.set_cluster_name(mock_cluster_name);
c1.set_cluster_id(mock_cluster_id);
c1.add_mysql_user_name()->append("name1");
instance.add_clusters()->CopyFrom(c1);
val = instance.SerializeAsString();
std::unique_ptr<Transaction> txn;
std::string get_val;
ASSERT_EQ(meta_service.txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(key, val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
void mock_get_cluster(MetaServiceProxy& meta_service, const std::string& cloud_uid,
MetaServiceCode code, std::atomic_size_t& failed_cnt) {
GetClusterRequest req;
req.set_cloud_unique_id(cloud_uid);
req.set_cluster_id(mock_cluster_id);
req.set_cluster_name(mock_cluster_name);
brpc::Controller cntl;
GetClusterResponse res;
meta_service.get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req,
&res, nullptr);
if (code == MetaServiceCode::OK) {
ASSERT_EQ(res.status().code(), code);
} else {
++failed_cnt;
}
}
template <typename Rpc>
void mock_parallel_rpc(Rpc rpc, MetaServiceProxy* meta_service, const std::string& cloud_uid,
MetaServiceCode expected, size_t times) {
std::atomic_size_t failed_cnt;
std::vector<std::thread> threads;
for (size_t i = 0; i < times; ++i) {
threads.emplace_back([&]() { rpc(*meta_service, cloud_uid, expected, failed_cnt); });
}
for (auto& t : threads) {
t.join();
}
if (expected != MetaServiceCode::OK) {
ASSERT_GT(failed_cnt, 0);
}
}
TEST(RateLimiterTest, AdjustLimitInfluenceTest) {
auto meta_service = get_meta_service();
mock_add_cluster(*meta_service, mock_instance_0);
mock_add_cluster(*meta_service, mock_instance_1);
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 1);
std::this_thread::sleep_for(std::chrono::seconds(1));
{
ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1, mock_instance_1));
ASSERT_TRUE(
meta_service->rate_limiter()->set_rate_limit(100, "get_cluster", mock_instance_0));
auto limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_0)
->max_qps_limit();
ASSERT_EQ(limit, 100);
limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_1)
->max_qps_limit();
ASSERT_EQ(limit, 1);
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
auto limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
ASSERT_EQ(limit, 1);
limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
ASSERT_EQ(limit, 5000000);
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(5000, "get_cluster"));
auto limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
ASSERT_EQ(limit, 1);
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1000));
limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
ASSERT_EQ(limit, 5000);
limit = meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
ASSERT_EQ(limit, 1000);
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(3000, "commit_txn"));
auto limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("commit_txn")->max_qps_limit();
ASSERT_EQ(limit, 3000);
limit = meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
ASSERT_EQ(limit, 5000);
}
{
auto limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_0)
->max_qps_limit();
ASSERT_EQ(limit, 100);
limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_1)
->max_qps_limit();
ASSERT_EQ(limit, 1);
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(200, mock_instance_1));
auto limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_0)
->max_qps_limit();
ASSERT_EQ(limit, 100);
limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_1)
->max_qps_limit();
ASSERT_EQ(limit, 200);
}
}
TEST(RateLimiterTest, TestAdjustLimitInfluence1) {
auto meta_service = get_meta_service();
mock_add_cluster(*meta_service, mock_instance_0);
mock_add_cluster(*meta_service, mock_instance_1);
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 1);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(0));
std::this_thread::sleep_for(std::chrono::seconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::MAX_QPS_LIMIT, 1);
// have to sleep 5s to ensure sucess, maybe related with bvar latency
std::this_thread::sleep_for(std::chrono::seconds(5));
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(2, "get_cluster"));
std::this_thread::sleep_for(std::chrono::seconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 1);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1,
MetaServiceCode::MAX_QPS_LIMIT, 1);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::MAX_QPS_LIMIT, 3);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
{
ASSERT_TRUE(
meta_service->rate_limiter()->set_rate_limit(4, "get_cluster", mock_instance_0));
std::this_thread::sleep_for(std::chrono::seconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 3);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1,
MetaServiceCode::MAX_QPS_LIMIT, 3);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
{
ASSERT_TRUE(
meta_service->rate_limiter()->set_rate_limit(2, "get_cluster", mock_instance_0));
std::this_thread::sleep_for(std::chrono::seconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::MAX_QPS_LIMIT, 3);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
{
ASSERT_TRUE(
meta_service->rate_limiter()->set_rate_limit(6, "get_cluster", mock_instance_0));
std::this_thread::sleep_for(std::chrono::seconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 5);
}
}
TEST(RateLimiterTest, TestAdjustLimitInfluence2) {
auto meta_service = get_meta_service();
mock_add_cluster(*meta_service, mock_instance_0);
mock_add_cluster(*meta_service, mock_instance_1);
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 1);
std::this_thread::sleep_for(std::chrono::seconds(1));
{
ASSERT_TRUE(
meta_service->rate_limiter()->set_rate_limit(6, "get_cluster", mock_instance_0));
std::this_thread::sleep_for(std::chrono::seconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::MAX_QPS_LIMIT, 7);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 5);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1,
MetaServiceCode::OK, 7);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(4, mock_instance_0));
std::this_thread::sleep_for(std::chrono::seconds(2));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::MAX_QPS_LIMIT, 5);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 3);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1,
MetaServiceCode::OK, 7);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 3);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1,
MetaServiceCode::MAX_QPS_LIMIT, 3);
}
}
TEST(RateLimiterTest, AdjustLimitMockRPCTest) {
auto meta_service = get_meta_service();
mock_add_cluster(*meta_service, mock_instance_0);
mock_add_cluster(*meta_service, mock_instance_1);
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 20);
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1,
MetaServiceCode::OK, 20);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster"));
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 20);
auto limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
ASSERT_EQ(limit, 10000);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1, "get_cluster"));
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::MAX_QPS_LIMIT, 1);
auto limit =
meta_service->rate_limiter()->get_rpc_rate_limiter("get_cluster")->max_qps_limit();
ASSERT_EQ(limit, 1);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster",
mock_instance_0));
ASSERT_TRUE(meta_service->rate_limiter()->set_rate_limit(10000, "get_cluster",
mock_instance_1));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::OK, 20);
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1,
MetaServiceCode::OK, 20);
auto limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_0)
->max_qps_limit();
ASSERT_EQ(limit, 10000);
limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_1)
->max_qps_limit();
ASSERT_EQ(limit, 10000);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
{
ASSERT_TRUE(meta_service->rate_limiter()->set_instance_rate_limit(1, mock_instance_0));
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_0,
MetaServiceCode::MAX_QPS_LIMIT, 1);
mock_parallel_rpc(mock_get_cluster, meta_service.get(), mock_cloud_unique_id_1,
MetaServiceCode::OK, 20);
auto limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_0)
->max_qps_limit();
ASSERT_EQ(limit, 1);
limit = meta_service->rate_limiter()
->get_rpc_rate_limiter("get_cluster")
->qps_limiter_.at(mock_instance_1)
->max_qps_limit();
ASSERT_EQ(limit, 10000);
}
}