| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <brpc/controller.h> |
| #include <bvar/window.h> |
| #include <fmt/core.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <google/protobuf/repeated_field.h> |
| #include <gtest/gtest.h> |
| |
| #include <atomic> |
| #include <condition_variable> |
| #include <cstdint> |
| #include <memory> |
| #include <random> |
| #include <string> |
| #include <thread> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/util.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/meta_service.h" |
| #include "meta-service/meta_service_helper.h" |
| #include "meta-store/document_message.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/mem_txn_kv.h" |
| #include "meta-store/meta_reader.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "meta-store/versioned_value.h" |
| #include "mock_resource_manager.h" |
| #include "rate-limiter/rate_limiter.h" |
| #include "resource-manager/resource_manager.h" |
| |
| namespace doris::cloud { |
| |
| // External functions from meta_service_test.cpp |
| extern std::unique_ptr<MetaServiceProxy> get_meta_service(); |
| extern std::unique_ptr<MetaServiceProxy> get_meta_service(bool mock_resource_mgr); |
| extern void create_tablet(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id); |
| extern doris::RowsetMetaCloudPB create_rowset(int64_t txn_id, int64_t tablet_id, int partition_id, |
| int64_t version, int num_rows); |
| extern void commit_rowset(MetaServiceProxy* meta_service, const doris::RowsetMetaCloudPB& rowset, |
| CreateRowsetResponse& res); |
| extern void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const std::string& label, |
| int64_t table_id, int64_t partition_id, int64_t tablet_id); |
| extern void add_tablet(CreateTabletsRequest& req, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id); |
| extern void get_tablet_stats(MetaServiceProxy* meta_service, int64_t table_id, int64_t index_id, |
| int64_t partition_id, int64_t tablet_id, GetTabletStatsResponse& res); |
| extern void create_and_commit_rowset(MetaServiceProxy* meta_service, int64_t table_id, |
| int64_t index_id, int64_t partition_id, int64_t tablet_id, |
| int64_t txn_id); |
| |
| void insert_compact_rowset(Transaction* txn, std::string instance_id, int64_t tablet_id, |
| int64_t partition_id, int64_t start_version, int64_t end_version, |
| int num_rows) { |
| doris::RowsetMetaCloudPB compact_rowset = |
| create_rowset(1, tablet_id, partition_id, start_version, num_rows); |
| compact_rowset.set_end_version(end_version); |
| std::string key = versioned::meta_rowset_compact_key({instance_id, tablet_id, end_version}); |
| ASSERT_TRUE(versioned::document_put(txn, key, std::move(compact_rowset))); |
| } |
| |
| void update_tablet_compact_stats(Transaction* txn, std::string instance_id, int64_t tablet_id, |
| int start_version, int end_version, int num_rows) { |
| MetaReader reader(instance_id, nullptr); |
| std::vector<RowsetMetaCloudPB> rowset_metas; |
| ASSERT_EQ(reader.get_rowset_metas(txn, tablet_id, start_version, end_version, &rowset_metas), |
| TxnErrorCode::TXN_OK); |
| |
| int input_data_size = 0; |
| int input_num_rows = 0; |
| int input_num_rowsets = 0; |
| int input_num_segments = 0; |
| int input_index_size = 0; |
| int input_segment_size = 0; |
| for (const auto& rowset_meta : rowset_metas) { |
| input_data_size += rowset_meta.total_disk_size(); |
| input_num_rows += rowset_meta.num_rows(); |
| input_num_rowsets += 1; // Each rowset is considered as one rowset |
| input_num_segments += rowset_meta.num_segments(); |
| input_index_size += rowset_meta.index_disk_size(); |
| input_segment_size += rowset_meta.data_disk_size(); |
| } |
| |
| std::string key = versioned::tablet_compact_stats_key({instance_id, tablet_id}); |
| TabletStatsPB stats; |
| ASSERT_EQ(versioned::document_get(txn, key, &stats, nullptr), TxnErrorCode::TXN_OK); |
| |
| // See create_rowset() for the calculation of data size |
| stats.set_data_size(stats.data_size() + num_rows * 110 - input_data_size); |
| stats.set_num_rows(stats.num_rows() + num_rows - input_num_rows); |
| stats.set_num_rowsets(stats.num_rowsets() + 1 - input_num_rowsets); |
| stats.set_num_segments(stats.num_segments() + 1 - input_num_segments); |
| stats.set_index_size(stats.index_size() + num_rows * 10 - input_index_size); |
| stats.set_segment_size(stats.segment_size() + num_rows * 100 - input_segment_size); |
| ASSERT_TRUE(versioned::document_put(txn, key, std::move(stats))); |
| } |
| |
| void compact_rowset(TxnKv* txn_kv, std::string instance_id, int64_t tablet_id, int64_t partition_id, |
| int64_t start_version, int64_t end_version, int num_rows) { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| insert_compact_rowset(txn.get(), instance_id, tablet_id, partition_id, start_version, |
| end_version, num_rows); |
| update_tablet_compact_stats(txn.get(), instance_id, tablet_id, start_version, end_version, |
| num_rows); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create a MULTI_VERSION_READ_WRITE instance and refresh the resource manager. |
| static void create_and_refresh_instance(MetaServiceProxy* service, std::string instance_id) { |
| // write instance |
| InstanceInfoPB instance_info; |
| instance_info.set_instance_id(instance_id); |
| instance_info.set_multi_version_status(MULTI_VERSION_READ_WRITE); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| txn->put(instance_key(instance_id), instance_info.SerializeAsString()); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| |
| service->resource_mgr()->refresh_instance(instance_id); |
| ASSERT_TRUE(service->resource_mgr()->is_version_write_enabled(instance_id)); |
| } |
| |
// Installs a "get_instance_id" sync-point callback that fills the returned pair with
// (`instance_id`, true), then enables sync-point processing. A deferred action in the enclosing
// scope clears all sync-point callbacks when that scope exits.
// The callback captures by reference, so `instance_id` must outlive the enclosing scope's use of
// the sync point. Expands to several statements: use it only at block scope.
#define MOCK_GET_INSTANCE_ID(instance_id)                                               \
    DORIS_CLOUD_DEFER {                                                                 \
        SyncPoint::get_instance()->clear_all_call_backs();                              \
    };                                                                                  \
    SyncPoint::get_instance()->set_call_back("get_instance_id", [&](auto&& sp_args) {   \
        auto* mocked_ret = try_any_cast_ret<std::string>(sp_args);                      \
        mocked_ret->first = instance_id;                                                \
        mocked_ret->second = true;                                                      \
    });                                                                                 \
    SyncPoint::get_instance()->enable_processing();
| |
| TEST(MetaServiceVersionedReadTest, CommitTxn) { |
| auto meta_service = get_meta_service(false); |
| std::string instance_id = "test_cloud_instance_id"; |
| std::string cloud_unique_id = "1:test_cloud_unique_id:1"; |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(meta_service.get(), instance_id); |
| |
| int64_t db_id = 666; |
| int64_t table_id = 1234; |
| int64_t index_id = 1235; |
| int64_t partition_id = 1236; |
| |
| // case: first version of rowset |
| { |
| int64_t txn_id = -1; |
| // begin txn |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label("test_label"); |
| txn_info_pb.add_table_ids(table_id); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| } |
| |
| // mock rowset and tablet |
| int64_t tablet_id_base = 1103; |
| for (int i = 0; i < 5; ++i) { |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id_base + i); |
| auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i, partition_id, -1, 100); |
| CreateRowsetResponse res; |
| commit_rowset(meta_service.get(), tmp_rowset, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // precommit txn |
| { |
| brpc::Controller cntl; |
| PrecommitTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_precommit_timeout_ms(36000); |
| PrecommitTxnResponse res; |
| meta_service->precommit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // commit txn |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| } |
| |
| // doubly commit txn |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| auto found = res.status().msg().find(fmt::format( |
| "transaction is already visible: db_id={} txn_id={}", db_id, txn_id)); |
| ASSERT_TRUE(found != std::string::npos); |
| } |
| |
| // doubly commit txn(2pc) |
| { |
| brpc::Controller cntl; |
| CommitTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(db_id); |
| req.set_txn_id(txn_id); |
| req.set_is_2pc(true); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::TXN_ALREADY_VISIBLE); |
| auto found = res.status().msg().find( |
| fmt::format("transaction [{}] is already visible, not pre-committed.", txn_id)); |
| ASSERT_TRUE(found != std::string::npos); |
| } |
| } |
| |
| { |
| // Get the partition versions |
| brpc::Controller cntl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.set_partition_id(partition_id); |
| GetVersionResponse res; |
| meta_service->get_version(&cntl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.version(), 2); |
| } |
| } |
| |
| TEST(MetaServiceVersionedReadTest, CommitTxnWithSubTxnTest) { |
| auto meta_service = get_meta_service(false); |
| const std::string instance_id = "test_cloud_instance_id"; |
| const std::string cloud_unique_id = "1:test_cloud_unique_id:1"; |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(meta_service.get(), instance_id); |
| |
| int64_t db_id = 98131; |
| int64_t txn_id = -1; |
| int64_t t1 = 10; |
| int64_t t1_index = 100; |
| int64_t t1_p1 = 11; |
| int64_t t1_p1_t1 = 12; |
| int64_t t1_p1_t2 = 13; |
| int64_t t1_p2 = 14; |
| int64_t t1_p2_t1 = 15; |
| int64_t t2 = 16; |
| int64_t t2_index = 101; |
| int64_t t2_p3 = 17; |
| int64_t t2_p3_t1 = 18; |
| [[maybe_unused]] int64_t t2_p4 = 19; |
| [[maybe_unused]] int64_t t2_p4_t1 = 20; |
| std::string label = "test_label"; |
| std::string label2 = "test_label_0"; |
| |
| // begin txn |
| { |
| brpc::Controller cntl; |
| BeginTxnRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| TxnInfoPB txn_info_pb; |
| txn_info_pb.set_db_id(db_id); |
| txn_info_pb.set_label(label); |
| txn_info_pb.add_table_ids(t1); |
| txn_info_pb.set_timeout_ms(36000); |
| req.mutable_txn_info()->CopyFrom(txn_info_pb); |
| BeginTxnResponse res; |
| meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| txn_id = res.txn_id(); |
| } |
| |
| // mock rowset and tablet: for sub_txn1 |
| int64_t sub_txn_id1 = txn_id; |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1, sub_txn_id1); |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2, sub_txn_id1); |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p2, t1_p2_t1, sub_txn_id1); |
| |
| // begin_sub_txn2 |
| int64_t sub_txn_id2 = -1; |
| { |
| brpc::Controller cntl; |
| BeginSubTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_sub_txn_num(0); |
| req.set_db_id(db_id); |
| req.set_label(label2); |
| req.mutable_table_ids()->Add(t1); |
| req.mutable_table_ids()->Add(t2); |
| BeginSubTxnResponse res; |
| meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().table_ids().size(), 2); |
| ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 1); |
| ASSERT_TRUE(res.has_sub_txn_id()); |
| sub_txn_id2 = res.sub_txn_id(); |
| ASSERT_EQ(sub_txn_id2, res.txn_info().sub_txn_ids()[0]); |
| } |
| // mock rowset and tablet: for sub_txn3 |
| create_and_commit_rowset(meta_service.get(), t2, t2_index, t2_p3, t2_p3_t1, sub_txn_id2); |
| |
| // begin_sub_txn3 |
| int64_t sub_txn_id3 = -1; |
| { |
| brpc::Controller cntl; |
| BeginSubTxnRequest req; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_txn_id(txn_id); |
| req.set_sub_txn_num(1); |
| req.set_db_id(db_id); |
| req.set_label("test_label_1"); |
| req.mutable_table_ids()->Add(t1); |
| req.mutable_table_ids()->Add(t2); |
| req.mutable_table_ids()->Add(t1); |
| BeginSubTxnResponse res; |
| meta_service->begin_sub_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), |
| &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.txn_info().table_ids().size(), 3); |
| ASSERT_EQ(res.txn_info().sub_txn_ids().size(), 2); |
| ASSERT_TRUE(res.has_sub_txn_id()); |
| sub_txn_id3 = res.sub_txn_id(); |
| ASSERT_EQ(sub_txn_id3, res.txn_info().sub_txn_ids()[1]); |
| } |
| // mock rowset and tablet: for sub_txn3 |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t1, sub_txn_id3); |
| create_and_commit_rowset(meta_service.get(), t1, t1_index, t1_p1, t1_p1_t2, sub_txn_id3); |
| |
| // commit txn |
| CommitTxnRequest req; |
| { |
| brpc::Controller cntl; |
| req.set_cloud_unique_id("test_cloud_unique_id"); |
| req.set_db_id(666); |
| req.set_txn_id(txn_id); |
| req.set_is_txn_load(true); |
| |
| SubTxnInfo sub_txn_info1; |
| sub_txn_info1.set_sub_txn_id(sub_txn_id1); |
| sub_txn_info1.set_table_id(t1); |
| sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t1); |
| sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p1_t2); |
| sub_txn_info1.mutable_base_tablet_ids()->Add(t1_p2_t1); |
| |
| SubTxnInfo sub_txn_info2; |
| sub_txn_info2.set_sub_txn_id(sub_txn_id2); |
| sub_txn_info2.set_table_id(t2); |
| sub_txn_info2.mutable_base_tablet_ids()->Add(t2_p3_t1); |
| |
| SubTxnInfo sub_txn_info3; |
| sub_txn_info3.set_sub_txn_id(sub_txn_id3); |
| sub_txn_info3.set_table_id(t1); |
| sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t1); |
| sub_txn_info3.mutable_base_tablet_ids()->Add(t1_p1_t2); |
| |
| req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info1)); |
| req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info2)); |
| req.mutable_sub_txn_infos()->Add(std::move(sub_txn_info3)); |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| // std::cout << res.DebugString() << std::endl; |
| ASSERT_EQ(res.table_ids().size(), 3); |
| for (int i = 0; i < 3; ++i) { |
| if (res.partition_ids(i) == t2_p3) { |
| ASSERT_EQ(res.table_ids(i), t2); |
| ASSERT_EQ(res.versions(i), 2); |
| } else if (res.partition_ids(i) == t1_p2) { |
| ASSERT_EQ(res.table_ids(i), t1); |
| ASSERT_EQ(res.versions(i), 2); |
| } else if (res.partition_ids(i) == t1_p1) { |
| ASSERT_EQ(res.table_ids(i), t1); |
| ASSERT_EQ(res.versions(i), 3); |
| } else { |
| ASSERT_TRUE(false) << "unknown partition: " << res.partition_ids(i) |
| << ", res=" << res.DebugString(); |
| } |
| } |
| } |
| |
| // doubly commit txn |
| { |
| brpc::Controller cntl; |
| CommitTxnResponse res; |
| meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &req, |
| &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| auto found = res.status().msg().find( |
| fmt::format("transaction is already visible: db_id={} txn_id={}", db_id, txn_id)); |
| ASSERT_TRUE(found != std::string::npos); |
| } |
| |
| // Verify the partition versions |
| { |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_batch_mode(true); |
| req.add_db_ids(db_id); |
| req.add_db_ids(db_id); |
| req.add_db_ids(db_id); |
| req.add_table_ids(t1); |
| req.add_table_ids(t1); |
| req.add_table_ids(t2); |
| req.add_partition_ids(t1_p1); |
| req.add_partition_ids(t1_p2); |
| req.add_partition_ids(t2_p3); |
| |
| GetVersionResponse resp; |
| meta_service->get_version(&ctrl, &req, &resp, nullptr); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << " status is " << resp.status().ShortDebugString(); |
| ASSERT_EQ(resp.versions().size(), 3); |
| ASSERT_EQ(resp.versions()[0], 3); // t1_p1 |
| ASSERT_EQ(resp.versions()[1], 2); // t1_p2 |
| ASSERT_EQ(resp.versions()[2], 2); // t2_p3 |
| } |
| } |
| |
| TEST(MetaServiceVersionedReadTest, GetVersion) { |
| auto service = get_meta_service(false); |
| |
| int64_t table_id = 1; |
| int64_t partition_id = 1; |
| int64_t tablet_id = 1; |
| |
| std::string instance_id = "test_cloud_instance_id"; |
| std::string cloud_unique_id = fmt::format("1:{}:1", instance_id); |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(service.get(), instance_id); |
| |
| // INVALID_ARGUMENT |
| { |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_table_id(table_id); |
| req.set_partition_id(partition_id); |
| |
| GetVersionResponse resp; |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::INVALID_ARGUMENT) |
| << " status is " << resp.status().DebugString(); |
| } |
| |
| { |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.set_partition_id(partition_id); |
| |
| GetVersionResponse resp; |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::VERSION_NOT_FOUND) |
| << " status is " << resp.status().DebugString(); |
| } |
| |
| { |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.set_partition_id(partition_id); |
| req.set_is_table_version(true); |
| |
| GetVersionResponse resp; |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::VERSION_NOT_FOUND) |
| << " status is " << resp.status().DebugString(); |
| } |
| |
| create_tablet(service.get(), table_id, 1, partition_id, tablet_id); |
| insert_rowset(service.get(), 1, "get_version_label_1", table_id, partition_id, tablet_id); |
| |
| { |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.set_partition_id(partition_id); |
| |
| GetVersionResponse resp; |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << " status is " << resp.status().DebugString(); |
| ASSERT_EQ(resp.version(), 2); |
| } |
| |
| { |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(1); |
| req.set_table_id(table_id); |
| req.set_partition_id(partition_id); |
| req.set_is_table_version(true); |
| |
| GetVersionResponse resp; |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << " status is " << resp.status().DebugString(); |
| ASSERT_GT(resp.version(), 2); |
| } |
| } |
| |
| TEST(MetaServiceVersionedReadTest, BatchGetVersion) { |
| struct TestCase { |
| std::vector<int64_t> table_ids; |
| std::vector<int64_t> partition_ids; |
| std::vector<int64_t> expected_versions; |
| std::vector< |
| std::tuple<int64_t /*table_id*/, int64_t /*partition_id*/, int64_t /*tablet_id*/>> |
| insert_rowsets; |
| }; |
| |
| // table ids: 2, 3, 4, 5 |
| // partition ids: 6, 7, 8, 9 |
| std::vector<TestCase> cases = { |
| // all version are missing |
| {{1, 2, 3, 4}, {6, 7, 8, 9}, {-1, -1, -1, -1}, {}}, |
| // update table 1, partition 6 |
| {{1, 2, 3, 4}, {6, 7, 8, 9}, {2, -1, -1, -1}, {{1, 6, 1}}}, |
| // update table 2, partition 6 |
| // update table 3, partition 7 |
| {{1, 2, 3, 4}, {6, 7, 8, 9}, {2, -1, 2, 2}, {{3, 8, 3}, {4, 9, 4}}}, |
| // update table 1, partition 7 twice |
| {{1, 2, 3, 4}, {6, 7, 8, 9}, {2, 3, 2, 2}, {{2, 7, 2}, {2, 7, 2}}}, |
| }; |
| |
| auto service = get_meta_service(false); |
| |
| std::string instance_id = "test_cloud_instance_id"; |
| std::string cloud_unique_id = fmt::format("1:{}:1", instance_id); |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(service.get(), instance_id); |
| |
| create_tablet(service.get(), 1, 1, 6, 1); |
| create_tablet(service.get(), 2, 1, 7, 2); |
| create_tablet(service.get(), 3, 1, 8, 3); |
| create_tablet(service.get(), 4, 1, 9, 4); |
| |
| size_t num_cases = cases.size(); |
| size_t label_index = 0; |
| for (size_t i = 0; i < num_cases; ++i) { |
| auto& [table_ids, partition_ids, expected_versions, insert_rowsets] = cases[i]; |
| for (auto [table_id, partition_id, tablet_id] : insert_rowsets) { |
| LOG(INFO) << "insert rowset for table " << table_id << " partition " << partition_id |
| << " table_id " << tablet_id; |
| insert_rowset(service.get(), 1, std::to_string(++label_index), table_id, partition_id, |
| tablet_id); |
| } |
| |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(-1); |
| req.set_table_id(-1); |
| req.set_partition_id(-1); |
| req.set_batch_mode(true); |
| for (size_t i = 0; i < table_ids.size(); ++i) req.add_db_ids(1); |
| std::copy(table_ids.begin(), table_ids.end(), |
| google::protobuf::RepeatedFieldBackInserter(req.mutable_table_ids())); |
| std::copy(partition_ids.begin(), partition_ids.end(), |
| google::protobuf::RepeatedFieldBackInserter(req.mutable_partition_ids())); |
| |
| GetVersionResponse resp; |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << "case " << i << " status is " << resp.status().msg() |
| << ", code=" << resp.status().code(); |
| |
| std::vector<int64_t> versions(resp.versions().begin(), resp.versions().end()); |
| EXPECT_EQ(versions, expected_versions) << "case " << i; |
| |
| // Batch get table versions |
| req.set_is_table_version(true); |
| |
| resp.Clear(); |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << "case " << i << " status is " << resp.status().msg() |
| << ", code=" << resp.status().code(); |
| for (size_t j = 0; j < resp.versions_size(); ++j) { |
| if (expected_versions[j] == -1) { |
| ASSERT_LT(resp.versions(j), 0) |
| << "case " << i << ", j=" << j << ", resp=" << resp.DebugString(); |
| } else { |
| ASSERT_GT(resp.versions(j), 0) |
| << "case " << i << ", j=" << j << ", resp=" << resp.DebugString(); |
| } |
| } |
| } |
| |
| // INVALID_ARGUMENT |
| { |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_batch_mode(true); |
| GetVersionResponse resp; |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::INVALID_ARGUMENT) |
| << " status is " << resp.status().msg() << ", code=" << resp.status().code(); |
| } |
| } |
| |
| TEST(MetaServiceVersionedReadTest, BatchGetVersionFallback) { |
| constexpr size_t N = 100; |
| size_t i = 0; |
| auto sp = SyncPoint::get_instance(); |
| DORIS_CLOUD_DEFER { |
| SyncPoint::get_instance()->clear_all_call_backs(); |
| }; |
| sp->set_call_back("batch_get_version_err", [&](auto&& args) { |
| if (i++ == N / 10) { |
| *try_any_cast<TxnErrorCode*>(args) = TxnErrorCode::TXN_TOO_OLD; |
| } |
| }); |
| |
| sp->enable_processing(); |
| |
| auto service = get_meta_service(false); |
| std::string instance_id = "test_cloud_instance_id"; |
| std::string cloud_unique_id = fmt::format("1:{}:1", instance_id); |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(service.get(), instance_id); |
| |
| for (int64_t i = 1; i <= N; ++i) { |
| create_tablet(service.get(), 1, 1, i, i); |
| insert_rowset(service.get(), 1, std::to_string(i), 1, i, i); |
| } |
| |
| brpc::Controller ctrl; |
| GetVersionRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_db_id(-1); |
| req.set_table_id(-1); |
| req.set_partition_id(-1); |
| req.set_batch_mode(true); |
| for (size_t i = 1; i <= N; ++i) { |
| req.add_db_ids(1); |
| req.add_table_ids(1); |
| req.add_partition_ids(i); |
| } |
| |
| GetVersionResponse resp; |
| service->get_version(&ctrl, &req, &resp, nullptr); |
| |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << "case " << i << " status is " << resp.status().msg() |
| << ", code=" << resp.status().code(); |
| |
| ASSERT_EQ(resp.versions_size(), N); |
| } |
| |
| TEST(MetaServiceVersionedReadTest, GetTablet) { |
| auto service = get_meta_service(false); |
| std::string instance_id = "test_cloud_instance_id"; |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(service.get(), instance_id); |
| |
| int64_t table_id = 2; |
| int64_t index_id = 3; |
| int64_t partition_id = 4; |
| int64_t tablet_id = 5; |
| |
| // Create tablet |
| create_tablet(service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| { |
| // Get the tablet meta |
| brpc::Controller cntl; |
| GetTabletRequest req; |
| req.set_cloud_unique_id(fmt::format("1:{}:1", instance_id)); |
| req.set_tablet_id(tablet_id); |
| GetTabletResponse resp; |
| service->get_tablet(&cntl, &req, &resp, nullptr); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << " status is " << resp.status().DebugString(); |
| ASSERT_EQ(resp.tablet_meta().table_id(), table_id); |
| ASSERT_EQ(resp.tablet_meta().index_id(), index_id); |
| ASSERT_EQ(resp.tablet_meta().partition_id(), partition_id); |
| ASSERT_EQ(resp.tablet_meta().tablet_id(), tablet_id); |
| |
| // Verify the tablet schema |
| ASSERT_TRUE(resp.tablet_meta().has_schema()); |
| } |
| } |
| |
| TEST(MetaServiceVersionedReadTest, GetTabletStats) { |
| auto meta_service = get_meta_service(false); |
| |
| std::string instance_id = "test_cloud_instance_id"; |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(meta_service.get(), instance_id); |
| |
| constexpr auto table_id = 10001, index_id = 10002, partition_id = 10003, tablet_id = 10004; |
| ASSERT_NO_FATAL_FAILURE( |
| create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); |
| |
| { |
| GetTabletStatsResponse res; |
| get_tablet_stats(meta_service.get(), table_id, index_id, partition_id, tablet_id, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.tablet_stats_size(), 1); |
| EXPECT_EQ(res.tablet_stats(0).data_size(), 0); |
| EXPECT_EQ(res.tablet_stats(0).num_rows(), 0); |
| EXPECT_EQ(res.tablet_stats(0).num_rowsets(), 1); |
| EXPECT_EQ(res.tablet_stats(0).num_segments(), 0); |
| EXPECT_EQ(res.tablet_stats(0).index_size(), 0); |
| EXPECT_EQ(res.tablet_stats(0).segment_size(), 0); |
| |
| // The tablet idx should be set. |
| auto tablet_stats = res.tablet_stats(0); |
| EXPECT_EQ(tablet_stats.idx().tablet_id(), tablet_id); |
| EXPECT_EQ(tablet_stats.idx().index_id(), index_id); |
| EXPECT_EQ(tablet_stats.idx().partition_id(), partition_id); |
| EXPECT_EQ(tablet_stats.idx().table_id(), table_id); |
| } |
| |
| { |
| // Insert rowset |
| config::split_tablet_stats = false; |
| ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label1", table_id, |
| partition_id, tablet_id)); |
| ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label2", table_id, |
| partition_id, tablet_id)); |
| config::split_tablet_stats = true; |
| ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label3", table_id, |
| partition_id, tablet_id)); |
| ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label4", table_id, |
| partition_id, tablet_id)); |
| ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label5", table_id, |
| partition_id, tablet_id)); |
| ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label6", table_id, |
| partition_id, tablet_id)); |
| ASSERT_NO_FATAL_FAILURE(insert_rowset(meta_service.get(), 10000, "label7", table_id, |
| partition_id, tablet_id)); |
| } |
| |
| { |
| GetTabletStatsResponse res; |
| get_tablet_stats(meta_service.get(), table_id, index_id, partition_id, tablet_id, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.tablet_stats_size(), 1); |
| EXPECT_EQ(res.tablet_stats(0).data_size(), 77000); |
| EXPECT_EQ(res.tablet_stats(0).num_rows(), 700); |
| EXPECT_EQ(res.tablet_stats(0).num_rowsets(), 8); |
| EXPECT_EQ(res.tablet_stats(0).num_segments(), 7); |
| EXPECT_EQ(res.tablet_stats(0).index_size(), 7000); |
| EXPECT_EQ(res.tablet_stats(0).segment_size(), 70000); |
| } |
| |
| { |
| // Compact some rowsets, the rows is not changed |
| auto txn_kv = meta_service->txn_kv(); |
| compact_rowset(txn_kv.get(), instance_id, tablet_id, partition_id, 2, 4, 300); |
| compact_rowset(txn_kv.get(), instance_id, tablet_id, partition_id, 6, 7, 200); |
| } |
| |
| { |
| // rowset.set_num_segments(1); |
| // rowset.set_num_rows(num_rows); |
| // rowset.set_data_disk_size(num_rows * 100); |
| // rowset.set_index_disk_size(num_rows * 10); |
| // rowset.set_total_disk_size(num_rows * 110); |
| GetTabletStatsResponse res; |
| get_tablet_stats(meta_service.get(), table_id, index_id, partition_id, tablet_id, res); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(res.tablet_stats_size(), 1); |
| EXPECT_EQ(res.tablet_stats(0).data_size(), 77000); |
| EXPECT_EQ(res.tablet_stats(0).num_rows(), 700); |
| EXPECT_EQ(res.tablet_stats(0).num_rowsets(), 5); // 0-1, 2-4, 5, 6-7, 8 |
| EXPECT_EQ(res.tablet_stats(0).num_segments(), 4); |
| EXPECT_EQ(res.tablet_stats(0).index_size(), 7000); |
| EXPECT_EQ(res.tablet_stats(0).segment_size(), 70000); |
| } |
| } |
| |
| TEST(MetaServiceVersionedReadTest, GetRowsetMetas) { |
| auto service = get_meta_service(false); |
| std::string instance_id = "test_cloud_instance_id"; |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(service.get(), instance_id); |
| |
| int64_t db_id = 1; |
| int64_t table_id = 2; |
| int64_t index_id = 3; |
| int64_t partition_id = 4; |
| int64_t tablet_id = 5; |
| |
| // Create tablet |
| create_tablet(service.get(), table_id, index_id, partition_id, tablet_id); |
| |
| // Helper function to get rowsets using MetaService get_rowset interface |
| auto get_rowsets = [&](int64_t start_version, int64_t end_version) -> GetRowsetResponse { |
| brpc::Controller cntl; |
| GetRowsetRequest req; |
| auto* tablet_idx = req.mutable_idx(); |
| tablet_idx->set_db_id(db_id); |
| tablet_idx->set_table_id(table_id); |
| tablet_idx->set_index_id(index_id); |
| tablet_idx->set_partition_id(partition_id); |
| tablet_idx->set_tablet_id(tablet_id); |
| req.set_start_version(start_version); |
| req.set_end_version(end_version); |
| req.set_cumulative_compaction_cnt(0); |
| req.set_base_compaction_cnt(0); |
| req.set_cumulative_point(2); |
| |
| GetRowsetResponse resp; |
| service->get_rowset(&cntl, &req, &resp, nullptr); |
| return resp; |
| }; |
| |
| // Test 1: Contains the first rowset |
| { |
| auto resp = get_rowsets(1, 10); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) << resp.status().DebugString(); |
| ASSERT_EQ(resp.rowset_meta_size(), 1); |
| EXPECT_TRUE(resp.has_partition_max_version()); |
| EXPECT_EQ(resp.partition_max_version(), -1); // The first version is special |
| } |
| |
| // Step 1: Insert loading rowsets with versions 2, 3, 4, 5, 6, 7, 8, 9, 10 |
| LOG(INFO) << "Inserting loading rowsets for versions 2-10"; |
| for (int64_t version = 2; version <= 10; ++version) { |
| insert_rowset(service.get(), 1, fmt::format("load_label_{}", version), table_id, |
| partition_id, tablet_id); |
| } |
| |
| // Test 2: Get all loading rowsets |
| { |
| auto resp = get_rowsets(1, 10); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) << resp.status().DebugString(); |
| ASSERT_EQ(resp.rowset_meta_size(), 10); |
| EXPECT_TRUE(resp.has_partition_max_version()); |
| EXPECT_EQ(resp.partition_max_version(), 10); |
| |
| // Verify rowsets are sorted by version |
| for (int i = 0; i < resp.rowset_meta_size(); ++i) { |
| const auto& meta = resp.rowset_meta(i); |
| ASSERT_EQ(meta.end_version(), i + 1) << meta.DebugString(); |
| } |
| } |
| |
| // Step 2: Insert compact rowsets [3-5] and [9-10] |
| LOG(INFO) << "Inserting compact rowsets [3-5] and [9-10]"; |
| |
| // Create compact rowset [3-4] |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| insert_compact_rowset(txn.get(), instance_id, tablet_id, partition_id, 3, 4, 300); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create compact rowset [3-5] |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| insert_compact_rowset(txn.get(), instance_id, tablet_id, partition_id, 3, 5, 300); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Create compact rowset [9-10] |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); |
| insert_compact_rowset(txn.get(), instance_id, tablet_id, partition_id, 9, 10, 190); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| // Test 3: Verify compact rowsets override load rowsets |
| { |
| auto resp = get_rowsets(1, 10); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK); |
| |
| // 1, 2, [3-5], 6, 7, 8, [9-10] |
| ASSERT_EQ(resp.rowset_meta_size(), 7) << resp.ShortDebugString(); |
| |
| // Expected sequence: v1, v2, [3-5], v6, v7, v8, [9-10] |
| std::vector<std::pair<int64_t, int64_t>> expected_versions = { |
| {0, 1}, {2, 2}, {3, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 10}}; |
| |
| for (int i = 0; i < resp.rowset_meta_size(); ++i) { |
| const auto& meta = resp.rowset_meta(i); |
| ASSERT_EQ(meta.start_version(), expected_versions[i].first) |
| << "Rowset " << i << " start_version mismatch"; |
| ASSERT_EQ(meta.end_version(), expected_versions[i].second) |
| << "Rowset " << i << " end_version mismatch"; |
| } |
| } |
| |
| // Test 4: Boundary cases - query ranges that test boundaries |
| struct TestCase { |
| int64_t start_version; |
| int64_t end_version; |
| std::vector<std::pair<int64_t, int64_t>> expected_ranges; |
| std::string description; |
| }; |
| |
| std::vector<TestCase> test_cases = { |
| // Test exact version match |
| {1, 1, {{0, 1}}, "Single version 1"}, |
| {2, 2, {{2, 2}}, "Single version 2"}, |
| |
| // Test range before compact |
| {1, 2, {{0, 1}, {2, 2}}, "Range [1-2] before compact"}, |
| |
| // Test range ending at compact boundary |
| {2, 5, {{2, 2}, {3, 5}}, "Range [2-5] ending at compact boundary"}, |
| |
| // Test range starting at compact boundary |
| {3, 6, {{3, 5}, {6, 6}}, "Range [3-6] starting at compact boundary"}, |
| |
| // Test exact compact range |
| {3, 5, {{3, 5}}, "Exact compact range [3-5]"}, |
| {9, 10, {{9, 10}}, "Exact compact range [9-10]"}, |
| |
| // Test range spanning compact |
| {2, 6, {{2, 2}, {3, 5}, {6, 6}}, "Range [2-6] spanning compact"}, |
| |
| // Test range between compacts |
| {6, 8, {{6, 6}, {7, 7}, {8, 8}}, "Range [6-8] between compacts"}, |
| |
| // Test range crossing second compact |
| {8, 10, {{8, 8}, {9, 10}}, "Range [8-10] crossing second compact"}, |
| |
| // Test partial compact overlap |
| {4, 6, {{3, 5}, {6, 6}}, "Range [4-6] partial compact overlap"}, |
| {9, 10, {{9, 10}}, "Range [9-10] contains last version"}, |
| |
| // Test edge cases |
| {5, |
| 10, |
| {{3, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 10}}, |
| "Range [5-10] crossing both compacts"}, |
| {5, 9, {{3, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 9}}, "Range [5-9] includes first compact"}, |
| {3, 4, {{3, 4}}, "Range [3-4] within first compact"}, |
| }; |
| |
| for (const auto& test_case : test_cases) { |
| LOG(INFO) << "Testing: " << test_case.description; |
| auto resp = get_rowsets(test_case.start_version, test_case.end_version); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) |
| << "Failed for " << test_case.description; |
| |
| ASSERT_EQ(resp.rowset_meta_size(), test_case.expected_ranges.size()) |
| << "Rowset count mismatch for " << test_case.description << ". Got " |
| << resp.rowset_meta_size() << " rowsets, expected " |
| << test_case.expected_ranges.size(); |
| |
| for (int i = 0; i < resp.rowset_meta_size(); ++i) { |
| const auto& meta = resp.rowset_meta(i); |
| const auto& expected = test_case.expected_ranges[i]; |
| ASSERT_EQ(meta.start_version(), expected.first) |
| << "Start version mismatch for " << test_case.description << " at index " << i; |
| ASSERT_EQ(meta.end_version(), expected.second) |
| << "End version mismatch for " << test_case.description << " at index " << i |
| << " \nRowsetMeta: " << meta.ShortDebugString() |
| << " \nResponse: " << resp.ShortDebugString(); |
| } |
| } |
| |
| // Test 5: Test empty ranges |
| { |
| auto resp = get_rowsets(11, 15); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK); |
| ASSERT_EQ(resp.rowset_meta_size(), 0) << "Should return empty for non-existent versions"; |
| } |
| |
| LOG(INFO) << "GetRowsetMetas test completed successfully"; |
| } |
| |
| TEST(MetaServiceVersionedReadTest, UpdateTablet) { |
| auto service = get_meta_service(false); |
| std::string instance_id = "test_cloud_instance_id"; |
| std::string cloud_unique_id = fmt::format("1:{}:1", instance_id); |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(service.get(), instance_id); |
| |
| constexpr auto table_id = 11231, index_id = 11232, partition_id = 11233, tablet_id1 = 11234, |
| tablet_id2 = 21234; |
| ASSERT_NO_FATAL_FAILURE( |
| create_tablet(service.get(), table_id, index_id, partition_id, tablet_id1)); |
| ASSERT_NO_FATAL_FAILURE( |
| create_tablet(service.get(), table_id, index_id, partition_id, tablet_id2)); |
| auto get_and_check_tablet_meta = [&](int tablet_id, int64_t ttl_seconds, bool in_memory, |
| bool is_persistent) { |
| brpc::Controller cntl; |
| GetTabletRequest req; |
| req.set_cloud_unique_id(cloud_unique_id); |
| req.set_tablet_id(tablet_id); |
| GetTabletResponse resp; |
| service->get_tablet(&cntl, &req, &resp, nullptr); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK) << tablet_id; |
| EXPECT_EQ(resp.tablet_meta().ttl_seconds(), ttl_seconds); |
| EXPECT_EQ(resp.tablet_meta().is_in_memory(), in_memory); |
| EXPECT_EQ(resp.tablet_meta().is_persistent(), is_persistent); |
| }; |
| get_and_check_tablet_meta(tablet_id1, 0, false, false); |
| get_and_check_tablet_meta(tablet_id2, 0, false, false); |
| { |
| brpc::Controller cntl; |
| UpdateTabletRequest req; |
| UpdateTabletResponse resp; |
| req.set_cloud_unique_id(cloud_unique_id); |
| TabletMetaInfoPB* tablet_meta_info = req.add_tablet_meta_infos(); |
| tablet_meta_info->set_tablet_id(tablet_id1); |
| tablet_meta_info->set_ttl_seconds(300); |
| tablet_meta_info = req.add_tablet_meta_infos(); |
| tablet_meta_info->set_tablet_id(tablet_id2); |
| tablet_meta_info->set_ttl_seconds(3000); |
| service->update_tablet(&cntl, &req, &resp, nullptr); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK); |
| } |
| get_and_check_tablet_meta(tablet_id1, 300, false, false); |
| get_and_check_tablet_meta(tablet_id2, 3000, false, false); |
| { |
| brpc::Controller cntl; |
| UpdateTabletRequest req; |
| UpdateTabletResponse resp; |
| req.set_cloud_unique_id(cloud_unique_id); |
| TabletMetaInfoPB* tablet_meta_info = req.add_tablet_meta_infos(); |
| tablet_meta_info->set_tablet_id(tablet_id1); |
| tablet_meta_info->set_is_in_memory(true); |
| service->update_tablet(&cntl, &req, &resp, nullptr); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK); |
| } |
| { |
| brpc::Controller cntl; |
| UpdateTabletRequest req; |
| UpdateTabletResponse resp; |
| req.set_cloud_unique_id(cloud_unique_id); |
| TabletMetaInfoPB* tablet_meta_info = req.add_tablet_meta_infos(); |
| tablet_meta_info->set_tablet_id(tablet_id1); |
| tablet_meta_info->set_is_persistent(true); |
| service->update_tablet(&cntl, &req, &resp, nullptr); |
| ASSERT_EQ(resp.status().code(), MetaServiceCode::OK); |
| } |
| get_and_check_tablet_meta(tablet_id1, 300, true, true); |
| } |
| |
| TEST(MetaServiceVersionedReadTest, IndexRequest) { |
| auto meta_service = get_meta_service(false); |
| std::string instance_id = "commit_index"; |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(meta_service.get(), instance_id); |
| |
| constexpr int64_t db_id = 123; |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| |
| { |
| // Prepare index |
| brpc::Controller ctrl; |
| IndexRequest req; |
| IndexResponse res; |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| meta_service->prepare_index(&ctrl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString(); |
| } |
| |
| { |
| // Commit index |
| brpc::Controller ctrl; |
| IndexRequest req; |
| IndexResponse res; |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.set_is_new_table(true); |
| meta_service->commit_index(&ctrl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString(); |
| } |
| |
| { |
| // Prepare index again |
| brpc::Controller ctrl; |
| IndexRequest req; |
| IndexResponse res; |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| meta_service->prepare_index(&ctrl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) |
| << res.status().DebugString(); |
| } |
| |
| { |
| // Commit index again |
| brpc::Controller ctrl; |
| IndexRequest req; |
| IndexResponse res; |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.set_is_new_table(true); |
| meta_service->commit_index(&ctrl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString(); |
| } |
| } |
| |
| TEST(MetaServiceVersionedReadTest, PartitionRequest) { |
| auto meta_service = get_meta_service(false); |
| std::string instance_id = "PartitionRequest"; |
| |
| MOCK_GET_INSTANCE_ID(instance_id); |
| create_and_refresh_instance(meta_service.get(), instance_id); |
| |
| constexpr int64_t db_id = 1; |
| constexpr int64_t table_id = 10001; |
| constexpr int64_t index_id = 10002; |
| constexpr int64_t partition_id = 10003; |
| |
| { |
| // Prepare transaction |
| brpc::Controller ctrl; |
| PartitionRequest req; |
| PartitionResponse res; |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.add_partition_ids(partition_id); |
| meta_service->prepare_partition(&ctrl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString(); |
| } |
| |
| { |
| // Commit partition |
| brpc::Controller ctrl; |
| PartitionRequest req; |
| PartitionResponse res; |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.add_partition_ids(partition_id); |
| meta_service->commit_partition(&ctrl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString(); |
| } |
| |
| { |
| // Prepare partition again |
| brpc::Controller ctrl; |
| PartitionRequest req; |
| PartitionResponse res; |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.add_partition_ids(partition_id); |
| meta_service->prepare_partition(&ctrl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) |
| << res.status().DebugString(); |
| } |
| |
| { |
| // Commit partition again |
| brpc::Controller ctrl; |
| PartitionRequest req; |
| PartitionResponse res; |
| req.set_db_id(db_id); |
| req.set_table_id(table_id); |
| req.add_index_ids(index_id); |
| req.add_partition_ids(partition_id); |
| meta_service->commit_partition(&ctrl, &req, &res, nullptr); |
| ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().DebugString(); |
| } |
| } |
| |
| } // namespace doris::cloud |