| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //#define private public |
| #include "meta-service/meta_service.h" |
| //#undef private |
| |
| #include <brpc/controller.h> |
| #include <bvar/window.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <gtest/gtest.h> |
| |
| #include <atomic> |
| #include <condition_variable> |
| #include <cstdint> |
| #include <memory> |
| #include <random> |
| #include <thread> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/util.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/keys.h" |
| #include "meta-service/mem_txn_kv.h" |
| #include "meta-service/txn_kv_error.h" |
| #include "rate-limiter/rate_limiter.h" |
| #include "resource-manager/resource_manager.h" |
| |
| int main(int argc, char** argv) { |
| const std::string conf_file = "doris_cloud.conf"; |
| if (!doris::cloud::config::init(conf_file.c_str(), true)) { |
| std::cerr << "failed to init config file, conf=" << conf_file << std::endl; |
| return -1; |
| } |
| |
| if (!doris::cloud::init_glog("resource_test")) { |
| std::cerr << "failed to init glog" << std::endl; |
| return -1; |
| } |
| |
| ::testing::InitGoogleTest(&argc, argv); |
| return RUN_ALL_TESTS(); |
| } |
| |
| namespace doris::cloud { |
| |
| static std::shared_ptr<TxnKv> create_txn_kv() { |
| // MemKv |
| int ret = 0; |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| if (txn_kv != nullptr) { |
| ret = txn_kv->init(); |
| [&] { ASSERT_EQ(ret, 0); }(); |
| } |
| [&] { ASSERT_NE(txn_kv.get(), nullptr); }(); |
| |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->remove("\x00", "\xfe"); // This is dangerous if the fdb is not correctly set |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| return txn_kv; |
| } |
| |
| std::unique_ptr<MetaServiceProxy> get_meta_service(std::shared_ptr<TxnKv> txn_kv = {}) { |
| if (!txn_kv) { |
| txn_kv = create_txn_kv(); |
| } |
| |
| auto rs = std::make_shared<ResourceManager>(txn_kv); |
| EXPECT_EQ(rs->init(), 0); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl); |
| return std::make_unique<MetaServiceProxy>(std::move(meta_service)); |
| } |
| |
| static void create_args_to_add(std::vector<NodeInfo>* to_add, std::vector<NodeInfo>* to_del, |
| bool is_host = false) { |
| to_add->clear(); |
| to_del->clear(); |
| auto ni_1 = NodeInfo {.role = Role::COMPUTE_NODE, |
| .instance_id = "test-resource-instance", |
| .cluster_name = "cluster_name_1", |
| .cluster_id = "cluster_id_1", |
| .node_info = NodeInfoPB {}}; |
| is_host ? ni_1.node_info.set_host("host1") : ni_1.node_info.set_ip("127.0.0.1"); |
| ni_1.node_info.set_cloud_unique_id("test_cloud_unique_id_1"); |
| ni_1.node_info.set_heartbeat_port(9999); |
| to_add->push_back(ni_1); |
| |
| auto ni_2 = NodeInfo {.role = Role::COMPUTE_NODE, |
| .instance_id = "test-resource-instance", |
| .cluster_name = "cluster_name_1", |
| .cluster_id = "cluster_id_1", |
| .node_info = NodeInfoPB {}}; |
| is_host ? ni_2.node_info.set_host("host2") : ni_2.node_info.set_ip("127.0.0.2"); |
| ni_2.node_info.set_cloud_unique_id("test_cloud_unique_id_1"); |
| ni_2.node_info.set_heartbeat_port(9999); |
| to_add->push_back(ni_2); |
| |
| auto ni_3 = NodeInfo {.role = Role::COMPUTE_NODE, |
| .instance_id = "test-resource-instance", |
| .cluster_name = "cluster_name_2", |
| .cluster_id = "cluster_id_2", |
| .node_info = NodeInfoPB {}}; |
| is_host ? ni_3.node_info.set_host("host3") : ni_3.node_info.set_ip("127.0.0.3"); |
| ni_3.node_info.set_cloud_unique_id("test_cloud_unique_id_2"); |
| ni_3.node_info.set_heartbeat_port(9999); |
| to_add->push_back(ni_3); |
| } |
| |
| static void create_args_to_del(std::vector<NodeInfo>* to_add, std::vector<NodeInfo>* to_del, |
| bool is_host = false) { |
| to_add->clear(); |
| to_del->clear(); |
| auto ni_1 = NodeInfo {.role = Role::COMPUTE_NODE, |
| .instance_id = "test-resource-instance", |
| .cluster_name = "cluster_name_1", |
| .cluster_id = "cluster_id_1", |
| .node_info = NodeInfoPB {}}; |
| is_host ? ni_1.node_info.set_host("host2") : ni_1.node_info.set_ip("127.0.0.2"); |
| ni_1.node_info.set_cloud_unique_id("test_cloud_unique_id_1"); |
| ni_1.node_info.set_heartbeat_port(9999); |
| to_del->push_back(ni_1); |
| } |
| |
| static void get_instance_info(MetaServiceProxy* ms, InstanceInfoPB* instance, |
| std::string_view instance_id = "test-resource-instance") { |
| InstanceKeyInfo key_info {instance_id}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(ms->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| EXPECT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK); |
| instance->ParseFromString(val); |
| } |
| |
| static void create_instance(MetaServiceProxy* ms, const std::string& instance_id) { |
| brpc::Controller ctrl; |
| CreateInstanceRequest req; |
| req.set_instance_id(instance_id); |
| req.set_user_id("test_user"); |
| req.set_name(instance_id + "name"); |
| ObjectStoreInfoPB obj; |
| obj.set_ak("123"); |
| obj.set_sk("321"); |
| obj.set_bucket("456"); |
| obj.set_prefix("654"); |
| obj.set_endpoint("789"); |
| obj.set_region("987"); |
| obj.set_external_endpoint("888"); |
| obj.set_provider(ObjectStoreInfoPB::BOS); |
| req.mutable_obj_info()->CopyFrom(obj); |
| CreateInstanceResponse resp; |
| ms->create_instance(&ctrl, &req, &resp, brpc::DoNothing()); |
| ASSERT_FALSE(ctrl.Failed()); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK); |
| } |
| |
| static void create_cluster(MetaServiceProxy* ms, const std::string& instance_id, |
| const std::string& cluster_id, const std::string& cluster_name, |
| ClusterPB_Type type) { |
| brpc::Controller cntl; |
| AlterClusterRequest req; |
| req.set_instance_id(instance_id); |
| req.mutable_cluster()->set_cluster_id(cluster_id); |
| req.mutable_cluster()->set_cluster_name(cluster_name); |
| req.mutable_cluster()->set_type(type); |
| if (type == ClusterPB::SQL) { |
| auto* node = req.mutable_cluster()->add_nodes(); |
| node->set_node_type(NodeInfoPB::FE_MASTER); |
| node->set_ip("127.0.0.1"); |
| node->set_edit_log_port(10000); |
| node->set_name("sql_node"); |
| } else { |
| auto* node = req.mutable_cluster()->add_nodes(); |
| node->set_ip("127.0.0.1"); |
| node->set_heartbeat_port(10000); |
| node->set_name("sql_node"); |
| } |
| req.set_op(AlterClusterRequest::ADD_CLUSTER); |
| AlterClusterResponse res; |
| ms->alter_cluster(&cntl, &req, &res, brpc::DoNothing()); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| static void drop_cluster(MetaServiceProxy* ms, const std::string& instance_id, |
| const std::string& cluster_id) { |
| brpc::Controller cntl; |
| AlterClusterRequest req; |
| req.set_instance_id(instance_id); |
| req.mutable_cluster()->set_cluster_id(cluster_id); |
| req.set_op(AlterClusterRequest::DROP_CLUSTER); |
| AlterClusterResponse res; |
| ms->alter_cluster(&cntl, &req, &res, brpc::DoNothing()); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // test cluster's node addr use ip |
| TEST(ResourceTest, ModifyNodesIpTest) { |
| auto meta_service = get_meta_service(); |
| std::vector<NodeInfo> to_add = {}; |
| std::vector<NodeInfo> to_del = {}; |
| auto ins = InstanceInfoPB {}; |
| create_args_to_add(&to_add, &to_del); |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("modify_nodes:get_instance", [&](auto&& args) { |
| *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK; |
| ins.set_instance_id("test-resource-instance"); |
| ins.set_status(InstanceInfoPB::NORMAL); |
| auto* c = ins.mutable_clusters()->Add(); |
| c->set_cluster_name("cluster_name_1"); |
| c->set_cluster_id("cluster_id_1"); |
| c->set_type(ClusterPB::COMPUTE); |
| auto* c1 = ins.mutable_clusters()->Add(); |
| c1->set_cluster_name("cluster_name_2"); |
| c1->set_cluster_id("cluster_id_2"); |
| c1->set_type(ClusterPB::COMPUTE); |
| *try_any_cast<InstanceInfoPB*>(args[1]) = ins; |
| }); |
| sp->enable_processing(); |
| |
| // test cluster add nodes |
| auto r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del); |
| ASSERT_EQ(r, ""); |
| InstanceInfoPB instance; |
| get_instance_info(meta_service.get(), &instance); |
| std::cout << "after to add = " << proto_to_json(instance) << std::endl; |
| ASSERT_EQ(instance.clusters().size(), 2); |
| // after add assert cluster_name_1 has 2 nodes |
| ASSERT_EQ(instance.clusters(0).nodes().size(), 2); |
| // after add assert cluster_name_2 has 1 nodes |
| ASSERT_EQ(instance.clusters(1).nodes().size(), 1); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| |
| sp->set_call_back("modify_nodes:get_instance", [&](auto&& args) { |
| *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK; |
| *try_any_cast<InstanceInfoPB*>(args[1]) = instance; |
| }); |
| sp->enable_processing(); |
| create_args_to_del(&to_add, &to_del); |
| // test cluster del node |
| r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del); |
| InstanceInfoPB instance1; |
| get_instance_info(meta_service.get(), &instance1); |
| ASSERT_EQ(r, ""); |
| std::cout << "after to del = " << proto_to_json(instance1) << std::endl; |
| ASSERT_EQ(instance1.clusters().size(), 2); |
| // after del assert cluster_name_1 has 1 nodes |
| ASSERT_EQ(instance1.clusters(0).nodes().size(), 1); |
| // after del assert cluster_name_2 has 1 nodes |
| ASSERT_EQ(instance1.clusters(1).nodes().size(), 1); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // test cluster's node addr use host |
| TEST(ResourceTest, ModifyNodesHostTest) { |
| auto meta_service = get_meta_service(); |
| std::vector<NodeInfo> to_add = {}; |
| std::vector<NodeInfo> to_del = {}; |
| auto ins = InstanceInfoPB {}; |
| create_args_to_add(&to_add, &to_del, true); |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("modify_nodes:get_instance", [&](auto&& args) { |
| *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK; |
| ins.set_instance_id("test-resource-instance"); |
| ins.set_status(InstanceInfoPB::NORMAL); |
| auto* c = ins.mutable_clusters()->Add(); |
| c->set_cluster_name("cluster_name_1"); |
| c->set_cluster_id("cluster_id_1"); |
| c->set_type(ClusterPB::COMPUTE); |
| auto* c1 = ins.mutable_clusters()->Add(); |
| c1->set_cluster_name("cluster_name_2"); |
| c1->set_cluster_id("cluster_id_2"); |
| c1->set_type(ClusterPB::COMPUTE); |
| *try_any_cast<InstanceInfoPB*>(args[1]) = ins; |
| }); |
| sp->enable_processing(); |
| |
| // test cluster add nodes |
| auto r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del); |
| ASSERT_EQ(r, ""); |
| InstanceInfoPB instance; |
| get_instance_info(meta_service.get(), &instance); |
| std::cout << "after to add = " << proto_to_json(instance) << std::endl; |
| ASSERT_EQ(instance.clusters().size(), 2); |
| // after add assert cluster_name_1 has 2 nodes |
| ASSERT_EQ(instance.clusters(0).nodes().size(), 2); |
| // after add assert cluster_name_2 has 1 nodes |
| ASSERT_EQ(instance.clusters(1).nodes().size(), 1); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| |
| sp->set_call_back("modify_nodes:get_instance", [&](auto&& args) { |
| *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK; |
| *try_any_cast<InstanceInfoPB*>(args[1]) = instance; |
| }); |
| sp->enable_processing(); |
| create_args_to_del(&to_add, &to_del, true); |
| r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del); |
| InstanceInfoPB instance1; |
| get_instance_info(meta_service.get(), &instance1); |
| ASSERT_EQ(r, ""); |
| std::cout << "after to del = " << proto_to_json(instance1) << std::endl; |
| ASSERT_EQ(instance1.clusters().size(), 2); |
| // after del assert cluster_name_1 has 1 nodes |
| ASSERT_EQ(instance1.clusters(0).nodes().size(), 1); |
| // after del assert cluster_name_2 has 1 nodes |
| ASSERT_EQ(instance1.clusters(1).nodes().size(), 1); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // test restart meta service |
| TEST(ResourceTest, RestartResourceManager) { |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "test"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* key = try_any_cast<std::string*>(args[0]); |
| *key = "test"; |
| auto* ret = try_any_cast<int*>(args[1]); |
| *ret = 0; |
| }); |
| sp->enable_processing(); |
| |
| auto txn_kv = create_txn_kv(); |
| |
| { |
| auto meta_service = get_meta_service(txn_kv); |
| create_instance(meta_service.get(), "test_instance_id"); |
| create_instance(meta_service.get(), "test_instance_id_2"); |
| create_cluster(meta_service.get(), "test_instance_id", "cluster_id", "cluster_name", |
| ClusterPB::SQL); |
| } |
| |
| { |
| auto meta_service = get_meta_service(txn_kv); |
| { |
| InstanceInfoPB info; |
| auto [code, msg] = |
| meta_service->resource_mgr()->get_instance(nullptr, "test_instance_id", &info); |
| ASSERT_EQ(code, TxnErrorCode::TXN_OK) << msg; |
| ASSERT_EQ(info.name(), "test_instance_idname"); |
| } |
| { |
| InstanceInfoPB info; |
| auto [code, msg] = meta_service->resource_mgr()->get_instance( |
| nullptr, "test_instance_id_2", &info); |
| ASSERT_EQ(code, TxnErrorCode::TXN_OK) << msg; |
| ASSERT_EQ(info.name(), "test_instance_id_2name"); |
| } |
| } |
| sp->disable_processing(); |
| sp->clear_all_call_backs(); |
| } |
| |
| // test add/drop cluster |
| TEST(ResourceTest, AddDropCluster) { |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "selectdbselectdbselectdbselectdb"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->set_call_back("resource_manager::set_safe_drop_time", |
| [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = -1; }); |
| sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* key = try_any_cast<std::string*>(args[0]); |
| *key = "test"; |
| auto* ret = try_any_cast<int*>(args[1]); |
| *ret = 0; |
| }); |
| sp->set_call_back("resource_manager::set_safe_drop_time", |
| [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = -1; }); |
| sp->enable_processing(); |
| |
| auto meta_service = get_meta_service(); |
| create_instance(meta_service.get(), "test_instance_id"); |
| create_cluster(meta_service.get(), "test_instance_id", "sql_id", "sql_cluster", ClusterPB::SQL); |
| create_cluster(meta_service.get(), "test_instance_id", "compute_id", "compute_cluster", |
| ClusterPB::COMPUTE); |
| |
| InstanceInfoPB info; |
| get_instance_info(meta_service.get(), &info, "test_instance_id"); |
| ASSERT_EQ(info.clusters_size(), 2); |
| |
| drop_cluster(meta_service.get(), "test_instance_id", "sql_id"); |
| |
| get_instance_info(meta_service.get(), &info, "test_instance_id"); |
| ASSERT_EQ(info.clusters_size(), 1); |
| ASSERT_EQ(info.clusters(0).cluster_name(), "compute_cluster"); |
| ASSERT_EQ(info.clusters(0).cluster_id(), "compute_id"); |
| |
| sp->disable_processing(); |
| sp->clear_all_call_backs(); |
| } |
| |
| TEST(ResourceTest, InitScanRetry) { |
| auto sp = SyncPoint::get_instance(); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "selectdbselectdbselectdbselectdb"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* key = try_any_cast<std::string*>(args[0]); |
| *key = "test"; |
| auto* ret = try_any_cast<int*>(args[1]); |
| *ret = 0; |
| }); |
| sp->enable_processing(); |
| |
| constexpr size_t NUM_BATCH_SIZE = 100; |
| sp->set_call_back("ResourceManager:init:limit", |
| [&](auto&& args) { *try_any_cast<int*>(args[0]) = NUM_BATCH_SIZE; }); |
| |
| auto txn_kv = create_txn_kv(); |
| { |
| auto meta_service = get_meta_service(txn_kv); |
| for (size_t i = 0; i < NUM_BATCH_SIZE * 2; i++) { |
| std::string instance_id = "test_instance_id_" + std::to_string(i); |
| create_instance(meta_service.get(), instance_id); |
| } |
| } |
| |
| { |
| size_t count = 0; |
| sp->set_call_back("ResourceManager:init:get_err", [&](auto&& args) { |
| if (++count == 2) { |
| *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_TOO_OLD; |
| } |
| }); |
| auto meta_service = get_meta_service(txn_kv); |
| ASSERT_GT(count, 1) << count; |
| for (size_t i = 0; i < NUM_BATCH_SIZE * 2; i++) { |
| InstanceInfoPB info; |
| std::string instance_id = "test_instance_id_" + std::to_string(i); |
| get_instance_info(meta_service.get(), &info, instance_id); |
| } |
| } |
| |
| sp->disable_processing(); |
| sp->clear_all_call_backs(); |
| } |
| |
| } // namespace doris::cloud |