blob: e2aa09514360e1ae932f7534c09184d04d36c570 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//#define private public
#include "meta-service/meta_service.h"
//#undef private
#include <brpc/controller.h>
#include <bvar/window.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <random>
#include <thread>
#include "common/config.h"
#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/mem_txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "rate-limiter/rate_limiter.h"
#include "resource-manager/resource_manager.h"
// Test entry point: load the cloud config and glog before handing control to gtest.
// Returns -1 if either initialization step fails, otherwise the gtest result.
int main(int argc, char** argv) {
    const std::string conf_file = "doris_cloud.conf";

    const bool conf_loaded = doris::cloud::config::init(conf_file.c_str(), true);
    if (!conf_loaded) {
        std::cerr << "failed to init config file, conf=" << conf_file << std::endl;
        return -1;
    }

    const bool glog_ready = doris::cloud::init_glog("resource_test");
    if (!glog_ready) {
        std::cerr << "failed to init glog" << std::endl;
        return -1;
    }

    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
namespace doris::cloud {
static std::shared_ptr<TxnKv> create_txn_kv() {
// MemKv
int ret = 0;
auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
if (txn_kv != nullptr) {
ret = txn_kv->init();
[&] { ASSERT_EQ(ret, 0); }();
}
[&] { ASSERT_NE(txn_kv.get(), nullptr); }();
std::unique_ptr<Transaction> txn;
EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->remove("\x00", "\xfe"); // This is dangerous if the fdb is not correctly set
EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
return txn_kv;
}
std::unique_ptr<MetaServiceProxy> get_meta_service(std::shared_ptr<TxnKv> txn_kv = {}) {
if (!txn_kv) {
txn_kv = create_txn_kv();
}
auto rs = std::make_shared<ResourceManager>(txn_kv);
EXPECT_EQ(rs->init(), 0);
auto rl = std::make_shared<RateLimiter>();
auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl);
return std::make_unique<MetaServiceProxy>(std::move(meta_service));
}
static void create_args_to_add(std::vector<NodeInfo>* to_add, std::vector<NodeInfo>* to_del,
bool is_host = false) {
to_add->clear();
to_del->clear();
auto ni_1 = NodeInfo {.role = Role::COMPUTE_NODE,
.instance_id = "test-resource-instance",
.cluster_name = "cluster_name_1",
.cluster_id = "cluster_id_1",
.node_info = NodeInfoPB {}};
is_host ? ni_1.node_info.set_host("host1") : ni_1.node_info.set_ip("127.0.0.1");
ni_1.node_info.set_cloud_unique_id("test_cloud_unique_id_1");
ni_1.node_info.set_heartbeat_port(9999);
to_add->push_back(ni_1);
auto ni_2 = NodeInfo {.role = Role::COMPUTE_NODE,
.instance_id = "test-resource-instance",
.cluster_name = "cluster_name_1",
.cluster_id = "cluster_id_1",
.node_info = NodeInfoPB {}};
is_host ? ni_2.node_info.set_host("host2") : ni_2.node_info.set_ip("127.0.0.2");
ni_2.node_info.set_cloud_unique_id("test_cloud_unique_id_1");
ni_2.node_info.set_heartbeat_port(9999);
to_add->push_back(ni_2);
auto ni_3 = NodeInfo {.role = Role::COMPUTE_NODE,
.instance_id = "test-resource-instance",
.cluster_name = "cluster_name_2",
.cluster_id = "cluster_id_2",
.node_info = NodeInfoPB {}};
is_host ? ni_3.node_info.set_host("host3") : ni_3.node_info.set_ip("127.0.0.3");
ni_3.node_info.set_cloud_unique_id("test_cloud_unique_id_2");
ni_3.node_info.set_heartbeat_port(9999);
to_add->push_back(ni_3);
}
static void create_args_to_del(std::vector<NodeInfo>* to_add, std::vector<NodeInfo>* to_del,
bool is_host = false) {
to_add->clear();
to_del->clear();
auto ni_1 = NodeInfo {.role = Role::COMPUTE_NODE,
.instance_id = "test-resource-instance",
.cluster_name = "cluster_name_1",
.cluster_id = "cluster_id_1",
.node_info = NodeInfoPB {}};
is_host ? ni_1.node_info.set_host("host2") : ni_1.node_info.set_ip("127.0.0.2");
ni_1.node_info.set_cloud_unique_id("test_cloud_unique_id_1");
ni_1.node_info.set_heartbeat_port(9999);
to_del->push_back(ni_1);
}
// Reads the InstanceInfoPB stored under the instance key of `instance_id` directly from the
// meta service's KV and parses it into `*instance`.
// A failure to open a transaction is fatal, because the lookup would otherwise dereference a
// null pointer. Wrap the call in ASSERT_NO_FATAL_FAILURE to stop the calling test in that case.
// A failed lookup or parse is recorded as a non-fatal failure tagged with the instance id.
static void get_instance_info(MetaServiceProxy* ms, InstanceInfoPB* instance,
                              std::string_view instance_id = "test-resource-instance") {
    InstanceKeyInfo key_info {instance_id};
    std::string key;
    instance_key(key_info, &key);

    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(ms->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
    ASSERT_NE(txn.get(), nullptr);

    std::string val;
    EXPECT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK) << "instance_id=" << instance_id;
    EXPECT_TRUE(instance->ParseFromString(val))
            << "failed to parse InstanceInfoPB, instance_id=" << instance_id;
}
// Creates instance `instance_id` (named `instance_id + "name"`, owned by "test_user") with dummy
// BOS object-store settings, and asserts that the RPC succeeded.
static void create_instance(MetaServiceProxy* ms, const std::string& instance_id) {
    CreateInstanceRequest request;
    request.set_instance_id(instance_id);
    request.set_user_id("test_user");
    request.set_name(instance_id + "name");

    // Placeholder object-store settings; the values are arbitrary test data.
    ObjectStoreInfoPB* obj_info = request.mutable_obj_info();
    obj_info->set_ak("123");
    obj_info->set_sk("321");
    obj_info->set_bucket("456");
    obj_info->set_prefix("654");
    obj_info->set_endpoint("789");
    obj_info->set_region("987");
    obj_info->set_external_endpoint("888");
    obj_info->set_provider(ObjectStoreInfoPB::BOS);

    brpc::Controller controller;
    CreateInstanceResponse response;
    ms->create_instance(&controller, &request, &response, brpc::DoNothing());
    ASSERT_FALSE(controller.Failed());
    ASSERT_EQ(response.status().code(), MetaServiceCode::OK);
}
// Adds a cluster with a single node on 127.0.0.1 to `instance_id` via alter_cluster(ADD_CLUSTER).
// A SQL cluster's node is typed FE_MASTER and given an edit-log port. Any other cluster's node
// gets a heartbeat port. Asserts that both the RPC controller and the response status report
// success, matching the checks done by create_instance.
static void create_cluster(MetaServiceProxy* ms, const std::string& instance_id,
                           const std::string& cluster_id, const std::string& cluster_name,
                           ClusterPB_Type type) {
    AlterClusterRequest req;
    req.set_instance_id(instance_id);
    req.set_op(AlterClusterRequest::ADD_CLUSTER);

    auto* cluster = req.mutable_cluster();
    cluster->set_cluster_id(cluster_id);
    cluster->set_cluster_name(cluster_name);
    cluster->set_type(type);

    auto* node = cluster->add_nodes();
    node->set_ip("127.0.0.1");
    node->set_name("sql_node"); // arbitrary test name, used for every cluster type
    if (type == ClusterPB::SQL) {
        node->set_node_type(NodeInfoPB::FE_MASTER);
        node->set_edit_log_port(10000);
    } else {
        node->set_heartbeat_port(10000);
    }

    brpc::Controller cntl;
    AlterClusterResponse res;
    ms->alter_cluster(&cntl, &req, &res, brpc::DoNothing());
    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
}
// Drops cluster `cluster_id` from `instance_id` via alter_cluster(DROP_CLUSTER). Asserts that both
// the RPC controller and the response status report success, matching create_instance.
static void drop_cluster(MetaServiceProxy* ms, const std::string& instance_id,
                         const std::string& cluster_id) {
    AlterClusterRequest req;
    req.set_instance_id(instance_id);
    req.mutable_cluster()->set_cluster_id(cluster_id);
    req.set_op(AlterClusterRequest::DROP_CLUSTER);

    brpc::Controller cntl;
    AlterClusterResponse res;
    ms->alter_cluster(&cntl, &req, &res, brpc::DoNothing());
    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
}
// Test adding and removing cluster nodes whose addresses are given as IPs.
TEST(ResourceTest, ModifyNodesIpTest) {
    auto meta_service = get_meta_service();
    std::vector<NodeInfo> to_add;
    std::vector<NodeInfo> to_del;
    create_args_to_add(&to_add, &to_del);

    auto sp = SyncPoint::get_instance();
    // SyncPoint is process-global. Reset it on every exit path, including the early return of a
    // failed ASSERT_*, so that callbacks capturing this test's locals cannot outlive them or leak
    // into other tests. A shared_ptr built from nullptr still runs its deleter on destruction.
    std::shared_ptr<void> sp_guard(nullptr, [sp](void*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });

    // Inject an instance that has both target clusters but no nodes. It is rebuilt on every
    // invocation so that repeated invocations cannot accumulate duplicate clusters.
    sp->set_call_back("modify_nodes:get_instance", [](auto&& args) {
        *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK;
        InstanceInfoPB ins;
        ins.set_instance_id("test-resource-instance");
        ins.set_status(InstanceInfoPB::NORMAL);
        auto* c = ins.mutable_clusters()->Add();
        c->set_cluster_name("cluster_name_1");
        c->set_cluster_id("cluster_id_1");
        c->set_type(ClusterPB::COMPUTE);
        auto* c1 = ins.mutable_clusters()->Add();
        c1->set_cluster_name("cluster_name_2");
        c1->set_cluster_id("cluster_id_2");
        c1->set_type(ClusterPB::COMPUTE);
        *try_any_cast<InstanceInfoPB*>(args[1]) = std::move(ins);
    });
    sp->enable_processing();

    // test cluster add nodes
    auto r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del);
    ASSERT_EQ(r, "");
    InstanceInfoPB instance;
    ASSERT_NO_FATAL_FAILURE(get_instance_info(meta_service.get(), &instance));
    std::cout << "after to add = " << proto_to_json(instance) << std::endl;
    ASSERT_EQ(instance.clusters().size(), 2);
    // after add assert cluster_name_1 has 2 nodes
    ASSERT_EQ(instance.clusters(0).nodes().size(), 2);
    // after add assert cluster_name_2 has 1 nodes
    ASSERT_EQ(instance.clusters(1).nodes().size(), 1);

    // Feed the persisted instance back in so the deletion operates on the nodes just added.
    sp->clear_all_call_backs();
    sp->clear_trace();
    sp->disable_processing();
    sp->set_call_back("modify_nodes:get_instance", [&instance](auto&& args) {
        *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK;
        *try_any_cast<InstanceInfoPB*>(args[1]) = instance;
    });
    sp->enable_processing();

    // test cluster del node
    create_args_to_del(&to_add, &to_del);
    r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del);
    ASSERT_EQ(r, "");
    InstanceInfoPB instance1;
    ASSERT_NO_FATAL_FAILURE(get_instance_info(meta_service.get(), &instance1));
    std::cout << "after to del = " << proto_to_json(instance1) << std::endl;
    ASSERT_EQ(instance1.clusters().size(), 2);
    // after del assert cluster_name_1 has 1 nodes
    ASSERT_EQ(instance1.clusters(0).nodes().size(), 1);
    // after del assert cluster_name_2 has 1 nodes
    ASSERT_EQ(instance1.clusters(1).nodes().size(), 1);
}
// Test adding and removing cluster nodes whose addresses are given as host names.
TEST(ResourceTest, ModifyNodesHostTest) {
    auto meta_service = get_meta_service();
    std::vector<NodeInfo> to_add;
    std::vector<NodeInfo> to_del;
    create_args_to_add(&to_add, &to_del, true);

    auto sp = SyncPoint::get_instance();
    // SyncPoint is process-global. Reset it on every exit path, including the early return of a
    // failed ASSERT_*, so that callbacks capturing this test's locals cannot outlive them or leak
    // into other tests. A shared_ptr built from nullptr still runs its deleter on destruction.
    std::shared_ptr<void> sp_guard(nullptr, [sp](void*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });

    // Inject an instance that has both target clusters but no nodes. It is rebuilt on every
    // invocation so that repeated invocations cannot accumulate duplicate clusters.
    sp->set_call_back("modify_nodes:get_instance", [](auto&& args) {
        *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK;
        InstanceInfoPB ins;
        ins.set_instance_id("test-resource-instance");
        ins.set_status(InstanceInfoPB::NORMAL);
        auto* c = ins.mutable_clusters()->Add();
        c->set_cluster_name("cluster_name_1");
        c->set_cluster_id("cluster_id_1");
        c->set_type(ClusterPB::COMPUTE);
        auto* c1 = ins.mutable_clusters()->Add();
        c1->set_cluster_name("cluster_name_2");
        c1->set_cluster_id("cluster_id_2");
        c1->set_type(ClusterPB::COMPUTE);
        *try_any_cast<InstanceInfoPB*>(args[1]) = std::move(ins);
    });
    sp->enable_processing();

    // test cluster add nodes
    auto r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del);
    ASSERT_EQ(r, "");
    InstanceInfoPB instance;
    ASSERT_NO_FATAL_FAILURE(get_instance_info(meta_service.get(), &instance));
    std::cout << "after to add = " << proto_to_json(instance) << std::endl;
    ASSERT_EQ(instance.clusters().size(), 2);
    // after add assert cluster_name_1 has 2 nodes
    ASSERT_EQ(instance.clusters(0).nodes().size(), 2);
    // after add assert cluster_name_2 has 1 nodes
    ASSERT_EQ(instance.clusters(1).nodes().size(), 1);

    // Feed the persisted instance back in so the deletion operates on the nodes just added.
    sp->clear_all_call_backs();
    sp->clear_trace();
    sp->disable_processing();
    sp->set_call_back("modify_nodes:get_instance", [&instance](auto&& args) {
        *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK;
        *try_any_cast<InstanceInfoPB*>(args[1]) = instance;
    });
    sp->enable_processing();

    // test cluster del node
    create_args_to_del(&to_add, &to_del, true);
    r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del);
    ASSERT_EQ(r, "");
    InstanceInfoPB instance1;
    ASSERT_NO_FATAL_FAILURE(get_instance_info(meta_service.get(), &instance1));
    std::cout << "after to del = " << proto_to_json(instance1) << std::endl;
    ASSERT_EQ(instance1.clusters().size(), 2);
    // after del assert cluster_name_1 has 1 nodes
    ASSERT_EQ(instance1.clusters(0).nodes().size(), 1);
    // after del assert cluster_name_2 has 1 nodes
    ASSERT_EQ(instance1.clusters(1).nodes().size(), 1);
}
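// Test that instances persisted by one meta service are still visible after a restart
// (a new meta service created over the same KV store).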
TEST(ResourceTest, RestartResourceManager) {
    auto sp = SyncPoint::get_instance();
    // Reset the process-global SyncPoint on every exit path, including a failed ASSERT_*.
    std::shared_ptr<void> sp_guard(nullptr, [sp](void*) {
        sp->disable_processing();
        sp->clear_all_call_backs();
    });
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "test";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* key = try_any_cast<std::string*>(args[0]);
        *key = "test";
        auto* ret = try_any_cast<int*>(args[1]);
        *ret = 0;
    });
    sp->enable_processing();

    auto txn_kv = create_txn_kv();
    {
        // First meta service: persist two instances and a SQL cluster into the shared KV.
        auto meta_service = get_meta_service(txn_kv);
        ASSERT_NO_FATAL_FAILURE(create_instance(meta_service.get(), "test_instance_id"));
        ASSERT_NO_FATAL_FAILURE(create_instance(meta_service.get(), "test_instance_id_2"));
        ASSERT_NO_FATAL_FAILURE(create_cluster(meta_service.get(), "test_instance_id",
                                               "cluster_id", "cluster_name", ClusterPB::SQL));
    }
    {
        // A fresh meta service over the same KV ("restart") must still see both instances.
        auto meta_service = get_meta_service(txn_kv);
        {
            InstanceInfoPB info;
            auto [code, msg] =
                    meta_service->resource_mgr()->get_instance(nullptr, "test_instance_id", &info);
            ASSERT_EQ(code, TxnErrorCode::TXN_OK) << msg;
            ASSERT_EQ(info.name(), "test_instance_idname");
        }
        {
            InstanceInfoPB info;
            auto [code, msg] = meta_service->resource_mgr()->get_instance(
                    nullptr, "test_instance_id_2", &info);
            ASSERT_EQ(code, TxnErrorCode::TXN_OK) << msg;
            ASSERT_EQ(info.name(), "test_instance_id_2name");
        }
    }
}
// test add/drop cluster
TEST(ResourceTest, AddDropCluster) {
    auto sp = SyncPoint::get_instance();
    // Reset the process-global SyncPoint on every exit path, including a failed ASSERT_*.
    std::shared_ptr<void> sp_guard(nullptr, [sp](void*) {
        sp->disable_processing();
        sp->clear_all_call_backs();
    });
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* key = try_any_cast<std::string*>(args[0]);
        *key = "test";
        auto* ret = try_any_cast<int*>(args[1]);
        *ret = 0;
    });
    // Overrides the safe drop time with -1. Presumably this lets the cluster be dropped right
    // away instead of waiting; confirm against ResourceManager. Registered once only.
    sp->set_call_back("resource_manager::set_safe_drop_time",
                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = -1; });
    sp->enable_processing();

    auto meta_service = get_meta_service();
    ASSERT_NO_FATAL_FAILURE(create_instance(meta_service.get(), "test_instance_id"));
    ASSERT_NO_FATAL_FAILURE(create_cluster(meta_service.get(), "test_instance_id", "sql_id",
                                           "sql_cluster", ClusterPB::SQL));
    ASSERT_NO_FATAL_FAILURE(create_cluster(meta_service.get(), "test_instance_id", "compute_id",
                                           "compute_cluster", ClusterPB::COMPUTE));

    InstanceInfoPB info;
    ASSERT_NO_FATAL_FAILURE(get_instance_info(meta_service.get(), &info, "test_instance_id"));
    ASSERT_EQ(info.clusters_size(), 2);

    ASSERT_NO_FATAL_FAILURE(drop_cluster(meta_service.get(), "test_instance_id", "sql_id"));
    ASSERT_NO_FATAL_FAILURE(get_instance_info(meta_service.get(), &info, "test_instance_id"));
    ASSERT_EQ(info.clusters_size(), 1);
    ASSERT_EQ(info.clusters(0).cluster_name(), "compute_cluster");
    ASSERT_EQ(info.clusters(0).cluster_id(), "compute_id");
}
// Verifies that ResourceManager::init still loads every instance when its batched scan hits
// TXN_TOO_OLD on the second read. The scan limit is forced to 100 and 200 instances are created,
// so the scan needs more than one batch.
TEST(ResourceTest, InitScanRetry) {
    auto sp = SyncPoint::get_instance();
    // Declared before the guard so the guard (destroyed first) unregisters the callback that
    // captures it before the counter goes away.
    size_t get_err_calls = 0;
    // Reset the process-global SyncPoint on every exit path, including a failed ASSERT_*.
    std::shared_ptr<void> sp_guard(nullptr, [sp](void*) {
        sp->disable_processing();
        sp->clear_all_call_backs();
    });
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* key = try_any_cast<std::string*>(args[0]);
        *key = "test";
        auto* ret = try_any_cast<int*>(args[1]);
        *ret = 0;
    });
    sp->enable_processing();
    constexpr size_t NUM_BATCH_SIZE = 100;
    sp->set_call_back("ResourceManager:init:limit",
                      [&](auto&& args) { *try_any_cast<int*>(args[0]) = NUM_BATCH_SIZE; });

    auto txn_kv = create_txn_kv();
    {
        auto meta_service = get_meta_service(txn_kv);
        for (size_t i = 0; i < NUM_BATCH_SIZE * 2; i++) {
            std::string instance_id = "test_instance_id_" + std::to_string(i);
            ASSERT_NO_FATAL_FAILURE(create_instance(meta_service.get(), instance_id));
        }
    }
    {
        // Fail the second scan read with TXN_TOO_OLD to force the retry path.
        sp->set_call_back("ResourceManager:init:get_err", [&get_err_calls](auto&& args) {
            if (++get_err_calls == 2) {
                *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_TOO_OLD;
            }
        });
        auto meta_service = get_meta_service(txn_kv);
        ASSERT_GT(get_err_calls, 1) << get_err_calls;
        for (size_t i = 0; i < NUM_BATCH_SIZE * 2; i++) {
            std::string instance_id = "test_instance_id_" + std::to_string(i);
            InstanceInfoPB info;
            ASSERT_NO_FATAL_FAILURE(get_instance_info(meta_service.get(), &info, instance_id));
            // create_instance names every instance `instance_id + "name"`.
            EXPECT_EQ(info.name(), instance_id + "name");
        }
    }
}
} // namespace doris::cloud