blob: ee3f8598df8be34275293f128dc7689c89cdcdfb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//#define private public
#include "meta-service/meta_service.h"
//#undef private
#include <brpc/controller.h>
#include <bvar/window.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <optional>
#include <random>
#include <thread>
#include "common/config.h"
#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-store/keys.h"
#include "meta-store/mem_txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "rate-limiter/rate_limiter.h"
#include "resource-manager/resource_manager.h"
int main(int argc, char** argv) {
    // Print a fatal setup error to stderr and produce the process exit code.
    auto fail = [](const auto&... parts) {
        (std::cerr << ... << parts) << std::endl;
        return -1;
    };

    // The cloud config and glog must be initialized before any test runs.
    const std::string conf_file = "doris_cloud.conf";
    if (!doris::cloud::config::init(conf_file.c_str(), true)) {
        return fail("failed to init config file, conf=", conf_file);
    }
    if (!doris::cloud::init_glog("resource_test")) {
        return fail("failed to init glog");
    }

    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
namespace doris::cloud {
// Defined outside this test file (presumably in the meta-service sources —
// confirm). It is declared here so RollbackInstance can check which instance id
// a cloud_unique_id resolves to through the given ResourceManager.
extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
const std::string& cloud_unique_id);
static std::shared_ptr<TxnKv> create_txn_kv() {
// MemKv
int ret = 0;
auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
if (txn_kv != nullptr) {
ret = txn_kv->init();
[&] { ASSERT_EQ(ret, 0); }();
}
[&] { ASSERT_NE(txn_kv.get(), nullptr); }();
std::unique_ptr<Transaction> txn;
EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->remove("\x00", "\xfe"); // This is dangerous if the fdb is not correctly set
EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
return txn_kv;
}
std::unique_ptr<MetaServiceProxy> get_meta_service(std::shared_ptr<TxnKv> txn_kv = {}) {
if (!txn_kv) {
txn_kv = create_txn_kv();
}
auto rs = std::make_shared<ResourceManager>(txn_kv);
EXPECT_EQ(rs->init(), 0);
auto rl = std::make_shared<RateLimiter>();
auto snapshot = std::make_shared<SnapshotManager>(txn_kv);
auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl, snapshot);
return std::make_unique<MetaServiceProxy>(std::move(meta_service));
}
static void create_args_to_add(std::vector<NodeInfo>* to_add, std::vector<NodeInfo>* to_del,
bool is_host = false) {
to_add->clear();
to_del->clear();
auto ni_1 = NodeInfo {.role = Role::COMPUTE_NODE,
.instance_id = "test-resource-instance",
.cluster_name = "cluster_name_1",
.cluster_id = "cluster_id_1",
.node_info = NodeInfoPB {}};
is_host ? ni_1.node_info.set_host("host1") : ni_1.node_info.set_ip("127.0.0.1");
ni_1.node_info.set_cloud_unique_id("test_cloud_unique_id_1");
ni_1.node_info.set_heartbeat_port(9999);
to_add->push_back(ni_1);
auto ni_2 = NodeInfo {.role = Role::COMPUTE_NODE,
.instance_id = "test-resource-instance",
.cluster_name = "cluster_name_1",
.cluster_id = "cluster_id_1",
.node_info = NodeInfoPB {}};
is_host ? ni_2.node_info.set_host("host2") : ni_2.node_info.set_ip("127.0.0.2");
ni_2.node_info.set_cloud_unique_id("test_cloud_unique_id_1");
ni_2.node_info.set_heartbeat_port(9999);
to_add->push_back(ni_2);
auto ni_3 = NodeInfo {.role = Role::COMPUTE_NODE,
.instance_id = "test-resource-instance",
.cluster_name = "cluster_name_2",
.cluster_id = "cluster_id_2",
.node_info = NodeInfoPB {}};
is_host ? ni_3.node_info.set_host("host3") : ni_3.node_info.set_ip("127.0.0.3");
ni_3.node_info.set_cloud_unique_id("test_cloud_unique_id_2");
ni_3.node_info.set_heartbeat_port(9999);
to_add->push_back(ni_3);
}
static void create_args_to_del(std::vector<NodeInfo>* to_add, std::vector<NodeInfo>* to_del,
bool is_host = false) {
to_add->clear();
to_del->clear();
auto ni_1 = NodeInfo {.role = Role::COMPUTE_NODE,
.instance_id = "test-resource-instance",
.cluster_name = "cluster_name_1",
.cluster_id = "cluster_id_1",
.node_info = NodeInfoPB {}};
is_host ? ni_1.node_info.set_host("host2") : ni_1.node_info.set_ip("127.0.0.2");
ni_1.node_info.set_cloud_unique_id("test_cloud_unique_id_1");
ni_1.node_info.set_heartbeat_port(9999);
to_del->push_back(ni_1);
}
// Reads the persisted InstanceInfoPB of `instance_id` directly from the meta
// service's TxnKv into `instance`.
//
// `instance` is cleared first, so on any failure the caller sees an empty
// message (as the original did when parsing an empty value). Failures are
// recorded as gtest failures and never dereference a null transaction.
static void get_instance_info(MetaServiceProxy* ms, InstanceInfoPB* instance,
                              std::string_view instance_id = "test-resource-instance") {
    instance->Clear();

    InstanceKeyInfo key_info {instance_id};
    std::string key;
    instance_key(key_info, &key);

    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(ms->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::string val;
    ASSERT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK) << "instance_id=" << instance_id;
    // A corrupt value would otherwise silently yield a partial message.
    EXPECT_TRUE(instance->ParseFromString(val)) << "instance_id=" << instance_id;
}
// Creates `instance_id` through the CreateInstance RPC and asserts success.
// The instance has user "test_user", name "<instance_id>name" and a dummy BOS
// object store.
static void create_instance(MetaServiceProxy* ms, const std::string& instance_id) {
    CreateInstanceRequest req;
    req.set_instance_id(instance_id);
    req.set_user_id("test_user");
    req.set_name(instance_id + "name");

    // Dummy object-store credentials/location; only their presence matters here.
    auto* obj = req.mutable_obj_info();
    obj->set_ak("123");
    obj->set_sk("321");
    obj->set_bucket("456");
    obj->set_prefix("654");
    obj->set_endpoint("789");
    obj->set_region("987");
    obj->set_external_endpoint("888");
    obj->set_provider(ObjectStoreInfoPB::BOS);

    brpc::Controller ctrl;
    CreateInstanceResponse resp;
    ms->create_instance(&ctrl, &req, &resp, brpc::DoNothing());
    ASSERT_FALSE(ctrl.Failed());
    ASSERT_EQ(resp.status().code(), MetaServiceCode::OK);
}
// Adds a one-node cluster to `instance_id` via AlterCluster(ADD_CLUSTER) and
// asserts that both the RPC and the returned status succeeded.
//
// The node always uses ip 127.0.0.1 and name "sql_node".
//   - SQL clusters get an FE_MASTER node with edit_log_port 10000.
//   - Other cluster types get a node with heartbeat_port 10000.
static void create_cluster(MetaServiceProxy* ms, const std::string& instance_id,
                           const std::string& cluster_id, const std::string& cluster_name,
                           ClusterPB_Type type) {
    AlterClusterRequest req;
    req.set_instance_id(instance_id);
    req.set_op(AlterClusterRequest::ADD_CLUSTER);
    auto* cluster = req.mutable_cluster();
    cluster->set_cluster_id(cluster_id);
    cluster->set_cluster_name(cluster_name);
    cluster->set_type(type);

    // Fields shared by both node kinds, then the type-specific ones.
    auto* node = cluster->add_nodes();
    node->set_ip("127.0.0.1");
    node->set_name("sql_node");
    if (type == ClusterPB::SQL) {
        node->set_node_type(NodeInfoPB::FE_MASTER);
        node->set_edit_log_port(10000);
    } else {
        node->set_heartbeat_port(10000);
    }

    brpc::Controller cntl;
    AlterClusterResponse res;
    ms->alter_cluster(&cntl, &req, &res, brpc::DoNothing());
    // Check the controller too, consistent with create_instance.
    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
}
// Drops `cluster_id` from `instance_id` via AlterCluster(DROP_CLUSTER) and
// asserts that both the RPC and the returned status succeeded.
static void drop_cluster(MetaServiceProxy* ms, const std::string& instance_id,
                         const std::string& cluster_id) {
    AlterClusterRequest req;
    req.set_instance_id(instance_id);
    req.mutable_cluster()->set_cluster_id(cluster_id);
    req.set_op(AlterClusterRequest::DROP_CLUSTER);

    brpc::Controller cntl;
    AlterClusterResponse res;
    ms->alter_cluster(&cntl, &req, &res, brpc::DoNothing());
    // Check the controller too, consistent with create_instance.
    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
}
// test cluster's node addr use ip
TEST(ResourceTest, ModifyNodesIpTest) {
auto meta_service = get_meta_service();
std::vector<NodeInfo> to_add = {};
std::vector<NodeInfo> to_del = {};
auto ins = InstanceInfoPB {};
create_args_to_add(&to_add, &to_del);
auto sp = SyncPoint::get_instance();
sp->set_call_back("modify_nodes:get_instance", [&](auto&& args) {
*try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK;
ins.set_instance_id("test-resource-instance");
ins.set_status(InstanceInfoPB::NORMAL);
auto* c = ins.mutable_clusters()->Add();
c->set_cluster_name("cluster_name_1");
c->set_cluster_id("cluster_id_1");
c->set_type(ClusterPB::COMPUTE);
auto* c1 = ins.mutable_clusters()->Add();
c1->set_cluster_name("cluster_name_2");
c1->set_cluster_id("cluster_id_2");
c1->set_type(ClusterPB::COMPUTE);
*try_any_cast<InstanceInfoPB*>(args[1]) = ins;
});
sp->enable_processing();
// test cluster add nodes
auto r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del);
ASSERT_EQ(r, "");
InstanceInfoPB instance;
get_instance_info(meta_service.get(), &instance);
std::cout << "after to add = " << proto_to_json(instance) << std::endl;
ASSERT_EQ(instance.clusters().size(), 2);
// after add assert cluster_name_1 has 2 nodes
ASSERT_EQ(instance.clusters(0).nodes().size(), 2);
// after add assert cluster_name_2 has 1 nodes
ASSERT_EQ(instance.clusters(1).nodes().size(), 1);
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
sp->set_call_back("modify_nodes:get_instance", [&](auto&& args) {
*try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK;
*try_any_cast<InstanceInfoPB*>(args[1]) = instance;
});
sp->enable_processing();
create_args_to_del(&to_add, &to_del);
// test cluster del node
r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del);
InstanceInfoPB instance1;
get_instance_info(meta_service.get(), &instance1);
ASSERT_EQ(r, "");
std::cout << "after to del = " << proto_to_json(instance1) << std::endl;
ASSERT_EQ(instance1.clusters().size(), 2);
// after del assert cluster_name_1 has 1 nodes
ASSERT_EQ(instance1.clusters(0).nodes().size(), 1);
// after del assert cluster_name_2 has 1 nodes
ASSERT_EQ(instance1.clusters(1).nodes().size(), 1);
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
}
// test cluster's node addr use host
TEST(ResourceTest, ModifyNodesHostTest) {
auto meta_service = get_meta_service();
std::vector<NodeInfo> to_add = {};
std::vector<NodeInfo> to_del = {};
auto ins = InstanceInfoPB {};
create_args_to_add(&to_add, &to_del, true);
auto sp = SyncPoint::get_instance();
sp->set_call_back("modify_nodes:get_instance", [&](auto&& args) {
*try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK;
ins.set_instance_id("test-resource-instance");
ins.set_status(InstanceInfoPB::NORMAL);
auto* c = ins.mutable_clusters()->Add();
c->set_cluster_name("cluster_name_1");
c->set_cluster_id("cluster_id_1");
c->set_type(ClusterPB::COMPUTE);
auto* c1 = ins.mutable_clusters()->Add();
c1->set_cluster_name("cluster_name_2");
c1->set_cluster_id("cluster_id_2");
c1->set_type(ClusterPB::COMPUTE);
*try_any_cast<InstanceInfoPB*>(args[1]) = ins;
});
sp->enable_processing();
// test cluster add nodes
auto r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del);
ASSERT_EQ(r, "");
InstanceInfoPB instance;
get_instance_info(meta_service.get(), &instance);
std::cout << "after to add = " << proto_to_json(instance) << std::endl;
ASSERT_EQ(instance.clusters().size(), 2);
// after add assert cluster_name_1 has 2 nodes
ASSERT_EQ(instance.clusters(0).nodes().size(), 2);
// after add assert cluster_name_2 has 1 nodes
ASSERT_EQ(instance.clusters(1).nodes().size(), 1);
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
sp->set_call_back("modify_nodes:get_instance", [&](auto&& args) {
*try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_OK;
*try_any_cast<InstanceInfoPB*>(args[1]) = instance;
});
sp->enable_processing();
create_args_to_del(&to_add, &to_del, true);
r = meta_service->resource_mgr()->modify_nodes("test-resource-instance", to_add, to_del);
InstanceInfoPB instance1;
get_instance_info(meta_service.get(), &instance1);
ASSERT_EQ(r, "");
std::cout << "after to del = " << proto_to_json(instance1) << std::endl;
ASSERT_EQ(instance1.clusters().size(), 2);
// after del assert cluster_name_1 has 1 nodes
ASSERT_EQ(instance1.clusters(0).nodes().size(), 1);
// after del assert cluster_name_2 has 1 nodes
ASSERT_EQ(instance1.clusters(1).nodes().size(), 1);
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
}
// test restart meta service.
// State written through one MetaServiceProxy must be visible to a second proxy
// built over the same TxnKv. This includes the version-read flag of an
// instance whose multi_version_status was set to MULTI_VERSION_ENABLED.
TEST(ResourceTest, RestartResourceManager) {
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};

    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "test";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* key = try_any_cast<std::string*>(args[0]);
        *key = "test";
        auto* ret = try_any_cast<int*>(args[1]);
        *ret = 0;
    });
    sp->enable_processing();

    auto txn_kv = create_txn_kv();
    {
        auto meta_service = get_meta_service(txn_kv);
        create_instance(meta_service.get(), "test_instance_id");
        create_instance(meta_service.get(), "test_instance_id_2");
        create_cluster(meta_service.get(), "test_instance_id", "cluster_id", "cluster_name",
                       ClusterPB::SQL);
        ASSERT_FALSE(meta_service->resource_mgr()->is_version_read_enabled("test_instance_id_2"));

        // instance_2 enable multi version status
        std::unique_ptr<Transaction> txn;
        std::string instance_info_value;
        InstanceInfoPB instance_info;
        std::string key = instance_key("test_instance_id_2");
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_EQ(txn->get(key, &instance_info_value), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(instance_info.ParseFromString(instance_info_value));
        instance_info.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_ENABLED);
        txn->put(key, instance_info.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        auto [code, msg] = meta_service->resource_mgr()->refresh_instance("test_instance_id_2");
        ASSERT_EQ(code, MetaServiceCode::OK) << msg;
        ASSERT_TRUE(meta_service->resource_mgr()->is_version_read_enabled("test_instance_id_2"));
    }
    {
        // "Restart": a new meta service over the same storage.
        auto meta_service = get_meta_service(txn_kv);
        {
            InstanceInfoPB info;
            auto [code, msg] =
                    meta_service->resource_mgr()->get_instance(nullptr, "test_instance_id", &info);
            ASSERT_EQ(code, TxnErrorCode::TXN_OK) << msg;
            ASSERT_EQ(info.name(), "test_instance_idname");
        }
        {
            InstanceInfoPB info;
            auto [code, msg] = meta_service->resource_mgr()->get_instance(
                    nullptr, "test_instance_id_2", &info);
            ASSERT_EQ(code, TxnErrorCode::TXN_OK) << msg;
            ASSERT_EQ(info.name(), "test_instance_id_2name");
        }
        ASSERT_TRUE(meta_service->resource_mgr()->is_version_read_enabled("test_instance_id_2"));
    }
}
// test add/drop cluster: after adding a SQL and a compute cluster, dropping the
// SQL cluster must leave only the compute cluster.
TEST(ResourceTest, AddDropCluster) {
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};

    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    // Force the safe drop time to -1 (presumably so the drop is not deferred).
    // The original test registered this identical callback twice.
    sp->set_call_back("resource_manager::set_safe_drop_time",
                      [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = -1; });
    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* key = try_any_cast<std::string*>(args[0]);
        *key = "test";
        auto* ret = try_any_cast<int*>(args[1]);
        *ret = 0;
    });
    sp->enable_processing();

    auto meta_service = get_meta_service();
    create_instance(meta_service.get(), "test_instance_id");
    create_cluster(meta_service.get(), "test_instance_id", "sql_id", "sql_cluster", ClusterPB::SQL);
    create_cluster(meta_service.get(), "test_instance_id", "compute_id", "compute_cluster",
                   ClusterPB::COMPUTE);

    InstanceInfoPB info;
    get_instance_info(meta_service.get(), &info, "test_instance_id");
    ASSERT_EQ(info.clusters_size(), 2);

    drop_cluster(meta_service.get(), "test_instance_id", "sql_id");
    get_instance_info(meta_service.get(), &info, "test_instance_id");
    ASSERT_EQ(info.clusters_size(), 1);
    ASSERT_EQ(info.clusters(0).cluster_name(), "compute_cluster");
    ASSERT_EQ(info.clusters(0).cluster_id(), "compute_id");
}
// Exercises ResourceManager::init with these sync-point overrides:
//   - the scan batch limit is set to NUM_BATCH_SIZE, with 2 * NUM_BATCH_SIZE
//     instances stored;
//   - TXN_TOO_OLD is injected on the second batch read (presumably to check
//     that the scan is retried).
// Afterwards every instance must still be readable.
TEST(ResourceTest, InitScanRetry) {
    auto sp = SyncPoint::get_instance();
    // Counts ResourceManager:init:get_err hits. It is declared before the guard
    // so it outlives the by-reference callback that uses it.
    size_t count = 0;
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};

    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* key = try_any_cast<std::string*>(args[0]);
        *key = "test";
        auto* ret = try_any_cast<int*>(args[1]);
        *ret = 0;
    });
    sp->enable_processing();

    constexpr size_t NUM_BATCH_SIZE = 100;
    sp->set_call_back("ResourceManager:init:limit", [&](auto&& args) {
        *try_any_cast<int*>(args[0]) = static_cast<int>(NUM_BATCH_SIZE);
    });

    auto txn_kv = create_txn_kv();
    {
        auto meta_service = get_meta_service(txn_kv);
        for (size_t i = 0; i < NUM_BATCH_SIZE * 2; i++) {
            std::string instance_id = "test_instance_id_" + std::to_string(i);
            create_instance(meta_service.get(), instance_id);
        }
    }
    {
        sp->set_call_back("ResourceManager:init:get_err", [&](auto&& args) {
            if (++count == 2) {
                *try_any_cast<TxnErrorCode*>(args[0]) = TxnErrorCode::TXN_TOO_OLD;
            }
        });
        auto meta_service = get_meta_service(txn_kv);
        ASSERT_GT(count, 1) << count;
        for (size_t i = 0; i < NUM_BATCH_SIZE * 2; i++) {
            InstanceInfoPB info;
            std::string instance_id = "test_instance_id_" + std::to_string(i);
            get_instance_info(meta_service.get(), &info, instance_id);
        }
    }
}
// Returns a new Versionstamp(counter, 0) on every call. The counter starts at 1
// and increments atomically, so each call within the process gets a distinct
// value. It is used as the fake source snapshot id of derived test instances.
static Versionstamp next_test_snapshot_versionstamp() {
    static std::atomic<uint64_t> next_version {1};
    return Versionstamp(next_version.fetch_add(1), 0);
}
// Helper to create instance with obj_info for cascade testing.
//
// Writes an InstanceInfoPB for `instance_id` directly into the TxnKv, with:
//   - one obj_info ("test_obj_info") holding `user_id`, `ak` and `sk`;
//   - the snapshot switch set to SNAPSHOT_SWITCH_ON, or SNAPSHOT_SWITCH_DISABLED
//     when `enable_snapshot` is false.
// A non-empty `source_instance_id` marks the instance as derived from it. A new
// fake snapshot id is recorded, and a snapshot reference key
// (source, snapshot, instance_id) is written in the same transaction.
static void create_instance_with_obj_info(MetaServiceProxy* meta_service,
                                          const std::string& instance_id,
                                          const std::string& source_instance_id,
                                          const std::string& user_id, const std::string& ak,
                                          const std::string& sk, bool enable_snapshot = true) {
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);

    const bool derived = !source_instance_id.empty();
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    std::optional<Versionstamp> snapshot_version;
    if (derived) {
        instance.set_source_instance_id(source_instance_id);
        snapshot_version = next_test_snapshot_versionstamp();
        instance.set_source_snapshot_id(snapshot_version->to_string());
    }
    if (enable_snapshot) {
        instance.set_snapshot_switch_status(SNAPSHOT_SWITCH_ON);
    } else {
        instance.set_snapshot_switch_status(SNAPSHOT_SWITCH_DISABLED);
    }

    auto* obj = instance.add_obj_info();
    obj->set_user_id(user_id);
    obj->set_ak(ak);
    obj->set_sk(sk);
    obj->set_id("test_obj_info");

    const std::string instance_k = instance_key({instance_id});
    const std::string instance_v = instance.SerializeAsString();
    txn->put(instance_k, instance_v);

    if (derived) {
        versioned::SnapshotReferenceKeyInfo reference {source_instance_id, *snapshot_version,
                                                       instance_id};
        const std::string reference_k = versioned::snapshot_reference_key(reference);
        txn->put(reference_k, "");
    }
    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
// Checks the stored AK/SK of instance `instance_id`. Loads the persisted
// InstanceInfoPB, asserts it has at least one obj_info, and expects the first
// obj_info's ak/sk to equal the given values. The sk is compared as stored,
// i.e. the caller passes the encrypted form when encryption applies.
static void verify_instance_aksk(MetaServiceProxy* meta_service, const std::string& instance_id,
                                 const std::string& expected_ak, const std::string& expected_sk) {
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);

    const std::string instance_k = instance_key({instance_id});
    std::string raw;
    ASSERT_EQ(txn->get(instance_k, &raw), TxnErrorCode::TXN_OK);

    InstanceInfoPB stored;
    ASSERT_TRUE(stored.ParseFromString(raw));
    ASSERT_GT(stored.obj_info_size(), 0);

    const auto& first_obj = stored.obj_info(0);
    EXPECT_EQ(first_obj.ak(), expected_ak);
    EXPECT_EQ(first_obj.sk(), expected_sk);
}
// Test AK/SK cascade update: two-level cascade.
// Updating the parent's bucket user must also update the derived child instance.
TEST(AkSkCascadeTest, TwoLevelCascade) {
    auto meta_service = get_meta_service();
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};
    sp->enable_processing();
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    // `plain_sk` is sent in the request. `cipher_sk` is the value expected in
    // storage after encryption with the fixed key above.
    std::string cipher_sk = "JUkuTDctR+ckJtnPkLScWaQZRcOtWBhsLLpnCRxQLxr734qB8cs6gNLH6grE1FxO";
    std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t";

    // Create parent and child instances
    create_instance_with_obj_info(meta_service.get(), "parent_inst", "", "user123", "old_ak",
                                  "old_sk");
    create_instance_with_obj_info(meta_service.get(), "child_inst", "parent_inst", "user123",
                                  "old_ak", "old_sk");

    // Update parent's AK/SK
    UpdateAkSkRequest req;
    req.set_instance_id("parent_inst");
    auto* bucket_user = req.add_internal_bucket_user();
    bucket_user->set_user_id("user123");
    bucket_user->set_ak("new_ak");
    bucket_user->set_sk(plain_sk);
    brpc::Controller cntl;
    UpdateAkSkResponse res;
    meta_service->update_ak_sk(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

    // Verify parent updated
    verify_instance_aksk(meta_service.get(), "parent_inst", "new_ak", cipher_sk);
    // Verify child also updated (cascaded)
    verify_instance_aksk(meta_service.get(), "child_inst", "new_ak", cipher_sk);
}
// Test AK/SK cascade update: three-level cascade.
// Updating the root must reach both the child and the grandchild instance.
TEST(AkSkCascadeTest, ThreeLevelCascade) {
    auto meta_service = get_meta_service();
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};
    sp->enable_processing();
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    // `plain_sk` is sent in the request. `cipher_sk` is the value expected in
    // storage after encryption with the fixed key above.
    std::string cipher_sk = "JUkuTDctR+ckJtnPkLScWaQZRcOtWBhsLLpnCRxQLxr734qB8cs6gNLH6grE1FxO";
    std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t";

    // Create instance tree: parent -> child -> grandchild
    create_instance_with_obj_info(meta_service.get(), "parent", "", "user1", "old_ak", "old_sk");
    create_instance_with_obj_info(meta_service.get(), "child", "parent", "user1", "old_ak",
                                  "old_sk");
    create_instance_with_obj_info(meta_service.get(), "grandchild", "child", "user1", "old_ak",
                                  "old_sk");

    // Update parent
    UpdateAkSkRequest req;
    req.set_instance_id("parent");
    auto* bucket_user = req.add_internal_bucket_user();
    bucket_user->set_user_id("user1");
    bucket_user->set_ak("new_ak_v2");
    bucket_user->set_sk(plain_sk);
    brpc::Controller cntl;
    UpdateAkSkResponse res;
    meta_service->update_ak_sk(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

    // Verify all three levels updated
    verify_instance_aksk(meta_service.get(), "parent", "new_ak_v2", cipher_sk);
    verify_instance_aksk(meta_service.get(), "child", "new_ak_v2", cipher_sk);
    verify_instance_aksk(meta_service.get(), "grandchild", "new_ak_v2", cipher_sk);
}
// Test AK/SK cascade update: multiple branches.
// Updating the parent must reach every direct child instance.
TEST(AkSkCascadeTest, MultipleBranchesCascade) {
    auto meta_service = get_meta_service();
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};
    sp->enable_processing();
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    // `plain_sk` is sent in the request. `cipher_sk` is the value expected in
    // storage after encryption with the fixed key above.
    std::string cipher_sk = "JUkuTDctR+ckJtnPkLScWaQZRcOtWBhsLLpnCRxQLxr734qB8cs6gNLH6grE1FxO";
    std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t";

    // Create tree: parent -> child1, child2, child3
    create_instance_with_obj_info(meta_service.get(), "parent", "", "user1", "old_ak", "old_sk");
    create_instance_with_obj_info(meta_service.get(), "child1", "parent", "user1", "old_ak",
                                  "old_sk");
    create_instance_with_obj_info(meta_service.get(), "child2", "parent", "user1", "old_ak",
                                  "old_sk");
    create_instance_with_obj_info(meta_service.get(), "child3", "parent", "user1", "old_ak",
                                  "old_sk");

    // Update parent
    UpdateAkSkRequest req;
    req.set_instance_id("parent");
    auto* bucket_user = req.add_internal_bucket_user();
    bucket_user->set_user_id("user1");
    bucket_user->set_ak("new_ak_multi");
    bucket_user->set_sk(plain_sk);
    brpc::Controller cntl;
    UpdateAkSkResponse res;
    meta_service->update_ak_sk(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

    // Verify all branches updated
    verify_instance_aksk(meta_service.get(), "parent", "new_ak_multi", cipher_sk);
    verify_instance_aksk(meta_service.get(), "child1", "new_ak_multi", cipher_sk);
    verify_instance_aksk(meta_service.get(), "child2", "new_ak_multi", cipher_sk);
    verify_instance_aksk(meta_service.get(), "child3", "new_ak_multi", cipher_sk);
}
// Test AK/SK cascade update: no children.
// Updating a standalone instance succeeds and updates only that instance.
TEST(AkSkCascadeTest, NoChildrenInstance) {
    auto meta_service = get_meta_service();
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};
    sp->enable_processing();
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    // `plain_sk` is sent in the request. `cipher_sk` is the value expected in
    // storage after encryption with the fixed key above.
    std::string cipher_sk = "JUkuTDctR+ckJtnPkLScWaQZRcOtWBhsLLpnCRxQLxr734qB8cs6gNLH6grE1FxO";
    std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t";

    // Create only parent
    create_instance_with_obj_info(meta_service.get(), "single_inst", "", "user1", "old_ak",
                                  "old_sk");

    // Update parent
    UpdateAkSkRequest req;
    req.set_instance_id("single_inst");
    auto* bucket_user = req.add_internal_bucket_user();
    bucket_user->set_user_id("user1");
    bucket_user->set_ak("new_ak_single");
    bucket_user->set_sk(plain_sk);
    brpc::Controller cntl;
    UpdateAkSkResponse res;
    meta_service->update_ak_sk(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

    // Verify only parent updated
    verify_instance_aksk(meta_service.get(), "single_inst", "new_ak_single", cipher_sk);
}
// Snapshot disabled should skip cascading to derived instances.
// The parent's AK/SK is updated, but the derived child keeps its old credentials.
TEST(AkSkCascadeTest, SnapshotDisabledSkipsCascade) {
    auto meta_service = get_meta_service();
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};
    sp->enable_processing();
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    // `plain_sk` is sent in the request. `cipher_sk` is the value expected in
    // storage after encryption with the fixed key above.
    std::string cipher_sk = "JUkuTDctR+ckJtnPkLScWaQZRcOtWBhsLLpnCRxQLxr734qB8cs6gNLH6grE1FxO";
    std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t";

    // Parent keeps snapshot disabled; child is a derived instance
    create_instance_with_obj_info(meta_service.get(), "parent_snapshot_off", "", "user1", "old_ak",
                                  "old_sk", /*enable_snapshot=*/false);
    create_instance_with_obj_info(meta_service.get(), "child_snapshot_off", "parent_snapshot_off",
                                  "user1", "old_ak", "old_sk");

    UpdateAkSkRequest req;
    req.set_instance_id("parent_snapshot_off");
    auto* bucket_user = req.add_internal_bucket_user();
    bucket_user->set_user_id("user1");
    bucket_user->set_ak("new_ak_disabled");
    bucket_user->set_sk(plain_sk);
    brpc::Controller cntl;
    UpdateAkSkResponse res;
    meta_service->update_ak_sk(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

    // Parent updated, child stays untouched because snapshot is off
    verify_instance_aksk(meta_service.get(), "parent_snapshot_off", "new_ak_disabled", cipher_sk);
    verify_instance_aksk(meta_service.get(), "child_snapshot_off", "old_ak", "old_sk");
}
// Test AK/SK cascade update: child without obj_info should NOT be cascaded.
// A derived instance with independent storage (no obj_info) must not gain an
// obj_info when its source instance's AK/SK is updated.
TEST(AkSkCascadeTest, ChildWithoutObjInfo) {
    auto meta_service = get_meta_service();
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};
    sp->enable_processing();
    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "selectdbselectdbselectdbselectdb";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    // `plain_sk` is sent in the request. `cipher_sk` is the value expected in
    // storage after encryption with the fixed key above.
    std::string cipher_sk = "JUkuTDctR+ckJtnPkLScWaQZRcOtWBhsLLpnCRxQLxr734qB8cs6gNLH6grE1FxO";
    std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t";

    // Create parent with obj_info
    create_instance_with_obj_info(meta_service.get(), "parent", "", "user1", "old_ak", "old_sk");

    // Create child WITHOUT obj_info (independent storage)
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
        InstanceInfoPB child_instance;
        child_instance.set_instance_id("child_no_obj");
        child_instance.set_source_instance_id("parent");
        // No obj_info added - using independent storage
        std::string key = instance_key({"child_no_obj"});
        std::string val = child_instance.SerializeAsString();
        txn->put(key, val);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Update parent
    UpdateAkSkRequest req;
    req.set_instance_id("parent");
    auto* bucket_user = req.add_internal_bucket_user();
    bucket_user->set_user_id("user1");
    bucket_user->set_ak("new_ak");
    bucket_user->set_sk(plain_sk);
    brpc::Controller cntl;
    UpdateAkSkResponse res;
    meta_service->update_ak_sk(&cntl, &req, &res, nullptr);
    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();

    // Verify parent updated
    verify_instance_aksk(meta_service.get(), "parent", "new_ak", cipher_sk);

    // Verify child was NOT updated (has no obj_info), using a fresh read transaction.
    std::unique_ptr<Transaction> read_txn;
    ASSERT_EQ(meta_service->txn_kv()->create_txn(&read_txn), TxnErrorCode::TXN_OK);
    std::string child_val;
    ASSERT_EQ(read_txn->get(instance_key({"child_no_obj"}), &child_val), TxnErrorCode::TXN_OK);
    InstanceInfoPB stored_child;
    ASSERT_TRUE(stored_child.ParseFromString(child_val));
    EXPECT_EQ(stored_child.obj_info_size(), 0); // Still no obj_info
}
// Simulates a rollback. A new instance is linked to an original one through
// source/original/successor instance ids and inherits its clusters. Afterwards,
// the node's cloud_unique_id must resolve to the new instance, whichever of the
// two instances was refreshed last.
TEST(ResourceTest, RollbackInstance) {
    auto sp = SyncPoint::get_instance();
    // Restore the process-wide SyncPoint on every exit path, including an early
    // return from a failed ASSERT_*, so these callbacks never leak into later tests.
    struct SyncPointGuard {
        SyncPoint* sp;
        ~SyncPointGuard() {
            sp->disable_processing();
            sp->clear_all_call_backs();
        }
    } sp_guard {sp};

    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 0;
        auto* key = try_any_cast<std::string*>(args[1]);
        *key = "test";
        auto* key_id = try_any_cast<int64_t*>(args[2]);
        *key_id = 1;
    });
    sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) {
        auto* key = try_any_cast<std::string*>(args[0]);
        *key = "test";
        auto* ret = try_any_cast<int*>(args[1]);
        *ret = 0;
    });
    sp->enable_processing();

    auto txn_kv = create_txn_kv();
    auto meta_service = get_meta_service(txn_kv);

    // Create original instance
    std::string original_instance_id = "original_instance";
    create_instance(meta_service.get(), original_instance_id);
    create_cluster(meta_service.get(), original_instance_id, "cluster_id_1", "cluster_1",
                   ClusterPB::COMPUTE);

    // Add a node to original instance
    std::vector<NodeInfo> to_add;
    std::vector<NodeInfo> to_del;
    auto ni = NodeInfo {.role = Role::COMPUTE_NODE,
                        .instance_id = original_instance_id,
                        .cluster_name = "cluster_1",
                        .cluster_id = "cluster_id_1",
                        .node_info = NodeInfoPB {}};
    ni.node_info.set_ip("127.0.0.1");
    ni.node_info.set_cloud_unique_id("cloud_unique_id_1");
    ni.node_info.set_heartbeat_port(9999);
    to_add.push_back(ni);
    auto r = meta_service->resource_mgr()->modify_nodes(original_instance_id, to_add, to_del);
    ASSERT_EQ(r, "");

    // Simulate rollback operation: create new instance with source_instance_id
    std::string new_instance_id = "new_instance_after_rollback";
    create_instance(meta_service.get(), new_instance_id);

    // Set source_instance_id and original_instance_id for new instance
    // Set successor_instance_id for old instance
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);

        // Update new instance with source info
        InstanceInfoPB new_instance;
        std::string new_key = instance_key(new_instance_id);
        std::string new_val;
        ASSERT_EQ(txn->get(new_key, &new_val), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(new_instance.ParseFromString(new_val));
        new_instance.set_source_instance_id(original_instance_id);
        new_instance.set_original_instance_id(original_instance_id);
        new_instance.set_source_snapshot_id("00000000000000000000"); // Valid versionstamp format

        // Inherit cluster from original instance
        InstanceInfoPB original_instance;
        std::string original_key = instance_key(original_instance_id);
        std::string original_val;
        ASSERT_EQ(txn->get(original_key, &original_val), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(original_instance.ParseFromString(original_val));
        new_instance.mutable_clusters()->CopyFrom(original_instance.clusters());
        txn->put(new_key, new_instance.SerializeAsString());

        // Update original instance with successor_instance_id
        original_instance.set_successor_instance_id(new_instance_id);
        txn->put(original_key, original_instance.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // The node must map to the new instance regardless of refresh order.
    meta_service->resource_mgr()->refresh_instance(new_instance_id);
    std::string instance_id = get_instance_id(meta_service->resource_mgr(), "cloud_unique_id_1");
    ASSERT_EQ(instance_id, new_instance_id);

    meta_service->resource_mgr()->refresh_instance(original_instance_id);
    instance_id = get_instance_id(meta_service->resource_mgr(), "cloud_unique_id_1");
    ASSERT_EQ(instance_id, new_instance_id);
}
} // namespace doris::cloud