blob: 75f1ea780572d6864de48e953b203e827846eb7b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "meta-store/meta_reader.h"
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest-death-test.h>
#include <gtest/gtest.h>
#include <memory>
#include "common/config.h"
#include "common/logging.h"
#include "common/util.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/mem_txn_kv.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
using namespace doris::cloud;
// Test binary entry point.
// Sets config::log_dir to "./log/", initializes glog under the name "meta_reader_test"
// (returning -1 if that fails), then hands control to gtest and returns its result.
int main(int argc, char** argv) {
    config::log_dir = "./log/";

    const bool glog_ready = doris::cloud::init_glog("meta_reader_test");
    if (!glog_ready) {
        std::cerr << "failed to init glog" << std::endl;
        return -1;
    }

    ::testing::InitGoogleTest(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}
// Convert a string to a hex-escaped string.
// A printable character (as classified by isprint) is emitted as itself.
// Any other byte is emitted as \xHH, where HH is the byte value written as two lowercase
// hexadecimal digits.
static std::string escape_hex(std::string_view data) {
    static constexpr char kHexDigits[] = "0123456789abcdef";

    std::string result;
    result.reserve(data.size()); // Lower bound: every input byte yields at least one output char.
    for (char c : data) {
        // The <ctype.h> classifiers require a value representable as unsigned char (or EOF).
        // Passing a plain char is undefined behaviour for bytes >= 0x80 where char is signed,
        // so convert before classifying.
        const auto byte = static_cast<unsigned char>(c);
        if (isprint(byte)) {
            result += c;
        } else {
            result += "\\x";
            result += kHexDigits[byte >> 4];
            result += kHexDigits[byte & 0x0F];
        }
    }
    return result;
}
// Builds a printable dump of the key-value pairs that txn_kv->full_range_get yields for the
// given begin/end keys. Each pair becomes one "Key: ..., Value: ..." line with both parts passed
// through escape_hex. If the transaction cannot be created, a fixed error message is returned
// instead of a dump.
static std::string dump_range(TxnKv* txn_kv, std::string_view begin = "",
                              std::string_view end = "\xFF") {
    std::unique_ptr<Transaction> txn;
    const TxnErrorCode create_status = txn_kv->create_txn(&txn);
    if (create_status != TxnErrorCode::TXN_OK) {
        return "Failed to create dump range transaction";
    }

    FullRangeGetOptions range_opts;
    range_opts.txn = txn.get();
    auto iter =
            txn_kv->full_range_get(std::string(begin), std::string(end), std::move(range_opts));

    std::string dump;
    auto&& entry = iter->next();
    while (entry.has_value()) {
        dump += fmt::format("Key: {}, Value: {}\n", escape_hex(entry->first),
                            escape_hex(entry->second));
        entry = iter->next();
    }

    // Running off the end of the range must not leave the iterator flagged as invalid.
    EXPECT_TRUE(iter->is_valid());
    return dump;
}
// Covers MetaReader::get_table_version for a single table:
//   1. lookup before anything is written,
//   2. lookup after the first versioned put (overload without a transaction argument),
//   3. lookup after a second versioned put (overload taking an explicit transaction),
// and checks that the second versionstamp is greater than the first.
TEST(MetaReaderTest, GetTableVersion) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::string instance_id = "test_instance";
    int64_t table_id = 1001;
    // The same key is written twice below, so build it once.
    std::string version_key = versioned::table_version_key({instance_id, table_id});

    {
        // No table version exists yet.
        MetaReader reader(instance_id, txn_kv.get());
        Versionstamp ignored;
        ASSERT_EQ(reader.get_table_version(table_id, &ignored), TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // First write of the table version.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        versioned_put(write_txn.get(), version_key, "");
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    Versionstamp version1;
    {
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(reader.get_table_version(table_id, &version1), TxnErrorCode::TXN_OK);
        // Only one value was read, so it is also the minimum read versionstamp.
        ASSERT_EQ(reader.min_read_versionstamp(), version1);
    }

    {
        // Second write of the same key.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        versioned_put(write_txn.get(), version_key, "");
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    Versionstamp version2;
    {
        // This time read through a caller-provided transaction.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(reader.get_table_version(read_txn.get(), table_id, &version2),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(reader.min_read_versionstamp(), version2);
    }

    ASSERT_LT(version1, version2);
}
// Covers MetaReader::get_table_versions (batch lookup) with: an empty id list, ids that have no
// stored version, a partially populated set, and a fully populated set read through an explicit
// transaction. For the populated cases it also checks min_read_versionstamp() against the minimum
// of the returned versionstamps.
TEST(MetaReaderTest, BatchGetTableVersion) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::string instance_id = "test_instance";
    std::vector<int64_t> table_ids = {1001, 1002, 1003, 1004};
    // Index into table_ids whose version is written late, to produce a partial result first.
    constexpr size_t kLateIndex = 1;

    // Smallest versionstamp among the values of a result map.
    auto min_stamp_of = [](const auto& stamps) {
        Versionstamp lowest = Versionstamp::max();
        for (const auto& [id, stamp] : stamps) {
            lowest = std::min(lowest, stamp);
        }
        return lowest;
    };

    {
        // Empty id list: succeeds and returns nothing.
        MetaReader reader(instance_id, txn_kv.get());
        std::vector<int64_t> no_ids;
        std::unordered_map<int64_t, Versionstamp> found;
        ASSERT_EQ(reader.get_table_versions(no_ids, &found), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(found.empty());
    }

    {
        // None of the ids has a version yet: still TXN_OK, with an empty result.
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, Versionstamp> found;
        ASSERT_EQ(reader.get_table_versions(table_ids, &found), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(found.empty());
    }

    {
        // Write a version for every table except table_ids[kLateIndex].
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        for (size_t idx = 0; idx < table_ids.size(); ++idx) {
            if (idx != kLateIndex) {
                std::string key = versioned::table_version_key({instance_id, table_ids[idx]});
                versioned_put(write_txn.get(), key, "");
            }
        }
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Partial result: three of the four ids are present.
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, Versionstamp> found;
        ASSERT_EQ(reader.get_table_versions(table_ids, &found), TxnErrorCode::TXN_OK);
        ASSERT_EQ(found.size(), 3);
        ASSERT_EQ(reader.min_read_versionstamp(), min_stamp_of(found));

        for (size_t idx = 0; idx < table_ids.size(); ++idx) {
            const bool present = found.find(table_ids[idx]) != found.end();
            ASSERT_EQ(present, idx != kLateIndex);
        }
    }

    {
        // Now write the version that was left out.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        std::string key = versioned::table_version_key({instance_id, table_ids[kLateIndex]});
        versioned_put(write_txn.get(), key, "");
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Full result, read through a caller-provided transaction.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, Versionstamp> found;
        ASSERT_EQ(reader.get_table_versions(read_txn.get(), table_ids, &found),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(found.size(), table_ids.size());
        ASSERT_EQ(reader.min_read_versionstamp(), min_stamp_of(found));

        for (int64_t id : table_ids) {
            ASSERT_TRUE(found.find(id) != found.end());
        }
    }
}
// Covers MetaReader::get_partition_version for a single partition: lookup before any write, after
// storing version 100 (overload without a transaction argument), and after storing version 200
// (overload taking an explicit transaction). The second versionstamp must exceed the first.
TEST(MetaReaderTest, GetPartitionVersion) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::string instance_id = "test_instance";
    int64_t partition_id = 2001;
    // Both writes below target the same key, so build it once.
    std::string version_key = versioned::partition_version_key({instance_id, partition_id});

    {
        // No partition version exists yet.
        MetaReader reader(instance_id, txn_kv.get());
        VersionPB ignored_pb;
        Versionstamp ignored_stamp;
        ASSERT_EQ(reader.get_partition_version(partition_id, &ignored_pb, &ignored_stamp),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // Store version 100.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        VersionPB payload;
        payload.set_version(100);
        versioned_put(write_txn.get(), version_key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    VersionPB version_pb1;
    Versionstamp partition_version1;
    {
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(reader.get_partition_version(partition_id, &version_pb1, &partition_version1),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(version_pb1.version(), 100);
        ASSERT_EQ(reader.min_read_versionstamp(), partition_version1);
    }

    {
        // Store version 200 under the same key.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        VersionPB payload;
        payload.set_version(200);
        versioned_put(write_txn.get(), version_key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    VersionPB version_pb2;
    Versionstamp partition_version2;
    {
        // Read through a caller-provided transaction.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(reader.get_partition_version(read_txn.get(), partition_id, &version_pb2,
                                               &partition_version2),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(version_pb2.version(), 200);
        ASSERT_EQ(reader.min_read_versionstamp(), partition_version2);
    }

    ASSERT_LT(partition_version1, partition_version2);
}
// Covers the MetaReader::get_partition_versions overloads that fill a VersionPB map and/or a
// Versionstamp map: empty input, a partially populated set, a fully populated set read through an
// explicit transaction, and the two cases where one of the output maps is passed as nullptr.
TEST(MetaReaderTest, BatchGetPartitionVersion) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::string instance_id = "test_instance";
    std::vector<int64_t> partition_ids = {2001, 2002, 2003, 2004};
    // Index into partition_ids whose version is written late, to produce a partial result first.
    constexpr size_t kLateIndex = 1;

    // Version stored for partition_ids[idx] in the first write (100, 120, 130 for idx 0, 2, 3).
    auto initial_version = [](size_t idx) { return static_cast<int64_t>(100 + idx * 10); };

    // Smallest versionstamp among the values of a result map.
    auto min_stamp_of = [](const auto& stamps) {
        Versionstamp lowest = Versionstamp::max();
        for (const auto& [id, stamp] : stamps) {
            lowest = std::min(lowest, stamp);
        }
        return lowest;
    };

    {
        // Empty id list: succeeds and leaves both outputs empty.
        MetaReader reader(instance_id, txn_kv.get());
        std::vector<int64_t> no_ids;
        std::unordered_map<int64_t, VersionPB> versions;
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        ASSERT_EQ(reader.get_partition_versions(no_ids, &versions, &versionstamps),
                  TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versions.empty());
        ASSERT_TRUE(versionstamps.empty());
    }

    {
        // Write a version for every partition except partition_ids[kLateIndex].
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        for (size_t idx = 0; idx < partition_ids.size(); ++idx) {
            if (idx != kLateIndex) {
                std::string key =
                        versioned::partition_version_key({instance_id, partition_ids[idx]});
                VersionPB payload;
                payload.set_version(initial_version(idx));
                versioned_put(write_txn.get(), key, payload.SerializeAsString());
            }
        }
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Partial result: three of the four ids are present in both maps.
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, VersionPB> versions;
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        ASSERT_EQ(reader.get_partition_versions(partition_ids, &versions, &versionstamps),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(versions.size(), 3);
        ASSERT_EQ(versionstamps.size(), 3);
        ASSERT_EQ(reader.min_read_versionstamp(), min_stamp_of(versionstamps));

        for (size_t idx = 0; idx < partition_ids.size(); ++idx) {
            const int64_t id = partition_ids[idx];
            auto version_it = versions.find(id);
            auto stamp_it = versionstamps.find(id);
            if (idx == kLateIndex) {
                ASSERT_TRUE(version_it == versions.end());
                ASSERT_TRUE(stamp_it == versionstamps.end());
            } else {
                ASSERT_TRUE(version_it != versions.end());
                ASSERT_TRUE(stamp_it != versionstamps.end());
                ASSERT_EQ(version_it->second.version(), initial_version(idx));
            }
        }
    }

    {
        // Now write the partition version that was left out (version 110).
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        std::string key =
                versioned::partition_version_key({instance_id, partition_ids[kLateIndex]});
        VersionPB payload;
        payload.set_version(110);
        versioned_put(write_txn.get(), key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Full result, read through a caller-provided transaction.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, VersionPB> versions;
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        ASSERT_EQ(reader.get_partition_versions(read_txn.get(), partition_ids, &versions,
                                                &versionstamps),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(versions.size(), partition_ids.size());
        ASSERT_EQ(versionstamps.size(), partition_ids.size());
        ASSERT_EQ(reader.min_read_versionstamp(), min_stamp_of(versionstamps));

        for (size_t idx = 0; idx < partition_ids.size(); ++idx) {
            const int64_t id = partition_ids[idx];
            auto version_it = versions.find(id);
            ASSERT_TRUE(version_it != versions.end());
            ASSERT_TRUE(versionstamps.find(id) != versionstamps.end());
            const int64_t expected = (idx == kLateIndex) ? 110 : initial_version(idx);
            ASSERT_EQ(version_it->second.version(), expected);
        }
    }

    {
        // Only versionstamps requested (versions == nullptr).
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        ASSERT_EQ(reader.get_partition_versions(partition_ids, nullptr, &versionstamps),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(versionstamps.size(), partition_ids.size());
        ASSERT_EQ(reader.min_read_versionstamp(), min_stamp_of(versionstamps));
    }

    {
        // Only versions requested (versionstamps == nullptr).
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, VersionPB> versions;
        ASSERT_EQ(reader.get_partition_versions(partition_ids, &versions, nullptr),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(versions.size(), partition_ids.size());
        // The reader must still track the minimum read versionstamp even though the caller did
        // not ask for the versionstamps themselves.
        ASSERT_NE(reader.min_read_versionstamp(), Versionstamp::max());
    }
}
// Covers the MetaReader::get_partition_versions overload that returns plain int64 versions plus
// the id of a pending transaction. Checked here: empty input, a missing partition reported as
// version 1, the first pending txn id of a partition being reported, and the output staying at
// its initial -1 once no partition carries pending txn ids.
TEST(MetaReaderTest, GetPartitionVersionsWithPendingTxn) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::string instance_id = "test_instance";
    std::vector<int64_t> partition_ids = {2101, 2102, 2103, 2104};
    // Index into partition_ids whose version is written late.
    constexpr size_t kLateIndex = 1;
    // Index into partition_ids that initially carries pending txn ids.
    constexpr size_t kPendingIndex = 2;

    // Version stored for partition_ids[idx] in the first write (100, 120, 130 for idx 0, 2, 3).
    auto initial_version = [](size_t idx) { return static_cast<int64_t>(100 + idx * 10); };

    {
        // Empty id list: succeeds, returns nothing, and leaves the pending txn id untouched.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        std::vector<int64_t> no_ids;
        std::unordered_map<int64_t, int64_t> versions;
        int64_t last_pending_txn_id = -1;
        ASSERT_EQ(reader.get_partition_versions(read_txn.get(), no_ids, &versions,
                                                &last_pending_txn_id),
                  TxnErrorCode::TXN_OK);
        ASSERT_TRUE(versions.empty());
        ASSERT_EQ(last_pending_txn_id, -1);
    }

    {
        // Write every partition except partition_ids[kLateIndex]; partition_ids[kPendingIndex]
        // additionally records two pending txn ids.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        for (size_t idx = 0; idx < partition_ids.size(); ++idx) {
            if (idx == kLateIndex) {
                continue;
            }
            std::string key = versioned::partition_version_key({instance_id, partition_ids[idx]});
            VersionPB payload;
            payload.set_version(initial_version(idx));
            if (idx == kPendingIndex) {
                payload.add_pending_txn_ids(3001);
                payload.add_pending_txn_ids(3002);
            }
            versioned_put(write_txn.get(), key, payload.SerializeAsString());
        }
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Partial data: the partition without a stored version is reported with version 1.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, int64_t> versions;
        int64_t last_pending_txn_id = -1;
        ASSERT_EQ(reader.get_partition_versions(read_txn.get(), partition_ids, &versions,
                                                &last_pending_txn_id),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(versions.size(), 4);
        for (size_t idx = 0; idx < partition_ids.size(); ++idx) {
            auto it = versions.find(partition_ids[idx]);
            ASSERT_TRUE(it != versions.end());
            const int64_t expected = (idx == kLateIndex) ? 1 : initial_version(idx);
            ASSERT_EQ(it->second, expected);
        }
        // The first pending txn id recorded on partition_ids[kPendingIndex] is reported.
        ASSERT_EQ(last_pending_txn_id, 3001);
    }

    {
        // Write the missing partition version (110), without any pending txn ids.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        std::string key =
                versioned::partition_version_key({instance_id, partition_ids[kLateIndex]});
        VersionPB payload;
        payload.set_version(110);
        versioned_put(write_txn.get(), key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // All partitions now have a version; partition_ids[kPendingIndex] still has pending ids.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, int64_t> versions;
        int64_t last_pending_txn_id = -1;
        ASSERT_EQ(reader.get_partition_versions(read_txn.get(), partition_ids, &versions,
                                                &last_pending_txn_id),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(versions.size(), partition_ids.size());
        for (size_t idx = 0; idx < partition_ids.size(); ++idx) {
            auto it = versions.find(partition_ids[idx]);
            ASSERT_TRUE(it != versions.end());
            const int64_t expected = (idx == kLateIndex) ? 110 : initial_version(idx);
            ASSERT_EQ(it->second, expected);
        }
        ASSERT_EQ(last_pending_txn_id, 3001);
    }

    {
        // Rewrite partition_ids[kPendingIndex] with the same version (120) but no pending ids.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        std::string key =
                versioned::partition_version_key({instance_id, partition_ids[kPendingIndex]});
        VersionPB payload;
        payload.set_version(120);
        versioned_put(write_txn.get(), key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // No partition carries pending txn ids any more, so the output keeps its initial -1.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, int64_t> versions;
        int64_t last_pending_txn_id = -1;
        ASSERT_EQ(reader.get_partition_versions(read_txn.get(), partition_ids, &versions,
                                                &last_pending_txn_id),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(versions.size(), partition_ids.size());
        for (size_t idx = 0; idx < partition_ids.size(); ++idx) {
            auto it = versions.find(partition_ids[idx]);
            ASSERT_TRUE(it != versions.end());
            const int64_t expected = (idx == kLateIndex)      ? 110
                                     : (idx == kPendingIndex) ? 120
                                                              : initial_version(idx);
            ASSERT_EQ(it->second, expected);
        }
        ASSERT_EQ(last_pending_txn_id, -1);
    }

    {
        // Repeat the previous read with a fresh transaction and reader; results must match.
        // NOTE(review): this case was originally labelled as exercising a "convenience method
        // (without snapshot parameter)", but the call is identical to the one above - confirm
        // whether a different overload was intended.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, int64_t> versions;
        int64_t last_pending_txn_id = -1;
        ASSERT_EQ(reader.get_partition_versions(read_txn.get(), partition_ids, &versions,
                                                &last_pending_txn_id),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(versions.size(), partition_ids.size());
        for (size_t idx = 0; idx < partition_ids.size(); ++idx) {
            auto it = versions.find(partition_ids[idx]);
            ASSERT_TRUE(it != versions.end());
            const int64_t expected = (idx == kLateIndex)      ? 110
                                     : (idx == kPendingIndex) ? 120
                                                              : initial_version(idx);
            ASSERT_EQ(it->second, expected);
        }
        ASSERT_EQ(last_pending_txn_id, -1);
    }
}
// Covers MetaReader::get_tablet_load_stats for a single tablet: lookup before any write, after a
// first put (overload without a transaction argument), and after a second put (overload taking
// an explicit transaction). The second versionstamp must exceed the first.
TEST(MetaReaderTest, GetTabletLoadStats) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::string instance_id = "test_instance";
    int64_t tablet_id = 3001;
    // Both writes below target the same key, so build it once.
    std::string load_stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});

    {
        // No load stats exist yet.
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB ignored_stats;
        Versionstamp ignored_stamp;
        ASSERT_EQ(reader.get_tablet_load_stats(tablet_id, &ignored_stats, &ignored_stamp),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // First write: 1000 rows, 500000 bytes.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        TabletStatsPB payload;
        payload.set_num_rows(1000);
        payload.set_data_size(500000);
        versioned_put(write_txn.get(), load_stats_key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    TabletStatsPB tablet_stats1;
    Versionstamp tablet_stats_version1;
    {
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(reader.get_tablet_load_stats(tablet_id, &tablet_stats1, &tablet_stats_version1),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(tablet_stats1.num_rows(), 1000);
        ASSERT_EQ(tablet_stats1.data_size(), 500000);
        ASSERT_EQ(reader.min_read_versionstamp(), tablet_stats_version1);
    }

    {
        // Second write: 2000 rows, 1000000 bytes.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        TabletStatsPB payload;
        payload.set_num_rows(2000);
        payload.set_data_size(1000000);
        versioned_put(write_txn.get(), load_stats_key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    TabletStatsPB tablet_stats2;
    Versionstamp tablet_stats_version2;
    {
        // Read through a caller-provided transaction.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(reader.get_tablet_load_stats(read_txn.get(), tablet_id, &tablet_stats2,
                                               &tablet_stats_version2),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(tablet_stats2.num_rows(), 2000);
        ASSERT_EQ(tablet_stats2.data_size(), 1000000);
        ASSERT_EQ(reader.min_read_versionstamp(), tablet_stats_version2);
    }

    ASSERT_LT(tablet_stats_version1, tablet_stats_version2);
}
// Covers MetaReader::get_tablet_compact_stats for a single tablet:
//   - lookup before any write,
//   - read-back of every field after a first and a second write (both overloads),
//   - reads through a MetaReader constructed with a snapshot versionstamp taken before a third
//     write, versus readers without one,
//   - nullptr output arguments,
//   - lookup of a tablet id that was never written.
TEST(MetaReaderTest, GetTabletCompactStats) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    std::string instance_id = "test_instance";
    int64_t tablet_id = 3001;
    // All writes below target the same key, so build it once.
    std::string compact_stats_key = versioned::tablet_compact_stats_key({instance_id, tablet_id});

    {
        // No compact stats exist yet.
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB ignored_stats;
        Versionstamp ignored_stamp;
        ASSERT_EQ(reader.get_tablet_compact_stats(tablet_id, &ignored_stats, &ignored_stamp),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // First write: populate every compaction-related and data-related field.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        TabletStatsPB payload;
        // Compaction-related fields.
        payload.set_base_compaction_cnt(5);
        payload.set_cumulative_compaction_cnt(10);
        payload.set_cumulative_point(100);
        payload.set_last_base_compaction_time_ms(1234567890);
        payload.set_last_cumu_compaction_time_ms(2345678901);
        payload.set_full_compaction_cnt(2);
        payload.set_last_full_compaction_time_ms(3456789012);
        // Data-related fields.
        payload.set_num_rows(500);
        payload.set_num_rowsets(8);
        payload.set_num_segments(25);
        payload.set_data_size(250000);
        payload.set_index_size(25000);
        payload.set_segment_size(300000);
        versioned_put(write_txn.get(), compact_stats_key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    TabletStatsPB tablet_stats1;
    Versionstamp tablet_stats_version1;
    {
        // Read back via the overload that takes no transaction argument.
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(
                reader.get_tablet_compact_stats(tablet_id, &tablet_stats1, &tablet_stats_version1),
                TxnErrorCode::TXN_OK);
        // Compaction-related fields.
        ASSERT_EQ(tablet_stats1.base_compaction_cnt(), 5);
        ASSERT_EQ(tablet_stats1.cumulative_compaction_cnt(), 10);
        ASSERT_EQ(tablet_stats1.cumulative_point(), 100);
        ASSERT_EQ(tablet_stats1.last_base_compaction_time_ms(), 1234567890);
        ASSERT_EQ(tablet_stats1.last_cumu_compaction_time_ms(), 2345678901);
        ASSERT_EQ(tablet_stats1.full_compaction_cnt(), 2);
        ASSERT_EQ(tablet_stats1.last_full_compaction_time_ms(), 3456789012);
        // Data-related fields.
        ASSERT_EQ(tablet_stats1.num_rows(), 500);
        ASSERT_EQ(tablet_stats1.num_rowsets(), 8);
        ASSERT_EQ(tablet_stats1.num_segments(), 25);
        ASSERT_EQ(tablet_stats1.data_size(), 250000);
        ASSERT_EQ(tablet_stats1.index_size(), 25000);
        ASSERT_EQ(tablet_stats1.segment_size(), 300000);
        ASSERT_EQ(reader.min_read_versionstamp(), tablet_stats_version1);
    }

    {
        // Second write: overwrite every field with new values.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        TabletStatsPB payload;
        // Compaction-related fields.
        payload.set_base_compaction_cnt(8);
        payload.set_cumulative_compaction_cnt(15);
        payload.set_cumulative_point(200);
        payload.set_last_base_compaction_time_ms(1234567890 + 10000);
        payload.set_last_cumu_compaction_time_ms(2345678901 + 10000);
        payload.set_full_compaction_cnt(3);
        payload.set_last_full_compaction_time_ms(3456789012 + 10000);
        // Data-related fields.
        payload.set_num_rows(1000);
        payload.set_num_rowsets(12);
        payload.set_num_segments(40);
        payload.set_data_size(500000);
        payload.set_index_size(50000);
        payload.set_segment_size(600000);
        versioned_put(write_txn.get(), compact_stats_key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    TabletStatsPB tablet_stats2;
    Versionstamp tablet_stats_version2;
    {
        // Read back via the overload that takes a caller-provided transaction.
        std::unique_ptr<Transaction> read_txn;
        ASSERT_EQ(txn_kv->create_txn(&read_txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(reader.get_tablet_compact_stats(read_txn.get(), tablet_id, &tablet_stats2,
                                                  &tablet_stats_version2),
                  TxnErrorCode::TXN_OK);
        // Compaction-related fields.
        ASSERT_EQ(tablet_stats2.base_compaction_cnt(), 8);
        ASSERT_EQ(tablet_stats2.cumulative_compaction_cnt(), 15);
        ASSERT_EQ(tablet_stats2.cumulative_point(), 200);
        ASSERT_EQ(tablet_stats2.last_base_compaction_time_ms(), 1234567890 + 10000);
        ASSERT_EQ(tablet_stats2.last_cumu_compaction_time_ms(), 2345678901 + 10000);
        ASSERT_EQ(tablet_stats2.full_compaction_cnt(), 3);
        ASSERT_EQ(tablet_stats2.last_full_compaction_time_ms(), 3456789012 + 10000);
        // Data-related fields.
        ASSERT_EQ(tablet_stats2.num_rows(), 1000);
        ASSERT_EQ(tablet_stats2.num_rowsets(), 12);
        ASSERT_EQ(tablet_stats2.num_segments(), 40);
        ASSERT_EQ(tablet_stats2.data_size(), 500000);
        ASSERT_EQ(tablet_stats2.index_size(), 50000);
        ASSERT_EQ(tablet_stats2.segment_size(), 600000);
        ASSERT_EQ(reader.min_read_versionstamp(), tablet_stats_version2);
    }

    // The second write must carry a later versionstamp than the first.
    ASSERT_LT(tablet_stats_version1, tablet_stats_version2);

    // Snapshot reads: capture a versionstamp now, write again, then read with and without it.
    Versionstamp snapshot_version;
    {
        // Build the snapshot versionstamp from the read version of a fresh transaction.
        std::unique_ptr<Transaction> probe_txn;
        ASSERT_EQ(txn_kv->create_txn(&probe_txn), TxnErrorCode::TXN_OK);
        int64_t read_version = 0;
        ASSERT_EQ(probe_txn->get_read_version(&read_version), TxnErrorCode::TXN_OK);
        snapshot_version = Versionstamp(read_version, 1);
    }

    {
        // Third write, made after the snapshot versionstamp was captured.
        std::unique_ptr<Transaction> write_txn;
        ASSERT_EQ(txn_kv->create_txn(&write_txn), TxnErrorCode::TXN_OK);
        TabletStatsPB payload;
        payload.set_num_rows(2000);
        payload.set_data_size(1000000);
        payload.set_base_compaction_cnt(10);
        versioned_put(write_txn.get(), compact_stats_key, payload.SerializeAsString());
        ASSERT_EQ(write_txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Reader bound to the snapshot versionstamp: must see the second write, not the third.
        MetaReader reader(instance_id, txn_kv.get(), snapshot_version);
        TabletStatsPB seen_stats;
        Versionstamp seen_stamp;
        ASSERT_EQ(reader.get_tablet_compact_stats(tablet_id, &seen_stats, &seen_stamp, true),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(seen_stats.num_rows(), 1000);          // not 2000
        ASSERT_EQ(seen_stats.data_size(), 500000);       // not 1000000
        ASSERT_EQ(seen_stats.base_compaction_cnt(), 8);  // not 10
    }

    {
        // Reader without a snapshot versionstamp: must see the third write.
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB seen_stats;
        Versionstamp seen_stamp;
        ASSERT_EQ(reader.get_tablet_compact_stats(tablet_id, &seen_stats, &seen_stamp),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(seen_stats.num_rows(), 2000);
        ASSERT_EQ(seen_stats.data_size(), 1000000);
        ASSERT_EQ(seen_stats.base_compaction_cnt(), 10);
    }

    {
        // Snapshot flag passed as true, but the reader has no snapshot versionstamp: the latest
        // values are still returned.
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB seen_stats;
        Versionstamp seen_stamp;
        ASSERT_EQ(reader.get_tablet_compact_stats(tablet_id, &seen_stats, &seen_stamp, true),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(seen_stats.num_rows(), 2000);
        ASSERT_EQ(seen_stats.data_size(), 1000000);
    }

    {
        // Both output pointers may be nullptr.
        MetaReader reader(instance_id, txn_kv.get());
        ASSERT_EQ(reader.get_tablet_compact_stats(tablet_id, nullptr, nullptr),
                  TxnErrorCode::TXN_OK);
    }

    {
        // A tablet id that was never written is reported as not found.
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB ignored_stats;
        Versionstamp ignored_stamp;
        ASSERT_EQ(reader.get_tablet_compact_stats(tablet_id + 1, &ignored_stats, &ignored_stamp),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
    }
}
// Exercises MetaReader::get_tablet_merged_stats against an in-memory KV store:
//   * the call reports TXN_KEY_NOT_FOUND until both load and compact stats exist,
//   * compaction counters are taken from the compact stats,
//   * data counters are the sum of load stats and compact stats,
//   * the returned versionstamp is the smaller of the two underlying versions.
TEST(MetaReaderTest, GetTabletMergedStats) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    const std::string instance_id = "test_instance";
    const int64_t tablet_id = 3001;

    {
        // Nothing has been written yet: neither load nor compact stats exist.
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB stats;
        Versionstamp stats_version;
        const TxnErrorCode rc = reader.get_tablet_merged_stats(tablet_id, &stats, &stats_version);
        ASSERT_EQ(rc, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // Write the load half of the stats.
        TabletStatsPB load_pb;
        load_pb.set_num_rows(1000);
        load_pb.set_num_rowsets(10);
        load_pb.set_num_segments(20);
        load_pb.set_data_size(500000);
        load_pb.set_index_size(50000);
        load_pb.set_segment_size(600000);

        std::unique_ptr<Transaction> writer;
        ASSERT_EQ(txn_kv->create_txn(&writer), TxnErrorCode::TXN_OK);
        const std::string load_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
        versioned_put(writer.get(), load_key, load_pb.SerializeAsString());
        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Load stats alone are not enough: the compact stats are still missing.
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB stats;
        Versionstamp stats_version;
        const TxnErrorCode rc = reader.get_tablet_merged_stats(tablet_id, &stats, &stats_version);
        ASSERT_EQ(rc, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // Write the compact half of the stats.
        TabletStatsPB compact_pb;
        compact_pb.set_base_compaction_cnt(5);
        compact_pb.set_cumulative_compaction_cnt(10);
        compact_pb.set_cumulative_point(100);
        compact_pb.set_last_base_compaction_time_ms(1234567890);
        compact_pb.set_last_cumu_compaction_time_ms(2345678901);
        compact_pb.set_full_compaction_cnt(2);
        compact_pb.set_last_full_compaction_time_ms(3456789012);
        compact_pb.set_num_rows(500);
        compact_pb.set_num_rowsets(5);
        compact_pb.set_num_segments(15);
        compact_pb.set_data_size(250000);
        compact_pb.set_index_size(25000);
        compact_pb.set_segment_size(300000);

        std::unique_ptr<Transaction> writer;
        ASSERT_EQ(txn_kv->create_txn(&writer), TxnErrorCode::TXN_OK);
        const std::string compact_key =
                versioned::tablet_compact_stats_key({instance_id, tablet_id});
        versioned_put(writer.get(), compact_key, compact_pb.SerializeAsString());
        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
    }

    // Versionstamp reported by the first successful merged read; compared again below.
    Versionstamp merged_version;
    {
        // Both halves exist now, so the merged read succeeds.
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB merged;
        const TxnErrorCode rc = reader.get_tablet_merged_stats(tablet_id, &merged, &merged_version);
        ASSERT_EQ(rc, TxnErrorCode::TXN_OK);

        // Compaction-related fields come straight from the compact stats.
        EXPECT_EQ(merged.base_compaction_cnt(), 5);
        EXPECT_EQ(merged.cumulative_compaction_cnt(), 10);
        EXPECT_EQ(merged.cumulative_point(), 100);
        EXPECT_EQ(merged.last_base_compaction_time_ms(), 1234567890);
        EXPECT_EQ(merged.last_cumu_compaction_time_ms(), 2345678901);
        EXPECT_EQ(merged.full_compaction_cnt(), 2);
        EXPECT_EQ(merged.last_full_compaction_time_ms(), 3456789012);

        // Data-related fields are load stats + compact stats.
        EXPECT_EQ(merged.num_rows(), 1500);       // 1000 + 500
        EXPECT_EQ(merged.num_rowsets(), 15);      // 10 + 5
        EXPECT_EQ(merged.num_segments(), 35);     // 20 + 15
        EXPECT_EQ(merged.data_size(), 750000);    // 500000 + 250000
        EXPECT_EQ(merged.index_size(), 75000);    // 50000 + 25000
        EXPECT_EQ(merged.segment_size(), 900000); // 600000 + 300000

        // Reading both keys must have moved min_read_versionstamp away from max().
        ASSERT_NE(reader.min_read_versionstamp(), Versionstamp::max());
    }

    {
        // Read each half individually to learn its own versionstamp.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB load_pb, compact_pb;
        Versionstamp load_version, compact_version;
        const TxnErrorCode load_rc =
                reader.get_tablet_load_stats(txn.get(), tablet_id, &load_pb, &load_version);
        const TxnErrorCode compact_rc = reader.get_tablet_compact_stats(
                txn.get(), tablet_id, &compact_pb, &compact_version);
        ASSERT_EQ(load_rc, TxnErrorCode::TXN_OK);
        ASSERT_EQ(compact_rc, TxnErrorCode::TXN_OK);

        // The merged version should be the min of the load and compact versions.
        const Versionstamp& older_version =
                (load_version < compact_version) ? load_version : compact_version;
        ASSERT_EQ(merged_version, older_version);
    }

    {
        // Overwrite the load stats so that their versionstamp advances.
        TabletStatsPB load_pb;
        load_pb.set_num_rows(2000);
        load_pb.set_num_rowsets(20);
        load_pb.set_num_segments(40);
        load_pb.set_data_size(1000000);
        load_pb.set_index_size(100000);
        load_pb.set_segment_size(1200000);

        std::unique_ptr<Transaction> writer;
        ASSERT_EQ(txn_kv->create_txn(&writer), TxnErrorCode::TXN_OK);
        const std::string load_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
        versioned_put(writer.get(), load_key, load_pb.SerializeAsString());
        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
    }

    Versionstamp updated_merged_version;
    {
        // Merged read through a caller-provided transaction after the update.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        TabletStatsPB merged;
        const TxnErrorCode rc = reader.get_tablet_merged_stats(txn.get(), tablet_id, &merged,
                                                               &updated_merged_version);
        ASSERT_EQ(rc, TxnErrorCode::TXN_OK);

        // Data-related fields reflect the updated load stats + unchanged compact stats.
        EXPECT_EQ(merged.num_rows(), 2500);        // 2000 + 500
        EXPECT_EQ(merged.num_rowsets(), 25);       // 20 + 5
        EXPECT_EQ(merged.num_segments(), 55);      // 40 + 15
        EXPECT_EQ(merged.data_size(), 1250000);    // 1000000 + 250000
        EXPECT_EQ(merged.index_size(), 125000);    // 100000 + 25000
        EXPECT_EQ(merged.segment_size(), 1500000); // 1200000 + 300000

        // Compaction fields are untouched by a load-stats update.
        EXPECT_EQ(merged.base_compaction_cnt(), 5);
        EXPECT_EQ(merged.cumulative_compaction_cnt(), 10);
    }

    // The merged version must have advanced past the one observed before the update.
    ASSERT_LT(merged_version, updated_merged_version);
}
// Exercises MetaReader::get_tablet_index: a missing key, a first write, and an
// overwrite that is read back through a caller-provided transaction.
TEST(MetaReaderTest, GetTabletIndex) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    const std::string instance_id = "test_instance";
    const int64_t tablet_id = 5001;

    // Builds a TabletIndexPB for `tablet_id` with the given ids.
    const auto make_index = [tablet_id](int64_t db_id, int64_t table_id, int64_t index_id,
                                        int64_t partition_id) {
        TabletIndexPB pb;
        pb.set_db_id(db_id);
        pb.set_table_id(table_id);
        pb.set_index_id(index_id);
        pb.set_partition_id(partition_id);
        pb.set_tablet_id(tablet_id);
        return pb;
    };

    {
        // Nothing stored yet -> NOT FOUND.
        MetaReader reader(instance_id, txn_kv.get());
        TabletIndexPB fetched;
        const TxnErrorCode rc = reader.get_tablet_index(tablet_id, &fetched);
        ASSERT_EQ(rc, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // Store the first tablet index.
        std::unique_ptr<Transaction> writer;
        ASSERT_EQ(txn_kv->create_txn(&writer), TxnErrorCode::TXN_OK);
        const std::string index_key = versioned::tablet_index_key({instance_id, tablet_id});
        const TabletIndexPB first = make_index(1001, 2001, 3001, 4001);
        writer->put(index_key, first.SerializeAsString());
        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Read it back; the reader creates its own transaction.
        MetaReader reader(instance_id, txn_kv.get());
        TabletIndexPB fetched;
        const TxnErrorCode rc = reader.get_tablet_index(tablet_id, &fetched);
        ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
        ASSERT_EQ(fetched.db_id(), 1001);
        ASSERT_EQ(fetched.table_id(), 2001);
        ASSERT_EQ(fetched.index_id(), 3001);
        ASSERT_EQ(fetched.partition_id(), 4001);
        ASSERT_EQ(fetched.tablet_id(), tablet_id);
    }

    {
        // Overwrite the same key with a different index (update).
        std::unique_ptr<Transaction> writer;
        ASSERT_EQ(txn_kv->create_txn(&writer), TxnErrorCode::TXN_OK);
        const std::string index_key = versioned::tablet_index_key({instance_id, tablet_id});
        const TabletIndexPB second = make_index(1002, 2002, 3002, 4002);
        writer->put(index_key, second.SerializeAsString());
        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Read the updated value through an explicitly supplied transaction.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        TabletIndexPB fetched;
        const TxnErrorCode rc = reader.get_tablet_index(txn.get(), tablet_id, &fetched);
        ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
        ASSERT_EQ(fetched.db_id(), 1002);
        ASSERT_EQ(fetched.table_id(), 2002);
        ASSERT_EQ(fetched.index_id(), 3002);
        ASSERT_EQ(fetched.partition_id(), 4002);
        ASSERT_EQ(fetched.tablet_id(), tablet_id);
    }
}
// Exercises MetaReader::get_tablet_indexes (batch lookup): empty input, a partial
// result when one tablet has no index, and a full result once it has been written.
//
// The index stored for tablet_ids[pos] always uses ids 1000+pos / 2000+pos /
// 3000+pos / 4000+pos, so every assertion can derive the expected values from the
// position alone.
TEST(MetaReaderTest, BatchGetTabletIndexes) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    const std::string instance_id = "test_instance";
    std::vector<int64_t> tablet_ids = {5001, 5002, 5003, 5004};
    // Position of the tablet whose index is written only in the second round.
    constexpr size_t kLatePos = 1;

    // Builds the index stored for tablet_ids[pos]. The position is converted to
    // int64_t once so the protobuf setters never see an unsigned operand.
    const auto make_index = [&tablet_ids](size_t pos) {
        const auto offset = static_cast<int64_t>(pos);
        TabletIndexPB pb;
        pb.set_db_id(1000 + offset);
        pb.set_table_id(2000 + offset);
        pb.set_index_id(3000 + offset);
        pb.set_partition_id(4000 + offset);
        pb.set_tablet_id(tablet_ids[pos]);
        return pb;
    };

    {
        // Empty input yields TXN_OK and an empty result map.
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<int64_t> empty_ids;
        std::unordered_map<int64_t, TabletIndexPB> tablet_indexes;
        TxnErrorCode err = meta_reader.get_tablet_indexes(empty_ids, &tablet_indexes);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_TRUE(tablet_indexes.empty());
    }

    {
        // Store every index except the one at kLatePos, to provoke a partial result.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (size_t i = 0; i < tablet_ids.size(); ++i) {
            if (i == kLatePos) continue;
            std::string tablet_index_key =
                    versioned::tablet_index_key({instance_id, tablet_ids[i]});
            txn->put(tablet_index_key, make_index(i).SerializeAsString());
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Partial result: the batch call succeeds and simply omits the missing tablet.
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, TabletIndexPB> tablet_indexes;
        TxnErrorCode err = meta_reader.get_tablet_indexes(tablet_ids, &tablet_indexes);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(tablet_indexes.size(), 3U); // All except tablet_ids[kLatePos]
        for (size_t i = 0; i < tablet_ids.size(); ++i) {
            // Look the entry up once; operator[] would default-insert a missing key.
            const auto it = tablet_indexes.find(tablet_ids[i]);
            if (i == kLatePos) {
                ASSERT_EQ(it, tablet_indexes.end());
                continue;
            }
            ASSERT_NE(it, tablet_indexes.end());
            const TabletIndexPB& found = it->second;
            const auto offset = static_cast<int64_t>(i);
            ASSERT_EQ(found.db_id(), 1000 + offset);
            ASSERT_EQ(found.table_id(), 2000 + offset);
            ASSERT_EQ(found.index_id(), 3000 + offset);
            ASSERT_EQ(found.partition_id(), 4000 + offset);
            ASSERT_EQ(found.tablet_id(), tablet_ids[i]);
        }
    }

    {
        // Store the index that was skipped above.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string tablet_index_key =
                versioned::tablet_index_key({instance_id, tablet_ids[kLatePos]});
        txn->put(tablet_index_key, make_index(kLatePos).SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // All keys found, this time reading through a caller-provided transaction.
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, TabletIndexPB> tablet_indexes;
        TxnErrorCode err = meta_reader.get_tablet_indexes(txn.get(), tablet_ids, &tablet_indexes);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(tablet_indexes.size(), tablet_ids.size());
        for (size_t i = 0; i < tablet_ids.size(); ++i) {
            const int64_t tablet_id = tablet_ids[i];
            const auto it = tablet_indexes.find(tablet_id);
            ASSERT_NE(it, tablet_indexes.end());
            const TabletIndexPB& found = it->second;
            const auto offset = static_cast<int64_t>(i);
            ASSERT_EQ(found.db_id(), 1000 + offset);
            ASSERT_EQ(found.table_id(), 2000 + offset);
            ASSERT_EQ(found.index_id(), 3000 + offset);
            ASSERT_EQ(found.partition_id(), 4000 + offset);
            ASSERT_EQ(found.tablet_id(), tablet_id);
        }
    }
}
// Exercises MetaReader::get_rowset_metas over a version range:
//   * empty result when the tablet has no rowsets,
//   * load rowsets returned in version order,
//   * compact rowsets replacing the load rowsets they cover,
//   * snapshot reads (MetaReader constructed with a versionstamp) still seeing the
//     state from before later compactions were written.
TEST(MetaReaderTest, GetRowsetMetas) {
    using doris::RowsetMetaCloudPB;

    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    const std::string instance_id = "test_instance";
    const int64_t tablet_id = 4001;
    const int64_t start_version = 1;
    const int64_t end_version = 10;

    // Renders one "<rowset_id_v2>[start, end]" line per rowset; streamed into
    // assertion failure messages to make an unexpected result set readable.
    const auto describe = [](const std::vector<RowsetMetaCloudPB>& metas) {
        std::ostringstream oss;
        for (const auto& meta : metas) {
            oss << meta.rowset_id_v2() << "[" << meta.start_version() << ", "
                << meta.end_version() << "]\n";
        }
        return oss.str();
    };

    {
        // Test empty result when no rowsets exist
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err =
                meta_reader.get_rowset_metas(tablet_id, start_version, end_version, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_TRUE(rowset_metas.empty());
    }

    // Create some load rowsets (import scenario)
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        // Create load rowsets with versions 2, 3, 4, 5
        for (int64_t version = 2; version <= 5; ++version) {
            std::string load_key =
                    versioned::meta_rowset_load_key({instance_id, tablet_id, version});
            RowsetMetaCloudPB rowset_meta;
            rowset_meta.set_rowset_id(0);
            rowset_meta.set_rowset_id_v2(fmt::format("load_rowset_{}", version));
            rowset_meta.set_start_version(version);
            rowset_meta.set_end_version(version);
            rowset_meta.set_num_rows(100 * version);
            rowset_meta.set_tablet_id(tablet_id);
            ASSERT_TRUE(versioned::document_put(txn.get(), load_key, std::move(rowset_meta)));
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test getting load rowsets
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err = meta_reader.get_rowset_metas(tablet_id, 2, 5, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK) << dump_range(txn_kv.get());
        ASSERT_EQ(rowset_metas.size(), 4U) << describe(rowset_metas) << dump_range(txn_kv.get());
        // Verify rowsets are sorted by end_version. A signed running counter keeps
        // the comparisons against the int64 protobuf getters free of unsigned operands.
        int64_t expected_version = 2;
        for (const auto& meta : rowset_metas) {
            ASSERT_EQ(meta.end_version(), expected_version);
            ASSERT_EQ(meta.start_version(), expected_version);
            ASSERT_EQ(meta.num_rows(), 100 * expected_version);
            ++expected_version;
        }
    }

    // Create compact rowset that covers versions 3-4
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string compact_key = versioned::meta_rowset_compact_key({instance_id, tablet_id, 4});
        RowsetMetaCloudPB compact_rowset;
        compact_rowset.set_rowset_id(0);
        compact_rowset.set_rowset_id_v2("compact_rowset_3_4");
        compact_rowset.set_start_version(3);
        compact_rowset.set_end_version(4);
        compact_rowset.set_num_rows(700); // 300 + 400 = 700
        compact_rowset.set_tablet_id(tablet_id);
        ASSERT_TRUE(versioned::document_put(txn.get(), compact_key, std::move(compact_rowset)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test getting rowsets after compaction - compact should override load rowsets
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err = meta_reader.get_rowset_metas(tablet_id, 2, 5, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(rowset_metas.size(), 3U); // version 2, compact(3-4), version 5
        // Check first rowset (version 2)
        ASSERT_EQ(rowset_metas[0].end_version(), 2);
        ASSERT_EQ(rowset_metas[0].start_version(), 2);
        ASSERT_EQ(rowset_metas[0].rowset_id_v2(), "load_rowset_2");
        // Check compact rowset (versions 3-4)
        ASSERT_EQ(rowset_metas[1].end_version(), 4);
        ASSERT_EQ(rowset_metas[1].start_version(), 3);
        ASSERT_EQ(rowset_metas[1].rowset_id_v2(), "compact_rowset_3_4");
        ASSERT_EQ(rowset_metas[1].num_rows(), 700);
        // Check last rowset (version 5)
        ASSERT_EQ(rowset_metas[2].end_version(), 5);
        ASSERT_EQ(rowset_metas[2].start_version(), 5);
        ASSERT_EQ(rowset_metas[2].rowset_id_v2(), "load_rowset_5");
    }

    // Test range query that only includes part of the rowsets
    {
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err = meta_reader.get_rowset_metas(tablet_id, 3, 4, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(rowset_metas.size(), 1U); // Only the compact rowset
        ASSERT_EQ(rowset_metas[0].rowset_id_v2(), "compact_rowset_3_4");
    }

    // Test with snapshot version functionality
    Versionstamp snapshot_version;
    {
        // Capture a snapshot version from a transaction's read version
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err = meta_reader.get_rowset_metas(txn.get(), tablet_id, 2, 5, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Get version from the transaction's read version
        int64_t version = 0;
        ASSERT_EQ(txn->get_read_version(&version), TxnErrorCode::TXN_OK);
        snapshot_version = Versionstamp(version, 1);
        LOG(INFO) << "Snapshot version: " << snapshot_version.version();
        LOG(INFO) << "Snapshot version: " << snapshot_version.to_string();
    }

    // Add another compact rowset that covers version 5
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string compact_key = versioned::meta_rowset_compact_key({instance_id, tablet_id, 5});
        RowsetMetaCloudPB compact_rowset;
        compact_rowset.set_rowset_id(0);
        compact_rowset.set_rowset_id_v2("compact_rowset_5");
        compact_rowset.set_start_version(5);
        compact_rowset.set_end_version(5);
        compact_rowset.set_num_rows(500);
        compact_rowset.set_tablet_id(tablet_id);
        ASSERT_TRUE(versioned::document_put(txn.get(), compact_key, std::move(compact_rowset)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test reading with snapshot version - should get old data
        LOG(INFO) << "Reading with snapshot version: " << snapshot_version.version();
        MetaReader meta_reader(instance_id, txn_kv.get(), snapshot_version);
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err = meta_reader.get_rowset_metas(tablet_id, 2, 5, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(rowset_metas.size(), 3U) << describe(rowset_metas) << dump_range(txn_kv.get());
        // Should still see load_rowset_5, not compact_rowset_5
        ASSERT_EQ(rowset_metas[2].rowset_id_v2(), "load_rowset_5") << dump_range(txn_kv.get());
    }

    {
        // Test reading without snapshot - should get new data
        LOG(INFO) << "Reading without snapshot version";
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err = meta_reader.get_rowset_metas(tablet_id, 2, 5, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(rowset_metas.size(), 3U); // version 2, compact(3-4), compact(5)
        ASSERT_EQ(rowset_metas[2].rowset_id_v2(), "compact_rowset_5");
    }

    // Add a compact rowset covering [3-5]; it is stored under the same compact key
    // (end version 5) as compact_rowset_5 above.
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string compact_key = versioned::meta_rowset_compact_key({instance_id, tablet_id, 5});
        RowsetMetaCloudPB compact_rowset;
        compact_rowset.set_rowset_id(0);
        compact_rowset.set_rowset_id_v2("compact_rowset_3_5");
        compact_rowset.set_start_version(3);
        compact_rowset.set_end_version(5);
        compact_rowset.set_num_rows(1200); // 300 + 400 + 500
        compact_rowset.set_tablet_id(tablet_id);
        ASSERT_TRUE(versioned::document_put(txn.get(), compact_key, std::move(compact_rowset)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test getting rowsets after new compaction
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err = meta_reader.get_rowset_metas(tablet_id, 2, 5, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(rowset_metas.size(), 2U); // version 2, compact(3-5)
        // Check first rowset (version 2)
        ASSERT_EQ(rowset_metas[0].end_version(), 2);
        ASSERT_EQ(rowset_metas[0].start_version(), 2);
        ASSERT_EQ(rowset_metas[0].rowset_id_v2(), "load_rowset_2");
        // Check new compact rowset (versions 3-5)
        ASSERT_EQ(rowset_metas[1].end_version(), 5);
        ASSERT_EQ(rowset_metas[1].start_version(), 3);
        ASSERT_EQ(rowset_metas[1].rowset_id_v2(), "compact_rowset_3_5");
    }

    {
        // Test getting rowset with old snapshot version
        LOG(INFO) << "Reading with old snapshot version: " << snapshot_version.version();
        MetaReader meta_reader(instance_id, txn_kv.get(), snapshot_version);
        std::vector<RowsetMetaCloudPB> rowset_metas;
        TxnErrorCode err = meta_reader.get_rowset_metas(tablet_id, 2, 5, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(rowset_metas.size(), 3U) << describe(rowset_metas) << dump_range(txn_kv.get());
        // Should still see load_rowset_5, not compact_rowset_3_5
        ASSERT_EQ(rowset_metas[2].rowset_id_v2(), "load_rowset_5") << dump_range(txn_kv.get());
        ASSERT_EQ(rowset_metas[2].start_version(), 5);
        ASSERT_EQ(rowset_metas[2].end_version(), 5);
        ASSERT_EQ(rowset_metas[2].num_rows(), 500) << dump_range(txn_kv.get());
    }
}
// Exercises MetaReader::get_partition_pending_txn_id: a partition with no version
// key, a version without pending txns, versions with several / one pending txn,
// the overload taking a caller-provided transaction, and a snapshot reader.
//
// Every out-parameter starts at kUnset, a value that differs from all expected
// results (-1, 1001, 2001), so an implementation that forgets to write the output
// fails the assertion instead of reading an indeterminate value.
TEST(MetaReaderTest, GetPartitionPendingTxnId) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    const std::string instance_id = "test_instance";
    const int64_t partition_id = 3001;
    constexpr int64_t kUnset = 0;

    {
        // Test with non-existent partition
        MetaReader meta_reader(instance_id, txn_kv.get());
        int64_t first_txn_id = kUnset;
        TxnErrorCode err = meta_reader.get_partition_pending_txn_id(partition_id, &first_txn_id);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        EXPECT_EQ(first_txn_id, -1) << "Expected -1 for non-existent partition";
    }

    {
        // Put a partition version without pending transactions
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string partition_version_key =
                versioned::partition_version_key({instance_id, partition_id});
        VersionPB version_pb;
        version_pb.set_version(100);
        versioned_put(txn.get(), partition_version_key, version_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test with no pending transactions - should return -1
        MetaReader meta_reader(instance_id, txn_kv.get());
        int64_t first_txn_id = kUnset;
        TxnErrorCode err = meta_reader.get_partition_pending_txn_id(partition_id, &first_txn_id);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(first_txn_id, -1);
    }

    {
        // Put a partition version with pending transactions
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string partition_version_key =
                versioned::partition_version_key({instance_id, partition_id});
        VersionPB version_pb;
        version_pb.set_version(200);
        version_pb.add_pending_txn_ids(1001);
        version_pb.add_pending_txn_ids(1002);
        version_pb.add_pending_txn_ids(1003);
        versioned_put(txn.get(), partition_version_key, version_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test with pending transactions - should return first txn_id
        MetaReader meta_reader(instance_id, txn_kv.get());
        int64_t first_txn_id = kUnset;
        TxnErrorCode err = meta_reader.get_partition_pending_txn_id(partition_id, &first_txn_id);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(first_txn_id, 1001);
    }

    {
        // Test with transaction parameter
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader meta_reader(instance_id, txn_kv.get());
        int64_t first_txn_id = kUnset;
        TxnErrorCode err =
                meta_reader.get_partition_pending_txn_id(txn.get(), partition_id, &first_txn_id);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(first_txn_id, 1001);
    }

    {
        // Put a partition version with single pending transaction
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string partition_version_key =
                versioned::partition_version_key({instance_id, partition_id});
        VersionPB version_pb;
        version_pb.set_version(300);
        version_pb.add_pending_txn_ids(2001);
        versioned_put(txn.get(), partition_version_key, version_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test with single pending transaction
        MetaReader meta_reader(instance_id, txn_kv.get());
        int64_t first_txn_id = kUnset;
        TxnErrorCode err = meta_reader.get_partition_pending_txn_id(partition_id, &first_txn_id);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(first_txn_id, 2001);
    }

    {
        // Test with snapshot functionality
        // First read the current partition version and its versionstamp
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        VersionPB current_version;
        Versionstamp current_versionstamp;
        MetaReader current_reader(instance_id, txn_kv.get());
        TxnErrorCode err = current_reader.get_partition_version(
                txn.get(), partition_id, &current_version, &current_versionstamp);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);

        // Create a MetaReader with snapshot.
        // NOTE(review): the snapshot versionstamp is built from the VersionPB payload
        // (300 + 1), not from current_versionstamp, which is otherwise unused here --
        // confirm whether current_versionstamp.version() was intended.
        Versionstamp snapshot_version(current_version.version() + 1, 0);
        MetaReader snapshot_reader(instance_id, txn_kv.get(), snapshot_version);
        int64_t first_txn_id = kUnset;
        err = snapshot_reader.get_partition_pending_txn_id(partition_id, &first_txn_id, true);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(first_txn_id, 2001);
    }
}
// Exercises MetaReader::get_index_index: a lookup before anything is stored,
// followed by a write and a successful read-back of the same key.
TEST(MetaReaderTest, GetIndexIndex) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    const std::string instance_id = "test_instance";
    const int64_t index_id = 3001;

    {
        // Nothing stored yet -> NOT FOUND.
        MetaReader reader(instance_id, txn_kv.get());
        IndexIndexPB fetched;
        const TxnErrorCode rc = reader.get_index_index(index_id, &fetched);
        ASSERT_EQ(rc, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // Store an index index for index_id.
        IndexIndexPB stored;
        stored.set_db_id(1001);
        stored.set_table_id(2001);

        std::unique_ptr<Transaction> writer;
        ASSERT_EQ(txn_kv->create_txn(&writer), TxnErrorCode::TXN_OK);
        const std::string key = versioned::index_index_key({instance_id, index_id});
        writer->put(key, stored.SerializeAsString());
        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // The stored value is now visible to a fresh reader.
        MetaReader reader(instance_id, txn_kv.get());
        IndexIndexPB fetched;
        const TxnErrorCode rc = reader.get_index_index(index_id, &fetched);
        ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
        ASSERT_EQ(fetched.db_id(), 1001);
        ASSERT_EQ(fetched.table_id(), 2001);
    }
}
// Exercises MetaReader::get_partition_index: a lookup before anything is stored,
// followed by a write and a successful read-back of the same key.
TEST(MetaReaderTest, GetPartitionIndex) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    const std::string instance_id = "test_instance";
    const int64_t partition_id = 4001;

    {
        // Nothing stored yet -> NOT FOUND.
        MetaReader reader(instance_id, txn_kv.get());
        PartitionIndexPB fetched;
        const TxnErrorCode rc = reader.get_partition_index(partition_id, &fetched);
        ASSERT_EQ(rc, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    {
        // Serialize a partition index and store it under the versioned key.
        PartitionIndexPB stored;
        stored.set_db_id(100);
        stored.set_table_id(200);
        std::string serialized;
        ASSERT_TRUE(stored.SerializeToString(&serialized));

        std::unique_ptr<Transaction> writer;
        ASSERT_EQ(txn_kv->create_txn(&writer), TxnErrorCode::TXN_OK);
        const std::string key = versioned::partition_index_key({instance_id, partition_id});
        writer->put(key, serialized);
        ASSERT_EQ(writer->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // The stored value is now visible to a fresh reader.
        MetaReader reader(instance_id, txn_kv.get());
        PartitionIndexPB fetched;
        const TxnErrorCode rc = reader.get_partition_index(partition_id, &fetched);
        ASSERT_EQ(rc, TxnErrorCode::TXN_OK);
        ASSERT_EQ(fetched.db_id(), 100);
        ASSERT_EQ(fetched.table_id(), 200);
    }
}
// Exercises MetaReader::get_load_rowset_meta for a single (tablet, version):
// missing key, read-back with an internal and a caller-provided transaction,
// snapshot reads that still see the value from before an update, and a lookup
// of a version that was never written.
TEST(MetaReaderTest, GetLoadRowsetMeta) {
    using doris::RowsetMetaCloudPB;

    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    const std::string instance_id = "test_instance";
    const int64_t tablet_id = 5001;
    const int64_t version = 10;

    {
        // Test key not found when no rowset exists
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    // Reference copy of the rowset that is written below. It is kept intact for the
    // whole test because later assertions (and the update step) read from it.
    RowsetMetaCloudPB expected_rowset_meta;
    expected_rowset_meta.set_rowset_id(0);
    expected_rowset_meta.set_rowset_id_v2(fmt::format("test_load_rowset_{}", version));
    expected_rowset_meta.set_start_version(version);
    expected_rowset_meta.set_end_version(version);
    expected_rowset_meta.set_num_rows(1000);
    expected_rowset_meta.set_tablet_id(tablet_id);

    // Create a load rowset
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string load_key = versioned::meta_rowset_load_key({instance_id, tablet_id, version});
        // document_put receives the message via std::move, so give it a copy rather
        // than the reference message; reading a moved-from object afterwards would
        // make the comparisons below depend on its unspecified state.
        RowsetMetaCloudPB to_store = expected_rowset_meta;
        ASSERT_TRUE(versioned::document_put(txn.get(), load_key, std::move(to_store)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test successful get with created transaction
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Verify fields match
        ASSERT_EQ(rowset_meta.rowset_id_v2(), expected_rowset_meta.rowset_id_v2());
        ASSERT_EQ(rowset_meta.start_version(), expected_rowset_meta.start_version());
        ASSERT_EQ(rowset_meta.end_version(), expected_rowset_meta.end_version());
        ASSERT_EQ(rowset_meta.num_rows(), expected_rowset_meta.num_rows());
        ASSERT_EQ(rowset_meta.tablet_id(), expected_rowset_meta.tablet_id());
    }

    {
        // Test successful get with provided transaction
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err =
                meta_reader.get_load_rowset_meta(txn.get(), tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Verify key fields match
        ASSERT_EQ(rowset_meta.rowset_id_v2(), expected_rowset_meta.rowset_id_v2());
        ASSERT_EQ(rowset_meta.start_version(), expected_rowset_meta.start_version());
        ASSERT_EQ(rowset_meta.end_version(), expected_rowset_meta.end_version());
        ASSERT_EQ(rowset_meta.tablet_id(), expected_rowset_meta.tablet_id());
    }

    // Test with snapshot version functionality
    Versionstamp snapshot_version;
    {
        // Capture a snapshot version from a fresh transaction's read version
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        int64_t version_value = 0;
        ASSERT_EQ(txn->get_read_version(&version_value), TxnErrorCode::TXN_OK);
        snapshot_version = Versionstamp(version_value, 1);
    }

    // Update the rowset
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string load_key = versioned::meta_rowset_load_key({instance_id, tablet_id, version});
        RowsetMetaCloudPB updated_rowset_meta = expected_rowset_meta;
        updated_rowset_meta.set_num_rows(2000); // Update row count
        ASSERT_TRUE(versioned::document_put(txn.get(), load_key, std::move(updated_rowset_meta)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    {
        // Test reading with snapshot version - should get old data
        MetaReader meta_reader(instance_id, txn_kv.get(), snapshot_version);
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Should get original values
        ASSERT_EQ(rowset_meta.num_rows(), 1000);
    }

    {
        // Test reading without snapshot - should get new data
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Should get updated values
        ASSERT_EQ(rowset_meta.num_rows(), 2000);
    }

    {
        // Test with snapshot flag
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version, &rowset_meta, true);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Should get current values since snapshot flag is set but no snapshot version
        ASSERT_EQ(rowset_meta.num_rows(), 2000);
    }

    {
        // Test getting non-existent version
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_load_rowset_meta(tablet_id, version + 1, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }
}
// Exercises MetaReader::get_compact_rowset_meta against an in-memory KV store:
// a missing key, a plain read, a read through a caller-provided transaction,
// a read pinned to a snapshot version, and a read of a version that was never written.
TEST(MetaReaderTest, GetCompactRowsetMeta) {
    using doris::RowsetMetaCloudPB;
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    int64_t tablet_id = 5001;
    int64_t version = 10;
    {
        // Test key not found when no rowset exists
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_compact_rowset_meta(tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }
    // Create a compact rowset
    RowsetMetaCloudPB expected_rowset_meta;
    expected_rowset_meta.set_rowset_id(0);
    expected_rowset_meta.set_rowset_id_v2(fmt::format("test_compact_rowset_{}", version));
    expected_rowset_meta.set_start_version(version);
    expected_rowset_meta.set_end_version(version);
    expected_rowset_meta.set_num_rows(1000);
    expected_rowset_meta.set_tablet_id(tablet_id);
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string compact_key =
                versioned::meta_rowset_compact_key({instance_id, tablet_id, version});
        // The message is handed over with std::move, so store a copy and keep
        // expected_rowset_meta untouched for the comparisons below.
        RowsetMetaCloudPB rowset_to_store = expected_rowset_meta;
        ASSERT_TRUE(versioned::document_put(txn.get(), compact_key, std::move(rowset_to_store)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // Test successful get with created transaction
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_compact_rowset_meta(tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Verify fields match
        ASSERT_EQ(rowset_meta.rowset_id_v2(), expected_rowset_meta.rowset_id_v2());
        ASSERT_EQ(rowset_meta.start_version(), expected_rowset_meta.start_version());
        ASSERT_EQ(rowset_meta.end_version(), expected_rowset_meta.end_version());
        ASSERT_EQ(rowset_meta.num_rows(), expected_rowset_meta.num_rows());
        ASSERT_EQ(rowset_meta.tablet_id(), expected_rowset_meta.tablet_id());
    }
    {
        // Test successful get with provided transaction
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err =
                meta_reader.get_compact_rowset_meta(txn.get(), tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Verify the same fields as the created-transaction path
        ASSERT_EQ(rowset_meta.rowset_id_v2(), expected_rowset_meta.rowset_id_v2());
        ASSERT_EQ(rowset_meta.start_version(), expected_rowset_meta.start_version());
        ASSERT_EQ(rowset_meta.end_version(), expected_rowset_meta.end_version());
        ASSERT_EQ(rowset_meta.num_rows(), expected_rowset_meta.num_rows());
        ASSERT_EQ(rowset_meta.tablet_id(), expected_rowset_meta.tablet_id());
    }
    // Test with snapshot version functionality
    Versionstamp snapshot_version;
    {
        // Get current snapshot version
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        int64_t version_value = 0;
        ASSERT_EQ(txn->get_read_version(&version_value), TxnErrorCode::TXN_OK);
        snapshot_version = Versionstamp(version_value, 1);
    }
    // Update the rowset
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string compact_key =
                versioned::meta_rowset_compact_key({instance_id, tablet_id, version});
        RowsetMetaCloudPB updated_rowset_meta = expected_rowset_meta;
        updated_rowset_meta.set_num_rows(2000); // Update row count
        ASSERT_TRUE(
                versioned::document_put(txn.get(), compact_key, std::move(updated_rowset_meta)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // Test reading with snapshot version - should get old data
        MetaReader meta_reader(instance_id, txn_kv.get(), snapshot_version);
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_compact_rowset_meta(tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Should get original values
        ASSERT_EQ(rowset_meta.num_rows(), 1000);
    }
    {
        // Test reading without snapshot - should get new data
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err = meta_reader.get_compact_rowset_meta(tablet_id, version, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Should get updated values
        ASSERT_EQ(rowset_meta.num_rows(), 2000);
    }
    {
        // Test with snapshot flag
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err =
                meta_reader.get_compact_rowset_meta(tablet_id, version, &rowset_meta, true);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        // Should get current values since snapshot flag is set but no snapshot version
        ASSERT_EQ(rowset_meta.num_rows(), 2000);
    }
    {
        // Test getting non-existent version
        MetaReader meta_reader(instance_id, txn_kv.get());
        RowsetMetaCloudPB rowset_meta;
        TxnErrorCode err =
                meta_reader.get_compact_rowset_meta(tablet_id, version + 1, &rowset_meta);
        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND);
    }
}
// Exercises the batch MetaReader::get_tablet_compact_stats API against an in-memory
// KV store: empty input, all keys missing, partial hits, all hits through a
// caller-provided transaction, snapshot-pinned reads, and nullptr output arguments.
TEST(MetaReaderTest, BatchGetTabletCompactStats) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    std::vector<int64_t> tablet_ids = {4001, 4002, 4003, 4004};
    // Builds the stats stored for tablet_ids[index]. Every field is a fixed function
    // of the index, so the read-side checks can recompute the expected values.
    auto make_stats = [](int64_t index) {
        const int64_t k = index + 1;
        TabletStatsPB stats;
        stats.set_num_rows(1000 * k);
        stats.set_data_size(500000 * k);
        stats.set_base_compaction_cnt(5 * k);
        stats.set_cumulative_compaction_cnt(10 * k);
        stats.set_cumulative_point(100 * k);
        stats.set_last_base_compaction_time_ms(1234567890 + index * 1000);
        stats.set_last_cumu_compaction_time_ms(2345678901 + index * 1000);
        stats.set_full_compaction_cnt(2 * k);
        stats.set_last_full_compaction_time_ms(3456789012 + index * 1000);
        stats.set_num_rowsets(20 * k);
        stats.set_num_segments(50 * k);
        stats.set_index_size(50000 * k);
        stats.set_segment_size(600000 * k);
        return stats;
    };
    {
        // Test empty input
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<int64_t> empty_ids;
        std::unordered_map<int64_t, TabletStatsPB> tablet_stats;
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        TxnErrorCode err =
                meta_reader.get_tablet_compact_stats(empty_ids, &tablet_stats, &versionstamps);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_TRUE(tablet_stats.empty());
        ASSERT_TRUE(versionstamps.empty());
    }
    {
        // Test all keys not found
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, TabletStatsPB> tablet_stats;
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        TxnErrorCode err =
                meta_reader.get_tablet_compact_stats(tablet_ids, &tablet_stats, &versionstamps);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_TRUE(tablet_stats.empty());
        ASSERT_TRUE(versionstamps.empty());
    }
    {
        // Put some tablet compact stats (skip tablet_ids[1] to test partial results)
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (size_t i = 0; i < tablet_ids.size(); ++i) {
            if (i == 1) continue; // Skip tablet_ids[1]
            std::string tablet_compact_stats_key =
                    versioned::tablet_compact_stats_key({instance_id, tablet_ids[i]});
            TabletStatsPB tablet_stats = make_stats(static_cast<int64_t>(i));
            versioned_put(txn.get(), tablet_compact_stats_key, tablet_stats.SerializeAsString());
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // Test partial results
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, TabletStatsPB> tablet_stats;
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        TxnErrorCode err =
                meta_reader.get_tablet_compact_stats(tablet_ids, &tablet_stats, &versionstamps);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(tablet_stats.size(), 3);  // All except tablet_ids[1]
        ASSERT_EQ(versionstamps.size(), 3); // All except tablet_ids[1]
        for (size_t i = 0; i < tablet_ids.size(); ++i) {
            int64_t tablet_id = tablet_ids[i];
            if (i == 1) {
                ASSERT_EQ(tablet_stats.find(tablet_id), tablet_stats.end());
                ASSERT_EQ(versionstamps.find(tablet_id), versionstamps.end());
                continue;
            }
            ASSERT_NE(tablet_stats.find(tablet_id), tablet_stats.end());
            ASSERT_NE(versionstamps.find(tablet_id), versionstamps.end());
            // Verify stats data; at() never inserts a default entry into the result map
            const TabletStatsPB& stats = tablet_stats.at(tablet_id);
            const TabletStatsPB expected = make_stats(static_cast<int64_t>(i));
            ASSERT_EQ(stats.num_rows(), expected.num_rows());
            ASSERT_EQ(stats.data_size(), expected.data_size());
            ASSERT_EQ(stats.base_compaction_cnt(), expected.base_compaction_cnt());
            ASSERT_EQ(stats.cumulative_compaction_cnt(), expected.cumulative_compaction_cnt());
            ASSERT_EQ(stats.cumulative_point(), expected.cumulative_point());
            ASSERT_EQ(stats.last_base_compaction_time_ms(),
                      expected.last_base_compaction_time_ms());
            ASSERT_EQ(stats.last_cumu_compaction_time_ms(),
                      expected.last_cumu_compaction_time_ms());
            ASSERT_EQ(stats.full_compaction_cnt(), expected.full_compaction_cnt());
            ASSERT_EQ(stats.last_full_compaction_time_ms(),
                      expected.last_full_compaction_time_ms());
            ASSERT_EQ(stats.num_rowsets(), expected.num_rowsets());
            ASSERT_EQ(stats.num_segments(), expected.num_segments());
            ASSERT_EQ(stats.index_size(), expected.index_size());
            ASSERT_EQ(stats.segment_size(), expected.segment_size());
        }
    }
    {
        // Put the missing tablet compact stats
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string tablet_compact_stats_key =
                versioned::tablet_compact_stats_key({instance_id, tablet_ids[1]});
        TabletStatsPB tablet_stats = make_stats(1);
        versioned_put(txn.get(), tablet_compact_stats_key, tablet_stats.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // Test all keys found
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, TabletStatsPB> tablet_stats;
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        TxnErrorCode err = meta_reader.get_tablet_compact_stats(txn.get(), tablet_ids,
                                                                &tablet_stats, &versionstamps);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(tablet_stats.size(), tablet_ids.size());
        ASSERT_EQ(versionstamps.size(), tablet_ids.size());
        for (size_t i = 0; i < tablet_ids.size(); ++i) {
            int64_t tablet_id = tablet_ids[i];
            ASSERT_NE(tablet_stats.find(tablet_id), tablet_stats.end());
            ASSERT_NE(versionstamps.find(tablet_id), versionstamps.end());
            // Verify stats data for all tablets; the late-added tablet_ids[1] follows
            // the same formula as the others, so no special case is needed.
            const TabletStatsPB& stats = tablet_stats.at(tablet_id);
            const TabletStatsPB expected = make_stats(static_cast<int64_t>(i));
            ASSERT_EQ(stats.num_rows(), expected.num_rows());
            ASSERT_EQ(stats.data_size(), expected.data_size());
            ASSERT_EQ(stats.base_compaction_cnt(), expected.base_compaction_cnt());
            ASSERT_EQ(stats.cumulative_compaction_cnt(), expected.cumulative_compaction_cnt());
        }
    }
    {
        // Test with snapshot version functionality
        Versionstamp snapshot_version;
        {
            // Get current snapshot version
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
            int64_t version = 0;
            ASSERT_EQ(txn->get_read_version(&version), TxnErrorCode::TXN_OK);
            snapshot_version = Versionstamp(version, 1);
        }
        // Update one of the tablet compact stats
        {
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
            std::string tablet_compact_stats_key =
                    versioned::tablet_compact_stats_key({instance_id, tablet_ids[0]});
            TabletStatsPB tablet_stats;
            tablet_stats.set_num_rows(9999);    // Updated value
            tablet_stats.set_data_size(999999); // Updated value
            tablet_stats.set_base_compaction_cnt(99);
            tablet_stats.set_cumulative_compaction_cnt(199);
            versioned_put(txn.get(), tablet_compact_stats_key, tablet_stats.SerializeAsString());
            ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
        }
        // Test reading with snapshot version - should get old data
        {
            MetaReader meta_reader(instance_id, txn_kv.get(), snapshot_version);
            std::unordered_map<int64_t, TabletStatsPB> tablet_stats;
            std::unordered_map<int64_t, Versionstamp> versionstamps;
            TxnErrorCode err = meta_reader.get_tablet_compact_stats(tablet_ids, &tablet_stats,
                                                                    &versionstamps, true);
            ASSERT_EQ(err, TxnErrorCode::TXN_OK);
            ASSERT_EQ(tablet_stats.size(), tablet_ids.size());
            // Should still see old data for tablet_ids[0]
            const TabletStatsPB& stats = tablet_stats.at(tablet_ids[0]);
            ASSERT_EQ(stats.num_rows(), 1000);    // Old value, not 9999
            ASSERT_EQ(stats.data_size(), 500000); // Old value, not 999999
        }
        // Test reading without snapshot - should get new data
        {
            MetaReader meta_reader(instance_id, txn_kv.get());
            std::unordered_map<int64_t, TabletStatsPB> tablet_stats;
            std::unordered_map<int64_t, Versionstamp> versionstamps;
            TxnErrorCode err =
                    meta_reader.get_tablet_compact_stats(tablet_ids, &tablet_stats, &versionstamps);
            ASSERT_EQ(err, TxnErrorCode::TXN_OK);
            ASSERT_EQ(tablet_stats.size(), tablet_ids.size());
            // Should see new data for tablet_ids[0]
            const TabletStatsPB& stats = tablet_stats.at(tablet_ids[0]);
            ASSERT_EQ(stats.num_rows(), 9999);    // Updated value
            ASSERT_EQ(stats.data_size(), 999999); // Updated value
        }
    }
    {
        // Test with nullptr parameters
        MetaReader meta_reader(instance_id, txn_kv.get());
        TxnErrorCode err = meta_reader.get_tablet_compact_stats(tablet_ids, nullptr, nullptr);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
    }
    {
        // Test with only tablet_stats parameter
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, TabletStatsPB> tablet_stats;
        TxnErrorCode err = meta_reader.get_tablet_compact_stats(tablet_ids, &tablet_stats, nullptr);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(tablet_stats.size(), tablet_ids.size());
    }
    {
        // Test with only versionstamps parameter
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::unordered_map<int64_t, Versionstamp> versionstamps;
        TxnErrorCode err =
                meta_reader.get_tablet_compact_stats(tablet_ids, nullptr, &versionstamps);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(versionstamps.size(), tablet_ids.size());
    }
}
// Exercises MetaReader::get_snapshots: an empty store yields no snapshots, and three
// snapshots written in separate transactions are all returned with their labels intact.
TEST(MetaReaderTest, GetSnapshots) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    {
        // Test empty result when no snapshots exist
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
        TxnErrorCode err = meta_reader.get_snapshots(&snapshots);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_TRUE(snapshots.empty());
    }
    {
        // Create multiple snapshots, each committed by its own transaction
        for (int i = 1; i <= 3; ++i) {
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
            std::string snapshot_key = versioned::snapshot_full_key({instance_id});
            SnapshotPB snapshot_pb;
            snapshot_pb.set_label(fmt::format("snapshot_{}", i));
            snapshot_pb.set_instance_id(instance_id);
            std::string snapshot_value = snapshot_pb.SerializeAsString();
            versioned_put(txn.get(), snapshot_key, snapshot_value);
            ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
        }
    }
    {
        // Test getting snapshots with created transaction
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
        TxnErrorCode err = meta_reader.get_snapshots(&snapshots);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(snapshots.size(), 3);
        // Collect the labels into a set so the check does not depend on return order
        std::set<std::string> snapshot_labels;
        for (const auto& [snapshot_pb, versionstamp] : snapshots) {
            snapshot_labels.insert(snapshot_pb.label());
            ASSERT_EQ(snapshot_pb.instance_id(), instance_id);
        }
        ASSERT_EQ(snapshot_labels.size(), 3);
        ASSERT_TRUE(snapshot_labels.count("snapshot_1"));
        ASSERT_TRUE(snapshot_labels.count("snapshot_2"));
        ASSERT_TRUE(snapshot_labels.count("snapshot_3"));
    }
}
// Exercises MetaReader::get_load_rowset_metas: an empty tablet yields no rowsets, and
// three load rowsets (versions 1..3) written in one transaction are all returned, both
// through a reader-created transaction and through a caller-provided one.
TEST(MetaReaderTest, GetLoadRowsetMetas) {
    using doris::RowsetMetaCloudPB;
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    int64_t tablet_id = 5001;
    {
        // Test with no rowset metas
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>> rowset_metas;
        TxnErrorCode err = meta_reader.get_load_rowset_metas(tablet_id, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_TRUE(rowset_metas.empty());
    }
    {
        // Put some load rowset metas
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (int64_t version = 1; version <= 3; ++version) {
            std::string load_rowset_key =
                    versioned::meta_rowset_load_key({instance_id, tablet_id, version});
            RowsetMetaCloudPB rowset_meta;
            rowset_meta.set_rowset_id(1);
            rowset_meta.set_rowset_id_v2("load_rowset_" + std::to_string(version));
            rowset_meta.set_start_version(version);
            rowset_meta.set_end_version(version);
            rowset_meta.set_num_rows(100 * version);
            rowset_meta.set_data_disk_size(1000 * version);
            ASSERT_TRUE(
                    versioned::document_put(txn.get(), load_rowset_key, std::move(rowset_meta)));
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // Test getting load rowset metas
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>> rowset_metas;
        TxnErrorCode err = meta_reader.get_load_rowset_metas(tablet_id, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(rowset_metas.size(), 3);
        // Sort by start_version so each entry can be compared against the version that
        // was written, instead of against a value read back from the entry itself.
        std::sort(rowset_metas.begin(), rowset_metas.end(), [](const auto& a, const auto& b) {
            return a.first.start_version() < b.first.start_version();
        });
        for (size_t i = 0; i < rowset_metas.size(); ++i) {
            const auto& [rowset_meta, versionstamp] = rowset_metas[i];
            const int64_t expected_version = static_cast<int64_t>(i) + 1;
            ASSERT_EQ(rowset_meta.rowset_id_v2(),
                      "load_rowset_" + std::to_string(expected_version));
            ASSERT_EQ(rowset_meta.start_version(), expected_version);
            ASSERT_EQ(rowset_meta.end_version(), expected_version);
            ASSERT_EQ(rowset_meta.num_rows(), 100 * expected_version);
            ASSERT_EQ(rowset_meta.data_disk_size(), 1000 * expected_version);
        }
        // Check min_read_versionstamp is updated
        ASSERT_NE(meta_reader.min_read_versionstamp(), Versionstamp::max());
    }
    {
        // Test with transaction
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>> rowset_metas;
        TxnErrorCode err = meta_reader.get_load_rowset_metas(txn.get(), tablet_id, &rowset_metas);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(rowset_metas.size(), 3);
    }
}
// Checks MetaReader::get_compact_rowset_metas: nothing is returned for an empty tablet,
// and three compact rowsets covering [1,3], [4,6] and [7,9] are returned after being
// written, via both the reader-created and the caller-provided transaction overloads.
TEST(MetaReaderTest, GetCompactRowsetMetas) {
    using doris::RowsetMetaCloudPB;
    using RowsetList = std::vector<std::pair<RowsetMetaCloudPB, Versionstamp>>;
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    int64_t tablet_id = 5002;
    // (start_version, end_version) of every compact rowset written below
    std::vector<std::pair<int64_t, int64_t>> ranges = {{1, 3}, {4, 6}, {7, 9}};
    {
        // Nothing has been written yet, so the result must be empty
        MetaReader reader(instance_id, txn_kv.get());
        RowsetList fetched;
        ASSERT_EQ(reader.get_compact_rowset_metas(tablet_id, &fetched), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(fetched.empty());
    }
    {
        // Write one compact rowset per range, keyed by its end version
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (size_t idx = 0; idx < ranges.size(); ++idx) {
            auto [first_version, last_version] = ranges[idx];
            RowsetMetaCloudPB meta;
            meta.set_rowset_id(0);
            meta.set_rowset_id_v2("compact_rowset_" + std::to_string(idx + 1));
            meta.set_start_version(first_version);
            meta.set_end_version(last_version);
            meta.set_num_rows(300 * (idx + 1));
            meta.set_data_disk_size(3000 * (idx + 1));
            std::string key =
                    versioned::meta_rowset_compact_key({instance_id, tablet_id, last_version});
            ASSERT_TRUE(versioned::document_put(txn.get(), key, std::move(meta)));
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // Read back through a transaction created by the reader
        MetaReader reader(instance_id, txn_kv.get());
        RowsetList fetched;
        ASSERT_EQ(reader.get_compact_rowset_metas(tablet_id, &fetched), TxnErrorCode::TXN_OK);
        ASSERT_EQ(fetched.size(), 3);
        // Order the result by end_version so it lines up with `ranges`
        std::sort(fetched.begin(), fetched.end(), [](const auto& lhs, const auto& rhs) {
            return lhs.first.end_version() < rhs.first.end_version();
        });
        for (size_t idx = 0; idx < fetched.size(); ++idx) {
            const auto& [meta, versionstamp] = fetched[idx];
            auto [want_start, want_end] = ranges[idx];
            ASSERT_EQ(meta.rowset_id_v2(), "compact_rowset_" + std::to_string(idx + 1));
            ASSERT_EQ(meta.start_version(), want_start);
            ASSERT_EQ(meta.end_version(), want_end);
            ASSERT_EQ(meta.num_rows(), 300 * (idx + 1));
            ASSERT_EQ(meta.data_disk_size(), 3000 * (idx + 1));
        }
        // The read must have moved min_read_versionstamp away from Versionstamp::max()
        ASSERT_NE(reader.min_read_versionstamp(), Versionstamp::max());
    }
    {
        // Read back through a caller-provided transaction
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        RowsetList fetched;
        ASSERT_EQ(reader.get_compact_rowset_metas(txn.get(), tablet_id, &fetched),
                  TxnErrorCode::TXN_OK);
        ASSERT_EQ(fetched.size(), 3);
    }
}
// Checks MetaReader::has_snapshot_references: the flag is false while no reference key
// exists for a snapshot version, true once reference keys are written, and references
// stored under neighbouring versions are reported per version.
TEST(MetaReaderTest, HasSnapshotReferences) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    Versionstamp snapshot_version(12345, 0);
    {
        // No reference keys yet: the flag starts as true and must be flipped to false
        MetaReader reader(instance_id, txn_kv.get());
        bool referenced = true;
        ASSERT_EQ(reader.has_snapshot_references(snapshot_version, &referenced),
                  TxnErrorCode::TXN_OK);
        ASSERT_FALSE(referenced);
    }
    {
        // Write three reference keys for the snapshot version, each with an empty value
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (const char* ref_instance : {"ref_instance_1", "ref_instance_2", "ref_instance_3"}) {
            std::string ref_key = versioned::snapshot_reference_key(
                    {instance_id, snapshot_version, ref_instance});
            txn->put(ref_key, "");
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // References exist now: the flag starts as false and must be flipped to true
        MetaReader reader(instance_id, txn_kv.get());
        bool referenced = false;
        ASSERT_EQ(reader.has_snapshot_references(snapshot_version, &referenced),
                  TxnErrorCode::TXN_OK);
        ASSERT_TRUE(referenced);
    }
    {
        // Same lookup through a caller-provided transaction
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        bool referenced = false;
        ASSERT_EQ(reader.has_snapshot_references(txn.get(), snapshot_version, &referenced),
                  TxnErrorCode::TXN_OK);
        ASSERT_TRUE(referenced);
    }
    {
        // A snapshot version that nobody references
        Versionstamp unreferenced_version(54321, 0);
        MetaReader reader(instance_id, txn_kv.get());
        bool referenced = true;
        ASSERT_EQ(reader.has_snapshot_references(unreferenced_version, &referenced),
                  TxnErrorCode::TXN_OK);
        ASSERT_FALSE(referenced);
    }
    {
        // Caller-provided transaction with the snapshot flag set
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        bool referenced = false;
        ASSERT_EQ(reader.has_snapshot_references(txn.get(), snapshot_version, &referenced, true),
                  TxnErrorCode::TXN_OK);
        ASSERT_TRUE(referenced);
    }
    {
        // Add references under the versions directly below and above the original one,
        // so the per-version lookup can be checked against close neighbours
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        Versionstamp version_below(12344, 0);
        Versionstamp version_above(12346, 0);
        std::string key_below =
                versioned::snapshot_reference_key({instance_id, version_below, "ref_before"});
        std::string key_above =
                versioned::snapshot_reference_key({instance_id, version_above, "ref_after"});
        txn->put(key_below, "");
        txn->put(key_above, "");
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // Probe the original version, both neighbours, and a version without references.
        // Each probe seeds the flag with the opposite of the expected answer so that a
        // call which leaves the flag untouched is detected.
        MetaReader reader(instance_id, txn_kv.get());
        std::pair<Versionstamp, bool> probes[] = {
                {snapshot_version, true},        // original references are still there
                {Versionstamp(12344, 0), true},  // reference written for this version
                {Versionstamp(12346, 0), true},  // reference written for this version
                {Versionstamp(99999, 0), false}, // no reference was ever written
        };
        for (auto& [probe_version, expected] : probes) {
            bool referenced = !expected;
            TxnErrorCode err = reader.has_snapshot_references(probe_version, &referenced);
            ASSERT_EQ(err, TxnErrorCode::TXN_OK);
            if (expected) {
                ASSERT_TRUE(referenced);
            } else {
                ASSERT_FALSE(referenced);
            }
        }
    }
}
// Checks MetaReader::has_no_indexes: the flag is true while a (db, table) pair has no
// index_inverted_key entries and false once entries exist, for both the reader-created
// and caller-provided transaction overloads; unrelated db/table ids stay "no indexes".
TEST(MetaReaderTest, HasNoIndexes) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    int64_t db_id = 1001;
    int64_t table_id = 2001;
    {
        // Nothing written yet: the flag starts as false and must become true
        MetaReader reader(instance_id, txn_kv.get());
        bool is_empty = false;
        ASSERT_EQ(reader.has_no_indexes(db_id, table_id, &is_empty), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(is_empty);
    }
    {
        // Write index entries 1..3 for the table, each with an empty value
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (int64_t index_id = 1; index_id <= 3; ++index_id) {
            std::string index_key =
                    versioned::index_inverted_key({instance_id, db_id, table_id, index_id});
            txn->put(index_key, "");
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        // Entries exist now: the flag starts as true and must become false
        MetaReader reader(instance_id, txn_kv.get());
        bool is_empty = true;
        ASSERT_EQ(reader.has_no_indexes(db_id, table_id, &is_empty), TxnErrorCode::TXN_OK);
        ASSERT_FALSE(is_empty);
    }
    {
        // Same lookup through a caller-provided transaction
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        bool is_empty = true;
        ASSERT_EQ(reader.has_no_indexes(txn.get(), db_id, table_id, &is_empty),
                  TxnErrorCode::TXN_OK);
        ASSERT_FALSE(is_empty);
    }
    {
        // A different table under the same db has no index entries
        int64_t other_table_id = 3001;
        MetaReader reader(instance_id, txn_kv.get());
        bool is_empty = false;
        ASSERT_EQ(reader.has_no_indexes(db_id, other_table_id, &is_empty), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(is_empty);
    }
    {
        // The same table id under a different db has no index entries
        int64_t other_db_id = 2001;
        MetaReader reader(instance_id, txn_kv.get());
        bool is_empty = false;
        ASSERT_EQ(reader.has_no_indexes(other_db_id, table_id, &is_empty), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(is_empty);
    }
    {
        // Caller-provided transaction with the snapshot flag set
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        MetaReader reader(instance_id, txn_kv.get());
        bool is_empty = true;
        ASSERT_EQ(reader.has_no_indexes(txn.get(), db_id, table_id, &is_empty, true),
                  TxnErrorCode::TXN_OK);
        ASSERT_FALSE(is_empty);
    }
}
// Exercises MetaReader::get_all_tablet_ids: five tablet index entries are written, and
// the returned ids must be exactly those five, each resolving to the stored index.
TEST(MetaReaderTest, GetAllTabletIds) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    // Prepare test data - multiple tablet metas
    struct TestTabletMeta {
        int64_t tablet_id;
        int64_t table_id;
        int64_t partition_id;
        int64_t index_id;
    };
    std::vector<TestTabletMeta> test_tablets = {
            {.tablet_id = 1001, .table_id = 2001, .partition_id = 3001, .index_id = 4001},
            {.tablet_id = 1002, .table_id = 2001, .partition_id = 3001, .index_id = 4001},
            {.tablet_id = 1003, .table_id = 2002, .partition_id = 3002, .index_id = 4002},
            {.tablet_id = 1004, .table_id = 2002, .partition_id = 3003, .index_id = 4003},
            {.tablet_id = 1005, .table_id = 2003, .partition_id = 3004, .index_id = 4004},
    };
    // Expected index per tablet id; entries are removed as the ids are returned so
    // that duplicates and unknown ids are detected.
    std::map<int64_t, TabletIndexPB> pending_metas;
    {
        // Create and insert tablet metas
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (const auto& test_tablet : test_tablets) {
            TabletIndexPB tablet_idx;
            tablet_idx.set_tablet_id(test_tablet.tablet_id);
            tablet_idx.set_table_id(test_tablet.table_id);
            tablet_idx.set_partition_id(test_tablet.partition_id);
            tablet_idx.set_index_id(test_tablet.index_id);
            std::string tablet_key =
                    versioned::tablet_index_key({instance_id, test_tablet.tablet_id});
            // Store original for comparison
            pending_metas[test_tablet.tablet_id] = tablet_idx;
            txn->put(tablet_key, tablet_idx.SerializeAsString());
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    {
        MetaReader meta_reader(instance_id, txn_kv.get());
        std::vector<int64_t> tablet_ids;
        TxnErrorCode err = meta_reader.get_all_tablet_ids(&tablet_ids, false);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(tablet_ids.size(), test_tablets.size());
        for (const auto& tablet_id : tablet_ids) {
            // A miss here means the id was never written or was returned twice
            auto expected_it = pending_metas.find(tablet_id);
            ASSERT_TRUE(expected_it != pending_metas.end())
                    << "unexpected or duplicate tablet id " << tablet_id;
            const TabletIndexPB& expected_idx = expected_it->second;
            TabletIndexPB tablet_idx;
            std::string tablet_key = versioned::tablet_index_key({instance_id, tablet_id});
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
            std::string value;
            ASSERT_EQ(txn->get(tablet_key, &value), TxnErrorCode::TXN_OK);
            ASSERT_TRUE(tablet_idx.ParseFromString(value));
            ASSERT_EQ(expected_idx.table_id(), tablet_idx.table_id());
            ASSERT_EQ(expected_idx.partition_id(), tablet_idx.partition_id());
            ASSERT_EQ(expected_idx.index_id(), tablet_idx.index_id());
            ASSERT_EQ(expected_idx.tablet_id(), tablet_idx.tablet_id());
            pending_metas.erase(expected_it);
        }
        // Every written tablet id must have been returned exactly once
        ASSERT_TRUE(pending_metas.empty());
    }
}
// Exercises MetaReader::find_derived_instance_ids: all derived instances referencing a
// snapshot version are returned, an unreferenced version yields an empty list, and a
// derived instance whose reference key is written repeatedly is reported once.
TEST(MetaReaderTest, FindDerivedInstanceIdsTest) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    // Initialize the store like every other test in this file does
    ASSERT_EQ(txn_kv->init(), 0);
    std::string instance_id = "test_instance";
    Versionstamp snapshot_version(0x0102030405060708ULL, 0x090a);
    // Create snapshot reference keys for multiple derived instances
    std::vector<std::string> expected_derived_ids = {
            "derived_instance_1",
            "derived_instance_2",
            "derived_instance_3",
            "derived_instance_4",
    };
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        for (const auto& derived_id : expected_derived_ids) {
            versioned::SnapshotReferenceKeyInfo ref_key_info {instance_id, snapshot_version,
                                                              derived_id};
            std::string ref_key = versioned::snapshot_reference_key(ref_key_info);
            txn->put(ref_key, ""); // Value is empty for reference keys
        }
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Test find_derived_instance_ids
    {
        MetaReader reader(instance_id, txn_kv.get());
        std::vector<std::string> derived_ids;
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        TxnErrorCode err =
                reader.find_derived_instance_ids(txn.get(), snapshot_version, &derived_ids, false);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(derived_ids.size(), expected_derived_ids.size());
        // Convert to set for comparison (order doesn't matter)
        std::unordered_set<std::string> derived_set(derived_ids.begin(), derived_ids.end());
        std::unordered_set<std::string> expected_set(expected_derived_ids.begin(),
                                                     expected_derived_ids.end());
        ASSERT_EQ(derived_set, expected_set);
    }
    // Test with no references
    {
        Versionstamp empty_snapshot_version(0x0a0b0c0d0e0f1011ULL, 0x1213);
        MetaReader reader(instance_id, txn_kv.get());
        std::vector<std::string> derived_ids;
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        TxnErrorCode err = reader.find_derived_instance_ids(txn.get(), empty_snapshot_version,
                                                            &derived_ids, false);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_TRUE(derived_ids.empty());
    }
    // Test with the same derived instance ID written repeatedly (must be reported once)
    {
        Versionstamp dup_snapshot_version(0x1112131415161718ULL, 0x191a);
        std::string dup_instance_id = "duplicate_instance";
        {
            std::unique_ptr<Transaction> txn;
            ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
            // The key info is identical on every iteration, so the same reference key is
            // put five times within one transaction.
            for (int i = 0; i < 5; ++i) {
                versioned::SnapshotReferenceKeyInfo ref_key_info {instance_id, dup_snapshot_version,
                                                                  dup_instance_id};
                std::string ref_key = versioned::snapshot_reference_key(ref_key_info);
                txn->put(ref_key, "");
            }
            ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
        }
        MetaReader reader(instance_id, txn_kv.get());
        std::vector<std::string> derived_ids;
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        TxnErrorCode err = reader.find_derived_instance_ids(txn.get(), dup_snapshot_version,
                                                            &derived_ids, false);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        ASSERT_EQ(derived_ids.size(), 1); // Only one entry for the repeated instance ID
        ASSERT_EQ(derived_ids[0], dup_instance_id);
    }
}