blob: f7bc0947952333df7c68417b74431f79dd61599c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <butil/strings/string_split.h>
#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest.h>
#include <cstdint>
#include <memory>
#include <string>
#include "common/config.h"
#include "common/util.h"
#include "meta-service/meta_service.h"
#include "meta-store/document_message.h"
#include "meta-store/keys.h"
#include "meta-store/mem_txn_kv.h"
#include "meta-store/meta_reader.h"
#include "meta-store/txn_kv.h"
#include "meta-store/txn_kv_error.h"
#include "meta-store/versioned_value.h"
#include "recycler/checker.h"
#include "recycler/recycler.h"
#include "recycler/util.h"
using namespace doris::cloud;
// Shared fixtures of this test binary; they are only declared here.
// NOTE(review): presumably defined by the recycler test main/fixture TU — confirm.
// `instance_id` keys every instance/log/version key written by these tests.
extern std::string instance_id;
extern int64_t current_time;
// Thread pools handed to every InstanceRecycler constructed below.
extern doris::cloud::RecyclerThreadPoolGroup thread_group;
// Convert a byte string to a printable, hex-escaped string (for test diagnostics).
// Printable bytes are copied as-is. Every other byte (control characters, NUL,
// DEL, bytes >= 0x80) is rendered as \xHH with two lowercase hex digits.
// Each byte is converted to unsigned char before isprint(): passing a negative
// char value (bytes >= 0x80 where char is signed) to isprint is undefined behavior.
static std::string escape_hex(std::string_view data) {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    std::string result;
    result.reserve(data.size());
    for (char c : data) {
        const auto byte = static_cast<unsigned char>(c);
        if (isprint(byte)) {
            result += c;
        } else {
            result += "\\x";
            result += kHexDigits[byte >> 4];
            result += kHexDigits[byte & 0x0F];
        }
    }
    return result;
}
// Counts the key-value pairs that full_range_get() yields between `begin` and
// `end`, read through a freshly created transaction. The defaults scan every key
// below "\xFF". If the transaction cannot be created, a non-fatal gtest failure
// is recorded and 0 is returned.
static size_t count_range(TxnKv* txn_kv, std::string_view begin = "",
                          std::string_view end = "\xFF") {
    std::unique_ptr<Transaction> scan_txn;
    EXPECT_EQ(txn_kv->create_txn(&scan_txn), TxnErrorCode::TXN_OK);
    if (scan_txn == nullptr) {
        return 0; // Failed to create transaction
    }

    FullRangeGetOptions scan_opts;
    scan_opts.txn = scan_txn.get();
    auto it = txn_kv->full_range_get(std::string(begin), std::string(end), std::move(scan_opts));

    size_t num_kvs = 0;
    while (it->next().has_value()) {
        ++num_kvs;
    }
    // Exhausting the range should leave the iterator valid; presumably it turns
    // invalid only when the scan stopped on an error.
    EXPECT_TRUE(it->is_valid());
    return num_kvs;
}
// Returns true when count_range() finds no key between `begin` and `end`
// (defaults: the whole keyspace below "\xFF").
static bool is_empty_range(TxnKv* txn_kv, std::string_view begin = "",
                           std::string_view end = "\xFF") {
    const size_t num_kvs = count_range(txn_kv, begin, end);
    return num_kvs == 0;
}
// Renders every key-value pair between `begin` and `end` as one
// "Key: ..., Value: ...\n" line, both sides hex-escaped via escape_hex().
// Intended as an assertion failure message. If the transaction cannot be
// created, a fixed error string is returned instead of failing the test.
static std::string dump_range(TxnKv* txn_kv, std::string_view begin = "",
                              std::string_view end = "\xFF") {
    std::unique_ptr<Transaction> scan_txn;
    const TxnErrorCode create_err = txn_kv->create_txn(&scan_txn);
    if (create_err != TxnErrorCode::TXN_OK) {
        return "Failed to create dump range transaction";
    }

    FullRangeGetOptions scan_opts;
    scan_opts.txn = scan_txn.get();
    auto it = txn_kv->full_range_get(std::string(begin), std::string(end), std::move(scan_opts));

    std::string out;
    auto&& entry = it->next();
    while (entry.has_value()) {
        out.append(fmt::format("Key: {}, Value: {}\n", escape_hex(entry->first),
                               escape_hex(entry->second)));
        entry = it->next();
    }
    // Exhausting the range should leave the iterator valid.
    EXPECT_TRUE(it->is_valid());
    return out;
}
// Stores `instance` under the instance key of the test-global `instance_id`
// (not instance.instance_id()) in its own committed transaction.
// Fatal assertions here only return from this helper; the calling test keeps
// running unless it wraps the call in ASSERT_NO_FATAL_FAILURE.
static void update_instance_info(TxnKv* txn_kv, const InstanceInfoPB& instance) {
    const std::string serialized = instance.SerializeAsString();
    ASSERT_FALSE(serialized.empty()) << "Failed to serialize instance info";
    const std::string info_key = instance_key({instance_id});

    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK) << "Failed to create transaction";
    txn->put(info_key, serialized);
    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit transaction";
}
// Deletes the instance info key of the test-global `instance_id` in its own
// committed transaction, so that later range scans only see keys the test and
// the recycler produced. Fatal assertions here only return from this helper.
static void remove_instance_info(TxnKv* txn_kv) {
    const std::string info_key = instance_key({instance_id});
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK) << "Failed to create transaction";
    txn->remove(info_key);
    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK) << "Failed to commit transaction";
}
// An operation log carrying only a min_timestamp (no operation payload) must be
// deleted by recycle_operation_logs(), leaving the keyspace empty once the
// instance info is removed.
TEST(RecycleOperationLogTest, RecycleOneOperationLog) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY);
    auto* obj = instance.add_obj_info();
    obj->set_id("recycle_empty");
    obj->set_ak(config::test_s3_ak);
    obj->set_sk(config::test_s3_sk);
    obj->set_endpoint(config::test_s3_endpoint);
    obj->set_region(config::test_s3_region);
    obj->set_bucket(config::test_s3_bucket);
    obj->set_prefix("recycle_empty");
    update_instance_info(txn_kv.get(), instance);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    {
        // Write an empty operation log directly under an explicit versionstamp.
        Versionstamp min_ts(123, 0);
        std::string log_key = versioned::log_key(instance_id);
        std::string versioned_log_key = encode_versioned_key(log_key, min_ts);

        OperationLogPB log_pb;
        log_pb.mutable_min_timestamp()->append(
                reinterpret_cast<const char*>(min_ts.data().data()), min_ts.data().size());

        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        txn->put(versioned_log_key, log_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_EQ(recycler.recycle_operation_logs(), 0);

    // Without the instance info, nothing at all may remain: the log is gone.
    remove_instance_info(txn_kv.get());
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
// Recycling a commit-partition log must remove the table version written before
// it; recycling a second such log must leave the keyspace empty.
TEST(RecycleOperationLogTest, RecycleCommitPartitionLog) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY);
    update_instance_info(txn_kv.get(), instance);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    uint64_t db_id = 1;
    uint64_t table_id = 2;
    uint64_t index_id = 3;
    uint64_t partition_id = 4;

    // Reads the latest table version and checks the returned error code.
    // Fatal assertions in a lambda only leave the lambda, hence the
    // ASSERT_NO_FATAL_FAILURE wrappers at every call site.
    auto expect_table_version = [&](TxnErrorCode expected) {
        MetaReader reader(instance_id, txn_kv.get());
        Versionstamp version;
        ASSERT_EQ(reader.get_table_version(table_id, &version), expected);
    };

    // Writes one commit-partition operation log through versioned_put().
    auto put_commit_partition_log = [&]() {
        std::string log_key = versioned::log_key(instance_id);
        Versionstamp min_ts(123, 0);
        OperationLogPB log_pb;
        log_pb.mutable_min_timestamp()->append(
                reinterpret_cast<const char*>(min_ts.data().data()), min_ts.data().size());
        auto* payload = log_pb.mutable_commit_partition();
        payload->set_db_id(db_id);
        payload->set_table_id(table_id);
        payload->add_index_ids(index_id);
        payload->add_partition_ids(partition_id);

        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), log_key, log_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    };

    {
        // Seed a table version for the first recycle pass to remove.
        std::string ver_key = versioned::table_version_key({instance_id, table_id});
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), ver_key, "");
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    ASSERT_NO_FATAL_FAILURE(expect_table_version(TxnErrorCode::TXN_OK));

    // First pass: the log is recycled and takes the table version with it.
    ASSERT_NO_FATAL_FAILURE(put_commit_partition_log());
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
    ASSERT_NO_FATAL_FAILURE(expect_table_version(TxnErrorCode::TXN_KEY_NOT_FOUND));

    // Second pass: a fresh log is recycled as well.
    ASSERT_NO_FATAL_FAILURE(put_commit_partition_log());
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);

    remove_instance_info(txn_kv.get());
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
// Recycling a drop-partition log must write a RecyclePartitionPB (state DROPPED)
// for the dropped partition, and must remove the preceding table version when
// the log sets update_table_version. After two such logs are recycled, only the
// two recycle partition records may remain.
TEST(RecycleOperationLogTest, RecycleDropPartitionLog) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY);
    update_instance_info(txn_kv.get(), instance);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    uint64_t db_id = 1;
    uint64_t table_id = 2;
    uint64_t index_id = 3;
    uint64_t partition_id = 4;
    int64_t expiration = ::time(nullptr) + 3600; // 1 hour from now

    // Writes a drop-partition operation log for partition `pid`; `min_ts_version`
    // seeds the log's min_timestamp. Fatal assertions in a lambda only leave the
    // lambda, so every call site is wrapped in ASSERT_NO_FATAL_FAILURE.
    auto put_drop_partition_log = [&](uint64_t min_ts_version, uint64_t pid,
                                      bool update_table_version) {
        std::string log_key = versioned::log_key(instance_id);
        Versionstamp versionstamp(min_ts_version, 0);
        OperationLogPB operation_log;
        operation_log.mutable_min_timestamp()->append(
                reinterpret_cast<const char*>(versionstamp.data().data()),
                versionstamp.data().size());
        auto* drop_partition = operation_log.mutable_drop_partition();
        drop_partition->set_db_id(db_id);
        drop_partition->set_table_id(table_id);
        drop_partition->add_index_ids(index_id);
        drop_partition->add_partition_ids(pid);
        drop_partition->set_expiration(expiration);
        drop_partition->set_update_table_version(update_table_version);

        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    };

    // Checks that the RecyclePartitionPB stored for `pid` describes the dropped partition.
    auto check_recycle_partition = [&](uint64_t pid) {
        std::string recycle_key = recycle_partition_key({instance_id, pid});
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string value;
        ASSERT_EQ(txn->get(recycle_key, &value), TxnErrorCode::TXN_OK) << "partition " << pid;
        RecyclePartitionPB recycle_partition_pb;
        ASSERT_TRUE(recycle_partition_pb.ParseFromString(value));
        ASSERT_EQ(recycle_partition_pb.db_id(), db_id);
        ASSERT_EQ(recycle_partition_pb.table_id(), table_id);
        ASSERT_EQ(recycle_partition_pb.index_id_size(), 1);
        ASSERT_EQ(recycle_partition_pb.index_id(0), index_id);
        ASSERT_EQ(recycle_partition_pb.state(), RecyclePartitionPB::DROPPED);
        ASSERT_EQ(recycle_partition_pb.expiration(), expiration);
    };

    {
        // Seed a table version; the first log sets update_table_version, so
        // recycling it must remove this version.
        std::string ver_key = versioned::table_version_key({instance_id, table_id});
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), ver_key, "");
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
        MetaReader meta_reader(instance_id, txn_kv.get());
        Versionstamp table_version;
        ASSERT_EQ(meta_reader.get_table_version(table_id, &table_version), TxnErrorCode::TXN_OK);
    }

    // First log: drop `partition_id` and update the table version.
    ASSERT_NO_FATAL_FAILURE(put_drop_partition_log(123, partition_id, true));
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
    ASSERT_NO_FATAL_FAILURE(check_recycle_partition(partition_id));
    {
        // The table version key should be removed because update_table_version is true.
        MetaReader meta_reader(instance_id, txn_kv.get());
        Versionstamp table_version;
        ASSERT_EQ(meta_reader.get_table_version(table_id, &table_version),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
    }

    // Second log: drop a different partition without updating the table version.
    ASSERT_NO_FATAL_FAILURE(put_drop_partition_log(124, partition_id + 1, false));
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
    ASSERT_NO_FATAL_FAILURE(check_recycle_partition(partition_id + 1));

    // Remove the instance info so the final scan only sees recycler output;
    // both recycle partition records must still be intact.
    remove_instance_info(txn_kv.get());
    ASSERT_NO_FATAL_FAILURE(check_recycle_partition(partition_id));
    ASSERT_NO_FATAL_FAILURE(check_recycle_partition(partition_id + 1));

    // The operation logs must be gone: only the 2 recycle partition records remain.
    ASSERT_EQ(count_range(txn_kv.get()), 2U)
            << "Should only have 2 recycle partition records\n"
            << dump_range(txn_kv.get());
}
// Recycling a commit-index log with update_table_version set must remove the
// table version written before it. A second commit-index log, without the flag,
// must also be recycled, leaving the keyspace empty.
TEST(RecycleOperationLogTest, RecycleCommitIndexLog) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY);
    update_instance_info(txn_kv.get(), instance);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    uint64_t db_id = 1;
    uint64_t table_id = 2;
    uint64_t index_id = 3;

    // Reads the latest table version and checks the returned error code.
    // Call sites wrap this in ASSERT_NO_FATAL_FAILURE to abort the test on failure.
    auto expect_table_version = [&](TxnErrorCode expected) {
        MetaReader reader(instance_id, txn_kv.get());
        Versionstamp version;
        ASSERT_EQ(reader.get_table_version(table_id, &version), expected);
    };

    // Writes one commit-index log. update_table_version is only set (to true)
    // when requested; otherwise the field is left unset.
    auto put_commit_index_log = [&](bool update_table_version) {
        std::string log_key = versioned::log_key(instance_id);
        Versionstamp min_ts(123, 0);
        OperationLogPB log_pb;
        log_pb.mutable_min_timestamp()->append(
                reinterpret_cast<const char*>(min_ts.data().data()), min_ts.data().size());
        auto* payload = log_pb.mutable_commit_index();
        payload->set_db_id(db_id);
        payload->set_table_id(table_id);
        payload->add_index_ids(index_id);
        if (update_table_version) {
            payload->set_update_table_version(true);
        }

        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), log_key, log_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    };

    {
        // Seed a table version for the first recycle pass to remove.
        std::string ver_key = versioned::table_version_key({instance_id, table_id});
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), ver_key, "");
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    ASSERT_NO_FATAL_FAILURE(expect_table_version(TxnErrorCode::TXN_OK));

    ASSERT_NO_FATAL_FAILURE(put_commit_index_log(true));
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
    ASSERT_NO_FATAL_FAILURE(expect_table_version(TxnErrorCode::TXN_KEY_NOT_FOUND));

    ASSERT_NO_FATAL_FAILURE(put_commit_index_log(false));
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);

    remove_instance_info(txn_kv.get());
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}
// Recycling a drop-index log must write a RecycleIndexPB (state DROPPED) for the
// dropped index and delete the log, so exactly one record remains afterwards.
TEST(RecycleOperationLogTest, RecycleDropIndexLog) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY);
    update_instance_info(txn_kv.get(), instance);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    uint64_t db_id = 1;
    uint64_t table_id = 2;
    uint64_t index_id = 3;
    int64_t expiration = ::time(nullptr) + 3600; // 1 hour from now

    // Checks that the RecycleIndexPB stored for `index_id` describes the dropped index.
    // Fatal assertions in a lambda only leave the lambda, so call sites use
    // ASSERT_NO_FATAL_FAILURE.
    auto check_recycle_index = [&]() {
        std::string recycle_key = recycle_index_key({instance_id, index_id});
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string value;
        ASSERT_EQ(txn->get(recycle_key, &value), TxnErrorCode::TXN_OK);
        RecycleIndexPB recycle_index_pb;
        ASSERT_TRUE(recycle_index_pb.ParseFromString(value));
        ASSERT_EQ(recycle_index_pb.db_id(), db_id);
        ASSERT_EQ(recycle_index_pb.table_id(), table_id);
        ASSERT_EQ(recycle_index_pb.state(), RecycleIndexPB::DROPPED);
        ASSERT_EQ(recycle_index_pb.expiration(), expiration);
    };

    {
        // Put a drop index log
        std::string log_key = versioned::log_key(instance_id);
        Versionstamp versionstamp(123, 0);
        OperationLogPB operation_log;
        operation_log.mutable_min_timestamp()->append(
                reinterpret_cast<const char*>(versionstamp.data().data()),
                versionstamp.data().size());
        auto* drop_index = operation_log.mutable_drop_index();
        drop_index->set_db_id(db_id);
        drop_index->set_table_id(table_id);
        drop_index->add_index_ids(index_id);
        drop_index->set_expiration(expiration);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    // Recycle the operation log; the recycle index record must be created.
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
    ASSERT_NO_FATAL_FAILURE(check_recycle_index());

    // Remove the instance info so the final scan only sees recycler output;
    // the recycle index record must still be intact.
    remove_instance_info(txn_kv.get());
    ASSERT_NO_FATAL_FAILURE(check_recycle_index());

    // The drop index log must be gone: only the single recycle index record remains.
    ASSERT_EQ(count_range(txn_kv.get()), 1U)
            << "Should only have 1 recycle index record\n"
            << dump_range(txn_kv.get());
}
// Recycling the commit-txn log of a VISIBLE txn must create a RecycleTxnPB and
// remove the partition version, tablet load stats and table version written
// before the log. It must keep the TxnInfoPB and the versioned rowset meta.
TEST(RecycleOperationLogTest, RecycleCommitTxnLog) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY);
    update_instance_info(txn_kv.get(), instance);
    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);
    uint64_t db_id = 1;
    uint64_t table_id = 2;
    uint64_t partition_id = 4;
    uint64_t tablet_id = 5;
    uint64_t txn_id = 12345;
    // Create TxnInfo with VISIBLE status
    {
        TxnInfoPB txn_info;
        txn_info.set_db_id(db_id);
        txn_info.set_txn_id(txn_id);
        txn_info.set_status(TxnStatusPB::TXN_STATUS_VISIBLE);
        txn_info.set_label("test_label");
        std::string key = txn_info_key({instance_id, db_id, txn_id});
        std::string txn_info_value = txn_info.SerializeAsString();
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        txn->put(key, txn_info_value);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Create previous partition version to be recycled
    {
        std::string partition_version_key =
                versioned::partition_version_key({instance_id, partition_id});
        VersionPB version_pb;
        version_pb.set_version(100);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), partition_version_key, version_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Create previous tablet load stats to be recycled
    {
        std::string tablet_stats_key = versioned::tablet_load_stats_key({instance_id, tablet_id});
        TabletStatsPB stats_pb;
        // TabletStatsPB typically doesn't have tablet_id field, just create empty stats
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), tablet_stats_key, stats_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Create previous table version to be recycled
    {
        std::string table_version_key = versioned::table_version_key({instance_id, table_id});
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), table_version_key, "");
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Create versioned rowset meta (should NOT be recycled)
    {
        std::string versioned_rowset_key =
                versioned::meta_rowset_load_key({instance_id, tablet_id, 100});
        doris::RowsetMetaCloudPB rowset_meta;
        rowset_meta.set_rowset_id(123456);
        rowset_meta.set_tablet_id(tablet_id);
        rowset_meta.set_partition_id(partition_id);
        rowset_meta.set_start_version(100);
        rowset_meta.set_end_version(100);
        rowset_meta.set_num_rows(1000);
        rowset_meta.set_total_disk_size(1024 * 1024);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        ASSERT_TRUE(
                versioned::document_put(txn.get(), versioned_rowset_key, std::move(rowset_meta)));
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Put a commit txn log
    {
        std::string log_key = versioned::log_key(instance_id);
        Versionstamp versionstamp(123, 0);
        OperationLogPB operation_log;
        operation_log.mutable_min_timestamp()->append(
                reinterpret_cast<const char*>(versionstamp.data().data()),
                versionstamp.data().size());
        auto* commit_txn = operation_log.mutable_commit_txn();
        commit_txn->set_txn_id(txn_id);
        commit_txn->set_db_id(db_id);
        commit_txn->add_table_ids(table_id);
        // Add partition version mapping
        auto& partition_version_map = *commit_txn->mutable_partition_version_map();
        partition_version_map[partition_id] = 101; // New version
        // Add tablet to partition mapping
        auto& tablet_to_partition_map = *commit_txn->mutable_tablet_to_partition_map();
        tablet_to_partition_map[tablet_id] = partition_id;
        // Add recycle txn info
        auto* recycle_txn = commit_txn->mutable_recycle_txn();
        recycle_txn->set_label("test_label");
        std::unique_ptr<Transaction> txn;
        TxnErrorCode err = txn_kv->create_txn(&txn);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Recycle the operation logs
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
    // Verify that the recycle txn record is created
    {
        std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string value;
        TxnErrorCode err = txn->get(recycle_key, &value);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        RecycleTxnPB recycle_txn_pb;
        ASSERT_TRUE(recycle_txn_pb.ParseFromString(value));
        ASSERT_EQ(recycle_txn_pb.label(), "test_label");
        ASSERT_GT(recycle_txn_pb.creation_time(), 0);
    }
    // The partition version, tablet load stats and table version written before the
    // log were its only versions, and recycling the VISIBLE txn's log removes them.
    // This agrees with the final 3-record count below, which leaves no room for them.
    {
        MetaReader meta_reader(instance_id, txn_kv.get());
        Versionstamp partition_version;
        EXPECT_EQ(meta_reader.get_partition_version(partition_id, nullptr, &partition_version),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
        Versionstamp tablet_version;
        EXPECT_EQ(meta_reader.get_tablet_load_stats(tablet_id, nullptr, &tablet_version),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
        Versionstamp table_version;
        EXPECT_EQ(meta_reader.get_table_version(table_id, &table_version),
                  TxnErrorCode::TXN_KEY_NOT_FOUND);
    }
    // Remove the instance info so the final scan only sees the test's own keys.
    remove_instance_info(txn_kv.get());
    // Should have TxnInfoPB + RecycleTxnPB + RowsetMetaCloudPB = 3 records
    ASSERT_EQ(count_range(txn_kv.get()), 3U)
            << "Should have TxnInfoPB, RecycleTxnPB and RowsetMetaCloudPB records\n"
            << dump_range(txn_kv.get());
}
// Recycling the commit-txn log of a txn that is not VISIBLE (here COMMITTED)
// must not create a RecycleTxnPB and must keep the earlier partition version.
TEST(RecycleOperationLogTest, RecycleCommitTxnLogWhenTxnIsNotVisible) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);
    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY);
    update_instance_info(txn_kv.get(), instance);
    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);
    uint64_t db_id = 1;
    uint64_t table_id = 2;
    uint64_t partition_id = 4;
    uint64_t tablet_id = 5;
    uint64_t txn_id = 12345;
    // Create TxnInfo with COMMITTED status (not VISIBLE)
    {
        TxnInfoPB txn_info;
        txn_info.set_db_id(db_id);
        txn_info.set_txn_id(txn_id);
        txn_info.set_status(TxnStatusPB::TXN_STATUS_COMMITTED); // Not VISIBLE
        txn_info.set_label("test_label");
        std::string key = txn_info_key({instance_id, db_id, txn_id});
        std::string txn_info_value = txn_info.SerializeAsString();
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        txn->put(key, txn_info_value);
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Create previous partition version
    {
        std::string partition_version_key =
                versioned::partition_version_key({instance_id, partition_id});
        VersionPB version_pb;
        version_pb.set_version(100);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), partition_version_key, version_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Put a commit txn log
    {
        std::string log_key = versioned::log_key(instance_id);
        Versionstamp versionstamp(123, 0);
        OperationLogPB operation_log;
        operation_log.mutable_min_timestamp()->append(
                reinterpret_cast<const char*>(versionstamp.data().data()),
                versionstamp.data().size());
        auto* commit_txn = operation_log.mutable_commit_txn();
        commit_txn->set_txn_id(txn_id);
        commit_txn->set_db_id(db_id);
        commit_txn->add_table_ids(table_id);
        // Add partition version mapping
        auto& partition_version_map = *commit_txn->mutable_partition_version_map();
        partition_version_map[partition_id] = 101;
        // Add tablet to partition mapping
        auto& tablet_to_partition_map = *commit_txn->mutable_tablet_to_partition_map();
        tablet_to_partition_map[tablet_id] = partition_id;
        // Add recycle txn info
        auto* recycle_txn = commit_txn->mutable_recycle_txn();
        recycle_txn->set_label("test_label");
        std::unique_ptr<Transaction> txn;
        TxnErrorCode err = txn_kv->create_txn(&txn);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), log_key, operation_log.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    // Recycle the operation logs
    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
    // Verify that NO recycle txn record is created (because txn is not VISIBLE)
    {
        std::string recycle_key = recycle_txn_key({instance_id, db_id, txn_id});
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::string value;
        TxnErrorCode err = txn->get(recycle_key, &value);
        ASSERT_EQ(err, TxnErrorCode::TXN_KEY_NOT_FOUND)
                << "Recycle txn record should not exist for non-VISIBLE transactions";
    }
    // Verify that the original partition version still exists (not recycled)
    {
        MetaReader meta_reader(instance_id, txn_kv.get());
        Versionstamp partition_version;
        TxnErrorCode err =
                meta_reader.get_partition_version(partition_id, nullptr, &partition_version);
        ASSERT_EQ(err, TxnErrorCode::TXN_OK)
                << "Partition version should still exist for non-VISIBLE transactions";
    }
    // Remove the instance info so the final scan only sees the test's own keys.
    remove_instance_info(txn_kv.get());
    // TxnInfoPB and the partition version must both survive. This test writes no
    // RowsetMetaCloudPB, so the third record is presumably the commit txn log itself,
    // left in place because the txn is not VISIBLE (NOTE(review): confirm).
    ASSERT_EQ(count_range(txn_kv.get()), 3U)
            << "Non-VISIBLE txn must not be recycled; remaining records:\n"
            << dump_range(txn_kv.get());
}
// Recycling an update-tablet log must remove the tablet meta version written
// before it; afterwards (log deleted, instance info removed) nothing remains.
TEST(RecycleOperationLogTest, RecycleUpdateTabletLog) {
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    instance.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_WRITE_ONLY);
    update_instance_info(txn_kv.get(), instance);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    uint64_t tablet_id = 1;

    // Reads the latest tablet meta and checks the returned error code.
    // Call sites wrap this in ASSERT_NO_FATAL_FAILURE to abort the test on failure.
    auto expect_tablet_meta = [&](TxnErrorCode expected) {
        MetaReader reader(instance_id, txn_kv.get());
        doris::TabletMetaCloudPB meta;
        Versionstamp meta_version;
        ASSERT_EQ(reader.get_tablet_meta(tablet_id, &meta, &meta_version), expected);
    };

    {
        // Seed one version of the tablet meta.
        std::string meta_key = versioned::meta_tablet_key({instance_id, tablet_id});
        doris::TabletMetaCloudPB seed_meta;
        seed_meta.set_tablet_id(tablet_id);
        seed_meta.set_creation_time(::time(nullptr));
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), meta_key, seed_meta.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }
    ASSERT_NO_FATAL_FAILURE(expect_tablet_meta(TxnErrorCode::TXN_OK));

    {
        // Write the update-tablet log that references the seeded tablet.
        std::string log_key = versioned::log_key(instance_id);
        Versionstamp min_ts(123, 0);
        OperationLogPB log_pb;
        log_pb.mutable_min_timestamp()->append(
                reinterpret_cast<const char*>(min_ts.data().data()), min_ts.data().size());
        log_pb.mutable_update_tablet()->add_tablet_ids(tablet_id);
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        versioned_put(txn.get(), log_key, log_pb.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    ASSERT_EQ(recycler.recycle_operation_logs(), 0);
    ASSERT_NO_FATAL_FAILURE(expect_tablet_meta(TxnErrorCode::TXN_KEY_NOT_FOUND));

    // The operation log itself must be gone as well.
    remove_instance_info(txn_kv.get());
    ASSERT_TRUE(is_empty_range(txn_kv.get())) << dump_range(txn_kv.get());
}