| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //#define private public |
| #include "common/bvars.h" |
| #include "meta-service/meta_service.h" |
| //#undef private |
| |
| #include <brpc/controller.h> |
| #include <bvar/window.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include <atomic> |
| #include <cstdint> |
| #include <memory> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/util.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/meta_service_helper.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/mem_txn_kv.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "mock_resource_manager.h" |
| #include "rate-limiter/rate_limiter.h" |
| |
| int main(int argc, char** argv) { |
| const std::string conf_file = "doris_cloud.conf"; |
| if (!doris::cloud::config::init(conf_file.c_str(), true)) { |
| std::cerr << "failed to init config file, conf=" << conf_file << std::endl; |
| return -1; |
| } |
| |
| if (!doris::cloud::init_glog("rpc_kv_bvar_test")) { |
| std::cerr << "failed to init glog" << std::endl; |
| return -1; |
| } |
| |
| ::testing::InitGoogleTest(&argc, argv); |
| |
| return RUN_ALL_TESTS(); |
| } |
| |
| namespace doris::cloud { |
| using Status = MetaServiceResponseStatus; |
| |
| void start_compaction_job(MetaService* meta_service, int64_t tablet_id, const std::string& job_id, |
| const std::string& initiator, int base_compaction_cnt, |
| int cumu_compaction_cnt, TabletCompactionJobPB::CompactionType type, |
| StartTabletJobResponse& res, |
| std::pair<int64_t, int64_t> input_version = {0, 0}) { |
| brpc::Controller cntl; |
| StartTabletJobRequest req; |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| auto* compaction = req.mutable_job()->add_compaction(); |
| compaction->set_id(job_id); |
| compaction->set_initiator(initiator); |
| compaction->set_base_compaction_cnt(base_compaction_cnt); |
| compaction->set_cumulative_compaction_cnt(cumu_compaction_cnt); |
| compaction->set_type(type); |
| long now = time(nullptr); |
| compaction->set_expiration(now + 12); |
| compaction->set_lease(now + 3); |
| if (input_version.second > 0) { |
| compaction->add_input_versions(input_version.first); |
| compaction->add_input_versions(input_version.second); |
| compaction->set_check_input_versions_range(true); |
| } |
| meta_service->start_tablet_job(&cntl, &req, &res, nullptr); |
| }; |
| |
| std::unique_ptr<MetaServiceProxy> get_meta_service() { |
| int ret = 0; |
| // MemKv |
| auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); |
| if (txn_kv != nullptr) { |
| ret = txn_kv->init(); |
| [&] { ASSERT_EQ(ret, 0); }(); |
| } |
| [&] { ASSERT_NE(txn_kv.get(), nullptr); }(); |
| |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->remove("\x00", "\xfe"); // This is dangerous if the fdb is not correctly set |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| auto rs = std::make_shared<MockResourceManager>(txn_kv); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto snapshot = std::make_shared<SnapshotManager>(txn_kv); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl, snapshot); |
| return std::make_unique<MetaServiceProxy>(std::move(meta_service)); |
| } |
| |
| std::unique_ptr<MetaServiceProxy> get_fdb_meta_service() { |
| config::fdb_cluster_file_path = "fdb.cluster"; |
| static auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<FdbTxnKv>()); |
| static std::atomic<bool> init {false}; |
| bool tmp = false; |
| if (init.compare_exchange_strong(tmp, true)) { |
| int ret = txn_kv->init(); |
| [&] { |
| ASSERT_EQ(ret, 0); |
| ASSERT_NE(txn_kv.get(), nullptr); |
| }(); |
| } |
| auto rs = std::make_shared<MockResourceManager>(txn_kv); |
| auto rl = std::make_shared<RateLimiter>(); |
| auto snapshot = std::make_shared<SnapshotManager>(txn_kv); |
| auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv, rs, rl, snapshot); |
| return std::make_unique<MetaServiceProxy>(std::move(meta_service)); |
| } |
| |
| static void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, |
| CreateRowsetResponse& res) { |
| brpc::Controller cntl; |
| auto* arena = res.GetArena(); |
| auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); |
| req->mutable_rowset_meta()->CopyFrom(rowset); |
| meta_service->commit_rowset(&cntl, req, &res, nullptr); |
| if (!arena) { |
| delete req; |
| } |
| } |
| |
| static std::string next_rowset_id() { |
| static int cnt = 0; |
| return std::to_string(++cnt); |
| } |
| |
| static void fill_schema(doris::TabletSchemaCloudPB* schema, int32_t schema_version) { |
| schema->set_schema_version(schema_version); |
| for (int i = 0; i < 10; ++i) { |
| auto* column = schema->add_column(); |
| column->set_unique_id(20000 + i); |
| column->set_type("INT"); |
| } |
| } |
| |
| static void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id, const std::string& rowset_id, |
| int32_t schema_version) { |
| auto* tablet = req.add_tablet_metas(); |
| tablet->set_table_id(table_id); |
| tablet->set_index_id(index_id); |
| tablet->set_partition_id(partition_id); |
| tablet->set_tablet_id(tablet_id); |
| auto* schema = tablet->mutable_schema(); |
| fill_schema(schema, schema_version); |
| auto* first_rowset = tablet->add_rs_metas(); |
| first_rowset->set_rowset_id(0); // required |
| first_rowset->set_rowset_id_v2(rowset_id); |
| first_rowset->set_start_version(0); |
| first_rowset->set_end_version(1); |
| first_rowset->mutable_tablet_schema()->CopyFrom(*schema); |
| } |
| |
| static void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id) { |
| auto* tablet = req.add_tablet_metas(); |
| tablet->set_table_id(table_id); |
| tablet->set_index_id(index_id); |
| tablet->set_partition_id(partition_id); |
| tablet->set_tablet_id(tablet_id); |
| auto* schema = tablet->mutable_schema(); |
| schema->set_schema_version(0); |
| auto* first_rowset = tablet->add_rs_metas(); |
| first_rowset->set_rowset_id(0); // required |
| first_rowset->set_rowset_id_v2(next_rowset_id()); |
| first_rowset->set_start_version(0); |
| first_rowset->set_end_version(1); |
| first_rowset->mutable_tablet_schema()->CopyFrom(*schema); |
| } |
| |
| static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t db_id, |
| int64_t table_id, int64_t index_id, int64_t partition_id, |
| int64_t tablet_id) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(db_id); |
| add_tablet(req, table_id, index_id, partition_id, tablet_id); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| void create_tablet(MetaService* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id, bool enable_mow, |
| bool not_ready = false) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(1); |
| auto* tablet = req.add_tablet_metas(); |
| tablet->set_tablet_state(not_ready ? doris::TabletStatePB::PB_NOTREADY |
| : doris::TabletStatePB::PB_RUNNING); |
| tablet->set_table_id(table_id); |
| tablet->set_index_id(index_id); |
| tablet->set_partition_id(partition_id); |
| tablet->set_tablet_id(tablet_id); |
| tablet->set_enable_unique_key_merge_on_write(enable_mow); |
| auto* schema = tablet->mutable_schema(); |
| schema->set_schema_version(0); |
| auto* first_rowset = tablet->add_rs_metas(); |
| first_rowset->set_rowset_id(0); // required |
| first_rowset->set_rowset_id_v2(next_rowset_id()); |
| first_rowset->set_start_version(0); |
| first_rowset->set_end_version(1); |
| first_rowset->mutable_tablet_schema()->CopyFrom(*schema); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| static void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id, const std::string& rowset_id, |
| int32_t schema_version) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(1); |
| add_tablet(req, table_id, index_id, partition_id, tablet_id, rowset_id, schema_version); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| static void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id) { |
| brpc::Controller cntl; |
| CreateTabletsRequest req; |
| CreateTabletsResponse res; |
| req.set_db_id(1); |
| add_tablet(req, table_id, index_id, partition_id, tablet_id); |
| meta_service->create_tablets(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| static void begin_txn(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label, |
| int64_t table_id, int64_t& txn_id) { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| BeginTxnResponse res; |
| auto* txn_info = req.mutable_txn_info(); |
| txn_info->set_db_id(db_id); |
| txn_info->set_label(label); |
| txn_info->add_table_ids(table_id); |
| txn_info->set_timeout_ms(36000); |
| meta_service->begin_txn(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; |
| ASSERT_TRUE(res.has_txn_id()) << label; |
| txn_id = res.txn_id(); |
| } |
| |
| static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, |
| int partition_id = 10, int64_t version = -1, |
| int num_rows = 100) { |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // required |
| rowset.set_rowset_id_v2(next_rowset_id()); |
| rowset.set_tablet_id(tablet_id); |
| rowset.set_partition_id(partition_id); |
| rowset.set_txn_id(txn_id); |
| if (version > 0) { |
| rowset.set_start_version(version); |
| rowset.set_end_version(version); |
| } |
| rowset.set_num_segments(1); |
| rowset.set_num_rows(num_rows); |
| rowset.set_data_disk_size(num_rows * 100); |
| rowset.set_index_disk_size(num_rows * 10); |
| rowset.set_total_disk_size(num_rows * 110); |
| rowset.mutable_tablet_schema()->set_schema_version(0); |
| rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK |
| return rowset; |
| } |
| |
| static doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int index_id, |
| int partition_id, int64_t version = -1, |
| int num_rows = 100) { |
| doris::RowsetMetaCloudPB rowset; |
| rowset.set_rowset_id(0); // required |
| rowset.set_rowset_id_v2(next_rowset_id()); |
| rowset.set_tablet_id(tablet_id); |
| rowset.set_partition_id(partition_id); |
| rowset.set_index_id(index_id); |
| rowset.set_txn_id(txn_id); |
| if (version > 0) { |
| rowset.set_start_version(version); |
| rowset.set_end_version(version); |
| } |
| rowset.set_num_segments(0); |
| rowset.set_num_rows(0); |
| rowset.set_data_disk_size(0); |
| rowset.mutable_tablet_schema()->set_schema_version(0); |
| rowset.set_txn_expiration(::time(nullptr)); // Required by DCHECK |
| return rowset; |
| } |
| |
| static void check_get_tablet(MetaServiceProxy* meta_service, int64_t tablet_id, |
| int32_t schema_version) { |
| brpc::Controller cntl; |
| GetTabletRequest req; |
| GetTabletResponse res; |
| req.set_tablet_id(tablet_id); |
| meta_service->get_tablet(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| ASSERT_TRUE(res.has_tablet_meta()) << tablet_id; |
| EXPECT_TRUE(res.tablet_meta().has_schema()) << tablet_id; |
| EXPECT_EQ(res.tablet_meta().schema_version(), schema_version) << tablet_id; |
| }; |
| |
| static void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t table_id, |
| int64_t index_id, int64_t partition_id, int64_t tablet_id, |
| int64_t txn_id) { |
| create_tablet(meta_service, table_id, index_id, partition_id, tablet_id); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id, partition_id); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service, tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| static void prepare_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, |
| CreateRowsetResponse& res) { |
| brpc::Controller cntl; |
| auto* arena = res.GetArena(); |
| auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); |
| req->mutable_rowset_meta()->CopyFrom(rowset); |
| meta_service->prepare_rowset(&cntl, req, &res, nullptr); |
| if (!arena) { |
| delete req; |
| } |
| } |
| |
| static void get_tablet_stats(MetaService* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id, GetTabletStatsResponse& res) { |
| brpc::Controller cntl; |
| GetTabletStatsRequest req; |
| auto* idx = req.add_tablet_idx(); |
| idx->set_table_id(table_id); |
| idx->set_index_id(index_id); |
| idx->set_partition_id(partition_id); |
| idx->set_tablet_id(tablet_id); |
| meta_service->get_tablet_stats(&cntl, &req, &res, nullptr); |
| } |
| |
| static void commit_txn(MetaServiceProxy* meta_service, int64_t db_id, int64_t txn_id, |
| const std::string& label) { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| CommitTxnResponse res; |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| meta_service->commit_txn(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; |
| } |
| |
| static void begin_txn_and_commit_rowset(MetaServiceProxy* meta_service, const std::string& label, |
| int64_t db_id, int64_t table_id, int64_t partition_id, |
| int64_t tablet_id, int64_t* txn_id) { |
| begin_txn(meta_service, db_id, label, table_id, *txn_id); |
| CreateRowsetResponse res; |
| auto rowset = create_rowset(*txn_id, tablet_id, partition_id); |
| prepare_rowset(meta_service, rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| res.Clear(); |
| commit_rowset(meta_service, rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label, |
| int64_t table_id, int64_t partition_id, int64_t tablet_id) { |
| int64_t txn_id = 0; |
| ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service, db_id, label, table_id, txn_id)); |
| CreateRowsetResponse res; |
| auto rowset = create_rowset(txn_id, tablet_id, partition_id); |
| prepare_rowset(meta_service, rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; |
| res.Clear(); |
| ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service, rowset, res)); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; |
| commit_txn(meta_service, db_id, txn_id, label); |
| } |
| |
| static void get_rowset(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id, GetRowsetResponse& res) { |
| brpc::Controller cntl; |
| GetRowsetRequest req; |
| auto* tablet_idx = req.mutable_idx(); |
| tablet_idx->set_table_id(table_id); |
| tablet_idx->set_index_id(index_id); |
| tablet_idx->set_partition_id(partition_id); |
| tablet_idx->set_tablet_id(tablet_id); |
| req.set_start_version(0); |
| req.set_end_version(-1); |
| req.set_cumulative_compaction_cnt(0); |
| req.set_base_compaction_cnt(0); |
| req.set_cumulative_point(2); |
| meta_service->get_rowset(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id; |
| } |
| |
| static void get_delete_bitmap_update_lock(MetaServiceProxy* meta_service, int64_t table_id, |
| int64_t partition_id, int64_t lock_id, |
| int64_t initiator) { |
| brpc::Controller cntl; |
| GetDeleteBitmapUpdateLockRequest get_lock_req; |
| GetDeleteBitmapUpdateLockResponse get_lock_res; |
| get_lock_req.set_cloud_unique_id("test_cloud_unique_id"); |
| get_lock_req.set_table_id(table_id); |
| get_lock_req.add_partition_ids(partition_id); |
| get_lock_req.set_expiration(5); |
| get_lock_req.set_lock_id(lock_id); |
| get_lock_req.set_initiator(initiator); |
| meta_service->get_delete_bitmap_update_lock( |
| reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req, |
| &get_lock_res, nullptr); |
| ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK); |
| } |
| |
| static void update_delete_bitmap(MetaServiceProxy* meta_service, |
| UpdateDeleteBitmapRequest& update_delete_bitmap_req, |
| UpdateDeleteBitmapResponse& update_delete_bitmap_res, |
| int64_t table_id, int64_t partition_id, int64_t lock_id, |
| int64_t initiator, int64_t tablet_id, int64_t txn_id, |
| int64_t next_visible_version, std::string data = "1111") { |
| brpc::Controller cntl; |
| update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id"); |
| update_delete_bitmap_req.set_table_id(table_id); |
| update_delete_bitmap_req.set_partition_id(partition_id); |
| update_delete_bitmap_req.set_lock_id(lock_id); |
| update_delete_bitmap_req.set_initiator(initiator); |
| update_delete_bitmap_req.set_tablet_id(tablet_id); |
| update_delete_bitmap_req.set_txn_id(txn_id); |
| update_delete_bitmap_req.set_next_visible_version(next_visible_version); |
| update_delete_bitmap_req.add_rowset_ids("123"); |
| update_delete_bitmap_req.add_segment_ids(0); |
| update_delete_bitmap_req.add_versions(next_visible_version); |
| update_delete_bitmap_req.add_segment_delete_bitmaps(data); |
| meta_service->update_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl), |
| &update_delete_bitmap_req, &update_delete_bitmap_res, |
| nullptr); |
| } |
| |
| void start_schema_change_job(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id, int64_t new_tablet_id, |
| const std::string& job_id, const std::string& initiator, |
| StartTabletJobResponse& res, int64_t alter_version = -1) { |
| brpc::Controller cntl; |
| StartTabletJobRequest req; |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| auto* sc = req.mutable_job()->mutable_schema_change(); |
| sc->set_id(job_id); |
| sc->set_initiator(initiator); |
| sc->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id); |
| if (alter_version != -1) { |
| sc->set_alter_version(alter_version); |
| } |
| long now = time(nullptr); |
| sc->set_expiration(now + 12); |
| meta_service->start_tablet_job(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) |
| << job_id << ' ' << initiator << ' ' << res.status().msg(); |
| }; |
| |
| void finish_schema_change_job( |
| MetaService* meta_service, int64_t tablet_id, int64_t new_tablet_id, |
| const std::string& job_id, const std::string& initiator, |
| const std::vector<doris::RowsetMetaCloudPB>& output_rowsets, FinishTabletJobResponse& res, |
| FinishTabletJobRequest_Action action = FinishTabletJobRequest::COMMIT) { |
| brpc::Controller cntl; |
| FinishTabletJobRequest req; |
| req.set_action(action); |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| auto* sc = req.mutable_job()->mutable_schema_change(); |
| sc->mutable_new_tablet_idx()->set_tablet_id(new_tablet_id); |
| if (output_rowsets.empty()) { |
| sc->set_alter_version(0); |
| } else { |
| sc->set_alter_version(output_rowsets.back().end_version()); |
| for (const auto& rowset : output_rowsets) { |
| sc->add_txn_ids(rowset.txn_id()); |
| sc->add_output_versions(rowset.end_version()); |
| sc->set_num_output_rows(sc->num_output_rows() + rowset.num_rows()); |
| sc->set_num_output_segments(sc->num_output_segments() + rowset.num_segments()); |
| sc->set_size_output_rowsets(sc->size_output_rowsets() + rowset.total_disk_size()); |
| sc->set_index_size_output_rowsets(sc->index_size_output_rowsets() + |
| rowset.index_disk_size()); |
| sc->set_segment_size_output_rowsets(sc->segment_size_output_rowsets() + |
| rowset.data_disk_size()); |
| } |
| sc->set_num_output_rowsets(output_rowsets.size()); |
| } |
| sc->set_id(job_id); |
| sc->set_initiator(initiator); |
| sc->set_delete_bitmap_lock_initiator(12345); |
| meta_service->finish_tablet_job(&cntl, &req, &res, nullptr); |
| } |
| |
| void clear_memkv_count_bytes(MemTxnKv* memkv) { |
| memkv->get_count_ = memkv->put_count_ = memkv->del_count_ = 0; |
| memkv->get_bytes_ = memkv->put_bytes_ = memkv->del_bytes_ = 0; |
| } |
| |
| // setup and cleanup notify_refresh_instance syncpoint |
| class NotifyRefreshSyncpointGuard { |
| public: |
| NotifyRefreshSyncpointGuard() { |
| auto* sp = SyncPoint::get_instance(); |
| if (!sp->get_enable()) { |
| sp->enable_processing(); |
| } |
| // Disable notify_refresh_instance to avoid async operations polluting the counter |
| sp->set_call_back("notify_refresh_instance_return", [](auto&& args) { |
| auto* pred = try_any_cast<bool*>(args.back()); |
| *pred = true; |
| }); |
| } |
| |
| ~NotifyRefreshSyncpointGuard() { |
| auto* sp = SyncPoint::get_instance(); |
| sp->clear_call_back("notify_refresh_instance_return"); |
| } |
| }; |
| |
| // Helper function to create syncpoint guard |
| inline std::unique_ptr<NotifyRefreshSyncpointGuard> setup_notify_refresh_syncpoint() { |
| return std::make_unique<NotifyRefreshSyncpointGuard>(); |
| } |
| |
| // create_tablets |
| TEST(RpcKvBvarTest, CreateTablets) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| LOG(INFO) << "CreateTablets: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_create_tablets_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_create_tablets_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_create_tablets_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_create_tablets_put_counter.get({mock_instance})); |
| } |
| |
| // get_tablet |
| TEST(RpcKvBvarTest, GetTablet) { |
| std::string cloud_unique_id = "test_cloud_unique_id"; |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| brpc::Controller cntl; |
| GetTabletRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_tablet_id(tablet_id); |
| GetTabletResponse resp; |
| |
| meta_service->get_tablet(&cntl, &req, &resp, nullptr); |
| LOG(INFO) << "GetTablet: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_tablet_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_tablet_get_counter.get({mock_instance})); |
| } |
| |
| // get_tablet_stats |
| TEST(RpcKvBvarTest, GetTabletStats) { |
| std::string cloud_unique_id = "test_cloud_unique_id"; |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| GetTabletStatsResponse res; |
| get_tablet_stats(meta_service.get(), table_id, index_id, partition_id, tablet_id, res); |
| |
| LOG(INFO) << "GetTabletStats: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_tablet_stats_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_tablet_stats_get_counter.get({mock_instance})); |
| } |
| |
| // update_tablet |
| TEST(RpcKvBvarTest, UpdateTablet) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| brpc::Controller cntl; |
| UpdateTabletRequest req; |
| UpdateTabletResponse resp; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TabletMetaInfoPB* tablet_meta_info = req.add_tablet_meta_infos(); |
| tablet_meta_info->set_tablet_id(tablet_id); |
| tablet_meta_info->set_is_in_memory(true); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->update_tablet(&cntl, &req, &resp, nullptr); |
| |
| LOG(INFO) << "UpdateTablet: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_update_tablet_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_update_tablet_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_update_tablet_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_update_tablet_put_counter.get({mock_instance})); |
| } |
| |
| // begin_txn |
| TEST(RpcKvBvarTest, BeginTxn) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 100201; |
| std::string label = "test_prepare_rowset"; |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| int64_t txn_id = 0; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id)); |
| |
| LOG(INFO) << "BeginTxn: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_begin_txn_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_begin_txn_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_begin_txn_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_begin_txn_put_counter.get({mock_instance})); |
| } |
| |
| // commit_txn |
| TEST(RpcKvBvarTest, CommitTxn) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 100201; |
| std::string label = "test_prepare_rowset"; |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| int64_t txn_id = 0; |
| ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id)); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| commit_txn(meta_service.get(), db_id, txn_id, label); |
| |
| LOG(INFO) << "CommitTxn: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_commit_txn_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_commit_txn_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_commit_txn_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_commit_txn_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_commit_txn_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_commit_txn_del_counter.get({mock_instance})); |
| } |
| |
| // precommit_txn |
| TEST(RpcKvBvarTest, PrecommitTxn) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| const int64_t db_id = 563413; |
| const int64_t table_id = 417417878; |
| const std::string& label = "label_123dae121das"; |
| int64_t txn_id = -1; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info; |
| txn_info.set_db_id(db_id); |
| txn_info.set_label(label); |
| txn_info.add_table_ids(table_id); |
| txn_info.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| } |
| |
| std::unique_ptr<Transaction> txn; |
| TxnErrorCode err = meta_service->txn_kv()->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| |
| const std::string info_key = txn_info_key({mock_instance, db_id, txn_id}); |
| std::string info_val; |
| ASSERT_EQ(txn->get(info_key, &info_val), TxnErrorCode::TXN_OK); |
| TxnInfoPB txn_info; |
| txn_info.ParseFromString(info_val); |
| ASSERT_EQ(txn_info.status(), TxnStatusPB::TXN_STATUS_PREPARED); |
| |
| brpc::Controller cntl; |
| PrecommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_precommit_timeout_ms(36000); |
| PrecommitTxnResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->precommit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "PrecommitTxn: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_precommit_txn_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_precommit_txn_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_precommit_txn_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_precommit_txn_put_counter.get({mock_instance})); |
| } |
| |
| // abort_txn |
| TEST(RpcKvBvarTest, AbortTxn) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 100201; |
| std::string label = "test_prepare_rowset"; |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| int64_t txn_id = 0; |
| ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id)); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| brpc::Controller cntl; |
| AbortTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_reason("test"); |
| AbortTxnResponse res; |
| meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "AbortTxn: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_abort_txn_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_abort_txn_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_abort_txn_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_abort_txn_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_abort_txn_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_abort_txn_del_counter.get({mock_instance})); |
| } |
| |
| // get_txn |
| TEST(RpcKvBvarTest, GetTxn) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 100201; |
| std::string label = "test_prepare_rowset"; |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| int64_t txn_id = 0; |
| ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id)); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| brpc::Controller cntl; |
| GetTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_db_id(db_id); |
| GetTxnResponse res; |
| meta_service->get_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "GetTxn: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_txn_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_txn_get_counter.get({mock_instance})); |
| } |
| |
| // get_txn_id |
| TEST(RpcKvBvarTest, GetTxnId) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 100201; |
| std::string label = "test_prepare_rowset"; |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| int64_t txn_id = 0; |
| |
| ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id)); |
| |
| brpc::Controller cntl; |
| GetTxnIdRequest req; |
| GetTxnIdResponse res; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_label(label); |
| req.set_db_id(db_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->get_txn_id(&cntl, &req, &res, nullptr); |
| |
| LOG(INFO) << "GetTxnId: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_txn_id_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_txn_id_get_counter.get({mock_instance})); |
| } |
| |
| // prepare_rowset |
| TEST(RpcKvBvarTest, PrepareRowset) { |
| int64_t db_id = 100201; |
| std::string label = "test_prepare_rowset"; |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| constexpr auto table_id = 10021, index_id = 10022, partition_id = 10023, tablet_id = 10024; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| int64_t txn_id = 0; |
| ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id)); |
| CreateRowsetResponse res; |
| auto rowset = create_rowset(txn_id, tablet_id, partition_id); |
| rowset.mutable_load_id()->set_hi(123); |
| rowset.mutable_load_id()->set_lo(456); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| prepare_rowset(meta_service.get(), rowset, res); |
| |
| LOG(INFO) << "PrepareRowset: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_prepare_rowset_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_prepare_rowset_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_prepare_rowset_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_prepare_rowset_put_counter.get({mock_instance})); |
| } |
| |
| // get_rowset |
| TEST(RpcKvBvarTest, GetRowset) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004; |
| config::write_schema_kv = true; |
| ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id, |
| tablet_id, next_rowset_id(), 1)); |
| // check get tablet response |
| check_get_tablet(meta_service.get(), tablet_id, 1); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| // check get rowset response |
| GetRowsetResponse get_rowset_res; |
| get_rowset(meta_service.get(), table_id, index_id, partition_id, tablet_id, get_rowset_res); |
| |
| LOG(INFO) << "GetRowset: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_rowset_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_rowset_get_counter.get({mock_instance})); |
| } |
| |
| // update_tmp_rowset |
| TEST(RpcKvBvarTest, UpdateTmpRowset) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004; |
| int64_t txn_id = -1; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| auto rowset = create_rowset(txn_id, tablet_id, partition_id); |
| rowset.set_num_segments(rowset.num_segments() + 3); |
| rowset.set_num_rows(rowset.num_rows() + 1000); |
| rowset.set_total_disk_size(rowset.total_disk_size() + 11000); |
| rowset.set_index_disk_size(rowset.index_disk_size() + 1000); |
| rowset.set_data_disk_size(rowset.data_disk_size() + 10000); |
| |
| std::unique_ptr<Transaction> txn; |
| std::string update_key; |
| brpc::Controller cntl; |
| CreateRowsetResponse res; |
| auto* arena = res.GetArena(); |
| auto* req = google::protobuf::Arena::CreateMessage<CreateRowsetRequest>(arena); |
| std::string instance_id = get_instance_id(meta_service->resource_mgr(), req->cloud_unique_id()); |
| MetaRowsetTmpKeyInfo key_info {instance_id, txn_id, tablet_id}; |
| meta_rowset_tmp_key(key_info, &update_key); |
| EXPECT_EQ(mem_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(update_key, "update_tmp_rowset_val"); |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| req->mutable_rowset_meta()->CopyFrom(rowset); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| meta_service->update_tmp_rowset(&cntl, req, &res, nullptr); |
| |
| if (!arena) { |
| delete req; |
| } |
| |
| LOG(INFO) << "UpdateTmpRowset: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_update_tmp_rowset_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_update_tmp_rowset_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_update_tmp_rowset_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_update_tmp_rowset_put_counter.get({mock_instance})); |
| } |
| |
| // commit_rowset |
| TEST(RpcKvBvarTest, CommitRowset) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004; |
| int64_t txn_id = -1; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id, partition_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| |
| LOG(INFO) << "CommitRowset: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_commit_rowset_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_commit_rowset_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_commit_rowset_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_commit_rowset_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_commit_rowset_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_commit_rowset_del_counter.get({mock_instance})); |
| } |
| |
| // get_version |
| TEST(RpcKvBvarTest, GetVersion) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| constexpr auto table_id = 10001, partition_id = 10003, tablet_id = 10004; |
| create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id); |
| insert_rowset(meta_service.get(), 1, "get_version_label_1", table_id, partition_id, tablet_id); |
| |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.set_partition_id(partition_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| GetVersionResponse resp; |
| meta_service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| LOG(INFO) << "GetVersion: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_version_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_version_get_counter.get({mock_instance})); |
| } |
| |
| // get_schema_dict |
| TEST(RpcKvBvarTest, GetSchemaDict) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| brpc::Controller ctrl; |
| GetSchemaDictRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_index_id(index_id); |
| |
| std::unique_ptr<Transaction> txn; |
| std::string instance_id = get_instance_id(meta_service->resource_mgr(), req.cloud_unique_id()); |
| std::string dict_key = meta_schema_pb_dictionary_key({instance_id, req.index_id()}); |
| EXPECT_EQ(mem_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(dict_key, "dict_val"); |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| GetSchemaDictResponse resp; |
| meta_service->get_schema_dict(&ctrl, &req, &resp, nullptr); |
| |
| LOG(INFO) << "GetSchemaDict: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_schema_dict_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_schema_dict_get_counter.get({mock_instance})); |
| } |
| |
| // get_delete_bitmap_update_lock |
| TEST(RpcKvBvarTest, GetDeleteBitmapUpdateLock) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| int64_t db_id = 99999; |
| int64_t table_id = 1801; |
| int64_t index_id = 4801; |
| int64_t t1p1 = 2001; |
| int64_t tablet_id = 3001; |
| int64_t txn_id; |
| ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, |
| t1p1, tablet_id)); |
| begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id, table_id, t1p1, tablet_id, |
| &txn_id); |
| int64_t lock_id = -2; |
| int64_t initiator = 1009; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); |
| |
| LOG(INFO) << "GetDeleteBitmapUpdateLock: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ |
| << ", " << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " |
| << mem_kv->put_bytes_ << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, |
| g_bvar_rpc_kv_get_delete_bitmap_update_lock_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, |
| g_bvar_rpc_kv_get_delete_bitmap_update_lock_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, |
| g_bvar_rpc_kv_get_delete_bitmap_update_lock_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_get_delete_bitmap_update_lock_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, |
| g_bvar_rpc_kv_get_delete_bitmap_update_lock_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, |
| g_bvar_rpc_kv_get_delete_bitmap_update_lock_del_counter.get({mock_instance})); |
| } |
| |
| // update_delete_bitmap |
| TEST(RpcKvBvarTest, UpdateDeleteBitmap) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| int64_t db_id = 99999; |
| int64_t table_id = 1801; |
| int64_t index_id = 4801; |
| int64_t t1p1 = 2001; |
| int64_t tablet_id = 3001; |
| int64_t txn_id; |
| size_t split_size = 90 * 1000; // see cloud/src/common/util.h |
| ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, |
| t1p1, tablet_id)); |
| begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id, table_id, t1p1, tablet_id, |
| &txn_id); |
| int64_t lock_id = -2; |
| int64_t initiator = 1009; |
| int64_t version = 100; |
| get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); |
| UpdateDeleteBitmapRequest update_delete_bitmap_req; |
| UpdateDeleteBitmapResponse update_delete_bitmap_res; |
| // will be splited and stored in 5 KVs |
| std::string data1(split_size * 5, 'c'); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, |
| table_id, t1p1, lock_id, initiator, tablet_id, txn_id, version, data1); |
| |
| LOG(INFO) << "UpdateDeleteBitmap: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, |
| g_bvar_rpc_kv_update_delete_bitmap_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, |
| g_bvar_rpc_kv_update_delete_bitmap_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, |
| g_bvar_rpc_kv_update_delete_bitmap_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_update_delete_bitmap_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, |
| g_bvar_rpc_kv_update_delete_bitmap_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, |
| g_bvar_rpc_kv_update_delete_bitmap_del_counter.get({mock_instance})); |
| } |
| |
| // get_delete_bitmap |
| TEST(RpcKvBvarTest, GetDeleteBitmap) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 99999; |
| int64_t table_id = 1801; |
| int64_t index_id = 4801; |
| int64_t t1p1 = 2001; |
| int64_t tablet_id = 3001; |
| int64_t txn_id; |
| size_t split_size = 90 * 1000; // see cloud/src/common/util.h |
| ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, |
| t1p1, tablet_id)); |
| begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id, table_id, t1p1, tablet_id, |
| &txn_id); |
| int64_t lock_id = -2; |
| int64_t initiator = 1009; |
| int64_t version = 100; |
| get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); |
| UpdateDeleteBitmapRequest update_delete_bitmap_req; |
| UpdateDeleteBitmapResponse update_delete_bitmap_res; |
| // will be splited and stored in 5 KVs |
| std::string data1(split_size * 5, 'c'); |
| update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, |
| table_id, t1p1, lock_id, initiator, tablet_id, txn_id, version, data1); |
| |
| brpc::Controller ctrl; |
| GetDeleteBitmapRequest get_delete_bitmap_req; |
| GetDeleteBitmapResponse get_delete_bitmap_res; |
| get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id"); |
| get_delete_bitmap_req.set_tablet_id(tablet_id); |
| get_delete_bitmap_req.add_rowset_ids("123"); |
| get_delete_bitmap_req.add_begin_versions(0); |
| get_delete_bitmap_req.add_end_versions(version); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->get_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&ctrl), |
| &get_delete_bitmap_req, &get_delete_bitmap_res, nullptr); |
| |
| LOG(INFO) << "GetDeleteBitmap: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_delete_bitmap_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_delete_bitmap_get_counter.get({mock_instance})); |
| } |
| |
| // remove_delete_bitmap_update_lock |
| TEST(RpcKvBvarTest, RemoveDeleteBitmapUpdateLock) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| int64_t db_id = 99999; |
| int64_t table_id = 1801; |
| int64_t index_id = 4801; |
| int64_t t1p1 = 2001; |
| int64_t tablet_id = 3001; |
| int64_t txn_id; |
| ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, |
| t1p1, tablet_id)); |
| begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id, table_id, t1p1, tablet_id, |
| &txn_id); |
| int64_t lock_id = -2; |
| int64_t initiator = 1009; |
| |
| get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); |
| brpc::Controller cntl; |
| RemoveDeleteBitmapUpdateLockRequest remove_req; |
| RemoveDeleteBitmapUpdateLockResponse remove_res; |
| |
| remove_req.set_cloud_unique_id("test_cloud_unique_id"); |
| remove_req.set_table_id(table_id); |
| remove_req.set_lock_id(lock_id); |
| remove_req.set_initiator(initiator); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->remove_delete_bitmap_update_lock( |
| reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &remove_req, &remove_res, |
| nullptr); |
| |
| LOG(INFO) << "RemoveDeleteBitmapUpdateLock: " << mem_kv->get_count_ << ", " |
| << mem_kv->put_count_ << ", " << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ |
| << ", " << mem_kv->put_bytes_ << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, |
| g_bvar_rpc_kv_remove_delete_bitmap_update_lock_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, |
| g_bvar_rpc_kv_remove_delete_bitmap_update_lock_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, |
| g_bvar_rpc_kv_remove_delete_bitmap_update_lock_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_remove_delete_bitmap_update_lock_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, |
| g_bvar_rpc_kv_remove_delete_bitmap_update_lock_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, |
| g_bvar_rpc_kv_remove_delete_bitmap_update_lock_del_counter.get({mock_instance})); |
| } |
| |
| // remove_delete_bitmap |
| TEST(RpcKvBvarTest, RemoveDeleteBitmap) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 99999; |
| int64_t table_id = 1801; |
| int64_t index_id = 4801; |
| int64_t t1p1 = 2001; |
| int64_t tablet_id = 3001; |
| int64_t txn_id; |
| size_t split_size = 90 * 1000; // see cloud/src/common/util.h |
| ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, |
| t1p1, tablet_id)); |
| begin_txn_and_commit_rowset(meta_service.get(), "label1", db_id, table_id, t1p1, tablet_id, |
| &txn_id); |
| int64_t lock_id = -2; |
| int64_t initiator = 1009; |
| get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id, initiator); |
| int64_t version = 100; |
| UpdateDeleteBitmapRequest update_delete_bitmap_req; |
| UpdateDeleteBitmapResponse update_delete_bitmap_res; |
| // will be splited and stored in 5 KVs |
| std::string data1(split_size * 5, 'c'); |
| update_delete_bitmap(meta_service.get(), update_delete_bitmap_req, update_delete_bitmap_res, |
| table_id, t1p1, lock_id, initiator, tablet_id, txn_id, version, data1); |
| |
| brpc::Controller ctrl; |
| RemoveDeleteBitmapRequest req; |
| RemoveDeleteBitmapResponse resp; |
| req.add_begin_versions(version); |
| req.add_end_versions(version); |
| req.add_rowset_ids("rowset_ids"); |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->remove_delete_bitmap(&ctrl, &req, &resp, nullptr); |
| |
| LOG(INFO) << "RemoveDeleteBitmap: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->del_bytes_, |
| g_bvar_rpc_kv_remove_delete_bitmap_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, |
| g_bvar_rpc_kv_remove_delete_bitmap_del_counter.get({mock_instance})); |
| } |
| |
| // start_tablet_job |
| TEST(RpcKvBvarTest, StartTabletJob) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| constexpr int64_t partition_id = 10003; |
| constexpr int64_t tablet_id = 10004; |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id); |
| StartTabletJobResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, |
| TabletCompactionJobPB::BASE, res); |
| |
| LOG(INFO) << "StartTabletJob: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_start_tablet_job_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_start_tablet_job_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_start_tablet_job_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_start_tablet_job_put_counter.get({mock_instance})); |
| } |
| |
| // finish_tablet_job |
| TEST(RpcKvBvarTest, FinishTabletJob) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| brpc::Controller cntl; |
| |
| int64_t table_id = 1; |
| int64_t index_id = 2; |
| int64_t partition_id = 3; |
| int64_t tablet_id = 4; |
| |
| ASSERT_NO_FATAL_FAILURE( |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false)); |
| |
| StartTabletJobResponse res; |
| start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 0, |
| TabletCompactionJobPB::CUMULATIVE, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| res.Clear(); |
| |
| int64_t new_tablet_id = 11; |
| ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id, |
| new_tablet_id, false, true)); |
| StartTabletJobResponse sc_res; |
| ASSERT_NO_FATAL_FAILURE(start_schema_change_job(meta_service.get(), table_id, index_id, |
| partition_id, tablet_id, new_tablet_id, "job2", |
| "BE1", sc_res)); |
| |
| long now = time(nullptr); |
| FinishTabletJobRequest req; |
| FinishTabletJobResponse finish_res_2; |
| req.set_action(FinishTabletJobRequest::LEASE); |
| auto* compaction = req.mutable_job()->add_compaction(); |
| compaction->set_id("job1"); |
| compaction->set_initiator("BE1"); |
| compaction->set_lease(now + 10); |
| req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->finish_tablet_job(&cntl, &req, &finish_res_2, nullptr); |
| |
| LOG(INFO) << "FinishTabletJob: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_finish_tablet_job_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_finish_tablet_job_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_finish_tablet_job_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_finish_tablet_job_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_finish_tablet_job_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_finish_tablet_job_del_counter.get({mock_instance})); |
| } |
| |
| // prepare_index |
| TEST(RpcKvBvarTest, PrepareIndex) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| std::string instance_id = "test_cloud_instance_id"; |
| |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| constexpr int64_t partition_id = 10003; |
| constexpr int64_t tablet_id = 10004; |
| |
| std::unique_ptr<Transaction> txn; |
| doris::TabletMetaCloudPB tablet_pb; |
| tablet_pb.set_table_id(table_id); |
| tablet_pb.set_index_id(index_id); |
| tablet_pb.set_partition_id(partition_id); |
| tablet_pb.set_tablet_id(tablet_id); |
| auto tablet_key = meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| auto tablet_val = tablet_pb.SerializeAsString(); |
| RecycleIndexPB index_pb; |
| auto index_key = recycle_index_key({instance_id, index_id}); |
| auto tbl_version_key = table_version_key({instance_id, 1, table_id}); |
| std::string val; |
| |
| brpc::Controller ctrl; |
| IndexRequest req; |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.set_is_new_table(true); |
| IndexResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->prepare_index(&ctrl, &req, &res, nullptr); |
| |
| LOG(INFO) << "PrepareIndex: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_prepare_index_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_prepare_index_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_prepare_index_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_prepare_index_put_counter.get({mock_instance})); |
| } |
| |
| // commit_index |
| TEST(RpcKvBvarTest, CommitIndex) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| std::string instance_id = "test_cloud_instance_id"; |
| |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| constexpr int64_t partition_id = 10003; |
| constexpr int64_t tablet_id = 10004; |
| |
| std::unique_ptr<Transaction> txn; |
| doris::TabletMetaCloudPB tablet_pb; |
| tablet_pb.set_table_id(table_id); |
| tablet_pb.set_index_id(index_id); |
| tablet_pb.set_partition_id(partition_id); |
| tablet_pb.set_tablet_id(tablet_id); |
| auto tablet_key = meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| auto tablet_val = tablet_pb.SerializeAsString(); |
| RecycleIndexPB index_pb; |
| auto index_key = recycle_index_key({instance_id, index_id}); |
| auto tbl_version_key = table_version_key({instance_id, 1, table_id}); |
| std::string val; |
| |
| brpc::Controller ctrl; |
| IndexRequest req; |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.set_is_new_table(true); |
| IndexResponse res; |
| meta_service->prepare_index(&ctrl, &req, &res, nullptr); |
| res.Clear(); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->commit_index(&ctrl, &req, &res, nullptr); |
| |
| LOG(INFO) << "CommitIndex: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_commit_index_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_commit_index_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_commit_index_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_commit_index_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_commit_index_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_commit_index_del_counter.get({mock_instance})); |
| } |
| |
| // drop_index |
| TEST(RpcKvBvarTest, DropIndex) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 4524364; |
| int64_t table_id = 65354; |
| int64_t index_id = 658432; |
| int64_t partition_id = 76553; |
| std::string mock_instance = "test_instance"; |
| const std::string label = "test_label_67a34e2q1231"; |
| |
| int64_t tablet_id_base = 2313324; |
| for (int i = 0; i < 10; ++i) { |
| create_tablet_with_db_id(meta_service.get(), db_id, table_id, index_id, partition_id, |
| tablet_id_base + i); |
| } |
| int txn_id {}; |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| ASSERT_GT(txn_id, 0); |
| } |
| { |
| for (int i = 0; i < 10; ++i) { |
| auto tmp_rowset = |
| create_rowset(txn_id, tablet_id_base + i, index_id, partition_id, -1, 100); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| } |
| |
| brpc::Controller cntl; |
| IndexRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| IndexResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->drop_index(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| |
| LOG(INFO) << "DropIndex: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_drop_index_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_drop_index_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_drop_index_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_drop_index_put_counter.get({mock_instance})); |
| } |
| |
| // prepare_partition |
| TEST(RpcKvBvarTest, PreparePartition) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| std::string instance_id = "test_cloud_instance_id"; |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| constexpr int64_t partition_id = 10003; |
| constexpr int64_t tablet_id = 10004; |
| |
| std::unique_ptr<Transaction> txn; |
| doris::TabletMetaCloudPB tablet_pb; |
| tablet_pb.set_table_id(table_id); |
| tablet_pb.set_index_id(index_id); |
| tablet_pb.set_partition_id(partition_id); |
| tablet_pb.set_tablet_id(tablet_id); |
| auto tablet_key = meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| auto tablet_val = tablet_pb.SerializeAsString(); |
| RecyclePartitionPB partition_pb; |
| auto partition_key = recycle_partition_key({instance_id, partition_id}); |
| auto tbl_version_key = table_version_key({instance_id, 1, table_id}); |
| std::string val; |
| brpc::Controller ctrl; |
| PartitionRequest req; |
| PartitionResponse res; |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.add_partition_ids(partition_id); |
| res.Clear(); |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->atomic_add(tbl_version_key, 1); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->prepare_partition(&ctrl, &req, &res, nullptr); |
| |
| LOG(INFO) << "PreparePartition: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_prepare_partition_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_prepare_partition_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_prepare_partition_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_prepare_partition_put_counter.get({mock_instance})); |
| } |
| |
| // commit_partition |
| TEST(RpcKvBvarTest, CommitPartition) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| std::string instance_id = "test_cloud_instance_id"; |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| constexpr int64_t partition_id = 10003; |
| constexpr int64_t tablet_id = 10004; |
| |
| std::unique_ptr<Transaction> txn; |
| doris::TabletMetaCloudPB tablet_pb; |
| tablet_pb.set_table_id(table_id); |
| tablet_pb.set_index_id(index_id); |
| tablet_pb.set_partition_id(partition_id); |
| tablet_pb.set_tablet_id(tablet_id); |
| auto tablet_key = meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); |
| auto tablet_val = tablet_pb.SerializeAsString(); |
| RecyclePartitionPB partition_pb; |
| auto partition_key = recycle_partition_key({instance_id, partition_id}); |
| auto tbl_version_key = table_version_key({instance_id, 1, table_id}); |
| std::string val; |
| brpc::Controller ctrl; |
| PartitionRequest req; |
| PartitionResponse res; |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.add_partition_ids(partition_id); |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->atomic_add(tbl_version_key, 1); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| meta_service->prepare_partition(&ctrl, &req, &res, nullptr); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->commit_partition(&ctrl, &req, &res, nullptr); |
| |
| LOG(INFO) << "CommitPartition: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_commit_partition_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_commit_partition_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_commit_partition_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_commit_partition_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_commit_partition_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_commit_partition_del_counter.get({mock_instance})); |
| } |
| |
| // check_kv |
| TEST(RpcKvBvarTest, CheckKv) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| std::string instance_id = "test_instance"; |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| constexpr int64_t partition_id = 10003; |
| |
| std::unique_ptr<Transaction> txn; |
| RecyclePartitionPB partition_pb; |
| auto partition_key = recycle_partition_key({instance_id, 10004}); |
| auto tbl_version_key = table_version_key({instance_id, 1, table_id}); |
| brpc::Controller ctrl; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(partition_key, "val"); |
| txn->atomic_add(tbl_version_key, 1); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| CheckKVRequest req_check; |
| CheckKVResponse res_check; |
| req_check.set_op(CheckKVRequest::CREATE_PARTITION_AFTER_FE_COMMIT); |
| CheckKeyInfos check_keys_pb; |
| check_keys_pb.add_table_ids(table_id + 1); |
| check_keys_pb.add_index_ids(index_id + 1); |
| check_keys_pb.add_partition_ids(partition_id + 1); |
| req_check.mutable_check_keys()->CopyFrom(check_keys_pb); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->check_kv(&ctrl, &req_check, &res_check, nullptr); |
| |
| LOG(INFO) << "CheckKv: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_check_kv_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_check_kv_get_counter.get({mock_instance})); |
| } |
| |
| // drop_partition |
| TEST(RpcKvBvarTest, DropPartition) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| std::string instance_id = "test_instance"; |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| constexpr int64_t partition_id = 10003; |
| auto tbl_version_key = table_version_key({instance_id, 1, table_id}); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->atomic_add(tbl_version_key, 1); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| brpc::Controller ctrl; |
| PartitionRequest req; |
| PartitionResponse res; |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.add_partition_ids(partition_id); |
| req.set_need_update_table_version(true); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->drop_partition(&ctrl, &req, &res, nullptr); |
| |
| LOG(INFO) << "DropPartition: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_drop_partition_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_drop_partition_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_drop_partition_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_drop_partition_put_counter.get({mock_instance})); |
| } |
| |
| // get_obj_store_info |
| TEST(RpcKvBvarTest, GetObjStoreInfo) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| auto rate_limiter = std::make_shared<cloud::RateLimiter>(); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| InstanceKeyInfo key_info {"test_instance"}; |
| std::string key; |
| instance_key(key_info, &key); |
| txn->put(key, "val"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| brpc::Controller cntl; |
| GetObjStoreInfoResponse res; |
| GetObjStoreInfoRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->get_obj_store_info(&cntl, &req, &res, nullptr); |
| |
| LOG(INFO) << "GetObjStoreInfo: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_obj_store_info_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_get_obj_store_info_get_counter.get({mock_instance})); |
| } |
| |
| // alter_storage_vault |
| TEST(RpcKvBvarTest, AlterStorageVault) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| constexpr char vault_name[] = "test_alter_s3_vault"; |
| |
| InstanceKeyInfo key_info {"test_instance"}; |
| std::string key; |
| instance_key(key_info, &key); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(key, "val"); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| AlterObjStoreInfoRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT); |
| StorageVaultPB vault; |
| vault.mutable_obj_info()->set_ak("new_ak"); |
| vault.set_name(vault_name); |
| req.mutable_vault()->CopyFrom(vault); |
| |
| brpc::Controller cntl; |
| AlterObjStoreInfoResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->alter_storage_vault(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| |
| LOG(INFO) << "AlterStorageVault: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_alter_storage_vault_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_alter_storage_vault_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_alter_storage_vault_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_alter_storage_vault_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, |
| g_bvar_rpc_kv_alter_storage_vault_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, |
| g_bvar_rpc_kv_alter_storage_vault_del_counter.get({mock_instance})); |
| } |
| |
| // alter_obj_store_info |
| TEST(RpcKvBvarTest, AlterObjStoreInfo) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| auto* sp = SyncPoint::get_instance(); |
| sp->enable_processing(); |
| |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "selectdbselectdbselectdbselectdb"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string key; |
| std::string val; |
| InstanceKeyInfo key_info {"test_instance"}; |
| instance_key(key_info, &key); |
| |
| ObjectStoreInfoPB obj_info; |
| obj_info.set_id("1"); |
| obj_info.set_ak("ak"); |
| obj_info.set_sk("sk"); |
| InstanceInfoPB instance; |
| instance.add_obj_info()->CopyFrom(obj_info); |
| val = instance.SerializeAsString(); |
| txn->put(key, val); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t"; |
| |
| AlterObjStoreInfoRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_op(AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK); |
| req.mutable_obj()->set_id("1"); |
| req.mutable_obj()->set_ak("new_ak"); |
| req.mutable_obj()->set_sk(plain_sk); |
| |
| brpc::Controller cntl; |
| AlterObjStoreInfoResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->alter_obj_store_info(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| |
| LOG(INFO) << "AlterObjStoreInfo: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, |
| g_bvar_rpc_kv_alter_obj_store_info_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, |
| g_bvar_rpc_kv_alter_obj_store_info_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_alter_obj_store_info_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, |
| g_bvar_rpc_kv_alter_obj_store_info_put_counter.get({mock_instance})); |
| SyncPoint::get_instance()->disable_processing(); |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| } |
| |
| // update_ak_sk |
| TEST(RpcKvBvarTest, UpdateAkSk) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| auto* sp = SyncPoint::get_instance(); |
| sp->enable_processing(); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "selectdbselectdbselectdbselectdb"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| |
| std::string cipher_sk = "JUkuTDctR+ckJtnPkLScWaQZRcOtWBhsLLpnCRxQLxr734qB8cs6gNLH6grE1FxO"; |
| std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t"; |
| |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| std::string key; |
| std::string val; |
| InstanceKeyInfo key_info {"test_instance"}; |
| instance_key(key_info, &key); |
| |
| ObjectStoreInfoPB obj_info; |
| |
| obj_info.set_user_id("111"); |
| |
| obj_info.set_ak("ak"); |
| obj_info.set_sk("sk"); |
| InstanceInfoPB instance; |
| instance.add_obj_info()->CopyFrom(obj_info); |
| val = instance.SerializeAsString(); |
| txn->put(key, val); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| UpdateAkSkRequest req; |
| req.set_instance_id("test_instance"); |
| RamUserPB ram_user; |
| ram_user.set_user_id("111"); |
| |
| ram_user.set_ak("new_ak"); |
| ram_user.set_sk(plain_sk); |
| req.add_internal_bucket_user()->CopyFrom(ram_user); |
| |
| brpc::Controller cntl; |
| UpdateAkSkResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->update_ak_sk(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| |
| LOG(INFO) << "UpdateAkSk: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_update_ak_sk_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_update_ak_sk_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_update_ak_sk_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_update_ak_sk_put_counter.get({mock_instance})); |
| |
| SyncPoint::get_instance()->disable_processing(); |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| } |
| |
| // create_instance |
| TEST(RpcKvBvarTest, CreateInstance) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| brpc::Controller cntl; |
| CreateInstanceRequest req; |
| req.set_instance_id("test_instance"); |
| req.set_user_id("test_user"); |
| req.set_name("test_name"); |
| ObjectStoreInfoPB obj; |
| obj.set_ak("123"); |
| obj.set_sk("321"); |
| obj.set_bucket("456"); |
| obj.set_prefix("654"); |
| obj.set_endpoint("789"); |
| obj.set_region("987"); |
| obj.set_external_endpoint("888"); |
| obj.set_provider(ObjectStoreInfoPB::BOS); |
| req.mutable_obj_info()->CopyFrom(obj); |
| |
| auto* sp = SyncPoint::get_instance(); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "test"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->enable_processing(); |
| CreateInstanceResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->create_instance(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "CreateInstance: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_create_instance_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_create_instance_get_counter.get({mock_instance})); |
| |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // get_instance |
| TEST(RpcKvBvarTest, GetInstance) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| auto* sp = SyncPoint::get_instance(); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "test"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->enable_processing(); |
| brpc::Controller cntl; |
| { |
| CreateInstanceRequest req; |
| req.set_instance_id("test_instance"); |
| req.set_user_id("test_user"); |
| req.set_name("test_name"); |
| ObjectStoreInfoPB obj; |
| obj.set_ak("123"); |
| obj.set_sk("321"); |
| obj.set_bucket("456"); |
| obj.set_prefix("654"); |
| obj.set_endpoint("789"); |
| obj.set_region("987"); |
| obj.set_external_endpoint("888"); |
| obj.set_provider(ObjectStoreInfoPB::BOS); |
| req.mutable_obj_info()->CopyFrom(obj); |
| |
| CreateInstanceResponse res; |
| |
| meta_service->create_instance(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| } |
| GetInstanceRequest req; |
| GetInstanceResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| req.set_cloud_unique_id("1:test_instance:m-n3qdpyal27rh8iprxx"); |
| meta_service->get_instance(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| |
| LOG(INFO) << "GetInstance: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_instance_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_instance_get_counter.get({mock_instance})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // alter_cluster |
| // TEST(RpcKvBvarTest, AlterCluster) { |
| // auto meta_service = get_meta_service(); |
| // auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| // brpc::Controller cntl; |
| // AlterClusterRequest req; |
| // req.set_instance_id(mock_instance); |
| // req.mutable_cluster()->set_cluster_name(mock_cluster_name); |
| // req.set_op(AlterClusterRequest::ADD_CLUSTER); |
| // AlterClusterResponse res; |
| |
| // mem_kv->get_count_ = 0; |
| // mem_kv->put_count_ = 0; |
| // mem_kv->del_count_ = 0; |
| |
| // meta_service->alter_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| // &res, nullptr); |
| |
| // LOG(INFO) << "AlterCluster: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| // << mem_kv->del_count_; |
| // ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_alter_cluster_get_counter.get({mock_instance})); |
| // } |
| |
| // get_cluster |
| TEST(RpcKvBvarTest, GetCluster) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| InstanceKeyInfo key_info {mock_instance}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| ClusterPB c1; |
| c1.set_cluster_name(mock_cluster_name); |
| c1.set_cluster_id(mock_cluster_id); |
| c1.add_mysql_user_name()->append("m1"); |
| instance.add_clusters()->CopyFrom(c1); |
| val = instance.SerializeAsString(); |
| |
| std::unique_ptr<Transaction> txn; |
| std::string get_val; |
| ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(key, val); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| brpc::Controller cntl; |
| GetClusterRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_cluster_id(mock_cluster_id); |
| req.set_cluster_name("test_cluster"); |
| GetClusterResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->get_cluster(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "GetCluster: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_cluster_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_get_cluster_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_cluster_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_get_cluster_put_counter.get({mock_instance})); |
| } |
| |
| // create_stage |
| TEST(RpcKvBvarTest, CreateStage) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| brpc::Controller cntl; |
| const auto* cloud_unique_id = "test_cloud_unique_id"; |
| std::string instance_id = "stage_test_instance_id"; |
| [[maybe_unused]] auto* sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "test"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* key = try_any_cast<std::string*>(args[0]); |
| *key = "test"; |
| auto* ret = try_any_cast<int*>(args[1]); |
| *ret = 0; |
| }); |
| sp->enable_processing(); |
| |
| ObjectStoreInfoPB obj; |
| obj.set_ak("123"); |
| obj.set_sk("321"); |
| obj.set_bucket("456"); |
| obj.set_prefix("654"); |
| obj.set_endpoint("789"); |
| obj.set_region("987"); |
| obj.set_external_endpoint("888"); |
| obj.set_provider(ObjectStoreInfoPB::BOS); |
| |
| RamUserPB ram_user; |
| ram_user.set_user_id("test_user_id"); |
| ram_user.set_ak("test_ak"); |
| ram_user.set_sk("test_sk"); |
| EncryptionInfoPB encry_info; |
| encry_info.set_encryption_method("encry_method_test"); |
| encry_info.set_key_id(1111); |
| ram_user.mutable_encryption_info()->CopyFrom(encry_info); |
| |
| // create instance |
| { |
| CreateInstanceRequest req; |
| req.set_instance_id(instance_id); |
| req.set_user_id("test_user"); |
| req.set_name("test_name"); |
| req.mutable_ram_user()->CopyFrom(ram_user); |
| req.mutable_obj_info()->CopyFrom(obj); |
| |
| CreateInstanceResponse res; |
| meta_service->create_instance(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // create an internal stage |
| CreateStageRequest create_stage_request; |
| StagePB stage; |
| stage.set_type(StagePB::INTERNAL); |
| stage.add_mysql_user_name("root"); |
| stage.add_mysql_user_id("root_id"); |
| stage.set_stage_id("internal_stage_id"); |
| create_stage_request.set_cloud_unique_id(cloud_unique_id); |
| create_stage_request.mutable_stage()->CopyFrom(stage); |
| CreateStageResponse create_stage_response; |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->create_stage(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &create_stage_request, &create_stage_response, nullptr); |
| |
| LOG(INFO) << "CreateStage: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_create_stage_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_create_stage_put_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_create_stage_get_counter.get({instance_id})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_create_stage_put_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // get_stage |
| TEST(RpcKvBvarTest, GetStage) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| brpc::Controller cntl; |
| const auto* cloud_unique_id = "test_cloud_unique_id"; |
| std::string instance_id = "stage_test_instance_id"; |
| [[maybe_unused]] auto* sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "test"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* key = try_any_cast<std::string*>(args[0]); |
| *key = "test"; |
| auto* ret = try_any_cast<int*>(args[1]); |
| *ret = 0; |
| }); |
| sp->enable_processing(); |
| |
| ObjectStoreInfoPB obj; |
| obj.set_ak("123"); |
| obj.set_sk("321"); |
| obj.set_bucket("456"); |
| obj.set_prefix("654"); |
| obj.set_endpoint("789"); |
| obj.set_region("987"); |
| obj.set_external_endpoint("888"); |
| obj.set_provider(ObjectStoreInfoPB::BOS); |
| |
| RamUserPB ram_user; |
| ram_user.set_user_id("test_user_id"); |
| ram_user.set_ak("test_ak"); |
| ram_user.set_sk("test_sk"); |
| EncryptionInfoPB encry_info; |
| encry_info.set_encryption_method("encry_method_test"); |
| encry_info.set_key_id(1111); |
| ram_user.mutable_encryption_info()->CopyFrom(encry_info); |
| |
| // create instance |
| { |
| CreateInstanceRequest req; |
| req.set_instance_id(instance_id); |
| req.set_user_id("test_user"); |
| req.set_name("test_name"); |
| req.mutable_ram_user()->CopyFrom(ram_user); |
| req.mutable_obj_info()->CopyFrom(obj); |
| |
| CreateInstanceResponse res; |
| meta_service->create_instance(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // create an internal stage |
| CreateStageRequest create_stage_request; |
| StagePB stage; |
| stage.set_type(StagePB::INTERNAL); |
| stage.add_mysql_user_name("root"); |
| stage.add_mysql_user_id("root_id"); |
| stage.set_stage_id("internal_stage_id"); |
| create_stage_request.set_cloud_unique_id(cloud_unique_id); |
| create_stage_request.mutable_stage()->CopyFrom(stage); |
| CreateStageResponse create_stage_response; |
| meta_service->create_stage(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &create_stage_request, &create_stage_response, nullptr); |
| |
| GetStageRequest get_stage_req; |
| get_stage_req.set_type(StagePB::INTERNAL); |
| get_stage_req.set_cloud_unique_id(cloud_unique_id); |
| get_stage_req.set_mysql_user_name("root"); |
| get_stage_req.set_mysql_user_id("root_id"); |
| |
| // get existent internal stage |
| GetStageResponse res2; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->get_stage(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &get_stage_req, &res2, nullptr); |
| |
| LOG(INFO) << "GetStage: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_stage_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_stage_get_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // get_iam |
| TEST(RpcKvBvarTest, GetIam) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| brpc::Controller cntl; |
| auto cloud_unique_id = "test_cloud_unique_id"; |
| std::string instance_id = "get_iam_test_instance_id"; |
| [[maybe_unused]] auto sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "test"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* key = try_any_cast<std::string*>(args[0]); |
| *key = "test"; |
| auto* ret = try_any_cast<int*>(args[1]); |
| *ret = 0; |
| }); |
| sp->enable_processing(); |
| |
| config::arn_id = "iam_arn"; |
| config::arn_ak = "iam_ak"; |
| config::arn_sk = "iam_sk"; |
| |
| // create instance |
| { |
| ObjectStoreInfoPB obj; |
| obj.set_ak("123"); |
| obj.set_sk("321"); |
| obj.set_bucket("456"); |
| obj.set_prefix("654"); |
| obj.set_endpoint("789"); |
| obj.set_region("987"); |
| obj.set_external_endpoint("888"); |
| obj.set_provider(ObjectStoreInfoPB::BOS); |
| |
| RamUserPB ram_user; |
| ram_user.set_user_id("test_user_id"); |
| ram_user.set_ak("test_ak"); |
| ram_user.set_sk("test_sk"); |
| |
| CreateInstanceRequest req; |
| req.set_instance_id(instance_id); |
| req.set_user_id("test_user"); |
| req.set_name("test_name"); |
| req.mutable_ram_user()->CopyFrom(ram_user); |
| req.mutable_obj_info()->CopyFrom(obj); |
| |
| CreateInstanceResponse res; |
| meta_service->create_instance(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| GetIamRequest request; |
| request.set_cloud_unique_id(cloud_unique_id); |
| GetIamResponse response; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| meta_service->get_iam(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &request, |
| &response, nullptr); |
| ASSERT_EQ(response.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "GetIam: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_iam_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_iam_get_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // alter_iam |
| TEST(RpcKvBvarTest, AlterIam) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| auto* sp = SyncPoint::get_instance(); |
| sp->enable_processing(); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "selectdbselectdbselectdbselectdb"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| |
| std::string cipher_sk = "JUkuTDctR+ckJtnPkLScWaQZRcOtWBhsLLpnCRxQLxr734qB8cs6gNLH6grE1FxO"; |
| std::string plain_sk = "Hx60p12123af234541nsVsffdfsdfghsdfhsdf34t"; |
| |
| AlterIamRequest req; |
| req.set_account_id("123"); |
| req.set_ak("ak1"); |
| req.set_sk(plain_sk); |
| |
| brpc::Controller cntl; |
| AlterIamResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->alter_iam(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, &res, |
| nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "AlterIam: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_alter_iam_get_bytes.get({"alter_iam_instance"})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_alter_iam_put_bytes.get({"alter_iam_instance"})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_alter_iam_get_counter.get({"alter_iam_instance"})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_alter_iam_put_counter.get({"alter_iam_instance"})); |
| } |
| |
| // alter_ram_user |
| TEST(RpcKvBvarTest, AlterRamUser) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| brpc::Controller cntl; |
| std::string instance_id = "alter_ram_user_instance_id"; |
| [[maybe_unused]] auto* sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* ret = try_any_cast<int*>(args[0]); |
| *ret = 0; |
| auto* key = try_any_cast<std::string*>(args[1]); |
| *key = "test"; |
| auto* key_id = try_any_cast<int64_t*>(args[2]); |
| *key_id = 1; |
| }); |
| sp->set_call_back("decrypt_ak_sk:get_encryption_key", [](auto&& args) { |
| auto* key = try_any_cast<std::string*>(args[0]); |
| *key = "test"; |
| auto* ret = try_any_cast<int*>(args[1]); |
| *ret = 0; |
| }); |
| sp->enable_processing(); |
| |
| config::arn_id = "iam_arn"; |
| config::arn_ak = "iam_ak"; |
| config::arn_sk = "iam_sk"; |
| |
| ObjectStoreInfoPB obj; |
| obj.set_ak("123"); |
| obj.set_sk("321"); |
| obj.set_bucket("456"); |
| obj.set_prefix("654"); |
| obj.set_endpoint("789"); |
| obj.set_region("987"); |
| obj.set_external_endpoint("888"); |
| obj.set_provider(ObjectStoreInfoPB::BOS); |
| |
| // create instance without ram user |
| CreateInstanceRequest create_instance_req; |
| create_instance_req.set_instance_id(instance_id); |
| create_instance_req.set_user_id("test_user"); |
| create_instance_req.set_name("test_name"); |
| create_instance_req.mutable_obj_info()->CopyFrom(obj); |
| CreateInstanceResponse create_instance_res; |
| meta_service->create_instance(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &create_instance_req, &create_instance_res, nullptr); |
| ASSERT_EQ(create_instance_res.status().code(), MetaServiceCode::OK); |
| |
| // alter ram user |
| RamUserPB ram_user; |
| ram_user.set_user_id("test_user_id"); |
| ram_user.set_ak("test_ak"); |
| ram_user.set_sk("test_sk"); |
| AlterRamUserRequest alter_ram_user_request; |
| alter_ram_user_request.set_instance_id(instance_id); |
| alter_ram_user_request.mutable_ram_user()->CopyFrom(ram_user); |
| AlterRamUserResponse alter_ram_user_response; |
| clear_memkv_count_bytes(mem_kv.get()); |
| meta_service->alter_ram_user(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &alter_ram_user_request, &alter_ram_user_response, nullptr); |
| |
| LOG(INFO) << "AlterRamUser: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_alter_ram_user_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_alter_ram_user_put_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_alter_ram_user_get_counter.get({instance_id})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_alter_ram_user_put_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // begin_copy |
| TEST(RpcKvBvarTest, BeginCopy) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| brpc::Controller cntl; |
| auto cloud_unique_id = "test_cloud_unique_id"; |
| auto stage_id = "test_stage_id"; |
| int64_t table_id = 100; |
| std::string instance_id = "copy_job_test_instance_id"; |
| |
| [[maybe_unused]] auto sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| |
| // generate a begin copy request |
| BeginCopyRequest begin_copy_request; |
| begin_copy_request.set_cloud_unique_id(cloud_unique_id); |
| begin_copy_request.set_stage_id(stage_id); |
| begin_copy_request.set_stage_type(StagePB::EXTERNAL); |
| begin_copy_request.set_table_id(table_id); |
| begin_copy_request.set_copy_id("test_copy_id"); |
| begin_copy_request.set_group_id(0); |
| begin_copy_request.set_start_time_ms(200); |
| begin_copy_request.set_timeout_time_ms(300); |
| for (int i = 0; i < 20; ++i) { |
| ObjectFilePB object_file_pb; |
| object_file_pb.set_relative_path("obj_" + std::to_string(i)); |
| object_file_pb.set_etag("obj_" + std::to_string(i) + "_etag"); |
| begin_copy_request.add_object_files()->CopyFrom(object_file_pb); |
| } |
| BeginCopyResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->begin_copy(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &begin_copy_request, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "BeginCopy: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_begin_copy_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_begin_copy_put_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_begin_copy_get_counter.get({instance_id})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_begin_copy_put_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // get_copy_job |
| TEST(RpcKvBvarTest, GetCopyJob) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| brpc::Controller cntl; |
| const char* cloud_unique_id = "test_cloud_unique_id"; |
| const char* stage_id = "test_stage_id"; |
| int64_t table_id = 100; |
| std::string instance_id = "copy_job_test_instance_id"; |
| |
| [[maybe_unused]] auto sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| { |
| // generate a begin copy request |
| BeginCopyRequest begin_copy_request; |
| begin_copy_request.set_cloud_unique_id(cloud_unique_id); |
| begin_copy_request.set_stage_id(stage_id); |
| begin_copy_request.set_stage_type(StagePB::EXTERNAL); |
| begin_copy_request.set_table_id(table_id); |
| begin_copy_request.set_copy_id("test_copy_id"); |
| begin_copy_request.set_group_id(0); |
| begin_copy_request.set_start_time_ms(200); |
| begin_copy_request.set_timeout_time_ms(300); |
| for (int i = 0; i < 20; ++i) { |
| ObjectFilePB object_file_pb; |
| object_file_pb.set_relative_path("obj_" + std::to_string(i)); |
| object_file_pb.set_etag("obj_" + std::to_string(i) + "_etag"); |
| begin_copy_request.add_object_files()->CopyFrom(object_file_pb); |
| } |
| BeginCopyResponse res; |
| meta_service->begin_copy(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &begin_copy_request, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| GetCopyJobRequest get_copy_job_request; |
| get_copy_job_request.set_cloud_unique_id(cloud_unique_id); |
| get_copy_job_request.set_stage_id(stage_id); |
| get_copy_job_request.set_table_id(table_id); |
| get_copy_job_request.set_copy_id("test_copy_id"); |
| get_copy_job_request.set_group_id(0); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| GetCopyJobResponse res; |
| meta_service->get_copy_job(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &get_copy_job_request, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "GetCopyJob: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_copy_job_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_copy_job_get_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // finish_copy |
| TEST(RpcKvBvarTest, FinishCopy) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| brpc::Controller cntl; |
| const char* cloud_unique_id = "test_cloud_unique_id"; |
| const char* stage_id = "test_stage_id"; |
| int64_t table_id = 100; |
| std::string instance_id = "copy_job_test_instance_id"; |
| |
| [[maybe_unused]] auto sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| { |
| // generate a begin copy request |
| BeginCopyRequest begin_copy_request; |
| begin_copy_request.set_cloud_unique_id(cloud_unique_id); |
| begin_copy_request.set_stage_id(stage_id); |
| begin_copy_request.set_stage_type(StagePB::EXTERNAL); |
| begin_copy_request.set_table_id(table_id); |
| begin_copy_request.set_copy_id("test_copy_id"); |
| begin_copy_request.set_group_id(0); |
| begin_copy_request.set_start_time_ms(200); |
| begin_copy_request.set_timeout_time_ms(300); |
| for (int i = 0; i < 20; ++i) { |
| ObjectFilePB object_file_pb; |
| object_file_pb.set_relative_path("obj_" + std::to_string(i)); |
| object_file_pb.set_etag("obj_" + std::to_string(i) + "_etag"); |
| begin_copy_request.add_object_files()->CopyFrom(object_file_pb); |
| } |
| BeginCopyResponse res; |
| meta_service->begin_copy(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &begin_copy_request, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| FinishCopyRequest finish_copy_request; |
| finish_copy_request.set_cloud_unique_id(cloud_unique_id); |
| finish_copy_request.set_stage_id(stage_id); |
| finish_copy_request.set_stage_type(StagePB::EXTERNAL); |
| finish_copy_request.set_table_id(table_id); |
| finish_copy_request.set_copy_id("test_copy_id"); |
| finish_copy_request.set_group_id(0); |
| finish_copy_request.set_action(FinishCopyRequest::COMMIT); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| FinishCopyResponse res; |
| meta_service->finish_copy(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &finish_copy_request, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "FinishCopy: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_finish_copy_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_finish_copy_put_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_finish_copy_del_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_finish_copy_get_counter.get({instance_id})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_finish_copy_put_counter.get({instance_id})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_finish_copy_del_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // get_copy_files |
| TEST(RpcKvBvarTest, GetCopyFiles) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| brpc::Controller cntl; |
| auto cloud_unique_id = "test_cloud_unique_id"; |
| auto stage_id = "test_stage_id"; |
| int64_t table_id = 100; |
| std::string instance_id = "copy_job_test_instance_id"; |
| |
| [[maybe_unused]] auto sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| { |
| // generate a begin copy request |
| BeginCopyRequest begin_copy_request; |
| begin_copy_request.set_cloud_unique_id(cloud_unique_id); |
| begin_copy_request.set_stage_id(stage_id); |
| begin_copy_request.set_stage_type(StagePB::EXTERNAL); |
| begin_copy_request.set_table_id(table_id); |
| begin_copy_request.set_copy_id("test_copy_id"); |
| begin_copy_request.set_group_id(0); |
| begin_copy_request.set_start_time_ms(200); |
| begin_copy_request.set_timeout_time_ms(300); |
| for (int i = 0; i < 20; ++i) { |
| ObjectFilePB object_file_pb; |
| object_file_pb.set_relative_path("obj_" + std::to_string(i)); |
| object_file_pb.set_etag("obj_" + std::to_string(i) + "_etag"); |
| begin_copy_request.add_object_files()->CopyFrom(object_file_pb); |
| } |
| BeginCopyResponse res; |
| meta_service->begin_copy(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &begin_copy_request, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| GetCopyFilesRequest get_copy_file_req; |
| get_copy_file_req.set_cloud_unique_id(cloud_unique_id); |
| get_copy_file_req.set_stage_id(stage_id); |
| get_copy_file_req.set_table_id(table_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| GetCopyFilesResponse res; |
| meta_service->get_copy_files(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &get_copy_file_req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "GetCopyFiles: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_copy_files_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_get_copy_files_get_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // filter_copy_files |
| TEST(RpcKvBvarTest, FilterCopyFiles) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| brpc::Controller cntl; |
| auto cloud_unique_id = "test_cloud_unique_id"; |
| auto stage_id = "test_stage_id"; |
| int64_t table_id = 100; |
| std::string instance_id = "copy_job_test_instance_id"; |
| |
| [[maybe_unused]] auto sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("get_instance_id", [&](auto&& args) { |
| auto* ret = try_any_cast_ret<std::string>(args); |
| ret->first = instance_id; |
| ret->second = true; |
| }); |
| sp->enable_processing(); |
| { |
| // generate a begin copy request |
| BeginCopyRequest begin_copy_request; |
| begin_copy_request.set_cloud_unique_id(cloud_unique_id); |
| begin_copy_request.set_stage_id(stage_id); |
| begin_copy_request.set_stage_type(StagePB::EXTERNAL); |
| begin_copy_request.set_table_id(table_id); |
| begin_copy_request.set_copy_id("test_copy_id"); |
| begin_copy_request.set_group_id(0); |
| begin_copy_request.set_start_time_ms(200); |
| begin_copy_request.set_timeout_time_ms(300); |
| for (int i = 0; i < 20; ++i) { |
| ObjectFilePB object_file_pb; |
| object_file_pb.set_relative_path("obj_" + std::to_string(i)); |
| object_file_pb.set_etag("obj_" + std::to_string(i) + "_etag"); |
| begin_copy_request.add_object_files()->CopyFrom(object_file_pb); |
| } |
| BeginCopyResponse res; |
| meta_service->begin_copy(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &begin_copy_request, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| FilterCopyFilesRequest request; |
| request.set_cloud_unique_id(cloud_unique_id); |
| request.set_stage_id(stage_id); |
| request.set_table_id(table_id); |
| for (int i = 0; i < 10; ++i) { |
| ObjectFilePB object_file; |
| object_file.set_relative_path("file" + std::to_string(i)); |
| object_file.set_etag("etag" + std::to_string(i)); |
| request.add_object_files()->CopyFrom(object_file); |
| } |
| FilterCopyFilesResponse res; |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->filter_copy_files(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &request, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| |
| LOG(INFO) << "FilterCopyFiles: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_filter_copy_files_get_bytes.get({instance_id})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_filter_copy_files_get_counter.get({instance_id})); |
| sp->clear_all_call_backs(); |
| sp->clear_trace(); |
| sp->disable_processing(); |
| } |
| |
| // get_cluster_status |
| TEST(RpcKvBvarTest, GetClusterStatus) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| InstanceKeyInfo key_info {mock_instance}; |
| std::string key; |
| std::string val; |
| instance_key(key_info, &key); |
| |
| InstanceInfoPB instance; |
| instance.set_instance_id(mock_instance); |
| ClusterPB c1; |
| c1.set_type(ClusterPB::COMPUTE); |
| c1.set_cluster_name(mock_cluster_name); |
| c1.set_cluster_id(mock_cluster_id); |
| c1.add_mysql_user_name()->append("m1"); |
| c1.set_cluster_status(ClusterStatus::NORMAL); |
| ClusterPB c2; |
| c2.set_type(ClusterPB::COMPUTE); |
| c2.set_cluster_name(mock_cluster_name + "2"); |
| c2.set_cluster_id(mock_cluster_id + "2"); |
| c2.add_mysql_user_name()->append("m2"); |
| c2.set_cluster_status(ClusterStatus::SUSPENDED); |
| ClusterPB c3; |
| c3.set_type(ClusterPB::COMPUTE); |
| c3.set_cluster_name(mock_cluster_name + "3"); |
| c3.set_cluster_id(mock_cluster_id + "3"); |
| c3.add_mysql_user_name()->append("m3"); |
| c3.set_cluster_status(ClusterStatus::TO_RESUME); |
| instance.add_clusters()->CopyFrom(c1); |
| instance.add_clusters()->CopyFrom(c2); |
| instance.add_clusters()->CopyFrom(c3); |
| val = instance.SerializeAsString(); |
| |
| std::unique_ptr<Transaction> txn; |
| std::string get_val; |
| TxnErrorCode err = meta_service->txn_kv()->create_txn(&txn); |
| ASSERT_EQ(err, TxnErrorCode::TXN_OK); |
| txn->put(key, val); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| brpc::Controller cntl; |
| GetClusterStatusRequest req; |
| req.add_instance_ids(mock_instance); |
| GetClusterStatusResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->get_cluster_status(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.details().at(0).clusters().size(), 3); |
| |
| LOG(INFO) << "GetClusterStatus: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_get_cluster_status_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_get_cluster_status_get_counter.get({mock_instance})); |
| } |
| |
| // get_current_max_txn_id |
| TEST(RpcKvBvarTest, GetCurrentMaxTxnId) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| const int64_t db_id = 123; |
| const std::string label = "test_label123"; |
| const std::string cloud_unique_id = "test_cloud_unique_id"; |
| |
| brpc::Controller begin_txn_cntl; |
| BeginTxnRequest begin_txn_req; |
| BeginTxnResponse begin_txn_res; |
| TxnInfoPB txn_info_pb; |
| |
| begin_txn_req.set_cloud_unique_id(cloud_unique_id); |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(12345); |
| txn_info_pb.set_timeout_ms(36000); |
| begin_txn_req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), |
| &begin_txn_req, &begin_txn_res, nullptr); |
| ASSERT_EQ(begin_txn_res.status().code(), MetaServiceCode::OK); |
| |
| brpc::Controller max_txn_id_cntl; |
| GetCurrentMaxTxnRequest max_txn_id_req; |
| GetCurrentMaxTxnResponse max_txn_id_res; |
| |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(mem_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put("schema change", "val"); |
| EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| max_txn_id_req.set_cloud_unique_id(cloud_unique_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->get_current_max_txn_id( |
| reinterpret_cast<::google::protobuf::RpcController*>(&max_txn_id_cntl), &max_txn_id_req, |
| &max_txn_id_res, nullptr); |
| |
| LOG(INFO) << "GetCurrentMaxTxnId: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, |
| g_bvar_rpc_kv_get_current_max_txn_id_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_get_current_max_txn_id_get_counter.get({mock_instance})); |
| } |
| |
| // begin_sub_txn |
| TEST(RpcKvBvarTest, BeginSubTxn) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 98131; |
| int64_t txn_id = -1; |
| int64_t t1 = 10; |
| int64_t t1_index = 100; |
| int64_t t1_p1 = 11; |
| int64_t t1_p1_t1 = 12; |
| int64_t t1_p1_t2 = 13; |
| int64_t t1_p2 = 14; |
| int64_t t1_p2_t1 = 15; |
| int64_t t2 = 16; |
| std::string label = "test_label"; |
| std::string label2 = "test_label_0"; |
| // begin txn |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(t1); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| } |
| |
| // mock rowset and tablet: for sub_txn1 |
| int64_t sub_txn_id1 = txn_id; |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1, sub_txn_id1); |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2, sub_txn_id1); |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1, sub_txn_id1); |
| |
| brpc::Controller cntl; |
| BeginSubTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_sub_txn_num(0); |
| req.set_db_id(db_id); |
| req.set_label(label2); |
| req.mutable_table_ids()->Add(t1); |
| req.mutable_table_ids()->Add(t2); |
| BeginSubTxnResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| |
| LOG(INFO) << "BeginSubTxn: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_begin_sub_txn_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_begin_sub_txn_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_begin_sub_txn_del_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_begin_sub_txn_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_begin_sub_txn_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_begin_sub_txn_del_counter.get({mock_instance})); |
| } |
| |
| // abort_sub_txn |
| TEST(RpcKvBvarTest, AbortSubTxn) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 98131; |
| int64_t txn_id = -1; |
| int64_t t1 = 10; |
| int64_t t1_index = 100; |
| int64_t t1_p1 = 11; |
| int64_t t1_p1_t1 = 12; |
| int64_t t1_p1_t2 = 13; |
| int64_t t1_p2 = 14; |
| int64_t t1_p2_t1 = 15; |
| int64_t t2 = 16; |
| std::string label = "test_label"; |
| std::string label2 = "test_label_0"; |
| // begin txn |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(t1); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| } |
| |
| // mock rowset and tablet: for sub_txn1 |
| int64_t sub_txn_id1 = txn_id; |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1, sub_txn_id1); |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2, sub_txn_id1); |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1, sub_txn_id1); |
| brpc::Controller cntl; |
| { |
| BeginSubTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_sub_txn_num(0); |
| req.set_db_id(db_id); |
| req.set_label(label2); |
| req.mutable_table_ids()->Add(t1); |
| req.mutable_table_ids()->Add(t2); |
| BeginSubTxnResponse res; |
| |
| meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| } |
| |
| AbortSubTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_sub_txn_num(2); |
| req.set_sub_txn_id(sub_txn_id1); |
| req.set_db_id(db_id); |
| req.mutable_table_ids()->Add(t1); |
| req.mutable_table_ids()->Add(t2); |
| AbortSubTxnResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->abort_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| |
| LOG(INFO) << "AbortSubTxn: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_abort_sub_txn_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_abort_sub_txn_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_abort_sub_txn_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_abort_sub_txn_put_counter.get({mock_instance})); |
| } |
| |
| // abort_txn_with_coordinator |
| TEST(RpcKvBvarTest, AbortTxnWithCoordinator) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| const int64_t db_id = 666; |
| const int64_t table_id = 777; |
| const std::string label = "test_label"; |
| const std::string cloud_unique_id = "test_cloud_unique_id"; |
| const int64_t coordinator_id = 15623; |
| int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>( |
| std::chrono::steady_clock::now().time_since_epoch()) |
| .count(); |
| std::string host = "127.0.0.1:15586"; |
| int64_t txn_id = -1; |
| |
| brpc::Controller begin_txn_cntl; |
| BeginTxnRequest begin_txn_req; |
| BeginTxnResponse begin_txn_res; |
| TxnInfoPB txn_info_pb; |
| TxnCoordinatorPB coordinator; |
| |
| begin_txn_req.set_cloud_unique_id(cloud_unique_id); |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| coordinator.set_id(coordinator_id); |
| coordinator.set_ip(host); |
| coordinator.set_sourcetype(::doris::cloud::TxnSourceTypePB::TXN_SOURCE_TYPE_BE); |
| coordinator.set_start_time(cur_time); |
| txn_info_pb.mutable_coordinator()->CopyFrom(coordinator); |
| begin_txn_req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), |
| &begin_txn_req, &begin_txn_res, nullptr); |
| ASSERT_EQ(begin_txn_res.status().code(), MetaServiceCode::OK); |
| txn_id = begin_txn_res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| |
| brpc::Controller abort_txn_cntl; |
| AbortTxnWithCoordinatorRequest abort_txn_req; |
| AbortTxnWithCoordinatorResponse abort_txn_resp; |
| |
| abort_txn_req.set_id(coordinator_id); |
| abort_txn_req.set_ip(host); |
| abort_txn_req.set_start_time(cur_time + 3600); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->abort_txn_with_coordinator( |
| reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), &abort_txn_req, |
| &abort_txn_resp, nullptr); |
| |
| LOG(INFO) << "AbortTxnWithCoordinator: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ |
| << ", " << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " |
| << mem_kv->put_bytes_ << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, |
| g_bvar_rpc_kv_abort_txn_with_coordinator_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_abort_txn_with_coordinator_get_counter.get({mock_instance})); |
| } |
| |
| // check_txn_conflict |
| TEST(RpcKvBvarTest, CheckTxnConflict) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| |
| const int64_t db_id = 666; |
| const int64_t table_id = 777; |
| const std::string label = "test_label"; |
| const std::string cloud_unique_id = "test_cloud_unique_id"; |
| int64_t txn_id = -1; |
| |
| brpc::Controller begin_txn_cntl; |
| BeginTxnRequest begin_txn_req; |
| BeginTxnResponse begin_txn_res; |
| TxnInfoPB txn_info_pb; |
| |
| begin_txn_req.set_cloud_unique_id(cloud_unique_id); |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| begin_txn_req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), |
| &begin_txn_req, &begin_txn_res, nullptr); |
| ASSERT_EQ(begin_txn_res.status().code(), MetaServiceCode::OK); |
| txn_id = begin_txn_res.txn_id(); |
| ASSERT_GT(txn_id, -1); |
| |
| brpc::Controller check_txn_conflict_cntl; |
| CheckTxnConflictRequest check_txn_conflict_req; |
| CheckTxnConflictResponse check_txn_conflict_res; |
| |
| check_txn_conflict_req.set_cloud_unique_id(cloud_unique_id); |
| check_txn_conflict_req.set_db_id(db_id); |
| check_txn_conflict_req.set_end_txn_id(txn_id + 1); |
| check_txn_conflict_req.add_table_ids(table_id); |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->check_txn_conflict( |
| reinterpret_cast<::google::protobuf::RpcController*>(&begin_txn_cntl), |
| &check_txn_conflict_req, &check_txn_conflict_res, nullptr); |
| |
| LOG(INFO) << "CheckTxnConflict: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_check_txn_conflict_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_count_, |
| g_bvar_rpc_kv_check_txn_conflict_get_counter.get({mock_instance})); |
| } |
| |
| // clean_txn_label |
| TEST(RpcKvBvarTest, CleanTxnLabel) { |
| auto syncpoint_guard = setup_notify_refresh_syncpoint(); |
| auto meta_service = get_meta_service(); |
| auto mem_kv = std::dynamic_pointer_cast<MemTxnKv>(meta_service->txn_kv()); |
| int64_t db_id = 1987211; |
| const std::string& label = "test_clean_label"; |
| brpc::Controller cntl; |
| { |
| BeginTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(1234); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| } |
| CleanTxnLabelRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.add_labels(label); |
| CleanTxnLabelResponse res; |
| |
| clear_memkv_count_bytes(mem_kv.get()); |
| |
| meta_service->clean_txn_label(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| |
| LOG(INFO) << "CleanTxnLabel: " << mem_kv->get_count_ << ", " << mem_kv->put_count_ << ", " |
| << mem_kv->del_count_ << ", " << mem_kv->get_bytes_ << ", " << mem_kv->put_bytes_ |
| << ", " << mem_kv->del_bytes_; |
| ASSERT_EQ(mem_kv->get_count_, g_bvar_rpc_kv_clean_txn_label_get_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_count_, g_bvar_rpc_kv_clean_txn_label_put_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_count_, g_bvar_rpc_kv_clean_txn_label_del_counter.get({mock_instance})); |
| ASSERT_EQ(mem_kv->get_bytes_, g_bvar_rpc_kv_clean_txn_label_get_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->put_bytes_, g_bvar_rpc_kv_clean_txn_label_put_bytes.get({mock_instance})); |
| ASSERT_EQ(mem_kv->del_bytes_, g_bvar_rpc_kv_clean_txn_label_del_bytes.get({mock_instance})); |
| } |
| } // namespace doris::cloud |