| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "meta-store/document_message.h" |
| |
| #include <bthread/bthread.h> |
| #include <fmt/format.h> |
| #include <foundationdb/fdb_c_options.g.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <gen_cpp/olap_common.pb.h> |
| #include <gen_cpp/olap_file.pb.h> |
| #include <gtest/gtest.h> |
| |
| #include <iostream> |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <string_view> |
| |
| #include "common/config.h" |
| #include "common/util.h" |
| #include "meta-store/document_message_get_range.h" |
| #include "meta-store/keys.h" |
| #include "meta-store/mem_txn_kv.h" |
| #include "meta-store/txn_kv.h" |
| #include "meta-store/txn_kv_error.h" |
| #include "meta-store/versionstamp.h" |
| |
| using namespace doris::cloud; |
| |
| static std::mutex config_mutex; |
| |
// Test entry point. Config must be initialized before any test runs because
// the tests below mutate split-related config knobs under `config_mutex`.
int main(int argc, char** argv) {
    config::init(nullptr, true);
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}
| |
| // Convert a string to a hex-escaped string. |
| // A non-displayed character is represented as \xHH where HH is the hexadecimal value of the character. |
| // A displayed character is represented as itself. |
| static std::string escape_hex(std::string_view data) { |
| std::string result; |
| for (char c : data) { |
| if (isprint(c)) { |
| result += c; |
| } else { |
| result += fmt::format("\\x{:02x}", static_cast<unsigned char>(c)); |
| } |
| } |
| return result; |
| } |
| |
| static size_t count_range(TxnKv* txn_kv, std::string_view begin = "", |
| std::string_view end = "\xFF") { |
| std::unique_ptr<Transaction> txn; |
| EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| if (!txn) { |
| return 0; // Failed to create transaction |
| } |
| |
| FullRangeGetOptions opts; |
| opts.txn = txn.get(); |
| auto iter = txn_kv->full_range_get(std::string(begin), std::string(end), std::move(opts)); |
| size_t total = 0; |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| total += 1; |
| } |
| |
| EXPECT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call. |
| return total; |
| } |
| |
| static bool is_empty_range(TxnKv* txn_kv, std::string_view begin = "", |
| std::string_view end = "\xFF") { |
| return count_range(txn_kv, begin, end) == 0; |
| } |
| |
| static std::string dump_range(TxnKv* txn_kv, std::string_view begin = "", |
| std::string_view end = "\xFF") { |
| std::unique_ptr<Transaction> txn; |
| if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { |
| return "Failed to create dump range transaction"; |
| } |
| FullRangeGetOptions opts; |
| opts.txn = txn.get(); |
| auto iter = txn_kv->full_range_get(std::string(begin), std::string(end), std::move(opts)); |
| std::string buffer; |
| for (auto&& kv = iter->next(); kv.has_value(); kv = iter->next()) { |
| buffer += |
| fmt::format("Key: {}, Value: {}\n", escape_hex(kv->first), escape_hex(kv->second)); |
| } |
| EXPECT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call. |
| return buffer; |
| } |
| |
// Verify document_put/document_get/document_remove round-trips for a message
// with a single splittable field, both when splitting is unnecessary (field
// unset -> 1 KV) and when it occurs (field set -> 2 KVs).
TEST(DocumentMessageTest, SplitSingleMessage) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    {
        // Write a message that does not need to be split.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        SplitSingleMessagePB msg;
        msg.set_other_fields(1);
        ASSERT_TRUE(document_put(txn.get(), "split_single_message_key", std::move(msg)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        // Read the message back.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        SplitSingleMessagePB read_msg;
        ASSERT_EQ(document_get(read_txn.get(), "split_single_message_key", &read_msg),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(read_msg.other_fields(), 1);

        // The split field is unset, so the message occupies exactly one KV.
        ASSERT_EQ(count_range(txn_kv.get()), 1) << dump_range(txn_kv.get());

        // Remove the message.
        document_remove<SplitSingleMessagePB>(read_txn.get(), "split_single_message_key");
        ASSERT_EQ(read_txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Removal must leave no residual keys behind.
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());

    {
        // Write a message that needs to be split.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        SplitSingleMessagePB msg;
        msg.set_other_fields(2);
        auto* key_bounds = msg.mutable_segment_key_bounds();
        key_bounds->set_min_key("min_key");
        key_bounds->set_max_key("max_key");

        ASSERT_TRUE(document_put(txn.get(), "split_single_message_key_split", std::move(msg)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        // Read the message back.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        SplitSingleMessagePB read_msg;
        ASSERT_EQ(document_get(read_txn.get(), "split_single_message_key_split", &read_msg),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(read_msg.other_fields(), 2);
        ASSERT_EQ(read_msg.segment_key_bounds().min_key(), "min_key");
        ASSERT_EQ(read_msg.segment_key_bounds().max_key(), "max_key");
        // The split field was populated, so the message is stored as two KVs.
        ASSERT_EQ(count_range(txn_kv.get()), 2) << dump_range(txn_kv.get());

        // Remove the message.
        document_remove<SplitSingleMessagePB>(read_txn.get(), "split_single_message_key_split");
        ASSERT_EQ(read_txn->commit(), TxnErrorCode::TXN_OK);
    }

    // document_remove must also delete the split-out KV.
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
| |
// Verify that a RowsetMetaCloudPB forced to split (split size threshold = 0)
// round-trips through document_put/document_get and is fully deleted by
// document_remove.
TEST(DocumentMessageTest, DocumentPutRowsetMeta) {
    std::unique_lock lock(config_mutex);
    config::enable_split_rowset_meta_pb = true;
    config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("document_put_rowset_meta");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");
    key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key2");
    key_bounds->set_max_key("max_key2");

    std::string rowset_meta_key =
            meta_rowset_key({meta.rowset_id_v2(), meta.end_version(), meta.num_segments()});
    {
        // Create a txn and put the rowset meta.
        doris::RowsetMetaCloudPB meta_copy(meta);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(document_put(txn.get(), rowset_meta_key, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Create a txn and get the rowset meta; split fields must be restored.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB saved_meta;
        ASSERT_EQ(document_get(txn.get(), rowset_meta_key, &saved_meta), TxnErrorCode::TXN_OK);
        ASSERT_EQ(saved_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(saved_meta.end_version(), meta.end_version());
        ASSERT_EQ(saved_meta.start_version(), meta.start_version());
        ASSERT_EQ(saved_meta.num_rows(), meta.num_rows());
        ASSERT_EQ(saved_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(saved_meta.segments_key_bounds_size(), 2);
        ASSERT_EQ(saved_meta.segments_key_bounds(0).min_key(), "min_key");
        ASSERT_EQ(saved_meta.segments_key_bounds(0).max_key(), "max_key");
        ASSERT_EQ(saved_meta.segments_key_bounds(1).min_key(), "min_key2");
        ASSERT_EQ(saved_meta.segments_key_bounds(1).max_key(), "max_key2");
    }

    // Splitting was forced, so the message must occupy at least two KVs.
    ASSERT_GE(txn_kv->total_kvs(), 2) << dump_range(txn_kv.get());

    {
        // Create a txn and remove the rowset meta.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        document_remove<doris::RowsetMetaCloudPB>(txn.get(), rowset_meta_key);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // document_remove must delete the main KV and every split-out KV.
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
| |
// Verify the non-split paths for RowsetMetaCloudPB: splitting disabled,
// splitting enabled but threshold too large, and splitting enabled with an
// empty splittable field. Each case must store exactly one KV.
TEST(DocumentMessageTest, DocumentPutRowsetMetaWithoutSplit) {
    std::unique_lock lock(config_mutex);
    config::enable_split_rowset_meta_pb = false;

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("document_put_rowset_meta");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");

    std::string rowset_meta_key =
            meta_rowset_key({meta.rowset_id_v2(), meta.end_version(), meta.num_segments()});
    {
        // Create a txn and put the rowset meta (splitting disabled).
        doris::RowsetMetaCloudPB meta_copy(meta);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(document_put(txn.get(), rowset_meta_key, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Create a txn and get the rowset meta.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB saved_meta;
        ASSERT_EQ(document_get(txn.get(), rowset_meta_key, &saved_meta), TxnErrorCode::TXN_OK);
        ASSERT_EQ(saved_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(saved_meta.end_version(), meta.end_version());
        ASSERT_EQ(saved_meta.start_version(), meta.start_version());
        ASSERT_EQ(saved_meta.num_rows(), meta.num_rows());
        ASSERT_EQ(saved_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(saved_meta.segments_key_bounds_size(), 1);
        ASSERT_EQ(saved_meta.segments_key_bounds(0).min_key(), "min_key");
        ASSERT_EQ(saved_meta.segments_key_bounds(0).max_key(), "max_key");
    }

    // Splitting disabled: the message must occupy exactly one KV.
    ASSERT_EQ(txn_kv->total_kvs(), 1) << dump_range(txn_kv.get());

    {
        // Create a txn and remove the rowset meta.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        document_remove<doris::RowsetMetaCloudPB>(txn.get(), rowset_meta_key);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());

    {
        // Splitting allowed but nothing is split, because the rowset meta pb is
        // smaller than the threshold.
        config::enable_split_rowset_meta_pb = true;
        config::split_rowset_meta_pb_size = 100000000; // Big enough to not split.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB meta_copy(meta);
        ASSERT_TRUE(document_put(txn.get(), rowset_meta_key, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        // Read the message back.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        ASSERT_EQ(document_get(read_txn.get(), rowset_meta_key, &read_meta), TxnErrorCode::TXN_OK);
        ASSERT_EQ(read_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(read_meta.end_version(), meta.end_version());
        ASSERT_EQ(read_meta.start_version(), meta.start_version());
        ASSERT_EQ(read_meta.num_rows(), meta.num_rows());
        ASSERT_EQ(read_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(read_meta.segments_key_bounds_size(), 1);
        ASSERT_EQ(read_meta.segments_key_bounds(0).min_key(), "min_key");
        ASSERT_EQ(read_meta.segments_key_bounds(0).max_key(), "max_key");

        // Remove the message.
        document_remove<doris::RowsetMetaCloudPB>(read_txn.get(), rowset_meta_key);
        ASSERT_EQ(read_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Splitting allowed but nothing is split, because the splittable fields
        // are empty.
        config::enable_split_rowset_meta_pb = true;
        config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB meta_copy(meta);
        meta_copy.clear_segments_key_bounds(); // Clear the segments key bounds to make it empty.
        ASSERT_TRUE(document_put(txn.get(), rowset_meta_key, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        // If you meet a failure here, it means the rowset meta pb was split into
        // multiple keys; you should clear the split fields you added.
        ASSERT_EQ(txn_kv->total_kvs(), 1) << dump_range(txn_kv.get());

        // Read the message back.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        ASSERT_EQ(document_get(read_txn.get(), rowset_meta_key, &read_meta), TxnErrorCode::TXN_OK);
        ASSERT_EQ(read_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(read_meta.end_version(), meta.end_version());
        ASSERT_EQ(read_meta.start_version(), meta.start_version());
        ASSERT_EQ(read_meta.num_rows(), meta.num_rows());
        ASSERT_EQ(read_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(read_meta.segments_key_bounds_size(), 0); // No segments key bounds
        // Remove the message.
        document_remove<doris::RowsetMetaCloudPB>(read_txn.get(), rowset_meta_key);
        ASSERT_EQ(read_txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
| |
// Verify that a TabletSchemaCloudPB forced to split (split size threshold = 0)
// round-trips through document_put/document_get and is fully deleted by
// document_remove.
TEST(DocumentMessageTest, DocumentPutTabletSchemaCloudPB) {
    std::unique_lock lock(config_mutex);
    config::enable_split_tablet_schema_pb = true;
    config::split_tablet_schema_pb_size = 0; // Always split the tablet schema pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::TabletSchemaCloudPB schema;
    schema.set_keys_type(doris::DUP_KEYS);
    schema.set_num_short_key_columns(2);
    schema.set_num_rows_per_row_block(1024);
    schema.set_compress_kind(doris::COMPRESS_LZ4);
    schema.set_next_column_unique_id(10);
    schema.set_schema_version(1);

    // Add multiple columns to test splitting.
    for (int i = 0; i < 3; ++i) {
        doris::ColumnPB* column = schema.add_column();
        column->set_name(fmt::format("col_{}", i));
        column->set_unique_id(i + 1);
        column->set_type("INT");
        column->set_is_key(i < 2); // First 2 columns are key columns
        column->set_is_nullable(false);
    }

    std::string schema_key = "tablet_schema_key_test";
    {
        // Create a txn and put the tablet schema.
        doris::TabletSchemaCloudPB schema_copy(schema);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(document_put(txn.get(), schema_key, std::move(schema_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Create a txn and get the tablet schema; split columns must be restored.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB saved_schema;
        ASSERT_EQ(document_get(txn.get(), schema_key, &saved_schema), TxnErrorCode::TXN_OK);
        ASSERT_EQ(saved_schema.keys_type(), schema.keys_type());
        ASSERT_EQ(saved_schema.num_short_key_columns(), schema.num_short_key_columns());
        ASSERT_EQ(saved_schema.num_rows_per_row_block(), schema.num_rows_per_row_block());
        ASSERT_EQ(saved_schema.compress_kind(), schema.compress_kind());
        ASSERT_EQ(saved_schema.next_column_unique_id(), schema.next_column_unique_id());
        ASSERT_EQ(saved_schema.schema_version(), schema.schema_version());
        ASSERT_EQ(saved_schema.column_size(), 3);
        for (int i = 0; i < 3; ++i) {
            ASSERT_EQ(saved_schema.column(i).name(), fmt::format("col_{}", i));
            ASSERT_EQ(saved_schema.column(i).unique_id(), i + 1);
            ASSERT_EQ(saved_schema.column(i).type(), "INT");
            ASSERT_EQ(saved_schema.column(i).is_key(), i < 2);
            ASSERT_EQ(saved_schema.column(i).is_nullable(), false);
        }
    }

    // Splitting was forced, so the message must occupy at least two KVs.
    ASSERT_GE(txn_kv->total_kvs(), 2) << dump_range(txn_kv.get());

    {
        // Create a txn and remove the tablet schema.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        document_remove<doris::TabletSchemaCloudPB>(txn.get(), schema_key);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // document_remove must delete the main KV and every split-out KV.
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
| |
// Verify the non-split paths for TabletSchemaCloudPB: splitting disabled, and
// splitting enabled but with an empty column list. Each case must store
// exactly one KV.
TEST(DocumentMessageTest, DocumentPutTabletSchemaCloudPBWithoutSplit) {
    std::unique_lock lock(config_mutex);
    config::enable_split_tablet_schema_pb = false;

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::TabletSchemaCloudPB schema;
    schema.set_keys_type(doris::DUP_KEYS);
    schema.set_num_short_key_columns(1);
    schema.set_num_rows_per_row_block(1024);
    schema.set_compress_kind(doris::COMPRESS_LZ4);
    schema.set_next_column_unique_id(5);
    schema.set_schema_version(1);

    // Add one column.
    doris::ColumnPB* column = schema.add_column();
    column->set_name("test_col");
    column->set_unique_id(1);
    column->set_type("VARCHAR");
    column->set_is_key(true);
    column->set_is_nullable(false);

    std::string schema_key = "tablet_schema_key_no_split";
    {
        // Create a txn and put the tablet schema (splitting disabled).
        doris::TabletSchemaCloudPB schema_copy(schema);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(document_put(txn.get(), schema_key, std::move(schema_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Create a txn and get the tablet schema.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB saved_schema;
        ASSERT_EQ(document_get(txn.get(), schema_key, &saved_schema), TxnErrorCode::TXN_OK);
        ASSERT_EQ(saved_schema.keys_type(), schema.keys_type());
        ASSERT_EQ(saved_schema.num_short_key_columns(), schema.num_short_key_columns());
        ASSERT_EQ(saved_schema.column_size(), 1);
        ASSERT_EQ(saved_schema.column(0).name(), "test_col");
        ASSERT_EQ(saved_schema.column(0).unique_id(), 1);
        ASSERT_EQ(saved_schema.column(0).type(), "VARCHAR");
    }

    // Splitting disabled: the message must occupy exactly one KV.
    ASSERT_EQ(txn_kv->total_kvs(), 1) << dump_range(txn_kv.get());

    {
        // Create a txn and remove the tablet schema.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        document_remove<doris::TabletSchemaCloudPB>(txn.get(), schema_key);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());

    {
        // Splitting allowed but nothing is split, because the splittable fields
        // are empty.
        config::enable_split_tablet_schema_pb = true;
        config::split_tablet_schema_pb_size = 0; // Always split
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB schema_copy(schema);
        schema_copy.clear_column(); // Clear the columns to make it empty.
        ASSERT_TRUE(document_put(txn.get(), schema_key, std::move(schema_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        // If you meet a failure here, it means the tablet schema pb was split
        // into multiple keys; you should clear the split fields you added.
        ASSERT_EQ(txn_kv->total_kvs(), 1) << dump_range(txn_kv.get());

        // Read the message back.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB read_schema;
        ASSERT_EQ(document_get(read_txn.get(), schema_key, &read_schema), TxnErrorCode::TXN_OK);
        ASSERT_EQ(read_schema.keys_type(), schema.keys_type());
        ASSERT_EQ(read_schema.num_short_key_columns(), schema.num_short_key_columns());
        ASSERT_EQ(read_schema.column_size(), 0); // No columns
        // Remove the message.
        document_remove<doris::TabletSchemaCloudPB>(read_txn.get(), schema_key);
        ASSERT_EQ(read_txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
| |
// Verify that a message type with no splittable fields (VersionPB) is stored
// as a single KV and round-trips through put/get/remove.
TEST(DocumentMessageTest, DocumentPutVersionPB) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    VersionPB version_pb;
    version_pb.set_version(123);

    {
        // Create a txn and put the version pb.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(document_put(txn.get(), "version_key", std::move(version_pb)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Create a txn and get the version pb.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        VersionPB saved_version_pb;
        ASSERT_EQ(document_get(txn.get(), "version_key", &saved_version_pb), TxnErrorCode::TXN_OK);
        ASSERT_EQ(saved_version_pb.version(), 123);
    }

    // No splittable fields: the message must occupy exactly one KV.
    ASSERT_EQ(txn_kv->total_kvs(), 1) << dump_range(txn_kv.get());

    {
        // Create a txn and remove the version pb.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        document_remove<VersionPB>(txn.get(), "version_key");
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
| |
// Verify that the iterator-based document_get reassembles every split rowset
// meta when scanning a key range (with a small batch_limit to force multiple
// underlying range reads).
TEST(DocumentMessageTest, DocumentGetAllRowsetMeta) {
    std::unique_lock lock(config_mutex);
    config::enable_split_rowset_meta_pb = true;
    config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("document_put_rowset_meta");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");

    std::string rowset_meta_key_prefix = "rowset_meta_key_";
    for (int i = 0; i < 10; ++i) {
        // Create a txn and put one rowset meta per key, each forced to split.
        std::string rowset_meta_key = rowset_meta_key_prefix + std::to_string(i);
        doris::RowsetMetaCloudPB meta_copy(meta);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(document_put(txn.get(), rowset_meta_key, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Scan all rowset meta keys via a full-range iterator.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        FullRangeGetOptions opts;
        opts.txn = txn.get();
        opts.batch_limit = 2; // Small batches exercise cross-batch reassembly.
        auto iter = txn_kv->full_range_get(rowset_meta_key_prefix, rowset_meta_key_prefix + '\xFF',
                                           std::move(opts));
        int count = 0;
        while (true) {
            doris::RowsetMetaCloudPB saved_meta;
            TxnErrorCode err = document_get(iter.get(), &saved_meta);
            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                break; // No more keys to read
            }
            ASSERT_EQ(err, TxnErrorCode::TXN_OK);
            ASSERT_EQ(saved_meta.rowset_id_v2(), meta.rowset_id_v2());
            ASSERT_EQ(saved_meta.end_version(), meta.end_version());
            ASSERT_EQ(saved_meta.start_version(), meta.start_version());
            ASSERT_EQ(saved_meta.num_rows(), meta.num_rows());
            ASSERT_EQ(saved_meta.num_segments(), meta.num_segments());
            ASSERT_EQ(saved_meta.segments_key_bounds_size(), 1);
            ASSERT_EQ(saved_meta.segments_key_bounds(0).min_key(), "min_key");
            ASSERT_EQ(saved_meta.segments_key_bounds(0).max_key(), "max_key");
            count += 1;
        }
        // All 10 documents must be reassembled, one per logical key.
        ASSERT_EQ(count, 10) << dump_range(txn_kv.get());
    }
}
| |
// Verify that the iterator-based document_get reassembles every split tablet
// schema when scanning a key range, and that per-document fields
// (schema_version) are preserved in scan order.
TEST(DocumentMessageTest, DocumentGetAllTabletSchemaCloudPB) {
    std::unique_lock lock(config_mutex);
    config::enable_split_tablet_schema_pb = true;
    config::split_tablet_schema_pb_size = 0; // Always split the tablet schema pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::TabletSchemaCloudPB schema;
    schema.set_keys_type(doris::DUP_KEYS);
    schema.set_num_short_key_columns(1);
    schema.set_num_rows_per_row_block(1024);
    schema.set_compress_kind(doris::COMPRESS_LZ4);
    schema.set_next_column_unique_id(5);
    schema.set_schema_version(1);

    doris::ColumnPB* column = schema.add_column();
    column->set_name("test_col");
    column->set_unique_id(1);
    column->set_type("VARCHAR");
    column->set_is_key(true);
    column->set_is_nullable(false);

    std::string schema_key_prefix = "tablet_schema_key_";
    for (int i = 0; i < 10; ++i) {
        // Create a txn and put one schema per key; schema_version identifies
        // the document so scan order can be asserted below.
        doris::TabletSchemaCloudPB schema_copy(schema);
        schema_copy.set_schema_version(i + 1);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(document_put(txn.get(), schema_key_prefix + std::to_string(i),
                                 std::move(schema_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Scan all tablet schema keys via a full-range iterator.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        FullRangeGetOptions opts;
        opts.txn = txn.get();
        opts.batch_limit = 2; // Small batches exercise cross-batch reassembly.
        auto iter = txn_kv->full_range_get(schema_key_prefix, schema_key_prefix + '\xFF',
                                           std::move(opts));
        int count = 0;
        while (true) {
            doris::TabletSchemaCloudPB saved_schema;
            TxnErrorCode code = document_get(iter.get(), &saved_schema);
            if (code == TxnErrorCode::TXN_KEY_NOT_FOUND) {
                break;
            }
            ASSERT_EQ(code, TxnErrorCode::TXN_OK);
            ASSERT_EQ(saved_schema.keys_type(), schema.keys_type());
            ASSERT_EQ(saved_schema.num_short_key_columns(), schema.num_short_key_columns());
            ASSERT_EQ(saved_schema.column_size(), 1);
            ASSERT_EQ(saved_schema.column(0).name(), "test_col");
            ASSERT_EQ(saved_schema.column(0).unique_id(), 1);
            ASSERT_EQ(saved_schema.column(0).type(), "VARCHAR");
            // Keys are scanned in order, so schema_version follows the count.
            ASSERT_EQ(saved_schema.schema_version(), count + 1);
            count += 1;
        }
        ASSERT_EQ(count, 10) << dump_range(txn_kv.get());
    }
}
| |
// Verify versioned::document_put/get/remove: a split rowset meta written under
// a versionstamped key can be read back with its version, removed by that
// version, and is then gone.
TEST(DocumentMessageTest, VersionedDocumentPut) {
    std::unique_lock lock(config_mutex);
    config::enable_split_rowset_meta_pb = true;
    config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("versioned_document_test");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");

    std::string key_prefix = "versioned_rowset_meta_key_";
    {
        // Create a txn and put the versioned rowset meta; the versionstamp is
        // assigned at commit time.
        doris::RowsetMetaCloudPB meta_copy(meta);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versioned::document_put(txn.get(), key_prefix, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    Versionstamp version;
    {
        // Get the rowset meta along with the versionstamp it was written at.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, &read_meta, &version),
                  TxnErrorCode::TXN_OK);
    }

    // Remove the range.
    {
        // Create a txn and remove the versioned rowset meta at that version.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned::document_remove<doris::RowsetMetaCloudPB>(txn.get(), key_prefix, version);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Removal must delete the main KV and every split-out KV.
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());

    {
        // Create a txn and try to get the removed versioned rowset meta.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, &read_meta, &version),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
    }
}
| |
// Verify versioned::document_put/get with multiple writes to the same key
// prefix: document_get must return the latest version, and a non-splittable
// message type must also round-trip.
TEST(DocumentMessageTest, VersionedDocumentPutAndGet) {
    std::unique_lock lock(config_mutex);
    config::enable_split_rowset_meta_pb = true;
    config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("versioned_document_test");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");
    key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key2");
    key_bounds->set_max_key("max_key2");

    std::string key_prefix = "versioned_rowset_meta_key_";
    {
        // Create a txn and put the versioned rowset meta (first version).
        doris::RowsetMetaCloudPB meta_copy(meta);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versioned::document_put(txn.get(), key_prefix, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Get the rowset meta with its version.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        Versionstamp version;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, &read_meta, &version),
                  TxnErrorCode::TXN_OK);

        // Verify the metadata.
        ASSERT_EQ(read_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(read_meta.end_version(), meta.end_version());
        ASSERT_EQ(read_meta.start_version(), meta.start_version());
        ASSERT_EQ(read_meta.num_rows(), meta.num_rows());
        ASSERT_EQ(read_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(read_meta.segments_key_bounds_size(), 2);
        ASSERT_EQ(read_meta.segments_key_bounds(0).min_key(), "min_key");
        ASSERT_EQ(read_meta.segments_key_bounds(0).max_key(), "max_key");
        ASSERT_EQ(read_meta.segments_key_bounds(1).min_key(), "min_key2");
        ASSERT_EQ(read_meta.segments_key_bounds(1).max_key(), "max_key2");
    }

    {
        // Update the rowset meta, creating a second (newer) version under the
        // same key prefix.
        doris::RowsetMetaCloudPB updated_meta(meta);
        updated_meta.set_num_rows(200); // Change num_rows
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versioned::document_put(txn.get(), key_prefix, std::move(updated_meta)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Get the latest version of the rowset meta.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        Versionstamp version;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, &read_meta, &version),
                  TxnErrorCode::TXN_OK);

        // Verify the updated metadata: document_get returns the newest write.
        ASSERT_EQ(read_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(read_meta.end_version(), meta.end_version());
        ASSERT_EQ(read_meta.start_version(), meta.start_version());
        ASSERT_EQ(read_meta.num_rows(), 200); // This should now be 200
        ASSERT_EQ(read_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(read_meta.segments_key_bounds_size(), 2);
        ASSERT_EQ(read_meta.segments_key_bounds(0).min_key(), "min_key");
        ASSERT_EQ(read_meta.segments_key_bounds(0).max_key(), "max_key");
        ASSERT_EQ(read_meta.segments_key_bounds(1).min_key(), "min_key2");
        ASSERT_EQ(read_meta.segments_key_bounds(1).max_key(), "max_key2");
    }

    // Testing with a non-splittable message.
    {
        std::string simple_key_prefix = "versioned_version_pb_key_";
        VersionPB version_pb;
        version_pb.set_version(123);

        // Put the simple message.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versioned::document_put(txn.get(), simple_key_prefix, std::move(version_pb)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        // Get the simple message.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        VersionPB read_version_pb;
        Versionstamp version;
        ASSERT_EQ(versioned::document_get(read_txn.get(), simple_key_prefix, &read_version_pb,
                                          &version),
                  TxnErrorCode::TXN_OK);

        // Verify the simple message.
        ASSERT_EQ(read_version_pb.version(), 123);
    }
}
| |
// Exercises versioned document put/get for a split RowsetMetaCloudPB:
// - a plain get returns the latest committed version,
// - a get at a snapshot just above the first write's versionstamp returns the
//   first write even after a newer update exists,
// - a get at Versionstamp::min() yields TXN_KEY_NOT_FOUND.
TEST(DocumentMessageTest, VersionedDocumentPutAndGetWithVersionstamp) {
    std::unique_lock lock(config_mutex); // Serialize config mutation across tests.
    config::enable_split_rowset_meta_pb = true;
    config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("versioned_document_test");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");
    key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key2");
    key_bounds->set_max_key("max_key2");

    std::string key_prefix = "versioned_rowset_meta_key_";
    {
        // Create a txn and put versioned rowset meta with initial version
        doris::RowsetMetaCloudPB meta_copy(meta);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versioned::document_put(txn.get(), key_prefix, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Versionstamp assigned to the initial put; used below for snapshot reads.
    Versionstamp first_versionstamp;
    {
        // Get the rowset meta with version
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, &read_meta, &first_versionstamp),
                  TxnErrorCode::TXN_OK);

        // Verify the metadata
        ASSERT_EQ(read_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(read_meta.end_version(), meta.end_version());
        ASSERT_EQ(read_meta.start_version(), meta.start_version());
        ASSERT_EQ(read_meta.num_rows(), meta.num_rows());
        ASSERT_EQ(read_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(read_meta.segments_key_bounds_size(), 2);
        ASSERT_EQ(read_meta.segments_key_bounds(0).min_key(), "min_key");
        ASSERT_EQ(read_meta.segments_key_bounds(0).max_key(), "max_key");
        ASSERT_EQ(read_meta.segments_key_bounds(1).min_key(), "min_key2");
        ASSERT_EQ(read_meta.segments_key_bounds(1).max_key(), "max_key2");
    }

    {
        // Update the rowset meta with a new version
        doris::RowsetMetaCloudPB updated_meta(meta);
        updated_meta.set_num_rows(200); // Change num_rows
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versioned::document_put(txn.get(), key_prefix, std::move(updated_meta)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Get the latest version of rowset meta
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        Versionstamp version;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, &read_meta, &version),
                  TxnErrorCode::TXN_OK);

        // Verify the updated metadata
        ASSERT_EQ(read_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(read_meta.end_version(), meta.end_version());
        ASSERT_EQ(read_meta.start_version(), meta.start_version());
        ASSERT_EQ(read_meta.num_rows(), 200); // This should now be 200
        ASSERT_EQ(read_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(read_meta.segments_key_bounds_size(), 2);
        ASSERT_EQ(read_meta.segments_key_bounds(0).min_key(), "min_key");
        ASSERT_EQ(read_meta.segments_key_bounds(0).max_key(), "max_key");
        ASSERT_EQ(read_meta.segments_key_bounds(1).min_key(), "min_key2");
        ASSERT_EQ(read_meta.segments_key_bounds(1).max_key(), "max_key2");
    }

    {
        // Get the previous version of rowset meta
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        // Snapshot just after the first write but (presumably) before the
        // update's versionstamp, so the initial document is visible.
        Versionstamp snapshot_version(first_versionstamp.version() + 1, 0);
        Versionstamp version;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, snapshot_version, &read_meta,
                                          &version),
                  TxnErrorCode::TXN_OK);

        // Verify the ORIGINAL metadata is returned: the update (num_rows=200)
        // is newer than the snapshot, so num_rows must equal the initial 100.
        ASSERT_EQ(read_meta.rowset_id_v2(), meta.rowset_id_v2());
        ASSERT_EQ(read_meta.end_version(), meta.end_version());
        ASSERT_EQ(read_meta.start_version(), meta.start_version());
        ASSERT_EQ(read_meta.num_rows(), meta.num_rows());
        ASSERT_EQ(read_meta.num_segments(), meta.num_segments());
        ASSERT_EQ(read_meta.segments_key_bounds_size(), 2);
        ASSERT_EQ(read_meta.segments_key_bounds(0).min_key(), "min_key");
        ASSERT_EQ(read_meta.segments_key_bounds(0).max_key(), "max_key");
        ASSERT_EQ(read_meta.segments_key_bounds(1).min_key(), "min_key2");
        ASSERT_EQ(read_meta.segments_key_bounds(1).max_key(), "max_key2");
    }

    {
        // Not found: no document can have a versionstamp below Versionstamp::min().
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::RowsetMetaCloudPB read_meta;
        Versionstamp snapshot_version(Versionstamp::min());
        Versionstamp version;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, snapshot_version, &read_meta,
                                          &version),
                  TxnErrorCode::TXN_KEY_NOT_FOUND)
                << dump_range(txn_kv.get());
    }
}
| |
// Round-trips a TabletSchemaCloudPB through versioned document put/get with
// splitting forced on (split size 0), then checks that document_remove at the
// observed versionstamp deletes every split KV entry.
TEST(DocumentMessageTest, VersionedDocumentPutTabletSchemaCloudPB) {
    std::unique_lock lock(config_mutex); // Serialize config mutation across tests.
    config::enable_split_tablet_schema_pb = true;
    config::split_tablet_schema_pb_size = 0; // Always split the tablet schema pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::TabletSchemaCloudPB schema;
    schema.set_keys_type(doris::DUP_KEYS);
    schema.set_num_short_key_columns(2);
    schema.set_num_rows_per_row_block(1024);
    schema.set_compress_kind(doris::COMPRESS_LZ4);
    schema.set_next_column_unique_id(10);
    schema.set_schema_version(1);

    // Add multiple columns to test splitting
    for (int i = 0; i < 3; ++i) {
        doris::ColumnPB* column = schema.add_column();
        column->set_name(fmt::format("col_{}", i));
        column->set_unique_id(i + 1);
        column->set_type("INT");
        column->set_is_key(i < 2); // First 2 columns are key columns
        column->set_is_nullable(false);
    }

    std::string key_prefix = "versioned_tablet_schema_key_";
    {
        // Create a txn and put versioned tablet schema with initial version
        doris::TabletSchemaCloudPB schema_copy(schema);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versioned::document_put(txn.get(), key_prefix, std::move(schema_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Versionstamp assigned at commit; needed later for document_remove.
    Versionstamp version;
    {
        // Get the tablet schema with version
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        doris::TabletSchemaCloudPB read_schema;
        ASSERT_EQ(versioned::document_get(txn.get(), key_prefix, &read_schema, &version),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(read_schema.keys_type(), schema.keys_type());
        ASSERT_EQ(read_schema.num_short_key_columns(), schema.num_short_key_columns());
        ASSERT_EQ(read_schema.column_size(), 3);
        for (int i = 0; i < 3; ++i) {
            ASSERT_EQ(read_schema.column(i).name(), fmt::format("col_{}", i));
            ASSERT_EQ(read_schema.column(i).unique_id(), i + 1);
            ASSERT_EQ(read_schema.column(i).type(), "INT");
        }
    }

    // Remove the range
    {
        // Create a txn and remove the versioned tablet schema
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned::document_remove<doris::TabletSchemaCloudPB>(txn.get(), key_prefix, version);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // After removal no KV (including split column entries) should remain.
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
| |
| TEST(DocumentMessageTest, VersionedDocumentPutVersionPB) { |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| VersionPB version_pb; |
| version_pb.set_version(123); |
| |
| { |
| // create a txn , and put version pb |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| ASSERT_TRUE(versioned::document_put(txn.get(), "version_key", std::move(version_pb))); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| Versionstamp versionstamp; |
| { |
| // create a txn , and get version pb |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| VersionPB saved_version_pb; |
| ASSERT_EQ( |
| versioned::document_get(txn.get(), "version_key", &saved_version_pb, &versionstamp), |
| TxnErrorCode::TXN_OK); |
| ASSERT_EQ(saved_version_pb.version(), 123); |
| } |
| |
| ASSERT_EQ(txn_kv->total_kvs(), 1) << dump_range(txn_kv.get()); |
| |
| { |
| // create a txn, and remove version pb |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned::document_remove<VersionPB>(txn.get(), "version_key", versionstamp); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| |
| ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get()); |
| } |
| |
// Covers both storage shapes of SplitSingleMessagePB:
// - without the splittable field set, the document is stored as 1 KV,
// - with segment_key_bounds set, it is stored as 2 KVs (root + split field).
// In both cases document_remove must leave the keyspace empty.
TEST(DocumentMessageTest, VersionedSplitSingleMessage) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    {
        // Write a message that does not need to be split.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        SplitSingleMessagePB msg;
        msg.set_other_fields(1);
        ASSERT_TRUE(versioned::document_put(txn.get(), "split_single_message_key", std::move(msg)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        // Read the message back.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        SplitSingleMessagePB read_msg;
        Versionstamp version;
        ASSERT_EQ(versioned::document_get(read_txn.get(), "split_single_message_key", &read_msg,
                                          &version),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(read_msg.other_fields(), 1);

        // Only one KV: nothing was split out.
        ASSERT_EQ(count_range(txn_kv.get()), 1) << dump_range(txn_kv.get());

        // Remove the message.
        versioned::document_remove<SplitSingleMessagePB>(read_txn.get(), "split_single_message_key",
                                                         version);
        ASSERT_EQ(read_txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());

    {
        // Write a message that needs to be split.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        SplitSingleMessagePB msg;
        msg.set_other_fields(2);
        auto* key_bounds = msg.mutable_segment_key_bounds();
        key_bounds->set_min_key("min_key");
        key_bounds->set_max_key("max_key");

        ASSERT_TRUE(versioned::document_put(txn.get(), "split_single_message_key_split",
                                            std::move(msg)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

        // Read the message back.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        SplitSingleMessagePB read_msg;
        Versionstamp version;
        ASSERT_EQ(versioned::document_get(read_txn.get(), "split_single_message_key_split",
                                          &read_msg, &version),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(read_msg.other_fields(), 2);
        ASSERT_EQ(read_msg.segment_key_bounds().min_key(), "min_key");
        ASSERT_EQ(read_msg.segment_key_bounds().max_key(), "max_key");
        // Two KVs: the root message plus the split-out segment_key_bounds.
        ASSERT_EQ(count_range(txn_kv.get()), 2) << dump_range(txn_kv.get());

        // Remove the message.
        versioned::document_remove<SplitSingleMessagePB>(read_txn.get(),
                                                         "split_single_message_key_split", version);
        ASSERT_EQ(read_txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
| |
// Writes 10 split rowset metas under distinct keys, then scans them back with
// document_get_range over the inclusive range [key_0, key_9] using a small
// batch limit to force multiple underlying range reads.
TEST(DocumentMessageTest, VersionedDocumentGetAllRowsetMeta) {
    std::unique_lock lock(config_mutex); // Serialize config mutation across tests.
    config::enable_split_rowset_meta_pb = true;
    config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("document_put_rowset_meta");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");
    key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");

    std::string rowset_meta_key_prefix = "rowset_meta_key_";
    for (int i = 0; i < 10; ++i) {
        // Create a txn and put one rowset meta per key (one commit each).
        std::string rowset_meta_key = rowset_meta_key_prefix + std::to_string(i);
        doris::RowsetMetaCloudPB meta_copy(meta);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versioned::document_put(txn.get(), rowset_meta_key, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // scan all rowset meta keys
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);

        // [begin_key, end_key] — both endpoints included.
        std::string begin_key = rowset_meta_key_prefix + std::to_string(0);
        std::string end_key = rowset_meta_key_prefix + std::to_string(9);
        versioned::ReadDocumentMessagesOptions read_opts;
        read_opts.batch_limit = 2; // Small batch to exercise pagination.
        read_opts.exclude_begin_key = false;
        read_opts.exclude_end_key = false;
        auto iter = versioned::document_get_range<doris::RowsetMetaCloudPB>(txn.get(), begin_key,
                                                                            end_key, read_opts);
        int count = 0;
        while (true) {
            auto&& kvp = iter->next();
            if (!kvp.has_value()) {
                break; // No more keys to read
            }
            auto [key, version, saved_meta] = kvp.value();
            ASSERT_EQ(saved_meta.rowset_id_v2(), meta.rowset_id_v2());
            ASSERT_EQ(saved_meta.end_version(), meta.end_version());
            ASSERT_EQ(saved_meta.start_version(), meta.start_version());
            ASSERT_EQ(saved_meta.num_rows(), meta.num_rows());
            ASSERT_EQ(saved_meta.num_segments(), meta.num_segments());
            ASSERT_EQ(saved_meta.segments_key_bounds_size(), 2);
            ASSERT_EQ(saved_meta.segments_key_bounds(0).min_key(), "min_key");
            ASSERT_EQ(saved_meta.segments_key_bounds(0).max_key(), "max_key");
            ASSERT_EQ(saved_meta.segments_key_bounds(1).min_key(), "min_key");
            ASSERT_EQ(saved_meta.segments_key_bounds(1).max_key(), "max_key");
            count += 1;
        }
        ASSERT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call.
        ASSERT_EQ(count, 10) << dump_range(txn_kv.get());
    }
}
| |
// Table-driven test for document_get_range snapshot semantics: documents are
// written at explicit versionstamps, then each case scans a key range at a
// snapshot version and checks which (key, version) pairs are visible. A key's
// visible version is its largest versionstamp strictly below the snapshot, and
// results arrive in reverse key order. Both next() and peek() iteration styles
// are exercised against the same case table.
TEST(DocumentMessageTest, VersionedGetRangeWithVersionstamp) {
    std::unique_lock lock(config_mutex); // Serialize config mutation across tests.
    config::enable_split_rowset_meta_pb = true;
    config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    std::string key_prefix = "get_range_key_";

    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("document_put_rowset_meta");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");
    key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");
    {
        // Put a series of versioned documents. Some key suffixes (10, 20, 30)
        // are written multiple times at different versionstamps.
        std::vector<std::pair<int, Versionstamp>> versions = {
                {10, Versionstamp(10, 0)}, {20, Versionstamp(20, 0)}, {30, Versionstamp(30, 0)},
                {40, Versionstamp(40, 0)}, {10, Versionstamp(40, 0)}, {50, Versionstamp(50, 0)},
                {30, Versionstamp(60, 0)}, {20, Versionstamp(70, 0)}, {10, Versionstamp(80, 0)},
                {60, Versionstamp(90, 0)}, {70, Versionstamp(90, 0)},
        };
        for (const auto& [suffix, versionstamp] : versions) {
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
            std::string key = fmt::format("{}{:02}", key_prefix, suffix);

            doris::RowsetMetaCloudPB meta_copy(meta);
            ASSERT_TRUE(
                    versioned::document_put(txn.get(), key, versionstamp, std::move(meta_copy)));
            ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
        }
    }

    // One scan scenario: snapshot version, key-suffix range [begin, end), and
    // the expected (suffix, versionstamp) pairs in iteration order.
    struct TestCase {
        Versionstamp snapshot_version;
        int begin, end;
        std::vector<std::pair<int, Versionstamp>> expected_results;
    };

    std::vector<TestCase> test_cases = {
            // Test case 1: Get all versions with max snapshot version
            // Results should be in reverse order by key (70, 60, 50, 40, 30, 20, 10)
            {
                    Versionstamp::max(),
                    0,
                    99,
                    {{70, Versionstamp(90, 0)},
                     {60, Versionstamp(90, 0)},
                     {50, Versionstamp(50, 0)},
                     {40, Versionstamp(40, 0)},
                     {30, Versionstamp(60, 0)},
                     {20, Versionstamp(70, 0)},
                     {10, Versionstamp(80, 0)}},
            },

            // Test case 2: Get versions with a specific snapshot version (50, 0)
            // Only keys with version < (50, 0) should be returned: 10->40, 20->20, 30->30, 40->40
            // Results should be in reverse order by key (40, 30, 20, 10)
            {
                    Versionstamp(50, 0),
                    10,
                    70,
                    {{40, Versionstamp(40, 0)},
                     {30, Versionstamp(30, 0)},
                     {20, Versionstamp(20, 0)},
                     {10, Versionstamp(40, 0)}},
            },

            // Test case 3: Get versions with a non-existing snapshot version
            {Versionstamp(5, 0), 10, 70, {}},

            // Test case 4: Get versions with a specific range [20, 60)
            // Results should be in reverse order by key
            {
                    Versionstamp::max(),
                    20,
                    60,
                    {{50, Versionstamp(50, 0)},
                     {40, Versionstamp(40, 0)},
                     {30, Versionstamp(60, 0)},
                     {20, Versionstamp(70, 0)}},
            },

            // Test case 5: Empty range - begin_key >= end_key
            {
                    Versionstamp::max(),
                    50,
                    30,
                    {},
            },

            // Test case 6: Single key range [30, 31)
            {
                    Versionstamp::max(),
                    30,
                    31,
                    {{30, Versionstamp(60, 0)}},
            },

            // Test case 7: Range with no matching keys [25, 30)
            {
                    Versionstamp::max(),
                    25,
                    30,
                    {},
            },

            // Test case 8: Exact boundary - key 20 included, key 60 excluded
            {
                    Versionstamp::max(),
                    20,
                    60,
                    {{50, Versionstamp(50, 0)},
                     {40, Versionstamp(40, 0)},
                     {30, Versionstamp(60, 0)},
                     {20, Versionstamp(70, 0)}},
            },

            // Test case 9: Version boundary test - snapshot at (45, 0)
            // Should include versions < 45: 10->40, 20->20, 30->30, 40->40
            {
                    Versionstamp(45, 0),
                    0,
                    99,
                    {{40, Versionstamp(40, 0)},
                     {30, Versionstamp(30, 0)},
                     {20, Versionstamp(20, 0)},
                     {10, Versionstamp(40, 0)}},
            },

            // Test case 10: Test with exact version boundary (40, 0)
            // Should include versions < 40: 10->10, 20->20, 30->30
            {
                    Versionstamp(40, 0),
                    0,
                    99,
                    {{30, Versionstamp(30, 0)},
                     {20, Versionstamp(20, 0)},
                     {10, Versionstamp(10, 0)}},
            },
    };

    // First pass: drain each case's iterator via next().
    size_t case_index = 0;
    for (auto&& tc : test_cases) {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);

        // Get range with range [begin, end)
        versioned::ReadDocumentMessagesOptions opts;
        opts.snapshot_version = tc.snapshot_version;
        opts.exclude_begin_key = false;
        opts.exclude_end_key = true;
        std::string begin_key = fmt::format("{}{:02}", key_prefix, tc.begin);
        std::string end_key = fmt::format("{}{:02}", key_prefix, tc.end);
        auto iter = versioned::document_get_range<doris::RowsetMetaCloudPB>(txn.get(), begin_key,
                                                                            end_key, opts);
        size_t count = 0;
        for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
            auto [key, version, _value] = kvp.value();
            key.remove_prefix(key_prefix.size());
            int suffix = std::stoi(std::string(key));
            ASSERT_EQ(suffix, tc.expected_results[count].first) << " count=" << count;
            ASSERT_EQ(version, tc.expected_results[count].second)
                    << " count=" << count << ", key=" << escape_hex(key)
                    << ", version=" << version.version()
                    << ", expected=" << tc.expected_results[count].second.version();
            count += 1;
        }
        ASSERT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call.
        ASSERT_EQ(count, tc.expected_results.size());
        case_index += 1;
        std::cout << "Test case " << case_index << " passed." << std::endl;
    }

    // Second pass: iterate the same cases via peek(), which must observe the
    // current element without consuming it (next() advances).
    case_index = 0;
    for (auto&& tc : test_cases) {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);

        // Get range with range [begin, end)
        versioned::ReadDocumentMessagesOptions opts;
        opts.snapshot_version = tc.snapshot_version;
        opts.exclude_begin_key = false;
        opts.exclude_end_key = true;
        std::string begin_key = fmt::format("{}{:02}", key_prefix, tc.begin);
        std::string end_key = fmt::format("{}{:02}", key_prefix, tc.end);
        auto iter = versioned::document_get_range<doris::RowsetMetaCloudPB>(txn.get(), begin_key,
                                                                            end_key, opts);
        size_t count = 0;
        for (auto&& kvp = iter->peek(); kvp.has_value(); iter->next(), kvp = iter->peek()) {
            auto [key, version, _value] = kvp.value();
            key.remove_prefix(key_prefix.size());
            int suffix = std::stoi(std::string(key));
            ASSERT_EQ(suffix, tc.expected_results[count].first) << " count=" << count;
            ASSERT_EQ(version, tc.expected_results[count].second)
                    << " count=" << count << ", key=" << escape_hex(key)
                    << ", version=" << version.version()
                    << ", expected=" << tc.expected_results[count].second.version();
            count += 1;
        }
        ASSERT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call.
        ASSERT_EQ(count, tc.expected_results.size());
        case_index += 1;
        std::cout << "Peek test case " << case_index << " passed." << std::endl;
    }
}
| |
| TEST(DocumentMessageTest, VersionedGetRangeWithVersionstamp2) { |
| std::unique_lock lock(config_mutex); |
| config::enable_split_rowset_meta_pb = true; |
| config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb. |
| |
| auto txn_kv = std::make_shared<MemTxnKv>(); |
| std::string_view key_prefix = "get_range2_key_"; |
| |
| doris::RowsetMetaCloudPB meta; |
| meta.set_rowset_id(123); |
| meta.set_rowset_id_v2("document_put_rowset_meta"); |
| meta.set_start_version(10000); |
| meta.set_end_version(10005); |
| meta.set_num_rows(100); |
| meta.set_num_segments(1); |
| doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add(); |
| key_bounds->set_min_key("min_key"); |
| key_bounds->set_max_key("max_key"); |
| key_bounds = meta.mutable_segments_key_bounds()->Add(); |
| key_bounds->set_min_key("min_key"); |
| key_bounds->set_max_key("max_key"); |
| |
| { |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| // Insert 1000 versioned documents with version 100 |
| for (int i = 0; i < 1000; i++) { |
| std::string key = fmt::format("{}{:03}", key_prefix, i); |
| Versionstamp version(100, 0); |
| doris::RowsetMetaCloudPB meta_copy(meta); |
| ASSERT_TRUE(versioned::document_put(txn.get(), key, version, std::move(meta_copy))); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| // Insert some versioned documents, with version 200 |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| for (int i = 0; i < 1000; i++) { |
| std::string key = fmt::format("{}{:03}", key_prefix, i); |
| Versionstamp version(200, 0); |
| doris::RowsetMetaCloudPB meta_copy(meta); |
| ASSERT_TRUE(versioned::document_put(txn.get(), key, version, std::move(meta_copy))); |
| ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); |
| } |
| } |
| |
| // Iterate [0, 999] with version 200 |
| { |
| std::string begin_key = fmt::format("{}{:03}", key_prefix, 0); |
| std::string end_key = fmt::format("{}{:03}", key_prefix, 999); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| |
| versioned::ReadDocumentMessagesOptions opts; |
| opts.snapshot_version = Versionstamp::max(); |
| opts.exclude_begin_key = false; |
| opts.exclude_end_key = false; |
| opts.batch_limit = 11; |
| auto iter = versioned::document_get_range<doris::RowsetMetaCloudPB>(txn.get(), begin_key, |
| end_key, opts); |
| size_t count = 0; |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| auto [key, version, _value] = kvp.value(); |
| key.remove_prefix(key_prefix.size()); |
| int suffix = std::stoi(std::string(key)); |
| ASSERT_EQ(suffix, 999 - count) << " count=" << count << ", key=" << key; |
| ASSERT_EQ(version.version(), 200) |
| << " count=" << count << ", key=" << key << ", version=" << version.version(); |
| count += 1; |
| } |
| ASSERT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call. |
| ASSERT_EQ(count, 1000) << dump_range(txn_kv.get()); |
| } |
| |
| // Iterate [0, 999] with 100 |
| { |
| std::string begin_key = fmt::format("{}{:03}", key_prefix, 0); |
| std::string end_key = fmt::format("{}{:03}", key_prefix, 999); |
| std::unique_ptr<Transaction> txn; |
| ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); |
| versioned::ReadDocumentMessagesOptions opts; |
| opts.snapshot_version = Versionstamp(150, 0); |
| opts.exclude_begin_key = false; |
| opts.exclude_end_key = false; |
| opts.batch_limit = 11; // Limit to 100 results per batch |
| auto iter = versioned::document_get_range<doris::RowsetMetaCloudPB>(txn.get(), begin_key, |
| end_key, opts); |
| size_t count = 0; |
| for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) { |
| auto [key, version, _value] = kvp.value(); |
| key.remove_prefix(key_prefix.size()); |
| int suffix = std::stoi(std::string(key)); |
| ASSERT_EQ(suffix, 999 - count) << " count=" << count << ", key=" << key; |
| ASSERT_EQ(version.version(), 100) |
| << " count=" << count << ", key=" << key << ", version=" << version.version(); |
| count += 1; |
| } |
| ASSERT_TRUE(iter->is_valid()); // The iterator should still be valid after the next call. |
| ASSERT_EQ(count, 1000) << dump_range(txn_kv.get()); |
| } |
| } |
| |
// Verifies that exclude_begin_key / exclude_end_key control endpoint inclusion
// for document_get_range: over keys 000..009 the four combinations of the two
// flags on range [key_000, key_009] must yield 10, 9, 9, and 8 results.
TEST(DocumentMessageTest, VersionedGetRangeWithRangeKeySelector) {
    std::unique_lock lock(config_mutex); // Serialize config mutation across tests.
    config::enable_split_rowset_meta_pb = true;
    config::split_rowset_meta_pb_size = 0; // Always split the rowset meta pb.

    auto txn_kv = std::make_shared<MemTxnKv>();
    std::string_view key_prefix = "get_range_key_selector_";

    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(123);
    meta.set_rowset_id_v2("document_put_rowset_meta");
    meta.set_start_version(10000);
    meta.set_end_version(10005);
    meta.set_num_rows(100);
    meta.set_num_segments(1);
    doris::KeyBoundsPB* key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");
    key_bounds = meta.mutable_segments_key_bounds()->Add();
    key_bounds->set_min_key("min_key");
    key_bounds->set_max_key("max_key");

    // Insert 10 versioned documents (keys 000..009) with version 100
    for (int i = 0; i < 10; i++) {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string key = fmt::format("{}{:03}", key_prefix, i);
        Versionstamp version(100, 0);
        doris::RowsetMetaCloudPB meta_copy(meta);
        ASSERT_TRUE(versioned::document_put(txn.get(), key, version, std::move(meta_copy)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // One scan scenario: the range endpoints, the two exclusion flags, and how
    // many keys the scan is expected to return.
    struct TestCase {
        std::string begin_key, end_key;
        bool exclude_begin_key, exclude_end_key;
        size_t expected_count;
    };

    std::vector<TestCase> test_cases = {
            // Test case 1: Range [0, 9]
            {fmt::format("{}{:03}", key_prefix, 0), fmt::format("{}{:03}", key_prefix, 9), false,
             false, 10},

            // Test case 2: Range [0, 9)
            {fmt::format("{}{:03}", key_prefix, 0), fmt::format("{}{:03}", key_prefix, 9), false,
             true, 9},

            // Test case 3: Range (0, 9]
            {fmt::format("{}{:03}", key_prefix, 0), fmt::format("{}{:03}", key_prefix, 9), true,
             false, 9},

            // Test case 4: Range (0, 9)
            {fmt::format("{}{:03}", key_prefix, 0), fmt::format("{}{:03}", key_prefix, 9), true,
             true, 8},
    };

    size_t case_index = 0;
    for (const auto& tc : test_cases) {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned::ReadDocumentMessagesOptions opts;
        opts.snapshot_version = Versionstamp::max();
        opts.exclude_begin_key = tc.exclude_begin_key;
        opts.exclude_end_key = tc.exclude_end_key;
        opts.batch_limit = 11; // Small batch to exercise pagination.
        auto iter = versioned::document_get_range<doris::RowsetMetaCloudPB>(txn.get(), tc.begin_key,
                                                                            tc.end_key, opts);
        size_t count = 0;
        std::string keys;
        for (auto&& kvp = iter->next(); kvp.has_value(); kvp = iter->next()) {
            auto [key, version, _value] = kvp.value();
            keys += fmt::format("{} -> {}\n", key, version.version());
            count += 1;
        }
        // The iterator should still be valid after the next call.
        ASSERT_TRUE(iter->is_valid())
                << "Iterator should be valid after next call"
                << ", count=" << count << ", begin_key=" << escape_hex(tc.begin_key)
                << ", end_key=" << escape_hex(tc.end_key) << ", case_index=" << case_index
                << ", exclude_begin_key=" << tc.exclude_begin_key
                << ", exclude_end_key=" << tc.exclude_end_key
                << ", error_code=" << iter->error_code();
        ASSERT_EQ(count, tc.expected_count) << keys << ", case_index=" << case_index
                                            << ", exclude_begin_key=" << tc.exclude_begin_key
                                            << ", exclude_end_key=" << tc.exclude_end_key
                                            << ", begin_key=" << escape_hex(tc.begin_key)
                                            << ", end_key=" << escape_hex(tc.end_key);
        case_index += 1;
    }
}